a love letter to tangled (android, iOS, and a search API)
at main 265 lines 6.1 kB view raw
1package tapclient 2 3import ( 4 "context" 5 "encoding/base64" 6 "encoding/json" 7 "fmt" 8 "log/slog" 9 "math/rand/v2" 10 "net/http" 11 "os" 12 "strconv" 13 "strings" 14 "sync" 15 "time" 16 17 "github.com/coder/websocket" 18 "tangled.org/desertthunder.dev/twister/internal/normalize" 19) 20 21const ( 22 minReconnectBackoff = 500 * time.Millisecond 23 maxReconnectBackoff = 10 * time.Second 24 keepAliveInterval = 2 * time.Minute 25 keepAliveTimeout = 20 * time.Second 26 maxReadMessageBytes = 8 << 20 27) 28 29// Client receives Tap events over WebSocket and sends acks after processing. 30type Client struct { 31 url string 32 password string 33 log *slog.Logger 34 35 mu sync.Mutex 36 conn *websocket.Conn 37 ackAsJSON bool 38 disableAcks bool 39 lastActivity time.Time 40} 41 42func New(url, password string, log *slog.Logger) *Client { 43 if log == nil { 44 log = slog.Default() 45 } 46 disableAcks, _ := strconv.ParseBool(strings.TrimSpace(os.Getenv("TAP_DISABLE_ACKS"))) 47 return &Client{ 48 url: url, 49 password: password, 50 log: log, 51 ackAsJSON: true, 52 disableAcks: disableAcks, 53 lastActivity: time.Now(), 54 } 55} 56 57func (c *Client) ReadEvent(ctx context.Context) (normalize.TapRecordEvent, error) { 58 for { 59 conn, err := c.ensureConnected(ctx) 60 if err != nil { 61 return normalize.TapRecordEvent{}, err 62 } 63 64 _, data, err := conn.Read(ctx) 65 if err != nil { 66 c.log.Warn("tap read failed", slog.String("error", err.Error())) 67 c.resetConn(websocket.StatusInternalError, "read failed") 68 if ctx.Err() != nil { 69 return normalize.TapRecordEvent{}, ctx.Err() 70 } 71 continue 72 } 73 74 c.markActivity() 75 76 var event normalize.TapRecordEvent 77 if err := json.Unmarshal(data, &event); err != nil { 78 c.log.Warn("tap decode failed", slog.String("error", err.Error())) 79 continue 80 } 81 82 return event, nil 83 } 84} 85 86func (c *Client) AckEvent(ctx context.Context, id int64) error { 87 if c.disableAcks { 88 return nil 89 } 90 91 conn, err := c.ensureConnected(ctx) 92 if err != nil { 93 return err 94 } 95 96 c.mu.Lock() 97 ackAsJSON := c.ackAsJSON 98 c.mu.Unlock() 99 100 if ackAsJSON { 101 payload, _ := json.Marshal(map[string]int64{"id": id}) 102 if err := conn.Write(ctx, websocket.MessageText, payload); err == nil { 103 c.markActivity() 104 return nil 105 } else if isConnectionWriteError(err) { 106 c.resetConn(websocket.StatusInternalError, "ack json write failed") 107 return fmt.Errorf("ack event %d: %w", id, err) 108 } else if !isAckFormatError(err) { 109 return fmt.Errorf("ack event %d: %w", id, err) 110 } 111 112 c.log.Warn("tap ack json failed; trying plain id", slog.Int64("event_id", id)) 113 plain := []byte(strconv.FormatInt(id, 10)) 114 if err := conn.Write(ctx, websocket.MessageText, plain); err != nil { 115 c.resetConn(websocket.StatusInternalError, "ack failed") 116 return fmt.Errorf("ack event %d: %w", id, err) 117 } 118 c.markActivity() 119 120 c.mu.Lock() 121 c.ackAsJSON = false 122 c.mu.Unlock() 123 return nil 124 } 125 126 if err := conn.Write(ctx, websocket.MessageText, []byte(strconv.FormatInt(id, 10))); err != nil { 127 c.resetConn(websocket.StatusInternalError, "ack failed") 128 return fmt.Errorf("ack event %d: %w", id, err) 129 } 130 c.markActivity() 131 return nil 132} 133 134func (c *Client) Close() error { 135 c.mu.Lock() 136 defer c.mu.Unlock() 137 if c.conn == nil { 138 return nil 139 } 140 err := c.conn.Close(websocket.StatusNormalClosure, "shutdown") 141 c.conn = nil 142 return err 143} 144 145func (c *Client) ensureConnected(ctx context.Context) (*websocket.Conn, error) { 146 c.mu.Lock() 147 if c.conn != nil { 148 conn := c.conn 149 c.mu.Unlock() 150 return conn, nil 151 } 152 c.mu.Unlock() 153 154 backoff := minReconnectBackoff 155 for { 156 if ctx.Err() != nil { 157 return nil, ctx.Err() 158 } 159 160 h := http.Header{} 161 if c.password != "" { 162 token := base64.StdEncoding.EncodeToString([]byte("admin:" + c.password)) 163 h.Set("Authorization", "Basic "+token) 164 } 165 166 conn, _, err := websocket.Dial(ctx, c.url, &websocket.DialOptions{HTTPHeader: h}) 167 if err == nil { 168 conn.SetReadLimit(maxReadMessageBytes) 169 c.mu.Lock() 170 if c.conn == nil { 171 c.conn = conn 172 c.lastActivity = time.Now() 173 c.startKeepAlive(conn) 174 } else { 175 _ = conn.Close(websocket.StatusNormalClosure, "duplicate") 176 } 177 existing := c.conn 178 c.mu.Unlock() 179 c.log.Info("tap connected") 180 return existing, nil 181 } 182 183 c.log.Warn("tap connect failed", slog.String("error", err.Error()), slog.Duration("retry_in", backoff)) 184 185 jitter := time.Duration(rand.Int64N(int64(backoff / 2))) 186 wait := backoff + jitter 187 select { 188 case <-ctx.Done(): 189 return nil, ctx.Err() 190 case <-time.After(wait): 191 } 192 193 backoff *= 2 194 if backoff > maxReconnectBackoff { 195 backoff = maxReconnectBackoff 196 } 197 } 198} 199 200func (c *Client) markActivity() { 201 c.mu.Lock() 202 c.lastActivity = time.Now() 203 c.mu.Unlock() 204} 205 206func (c *Client) resetConn(status websocket.StatusCode, reason string) { 207 c.mu.Lock() 208 defer c.mu.Unlock() 209 if c.conn == nil { 210 return 211 } 212 _ = c.conn.Close(status, reason) 213 c.conn = nil 214} 215 216func (c *Client) startKeepAlive(conn *websocket.Conn) { 217 go func() { 218 ticker := time.NewTicker(keepAliveInterval) 219 defer ticker.Stop() 220 221 for range ticker.C { 222 c.mu.Lock() 223 if c.conn != conn { 224 c.mu.Unlock() 225 return 226 } 227 if time.Since(c.lastActivity) < keepAliveInterval { 228 c.mu.Unlock() 229 continue 230 } 231 c.mu.Unlock() 232 233 ctx, cancel := context.WithTimeout(context.Background(), keepAliveTimeout) 234 err := conn.Ping(ctx) 235 cancel() 236 if err != nil { 237 c.log.Warn("tap keepalive ping failed", slog.String("error", err.Error())) 238 c.resetConn(websocket.StatusInternalError, "keepalive failed") 239 return 240 } 241 } 242 }() 243} 244 245func isConnectionWriteError(err error) bool { 246 if err == nil { 247 return false 248 } 249 msg := strings.ToLower(err.Error()) 250 return strings.Contains(msg, "broken pipe") || 251 strings.Contains(msg, "connection reset") || 252 strings.Contains(msg, "closed network connection") || 253 strings.Contains(msg, "i/o timeout") 254} 255 256func isAckFormatError(err error) bool { 257 if err == nil { 258 return false 259 } 260 msg := strings.ToLower(err.Error()) 261 return strings.Contains(msg, "invalid") || 262 strings.Contains(msg, "unsupported") || 263 strings.Contains(msg, "bad payload") || 264 strings.Contains(msg, "unexpected message") 265}