crdt library in ocaml implementing json-joy
at main 8.0 kB view raw
(** Multi-Value Register extension (mval) - Concurrent write visibility.

    A Multi-Value Register is a CRDT that preserves all concurrent writes,
    making conflicts visible rather than silently resolving them with LWW. This
    allows applications to implement custom conflict resolution.

    It's built on an arr node where:
    - Each write creates a new element in the array
    - Concurrent writes from different sessions are all preserved
    - A write through a handle first deletes the element inserted by that
      handle's previous write (sequential-write semantics)
    - Reading returns all currently visible values

    The key insight is that arr nodes (RGA) preserve all concurrent inserts. By
    using the arr's RGA semantics, we get concurrent write visibility for free.

    Usage:
    {[
      let model = Model.create session_id in
      let mval = Mval.create model in
      Mval.set mval (Value.string "value1");
      (* Concurrent set from another session would add another value *)
      match Mval.get mval with
      | [v] -> (* No conflict - single value *)
      | values -> (* Conflict - multiple concurrent values *)
    ]}

    Conflict resolution strategies:
    - First-wins: [Mval.first mval] (total; [None] on an empty register)
    - Last-wins: [Mval.last mval] (total; [None] on an empty register)
    - Custom: application-specific merge, e.g. via [Mval.resolve] with
      [Custom f] *)

(** {1 Types} *)

