atproto libraries implementation in ocaml
at main 10 kB view raw
(** Repository Synchronization for AT Protocol.

    This module provides repository synchronization functionality for fetching
    and applying changes between repositories. It works with the firehose for
    real-time updates and supports incremental sync.

    Sync endpoints:
    - com.atproto.sync.getRepo: Full repository export as CAR file
    - com.atproto.sync.getCheckout: Specific commit as CAR file
    - com.atproto.sync.subscribeRepos: Real-time event stream (firehose) *)

open Atproto_ipld

(** {1 Types} *)

type diff_action = Create | Update | Delete
(** Kind of change carried by a {!diff_entry}. *)

type diff_entry = {
  action : diff_action;
  collection : string;
  rkey : string;
  cid : Cid.t option;  (** CID of the record (None for deletes) *)
}
(** One record-level change within a repository diff. *)

type sync_state = { did : string; rev : string; commit : Cid.t }
(** Sync position of a repository: its DID, revision and commit CID. *)

type error =
  | Parse_error of string
  | Invalid_car of string
  | Missing_block of Cid.t
  | Invalid_commit of string
  | Sync_error of string
(** Failures reported by the functions of this module. *)

(** [error_to_string err] renders [err] as a one-line human-readable message
    made of a fixed label followed by the error payload. *)
let error_to_string (err : error) : string =
  match err with
  | Parse_error msg -> "Parse error: " ^ msg
  | Invalid_car msg -> "Invalid CAR: " ^ msg
  | Missing_block cid -> "Missing block: " ^ Cid.to_string cid
  | Invalid_commit msg -> "Invalid commit: " ^ msg
  | Sync_error msg -> "Sync error: " ^ msg

(** {1 Firehose Event Processing} *)

(** [diff_from_commit_event evt] converts every operation of [evt] into a
    {!diff_entry}, preserving the order of [evt.ops].

    An operation path of the exact form [collection/rkey] is split into its two
    components; any other path is kept whole as the collection, with an empty
    record key. *)
