(* CRDT library in OCaml implementing json-joy. *)
(* Source view: branch main, 8.0 kB. *)
(** JSON-Rx server router/dispatcher.

    This module implements server-side RPC infrastructure for JSON-Rx protocol:
    - Method registration and routing
    - Request/response handling
    - Subscription management
    - Error handling

    The server uses OCaml 5.4 effects for transport abstraction, allowing it to
    work with any IO backend (Eio, Lwt, Unix blocking).

    @see <https://jsonjoy.com/specs/json-rx> JSON-Rx specification *)

(** {1 Handler Types} *)

(** Result type for method handlers.

    Note: the [Ok] and [Error] constructors declared here shadow the ones of
    [Stdlib.result] for the remainder of this module. *)
type 'a handler_result = Ok of 'a | Error of Value.t

type method_handler = Value.t option -> Value.t handler_result
(** Method handler function type. Takes request data and returns a result or
    error. *)

type notification_handler = Value.t option -> unit
(** Notification handler function type. Takes notification data and returns unit
    (no response). *)

type subscription_handler = (Value.t -> unit) -> unit -> unit
(** Subscription handler function type. Takes subscription callback and returns
    a cleanup function. The callback is called for each data item to push to the
    client. The returned function is called when the subscription ends. *)

(** {1 Server State} *)

type subscription = { id : int; channel : string; cleanup : unit -> unit }
(** Active subscription: the id it was opened under, the channel name, and the
    function to call to tear it down. *)

type t = {
  mutable methods : (string, method_handler) Hashtbl.t;
  mutable notifications : (string, notification_handler) Hashtbl.t;
  mutable channels : (string, subscription_handler) Hashtbl.t;
  mutable subscriptions : (int, subscription) Hashtbl.t;
  mutable next_sub_id : int;
}
(** Server state: one table per handler kind, plus the table of live
    subscriptions keyed by subscription id. [next_sub_id] is initialised by
    {!create} but is not otherwise read or advanced by the code in this file. *)

(** {1 Server Creation} *)

(** Create a new server with no handlers and no active subscriptions. *)
let create () =
  {
    methods = Hashtbl.create 16;
    notifications = Hashtbl.create 16;
    channels = Hashtbl.create 16;
    subscriptions = Hashtbl.create 16;
    next_sub_id = 1;
  }

(** {1 Method Registration} *)

(** Register a method handler. An existing handler under [name] is replaced. *)
let register_method t name handler = Hashtbl.replace t.methods name handler

(** Register a notification handler. An existing handler under [name] is
    replaced. *)
let register_notification t name handler =
  Hashtbl.replace t.notifications name handler

(** Register a subscription channel. An existing handler under [name] is
    replaced. *)
let register_channel t name handler = Hashtbl.replace t.channels name handler

(** Unregister a method. Does nothing if [name] is not registered. *)
let unregister_method t name = Hashtbl.remove t.methods name

(** Unregister a notification handler. Does nothing if [name] is not
    registered. *)
let unregister_notification t name = Hashtbl.remove t.notifications name

(** Unregister a channel. Does nothing if [name] is not registered. Already
    active subscriptions on that channel are left untouched. *)
let unregister_channel t name = Hashtbl.remove t.channels name

(** {1 Message Handling} *)

(** [error_object code message] builds the error payload used for errors the
    server generates itself: an object with a ["code"] and a ["message"]
    field. *)
let error_object code message =
  Value.Object [ ("code", Value.Int code); ("message", Value.String message) ]

(** Handle a request message. Returns the response message: the handler's
    result, the handler's error, or a [-32601] error when no method is
    registered under [method_]. *)
let handle_request t id method_ data =
  match Hashtbl.find_opt t.methods method_ with
  | None ->
      Rx.error ~id
        ~error:(error_object (-32601) ("Method not found: " ^ method_))
  | Some handler -> (
      match handler data with
      | Ok result -> Rx.response ~id ~data:result
      | Error err -> Rx.error ~id ~error:err)

(** Handle a notification message (no response). Notifications for which no
    handler is registered are silently ignored. *)
let handle_notification t method_ data =
  match Hashtbl.find_opt t.notifications method_ with
  | None -> ()
  | Some handler -> handler data

type send_fn = Rx.message -> unit
(** Send callback for subscriptions *)

(** Handle an unsubscribe message. Does nothing when [id] has no active
    subscription (already unsubscribed or never subscribed). *)
let handle_unsubscribe t id =
  match Hashtbl.find_opt t.subscriptions id with
  | None -> ()
  | Some sub ->
      (* Drop the entry before running the cleanup: if the cleanup raises, or
         re-enters the server for the same id, the subscription must not be
         found (and cleaned up) a second time. *)
      Hashtbl.remove t.subscriptions id;
      sub.cleanup ()

(** Handle a subscribe message. Sends a [-32602] error through [send] when
    [channel] is not registered; otherwise starts the channel handler and
    records the subscription under [id].

    If a subscription is already active under [id], it is released (its cleanup
    function is called) before the new one is started, so that its cleanup
    function is never lost. *)
let handle_subscribe t (send : send_fn) id channel =
  match Hashtbl.find_opt t.channels channel with
  | None ->
      send
        (Rx.error ~id
           ~error:(error_object (-32602) ("Channel not found: " ^ channel)))
  | Some handler ->
      (* Without this, [Hashtbl.replace] below would overwrite a live entry and
         the previous subscription could never be cleaned up. *)
      handle_unsubscribe t id;
      (* Every item the handler pushes is forwarded to the client as a data
         message tagged with the subscription id. *)
      let callback data = send (Rx.data ~id ~data) in
      let cleanup = handler callback in
      Hashtbl.replace t.subscriptions id { id; channel; cleanup }

(** Handle an incoming message and return the optional direct response. Only
    requests produce a direct response; subscription traffic goes through
    [send]. Messages a server is not expected to receive are ignored. *)
let handle_message t (send : send_fn) (msg : Rx.message) : Rx.message option =
  match msg with
  | Rx.Request { id; method_; data } -> Some (handle_request t id method_ data)
  | Rx.Notification { method_; data } ->
      handle_notification t method_ data;
      None
  | Rx.Subscribe { id; channel } ->
      handle_subscribe t send id channel;
      None
  | Rx.Unsubscribe { id } ->
      handle_unsubscribe t id;
      None
  (* Server should not receive these messages *)
  | Rx.Response _ | Rx.Error _ | Rx.Data _ | Rx.Complete _ -> None

(** {1 Server Loop}

    Main server loop that processes messages from a connection. *)

(** Process a single message from the connection. Uses IO effects for reading;
    replies are written through [send].

    Returns [true] after a frame has been processed (including a frame that
    failed to decode, which is answered with a [-32700] error under id [0]) and
    [false] when [End_of_file] is raised while reading or handling the frame. *)
let process_one t (send : send_fn) : bool =
  try
    let frame = Effect.perform Io_intf.Read_frame in
    (match Rx_codec.Compact.decode (Bytes.to_string frame) with
    | Error e ->
        send (Rx.error ~id:0 ~error:(error_object (-32700) ("Parse error: " ^ e)))
    | Ok msg -> Option.iter send (handle_message t send msg));
    true
  with End_of_file -> false

(** Send a message to the connection using effects *)
let send_message msg =
  let encoded = Rx_codec.Compact.encode msg in
  Effect.perform (Io_intf.Write_frame (Bytes.of_string encoded))

(** Release every subscription recorded in [t].

    The table is snapshotted and cleared before any cleanup function runs, so a
    cleanup that itself touches [t.subscriptions] cannot disturb the traversal
    and no entry can be cleaned up twice. *)
let release_all_subscriptions t =
  let pending =
    Hashtbl.fold (fun _ sub acc -> sub.cleanup :: acc) t.subscriptions []
  in
  Hashtbl.clear t.subscriptions;
  List.iter (fun cleanup -> cleanup ()) pending

(** Run the server loop on a connection until it closes.

    All subscriptions recorded in [t] are released when the loop ends, whether
    it ends because the connection closed or because an exception escaped from
    a handler or from the transport. An exception raised by a cleanup function
    itself is reported as [Fun.Finally_raised].

    NOTE(review): subscriptions are stored on the server value, so this
    releases every subscription in [t]; confirm that a [t] is not shared
    between concurrently running connections. *)
let run_connection t =
  Fun.protect
    ~finally:(fun () -> release_all_subscriptions t)
    (fun () ->
      while process_one t send_message do
        Effect.perform Io_intf.Yield
      done)

(** {1 Convenience Functions} *)

(** Register an echo method that returns its input ([Value.Null] when the
    request carries no data). *)
let register_echo t =
  register_method t "echo" (fun data ->
      Ok (Option.value data ~default:Value.Null))

(** Register a ping method that returns "pong" *)
let register_ping t =
  register_method t "ping" (fun _ -> Ok (Value.String "pong"))

(** Create a server with common methods registered *)
let create_with_defaults () =
  let t = create () in
  register_echo t;
  register_ping t;
  t

(** {1 Batch Processing}

    Process multiple messages at once (for testing or batched protocols). *)

(** Process a list of messages and collect responses. Direct responses and
    messages emitted through the subscription send callback are returned
    together, in the order they were produced. *)
let process_batch t (messages : Rx.message list) : Rx.message list =
  (* Accumulated newest-first, reversed once at the end. *)
  let responses = ref [] in
  let send msg = responses := msg :: !responses in
  List.iter (fun msg -> Option.iter send (handle_message t send msg)) messages;
  List.rev !responses

(** {1 Statistics} *)

(** Get the number of registered methods *)
let method_count t = Hashtbl.length t.methods

(** Get the number of registered notification handlers *)
let notification_count t = Hashtbl.length t.notifications

(** Get the number of registered channels *)
let channel_count t = Hashtbl.length t.channels

(** Get the number of active subscriptions *)
let subscription_count t = Hashtbl.length t.subscriptions

(** Get list of registered method names (in unspecified order) *)
let method_names t = Hashtbl.fold (fun k _ acc -> k :: acc) t.methods []

(** Get list of registered channel names (in unspecified order) *)
let channel_names t = Hashtbl.fold (fun k _ acc -> k :: acc) t.channels []