ocaml http/1, http/2 and websocket client and server library
at v0.3.3 22 kB view raw
(** HTTP/1.1 Server implementation using h1.

    This module provides HTTP/1.1 server functionality built on Eio.

    Features:
    - Lazy body reading: request body is not read until accessed
    - Zero-copy responses: bigstring bodies avoid copying
    - Streaming responses: write large responses without buffering
    - Connection pooling via buffer reuse *)

open Eio.Std

(** {1 Read Buffer Pool} *)

(** Shared pool of fixed-size read buffers.

    The pool is a list of buffers plus a counter, each stored in a
    [Kcas.Loc.t] and only ever updated together inside one [Kcas.Xt]
    transaction. *)
module Read_buffer_pool : sig
  val acquire : unit -> Bigstringaf.t * Cstruct.t
  (** [acquire ()] returns a buffer together with a [Cstruct.t] view covering
      its first [0x4000] bytes. A pooled buffer is reused when one is
      available; otherwise a new one is allocated. *)

  val release : Bigstringaf.t -> unit
  (** [release buf] puts [buf] back in the pool unless the pool already holds
      256 buffers, in which case [buf] is simply not retained. *)
end = struct
  let buffer_size = 0x4000
  let max_pooled = 256
  let pool : Bigstringaf.t list Kcas.Loc.t = Kcas.Loc.make []
  let pool_size : int Kcas.Loc.t = Kcas.Loc.make 0

  (* Transaction body: pop the head of the pool, keeping the counter in sync. *)
  let take_pooled ~xt =
    match Kcas.Xt.get ~xt pool with
    | [] -> None
    | head :: tail ->
        Kcas.Xt.set ~xt pool tail;
        Kcas.Xt.set ~xt pool_size (Kcas.Xt.get ~xt pool_size - 1);
        Some head

  let acquire () =
    let bigstring =
      match Kcas.Xt.commit { tx = take_pooled } with
      | Some pooled -> pooled
      | None -> Bigstringaf.create buffer_size
    in
    (bigstring, Cstruct.of_bigarray bigstring ~off:0 ~len:buffer_size)

  let release buf =
    (* Transaction body: push [buf] only while the pool is below capacity. *)
    let return_to_pool ~xt =
      let count = Kcas.Xt.get ~xt pool_size in
      if count < max_pooled then begin
        Kcas.Xt.set ~xt pool (buf :: Kcas.Xt.get ~xt pool);
        Kcas.Xt.set ~xt pool_size (count + 1)
      end
    in
    Kcas.Xt.commit { tx = return_to_pool }
end

(** {1 Configuration} *)

type config = {
  (* Network *)
  host : string;  (** Bind address. Default: "0.0.0.0" *)
  port : int;  (** Listen port. Default: 8080 *)
  backlog : int;  (** Listen backlog. Default: 2048 *)
  max_connections : int;  (** Max concurrent connections. Default: 10000 *)
  (* Parallelism *)
  domain_count : int;  (** Number of domains (CPUs) to use. Default: 1 *)
  (* Timeouts *)
  read_timeout : float;  (** Read timeout in seconds. Default: 60.0 *)
  write_timeout : float;  (** Write timeout in seconds. Default: 60.0 *)
  idle_timeout : float;  (** Idle connection timeout. Default: 120.0 *)
  request_timeout : float;  (** Request processing timeout. Default: 30.0 *)
  (* Limits *)
  max_header_size : int;  (** Max header size in bytes. Default: 8192 *)
  max_body_size : int64 option;
      (** Max body size. None = unlimited. Default: None *)
  (* Buffers *)
  buffer_size : int;  (** Read buffer size. Default: 16384 *)
  (* TLS *)
  tls : Tls_config.Server.t option;  (** TLS config. None = plain HTTP *)
  (* Socket options *)
  tcp_nodelay : bool;  (** Enable TCP_NODELAY (disable Nagle). Default: true *)
  reuse_addr : bool;  (** Enable SO_REUSEADDR. Default: true *)
  reuse_port : bool;
      (** Enable SO_REUSEPORT for multi-process scaling. Default: true *)
}
(** Server configuration *)

(** Configuration holding the default value documented on each field of
    {!config}. *)
