(* CRDT library in OCaml implementing json-joy. *)
(** CRDT document synchronization over JSON-Rx.

    This module implements a document sync protocol using JSON-Rx:
    - Subscribe to document changes (real-time updates)
    - Send/receive patches
    - Initial sync (snapshot + pending patches)
    - Conflict-free merge (CRDT semantics)

    @see <https://jsonjoy.com/specs/json-crdt> JSON CRDT specification *)
10
11(** {1 Types} *)
12
(** Document state summary for sync. *)
type doc_state = {
  doc_id : string;  (** Identifier of the document *)
  clock : Clock.clock_vector;  (** Clock vector of the document's model *)
  patch_count : int;  (** Number of patches applied *)
}
19
20(** {1 Protocol Constants} *)
21
(** String constants naming the RPC methods. *)
module Methods = struct
  (** Name of the sync method. *)
  let sync : string = "sync"

  (** Name of the patch method. *)
  let patch : string = "patch"
end
27
(** Subscription channel names. *)
module Channels = struct
  (** [changes doc_id] is the channel name for changes to document
      [doc_id]: the document id prefixed with ["doc:"]. *)
  let changes (doc_id : string) : string =
    let prefix = "doc:" in
    prefix ^ doc_id
end
32
33(** {1 Server-Side Sync} *)
34
(** Synchronized document on server. *)
type server_doc = {
  id : string;  (** Document identifier *)
  model : Model.t;  (** Model that incoming patches are applied to *)
  mutable patches : Patch.t list;  (** Patches applied in order *)
  mutable subscribers : (Value.t -> unit) list;
      (** Callbacks invoked with each broadcast patch value *)
}
42
(** Server sync state. *)
type server = {
  docs : (string, server_doc) Hashtbl.t;
      (** Registered documents, keyed by document id *)
  rx_server : Rx_server.t;  (** JSON-Rx server created alongside the table *)
}
45
(** [create_server ()] builds a new sync server: a fresh [Rx_server]
    instance paired with an empty document table. *)
let create_server () =
  let rx_server = Rx_server.create () in
  let docs = Hashtbl.create 16 in
  { docs; rx_server }
51
(** [register_doc server ~doc_id ~model] registers a document for sync.

    A new entry with no recorded patches and no subscribers is stored under
    [doc_id], replacing any entry previously stored under that id.

    @return the newly created server document. *)
let register_doc server ~doc_id ~model =
  let entry : server_doc =
    { id = doc_id; model; patches = []; subscribers = [] }
  in
  Hashtbl.replace server.docs doc_id entry;
  entry
57
(** [get_doc_state doc] summarises a server document: its id, the clock of
    its model, and the number of patches recorded for it. *)
let get_doc_state (doc : server_doc) =
  let clock = Model.clock doc.model in
  let patch_count = List.length doc.patches in
  { doc_id = doc.id; clock; patch_count }
65
(** [clock_to_list clock] flattens a clock vector into [(sid, time)] pairs
    for encoding.

    The peer entries come first, in the order they are stored in
    [clock.peers], followed by the local clock's [(clock_sid, clock_time)]
    pair as the last element. *)
let clock_to_list clock =
  let local = clock.Clock.local in
  clock.Clock.peers @ [ (local.Clock.clock_sid, local.Clock.clock_time) ]
74
(** [encode_doc_state state] encodes a document state as a [Value.t] for
    JSON-Rx.

    The result is an object with fields ["doc_id"] (string),
    ["patch_count"] (int) and ["clock"], an array of two-element
    [[sid, time]] arrays in the order produced by [clock_to_list]. *)
let encode_doc_state state =
  let encode_entry (sid, time) =
    Value.Array [ Value.Int sid; Value.Int time ]
  in
  let clock_entries = List.map encode_entry (clock_to_list state.clock) in
  Value.Object
    [
      ("doc_id", Value.String state.doc_id);
      ("patch_count", Value.Int state.patch_count);
      ("clock", Value.Array clock_entries);
    ]
87
(** [decode_doc_state value] decodes a document state from a [Value.t].

    [value] must be an object carrying a string ["doc_id"], an int
    ["patch_count"] and an array ["clock"] whose elements are all
    two-element [[sid, time]] int arrays.

    @return [Some state] on success; [None] if [value] is not an object, a
    field is missing or has the wrong shape, or any clock entry is
    malformed. Malformed clock entries are rejected rather than skipped so
    that a partially decoded clock is never returned. *)
let decode_doc_state value =
  let decode_entry = function
    | Value.Array [ Value.Int sid; Value.Int time ] ->
        Some ({ Clock.sid; time } : Clock.timestamp)
    | _ -> None
  in
  (* Decode every entry, stopping at the first malformed one. *)
  let rec decode_entries acc = function
    | [] -> Some (List.rev acc)
    | entry :: rest -> (
        match decode_entry entry with
        | Some ts -> decode_entries (ts :: acc) rest
        | None -> None)
  in
  match value with
  | Value.Object fields -> (
      match
        ( List.assoc_opt "doc_id" fields,
          List.assoc_opt "patch_count" fields,
          List.assoc_opt "clock" fields )
      with
      | ( Some (Value.String doc_id),
          Some (Value.Int patch_count),
          Some (Value.Array clock_pairs) ) -> (
          match decode_entries [] clock_pairs with
          | None -> None
          | Some timestamps ->
              (* NOTE(review): the vector is created with session id 0 and
                 every decoded pair, including the sender's own entry, is
                 fed through [Clock.observe]; confirm that this yields the
                 intended local clock for the decoded state. *)
              let clock = Clock.create_vector 0 in
              List.iter (fun ts -> Clock.observe clock ts) timestamps;
              Some { doc_id; clock; patch_count })
      | _ -> None)
  | _ -> None
110
(** [apply_patch server doc_id patch_str] applies an encoded patch to a
    server document and broadcasts it to the document's subscribers.

    The patch is decoded with [Patch_codec_compact.decode], applied to the
    document's model and appended to its patch list; every subscriber is
    then called with [Value.String patch_str].

    @return [Ok state] with the document state after the patch, or
    [Error "document not found"] if [doc_id] is not registered, or
    [Error ("patch decode: " ^ e)] if decoding fails. *)
let apply_patch server doc_id patch_str =
  let commit (doc : server_doc) patch =
    Model.apply doc.model patch;
    doc.patches <- doc.patches @ [ patch ];
    (* Subscribers receive the patch exactly as it arrived on the wire. *)
    let payload = Value.String patch_str in
    List.iter (fun notify -> notify payload) doc.subscribers;
    get_doc_state doc
  in
  match Hashtbl.find_opt server.docs doc_id with
  | None -> Error "document not found"
  | Some doc -> (
      match Patch_codec_compact.decode patch_str with
      | Ok patch -> Ok (commit doc patch)
      | Error e -> Error ("patch decode: " ^ e))
125
(** [subscribe_doc server doc_id callback] subscribes [callback] to changes
    of document [doc_id]. The callback is added at the front of the
    document's subscriber list.

    @return [Ok state] with the current document state, or
    [Error "document not found"] if [doc_id] is not registered. *)
let subscribe_doc server doc_id callback =
  match Hashtbl.find_opt server.docs doc_id with
  | Some doc ->
      doc.subscribers <- callback :: doc.subscribers;
      Ok (get_doc_state doc)
  | None -> Error "document not found"
133
(** [unsubscribe_doc server doc_id callback] removes [callback] from the
    subscribers of document [doc_id]. Does nothing if the document is not
    registered.

    Callbacks are matched by physical identity, so the very same closure
    that was passed to [subscribe_doc] must be supplied here. *)
let unsubscribe_doc server doc_id callback =
  let is_other cb = not (cb == callback) in
  match Hashtbl.find_opt server.docs doc_id with
  | Some doc -> doc.subscribers <- List.filter is_other doc.subscribers
  | None -> ()
140
(** [get_patches_since doc since_clock] returns, in their stored order, the
    patches of [doc] that are not dominated by [since_clock] (used for
    initial sync).

    A patch is dominated when [since_clock] has a peer entry for the patch
    id's session whose time is at least the patch id's time, or, when there
    is no such peer entry, when the session is the clock's local session
    and the local time is at least the patch id's time. Patches from
    sessions unknown to [since_clock] are always returned. *)
let get_patches_since doc since_clock =
  let already_seen patch =
    let id = patch.Patch.id in
    match List.assoc_opt id.Clock.sid since_clock.Clock.peers with
    | Some peer_time -> id.Clock.time <= peer_time
    | None ->
        id.Clock.sid = since_clock.Clock.local.Clock.clock_sid
        && id.Clock.time <= since_clock.Clock.local.Clock.clock_time
  in
  List.filter (fun patch -> not (already_seen patch)) doc.patches
158
(** [encode_patch patch] encodes [patch] for transmission by delegating to
    [Patch_codec_compact.encode]. *)
let encode_patch = fun patch -> Patch_codec_compact.encode patch
161
162(** {1 Client-Side Sync} *)
163
(** Synchronized document on client. *)
type client_doc = {
  id : string;  (** Document identifier *)
  model : Model.t;  (** Model that incoming patches are applied to *)
  mutable pending_patches : Patch.t list;
      (** Patches not yet acked by server *)
  mutable on_change : (Patch.t -> unit) option;
      (** Optional callback run with each incoming patch once applied *)
}
172
(** Client sync state. *)
type client = {
  docs : (string, client_doc) Hashtbl.t;
      (** Open documents, keyed by document id *)
  rx_client : Rx_client.t;  (** JSON-Rx client given to [create_client] *)
  subscription_ids : (string, int) Hashtbl.t;
      (** String-keyed table of ints; created empty by [create_client] *)
}
179
(** [create_client rx_client] builds a new sync client around [rx_client],
    with no open documents and no recorded subscription ids. *)
let create_client rx_client =
  let docs = Hashtbl.create 16 in
  let subscription_ids = Hashtbl.create 16 in
  { docs; rx_client; subscription_ids }
183
(** [open_doc client ~doc_id ~model] opens a document for sync.

    A new client document with no pending patches and no change callback is
    stored under [doc_id], replacing any entry previously stored under that
    id.

    @return the newly created client document. *)
let open_doc client ~doc_id ~model =
  let entry : client_doc =
    { id = doc_id; model; pending_patches = []; on_change = None }
  in
  Hashtbl.replace client.docs doc_id entry;
  entry
189
(** [on_doc_change doc callback] installs [callback] as the change callback
    of [doc], replacing any callback set earlier. *)
let on_doc_change (doc : client_doc) callback =
  doc.on_change <- Some callback
192
(** [handle_incoming_patch client doc_id patch_str] handles an encoded patch
    received from the server.

    The patch is decoded with [Patch_codec_compact.decode] and applied to
    the model of the open document [doc_id]; the document's change
    callback, if one is set, is then called with the decoded patch.

    @return [Ok ()] on success, [Error "document not found"] if [doc_id] is
    not open, or [Error ("patch decode: " ^ e)] if decoding fails. *)
let handle_incoming_patch client doc_id patch_str =
  let notify (doc : client_doc) patch =
    match doc.on_change with None -> () | Some cb -> cb patch
  in
  match Hashtbl.find_opt client.docs doc_id with
  | None -> Error "document not found"
  | Some doc -> (
      match Patch_codec_compact.decode patch_str with
      | Error e -> Error ("patch decode: " ^ e)
      | Ok patch ->
          Model.apply doc.model patch;
          notify doc patch;
          Ok ())
205
206(** {1 Sync Protocol Helpers} *)
207
(** Outcome of comparing two document states. *)
type sync_comparison =
  | Equal
  | Behind
  | Ahead
  | Concurrent
210
(** [compare_states local remote] compares two document states.

    Only the local clock time of each state's clock and the patch counts
    are consulted:
    - [Behind] if the local time of [local] is smaller than that of
      [remote];
    - [Ahead] if it is larger;
    - [Equal] if the times match and the patch counts match;
    - [Concurrent] if the times match but the patch counts differ.

    NOTE(review): peer entries of the clock vectors are not compared;
    confirm that comparing local times alone is the intended ordering. *)
let compare_states local remote =
  let local_time = local.clock.Clock.local.Clock.clock_time in
  let remote_time = remote.clock.Clock.local.Clock.clock_time in
  if local_time < remote_time then Behind
  else if local_time > remote_time then Ahead
  else if local.patch_count = remote.patch_count then Equal
  else Concurrent
219
220(** {1 Testing Helpers} *)
221
(** [create_sync_pair ~doc_id ~session1 ~session2] creates a server and a
    client that both hold a document named [doc_id].

    The server document uses a model created from [session1], the client
    document one created from [session2].

    @return [(server, server_doc, client, client_doc)]. *)
let create_sync_pair ~doc_id ~session1 ~session2 =
  let server_model = Model.create session1 in
  let client_model = Model.create session2 in
  let server = create_server () in
  let server_doc = register_doc server ~doc_id ~model:server_model in
  let client = create_client (Rx_client.create ()) in
  let client_doc = open_doc client ~doc_id ~model:client_model in
  (server, server_doc, client, client_doc)
232
(** [client_doc_patches client_doc] returns the pending patches of
    [client_doc]. *)
let client_doc_patches ({ pending_patches; _ } : client_doc) = pending_patches
235
(** [add_client_patch client_doc patch] appends [patch] to the end of the
    pending list of [client_doc]. *)
let add_client_patch client_doc patch =
  let extended = List.append client_doc.pending_patches [ patch ] in
  client_doc.pending_patches <- extended
239
(** [is_dominated_by_clock ts clock] checks whether timestamp [ts] is
    dominated by [clock].

    If [clock] has a peer entry for the session of [ts], the result is
    whether the time of [ts] is at most that entry's time. Otherwise [ts]
    is dominated only when its session is the clock's local session and its
    time is at most the local time. *)
let is_dominated_by_clock (ts : Clock.timestamp) clock =
  let local = clock.Clock.local in
  match List.assoc_opt ts.Clock.sid clock.Clock.peers with
  | Some peer_time -> ts.Clock.time <= peer_time
  | None ->
      ts.Clock.sid = local.Clock.clock_sid
      && ts.Clock.time <= local.Clock.clock_time
248
(** [get_client_patches_since client_doc since_clock] returns, in their
    stored order, the pending patches of [client_doc] whose id is not
    dominated by [since_clock]. *)
let get_client_patches_since client_doc since_clock =
  let is_missing patch =
    not (is_dominated_by_clock patch.Patch.id since_clock)
  in
  List.filter is_missing client_doc.pending_patches
256
(** [sync_server_to_client server_doc client_doc] simulates a
    server-to-client sync.

    The server patches not dominated by the client model's clock, as
    selected by [get_patches_since], are applied in order to the client
    model.

    @return the number of patches applied. *)
let sync_server_to_client server_doc (client_doc : client_doc) =
  let incoming = get_patches_since server_doc (Model.clock client_doc.model) in
  (* Apply and count in a single pass over the selected patches. *)
  List.fold_left
    (fun applied patch ->
      Model.apply client_doc.model patch;
      applied + 1)
    0 incoming
263
(** [sync_client_to_server client_doc server_doc] simulates a
    client-to-server sync.

    The client's pending patches not dominated by the server model's clock
    are applied in order to the server model, each one being appended to
    the server's patch list right after it is applied.

    @return the number of patches applied. *)
let sync_client_to_server (client_doc : client_doc) (server_doc : server_doc) =
  let outgoing =
    get_client_patches_since client_doc (Model.clock server_doc.model)
  in
  let push patch =
    Model.apply server_doc.model patch;
    server_doc.patches <- server_doc.patches @ [ patch ]
  in
  List.iter push outgoing;
  List.length outgoing
274
(** [bidirectional_sync server_doc client_doc] runs a two-way sync: first
    server to client, then client to server.

    @return [(from_server, from_client)], the number of patches applied in
    each direction. *)
let bidirectional_sync server_doc client_doc =
  (* Sequenced with [let] so the server-to-client pass always runs first. *)
  let from_server = sync_server_to_client server_doc client_doc in
  let from_client = sync_client_to_server client_doc server_doc in
  (from_server, from_client)
280
(** [server_doc_view doc] returns [Model.view] of the server document's
    model. *)
let server_doc_view ({ model; _ } : server_doc) = Model.view model
283
(** [client_doc_view doc] returns [Model.view] of the client document's
    model. *)
let client_doc_view ({ model; _ } : client_doc) = Model.view model