ocaml http/1, http/2 and websocket client and server library
1(** Unified HTTP Server - High-performance server supporting HTTP/1.1, HTTP/2,
2 and WebSocket.
3
4 This module consolidates all server functionality into a single,
5 high-performance implementation. Protocol detection overhead is optional and
6 controlled via the [protocol] configuration.
7
8 {2 Protocol Modes}
9
10 - [Http1_only]: Fastest path. No protocol detection, direct HTTP/1.1
11 handling.
12 - [Http2_only]: HTTP/2 only. h2c (cleartext) or h2 (over TLS).
13 - [Auto]: Auto-detect protocol via connection preface peek (h2c) or ALPN
14 (TLS).
15 - [Auto_websocket]: Auto-detect with WebSocket upgrade support.
16
17 {2 Performance Features}
18
19 - GC tuning for high-throughput scenarios
20 - Cached Date headers (1-second resolution)
21 - Pre-built responses for zero-allocation hot paths
22 - Zero-copy bigstring responses
23 - Streaming response support
24 - Multi-domain parallelism via Eio
25
26 {2 Example}
27
28 {[
29 (* Fastest HTTP/1.1 server *)
30 let handler req = Server.respond "Hello, World!" in
31 Server.run ~sw ~net handler
32
33 (* Multi-protocol with auto-detection *)
34 let config = Server.{ default_config with protocol = Auto } in
35 Server.run ~sw ~net ~config handler
36 ]} *)
37
38open Eio.Std
39
40(** {1 Protocol Configuration} *)
41
42(** Protocol mode controls detection overhead and supported protocols. *)
43type protocol =
44 | Http1_only
45 (** Fastest: No protocol detection, direct HTTP/1.1 handling. With TLS:
46 HTTP/1.1 over TLS (no ALPN negotiation). *)
47 | Http2_only
48 (** HTTP/2 only. Without TLS: h2c (HTTP/2 cleartext). With TLS: h2 with
49 ALPN advertising only "h2". *)
50 | Auto
51 (** Auto-detect protocol. Without TLS: Peek for "PRI " preface (h2c),
52 fallback to HTTP/1.1. With TLS: ALPN negotiation. *)
53 | Auto_websocket
54 (** Auto-detect with WebSocket support. Same as [Auto] but also handles
55 WebSocket upgrade requests in HTTP/1.1 mode. *)
56
57(** {1 GC Tuning} *)
58
59module Gc_tune = struct
60 type config = {
61 minor_heap_size : int; (** Minor heap size in bytes. Default: 64MB *)
62 major_heap_increment : int;
63 (** Major heap increment in bytes. Default: 32MB *)
64 space_overhead : int; (** Space overhead percentage. Default: 200 *)
65 max_overhead : int; (** Max overhead percentage. Default: 500 *)
66 }
67
68 let default =
69 {
70 minor_heap_size = 64 * 1024 * 1024;
71 major_heap_increment = 32 * 1024 * 1024;
72 space_overhead = 200;
73 max_overhead = 500;
74 }
75
76 let aggressive =
77 {
78 minor_heap_size = 128 * 1024 * 1024;
79 major_heap_increment = 64 * 1024 * 1024;
80 space_overhead = 400;
81 max_overhead = 1000;
82 }
83
84 let tuned = ref false
85
86 let apply ?(config = default) () =
87 if not !tuned then begin
88 let ctrl = Gc.get () in
89 Gc.set
90 {
91 ctrl with
92 minor_heap_size = config.minor_heap_size / (Sys.word_size / 8);
93 major_heap_increment =
94 config.major_heap_increment / (Sys.word_size / 8);
95 space_overhead = config.space_overhead;
96 max_overhead = config.max_overhead;
97 };
98 tuned := true
99 end
100end
101
102module Prebuilt = Response.Prebuilt
103
104(** {1 Types} *)
105
106type config = {
107 (* Network *)
108 host : string; (** Bind address. Default: "0.0.0.0" *)
109 port : int; (** Listen port. Default: 8080 *)
110 backlog : int; (** Listen backlog. Default: 4096 *)
111 max_connections : int; (** Max concurrent connections. Default: 100000 *)
112 (* Parallelism *)
113 domain_count : int; (** Number of domains (CPUs) to use. Default: 1 *)
114 (* Protocol *)
115 protocol : protocol; (** Protocol mode. Default: Http1_only *)
116 (* Timeouts *)
117 read_timeout : float; (** Read timeout in seconds. Default: 60.0 *)
118 write_timeout : float; (** Write timeout in seconds. Default: 60.0 *)
119 idle_timeout : float; (** Idle connection timeout. Default: 120.0 *)
120 request_timeout : float; (** Request processing timeout. Default: 30.0 *)
121 (* Limits *)
122 max_header_size : int; (** Max header size in bytes. Default: 8192 *)
123 max_body_size : int64 option;
124 (** Max body size. None = unlimited. Default: None *)
125 (* Buffers *)
126 buffer_size : int; (** Read buffer size. Default: 16384 *)
127 (* Socket options *)
128 tcp_nodelay : bool; (** Set TCP_NODELAY on connections. Default: true *)
129 reuse_addr : bool; (** Set SO_REUSEADDR on listener. Default: true *)
130 reuse_port : bool; (** Set SO_REUSEPORT on listener. Default: true *)
131 (* TLS *)
132 tls : Tls_config.Server.t option; (** TLS config. None = plain HTTP *)
133 (* Performance *)
134 gc_tuning : Gc_tune.config option;
135 (** GC tuning config. Some = apply tuning. Default: Some Gc_tune.default
136 *)
137}
138(** Server configuration. *)
139
140(** Default configuration optimized for HTTP/1.1 performance. *)
141let default_config =
142 {
143 host = "0.0.0.0";
144 port = 8080;
145 backlog = 4096;
146 max_connections = 100000;
147 domain_count = 1;
148 protocol = Http1_only;
149 read_timeout = 60.0;
150 write_timeout = 60.0;
151 idle_timeout = 120.0;
152 request_timeout = 30.0;
153 max_header_size = 8192;
154 max_body_size = None;
155 buffer_size = 16384;
156 tcp_nodelay = true;
157 reuse_addr = true;
158 reuse_port = false;
159 tls = None;
160 gc_tuning = Some Gc_tune.default;
161 }
162
163(** Configuration for auto-detection mode. *)
164let auto_config = { default_config with protocol = Auto }
165
166(** Configuration for WebSocket support. *)
167let websocket_config = { default_config with protocol = Auto_websocket }
168
169(** {2 Config Builders} *)
170
171let with_port port config = { config with port }
172let with_host host config = { config with host }
173let with_backlog backlog config = { config with backlog }
174let with_max_connections max config = { config with max_connections = max }
175let with_domain_count count config = { config with domain_count = count }
176let with_protocol protocol config = { config with protocol }
177let with_read_timeout timeout config = { config with read_timeout = timeout }
178let with_write_timeout timeout config = { config with write_timeout = timeout }
179let with_idle_timeout timeout config = { config with idle_timeout = timeout }
180
181let with_request_timeout timeout config =
182 { config with request_timeout = timeout }
183
184let with_max_header_size size config = { config with max_header_size = size }
185let with_max_body_size size config = { config with max_body_size = Some size }
186let with_buffer_size size config = { config with buffer_size = size }
187let with_tcp_nodelay enabled config = { config with tcp_nodelay = enabled }
188let with_tls tls config = { config with tls = Some tls }
189let with_gc_tuning gc config = { config with gc_tuning = Some gc }
190let without_gc_tuning config = { config with gc_tuning = None }
191
192(** {1 Request/Response Types} *)
193
194(** Protocol version indicator. *)
195type protocol_version = HTTP_1_1 | HTTP_2
196
197type request = {
198 meth : H1.Method.t; (** HTTP method *)
199 target : string; (** Request target (path + query) *)
200 headers : (string * string) list; (** Headers as association list *)
201 body : string; (** Request body (empty for GET/HEAD) *)
202 version : protocol_version; (** Protocol version *)
203}
204(** Request type exposed to handlers. *)
205
206type response = Response.t
207type handler = request -> response
208type ws_handler = Websocket.t -> unit
209
210type ws_config = {
211 origin_policy : Websocket.origin_policy;
212 max_payload_size : int;
213}
214
215let default_ws_config =
216 {
217 origin_policy = Websocket.Allow_all;
218 max_payload_size = Websocket.default_max_payload_size;
219 }
220
221let respond ?(status = `OK) ?(headers = []) body =
222 Response.make ~status ~headers body
223
224let respond_bigstring = Response.bigstring
225let respond_prebuilt = Response.prebuilt
226let respond_empty = Response.empty
227let respond_text = Response.text
228let respond_html = Response.html
229let respond_json = Response.json
230
231(** {1 Internal: Socket Helpers} *)
232
233let set_tcp_nodelay flow =
234 match Eio_unix.Resource.fd_opt flow with
235 | None -> ()
236 | Some fd ->
237 Eio_unix.Fd.use_exn "set_tcp_nodelay" fd (fun unix_fd ->
238 Unix.setsockopt unix_fd Unix.TCP_NODELAY true)
239
240let shutdown_flow flow cmd =
241 try Eio.Flow.shutdown flow cmd with
242 | Unix.Unix_error (Unix.ENOTCONN, _, _) -> ()
243 | Eio.Io (Eio.Exn.X (Eio_unix.Unix_error (Unix.ENOTCONN, _, _)), _) -> ()
244
245let[@inline] writev flow iovecs =
246 match iovecs with
247 | [] -> `Ok 0
248 | [ { Httpun_types.IOVec.buffer; off; len } ] -> (
249 let cs = Cstruct.of_bigarray buffer ~off ~len in
250 match Eio.Flow.write flow [ cs ] with
251 | () -> `Ok len
252 | exception End_of_file -> `Closed)
253 | _ -> (
254 let lenv, cstructs =
255 List.fold_left_map
256 (fun acc { Httpun_types.IOVec.buffer; off; len } ->
257 (acc + len, Cstruct.of_bigarray buffer ~off ~len))
258 0 iovecs
259 in
260 match Eio.Flow.write flow cstructs with
261 | () -> `Ok lenv
262 | exception End_of_file -> `Closed)
263
264(** {1 Internal: Header Conversion} *)
265
266let h1_headers_to_list headers =
267 let result = ref [] in
268 H1.Headers.iter
269 ~f:(fun name value -> result := (name, value) :: !result)
270 headers;
271 List.rev !result
272
273let h2_headers_to_list headers =
274 let result = ref [] in
275 H2.Headers.iter
276 ~f:(fun name value -> result := (name, value) :: !result)
277 headers;
278 List.rev !result
279
280(** {1 Internal: Protocol Detection} *)
281
282(** HTTP/2 connection preface starts with "PRI " *)
283let h2_preface_prefix = "PRI "
284
285let h2_preface_prefix_len = 4
286
287let peek_bytes flow n =
288 let buf = Cstruct.create n in
289 try
290 let read = Eio.Flow.single_read flow buf in
291 Ok (Cstruct.to_string (Cstruct.sub buf 0 read))
292 with
293 | End_of_file -> Error `Eof
294 | exn -> Error (`Exn exn)
295
296let is_h2_preface data =
297 String.length data >= h2_preface_prefix_len
298 && String.sub data 0 h2_preface_prefix_len = h2_preface_prefix
299
300(** {1 Internal: HTTP/1.1 Connection Handler} *)
301
302module H1_handler = struct
303 let send_response reqd (response : Response.t) =
304 let date_header = ("date", Date_cache.get ()) in
305 match response.Response.body with
306 | Response.Prebuilt_body prebuilt -> Prebuilt.respond_h1 reqd prebuilt
307 | Response.Empty ->
308 let headers =
309 H1.Headers.of_list
310 (date_header :: ("content-length", "0") :: response.headers)
311 in
312 let resp = H1.Response.create ~headers response.status in
313 H1.Reqd.respond_with_string reqd resp ""
314 | Response.String body ->
315 let headers =
316 H1.Headers.of_list
317 (date_header
318 :: ("content-length", string_of_int (String.length body))
319 :: response.headers)
320 in
321 let resp = H1.Response.create ~headers response.status in
322 H1.Reqd.respond_with_string reqd resp body
323 | Response.Bigstring body ->
324 let headers =
325 H1.Headers.of_list
326 (date_header
327 :: ("content-length", string_of_int (Bigstringaf.length body))
328 :: response.headers)
329 in
330 let resp = H1.Response.create ~headers response.status in
331 H1.Reqd.respond_with_bigstring reqd resp body
332 | Response.Cstruct cs ->
333 let len = Cstruct.length cs in
334 let headers =
335 H1.Headers.of_list
336 (date_header
337 :: ("content-length", string_of_int len)
338 :: response.headers)
339 in
340 let resp = H1.Response.create ~headers response.status in
341 let body_writer = H1.Reqd.respond_with_streaming reqd resp in
342 H1.Body.Writer.write_bigstring body_writer ~off:cs.off ~len cs.buffer;
343 H1.Body.Writer.close body_writer
344 | Response.Stream { content_length; next } ->
345 let headers =
346 match content_length with
347 | Some len ->
348 H1.Headers.of_list
349 (date_header
350 :: ("content-length", Int64.to_string len)
351 :: response.headers)
352 | None ->
353 H1.Headers.of_list
354 (date_header
355 :: ("transfer-encoding", "chunked")
356 :: response.headers)
357 in
358 let resp = H1.Response.create ~headers response.status in
359 let body_writer = H1.Reqd.respond_with_streaming reqd resp in
360 let rec write_chunks () =
361 match next () with
362 | None -> H1.Body.Writer.close body_writer
363 | Some cs ->
364 H1.Body.Writer.write_bigstring body_writer ~off:0
365 ~len:(Cstruct.length cs) (Cstruct.to_bigarray cs);
366 H1.Body.Writer.flush body_writer (fun () -> ());
367 write_chunks ()
368 in
369 write_chunks ()
370
371 let send_error_response reqd status body_str =
372 let headers =
373 H1.Headers.of_list
374 [
375 ("date", Date_cache.get ());
376 ("content-length", string_of_int (String.length body_str));
377 ]
378 in
379 let resp = H1.Response.create ~headers status in
380 H1.Reqd.respond_with_string reqd resp body_str
381
382 let handle_request ~handler reqd req body =
383 let request =
384 {
385 meth = req.H1.Request.meth;
386 target = req.target;
387 headers = h1_headers_to_list req.headers;
388 body;
389 version = HTTP_1_1;
390 }
391 in
392 let response = handler request in
393 send_response reqd response
394
395 let handle ~handler ~ws_handler ?(ws_config = default_ws_config)
396 ?max_body_size ~initial_data flow =
397 let buffer_size = 16384 in
398 let read_buffer = Bigstringaf.create buffer_size in
399 let read_cstruct =
400 Cstruct.of_bigarray read_buffer ~off:0 ~len:buffer_size
401 in
402
403 let pending_data = ref initial_data in
404 let ws_upgrade = ref None in
405
406 let request_handler reqd =
407 let req = H1.Reqd.request reqd in
408 let h1_body = H1.Reqd.request_body reqd in
409
410 if Option.is_some ws_handler && Websocket.is_upgrade_request req.headers
411 then begin
412 match
413 Websocket.validate_origin ~policy:ws_config.origin_policy req.headers
414 with
415 | Error reason ->
416 H1.Body.Reader.close h1_body;
417 send_error_response reqd `Forbidden ("Forbidden: " ^ reason)
418 | Ok () -> (
419 match Websocket.validate_websocket_version req.headers with
420 | Error _reason ->
421 H1.Body.Reader.close h1_body;
422 let headers =
423 H1.Headers.of_list
424 [
425 ("date", Date_cache.get ());
426 ("content-length", "16");
427 ( "sec-websocket-version",
428 Websocket.supported_websocket_version );
429 ]
430 in
431 let resp = H1.Response.create ~headers (`Code 426) in
432 H1.Reqd.respond_with_string reqd resp "Upgrade Required"
433 | Ok () -> (
434 match Websocket.get_websocket_key req.headers with
435 | Some key ->
436 ws_upgrade := Some key;
437 H1.Body.Reader.close h1_body;
438 let accept = Websocket.compute_accept_key key in
439 let headers =
440 H1.Headers.of_list
441 [
442 ("upgrade", "websocket");
443 ("connection", "Upgrade");
444 ("sec-websocket-accept", accept);
445 ]
446 in
447 H1.Reqd.respond_with_upgrade reqd headers
448 | None ->
449 H1.Body.Reader.close h1_body;
450 send_error_response reqd `Bad_request "Bad Request"))
451 end
452 else begin
453 match req.meth with
454 | `GET | `HEAD | `DELETE | `OPTIONS | `CONNECT | `TRACE ->
455 H1.Body.Reader.close h1_body;
456 handle_request ~handler reqd req ""
457 | `POST | `PUT | `Other _ ->
458 let body_buffer = Buffer.create 4096 in
459 let body_size = ref 0 in
460 let rec read_body () =
461 H1.Body.Reader.schedule_read h1_body
462 ~on_eof:(fun () ->
463 let body = Buffer.contents body_buffer in
464 handle_request ~handler reqd req body)
465 ~on_read:(fun buf ~off ~len ->
466 let new_size = !body_size + len in
467 match max_body_size with
468 | Some max when Int64.of_int new_size > max ->
469 H1.Body.Reader.close h1_body;
470 send_error_response reqd (`Code 413)
471 "Request body too large"
472 | _ ->
473 body_size := new_size;
474 Buffer.add_string body_buffer
475 (Bigstringaf.substring buf ~off ~len);
476 read_body ())
477 in
478 read_body ()
479 end
480 in
481
482 let error_handler ?request:_ _error start_response =
483 let resp_body = start_response H1.Headers.empty in
484 H1.Body.Writer.write_string resp_body "Internal Server Error";
485 H1.Body.Writer.close resp_body
486 in
487
488 let conn = H1.Server_connection.create ~error_handler request_handler in
489 let shutdown = ref false in
490
491 let rec read_loop () =
492 if not !shutdown then
493 match H1.Server_connection.next_read_operation conn with
494 | `Read ->
495 let socket_data =
496 match Eio.Flow.single_read flow read_cstruct with
497 | n -> Cstruct.to_string (Cstruct.sub read_cstruct 0 n)
498 | exception End_of_file -> ""
499 in
500 let data =
501 if String.length !pending_data > 0 then begin
502 let combined = !pending_data ^ socket_data in
503 pending_data := "";
504 combined
505 end
506 else socket_data
507 in
508 let len = String.length data in
509 if len = 0 then begin
510 let (_ : int) =
511 H1.Server_connection.read_eof conn read_buffer ~off:0 ~len:0
512 in
513 shutdown := true
514 end
515 else begin
516 Bigstringaf.blit_from_string data ~src_off:0 read_buffer
517 ~dst_off:0 ~len;
518 let (_ : int) =
519 H1.Server_connection.read conn read_buffer ~off:0 ~len
520 in
521 read_loop ()
522 end
523 | `Yield -> H1.Server_connection.yield_reader conn read_loop
524 | `Close -> shutdown := true
525 | `Upgrade -> ()
526 in
527
528 let rec write_loop () =
529 match H1.Server_connection.next_write_operation conn with
530 | `Write iovecs ->
531 let write_result = writev flow iovecs in
532 H1.Server_connection.report_write_result conn write_result;
533 write_loop ()
534 | `Yield ->
535 if not !shutdown then begin
536 let continue = Eio.Promise.create () in
537 H1.Server_connection.yield_writer conn (fun () ->
538 Eio.Promise.resolve (snd continue) ());
539 Eio.Promise.await (fst continue);
540 write_loop ()
541 end
542 | `Close _ ->
543 shutdown := true;
544 shutdown_flow flow `Send
545 | `Upgrade -> ()
546 in
547
548 Fiber.both read_loop write_loop;
549
550 (* Handle WebSocket upgrade if requested *)
551 match !ws_upgrade with
552 | Some _key -> (
553 match ws_handler with
554 | Some ws_h ->
555 let ws =
556 {
557 Websocket.flow :> Eio.Flow.two_way_ty Eio.Std.r;
558 closed = false;
559 is_client = false;
560 read_buf = Buffer.create 4096;
561 max_payload_size = ws_config.max_payload_size;
562 }
563 in
564 (try ws_h ws with _ -> ());
565 if Websocket.is_open ws then Websocket.close ws
566 | None -> ())
567 | None -> ()
568
569 let handle_direct ?max_body_size ~handler flow =
570 let buffer_size = 16384 in
571 let read_buffer = Bigstringaf.create buffer_size in
572 let read_cstruct =
573 Cstruct.of_bigarray read_buffer ~off:0 ~len:buffer_size
574 in
575
576 let request_handler reqd =
577 let req = H1.Reqd.request reqd in
578 let h1_body = H1.Reqd.request_body reqd in
579
580 match req.meth with
581 | `GET | `HEAD | `DELETE | `OPTIONS | `CONNECT | `TRACE ->
582 H1.Body.Reader.close h1_body;
583 handle_request ~handler reqd req ""
584 | `POST | `PUT | `Other _ ->
585 let body_buffer = Buffer.create 4096 in
586 let body_size = ref 0 in
587 let rec read_body () =
588 H1.Body.Reader.schedule_read h1_body
589 ~on_eof:(fun () ->
590 let body = Buffer.contents body_buffer in
591 handle_request ~handler reqd req body)
592 ~on_read:(fun buf ~off ~len ->
593 let new_size = !body_size + len in
594 match max_body_size with
595 | Some max when Int64.of_int new_size > max ->
596 H1.Body.Reader.close h1_body;
597 send_error_response reqd (`Code 413)
598 "Request body too large"
599 | _ ->
600 body_size := new_size;
601 Buffer.add_string body_buffer
602 (Bigstringaf.substring buf ~off ~len);
603 read_body ())
604 in
605 read_body ()
606 in
607
608 let error_handler ?request:_ _error start_response =
609 let resp_body = start_response H1.Headers.empty in
610 H1.Body.Writer.write_string resp_body "Internal Server Error";
611 H1.Body.Writer.close resp_body
612 in
613
614 let conn = H1.Server_connection.create ~error_handler request_handler in
615 let shutdown = ref false in
616
617 let rec read_loop () =
618 if not !shutdown then
619 match H1.Server_connection.next_read_operation conn with
620 | `Read -> (
621 match Eio.Flow.single_read flow read_cstruct with
622 | n ->
623 let (_ : int) =
624 H1.Server_connection.read conn read_buffer ~off:0 ~len:n
625 in
626 read_loop ()
627 | exception End_of_file ->
628 let (_ : int) =
629 H1.Server_connection.read_eof conn read_buffer ~off:0 ~len:0
630 in
631 shutdown := true)
632 | `Yield -> H1.Server_connection.yield_reader conn read_loop
633 | `Close | `Upgrade -> shutdown := true
634 in
635
636 let rec write_loop () =
637 if not !shutdown then
638 match H1.Server_connection.next_write_operation conn with
639 | `Write iovecs ->
640 let write_result = writev flow iovecs in
641 H1.Server_connection.report_write_result conn write_result;
642 write_loop ()
643 | `Yield ->
644 let continue = Eio.Promise.create () in
645 H1.Server_connection.yield_writer conn (fun () ->
646 Eio.Promise.resolve (snd continue) ());
647 Eio.Promise.await (fst continue);
648 write_loop ()
649 | `Close _ ->
650 shutdown := true;
651 shutdown_flow flow `Send
652 | `Upgrade -> shutdown := true
653 in
654
655 Fiber.both read_loop write_loop
656end
657
658module H2_handler = struct
659 let send_h2_response reqd (response : Response.t) =
660 let h2_status = (response.Response.status :> H2.Status.t) in
661 match response.Response.body with
662 | Response.Prebuilt_body prebuilt -> Prebuilt.respond_h2 reqd prebuilt
663 | Response.Empty ->
664 let headers =
665 H2.Headers.of_list (("content-length", "0") :: response.headers)
666 in
667 let resp = H2.Response.create ~headers h2_status in
668 H2.Reqd.respond_with_string reqd resp ""
669 | Response.String body ->
670 let headers =
671 H2.Headers.of_list
672 (("content-length", string_of_int (String.length body))
673 :: response.headers)
674 in
675 let resp = H2.Response.create ~headers h2_status in
676 H2.Reqd.respond_with_string reqd resp body
677 | Response.Bigstring body ->
678 let headers =
679 H2.Headers.of_list
680 (("content-length", string_of_int (Bigstringaf.length body))
681 :: response.headers)
682 in
683 let resp = H2.Response.create ~headers h2_status in
684 H2.Reqd.respond_with_bigstring reqd resp body
685 | Response.Cstruct cs ->
686 let len = Cstruct.length cs in
687 let headers =
688 H2.Headers.of_list
689 (("content-length", string_of_int len) :: response.headers)
690 in
691 let resp = H2.Response.create ~headers h2_status in
692 let body_writer = H2.Reqd.respond_with_streaming reqd resp in
693 H2.Body.Writer.write_bigstring body_writer ~off:cs.off ~len cs.buffer;
694 H2.Body.Writer.close body_writer
695 | Response.Stream { content_length; next } ->
696 let headers =
697 match content_length with
698 | Some len ->
699 H2.Headers.of_list
700 (("content-length", Int64.to_string len) :: response.headers)
701 | None -> H2.Headers.of_list response.headers
702 in
703 let resp = H2.Response.create ~headers h2_status in
704 let body_writer = H2.Reqd.respond_with_streaming reqd resp in
705 let rec write_chunks () =
706 match next () with
707 | None -> H2.Body.Writer.close body_writer
708 | Some cs ->
709 H2.Body.Writer.write_bigstring body_writer ~off:cs.off
710 ~len:(Cstruct.length cs) cs.buffer;
711 H2.Body.Writer.flush body_writer (fun _result -> ());
712 write_chunks ()
713 in
714 write_chunks ()
715
716 let handle_h2_request ~handler reqd req body =
717 let target =
718 match H2.Headers.get req.H2.Request.headers ":path" with
719 | Some p -> p
720 | None -> "/"
721 in
722 let request =
723 {
724 meth = req.meth;
725 target;
726 headers = h2_headers_to_list req.headers;
727 body;
728 version = HTTP_2;
729 }
730 in
731 let response = handler request in
732 send_h2_response reqd response
733
734 let handle ~handler ~initial_data flow =
735 let read_buffer_size = 0x4000 in
736 let read_buffer = Bigstringaf.create read_buffer_size in
737 let pending_data = ref initial_data in
738
739 let request_handler reqd =
740 let req = H2.Reqd.request reqd in
741 let body_reader = H2.Reqd.request_body reqd in
742
743 match req.meth with
744 | `GET | `HEAD ->
745 H2.Body.Reader.close body_reader;
746 handle_h2_request ~handler reqd req ""
747 | _ ->
748 let body_buffer = Buffer.create 4096 in
749 let rec read_body () =
750 H2.Body.Reader.schedule_read body_reader
751 ~on_eof:(fun () ->
752 let body = Buffer.contents body_buffer in
753 handle_h2_request ~handler reqd req body)
754 ~on_read:(fun buf ~off ~len ->
755 Buffer.add_string body_buffer
756 (Bigstringaf.substring buf ~off ~len);
757 read_body ())
758 in
759 read_body ()
760 in
761
762 let error_handler ?request:_ _error start_response =
763 let resp_body = start_response H2.Headers.empty in
764 H2.Body.Writer.write_string resp_body "Internal Server Error";
765 H2.Body.Writer.close resp_body
766 in
767
768 let conn = H2.Server_connection.create ~error_handler request_handler in
769 let shutdown = ref false in
770
771 let read_loop () =
772 let buf_off = ref 0 in
773 let buf_len = ref 0 in
774
775 let compress () =
776 if !buf_len > 0 && !buf_off > 0 then begin
777 Bigstringaf.blit read_buffer ~src_off:!buf_off read_buffer ~dst_off:0
778 ~len:!buf_len;
779 buf_off := 0
780 end
781 else if !buf_len = 0 then buf_off := 0
782 in
783
784 let rec loop () =
785 if not !shutdown then
786 match H2.Server_connection.next_read_operation conn with
787 | `Read ->
788 compress ();
789 let available = read_buffer_size - !buf_off - !buf_len in
790 if available > 0 then begin
791 let cs =
792 Cstruct.of_bigarray read_buffer ~off:(!buf_off + !buf_len)
793 ~len:available
794 in
795 let n =
796 try Eio.Flow.single_read flow cs with End_of_file -> 0
797 in
798 if n = 0 then begin
799 let _ =
800 H2.Server_connection.read_eof conn read_buffer ~off:!buf_off
801 ~len:!buf_len
802 in
803 shutdown := true
804 end
805 else begin
806 buf_len := !buf_len + n;
807 let consumed =
808 H2.Server_connection.read conn read_buffer ~off:!buf_off
809 ~len:!buf_len
810 in
811 buf_off := !buf_off + consumed;
812 buf_len := !buf_len - consumed;
813 loop ()
814 end
815 end
816 else loop ()
817 | `Close -> shutdown := true
818 | `Yield ->
819 let continue = Eio.Promise.create () in
820 H2.Server_connection.yield_reader conn (fun () ->
821 Eio.Promise.resolve (snd continue) ());
822 Eio.Promise.await (fst continue);
823 loop ()
824 in
825
826 if String.length !pending_data > 0 then begin
827 let len = String.length !pending_data in
828 Bigstringaf.blit_from_string !pending_data ~src_off:0 read_buffer
829 ~dst_off:0 ~len;
830 buf_len := len;
831 pending_data := "";
832 let consumed = H2.Server_connection.read conn read_buffer ~off:0 ~len in
833 buf_off := consumed;
834 buf_len := len - consumed
835 end;
836 loop ()
837 in
838
839 let write_loop () =
840 let rec loop () =
841 if not !shutdown then
842 match H2.Server_connection.next_write_operation conn with
843 | `Write iovecs ->
844 let cstructs =
845 List.map
846 (fun iov ->
847 Cstruct.of_bigarray ~off:iov.H2.IOVec.off
848 ~len:iov.H2.IOVec.len iov.H2.IOVec.buffer)
849 iovecs
850 in
851 let len =
852 List.fold_left (fun acc iov -> acc + iov.H2.IOVec.len) 0 iovecs
853 in
854 Eio.Flow.write flow cstructs;
855 H2.Server_connection.report_write_result conn (`Ok len);
856 loop ()
857 | `Yield ->
858 let continue = Eio.Promise.create () in
859 H2.Server_connection.yield_writer conn (fun () ->
860 Eio.Promise.resolve (snd continue) ());
861 Eio.Promise.await (fst continue);
862 loop ()
863 | `Close _ -> shutdown := true
864 in
865 loop ()
866 in
867
868 Fiber.both read_loop write_loop
869
870 (** Direct H2 handler - no protocol detection *)
871 let handle_direct ~handler flow = handle ~handler ~initial_data:"" flow
872end
873
874(** {1 Internal: TLS Connection Handler} *)
875
876module Tls_handler = struct
877 let handle ~config ~handler ~ws_handler tls_cfg flow =
878 try
879 let tls_flow = Tls_eio.server_of_flow tls_cfg flow in
880 let max_body_size = config.max_body_size in
881 match config.protocol with
882 | Http1_only -> H1_handler.handle_direct ?max_body_size ~handler tls_flow
883 | Http2_only -> H2_handler.handle_direct ~handler tls_flow
884 | Auto | Auto_websocket -> (
885 match Tls_config.negotiated_protocol tls_flow with
886 | Some Tls_config.HTTP_2 -> H2_handler.handle_direct ~handler tls_flow
887 | Some Tls_config.HTTP_1_1 | None ->
888 if config.protocol = Auto_websocket then
889 H1_handler.handle ~handler ~ws_handler:(Some ws_handler)
890 ?max_body_size ~initial_data:"" tls_flow
891 else H1_handler.handle_direct ?max_body_size ~handler tls_flow)
892 with
893 | Tls_eio.Tls_failure failure ->
894 traceln "TLS error: %s" (Tls_config.failure_to_string failure)
895 | exn -> traceln "Connection error: %s" (Printexc.to_string exn)
896end
897
898(** {1 Internal: Connection Handler} *)
899
900let handle_connection ~config ~handler ~ws_handler flow =
901 let max_body_size = config.max_body_size in
902 match config.protocol with
903 | Http1_only -> H1_handler.handle_direct ?max_body_size ~handler flow
904 | Http2_only -> H2_handler.handle_direct ~handler flow
905 | Auto | Auto_websocket -> (
906 match peek_bytes flow h2_preface_prefix_len with
907 | Error `Eof -> ()
908 | Error (`Exn exn) ->
909 traceln "Connection error: %s" (Printexc.to_string exn)
910 | Ok initial_data ->
911 if is_h2_preface initial_data then
912 H2_handler.handle ~handler ~initial_data flow
913 else if config.protocol = Auto_websocket then
914 H1_handler.handle ~handler ~ws_handler:(Some ws_handler)
915 ?max_body_size ~initial_data flow
916 else
917 H1_handler.handle ~handler ~ws_handler:None ?max_body_size
918 ~initial_data flow)
919
920(** {1 Public API} *)
921
922(** Run an HTTP server.
923
924 @param sw Switch for resource management
925 @param net Eio network capability
926 @param config Server configuration (default: [default_config])
927 @param ws_handler WebSocket handler (required for [Auto_websocket] mode)
928 @param handler Request handler *)
929let run ~sw ~net ?(config = default_config) ?ws_handler handler =
930 (* Apply GC tuning if configured *)
931 (match config.gc_tuning with
932 | Some gc_config -> Gc_tune.apply ~config:gc_config ()
933 | None -> ());
934
935 let addr = `Tcp (Eio.Net.Ipaddr.V4.any, config.port) in
936 let socket =
937 Eio.Net.listen ~sw ~backlog:config.backlog ~reuse_addr:config.reuse_addr
938 ~reuse_port:config.reuse_port net addr
939 in
940
941 let protocol_str =
942 match (config.protocol, config.tls) with
943 | Http1_only, None -> "HTTP/1.1"
944 | Http1_only, Some _ -> "HTTP/1.1 (TLS)"
945 | Http2_only, None -> "HTTP/2 h2c"
946 | Http2_only, Some _ -> "HTTP/2 (TLS)"
947 | Auto, None -> "HTTP/1.1 + HTTP/2 h2c"
948 | Auto, Some _ -> "HTTP/1.1 + HTTP/2 (TLS, ALPN)"
949 | Auto_websocket, None -> "HTTP/1.1 + HTTP/2 h2c + WebSocket"
950 | Auto_websocket, Some _ -> "HTTP/1.1 + HTTP/2 + WebSocket (TLS, ALPN)"
951 in
952 traceln "Server listening on port %d (%s)" config.port protocol_str;
953
954 (* Validate ws_handler for Auto_websocket mode *)
955 let ws_handler =
956 match (config.protocol, ws_handler) with
957 | Auto_websocket, None ->
958 failwith "WebSocket handler required for Auto_websocket mode"
959 | Auto_websocket, Some h -> h
960 | _, _ -> fun _ -> () (* Dummy handler for non-WS modes *)
961 in
962
963 let connection_handler flow _addr =
964 if config.tcp_nodelay then set_tcp_nodelay flow;
965 match config.tls with
966 | None -> handle_connection ~config ~handler ~ws_handler flow
967 | Some tls_config -> (
968 match Tls_config.Server.to_tls_config tls_config with
969 | Error (`Msg msg) -> traceln "TLS config error: %s" msg
970 | Ok tls_cfg ->
971 Tls_handler.handle ~config ~handler ~ws_handler tls_cfg flow)
972 in
973
974 let on_error exn = traceln "Connection error: %s" (Printexc.to_string exn) in
975
976 Eio.Net.run_server socket connection_handler
977 ~max_connections:config.max_connections ~on_error
978
979(** Run an HTTP server with multi-domain parallelism.
980
981 @param sw Switch for resource management
982 @param net Eio network capability
983 @param domain_mgr Eio domain manager
984 @param config Server configuration (default: [default_config])
985 @param ws_handler WebSocket handler (required for [Auto_websocket] mode)
986 @param handler Request handler *)
987let run_parallel ~sw ~net ~domain_mgr ?(config = default_config) ?ws_handler
988 handler =
989 (* Apply GC tuning if configured *)
990 (match config.gc_tuning with
991 | Some gc_config -> Gc_tune.apply ~config:gc_config ()
992 | None -> ());
993
994 let domain_count = max 1 config.domain_count in
995 let addr = `Tcp (Eio.Net.Ipaddr.V4.any, config.port) in
996 let socket =
997 Eio.Net.listen ~sw ~backlog:config.backlog ~reuse_addr:config.reuse_addr
998 ~reuse_port:config.reuse_port net addr
999 in
1000
1001 let protocol_str =
1002 match (config.protocol, config.tls) with
1003 | Http1_only, None -> "HTTP/1.1"
1004 | Http1_only, Some _ -> "HTTP/1.1 (TLS)"
1005 | Http2_only, None -> "HTTP/2 h2c"
1006 | Http2_only, Some _ -> "HTTP/2 (TLS)"
1007 | Auto, None -> "HTTP/1.1 + HTTP/2 h2c"
1008 | Auto, Some _ -> "HTTP/1.1 + HTTP/2 (TLS, ALPN)"
1009 | Auto_websocket, None -> "HTTP/1.1 + HTTP/2 h2c + WebSocket"
1010 | Auto_websocket, Some _ -> "HTTP/1.1 + HTTP/2 + WebSocket (TLS, ALPN)"
1011 in
1012 traceln "Server listening on port %d (%s, %d domains)" config.port
1013 protocol_str domain_count;
1014
1015 (* Validate ws_handler for Auto_websocket mode *)
1016 let ws_handler =
1017 match (config.protocol, ws_handler) with
1018 | Auto_websocket, None ->
1019 failwith "WebSocket handler required for Auto_websocket mode"
1020 | Auto_websocket, Some h -> h
1021 | _, _ -> fun _ -> ()
1022 in
1023
1024 let connection_handler flow _addr =
1025 if config.tcp_nodelay then set_tcp_nodelay flow;
1026 match config.tls with
1027 | None -> handle_connection ~config ~handler ~ws_handler flow
1028 | Some tls_config -> (
1029 match Tls_config.Server.to_tls_config tls_config with
1030 | Error (`Msg msg) -> traceln "TLS config error: %s" msg
1031 | Ok tls_cfg ->
1032 Tls_handler.handle ~config ~handler ~ws_handler tls_cfg flow)
1033 in
1034
1035 let on_error exn = traceln "Connection error: %s" (Printexc.to_string exn) in
1036
1037 if domain_count <= 1 then
1038 Eio.Net.run_server socket connection_handler
1039 ~max_connections:config.max_connections ~on_error
1040 else
1041 Eio.Net.run_server socket connection_handler
1042 ~max_connections:config.max_connections ~on_error
1043 ~additional_domains:(domain_mgr, domain_count - 1)