(* CRDT library in OCaml implementing json-joy (branch: main, 13 kB). *)
(** Document model container.

    A Model is a complete CRDT document: a clock vector, a root node, and an
    index of every node keyed by its timestamp ID. The root is always a [val]
    node at timestamp (0, 0) that points at the actual document content.

    Key operations:
    - [create]: initialize a new empty model with a session ID
    - [get_node] / [add_node]: manage the node index
    - [view]: materialize the current document as a [Value.t]
    - [fork]: create a replica with a new session ID *)

(** {1 Types} *)

(** Node index: a persistent map from node timestamps to nodes, ordered by
    [Clock.compare_ts]. *)
module NodeIndex = Map.Make (struct
  type t = Clock.timestamp

  let compare a b = Clock.compare_ts a b
end)

(** A CRDT document model. *)
type t = {
  clock : Clock.clock_vector;  (** Clock vector; [local] is the clock this replica ticks. *)
  root : Node.t;  (** Root [val] node, registered under [root_id]. *)
  mutable index : Node.t NodeIndex.t;  (** Timestamp -> node map. *)
}

(** {1 Constructors} *)

(** ID of the root node: session [Session.system], time 0. *)
let root_id = ({ sid = Session.system; time = 0 } : Clock.timestamp)

(** Create a new empty model with the given session ID. The model starts with
    just a root val node at (0, 0).
@param sid Session ID passed to [Clock.create_vector] for the model's clock. *)
let create sid =
  let clock = Clock.create_vector sid in
  let root = Node.make_val ~id:root_id in
  { clock; root; index = NodeIndex.singleton root_id root }

(** Create a model like {!create}, but with the local clock's logical time set
    to [time] instead of the value [Clock.create_vector] starts it at. *)
let create_at sid time =
  (* Reuse [create] rather than duplicating its construction logic; building
     the root node does not read the clock, so setting the time afterwards is
     equivalent to setting it first. *)
  let model = create sid in
  model.clock.local.clock_time <- time;
  model

(** {1 Node Index Operations} *)

(** [get_node model id] is the node registered under [id], or [None]. *)
let get_node model id = NodeIndex.find_opt id model.index

(** Register [node] in the index under its own ID ([Node.id node]). Any node
    previously registered under that ID is replaced. *)
let add_node model node =
  let id = Node.id node in
  model.index <- NodeIndex.add id node model.index

(** [has_node model id] is [true] iff a node is registered under [id]. *)
let has_node model id = NodeIndex.mem id model.index

(** All registered nodes, in increasing ID order according to
    [Clock.compare_ts]. *)
let nodes model = NodeIndex.bindings model.index |> List.map snd

(** Number of registered nodes. *)
let node_count model = NodeIndex.cardinal model.index

(** {1 Clock Operations} *)

(** The model's clock vector. *)
let clock model = model.clock

(** Session ID of the local clock. *)
let session_id model = model.clock.local.clock_sid

(** Current logical time of the local clock. *)
let current_time model = model.clock.local.clock_time

(** Tick the local clock and return the timestamp [Clock.tick] produces. *)
let tick model = Clock.tick model.clock.local

(** Tick the local clock [n] times and return the timestamp produced by the
    first tick.
    @raise Invalid_argument if [n <= 0]. *)
let tick_n model n =
  if n <= 0 then invalid_arg "tick_n: n must be positive";
  let first = Clock.tick model.clock.local in
  for _ = 2 to n do
    ignore (Clock.tick model.clock.local)
  done;
  first

(** Record a timestamp seen from another replica in the clock vector, via
    [Clock.observe]. *)
let observe model ts = Clock.observe model.clock ts

(** {1 View Functions} *)

(** Resolve a timestamp reference to its actual value.
Follows references through val and con nodes to get the final value.

    [resolve_ref_on_path path model ts] is the cycle-aware worker behind
    {!resolve_ref}. [path] holds the IDs of the nodes currently being expanded
    (the chain of ancestors). A reference back to any of them resolves to
    [Value.Undefined] instead of recursing forever, as would otherwise happen
    for e.g. a val node whose reference points at itself. IDs not present in
    the index also resolve to [Value.Undefined]. *)
let rec resolve_ref_on_path path model (ts : Clock.timestamp) : Value.t =
  if List.exists (Clock.equal_ts ts) path then Value.Undefined
  else
    match get_node model ts with
    | None -> Value.Undefined (* Reference to unknown node *)
    | Some node -> view_node_on_path (ts :: path) model node

(** [view_node_on_path path model node] views [node], whose own ID is expected
    to be the head of [path]. Child references are resolved through
    [resolve_ref_on_path] with the same [path]. *)
and view_node_on_path path model node : Value.t =
  let resolve ts = resolve_ref_on_path path model ts in
  match node with
  | Node.Node_con { con_value; _ } -> (
      (* A con node may hold a reference to another node; follow it. *)
      match con_value with
      | Value.Timestamp_ref (sid, time) ->
          resolve ({ sid; time } : Clock.timestamp)
      | v -> v)
  | Node.Node_val { val_ref; _ } -> (
      match val_ref with None -> Value.Undefined | Some ts -> resolve ts)
  | Node.Node_obj { obj_entries; _ } ->
      (* Omit undefined values from the object view, matching json-joy. *)
      Value.Object
        (List.filter_map
           (fun (e : Node.obj_entry) ->
             match resolve e.obj_value with
             | Value.Undefined -> None
             | v -> Some (e.obj_key, v))
           obj_entries)
  | Node.Node_vec { vec_slots; _ } ->
      let max_idx =
        List.fold_left
          (fun acc (s : Node.vec_slot) -> max acc s.vec_idx)
          (-1) vec_slots
      in
      if max_idx < 0 then Value.Array []
      else
        (* Positions without a slot stay [Value.Undefined]. *)
        let arr = Array.make (max_idx + 1) Value.Undefined in
        List.iter
          (fun (s : Node.vec_slot) ->
            (* Skip negative indices (e.g. from a malformed remote op) instead
               of raising [Invalid_argument] from the array write. *)
            if s.vec_idx >= 0 then arr.(s.vec_idx) <- resolve s.vec_value)
          vec_slots;
        Value.Array (Array.to_list arr)
  | Node.Node_arr { arr_rga; _ } ->
      (* Consing onto the accumulator reverses fold order; undo that. *)
      let items =
        Rga.fold_visible
          (fun acc (c : Clock.timestamp Rga.chunk) -> resolve c.data :: acc)
          [] arr_rga
      in
      Value.Array (List.rev items)
  | Node.Node_str { str_rga; _ } -> Value.String (Rga.view_string str_rga)
  | Node.Node_bin { bin_rga; _ } -> Value.Bytes (Rga.view_bytes bin_rga)

(** [resolve_ref model ts] is the value of the node registered under [ts],
    following references through val and con nodes. Unknown IDs and
    reference cycles yield [Value.Undefined]. *)
let resolve_ref model ts = resolve_ref_on_path [] model ts

(** [view_node model node] is the view of [node] with all references resolved.
    A reference chain that leads back to a node already being expanded
    (including [node] itself) yields [Value.Undefined]. *)
let view_node model node = view_node_on_path [ Node.id node ] model node

(** Get the current view of the document.
This resolves the root val node to get the actual content; a root that
    points nowhere yields [Value.Undefined] (empty document).
    @raise Failure if the root is not a val node. *)
let view model : Value.t =
  match model.root with
  | Node.Node_val { val_ref; _ } -> (
      match val_ref with
      | None -> Value.Undefined (* Empty document *)
      | Some ts -> resolve_ref model ts)
  | _ -> failwith "Root must be a val node"

(** {1 Replica Operations} *)

(** Fork the model into a replica with a new session ID.

    The result has its own clock (from [Clock.fork]) and its own [index]
    field: the index is a persistent map, so nodes registered later in one
    model are not seen by the other. NOTE: the nodes themselves, including
    [root], are shared rather than copied. An in-place mutation of an existing
    node (e.g. [Node.set_val] or an insertion into a string) is therefore
    visible through both models. A true deep copy needs a node-copy primitive
    from [Node]. *)
let fork model new_sid =
  (* Sharing [model.index] is equivalent to re-adding every binding into an
     empty map, but O(1) instead of O(n log n). *)
  { model with clock = Clock.fork model.clock new_sid }

(** Clone the model with the same session ID, e.g. to take a snapshot. The
    clone has its own clock (from [Clock.clone]) and its own [index] field.
    As with {!fork}, node objects are shared with the original, so the clone
    only stays a faithful snapshot while existing nodes are not mutated. *)
let clone model = { model with clock = Clock.clone model.clock }

(** {1 Node Creation Helpers} *)

(** Create a constant node holding [value] with a fresh ID from {!tick},
    register it in the index and return it. *)
let new_con model ~value =
  let id = tick model in
  let node = Node.make_con ~id ~value in
  add_node model node;
  node

(** Create a val node with a fresh ID from {!tick}, register it and return
    it. *)
let new_val model =
  let id = tick model in
  let node = Node.make_val ~id in
  add_node model node;
  node

(** Create an object node with a fresh ID from {!tick}, register it and
    return it. *)
let new_obj model =
  let id = tick model in
  let node = Node.make_obj ~id in
  add_node model node;
  node

(** Create a vector node with a fresh ID from {!tick}, register it and return
    it. *)
let new_vec model =
  let id = tick model in
  let node = Node.make_vec ~id in
  add_node model node;
  node

(** Create and
register a new array node, taking its ID from {!tick}. *)
let new_arr model =
  let node = Node.make_arr ~id:(tick model) in
  add_node model node;
  node

(** Create and register a new string node, taking its ID from {!tick}. *)
let new_str model =
  let node = Node.make_str ~id:(tick model) in
  add_node model node;
  node

(** Create and register a new binary node, taking its ID from {!tick}. *)
let new_bin model =
  let node = Node.make_bin ~id:(tick model) in
  add_node model node;
  node

(** {1 Convenience Operations} *)

(** Point the document root at [node] by storing [Node.id node] in the root
    val node. This call does not register [node] in the index; an
    unregistered target makes {!view} yield [Value.Undefined].
    @raise Failure if the root is not a val node. *)
let set_root model node =
  match model.root with
  | Node.Node_val _ as root -> Node.set_val root ~value:(Node.id node)
  | _ -> failwith "Root must be a val node"

(** The registered node the root currently points at, or [None] if the root
    is unset, its target is not registered, or the root is not a val node. *)
let root_node model =
  match model.root with
  | Node.Node_val { val_ref = Some ts; _ } -> get_node model ts
  | _ -> None

(** {1 Pretty Printing} *)

(** Print a one-line summary: session ID, logical time and node count. *)
let pp fmt model =
  Format.fprintf fmt "Model(sid=%d, time=%d, nodes=%d)" (session_id model)
    (current_time model) (node_count model)

(** [to_string model] renders the {!pp} summary as a string, for debugging. *)
let to_string model = Format.asprintf "%a" pp model

(** {1 Patch Application} *)

(** Apply a single operation to the model.
@param model The model to modify
    @param op_id The ID assigned to this operation
    @param op The operation to apply

    Creation operations are idempotent. If a node is already registered under
    [op_id] (e.g. because the same patch was delivered twice), the existing
    node is kept. It is not replaced by a fresh empty node that would discard
    its contents. Mutation operations whose target node is unknown or of the
    wrong kind are ignored. *)
let apply_op model (op_id : Clock.timestamp) (op : Op.op_data) =
  (* Observe the operation's timestamp to update our clock.
     NOTE(review): only [op_id] itself is observed. If an operation occupies
     several IDs (e.g. a multi-character [ins_str]), confirm that
     [Clock.observe] / [Patch.iter_with_id] account for the rest of the span. *)
  observe model op_id;
  (* Register the node built by [make] under [op_id], unless that ID is
     already taken; [add_node] alone would silently replace it. *)
  let register_new make =
    if not (has_node model op_id) then add_node model (make ())
  in
  match op with
  (* Creation operations - create new nodes *)
  | Op.Op_new_con { con_value } ->
      register_new (fun () -> Node.make_con ~id:op_id ~value:con_value)
  | Op.Op_new_val -> register_new (fun () -> Node.make_val ~id:op_id)
  | Op.Op_new_obj -> register_new (fun () -> Node.make_obj ~id:op_id)
  | Op.Op_new_vec -> register_new (fun () -> Node.make_vec ~id:op_id)
  | Op.Op_new_str -> register_new (fun () -> Node.make_str ~id:op_id)
  | Op.Op_new_bin -> register_new (fun () -> Node.make_bin ~id:op_id)
  | Op.Op_new_arr -> register_new (fun () -> Node.make_arr ~id:op_id)
  (* Insertion operations - modify existing nodes; a missing target or a
     target of the wrong kind is ignored. *)
  | Op.Op_ins_val { ins_val_obj; ins_val_value } -> (
      match get_node model ins_val_obj with
      | Some (Node.Node_val _ as node) ->
          (* NOTE(review): unlike obj/vec writes, no [~write_ts] is passed, so
             the outcome of concurrent writes depends entirely on
             [Node.set_val]. Confirm it resolves them last-writer-wins. *)
          Node.set_val node ~value:ins_val_value
      | Some _ | None -> ())
  | Op.Op_ins_obj { ins_obj_obj; ins_obj_value } -> (
      match get_node model ins_obj_obj with
      | Some (Node.Node_obj _ as node) ->
          List.iter
            (fun (key, value) ->
              Node.set_obj_key node ~key ~value ~write_ts:op_id)
            ins_obj_value
      | Some _ | None -> ())
  | Op.Op_ins_vec { ins_vec_obj; ins_vec_idx; ins_vec_value } -> (
      match get_node model ins_vec_obj with
      | Some (Node.Node_vec _ as node) ->
          Node.set_vec_slot node ~idx:ins_vec_idx ~value:ins_vec_value
            ~write_ts:op_id
      | Some _ | None -> ())
  | Op.Op_ins_str { ins_str_obj; ins_str_after; ins_str_value } -> (
      match get_node model ins_str_obj with
      | Some (Node.Node_str _ as node) ->
          Node.insert_str node ~after:(Some ins_str_after) ~chunk_id:op_id
            ~text:ins_str_value
      | Some _ | None -> ())
  | Op.Op_ins_bin { ins_bin_obj; ins_bin_after; ins_bin_value } -> (
      match get_node model ins_bin_obj with
      | Some (Node.Node_bin _ as node) ->
          Node.insert_bin node ~after:(Some ins_bin_after) ~chunk_id:op_id
            ~data:ins_bin_value
      | Some _ | None -> ())
  | Op.Op_ins_arr { ins_arr_obj; ins_arr_after; ins_arr_value } -> (
      match get_node model ins_arr_obj with
      | Some (Node.Node_arr _ as node) ->
          Node.insert_arr node ~after:(Some ins_arr_after) ~chunk_id:op_id
            ~value:ins_arr_value
      | Some _ | None -> ())
  | Op.Op_upd_arr _ ->
      (* NOTE(review): upd_arr is currently a no-op (apart from the [observe]
         above). Implementing it needs an [Rga] primitive that replaces a
         chunk's data in place. Building an updated copy of [Rga.to_list] and
         discarding it, as was done before, has no effect, so that dead O(n)
         work is not performed. *)
      ()
  | Op.Op_del { del_obj; del_what } -> (
      match get_node model del_obj with
      | Some ((Node.Node_str _ | Node.Node_bin _ | Node.Node_arr _) as node) ->
          Node.delete_range node ~spans:del_what
      | Some _ | None -> ())
  | Op.Op_nop _ ->
      (* No-op - only advances the clock, via [observe] above *)
      ()

(** Apply a patch to the model. Each operation is applied in order, with the
    ID [Patch.iter_with_id] computes for it. *)
let apply model (patch : Patch.t) = Patch.iter_with_id (apply_op model) patch

(** Apply a batch of patches to the model, in list order. *)
let apply_batch model (batch : Patch.batch) = List.iter (apply model) batch