(** Repository Synchronization for AT Protocol.

    This module provides repository synchronization functionality for fetching
    and applying changes between repositories. It works with the firehose for
    real-time updates and supports incremental sync.

    Sync endpoints:
    - com.atproto.sync.getRepo: Full repository export as CAR file
    - com.atproto.sync.getCheckout: Specific commit as CAR file
    - com.atproto.sync.subscribeRepos: Real-time event stream (firehose) *)

open Atproto_ipld

(** {1 Types} *)

type diff_action = Create | Update | Delete

type diff_entry = {
  action : diff_action;
  collection : string;
  rkey : string;
  cid : Cid.t option;  (** CID of the record (None for deletes) *)
}
(** A single change in a repository diff *)

type sync_state = { did : string; rev : string; commit : Cid.t }
(** Current sync state for a repository *)

type error =
  | Parse_error of string
  | Invalid_car of string
  | Missing_block of Cid.t
  | Invalid_commit of string
  | Sync_error of string

(** Render an [error] as a human-readable message. *)
let error_to_string = function
  | Parse_error msg -> Printf.sprintf "Parse error: %s" msg
  | Invalid_car msg -> Printf.sprintf "Invalid CAR: %s" msg
  | Missing_block cid -> Printf.sprintf "Missing block: %s" (Cid.to_string cid)
  | Invalid_commit msg -> Printf.sprintf "Invalid commit: %s" msg
  | Sync_error msg -> Printf.sprintf "Sync error: %s" msg

(** {1 Firehose Event Processing} *)

(** Extract diff entries from a firehose commit event.

    Each operation path is split on the slash character. A path with exactly
    two segments yields the collection and record key; any other shape is kept
    whole as the collection with an empty record key. *)
