atproto libraries implementation in ocaml
1(** Firehose (Event Stream) Client for AT Protocol.
2
3 The firehose provides real-time updates from the network using WebSockets.
4 Events are encoded as DAG-CBOR with a header+payload structure.
5
6 Wire protocol:
7 - Binary WebSocket frames
8 - Each frame: header (DAG-CBOR) + payload (DAG-CBOR)
9 - Header: {{"op": 1, "t": "#commit"}}
10
11 This module uses the unified effects from {!Atproto_effects.Effects}. *)
12
13open Atproto_ipld
14module Effects = Atproto_effects.Effects
15
16(** {1 Types} *)
17
type operation = {
  action : [ `Create | `Update | `Delete ];
      (** Kind of change. [decode_operation] falls back to [`Create] when the
          ["action"] field is missing or holds an unrecognised value. *)
  path : string; (** collection/rkey format *)
  cid : Cid.t option;
      (** Link read from the ["cid"] field; [None] when that field is absent
          or is not a link. *)
}
(** Operation in a commit event: one entry of the commit's ["ops"] array. *)
24
type commit_event = {
  seq : int64; (** Sequence number; [decode_commit] uses [0L] when absent *)
  repo : string; (** DID of the repo *)
  rev : string; (** TID revision *)
  since : string option; (** Previous revision *)
  commit : Cid.t;
      (** Link from the ["commit"] field. [decode_commit] returns [None] when
          this link is missing. *)
  blocks : string;
      (** CAR file slice containing blocks (raw bytes as string) *)
  ops : operation list; (** Decoded entries of the ["ops"] array *)
  too_big : bool; (** Value of the ["tooBig"] field; [false] when absent *)
}
(** Commit event from the firehose *)
37
type identity_event = {
  seq : int64; (** Sequence number; [decode_identity] uses [0L] when absent *)
  did : string; (** Value of the ["did"] field; [""] when absent *)
  time : string; (** ISO 8601 timestamp *)
  handle : string option;
      (** Set only when the payload carries a string ["handle"] field *)
}
(** Identity event (handle changes, etc.) *)
45
type account_event = {
  seq : int64; (** Sequence number; [decode_account] uses [0L] when absent *)
  did : string; (** Value of the ["did"] field; [""] when absent *)
  time : string; (** Value of the ["time"] field; [""] when absent *)
  active : bool;
      (** Value of the ["active"] field. [decode_account] assumes [true] when
          the field is missing. *)
  status : string option;
      (** Set only when the payload carries a string ["status"] field *)
}
(** Account event (status changes) *)
54
type handle_event = {
  seq : int64; (** Sequence number; [decode_handle] uses [0L] when absent *)
  did : string; (** Value of the ["did"] field; [""] when absent *)
  time : string; (** Value of the ["time"] field; [""] when absent *)
  handle : string;
      (** Value of the ["handle"] field. Unlike {!identity_event}, a missing
          handle is represented as [""] rather than [None]. *)
}
(** Handle event (similar to identity but for handle changes specifically) *)
62
type tombstone_event = { seq : int64; did : string; time : string }
(** Tombstone event (repo deletion).

    Holds the sequence number, the ["did"] field and the ["time"] field of the
    payload. [decode_tombstone] substitutes [0L] for a missing sequence number
    and [""] for a missing string. *)
65
type info_message = { name : string; message : string option }
(** Info message.

    [name] is the payload's ["name"] field ([""] when absent, see
    [decode_info]); [message] is set only when the payload carries a string
    ["message"] field. Info messages have no sequence number or DID, so
    [event_seq] and [event_did] return [None] for them. *)
68
(** Firehose event types: one constructor per kind of frame that
    [decode_frame] can produce. *)
type event =
  | Commit of commit_event (** Message frame with type ["#commit"] *)
  | Identity of identity_event (** Message frame with type ["#identity"] *)
  | Account of account_event (** Message frame with type ["#account"] *)
  | Handle of handle_event (** Message frame with type ["#handle"] *)
  | Tombstone of tombstone_event (** Message frame with type ["#tombstone"] *)
  | Info of info_message (** Message frame with type ["#info"] *)
  | StreamError of string (** Error message from the stream *)
