CRDT library in OCaml implementing json-joy
at main 9.3 kB view raw
(** CRDT document synchronization over JSON-Rx.

    This module implements a document sync protocol using JSON-Rx:
    - Subscribe to document changes (real-time updates)
    - Send/receive patches
    - Initial sync (snapshot + pending patches)
    - Conflict-free merge (CRDT semantics)

    @see <https://jsonjoy.com/specs/json-crdt> JSON CRDT specification *)

(** {1 Types} *)

type doc_state = {
  doc_id : string;
  clock : Clock.clock_vector;
  patch_count : int;  (** Number of patches applied *)
}
(** Document state summary exchanged during sync. *)

(** {1 Protocol Constants} *)

(** RPC method names. *)
module Methods = struct
  let sync = "sync"
  let patch = "patch"
end

(** Subscription channels. *)
module Channels = struct
  (** [changes doc_id] is the channel name for [doc_id]: the identifier
      prefixed with ["doc:"]. *)
  let changes doc_id = "doc:" ^ doc_id
end

(** {1 Server-Side Sync} *)

type server_doc = {
  id : string;
  model : Model.t;
  mutable patches : Patch.t list;  (** Patches applied, oldest first *)
  mutable subscribers : (Value.t -> unit) list;
      (** Callbacks invoked with every patch accepted by {!apply_patch} *)
}
(** Synchronized document on the server. *)

type server = { docs : (string, server_doc) Hashtbl.t; rx_server : Rx_server.t }
(** Server sync state: registered documents keyed by document id. *)

(** [create_server ()] returns a server with no registered documents and a
    freshly created [Rx_server.t]. *)
let create_server () =
  { docs = Hashtbl.create 16; rx_server = Rx_server.create () }

(** [register_doc server ~doc_id ~model] registers [model] under [doc_id] and
    returns the new server document, which starts with no patches and no
    subscribers. Any document previously registered under [doc_id] is
    replaced. *)
let register_doc server ~doc_id ~model =
  let doc = { id = doc_id; model; patches = []; subscribers = [] } in
  Hashtbl.replace server.docs doc_id doc;
  doc

(** [get_doc_state doc] summarizes [doc]: its id, the current clock of its
    model and the number of patches recorded on it. *)
let get_doc_state doc =
  {
    doc_id = doc.id;
    clock = Model.clock doc.model;
    patch_count = List.length doc.patches;
  }

(** [clock_to_list clock] flattens [clock] into [(sid, time)] pairs for
    encoding: every peer entry in its stored order, followed by the local
    entry as the last element. *)
let clock_to_list clock =
  clock.Clock.peers
  @ [ (clock.Clock.local.clock_sid, clock.Clock.local.clock_time) ]

(** [encode_doc_state state] encodes [state] as a [Value.Object] with the
    fields ["doc_id"], ["patch_count"] and ["clock"]; the clock is an array of
    two-element [[sid; time]] arrays in {!clock_to_list} order. *)
let encode_doc_state state =
  Value.Object
    [
      ("doc_id", Value.String state.doc_id);
      ("patch_count", Value.Int state.patch_count);
      ( "clock",
        Value.Array
          (List.map
             (fun (sid, time) -> Value.Array [ Value.Int sid; Value.Int time ])
             (clock_to_list state.clock)) );
    ]

(** [decode_doc_state value] decodes a value shaped like the output of
    {!encode_doc_state}.

    Returns [None] when [value] is not an object or when any of ["doc_id"]
    (string), ["patch_count"] (int) or ["clock"] (array) is missing or has
    another type. The clock is rebuilt from [Clock.create_vector 0] by
    observing each [[sid; time]] pair; clock entries of any other shape are
    skipped rather than rejected. *)
let decode_doc_state value =
  match value with
  | Value.Object fields -> (
      match
        ( List.assoc_opt "doc_id" fields,
          List.assoc_opt "patch_count" fields,
          List.assoc_opt "clock" fields )
      with
      | ( Some (Value.String doc_id),
          Some (Value.Int patch_count),
          Some (Value.Array clock_pairs) ) ->
          let clock = Clock.create_vector 0 in
          List.iter
            (function
              | Value.Array [ Value.Int sid; Value.Int time ] ->
                  Clock.observe clock { Clock.sid; time }
              | _ -> ())
            clock_pairs;
          Some { doc_id; clock; patch_count }
      | _ -> None)
  | _ -> None

(** [apply_patch server doc_id patch_str] decodes [patch_str], applies the
    patch to the document registered as [doc_id], records it at the end of the
    document's patch list and passes [Value.String patch_str] to every
    subscriber.

    Returns the updated document state, [Error "document not found"] when
    [doc_id] is not registered, or [Error ("patch decode: " ^ e)] when decoding
    fails. Nothing is applied or broadcast in the error cases. *)
let apply_patch server doc_id patch_str =
  match Hashtbl.find_opt server.docs doc_id with
  | None -> Error "document not found"
  | Some doc -> (
      match Patch_codec_compact.decode patch_str with
      | Error e -> Error ("patch decode: " ^ e)
      | Ok patch ->
          Model.apply doc.model patch;
          doc.patches <- doc.patches @ [ patch ];
          (* Subscribers receive the encoded patch exactly as it arrived. *)
          let patch_value = Value.String patch_str in
          List.iter (fun cb -> cb patch_value) doc.subscribers;
          Ok (get_doc_state doc))

(** [subscribe_doc server doc_id callback] adds [callback] to the subscribers
    of [doc_id] and returns the current document state, or
    [Error "document not found"] when [doc_id] is not registered. *)
let subscribe_doc server doc_id callback =
  match Hashtbl.find_opt server.docs doc_id with
  | None -> Error "document not found"
  | Some doc ->
      doc.subscribers <- callback :: doc.subscribers;
      Ok (get_doc_state doc)

(** [unsubscribe_doc server doc_id callback] removes [callback] from the
    subscribers of [doc_id]. It does nothing when [doc_id] is not registered.

    The callback must be the very same closure that was passed to
    {!subscribe_doc}. *)
let unsubscribe_doc server doc_id callback =
  match Hashtbl.find_opt server.docs doc_id with
  | None -> ()
  | Some doc ->
      (* Physical inequality on purpose: closures have no structural
         equality, so subscribers are identified by identity. *)
      doc.subscribers <- List.filter (fun cb -> cb != callback) doc.subscribers

(** [is_dominated_by_clock ts clock] is [true] when [clock] already covers
    [ts]: the entry for [ts]'s session (looked up among the peers first, then
    compared with the local entry) has a time greater than or equal to [ts]'s
    time. A session that [clock] has no entry for is never dominated. *)