let default_config =
  {
    host = "0.0.0.0";
    port = 8080;
    backlog = 2048;
    max_connections = 10000;
    domain_count = 1;
    read_timeout = 60.0;
    write_timeout = 60.0;
    idle_timeout = 120.0;
    request_timeout = 30.0;
    max_header_size = 8192;
    max_body_size = None;
    buffer_size = 16384;
    tls = None;
    tcp_nodelay = true;
    reuse_addr = true;
    reuse_port = true;
  }

(** {2 Config builders} *)

(* Each builder returns a copy of the given configuration with exactly one
   field replaced, so they compose with [|>]. *)

let with_port port cfg = { cfg with port }
let with_host host cfg = { cfg with host }
let with_backlog backlog cfg = { cfg with backlog }
let with_max_connections limit cfg = { cfg with max_connections = limit }
let with_read_timeout seconds cfg = { cfg with read_timeout = seconds }
let with_write_timeout seconds cfg = { cfg with write_timeout = seconds }
let with_idle_timeout seconds cfg = { cfg with idle_timeout = seconds }
let with_request_timeout seconds cfg = { cfg with request_timeout = seconds }
let with_domain_count domains cfg = { cfg with domain_count = domains }
let with_max_header_size bytes cfg = { cfg with max_header_size = bytes }

(** Sets a body size limit; the stored value is always [Some size]. *)
let with_max_body_size bytes cfg = { cfg with max_body_size = Some bytes }

let with_buffer_size bytes cfg = { cfg with buffer_size = bytes }

(** Enables TLS; the stored value is always [Some tls]. *)
let with_tls tls cfg = { cfg with tls = Some tls }

let with_tcp_nodelay flag cfg = { cfg with tcp_nodelay = flag }
let with_reuse_addr flag cfg = { cfg with reuse_addr = flag }
let with_reuse_port flag cfg = { cfg with reuse_port = flag }

(** {1 GC Tuning} *)

(** GC parameters applied by {!tune_gc}. [minor_heap_size] and
    [major_heap_increment] are divided by the word size in bytes before being
    handed to [Gc.set]; the other two fields are passed through unchanged. *)
type gc_config = {
  minor_heap_size : int;
  major_heap_increment : int;
  space_overhead : int;
  max_overhead : int;
}

let default_gc_config =
  let mib = 1024 * 1024 in
  {
    minor_heap_size = 64 * mib;
    major_heap_increment = 16 * mib;
    space_overhead = 120;
    max_overhead = 500;
  }

(** [tune_gc ?config ()] overrides four fields of the current [Gc.control]
    record with the values from [config] (default {!default_gc_config}) and
    leaves every other GC setting as it was. *)
let tune_gc ?(config = default_gc_config) () =
  let bytes_per_word = Sys.word_size / 8 in
  let words_of_bytes bytes = bytes / bytes_per_word in
  let current = Gc.get () in
  Gc.set
    {
      current with
      minor_heap_size = words_of_bytes config.minor_heap_size;
      major_heap_increment = words_of_bytes config.major_heap_increment;
      space_overhead = config.space_overhead;
      max_overhead = config.max_overhead;
    }

(* Set once [tune_gc] has been applied through [ensure_gc_tuned]. *)
let gc_tuned = ref false

(** Applies {!tune_gc} with the default configuration the first time it is
    called; later calls do nothing. *)
let ensure_gc_tuned () =
  if !gc_tuned then ()
  else begin
    tune_gc ();
    gc_tuned := true
  end

(** {1 Cached Prebuilt Response} *)

(** A fixed response whose [Date]-stamped header set is cached.
    [cached_second] records the wall-clock second at which [cached_response]
    was built from [base_response]. *)
type cached_prebuilt = {
  base_response : H1.Response.t;
  body : Bigstringaf.t;
  cached_response : H1.Response.t Atomic.t;
  cached_second : int Atomic.t;
}

(** [make_cached_prebuilt h1_response body] builds the cache entry, stamping
    the first cached response with the current [Date_cache.get ()] value. *)