78
type frame_header = {
  op : int; (** 1 = message, -1 = error *)
  t : string option; (** Event type like "#commit" *)
}
(** Frame header: the first of the two DAG-CBOR values in a frame.

    [decode_header] yields [op = 0] when the ["op"] field is missing or is not
    an integer, and [t = None] when the ["t"] field is missing or is not a
    string. *)
84
(** Firehose errors *)
type error =
  | Connection_error of string
      (** The WebSocket could not be opened, or receiving from it failed *)
  | Decode_error of string  (** A frame's header or payload was malformed *)
  | Protocol_error of string
      (** A frame decoded correctly but carried an unknown or missing operation
          code or event type *)

(** [error_to_string err] renders [err] as a human-readable line made of a
    category prefix followed by the message carried by the constructor. *)
let error_to_string (err : error) : string =
  match err with
  | Protocol_error detail -> Printf.sprintf "Protocol error: %s" detail
  | Decode_error detail -> Printf.sprintf "Decode error: %s" detail
  | Connection_error detail -> Printf.sprintf "Connection error: %s" detail
95
96(** {1 WebSocket Effects} *)
97
type websocket = Effects.websocket
(** Abstract WebSocket handle - uses unified type.

    This is an alias of {!Atproto_effects.Effects.websocket}. A handle is
    obtained from the [Ws_connect] effect and passed to [Ws_recv] and
    [Ws_close]. *)
100
(** WebSocket effects.

    Note: This module also supports the unified WebSocket effects from
    {!Atproto_effects.Effects}. Handlers can match either these local effects or
    the unified ones. The local effects are provided for backward compatibility.

    The unified effects use {!Effects.ws_message} for recv, while this module
    uses raw strings for simplicity.

    [subscribe] in this module performs these three local effects, so an effect
    handler covering them has to be installed around any call to it. *)
type _ Effect.t +=
  | Ws_connect : Uri.t -> (websocket, string) result Effect.t
        (** Open a connection to the given URI. The handler answers with the
            new handle, or with an error message. *)
  | Ws_recv : websocket -> (string, string) result Effect.t
        (** Receive the next message as a raw string ([subscribe] passes it
            unchanged to [decode_frame]), or an error message. *)
  | Ws_close : websocket -> unit Effect.t
        (** Close the connection behind the handle. *)
113
114(** {1 Frame Decoding} *)
115
(** [decode_header cbor] extracts the frame header fields from a decoded
    DAG-CBOR value.

    - ["op"] is converted with [Atproto_json.int_of_int64_default ~default:0];
      a missing or non-integer ["op"] gives [0].
    - ["t"] is kept only when it is a string.

    A value that is not a map yields [{ op = 0; t = None }]. *)
let decode_header cbor =
  (* A non-map header behaves exactly like a map with no entries. *)
  let fields = match cbor with Dag_cbor.Map entries -> entries | _ -> [] in
  let t =
    match List.assoc_opt "t" fields with
    | Some (Dag_cbor.String kind) -> Some kind
    | Some _ | None -> None
  in
  let op =
    match List.assoc_opt "op" fields with
    | Some (Dag_cbor.Int raw) ->
        Atproto_json.int_of_int64_default ~default:0 raw
    | Some _ | None -> 0
  in
  { op; t }
133
(** [get_string key pairs] looks [key] up in the CBOR map entries [pairs] and
    returns its value when it is a string; [None] when the key is missing or
    bound to another kind of value. *)
let get_string key pairs =
  Option.bind (List.assoc_opt key pairs) (function
    | Dag_cbor.String text -> Some text
    | _ -> None)
139
(** [get_int key pairs] looks [key] up in the CBOR map entries [pairs] and
    returns its value when it is an integer (as [int64]); [None] when the key
    is missing or bound to another kind of value. *)
let get_int key pairs =
  Option.bind (List.assoc_opt key pairs) (function
    | Dag_cbor.Int number -> Some number
    | _ -> None)
145
(** [get_bool key pairs] looks [key] up in the CBOR map entries [pairs] and
    returns its value when it is a boolean; [None] when the key is missing or
    bound to another kind of value. *)
let get_bool key pairs =
  Option.bind (List.assoc_opt key pairs) (function
    | Dag_cbor.Bool flag -> Some flag
    | _ -> None)
151
(** [get_bytes key pairs] looks [key] up in the CBOR map entries [pairs] and
    returns its value when it is a byte string (DAG-CBOR stores bytes as
    string); [None] when the key is missing or bound to another kind of
    value. *)
let get_bytes key pairs =
  Option.bind (List.assoc_opt key pairs) (function
    | Dag_cbor.Bytes raw -> Some raw
    | _ -> None)
157
(** [get_link key pairs] looks [key] up in the CBOR map entries [pairs] and
    returns its value when it is a CID link; [None] when the key is missing or
    bound to another kind of value. *)
let get_link key pairs =
  Option.bind (List.assoc_opt key pairs) (function
    | Dag_cbor.Link target -> Some target
    | _ -> None)
163
(** [get_array key pairs] looks [key] up in the CBOR map entries [pairs] and
    returns its elements when it is an array; [None] when the key is missing
    or bound to another kind of value. *)
let get_array key pairs =
  Option.bind (List.assoc_opt key pairs) (function
    | Dag_cbor.Array elements -> Some elements
    | _ -> None)
169
(** [decode_operation cbor] decodes one entry of a commit's ["ops"] array.

    The function never fails: ["action"] values other than ["update"] and
    ["delete"] (including a missing field) are read as [`Create], a missing
    ["path"] becomes [""], and a missing or non-link ["cid"] becomes [None]. A
    value that is not a map therefore decodes to
    [{ action = `Create; path = ""; cid = None }]. *)