type t = {
  model : Model.t;  (** Model that owns the arr node and the value nodes. *)
  arr_node : Node.t;  (** Backing node; expected to be a [Node.Node_arr]. *)
  mutable last_write : Clock.timestamp option;
      (** Chunk id of the element most recently inserted through this handle,
          or [None] after {!create}, {!of_node} or {!clear}. {!set} uses it to
          delete the handle's previous write before inserting a new one. *)
}
(** Multi-value register handle wrapping an arr node *)

(** {1 Creation} *)

(** Create a new multi-value register.

    Ticks the model's local clock to obtain an id, builds an arr node with that
    id and registers it in the model with [Model.add_node]. *)
let create model =
  let id = Clock.tick model.Model.clock.local in
  let arr_node = Node.make_arr ~id in
  Model.add_node model arr_node;
  { model; arr_node; last_write = None }

(** Create a multi-value register from an existing arr node. Use this when
    loading from a decoded model.

    The returned handle starts with no recorded previous write, so the first
    {!set} through it deletes nothing.

    @raise Invalid_argument if [arr_node] is not a [Node.Node_arr]. *)
let of_node model arr_node =
  match arr_node with
  | Node.Node_arr _ -> { model; arr_node; last_write = None }
  | _ -> invalid_arg "Mval.of_node: expected arr node"

(** Get the arr node's timestamp ID *)
let id mval = Node.id mval.arr_node

(** {1 Reading} *)

(** Get all values in the register.

    Produces one value per visible chunk of the arr's RGA, each obtained by
    resolving the chunk's timestamp reference with [Model.resolve_ref]. Values
    are returned in the order [Rga.fold_visible] visits the chunks. A single
    value means no conflict; several values mean several writes are visible at
    once. Returns [[]] if the handle does not wrap an arr node. *)
let get mval =
  match mval.arr_node with
  | Node.Node_arr { arr_rga; _ } ->
      Rga.fold_visible
        (fun acc (chunk : Clock.timestamp Rga.chunk) ->
          (* Resolve the timestamp reference to get the actual value *)
          let value = Model.resolve_ref mval.model chunk.data in
          value :: acc)
        [] arr_rga
      |> List.rev
  | _ -> []

(** Get the number of concurrent values (conflict count).

    Returns [Rga.visible_span] of the arr's RGA, or 0 if the handle does not
    wrap an arr node: 0 if empty, 1 if no conflict, >1 if conflict exists.

    NOTE(review): {!get} yields one value per visible chunk whereas this
    reports the visible span; the two agree only if every visible chunk has
    span 1 - confirm against [Rga]. *)
let count mval =
  match mval.arr_node with
  | Node.Node_arr { arr_rga; _ } -> Rga.visible_span arr_rga
  | _ -> 0

(** Check if there's a conflict (multiple concurrent values). *)
let has_conflict mval = count mval > 1

(** Check if the register is empty (no values set). *)
let is_empty mval = count mval = 0

(** Get the first value (if any). Useful for first-wins conflict resolution. *)
let first mval = match get mval with [] -> None | v :: _ -> Some v

(** Get the last value (if any). Useful for last-wins conflict resolution. *)
let last mval =
  (* The head of the reversed list is the last value; no partial List.hd. *)
  match List.rev (get mval) with [] -> None | v :: _ -> Some v

(** {1 Writing} *)

(** Internal: append a value without deleting this handle's previous write.

    Creates a con node holding [value], registers it in the model, inserts a
    reference to it after the RGA's current last id, and records the new
    chunk id as this handle's last write. Does nothing if the handle does not
    wrap an arr node. *)
let set_value_only mval value =
  match mval.arr_node with
  | Node.Node_arr { arr_rga; _ } ->
      (* Create a con node for the value *)
      let con_id = Clock.tick mval.model.clock.local in
      let con_node = Node.make_con ~id:con_id ~value in
      Model.add_node mval.model con_node;

      (* Insert into the arr at the end; the chunk gets its own fresh id *)
      let chunk_id = Clock.tick mval.model.clock.local in
      let after = Rga.last_id arr_rga in
      Rga.insert_element arr_rga ~after ~chunk_id ~value:con_id;
      mval.last_write <- Some chunk_id
  | _ -> ()

(** Set a value in the register.

    If this handle has a recorded previous write, that element is deleted
    first, so sequential writes through one handle replace each other. The new
    value is then appended, leaving any elements written by others in place.
    Does nothing if the handle does not wrap an arr node.

    @param value The value to set *)
let set mval value =
  match mval.arr_node with
  | Node.Node_arr { arr_rga; _ } ->
      (* Delete our previous write if any (sequential write semantics) *)
      (match mval.last_write with
      | Some prev_ts ->
          let spans = [ Clock.timespan prev_ts.sid prev_ts.time 1 ] in
          Rga.delete_elements arr_rga ~spans
      | None -> ());
      (* Append the new value and remember it as our latest write *)
      set_value_only mval value
  | _ -> ()

(** Clear all values from the register.

    Deletes every visible chunk of the arr's RGA (nothing is deleted when there
    are none) and forgets this handle's last write. Does nothing if the handle
    does not wrap an arr node. *)
let clear mval =
  match mval.arr_node with
  | Node.Node_arr { arr_rga; _ } ->
      (* Collect one span per visible chunk *)
      let spans =
        Rga.fold_visible
          (fun acc (chunk : Clock.timestamp Rga.chunk) ->
            Clock.timespan chunk.id.sid chunk.id.time chunk.span :: acc)
          [] arr_rga
      in
      (match spans with
      | [] -> ()
      | _ :: _ -> Rga.delete_elements arr_rga ~spans);
      mval.last_write <- None
  | _ -> ()

(** Replace the register's contents with [values].

    Clears the register, then appends each value in list order without
    deleting the previously appended one, so all of them remain visible.
    Useful for explicitly setting multiple concurrent values during conflict
    resolution. The steps are performed one after another on the local
    structures. *)
let set_all mval values =
  (* Clear existing values first *)
  clear mval;
  (* Add all values *)
  List.iter (fun v -> set_value_only mval v) values

(** {1 Conflict Resolution} *)

(** How {!resolve} selects the single value to keep. *)
type resolve_strategy =
  | First  (** Keep the first value (by RGA ordering) *)
  | Last  (** Keep the last value (by RGA ordering) *)
  | Custom of (Value.t list -> Value.t)
      (** Custom resolver; receives all current values in {!get} order *)

(** Resolve conflicts using the given strategy.

    With zero or one value this is a no-op. Otherwise the value chosen by
    [strategy] is computed from the current values, the register is cleared,
    and the chosen value is written back with {!set}, so afterwards the
    register holds exactly that one value. *)
let resolve mval strategy =
  match get mval with
  | [] | [ _ ] -> () (* No conflict to resolve *)
  | first_value :: _ as values ->
      let resolved =
        match strategy with
        | First -> first_value
        | Last ->
            (* Folding while keeping only the current element yields the last *)
            List.fold_left (fun _ v -> v) first_value values
        | Custom f -> f values
      in
      clear mval;
      set mval resolved

(** {1 Inspection} *)

(** Get the raw arr node for advanced operations. *)
let node mval = mval.arr_node

(** Get the ids of all visible chunks, in [Rga.fold_visible] order (useful for
    debugging). Returns [[]] if the handle does not wrap an arr node. *)
let timestamps mval =
  match mval.arr_node with
  | Node.Node_arr { arr_rga; _ } ->
      Rga.fold_visible
        (fun acc (chunk : Clock.timestamp Rga.chunk) -> chunk.id :: acc)
        [] arr_rga
      |> List.rev
  | _ -> []

(** Pretty print the multi-value register state as
    [mval[<sid>,<time>]{<n> values}], where [n] is the length of {!get}. *)
let pp fmt mval =
  let values = get mval in
  let id = Node.id mval.arr_node in
  Format.fprintf fmt "mval[%d,%d]{%d values}" id.sid id.time
    (List.length values)

(** Convert to string for debugging. *)
let to_string mval = Format.asprintf "%a" pp mval