let is_dominated_by_clock (ts : Clock.timestamp) clock =
  match List.assoc_opt ts.Clock.sid clock.Clock.peers with
  | Some peer_time -> ts.time <= peer_time
  | None ->
      if ts.sid = clock.Clock.local.clock_sid then
        ts.time <= clock.Clock.local.clock_time
      else false

(** [get_patches_since doc since_clock] returns, in their recorded order, the
    patches of [doc] whose id is not dominated by [since_clock] (see
    {!is_dominated_by_clock}). Used for initial sync. *)
let get_patches_since doc since_clock =
  List.filter
    (fun patch -> not (is_dominated_by_clock patch.Patch.id since_clock))
    doc.patches

(** [encode_patch patch] encodes [patch] for transmission using
    [Patch_codec_compact]. *)
let encode_patch patch = Patch_codec_compact.encode patch

(** {1 Client-Side Sync} *)

type client_doc = {
  id : string;
  model : Model.t;
  mutable pending_patches : Patch.t list;
      (** Patches not yet acked by server *)
  mutable on_change : (Patch.t -> unit) option;
      (** Called with each incoming patch after it has been applied *)
}
(** Synchronized document on the client. *)

type client = {
  docs : (string, client_doc) Hashtbl.t;
  rx_client : Rx_client.t;
  subscription_ids : (string, int) Hashtbl.t;
}
(** Client sync state. *)

(** [create_client rx_client] returns a client bound to [rx_client] with no
    open documents and no subscription ids. *)
let create_client rx_client =
  { docs = Hashtbl.create 16; rx_client; subscription_ids = Hashtbl.create 16 }

(** [open_doc client ~doc_id ~model] opens [model] under [doc_id] and returns
    the new client document, which starts with no pending patches and no
    change callback. Any document previously opened under [doc_id] is
    replaced. *)
let open_doc client ~doc_id ~model =
  let doc = { id = doc_id; model; pending_patches = []; on_change = None } in
  Hashtbl.replace client.docs doc_id doc;
  doc

(** [on_doc_change doc callback] installs [callback] as the change callback of
    [doc], replacing any previous one. *)
let on_doc_change doc callback = doc.on_change <- Some callback

(** [handle_incoming_patch client doc_id patch_str] decodes [patch_str],
    applies the patch to the open document [doc_id] and then calls the
    document's change callback, if one is set.

    Returns [Error "document not found"] when [doc_id] is not open, or
    [Error ("patch decode: " ^ e)] when decoding fails. *)