let decode_operation cbor =
  (* A non-map value behaves exactly like a map with no entries. *)
  let fields = match cbor with Dag_cbor.Map entries -> entries | _ -> [] in
  let action =
    match get_string "action" fields with
    | Some "delete" -> `Delete
    | Some "update" -> `Update
    | Some _ | None -> `Create
  in
  {
    action;
    path = Option.value (get_string "path" fields) ~default:"";
    cid = get_link "cid" fields;
  }
185
(** [decode_commit pairs] builds a {!commit_event} from the entries of a
    ["#commit"] payload map.

    Returns [None] when the ["commit"] link is missing. Every other field is
    optional on the wire: ["seq"] defaults to [0L], ["repo"], ["rev"] and
    ["blocks"] to [""], ["ops"] to the empty list and ["tooBig"] to [false]. *)
let decode_commit pairs =
  match get_link "commit" pairs with
  | None -> None
  | Some commit ->
      let text key = Option.value (get_string key pairs) ~default:"" in
      let ops =
        match get_array "ops" pairs with
        | Some items -> List.map decode_operation items
        | None -> []
      in
      Some
        {
          seq = Option.value (get_int "seq" pairs) ~default:0L;
          repo = text "repo";
          rev = text "rev";
          since = get_string "since" pairs;
          commit;
          blocks = Option.value (get_bytes "blocks" pairs) ~default:"";
          ops;
          too_big = Option.value (get_bool "tooBig" pairs) ~default:false;
        }
203
(** [decode_identity pairs] builds an {!identity_event} from the entries of an
    ["#identity"] payload map. Missing fields never cause a failure: ["seq"]
    defaults to [0L], ["did"] and ["time"] to [""], and ["handle"] stays
    [None]. *)
let decode_identity pairs : identity_event =
  let text key = Option.value (get_string key pairs) ~default:"" in
  {
    seq = Option.value (get_int "seq" pairs) ~default:0L;
    did = text "did";
    time = text "time";
    handle = get_string "handle" pairs;
  }
211
(** [decode_account pairs] builds an {!account_event} from the entries of an
    ["#account"] payload map. Missing fields never cause a failure: ["seq"]
    defaults to [0L], ["did"] and ["time"] to [""], ["active"] to [true], and
    ["status"] stays [None]. *)
let decode_account pairs : account_event =
  let text key = Option.value (get_string key pairs) ~default:"" in
  {
    seq = Option.value (get_int "seq" pairs) ~default:0L;
    did = text "did";
    time = text "time";
    active = Option.value (get_bool "active" pairs) ~default:true;
    status = get_string "status" pairs;
  }
220
(** [decode_handle pairs] builds a {!handle_event} from the entries of a
    ["#handle"] payload map. Missing fields never cause a failure: ["seq"]
    defaults to [0L] and ["did"], ["time"] and ["handle"] to [""]. *)
let decode_handle pairs : handle_event =
  let text key = Option.value (get_string key pairs) ~default:"" in
  {
    seq = Option.value (get_int "seq" pairs) ~default:0L;
    did = text "did";
    time = text "time";
    handle = text "handle";
  }
228
(** [decode_tombstone pairs] builds a {!tombstone_event} from the entries of a
    ["#tombstone"] payload map. Missing fields never cause a failure: ["seq"]
    defaults to [0L] and ["did"] and ["time"] to [""]. *)
let decode_tombstone pairs : tombstone_event =
  let text key = Option.value (get_string key pairs) ~default:"" in
  {
    seq = Option.value (get_int "seq" pairs) ~default:0L;
    did = text "did";
    time = text "time";
  }
235
(** [decode_info pairs] builds an {!info_message} from the entries of an
    ["#info"] payload map. A missing ["name"] becomes [""]; ["message"] is
    kept as an option. *)
let decode_info pairs : info_message =
  {
    name = Option.value (get_string "name" pairs) ~default:"";
    message = get_string "message" pairs;
  }
241
(** [decode_frame data] decodes one frame: two concatenated DAG-CBOR values,
    the header followed by the payload.

    Checks are made in this order:
    - the header must decode, else [Decode_error "invalid header CBOR"];
    - some bytes must remain after it, else [Decode_error "missing payload"];
    - [op = -1] (error frame) always succeeds with a [StreamError], using the
      payload's ["error"] string or ["unknown error"] when it is unavailable;
    - [op = 1] (message frame) requires a payload that decodes to a map and a
      known ["t"] value in the header, and yields the matching event;
    - any other [op] is a [Protocol_error]. *)
let decode_frame (data : string) : (event, error) result =
  (* Pick the payload decoder that matches the header's event type. *)
  let event_of_message kind fields =
    match kind with
    | None -> Error (Protocol_error "missing event type")
    | Some "#commit" ->
        decode_commit fields
        |> Option.fold
             ~none:(Error (Decode_error "invalid commit"))
             ~some:(fun evt -> Ok (Commit evt))
    | Some "#identity" -> Ok (Identity (decode_identity fields))
    | Some "#account" -> Ok (Account (decode_account fields))
    | Some "#handle" -> Ok (Handle (decode_handle fields))
    | Some "#tombstone" -> Ok (Tombstone (decode_tombstone fields))
    | Some "#info" -> Ok (Info (decode_info fields))
    | Some other -> Error (Protocol_error ("unknown event type: " ^ other))
  in
  match Dag_cbor.decode_partial data with
  | Error _ -> Error (Decode_error "invalid header CBOR")
  | Ok (raw_header, rest) -> (
      let header = decode_header raw_header in
      if String.equal rest "" then Error (Decode_error "missing payload")
      else
        match header.op with
        | -1 ->
            (* Error frame: an undecodable or non-map payload degrades to a
               generic message instead of failing. *)
            let reason =
              match Dag_cbor.decode rest with
              | Ok (Dag_cbor.Map fields) -> get_string "error" fields
              | _ -> None
            in
            Ok (StreamError (Option.value reason ~default:"unknown error"))
        | 1 -> (
            (* Message frame *)
            match Dag_cbor.decode rest with
            | Error _ -> Error (Decode_error "invalid payload CBOR")
            | Ok (Dag_cbor.Map fields) -> event_of_message header.t fields
            | Ok _ -> Error (Decode_error "payload must be object"))
        | op -> Error (Protocol_error (Printf.sprintf "unknown op: %d" op)))
281
282(** {1 Subscription} *)
283
type config = {
  uri : Uri.t;
      (** Subscription endpoint. [build_uri] returns it unchanged when there is
          no cursor. *)
  cursor : int64 option; (** Sequence number to start from *)
}
(** Firehose subscription configuration.

    When [cursor] is set, [build_uri] sends it as the ["cursor"] query
    parameter in decimal form. *)
289
(** [config ~uri ?cursor ()] creates a subscription configuration for the
    endpoint [uri]. [cursor] is optional; when omitted the configuration
    carries no starting sequence number. *)
let config ~uri ?cursor () : config = { cursor; uri }
292
(** [build_uri config] returns the URI to connect to.

    Without a cursor this is [config.uri] unchanged. With a cursor, the result
    is [config.uri] with a single ["cursor"] query parameter holding the cursor
    in decimal form. Any ["cursor"] parameter already present in [config.uri]
    is dropped first, so the request never carries two conflicting cursors
    ([Uri.add_query_param] adds a parameter without replacing existing
    ones). *)
let build_uri config =
  match config.cursor with
  | None -> config.uri
  | Some cursor ->
      let base = Uri.remove_query_param config.uri "cursor" in
      Uri.add_query_param base ("cursor", [ Int64.to_string cursor ])
300
(** [subscribe config ~handler] connects to the firehose and calls [handler]
    for each decoded event. The handler returns [true] to continue, [false] to
    stop.

    Result:
    - [Ok ()] when [handler] asks to stop;
    - [Error (Connection_error _)] when connecting or receiving fails;
    - [Error (Decode_error _)] when a frame is malformed.

    Frames that decode correctly but carry an operation code or event type
    this module does not know are skipped rather than ending the subscription,
    as the AT Protocol event-stream specification asks of clients. This keeps
    the stream alive when the server starts emitting a new kind of event.

    Once the connection is open, [Ws_close] is performed exactly once on every
    way out of the function, including when [handler] raises (the exception is
    re-raised after closing).

    The function performs the [Ws_connect], [Ws_recv] and [Ws_close] effects,
    so it has to run under a handler for them. *)
let subscribe config ~handler =
  match Effect.perform (Ws_connect (build_uri config)) with
  | Error msg -> Error (Connection_error msg)
  | Ok ws ->
      let rec loop () =
        match Effect.perform (Ws_recv ws) with
        | Error msg -> Error (Connection_error msg)
        | Ok data -> (
            match decode_frame data with
            (* Unknown op / event type: ignore this frame and keep reading. *)
            | Error (Protocol_error _) -> loop ()
            | Error ((Connection_error _ | Decode_error _) as e) -> Error e
            | Ok event -> if handler event then loop () else Ok ())
      in
      (* Single close point, so the socket cannot leak on any exit path. *)
      Fun.protect ~finally:(fun () -> Effect.perform (Ws_close ws)) loop
325
(** [event_seq evt] is the sequence number carried by [evt], or [None] for
    [Info] and [StreamError], which have none. *)
let event_seq (evt : event) : int64 option =
  match evt with
  | Info _ | StreamError _ -> None
  | Commit { seq; _ }
  | Identity { seq; _ }
  | Account { seq; _ }
  | Handle { seq; _ }
  | Tombstone { seq; _ } ->
      Some seq
335
(** [event_did evt] is the DID the event is about: the [repo] of a commit, the
    [did] of identity, account, handle and tombstone events. [Info] and
    [StreamError] carry no DID and give [None]. *)
let event_did (evt : event) : string option =
  match evt with
  | Info _ | StreamError _ -> None
  | Commit { repo; _ } -> Some repo
  | Identity { did; _ }
  | Account { did; _ }
  | Handle { did; _ }
  | Tombstone { did; _ } ->
      Some did
345
346(** {1 JSON Serialization} *)
347
(** [operation_to_json op] renders [op] as a JSON object with the members
    ["action"] (["create"], ["update"] or ["delete"]) and ["path"], followed by
    ["cid"] (the CID in string form) when the operation has one. Record
    content is not included - use [add_record_to_op_json] to add it. *)
let operation_to_json (op : operation) : Atproto_json.t =
  let verb =
    match op.action with
    | `Delete -> "delete"
    | `Update -> "update"
    | `Create -> "create"
  in
  (* Zero or one trailing member, depending on whether a CID is present. *)
  let cid_member =
    match op.cid with
    | None -> []
    | Some link -> [ ("cid", Atproto_json.string (Cid.to_string link)) ]
  in
  Atproto_json.object_
    (("action", Atproto_json.string verb)
    :: ("path", Atproto_json.string op.path)
    :: cid_member)