let make_cached_prebuilt h1_response body =
  let created_at = int_of_float (Unix.gettimeofday ()) in
  let dated_headers =
    H1.Headers.add h1_response.H1.Response.headers "Date" (Date_cache.get ())
  in
  let dated_response =
    H1.Response.create ~headers:dated_headers h1_response.H1.Response.status
  in
  {
    base_response = h1_response;
    body;
    cached_response = Atomic.make dated_response;
    cached_second = Atomic.make created_at;
  }

(** [get_cached_response cached] returns the cached response, rebuilding it
    from [cached.base_response] with a fresh [Date] header whenever the
    current wall-clock second differs from [cached.cached_second].

    The response and the second are stored with two separate [Atomic.set]
    calls, so concurrent callers may each rebuild the response once. *)
let[@inline] get_cached_response cached =
  let now = int_of_float (Unix.gettimeofday ()) in
  let last = Atomic.get cached.cached_second in
  if now <> last then begin
    let headers =
      H1.Headers.add cached.base_response.H1.Response.headers "Date"
        (Date_cache.get ())
    in
    let resp =
      H1.Response.create ~headers cached.base_response.H1.Response.status
    in
    Atomic.set cached.cached_response resp;
    Atomic.set cached.cached_second now
  end;
  Atomic.get cached.cached_response

(** {1 Socket Helpers} *)

(** Sets [TCP_NODELAY] on [flow] when it is backed by a file descriptor; does
    nothing for flows without one. *)
let set_tcp_nodelay flow =
  match Eio_unix.Resource.fd_opt flow with
  | None -> ()
  | Some fd ->
      Eio_unix.Fd.use_exn "set_tcp_nodelay" fd (fun unix_fd ->
          Unix.setsockopt unix_fd Unix.TCP_NODELAY true)

(** {1 Body Types} *)

type body_reader = {
  read : unit -> string;
      (** Read the entire body as a string. Can only be called once. *)
  read_stream : unit -> Cstruct.t option;
      (** Read body in chunks. Returns None when done. *)
  close : unit -> unit;  (** Close the body reader without reading. *)
}
(** Lazy body reader - body is only read when [read] is called *)

(** Response body - supports string, bigstring, streaming, and pre-built *)
type response_body =
  | Body_string of string
  | Body_bigstring of Bigstringaf.t
  | Body_prebuilt of { h1_response : H1.Response.t; body : Bigstringaf.t }
  | Body_cached_prebuilt of cached_prebuilt
  | Body_stream of {
      content_length : int64 option;
      next : unit -> Cstruct.t option;
    }

(** {1 Request and Response Types} *)

type request = {
  meth : H1.Method.t;
  target : string;
  headers : H1.Headers.t;
  body_reader : body_reader;
      (** Lazy body reader - call [body_reader.read ()] to get the body *)
}
(** Request type with lazy body reading *)

(** Read the request body as a string (convenience function) *)
let read_body req = req.body_reader.read ()

(** Returns the chunk-reading function of the request body (for large
    bodies). Call the result repeatedly until it yields [None]. *)
let read_body_stream req = req.body_reader.read_stream

(** Close the request body without reading (for ignored bodies) *)
let close_body req = req.body_reader.close ()

type response = {
  status : H1.Status.t;
  headers : (string * string) list;
  response_body : response_body;
}
(** Response type with optimized body variants *)

type handler = request -> response
(** Handler type *)

