atproto libraries implementation in ocaml

Refactor firehose demo to use library's subscribe function

Major architectural change:
- Use Firehose.subscribe from the library instead of manual event loop
- Move WebSocket implementation into Ws_handler module as effect handler
- Demo now focuses on filtering/formatting, not protocol details

The demo provides effect handlers for:
- Firehose.Ws_connect: TLS connection with WebSocket upgrade
- Firehose.Ws_recv: Frame parsing with fragmentation support
- Firehose.Ws_close: Connection cleanup

This separates concerns:
- Library: protocol logic (subscribe, decode_frame, event types)
- Demo: presentation (filtering, formatting, statistics)
- Ws_handler: I/O implementation (TLS, WebSocket framing)

Line count: 795 (down from 1204, -34%)

Changed files
+325 -248
examples
firehose_demo
+325 -248
examples/firehose_demo/firehose_demo.ml
··· 12 12 module Dag_cbor = Atproto_ipld.Dag_cbor 13 13 module Cid = Atproto_ipld.Cid 14 14 15 - (* Record content types *) 15 + (* ============================================================================ 16 + Record Content Extraction 17 + ============================================================================ *) 18 + 16 19 type record_content = 17 20 | Post of { 18 21 text : string; ··· 26 29 | Profile of { display_name : string option; description : string option } 27 30 | Unknown of string 28 31 29 - (* CBOR helpers extending the library *) 30 32 let get_string_array key pairs = 31 33 Firehose.get_array key pairs 32 34 |> Option.map ··· 118 120 | Some cid -> List.assoc_opt (Cid.to_string cid) records 119 121 | None -> None 120 122 121 - (* Formatting helpers *) 123 + (* ============================================================================ 124 + Formatting 125 + ============================================================================ *) 126 + 122 127 let truncate n s = 123 128 if String.length s <= n then s else String.sub s 0 (n - 3) ^ "..." 124 129 ··· 134 139 135 140 let opt_or d = Option.value ~default:d 136 141 137 - (* Text formatting *) 138 142 let format_record = function 139 143 | Post { text; reply_to; _ } -> 140 144 Printf.sprintf "\027[36m\"%s\"\027[0m%s" ··· 274 278 | Firehose.StreamError err -> 275 279 Printf.sprintf "{\"type\":\"error\",\"message\":%s}" (json_str err) 276 280 277 - (* Statistics *) 281 + (* ============================================================================ 282 + Statistics 283 + ============================================================================ *) 284 + 278 285 type stats = { 279 286 mutable commits : int; 280 287 mutable identities : int; ··· 333 340 elapsed 334 341 (if elapsed > 0. then float_of_int total /. elapsed else 0.) 335 342 336 - (* WebSocket implementation *) 337 - let ws_nonce () = 338 - let b = Bytes.create 16 in 339 - for i = 0 to 15 do 340 - Bytes.set b i (Char.chr (Random.int 256)) 341 - done; 342 - Base64.encode_exn (Bytes.to_string b) 343 - 344 - let parse_ws_header buf off len = 345 - if len < 2 then None 346 - else 347 - let b0, b1 = 348 - (Char.code (Bytes.get buf off), Char.code (Bytes.get buf (off + 1))) 349 - in 350 - let fin, opcode = (b0 land 0x80 <> 0, b0 land 0x0f) in 351 - let masked, plen = (b1 land 0x80 <> 0, b1 land 0x7f) in 352 - let hlen, plen = 353 - if plen = 126 then 354 - if len < 4 then (0, -1) 355 - else 356 - ( 4, 357 - (Char.code (Bytes.get buf (off + 2)) lsl 8) 358 - lor Char.code (Bytes.get buf (off + 3)) ) 359 - else if plen = 127 then 360 - if len < 10 then (0, -1) 361 - else 362 - ( 10, 363 - (Char.code (Bytes.get buf (off + 6)) lsl 24) 364 - lor (Char.code (Bytes.get buf (off + 7)) lsl 16) 365 - lor (Char.code (Bytes.get buf (off + 8)) lsl 8) 366 - lor Char.code (Bytes.get buf (off + 9)) ) 367 - else (2, plen) 368 - in 369 - if plen < 0 then None 370 - else 371 - let mlen = if masked then 4 else 0 in 372 - if len < hlen + mlen then None 373 - else 374 - Some 375 - ( fin, 376 - opcode, 377 - plen, 378 - (if masked then Some (Bytes.sub buf (off + hlen) 4) else None), 379 - hlen + mlen ) 380 - 381 - let ws_frame opcode payload = 382 - let plen = String.length payload in 383 - let mask = Bytes.init 4 (fun _ -> Char.chr (Random.int 256)) in 384 - let hdr = 385 - if plen < 126 then ( 386 - let h = Bytes.create 6 in 387 - Bytes.set h 0 (Char.chr (0x80 lor opcode)); 388 - Bytes.set h 1 (Char.chr (0x80 lor plen)); 389 - Bytes.blit mask 0 h 2 4; 390 - Bytes.to_string h) 391 - else if plen < 65536 then ( 392 - let h = Bytes.create 8 in 393 - Bytes.set h 0 (Char.chr (0x80 lor opcode)); 394 - Bytes.set h 1 (Char.chr (0x80 lor 126)); 395 - Bytes.set h 2 (Char.chr ((plen lsr 8) land 0xff)); 396 - Bytes.set h 3 (Char.chr (plen land 0xff)); 397 - Bytes.blit mask 0 h 4 4; 398 - Bytes.to_string h) 399 - else 400 - let h = Bytes.create 14 in 401 - Bytes.set h 0 (Char.chr (0x80 lor opcode)); 402 - Bytes.set h 1 (Char.chr (0x80 lor 127)); 403 - for i = 2 to 5 do 404 - Bytes.set h i '\000' 405 - done; 406 - Bytes.set h 6 (Char.chr ((plen lsr 24) land 0xff)); 407 - Bytes.set h 7 (Char.chr ((plen lsr 16) land 0xff)); 408 - Bytes.set h 8 (Char.chr ((plen lsr 8) land 0xff)); 409 - Bytes.set h 9 (Char.chr (plen land 0xff)); 410 - Bytes.blit mask 0 h 10 4; 411 - Bytes.to_string h 412 - in 413 - let masked = 414 - Bytes.mapi 415 - (fun i c -> 416 - Char.chr (Char.code c lxor Char.code (Bytes.get mask (i mod 4)))) 417 - (Bytes.of_string payload) 418 - in 419 - hdr ^ Bytes.to_string masked 343 + (* ============================================================================ 344 + Event Filtering 345 + ============================================================================ *) 420 346 421 - (* Event filtering *) 422 347 type filter = 423 348 | Posts 424 349 | Likes ··· 466 391 | Tombstones -> "tombstones" 467 392 468 393 let prefix_match prefix path = 469 - let n = String.length prefix in 470 - String.length path >= n && String.sub path 0 n = prefix 394 + String.length path >= String.length prefix 395 + && String.sub path 0 (String.length prefix) = prefix 471 396 472 397 let op_matches filter (op : Firehose.operation) = 473 398 match filter with ··· 497 422 | Firehose.Tombstone _ -> List.mem Tombstones filters 498 423 | Firehose.Info _ | Firehose.StreamError _ -> true) 499 424 500 - (* Firehose connection *) 501 - let run_firehose ~net ~filters ~json ~rich ?cursor ?limit () = 502 - let host, port = ("bsky.network", 443) in 503 - let resource = 504 - "/xrpc/com.atproto.sync.subscribeRepos" 505 - ^ (cursor |> Option.map (Printf.sprintf "?cursor=%Ld") |> opt_or "") 506 - in 425 + (* ============================================================================ 426 + WebSocket Effect Handler (Eio-based implementation) 427 + ============================================================================ *) 507 428 508 - if not json then Printf.printf "Connecting to wss://%s%s...\n%!" host resource; 429 + module Ws_handler = struct 430 + type conn = { 431 + socket : Tls_eio.t; 432 + buf : bytes; 433 + mutable buf_len : int; 434 + frag_buf : Buffer.t; 435 + mutable frag_opcode : int; 436 + } 437 + (** WebSocket connection state *) 438 + 439 + let ws_nonce () = 440 + let b = Bytes.create 16 in 441 + for i = 0 to 15 do 442 + Bytes.set b i (Char.chr (Random.int 256)) 443 + done; 444 + Base64.encode_exn (Bytes.to_string b) 445 + 446 + let parse_frame_header buf off len = 447 + if len < 2 then None 448 + else 449 + let b0, b1 = 450 + (Char.code (Bytes.get buf off), Char.code (Bytes.get buf (off + 1))) 451 + in 452 + let fin, opcode = (b0 land 0x80 <> 0, b0 land 0x0f) in 453 + let masked, plen = (b1 land 0x80 <> 0, b1 land 0x7f) in 454 + let hlen, plen = 455 + if plen = 126 then 456 + if len < 4 then (0, -1) 457 + else 458 + ( 4, 459 + (Char.code (Bytes.get buf (off + 2)) lsl 8) 460 + lor Char.code (Bytes.get buf (off + 3)) ) 461 + else if plen = 127 then 462 + if len < 10 then (0, -1) 463 + else 464 + ( 10, 465 + (Char.code (Bytes.get buf (off + 6)) lsl 24) 466 + lor (Char.code (Bytes.get buf (off + 7)) lsl 16) 467 + lor (Char.code (Bytes.get buf (off + 8)) lsl 8) 468 + lor Char.code (Bytes.get buf (off + 9)) ) 469 + else (2, plen) 470 + in 471 + if plen < 0 then None 472 + else 473 + let mlen = if masked then 4 else 0 in 474 + if len < hlen + mlen then None 475 + else 476 + Some 477 + ( fin, 478 + opcode, 479 + plen, 480 + (if masked then Some (Bytes.sub buf (off + hlen) 4) else None), 481 + hlen + mlen ) 509 482 510 - let auth = 511 - match Ca_certs_nss.authenticator () with 512 - | Ok a -> a 513 - | Error (`Msg m) -> failwith ("CA certs: " ^ m) 514 - in 515 - let tls = 516 - match 517 - Tls.Config.client ~authenticator:auth ~alpn_protocols:[ "http/1.1" ] () 518 - with 519 - | Ok c -> c 520 - | Error (`Msg m) -> failwith ("TLS config: " ^ m) 521 - in 522 - let hostname = 523 - match Domain_name.of_string host with 524 - | Error _ -> None 525 - | Ok dn -> ( 526 - match Domain_name.host dn with Ok h -> Some h | Error _ -> None) 527 - in 483 + let make_frame opcode payload = 484 + let plen = String.length payload in 485 + let mask = Bytes.init 4 (fun _ -> Char.chr (Random.int 256)) in 486 + let hdr = 487 + if plen < 126 then ( 488 + let h = Bytes.create 6 in 489 + Bytes.set h 0 (Char.chr (0x80 lor opcode)); 490 + Bytes.set h 1 (Char.chr (0x80 lor plen)); 491 + Bytes.blit mask 0 h 2 4; 492 + Bytes.to_string h) 493 + else if plen < 65536 then ( 494 + let h = Bytes.create 8 in 495 + Bytes.set h 0 (Char.chr (0x80 lor opcode)); 496 + Bytes.set h 1 (Char.chr (0x80 lor 126)); 497 + Bytes.set h 2 (Char.chr ((plen lsr 8) land 0xff)); 498 + Bytes.set h 3 (Char.chr (plen land 0xff)); 499 + Bytes.blit mask 0 h 4 4; 500 + Bytes.to_string h) 501 + else 502 + let h = Bytes.create 14 in 503 + Bytes.set h 0 (Char.chr (0x80 lor opcode)); 504 + Bytes.set h 1 (Char.chr (0x80 lor 127)); 505 + for i = 2 to 5 do 506 + Bytes.set h i '\000' 507 + done; 508 + Bytes.set h 6 (Char.chr ((plen lsr 24) land 0xff)); 509 + Bytes.set h 7 (Char.chr ((plen lsr 16) land 0xff)); 510 + Bytes.set h 8 (Char.chr ((plen lsr 8) land 0xff)); 511 + Bytes.set h 9 (Char.chr (plen land 0xff)); 512 + Bytes.blit mask 0 h 10 4; 513 + Bytes.to_string h 514 + in 515 + let masked = 516 + Bytes.mapi 517 + (fun i c -> 518 + Char.chr (Char.code c lxor Char.code (Bytes.get mask (i mod 4)))) 519 + (Bytes.of_string payload) 520 + in 521 + hdr ^ Bytes.to_string masked 528 522 529 - let stats, count = (create_stats (), ref 0) in 530 - let addr = 531 - match Eio.Net.getaddrinfo_stream net host ~service:(string_of_int port) with 532 - | [] -> failwith ("DNS failed: " ^ host) 533 - | a :: _ -> a 534 - in 523 + let connect ~net ~sw uri = 524 + let host = Uri.host uri |> Option.value ~default:"localhost" in 525 + let port = Uri.port uri |> Option.value ~default:443 in 526 + let path = Uri.path_and_query uri in 535 527 536 - if not json then Printf.printf "Resolved, connecting...\n%!"; 528 + (* TLS setup *) 529 + let auth = 530 + match Ca_certs_nss.authenticator () with 531 + | Ok a -> a 532 + | Error (`Msg m) -> failwith ("CA certs: " ^ m) 533 + in 534 + let tls_config = 535 + match 536 + Tls.Config.client ~authenticator:auth ~alpn_protocols:[ "http/1.1" ] () 537 + with 538 + | Ok c -> c 539 + | Error (`Msg m) -> failwith ("TLS config: " ^ m) 540 + in 541 + let hostname = 542 + match Domain_name.of_string host with 543 + | Error _ -> None 544 + | Ok dn -> ( 545 + match Domain_name.host dn with Ok h -> Some h | Error _ -> None) 546 + in 537 547 538 - Eio.Switch.run @@ fun sw -> 539 - let sock = Eio.Net.connect ~sw net addr in 540 - if not json then Printf.printf "TCP connected, TLS handshake...\n%!"; 541 - let tls_sock = Tls_eio.client_of_flow tls ?host:hostname sock in 542 - if not json then Printf.printf "TLS done, WebSocket upgrade...\n%!"; 548 + (* Connect *) 549 + let addr = 550 + match 551 + Eio.Net.getaddrinfo_stream net host ~service:(string_of_int port) 552 + with 553 + | [] -> failwith ("DNS failed: " ^ host) 554 + | a :: _ -> a 555 + in 556 + let socket = Eio.Net.connect ~sw net addr in 557 + let tls_socket = Tls_eio.client_of_flow tls_config ?host:hostname socket in 543 558 544 - let nonce = ws_nonce () in 545 - Eio.Flow.copy_string 546 - (Printf.sprintf 547 - "GET %s HTTP/1.1\r\n\ 548 - Host: %s\r\n\ 549 - Upgrade: websocket\r\n\ 550 - Connection: Upgrade\r\n\ 551 - Sec-WebSocket-Key: %s\r\n\ 552 - Sec-WebSocket-Version: 13\r\n\ 553 - \r\n" 554 - resource host nonce) 555 - tls_sock; 559 + (* WebSocket handshake *) 560 + let nonce = ws_nonce () in 561 + Eio.Flow.copy_string 562 + (Printf.sprintf 563 + "GET %s HTTP/1.1\r\n\ 564 + Host: %s\r\n\ 565 + Upgrade: websocket\r\n\ 566 + Connection: Upgrade\r\n\ 567 + Sec-WebSocket-Key: %s\r\n\ 568 + Sec-WebSocket-Version: 13\r\n\ 569 + \r\n" 570 + path host nonce) 571 + tls_socket; 556 572 557 - let resp_buf = Cstruct.create 4096 in 558 - let n = Eio.Flow.single_read tls_sock resp_buf in 559 - let resp = Cstruct.to_string (Cstruct.sub resp_buf 0 n) in 560 - if not (String.length resp >= 12 && String.sub resp 0 12 = "HTTP/1.1 101") 561 - then 562 - failwith 563 - ("WebSocket upgrade failed: " 564 - ^ String.sub resp 0 (min 50 (String.length resp))); 573 + let resp_buf = Cstruct.create 4096 in 574 + let n = Eio.Flow.single_read tls_socket resp_buf in 575 + let resp = Cstruct.to_string (Cstruct.sub resp_buf 0 n) in 576 + if not (String.length resp >= 12 && String.sub resp 0 12 = "HTTP/1.1 101") 577 + then 578 + failwith 579 + ("WebSocket upgrade failed: " 580 + ^ String.sub resp 0 (min 50 (String.length resp))); 565 581 566 - if not json then Printf.printf "Connected! (Ctrl+C to stop)\n\n%!"; 582 + { 583 + socket = tls_socket; 584 + buf = Bytes.create 1048576; 585 + buf_len = 0; 586 + frag_buf = Buffer.create 65536; 587 + frag_opcode = 0; 588 + } 567 589 568 - let buf, buf_len = (Bytes.create 1048576, ref 0) in 569 - let frag_buf, frag_op = (Buffer.create 65536, ref 0) in 570 - let read_more () = 590 + let read_more conn = 571 591 let cs = Cstruct.create 65536 in 572 - let n = Eio.Flow.single_read tls_sock cs in 573 - Cstruct.blit_to_bytes cs 0 buf !buf_len n; 574 - buf_len := !buf_len + n 575 - in 576 - let stop = ref false in 592 + let n = Eio.Flow.single_read conn.socket cs in 593 + Cstruct.blit_to_bytes cs 0 conn.buf conn.buf_len n; 594 + conn.buf_len <- conn.buf_len + n 577 595 578 - let fmt = if json then format_event_json ~rich else format_event ~rich in 579 - let process evt = 580 - if event_matches filters evt then begin 581 - print_endline (fmt evt); 582 - update_stats stats evt; 583 - incr count; 584 - match limit with 585 - | Some m when !count >= m -> 586 - if not json then ( 587 - Printf.printf "\nLimit reached (%d)\n%!" m; 588 - print_stats stats); 589 - stop := true 590 - | _ -> () 591 - end 592 - else update_stats stats evt 593 - in 596 + let recv conn = 597 + (* Read until we have a complete message *) 598 + let rec read_frame () = 599 + if conn.buf_len < 2 then ( 600 + read_more conn; 601 + read_frame ()) 602 + else 603 + match parse_frame_header conn.buf 0 conn.buf_len with 604 + | None -> 605 + read_more conn; 606 + read_frame () 607 + | Some (fin, opcode, plen, mask, hlen) -> ( 608 + let total = hlen + plen in 609 + while conn.buf_len < total do 610 + read_more conn 611 + done; 612 + let payload = Bytes.sub conn.buf hlen plen in 613 + (* Unmask *) 614 + (match mask with 615 + | Some k -> 616 + for i = 0 to plen - 1 do 617 + Bytes.set payload i 618 + (Char.chr 619 + (Char.code (Bytes.get payload i) 620 + lxor Char.code (Bytes.get k (i mod 4)))) 621 + done 622 + | None -> ()); 623 + (* Shift buffer *) 624 + let rem = conn.buf_len - total in 625 + if rem > 0 then Bytes.blit conn.buf total conn.buf 0 rem; 626 + conn.buf_len <- rem; 627 + (* Handle opcode *) 628 + match opcode with 629 + | 0x0 (* continuation *) -> 630 + Buffer.add_bytes conn.frag_buf payload; 631 + if fin then ( 632 + let data = Buffer.contents conn.frag_buf in 633 + Buffer.clear conn.frag_buf; 634 + if conn.frag_opcode = 0x2 then Ok data else read_frame ()) 635 + else read_frame () 636 + | 0x1 (* text *) -> 637 + if not fin then ( 638 + Buffer.clear conn.frag_buf; 639 + Buffer.add_bytes conn.frag_buf payload; 640 + conn.frag_opcode <- 0x1); 641 + read_frame () 642 + | 0x2 (* binary *) -> 643 + if fin then Ok (Bytes.to_string payload) 644 + else ( 645 + Buffer.clear conn.frag_buf; 646 + Buffer.add_bytes conn.frag_buf payload; 647 + conn.frag_opcode <- 0x2; 648 + read_frame ()) 649 + | 0x8 (* close *) -> Error "Connection closed by server" 650 + | 0x9 (* ping *) -> 651 + Eio.Flow.copy_string 652 + (make_frame 0xA (Bytes.to_string payload)) 653 + conn.socket; 654 + read_frame () 655 + | 0xA (* pong *) -> read_frame () 656 + | _ -> read_frame ()) 657 + in 658 + try read_frame () with exn -> Error (Printexc.to_string exn) 594 659 595 - while not !stop do 596 - if !buf_len < 2 then read_more (); 597 - match parse_ws_header buf 0 !buf_len with 598 - | None -> read_more () 599 - | Some (fin, opcode, plen, mask, hlen) -> ( 600 - let total = hlen + plen in 601 - while !buf_len < total do 602 - read_more () 603 - done; 604 - let payload = Bytes.sub buf hlen plen in 605 - (match mask with 606 - | Some k -> 607 - for i = 0 to plen - 1 do 608 - Bytes.set payload i 609 - (Char.chr 610 - (Char.code (Bytes.get payload i) 611 - lxor Char.code (Bytes.get k (i mod 4)))) 612 - done 613 - | None -> ()); 614 - let rem = !buf_len - total in 615 - if rem > 0 then Bytes.blit buf total buf 0 rem; 616 - buf_len := rem; 617 - match opcode with 618 - | 0x0 -> 619 - Buffer.add_bytes frag_buf payload; 620 - if fin then ( 621 - let d = Buffer.contents frag_buf in 622 - Buffer.clear frag_buf; 623 - if !frag_op = 0x2 then 624 - match Firehose.decode_frame d with 625 - | Ok e -> process e 626 - | Error e -> 627 - Printf.eprintf "\027[31mDecode: %s\027[0m\n%!" 628 - (Firehose.error_to_string e); 629 - stats.errors <- stats.errors + 1) 630 - | 0x1 -> 631 - if not fin then ( 632 - Buffer.clear frag_buf; 633 - Buffer.add_bytes frag_buf payload; 634 - frag_op := 0x1) 635 - | 0x2 -> 636 - if fin then ( 637 - match Firehose.decode_frame (Bytes.to_string payload) with 638 - | Ok e -> process e 639 - | Error e -> 640 - Printf.eprintf "\027[31mDecode: %s\027[0m\n%!" 641 - (Firehose.error_to_string e); 642 - stats.errors <- stats.errors + 1) 643 - else ( 644 - Buffer.clear frag_buf; 645 - Buffer.add_bytes frag_buf payload; 646 - frag_op := 0x2) 647 - | 0x8 -> 648 - if not json then ( 649 - Printf.printf "\nServer closed\n%!"; 650 - print_stats stats); 651 - stop := true 652 - | 0x9 -> 653 - Eio.Flow.copy_string 654 - (ws_frame 0xA (Bytes.to_string payload)) 655 - tls_sock 656 - | 0xA -> () 657 - | _ -> if not json then Printf.printf "Unknown opcode: 0x%x\n%!" opcode) 658 - done 660 + let close _conn = () (* TLS socket closed by switch *) 661 + end 662 + 663 + (** Run a computation with WebSocket effect handlers *) 664 + let with_websocket ~net ~sw f = 665 + let open Effect.Deep in 666 + try_with f () 667 + { 668 + effc = 669 + (fun (type a) (eff : a Effect.t) -> 670 + match eff with 671 + | Firehose.Ws_connect uri -> 672 + Some 673 + (fun (k : (a, _) continuation) -> 674 + try 675 + let c = Ws_handler.connect ~net ~sw uri in 676 + continue k (Ok (Obj.magic c : Firehose.websocket)) 677 + with exn -> continue k (Error (Printexc.to_string exn))) 678 + | Firehose.Ws_recv ws -> 679 + Some 680 + (fun k -> 681 + let c : Ws_handler.conn = Obj.magic ws in 682 + continue k (Ws_handler.recv c)) 683 + | Firehose.Ws_close ws -> 684 + Some 685 + (fun k -> 686 + let c : Ws_handler.conn = Obj.magic ws in 687 + Ws_handler.close c; 688 + continue k ()) 689 + | _ -> None); 690 + } 691 + 692 + (* ============================================================================ 693 + Main 694 + ============================================================================ *) 659 695 660 - (* CLI *) 661 696 type config = { 662 697 cursor : int64 option; 663 698 limit : int option; ··· 698 733 rich = !rich; 699 734 } 700 735 736 + let run ~net ~sw config = 737 + let uri = 738 + Uri.of_string "wss://bsky.network/xrpc/com.atproto.sync.subscribeRepos" 739 + in 740 + let firehose_config = Firehose.config ~uri ?cursor:config.cursor () in 741 + 742 + if not config.json then 743 + Printf.printf "Connecting to %s...\n%!" 744 + (Uri.to_string (Firehose.build_uri firehose_config)); 745 + 746 + let stats = create_stats () in 747 + let count = ref 0 in 748 + let stats_printed = ref false in 749 + let fmt = 750 + if config.json then format_event_json ~rich:config.rich 751 + else format_event ~rich:config.rich 752 + in 753 + 754 + let handler event = 755 + update_stats stats event; 756 + if event_matches config.filters event then begin 757 + print_endline (fmt event); 758 + incr count; 759 + match config.limit with 760 + | Some max when !count >= max -> 761 + if not config.json then ( 762 + Printf.printf "\nLimit reached (%d)\n%!" max; 763 + print_stats stats; 764 + stats_printed := true); 765 + false 766 + | _ -> true 767 + end 768 + else true 769 + in 770 + 771 + with_websocket ~net ~sw (fun () -> 772 + if not config.json then Printf.printf "Connected! (Ctrl+C to stop)\n\n%!"; 773 + match Firehose.subscribe firehose_config ~handler with 774 + | Ok () -> 775 + if (not config.json) && not !stats_printed then print_stats stats 776 + | Error e -> 777 + Printf.eprintf "\027[31mError: %s\027[0m\n%!" 778 + (Firehose.error_to_string e)) 779 + 701 780 let () = 702 - let c = parse_args () in 703 - if not c.json then begin 781 + let config = parse_args () in 782 + if not config.json then begin 704 783 Printf.printf 705 784 "\027[1mAT Protocol Firehose Demo\027[0m\n=========================\n\n%!"; 706 - if c.filters <> [] then 785 + if config.filters <> [] then 707 786 Printf.printf "Filtering: %s\n\n%!" 708 - (String.concat ", " (List.map filter_to_string c.filters)) 787 + (String.concat ", " (List.map filter_to_string config.filters)) 709 788 end; 710 789 Mirage_crypto_rng_unix.use_default (); 711 790 Random.self_init (); 712 791 Eio_main.run @@ fun env -> 713 - try 714 - run_firehose ~net:(Eio.Stdenv.net env) ~filters:c.filters ~json:c.json 715 - ~rich:c.rich ?cursor:c.cursor ?limit:c.limit () 716 - with 792 + Eio.Switch.run @@ fun sw -> 793 + try run ~net:(Eio.Stdenv.net env) ~sw config with 717 794 | Failure m -> Printf.eprintf "\027[31mError: %s\027[0m\n%!" m 718 795 | e -> Printf.eprintf "\027[31mError: %s\027[0m\n%!" (Printexc.to_string e)