atproto libraries implementation in OCaml

fix(firehose_demo_lwt): handle WebSocket message fragmentation

The websocket-lwt-unix library delivers individual WebSocket frames,
not complete messages. Large messages can be split across multiple
frames (Binary/Text followed by Continuation frames until final=true).
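
To make the framing concrete, here is a minimal stdlib-only sketch of how one message ends up as several frames. The `frame` record and the `fragment` helper are hypothetical stand-ins written for this illustration; they mirror only the three fields the diff reads (`opcode`, `final`, `content`) and are not the websocket-lwt-unix types.

```ocaml
(* Hypothetical stand-in for a WebSocket frame: only the three fields the
   fix relies on. This is not the websocket-lwt-unix Frame.t type. *)
type opcode = Continuation | Text | Binary

type frame = { opcode : opcode; final : bool; content : string }

(* Split [payload] into frames carrying at most [chunk] bytes each. The first
   frame keeps the message opcode, later frames are Continuation frames, and
   only the last frame has [final = true]. *)
let fragment ~chunk opcode payload =
  if chunk <= 0 then invalid_arg "fragment: chunk must be positive";
  let len = String.length payload in
  let rec go pos acc =
    let n = min chunk (len - pos) in
    let frame =
      { opcode = (if pos = 0 then opcode else Continuation);
        final = pos + n >= len;
        content = String.sub payload pos n }
    in
    if frame.final then List.rev (frame :: acc) else go (pos + n) (frame :: acc)
  in
  go 0 []

(* A 10-byte binary message sent in chunks of 4 bytes becomes three frames:
   one Binary frame and two Continuation frames. *)
let frames = fragment ~chunk:4 Binary "0123456789"
```

A receiver that treats each element of `frames` as a message sees three payloads, none of which is the original.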

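This fix:
- Adds a frag_buf buffer to accumulate the payload of a fragmented message
- Tracks the type of the message in progress (Binary/Text) in frag_opcode
- Returns a binary message only once its final frame has been received
- Buffers fragmented text messages and discards them on completion, the
  same way single-frame text messages were already skipped
- Drops the !interrupted checks around Ws_recv and in the event handler;
  per the code comment, an interrupt is now handled via the WebSocket error
- Prints how Firehose.subscribe ended (unless config.json is set) instead
  of silently ignoring the result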
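
The reassembly logic can be sketched as a pure state machine, independent of Lwt and of the WebSocket library. This is an illustration under assumptions, not the code in the commit: the `frame`, `state`, `feed` and `messages` names are hypothetical, and the state lives in a record with an `option` where the commit uses a module-level buffer and a `ref` initialised to `Continuation`.

```ocaml
(* Hypothetical frame model; not the websocket-lwt-unix Frame.t type. *)
type opcode = Continuation | Text | Binary

type frame = { opcode : opcode; final : bool; content : string }

(* Reassembly state: the accumulation buffer and the opcode of the message
   currently in progress (None when no fragmented message is open). *)
type state = { buf : Buffer.t; mutable current : opcode option }

let create () = { buf = Buffer.create 65536; current = None }

(* Feed one frame. Returns [Some data] when a complete binary message is
   available, and [None] otherwise (message incomplete, or a text message). *)
let feed st frame =
  match frame.opcode with
  | (Binary | Text) when frame.final ->
      (* Unfragmented message: binary is delivered, text is skipped. *)
      if frame.opcode = Binary then Some frame.content else None
  | Binary | Text ->
      (* First fragment: reset the buffer and remember the message type. *)
      Buffer.clear st.buf;
      Buffer.add_string st.buf frame.content;
      st.current <- Some frame.opcode;
      None
  | Continuation ->
      Buffer.add_string st.buf frame.content;
      if not frame.final then None
      else begin
        (* Last fragment: hand out the data only for binary messages. *)
        let data = Buffer.contents st.buf in
        let was = st.current in
        Buffer.clear st.buf;
        st.current <- None;
        if was = Some Binary then Some data else None
      end

(* Run a whole frame sequence through [feed], in order, and collect the
   complete binary messages. *)
let messages frames =
  let st = create () in
  List.rev
    (List.fold_left
       (fun acc f -> match feed st f with Some m -> m :: acc | None -> acc)
       [] frames)
```

In this sketch a Continuation frame that arrives with no message open is dropped once its final fragment is seen, because `current` is `None`. Keeping the state in a value rather than at module level also lets each connection own its own buffer.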

Before this fix, partial binary data was passed to the CBOR decoder,
causing 'invalid payload CBOR' errors after ~1-2 seconds of operation.
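
The failure mode can be reproduced in a small model. The sketch below assumes a hypothetical `frame` record and a `naive_recv` function modelled on the removed match arm, which returned the content of any Binary frame without looking at `final`. How the old code treated Continuation frames is not visible in the diff, so skipping them here is an assumption.

```ocaml
(* Hypothetical frame model; not the websocket-lwt-unix Frame.t type. *)
type opcode = Continuation | Text | Binary

type frame = { opcode : opcode; final : bool; content : string }

(* Pre-fix behaviour, modelled on the removed match arm: the content of the
   first Binary frame is returned whether or not [final] is set. Skipping
   every other frame is an assumption made for this sketch. *)
let rec naive_recv = function
  | [] -> Error "Connection closed"
  | { opcode = Binary; content; _ } :: _ -> Ok content
  | _ :: rest -> naive_recv rest

(* Placeholder bytes standing in for one encoded firehose message. *)
let payload = "complete-cbor-payload"

(* The same payload split into a Binary frame plus one Continuation frame. *)
let frames =
  [ { opcode = Binary; final = false; content = String.sub payload 0 8 };
    { opcode = Continuation; final = true;
      content = String.sub payload 8 (String.length payload - 8) } ]

(* Only the first 8 bytes come back, so a decoder would see a truncated
   payload. *)
let truncated = naive_recv frames
```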

Changed files
+52 -14
examples/firehose_demo/firehose_demo_lwt.ml
···
       >|= fun conn -> Ok conn)
       (fun exn -> Lwt.return (Error (Printexc.to_string exn)))

+  (* Buffer for accumulating fragmented messages *)
+  let frag_buf = Buffer.create 65536
+  let frag_opcode = ref Websocket.Frame.Opcode.Continuation
+
   let recv conn : (string, string) result Lwt.t =
     let open Websocket in
     let rec read_binary () =
···
         (fun () ->
           Websocket_lwt_unix.read conn >>= fun frame ->
           match frame.Frame.opcode with
-          | Frame.Opcode.Binary -> Lwt.return (Ok frame.Frame.content)
-          | Frame.Opcode.Text -> read_binary () (* Skip text frames *)
+          | Frame.Opcode.Binary ->
+              if frame.Frame.final then
+                (* Complete message in single frame *)
+                Lwt.return (Ok frame.Frame.content)
+              else begin
+                (* Start of fragmented binary message *)
+                Buffer.clear frag_buf;
+                Buffer.add_string frag_buf frame.Frame.content;
+                frag_opcode := Frame.Opcode.Binary;
+                read_binary ()
+              end
+          | Frame.Opcode.Continuation ->
+              (* Continuation of fragmented message *)
+              Buffer.add_string frag_buf frame.Frame.content;
+              if frame.Frame.final then begin
+                (* Fragmented message complete *)
+                let data = Buffer.contents frag_buf in
+                Buffer.clear frag_buf;
+                if !frag_opcode = Frame.Opcode.Binary then Lwt.return (Ok data)
+                else
+                  (* Was a text message, skip it *)
+                  read_binary ()
+              end
+              else read_binary ()
+          | Frame.Opcode.Text ->
+              if frame.Frame.final then
+                (* Skip complete text frames *)
+                read_binary ()
+              else begin
+                (* Start of fragmented text message - track but skip *)
+                Buffer.clear frag_buf;
+                Buffer.add_string frag_buf frame.Frame.content;
+                frag_opcode := Frame.Opcode.Text;
+                read_binary ()
+              end
           | Frame.Opcode.Close -> Lwt.return (Error "Connection closed")
           | Frame.Opcode.Ping ->
               (* Respond to ping with pong *)
···
           | Firehose.Ws_recv _ws ->
               Some
                 (fun k ->
-                  if !interrupted then continue k (Error "Interrupted")
-                  else
-                    match !ws_conn with
-                    | None -> continue k (Error "Not connected")
-                    | Some conn ->
-                        let result = Lwt_main.run (Ws.recv conn) in
-                        if !interrupted then continue k (Error "Interrupted")
-                        else continue k result)
+                  match !ws_conn with
+                  | None -> continue k (Error "Not connected")
+                  | Some conn ->
+                      let result = Lwt_main.run (Ws.recv conn) in
+                      continue k result)
           | Firehose.Ws_close _ws ->
               Some
                 (fun k ->
···
       stats.matched <- stats.matched + 1;
       match config.limit with
       | Some max when stats.matched >= max -> false
-      | _ -> not !interrupted
+      | _ -> true (* Always continue - interrupt handled via WebSocket error *)
     end
-    else not !interrupted
+    else true (* Always continue processing *)
   in
   with_websocket_lwt (fun () ->
       if not config.json then Printf.printf "Connected!\n\n%!";
       match Firehose.subscribe cfg ~handler with
-      | Ok () -> ()
-      | Error _ -> () (* Errors on exit are expected - just exit cleanly *))
+      | Ok () ->
+          if not config.json then
+            Printf.printf "Subscribe completed normally\n%!"
+      | Error e ->
+          if not config.json then
+            Printf.printf "Subscribe ended: %s\n%!" (Firehose.error_to_string e))

 let () =
   let config =