atproto libraries implementation in ocaml

Add firehose demo example

Shows how to use the firehose module with OCaml 5 effects for
subscribing to AT Protocol real-time events (commits, identity
changes, etc.).

Changed files
+145
.beads
examples
firehose_demo
+2
.beads/issues.jsonl
··· 31 31 {"id":"atproto-60","title":"Implement effects-based I/O abstraction","description":"Implement the effects-based I/O abstraction layer that makes all libraries runtime-agnostic.","design":"## Module Structure\n\n```ocaml\n(* atproto-effects/lib/effects.ml *)\n\n(* HTTP effects *)\ntype http_request = {\n method_: [ `GET | `POST | `PUT | `DELETE ];\n uri: Uri.t;\n headers: (string * string) list;\n body: string option;\n}\n\ntype http_response = {\n status: int;\n headers: (string * string) list;\n body: string;\n}\n\ntype _ Effect.t +=\n | Http_request : http_request -\u003e http_response Effect.t\n\n(* DNS effects *)\ntype _ Effect.t +=\n | Dns_txt : string -\u003e string list Effect.t\n | Dns_a : string -\u003e string list Effect.t\n\n(* Time effects *)\ntype _ Effect.t +=\n | Now : Ptime.t Effect.t\n | Sleep : float -\u003e unit Effect.t\n\n(* Random effects *)\ntype _ Effect.t +=\n | Random_bytes : int -\u003e bytes Effect.t\n\n(* atproto-effects-eio/lib/handler.ml *)\nval run : (unit -\u003e 'a) -\u003e 'a\n```\n\n## Handler Example (eio)\n\n```ocaml\nlet run f =\n Effect.Deep.match_ f ()\n {\n retc = Fun.id;\n exnc = raise;\n effc = fun (type a) (e : a Effect.t) -\u003e\n match e with\n | Http_request req -\u003e\n Some (fun (k : (a, _) continuation) -\u003e\n let resp = Eio_client.request req in\n continue k resp)\n | Dns_txt domain -\u003e\n Some (fun k -\u003e\n let records = Eio_dns.txt domain in\n continue k records)\n | _ -\u003e None\n }\n```\n\n## Dependencies\n- eio (for testing handler)","acceptance_criteria":"- Effect types for HTTP, DNS, time, random\n- eio-based handler for testing\n- Handler composition utilities\n- Performance benchmarks","status":"closed","priority":1,"issue_type":"feature","assignee":"claude","created_at":"2025-12-28T00:12:29.021401617+01:00","updated_at":"2025-12-28T11:57:08.264086142+01:00","closed_at":"2025-12-28T11:57:08.264086142+01:00","labels":["effects","infrastructure"],"dependencies":[{"issue_id":"atproto-60","depends_on_id":"atproto-1","type":"parent-child","created_at":"2025-12-28T00:12:55.467983208+01:00","created_by":"daemon"}]} 32 32 {"id":"atproto-61","title":"Set up interoperability test suite","description":"Set up and run the AT Protocol interoperability tests from bluesky-social/atproto-interop-tests.","design":"## Test Structure\n\n```\ntest/\n├── interop/\n│ ├── syntax_test.ml # Handle, DID, NSID, TID, etc.\n│ ├── crypto_test.ml # Signatures, did:key\n│ ├── data_model_test.ml # DAG-CBOR, CID\n│ ├── mst_test.ml # Key heights, tree structure\n│ ├── lexicon_test.ml # Schema and record validation\n│ └── firehose_test.ml # Commit proofs\n├── fixtures/ # Cloned from atproto-interop-tests\n└── dune\n```\n\n## Test Approach\n\n1. Clone test vectors from GitHub\n2. Parse JSON fixtures using jsont\n3. Parse text fixtures line by line\n4. Run each test case\n5. Compare output to expected values\n\n## Example Test\n\n```ocaml\nlet load_json_fixtures path =\n let json = Jsont.of_file path in\n Jsont.decode (Jsont.list fixture_jsont) json\n\nlet%test \"handle_syntax_valid\" =\n let fixtures = load_lines \"fixtures/syntax/handle_syntax_valid.txt\" in\n List.for_all (fun line -\u003e\n match Handle.of_string line with\n | Ok _ -\u003e true\n | Error _ -\u003e false\n ) fixtures\n\nlet%test \"handle_syntax_invalid\" =\n let fixtures = load_lines \"fixtures/syntax/handle_syntax_invalid.txt\" in\n List.for_all (fun line -\u003e\n match Handle.of_string line with\n | Ok _ -\u003e false\n | Error _ -\u003e true\n ) fixtures\n\nlet%test \"crypto_signature_fixtures\" =\n let fixtures = load_json_fixtures \"fixtures/crypto/signature-fixtures.json\" in\n List.for_all (fun fixture -\u003e\n let message = Base64.decode fixture.message_base64 in\n let signature = Base64.decode fixture.signature_base64 in\n let key = Did_key.of_string fixture.public_key_did in\n let result = Crypto.verify key message signature in\n result = fixture.valid_signature\n ) fixtures\n```\n\n## Dependencies\n- alcotest or ounit2\n- jsont","acceptance_criteria":"- All syntax interop tests pass\n- All crypto interop tests pass\n- All data-model interop tests pass\n- All MST interop tests pass\n- All lexicon interop tests pass\n- All firehose interop tests pass","status":"closed","priority":1,"issue_type":"task","created_at":"2025-12-28T00:12:40.553908313+01:00","updated_at":"2025-12-28T13:25:34.614867702+01:00","closed_at":"2025-12-28T13:25:34.614867702+01:00","labels":["conformance","testing"],"dependencies":[{"issue_id":"atproto-61","depends_on_id":"atproto-1","type":"parent-child","created_at":"2025-12-28T00:12:56.180809368+01:00","created_by":"daemon"}]} 33 33 {"id":"atproto-62","title":"Set up monorepo package structure","description":"Set up the monorepo structure for multiple opam packages within a single repository.","design":"## Repository Structure\n\n```\natproto/\n├── dune-project # Root with all packages\n├── packages/\n│ ├── atproto-syntax/\n│ │ ├── lib/\n│ │ │ ├── dune\n│ │ │ └── *.ml\n│ │ ├── test/\n│ │ │ ├── dune\n│ │ │ └── *_test.ml\n│ │ └── atproto-syntax.opam\n│ ├── atproto-crypto/\n│ ├── atproto-multibase/\n│ ├── atproto-ipld/\n│ ├── atproto-mst/\n│ ├── atproto-repo/\n│ ├── atproto-identity/\n│ ├── atproto-xrpc/\n│ ├── atproto-sync/\n│ ├── atproto-lexicon/\n│ ├── atproto-lexicon-gen/\n│ ├── atproto-api/\n│ └── atproto-effects/\n├── examples/\n│ ├── simple_client/\n│ └── firehose_consumer/\n└── interop-tests/\n```\n\n## dune-project\n\n```lisp\n(lang dune 3.20)\n(name atproto)\n(generate_opam_files true)\n\n(package\n (name atproto-syntax)\n (synopsis \"AT Protocol identifier syntax parsing\")\n (depends\n (ocaml (\u003e= 5.4))\n re\n ptime))\n\n(package\n (name atproto-crypto)\n ...)\n```\n\n## CI (.github/workflows/ci.yml)\n\n- OCaml 5.4 matrix\n- Build all packages\n- Run all tests\n- Run interop tests","acceptance_criteria":"- Multi-package dune-project structure\n- Separate opam files per package\n- CI pipeline for building and testing\n- Documentation generation setup","status":"closed","priority":1,"issue_type":"task","created_at":"2025-12-28T00:12:50.547102438+01:00","updated_at":"2025-12-28T11:57:18.856810633+01:00","closed_at":"2025-12-28T11:57:18.856810633+01:00","labels":["infrastructure","setup"],"dependencies":[{"issue_id":"atproto-62","depends_on_id":"atproto-1","type":"parent-child","created_at":"2025-12-28T00:12:57.015938611+01:00","created_by":"daemon"}]} 34 + {"id":"atproto-h09","title":"Add package documentation","description":"Add documentation for each of the 11 AT Protocol packages including:\n- Module-level documentation with examples\n- README.md for the root project\n- CONTRIBUTING.md guide\n- API documentation generation with odoc","status":"open","priority":2,"issue_type":"task","created_at":"2025-12-28T13:34:10.559554696+01:00","updated_at":"2025-12-28T13:34:10.559554696+01:00","labels":["documentation"],"dependencies":[{"issue_id":"atproto-h09","depends_on_id":"atproto-1","type":"parent-child","created_at":"2025-12-28T13:34:16.081103184+01:00","created_by":"daemon"}]} 34 35 {"id":"atproto-pg8","title":"Add MST example_keys.txt fixture tests","description":"Add tests using the example_keys.txt fixture file which contains 156 structured MST keys.\n\nTests should:\n1. Load all 156 keys from the fixture\n2. Build an MST containing all keys\n3. Verify all keys are retrievable\n4. Verify iteration order matches sorted key order\n5. Optionally verify tree structure properties","acceptance_criteria":"- example_keys.txt is loaded and all 156 keys are used\n- MST is built with all keys\n- All keys are retrievable after insertion\n- Iteration produces keys in sorted order","status":"closed","priority":2,"issue_type":"task","created_at":"2025-12-28T12:12:19.180139823+01:00","updated_at":"2025-12-28T12:43:14.192342391+01:00","closed_at":"2025-12-28T12:43:14.192342391+01:00","labels":["conformance","mst","testing"]} 35 36 {"id":"atproto-q0h","title":"Add firehose commit-proof-fixtures.json tests","description":"Add tests for the commit-proof-fixtures.json file which contains 6 test cases for MST proof verification:\n\n1. two deep split\n2. two deep leafless split\n3. add on edge with neighbor two layers down\n4. merge and split in multi-op commit\n5. complex multi-op commit\n6. split with earlier leaves on same layer\n\nEach fixture includes:\n- keys (existing keys in MST)\n- adds (keys to add)\n- dels (keys to delete)\n- rootBeforeCommit / rootAfterCommit (expected CIDs)\n- blocksInProof (CIDs of blocks needed for proof)\n\nThis tests the commit proof verification needed for firehose sync.","acceptance_criteria":"- All 6 commit-proof fixtures are tested\n- MST operations (add/delete) produce correct root CIDs\n- Proof blocks are correctly identified\n- Tests verify rootBeforeCommit and rootAfterCommit match","status":"closed","priority":1,"issue_type":"task","created_at":"2025-12-28T12:12:34.999268893+01:00","updated_at":"2025-12-28T12:58:39.408679225+01:00","closed_at":"2025-12-28T12:58:39.408679225+01:00","labels":["conformance","firehose","testing"]} 36 37 {"id":"atproto-udz","title":"Add missing data-model conformance tests","description":"Add tests for data-model fixtures that are not currently covered:\n\n1. **data-model-valid.json** (5 entries) - Valid AT Protocol data model examples:\n - trivial record\n - float but integer-like (123.0)\n - empty list and object\n - list of nullable\n - list of lists\n\n2. **data-model-invalid.json** (12 entries) - Invalid examples that must be rejected:\n - top-level not an object\n - non-integer float\n - record with $type null/wrong type/empty\n - blob with string size/missing key\n - bytes with wrong field type/extra fields\n - link with wrong field type/bogus CID/extra fields","acceptance_criteria":"- test_data_model_valid() tests all 5 valid entries\n- test_data_model_invalid() tests all 12 invalid entries\n- Valid entries encode/decode correctly\n- Invalid entries are rejected with appropriate errors","status":"closed","priority":1,"issue_type":"task","created_at":"2025-12-28T12:12:14.579573063+01:00","updated_at":"2025-12-28T12:42:16.291981859+01:00","closed_at":"2025-12-28T12:42:16.291981859+01:00","labels":["conformance","ipld","testing"]} 38 + {"id":"atproto-w5i","title":"Create example applications","description":"Create example applications demonstrating the AT Protocol libraries:\n1. Simple client - authenticate and make posts\n2. Firehose consumer - subscribe to real-time events\n3. Bot example - automated posting/interactions","status":"in_progress","priority":2,"issue_type":"task","created_at":"2025-12-28T13:34:11.928213055+01:00","updated_at":"2025-12-28T13:34:24.611728558+01:00","labels":["documentation","examples"],"dependencies":[{"issue_id":"atproto-w5i","depends_on_id":"atproto-1","type":"parent-child","created_at":"2025-12-28T13:34:17.10878+01:00","created_by":"daemon"}]}
+5
examples/firehose_demo/dune
··· 1 + (executable 2 + (name firehose_demo) 3 + (public_name firehose_demo) 4 + (package atproto) 5 + (libraries atproto-sync uri))
+138
examples/firehose_demo/firehose_demo.ml
··· 1 + (** Firehose Demo - Subscribe to AT Protocol real-time events. 2 + 3 + This example demonstrates how to connect to the Bluesky firehose and process 4 + real-time events (posts, likes, follows, etc.). 5 + 6 + The firehose uses WebSockets with DAG-CBOR encoded messages. This example 7 + uses OCaml 5 effects for I/O. 8 + 9 + Usage: dune exec examples/firehose_demo/firehose_demo.exe 10 + 11 + Events shown: 12 + - #commit: Repository changes (new posts, likes, follows, etc.) 13 + - #identity: Handle/identity changes 14 + - #account: Account status changes 15 + - #handle: Handle updates 16 + - #tombstone: Repository deletions *) 17 + 18 + open Atproto_sync 19 + 20 + (** Format an operation for display *) 21 + let format_op (op : Firehose.operation) = 22 + let action = 23 + match op.action with 24 + | `Create -> "CREATE" 25 + | `Update -> "UPDATE" 26 + | `Delete -> "DELETE" 27 + in 28 + Printf.sprintf "%s %s" action op.path 29 + 30 + (** Format a commit event *) 31 + let format_commit (evt : Firehose.commit_event) = 32 + let ops_str = String.concat ", " (List.map format_op evt.ops) in 33 + Printf.sprintf "[COMMIT] seq=%Ld repo=%s rev=%s ops=[%s]" evt.seq evt.repo 34 + evt.rev ops_str 35 + 36 + (** Format an identity event *) 37 + let format_identity (evt : Firehose.identity_event) = 38 + let handle_str = 39 + match evt.handle with Some h -> h | None -> "(no handle)" 40 + in 41 + Printf.sprintf "[IDENTITY] seq=%Ld did=%s handle=%s" evt.seq evt.did 42 + handle_str 43 + 44 + (** Format an account event *) 45 + let format_account (evt : Firehose.account_event) = 46 + let status_str = 47 + match evt.status with Some s -> s | None -> "(no status)" 48 + in 49 + Printf.sprintf "[ACCOUNT] seq=%Ld did=%s active=%b status=%s" evt.seq evt.did 50 + evt.active status_str 51 + 52 + (** Format a handle event *) 53 + let format_handle (evt : Firehose.handle_event) = 54 + Printf.sprintf "[HANDLE] seq=%Ld did=%s handle=%s" evt.seq evt.did evt.handle 55 + 56 + (** Format a tombstone event *) 57 + let format_tombstone (evt : Firehose.tombstone_event) = 58 + Printf.sprintf "[TOMBSTONE] seq=%Ld did=%s" evt.seq evt.did 59 + 60 + (** Format an info message *) 61 + let format_info (msg : Firehose.info_message) = 62 + let msg_str = match msg.message with Some m -> m | None -> "(no message)" in 63 + Printf.sprintf "[INFO] name=%s message=%s" msg.name msg_str 64 + 65 + (** Format any event *) 66 + let format_event = function 67 + | Firehose.Commit evt -> format_commit evt 68 + | Firehose.Identity evt -> format_identity evt 69 + | Firehose.Account evt -> format_account evt 70 + | Firehose.Handle evt -> format_handle evt 71 + | Firehose.Tombstone evt -> format_tombstone evt 72 + | Firehose.Info msg -> format_info msg 73 + | Firehose.StreamError err -> Printf.sprintf "[ERROR] %s" err 74 + 75 + (** Event handler - prints each event and continues *) 76 + let handle_event event = 77 + print_endline (format_event event); 78 + true (* Continue receiving events *) 79 + 80 + (* Suppress unused warning - this is example code *) 81 + let _ = handle_event 82 + 83 + (** Main entry point *) 84 + let () = 85 + print_endline "AT Protocol Firehose Demo"; 86 + print_endline "========================="; 87 + print_endline ""; 88 + print_endline "This demo shows the structure of a firehose client."; 89 + print_endline ""; 90 + print_endline "The firehose module uses OCaml 5 effects for WebSocket I/O."; 91 + print_endline "To connect to the real Bluesky firehose, you need to provide"; 92 + print_endline "an effect handler that implements:"; 93 + print_endline ""; 94 + print_endline " - Firehose.Ws_connect : Uri.t -> (websocket, string) result"; 95 + print_endline " - Firehose.Ws_recv : websocket -> (string, string) result"; 96 + print_endline " - Firehose.Ws_close : websocket -> unit"; 97 + print_endline ""; 98 + print_endline "Example handler using eio-websocket:"; 99 + print_endline ""; 100 + print_endline 101 + {| let run_with_eio f = 102 + Eio_main.run @@ fun env -> 103 + Effect.Deep.match_with f () 104 + { 105 + retc = Fun.id; 106 + exnc = raise; 107 + effc = fun (type a) (e : a Effect.t) -> 108 + match e with 109 + | Firehose.Ws_connect uri -> 110 + Some (fun k -> 111 + let ws = Eio_websocket.connect env uri in 112 + Effect.Deep.continue k (Ok ws)) 113 + | Firehose.Ws_recv ws -> 114 + Some (fun k -> 115 + let msg = Eio_websocket.recv ws in 116 + Effect.Deep.continue k (Ok msg)) 117 + | Firehose.Ws_close ws -> 118 + Some (fun k -> 119 + Eio_websocket.close ws; 120 + Effect.Deep.continue k ()) 121 + | _ -> None 122 + }|}; 123 + print_endline ""; 124 + print_endline "Then subscribe to the firehose:"; 125 + print_endline ""; 126 + print_endline 127 + {| let config = 128 + Firehose.config 129 + ~uri:(Uri.of_string "wss://bsky.network/xrpc/com.atproto.sync.subscribeRepos") 130 + () 131 + in 132 + run_with_eio (fun () -> 133 + match Firehose.subscribe config ~handler:handle_event with 134 + | Ok () -> print_endline "Done" 135 + | Error e -> print_endline (Firehose.error_to_string e))|}; 136 + print_endline ""; 137 + print_endline "For a working example with eio, see:"; 138 + print_endline " https://github.com/bluesky-social/atproto"