368
(** [add_record_to_op_json op_json record] appends a ["record"] member holding
    [record] to the JSON object [op_json]. When [op_json] is not an object it
    is returned unchanged. *)
let add_record_to_op_json (op_json : Atproto_json.t) (record : Atproto_json.t) :
    Atproto_json.t =
  Atproto_json.to_object_opt op_json
  |> Option.fold ~none:op_json ~some:(fun members ->
         Atproto_json.object_ (members @ [ ("record", record) ]))
375
(** [commit_event_to_json evt] renders a commit event as a JSON object with the
    members ["type"] (always ["commit"]), ["seq"], ["repo"], ["rev"],
    ["commit"] (the CID in string form) and ["ops"]. Operations don't include
    record content, and the event's [since], [blocks] and [too_big] fields are
    not emitted. *)
let commit_event_to_json (evt : commit_event) : Atproto_json.t =
  let module J = Atproto_json in
  J.object_
    [
      ("type", J.string "commit");
      ("seq", J.int64 evt.seq);
      ("repo", J.string evt.repo);
      ("rev", J.string evt.rev);
      ("commit", J.string (Cid.to_string evt.commit));
      ("ops", J.array (List.map operation_to_json evt.ops));
    ]
388
(** [identity_event_to_json e] renders an identity event as a JSON object with
    the members ["type"] (always ["identity"]), ["seq"], ["did"] and ["time"],
    followed by ["handle"] when the event has one. *)