let diff_from_commit_event (evt : Firehose.commit_event) : diff_entry list =
  (* Only a path with exactly one '/' separator is split. *)
  let split_path path =
    match String.split_on_char '/' path with
    | [ collection; rkey ] -> (collection, rkey)
    | _ -> (path, "")
  in
  let entry_of_op (op : Firehose.operation) : diff_entry =
    let collection, rkey = split_path op.path in
    let action =
      match op.action with
      | `Create -> Create
      | `Update -> Update
      | `Delete -> Delete
    in
    { action; collection; rkey; cid = op.cid }
  in
  List.map entry_of_op evt.ops
(** [sync_state_from_commit_event evt] builds the {!sync_state} described by a
    firehose commit event, copying the repo identifier, revision and commit CID
    from [evt]. *)
let sync_state_from_commit_event (evt : Firehose.commit_event) : sync_state =
  { did = evt.repo; rev = evt.rev; commit = evt.commit }

(** {1 CAR File Processing} *)

type blockstore = {
  get : Cid.t -> string option;
  put : Cid.t -> string -> unit;
}
(** Block storage type for sync operations: [get] looks a block up by CID and
    [put] stores the raw bytes of a block under its CID. *)

(** [create_memory_blockstore ()] returns a fresh blockstore backed by a
    private hash table. Blocks are indexed by the string form of their CID, and
    storing a CID twice replaces the earlier data. *)
let create_memory_blockstore () : blockstore =
  let blocks = Hashtbl.create 256 in
  {
    get = (fun cid -> Hashtbl.find_opt blocks (Cid.to_string cid));
    put = (fun cid data -> Hashtbl.replace blocks (Cid.to_string cid) data);
  }

(** [load_car_blocks store car_data] parses [car_data] as a CAR file, stores
    every block it contains into [store] and returns the root CIDs listed in
    the CAR header.

    Returns [Error (Invalid_car _)] when the CAR data cannot be read; nothing
    is stored in that case. *)
let load_car_blocks (store : blockstore) (car_data : string) :
    (Cid.t list, error) result =
  match Car.read car_data with
  | Error e -> Error (Invalid_car (Car.error_to_string e))
  | Ok (header, blocks) ->
      List.iter
        (fun (block : Car.block) -> store.put block.cid block.data)
        blocks;
      Ok header.roots

(** [load_commit_blocks store evt] stores the blocks embedded in a firehose
    commit event into [store].

    An event with an empty [blocks] payload is accepted and stores nothing.
    Returns [Error (Invalid_car _)] when a non-empty payload is not a readable
    CAR file. *)
let load_commit_blocks (store : blockstore) (evt : Firehose.commit_event) :
    (unit, error) result =
  if String.length evt.blocks = 0 then Ok ()
  else
    (* Same loading logic as [load_car_blocks]; the roots are not needed. *)
    Result.map (fun _roots -> ()) (load_car_blocks store evt.blocks)

(** {1 Commit Parsing} *)

type commit = {
  did : string;
  version : int;
  data : Cid.t;
  rev : string;
  prev : Cid.t option;
}
(** Parsed commit object *)

(** [parse_commit data] decodes a DAG-CBOR encoded commit.

    The decoded value must be a map holding string fields [did] and [rev], an
    integer field [version] and a link field [data]. The [prev] field is
    optional: it is [None] when absent or when it is not a link.

    Returns [Error (Parse_error _)] when [data] is not valid DAG-CBOR, and
    [Error (Invalid_commit _)] when the value is not a map or a required field
    is missing or has the wrong type. *)
let parse_commit (data : string) : (commit, error) result =
  match Dag_cbor.decode data with
  | Error e -> Error (Parse_error (Dag_cbor.error_to_string e))
  | Ok (Dag_cbor.Map pairs) -> (
      let get_string key =
        match List.assoc_opt key pairs with
        | Some (Dag_cbor.String s) -> Some s
        | _ -> None
      in
      let get_int key =
        match List.assoc_opt key pairs with
        | Some (Dag_cbor.Int i) -> Some (Int64.to_int i)
        | _ -> None
      in
      let get_link key =
        match List.assoc_opt key pairs with
        | Some (Dag_cbor.Link cid) -> Some cid
        | _ -> None
      in
      match
        (get_string "did", get_int "version", get_link "data", get_string "rev")
      with
      | Some did, Some version, Some data, Some rev ->
          Ok { did; version; data; rev; prev = get_link "prev" }
      | _ -> Error (Invalid_commit "missing required fields"))
  | Ok _ -> Error (Invalid_commit "expected map")

(** {1 MST Traversal} *)

type mst_entry = {
  key : string;  (** Full key: collection/rkey *)
  value : Cid.t;
  tree : Cid.t option;  (** Subtree pointer *)
}
(** MST node structure (simplified for sync) *)

(** [parse_mst_node data] decodes a DAG-CBOR encoded MST node and returns its
    entries, in node order, together with the optional left subtree pointer
    (field [l]).

    Keys inside a node are prefix-compressed: each entry stores a prefix length
    (field [p]) and a key suffix (field [k]). The full key is rebuilt as the
    first [p] bytes of the previous entry's full key followed by the suffix;
    the first entry is expanded against the empty key.

    Items of the entry array (field [e]) that are not maps are ignored, as are
    entries without a value link (field [v]); a missing entry array yields no
    entries.

    Returns [Error (Parse_error _)] when [data] is not valid DAG-CBOR, when the
    decoded value is not a map, or when an entry declares a prefix length that
    is negative or longer than the previous key. *)
let parse_mst_node (data : string) :
    (mst_entry list * Cid.t option, error) result =
  let link_field name fields =
    match List.assoc_opt name fields with
    | Some (Dag_cbor.Link cid) -> Some cid
    | _ -> None
  in
  (* Walk the entries in order, threading the previous full key so that each
     compressed key can be expanded. [acc] is built in reverse. *)
  let rec expand prev_key acc = function
    | [] -> Ok (List.rev acc)
    | Dag_cbor.Map fields :: rest -> (
        let prefix_len =
          match List.assoc_opt "p" fields with
          | Some (Dag_cbor.Int i) -> Int64.to_int i
          | _ -> 0
        in
        let suffix =
          match List.assoc_opt "k" fields with
          | Some (Dag_cbor.Bytes s) -> s
          | _ -> ""
        in
        if prefix_len < 0 || prefix_len > String.length prev_key then
          Error
            (Parse_error
               (Printf.sprintf "invalid MST key prefix length %d" prefix_len))
        else
          let key = String.sub prev_key 0 prefix_len ^ suffix in
          match link_field "v" fields with
          | Some value ->
              let entry = { key; value; tree = link_field "t" fields } in
              expand key (entry :: acc) rest
          | None ->
              (* No value link: drop the entry, but keep its key as the base
                 for expanding the next one. *)
              expand key acc rest)
    | _ :: rest -> expand prev_key acc rest
  in
  match Dag_cbor.decode data with
  | Error e -> Error (Parse_error (Dag_cbor.error_to_string e))
  | Ok (Dag_cbor.Map pairs) -> (
      let left_ptr = link_field "l" pairs in
      match List.assoc_opt "e" pairs with
      | Some (Dag_cbor.Array items) ->
          Result.map (fun entries -> (entries, left_ptr)) (expand "" [] items)
      | _ -> Ok ([], left_ptr))
  | Ok _ -> Error (Parse_error "expected MST node map")

(** [collect_mst_records store root] walks the MST rooted at [root] and returns
    the [(key, record CID)] pairs it finds: for every node, the left subtree
    comes first, then each entry followed by the subtree attached to it.

    The traversal is best-effort: a node whose block is missing from [store],
    or which fails to parse, contributes nothing and is skipped silently. *)
let collect_mst_records (store : blockstore) (root : Cid.t) :
    (string * Cid.t) list =
  (* [acc] holds the records found so far, most recent first. *)
  let rec traverse cid acc =
    match store.get cid with
    | None -> acc
    | Some data -> (
        match parse_mst_node data with
        | Error _ -> acc
        | Ok (entries, left_ptr) ->
            (* Traverse left subtree first *)
            let acc =
              match left_ptr with Some left -> traverse left acc | None -> acc
            in
            (* Add entries and traverse their subtrees *)
            List.fold_left
              (fun acc entry ->
                let acc = (entry.key, entry.value) :: acc in
                match entry.tree with
                | Some tree -> traverse tree acc
                | None -> acc)
              acc entries)
  in
  List.rev (traverse root [])

(** {1 Sync Operations} *)

type apply_result = {
  applied : int;
  skipped : int;
  errors : (diff_entry * string) list;
}
(** Outcome of {!apply_diff}: how many entries were applied, how many were
    skipped, and the entries that failed together with a reason. *)

(** [apply_diff ~store ~on_record diff] applies each entry of [diff] in order
    by handing it to [on_record]. This is a placeholder - an actual
    implementation would update a local repo.

    - A delete calls [on_record entry None] and counts as applied.
    - A create or update whose block is found in [store] calls
      [on_record entry (Some data)] and counts as applied.
    - A create or update whose block is missing counts as skipped and is
      reported in [errors] with the reason [missing block].
    - A create or update carrying no CID counts as skipped, without an error.

    [errors] is returned in the order the failures were met. *)
let apply_diff ~(store : blockstore)
    ~(on_record : diff_entry -> string option -> unit) (diff : diff_entry list)
    : apply_result =
  let applied = ref 0 in
  let skipped = ref 0 in
  let errors = ref [] in
  List.iter
    (fun entry ->
      match (entry.action, entry.cid) with
      | Delete, _ ->
          on_record entry None;
          incr applied
      | (Create | Update), Some cid -> (
          match store.get cid with
          | Some data ->
              on_record entry (Some data);
              incr applied
          | None ->
              errors := (entry, "missing block") :: !errors;
              incr skipped)
      | (Create | Update), None -> incr skipped)
    diff;
  { applied = !applied; skipped = !skipped; errors = List.rev !errors }

(** [process_commit_event ~store evt] loads the blocks carried by [evt] into
    [store] and, on success, returns the diff entries derived from the
    operations of [evt]. A block loading failure is returned unchanged. *)
let process_commit_event ~(store : blockstore) (evt : Firehose.commit_event) :
    (diff_entry list, error) result =
  match load_commit_blocks store evt with
  | Error e -> Error e
  | Ok () -> Ok (diff_from_commit_event evt)

(** {1 Cursor Management} *)

type cursor = { seq : int64; timestamp : string option }
(** Firehose cursor for resuming sync *)

(** [cursor_of_event evt] returns a cursor at the sequence number of [evt], or
    [None] when [Firehose.event_seq] reports no sequence number. The timestamp
    is left unset. *)
let cursor_of_event (evt : Firehose.event) : cursor option =
  match Firehose.event_seq evt with
  | Some seq -> Some { seq; timestamp = None }
  | None -> None

(** [cursor_to_string c] renders the sequence number of [c] in decimal. The
    timestamp is not included. *)
let cursor_to_string (c : cursor) : string = Int64.to_string c.seq

(** [cursor_of_string s] parses [s] as a 64-bit integer sequence number and
    returns the matching cursor with no timestamp, or [None] when [s] is not
    accepted by [Int64.of_string_opt]. *)
let cursor_of_string (s : string) : cursor option =
  match Int64.of_string_opt s with
  | Some seq -> Some { seq; timestamp = None }
  | None -> None