let handle_incoming_patch client doc_id patch_str =
  match Hashtbl.find_opt client.docs doc_id with
  | None -> Error "document not found"
  | Some doc -> (
      match Patch_codec_compact.decode patch_str with
      | Error e -> Error ("patch decode: " ^ e)
      | Ok patch ->
          Model.apply doc.model patch;
          (match doc.on_change with Some cb -> cb patch | None -> ());
          Ok ())

(** {1 Sync Protocol Helpers} *)

(** Outcome of comparing a local document state with a remote one. *)
type sync_comparison = Equal | Behind | Ahead | Concurrent

(** [compare_states local remote] classifies [local] relative to [remote].

    Only the local component of each clock and the patch counts are
    inspected; peer entries of the clocks are ignored. The result is [Equal]
    when both the times and the patch counts match, [Behind] or [Ahead] when
    the local time is lower or higher than the remote one, and [Concurrent]
    when the times match but the patch counts differ. *)
let compare_states local remote =
  let local_time = local.clock.Clock.local.clock_time in
  let remote_time = remote.clock.Clock.local.clock_time in
  if local_time = remote_time && local.patch_count = remote.patch_count then
    Equal
  else if local_time < remote_time then Behind
  else if local_time > remote_time then Ahead
  else Concurrent

(** {1 Testing Helpers} *)

(** [create_sync_pair ~doc_id ~session1 ~session2] builds a server document
    backed by a model created from [session1] and a client document backed by
    a model created from [session2], both under [doc_id]. Returns
    [(server, server_doc, client, client_doc)]. *)
let create_sync_pair ~doc_id ~session1 ~session2 =
  let model1 = Model.create session1 in
  let model2 = Model.create session2 in
  let server = create_server () in
  let server_doc = register_doc server ~doc_id ~model:model1 in
  let rx_client = Rx_client.create () in
  let client = create_client rx_client in
  let client_doc = open_doc client ~doc_id ~model:model2 in
  (server, server_doc, client, client_doc)

(** [client_doc_patches client_doc] is the pending patch list of
    [client_doc]. *)
let client_doc_patches client_doc = client_doc.pending_patches

(** [add_client_patch client_doc patch] appends [patch] to the end of the
    pending patch list of [client_doc]. The patch is not applied to the
    model. *)
let add_client_patch client_doc patch =
  client_doc.pending_patches <- client_doc.pending_patches @ [ patch ]

(** [get_client_patches_since client_doc since_clock] returns, in order, the
    pending patches of [client_doc] whose id is not dominated by
    [since_clock]. *)
let get_client_patches_since client_doc since_clock =
  List.filter
    (fun patch -> not (is_dominated_by_clock patch.Patch.id since_clock))
    client_doc.pending_patches

(** [sync_server_to_client server_doc client_doc] applies to the client model
    every server patch not dominated by the client model's clock and returns
    how many patches were applied. The client's change callback is not
    invoked. *)
let sync_server_to_client server_doc client_doc =
  let client_clock = Model.clock client_doc.model in
  let new_patches = get_patches_since server_doc client_clock in
  List.iter (fun patch -> Model.apply client_doc.model patch) new_patches;
  List.length new_patches

(** [sync_client_to_server client_doc server_doc] applies to the server model
    every pending client patch not dominated by the server model's clock,
    records each one on the server document and returns how many were
    applied. Subscribers are not notified and the client's pending list is
    left unchanged. *)
let sync_client_to_server (client_doc : client_doc) (server_doc : server_doc) =
  let server_clock = Model.clock server_doc.model in
  let new_patches = get_client_patches_since client_doc server_clock in
  List.iter
    (fun patch ->
      Model.apply server_doc.model patch;
      (* Recorded one at a time so the patch list always matches what has
         actually been applied, even if a later application raises. *)
      server_doc.patches <- server_doc.patches @ [ patch ])
    new_patches;
  List.length new_patches

(** [bidirectional_sync server_doc client_doc] runs
    {!sync_server_to_client} and then {!sync_client_to_server}, returning the
    number of patches transferred in each direction as
    [(server_to_client, client_to_server)]. *)
let bidirectional_sync server_doc client_doc =
  let s2c = sync_server_to_client server_doc client_doc in
  let c2s = sync_client_to_server client_doc server_doc in
  (s2c, c2s)

(** [server_doc_view doc] is the current view of the server document's
    model. *)
let server_doc_view (doc : server_doc) = Model.view doc.model

(** [client_doc_view doc] is the current view of the client document's
    model. *)
let client_doc_view (doc : client_doc) = Model.view doc.model