(** Response with a string body. [status] defaults to [`OK] and [headers] to
    the empty list. *)
let respond ?(status = `OK) ?(headers = []) body =
  { status; headers; response_body = Body_string body }

(** Response whose body is the given bigstring. *)
let respond_bigstring ?(status = `OK) ?(headers = []) bstr =
  { status; headers; response_body = Body_bigstring bstr }

(** Response whose body is produced by calling [next] until it returns [None].
    When [content_length] is omitted the connection handler adds a
    [Transfer-Encoding: chunked] header instead of [Content-Length]. *)
let respond_stream ?(status = `OK) ?(headers = []) ?content_length next =
  { status; headers; response_body = Body_stream { content_length; next } }

(** Response sent from an already-built [H1.Response.t]. The [status] and
    [headers] fields of the wrapper are not consulted for this variant; only a
    [Date] header is added when the response is written. *)
let respond_prebuilt h1_response body =
  {
    status = `OK;
    headers = [];
    response_body = Body_prebuilt { h1_response; body };
  }

(** Response backed by a {!cached_prebuilt} entry. *)
let respond_cached_prebuilt cached =
  { status = `OK; headers = []; response_body = Body_cached_prebuilt cached }

type static_response = response

(** Same value as {!respond_cached_prebuilt}, typed as a [static_response]. *)
let make_static_response cached : static_response =
  respond_cached_prebuilt cached

(** Identity conversion from [static_response] to [response]. *)
let[@inline always] respond_static (r : static_response) : response = r

(** Builds H1 headers from an association list. *)
let make_h1_headers headers_list = H1.Headers.of_list headers_list

(** Builds an H1 response with the given headers; [status] defaults to
    [`OK]. *)
let make_h1_response ?(status = `OK) headers =
  H1.Response.create ~headers status

(** {1 Internal Helpers} *)

(** Write all IOVecs to the flow - optimized version *)
let write_iovecs flow iovecs =
  match iovecs with
  | [] -> ()
  | [ iov ] ->
      (* Fast path for single iovec - common case *)
      let cs =
        Cstruct.of_bigarray ~off:iov.Httpun_types.IOVec.off
          ~len:iov.Httpun_types.IOVec.len iov.Httpun_types.IOVec.buffer
      in
      Eio.Flow.write flow [ cs ]
  | _ ->
      (* Multiple iovecs - build list directly *)
      let cstructs =
        List.map
          (fun iov ->
            Cstruct.of_bigarray ~off:iov.Httpun_types.IOVec.off
              ~len:iov.Httpun_types.IOVec.len iov.Httpun_types.IOVec.buffer)
          iovecs
      in
      Eio.Flow.write flow cstructs

(** Check if method typically has no body *)
let[@inline] method_has_no_body = function
  | `GET | `HEAD | `DELETE | `OPTIONS | `CONNECT | `TRACE -> true
  | `POST | `PUT | `PATCH | `Other _ -> false

(** Raised by [body_reader.read] when the body exceeds [max_body_size]. *)
exception Body_too_large

(** [make_body_reader ?max_body_size h1_body] wraps an H1 body reader.

    - [read ()] accumulates the whole body and returns it. It fails with
      [Failure "Body already read"] on a second call, returns the empty string
      if the reader was already closed, and raises {!Body_too_large} once the
      accumulated size would exceed [max_body_size].
    - [read_stream ()] returns the next chunk, or [None] at end of body, after
      [close], or once the size limit has been exceeded. It does not raise
      {!Body_too_large}.
    - [close ()] closes [h1_body] the first time it is called. *)
let make_body_reader ?max_body_size (h1_body : H1.Body.Reader.t) : body_reader =
  let read_called = ref false in
  let closed = ref false in
  let body_buffer = Buffer.create 4096 in
  let current_size = ref 0 in
  let too_large = ref false in
  let done_promise, done_resolver = Eio.Promise.create () in

  (* Accounts for [len] more bytes. Returns [false], closing the H1 body and
     resolving [done_promise], when the limit would be exceeded. *)
  let check_size len =
    match max_body_size with
    | Some max when Int64.of_int (!current_size + len) > max ->
        too_large := true;
        H1.Body.Reader.close h1_body;
        Eio.Promise.resolve done_resolver ();
        false
    | _ ->
        current_size := !current_size + len;
        true
  in

  (* Keeps re-scheduling reads, copying each chunk into [body_buffer], until
     end of body, close, or the size limit. *)
  let rec schedule_read () =
    if (not !closed) && not !too_large then
      H1.Body.Reader.schedule_read h1_body
        ~on_eof:(fun () -> Eio.Promise.resolve done_resolver ())
        ~on_read:(fun buf ~off ~len ->
          if check_size len then begin
            Buffer.add_string body_buffer (Bigstringaf.substring buf ~off ~len);
            schedule_read ()
          end)
  in

  {
    read =
      (fun () ->
        if !read_called then failwith "Body already read"
        else begin
          read_called := true;
          if !closed then ""
          else begin
            schedule_read ();
            Eio.Promise.await done_promise;
            if !too_large then raise Body_too_large
            else Buffer.contents body_buffer
          end
        end);
    read_stream =
      (fun () ->
        if !closed || !too_large then None
        else begin
          let chunk_promise, chunk_resolver = Eio.Promise.create () in
          let got_chunk = ref false in
          H1.Body.Reader.schedule_read h1_body
            ~on_eof:(fun () ->
              if not !got_chunk then Eio.Promise.resolve chunk_resolver None)
            ~on_read:(fun buf ~off ~len ->
              got_chunk := true;
              if check_size len then begin
                (* The chunk is a view over [buf], not a copy.
                   NOTE(review): whether [buf] stays valid after this callback
                   returns depends on H1 - confirm before retaining chunks. *)
                let cs = Cstruct.of_bigarray ~off ~len buf in
                Eio.Promise.resolve chunk_resolver (Some cs)
              end
              else Eio.Promise.resolve chunk_resolver None);
          Eio.Promise.await chunk_promise
        end);
    close =
      (fun () ->
        if not !closed then begin
          closed := true;
          H1.Body.Reader.close h1_body
        end);
  }

(** Create a no-op body reader for methods without bodies *)
let empty_body_reader () : body_reader =
  {
    read = (fun () -> "");
    read_stream = (fun () -> None);
    close = (fun () -> ());
  }

(* Sends [body] as a complete response with [Date], [Content-Length] and
   [Connection: close] headers. Shared by the fixed error responses below. *)
let respond_plain_and_close reqd status body =
  let headers =
    H1.Headers.of_list
      [
        ("Date", Date_cache.get ());
        ("Content-Length", string_of_int (String.length body));
        ("Connection", "close");
      ]
  in
  let resp = H1.Response.create ~headers status in
  H1.Reqd.respond_with_string reqd resp body

(** Answers [reqd] with a 413 response and [Connection: close]. *)
let respond_413 reqd =
  respond_plain_and_close reqd (`Code 413) "Request Entity Too Large"

(** Answers [reqd] with a 408 response and [Connection: close]. *)
let respond_408 reqd =
  respond_plain_and_close reqd (`Code 408) "Request Timeout"

(** [handle_connection ?clock ?read_timeout ?request_timeout ?max_body_size
    handler flow] serves HTTP/1.1 requests on [flow] until the connection
    shuts down.

    A read buffer is taken from [Read_buffer_pool] for the lifetime of the
    call and released on exit, including on exceptions. Timeouts only apply
    when [clock] is supplied together with the corresponding timeout value.

    For each request, [handler] is run (under [request_timeout] if enabled):
    - a timeout is answered with 408,
    - {!Body_too_large} is answered with 413,
    - otherwise the response is written according to its body variant. For
      string, bigstring and stream bodies any user-supplied [Content-Length]
      or [Date] header is dropped and replaced by generated ones. *)
let handle_connection ?clock ?read_timeout ?request_timeout ?max_body_size
    handler flow =
  let read_buffer, read_cstruct = Read_buffer_pool.acquire () in
  Fun.protect ~finally:(fun () -> Read_buffer_pool.release read_buffer)
  @@ fun () ->
  let request_handler reqd =
    let req = H1.Reqd.request reqd in
    let h1_body = H1.Reqd.request_body reqd in

    (* Methods that typically carry no body get their H1 body closed up front
       and a no-op reader. *)
    let body_reader =
      if method_has_no_body req.H1.Request.meth then begin
        H1.Body.Reader.close h1_body;
        empty_body_reader ()
      end
      else make_body_reader ?max_body_size h1_body
    in

    let request =
      {
        meth = req.H1.Request.meth;
        target = req.target;
        headers = req.headers;
        body_reader;
      }
    in

    (* Runs the handler, closing the body afterwards so an unread body does
       not linger. *)
    let handle_request () =
      match (clock, request_timeout) with
      | Some clock, Some timeout ->
          Eio.Time.with_timeout clock timeout (fun () ->
              let response = handler request in
              body_reader.close ();
              Ok response)
      | _ ->
          let response = handler request in
          body_reader.close ();
          Ok response
    in

    match handle_request () with
    | Error `Timeout ->
        body_reader.close ();
        respond_408 reqd
    | exception Body_too_large ->
        body_reader.close ();
        respond_413 reqd
    | Ok response -> (
        let date_header = ("Date", Date_cache.get ()) in
        let filter_reserved headers =
          List.filter
            (fun (k, _) ->
              let lk = String.lowercase_ascii k in
              lk <> "content-length" && lk <> "date")
            headers
        in
        (* Headers for a body of known length: generated Date and
           Content-Length first, then the user headers minus reserved ones. *)
        let sized_headers content_length =
          H1.Headers.of_list
            (date_header
            :: ("Content-Length", content_length)
            :: filter_reserved response.headers)
        in

        match response.response_body with
        | Body_string body ->
            let headers = sized_headers (string_of_int (String.length body)) in
            let resp = H1.Response.create ~headers response.status in
            H1.Reqd.respond_with_string reqd resp body
        | Body_bigstring bstr ->
            let headers =
              sized_headers (string_of_int (Bigstringaf.length bstr))
            in
            let resp = H1.Response.create ~headers response.status in
            H1.Reqd.respond_with_bigstring reqd resp bstr
        | Body_prebuilt { h1_response; body } ->
            let headers =
              H1.Headers.add h1_response.H1.Response.headers "Date"
                (Date_cache.get ())
            in
            let resp = { h1_response with H1.Response.headers } in
            H1.Reqd.respond_with_bigstring reqd resp body
        | Body_cached_prebuilt cached ->
            let resp = get_cached_response cached in
            H1.Reqd.respond_with_bigstring reqd resp cached.body
        | Body_stream { content_length; next } ->
            let headers =
              match content_length with
              | Some len -> sized_headers (Int64.to_string len)
              | None ->
                  H1.Headers.of_list
                    (date_header
                    :: ("Transfer-Encoding", "chunked")
                    :: filter_reserved response.headers)
            in
            let resp = H1.Response.create ~headers response.status in
            let body_writer = H1.Reqd.respond_with_streaming reqd resp in
            (* Write one chunk at a time, waiting for each flush before
               pulling the next chunk from the producer. *)
            let rec write_chunks () =
              match next () with
              | None -> H1.Body.Writer.close body_writer
              | Some cs ->
                  H1.Body.Writer.write_bigstring body_writer ~off:0
                    ~len:(Cstruct.length cs) (Cstruct.to_bigarray cs);
                  let flushed, resolve = Eio.Promise.create () in
                  H1.Body.Writer.flush body_writer (fun () ->
                      Eio.Promise.resolve resolve ());
                  Eio.Promise.await flushed;
                  write_chunks ()
            in
            write_chunks ())
  in

  (* Every connection-level error gets the same fixed body, with no extra
     headers. *)
  let error_handler ?request:_ _error start_response =
    let resp_body = start_response H1.Headers.empty in
    H1.Body.Writer.write_string resp_body "Internal Server Error";
    H1.Body.Writer.close resp_body
  in

  let conn = H1.Server_connection.create ~error_handler request_handler in

  (* Set by either loop to make both loops stop at their next iteration. *)
  let shutdown = ref false in

  let do_read () =
    match (clock, read_timeout) with
    | Some clock, Some timeout ->
        Eio.Time.with_timeout clock timeout (fun () ->
            Ok (Eio.Flow.single_read flow read_cstruct))
    | _ -> Ok (Eio.Flow.single_read flow read_cstruct)
  in

  let rec read_loop () =
    if not !shutdown then
      match H1.Server_connection.next_read_operation conn with
      | `Read -> (
          match do_read () with
          | Error `Timeout ->
              (* NOTE(review): only the flag is set here; a write_loop that is
                 currently awaiting [yield_writer] is not woken by this code -
                 confirm the connection is torn down elsewhere. *)
              shutdown := true
          | exception End_of_file ->
              let _ =
                H1.Server_connection.read_eof conn read_buffer ~off:0 ~len:0
              in
              shutdown := true
          | Ok n ->
              (* NOTE(review): the result of [read] is discarded; if it is a
                 consumed-byte count that can be smaller than [n], the
                 remaining input is lost - confirm against H1. *)
              let _ =
                H1.Server_connection.read conn read_buffer ~off:0 ~len:n
              in
              read_loop ())
      | `Yield ->
          (* Returns immediately; [read_loop] is re-entered from the callback
             when H1 wants more input. *)
          H1.Server_connection.yield_reader conn read_loop
      | `Close | `Upgrade -> shutdown := true
  in

  let rec write_loop () =
    if not !shutdown then
      match H1.Server_connection.next_write_operation conn with
      | `Write iovecs ->
          write_iovecs flow iovecs;
          let len =
            List.fold_left
              (fun acc iov -> acc + iov.Httpun_types.IOVec.len)
              0 iovecs
          in
          H1.Server_connection.report_write_result conn (`Ok len);
          write_loop ()
      | `Yield ->
          (* Park this fiber until H1 signals that there is output again. *)
          let continue = Eio.Promise.create () in
          H1.Server_connection.yield_writer conn (fun () ->
              Eio.Promise.resolve (snd continue) ());
          Eio.Promise.await (fst continue);
          write_loop ()
      | `Upgrade -> shutdown := true
      | `Close _ -> shutdown := true
  in

  Fiber.both read_loop write_loop

(* Turns the configured host string into a bind address. The string must be
   a numeric IPv4 or IPv6 address. Anything else is reported with [traceln]
   and replaced by the IPv4 wildcard address, which is what the server bound
   to before the host setting was honoured. *)
let bind_address host : Eio.Net.Ipaddr.v4v6 =
  match Unix.inet_addr_of_string host with
  | unix_addr -> Eio_unix.Net.Ipaddr.of_unix unix_addr
  | exception Failure _ ->
      traceln "Invalid bind address %S; falling back to 0.0.0.0" host;
      Eio.Net.Ipaddr.V4.any

(** [run ~sw ~net ?clock ?config handler] listens on [config.host] and
    [config.port] and serves connections with {!handle_connection} in the
    calling domain.

    GC tuning is applied once via [ensure_gc_tuned]. Read and request
    timeouts from [config] only take effect when [clock] is given. Exceptions
    escaping a connection are logged with [traceln]. *)
let run ~sw ~net ?clock ?(config = default_config) handler =
  ensure_gc_tuned ();
  let addr = `Tcp (bind_address config.host, config.port) in
  let socket =
    Eio.Net.listen ~sw ~backlog:config.backlog ~reuse_addr:config.reuse_addr
      ~reuse_port:config.reuse_port net addr
  in
  traceln "Server listening on port %d" config.port;
  let max_body_size = config.max_body_size in
  let read_timeout = Some config.read_timeout in
  let request_timeout = Some config.request_timeout in
  let connection_handler flow _addr =
    if config.tcp_nodelay then set_tcp_nodelay flow;
    handle_connection ?clock ?read_timeout ?request_timeout ?max_body_size
      handler flow
  in
  let on_error exn = traceln "Connection error: %s" (Printexc.to_string exn) in
  Eio.Net.run_server socket connection_handler
    ~max_connections:config.max_connections ~on_error

(** Like {!run}, but when [config.domain_count] is greater than 1 the accept
    loop also runs on [config.domain_count - 1] additional domains obtained
    from [domain_mgr]. Values below 1 are treated as 1. *)
let run_parallel ~sw ~net ~domain_mgr ?clock ?(config = default_config) handler
    =
  ensure_gc_tuned ();
  let domain_count = max 1 config.domain_count in
  let addr = `Tcp (bind_address config.host, config.port) in
  let socket =
    Eio.Net.listen ~sw ~backlog:config.backlog ~reuse_addr:config.reuse_addr
      ~reuse_port:config.reuse_port net addr
  in
  traceln "Server listening on port %d (%d domains)" config.port domain_count;
  let max_body_size = config.max_body_size in
  let read_timeout = Some config.read_timeout in
  let request_timeout = Some config.request_timeout in
  let connection_handler flow _addr =
    if config.tcp_nodelay then set_tcp_nodelay flow;
    handle_connection ?clock ?read_timeout ?request_timeout ?max_body_size
      handler flow
  in
  let on_error exn = traceln "Connection error: %s" (Printexc.to_string exn) in
  if domain_count <= 1 then
    Eio.Net.run_server socket connection_handler
      ~max_connections:config.max_connections ~on_error
  else
    Eio.Net.run_server socket connection_handler
      ~max_connections:config.max_connections ~on_error
      ~additional_domains:(domain_mgr, domain_count - 1)