(* CRDT library in OCaml implementing json-joy. *)
1(** Multi-Value Register extension (mval) - Concurrent write visibility.
2
3 A Multi-Value Register is a CRDT that preserves all concurrent writes,
4 making conflicts visible rather than silently resolving them with LWW. This
5 allows applications to implement custom conflict resolution.
6
7 It's built on an arr node where:
8 - Each write creates a new element in the array
9 - Concurrent writes from different sessions are all preserved
10 - Sequential writes from the same session replace the previous value
11 - Reading returns all concurrent values
12
13 The key insight is that arr nodes (RGA) preserve all concurrent inserts. By
14 using the arr's RGA semantics, we get concurrent write visibility for free.
15
16 Usage:
17 {[
18 let model = Model.create session_id in
19 let mval = Mval.create model in
20 Mval.set mval (Value.string "value1");
21 (* Concurrent set from another session would add another value *)
22 match Mval.get mval with
23 | [v] -> (* No conflict - single value *)
24 | values -> (* Conflict - multiple concurrent values *)
25 ]}
26
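    Conflict resolution strategies:
    - First-wins: [Mval.first mval] (or [List.hd (Mval.get mval)], which
      raises on an empty register)
    - Last-wins: [Mval.last mval] (or [List.hd (List.rev (Mval.get mval))],
      which raises on an empty register)
    - Custom: Application-specific merge *)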
31
32(** {1 Types} *)
33
(** Handle for a multi-value register backed by an arr node. *)
type t = {
  model : Model.t;  (** Model in which the register's nodes are registered *)
  arr_node : Node.t;
      (** Backing node; the accessors only act on a [Node.Node_arr] *)
  mutable last_write : Clock.timestamp option;
      (** Chunk id of the most recent insert made through this handle, if
          any. [set] deletes that element before inserting its replacement,
          which is how sequential writes are deduplicated. *)
}
41
42(** {1 Creation} *)
43
(** [create model] allocates a fresh arr node, registers it in [model] and
    returns a handle on it.

    The node id is obtained by ticking the model's local clock. The returned
    handle has no recorded previous write. *)
let create model =
  let fresh_id = Clock.tick model.Model.clock.local in
  let handle =
    { model; arr_node = Node.make_arr ~id:fresh_id; last_write = None }
  in
  Model.add_node model handle.arr_node;
  handle
50
(** [of_node model arr_node] wraps an already existing arr node, e.g. one
    obtained from a decoded model. The handle starts with no recorded
    previous write.

    @raise Invalid_argument if [arr_node] is not a [Node.Node_arr]. *)
let of_node model = function
  | Node.Node_arr _ as arr_node -> { model; arr_node; last_write = None }
  | _ -> invalid_arg "Mval.of_node: expected arr node"
57
(** [id mval] is the timestamp id of the backing arr node. *)
let id { arr_node; _ } = Node.id arr_node
60
61(** {1 Reading} *)
62
(** [get mval] lists every value currently held by the register.

    Each visible chunk of the backing RGA stores a timestamp reference; the
    reference is resolved through the model and the results are returned in
    the order in which [Rga.fold_visible] visits the chunks. A single element
    means there is no conflict; several elements mean concurrent writes are
    present.

    Returns [[]] when the backing node is not an arr node. *)
let get mval =
  match mval.arr_node with
  | Node.Node_arr { arr_rga; _ } ->
      (* Accumulate in reverse (cheap cons), then restore visiting order. *)
      let collect resolved (chunk : Clock.timestamp Rga.chunk) =
        Model.resolve_ref mval.model chunk.data :: resolved
      in
      arr_rga |> Rga.fold_visible collect [] |> List.rev
  | _ -> []
81
(** [count mval] is the visible span of the backing RGA, i.e. the number of
    values currently held: [0] when empty, [1] when there is no conflict and
    more than [1] when concurrent values coexist.

    Returns [0] when the backing node is not an arr node. *)
let count { arr_node; _ } =
  match arr_node with
  | Node.Node_arr { arr_rga = rga; _ } -> Rga.visible_span rga
  | _ -> 0
89
(** [has_conflict mval] is [true] when the register holds at least two
    values. *)
let has_conflict mval = count mval >= 2
92
(** [is_empty mval] is [true] when the register holds no value at all. *)
let is_empty mval = match count mval with 0 -> true | _ -> false
95
(** [first mval] is the first value returned by {!get}, or [None] when the
    register is empty. Handy for first-wins conflict resolution. *)
let first mval = List.nth_opt (get mval) 0
98
(** [last mval] is the final value returned by {!get}, or [None] when the
    register is empty. Handy for last-wins conflict resolution. *)
let last mval =
  (* Walk the list once, remembering the most recently seen element. *)
  List.fold_left (fun _ candidate -> Some candidate) None (get mval)
102
103(** {1 Writing} *)
104
(** [set mval value] writes [value] into the register.

    If this handle has already written (its [last_write] is set), the element
    inserted by that earlier write is deleted first, so successive writes
    through the same handle replace one another instead of accumulating.
    Elements that were not inserted through this handle are left untouched,
    so they remain visible next to the new value.

    The write ticks the model's local clock twice: once for the new con node
    holding [value], once for the RGA element that references it. The element
    is inserted after [Rga.last_id] of the backing RGA.

    Does nothing when the backing node is not an arr node.

    @param value The value to set *)
let set mval value =
  match mval.arr_node with
  | Node.Node_arr { arr_rga; _ } ->
      (* Sequential-write semantics: drop the single element this handle
         inserted last, if any. *)
      (match mval.last_write with
      | Some prev_ts ->
          let spans = [ Clock.timespan prev_ts.sid prev_ts.time 1 ] in
          Rga.delete_elements arr_rga ~spans
      | None -> ());

      (* Store the payload in its own con node. *)
      let con_id = Clock.tick mval.model.clock.local in
      let con_node = Node.make_con ~id:con_id ~value in
      Model.add_node mval.model con_node;

      (* Reference the con node from a new element placed after the current
         last element of the RGA. *)
      let chunk_id = Clock.tick mval.model.clock.local in
      let after = Rga.last_id arr_rga in
      Rga.insert_element arr_rga ~after ~chunk_id ~value:con_id;

      (* Remember the element so the next [set] can replace it. *)
      mval.last_write <- Some chunk_id
  | _ -> ()
141
(** [clear mval] deletes every visible element of the backing RGA and forgets
    the handle's last write.

    [last_write] is reset even when there was nothing to delete. Does nothing
    when the backing node is not an arr node. *)
let clear mval =
  match mval.arr_node with
  | Node.Node_arr { arr_rga; _ } ->
      (* One timespan per visible chunk, covering the chunk's whole span. *)
      let span_of (chunk : Clock.timestamp Rga.chunk) =
        Clock.timespan chunk.id.sid chunk.id.time chunk.span
      in
      let doomed =
        Rga.fold_visible (fun acc chunk -> span_of chunk :: acc) [] arr_rga
      in
      (match doomed with
      | [] -> ()
      | spans -> Rga.delete_elements arr_rga ~spans);
      mval.last_write <- None
  | _ -> ()
156
(** Internal: append [value] to the register without deleting the element
    previously written through this handle.

    A con node holding [value] is added to the model, an element referencing
    it is inserted after [Rga.last_id] of the backing RGA, and [last_write]
    is updated to the new element's id. Does nothing when the backing node is
    not an arr node. *)
let set_value_only mval value =
  match mval.arr_node with
  | Node.Node_arr { arr_rga = rga; _ } ->
      (* Payload node first, so the element can reference its id. *)
      let con_id = Clock.tick mval.model.clock.local in
      Model.add_node mval.model (Node.make_con ~id:con_id ~value);

      (* Then the RGA element itself, placed at the end. *)
      let chunk_id = Clock.tick mval.model.clock.local in
      Rga.insert_element rga ~after:(Rga.last_id rga) ~chunk_id ~value:con_id;
      mval.last_write <- Some chunk_id
  | _ -> ()
172
(** [set_all mval values] replaces the register's content with [values].

    The register is cleared first, then each value is appended in list order
    without removing the ones appended before it. Useful for explicitly
    installing several coexisting values during conflict resolution. *)
let set_all mval values =
  clear mval;
  List.iter (set_value_only mval) values
182
183(** {1 Conflict Resolution} *)
184
(** Strategy used by [resolve] to pick the single value that survives a
    conflict: [First], [Last] or [Custom f]. *)
type resolve_strategy =
  | First  (** Keep the first value of the list returned by [get] *)
  | Last  (** Keep the last value of the list returned by [get] *)
  | Custom of (Value.t list -> Value.t)
      (** Keep whatever the function returns when given all current values *)
194
(** [resolve mval strategy] collapses a conflict down to one value.

    When the register holds zero or one value nothing happens. Otherwise the
    surviving value is chosen according to [strategy], the register is
    cleared and the chosen value is written back with [set]. *)
let resolve mval strategy =
  match get mval with
  | [] | [ _ ] -> () (* Nothing to resolve *)
  | head :: _ as candidates ->
      let winner =
        match strategy with
        | First -> head
        | Last -> List.nth candidates (List.length candidates - 1)
        | Custom pick -> pick candidates
      in
      clear mval;
      set mval winner
212
213(** {1 Inspection} *)
214
(** [node mval] exposes the backing arr node for advanced operations. *)
let node { arr_node; _ } = arr_node
217
(** [timestamps mval] lists the id of every visible chunk of the backing RGA,
    in the order [Rga.fold_visible] visits them. Mainly a debugging aid.

    Returns [[]] when the backing node is not an arr node. *)
let timestamps mval =
  match mval.arr_node with
  | Node.Node_arr { arr_rga; _ } ->
      let collect ids (chunk : Clock.timestamp Rga.chunk) = chunk.id :: ids in
      arr_rga |> Rga.fold_visible collect [] |> List.rev
  | _ -> []
227
(** [pp fmt mval] prints a one-line summary of the register: the backing
    node's id (session id, then time) and the number of values it holds. *)
let pp fmt mval =
  let value_count = List.length (get mval) in
  let node_id = Node.id mval.arr_node in
  Format.fprintf fmt "mval[%d,%d]{%d values}" node_id.sid node_id.time
    value_count
234
(** [to_string mval] renders the summary produced by [pp] as a string, for
    debugging. *)
let to_string mval = Format.asprintf "%t" (fun fmt -> pp fmt mval)