ocaml http/1, http/2 and websocket client and server library
at v0.3.3 37 kB view raw
1(** Unified HTTP Server - High-performance server supporting HTTP/1.1, HTTP/2, 2 and WebSocket. 3 4 This module consolidates all server functionality into a single, 5 high-performance implementation. Protocol detection overhead is optional and 6 controlled via the [protocol] configuration. 7 8 {2 Protocol Modes} 9 10 - [Http1_only]: Fastest path. No protocol detection, direct HTTP/1.1 11 handling. 12 - [Http2_only]: HTTP/2 only. h2c (cleartext) or h2 (over TLS). 13 - [Auto]: Auto-detect protocol via connection preface peek (h2c) or ALPN 14 (TLS). 15 - [Auto_websocket]: Auto-detect with WebSocket upgrade support. 16 17 {2 Performance Features} 18 19 - GC tuning for high-throughput scenarios 20 - Cached Date headers (1-second resolution) 21 - Pre-built responses for zero-allocation hot paths 22 - Zero-copy bigstring responses 23 - Streaming response support 24 - Multi-domain parallelism via Eio 25 26 {2 Example} 27 28 {[ 29 (* Fastest HTTP/1.1 server *) 30 let handler req = Server.respond "Hello, World!" in 31 Server.run ~sw ~net handler 32 33 (* Multi-protocol with auto-detection *) 34 let config = Server.{ default_config with protocol = Auto } in 35 Server.run ~sw ~net ~config handler 36 ]} *) 37 38open Eio.Std 39 40(** {1 Protocol Configuration} *) 41 42(** Protocol mode controls detection overhead and supported protocols. *) 43type protocol = 44 | Http1_only 45 (** Fastest: No protocol detection, direct HTTP/1.1 handling. With TLS: 46 HTTP/1.1 over TLS (no ALPN negotiation). *) 47 | Http2_only 48 (** HTTP/2 only. Without TLS: h2c (HTTP/2 cleartext). With TLS: h2 with 49 ALPN advertising only "h2". *) 50 | Auto 51 (** Auto-detect protocol. Without TLS: Peek for "PRI " preface (h2c), 52 fallback to HTTP/1.1. With TLS: ALPN negotiation. *) 53 | Auto_websocket 54 (** Auto-detect with WebSocket support. Same as [Auto] but also handles 55 WebSocket upgrade requests in HTTP/1.1 mode. *) 56 57(** {1 GC Tuning} *) 58 59module Gc_tune = struct 60 type config = { 61 minor_heap_size : int; (** Minor heap size in bytes. Default: 64MB *) 62 major_heap_increment : int; 63 (** Major heap increment in bytes. Default: 32MB *) 64 space_overhead : int; (** Space overhead percentage. Default: 200 *) 65 max_overhead : int; (** Max overhead percentage. Default: 500 *) 66 } 67 68 let default = 69 { 70 minor_heap_size = 64 * 1024 * 1024; 71 major_heap_increment = 32 * 1024 * 1024; 72 space_overhead = 200; 73 max_overhead = 500; 74 } 75 76 let aggressive = 77 { 78 minor_heap_size = 128 * 1024 * 1024; 79 major_heap_increment = 64 * 1024 * 1024; 80 space_overhead = 400; 81 max_overhead = 1000; 82 } 83 84 let tuned = ref false 85 86 let apply ?(config = default) () = 87 if not !tuned then begin 88 let ctrl = Gc.get () in 89 Gc.set 90 { 91 ctrl with 92 minor_heap_size = config.minor_heap_size / (Sys.word_size / 8); 93 major_heap_increment = 94 config.major_heap_increment / (Sys.word_size / 8); 95 space_overhead = config.space_overhead; 96 max_overhead = config.max_overhead; 97 }; 98 tuned := true 99 end 100end 101 102module Prebuilt = Response.Prebuilt 103 104(** {1 Types} *) 105 106type config = { 107 (* Network *) 108 host : string; (** Bind address. Default: "0.0.0.0" *) 109 port : int; (** Listen port. Default: 8080 *) 110 backlog : int; (** Listen backlog. Default: 4096 *) 111 max_connections : int; (** Max concurrent connections. Default: 100000 *) 112 (* Parallelism *) 113 domain_count : int; (** Number of domains (CPUs) to use. Default: 1 *) 114 (* Protocol *) 115 protocol : protocol; (** Protocol mode. Default: Http1_only *) 116 (* Timeouts *) 117 read_timeout : float; (** Read timeout in seconds. Default: 60.0 *) 118 write_timeout : float; (** Write timeout in seconds. Default: 60.0 *) 119 idle_timeout : float; (** Idle connection timeout. Default: 120.0 *) 120 request_timeout : float; (** Request processing timeout. Default: 30.0 *) 121 (* Limits *) 122 max_header_size : int; (** Max header size in bytes. Default: 8192 *) 123 max_body_size : int64 option; 124 (** Max body size. None = unlimited. Default: None *) 125 (* Buffers *) 126 buffer_size : int; (** Read buffer size. Default: 16384 *) 127 (* Socket options *) 128 tcp_nodelay : bool; (** Set TCP_NODELAY on connections. Default: true *) 129 reuse_addr : bool; (** Set SO_REUSEADDR on listener. Default: true *) 130 reuse_port : bool; (** Set SO_REUSEPORT on listener. Default: true *) 131 (* TLS *) 132 tls : Tls_config.Server.t option; (** TLS config. None = plain HTTP *) 133 (* Performance *) 134 gc_tuning : Gc_tune.config option; 135 (** GC tuning config. Some = apply tuning. Default: Some Gc_tune.default 136 *) 137} 138(** Server configuration. *) 139 140(** Default configuration optimized for HTTP/1.1 performance. *) 141let default_config = 142 { 143 host = "0.0.0.0"; 144 port = 8080; 145 backlog = 4096; 146 max_connections = 100000; 147 domain_count = 1; 148 protocol = Http1_only; 149 read_timeout = 60.0; 150 write_timeout = 60.0; 151 idle_timeout = 120.0; 152 request_timeout = 30.0; 153 max_header_size = 8192; 154 max_body_size = None; 155 buffer_size = 16384; 156 tcp_nodelay = true; 157 reuse_addr = true; 158 reuse_port = false; 159 tls = None; 160 gc_tuning = Some Gc_tune.default; 161 } 162 163(** Configuration for auto-detection mode. *) 164let auto_config = { default_config with protocol = Auto } 165 166(** Configuration for WebSocket support. *) 167let websocket_config = { default_config with protocol = Auto_websocket } 168 169(** {2 Config Builders} *) 170 171let with_port port config = { config with port } 172let with_host host config = { config with host } 173let with_backlog backlog config = { config with backlog } 174let with_max_connections max config = { config with max_connections = max } 175let with_domain_count count config = { config with domain_count = count } 176let with_protocol protocol config = { config with protocol } 177let with_read_timeout timeout config = { config with read_timeout = timeout } 178let with_write_timeout timeout config = { config with write_timeout = timeout } 179let with_idle_timeout timeout config = { config with idle_timeout = timeout } 180 181let with_request_timeout timeout config = 182 { config with request_timeout = timeout } 183 184let with_max_header_size size config = { config with max_header_size = size } 185let with_max_body_size size config = { config with max_body_size = Some size } 186let with_buffer_size size config = { config with buffer_size = size } 187let with_tcp_nodelay enabled config = { config with tcp_nodelay = enabled } 188let with_tls tls config = { config with tls = Some tls } 189let with_gc_tuning gc config = { config with gc_tuning = Some gc } 190let without_gc_tuning config = { config with gc_tuning = None } 191 192(** {1 Request/Response Types} *) 193 194(** Protocol version indicator. *) 195type protocol_version = HTTP_1_1 | HTTP_2 196 197type request = { 198 meth : H1.Method.t; (** HTTP method *) 199 target : string; (** Request target (path + query) *) 200 headers : (string * string) list; (** Headers as association list *) 201 body : string; (** Request body (empty for GET/HEAD) *) 202 version : protocol_version; (** Protocol version *) 203} 204(** Request type exposed to handlers. *) 205 206type response = Response.t 207type handler = request -> response 208type ws_handler = Websocket.t -> unit 209 210type ws_config = { 211 origin_policy : Websocket.origin_policy; 212 max_payload_size : int; 213} 214 215let default_ws_config = 216 { 217 origin_policy = Websocket.Allow_all; 218 max_payload_size = Websocket.default_max_payload_size; 219 } 220 221let respond ?(status = `OK) ?(headers = []) body = 222 Response.make ~status ~headers body 223 224let respond_bigstring = Response.bigstring 225let respond_prebuilt = Response.prebuilt 226let respond_empty = Response.empty 227let respond_text = Response.text 228let respond_html = Response.html 229let respond_json = Response.json 230 231(** {1 Internal: Socket Helpers} *) 232 233let set_tcp_nodelay flow = 234 match Eio_unix.Resource.fd_opt flow with 235 | None -> () 236 | Some fd -> 237 Eio_unix.Fd.use_exn "set_tcp_nodelay" fd (fun unix_fd -> 238 Unix.setsockopt unix_fd Unix.TCP_NODELAY true) 239 240let shutdown_flow flow cmd = 241 try Eio.Flow.shutdown flow cmd with 242 | Unix.Unix_error (Unix.ENOTCONN, _, _) -> () 243 | Eio.Io (Eio.Exn.X (Eio_unix.Unix_error (Unix.ENOTCONN, _, _)), _) -> () 244 245let[@inline] writev flow iovecs = 246 match iovecs with 247 | [] -> `Ok 0 248 | [ { Httpun_types.IOVec.buffer; off; len } ] -> ( 249 let cs = Cstruct.of_bigarray buffer ~off ~len in 250 match Eio.Flow.write flow [ cs ] with 251 | () -> `Ok len 252 | exception End_of_file -> `Closed) 253 | _ -> ( 254 let lenv, cstructs = 255 List.fold_left_map 256 (fun acc { Httpun_types.IOVec.buffer; off; len } -> 257 (acc + len, Cstruct.of_bigarray buffer ~off ~len)) 258 0 iovecs 259 in 260 match Eio.Flow.write flow cstructs with 261 | () -> `Ok lenv 262 | exception End_of_file -> `Closed) 263 264(** {1 Internal: Header Conversion} *) 265 266let h1_headers_to_list headers = 267 let result = ref [] in 268 H1.Headers.iter 269 ~f:(fun name value -> result := (name, value) :: !result) 270 headers; 271 List.rev !result 272 273let h2_headers_to_list headers = 274 let result = ref [] in 275 H2.Headers.iter 276 ~f:(fun name value -> result := (name, value) :: !result) 277 headers; 278 List.rev !result 279 280(** {1 Internal: Protocol Detection} *) 281 282(** HTTP/2 connection preface starts with "PRI " *) 283let h2_preface_prefix = "PRI " 284 285let h2_preface_prefix_len = 4 286 287let peek_bytes flow n = 288 let buf = Cstruct.create n in 289 try 290 let read = Eio.Flow.single_read flow buf in 291 Ok (Cstruct.to_string (Cstruct.sub buf 0 read)) 292 with 293 | End_of_file -> Error `Eof 294 | exn -> Error (`Exn exn) 295 296let is_h2_preface data = 297 String.length data >= h2_preface_prefix_len 298 && String.sub data 0 h2_preface_prefix_len = h2_preface_prefix 299 300(** {1 Internal: HTTP/1.1 Connection Handler} *) 301 302module H1_handler = struct 303 let send_response reqd (response : Response.t) = 304 let date_header = ("date", Date_cache.get ()) in 305 match response.Response.body with 306 | Response.Prebuilt_body prebuilt -> Prebuilt.respond_h1 reqd prebuilt 307 | Response.Empty -> 308 let headers = 309 H1.Headers.of_list 310 (date_header :: ("content-length", "0") :: response.headers) 311 in 312 let resp = H1.Response.create ~headers response.status in 313 H1.Reqd.respond_with_string reqd resp "" 314 | Response.String body -> 315 let headers = 316 H1.Headers.of_list 317 (date_header 318 :: ("content-length", string_of_int (String.length body)) 319 :: response.headers) 320 in 321 let resp = H1.Response.create ~headers response.status in 322 H1.Reqd.respond_with_string reqd resp body 323 | Response.Bigstring body -> 324 let headers = 325 H1.Headers.of_list 326 (date_header 327 :: ("content-length", string_of_int (Bigstringaf.length body)) 328 :: response.headers) 329 in 330 let resp = H1.Response.create ~headers response.status in 331 H1.Reqd.respond_with_bigstring reqd resp body 332 | Response.Cstruct cs -> 333 let len = Cstruct.length cs in 334 let headers = 335 H1.Headers.of_list 336 (date_header 337 :: ("content-length", string_of_int len) 338 :: response.headers) 339 in 340 let resp = H1.Response.create ~headers response.status in 341 let body_writer = H1.Reqd.respond_with_streaming reqd resp in 342 H1.Body.Writer.write_bigstring body_writer ~off:cs.off ~len cs.buffer; 343 H1.Body.Writer.close body_writer 344 | Response.Stream { content_length; next } -> 345 let headers = 346 match content_length with 347 | Some len -> 348 H1.Headers.of_list 349 (date_header 350 :: ("content-length", Int64.to_string len) 351 :: response.headers) 352 | None -> 353 H1.Headers.of_list 354 (date_header 355 :: ("transfer-encoding", "chunked") 356 :: response.headers) 357 in 358 let resp = H1.Response.create ~headers response.status in 359 let body_writer = H1.Reqd.respond_with_streaming reqd resp in 360 let rec write_chunks () = 361 match next () with 362 | None -> H1.Body.Writer.close body_writer 363 | Some cs -> 364 H1.Body.Writer.write_bigstring body_writer ~off:0 365 ~len:(Cstruct.length cs) (Cstruct.to_bigarray cs); 366 H1.Body.Writer.flush body_writer (fun () -> ()); 367 write_chunks () 368 in 369 write_chunks () 370 371 let send_error_response reqd status body_str = 372 let headers = 373 H1.Headers.of_list 374 [ 375 ("date", Date_cache.get ()); 376 ("content-length", string_of_int (String.length body_str)); 377 ] 378 in 379 let resp = H1.Response.create ~headers status in 380 H1.Reqd.respond_with_string reqd resp body_str 381 382 let handle_request ~handler reqd req body = 383 let request = 384 { 385 meth = req.H1.Request.meth; 386 target = req.target; 387 headers = h1_headers_to_list req.headers; 388 body; 389 version = HTTP_1_1; 390 } 391 in 392 let response = handler request in 393 send_response reqd response 394 395 let handle ~handler ~ws_handler ?(ws_config = default_ws_config) 396 ?max_body_size ~initial_data flow = 397 let buffer_size = 16384 in 398 let read_buffer = Bigstringaf.create buffer_size in 399 let read_cstruct = 400 Cstruct.of_bigarray read_buffer ~off:0 ~len:buffer_size 401 in 402 403 let pending_data = ref initial_data in 404 let ws_upgrade = ref None in 405 406 let request_handler reqd = 407 let req = H1.Reqd.request reqd in 408 let h1_body = H1.Reqd.request_body reqd in 409 410 if Option.is_some ws_handler && Websocket.is_upgrade_request req.headers 411 then begin 412 match 413 Websocket.validate_origin ~policy:ws_config.origin_policy req.headers 414 with 415 | Error reason -> 416 H1.Body.Reader.close h1_body; 417 send_error_response reqd `Forbidden ("Forbidden: " ^ reason) 418 | Ok () -> ( 419 match Websocket.validate_websocket_version req.headers with 420 | Error _reason -> 421 H1.Body.Reader.close h1_body; 422 let headers = 423 H1.Headers.of_list 424 [ 425 ("date", Date_cache.get ()); 426 ("content-length", "16"); 427 ( "sec-websocket-version", 428 Websocket.supported_websocket_version ); 429 ] 430 in 431 let resp = H1.Response.create ~headers (`Code 426) in 432 H1.Reqd.respond_with_string reqd resp "Upgrade Required" 433 | Ok () -> ( 434 match Websocket.get_websocket_key req.headers with 435 | Some key -> 436 ws_upgrade := Some key; 437 H1.Body.Reader.close h1_body; 438 let accept = Websocket.compute_accept_key key in 439 let headers = 440 H1.Headers.of_list 441 [ 442 ("upgrade", "websocket"); 443 ("connection", "Upgrade"); 444 ("sec-websocket-accept", accept); 445 ] 446 in 447 H1.Reqd.respond_with_upgrade reqd headers 448 | None -> 449 H1.Body.Reader.close h1_body; 450 send_error_response reqd `Bad_request "Bad Request")) 451 end 452 else begin 453 match req.meth with 454 | `GET | `HEAD | `DELETE | `OPTIONS | `CONNECT | `TRACE -> 455 H1.Body.Reader.close h1_body; 456 handle_request ~handler reqd req "" 457 | `POST | `PUT | `Other _ -> 458 let body_buffer = Buffer.create 4096 in 459 let body_size = ref 0 in 460 let rec read_body () = 461 H1.Body.Reader.schedule_read h1_body 462 ~on_eof:(fun () -> 463 let body = Buffer.contents body_buffer in 464 handle_request ~handler reqd req body) 465 ~on_read:(fun buf ~off ~len -> 466 let new_size = !body_size + len in 467 match max_body_size with 468 | Some max when Int64.of_int new_size > max -> 469 H1.Body.Reader.close h1_body; 470 send_error_response reqd (`Code 413) 471 "Request body too large" 472 | _ -> 473 body_size := new_size; 474 Buffer.add_string body_buffer 475 (Bigstringaf.substring buf ~off ~len); 476 read_body ()) 477 in 478 read_body () 479 end 480 in 481 482 let error_handler ?request:_ _error start_response = 483 let resp_body = start_response H1.Headers.empty in 484 H1.Body.Writer.write_string resp_body "Internal Server Error"; 485 H1.Body.Writer.close resp_body 486 in 487 488 let conn = H1.Server_connection.create ~error_handler request_handler in 489 let shutdown = ref false in 490 491 let rec read_loop () = 492 if not !shutdown then 493 match H1.Server_connection.next_read_operation conn with 494 | `Read -> 495 let socket_data = 496 match Eio.Flow.single_read flow read_cstruct with 497 | n -> Cstruct.to_string (Cstruct.sub read_cstruct 0 n) 498 | exception End_of_file -> "" 499 in 500 let data = 501 if String.length !pending_data > 0 then begin 502 let combined = !pending_data ^ socket_data in 503 pending_data := ""; 504 combined 505 end 506 else socket_data 507 in 508 let len = String.length data in 509 if len = 0 then begin 510 let (_ : int) = 511 H1.Server_connection.read_eof conn read_buffer ~off:0 ~len:0 512 in 513 shutdown := true 514 end 515 else begin 516 Bigstringaf.blit_from_string data ~src_off:0 read_buffer 517 ~dst_off:0 ~len; 518 let (_ : int) = 519 H1.Server_connection.read conn read_buffer ~off:0 ~len 520 in 521 read_loop () 522 end 523 | `Yield -> H1.Server_connection.yield_reader conn read_loop 524 | `Close -> shutdown := true 525 | `Upgrade -> () 526 in 527 528 let rec write_loop () = 529 match H1.Server_connection.next_write_operation conn with 530 | `Write iovecs -> 531 let write_result = writev flow iovecs in 532 H1.Server_connection.report_write_result conn write_result; 533 write_loop () 534 | `Yield -> 535 if not !shutdown then begin 536 let continue = Eio.Promise.create () in 537 H1.Server_connection.yield_writer conn (fun () -> 538 Eio.Promise.resolve (snd continue) ()); 539 Eio.Promise.await (fst continue); 540 write_loop () 541 end 542 | `Close _ -> 543 shutdown := true; 544 shutdown_flow flow `Send 545 | `Upgrade -> () 546 in 547 548 Fiber.both read_loop write_loop; 549 550 (* Handle WebSocket upgrade if requested *) 551 match !ws_upgrade with 552 | Some _key -> ( 553 match ws_handler with 554 | Some ws_h -> 555 let ws = 556 { 557 Websocket.flow :> Eio.Flow.two_way_ty Eio.Std.r; 558 closed = false; 559 is_client = false; 560 read_buf = Buffer.create 4096; 561 max_payload_size = ws_config.max_payload_size; 562 } 563 in 564 (try ws_h ws with _ -> ()); 565 if Websocket.is_open ws then Websocket.close ws 566 | None -> ()) 567 | None -> () 568 569 let handle_direct ?max_body_size ~handler flow = 570 let buffer_size = 16384 in 571 let read_buffer = Bigstringaf.create buffer_size in 572 let read_cstruct = 573 Cstruct.of_bigarray read_buffer ~off:0 ~len:buffer_size 574 in 575 576 let request_handler reqd = 577 let req = H1.Reqd.request reqd in 578 let h1_body = H1.Reqd.request_body reqd in 579 580 match req.meth with 581 | `GET | `HEAD | `DELETE | `OPTIONS | `CONNECT | `TRACE -> 582 H1.Body.Reader.close h1_body; 583 handle_request ~handler reqd req "" 584 | `POST | `PUT | `Other _ -> 585 let body_buffer = Buffer.create 4096 in 586 let body_size = ref 0 in 587 let rec read_body () = 588 H1.Body.Reader.schedule_read h1_body 589 ~on_eof:(fun () -> 590 let body = Buffer.contents body_buffer in 591 handle_request ~handler reqd req body) 592 ~on_read:(fun buf ~off ~len -> 593 let new_size = !body_size + len in 594 match max_body_size with 595 | Some max when Int64.of_int new_size > max -> 596 H1.Body.Reader.close h1_body; 597 send_error_response reqd (`Code 413) 598 "Request body too large" 599 | _ -> 600 body_size := new_size; 601 Buffer.add_string body_buffer 602 (Bigstringaf.substring buf ~off ~len); 603 read_body ()) 604 in 605 read_body () 606 in 607 608 let error_handler ?request:_ _error start_response = 609 let resp_body = start_response H1.Headers.empty in 610 H1.Body.Writer.write_string resp_body "Internal Server Error"; 611 H1.Body.Writer.close resp_body 612 in 613 614 let conn = H1.Server_connection.create ~error_handler request_handler in 615 let shutdown = ref false in 616 617 let rec read_loop () = 618 if not !shutdown then 619 match H1.Server_connection.next_read_operation conn with 620 | `Read -> ( 621 match Eio.Flow.single_read flow read_cstruct with 622 | n -> 623 let (_ : int) = 624 H1.Server_connection.read conn read_buffer ~off:0 ~len:n 625 in 626 read_loop () 627 | exception End_of_file -> 628 let (_ : int) = 629 H1.Server_connection.read_eof conn read_buffer ~off:0 ~len:0 630 in 631 shutdown := true) 632 | `Yield -> H1.Server_connection.yield_reader conn read_loop 633 | `Close | `Upgrade -> shutdown := true 634 in 635 636 let rec write_loop () = 637 if not !shutdown then 638 match H1.Server_connection.next_write_operation conn with 639 | `Write iovecs -> 640 let write_result = writev flow iovecs in 641 H1.Server_connection.report_write_result conn write_result; 642 write_loop () 643 | `Yield -> 644 let continue = Eio.Promise.create () in 645 H1.Server_connection.yield_writer conn (fun () -> 646 Eio.Promise.resolve (snd continue) ()); 647 Eio.Promise.await (fst continue); 648 write_loop () 649 | `Close _ -> 650 shutdown := true; 651 shutdown_flow flow `Send 652 | `Upgrade -> shutdown := true 653 in 654 655 Fiber.both read_loop write_loop 656end 657 658module H2_handler = struct 659 let send_h2_response reqd (response : Response.t) = 660 let h2_status = (response.Response.status :> H2.Status.t) in 661 match response.Response.body with 662 | Response.Prebuilt_body prebuilt -> Prebuilt.respond_h2 reqd prebuilt 663 | Response.Empty -> 664 let headers = 665 H2.Headers.of_list (("content-length", "0") :: response.headers) 666 in 667 let resp = H2.Response.create ~headers h2_status in 668 H2.Reqd.respond_with_string reqd resp "" 669 | Response.String body -> 670 let headers = 671 H2.Headers.of_list 672 (("content-length", string_of_int (String.length body)) 673 :: response.headers) 674 in 675 let resp = H2.Response.create ~headers h2_status in 676 H2.Reqd.respond_with_string reqd resp body 677 | Response.Bigstring body -> 678 let headers = 679 H2.Headers.of_list 680 (("content-length", string_of_int (Bigstringaf.length body)) 681 :: response.headers) 682 in 683 let resp = H2.Response.create ~headers h2_status in 684 H2.Reqd.respond_with_bigstring reqd resp body 685 | Response.Cstruct cs -> 686 let len = Cstruct.length cs in 687 let headers = 688 H2.Headers.of_list 689 (("content-length", string_of_int len) :: response.headers) 690 in 691 let resp = H2.Response.create ~headers h2_status in 692 let body_writer = H2.Reqd.respond_with_streaming reqd resp in 693 H2.Body.Writer.write_bigstring body_writer ~off:cs.off ~len cs.buffer; 694 H2.Body.Writer.close body_writer 695 | Response.Stream { content_length; next } -> 696 let headers = 697 match content_length with 698 | Some len -> 699 H2.Headers.of_list 700 (("content-length", Int64.to_string len) :: response.headers) 701 | None -> H2.Headers.of_list response.headers 702 in 703 let resp = H2.Response.create ~headers h2_status in 704 let body_writer = H2.Reqd.respond_with_streaming reqd resp in 705 let rec write_chunks () = 706 match next () with 707 | None -> H2.Body.Writer.close body_writer 708 | Some cs -> 709 H2.Body.Writer.write_bigstring body_writer ~off:cs.off 710 ~len:(Cstruct.length cs) cs.buffer; 711 H2.Body.Writer.flush body_writer (fun _result -> ()); 712 write_chunks () 713 in 714 write_chunks () 715 716 let handle_h2_request ~handler reqd req body = 717 let target = 718 match H2.Headers.get req.H2.Request.headers ":path" with 719 | Some p -> p 720 | None -> "/" 721 in 722 let request = 723 { 724 meth = req.meth; 725 target; 726 headers = h2_headers_to_list req.headers; 727 body; 728 version = HTTP_2; 729 } 730 in 731 let response = handler request in 732 send_h2_response reqd response 733 734 let handle ~handler ~initial_data flow = 735 let read_buffer_size = 0x4000 in 736 let read_buffer = Bigstringaf.create read_buffer_size in 737 let pending_data = ref initial_data in 738 739 let request_handler reqd = 740 let req = H2.Reqd.request reqd in 741 let body_reader = H2.Reqd.request_body reqd in 742 743 match req.meth with 744 | `GET | `HEAD -> 745 H2.Body.Reader.close body_reader; 746 handle_h2_request ~handler reqd req "" 747 | _ -> 748 let body_buffer = Buffer.create 4096 in 749 let rec read_body () = 750 H2.Body.Reader.schedule_read body_reader 751 ~on_eof:(fun () -> 752 let body = Buffer.contents body_buffer in 753 handle_h2_request ~handler reqd req body) 754 ~on_read:(fun buf ~off ~len -> 755 Buffer.add_string body_buffer 756 (Bigstringaf.substring buf ~off ~len); 757 read_body ()) 758 in 759 read_body () 760 in 761 762 let error_handler ?request:_ _error start_response = 763 let resp_body = start_response H2.Headers.empty in 764 H2.Body.Writer.write_string resp_body "Internal Server Error"; 765 H2.Body.Writer.close resp_body 766 in 767 768 let conn = H2.Server_connection.create ~error_handler request_handler in 769 let shutdown = ref false in 770 771 let read_loop () = 772 let buf_off = ref 0 in 773 let buf_len = ref 0 in 774 775 let compress () = 776 if !buf_len > 0 && !buf_off > 0 then begin 777 Bigstringaf.blit read_buffer ~src_off:!buf_off read_buffer ~dst_off:0 778 ~len:!buf_len; 779 buf_off := 0 780 end 781 else if !buf_len = 0 then buf_off := 0 782 in 783 784 let rec loop () = 785 if not !shutdown then 786 match H2.Server_connection.next_read_operation conn with 787 | `Read -> 788 compress (); 789 let available = read_buffer_size - !buf_off - !buf_len in 790 if available > 0 then begin 791 let cs = 792 Cstruct.of_bigarray read_buffer ~off:(!buf_off + !buf_len) 793 ~len:available 794 in 795 let n = 796 try Eio.Flow.single_read flow cs with End_of_file -> 0 797 in 798 if n = 0 then begin 799 let _ = 800 H2.Server_connection.read_eof conn read_buffer ~off:!buf_off 801 ~len:!buf_len 802 in 803 shutdown := true 804 end 805 else begin 806 buf_len := !buf_len + n; 807 let consumed = 808 H2.Server_connection.read conn read_buffer ~off:!buf_off 809 ~len:!buf_len 810 in 811 buf_off := !buf_off + consumed; 812 buf_len := !buf_len - consumed; 813 loop () 814 end 815 end 816 else loop () 817 | `Close -> shutdown := true 818 | `Yield -> 819 let continue = Eio.Promise.create () in 820 H2.Server_connection.yield_reader conn (fun () -> 821 Eio.Promise.resolve (snd continue) ()); 822 Eio.Promise.await (fst continue); 823 loop () 824 in 825 826 if String.length !pending_data > 0 then begin 827 let len = String.length !pending_data in 828 Bigstringaf.blit_from_string !pending_data ~src_off:0 read_buffer 829 ~dst_off:0 ~len; 830 buf_len := len; 831 pending_data := ""; 832 let consumed = H2.Server_connection.read conn read_buffer ~off:0 ~len in 833 buf_off := consumed; 834 buf_len := len - consumed 835 end; 836 loop () 837 in 838 839 let write_loop () = 840 let rec loop () = 841 if not !shutdown then 842 match H2.Server_connection.next_write_operation conn with 843 | `Write iovecs -> 844 let cstructs = 845 List.map 846 (fun iov -> 847 Cstruct.of_bigarray ~off:iov.H2.IOVec.off 848 ~len:iov.H2.IOVec.len iov.H2.IOVec.buffer) 849 iovecs 850 in 851 let len = 852 List.fold_left (fun acc iov -> acc + iov.H2.IOVec.len) 0 iovecs 853 in 854 Eio.Flow.write flow cstructs; 855 H2.Server_connection.report_write_result conn (`Ok len); 856 loop () 857 | `Yield -> 858 let continue = Eio.Promise.create () in 859 H2.Server_connection.yield_writer conn (fun () -> 860 Eio.Promise.resolve (snd continue) ()); 861 Eio.Promise.await (fst continue); 862 loop () 863 | `Close _ -> shutdown := true 864 in 865 loop () 866 in 867 868 Fiber.both read_loop write_loop 869 870 (** Direct H2 handler - no protocol detection *) 871 let handle_direct ~handler flow = handle ~handler ~initial_data:"" flow 872end 873 874(** {1 Internal: TLS Connection Handler} *) 875 876module Tls_handler = struct 877 let handle ~config ~handler ~ws_handler tls_cfg flow = 878 try 879 let tls_flow = Tls_eio.server_of_flow tls_cfg flow in 880 let max_body_size = config.max_body_size in 881 match config.protocol with 882 | Http1_only -> H1_handler.handle_direct ?max_body_size ~handler tls_flow 883 | Http2_only -> H2_handler.handle_direct ~handler tls_flow 884 | Auto | Auto_websocket -> ( 885 match Tls_config.negotiated_protocol tls_flow with 886 | Some Tls_config.HTTP_2 -> H2_handler.handle_direct ~handler tls_flow 887 | Some Tls_config.HTTP_1_1 | None -> 888 if config.protocol = Auto_websocket then 889 H1_handler.handle ~handler ~ws_handler:(Some ws_handler) 890 ?max_body_size ~initial_data:"" tls_flow 891 else H1_handler.handle_direct ?max_body_size ~handler tls_flow) 892 with 893 | Tls_eio.Tls_failure failure -> 894 traceln "TLS error: %s" (Tls_config.failure_to_string failure) 895 | exn -> traceln "Connection error: %s" (Printexc.to_string exn) 896end 897 898(** {1 Internal: Connection Handler} *) 899 900let handle_connection ~config ~handler ~ws_handler flow = 901 let max_body_size = config.max_body_size in 902 match config.protocol with 903 | Http1_only -> H1_handler.handle_direct ?max_body_size ~handler flow 904 | Http2_only -> H2_handler.handle_direct ~handler flow 905 | Auto | Auto_websocket -> ( 906 match peek_bytes flow h2_preface_prefix_len with 907 | Error `Eof -> () 908 | Error (`Exn exn) -> 909 traceln "Connection error: %s" (Printexc.to_string exn) 910 | Ok initial_data -> 911 if is_h2_preface initial_data then 912 H2_handler.handle ~handler ~initial_data flow 913 else if config.protocol = Auto_websocket then 914 H1_handler.handle ~handler ~ws_handler:(Some ws_handler) 915 ?max_body_size ~initial_data flow 916 else 917 H1_handler.handle ~handler ~ws_handler:None ?max_body_size 918 ~initial_data flow) 919 920(** {1 Public API} *) 921 922(** Run an HTTP server. 923 924 @param sw Switch for resource management 925 @param net Eio network capability 926 @param config Server configuration (default: [default_config]) 927 @param ws_handler WebSocket handler (required for [Auto_websocket] mode) 928 @param handler Request handler *) 929let run ~sw ~net ?(config = default_config) ?ws_handler handler = 930 (* Apply GC tuning if configured *) 931 (match config.gc_tuning with 932 | Some gc_config -> Gc_tune.apply ~config:gc_config () 933 | None -> ()); 934 935 let addr = `Tcp (Eio.Net.Ipaddr.V4.any, config.port) in 936 let socket = 937 Eio.Net.listen ~sw ~backlog:config.backlog ~reuse_addr:config.reuse_addr 938 ~reuse_port:config.reuse_port net addr 939 in 940 941 let protocol_str = 942 match (config.protocol, config.tls) with 943 | Http1_only, None -> "HTTP/1.1" 944 | Http1_only, Some _ -> "HTTP/1.1 (TLS)" 945 | Http2_only, None -> "HTTP/2 h2c" 946 | Http2_only, Some _ -> "HTTP/2 (TLS)" 947 | Auto, None -> "HTTP/1.1 + HTTP/2 h2c" 948 | Auto, Some _ -> "HTTP/1.1 + HTTP/2 (TLS, ALPN)" 949 | Auto_websocket, None -> "HTTP/1.1 + HTTP/2 h2c + WebSocket" 950 | Auto_websocket, Some _ -> "HTTP/1.1 + HTTP/2 + WebSocket (TLS, ALPN)" 951 in 952 traceln "Server listening on port %d (%s)" config.port protocol_str; 953 954 (* Validate ws_handler for Auto_websocket mode *) 955 let ws_handler = 956 match (config.protocol, ws_handler) with 957 | Auto_websocket, None -> 958 failwith "WebSocket handler required for Auto_websocket mode" 959 | Auto_websocket, Some h -> h 960 | _, _ -> fun _ -> () (* Dummy handler for non-WS modes *) 961 in 962 963 let connection_handler flow _addr = 964 if config.tcp_nodelay then set_tcp_nodelay flow; 965 match config.tls with 966 | None -> handle_connection ~config ~handler ~ws_handler flow 967 | Some tls_config -> ( 968 match Tls_config.Server.to_tls_config tls_config with 969 | Error (`Msg msg) -> traceln "TLS config error: %s" msg 970 | Ok tls_cfg -> 971 Tls_handler.handle ~config ~handler ~ws_handler tls_cfg flow) 972 in 973 974 let on_error exn = traceln "Connection error: %s" (Printexc.to_string exn) in 975 976 Eio.Net.run_server socket connection_handler 977 ~max_connections:config.max_connections ~on_error 978 979(** Run an HTTP server with multi-domain parallelism. 980 981 @param sw Switch for resource management 982 @param net Eio network capability 983 @param domain_mgr Eio domain manager 984 @param config Server configuration (default: [default_config]) 985 @param ws_handler WebSocket handler (required for [Auto_websocket] mode) 986 @param handler Request handler *) 987let run_parallel ~sw ~net ~domain_mgr ?(config = default_config) ?ws_handler 988 handler = 989 (* Apply GC tuning if configured *) 990 (match config.gc_tuning with 991 | Some gc_config -> Gc_tune.apply ~config:gc_config () 992 | None -> ()); 993 994 let domain_count = max 1 config.domain_count in 995 let addr = `Tcp (Eio.Net.Ipaddr.V4.any, config.port) in 996 let socket = 997 Eio.Net.listen ~sw ~backlog:config.backlog ~reuse_addr:config.reuse_addr 998 ~reuse_port:config.reuse_port net addr 999 in 1000 1001 let protocol_str = 1002 match (config.protocol, config.tls) with 1003 | Http1_only, None -> "HTTP/1.1" 1004 | Http1_only, Some _ -> "HTTP/1.1 (TLS)" 1005 | Http2_only, None -> "HTTP/2 h2c" 1006 | Http2_only, Some _ -> "HTTP/2 (TLS)" 1007 | Auto, None -> "HTTP/1.1 + HTTP/2 h2c" 1008 | Auto, Some _ -> "HTTP/1.1 + HTTP/2 (TLS, ALPN)" 1009 | Auto_websocket, None -> "HTTP/1.1 + HTTP/2 h2c + WebSocket" 1010 | Auto_websocket, Some _ -> "HTTP/1.1 + HTTP/2 + WebSocket (TLS, ALPN)" 1011 in 1012 traceln "Server listening on port %d (%s, %d domains)" config.port 1013 protocol_str domain_count; 1014 1015 (* Validate ws_handler for Auto_websocket mode *) 1016 let ws_handler = 1017 match (config.protocol, ws_handler) with 1018 | Auto_websocket, None -> 1019 failwith "WebSocket handler required for Auto_websocket mode" 1020 | Auto_websocket, Some h -> h 1021 | _, _ -> fun _ -> () 1022 in 1023 1024 let connection_handler flow _addr = 1025 if config.tcp_nodelay then set_tcp_nodelay flow; 1026 match config.tls with 1027 | None -> handle_connection ~config ~handler ~ws_handler flow 1028 | Some tls_config -> ( 1029 match Tls_config.Server.to_tls_config tls_config with 1030 | Error (`Msg msg) -> traceln "TLS config error: %s" msg 1031 | Ok tls_cfg -> 1032 Tls_handler.handle ~config ~handler ~ws_handler tls_cfg flow) 1033 in 1034 1035 let on_error exn = traceln "Connection error: %s" (Printexc.to_string exn) in 1036 1037 if domain_count <= 1 then 1038 Eio.Net.run_server socket connection_handler 1039 ~max_connections:config.max_connections ~on_error 1040 else 1041 Eio.Net.run_server socket connection_handler 1042 ~max_connections:config.max_connections ~on_error 1043 ~additional_domains:(domain_mgr, domain_count - 1)