a love letter to tangled (android, iOS, and a search API)
1package tapclient
2
3import (
4 "context"
5 "encoding/base64"
6 "encoding/json"
7 "fmt"
8 "log/slog"
9 "math/rand/v2"
10 "net/http"
11 "os"
12 "strconv"
13 "strings"
14 "sync"
15 "time"
16
17 "github.com/coder/websocket"
18 "tangled.org/desertthunder.dev/twister/internal/normalize"
19)
20
21const (
22 minReconnectBackoff = 500 * time.Millisecond
23 maxReconnectBackoff = 10 * time.Second
24 keepAliveInterval = 2 * time.Minute
25 keepAliveTimeout = 20 * time.Second
26 maxReadMessageBytes = 8 << 20
27)
28
29// Client receives Tap events over WebSocket and sends acks after processing.
30type Client struct {
31 url string
32 password string
33 log *slog.Logger
34
35 mu sync.Mutex
36 conn *websocket.Conn
37 ackAsJSON bool
38 disableAcks bool
39 lastActivity time.Time
40}
41
42func New(url, password string, log *slog.Logger) *Client {
43 if log == nil {
44 log = slog.Default()
45 }
46 disableAcks, _ := strconv.ParseBool(strings.TrimSpace(os.Getenv("TAP_DISABLE_ACKS")))
47 return &Client{
48 url: url,
49 password: password,
50 log: log,
51 ackAsJSON: true,
52 disableAcks: disableAcks,
53 lastActivity: time.Now(),
54 }
55}
56
57func (c *Client) ReadEvent(ctx context.Context) (normalize.TapRecordEvent, error) {
58 for {
59 conn, err := c.ensureConnected(ctx)
60 if err != nil {
61 return normalize.TapRecordEvent{}, err
62 }
63
64 _, data, err := conn.Read(ctx)
65 if err != nil {
66 c.log.Warn("tap read failed", slog.String("error", err.Error()))
67 c.resetConn(websocket.StatusInternalError, "read failed")
68 if ctx.Err() != nil {
69 return normalize.TapRecordEvent{}, ctx.Err()
70 }
71 continue
72 }
73
74 c.markActivity()
75
76 var event normalize.TapRecordEvent
77 if err := json.Unmarshal(data, &event); err != nil {
78 c.log.Warn("tap decode failed", slog.String("error", err.Error()))
79 continue
80 }
81
82 return event, nil
83 }
84}
85
86func (c *Client) AckEvent(ctx context.Context, id int64) error {
87 if c.disableAcks {
88 return nil
89 }
90
91 conn, err := c.ensureConnected(ctx)
92 if err != nil {
93 return err
94 }
95
96 c.mu.Lock()
97 ackAsJSON := c.ackAsJSON
98 c.mu.Unlock()
99
100 if ackAsJSON {
101 payload, _ := json.Marshal(map[string]int64{"id": id})
102 if err := conn.Write(ctx, websocket.MessageText, payload); err == nil {
103 c.markActivity()
104 return nil
105 } else if isConnectionWriteError(err) {
106 c.resetConn(websocket.StatusInternalError, "ack json write failed")
107 return fmt.Errorf("ack event %d: %w", id, err)
108 } else if !isAckFormatError(err) {
109 return fmt.Errorf("ack event %d: %w", id, err)
110 }
111
112 c.log.Warn("tap ack json failed; trying plain id", slog.Int64("event_id", id))
113 plain := []byte(strconv.FormatInt(id, 10))
114 if err := conn.Write(ctx, websocket.MessageText, plain); err != nil {
115 c.resetConn(websocket.StatusInternalError, "ack failed")
116 return fmt.Errorf("ack event %d: %w", id, err)
117 }
118 c.markActivity()
119
120 c.mu.Lock()
121 c.ackAsJSON = false
122 c.mu.Unlock()
123 return nil
124 }
125
126 if err := conn.Write(ctx, websocket.MessageText, []byte(strconv.FormatInt(id, 10))); err != nil {
127 c.resetConn(websocket.StatusInternalError, "ack failed")
128 return fmt.Errorf("ack event %d: %w", id, err)
129 }
130 c.markActivity()
131 return nil
132}
133
134func (c *Client) Close() error {
135 c.mu.Lock()
136 defer c.mu.Unlock()
137 if c.conn == nil {
138 return nil
139 }
140 err := c.conn.Close(websocket.StatusNormalClosure, "shutdown")
141 c.conn = nil
142 return err
143}
144
145func (c *Client) ensureConnected(ctx context.Context) (*websocket.Conn, error) {
146 c.mu.Lock()
147 if c.conn != nil {
148 conn := c.conn
149 c.mu.Unlock()
150 return conn, nil
151 }
152 c.mu.Unlock()
153
154 backoff := minReconnectBackoff
155 for {
156 if ctx.Err() != nil {
157 return nil, ctx.Err()
158 }
159
160 h := http.Header{}
161 if c.password != "" {
162 token := base64.StdEncoding.EncodeToString([]byte("admin:" + c.password))
163 h.Set("Authorization", "Basic "+token)
164 }
165
166 conn, _, err := websocket.Dial(ctx, c.url, &websocket.DialOptions{HTTPHeader: h})
167 if err == nil {
168 conn.SetReadLimit(maxReadMessageBytes)
169 c.mu.Lock()
170 if c.conn == nil {
171 c.conn = conn
172 c.lastActivity = time.Now()
173 c.startKeepAlive(conn)
174 } else {
175 _ = conn.Close(websocket.StatusNormalClosure, "duplicate")
176 }
177 existing := c.conn
178 c.mu.Unlock()
179 c.log.Info("tap connected")
180 return existing, nil
181 }
182
183 c.log.Warn("tap connect failed", slog.String("error", err.Error()), slog.Duration("retry_in", backoff))
184
185 jitter := time.Duration(rand.Int64N(int64(backoff / 2)))
186 wait := backoff + jitter
187 select {
188 case <-ctx.Done():
189 return nil, ctx.Err()
190 case <-time.After(wait):
191 }
192
193 backoff *= 2
194 if backoff > maxReconnectBackoff {
195 backoff = maxReconnectBackoff
196 }
197 }
198}
199
200func (c *Client) markActivity() {
201 c.mu.Lock()
202 c.lastActivity = time.Now()
203 c.mu.Unlock()
204}
205
206func (c *Client) resetConn(status websocket.StatusCode, reason string) {
207 c.mu.Lock()
208 defer c.mu.Unlock()
209 if c.conn == nil {
210 return
211 }
212 _ = c.conn.Close(status, reason)
213 c.conn = nil
214}
215
216func (c *Client) startKeepAlive(conn *websocket.Conn) {
217 go func() {
218 ticker := time.NewTicker(keepAliveInterval)
219 defer ticker.Stop()
220
221 for range ticker.C {
222 c.mu.Lock()
223 if c.conn != conn {
224 c.mu.Unlock()
225 return
226 }
227 if time.Since(c.lastActivity) < keepAliveInterval {
228 c.mu.Unlock()
229 continue
230 }
231 c.mu.Unlock()
232
233 ctx, cancel := context.WithTimeout(context.Background(), keepAliveTimeout)
234 err := conn.Ping(ctx)
235 cancel()
236 if err != nil {
237 c.log.Warn("tap keepalive ping failed", slog.String("error", err.Error()))
238 c.resetConn(websocket.StatusInternalError, "keepalive failed")
239 return
240 }
241 }
242 }()
243}
244
245func isConnectionWriteError(err error) bool {
246 if err == nil {
247 return false
248 }
249 msg := strings.ToLower(err.Error())
250 return strings.Contains(msg, "broken pipe") ||
251 strings.Contains(msg, "connection reset") ||
252 strings.Contains(msg, "closed network connection") ||
253 strings.Contains(msg, "i/o timeout")
254}
255
256func isAckFormatError(err error) bool {
257 if err == nil {
258 return false
259 }
260 msg := strings.ToLower(err.Error())
261 return strings.Contains(msg, "invalid") ||
262 strings.Contains(msg, "unsupported") ||
263 strings.Contains(msg, "bad payload") ||
264 strings.Contains(msg, "unexpected message")
265}