(* atproto libraries implementation in OCaml *)
(** Repository Synchronization for AT Protocol.

    This module provides repository synchronization functionality for fetching
    and applying changes between repositories. It works with the firehose for
    real-time updates and supports incremental sync.

    Sync endpoints:
    - com.atproto.sync.getRepo: Full repository export as CAR file
    - com.atproto.sync.getCheckout: Specific commit as CAR file
    - com.atproto.sync.subscribeRepos: Real-time event stream (firehose) *)
11
12open Atproto_ipld
13
(** {1 Types} *)
15
type diff_action = Create | Update | Delete
(** Kind of change carried by a [diff_entry]. [diff_from_commit_event] maps the
    firehose operation tags [`Create], [`Update] and [`Delete] onto these
    constructors one-to-one. *)
17
type diff_entry = {
  action : diff_action;  (** What happened to the record *)
  collection : string;
      (** First segment of the operation path. When the path does not split
          into exactly two ['/']-separated segments,
          [diff_from_commit_event] stores the whole path here. *)
  rkey : string;
      (** Second segment of the operation path, or [""] when the path does not
          split into exactly two segments. *)
  cid : Cid.t option;  (** CID of the record (None for deletes) *)
}
(** A single change in a repository diff *)
25
type sync_state = { did : string; rev : string; commit : Cid.t }
(** Current sync state for a repository. [sync_state_from_commit_event] fills
    [did] from the event's [repo] field and copies [rev] and [commit] from the
    event unchanged. *)
28
type error =
  | Parse_error of string
      (** DAG-CBOR bytes failed to decode, or an MST node was not a map *)
  | Invalid_car of string  (** [Car.read] rejected the CAR data *)
  | Missing_block of Cid.t
      (** A block with this CID was not available. NOTE(review): no function
          visible in this file constructs this case - confirm its users. *)
  | Invalid_commit of string
      (** A decoded commit was not a map or lacked a required field *)
  | Sync_error of string
      (** General sync failure. NOTE(review): no function visible in this file
          constructs this case - confirm its users. *)
(** Errors reported by the sync functions of this module. The [string]
    payloads are human-readable details (see [error_to_string]). *)
35
(** Render an [error] as a human-readable message: a fixed label naming the
    constructor, followed by its payload ([Cid.to_string] is used for
    [Missing_block]). *)
let error_to_string (err : error) : string =
  match err with
  | Parse_error detail -> "Parse error: " ^ detail
  | Invalid_car detail -> "Invalid CAR: " ^ detail
  | Missing_block cid -> "Missing block: " ^ Cid.to_string cid
  | Invalid_commit detail -> "Invalid commit: " ^ detail
  | Sync_error detail -> "Sync error: " ^ detail
42
(** {1 Firehose Event Processing} *)
44
(** Turn the operations of a firehose commit event into diff entries.

    One entry is produced per operation, in the order of [evt.ops]. The
    operation path is split on ['/']: a path with exactly two segments yields
    [(collection, rkey)]; any other shape keeps the whole path as the
    collection and uses [""] as the record key. *)
let diff_from_commit_event (evt : Firehose.commit_event) : diff_entry list =
  let action_of = function
    | `Create -> Create
    | `Update -> Update
    | `Delete -> Delete
  in
  let split_path path =
    match String.split_on_char '/' path with
    | [ collection; rkey ] -> (collection, rkey)
    | _ -> (path, "")
  in
  let entry_of_op (op : Firehose.operation) : diff_entry =
    let collection, rkey = split_path op.path in
    { action = action_of op.action; collection; rkey; cid = op.cid }
  in
  List.map entry_of_op evt.ops
63
(** Build the sync state described by a commit event: the event's [repo]
    becomes [did], while [rev] and [commit] are copied as they are. *)
let sync_state_from_commit_event (evt : Firehose.commit_event) : sync_state =
  let did = evt.repo and rev = evt.rev and commit = evt.commit in
  { did; rev; commit }
67
(** {1 CAR File Processing} *)
69
type blockstore = {
  get : Cid.t -> string option;
      (** Look up the raw bytes stored under a CID. Callers in this file treat
          [None] as "block not available". *)
  put : Cid.t -> string -> unit;  (** Store raw block bytes under a CID *)
}
(** Block storage type for sync operations *)
75
(** Create a blockstore backed by a fresh in-memory hash table.

    Blocks are indexed by the string form of their CID ([Cid.to_string]);
    storing a block under a CID that is already present replaces the previous
    bytes. Each call returns an independent store. *)
let create_memory_blockstore () : blockstore =
  let table : (string, string) Hashtbl.t = Hashtbl.create 256 in
  let get cid = Hashtbl.find_opt table (Cid.to_string cid) in
  let put cid data = Hashtbl.replace table (Cid.to_string cid) data in
  { get; put }
83
(** Read a CAR file and copy every block it contains into [store].

    @return
      [Ok roots] with the root CIDs listed in the CAR header, or
      [Error (Invalid_car _)] when [Car.read] fails, in which case nothing is
      written to [store]. *)
let load_car_blocks (store : blockstore) (car_data : string) :
    (Cid.t list, error) result =
  match Car.read car_data with
  | Ok (header, blocks) ->
      let save (blk : Car.block) = store.put blk.cid blk.data in
      List.iter save blocks;
      Ok header.roots
  | Error err -> Error (Invalid_car (Car.error_to_string err))
94
(** Copy the blocks embedded in a firehose commit event into [store].

    An event whose [blocks] payload is empty is accepted as a no-op. Otherwise
    the payload is read as a CAR file via [load_car_blocks]; the roots it
    reports are not needed here and are discarded.

    @return [Ok ()] on success, [Error (Invalid_car _)] when the CAR is invalid. *)
let load_commit_blocks (store : blockstore) (evt : Firehose.commit_event) :
    (unit, error) result =
  match evt.blocks with
  | "" -> Ok ()
  | car_bytes ->
      Result.map
        (fun (_ : Cid.t list) -> ())
        (load_car_blocks store car_bytes)
107
(** {1 Commit Parsing} *)
109
type commit = {
  did : string;  (** Value of the ["did"] string field *)
  version : int;  (** Value of the ["version"] integer field *)
  data : Cid.t;
      (** Link stored in the ["data"] field. NOTE(review): presumably the MST
          root of the repository - confirm against the repo spec. *)
  rev : string;  (** Value of the ["rev"] string field *)
  prev : Cid.t option;
      (** Link stored in the ["prev"] field; [None] when the field is absent
          or is not a link *)
}
(** Parsed commit object *)
118
(** Decode a commit object from DAG-CBOR bytes.

    The decoded value must be a map holding a string ["did"], an integer
    ["version"], a link ["data"] and a string ["rev"]. The ["prev"] link is
    optional and becomes [None] when absent or not a link.

    @return
      [Ok commit] on success; [Error (Parse_error _)] when the bytes do not
      decode; [Error (Invalid_commit _)] when the value is not a map or a
      required field is missing or has the wrong type. *)
let parse_commit (data : string) : (commit, error) result =
  match Dag_cbor.decode data with
  | Error e -> Error (Parse_error (Dag_cbor.error_to_string e))
  | Ok (Dag_cbor.Map fields) ->
      let lookup name = List.assoc_opt name fields in
      let string_field name =
        match lookup name with Some (Dag_cbor.String s) -> Some s | _ -> None
      in
      let int_field name =
        match lookup name with
        | Some (Dag_cbor.Int i) -> Some (Int64.to_int i)
        | _ -> None
      in
      let link_field name =
        match lookup name with
        | Some (Dag_cbor.Link cid) -> Some cid
        | _ -> None
      in
      let ( let* ) = Option.bind in
      (* Any missing required field short-circuits the whole record to None. *)
      let parsed =
        let* did = string_field "did" in
        let* version = int_field "version" in
        let* data = link_field "data" in
        let* rev = string_field "rev" in
        Some ({ did; version; data; rev; prev = link_field "prev" } : commit)
      in
      Option.to_result ~none:(Invalid_commit "missing required fields") parsed
  | Ok _ -> Error (Invalid_commit "expected map")
151
(** {1 MST Traversal} *)
153
type mst_entry = {
  key : string;
      (** Entry key, intended to be the full key: collection/rkey.
          NOTE(review): check how [parse_mst_node] fills this field before
          relying on its exact format. *)
  value : Cid.t;  (** Link stored in the entry's ["v"] field *)
  tree : Cid.t option;
      (** Subtree pointer: the link stored in the entry's ["t"] field, if any *)
}
(** MST node structure (simplified for sync) *)
160
(** Parse an MST node from DAG-CBOR.

    The node must decode to a map. Its ["l"] field (when it is a link) is the
    left-most subtree pointer and its ["e"] field (when it is an array) holds
    the entries; a missing or differently typed ["e"] yields no entries.

    Entry keys are prefix-compressed: ["p"] is the number of leading bytes
    shared with the previous entry of the same node and ["k"] is the remaining
    suffix (a missing ["p"] counts as [0], a missing ["k"] as [""]). The
    returned entries carry the reconstructed full key. Array items that are
    not maps are ignored. Map entries without a ["v"] link are dropped from
    the result, but their key still serves as the prefix base for the entry
    that follows.

    @return
      [Ok (entries, left)] on success, or [Error (Parse_error _)] when the
      bytes are not valid DAG-CBOR, the top-level value is not a map, or an
      entry's prefix length is negative or longer than the previous key. *)
let parse_mst_node (data : string) :
    (mst_entry list * Cid.t option, error) result =
  let link_field name fields =
    match List.assoc_opt name fields with
    | Some (Dag_cbor.Link cid) -> Some cid
    | _ -> None
  in
  (* Rebuild an entry's full key from the previous key of the same node. The
     range check is done on the int64 value so that an oversized prefix length
     cannot wrap around when narrowed to [int]. *)
  let full_key ~prev fields =
    let suffix =
      match List.assoc_opt "k" fields with
      | Some (Dag_cbor.Bytes s) -> s
      | _ -> ""
    in
    let shared =
      match List.assoc_opt "p" fields with
      | Some (Dag_cbor.Int i) -> i
      | _ -> 0L
    in
    let prev_len = Int64.of_int (String.length prev) in
    if Int64.compare shared 0L < 0 || Int64.compare shared prev_len > 0 then
      Error (Parse_error "MST entry prefix length out of range")
    else Ok (String.sub prev 0 (Int64.to_int shared) ^ suffix)
  in
  (* Walk the entries in order, threading the previous full key through. *)
  let rec parse_entries prev acc = function
    | [] -> Ok (List.rev acc)
    | Dag_cbor.Map fields :: rest -> (
        match full_key ~prev fields with
        | Error e -> Error e
        | Ok key ->
            let acc =
              match link_field "v" fields with
              | Some value ->
                  ({ key; value; tree = link_field "t" fields } : mst_entry)
                  :: acc
              | None -> acc
            in
            parse_entries key acc rest)
    | _ :: rest -> parse_entries prev acc rest
  in
  match Dag_cbor.decode data with
  | Error e -> Error (Parse_error (Dag_cbor.error_to_string e))
  | Ok (Dag_cbor.Map pairs) -> (
      let entries =
        match List.assoc_opt "e" pairs with
        | Some (Dag_cbor.Array items) -> parse_entries "" [] items
        | _ -> Ok []
      in
      match entries with
      | Error e -> Error e
      | Ok entries -> Ok (entries, link_field "l" pairs))
  | Ok _ -> Error (Parse_error "expected MST node map")
219
(** Walk the MST rooted at [root] and list every [(key, record CID)] pair.

    A node is visited left subtree first, then each entry in order, with an
    entry's own subtree visited right after the entry itself. A node whose
    block is missing from [store], or whose bytes fail to parse, contributes
    nothing and is skipped without reporting an error, so the result can be
    partial. *)
let collect_mst_records (store : blockstore) (root : Cid.t) :
    (string * Cid.t) list =
  (* [collected] is built in reverse and flipped once at the end. *)
  let rec visit_node collected node_cid =
    match store.get node_cid with
    | None -> collected
    | Some bytes -> (
        match parse_mst_node bytes with
        | Error _ -> collected
        | Ok (entries, left) ->
            visit_entries (visit_subtree collected left) entries)
  and visit_subtree collected = function
    | None -> collected
    | Some cid -> visit_node collected cid
  and visit_entries collected = function
    | [] -> collected
    | (entry : mst_entry) :: rest ->
        let collected = (entry.key, entry.value) :: collected in
        visit_entries (visit_subtree collected entry.tree) rest
  in
  List.rev (visit_node [] root)
244
(** {1 Sync Operations} *)
246
type apply_result = {
  applied : int;
  skipped : int;
  errors : (diff_entry * string) list;
}
(** Outcome of [apply_diff]: how many entries were handed to the callback
    ([applied]), how many were not ([skipped]), and the entries that failed
    together with a reason, in diff order ([errors]). *)

(** Apply a diff by handing each entry to [on_record]. This is a placeholder -
    an actual implementation would update a local repo.

    Entries are processed in list order:
    - a [Delete] calls [on_record entry None] and counts as applied;
    - a [Create] or [Update] with a CID found in [store] calls
      [on_record entry (Some bytes)] and counts as applied;
    - a [Create] or [Update] whose CID is not in [store] counts as skipped and
      is recorded in [errors] with the reason ["missing block"];
    - a [Create] or [Update] without a CID counts as skipped and is not
      recorded in [errors]. *)
let apply_diff ~(store : blockstore)
    ~(on_record : diff_entry -> string option -> unit) (diff : diff_entry list)
    : apply_result =
  let step (tally : apply_result) (entry : diff_entry) : apply_result =
    match entry with
    | { action = Delete; _ } ->
        on_record entry None;
        { tally with applied = tally.applied + 1 }
    | { action = Create | Update; cid = None; _ } ->
        { tally with skipped = tally.skipped + 1 }
    | { action = Create | Update; cid = Some cid; _ } -> (
        match store.get cid with
        | Some data ->
            on_record entry (Some data);
            { tally with applied = tally.applied + 1 }
        | None ->
            {
              tally with
              skipped = tally.skipped + 1;
              errors = (entry, "missing block") :: tally.errors;
            })
  in
  let start : apply_result = { applied = 0; skipped = 0; errors = [] } in
  let tally = List.fold_left step start diff in
  (* Errors were accumulated newest-first; restore diff order. *)
  { tally with errors = List.rev tally.errors }
278
(** Handle one firehose commit event: first load the blocks it carries into
    [store], then derive the list of changes it describes.

    @return
      [Ok diff] when the blocks loaded, otherwise the error reported by
      [load_commit_blocks]. *)
let process_commit_event ~(store : blockstore) (evt : Firehose.commit_event) :
    (diff_entry list, error) result =
  load_commit_blocks store evt
  |> Result.map (fun () -> diff_from_commit_event evt)
285
(** {1 Cursor Management} *)
287
type cursor = { seq : int64; timestamp : string option }
(** Firehose cursor for resuming sync. [seq] is the event sequence number;
    [timestamp] is left as [None] by [cursor_of_event] and [cursor_of_string],
    and [cursor_to_string] does not serialize it. *)
290
(** Derive a cursor from a firehose event. Returns [None] when
    [Firehose.event_seq] reports no sequence number for the event; the
    cursor's [timestamp] is always [None]. *)
let cursor_of_event (evt : Firehose.event) : cursor option =
  Firehose.event_seq evt
  |> Option.map (fun seq : cursor -> { seq; timestamp = None })
295
(** Serialize a cursor as the decimal text of its sequence number. The
    [timestamp] field is not part of the output. *)
let cursor_to_string (c : cursor) : string =
  let { seq; _ } = c in
  Int64.to_string seq
297
(** Parse a cursor from text. Returns [None] when [Int64.of_string_opt] does
    not accept [s]; otherwise the parsed value becomes [seq] and [timestamp]
    is [None]. *)
let cursor_of_string (s : string) : cursor option =
  Int64.of_string_opt s
  |> Option.map (fun seq : cursor -> { seq; timestamp = None })