let identity_event_to_json (e : identity_event) : Atproto_json.t =
  let module J = Atproto_json in
  let handle_member =
    match e.handle with
    | None -> []
    | Some h -> [ ("handle", J.string h) ]
  in
  J.object_
    ([
       ("type", J.string "identity");
       ("seq", J.int64 e.seq);
       ("did", J.string e.did);
       ("time", J.string e.time);
     ]
    @ handle_member)
405
(** [account_event_to_json e] renders an account event as a JSON object with
    the members ["type"] (always ["account"]), ["seq"], ["did"], ["time"] and
    ["active"], followed by ["status"] when the event has one. *)
let account_event_to_json (e : account_event) : Atproto_json.t =
  let module J = Atproto_json in
  let status_member =
    match e.status with
    | None -> []
    | Some s -> [ ("status", J.string s) ]
  in
  J.object_
    ([
       ("type", J.string "account");
       ("seq", J.int64 e.seq);
       ("did", J.string e.did);
       ("time", J.string e.time);
       ("active", J.bool e.active);
     ]
    @ status_member)
423
(** [handle_event_to_json e] renders a handle event as a JSON object with the
    members ["type"] (always ["handle"]), ["seq"], ["did"], ["time"] and
    ["handle"]. *)
let handle_event_to_json (e : handle_event) : Atproto_json.t =
  let module J = Atproto_json in
  let ({ seq; did; time; handle } : handle_event) = e in
  J.object_
    [
      ("type", J.string "handle");
      ("seq", J.int64 seq);
      ("did", J.string did);
      ("time", J.string time);
      ("handle", J.string handle);
    ]
