(* CRDT library in OCaml implementing json-joy. *)
1(** Document model container.
2
3 A Model represents a complete CRDT document with clock, root node, and node
4 index. The root is always a val node at timestamp (0, 0) which points to the
5 actual document content.
6
7 Key operations:
8 - create: Initialize a new empty model with a session ID
9 - get_node/add_node: Manage the node index
10 - view: Materialize the current document as a Value.t
11 - fork: Create an independent replica with a new session ID *)
12
13(** {1 Types} *)
14
(** Node index - maps timestamps to nodes.

    An ordered map (stdlib [Map.Make]) keyed by [Clock.timestamp]; keys are
    ordered by [Clock.compare_ts]. *)
module NodeIndex = Map.Make (struct
  type t = Clock.timestamp

  let compare = Clock.compare_ts
end)
21
type t = {
  clock : Clock.clock_vector;
      (** Clock vector; its [local] clock is the one ticked to mint new IDs *)
  root : Node.t;  (** Root val node at (0, 0) *)
  mutable index : Node.t NodeIndex.t;
      (** Timestamp -> Node map. The map itself is persistent; the field is
          reassigned whenever a node is added. *)
}
(** A CRDT document model *)
28
29(** {1 Constructors} *)
30
(** Root node ID constant - always (0, 0): the system session
    ([Session.system]) at logical time [0]. *)
let root_id : Clock.timestamp = { sid = Session.system; time = 0 }
33
(** [create sid] builds an empty model whose clock vector is created for
    session [sid]. The only node registered in the index is the root val node,
    stored under {!root_id}. *)
let create sid =
  let clock = Clock.create_vector sid in
  let root = Node.make_val ~id:root_id in
  { clock; root; index = NodeIndex.singleton root_id root }
41
(** [create_at sid time] is like {!create}, except that the local clock's
    logical time is overwritten with [time] before the model is returned. *)
let create_at sid time =
  let model = create sid in
  model.clock.local.clock_time <- time;
  model
49
50(** {1 Node Index Operations} *)
51
(** [get_node model id] is [Some node] when a node is indexed under the
    timestamp [id], and [None] otherwise. *)
let get_node model id = model.index |> NodeIndex.find_opt id
54
(** [add_node model node] registers [node] in the index under [Node.id node].
    Any node previously bound to that ID is replaced. *)
let add_node model node =
  model.index <- NodeIndex.add (Node.id node) node model.index
59
(** [has_node model id] is [true] iff some node is indexed under [id]. *)
let has_node { index; _ } id = NodeIndex.mem id index
62
(** [nodes model] lists every indexed node, in increasing order of their
    timestamp keys. *)
let nodes model =
  (* The fold visits keys in increasing order and conses, so reverse at the
     end to restore ascending order. *)
  NodeIndex.fold (fun _id node acc -> node :: acc) model.index [] |> List.rev
65
(** [node_count model] is the number of nodes currently in the index. *)
let node_count { index; _ } = NodeIndex.cardinal index
68
69(** {1 Clock Operations} *)
70
(** [clock model] returns the model's clock vector (the value itself, not a
    copy). *)
