(** Firehose (event stream) client for the AT Protocol.

    The firehose pushes real-time network updates over a WebSocket. Each
    binary frame holds two DAG-CBOR values back to back:

    - a header such as [{"op": 1, "t": "#commit"}], and
    - a payload whose shape is selected by the header's [t] field.

    The WebSocket handle type is shared with {!Atproto_effects.Effects};
    the connection itself is driven through the local [Ws_connect],
    [Ws_recv] and [Ws_close] effects declared in this module. *)

open Atproto_ipld
module Effects = Atproto_effects.Effects

(** {1 Types} *)

(** One repository operation inside a commit event. *)
type operation = {
  action : [ `Create | `Update | `Delete ];
  path : string;  (** Record path, in [collection/rkey] form. *)
  cid : Cid.t option;  (** Record CID, when the payload carries one. *)
}

(** A [#commit] event: operations applied to a single repository. *)
type commit_event = {
  seq : int64;  (** Sequence number of the event. *)
  repo : string;  (** DID of the repository. *)
  rev : string;  (** TID revision of this commit. *)
  since : string option;  (** Previous revision, if any. *)
  commit : Cid.t;  (** CID of the commit. *)
  blocks : string;  (** CAR file slice containing blocks, as raw bytes. *)
  ops : operation list;
  too_big : bool;  (** The payload's [tooBig] flag. *)
}

(** An [#identity] event (handle changes and the like). *)
type identity_event = {
  seq : int64;
  did : string;
  time : string;  (** ISO 8601 timestamp. *)
  handle : string option;
}

(** An [#account] event (account status changes). *)
type account_event = {
  seq : int64;
  did : string;
  time : string;
  active : bool;
  status : string option;
}

(** A [#handle] event: like {!identity_event}, but specific to handle
    changes. *)
type handle_event = {
  seq : int64;
  did : string;
  time : string;
  handle : string;
}

(** A [#tombstone] event (repository deletion). *)
type tombstone_event = { seq : int64; did : string; time : string }

(** An [#info] message from the stream. *)
type info_message = { name : string; message : string option }

(** Every event the firehose can deliver. *)
type event =
  | Commit of commit_event
  | Identity of identity_event
  | Account of account_event
  | Handle of handle_event
  | Tombstone of tombstone_event
  | Info of info_message
  | StreamError of string  (** Error message from the stream. *)

(** Decoded frame header. *)
type frame_header = {
  op : int;  (** [1] for a message, [-1] for an error. *)
  t : string option;  (** Event type such as ["#commit"]. *)
}

(** Failures reported by the firehose client. *)
type error =
  | Connection_error of string
  | Decode_error of string
  | Protocol_error of string

(** Render an {!error} as a human-readable ["<kind>: <detail>"] string. *)
let error_to_string err =
  let kind, detail =
    match err with
    | Connection_error m -> ("Connection error", m)
    | Decode_error m -> ("Decode error", m)
    | Protocol_error m -> ("Protocol error", m)
  in
  Printf.sprintf "%s: %s" kind detail

(** {1 WebSocket Effects} *)

(** Abstract WebSocket handle; the same type as
    {!Atproto_effects.Effects.websocket}. *)
type websocket = Effects.websocket

(** WebSocket effects performed by this module. Unlike the unified effects
    in {!Atproto_effects.Effects}, whose receive uses {!Effects.ws_message},
    [Ws_recv] here yields the raw frame as a string. An effect handler must
    interpret these constructors for any code that performs them. *)
type _ Effect.t +=
  | Ws_connect : Uri.t -> (websocket, string) result Effect.t
  | Ws_recv : websocket -> (string, string) result Effect.t
  | Ws_close : websocket -> unit Effect.t

(** {1 Frame Decoding} *)

(** [find_field key pairs extract] looks [key] up in the pairs of a CBOR map
    and applies [extract] to its value. Returns [None] when the key is
    absent or [extract] rejects the value. *)
let find_field key pairs extract =
  Option.bind (List.assoc_opt key pairs) extract

(** Get a text-string field from a CBOR map. *)
let get_string key pairs =
  find_field key pairs (function Dag_cbor.String s -> Some s | _ -> None)

(** Get an integer field (as [int64]) from a CBOR map. *)
let get_int key pairs =
  find_field key pairs (function Dag_cbor.Int i -> Some i | _ -> None)

(** Get a boolean field from a CBOR map. *)
let get_bool key pairs =
  find_field key pairs (function Dag_cbor.Bool b -> Some b | _ -> None)

(** Get a byte-string field from a CBOR map (the bytes come back as a
    [string]). *)
let get_bytes key pairs =
  find_field key pairs (function Dag_cbor.Bytes b -> Some b | _ -> None)

(** Get a CID link field from a CBOR map. *)
let get_link key pairs =
  find_field key pairs (function Dag_cbor.Link cid -> Some cid | _ -> None)

(** Get an array field from a CBOR map. *)
let get_array key pairs =
  find_field key pairs (function Dag_cbor.Array items -> Some items | _ -> None)

(* Defaulting accessors shared by the event decoders below: a missing or
   mistyped field becomes [""] (strings) or [0L] (the sequence number). *)
let string_or_empty key pairs = Option.value (get_string key pairs) ~default:""
let seq_or_zero pairs = Option.value (get_int "seq" pairs) ~default:0L

(** Decode a frame header. A non-map header, or a missing/mistyped ["op"],
    yields [op = 0]; a missing/mistyped ["t"] yields [t = None]. *)
let decode_header cbor =
  match cbor with
  | Dag_cbor.Map pairs ->
      let op =
        match get_int "op" pairs with
        | Some i -> Atproto_json.int_of_int64_default ~default:0 i
        | None -> 0
      in
      { op; t = get_string "t" pairs }
  | _ -> { op = 0; t = None }

(** Decode one commit operation. An unknown or missing ["action"] decodes
    as [`Create] and a missing ["path"] as [""]; a non-map value decodes to
    an empty [`Create] operation. *)
let decode_operation cbor =
  match cbor with
  | Dag_cbor.Map pairs ->
      let action =
        match get_string "action" pairs with
        | Some "update" -> `Update
        | Some "delete" -> `Delete
        | Some _ | None -> `Create
      in
      { action; path = string_or_empty "path" pairs; cid = get_link "cid" pairs }
  | _ -> { action = `Create; path = ""; cid = None }

(** Decode a [#commit] payload. Returns [None] when the ["commit"] link is
    missing; every other field falls back to an empty default. *)
let decode_commit pairs : commit_event option =
  get_link "commit" pairs
  |> Option.map (fun commit ->
         {
           seq = seq_or_zero pairs;
           repo = string_or_empty "repo" pairs;
           rev = string_or_empty "rev" pairs;
           since = get_string "since" pairs;
           commit;
           blocks = Option.value (get_bytes "blocks" pairs) ~default:"";
           ops =
             get_array "ops" pairs
             |> Option.value ~default:[]
             |> List.map decode_operation;
           too_big = Option.value (get_bool "tooBig" pairs) ~default:false;
         })

(** Decode an [#identity] payload. *)
let decode_identity pairs : identity_event =
  {
    seq = seq_or_zero pairs;
    did = string_or_empty "did" pairs;
    time = string_or_empty "time" pairs;
    handle = get_string "handle" pairs;
  }

(** Decode an [#account] payload; a missing ["active"] flag means [true]. *)
let decode_account pairs : account_event =
  {
    seq = seq_or_zero pairs;
    did = string_or_empty "did" pairs;
    time = string_or_empty "time" pairs;
    active = Option.value (get_bool "active" pairs) ~default:true;
    status = get_string "status" pairs;
  }

(** Decode a [#handle] payload. *)
let decode_handle pairs : handle_event =
  {
    seq = seq_or_zero pairs;
    did = string_or_empty "did" pairs;
    time = string_or_empty "time" pairs;
    handle = string_or_empty "handle" pairs;
  }

(** Decode a [#tombstone] payload. *)
let decode_tombstone pairs : tombstone_event =
  {
    seq = seq_or_zero pairs;
    did = string_or_empty "did" pairs;
    time = string_or_empty "time" pairs;
  }

(** Decode an [#info] payload. *)
let decode_info pairs : info_message =
  { name = string_or_empty "name" pairs; message = get_string "message" pairs }

(** Decode a raw frame: a DAG-CBOR header immediately followed by a
    DAG-CBOR payload.

    - [op = -1]: an error frame, returned as [Ok (StreamError msg)] where
      [msg] is the payload's ["error"] field (or ["unknown error"]).
    - [op = 1]: a message whose header [t] selects the event decoder.
    - Any other [op], or an unknown/missing [t]: [Error (Protocol_error _)].
    - Undecodable CBOR, an empty or non-map payload, or a commit without a
      commit link: [Error (Decode_error _)]. *)
let decode_frame (data : string) : (event, error) result =
  let error_frame payload =
    let msg =
      match Dag_cbor.decode payload with
      | Ok (Dag_cbor.Map pairs) ->
          Option.value (get_string "error" pairs) ~default:"unknown error"
      | _ -> "unknown error"
    in
    Ok (StreamError msg)
  in
  let message_frame event_type payload =
    match Dag_cbor.decode payload with
    | Error _ -> Error (Decode_error "invalid payload CBOR")
    | Ok (Dag_cbor.Map pairs) -> (
        match event_type with
        | Some "#commit" -> (
            match decode_commit pairs with
            | Some evt -> Ok (Commit evt)
            | None -> Error (Decode_error "invalid commit"))
        | Some "#identity" -> Ok (Identity (decode_identity pairs))
        | Some "#account" -> Ok (Account (decode_account pairs))
        | Some "#handle" -> Ok (Handle (decode_handle pairs))
        | Some "#tombstone" -> Ok (Tombstone (decode_tombstone pairs))
        | Some "#info" -> Ok (Info (decode_info pairs))
        | Some other -> Error (Protocol_error ("unknown event type: " ^ other))
        | None -> Error (Protocol_error "missing event type"))
    | Ok _ -> Error (Decode_error "payload must be object")
  in
  match Dag_cbor.decode_partial data with
  | Error _ -> Error (Decode_error "invalid header CBOR")
  | Ok (_, "") -> Error (Decode_error "missing payload")
  | Ok (header_cbor, payload) -> (
      let header = decode_header header_cbor in
      match header.op with
      | -1 -> error_frame payload
      | 1 -> message_frame header.t payload
      | op -> Error (Protocol_error (Printf.sprintf "unknown op: %d" op)))

(** {1 Subscription} *)

(** Firehose subscription configuration. *)
type config = {
  uri : Uri.t;  (** Endpoint to subscribe to. *)
  cursor : int64 option;  (** Sequence number to start from. *)
}

(** [config ~uri ?cursor ()] builds a subscription configuration. *)
let config ~uri ?cursor () = { uri; cursor }
(** [build_uri config] returns the endpoint to connect to. This is
    [config.uri], with its [cursor] query parameter set to [config.cursor]
    when a cursor is given. An existing [cursor] parameter in [config.uri]
    is replaced rather than duplicated. With no cursor, [config.uri] is
    returned unchanged. *)
let build_uri config =
  let base = config.uri in
  match config.cursor with
  | None -> base
  | Some cursor ->
      (* [Uri.add_query_param] appends, so drop any stale cursor first. *)
      Uri.add_query_param
        (Uri.remove_query_param base "cursor")
        ("cursor", [ Int64.to_string cursor ])

(** Subscribe to the firehose and call [handler] on each decoded event.

    [handler] returns [true] to keep reading, or [false] to stop, in which
    case the result is [Ok ()].

    Some frames are rejected by {!decode_frame} with [Protocol_error]
    (unknown [op], or unknown/missing [t] in the header). These frames are
    skipped: the AT Protocol event-stream spec asks clients to ignore such
    frames so that newly introduced event types do not break consumers.

    A receive failure ends the subscription with
    [Error (Connection_error _)]. Any other decode failure ends it with that
    [Error].

    The WebSocket is closed exactly once however the loop ends. This
    includes the case where [handler] raises; the exception then propagates
    after the close. *)
let subscribe config ~handler =
  let uri = build_uri config in
  match Effect.perform (Ws_connect uri) with
  | Error msg -> Error (Connection_error msg)
  | Ok ws ->
      let rec loop () =
        match Effect.perform (Ws_recv ws) with
        | Error msg -> Error (Connection_error msg)
        | Ok data -> (
            match decode_frame data with
            | Ok event -> if handler event then loop () else Ok ()
            (* Forward compatibility: ignore frames with unknown header
               values instead of tearing down the stream. *)
            | Error (Protocol_error _) -> loop ()
            | Error e -> Error e)
      in
      (* [finally] runs on normal return and on exceptions alike, so the
         socket no longer leaks when [handler] raises. *)
      Fun.protect ~finally:(fun () -> Effect.perform (Ws_close ws)) loop

(** Sequence number of an event; [None] for [Info] and [StreamError]. *)
let event_seq = function
  | Commit e -> Some e.seq
  | Identity e -> Some e.seq
  | Account e -> Some e.seq
  | Handle e -> Some e.seq
  | Tombstone e -> Some e.seq
  | Info _ | StreamError _ -> None

(** DID an event is about (the [repo] of a commit); [None] for [Info] and
    [StreamError]. *)
let event_did = function
  | Commit e -> Some e.repo
  | Identity e -> Some e.did
  | Account e -> Some e.did
  | Handle e -> Some e.did
  | Tombstone e -> Some e.did
  | Info _ | StreamError _ -> None

(** {1 JSON Serialization} *)

(** Convert an operation to a JSON object with ["action"], ["path"] and,
    when present, ["cid"]. Record content is not included; use
    [add_record_to_op_json] to attach it. *)
let operation_to_json (op : operation) : Atproto_json.t =
  let action =
    match op.action with
    | `Create -> "create"
    | `Update -> "update"
    | `Delete -> "delete"
  in
  let base =
    [
      ("action", Atproto_json.string action);
      ("path", Atproto_json.string op.path);
    ]
  in
  match op.cid with
  | Some c ->
      Atproto_json.object_
        (base @ [ ("cid", Atproto_json.string (Cid.to_string c)) ])
  | None -> Atproto_json.object_ base

(** Set the ["record"] field of an operation JSON object to [record],
    replacing any existing ["record"] entry so the key is never duplicated.
    A non-object [op_json] is returned unchanged. *)
let add_record_to_op_json (op_json : Atproto_json.t) (record : Atproto_json.t)
    : Atproto_json.t =
  match Atproto_json.to_object_opt op_json with
  | Some fields ->
      Atproto_json.object_
        (List.remove_assoc "record" fields @ [ ("record", record) ])
  | None -> op_json

(** Convert a commit event to JSON. Operations carry no record content.
    The [since], [blocks] and [too_big] fields are not emitted. *)
let commit_event_to_json (evt : commit_event) : Atproto_json.t =
  let ops = Atproto_json.array (List.map operation_to_json evt.ops) in
  Atproto_json.object_
    [
      ("type", Atproto_json.string "commit");
      ("seq", Atproto_json.int64 evt.seq);
      ("repo", Atproto_json.string evt.repo);
      ("rev", Atproto_json.string evt.rev);
      ("commit", Atproto_json.string (Cid.to_string evt.commit));
      ("ops", ops);
    ]

(** Convert an identity event to JSON; ["handle"] is emitted only when
    present. *)
let identity_event_to_json (e : identity_event) : Atproto_json.t =
  let base =
    [
      ("type", Atproto_json.string "identity");
      ("seq", Atproto_json.int64 e.seq);
      ("did", Atproto_json.string e.did);
      ("time", Atproto_json.string e.time);
    ]
  in
  let with_handle =
    match e.handle with
    | Some h -> base @ [ ("handle", Atproto_json.string h) ]
    | None -> base
  in
  Atproto_json.object_ with_handle

(** Convert an account event to JSON; ["status"] is emitted only when
    present. *)
let account_event_to_json (e : account_event) : Atproto_json.t =
  let base =
    [
      ("type", Atproto_json.string "account");
      ("seq", Atproto_json.int64 e.seq);
      ("did", Atproto_json.string e.did);
      ("time", Atproto_json.string e.time);
      ("active", Atproto_json.bool e.active);
    ]
  in
  let with_status =
    match e.status with
    | Some s -> base @ [ ("status", Atproto_json.string s) ]
    | None -> base
  in
  Atproto_json.object_ with_status

(** Convert a handle event to JSON. *)
let handle_event_to_json (e : handle_event) : Atproto_json.t =
  Atproto_json.object_
    [
      ("type", Atproto_json.string "handle");
      ("seq", Atproto_json.int64 e.seq);
      ("did", Atproto_json.string e.did);
      ("time", Atproto_json.string e.time);
      ("handle", Atproto_json.string e.handle);
    ]

(** Convert a tombstone event to JSON. *)
let tombstone_event_to_json (e : tombstone_event) : Atproto_json.t =
  Atproto_json.object_
    [
      ("type", Atproto_json.string "tombstone");
      ("seq", Atproto_json.int64 e.seq);
      ("did", Atproto_json.string e.did);
      ("time", Atproto_json.string e.time);
    ]

(** Convert an info message to JSON; ["message"] is emitted only when
    present. *)
let info_message_to_json (m : info_message) : Atproto_json.t =
  let base =
    [
      ("type", Atproto_json.string "info");
      ("name", Atproto_json.string m.name);
    ]
  in
  let with_msg =
    match m.message with
    | Some msg -> base @ [ ("message", Atproto_json.string msg) ]
    | None -> base
  in
  Atproto_json.object_ with_msg

(** Convert any event to JSON, tagged with a ["type"] field. For commit
    events, operations don't include record content. To attach record
    content, extract it from the blocks separately and use
    [add_record_to_op_json]. *)
let event_to_json = function
  | Commit evt -> commit_event_to_json evt
  | Identity e -> identity_event_to_json e
  | Account e -> account_event_to_json e
  | Handle e -> handle_event_to_json e
  | Tombstone e -> tombstone_event_to_json e
  | Info m -> info_message_to_json m
  | StreamError msg ->
      Atproto_json.object_
        [
          ("type", Atproto_json.string "error");
          ("message", Atproto_json.string msg);
        ]

(** Convert an event to an encoded JSON string. *)
let event_to_json_string evt = Atproto_json.encode (event_to_json evt)