ocaml http/1, http/2 and websocket client and server library
at main 15 kB view raw
1(** HTTP/2 Server implementation using h2. 2 3 This module provides HTTP/2 server functionality built on the h2 library 4 with Eio for structured concurrency. *) 5 6open Eio.Std 7 8(** {1 Types} *) 9 10type request = { 11 meth : H2.Method.t; 12 target : string; 13 headers : H2.Headers.t; 14 body : string; 15} 16(** Request type exposed to handlers - same as Server.request for compatibility 17*) 18 19(** {1 Response Types} *) 20 21(** Response body - supports string, bigstring, streaming, and pre-built *) 22type response_body = 23 | Body_string of string 24 | Body_bigstring of Bigstringaf.t 25 | Body_prebuilt of { h2_response : H2.Response.t; body : Bigstringaf.t } 26 | Body_stream of { 27 content_length : int64 option; 28 next : unit -> Cstruct.t option; 29 } 30 31type response = { 32 status : H2.Status.t; 33 headers : (string * string) list; 34 response_body : response_body; 35} 36(** Response type with body variants *) 37 38type handler = request -> response 39 40(** {2 Optimized Response Constructors} *) 41 42let respond_opt ?(status = `OK) ?(headers = []) body = 43 { status; headers; response_body = Body_string body } 44 45let respond_bigstring ?(status = `OK) ?(headers = []) bstr = 46 { status; headers; response_body = Body_bigstring bstr } 47 48let respond_stream ?(status = `OK) ?(headers = []) ?content_length next = 49 { status; headers; response_body = Body_stream { content_length; next } } 50 51let respond_prebuilt h2_response body = 52 { 53 status = `OK; 54 headers = []; 55 response_body = Body_prebuilt { h2_response; body }; 56 } 57 58let make_h2_headers headers_list = H2.Headers.of_list headers_list 59 60let make_h2_response ?(status = `OK) headers = 61 H2.Response.create ~headers status 62 63(** Lock-free buffer pool using Treiber stack via Kcas for thread-safe pooling. 64*) 65module Read_buffer_pool : sig 66 val acquire : unit -> Bigstringaf.t * Cstruct.t 67 val release : Bigstringaf.t -> unit 68end = struct 69 let buffer_size = 0x4000 70 let max_pooled = 256 71 let pool : Bigstringaf.t list Kcas.Loc.t = Kcas.Loc.make [] 72 let pool_size : int Kcas.Loc.t = Kcas.Loc.make 0 73 74 let acquire () = 75 let buf = 76 Kcas.Xt.commit 77 { 78 tx = 79 (fun ~xt -> 80 match Kcas.Xt.get ~xt pool with 81 | [] -> None 82 | buf :: rest -> 83 Kcas.Xt.set ~xt pool rest; 84 Kcas.Xt.set ~xt pool_size (Kcas.Xt.get ~xt pool_size - 1); 85 Some buf); 86 } 87 in 88 let buf = 89 match buf with Some b -> b | None -> Bigstringaf.create buffer_size 90 in 91 (buf, Cstruct.of_bigarray buf ~off:0 ~len:buffer_size) 92 93 let release buf = 94 Kcas.Xt.commit 95 { 96 tx = 97 (fun ~xt -> 98 let size = Kcas.Xt.get ~xt pool_size in 99 if size < max_pooled then begin 100 Kcas.Xt.set ~xt pool (buf :: Kcas.Xt.get ~xt pool); 101 Kcas.Xt.set ~xt pool_size (size + 1) 102 end); 103 } 104end 105 106let set_tcp_nodelay flow = 107 match Eio_unix.Resource.fd_opt flow with 108 | None -> () 109 | Some fd -> 110 Eio_unix.Fd.use_exn "set_tcp_nodelay" fd (fun unix_fd -> 111 Unix.setsockopt unix_fd Unix.TCP_NODELAY true) 112 113let write_iovecs flow iovecs = 114 let cstructs = 115 List.map 116 (fun iov -> 117 Cstruct.of_bigarray ~off:iov.H2.IOVec.off ~len:iov.H2.IOVec.len 118 iov.H2.IOVec.buffer) 119 iovecs 120 in 121 Eio.Flow.write flow cstructs 122 123(** {1 Connection handling} *) 124 125type body_result = Ok_body of string | Body_too_large | Missing_path 126 127let respond_error reqd status body = 128 let headers = 129 H2.Headers.of_list 130 [ 131 ("content-length", string_of_int (String.length body)); 132 ("date", Date_cache.get ()); 133 ] 134 in 135 let resp = H2.Response.create ~headers status in 136 H2.Reqd.respond_with_string reqd resp body 137 138let handle_connection ?clock ?read_timeout ?request_timeout ?max_body_size 139 handler flow = 140 let read_buffer, read_cstruct = Read_buffer_pool.acquire () in 141 Fun.protect ~finally:(fun () -> Read_buffer_pool.release read_buffer) 142 @@ fun () -> 143 let request_handler reqd = 144 let req = H2.Reqd.request reqd in 145 let body_reader = H2.Reqd.request_body reqd in 146 147 let body_result = 148 match req.meth with 149 | `GET | `HEAD -> 150 H2.Body.Reader.close body_reader; 151 Ok_body "" 152 | `POST | `PUT | `DELETE | `CONNECT | `OPTIONS | `TRACE | `Other _ -> 153 let body_buffer = Buffer.create 4096 in 154 let current_size = ref 0 in 155 let too_large = ref false in 156 let body_done_promise, body_done_resolver = Eio.Promise.create () in 157 let rec read_body () = 158 H2.Body.Reader.schedule_read body_reader 159 ~on_eof:(fun () -> Eio.Promise.resolve body_done_resolver ()) 160 ~on_read:(fun buf ~off ~len -> 161 let new_size = !current_size + len in 162 match max_body_size with 163 | Some max when Int64.of_int new_size > max -> 164 too_large := true; 165 H2.Body.Reader.close body_reader; 166 Eio.Promise.resolve body_done_resolver () 167 | _ -> 168 current_size := new_size; 169 Buffer.add_string body_buffer 170 (Bigstringaf.substring buf ~off ~len); 171 read_body ()) 172 in 173 read_body (); 174 Eio.Promise.await body_done_promise; 175 if !too_large then Body_too_large 176 else Ok_body (Buffer.contents body_buffer) 177 in 178 179 match body_result with 180 | Body_too_large -> 181 respond_error reqd (`Code 413) "Request Entity Too Large" 182 | Missing_path -> respond_error reqd `Bad_request "Missing :path header" 183 | Ok_body body -> ( 184 let target = 185 match H2.Headers.get req.headers ":path" with 186 | Some p -> Some p 187 | None -> None 188 in 189 match target with 190 | None -> respond_error reqd `Bad_request "Missing :path header" 191 | Some target -> ( 192 let request = 193 { meth = req.meth; target; headers = req.headers; body } 194 in 195 let handler_result = 196 match (clock, request_timeout) with 197 | Some clock, Some timeout -> 198 Eio.Time.with_timeout clock timeout (fun () -> 199 Ok (handler request)) 200 | _ -> Ok (handler request) 201 in 202 match handler_result with 203 | Error `Timeout -> respond_error reqd (`Code 408) "Request Timeout" 204 | Ok response -> ( 205 let date_header = ("date", Date_cache.get ()) in 206 match response.response_body with 207 | Body_string body -> 208 let headers = 209 H2.Headers.of_list 210 (date_header 211 :: ("content-length", string_of_int (String.length body)) 212 :: response.headers) 213 in 214 let resp = H2.Response.create ~headers response.status in 215 H2.Reqd.respond_with_string reqd resp body 216 | Body_bigstring bstr -> 217 let headers = 218 H2.Headers.of_list 219 (date_header 220 :: ( "content-length", 221 string_of_int (Bigstringaf.length bstr) ) 222 :: response.headers) 223 in 224 let resp = H2.Response.create ~headers response.status in 225 H2.Reqd.respond_with_bigstring reqd resp bstr 226 | Body_prebuilt { h2_response; body } -> 227 let headers = 228 H2.Headers.add h2_response.H2.Response.headers "date" 229 (Date_cache.get ()) 230 in 231 let resp = { h2_response with H2.Response.headers } in 232 H2.Reqd.respond_with_bigstring reqd resp body 233 | Body_stream { content_length; next } -> 234 let headers = 235 match content_length with 236 | Some len -> 237 H2.Headers.of_list 238 (date_header 239 :: ("content-length", Int64.to_string len) 240 :: response.headers) 241 | None -> 242 H2.Headers.of_list (date_header :: response.headers) 243 in 244 let resp = H2.Response.create ~headers response.status in 245 let body_writer = 246 H2.Reqd.respond_with_streaming reqd resp 247 in 248 let rec write_chunks () = 249 match next () with 250 | None -> H2.Body.Writer.close body_writer 251 | Some cs -> 252 H2.Body.Writer.write_bigstring body_writer ~off:0 253 ~len:(Cstruct.length cs) (Cstruct.to_bigarray cs); 254 let flushed, resolve = Eio.Promise.create () in 255 H2.Body.Writer.flush body_writer (fun _result -> 256 Eio.Promise.resolve resolve ()); 257 Eio.Promise.await flushed; 258 write_chunks () 259 in 260 write_chunks ()))) 261 in 262 263 let error_handler ?request:_ _error start_response = 264 let resp_body = 265 start_response (H2.Headers.of_list [ ("date", Date_cache.get ()) ]) 266 in 267 H2.Body.Writer.write_string resp_body "Internal Server Error"; 268 H2.Body.Writer.close resp_body 269 in 270 271 let conn = H2.Server_connection.create ~error_handler request_handler in 272 273 let shutdown = ref false in 274 275 let do_read () = 276 match (clock, read_timeout) with 277 | Some clock, Some timeout -> 278 Eio.Time.with_timeout clock timeout (fun () -> 279 Ok (Eio.Flow.single_read flow read_cstruct)) 280 | _ -> Ok (Eio.Flow.single_read flow read_cstruct) 281 in 282 283 let read_loop () = 284 let rec loop () = 285 if not !shutdown then 286 match H2.Server_connection.next_read_operation conn with 287 | `Read -> ( 288 match do_read () with 289 | Error `Timeout -> shutdown := true 290 | exception End_of_file -> 291 let _ = 292 H2.Server_connection.read_eof conn read_buffer ~off:0 ~len:0 293 in 294 shutdown := true 295 | Ok n -> 296 let _ = 297 H2.Server_connection.read conn read_buffer ~off:0 ~len:n 298 in 299 loop ()) 300 | `Close -> shutdown := true 301 in 302 loop () 303 in 304 305 let write_loop () = 306 let rec loop () = 307 if not !shutdown then 308 match H2.Server_connection.next_write_operation conn with 309 | `Write iovecs -> 310 write_iovecs flow iovecs; 311 let len = 312 List.fold_left (fun acc iov -> acc + iov.H2.IOVec.len) 0 iovecs 313 in 314 H2.Server_connection.report_write_result conn (`Ok len); 315 loop () 316 | `Yield -> 317 let continue = Eio.Promise.create () in 318 H2.Server_connection.yield_writer conn (fun () -> 319 Eio.Promise.resolve (snd continue) ()); 320 Eio.Promise.await (fst continue); 321 loop () 322 | `Close _ -> shutdown := true 323 in 324 loop () 325 in 326 327 Fiber.both read_loop write_loop 328 329(** {1 Public API} *) 330 331let run ~sw ~net ?clock ?(config = H1_server.default_config) handler = 332 let addr = `Tcp (Eio.Net.Ipaddr.V4.any, config.port) in 333 let socket = 334 Eio.Net.listen ~sw ~backlog:config.backlog ~reuse_addr:config.reuse_addr 335 ~reuse_port:config.reuse_port net addr 336 in 337 traceln "HTTP/2 Server listening on port %d" config.port; 338 let max_body_size = config.max_body_size in 339 let read_timeout = Some config.read_timeout in 340 let request_timeout = Some config.request_timeout in 341 let connection_handler flow _addr = 342 if config.tcp_nodelay then set_tcp_nodelay flow; 343 handle_connection ?clock ?read_timeout ?request_timeout ?max_body_size 344 handler flow 345 in 346 let on_error exn = traceln "Connection error: %s" (Printexc.to_string exn) in 347 Eio.Net.run_server socket connection_handler 348 ~max_connections:config.max_connections ~on_error 349 350let run_tls ~sw ~net ?clock ?(config = H1_server.default_config) ~tls_config 351 handler = 352 let addr = `Tcp (Eio.Net.Ipaddr.V4.any, config.port) in 353 let socket = 354 Eio.Net.listen ~sw ~backlog:config.backlog ~reuse_addr:config.reuse_addr 355 ~reuse_port:config.reuse_port net addr 356 in 357 traceln "HTTP/2 Server (TLS) listening on port %d" config.port; 358 let max_body_size = config.max_body_size in 359 let read_timeout = Some config.read_timeout in 360 let request_timeout = Some config.request_timeout in 361 let connection_handler flow _addr = 362 if config.tcp_nodelay then set_tcp_nodelay flow; 363 match Tls_config.Server.to_tls_config tls_config with 364 | Error (`Msg msg) -> traceln "TLS config error: %s" msg 365 | Ok tls_cfg -> ( 366 try 367 let tls_flow = Tls_eio.server_of_flow tls_cfg flow in 368 handle_connection ?clock ?read_timeout ?request_timeout ?max_body_size 369 handler tls_flow 370 with 371 | Tls_eio.Tls_failure failure -> 372 traceln "TLS error: %s" (Tls_config.failure_to_string failure) 373 | exn -> traceln "TLS error: %s" (Printexc.to_string exn)) 374 in 375 let on_error exn = traceln "Connection error: %s" (Printexc.to_string exn) in 376 Eio.Net.run_server socket connection_handler 377 ~max_connections:config.max_connections ~on_error 378 379let run_parallel ~sw ~net ~domain_mgr ?clock 380 ?(config = H1_server.default_config) handler = 381 let domain_count = max 1 config.domain_count in 382 let addr = `Tcp (Eio.Net.Ipaddr.V4.any, config.port) in 383 let socket = 384 Eio.Net.listen ~sw ~backlog:config.backlog ~reuse_addr:config.reuse_addr 385 ~reuse_port:config.reuse_port net addr 386 in 387 traceln "HTTP/2 Server listening on port %d (%d domains)" config.port 388 domain_count; 389 let max_body_size = config.max_body_size in 390 let read_timeout = Some config.read_timeout in 391 let request_timeout = Some config.request_timeout in 392 let connection_handler flow _addr = 393 if config.tcp_nodelay then set_tcp_nodelay flow; 394 handle_connection ?clock ?read_timeout ?request_timeout ?max_body_size 395 handler flow 396 in 397 let on_error exn = traceln "Connection error: %s" (Printexc.to_string exn) in 398 if domain_count <= 1 then 399 Eio.Net.run_server socket connection_handler 400 ~max_connections:config.max_connections ~on_error 401 else 402 Eio.Net.run_server socket connection_handler 403 ~max_connections:config.max_connections ~on_error 404 ~additional_domains:(domain_mgr, domain_count - 1)