(* CRDT library in OCaml implementing json-joy. *)
(** JSON-Rx server router/dispatcher.

    This module implements the server-side RPC infrastructure for the JSON-Rx
    protocol:
    - Method registration and routing
    - Request/response handling
    - Subscription management
    - Error handling

    The server uses OCaml 5.4 effects for transport abstraction, allowing it to
    work with any IO backend (Eio, Lwt, Unix blocking).

    @see <https://jsonjoy.com/specs/json-rx> JSON-Rx specification *)
13
14(** {1 Handler Types} *)
15
(** Outcome of a method handler: [Ok v] carries the success payload [v] and
    [Error e] carries an error value [e].

    These constructors are defined after, and therefore shadow, [Stdlib.Ok] and
    [Stdlib.Error] for the remainder of this module. *)
type 'a handler_result = Ok of 'a | Error of Value.t
18
type method_handler = Value.t option -> Value.t handler_result
(** Request handler. It receives the optional payload of the request and
    produces either a result value ([Ok]) or an error value ([Error]); the
    dispatcher turns the former into a response message and the latter into an
    error message. *)
22
type notification_handler = Value.t option -> unit
(** Notification handler. It receives the optional payload of the notification
    and returns [unit]; no message is sent back for a notification. *)
26
type subscription_handler = (Value.t -> unit) -> unit -> unit
(** Subscription channel handler. It is applied to a push callback and returns
    a cleanup function. Each call to the callback sends one data item to the
    client; the returned [unit -> unit] function is stored with the
    subscription and invoked when the subscription ends. *)
31
32(** {1 Server State} *)
33
type subscription = { id : int; channel : string; cleanup : unit -> unit }
(** An active subscription: [id] is the id it is stored under, [channel] is the
    name of the channel it was opened on, and [cleanup] is the function
    returned by the channel handler, called when the subscription ends. *)
36
type t = {
  mutable methods : (string, method_handler) Hashtbl.t;
      (** Request handlers, keyed by method name. *)
  mutable notifications : (string, notification_handler) Hashtbl.t;
      (** Notification handlers, keyed by method name. *)
  mutable channels : (string, subscription_handler) Hashtbl.t;
      (** Subscription channel handlers, keyed by channel name. *)
  mutable subscriptions : (int, subscription) Hashtbl.t;
      (** Active subscriptions, keyed by subscription id. *)
  mutable next_sub_id : int;
      (** Counter initialised to 1 by [create]. NOTE(review): nothing in the
          visible code reads or updates it; subscription ids come from the
          incoming messages. Confirm whether it is still needed. *)
}
(** Server state: the three handler registries plus the table of currently
    active subscriptions. *)
45
46(** {1 Server Creation} *)
47
(** [create ()] builds a fresh server: all handler registries and the
    subscription table are empty, and [next_sub_id] is set to 1. *)
let create () =
  let initial_capacity = 16 in
  let methods = Hashtbl.create initial_capacity in
  let notifications = Hashtbl.create initial_capacity in
  let channels = Hashtbl.create initial_capacity in
  let subscriptions = Hashtbl.create initial_capacity in
  { methods; notifications; channels; subscriptions; next_sub_id = 1 }
57
58(** {1 Method Registration} *)
59
(** [register_method t name handler] binds [handler] to the method [name],
    replacing any handler already registered under that name. *)
let register_method t name handler =
  let registry = t.methods in
  Hashtbl.replace registry name handler
62
(** [register_notification t name handler] binds [handler] to notifications
    named [name], replacing any handler already registered under that name. *)
let register_notification t name handler =
  let registry = t.notifications in
  Hashtbl.replace registry name handler
66
(** [register_channel t name handler] binds [handler] to the subscription
    channel [name], replacing any handler already registered under that
    name. *)
let register_channel t name handler =
  let registry = t.channels in
  Hashtbl.replace registry name handler
69
(** [unregister_method t name] removes the handler bound to the method [name];
    it does nothing when no such handler is registered. *)
let unregister_method t name =
  let registry = t.methods in
  Hashtbl.remove registry name
72
(** [unregister_notification t name] removes the handler bound to
    notifications named [name]; it does nothing when no such handler is
    registered. *)
let unregister_notification t name =
  let registry = t.notifications in
  Hashtbl.remove registry name
75
(** [unregister_channel t name] removes the handler bound to the channel
    [name]; it does nothing when no such handler is registered. Subscriptions
    that are already active are left untouched. *)
let unregister_channel t name =
  let registry = t.channels in
  Hashtbl.remove registry name
78
79(** {1 Message Handling} *)
80
(** [handle_request t id method_ data] dispatches a request to the handler
    registered for [method_] and builds the reply message carrying [id]:
    - a response message when the handler returns [Ok];
    - an error message carrying the handler's error value when it returns
      [Error];
    - an error message with code [-32601] when no handler is registered.

    Exceptions raised by the handler are not caught here. *)
let handle_request t id method_ data =
  (* Built lazily: only needed when the lookup fails. *)
  let method_not_found () =
    Value.Object
      [
        ("code", Value.Int (-32601));
        ("message", Value.String ("Method not found: " ^ method_));
      ]
  in
  let outcome =
    Hashtbl.find_opt t.methods method_
    |> Option.map (fun handler -> handler data)
  in
  match outcome with
  | None -> Rx.error ~id ~error:(method_not_found ())
  | Some (Ok result) -> Rx.response ~id ~data:result
  | Some (Error err) -> Rx.error ~id ~error:err
96
(** [handle_notification t method_ data] runs the notification handler
    registered for [method_] on [data]. Notifications with no registered
    handler are silently ignored; nothing is ever sent back. *)
let handle_notification t method_ data =
  Hashtbl.find_opt t.notifications method_
  |> Option.iter (fun handler -> handler data)
102
type send_fn = Rx.message -> unit
(** Callback used to push a message to the client. The dispatcher uses it for
    subscription data items and for errors that have no direct return path. *)
105
(** [handle_subscribe t send id channel] opens a subscription on [channel].

    When no handler is registered for [channel], an error message with code
    [-32602] is pushed through [send] and nothing else happens.

    Otherwise the channel handler is started with a callback that wraps each
    item in a data message carrying [id] and pushes it through [send]. The
    cleanup function returned by the handler is stored in [t.subscriptions]
    under [id].

    If a subscription with the same [id] is already active, it is removed and
    its cleanup function is run before the new one starts, so the previous
    subscription is not leaked when its table entry is replaced.

    Exceptions raised by the channel handler or by a cleanup function are not
    caught here. *)
let handle_subscribe t (send : send_fn) id channel =
  match Hashtbl.find_opt t.channels channel with
  | None ->
      send
        (Rx.error ~id
           ~error:
             (Value.Object
                [
                  ("code", Value.Int (-32602));
                  ("message", Value.String ("Channel not found: " ^ channel));
                ]))
  | Some handler ->
      (* Release a still-active subscription that reuses this id. The entry is
         dropped before its cleanup runs so that a raising cleanup cannot
         leave a stale entry behind. *)
      (match Hashtbl.find_opt t.subscriptions id with
      | None -> ()
      | Some previous ->
          Hashtbl.remove t.subscriptions id;
          previous.cleanup ());
      (* Callback handed to the channel handler: forwards one item. *)
      let callback data = send (Rx.data ~id ~data) in
      (* Starting the handler yields the function that stops it. *)
      let cleanup = handler callback in
      Hashtbl.replace t.subscriptions id { id; channel; cleanup }
126
(** [handle_unsubscribe t id] ends the subscription stored under [id], if any.
    Unknown ids (already unsubscribed, or never subscribed) are ignored.

    The entry is removed from [t.subscriptions] {e before} its cleanup function
    runs. This way a cleanup function that raises does not leave a stale entry
    whose cleanup would be run a second time later, and a cleanup function that
    re-enters [handle_unsubscribe] with the same [id] finds nothing to do
    instead of recursing.

    Exceptions raised by the cleanup function propagate to the caller. *)
let handle_unsubscribe t id =
  match Hashtbl.find_opt t.subscriptions id with
  | None -> ()
  | Some sub ->
      Hashtbl.remove t.subscriptions id;
      sub.cleanup ()
134
(** [handle_message t send msg] routes one incoming message.

    Only requests produce a direct reply ([Some reply]). Notifications,
    subscribes and unsubscribes are handled for their side effects and yield
    [None]; subscription traffic goes through [send] instead. Message kinds a
    server is not expected to receive (response, error, data, complete) are
    ignored and also yield [None]. *)
let handle_message t (send : send_fn) (msg : Rx.message) : Rx.message option =
  match msg with
  (* Client-bound message kinds: nothing to do on the server side. *)
  | Rx.Response _ | Rx.Error _ | Rx.Data _ | Rx.Complete _ -> None
  | Rx.Request { id; method_; data } ->
      let reply = handle_request t id method_ data in
      Some reply
  | Rx.Notification { method_; data } ->
      let () = handle_notification t method_ data in
      None
  | Rx.Subscribe { id; channel } ->
      let () = handle_subscribe t send id channel in
      None
  | Rx.Unsubscribe { id } ->
      let () = handle_unsubscribe t id in
      None
150
151(** {1 Server Loop}
152
153 Main server loop that processes messages from a connection. *)
154
(** [process_one t send] reads one frame via the [Io_intf.Read_frame] effect,
    decodes it and dispatches the resulting message.

    - If decoding fails, an error message with id [0] and code [-32700] is
      pushed through [send].
    - If dispatching produces a reply, the reply is pushed through [send].

    Returns [true] after a frame has been processed and [false] when
    [End_of_file] is raised. The exception handler covers the whole body, so an
    [End_of_file] raised while reading, dispatching or sending all result in
    [false]. Any other exception propagates. *)
let process_one t (send : send_fn) : bool =
  let parse_error reason =
    Rx.error ~id:0
      ~error:
        (Value.Object
           [
             ("code", Value.Int (-32700));
             ("message", Value.String ("Parse error: " ^ reason));
           ])
  in
  try
    let raw = Bytes.to_string (Effect.perform Io_intf.Read_frame) in
    (match Rx_codec.Compact.decode raw with
    | Ok msg -> Option.iter send (handle_message t send msg)
    | Error reason -> send (parse_error reason));
    true
  with End_of_file -> false
180
(** [send_message msg] encodes [msg] with the compact codec and hands the
    resulting bytes to the transport through the [Io_intf.Write_frame]
    effect. *)
let send_message msg =
  let frame = msg |> Rx_codec.Compact.encode |> Bytes.of_string in
  Effect.perform (Io_intf.Write_frame frame)
185
(** [run_connection t] processes frames with [process_one] until it reports
    that the connection is closed, performing the [Io_intf.Yield] effect
    between frames, and then releases every subscription still stored in [t].

    Subscriptions are released on every exit path:
    - on normal termination, every cleanup function is run even if one of them
      raises; the first such exception is re-raised once all have run;
    - when the loop is interrupted by an exception, the subscriptions are
      released and that original exception is re-raised with its backtrace.

    In both cases [t.subscriptions] is empty afterwards.

    @raise e any exception [e] escaping the processing loop, or the first
    exception raised by a cleanup function after a normal termination. *)
let run_connection t =
  let release_subscriptions () =
    (* Snapshot and empty the table first: the behaviour of [Hashtbl.iter] is
       unspecified if the table is modified during the traversal, which a
       cleanup function touching [t.subscriptions] would do. *)
    let pending = t.subscriptions |> Hashtbl.to_seq_values |> List.of_seq in
    Hashtbl.clear t.subscriptions;
    let first_failure = ref None in
    List.iter
      (fun sub ->
        try sub.cleanup ()
        with exn ->
          (* Keep going so one faulty cleanup cannot leak the others; only the
             first failure is reported. *)
          if Option.is_none !first_failure then
            first_failure := Some (exn, Printexc.get_raw_backtrace ()))
      pending;
    Option.iter
      (fun (exn, bt) -> Printexc.raise_with_backtrace exn bt)
      !first_failure
  in
  match
    while process_one t send_message do
      Effect.perform Io_intf.Yield
    done
  with
  | () -> release_subscriptions ()
  | exception exn ->
      let bt = Printexc.get_raw_backtrace () in
      (* The loop failure is the primary error: a secondary failure during
         teardown is deliberately dropped so that it cannot mask it. *)
      (try release_subscriptions () with _ -> ());
      Printexc.raise_with_backtrace exn bt
194
195(** {1 Convenience Functions} *)
196
(** [register_echo t] registers an ["echo"] method that answers with the
    request payload, or with [Value.Null] when the request has none. *)
let register_echo t =
  let echo = function
    | Some payload -> Ok payload
    | None -> Ok Value.Null
  in
  register_method t "echo" echo
201
(** [register_ping t] registers a ["ping"] method that ignores its payload and
    always answers with the string ["pong"]. *)
let register_ping t =
  let pong _payload = Ok (Value.String "pong") in
  register_method t "ping" pong
205
(** [create_with_defaults ()] builds a new server and registers the ["echo"]
    and ["ping"] methods on it, in that order. *)
let create_with_defaults () =
  let server = create () in
  List.iter (fun register -> register server) [ register_echo; register_ping ];
  server
212
213(** {1 Batch Processing}
214
215 Process multiple messages at once (for testing or batched protocols). *)
216
(** [process_batch t messages] dispatches every message of [messages] in order
    and returns all outgoing messages in the order they were produced. Both
    direct replies and messages pushed through the send callback (subscription
    data, channel errors) are collected. *)
let process_batch t (messages : Rx.message list) : Rx.message list =
  (* Outgoing messages, most recent first. *)
  let outbox = ref [] in
  let send msg = outbox := msg :: !outbox in
  let dispatch msg = Option.iter send (handle_message t send msg) in
  List.iter dispatch messages;
  List.rev !outbox
228
229(** {1 Statistics} *)
230
(** [method_count t] is the number of method handlers registered on [t]. *)
let method_count t = t.methods |> Hashtbl.length
233
(** [notification_count t] is the number of notification handlers registered
    on [t]. *)
let notification_count t = t.notifications |> Hashtbl.length
236
(** [channel_count t] is the number of subscription channels registered on
    [t]. *)
let channel_count t = t.channels |> Hashtbl.length
239
(** [subscription_count t] is the number of subscriptions currently stored in
    [t]. *)
let subscription_count t = t.subscriptions |> Hashtbl.length
242
(** [method_names t] lists the names of all methods registered on [t]. The
    order follows the hash table traversal and is not otherwise specified. *)
let method_names t =
  let collect name _handler names = name :: names in
  Hashtbl.fold collect t.methods []
245
(** [channel_names t] lists the names of all channels registered on [t]. The
    order follows the hash table traversal and is not otherwise specified. *)
let channel_names t =
  let collect name _handler names = name :: names in
  Hashtbl.fold collect t.channels []