let clock (model : t) = model.clock
73
(** [session_id model] is the session ID stored in the model's local clock. *)
let session_id { clock; _ } = clock.local.clock_sid
76
(** [current_time model] is the logical time stored in the model's local
    clock. *)
let current_time { clock; _ } = clock.local.clock_time
79
(** [tick model] ticks the model's local clock via [Clock.tick] and returns
    the timestamp it yields. *)
let tick { clock; _ } = Clock.tick clock.local
82
(** [tick_n model n] ticks the local clock [n] times and returns the timestamp
    produced by the first tick.
    @raise Invalid_argument if [n] is not strictly positive. *)
let tick_n model n =
  if n <= 0 then invalid_arg "tick_n: n must be positive";
  let local = model.clock.local in
  let first = Clock.tick local in
  (* The first tick is already done; perform the remaining [n - 1]. *)
  let rec advance remaining =
    if remaining > 0 then begin
      ignore (Clock.tick local);
      advance (remaining - 1)
    end
  in
  advance (n - 1);
  first
91
(** [observe model ts] hands the timestamp [ts] (typically from another
    replica) to [Clock.observe] on the model's clock vector. *)
let observe { clock; _ } ts = Clock.observe clock ts
94
95(** {1 View Functions} *)
96
(** [resolve_ref model ts] looks [ts] up in the node index and returns the view
    of the node found there. A reference to an unknown node resolves to
    [Value.Undefined]. *)
let rec resolve_ref model (ts : Clock.timestamp) : Value.t =
  get_node model ts
  |> Option.fold ~none:Value.Undefined ~some:(view_node model)

(** [view_node model node] materializes [node] as a [Value.t], recursively
    resolving every timestamp reference it holds. *)
and view_node model node : Value.t =
  match node with
  (* A con node holding a timestamp reference is followed; any other constant
     is returned as-is. *)
  | Node.Node_con { con_value = Value.Timestamp_ref (sid, time); _ } ->
      resolve_ref model { sid; time }
  | Node.Node_con { con_value; _ } -> con_value
  | Node.Node_val { val_ref = None; _ } -> Value.Undefined
  | Node.Node_val { val_ref = Some ts; _ } -> resolve_ref model ts
  | Node.Node_obj { obj_entries; _ } ->
      (* Entries whose value resolves to undefined are left out of the object
         view, matching json-joy behavior. *)
      let visible (entry : Node.obj_entry) =
        match resolve_ref model entry.obj_value with
        | Value.Undefined -> None
        | value -> Some (entry.obj_key, value)
      in
      Value.Object (List.filter_map visible obj_entries)
  | Node.Node_vec { vec_slots; _ } ->
      (* The view is as long as the highest occupied index requires; slots that
         were never written stay undefined. *)
      let highest =
        List.fold_left
          (fun hi (slot : Node.vec_slot) -> max hi slot.vec_idx)
          (-1) vec_slots
      in
      if highest < 0 then Value.Array []
      else begin
        let cells = Array.make (highest + 1) Value.Undefined in
        vec_slots
        |> List.iter (fun (slot : Node.vec_slot) ->
               cells.(slot.vec_idx) <- resolve_ref model slot.vec_value);
        Value.Array (Array.to_list cells)
      end
  | Node.Node_arr { arr_rga; _ } ->
      (* Accumulate in reverse while folding over visible chunks, then flip. *)
      let collect acc (chunk : Clock.timestamp Rga.chunk) =
        resolve_ref model chunk.data :: acc
      in
      Value.Array (List.rev (Rga.fold_visible collect [] arr_rga))
  | Node.Node_str { str_rga; _ } -> Value.String (Rga.view_string str_rga)
  | Node.Node_bin { bin_rga; _ } -> Value.Bytes (Rga.view_bytes bin_rga)
153
(** [view model] materializes the whole document by resolving whatever the
    root val node points to. An unset root yields [Value.Undefined].
    @raise Failure if the root is not a val node. *)
let view model : Value.t =
  match model.root with
  | Node.Node_val { val_ref = Some ts; _ } -> resolve_ref model ts
  | Node.Node_val { val_ref = None; _ } -> Value.Undefined (* Empty document *)
  | _ -> failwith "Root must be a val node"
163
164(** {1 Replica Operations} *)
165
(** [fork model new_sid] creates a replica of [model] whose clock is
    [Clock.fork model.clock new_sid].

    The replica gets its own [index] field, so nodes registered later on one
    model do not appear in the other. The index map is persistent, so it is
    shared directly rather than rebuilt binding by binding.

    NOTE: this is NOT a deep copy. The root node and every node already in the
    index are shared by reference with [model], and nodes are mutated in place,
    so edits to existing nodes made through either model are visible in both.
    Making the replica fully independent requires copying each node.
    TODO: deep-copy nodes once a node-copy function is available. *)
let fork model new_sid =
  { model with clock = Clock.fork model.clock new_sid }
177
(** [clone model] creates a snapshot of [model] with the same session ID; its
    clock is [Clock.clone model.clock].

    The snapshot gets its own [index] field, so nodes registered later on one
    model do not appear in the other. The index map is persistent, so it is
    shared directly rather than rebuilt binding by binding.

    NOTE: this is NOT a deep copy. The root node and every node already in the
    index are shared by reference with [model], and nodes are mutated in place,
    so edits to existing nodes made through either model are visible in both.
    TODO: deep-copy nodes once a node-copy function is available. *)
let clone model = { model with clock = Clock.clone model.clock }
188
189(** {1 Node Creation Helpers} *)
190
(** [new_con model ~value] ticks the clock, builds a constant node holding
    [value] under the fresh ID, registers it in the index and returns it. *)
let new_con model ~value =
  let node = Node.make_con ~id:(tick model) ~value in
  add_node model node;
  node
197
(** [new_val model] ticks the clock, builds a value node under the fresh ID,
    registers it in the index and returns it. *)
let new_val model =
  let node = Node.make_val ~id:(tick model) in
  add_node model node;
  node
204
(** [new_obj model] ticks the clock, builds an object node under the fresh ID,
    registers it in the index and returns it. *)
let new_obj model =
  let node = Node.make_obj ~id:(tick model) in
  add_node model node;
  node
211
(** [new_vec model] ticks the clock, builds a vector node under the fresh ID,
    registers it in the index and returns it. *)
let new_vec model =
  let node = Node.make_vec ~id:(tick model) in
  add_node model node;
  node
218
(** [new_arr model] ticks the clock, builds an array node under the fresh ID,
    registers it in the index and returns it. *)
let new_arr model =
  let node = Node.make_arr ~id:(tick model) in
  add_node model node;
  node
225
(** [new_str model] ticks the clock, builds a string node under the fresh ID,
    registers it in the index and returns it. *)
let new_str model =
  let node = Node.make_str ~id:(tick model) in
  add_node model node;
  node
232
(** [new_bin model] ticks the clock, builds a binary node under the fresh ID,
    registers it in the index and returns it. *)
let new_bin model =
  let node = Node.make_bin ~id:(tick model) in
  add_node model node;
  node
239
240(** {1 Convenience Operations} *)
241
(** [set_root model node] points the root val node at [node] by storing
    [Node.id node] in it.
    @raise Failure if the root is not a val node. *)
let set_root model node =
  let root = model.root in
  match root with
  | Node.Node_val _ -> Node.set_val root ~value:(Node.id node)
  | _ -> failwith "Root must be a val node"
247
(** [root_node model] is the node the root currently points to. It is [None]
    when the root is unset, when its target is not in the index, or when the
    root is not a val node. *)
let root_node model =
  match model.root with
  | Node.Node_val { val_ref = Some ts; _ } -> get_node model ts
  | _ -> None
254
255(** {1 Pretty Printing} *)
256
(** [pp fmt model] prints a one-line summary of [model] on [fmt]: the local
    session ID, the local logical time and the number of indexed nodes. *)
let pp fmt model =
  let local = model.clock.local in
  Format.fprintf fmt "Model(sid=%d, time=%d, nodes=%d)" local.clock_sid
    local.clock_time (node_count model)
261
(** [to_string model] renders the {!pp} summary of [model] as a string, for
    debugging. *)
let to_string model =
  let buf = Buffer.create 64 in
  let fmt = Format.formatter_of_buffer buf in
  pp fmt model;
  (* Flush pending output into the buffer before reading it back. *)
  Format.pp_print_flush fmt ();
  Buffer.contents buf
264
265(** {1 Patch Application} *)
266
(** Apply a single operation to the model.

    The operation's ID is first observed by the model's clock. Then:
    - creation operations register a new, empty node under [op_id]. They are
      idempotent: if a node is already indexed under [op_id] it is kept
      untouched, so re-delivering a patch cannot replace a populated node (or
      the root) with a fresh one;
    - insertion and deletion operations look up their target node and are
      silently ignored when the target is missing or has the wrong node type;
    - [Op_upd_arr] is currently NOT applied (see the FIXME below);
    - [Op_nop] changes nothing beyond the clock observation.

    @param model The model to modify
    @param op_id The ID assigned to this operation
    @param op The operation to apply *)
let apply_op model (op_id : Clock.timestamp) (op : Op.op_data) =
  (* Observe the operation's timestamp to update our clock *)
  observe model op_id;
  (* Register a node built for [op_id] unless that ID is already taken. *)
  let register make_node =
    if not (has_node model op_id) then add_node model (make_node op_id)
  in
  match op with
  (* Creation operations - create new nodes *)
  | Op.Op_new_con { con_value } ->
      register (fun id -> Node.make_con ~id ~value:con_value)
  | Op.Op_new_val -> register (fun id -> Node.make_val ~id)
  | Op.Op_new_obj -> register (fun id -> Node.make_obj ~id)
  | Op.Op_new_vec -> register (fun id -> Node.make_vec ~id)
  | Op.Op_new_str -> register (fun id -> Node.make_str ~id)
  | Op.Op_new_bin -> register (fun id -> Node.make_bin ~id)
  | Op.Op_new_arr -> register (fun id -> Node.make_arr ~id)
  (* Insertion operations - modify existing nodes *)
  | Op.Op_ins_val { ins_val_obj; ins_val_value } -> (
      match get_node model ins_val_obj with
      | Some (Node.Node_val _ as node) -> Node.set_val node ~value:ins_val_value
      | Some _ | None -> () (* Wrong node type or unknown target - ignore *))
  | Op.Op_ins_obj { ins_obj_obj; ins_obj_value } -> (
      match get_node model ins_obj_obj with
      | Some (Node.Node_obj _ as node) ->
          (* Every key written by this op carries the op's ID as write time. *)
          List.iter
            (fun (key, value) ->
              Node.set_obj_key node ~key ~value ~write_ts:op_id)
            ins_obj_value
      | Some _ | None -> () (* Wrong node type or unknown target - ignore *))
  | Op.Op_ins_vec { ins_vec_obj; ins_vec_idx; ins_vec_value } -> (
      match get_node model ins_vec_obj with
      | Some (Node.Node_vec _ as node) ->
          Node.set_vec_slot node ~idx:ins_vec_idx ~value:ins_vec_value
            ~write_ts:op_id
      | Some _ | None -> () (* Wrong node type or unknown target - ignore *))
  | Op.Op_ins_str { ins_str_obj; ins_str_after; ins_str_value } -> (
      match get_node model ins_str_obj with
      | Some (Node.Node_str _ as node) ->
          Node.insert_str node ~after:(Some ins_str_after) ~chunk_id:op_id
            ~text:ins_str_value
      | Some _ | None -> () (* Wrong node type or unknown target - ignore *))
  | Op.Op_ins_bin { ins_bin_obj; ins_bin_after; ins_bin_value } -> (
      match get_node model ins_bin_obj with
      | Some (Node.Node_bin _ as node) ->
          Node.insert_bin node ~after:(Some ins_bin_after) ~chunk_id:op_id
            ~data:ins_bin_value
      | Some _ | None -> () (* Wrong node type or unknown target - ignore *))
  | Op.Op_ins_arr { ins_arr_obj; ins_arr_after; ins_arr_value } -> (
      match get_node model ins_arr_obj with
      | Some (Node.Node_arr _ as node) ->
          Node.insert_arr node ~after:(Some ins_arr_after) ~chunk_id:op_id
            ~value:ins_arr_value
      | Some _ | None -> () (* Wrong node type or unknown target - ignore *))
  | Op.Op_upd_arr _ ->
      (* FIXME: upd_arr is not implemented - the operation is a no-op.
         The intended behavior is: in the array node [upd_arr_obj], find the
         chunk whose ID equals [upd_arr_pos] and replace its data with
         [upd_arr_value]. The previous code computed such an updated chunk
         list but discarded it without writing it back, so nothing was ever
         changed; that dead computation has been removed. Implementing this
         needs an in-place chunk update on the RGA. *)
      ()
  | Op.Op_del { del_obj; del_what } -> (
      match get_node model del_obj with
      | Some ((Node.Node_str _ | Node.Node_bin _ | Node.Node_arr _) as node) ->
          Node.delete_range node ~spans:del_what
      | Some _ | None -> () (* Wrong node type or unknown target - ignore *))
  | Op.Op_nop _ ->
      (* No-op - only the clock observation above takes effect *)
      ()
370
(** [apply model patch] applies every operation of [patch] to [model] through
    {!apply_op}, in the order and with the IDs supplied by
    [Patch.iter_with_id]. *)
let apply model (patch : Patch.t) =
  Patch.iter_with_id (fun op_id op -> apply_op model op_id op) patch
374
(** [apply_batch model batch] applies each patch of [batch] to [model], in list
    order. *)
let apply_batch model (batch : Patch.batch) =
  List.iter (fun patch -> apply model patch) batch