atproto libraries implementation in ocaml
at main 16 kB view raw
(** Firehose (Event Stream) Client for AT Protocol.

    The firehose provides real-time updates from the network using WebSockets.
    Events are encoded as DAG-CBOR with a header+payload structure.

    Wire protocol:
    - Binary WebSocket frames
    - Each frame: header (DAG-CBOR) + payload (DAG-CBOR)
    - Header: [{"op": 1, "t": "#commit"}]

    This module uses the unified effects from {!Atproto_effects.Effects}. *)

open Atproto_ipld
module Effects = Atproto_effects.Effects

(** {1 Types} *)

(** Operation in a commit event. *)
type operation = {
  action : [ `Create | `Update | `Delete ];
  path : string;  (** collection/rkey format *)
  cid : Cid.t option;
}

(** Commit event from the firehose. *)
type commit_event = {
  seq : int64;
  repo : string;  (** DID of the repo *)
  rev : string;  (** TID revision *)
  since : string option;  (** Previous revision *)
  commit : Cid.t;
  blocks : string;
      (** CAR file slice containing blocks (raw bytes as string) *)
  ops : operation list;
  too_big : bool;
}

(** Identity event (handle changes, etc.). *)
type identity_event = {
  seq : int64;
  did : string;
  time : string;  (** ISO 8601 timestamp *)
  handle : string option;
}

(** Account event (status changes). *)
type account_event = {
  seq : int64;
  did : string;
  time : string;
  active : bool;
  status : string option;
}

(** Handle event (similar to identity but for handle changes specifically). *)
type handle_event = {
  seq : int64;
  did : string;
  time : string;
  handle : string;
}

(** Tombstone event (repo deletion). *)
type tombstone_event = { seq : int64; did : string; time : string }

(** Info message. *)
type info_message = { name : string; message : string option }

(** Firehose event types. *)
type event =
  | Commit of commit_event
  | Identity of identity_event
  | Account of account_event
  | Handle of handle_event
  | Tombstone of tombstone_event
  | Info of info_message
  | StreamError of string  (** Error message from the stream *)

(** Frame header. *)
type frame_header = {
  op : int;  (** 1 = message, -1 = error *)
  t : string option;  (** Event type like "#commit" *)
}

(** Firehose errors. *)
type error =
  | Connection_error of string
  | Decode_error of string
  | Protocol_error of string

(** Render an {!error} as a human-readable string, prefixed with its kind. *)
let error_to_string = function
  | Connection_error msg -> Printf.sprintf "Connection error: %s" msg
  | Decode_error msg -> Printf.sprintf "Decode error: %s" msg
  | Protocol_error msg -> Printf.sprintf "Protocol error: %s" msg

(** {1 WebSocket Effects} *)

(** Abstract WebSocket handle - uses unified type. *)
type websocket = Effects.websocket

(** WebSocket effects.

    Note: This module also supports the unified WebSocket effects from
    {!Atproto_effects.Effects}. Handlers can match either these local effects or
    the unified ones. The local effects are provided for backward compatibility.

    The unified effects use {!Effects.ws_message} for recv, while this module
    uses raw strings for simplicity. *)
type _ Effect.t +=
  | Ws_connect : Uri.t -> (websocket, string) result Effect.t
  | Ws_recv : websocket -> (string, string) result Effect.t
  | Ws_close : websocket -> unit Effect.t

(** {1 Frame Decoding} *)

(* Field accessors. Each one looks [key] up in the association list of a CBOR
   map and returns [None] both when the key is absent and when the value has a
   different CBOR kind than the one requested. *)

(** Get string field from CBOR map *)
let get_string key pairs =
  match List.assoc_opt key pairs with
  | Some (Dag_cbor.String s) -> Some s
  | _ -> None

(** Get int64 field from CBOR map *)
let get_int key pairs =
  match List.assoc_opt key pairs with
  | Some (Dag_cbor.Int i) -> Some i
  | _ -> None

(** Get bool field from CBOR map *)
let get_bool key pairs =
  match List.assoc_opt key pairs with
  | Some (Dag_cbor.Bool b) -> Some b
  | _ -> None

(** Get bytes field from CBOR map (DAG-CBOR stores bytes as string) *)
let get_bytes key pairs =
  match List.assoc_opt key pairs with
  | Some (Dag_cbor.Bytes b) -> Some b
  | _ -> None

(** Get CID link field from CBOR map *)
let get_link key pairs =
  match List.assoc_opt key pairs with
  | Some (Dag_cbor.Link cid) -> Some cid
  | _ -> None

(** Get array field from CBOR map *)
let get_array key pairs =
  match List.assoc_opt key pairs with
  | Some (Dag_cbor.Array items) -> Some items
  | _ -> None

(** Decode a frame header from DAG-CBOR.

    Never fails: a non-map value, or a missing / non-integer ["op"], yields
    [op = 0], which {!decode_frame} then rejects as an unknown op. The integer
    is narrowed with [Atproto_json.int_of_int64_default ~default:0]. *)
let decode_header cbor =
  match cbor with
  | Dag_cbor.Map pairs ->
      let op =
        match get_int "op" pairs with
        | Some i -> Atproto_json.int_of_int64_default ~default:0 i
        | None -> 0
      in
      { op; t = get_string "t" pairs }
  | _ -> { op = 0; t = None }

(** Decode an operation from CBOR.

    Lenient by design: an unrecognised or missing ["action"] is treated as
    [`Create], a missing ["path"] becomes [""], and a non-map value decodes to
    an empty [`Create] operation. *)
let decode_operation cbor =
  match cbor with
  | Dag_cbor.Map pairs ->
      let action =
        match get_string "action" pairs with
        | Some "create" -> `Create
        | Some "update" -> `Update
        | Some "delete" -> `Delete
        | _ -> `Create
      in
      let path = get_string "path" pairs |> Option.value ~default:"" in
      let cid = get_link "cid" pairs in
      { action; path; cid }
  | _ -> { action = `Create; path = ""; cid = None }

(** Decode a commit event from CBOR.

    Returns [None] when the mandatory ["commit"] link is absent; every other
    field falls back to a neutral default ([0L], [""], [[]], [false]). *)
let decode_commit pairs =
  let seq = get_int "seq" pairs |> Option.value ~default:0L in
  let repo = get_string "repo" pairs |> Option.value ~default:"" in
  let rev = get_string "rev" pairs |> Option.value ~default:"" in
  let since = get_string "since" pairs in
  let blocks = get_bytes "blocks" pairs |> Option.value ~default:"" in
  let ops =
    get_array "ops" pairs |> Option.value ~default:[]
    |> List.map decode_operation
  in
  let too_big = get_bool "tooBig" pairs |> Option.value ~default:false in
  match get_link "commit" pairs with
  | Some commit -> Some { seq; repo; rev; since; commit; blocks; ops; too_big }
  | None -> None

(** Decode an identity event from CBOR. Missing fields default to [0L] / [""];
    ["handle"] stays optional. *)
let decode_identity pairs : identity_event =
  (* The annotation is required: [handle_event] has the same field names and
     is declared later, so it would otherwise be selected. *)
  let seq = get_int "seq" pairs |> Option.value ~default:0L in
  let did = get_string "did" pairs |> Option.value ~default:"" in
  let time = get_string "time" pairs |> Option.value ~default:"" in
  let handle = get_string "handle" pairs in
  { seq; did; time; handle }

(** Decode an account event from CBOR. A missing ["active"] flag is read as
    [true]; ["status"] stays optional. *)
let decode_account pairs =
  let seq = get_int "seq" pairs |> Option.value ~default:0L in
  let did = get_string "did" pairs |> Option.value ~default:"" in
  let time = get_string "time" pairs |> Option.value ~default:"" in
  let active = get_bool "active" pairs |> Option.value ~default:true in
  let status = get_string "status" pairs in
  { seq; did; time; active; status }

(** Decode a handle event from CBOR. Missing fields default to [0L] / [""]. *)
let decode_handle pairs =
  let seq = get_int "seq" pairs |> Option.value ~default:0L in
  let did = get_string "did" pairs |> Option.value ~default:"" in
  let time = get_string "time" pairs |> Option.value ~default:"" in
  let handle = get_string "handle" pairs |> Option.value ~default:"" in
  { seq; did; time; handle }

(** Decode a tombstone event from CBOR. Missing fields default to [0L] / [""].
*)
let decode_tombstone pairs =
  let seq = get_int "seq" pairs |> Option.value ~default:0L in
  let did = get_string "did" pairs |> Option.value ~default:"" in
  let time = get_string "time" pairs |> Option.value ~default:"" in
  { seq; did; time }

(** Decode an info message from CBOR. A missing ["name"] becomes [""]. *)
let decode_info pairs =
  let name = get_string "name" pairs |> Option.value ~default:"" in
  let message = get_string "message" pairs in
  { name; message }

(** Decode the payload map of a message frame ([op = 1]) according to the
    header's type tag [t].

    Returns [Protocol_error] for a missing or unrecognised tag and
    [Decode_error] for a commit without its ["commit"] link. *)
let decode_message t pairs =
  match t with
  | Some "#commit" -> (
      match decode_commit pairs with
      | Some evt -> Ok (Commit evt)
      | None -> Error (Decode_error "invalid commit"))
  | Some "#identity" -> Ok (Identity (decode_identity pairs))
  | Some "#account" -> Ok (Account (decode_account pairs))
  | Some "#handle" -> Ok (Handle (decode_handle pairs))
  | Some "#tombstone" -> Ok (Tombstone (decode_tombstone pairs))
  | Some "#info" -> Ok (Info (decode_info pairs))
  | Some t -> Error (Protocol_error ("unknown event type: " ^ t))
  | None -> Error (Protocol_error "missing event type")

(** Decode a frame (header + payload) from string. A frame consists of two
    concatenated DAG-CBOR values.

    - The header is read with [Dag_cbor.decode_partial]; whatever follows it is
      the payload. An empty payload is a [Decode_error].
    - [op = -1] (error frame) always succeeds with a [StreamError]; the text is
      the payload's ["error"] field, or ["unknown error"] when the payload is
      undecodable, not a map, or lacks that field.
    - [op = 1] (message frame) requires a map payload and is dispatched on the
      header's type tag.
    - Any other [op] is a [Protocol_error]. *)
let decode_frame (data : string) : (event, error) result =
  match Dag_cbor.decode_partial data with
  | Error _ -> Error (Decode_error "invalid header CBOR")
  | Ok (header_cbor, payload_data) -> (
      let header = decode_header header_cbor in
      if String.length payload_data = 0 then
        Error (Decode_error "missing payload")
      else
        match header.op with
        | -1 -> (
            (* Error frame: best effort, never turned into a decode failure *)
            match Dag_cbor.decode payload_data with
            | Ok (Dag_cbor.Map pairs) ->
                let msg =
                  get_string "error" pairs
                  |> Option.value ~default:"unknown error"
                in
                Ok (StreamError msg)
            | _ -> Ok (StreamError "unknown error"))
        | 1 -> (
            (* Message frame *)
            match Dag_cbor.decode payload_data with
            | Error _ -> Error (Decode_error "invalid payload CBOR")
            | Ok (Dag_cbor.Map pairs) -> decode_message header.t pairs
            | Ok _ -> Error (Decode_error "payload must be object"))
        | op -> Error (Protocol_error (Printf.sprintf "unknown op: %d" op)))

(** {1 Subscription} *)

(** Firehose subscription configuration. *)
type config = {
  uri : Uri.t;
  cursor : int64 option;  (** Sequence number to start from *)
}

(** Create a subscription config *)
let config ~uri ?cursor () = { uri; cursor }

(** Build the subscription URI with cursor.

    Without a configured cursor the base URI is returned unchanged. With one,
    any [cursor] query parameter already present on the base URI is replaced,
    so the request never carries two conflicting cursors. *)
let build_uri config =
  let base = config.uri in
  match config.cursor with
  | None -> base
  | Some cursor ->
      let without_cursor = Uri.remove_query_param base "cursor" in
      Uri.add_query_param without_cursor ("cursor", [ Int64.to_string cursor ])

(** Subscribe to the firehose and call handler for each event. The handler
    returns [true] to continue, [false] to stop.

    Performs [Ws_connect], then alternates [Ws_recv] / {!decode_frame} /
    [handler] until one of the following happens:
    - [handler] returns [false]: the result is [Ok ()];
    - [Ws_connect] or [Ws_recv] reports an error: [Error (Connection_error _)];
    - a frame cannot be decoded: the {!decode_frame} error is returned.

    Once connected, [Ws_close] is performed exactly once on every exit path,
    including when [handler] (or the effect handler, by discontinuing) raises;
    the exception is then re-raised to the caller. *)
let subscribe config ~handler =
  let uri = build_uri config in
  match Effect.perform (Ws_connect uri) with
  | Error msg -> Error (Connection_error msg)
  | Ok ws ->
      let rec loop () =
        match Effect.perform (Ws_recv ws) with
        | Error msg -> Error (Connection_error msg)
        | Ok data -> (
            match decode_frame data with
            | Error e -> Error e
            | Ok event -> if handler event then loop () else Ok ())
      in
      (* [Fun.protect] guarantees the socket is released even if the user
         callback raises, which the hand-written close calls did not. *)
      Fun.protect ~finally:(fun () -> Effect.perform (Ws_close ws)) loop

(** Get the sequence number from an event. [Info] and [StreamError] carry none.
*)
let event_seq = function
  | Commit e -> Some e.seq
  | Identity e -> Some e.seq
  | Account e -> Some e.seq
  | Handle e -> Some e.seq
  | Tombstone e -> Some e.seq
  | Info _ -> None
  | StreamError _ -> None

(** Get the DID from an event (if applicable). For commits this is the [repo]
    field. *)
let event_did = function
  | Commit e -> Some e.repo
  | Identity e -> Some e.did
  | Account e -> Some e.did
  | Handle e -> Some e.did
  | Tombstone e -> Some e.did
  | Info _ -> None
  | StreamError _ -> None

(** {1 JSON Serialization} *)

(** Convert an operation to JSON. Does not include record content - use
    [add_record_to_op_json] to add it. The ["cid"] key is emitted only when the
    operation carries a CID. *)
let operation_to_json (op : operation) : Atproto_json.t =
  let action =
    match op.action with
    | `Create -> "create"
    | `Update -> "update"
    | `Delete -> "delete"
  in
  let base =
    [
      ("action", Atproto_json.string action);
      ("path", Atproto_json.string op.path);
    ]
  in
  match op.cid with
  | Some c ->
      Atproto_json.object_
        (base @ [ ("cid", Atproto_json.string (Cid.to_string c)) ])
  | None -> Atproto_json.object_ base

(** Add a record field to an operation JSON object. If [op_json] is not an
    object it is returned unchanged. *)
let add_record_to_op_json (op_json : Atproto_json.t) (record : Atproto_json.t) :
    Atproto_json.t =
  match Atproto_json.to_object_opt op_json with
  | Some fields -> Atproto_json.object_ (fields @ [ ("record", record) ])
  | None -> op_json

(** Convert a commit event to JSON. Operations don't include record content.
    Only [seq], [repo], [rev], [commit] and [ops] are emitted. *)
let commit_event_to_json (evt : commit_event) : Atproto_json.t =
  let ops = Atproto_json.array (List.map operation_to_json evt.ops) in
  Atproto_json.object_
    [
      ("type", Atproto_json.string "commit");
      ("seq", Atproto_json.int64 evt.seq);
      ("repo", Atproto_json.string evt.repo);
      ("rev", Atproto_json.string evt.rev);
      ("commit", Atproto_json.string (Cid.to_string evt.commit));
      ("ops", ops);
    ]

(** Convert an identity event to JSON. ["handle"] is emitted only when present.
*)
let identity_event_to_json (e : identity_event) : Atproto_json.t =
  let base =
    [
      ("type", Atproto_json.string "identity");
      ("seq", Atproto_json.int64 e.seq);
      ("did", Atproto_json.string e.did);
      ("time", Atproto_json.string e.time);
    ]
  in
  let with_handle =
    match e.handle with
    | Some h -> base @ [ ("handle", Atproto_json.string h) ]
    | None -> base
  in
  Atproto_json.object_ with_handle

(** Convert an account event to JSON. ["status"] is emitted only when present.
*)
let account_event_to_json (e : account_event) : Atproto_json.t =
  let base =
    [
      ("type", Atproto_json.string "account");
      ("seq", Atproto_json.int64 e.seq);
      ("did", Atproto_json.string e.did);
      ("time", Atproto_json.string e.time);
      ("active", Atproto_json.bool e.active);
    ]
  in
  let with_status =
    match e.status with
    | Some s -> base @ [ ("status", Atproto_json.string s) ]
    | None -> base
  in
  Atproto_json.object_ with_status

(** Convert a handle event to JSON *)
let handle_event_to_json (e : handle_event) : Atproto_json.t =
  Atproto_json.object_
    [
      ("type", Atproto_json.string "handle");
      ("seq", Atproto_json.int64 e.seq);
      ("did", Atproto_json.string e.did);
      ("time", Atproto_json.string e.time);
      ("handle", Atproto_json.string e.handle);
    ]

(** Convert a tombstone event to JSON *)
let tombstone_event_to_json (e : tombstone_event) : Atproto_json.t =
  Atproto_json.object_
    [
      ("type", Atproto_json.string "tombstone");
      ("seq", Atproto_json.int64 e.seq);
      ("did", Atproto_json.string e.did);
      ("time", Atproto_json.string e.time);
    ]

(** Convert an info message to JSON. ["message"] is emitted only when present.
*)
let info_message_to_json (m : info_message) : Atproto_json.t =
  let base =
    [
      ("type", Atproto_json.string "info"); ("name", Atproto_json.string m.name);
    ]
  in
  let with_msg =
    match m.message with
    | Some msg -> base @ [ ("message", Atproto_json.string msg) ]
    | None -> base
  in
  Atproto_json.object_ with_msg

(** Convert any event to JSON. For commit events, operations don't include
    record content - extract blocks separately and use [add_record_to_op_json].
    A [StreamError] becomes an object of type ["error"] carrying the message. *)
let event_to_json = function
  | Commit evt -> commit_event_to_json evt
  | Identity e -> identity_event_to_json e
  | Account e -> account_event_to_json e
  | Handle e -> handle_event_to_json e
  | Tombstone e -> tombstone_event_to_json e
  | Info m -> info_message_to_json m
  | StreamError msg ->
      Atproto_json.object_
        [
          ("type", Atproto_json.string "error");
          ("message", Atproto_json.string msg);
        ]

(** Convert an event to a JSON string *)
let event_to_json_string evt = Atproto_json.encode (event_to_json evt)