let diff_from_commit_event (evt : Firehose.commit_event) : diff_entry list =
  List.map
    (fun (op : Firehose.operation) ->
      let action =
        match op.action with
        | `Create -> Create
        | `Update -> Update
        | `Delete -> Delete
      in
      (* Parse collection/rkey from path *)
      let collection, rkey =
        match String.split_on_char '/' op.path with
        | [ coll; key ] -> (coll, key)
        | _ -> (op.path, "")
      in
      { action; collection; rkey; cid = op.cid })
    evt.ops

(** Get sync state from a commit event *)
let sync_state_from_commit_event (evt : Firehose.commit_event) : sync_state =
  { did = evt.repo; rev = evt.rev; commit = evt.commit }

(** {1 CAR File Processing} *)

type blockstore = {
  get : Cid.t -> string option;
  put : Cid.t -> string -> unit;
}
(** Block storage type for sync operations *)

(** Create an in-memory blockstore.

    Blocks are held in a hash table keyed by the string form of their CID;
    putting a CID that is already present replaces the stored data. *)
let create_memory_blockstore () : blockstore =
  let blocks = Hashtbl.create 256 in
  {
    get = (fun cid -> Hashtbl.find_opt blocks (Cid.to_string cid));
    put = (fun cid data -> Hashtbl.replace blocks (Cid.to_string cid) data);
  }

(** Load blocks from a CAR file into a blockstore.

    Returns the root CIDs listed in the CAR header, or [Invalid_car] when the
    data cannot be read as a CAR file. *)
let load_car_blocks (store : blockstore) (car_data : string) :
    (Cid.t list, error) result =
  match Car.read car_data with
  | Error e -> Error (Invalid_car (Car.error_to_string e))
  | Ok (header, blocks) ->
      List.iter
        (fun (block : Car.block) -> store.put block.cid block.data)
        blocks;
      Ok header.roots

(** Extract blocks from a firehose commit event into a blockstore.

    An event whose block payload is empty is accepted and stores nothing. *)
let load_commit_blocks (store : blockstore) (evt : Firehose.commit_event) :
    (unit, error) result =
  if String.length evt.blocks = 0 then Ok ()
  else
    match Car.read evt.blocks with
    | Error e -> Error (Invalid_car (Car.error_to_string e))
    | Ok (_, blocks) ->
        List.iter
          (fun (block : Car.block) -> store.put block.cid block.data)
          blocks;
        Ok ()

(** {1 Commit Parsing} *)

type commit = {
  did : string;
  version : int;
  data : Cid.t;
  rev : string;
  prev : Cid.t option;
}
(** Parsed commit object *)

(** Parse a commit from DAG-CBOR.

    The fields [did], [version], [data] and [rev] are required and must have
    the expected CBOR type; [prev] is optional. Returns [Parse_error] when the
    bytes do not decode and [Invalid_commit] when the decoded value is not a
    map or lacks a required field. *)
let parse_commit (data : string) : (commit, error) result =
  match Dag_cbor.decode data with
  | Error e -> Error (Parse_error (Dag_cbor.error_to_string e))
  | Ok (Dag_cbor.Map pairs) -> (
      let get_string key =
        match List.assoc_opt key pairs with
        | Some (Dag_cbor.String s) -> Some s
        | _ -> None
      in
      let get_int key =
        match List.assoc_opt key pairs with
        | Some (Dag_cbor.Int i) -> Some (Int64.to_int i)
        | _ -> None
      in
      let get_link key =
        match List.assoc_opt key pairs with
        | Some (Dag_cbor.Link cid) -> Some cid
        | _ -> None
      in
      match
        ( get_string "did",
          get_int "version",
          get_link "data",
          get_string "rev" )
      with
      | Some did, Some version, Some data, Some rev ->
          Ok { did; version; data; rev; prev = get_link "prev" }
      | _ -> Error (Invalid_commit "missing required fields"))
  | Ok _ -> Error (Invalid_commit "expected map")

(** {1 MST Traversal} *)

type mst_entry = {
  key : string;  (** Full key: collection/rkey *)
  value : Cid.t;
  tree : Cid.t option;  (** Subtree pointer *)
}
(** MST node structure (simplified for sync) *)

(** Rebuild a full MST key from its prefix-compressed form.

    The result is the first [prefix_len] bytes of [prev_key] followed by
    [suffix]. [prefix_len] is clamped to the range from 0 to the length of
    [prev_key], so malformed input never raises. *)
let reconstruct_mst_key ~(prev_key : string) ~(prefix_len : int)
    (suffix : string) : string =
  let shared = max 0 (min prefix_len (String.length prev_key)) in
  String.sub prev_key 0 shared ^ suffix

(** Parse an MST node from DAG-CBOR.

    Returns the node entries in encoded order together with the optional left
    subtree pointer (field [l]). Entry keys are stored prefix-compressed: field
    [p] is the number of leading bytes shared with the previous entry of the
    same node and field [k] holds the remaining bytes. Each returned [key] is
    the reconstructed full key.

    Array items that are not maps are ignored. Entries without a value link
    (field [v]) are dropped from the result, but their key still serves as the
    reference for the prefix of the entry that follows.

    Returns [Parse_error] when the bytes do not decode or the decoded value is
    not a map. *)
let parse_mst_node (data : string) :
    (mst_entry list * Cid.t option, error) result =
  match Dag_cbor.decode data with
  | Error e -> Error (Parse_error (Dag_cbor.error_to_string e))
  | Ok (Dag_cbor.Map pairs) ->
      let left_ptr =
        match List.assoc_opt "l" pairs with
        | Some (Dag_cbor.Link cid) -> Some cid
        | _ -> None
      in
      let entries =
        match List.assoc_opt "e" pairs with
        | Some (Dag_cbor.Array items) ->
            (* Thread the previous full key through the fold so that each
               entry can expand its compressed key; the first entry of a node
               is expanded against the empty string. *)
            let _last_key, rev_entries =
              List.fold_left
                (fun (prev_key, acc) item ->
                  match item with
                  | Dag_cbor.Map epairs -> (
                      let prefix_len =
                        match List.assoc_opt "p" epairs with
                        | Some (Dag_cbor.Int i) -> Int64.to_int i
                        | _ -> 0
                      in
                      let key_suffix =
                        match List.assoc_opt "k" epairs with
                        | Some (Dag_cbor.Bytes s) -> s
                        | _ -> ""
                      in
                      let value =
                        match List.assoc_opt "v" epairs with
                        | Some (Dag_cbor.Link cid) -> Some cid
                        | _ -> None
                      in
                      let tree =
                        match List.assoc_opt "t" epairs with
                        | Some (Dag_cbor.Link cid) -> Some cid
                        | _ -> None
                      in
                      let key =
                        reconstruct_mst_key ~prev_key ~prefix_len key_suffix
                      in
                      match value with
                      | Some v -> (key, { key; value = v; tree } :: acc)
                      | None -> (key, acc))
                  | _ -> (prev_key, acc))
                ("", []) items
            in
            List.rev rev_entries
        | _ -> []
      in
      Ok (entries, left_ptr)
  | Ok _ -> Error (Parse_error "expected MST node map")

(** Collect all record CIDs from an MST by traversing it.

    Returns pairs of record key and record CID in traversal order: the left
    subtree of a node first, then each entry followed by the subtree attached
    to it. Nodes that are absent from [store] or fail to parse are skipped
    silently, so the result may be partial. *)
let collect_mst_records (store : blockstore) (root : Cid.t) :
    (string * Cid.t) list =
  let rec traverse cid acc =
    match store.get cid with
    | None -> acc
    | Some data -> (
        match parse_mst_node data with
        | Error _ -> acc
        | Ok (entries, left_ptr) ->
            (* Traverse left subtree first *)
            let acc =
              match left_ptr with Some left -> traverse left acc | None -> acc
            in
            (* Add entries and traverse their subtrees *)
            List.fold_left
              (fun acc entry ->
                let acc = (entry.key, entry.value) :: acc in
                match entry.tree with
                | Some tree -> traverse tree acc
                | None -> acc)
              acc entries)
  in
  (* The accumulator is built in reverse; restore traversal order. *)
  List.rev (traverse root [])

(** {1 Sync Operations} *)

type apply_result = {
  applied : int;
  skipped : int;
  errors : (diff_entry * string) list;
}

(** Apply a list of diff entries by handing each one to [on_record].

    This is a placeholder - actual implementation would update a local repo.

    - A delete calls [on_record entry None] and counts as applied.
    - A create or update whose block is found in [store] calls
      [on_record entry (Some data)] and counts as applied.
    - A create or update whose block is missing counts as skipped and is
      reported in [errors] with the message "missing block".
    - A create or update that carries no CID counts as skipped and is not
      reported in [errors].

    [errors] preserves the order of the input list. *)
let apply_diff ~(store : blockstore)
    ~(on_record : diff_entry -> string option -> unit)
    (diff : diff_entry list) : apply_result =
  let applied = ref 0 in
  let skipped = ref 0 in
  let errors = ref [] in
  List.iter
    (fun entry ->
      match (entry.action, entry.cid) with
      | Delete, _ ->
          on_record entry None;
          incr applied
      | (Create | Update), Some cid -> (
          match store.get cid with
          | Some data ->
              on_record entry (Some data);
              incr applied
          | None ->
              errors := (entry, "missing block") :: !errors;
              incr skipped)
      | (Create | Update), None -> incr skipped)
    diff;
  { applied = !applied; skipped = !skipped; errors = List.rev !errors }

(** Process a firehose commit event, loading blocks and extracting diff.

    The blocks of the event are loaded into [store] first; the diff is only
    returned when that succeeds. *)
let process_commit_event ~(store : blockstore) (evt : Firehose.commit_event) :
    (diff_entry list, error) result =
  load_commit_blocks store evt
  |> Result.map (fun () -> diff_from_commit_event evt)

(** {1 Cursor Management} *)

type cursor = { seq : int64; timestamp : string option }
(** Firehose cursor for resuming sync *)

(** Build a cursor from a firehose event, or [None] when the event has no
    sequence number. The timestamp is always left unset. *)
let cursor_of_event (evt : Firehose.event) : cursor option =
  Firehose.event_seq evt
  |> Option.map (fun seq -> { seq; timestamp = None })

(** Serialize a cursor as the decimal form of its sequence number. The
    timestamp is not included. *)
let cursor_to_string (c : cursor) : string = Int64.to_string c.seq

(** Parse a cursor from a string accepted by [Int64.of_string_opt]; returns
    [None] otherwise. The timestamp is always left unset. *)
let cursor_of_string (s : string) : cursor option =
  Int64.of_string_opt s
  |> Option.map (fun seq -> { seq; timestamp = None })