+15
LICENSE
+15
LICENSE
···
1
+
ISC License
2
+
3
+
Copyright (c) 2026 Gabriel Díaz López de la Llave
4
+
5
+
Permission to use, copy, modify, and/or distribute this software for any
6
+
purpose with or without fee is hereby granted, provided that the above
7
+
copyright notice and this permission notice appear in all copies.
8
+
9
+
THE SOFTWARE IS PROVIDED "AS IS" AND THE AUTHOR DISCLAIMS ALL WARRANTIES WITH
10
+
REGARD TO THIS SOFTWARE INCLUDING ALL IMPLIED WARRANTIES OF MERCHANTABILITY
11
+
AND FITNESS. IN NO EVENT SHALL THE AUTHOR BE LIABLE FOR ANY SPECIAL, DIRECT,
12
+
INDIRECT, OR CONSEQUENTIAL DAMAGES OR ANY DAMAGES WHATSOEVER RESULTING FROM
13
+
LOSS OF USE, DATA OR PROFITS, WHETHER IN AN ACTION OF CONTRACT, NEGLIGENCE OR
14
+
OTHER TORTIOUS ACTION, ARISING OUT OF OR IN CONNECTION WITH THE USE OR
15
+
PERFORMANCE OF THIS SOFTWARE.
+1
README.md
+1
README.md
···
1
+
docs/README.md
-2
bench/dune
-2
bench/dune
-10
bin/dune
-10
bin/dune
+4
-1
bin/main.ml
+4
-1
bin/main.ml
+1
bin/wb/static
+1
bin/wb/static
···
1
+
../wbd/static
+427
bin/wb/wb.ml
+427
bin/wb/wb.ml
···
1
+
(** Collaborative Webboard - HCS WebSocket Server
2
+
3
+
A real-time collaborative whiteboard using:
4
+
- HCS web server (Eio-based) with WebSocket support
5
+
- OCaml CRDT library for synchronization
6
+
- HTML Canvas + plain JS + json-joy on the client
7
+
8
+
The server maintains the authoritative CRDT state and relays binary patches
9
+
between clients. *)
10
+
11
+
open Eio.Std
12
+
open Crdt
13
+
module J = Simdjsont.Json
14
+
15
+
(* ============================================================================
16
+
Configuration
17
+
============================================================================ *)
18
+
19
+
let default_port = 8080
20
+
let static_dir = "static"
21
+
22
+
(* ============================================================================
23
+
Types
24
+
============================================================================ *)
25
+
26
+
type client = {
27
+
id : string;
28
+
mutable name : string;
29
+
websocket : Hcs.Websocket.t option;
30
+
}
31
+
32
+
type server_state = {
33
+
model : Model.t;
34
+
mutable clients : client list;
35
+
mutable strokes : J.t list; (* Store strokes for new clients *)
36
+
mutable chat_history : (string * string) list; (* (sender, text) pairs *)
37
+
mutex : Eio.Mutex.t;
38
+
}
39
+
40
+
(* ============================================================================
41
+
Utilities
42
+
============================================================================ *)
43
+
44
+
let generate_id () =
45
+
let chars = "abcdefghijklmnopqrstuvwxyz0123456789" in
46
+
String.init 8 (fun _ -> chars.[Random.int (String.length chars)])
47
+
48
+
let log fmt =
49
+
Printf.ksprintf
50
+
(fun s ->
51
+
Printf.printf "[%s] %s\n%!"
52
+
(let t = Unix.localtime (Unix.time ()) in
53
+
Printf.sprintf "%02d:%02d:%02d" t.Unix.tm_hour t.Unix.tm_min
54
+
t.Unix.tm_sec)
55
+
s)
56
+
fmt
57
+
58
+
(* ============================================================================
59
+
Message Protocol
60
+
61
+
Messages are JSON with a "type" field:
62
+
- "join": Client joining with name
63
+
- "patch": Binary CRDT patch (base64 encoded)
64
+
- "sync": Full model sync (binary, base64 encoded)
65
+
- "presence": Cursor position update
66
+
- "users": List of connected users
67
+
============================================================================ *)
68
+
69
+
let json_of_string s =
70
+
match Simdjsont.Decode.decode_string Simdjsont.Decode.value s with
71
+
| Ok json -> Some json
72
+
| Error _ -> None
73
+
74
+
let string_of_json json = J.to_string json
75
+
76
+
let get_string_field obj key =
77
+
match obj with
78
+
| J.Object fields ->
79
+
List.find_map
80
+
(fun (k, v) ->
81
+
if k = key then match v with J.String s -> Some s | _ -> None
82
+
else None)
83
+
fields
84
+
| _ -> None
85
+
86
+
let get_float_field obj key =
87
+
match obj with
88
+
| J.Object fields ->
89
+
List.find_map
90
+
(fun (k, v) ->
91
+
if k = key then
92
+
match v with
93
+
| J.Float f -> Some f
94
+
| J.Int i -> Some (Int64.to_float i)
95
+
| _ -> None
96
+
else None)
97
+
fields
98
+
| _ -> None
99
+
100
+
let get_json_field obj key =
101
+
match obj with
102
+
| J.Object fields ->
103
+
List.find_map (fun (k, v) -> if k = key then Some v else None) fields
104
+
| _ -> None
105
+
106
+
let str s = J.String s
107
+
let num f = J.Float f
108
+
let arr items = J.Array items
109
+
let obj fields = J.Object fields
110
+
111
+
(* Build a users list message *)
112
+
let make_users_msg clients =
113
+
let users =
114
+
List.map (fun c -> obj [ ("id", str c.id); ("name", str c.name) ]) clients
115
+
in
116
+
obj [ ("type", str "users"); ("users", arr users) ]
117
+
118
+
(* Build a sync message with the full model *)
119
+
let make_sync_msg model =
120
+
let binary = Model_codec.Binary.encode model in
121
+
let b64 = Base64.encode_string (Bytes.to_string binary) in
122
+
obj [ ("type", str "sync"); ("data", str b64) ]
123
+
124
+
(* Build a patch relay message *)
125
+
let make_patch_msg patch_b64 sender_id =
126
+
obj
127
+
[ ("type", str "patch"); ("data", str patch_b64); ("from", str sender_id) ]
128
+
129
+
(* Build a presence message *)
130
+
let make_presence_msg client_id client_name x y =
131
+
obj
132
+
[
133
+
("type", str "presence");
134
+
("id", str client_id);
135
+
("name", str client_name);
136
+
("x", num x);
137
+
("y", num y);
138
+
]
139
+
140
+
(* Build a stroke history message *)
141
+
let make_stroke_history_msg strokes =
142
+
obj [ ("type", str "stroke_history"); ("strokes", arr strokes) ]
143
+
144
+
(* Build a chat history message *)
145
+
let make_chat_history_msg history =
146
+
let messages =
147
+
List.map
148
+
(fun (sender, text) -> obj [ ("sender", str sender); ("text", str text) ])
149
+
history
150
+
in
151
+
obj [ ("type", str "chat_history"); ("messages", arr messages) ]
152
+
153
+
(* ============================================================================
154
+
WebSocket Handlers
155
+
============================================================================ *)
156
+
157
+
let send_to_ws ws msg_str =
158
+
match Hcs.Websocket.send_text ws msg_str with Ok () -> () | Error _ -> ()
159
+
160
+
let broadcast_to_others state sender_id msg_json =
161
+
let msg_str = string_of_json msg_json in
162
+
List.iter
163
+
(fun client ->
164
+
if client.id <> sender_id then
165
+
match client.websocket with
166
+
| Some ws -> send_to_ws ws msg_str
167
+
| None -> ())
168
+
state.clients
169
+
170
+
let broadcast_to_all state msg_json =
171
+
let msg_str = string_of_json msg_json in
172
+
List.iter
173
+
(fun client ->
174
+
match client.websocket with
175
+
| Some ws -> send_to_ws ws msg_str
176
+
| None -> ())
177
+
state.clients
178
+
179
+
let handle_message state client msg_str =
180
+
match json_of_string msg_str with
181
+
| None -> log "Invalid JSON from %s" client.id
182
+
| Some json -> (
183
+
match get_string_field json "type" with
184
+
| Some "patch" -> (
185
+
(* Client sent a patch - decode, apply, and relay *)
186
+
match get_string_field json "data" with
187
+
| Some b64 -> (
188
+
match Base64.decode b64 with
189
+
| Ok patch_bytes -> (
190
+
let patch_buf = Bytes.of_string patch_bytes in
191
+
match Patch_codec_binary.decode patch_buf with
192
+
| Ok patch ->
193
+
Eio.Mutex.use_rw ~protect:true state.mutex (fun () ->
194
+
(* Apply patch to server model *)
195
+
Patch.iter_with_id
196
+
(fun id op -> Model.apply_op state.model id op)
197
+
patch);
198
+
(* Relay to other clients *)
199
+
broadcast_to_others state client.id
200
+
(make_patch_msg b64 client.id);
201
+
log "Patch from %s applied and relayed" client.name
202
+
| Error e ->
203
+
log "Failed to decode patch from %s: %s" client.name e)
204
+
| Error _ ->
205
+
log "Failed to decode base64 patch from %s" client.name)
206
+
| None -> ())
207
+
| Some "presence" ->
208
+
(* Cursor position update - relay to others *)
209
+
let x = Option.value (get_float_field json "x") ~default:0.0 in
210
+
let y = Option.value (get_float_field json "y") ~default:0.0 in
211
+
broadcast_to_others state client.id
212
+
(make_presence_msg client.id client.name x y)
213
+
| Some "drawing" -> (
214
+
(* Real-time drawing update - relay to others without storing *)
215
+
let from_json = get_json_field json "from" in
216
+
let to_json = get_json_field json "to" in
217
+
let color = get_string_field json "color" in
218
+
let size = get_float_field json "size" in
219
+
match (from_json, to_json) with
220
+
| Some from_pt, Some to_pt ->
221
+
let drawing_msg =
222
+
obj
223
+
[
224
+
("type", str "drawing");
225
+
("from", from_pt);
226
+
("to", to_pt);
227
+
("color", str (Option.value color ~default:"#000000"));
228
+
("size", num (Option.value size ~default:3.0));
229
+
]
230
+
in
231
+
broadcast_to_others state client.id drawing_msg
232
+
| _ -> ())
233
+
| Some "stroke" -> (
234
+
(* Completed stroke - store and relay to others *)
235
+
match get_json_field json "data" with
236
+
| Some stroke_data ->
237
+
Eio.Mutex.use_rw ~protect:true state.mutex (fun () ->
238
+
state.strokes <- state.strokes @ [ stroke_data ]);
239
+
let relay_msg =
240
+
obj [ ("type", str "stroke"); ("data", stroke_data) ]
241
+
in
242
+
broadcast_to_others state client.id relay_msg;
243
+
log "Stroke from %s stored and relayed" client.name
244
+
| None -> ())
245
+
| Some "chat" -> (
246
+
(* Chat message - store and relay to others with sender name *)
247
+
match get_string_field json "text" with
248
+
| Some text ->
249
+
Eio.Mutex.use_rw ~protect:true state.mutex (fun () ->
250
+
state.chat_history <-
251
+
state.chat_history @ [ (client.name, text) ];
252
+
(* Keep only last 100 messages *)
253
+
if List.length state.chat_history > 100 then
254
+
state.chat_history <-
255
+
List.filteri
256
+
(fun i _ -> i >= List.length state.chat_history - 100)
257
+
state.chat_history);
258
+
let chat_msg =
259
+
obj
260
+
[
261
+
("type", str "chat");
262
+
("sender", str client.name);
263
+
("text", str text);
264
+
]
265
+
in
266
+
broadcast_to_others state client.id chat_msg;
267
+
log "Chat from %s: %s" client.name text
268
+
| None -> ())
269
+
| Some "request_sync" ->
270
+
(* Client requesting full sync *)
271
+
let sync_msg =
272
+
Eio.Mutex.use_ro state.mutex (fun () -> make_sync_msg state.model)
273
+
in
274
+
(match client.websocket with
275
+
| Some ws -> send_to_ws ws (string_of_json sync_msg)
276
+
| None -> ());
277
+
log "Sync sent to %s" client.name
278
+
| Some "set_name" -> (
279
+
(* Client setting their display name *)
280
+
match get_string_field json "name" with
281
+
| Some new_name ->
282
+
let users_msg =
283
+
Eio.Mutex.use_rw ~protect:true state.mutex (fun () ->
284
+
(* Update the client's name *)
285
+
client.name <- new_name;
286
+
make_users_msg state.clients)
287
+
in
288
+
(* Broadcast updated user list to all *)
289
+
broadcast_to_all state users_msg;
290
+
log "Client %s set name to '%s'" client.id new_name
291
+
| None -> ())
292
+
| Some other -> log "Unknown message type '%s' from %s" other client.name
293
+
| None -> log "Message without type from %s" client.name)
294
+
295
+
let handle_websocket state (ws : Hcs.Websocket.t) =
296
+
(* Generate client ID *)
297
+
let client_id = generate_id () in
298
+
let client =
299
+
{ id = client_id; name = "User-" ^ client_id; websocket = Some ws }
300
+
in
301
+
302
+
(* Add client to state *)
303
+
Eio.Mutex.use_rw ~protect:true state.mutex (fun () ->
304
+
state.clients <- client :: state.clients);
305
+
306
+
log "Client %s connected" client.id;
307
+
308
+
(* Send initial sync *)
309
+
let sync_msg, users_msg, stroke_history_msg, chat_history_msg =
310
+
Eio.Mutex.use_ro state.mutex (fun () ->
311
+
( make_sync_msg state.model,
312
+
make_users_msg state.clients,
313
+
make_stroke_history_msg state.strokes,
314
+
make_chat_history_msg state.chat_history ))
315
+
in
316
+
317
+
send_to_ws ws
318
+
(string_of_json (obj [ ("type", str "welcome"); ("id", str client_id) ]));
319
+
send_to_ws ws (string_of_json sync_msg);
320
+
send_to_ws ws (string_of_json users_msg);
321
+
send_to_ws ws (string_of_json stroke_history_msg);
322
+
send_to_ws ws (string_of_json chat_history_msg);
323
+
324
+
(* Broadcast new user to others *)
325
+
broadcast_to_others state client_id users_msg;
326
+
327
+
(* Message loop *)
328
+
let rec loop () =
329
+
match Hcs.Websocket.recv_message ws with
330
+
| Error Hcs.Websocket.Connection_closed ->
331
+
(* Client disconnected *)
332
+
let users_msg =
333
+
Eio.Mutex.use_rw ~protect:true state.mutex (fun () ->
334
+
state.clients <-
335
+
List.filter (fun c -> c.id <> client_id) state.clients;
336
+
make_users_msg state.clients)
337
+
in
338
+
broadcast_to_all state users_msg;
339
+
log "Client %s disconnected" client.id
340
+
| Error (Hcs.Websocket.Protocol_error msg) ->
341
+
log "Protocol error from %s: %s" client.id msg
342
+
| Error (Hcs.Websocket.Io_error msg) ->
343
+
log "IO error from %s: %s" client.id msg
344
+
| Ok (_opcode, msg) ->
345
+
handle_message state client msg;
346
+
loop ()
347
+
in
348
+
loop ()
349
+
350
+
(* ============================================================================
351
+
HTTP Handler with Routing
352
+
============================================================================ *)
353
+
354
+
let make_handler ~fs ~static_path _state (req : Hcs.Server.request) =
355
+
let target = req.target in
356
+
let path =
357
+
match String.index_opt target '?' with
358
+
| Some i -> String.sub target 0 i
359
+
| None -> target
360
+
in
361
+
(* Route /ws separately, everything else is static *)
362
+
if path = "/ws" then
363
+
(* This shouldn't be reached because WebSocket is handled separately *)
364
+
Hcs.Server.respond ~status:`Bad_request
365
+
"WebSocket connections should use the /ws endpoint"
366
+
else
367
+
(* Serve static files *)
368
+
let static_handler = Hcs.Plug.Static.server ~fs static_path in
369
+
static_handler req
370
+
371
+
(* ============================================================================
372
+
Main
373
+
============================================================================ *)
374
+
375
+
let () =
376
+
Random.self_init ();
377
+
378
+
(* Initialize server state with empty model *)
379
+
let state =
380
+
{
381
+
model = Model.create (Random.int 1000000);
382
+
clients = [];
383
+
strokes = [];
384
+
chat_history = [];
385
+
mutex = Eio.Mutex.create ();
386
+
}
387
+
in
388
+
389
+
(* Initialize the model with a root object for strokes *)
390
+
let root_obj_id = Clock.tick state.model.clock.local in
391
+
let root_obj = Node.make_obj ~id:root_obj_id in
392
+
Model.add_node state.model root_obj;
393
+
Node.set_val state.model.root ~value:root_obj_id;
394
+
395
+
(* Get port from environment or use default *)
396
+
let port =
397
+
match Sys.getenv_opt "PORT" with
398
+
| Some p -> ( try int_of_string p with _ -> default_port)
399
+
| None -> default_port
400
+
in
401
+
402
+
(* Find static directory *)
403
+
let static_path =
404
+
let candidates =
405
+
[
406
+
static_dir;
407
+
"bin/wb/" ^ static_dir;
408
+
Filename.concat (Filename.dirname Sys.executable_name) static_dir;
409
+
]
410
+
in
411
+
List.find_opt Sys.file_exists candidates |> Option.value ~default:static_dir
412
+
in
413
+
414
+
log "Starting Webboard server (HCS) on http://localhost:%d" port;
415
+
log "Static files from: %s" static_path;
416
+
417
+
Eio_main.run @@ fun env ->
418
+
let net = Eio.Stdenv.net env in
419
+
let fs = Eio.Stdenv.fs env in
420
+
Switch.run @@ fun sw ->
421
+
let config =
422
+
Hcs.Server.websocket_config |> Hcs.Server.with_port port
423
+
|> Hcs.Server.without_gc_tuning
424
+
in
425
+
let handler = make_handler ~fs ~static_path state in
426
+
let ws_handler ws = handle_websocket state ws in
427
+
Hcs.Server.run ~sw ~net ~config ~ws_handler handler
+1
-1
bin/webboard/dune
bin/wbd/dune
+1
-1
bin/webboard/dune
bin/wbd/dune
bin/webboard/static/index.html
bin/wbd/static/index.html
bin/webboard/static/index.html
bin/wbd/static/index.html
bin/webboard/webboard.ml
bin/wbd/wbd.ml
bin/webboard/webboard.ml
bin/wbd/wbd.ml
-5
bin/whiteboard/dune
-5
bin/whiteboard/dune
-1527
bin/whiteboard/whiteboard.ml
-1527
bin/whiteboard/whiteboard.ml
···
1
-
(** Collaborative Whiteboard Demo
2
-
3
-
A real-time collaborative whiteboard with:
4
-
- Drawing with multiple colors and brush sizes
5
-
- Real-time cursor presence
6
-
- Chat functionality
7
-
- Multi-user networking via Eio
8
-
9
-
Usage: ./whiteboard --name "Alice" --host # Start as server ./whiteboard
10
-
--name "Bob" --connect 192.168.1.1 # Connect to server ./whiteboard --name
11
-
"Carol" # Local-only mode *)
12
-
13
-
[@@@warning "-69-32-27-26"]
14
-
15
-
open Raylib
16
-
module J = Simdjsont.Json
17
-
18
-
(* ============================================================================
19
-
Configuration & Scaling
20
-
============================================================================ *)
21
-
22
-
let base_width = 1200
23
-
let base_height = 800
24
-
let default_port = 9999
25
-
let max_chat_messages = 50
26
-
let presence_timeout = 10.0 (* Time before cursor fades *)
27
-
let user_timeout = 30.0 (* Time before user is removed from list *)
28
-
29
-
let sync_interval =
30
-
0.5 (* Presence update interval - reduced to avoid flooding *)
31
-
32
-
(* Mutable layout values that update on resize *)
33
-
let window_width = ref base_width
34
-
let window_height = ref base_height
35
-
36
-
(* DPI scale - set once at startup based on display, doesn't change on resize *)
37
-
let dpi_scale = ref 1.0
38
-
39
-
(* Scale for DPI only (not window size) *)
40
-
let scale x = int_of_float (float_of_int x *. !dpi_scale)
41
-
let scalef x = x *. !dpi_scale
42
-
43
-
(* Get mouse position *)
44
-
let get_mouse_pos () =
45
-
let mx = float_of_int (get_mouse_x ()) in
46
-
let my = float_of_int (get_mouse_y ()) in
47
-
(mx, my)
48
-
49
-
(* Fixed UI element sizes (scaled for DPI only) *)
50
-
let margin () = scale 10
51
-
let sidebar_width () = scale 280
52
-
let palette_height () = scale 50
53
-
let chat_height () = scale 280
54
-
55
-
(* Layout calculations - canvas grows/shrinks with window, other elements stay fixed *)
56
-
let canvas_x () = margin ()
57
-
let canvas_y () = margin ()
58
-
59
-
let canvas_width () =
60
-
!window_width - canvas_x () - sidebar_width () - (margin () * 2)
61
-
62
-
let canvas_height () = !window_height - (margin () * 2) - palette_height ()
63
-
let sidebar_x () = !window_width - sidebar_width () - margin ()
64
-
let chat_y () = !window_height - chat_height () - margin ()
65
-
let presence_y () = canvas_y ()
66
-
let presence_height () = chat_y () - presence_y () - margin ()
67
-
68
-
(* ============================================================================
69
-
Types
70
-
============================================================================ *)
71
-
72
-
type point = { x : float; y : float }
73
-
74
-
type stroke = {
75
-
stroke_id : string;
76
-
color : int; (* Store as int for serialization *)
77
-
thickness : float;
78
-
points : point list;
79
-
}
80
-
81
-
type user_presence = {
82
-
user_id : string;
83
-
user_name : string;
84
-
cursor_x : float;
85
-
cursor_y : float;
86
-
user_color : int;
87
-
last_seen : float;
88
-
}
89
-
90
-
type chat_message = {
91
-
msg_id : string;
92
-
sender : string;
93
-
text : string;
94
-
timestamp : float;
95
-
}
96
-
97
-
type app_mode = Local | Server | Client of string
98
-
99
-
type drawing_segment = {
100
-
draw_user_id : string;
101
-
draw_color : int;
102
-
draw_thickness : float;
103
-
draw_from : point;
104
-
draw_to : point;
105
-
}
106
-
107
-
type net_message =
108
-
| MsgStroke of stroke
109
-
| MsgDrawing of drawing_segment
110
-
| MsgPresence of user_presence
111
-
| MsgChat of chat_message
112
-
| MsgSync of {
113
-
strokes : stroke list;
114
-
messages : chat_message list;
115
-
users : user_presence list;
116
-
}
117
-
| MsgJoin of { user_id : string; user_name : string; user_color : int }
118
-
| MsgLeave of { user_id : string }
119
-
120
-
type app_state = {
121
-
mutable mode : app_mode;
122
-
mutable my_id : string;
123
-
mutable my_name : string;
124
-
mutable my_color : int;
125
-
mutable is_drawing : bool;
126
-
mutable current_stroke : point list;
127
-
mutable strokes : stroke list;
128
-
mutable brush_color : int;
129
-
mutable brush_size : float;
130
-
mutable palette_index : int;
131
-
users : (string, user_presence) Hashtbl.t; (* user_id -> presence *)
132
-
mutable messages : chat_message list;
133
-
mutable chat_input : string;
134
-
mutable chat_focused : bool;
135
-
mutable connected : bool;
136
-
mutable last_presence_sync : float;
137
-
mutable name_input : string;
138
-
mutable entering_name : bool;
139
-
(* Remote drawings: segments from other users currently being drawn *)
140
-
mutable remote_drawings : drawing_segment list;
141
-
(* Eio streams for message passing *)
142
-
incoming : net_message Eio.Stream.t;
143
-
outgoing : net_message Eio.Stream.t;
144
-
}
145
-
146
-
(* ============================================================================
147
-
Colors
148
-
============================================================================ *)
149
-
150
-
(* Multiple palettes - each has black, white, and 8 colors *)
151
-
(* Palette 0: Pastel light *)
152
-
let palette_pastel_light =
153
-
[|
154
-
0x000000ff;
155
-
0xffffffff;
156
-
0xffb3baff;
157
-
0xffdfbaff;
158
-
0xffffbaff;
159
-
0xbaffc9ff;
160
-
0xbae1ffff;
161
-
0xbac8ffff;
162
-
0xe0baffff;
163
-
0xffbaffff;
164
-
|]
165
-
166
-
(* Palette 1: Pastel medium *)
167
-
let palette_pastel_medium =
168
-
[|
169
-
0x000000ff;
170
-
0xffffffff;
171
-
0xf8a5a5ff;
172
-
0xf8cba5ff;
173
-
0xf8f8a5ff;
174
-
0xa5f8b8ff;
175
-
0xa5d8f8ff;
176
-
0xa5a5f8ff;
177
-
0xd8a5f8ff;
178
-
0xf8a5d8ff;
179
-
|]
180
-
181
-
(* Palette 2: Pastel muted *)
182
-
let palette_pastel_muted =
183
-
[|
184
-
0x000000ff;
185
-
0xffffffff;
186
-
0xd4a5a5ff;
187
-
0xd4bfa5ff;
188
-
0xd4d4a5ff;
189
-
0xa5d4b0ff;
190
-
0xa5c4d4ff;
191
-
0xa5a5d4ff;
192
-
0xc4a5d4ff;
193
-
0xd4a5c4ff;
194
-
|]
195
-
196
-
(* Palette 3: Soft warm *)
197
-
let palette_soft_warm =
198
-
[|
199
-
0x000000ff;
200
-
0xffffffff;
201
-
0xe8998dff;
202
-
0xeab69fff;
203
-
0xead89fff;
204
-
0xb5deadff;
205
-
0x9fd5e8ff;
206
-
0x9fb5e8ff;
207
-
0xc49fe8ff;
208
-
0xe89fc4ff;
209
-
|]
210
-
211
-
(* Palette 4: Soft cool *)
212
-
let palette_soft_cool =
213
-
[|
214
-
0x000000ff;
215
-
0xffffffff;
216
-
0x9db5c4ff;
217
-
0x9dc4b5ff;
218
-
0xb5c49dff;
219
-
0xc4b59dff;
220
-
0xc49db5ff;
221
-
0xb59dc4ff;
222
-
0x9dc5c4ff;
223
-
0xa5c4c4ff;
224
-
|]
225
-
226
-
(* Palette 5: Vibrant *)
227
-
let palette_vibrant =
228
-
[|
229
-
0x000000ff;
230
-
0xffffffff;
231
-
0xff6b6bff;
232
-
0xffa94dff;
233
-
0xffd93dff;
234
-
0x6bcb77ff;
235
-
0x4d96ffff;
236
-
0x6b5bffff;
237
-
0xc56bffff;
238
-
0xff6bb5ff;
239
-
|]
240
-
241
-
(* Palette 6: Earth tones *)
242
-
let palette_earth =
243
-
[|
244
-
0x000000ff;
245
-
0xffffffff;
246
-
0xc9a87cff;
247
-
0xa8c97cff;
248
-
0x7cc9a8ff;
249
-
0x7ca8c9ff;
250
-
0xa87cc9ff;
251
-
0xc97ca8ff;
252
-
0xb5a88fff;
253
-
0x8fb5a8ff;
254
-
|]
255
-
256
-
(* Palette 7: Classic *)
257
-
let palette_classic =
258
-
[|
259
-
0x000000ff;
260
-
0xffffffff;
261
-
0xff0000ff;
262
-
0xff8000ff;
263
-
0xffff00ff;
264
-
0x00ff00ff;
265
-
0x00ffffff;
266
-
0x0000ffff;
267
-
0x8000ffff;
268
-
0xff00ffff;
269
-
|]
270
-
271
-
let all_palettes =
272
-
[|
273
-
palette_pastel_light;
274
-
palette_pastel_medium;
275
-
palette_pastel_muted;
276
-
palette_soft_warm;
277
-
palette_soft_cool;
278
-
palette_vibrant;
279
-
palette_earth;
280
-
palette_classic;
281
-
|]
282
-
283
-
let num_palettes = Array.length all_palettes
284
-
285
-
(* Get current palette colors *)
286
-
let get_palette idx = all_palettes.(idx mod num_palettes)
287
-
288
-
let user_colors =
289
-
[|
290
-
0xe74c3cff;
291
-
0x2ecc71ff;
292
-
0x3498dbff;
293
-
0x9b59b6ff;
294
-
0xf1c40fff;
295
-
0xe67e22ff;
296
-
0x1abc9cff;
297
-
0x34495eff;
298
-
|]
299
-
300
-
let color_of_int n =
301
-
Color.create
302
-
((n lsr 24) land 0xff)
303
-
((n lsr 16) land 0xff)
304
-
((n lsr 8) land 0xff)
305
-
(n land 0xff)
306
-
307
-
let get_user_color idx = user_colors.(abs idx mod Array.length user_colors)
308
-
309
-
(* ============================================================================
310
-
Utilities
311
-
============================================================================ *)
312
-
313
-
let generate_id () =
314
-
Random.self_init ();
315
-
String.init 8 (fun _ ->
316
-
let n = Random.int 36 in
317
-
if n < 10 then Char.chr (48 + n) else Char.chr (97 + n - 10))
318
-
319
-
let current_time () = Unix.gettimeofday ()
320
-
let clamp v lo hi = max lo (min hi v)
321
-
322
-
let point_in_rect x y rx ry rw rh =
323
-
x >= float_of_int rx
324
-
&& x <= float_of_int (rx + rw)
325
-
&& y >= float_of_int ry
326
-
&& y <= float_of_int (ry + rh)
327
-
328
-
(* ============================================================================
329
-
JSON Serialization using Simdjsont
330
-
============================================================================ *)
331
-
332
-
(* Helper to build JSON objects *)
333
-
let obj members = J.Object members
334
-
let arr items = J.Array items
335
-
let str s = J.String s
336
-
let num f = J.Float f
337
-
let int_num i = J.Float (Float.of_int i)
338
-
let mem k v = (k, v)
339
-
340
-
(* Encode to JSON string - MUST be minified (no newlines) for line-based protocol *)
341
-
let json_to_string json = J.to_string json
342
-
343
-
(* Build JSON for point *)
344
-
let point_to_json p = arr [ num p.x; num p.y ]
345
-
346
-
(* Build JSON for stroke (with or without type tag) *)
347
-
let stroke_to_json_obj ~with_type s =
348
-
let base =
349
-
[
350
-
mem "i" (str s.stroke_id);
351
-
mem "c" (int_num s.color);
352
-
mem "k" (num s.thickness);
353
-
mem "p" (arr (List.map point_to_json s.points));
354
-
]
355
-
in
356
-
obj (if with_type then mem "t" (str "s") :: base else base)
357
-
358
-
(* Build JSON for user/presence (with or without type tag) *)
359
-
let user_to_json_obj ~with_type p =
360
-
let base =
361
-
[
362
-
mem "i" (str p.user_id);
363
-
mem "n" (str p.user_name);
364
-
mem "x" (num p.cursor_x);
365
-
mem "y" (num p.cursor_y);
366
-
mem "c" (int_num p.user_color);
367
-
mem "s" (num p.last_seen);
368
-
]
369
-
in
370
-
obj (if with_type then mem "t" (str "p") :: base else base)
371
-
372
-
(* Build JSON for chat message (with or without type tag) *)
373
-
let chat_to_json_obj ~with_type m =
374
-
let base =
375
-
[
376
-
mem "i" (str m.msg_id);
377
-
mem "n" (str m.sender);
378
-
mem "x" (str m.text);
379
-
mem "s" (num m.timestamp);
380
-
]
381
-
in
382
-
obj (if with_type then mem "t" (str "m") :: base else base)
383
-
384
-
(* Build JSON for join message *)
385
-
let join_to_json_obj user_id user_name user_color =
386
-
obj
387
-
[
388
-
mem "t" (str "j");
389
-
mem "i" (str user_id);
390
-
mem "n" (str user_name);
391
-
mem "c" (int_num user_color);
392
-
]
393
-
394
-
(* Build JSON for leave message *)
395
-
let leave_to_json_obj user_id = obj [ mem "t" (str "l"); mem "i" (str user_id) ]
396
-
397
-
(* Build JSON for sync message *)
398
-
let sync_to_json_obj strokes messages users =
399
-
obj
400
-
[
401
-
mem "t" (str "y");
402
-
mem "s" (arr (List.map (stroke_to_json_obj ~with_type:false) strokes));
403
-
mem "m" (arr (List.map (chat_to_json_obj ~with_type:false) messages));
404
-
mem "u" (arr (List.map (user_to_json_obj ~with_type:false) users));
405
-
]
406
-
407
-
(* Build JSON for drawing segment (real-time line) *)
408
-
let drawing_to_json_obj d =
409
-
obj
410
-
[
411
-
mem "t" (str "d");
412
-
mem "i" (str d.draw_user_id);
413
-
mem "c" (int_num d.draw_color);
414
-
mem "k" (num d.draw_thickness);
415
-
mem "f" (point_to_json d.draw_from);
416
-
mem "o" (point_to_json d.draw_to);
417
-
]
418
-
419
-
(* Convert message to JSON string *)
420
-
let message_to_json = function
421
-
| MsgStroke s -> json_to_string (stroke_to_json_obj ~with_type:true s)
422
-
| MsgDrawing d -> json_to_string (drawing_to_json_obj d)
423
-
| MsgPresence p -> json_to_string (user_to_json_obj ~with_type:true p)
424
-
| MsgChat m -> json_to_string (chat_to_json_obj ~with_type:true m)
425
-
| MsgJoin { user_id; user_name; user_color } ->
426
-
json_to_string (join_to_json_obj user_id user_name user_color)
427
-
| MsgLeave { user_id } -> json_to_string (leave_to_json_obj user_id)
428
-
| MsgSync { strokes; messages; users } ->
429
-
json_to_string (sync_to_json_obj strokes messages users)
430
-
431
-
(* JSON parsing helpers *)
432
-
let get_string_field obj key =
433
-
List.find_map
434
-
(fun (k, v) ->
435
-
if k = key then match v with J.String s -> Some s | _ -> None else None)
436
-
obj
437
-
438
-
let get_int_field obj key =
439
-
List.find_map
440
-
(fun (k, v) ->
441
-
if k = key then
442
-
match v with
443
-
| J.Float f -> Some (Float.to_int f)
444
-
| J.Int i -> Some (Int64.to_int i)
445
-
| _ -> None
446
-
else None)
447
-
obj
448
-
449
-
let get_float_field obj key =
450
-
List.find_map
451
-
(fun (k, v) ->
452
-
if k = key then
453
-
match v with
454
-
| J.Float f -> Some f
455
-
| J.Int i -> Some (Int64.to_float i)
456
-
| _ -> None
457
-
else None)
458
-
obj
459
-
460
-
let get_array_field obj key =
461
-
List.find_map
462
-
(fun (k, v) ->
463
-
if k = key then match v with J.Array items -> Some items | _ -> None
464
-
else None)
465
-
obj
466
-
467
-
(* Parse point from JSON *)
468
-
let parse_point_json = function
469
-
| J.Array [ J.Float x; J.Float y ] -> Some { x; y }
470
-
| J.Array [ J.Int x; J.Int y ] ->
471
-
Some { x = Int64.to_float x; y = Int64.to_float y }
472
-
| J.Array [ J.Float x; J.Int y ] -> Some { x; y = Int64.to_float y }
473
-
| J.Array [ J.Int x; J.Float y ] -> Some { x = Int64.to_float x; y }
474
-
| _ -> None
475
-
476
-
(* Parse stroke from JSON object members *)
477
-
let parse_stroke_json obj =
478
-
let stroke_id =
479
-
Option.value (get_string_field obj "i") ~default:(generate_id ())
480
-
in
481
-
let color = Option.value (get_int_field obj "c") ~default:0x000000ff in
482
-
let thickness = Option.value (get_float_field obj "k") ~default:3.0 in
483
-
let points =
484
-
match get_array_field obj "p" with
485
-
| Some items -> List.filter_map parse_point_json items
486
-
| None -> []
487
-
in
488
-
{ stroke_id; color; thickness; points }
489
-
490
-
(* Parse presence/user from JSON object members *)
491
-
let parse_presence_json obj =
492
-
let user_id = Option.value (get_string_field obj "i") ~default:"" in
493
-
let user_name = Option.value (get_string_field obj "n") ~default:"?" in
494
-
let cursor_x = Option.value (get_float_field obj "x") ~default:0.0 in
495
-
let cursor_y = Option.value (get_float_field obj "y") ~default:0.0 in
496
-
let user_color = Option.value (get_int_field obj "c") ~default:0x808080ff in
497
-
let last_seen =
498
-
Option.value (get_float_field obj "s") ~default:(current_time ())
499
-
in
500
-
{ user_id; user_name; cursor_x; cursor_y; user_color; last_seen }
501
-
502
-
(* Parse chat from JSON object members *)
503
-
let parse_chat_json obj =
504
-
let msg_id =
505
-
Option.value (get_string_field obj "i") ~default:(generate_id ())
506
-
in
507
-
let sender = Option.value (get_string_field obj "n") ~default:"?" in
508
-
let text = Option.value (get_string_field obj "x") ~default:"" in
509
-
let timestamp =
510
-
Option.value (get_float_field obj "s") ~default:(current_time ())
511
-
in
512
-
{ msg_id; sender; text; timestamp }
513
-
514
-
(* Parse array of objects *)
515
-
let parse_array_json items parse_fn =
516
-
List.filter_map
517
-
(function J.Object obj -> Some (parse_fn obj) | _ -> None)
518
-
items
519
-
520
-
(* Parse drawing segment from JSON object members *)
521
-
let parse_drawing_json obj =
522
-
let user_id = Option.value (get_string_field obj "i") ~default:"" in
523
-
let color = Option.value (get_int_field obj "c") ~default:0x000000ff in
524
-
let thickness = Option.value (get_float_field obj "k") ~default:3.0 in
525
-
let from_point =
526
-
match get_array_field obj "f" with
527
-
| Some items -> (
528
-
match parse_point_json (J.Array items) with
529
-
| Some p -> p
530
-
| None -> { x = 0.; y = 0. })
531
-
| None -> { x = 0.; y = 0. }
532
-
in
533
-
let to_point =
534
-
match get_array_field obj "o" with
535
-
| Some items -> (
536
-
match parse_point_json (J.Array items) with
537
-
| Some p -> p
538
-
| None -> { x = 0.; y = 0. })
539
-
| None -> { x = 0.; y = 0. }
540
-
in
541
-
{
542
-
draw_user_id = user_id;
543
-
draw_color = color;
544
-
draw_thickness = thickness;
545
-
draw_from = from_point;
546
-
draw_to = to_point;
547
-
}
548
-
549
-
(* Parse a network message from JSON string *)
550
-
let parse_message json_str =
551
-
match Simdjsont.Decode.decode_string Simdjsont.Decode.value json_str with
552
-
| Error _ -> None
553
-
| Ok (J.Object obj) -> (
554
-
match get_string_field obj "t" with
555
-
| Some "s" -> Some (MsgStroke (parse_stroke_json obj))
556
-
| Some "d" -> Some (MsgDrawing (parse_drawing_json obj))
557
-
| Some "p" -> Some (MsgPresence (parse_presence_json obj))
558
-
| Some "m" -> Some (MsgChat (parse_chat_json obj))
559
-
| Some "j" ->
560
-
let user_id = Option.value (get_string_field obj "i") ~default:"" in
561
-
let user_name =
562
-
Option.value (get_string_field obj "n") ~default:"?"
563
-
in
564
-
let user_color =
565
-
Option.value (get_int_field obj "c") ~default:0x808080ff
566
-
in
567
-
Some (MsgJoin { user_id; user_name; user_color })
568
-
| Some "l" ->
569
-
let user_id = Option.value (get_string_field obj "i") ~default:"" in
570
-
Some (MsgLeave { user_id })
571
-
| Some "y" ->
572
-
let strokes =
573
-
match get_array_field obj "s" with
574
-
| Some items -> parse_array_json items parse_stroke_json
575
-
| None -> []
576
-
in
577
-
let messages =
578
-
match get_array_field obj "m" with
579
-
| Some items -> parse_array_json items parse_chat_json
580
-
| None -> []
581
-
in
582
-
let users =
583
-
match get_array_field obj "u" with
584
-
| Some items -> parse_array_json items parse_presence_json
585
-
| None -> []
586
-
in
587
-
Some (MsgSync { strokes; messages; users })
588
-
| _ -> None)
589
-
| Ok _ -> None
590
-
591
-
(* ============================================================================
592
-
State Management
593
-
============================================================================ *)
594
-
595
-
let create_state ~incoming ~outgoing mode name =
596
-
Random.self_init ();
597
-
let my_id = generate_id () in
598
-
let my_color = get_user_color (Hashtbl.hash my_id) in
599
-
{
600
-
mode;
601
-
my_id;
602
-
my_name = name;
603
-
my_color;
604
-
is_drawing = false;
605
-
current_stroke = [];
606
-
strokes = [];
607
-
brush_color = 0x000000ff;
608
-
brush_size = 3.0;
609
-
palette_index = 0;
610
-
users = Hashtbl.create 32;
611
-
messages = [];
612
-
chat_input = "";
613
-
chat_focused = false;
614
-
connected = false;
615
-
last_presence_sync = 0.0;
616
-
name_input = name;
617
-
entering_name = name = "";
618
-
remote_drawings = [];
619
-
incoming;
620
-
outgoing;
621
-
}
622
-
623
-
let send_message state msg = Eio.Stream.add state.outgoing msg
624
-
625
-
let add_stroke state stroke ~broadcast =
626
-
if not (List.exists (fun s -> s.stroke_id = stroke.stroke_id) state.strokes)
627
-
then begin
628
-
state.strokes <- state.strokes @ [ stroke ];
629
-
if broadcast then send_message state (MsgStroke stroke)
630
-
end
631
-
632
-
let add_chat state msg ~broadcast =
633
-
if not (List.exists (fun m -> m.msg_id = msg.msg_id) state.messages) then begin
634
-
state.messages <- state.messages @ [ msg ];
635
-
if List.length state.messages > max_chat_messages then
636
-
state.messages <-
637
-
List.filteri
638
-
(fun i _ -> i >= List.length state.messages - max_chat_messages)
639
-
state.messages;
640
-
if broadcast then send_message state (MsgChat msg)
641
-
end
642
-
643
-
let update_user state user ~broadcast =
644
-
Hashtbl.replace state.users user.user_id user;
645
-
if broadcast then send_message state (MsgPresence user)
646
-
647
-
let process_incoming state =
648
-
(* Non-blocking take from stream *)
649
-
let rec drain () =
650
-
match Eio.Stream.take_nonblocking state.incoming with
651
-
| None -> ()
652
-
| Some msg ->
653
-
(match msg with
654
-
| MsgStroke s -> add_stroke state s ~broadcast:false
655
-
| MsgDrawing d when d.draw_user_id <> state.my_id ->
656
-
(* Add to remote drawings for rendering *)
657
-
state.remote_drawings <- d :: state.remote_drawings;
658
-
(* Limit to last 100 segments to prevent memory issues *)
659
-
if List.length state.remote_drawings > 100 then
660
-
state.remote_drawings <-
661
-
List.filteri (fun i _ -> i < 100) state.remote_drawings
662
-
| MsgDrawing _ -> ()
663
-
| MsgPresence p when p.user_id <> state.my_id ->
664
-
update_user state p ~broadcast:false
665
-
| MsgPresence _ -> ()
666
-
| MsgChat m -> add_chat state m ~broadcast:false
667
-
| MsgJoin { user_id; user_name; user_color }
668
-
when user_id <> state.my_id && user_id <> "" ->
669
-
let p =
670
-
{
671
-
user_id;
672
-
user_name;
673
-
cursor_x = 0.;
674
-
cursor_y = 0.;
675
-
user_color;
676
-
last_seen = current_time ();
677
-
}
678
-
in
679
-
update_user state p ~broadcast:false;
680
-
add_chat state
681
-
{
682
-
msg_id = generate_id ();
683
-
sender = "System";
684
-
text = user_name ^ " joined";
685
-
timestamp = current_time ();
686
-
}
687
-
~broadcast:false
688
-
| MsgJoin _ -> ()
689
-
| MsgLeave { user_id } -> (
690
-
let user = Hashtbl.find_opt state.users user_id in
691
-
Hashtbl.remove state.users user_id;
692
-
(* Clear remote drawings from this user *)
693
-
state.remote_drawings <-
694
-
List.filter
695
-
(fun d -> d.draw_user_id <> user_id)
696
-
state.remote_drawings;
697
-
match user with
698
-
| Some u ->
699
-
add_chat state
700
-
{
701
-
msg_id = generate_id ();
702
-
sender = "System";
703
-
text = u.user_name ^ " left";
704
-
timestamp = current_time ();
705
-
}
706
-
~broadcast:false
707
-
| None -> ())
708
-
| MsgSync { strokes; messages; users } ->
709
-
List.iter (fun s -> add_stroke state s ~broadcast:false) strokes;
710
-
List.iter (fun m -> add_chat state m ~broadcast:false) messages;
711
-
List.iter
712
-
(fun u ->
713
-
if u.user_id <> state.my_id then
714
-
update_user state u ~broadcast:false)
715
-
users);
716
-
drain ()
717
-
in
718
-
drain ()
719
-
720
-
let cleanup_users state =
721
-
let now = current_time () in
722
-
let to_remove =
723
-
Hashtbl.fold
724
-
(fun user_id u acc ->
725
-
if user_id <> state.my_id && now -. u.last_seen >= user_timeout then
726
-
user_id :: acc
727
-
else acc)
728
-
state.users []
729
-
in
730
-
List.iter (Hashtbl.remove state.users) to_remove
731
-
732
-
(* ============================================================================
733
-
Networking with Eio
734
-
============================================================================ *)
735
-
736
-
(* Set TCP_NODELAY on a socket to disable Nagle's algorithm for low latency *)
737
-
let set_tcp_nodelay flow =
738
-
match Eio_unix.Resource.fd_opt flow with
739
-
| Some fd ->
740
-
Eio_unix.Fd.use_exn "setsockopt" fd (fun unix_fd ->
741
-
Unix.setsockopt unix_fd Unix.TCP_NODELAY true)
742
-
| None -> ()
743
-
744
-
(* Read a line from an Eio flow *)
745
-
let read_line flow buf_reader =
746
-
let buf = Buffer.create 256 in
747
-
let rec loop () =
748
-
match Eio.Buf_read.any_char buf_reader with
749
-
| '\n' -> Some (Buffer.contents buf)
750
-
| c ->
751
-
Buffer.add_char buf c;
752
-
loop ()
753
-
in
754
-
try loop () with End_of_file -> None
755
-
756
-
(* Write a line to an Eio flow *)
757
-
let write_line flow line = Eio.Flow.copy_string (line ^ "\n") flow
758
-
759
-
(* Client info tracked by server - using polymorphic type for flow *)
760
-
type 'a client_info = {
761
-
client_flow : 'a;
762
-
mutable client_user_id : string option;
763
-
}
764
-
765
-
(* Handle a single client connection (server-side) *)
766
-
let handle_client ~sw ~state ~clients ~clients_mutex client_info =
767
-
let flow = client_info.client_flow in
768
-
set_tcp_nodelay flow;
769
-
let buf_reader = Eio.Buf_read.of_flow ~max_size:65536 flow in
770
-
771
-
(* Send sync to new client *)
772
-
let my_presence =
773
-
{
774
-
user_id = state.my_id;
775
-
user_name = state.my_name;
776
-
cursor_x = 0.;
777
-
cursor_y = 0.;
778
-
user_color = state.my_color;
779
-
last_seen = current_time ();
780
-
}
781
-
in
782
-
let all_users =
783
-
my_presence :: Hashtbl.fold (fun _ u acc -> u :: acc) state.users []
784
-
in
785
-
let sync =
786
-
MsgSync
787
-
{ strokes = state.strokes; messages = state.messages; users = all_users }
788
-
in
789
-
(try write_line flow (message_to_json sync) with _ -> ());
790
-
791
-
(* Read messages from client *)
792
-
let rec read_loop () =
793
-
match read_line flow buf_reader with
794
-
| None -> () (* Client disconnected *)
795
-
| Some line -> (
796
-
match parse_message line with
797
-
| Some msg ->
798
-
(* Track user_id from join message *)
799
-
(match msg with
800
-
| MsgJoin { user_id; _ } ->
801
-
client_info.client_user_id <- Some user_id
802
-
| _ -> ());
803
-
(* Send to main via stream *)
804
-
Eio.Stream.add state.incoming msg;
805
-
(* Broadcast to other clients *)
806
-
Eio.Mutex.use_rw ~protect:true clients_mutex (fun () ->
807
-
List.iter
808
-
(fun ci ->
809
-
if ci.client_flow != flow then
810
-
try write_line ci.client_flow (message_to_json msg)
811
-
with _ -> ())
812
-
!clients);
813
-
(* If join, send server info back *)
814
-
(match msg with
815
-
| MsgJoin _ -> (
816
-
let server_join =
817
-
MsgJoin
818
-
{
819
-
user_id = state.my_id;
820
-
user_name = state.my_name;
821
-
user_color = state.my_color;
822
-
}
823
-
in
824
-
try write_line flow (message_to_json server_join) with _ -> ())
825
-
| _ -> ());
826
-
read_loop ()
827
-
| None -> read_loop ())
828
-
in
829
-
Fun.protect
830
-
~finally:(fun () ->
831
-
(* Send leave message when client disconnects *)
832
-
(match client_info.client_user_id with
833
-
| Some user_id ->
834
-
let leave_msg = MsgLeave { user_id } in
835
-
(* Notify server's main loop *)
836
-
Eio.Stream.add state.incoming leave_msg;
837
-
(* Broadcast to other clients *)
838
-
Eio.Mutex.use_rw ~protect:true clients_mutex (fun () ->
839
-
List.iter
840
-
(fun ci ->
841
-
if ci.client_flow != flow then
842
-
try write_line ci.client_flow (message_to_json leave_msg)
843
-
with _ -> ())
844
-
!clients)
845
-
| None -> ());
846
-
(* Remove from client list *)
847
-
Eio.Mutex.use_rw ~protect:true clients_mutex (fun () ->
848
-
clients := List.filter (fun ci -> ci.client_flow != flow) !clients))
849
-
read_loop
850
-
851
-
(* Server: accept connections and broadcast outgoing messages *)
852
-
let run_server ~sw ~net ~state ~clock port =
853
-
let addr = `Tcp (Eio.Net.Ipaddr.V4.any, port) in
854
-
let socket = Eio.Net.listen ~sw ~backlog:16 ~reuse_addr:true net addr in
855
-
let clients = ref [] in
856
-
let clients_mutex = Eio.Mutex.create () in
857
-
858
-
Printf.printf "Server listening on port %d\n%!" port;
859
-
state.connected <- true;
860
-
861
-
(* Fiber to broadcast outgoing messages from main *)
862
-
Eio.Fiber.fork ~sw (fun () ->
863
-
while true do
864
-
let msg = Eio.Stream.take state.outgoing in
865
-
let json = message_to_json msg in
866
-
Eio.Mutex.use_rw ~protect:true clients_mutex (fun () ->
867
-
clients :=
868
-
List.filter
869
-
(fun ci ->
870
-
try
871
-
write_line ci.client_flow json;
872
-
true
873
-
with _ -> false)
874
-
!clients)
875
-
done);
876
-
877
-
(* Accept connections *)
878
-
while true do
879
-
Eio.Net.accept_fork ~sw socket
880
-
~on_error:(fun _ -> ())
881
-
(fun flow addr ->
882
-
Printf.printf "Client connected\n%!";
883
-
let client_info = { client_flow = flow; client_user_id = None } in
884
-
Eio.Mutex.use_rw ~protect:true clients_mutex (fun () ->
885
-
clients := client_info :: !clients);
886
-
handle_client ~sw ~state ~clients ~clients_mutex client_info)
887
-
done
888
-
889
-
(* Client: connect and relay messages *)
890
-
let run_client ~sw ~net ~state ~clock host port =
891
-
(* Parse IP address from dotted decimal string using Unix *)
892
-
let inet_addr = Unix.inet_addr_of_string host in
893
-
let ip_bytes =
894
-
(* inet_addr is abstract, but we can get the string representation and parse it *)
895
-
let s = Unix.string_of_inet_addr inet_addr in
896
-
(* For IPv4, convert "a.b.c.d" to 4 bytes *)
897
-
let parts = String.split_on_char '.' s in
898
-
String.init 4 (fun i -> Char.chr (int_of_string (List.nth parts i)))
899
-
in
900
-
let ip = Eio.Net.Ipaddr.of_raw ip_bytes in
901
-
let addr = `Tcp (ip, port) in
902
-
let flow = Eio.Net.connect ~sw net addr in
903
-
set_tcp_nodelay flow;
904
-
let buf_reader = Eio.Buf_read.of_flow ~max_size:65536 flow in
905
-
906
-
Printf.printf "Connected to server\n%!";
907
-
state.connected <- true;
908
-
909
-
(* Send join *)
910
-
let join =
911
-
MsgJoin
912
-
{
913
-
user_id = state.my_id;
914
-
user_name = state.my_name;
915
-
user_color = state.my_color;
916
-
}
917
-
in
918
-
write_line flow (message_to_json join);
919
-
920
-
(* Fiber to send outgoing messages *)
921
-
Eio.Fiber.fork ~sw (fun () ->
922
-
try
923
-
while true do
924
-
let msg = Eio.Stream.take state.outgoing in
925
-
write_line flow (message_to_json msg)
926
-
done
927
-
with _ -> ());
928
-
929
-
(* Read from server *)
930
-
let rec read_loop () =
931
-
match read_line flow buf_reader with
932
-
| None -> ()
933
-
| Some line -> (
934
-
match parse_message line with
935
-
| Some msg ->
936
-
Eio.Stream.add state.incoming msg;
937
-
read_loop ()
938
-
| None -> read_loop ())
939
-
in
940
-
read_loop ()
941
-
942
-
(* ============================================================================
943
-
Input Handling
944
-
============================================================================ *)
945
-
946
-
let handle_name_input state ~start_network =
947
-
if not state.entering_name then ()
948
-
else begin
949
-
let rec get_chars () =
950
-
let key = get_char_pressed () in
951
-
let code = Uchar.to_int key in
952
-
if code >= 32 && code < 127 && String.length state.name_input < 20 then begin
953
-
state.name_input <- state.name_input ^ String.make 1 (Char.chr code);
954
-
get_chars ()
955
-
end
956
-
in
957
-
get_chars ();
958
-
959
-
if is_key_pressed Key.Backspace && String.length state.name_input > 0 then
960
-
state.name_input <-
961
-
String.sub state.name_input 0 (String.length state.name_input - 1);
962
-
963
-
if is_key_pressed Key.Enter && String.length state.name_input > 0 then begin
964
-
state.my_name <- state.name_input;
965
-
state.entering_name <- false;
966
-
start_network ()
967
-
end
968
-
end
969
-
970
-
let handle_drawing state =
971
-
if state.entering_name then ()
972
-
else
973
-
let mx, my = get_mouse_pos () in
974
-
let in_canvas =
975
-
point_in_rect mx my (canvas_x ()) (canvas_y ()) (canvas_width ())
976
-
(canvas_height ())
977
-
in
978
-
979
-
if in_canvas && not state.chat_focused then begin
980
-
if is_mouse_button_pressed MouseButton.Left then begin
981
-
state.is_drawing <- true;
982
-
state.current_stroke <- [ { x = mx; y = my } ]
983
-
end
984
-
else if is_mouse_button_down MouseButton.Left && state.is_drawing then begin
985
-
match state.current_stroke with
986
-
| p :: _ when ((mx -. p.x) ** 2.) +. ((my -. p.y) ** 2.) > 9. ->
987
-
let new_point = { x = mx; y = my } in
988
-
state.current_stroke <- new_point :: state.current_stroke;
989
-
(* Send real-time drawing update *)
990
-
send_message state
991
-
(MsgDrawing
992
-
{
993
-
draw_user_id = state.my_id;
994
-
draw_color = state.brush_color;
995
-
draw_thickness = state.brush_size;
996
-
draw_from = p;
997
-
draw_to = new_point;
998
-
})
999
-
| _ -> ()
1000
-
end
1001
-
else if is_mouse_button_released MouseButton.Left && state.is_drawing then begin
1002
-
state.is_drawing <- false;
1003
-
if List.length state.current_stroke > 1 then begin
1004
-
let stroke =
1005
-
{
1006
-
stroke_id = generate_id ();
1007
-
color = state.brush_color;
1008
-
thickness = state.brush_size;
1009
-
points = List.rev state.current_stroke;
1010
-
}
1011
-
in
1012
-
add_stroke state stroke ~broadcast:true
1013
-
end;
1014
-
state.current_stroke <- []
1015
-
end
1016
-
end
1017
-
else if is_mouse_button_released MouseButton.Left then begin
1018
-
state.is_drawing <- false;
1019
-
state.current_stroke <- []
1020
-
end
1021
-
1022
-
let handle_palette state =
1023
-
if state.entering_name then ()
1024
-
else
1025
-
let palette_y = canvas_y () + canvas_height () + scale 10 in
1026
-
let swatch = scale 30 in
1027
-
let mx, my = get_mouse_pos () in
1028
-
1029
-
if is_mouse_button_pressed MouseButton.Left then
1030
-
let palette = get_palette state.palette_index in
1031
-
Array.iteri
1032
-
(fun i c ->
1033
-
let sx = canvas_x () + scale 70 + (i * (swatch + scale 5)) in
1034
-
if point_in_rect mx my sx palette_y swatch swatch then
1035
-
state.brush_color <- c)
1036
-
palette
1037
-
1038
-
let handle_brush_size_and_palette state =
1039
-
if state.entering_name || state.chat_focused then ()
1040
-
else
1041
-
let wheel = get_mouse_wheel_move () in
1042
-
if wheel <> 0. then begin
1043
-
if is_key_down Key.Left_shift || is_key_down Key.Right_shift then begin
1044
-
(* Shift + Wheel: cycle palette *)
1045
-
let delta = if wheel > 0. then 1 else -1 in
1046
-
state.palette_index <-
1047
-
(state.palette_index + delta + num_palettes) mod num_palettes
1048
-
end
1049
-
else
1050
-
(* Wheel alone: change brush size *)
1051
-
state.brush_size <- clamp (state.brush_size +. (wheel *. 2.)) 1. 30.
1052
-
end
1053
-
1054
-
let handle_chat state =
1055
-
if state.entering_name then ()
1056
-
else
1057
-
let mx, my = get_mouse_pos () in
1058
-
let input_y = chat_y () + chat_height () - scale 35 in
1059
-
1060
-
if is_mouse_button_pressed MouseButton.Left then
1061
-
state.chat_focused <-
1062
-
point_in_rect mx my (sidebar_x ()) input_y (sidebar_width ()) (scale 30);
1063
-
1064
-
if state.chat_focused then begin
1065
-
let rec get_chars () =
1066
-
let key = get_char_pressed () in
1067
-
let code = Uchar.to_int key in
1068
-
if code >= 32 && code < 127 then begin
1069
-
state.chat_input <- state.chat_input ^ String.make 1 (Char.chr code);
1070
-
get_chars ()
1071
-
end
1072
-
in
1073
-
get_chars ();
1074
-
1075
-
if is_key_pressed Key.Backspace && String.length state.chat_input > 0 then
1076
-
state.chat_input <-
1077
-
String.sub state.chat_input 0 (String.length state.chat_input - 1);
1078
-
1079
-
if is_key_pressed Key.Enter && String.length state.chat_input > 0 then begin
1080
-
let msg =
1081
-
{
1082
-
msg_id = generate_id ();
1083
-
sender = state.my_name;
1084
-
text = state.chat_input;
1085
-
timestamp = current_time ();
1086
-
}
1087
-
in
1088
-
add_chat state msg ~broadcast:true;
1089
-
state.chat_input <- ""
1090
-
end;
1091
-
1092
-
if is_key_pressed Key.Escape then state.chat_focused <- false
1093
-
end
1094
-
1095
-
let handle_shortcuts state =
1096
-
if state.entering_name || state.chat_focused then ()
1097
-
else if is_key_pressed Key.C then state.strokes <- []
1098
-
1099
-
let handle_resize () =
1100
-
(* Update window dimensions - DPI scale stays constant *)
1101
-
let w = get_screen_width () in
1102
-
let h = get_screen_height () in
1103
-
if w <> !window_width || h <> !window_height then begin
1104
-
window_width := w;
1105
-
window_height := h
1106
-
end
1107
-
1108
-
let handle_input state ~start_network =
1109
-
handle_resize ();
1110
-
handle_name_input state ~start_network;
1111
-
handle_drawing state;
1112
-
handle_palette state;
1113
-
handle_brush_size_and_palette state;
1114
-
handle_chat state;
1115
-
handle_shortcuts state
1116
-
1117
-
(* ============================================================================
1118
-
Rendering
1119
-
============================================================================ *)
1120
-
1121
-
let draw_name_dialog state =
1122
-
let dw = scale 500 in
1123
-
let dh = scale 180 in
1124
-
let dx = (!window_width - dw) / 2 in
1125
-
let dy = (!window_height - dh) / 2 in
1126
-
1127
-
draw_rectangle 0 0 !window_width !window_height (Color.create 0 0 0 180);
1128
-
draw_rectangle dx dy dw dh Color.raywhite;
1129
-
draw_rectangle_lines dx dy dw dh Color.darkgray;
1130
-
1131
-
draw_text "Enter Your Name"
1132
-
(dx + scale 140)
1133
-
(dy + scale 25)
1134
-
(scale 28) Color.darkgray;
1135
-
1136
-
let ix = dx + scale 50 in
1137
-
let iy = dy + scale 70 in
1138
-
let iw = dw - scale 100 in
1139
-
draw_rectangle ix iy iw (scale 50) Color.white;
1140
-
draw_rectangle_lines ix iy iw (scale 50) Color.blue;
1141
-
draw_text (state.name_input ^ "_")
1142
-
(ix + scale 15)
1143
-
(iy + scale 12)
1144
-
(scale 26) Color.black;
1145
-
1146
-
draw_text "Press ENTER to continue"
1147
-
(dx + scale 140)
1148
-
(dy + scale 140)
1149
-
(scale 18) Color.gray
1150
-
1151
-
let draw_stroke stroke =
1152
-
match stroke.points with
1153
-
| [] | [ _ ] -> ()
1154
-
| points ->
1155
-
let color = color_of_int stroke.color in
1156
-
let rec draw = function
1157
-
| [] | [ _ ] -> ()
1158
-
| p1 :: p2 :: rest ->
1159
-
draw_line_ex (Vector2.create p1.x p1.y) (Vector2.create p2.x p2.y)
1160
-
stroke.thickness color;
1161
-
draw (p2 :: rest)
1162
-
in
1163
-
draw points
1164
-
1165
-
let draw_current_stroke state =
1166
-
if state.is_drawing && List.length state.current_stroke > 1 then begin
1167
-
let color = color_of_int state.brush_color in
1168
-
let points = List.rev state.current_stroke in
1169
-
let rec draw = function
1170
-
| [] | [ _ ] -> ()
1171
-
| p1 :: p2 :: rest ->
1172
-
draw_line_ex (Vector2.create p1.x p1.y) (Vector2.create p2.x p2.y)
1173
-
state.brush_size color;
1174
-
draw (p2 :: rest)
1175
-
in
1176
-
draw points
1177
-
end
1178
-
1179
-
let draw_remote_drawings state =
1180
-
List.iter
1181
-
(fun d ->
1182
-
let color = color_of_int d.draw_color in
1183
-
draw_line_ex
1184
-
(Vector2.create d.draw_from.x d.draw_from.y)
1185
-
(Vector2.create d.draw_to.x d.draw_to.y)
1186
-
d.draw_thickness color)
1187
-
state.remote_drawings
1188
-
1189
-
let draw_canvas state =
1190
-
let cx, cy, cw, ch =
1191
-
(canvas_x (), canvas_y (), canvas_width (), canvas_height ())
1192
-
in
1193
-
draw_rectangle cx cy cw ch Color.white;
1194
-
draw_rectangle_lines cx cy cw ch Color.lightgray;
1195
-
1196
-
begin_scissor_mode cx cy cw ch;
1197
-
List.iter draw_stroke state.strokes;
1198
-
draw_remote_drawings state;
1199
-
draw_current_stroke state;
1200
-
1201
-
(* Other users' cursors - skip if outside canvas (cursor at -1,-1) *)
1202
-
let now = current_time () in
1203
-
Hashtbl.iter
1204
-
(fun _ u ->
1205
-
if
1206
-
u.user_id <> state.my_id
1207
-
&& now -. u.last_seen < presence_timeout
1208
-
&& u.cursor_x >= 0. && u.cursor_y >= 0.
1209
-
then begin
1210
-
let color = color_of_int u.user_color in
1211
-
draw_circle (int_of_float u.cursor_x) (int_of_float u.cursor_y)
1212
-
(scalef 6.) color;
1213
-
draw_circle_lines (int_of_float u.cursor_x) (int_of_float u.cursor_y)
1214
-
(scalef 7.) Color.black;
1215
-
draw_text u.user_name
1216
-
(int_of_float u.cursor_x + scale 12)
1217
-
(int_of_float u.cursor_y - scale 6)
1218
-
(scale 14) color
1219
-
end)
1220
-
state.users;
1221
-
end_scissor_mode ()
1222
-
1223
-
let draw_palette state =
1224
-
let py = canvas_y () + canvas_height () + scale 10 in
1225
-
let swatch = scale 30 in
1226
-
let palette = get_palette state.palette_index in
1227
-
1228
-
draw_text "Color:" (canvas_x ()) (py + scale 5) (scale 18) Color.darkgray;
1229
-
1230
-
Array.iteri
1231
-
(fun i c ->
1232
-
let sx = canvas_x () + scale 70 + (i * (swatch + scale 5)) in
1233
-
draw_rectangle sx py swatch swatch (color_of_int c);
1234
-
if c = state.brush_color then
1235
-
draw_rectangle_lines (sx - 2) (py - 2) (swatch + 4) (swatch + 4)
1236
-
Color.gold
1237
-
else draw_rectangle_lines sx py swatch swatch Color.darkgray)
1238
-
palette;
1239
-
1240
-
let size_x =
1241
-
canvas_x () + scale 70
1242
-
+ (Array.length palette * (swatch + scale 5))
1243
-
+ scale 30
1244
-
in
1245
-
draw_text
1246
-
(Printf.sprintf "Size: %.0f" state.brush_size)
1247
-
size_x
1248
-
(py + scale 5)
1249
-
(scale 18) Color.darkgray;
1250
-
draw_circle
1251
-
(size_x + scale 100)
1252
-
(py + scale 15)
1253
-
state.brush_size
1254
-
(color_of_int state.brush_color)
1255
-
1256
-
let draw_users state =
1257
-
let sx, sy, sw, sh =
1258
-
(sidebar_x (), presence_y (), sidebar_width (), presence_height ())
1259
-
in
1260
-
draw_rectangle sx sy sw sh Color.raywhite;
1261
-
draw_rectangle_lines sx sy sw sh Color.lightgray;
1262
-
1263
-
draw_text "Users Online"
1264
-
(sx + scale 10)
1265
-
(sy + scale 10)
1266
-
(scale 22) Color.darkgray;
1267
-
1268
-
let status, color =
1269
-
match state.mode with
1270
-
| Local -> ("Local Mode", Color.gray)
1271
-
| Server ->
1272
-
( (if state.connected then "Hosting" else "Starting..."),
1273
-
if state.connected then Color.green else Color.orange )
1274
-
| Client _ ->
1275
-
( (if state.connected then "Connected" else "Connecting..."),
1276
-
if state.connected then Color.green else Color.orange )
1277
-
in
1278
-
draw_text status (sx + scale 10) (sy + scale 40) (scale 16) color;
1279
-
1280
-
let y = ref (sy + scale 70) in
1281
-
draw_circle
1282
-
(sx + scale 20)
1283
-
(!y + scale 10)
1284
-
(scalef 8.)
1285
-
(color_of_int state.my_color);
1286
-
draw_text (state.my_name ^ " (you)") (sx + scale 40) !y (scale 16) Color.black;
1287
-
y := !y + scale 30;
1288
-
1289
-
let now = current_time () in
1290
-
(* Sort users by user_id for stable ordering *)
1291
-
let sorted_users =
1292
-
Hashtbl.fold (fun _ u acc -> u :: acc) state.users []
1293
-
|> List.sort (fun a b -> String.compare a.user_id b.user_id)
1294
-
in
1295
-
List.iter
1296
-
(fun u ->
1297
-
if u.user_id <> state.my_id then begin
1298
-
let alpha =
1299
-
if now -. u.last_seen < presence_timeout then 255 else 100
1300
-
in
1301
-
let c = color_of_int u.user_color in
1302
-
let c = Color.create (Color.r c) (Color.g c) (Color.b c) alpha in
1303
-
draw_circle (sx + scale 20) (!y + scale 10) (scalef 8.) c;
1304
-
draw_text u.user_name
1305
-
(sx + scale 40)
1306
-
!y (scale 16)
1307
-
(if alpha = 255 then Color.black else Color.gray);
1308
-
y := !y + scale 30
1309
-
end)
1310
-
sorted_users
1311
-
1312
-
let draw_chat state =
1313
-
let sx, cy, sw, ch =
1314
-
(sidebar_x (), chat_y (), sidebar_width (), chat_height ())
1315
-
in
1316
-
draw_rectangle sx cy sw ch Color.raywhite;
1317
-
draw_rectangle_lines sx cy sw ch Color.lightgray;
1318
-
1319
-
draw_text "Chat" (sx + scale 10) (cy + scale 10) (scale 22) Color.darkgray;
1320
-
1321
-
let msg_y = ref (cy + scale 40) in
1322
-
let msg_h = ch - scale 80 in
1323
-
let line_h = scale 22 in
1324
-
let max_visible = msg_h / line_h in
1325
-
let visible =
1326
-
let total = List.length state.messages in
1327
-
if total > max_visible then
1328
-
List.filteri (fun i _ -> i >= total - max_visible) state.messages
1329
-
else state.messages
1330
-
in
1331
-
1332
-
begin_scissor_mode sx (cy + scale 40) sw msg_h;
1333
-
List.iter
1334
-
(fun m ->
1335
-
let text = Printf.sprintf "%s: %s" m.sender m.text in
1336
-
let max_chars = sw / scale 8 in
1337
-
let display =
1338
-
if String.length text > max_chars then
1339
-
String.sub text 0 (max_chars - 3) ^ "..."
1340
-
else text
1341
-
in
1342
-
let color = if m.sender = "System" then Color.blue else Color.black in
1343
-
draw_text display (sx + scale 10) !msg_y (scale 16) color;
1344
-
msg_y := !msg_y + line_h)
1345
-
visible;
1346
-
end_scissor_mode ();
1347
-
1348
-
let iy = cy + ch - scale 35 in
1349
-
let input_bg = if state.chat_focused then Color.white else Color.lightgray in
1350
-
draw_rectangle (sx + scale 5) iy (sw - scale 10) (scale 30) input_bg;
1351
-
draw_rectangle_lines
1352
-
(sx + scale 5)
1353
-
iy
1354
-
(sw - scale 10)
1355
-
(scale 30)
1356
-
(if state.chat_focused then Color.blue else Color.gray);
1357
-
1358
-
let display =
1359
-
if state.chat_input = "" && not state.chat_focused then "Type to chat..."
1360
-
else state.chat_input ^ if state.chat_focused then "_" else ""
1361
-
in
1362
-
draw_text display
1363
-
(sx + scale 12)
1364
-
(iy + scale 6)
1365
-
(scale 16)
1366
-
(if state.chat_input = "" && not state.chat_focused then Color.gray
1367
-
else Color.black)
1368
-
1369
-
let draw_help state =
1370
-
let py = canvas_y () + canvas_height () + palette_height () + scale 5 in
1371
-
draw_text
1372
-
(Printf.sprintf
1373
-
"LMB: Draw | Wheel: Size | Shift+Wheel: Palette (%d/%d) | C: Clear"
1374
-
(state.palette_index + 1) num_palettes)
1375
-
(canvas_x ()) py (scale 14) Color.gray
1376
-
1377
-
let render state =
1378
-
begin_drawing ();
1379
-
clear_background (Color.create 240 240 240 255);
1380
-
1381
-
draw_canvas state;
1382
-
draw_palette state;
1383
-
draw_users state;
1384
-
draw_chat state;
1385
-
draw_help state;
1386
-
1387
-
if state.entering_name then draw_name_dialog state;
1388
-
end_drawing ()
1389
-
1390
-
(* ============================================================================
1391
-
Update
1392
-
============================================================================ *)
1393
-
1394
-
let update_presence state =
1395
-
if state.entering_name then ()
1396
-
else
1397
-
let now = current_time () in
1398
-
if now -. state.last_presence_sync > sync_interval then begin
1399
-
state.last_presence_sync <- now;
1400
-
let mx, my = get_mouse_pos () in
1401
-
(* Always send presence to keep connection alive, use -1,-1 when outside canvas *)
1402
-
let in_canvas =
1403
-
point_in_rect mx my (canvas_x ()) (canvas_y ()) (canvas_width ())
1404
-
(canvas_height ())
1405
-
in
1406
-
let cursor_x, cursor_y = if in_canvas then (mx, my) else (-1., -1.) in
1407
-
let p =
1408
-
{
1409
-
user_id = state.my_id;
1410
-
user_name = state.my_name;
1411
-
cursor_x;
1412
-
cursor_y;
1413
-
user_color = state.my_color;
1414
-
last_seen = now;
1415
-
}
1416
-
in
1417
-
update_user state p ~broadcast:true
1418
-
end
1419
-
1420
-
let update state ~start_network =
1421
-
handle_input state ~start_network;
1422
-
process_incoming state;
1423
-
update_presence state;
1424
-
cleanup_users state
1425
-
1426
-
(* ============================================================================
1427
-
Main
1428
-
============================================================================ *)
1429
-
1430
-
let parse_args () =
1431
-
let mode = ref Local in
1432
-
let name = ref "" in
1433
-
let i = ref 1 in
1434
-
while !i < Array.length Sys.argv do
1435
-
(match Sys.argv.(!i) with
1436
-
| "--host" | "-h" -> mode := Server
1437
-
| ("--connect" | "-c") when !i + 1 < Array.length Sys.argv ->
1438
-
incr i;
1439
-
mode := Client Sys.argv.(!i)
1440
-
| ("--name" | "-n") when !i + 1 < Array.length Sys.argv ->
1441
-
incr i;
1442
-
name := Sys.argv.(!i)
1443
-
| s when String.length s > 0 && s.[0] <> '-' && !mode = Local ->
1444
-
mode := Client s
1445
-
| _ -> ());
1446
-
incr i
1447
-
done;
1448
-
(!mode, !name)
1449
-
1450
-
let () =
1451
-
let mode, name = parse_args () in
1452
-
1453
-
Eio_main.run @@ fun env ->
1454
-
Eio.Switch.run @@ fun sw ->
1455
-
let net = Eio.Stdenv.net env in
1456
-
let clock = Eio.Stdenv.clock env in
1457
-
1458
-
(* Create message streams *)
1459
-
let incoming = Eio.Stream.create 100 in
1460
-
let outgoing = Eio.Stream.create 100 in
1461
-
let state = create_state ~incoming ~outgoing mode name in
1462
-
1463
-
(* Track if networking has been started *)
1464
-
let network_started = ref false in
1465
-
1466
-
(* Function to start networking (called when name is entered) *)
1467
-
let start_network () =
1468
-
if not !network_started then begin
1469
-
network_started := true;
1470
-
match state.mode with
1471
-
| Local -> ()
1472
-
| Server ->
1473
-
Eio.Fiber.fork ~sw (fun () ->
1474
-
run_server ~sw ~net ~state ~clock default_port)
1475
-
| Client host ->
1476
-
Eio.Fiber.fork ~sw (fun () ->
1477
-
run_client ~sw ~net ~state ~clock host default_port)
1478
-
end
1479
-
in
1480
-
1481
-
(match mode with
1482
-
| Local -> Printf.printf "Local mode\n%!"
1483
-
| Server -> Printf.printf "Server mode on port %d\n%!" default_port
1484
-
| Client h -> Printf.printf "Connecting to %s:%d\n%!" h default_port);
1485
-
1486
-
(* Start networking if name provided *)
1487
-
if name <> "" then start_network ();
1488
-
1489
-
set_config_flags [ ConfigFlags.Msaa_4x_hint; ConfigFlags.Window_resizable ];
1490
-
init_window base_width base_height "CRDT Whiteboard";
1491
-
set_window_min_size 800 600;
1492
-
(* Don't use Raylib's FPS limiter - it blocks and prevents Eio fibers from running *)
1493
-
set_target_fps 0;
1494
-
1495
-
(* Initial dimensions *)
1496
-
window_width := get_screen_width ();
1497
-
window_height := get_screen_height ();
1498
-
1499
-
(* Get DPI scale from system - this detects HiDPI displays *)
1500
-
let dpi_vec = get_window_scale_dpi () in
1501
-
let detected_dpi = max (Vector2.x dpi_vec) (Vector2.y dpi_vec) in
1502
-
(* Use DPI scale if > 1, otherwise use 1.0 *)
1503
-
dpi_scale := if detected_dpi > 1.0 then detected_dpi else 1.0;
1504
-
Printf.printf "DPI scale: %.2f (detected: %.2f x %.2f)\n%!" !dpi_scale
1505
-
(Vector2.x dpi_vec) (Vector2.y dpi_vec);
1506
-
1507
-
let frame_time = 1.0 /. 60.0 in
1508
-
(* Target 60 FPS *)
1509
-
let last_frame = ref (Eio.Time.now clock) in
1510
-
1511
-
while not (window_should_close ()) do
1512
-
update state ~start_network;
1513
-
render state;
1514
-
1515
-
(* Use Eio sleep for frame timing - this allows network fibers to run *)
1516
-
let now = Eio.Time.now clock in
1517
-
let elapsed = now -. !last_frame in
1518
-
let sleep_time = frame_time -. elapsed in
1519
-
if sleep_time > 0.001 then Eio.Time.sleep clock sleep_time;
1520
-
last_frame := Eio.Time.now clock
1521
-
done;
1522
-
1523
-
if state.connected then
1524
-
send_message state (MsgLeave { user_id = state.my_id });
1525
-
1526
-
close_window ();
1527
-
Printf.printf "Goodbye!\n%!"
-37
crdt-eio.opam
-37
crdt-eio.opam
···
1
-
# This file is generated by dune, edit dune-project instead
2
-
opam-version: "2.0"
3
-
synopsis: "Eio effect handler for CRDT library"
4
-
description:
5
-
"Eio-based effect handler for the CRDT library, enabling high-performance concurrent IO for real-time collaborative applications."
6
-
maintainer: ["Guillermo Diaz-Romero <gdiazlo@gmail.com>"]
7
-
authors: ["Guillermo Diaz-Romero <gdiazlo@gmail.com>"]
8
-
license: "MIT"
9
-
tags: ["crdt" "eio" "async" "io"]
10
-
homepage: "https://github.com/gdiazlo/crdt"
11
-
doc: "https://github.com/gdiazlo/crdt"
12
-
bug-reports: "https://github.com/gdiazlo/crdt/issues"
13
-
depends: [
14
-
"ocaml" {>= "5.2"}
15
-
"dune" {>= "3.20" & >= "3.20"}
16
-
"crdt" {= version}
17
-
"eio" {>= "1.0"}
18
-
"alcotest" {with-test}
19
-
"eio_main" {with-test}
20
-
"odoc" {with-doc}
21
-
]
22
-
build: [
23
-
["dune" "subst"] {dev}
24
-
[
25
-
"dune"
26
-
"build"
27
-
"-p"
28
-
name
29
-
"-j"
30
-
jobs
31
-
"@install"
32
-
"@runtest" {with-test}
33
-
"@doc" {with-doc}
34
-
]
35
-
]
36
-
dev-repo: "git+https://github.com/gdiazlo/crdt.git"
37
-
x-maintenance-intent: ["(latest)"]
+9
-8
crdt.opam
+9
-8
crdt.opam
···
1
1
# This file is generated by dune, edit dune-project instead
2
2
opam-version: "2.0"
3
+
version: "0.1.0"
3
4
synopsis: "OCaml CRDT library compatible with json-joy"
4
5
description:
5
-
"A full OCaml 5.4 CRDT library implementing the json-joy specification. Includes JSON CRDT document model, patch operations, multiple codec formats, and JSON-Rx RPC for real-time synchronization."
6
-
maintainer: ["Guillermo Diaz-Romero <gdiazlo@gmail.com>"]
7
-
authors: ["Guillermo Diaz-Romero <gdiazlo@gmail.com>"]
8
-
license: "MIT"
6
+
"A full OCaml CRDT library implementing the json-joy specification. Includes JSON CRDT document model, patch operations, multiple codec formats, and JSON-Rx RPC for real-time synchronization."
7
+
maintainer: ["Gabriel Díaz"]
8
+
authors: ["Gabriel Díaz"]
9
+
license: "ISC"
9
10
tags: ["crdt" "json-crdt" "collaborative-editing" "json-joy" "ocaml"]
10
-
homepage: "https://github.com/gdiazlo/crdt"
11
-
doc: "https://github.com/gdiazlo/crdt"
12
-
bug-reports: "https://github.com/gdiazlo/crdt/issues"
11
+
homepage: "https://tangled.org/gdiazlo.tngl.sh/crdt"
12
+
doc: "https://tangled.org/gdiazlo.tngl.sh/crdt"
13
+
bug-reports: "https://tangled.org/gdiazlo.tngl.sh/crdt/issues"
13
14
depends: [
14
15
"ocaml" {>= "5.2"}
15
16
"dune" {>= "3.20" & >= "3.20"}
···
32
33
"@doc" {with-doc}
33
34
]
34
35
]
35
-
dev-repo: "git+https://github.com/gdiazlo/crdt.git"
36
+
dev-repo: "git+https://tangled.org/gdiazlo.tngl.sh/crdt"
36
37
x-maintenance-intent: ["(latest)"]
+128
docs/README.md
+128
docs/README.md
···
1
+
# OCaml CRDT Library
2
+
3
+
A full OCaml 5.2+ implementation of JSON CRDTs compatible with [json-joy](https://github.com/streamich/json-joy).
4
+
5
+
## Overview
6
+
7
+
This library provides:
8
+
9
+
- **JSON CRDT Document Model** - 7 node types for representing any JSON structure
10
+
- **Patch Operations** - 18 opcodes for document mutations
11
+
- **Multiple Codec Formats** - Verbose, Compact, Binary, Sidecar, Indexed
12
+
- **JSON-Rx RPC Protocol** - Real-time synchronization over WebSocket/TCP
13
+
- **Full ClockVector Support** - Multi-session conflict resolution
14
+
15
+
## Installation
16
+
17
+
```bash
18
+
opam install crdt
19
+
```
20
+
21
+
## Quick Example
22
+
23
+
```ocaml
24
+
open Crdt
25
+
26
+
(* Create a new document *)
27
+
let model = Model.create 12345
28
+
29
+
(* Build a patch to set root to an object with a "name" field *)
30
+
let patch = Patch_builder.create model
31
+
|> Patch_builder.set_root_obj
32
+
|> Patch_builder.set_string "name" "Alice"
33
+
|> Patch_builder.build
34
+
35
+
(* Apply the patch *)
36
+
Patch.iter_with_id (fun id op -> Model.apply_op model id op) patch;
37
+
38
+
(* View the document as JSON *)
39
+
let json = Model.view model
40
+
(* => Object [("name", String "Alice")] *)
41
+
42
+
(* Encode to binary for transmission *)
43
+
let bytes = Model_codec.Binary.encode model
44
+
```
45
+
46
+
## Documentation
47
+
48
+
- [Architecture](architecture.md) - Module structure and design
49
+
- [json-joy Mapping](json-joy-mapping.md) - TypeScript to OCaml mapping
50
+
- [Codecs](codecs.md) - Encoding formats explained
51
+
- [Getting Started](getting-started.md) - Tutorial with examples
52
+
53
+
## Compatibility
54
+
55
+
This library is wire-compatible with json-joy. Documents encoded in one can be decoded in the other. The conformance test suite in `test/fixtures/traces/` validates this compatibility.
56
+
57
+
## Module Index
58
+
59
+
| Module | Description |
60
+
|--------|-------------|
61
+
| `Crdt.Model` | Document container |
62
+
| `Crdt.Patch` | Operation batch |
63
+
| `Crdt.Node` | 7 CRDT node types |
64
+
| `Crdt.Op` | 18 operation opcodes |
65
+
| `Crdt.Clock` | Logical timestamps |
66
+
| `Crdt.Rx` | JSON-Rx RPC protocol |
67
+
| `Crdt.Model_codec` | All codec formats |
68
+
69
+
## Demo Applications
70
+
71
+
Three demo applications showcase real-time collaboration between OCaml (this library) and TypeScript (json-joy):
72
+
73
+
### wb - HCS WebSocket Server
74
+
75
+
A collaborative whiteboard server using the [HCS](https://github.com/dialohq/hcs) HTTP library (Eio-based).
76
+
77
+
```bash
78
+
dune exec bin/wb/wb.exe
79
+
# or with custom port
80
+
PORT=3000 dune exec bin/wb/wb.exe
81
+
```
82
+
83
+
- **Runtime**: Eio (OCaml 5 effects)
84
+
- **Protocol**: WebSocket with JSON messages
85
+
- **Frontend**: HTML Canvas + json-joy
86
+
- **Features**: Drawing, presence cursors, chat, stroke history
87
+
88
+
Open `http://localhost:8080` in multiple browsers to collaborate.
89
+
90
+
### wbd - Dream WebSocket Server
91
+
92
+
Same whiteboard application using the [Dream](https://aantron.github.io/dream/) framework (Lwt-based).
93
+
94
+
```bash
95
+
dune exec bin/wbd/wbd.exe
96
+
```
97
+
98
+
- **Runtime**: Lwt (promise-based async)
99
+
- **Protocol**: WebSocket with JSON messages
100
+
- **Frontend**: Shared with wb (`static/`)
101
+
102
+
Useful for comparing Eio vs Lwt implementations.
103
+
104
+
### wbr - Raylib Native GUI
105
+
106
+
A native desktop whiteboard using [Raylib](https://www.raylib.com/) for graphics.
107
+
108
+
```bash
109
+
dune exec bin/wbr/wbr.exe -- --name "Alice" --host # Start server
110
+
dune exec bin/wbr/wbr.exe -- --name "Bob" --connect HOST # Connect to server
111
+
dune exec bin/wbr/wbr.exe -- --name "Carol" # Local-only mode
112
+
```
113
+
114
+
- **Runtime**: Eio with Raylib bindings
115
+
- **Protocol**: Binary TCP with length-prefixed frames
116
+
- **Features**: Drawing, colors, brush sizes, presence, chat panel
117
+
118
+
### Interoperability
119
+
120
+
All demos demonstrate interoperability with json-joy:
121
+
122
+
1. **Wire format**: Binary patches encoded by OCaml can be decoded by json-joy and vice versa
123
+
2. **Web frontend**: The HTML/JS frontend uses json-joy for CRDT operations
124
+
3. **Sync protocol**: JSON-Rx messages are compatible between implementations
125
+
126
+
## License
127
+
128
+
ISC
+186
docs/architecture.md
+186
docs/architecture.md
···
1
+
# Architecture
2
+
3
+
## Module Organization
4
+
5
+
```
6
+
crdt/
7
+
├── Core Types
8
+
│ ├── Clock Logical timestamps and clock vectors
9
+
│ ├── Session Session ID constants
10
+
│ ├── Value JSON values + CBOR extensions
11
+
│ └── Pointer JSON Pointer (RFC 6901)
12
+
│
13
+
├── Document Model
14
+
│ ├── Node 7 CRDT node types
15
+
│ ├── Rga Replicated Growable Array
16
+
│ ├── Model Document container
17
+
│ └── Model_api High-level editing API
18
+
│
19
+
├── Patch Operations
20
+
│ ├── Op 18 operation opcodes
21
+
│ ├── Patch Patch container
22
+
│ └── Patch_builder Builder pattern for patches
23
+
│
24
+
├── Codecs
25
+
│ ├── Model_codec Unified interface
26
+
│ ├── Model_codec_verbose Human-readable JSON
27
+
│ ├── Model_codec_compact Compact JSON
28
+
│ ├── Model_codec_sidecar View + metadata split
29
+
│ ├── Model_codec_indexed Flat timestamp map
30
+
│ ├── Patch_codec Verbose patch JSON
31
+
│ ├── Patch_codec_compact Compact patch JSON
32
+
│ └── Patch_codec_binary Binary patch format
33
+
│
34
+
├── RPC
35
+
│ ├── Rx Message types
36
+
│ ├── Rx_codec Message encoding
37
+
│ ├── Rx_server Server dispatcher
38
+
│ └── Rx_client Client correlator
39
+
│
40
+
├── IO
41
+
│ ├── Io_intf Effect-based IO signatures
42
+
│ └── Io_unix Unix blocking handler
43
+
│
44
+
└── Extensions
45
+
├── Counter PN-Counter on vec
46
+
└── Mval Multi-value register on arr
47
+
```
48
+
49
+
## Core Concepts
50
+
51
+
### Timestamps
52
+
53
+
Every node and operation has a unique timestamp `{sid; time}`:
54
+
55
+
```ocaml
56
+
type timestamp = {
57
+
sid : int; (* Session ID - identifies the replica *)
58
+
time : int; (* Logical time - monotonically increasing *)
59
+
}
60
+
```
61
+
62
+
Timestamps provide:
63
+
- **Unique identity** - No two operations share a timestamp
64
+
- **Causal ordering** - Higher time = happened later (within session)
65
+
- **Conflict resolution** - LWW uses timestamp comparison
66
+
67
+
### Clock Vector
68
+
69
+
Tracks observed timestamps from all sessions:
70
+
71
+
```ocaml
72
+
type clock_vector = {
73
+
local : logical_clock; (* This session's clock *)
74
+
peers : (int * int) list; (* Observed (sid, time) pairs *)
75
+
}
76
+
```
77
+
78
+
### Document Model
79
+
80
+
A `Model.t` contains:
81
+
82
+
```ocaml
83
+
type t = {
84
+
clock : Clock.clock_vector; (* Vector clock *)
85
+
root : Node.t; (* Root val node at (0,0) *)
86
+
index : Node.t NodeIndex.t; (* Timestamp -> Node map *)
87
+
}
88
+
```
89
+
90
+
The root is always a `val` node at timestamp `(0, 0)`. It points to the actual document content.
91
+
92
+
### Node Types
93
+
94
+
Seven node types represent different JSON structures:
95
+
96
+
| Node | JSON Type | CRDT Type | Description |
97
+
|------|-----------|-----------|-------------|
98
+
| `con` | any | Immutable | Constant value (primitives, nested refs) |
99
+
| `val` | - | LWW Register | Mutable reference to another node |
100
+
| `obj` | object | LWW-Map | String-keyed map |
101
+
| `vec` | - | LWW Tuple | Fixed slots (0-255) |
102
+
| `arr` | array | RGA | Ordered list of references |
103
+
| `str` | string | RGA | Mutable text |
104
+
| `bin` | - | RGA | Mutable binary data |
105
+
106
+
### RGA (Replicated Growable Array)
107
+
108
+
The core algorithm for `arr`, `str`, and `bin` nodes. Each element has a unique timestamp allowing:
109
+
110
+
- Concurrent insertions without conflicts
111
+
- Tombstone-based deletion
112
+
- Deterministic ordering across replicas
113
+
114
+
```ocaml
115
+
type 'a chunk = {
116
+
id : Clock.timestamp; (* First element's ID *)
117
+
span : int; (* Number of elements *)
118
+
data : 'a; (* Content (string/bytes/refs) *)
119
+
deleted : bool; (* Tombstone flag *)
120
+
}
121
+
122
+
type 'a t = {
123
+
chunks : 'a chunk list; (* Ordered chunks *)
124
+
}
125
+
```
126
+
127
+
### Patch Operations
128
+
129
+
18 opcodes modify the document:
130
+
131
+
**Creation** (consume 1 ID each):
132
+
- `new_con` - Create constant node
133
+
- `new_val` - Create value node
134
+
- `new_obj` - Create object node
135
+
- `new_vec` - Create vector node
136
+
- `new_str` - Create string node
137
+
- `new_bin` - Create binary node
138
+
- `new_arr` - Create array node
139
+
140
+
**Mutation**:
141
+
- `ins_val` - Set value reference
142
+
- `ins_obj` - Set object entries
143
+
- `ins_vec` - Set vector slot
144
+
- `ins_str` - Insert text (consumes N IDs for N chars)
145
+
- `ins_bin` - Insert bytes (consumes N IDs for N bytes)
146
+
- `ins_arr` - Insert array element
147
+
148
+
**Update**:
149
+
- `upd_arr` - Update array element in place
150
+
151
+
**Delete**:
152
+
- `del` - Delete ranges from RGA nodes
153
+
154
+
**Utility**:
155
+
- `nop` - No-op padding
156
+
157
+
### Patch Structure
158
+
159
+
```ocaml
160
+
type t = {
161
+
id : Clock.timestamp; (* Starting timestamp *)
162
+
ops : Op.op_data list; (* Sequence of operations *)
163
+
}
164
+
```
165
+
166
+
Operations are assigned consecutive IDs based on their span. For example, `ins_str` with "hello" consumes 5 IDs.
167
+
168
+
## Data Flow
169
+
170
+
```
171
+
User Edit
172
+
│
173
+
▼
174
+
Patch_builder ──► Patch ──► Model.apply_op
175
+
│ │
176
+
▼ ▼
177
+
Patch_codec ◄──────────────► Model_codec
178
+
│ │
179
+
▼ ▼
180
+
Wire Wire
181
+
(JSON/Binary) (JSON/Binary)
182
+
```
183
+
184
+
## Thread Safety
185
+
186
+
The library is not thread-safe by default. Use external synchronization (e.g., `Eio.Mutex`) when sharing models across fibers/threads.
+183
docs/codecs.md
+183
docs/codecs.md
···
1
+
# Codec Formats
2
+
3
+
This library supports multiple encoding formats for documents and patches, matching json-joy's codec system.
4
+
5
+
## Document Codecs
6
+
7
+
### Binary (Recommended for Wire)
8
+
9
+
Compact binary format optimized for size and speed.
10
+
11
+
```ocaml
12
+
(* Encode *)
13
+
let bytes = Model_codec.Binary.encode model
14
+
15
+
(* Decode *)
16
+
let model = Model_codec.Binary.decode bytes
17
+
```
18
+
19
+
**Characteristics:**
20
+
- Smallest size
21
+
- Fastest encode/decode
22
+
- Not human-readable
23
+
- Use for: Network transmission, storage
24
+
25
+
### Verbose JSON
26
+
27
+
Human-readable JSON with explicit structure.
28
+
29
+
```ocaml
30
+
let json = Model_codec_verbose.encode model
31
+
let model = Model_codec_verbose.decode json
32
+
```
33
+
34
+
**Example output:**
35
+
```json
36
+
{
37
+
"root": [0, 0],
38
+
"clock": {"sid": 12345, "time": 5},
39
+
"nodes": [
40
+
{"type": "val", "id": [0, 0], "value": [12345, 1]},
41
+
{"type": "obj", "id": [12345, 1], "entries": [
42
+
{"key": "name", "value": [12345, 2], "ts": [12345, 3]}
43
+
]},
44
+
{"type": "con", "id": [12345, 2], "value": "Alice"}
45
+
]
46
+
}
47
+
```
48
+
49
+
**Use for:** Debugging, logging, human inspection
50
+
51
+
### Compact JSON
52
+
53
+
Compressed JSON with type codes instead of names.
54
+
55
+
```ocaml
56
+
let json = Model_codec_compact.encode model
57
+
let model = Model_codec_compact.decode json
58
+
```
59
+
60
+
**Example output:**
61
+
```json
62
+
[
63
+
[0, 12345, 5],
64
+
[1, 0, 0, 12345, 1],
65
+
[3, 12345, 1, [["name", 12345, 2, 12345, 3]]],
66
+
[0, 12345, 2, "Alice"]
67
+
]
68
+
```
69
+
70
+
**Use for:** JSON APIs where size matters
71
+
72
+
### Sidecar
73
+
74
+
Separates the document view from CRDT metadata.
75
+
76
+
```ocaml
77
+
let (view, sidecar) = Model_codec_sidecar.encode model
78
+
let model = Model_codec_sidecar.decode view sidecar
79
+
```
80
+
81
+
**Use for:**
82
+
- Serving view to read-only clients
83
+
- Caching views separately from metadata
84
+
- Progressive loading
85
+
86
+
### Indexed
87
+
88
+
Flat map keyed by timestamp strings.
89
+
90
+
```ocaml
91
+
let json = Model_codec_indexed.encode model
92
+
let model = Model_codec_indexed.decode json
93
+
```
94
+
95
+
**Example output:**
96
+
```json
97
+
{
98
+
"0.0": {"type": "val", "value": "12345.1"},
99
+
"12345.1": {"type": "obj", "entries": {"name": "12345.2"}},
100
+
"12345.2": {"type": "con", "value": "Alice"}
101
+
}
102
+
```
103
+
104
+
**Use for:** Random access by timestamp, database storage
105
+
106
+
## Patch Codecs
107
+
108
+
### Verbose JSON
109
+
110
+
```ocaml
111
+
let json = Patch_codec.encode patch
112
+
let patch = Patch_codec.decode json
113
+
```
114
+
115
+
**Example:**
116
+
```json
117
+
{
118
+
"id": [12345, 0],
119
+
"ops": [
120
+
{"op": "new_obj"},
121
+
{"op": "ins_val", "obj": [0, 0], "value": [12345, 0]},
122
+
{"op": "new_con", "value": "Alice"},
123
+
{"op": "ins_obj", "obj": [12345, 0], "entries": [
124
+
["name", [12345, 1]]
125
+
]}
126
+
]
127
+
}
128
+
```
129
+
130
+
### Compact JSON
131
+
132
+
```ocaml
133
+
let json = Patch_codec_compact.encode patch
134
+
let patch = Patch_codec_compact.decode json
135
+
```
136
+
137
+
### Binary
138
+
139
+
```ocaml
140
+
let bytes = Patch_codec_binary.encode patch
141
+
let patch = Patch_codec_binary.decode bytes
142
+
```
143
+
144
+
## Format Comparison
145
+
146
+
| Format | Size | Speed | Human-Readable | Use Case |
147
+
|--------|------|-------|----------------|----------|
148
+
| Binary | Smallest | Fastest | No | Wire, storage |
149
+
| Compact JSON | Small | Fast | Partially | JSON APIs |
150
+
| Verbose JSON | Large | Moderate | Yes | Debugging |
151
+
| Sidecar | Medium | Moderate | View only | Read-only clients |
152
+
| Indexed | Medium | Fast lookup | Partially | Databases |
153
+
154
+
## CBOR Support
155
+
156
+
Low-level CBOR primitives are available:
157
+
158
+
```ocaml
159
+
(* Encode value to CBOR *)
160
+
let cbor_bytes = Cbor_simd.encode value
161
+
162
+
(* Decode CBOR to value *)
163
+
let value = Cbor_simd.decode cbor_bytes
164
+
```
165
+
166
+
CBOR is used internally by the binary codec.
167
+
168
+
## Codec Selection Guide
169
+
170
+
```
171
+
Need smallest size?
172
+
└─► Binary
173
+
174
+
Need JSON compatibility?
175
+
├─► API responses → Compact JSON
176
+
└─► Debugging → Verbose JSON
177
+
178
+
Need view/metadata separation?
179
+
└─► Sidecar
180
+
181
+
Need random timestamp access?
182
+
└─► Indexed
183
+
```
+202
docs/getting-started.md
+202
docs/getting-started.md
···
1
+
# Getting Started
2
+
3
+
## Installation
4
+
5
+
```bash
6
+
opam install crdt
7
+
```
8
+
9
+
## Basic Usage
10
+
11
+
### Creating a Document
12
+
13
+
```ocaml
14
+
open Crdt
15
+
16
+
(* Create a new document with session ID 12345 *)
17
+
let model = Model.create 12345
18
+
```
19
+
20
+
The session ID should be unique per replica/user. Use a random number or derive from user ID.
21
+
22
+
### Building and Applying Patches
23
+
24
+
```ocaml
25
+
(* Create a patch that sets root to an object with fields *)
26
+
let patch = Patch_builder.create model
27
+
|> Patch_builder.set_root_obj
28
+
|> Patch_builder.set_string "title" "Hello"
29
+
|> Patch_builder.set_int "count" 42
30
+
|> Patch_builder.build
31
+
32
+
(* Apply the patch *)
33
+
Patch.iter_with_id (fun id op -> Model.apply_op model id op) patch
34
+
```
35
+
36
+
### Viewing the Document
37
+
38
+
```ocaml
39
+
(* Get the document as a Value.t *)
40
+
let view = Model.view model
41
+
42
+
(* Pattern match on the value *)
43
+
match view with
44
+
| Value.Object fields ->
45
+
List.iter (fun (k, v) ->
46
+
Printf.printf "%s = %s\n" k (Value.to_string v)
47
+
) fields
48
+
| _ -> ()
49
+
```
50
+
51
+
### Encoding for Transmission
52
+
53
+
```ocaml
54
+
(* Binary (smallest, fastest) *)
55
+
let bytes = Model_codec.Binary.encode model
56
+
57
+
(* JSON (human-readable) *)
58
+
let json = Model_codec_verbose.encode model
59
+
```
60
+
61
+
### Decoding
62
+
63
+
```ocaml
64
+
(* From binary *)
65
+
let model = Model_codec.Binary.decode bytes
66
+
67
+
(* From JSON *)
68
+
let model = Model_codec_verbose.decode json
69
+
```
70
+
71
+
## Working with Text
72
+
73
+
```ocaml
74
+
(* Create a document with a text field *)
75
+
let model = Model.create 1
76
+
77
+
let patch = Patch_builder.create model
78
+
|> Patch_builder.set_root_obj
79
+
|> Patch_builder.set_text "content" "Hello, World!"
80
+
|> Patch_builder.build
81
+
82
+
Patch.iter_with_id (fun id op -> Model.apply_op model id op) patch
83
+
84
+
(* Later, insert text at position *)
85
+
let insert_patch = Patch_builder.create model
86
+
|> Patch_builder.insert_text_at ~field:"content" ~pos:7 "CRDT "
87
+
|> Patch_builder.build
88
+
89
+
Patch.iter_with_id (fun id op -> Model.apply_op model id op) insert_patch
90
+
(* Result: "Hello, CRDT World!" *)
91
+
```
92
+
93
+
## Working with Arrays
94
+
95
+
```ocaml
96
+
let model = Model.create 1
97
+
98
+
(* Create array with initial elements *)
99
+
let patch = Patch_builder.create model
100
+
|> Patch_builder.set_root_arr
101
+
|> Patch_builder.arr_push_string "first"
102
+
|> Patch_builder.arr_push_string "second"
103
+
|> Patch_builder.build
104
+
105
+
Patch.iter_with_id (fun id op -> Model.apply_op model id op) patch
106
+
```
107
+
108
+
## Synchronization
109
+
110
+
### Encoding Patches for Network
111
+
112
+
```ocaml
113
+
(* Sender side *)
114
+
let patch_bytes = Patch_codec_binary.encode patch
115
+
(* Send patch_bytes over network *)
116
+
117
+
(* Receiver side *)
118
+
match Patch_codec_binary.decode patch_bytes with
119
+
| Ok patch ->
120
+
Patch.iter_with_id (fun id op -> Model.apply_op model id op) patch
121
+
| Error msg ->
122
+
Printf.eprintf "Decode error: %s\n" msg
123
+
```
124
+
125
+
### Forking for New Session
126
+
127
+
```ocaml
128
+
(* Create a replica with a new session ID *)
129
+
let replica = Model.fork model 67890
130
+
```
131
+
132
+
## JSON-Rx RPC
133
+
134
+
For real-time sync over WebSocket:
135
+
136
+
```ocaml
137
+
open Crdt
138
+
139
+
(* Create a request *)
140
+
let req = Rx.request ~id:1 ~method_:"sync" ()
141
+
142
+
(* Encode to JSON *)
143
+
let json = Rx_codec.encode_message req
144
+
145
+
(* Parse response *)
146
+
match Rx_codec.decode_message json with
147
+
| Ok (Rx.Response { id; data }) ->
148
+
Printf.printf "Got response %d\n" id
149
+
| Ok (Rx.Error { id; error }) ->
150
+
Printf.printf "Got error %d\n" id
151
+
| _ -> ()
152
+
```
153
+
154
+
## Example: Collaborative Counter
155
+
156
+
```ocaml
157
+
open Crdt
158
+
159
+
(* Each user has their own model *)
160
+
let alice_model = Model.create 1
161
+
let bob_model = Model.create 2
162
+
163
+
(* Alice initializes the counter *)
164
+
let init_patch = Patch_builder.create alice_model
165
+
|> Patch_builder.set_root_obj
166
+
|> Patch_builder.set_int "count" 0
167
+
|> Patch_builder.build
168
+
169
+
Patch.iter_with_id (fun id op -> Model.apply_op alice_model id op) init_patch;
170
+
171
+
(* Sync to Bob *)
172
+
let bytes = Model_codec.Binary.encode alice_model in
173
+
let bob_model = Model_codec.Binary.decode bytes
174
+
175
+
(* Alice increments *)
176
+
let alice_inc = Patch_builder.create alice_model
177
+
|> Patch_builder.set_int "count" 1
178
+
|> Patch_builder.build
179
+
180
+
(* Bob increments concurrently *)
181
+
let bob_inc = Patch_builder.create bob_model
182
+
|> Patch_builder.set_int "count" 1
183
+
|> Patch_builder.build
184
+
185
+
(* Apply both patches to both models - LWW resolves conflict *)
186
+
Patch.iter_with_id (fun id op -> Model.apply_op alice_model id op) alice_inc;
187
+
Patch.iter_with_id (fun id op -> Model.apply_op alice_model id op) bob_inc;
188
+
189
+
Patch.iter_with_id (fun id op -> Model.apply_op bob_model id op) bob_inc;
190
+
Patch.iter_with_id (fun id op -> Model.apply_op bob_model id op) alice_inc;
191
+
192
+
(* Both models converge to the same state *)
193
+
```
194
+
195
+
## Next Steps
196
+
197
+
- See [Architecture](architecture.md) for module details
198
+
- See [json-joy Mapping](json-joy-mapping.md) for TypeScript comparison
199
+
- See [Codecs](codecs.md) for encoding options
200
+
- Check `bin/wb/` for an HCS WebSocket server example
201
+
- Check `bin/wbd/` for a Dream WebSocket server example
202
+
- Check `bin/wbr/` for a Raylib GUI example
+178
docs/json-joy-mapping.md
+178
docs/json-joy-mapping.md
···
1
+
# json-joy to OCaml Mapping
2
+
3
+
This document maps concepts between [json-joy](https://github.com/streamich/json-joy) (TypeScript) and this OCaml implementation.
4
+
5
+
## Type Mapping
6
+
7
+
### Core Types
8
+
9
+
| json-joy (TypeScript) | OCaml | Notes |
10
+
|----------------------|-------|-------|
11
+
| `ITimestamp` | `Clock.timestamp` | `{sid: int; time: int}` |
12
+
| `ITimespan` | `Clock.timespan` | `{sid; time; span}` |
13
+
| `ClockVector` | `Clock.clock_vector` | Vector clock with local + peers |
14
+
| `ServerClockVector` | `Clock.clock_vector` | Same type, server context |
15
+
16
+
### Document Model
17
+
18
+
| json-joy | OCaml | Notes |
19
+
|----------|-------|-------|
20
+
| `Model` | `Crdt.Model.t` | Document container |
21
+
| `JsonNode` | `Crdt.Node.t` | Sum type for all nodes |
22
+
| `ConNode` | `Node.Con of con_node` | Constant value |
23
+
| `ValNode` | `Node.Val of val_node` | LWW register |
24
+
| `ObjNode` | `Node.Obj of obj_node` | LWW-Map |
25
+
| `VecNode` | `Node.Vec of vec_node` | Fixed tuple |
26
+
| `ArrNode` | `Node.Arr of arr_node` | RGA list |
27
+
| `StrNode` | `Node.Str of str_node` | RGA string |
28
+
| `BinNode` | `Node.Bin of bin_node` | RGA binary |
29
+
30
+
### Operations
31
+
32
+
| json-joy | OCaml | ID Span |
33
+
|----------|-------|---------|
34
+
| `NewConOp` | `Op.New_con` | 1 |
35
+
| `NewValOp` | `Op.New_val` | 1 |
36
+
| `NewObjOp` | `Op.New_obj` | 1 |
37
+
| `NewVecOp` | `Op.New_vec` | 1 |
38
+
| `NewStrOp` | `Op.New_str` | 1 |
39
+
| `NewBinOp` | `Op.New_bin` | 1 |
40
+
| `NewArrOp` | `Op.New_arr` | 1 |
41
+
| `InsValOp` | `Op.Ins_val` | 0 |
42
+
| `InsObjOp` | `Op.Ins_obj` | 0 |
43
+
| `InsVecOp` | `Op.Ins_vec` | 0 |
44
+
| `InsStrOp` | `Op.Ins_str` | len(text) |
45
+
| `InsBinOp` | `Op.Ins_bin` | len(bytes) |
46
+
| `InsArrOp` | `Op.Ins_arr` | 1 |
47
+
| `UpdArrOp` | `Op.Upd_arr` | 0 |
48
+
| `DelOp` | `Op.Del` | 0 |
49
+
| `NopOp` | `Op.Nop` | configurable |
50
+
51
+
### Codecs
52
+
53
+
| json-joy Class | OCaml Module |
54
+
|----------------|--------------|
55
+
| `VerboseJsonEncoder` | `Model_codec_verbose` |
56
+
| `VerboseJsonDecoder` | `Model_codec_verbose` |
57
+
| `CompactJsonEncoder` | `Model_codec_compact` |
58
+
| `CompactJsonDecoder` | `Model_codec_compact` |
59
+
| `SidecarEncoder` | `Model_codec_sidecar` |
60
+
| `SidecarDecoder` | `Model_codec_sidecar` |
61
+
| `IndexedEncoder` | `Model_codec_indexed` |
62
+
| `IndexedDecoder` | `Model_codec_indexed` |
63
+
| `StructuralBinaryEncoder` | `Model_codec.Binary` |
64
+
| `StructuralBinaryDecoder` | `Model_codec.Binary` |
65
+
| `Encoder` (patch) | `Patch_codec` |
66
+
| `Decoder` (patch) | `Patch_codec` |
67
+
| `BinaryEncoder` (patch) | `Patch_codec_binary` |
68
+
| `BinaryDecoder` (patch) | `Patch_codec_binary` |
69
+
70
+
### JSON-Rx
71
+
72
+
| json-joy | OCaml |
73
+
|----------|-------|
74
+
| `RxRequest` | `Rx.Request` |
75
+
| `RxResponse` | `Rx.Response` |
76
+
| `RxError` | `Rx.Error` |
77
+
| `RxNotification` | `Rx.Notification` |
78
+
| `RxSubscribe` | `Rx.Subscribe` |
79
+
| `RxUnsubscribe` | `Rx.Unsubscribe` |
80
+
| `RxData` | `Rx.Data` |
81
+
| `RxComplete` | `Rx.Complete` |
82
+
83
+
## Code Comparison
84
+
85
+
### Creating a Document
86
+
87
+
**TypeScript (json-joy):**
88
+
```typescript
89
+
import {Model} from 'json-joy/lib/json-crdt';
90
+
91
+
const model = Model.create();
92
+
model.api.root({
93
+
name: 'Alice',
94
+
age: 30
95
+
});
96
+
```
97
+
98
+
**OCaml:**
99
+
```ocaml
100
+
open Crdt
101
+
102
+
let model = Model.create 12345 in
103
+
let patch = Patch_builder.create model
104
+
|> Patch_builder.set_root_obj
105
+
|> Patch_builder.set_string "name" "Alice"
106
+
|> Patch_builder.set_int "age" 30
107
+
|> Patch_builder.build in
108
+
Patch.iter_with_id (fun id op -> Model.apply_op model id op) patch
109
+
```
110
+
111
+
### Encoding to Binary
112
+
113
+
**TypeScript:**
114
+
```typescript
115
+
import {encode} from 'json-joy/lib/json-crdt/codec/structural/binary';
116
+
117
+
const bytes = encode(model);
118
+
```
119
+
120
+
**OCaml:**
121
+
```ocaml
122
+
let bytes = Model_codec.Binary.encode model
123
+
```
124
+
125
+
### Applying a Patch
126
+
127
+
**TypeScript:**
128
+
```typescript
129
+
model.applyPatch(patch);
130
+
```
131
+
132
+
**OCaml:**
133
+
```ocaml
134
+
Patch.iter_with_id (fun id op -> Model.apply_op model id op) patch
135
+
```
136
+
137
+
### Viewing Document as JSON
138
+
139
+
**TypeScript:**
140
+
```typescript
141
+
const json = model.view();
142
+
```
143
+
144
+
**OCaml:**
145
+
```ocaml
146
+
let json = Model.view model
147
+
```
148
+
149
+
## Architectural Differences
150
+
151
+
| Aspect | json-joy | OCaml crdt |
152
+
|--------|----------|------------|
153
+
| Style | OOP with classes | Functional with modules |
154
+
| Mutability | Mutable by default | Explicit mutable fields |
155
+
| Error handling | Exceptions | `Result` types in codecs |
156
+
| Async | Promises | Effects (Io_intf) |
157
+
| Encoding | Encoder/Decoder classes | `encode`/`decode` functions |
158
+
159
+
## Wire Compatibility
160
+
161
+
Documents and patches encoded by one implementation can be decoded by the other. The binary format is identical.
162
+
163
+
Test traces in `test/fixtures/traces/` validate this:
164
+
165
+
```
166
+
traces/
167
+
├── text/
168
+
│ ├── sequential/ # Single-user editing traces
169
+
│ └── concurrent/ # Multi-user conflict traces
170
+
├── fuzzer/ # Randomly generated operations
171
+
└── json/ # JSON document traces
172
+
```
173
+
174
+
Each trace contains:
175
+
- `ops.json` - Original operations
176
+
- `model.bin` - Binary encoded final state
177
+
- `model.compact.json` - Compact JSON encoded
178
+
- `patches.bin` - Binary encoded patches
+11
-22
dune-project
+11
-22
dune-project
···
1
1
(lang dune 3.20)
2
2
3
3
(name crdt)
4
+
(version 0.1.0)
4
5
5
6
(generate_opam_files true)
6
7
7
8
(source
8
-
(github gdiazlo/crdt))
9
+
; (tangled @gdiazlo.tngl.sh/simdjsont)
10
+
(uri git+https://tangled.org/gdiazlo.tngl.sh/crdt))
9
11
10
-
(authors "Guillermo Diaz-Romero <gdiazlo@gmail.com>")
12
+
(authors "Gabriel Díaz")
11
13
12
-
(maintainers "Guillermo Diaz-Romero <gdiazlo@gmail.com>")
14
+
(maintainers "Gabriel Díaz")
13
15
14
-
(license MIT)
15
-
16
-
(documentation https://github.com/gdiazlo/crdt)
16
+
(license ISC)
17
+
(homepage https://tangled.org/gdiazlo.tngl.sh/crdt)
18
+
(bug_reports https://tangled.org/gdiazlo.tngl.sh/crdt/issues)
19
+
(documentation https://tangled.org/gdiazlo.tngl.sh/crdt)
17
20
18
21
(package
19
22
(name crdt)
20
23
(synopsis "OCaml CRDT library compatible with json-joy")
21
24
(description
22
-
"A full OCaml 5.4 CRDT library implementing the json-joy specification. \
25
+
"A full OCaml CRDT library implementing the json-joy specification. \
23
26
Includes JSON CRDT document model, patch operations, multiple codec formats, \
24
27
and JSON-Rx RPC for real-time synchronization.")
25
28
(depends
···
31
34
(tags
32
35
(crdt json-crdt collaborative-editing json-joy ocaml)))
33
36
34
-
(package
35
-
(name crdt-eio)
36
-
(synopsis "Eio effect handler for CRDT library")
37
-
(description
38
-
"Eio-based effect handler for the CRDT library, enabling high-performance \
39
-
concurrent IO for real-time collaborative applications.")
40
-
(depends
41
-
(ocaml (>= 5.2))
42
-
(dune (>= 3.20))
43
-
(crdt (= :version))
44
-
(eio (>= 1.0))
45
-
(alcotest :with-test)
46
-
(eio_main :with-test))
47
-
(tags
48
-
(crdt eio async io)))
37
+
+1
-2
lib/model.ml
+1
-2
lib/model.ml
···
352
352
else c)
353
353
chunks
354
354
in
355
-
(* Rebuild the RGA - not ideal but works *)
355
+
(* NOTE: Rebuilding RGA - in-place update would be more efficient *)
356
356
let _ = updated in
357
-
(* TODO: proper in-place update *)
358
357
()
359
358
| _ -> ())
360
359
| Op.Op_del { del_obj; del_what } -> (
+2
-2
lib/rga.ml
+2
-2
lib/rga.ml
···
111
111
(** Compare two timestamps for RGA ordering. Higher timestamp wins (inserted
112
112
later in logical time). Tie-break by session ID (higher wins).
113
113
114
-
TODO: This needs to match json-joy's algorithm exactly. Currently the
115
-
tie-breaking for concurrent inserts may differ. *)
114
+
NOTE: This may not match json-joy's exact ordering for concurrent inserts.
115
+
See issue crdt-4pq for json-joy compatibility work. *)
116
116
let compare_for_rga (a : Clock.timestamp) (b : Clock.timestamp) =
117
117
(* First compare by time (descending - higher time first) *)
118
118
let time_cmp = compare b.time a.time in
-4
lib_eio/dune
-4
lib_eio/dune
-315
lib_eio/io_eio.ml
-315
lib_eio/io_eio.ml
···
1
-
(** Eio-based effect handler for async IO.
2
-
3
-
This module provides an effect handler that uses Eio for high-performance
4
-
concurrent IO. It integrates with the Eio scheduler for proper fiber support.
5
-
6
-
Usage:
7
-
{[
8
-
Eio_main.run @@ fun env ->
9
-
let net = Eio.Stdenv.net env in
10
-
Eio.Switch.run @@ fun sw ->
11
-
let sock = Eio.Net.connect ~sw net (`Tcp (addr, port)) in
12
-
Io_eio.run_two_way sock (fun () ->
13
-
let data = Crdt.Io_intf.read_string 100 in
14
-
(* ... process data ... *)
15
-
)
16
-
]}
17
-
18
-
@see Crdt.Io_intf for the effect definitions *)
19
-
20
-
(** {1 Low-Level IO Operations} *)
21
-
22
-
(** Read exactly n bytes from a source. Raises End_of_file if EOF is reached
23
-
before all bytes are read. *)
24
-
let read_exact (source : _ Eio.Flow.source) n =
25
-
let buf = Cstruct.create n in
26
-
let rec loop pos =
27
-
if pos >= n then ()
28
-
else
29
-
let sub = Cstruct.sub buf pos (n - pos) in
30
-
let r = Eio.Flow.single_read source sub in
31
-
if r = 0 then raise End_of_file else loop (pos + r)
32
-
in
33
-
loop 0;
34
-
Cstruct.to_bytes buf
35
-
36
-
(** Write all bytes to a sink. *)
37
-
let write_all (sink : _ Eio.Flow.sink) buf =
38
-
let cs = Cstruct.of_bytes buf in
39
-
Eio.Flow.write sink [ cs ]
40
-
41
-
(** Read a single byte *)
42
-
let read_one_byte source =
43
-
let buf = Cstruct.create 1 in
44
-
let r = Eio.Flow.single_read source buf in
45
-
if r = 0 then raise End_of_file else Cstruct.get_uint8 buf 0
46
-
47
-
(** Write a single byte *)
48
-
let write_one_byte sink b =
49
-
let buf = Cstruct.create 1 in
50
-
Cstruct.set_uint8 buf 0 b;
51
-
Eio.Flow.write sink [ buf ]
52
-
53
-
(** Read a 4-byte big-endian integer *)
54
-
let read_int32_be source =
55
-
let buf = read_exact source 4 in
56
-
let b0 = Char.code (Bytes.get buf 0) in
57
-
let b1 = Char.code (Bytes.get buf 1) in
58
-
let b2 = Char.code (Bytes.get buf 2) in
59
-
let b3 = Char.code (Bytes.get buf 3) in
60
-
(b0 lsl 24) lor (b1 lsl 16) lor (b2 lsl 8) lor b3
61
-
62
-
(** Write a 4-byte big-endian integer *)
63
-
let write_int32_be sink n =
64
-
let buf = Bytes.create 4 in
65
-
Bytes.set buf 0 (Char.chr ((n lsr 24) land 0xff));
66
-
Bytes.set buf 1 (Char.chr ((n lsr 16) land 0xff));
67
-
Bytes.set buf 2 (Char.chr ((n lsr 8) land 0xff));
68
-
Bytes.set buf 3 (Char.chr (n land 0xff));
69
-
write_all sink buf
70
-
71
-
(** {1 Effect Handlers} *)
72
-
73
-
(** Run a function with Eio IO using a bidirectional flow (e.g., TCP socket).
74
-
75
-
This handler intercepts IO effects and translates them to Eio operations. *)
76
-
let run_two_way (flow : _ Eio.Flow.two_way) f =
77
-
let bytes_read = ref 0 in
78
-
let bytes_written = ref 0 in
79
-
Effect.Deep.match_with f ()
80
-
{
81
-
retc = (fun x -> x);
82
-
exnc = raise;
83
-
effc =
84
-
(fun (type a) (eff : a Effect.t) ->
85
-
match eff with
86
-
| Crdt.Io_intf.Read_bytes n ->
87
-
Some
88
-
(fun (k : (a, _) Effect.Deep.continuation) ->
89
-
let result = read_exact flow n in
90
-
bytes_read := !bytes_read + n;
91
-
Effect.Deep.continue k result)
92
-
| Crdt.Io_intf.Write_bytes data ->
93
-
Some
94
-
(fun (k : (a, _) Effect.Deep.continuation) ->
95
-
write_all flow data;
96
-
bytes_written := !bytes_written + Bytes.length data;
97
-
Effect.Deep.continue k ())
98
-
| Crdt.Io_intf.Read_byte ->
99
-
Some
100
-
(fun (k : (a, _) Effect.Deep.continuation) ->
101
-
let b = read_one_byte flow in
102
-
bytes_read := !bytes_read + 1;
103
-
Effect.Deep.continue k b)
104
-
| Crdt.Io_intf.Write_byte b ->
105
-
Some
106
-
(fun (k : (a, _) Effect.Deep.continuation) ->
107
-
write_one_byte flow b;
108
-
bytes_written := !bytes_written + 1;
109
-
Effect.Deep.continue k ())
110
-
| Crdt.Io_intf.Yield ->
111
-
Some
112
-
(fun (k : (a, _) Effect.Deep.continuation) ->
113
-
Eio.Fiber.yield ();
114
-
Effect.Deep.continue k ())
115
-
| Crdt.Io_intf.Read_frame ->
116
-
Some
117
-
(fun (k : (a, _) Effect.Deep.continuation) ->
118
-
let len = read_int32_be flow in
119
-
let data = read_exact flow len in
120
-
bytes_read := !bytes_read + 4 + len;
121
-
Effect.Deep.continue k data)
122
-
| Crdt.Io_intf.Write_frame data ->
123
-
Some
124
-
(fun (k : (a, _) Effect.Deep.continuation) ->
125
-
let len = Bytes.length data in
126
-
write_int32_be flow len;
127
-
write_all flow data;
128
-
bytes_written := !bytes_written + 4 + len;
129
-
Effect.Deep.continue k ())
130
-
| _ -> None);
131
-
}
132
-
133
-
(** Run a function with Eio IO using separate source and sink flows.
134
-
135
-
Useful when you have separate read and write channels. *)
136
-
let run_split ~(source : _ Eio.Flow.source) ~(sink : _ Eio.Flow.sink) f =
137
-
Effect.Deep.match_with f ()
138
-
{
139
-
retc = (fun x -> x);
140
-
exnc = raise;
141
-
effc =
142
-
(fun (type a) (eff : a Effect.t) ->
143
-
match eff with
144
-
| Crdt.Io_intf.Read_bytes n ->
145
-
Some
146
-
(fun (k : (a, _) Effect.Deep.continuation) ->
147
-
let result = read_exact source n in
148
-
Effect.Deep.continue k result)
149
-
| Crdt.Io_intf.Write_bytes data ->
150
-
Some
151
-
(fun (k : (a, _) Effect.Deep.continuation) ->
152
-
write_all sink data;
153
-
Effect.Deep.continue k ())
154
-
| Crdt.Io_intf.Read_byte ->
155
-
Some
156
-
(fun (k : (a, _) Effect.Deep.continuation) ->
157
-
let b = read_one_byte source in
158
-
Effect.Deep.continue k b)
159
-
| Crdt.Io_intf.Write_byte b ->
160
-
Some
161
-
(fun (k : (a, _) Effect.Deep.continuation) ->
162
-
write_one_byte sink b;
163
-
Effect.Deep.continue k ())
164
-
| Crdt.Io_intf.Yield ->
165
-
Some
166
-
(fun (k : (a, _) Effect.Deep.continuation) ->
167
-
Eio.Fiber.yield ();
168
-
Effect.Deep.continue k ())
169
-
| Crdt.Io_intf.Read_frame ->
170
-
Some
171
-
(fun (k : (a, _) Effect.Deep.continuation) ->
172
-
let len = read_int32_be source in
173
-
let data = read_exact source len in
174
-
Effect.Deep.continue k data)
175
-
| Crdt.Io_intf.Write_frame data ->
176
-
Some
177
-
(fun (k : (a, _) Effect.Deep.continuation) ->
178
-
let len = Bytes.length data in
179
-
write_int32_be sink len;
180
-
write_all sink data;
181
-
Effect.Deep.continue k ())
182
-
| _ -> None);
183
-
}
184
-
185
-
(** {1 Network Operations} *)
186
-
187
-
(** Connect to a TCP server and run a function with the connection.
188
-
189
-
@param sw Eio switch for resource management
190
-
@param net Eio network capability
191
-
@param host Hostname or IP address
192
-
@param port Port number
193
-
@param f Function to run with the connection *)
194
-
let connect ~sw ~net ~host ~port f =
195
-
let addr =
196
-
match Eio.Net.getaddrinfo_stream net host ~service:(string_of_int port) with
197
-
| [] -> failwith ("Could not resolve: " ^ host)
198
-
| addr :: _ -> addr
199
-
in
200
-
let flow = Eio.Net.connect ~sw net addr in
201
-
run_two_way flow f
202
-
203
-
(** Listen on a TCP port and run a handler for each connection.
204
-
205
-
@param sw Eio switch for resource management
206
-
@param net Eio network capability
207
-
@param port Port number to listen on
208
-
@param handler Function to handle each connection *)
209
-
let serve ~sw ~net ~port ~handler =
210
-
let addr = `Tcp (Eio.Net.Ipaddr.V4.any, port) in
211
-
let sock = Eio.Net.listen ~sw net ~backlog:128 ~reuse_addr:true addr in
212
-
let rec accept_loop () =
213
-
Eio.Net.accept_fork sock ~sw
214
-
~on_error:(fun exn ->
215
-
Eio.traceln "Handler error: %s" (Printexc.to_string exn))
216
-
(fun flow _addr -> run_two_way flow handler);
217
-
accept_loop ()
218
-
in
219
-
accept_loop ()
220
-
221
-
(** Serve connections concurrently using Eio fibers with backpressure.
222
-
223
-
@param sw Eio switch for resource management
224
-
@param net Eio network capability
225
-
@param port Port number to listen on
226
-
@param max_fibers Maximum concurrent fibers (default 100)
227
-
@param handler Function to handle each connection *)
228
-
let serve_concurrent ~sw ~net ~port ?(max_fibers = 100) ~handler =
229
-
let addr = `Tcp (Eio.Net.Ipaddr.V4.any, port) in
230
-
let sock = Eio.Net.listen ~sw net ~backlog:128 ~reuse_addr:true addr in
231
-
let sem = Eio.Semaphore.make max_fibers in
232
-
let rec accept_loop () =
233
-
Eio.Semaphore.acquire sem;
234
-
Eio.Net.accept_fork sock ~sw
235
-
~on_error:(fun exn ->
236
-
Eio.Semaphore.release sem;
237
-
Eio.traceln "Handler error: %s" (Printexc.to_string exn))
238
-
(fun flow _addr ->
239
-
Fun.protect
240
-
(fun () -> run_two_way flow handler)
241
-
~finally:(fun () -> Eio.Semaphore.release sem));
242
-
accept_loop ()
243
-
in
244
-
accept_loop ()
245
-
246
-
(** {1 Buffered IO} *)
247
-
248
-
(** Run with buffered reading for improved performance on many small reads. *)
249
-
let run_buffered (flow : _ Eio.Flow.two_way) f =
250
-
let reader = Eio.Buf_read.of_flow flow ~max_size:(64 * 1024) in
251
-
let bytes_written = ref 0 in
252
-
Effect.Deep.match_with f ()
253
-
{
254
-
retc = (fun x -> x);
255
-
exnc = raise;
256
-
effc =
257
-
(fun (type a) (eff : a Effect.t) ->
258
-
match eff with
259
-
| Crdt.Io_intf.Read_bytes n ->
260
-
Some
261
-
(fun (k : (a, _) Effect.Deep.continuation) ->
262
-
let cs = Eio.Buf_read.take n reader in
263
-
Effect.Deep.continue k (Bytes.of_string cs))
264
-
| Crdt.Io_intf.Write_bytes data ->
265
-
Some
266
-
(fun (k : (a, _) Effect.Deep.continuation) ->
267
-
write_all flow data;
268
-
bytes_written := !bytes_written + Bytes.length data;
269
-
Effect.Deep.continue k ())
270
-
| Crdt.Io_intf.Read_byte ->
271
-
Some
272
-
(fun (k : (a, _) Effect.Deep.continuation) ->
273
-
let c = Eio.Buf_read.any_char reader in
274
-
Effect.Deep.continue k (Char.code c))
275
-
| Crdt.Io_intf.Write_byte b ->
276
-
Some
277
-
(fun (k : (a, _) Effect.Deep.continuation) ->
278
-
write_one_byte flow b;
279
-
bytes_written := !bytes_written + 1;
280
-
Effect.Deep.continue k ())
281
-
| Crdt.Io_intf.Yield ->
282
-
Some
283
-
(fun (k : (a, _) Effect.Deep.continuation) ->
284
-
Eio.Fiber.yield ();
285
-
Effect.Deep.continue k ())
286
-
| Crdt.Io_intf.Read_frame ->
287
-
Some
288
-
(fun (k : (a, _) Effect.Deep.continuation) ->
289
-
let len_bytes = Eio.Buf_read.take 4 reader in
290
-
let b0 = Char.code len_bytes.[0] in
291
-
let b1 = Char.code len_bytes.[1] in
292
-
let b2 = Char.code len_bytes.[2] in
293
-
let b3 = Char.code len_bytes.[3] in
294
-
let len = (b0 lsl 24) lor (b1 lsl 16) lor (b2 lsl 8) lor b3 in
295
-
let data = Eio.Buf_read.take len reader in
296
-
Effect.Deep.continue k (Bytes.of_string data))
297
-
| Crdt.Io_intf.Write_frame data ->
298
-
Some
299
-
(fun (k : (a, _) Effect.Deep.continuation) ->
300
-
let len = Bytes.length data in
301
-
write_int32_be flow len;
302
-
write_all flow data;
303
-
bytes_written := !bytes_written + 4 + len;
304
-
Effect.Deep.continue k ())
305
-
| _ -> None);
306
-
}
307
-
308
-
(** {1 Utility Functions} *)
309
-
310
-
(** Run two fibers concurrently: one for reading, one for writing. Useful for
311
-
duplex protocols where reads and writes happen independently. *)
312
-
let run_duplex ~flow ~reader ~writer =
313
-
Eio.Fiber.both
314
-
(fun () -> run_two_way flow reader)
315
-
(fun () -> run_two_way flow writer)