434
(** [tombstone_event_to_json e] renders a tombstone event as a JSON object with
    the members ["type"] (always ["tombstone"]), ["seq"], ["did"] and
    ["time"]. *)
let tombstone_event_to_json (e : tombstone_event) : Atproto_json.t =
  let module J = Atproto_json in
  let ({ seq; did; time } : tombstone_event) = e in
  J.object_
    [
      ("type", J.string "tombstone");
      ("seq", J.int64 seq);
      ("did", J.string did);
      ("time", J.string time);
    ]
444
(** [info_message_to_json m] renders an info message as a JSON object with the
    members ["type"] (always ["info"]) and ["name"], followed by ["message"]
    when the info message has one. *)
let info_message_to_json (m : info_message) : Atproto_json.t =
  let module J = Atproto_json in
  let message_member =
    match m.message with
    | None -> []
    | Some text -> [ ("message", J.string text) ]
  in
  J.object_
    (("type", J.string "info") :: ("name", J.string m.name) :: message_member)
458
(** [event_to_json evt] converts any event to JSON by dispatching to the
    converter of its constructor. A [StreamError] becomes an object with
    ["type"] set to ["error"] and the text under ["message"]. For commit
    events, operations don't include record content - extract blocks
    separately and use [add_record_to_op_json]. *)
let event_to_json (evt : event) : Atproto_json.t =
  match evt with
  | StreamError text ->
      let module J = Atproto_json in
      J.object_ [ ("type", J.string "error"); ("message", J.string text) ]
  | Info info -> info_message_to_json info
  | Tombstone tombstone -> tombstone_event_to_json tombstone
  | Handle handle -> handle_event_to_json handle
  | Account account -> account_event_to_json account
  | Identity identity -> identity_event_to_json identity
  | Commit commit -> commit_event_to_json commit
475
(** [event_to_json_string evt] is the JSON form of [evt] (see [event_to_json])
    encoded to a string with [Atproto_json.encode]. *)
let event_to_json_string evt = evt |> event_to_json |> Atproto_json.encode