ocaml http/1, http/2 and websocket client and server library
1(** HTTP/1.1 Server implementation using h1.
2
3 This module provides HTTP/1.1 server functionality built on Eio.
4
5 Features:
6 - Lazy body reading: request body is not read until accessed
7 - Zero-copy responses: bigstring bodies avoid copying
8 - Streaming responses: write large responses without buffering
9 - Connection pooling via buffer reuse *)
10
11open Eio.Std
12
13(** {1 Read Buffer Pool} *)
14
15(** Lock-free buffer pool using Treiber stack via Kcas for thread-safe pooling.
16*)
17module Read_buffer_pool : sig
18 val acquire : unit -> Bigstringaf.t * Cstruct.t
19 val release : Bigstringaf.t -> unit
20end = struct
21 let buffer_size = 0x4000
22 let max_pooled = 256
23 let pool : Bigstringaf.t list Kcas.Loc.t = Kcas.Loc.make []
24 let pool_size : int Kcas.Loc.t = Kcas.Loc.make 0
25
26 let acquire () =
27 let buf =
28 Kcas.Xt.commit
29 {
30 tx =
31 (fun ~xt ->
32 match Kcas.Xt.get ~xt pool with
33 | [] -> None
34 | buf :: rest ->
35 Kcas.Xt.set ~xt pool rest;
36 Kcas.Xt.set ~xt pool_size (Kcas.Xt.get ~xt pool_size - 1);
37 Some buf);
38 }
39 in
40 let buf =
41 match buf with Some b -> b | None -> Bigstringaf.create buffer_size
42 in
43 (buf, Cstruct.of_bigarray buf ~off:0 ~len:buffer_size)
44
45 let release buf =
46 Kcas.Xt.commit
47 {
48 tx =
49 (fun ~xt ->
50 let size = Kcas.Xt.get ~xt pool_size in
51 if size < max_pooled then begin
52 Kcas.Xt.set ~xt pool (buf :: Kcas.Xt.get ~xt pool);
53 Kcas.Xt.set ~xt pool_size (size + 1)
54 end);
55 }
56end
57
58(** {1 Configuration} *)
59
60type config = {
61 (* Network *)
62 host : string; (** Bind address. Default: "0.0.0.0" *)
63 port : int; (** Listen port. Default: 8080 *)
64 backlog : int; (** Listen backlog. Default: 2048 *)
65 max_connections : int; (** Max concurrent connections. Default: 10000 *)
66 (* Parallelism *)
67 domain_count : int; (** Number of domains (CPUs) to use. Default: 1 *)
68 (* Timeouts *)
69 read_timeout : float; (** Read timeout in seconds. Default: 60.0 *)
70 write_timeout : float; (** Write timeout in seconds. Default: 60.0 *)
71 idle_timeout : float; (** Idle connection timeout. Default: 120.0 *)
72 request_timeout : float; (** Request processing timeout. Default: 30.0 *)
73 (* Limits *)
74 max_header_size : int; (** Max header size in bytes. Default: 8192 *)
75 max_body_size : int64 option;
76 (** Max body size. None = unlimited. Default: None *)
77 (* Buffers *)
78 buffer_size : int; (** Read buffer size. Default: 16384 *)
79 (* TLS *)
80 tls : Tls_config.Server.t option; (** TLS config. None = plain HTTP *)
81 (* Socket options *)
82 tcp_nodelay : bool; (** Enable TCP_NODELAY (disable Nagle). Default: true *)
83 reuse_addr : bool; (** Enable SO_REUSEADDR. Default: true *)
84 reuse_port : bool;
85 (** Enable SO_REUSEPORT for multi-process scaling. Default: true *)
86}
87(** Server configuration *)
88
89let default_config =
90 {
91 host = "0.0.0.0";
92 port = 8080;
93 backlog = 2048;
94 max_connections = 10000;
95 domain_count = 1;
96 read_timeout = 60.0;
97 write_timeout = 60.0;
98 idle_timeout = 120.0;
99 request_timeout = 30.0;
100 max_header_size = 8192;
101 max_body_size = None;
102 buffer_size = 16384;
103 tls = None;
104 tcp_nodelay = true;
105 reuse_addr = true;
106 reuse_port = true;
107 }
108
109(** {2 Config builders} *)
110
111let with_port port config = { config with port }
112let with_host host config = { config with host }
113let with_backlog backlog config = { config with backlog }
114let with_max_connections max config = { config with max_connections = max }
115let with_read_timeout timeout config = { config with read_timeout = timeout }
116let with_write_timeout timeout config = { config with write_timeout = timeout }
117let with_idle_timeout timeout config = { config with idle_timeout = timeout }
118
119let with_request_timeout timeout config =
120 { config with request_timeout = timeout }
121
122let with_domain_count count config = { config with domain_count = count }
123let with_max_header_size size config = { config with max_header_size = size }
124let with_max_body_size size config = { config with max_body_size = Some size }
125let with_buffer_size size config = { config with buffer_size = size }
126let with_tls tls config = { config with tls = Some tls }
127let with_tcp_nodelay enabled config = { config with tcp_nodelay = enabled }
128let with_reuse_addr enabled config = { config with reuse_addr = enabled }
129let with_reuse_port enabled config = { config with reuse_port = enabled }
130
131(** {1 GC Tuning} *)
132
133type gc_config = {
134 minor_heap_size : int;
135 major_heap_increment : int;
136 space_overhead : int;
137 max_overhead : int;
138}
139
140let default_gc_config =
141 {
142 minor_heap_size = 64 * 1024 * 1024;
143 major_heap_increment = 16 * 1024 * 1024;
144 space_overhead = 120;
145 max_overhead = 500;
146 }
147
148let tune_gc ?(config = default_gc_config) () =
149 let ctrl = Gc.get () in
150 Gc.set
151 {
152 ctrl with
153 minor_heap_size = config.minor_heap_size / (Sys.word_size / 8);
154 major_heap_increment = config.major_heap_increment / (Sys.word_size / 8);
155 space_overhead = config.space_overhead;
156 max_overhead = config.max_overhead;
157 }
158
159let gc_tuned = ref false
160
161let ensure_gc_tuned () =
162 if not !gc_tuned then begin
163 tune_gc ();
164 gc_tuned := true
165 end
166
167(** {1 Cached Prebuilt Response} *)
168
169type cached_prebuilt = {
170 base_response : H1.Response.t;
171 body : Bigstringaf.t;
172 cached_response : H1.Response.t Atomic.t;
173 cached_second : int Atomic.t;
174}
175
176let make_cached_prebuilt h1_response body =
177 let now = int_of_float (Unix.gettimeofday ()) in
178 let headers =
179 H1.Headers.add h1_response.H1.Response.headers "Date" (Date_cache.get ())
180 in
181 let resp = H1.Response.create ~headers h1_response.H1.Response.status in
182 {
183 base_response = h1_response;
184 body;
185 cached_response = Atomic.make resp;
186 cached_second = Atomic.make now;
187 }
188
189let[@inline] get_cached_response cached =
190 let now = int_of_float (Unix.gettimeofday ()) in
191 let last = Atomic.get cached.cached_second in
192 if now <> last then begin
193 let headers =
194 H1.Headers.add cached.base_response.H1.Response.headers "Date"
195 (Date_cache.get ())
196 in
197 let resp =
198 H1.Response.create ~headers cached.base_response.H1.Response.status
199 in
200 Atomic.set cached.cached_response resp;
201 Atomic.set cached.cached_second now
202 end;
203 Atomic.get cached.cached_response
204
205(** {1 Socket Helpers} *)
206
207let set_tcp_nodelay flow =
208 match Eio_unix.Resource.fd_opt flow with
209 | None -> ()
210 | Some fd ->
211 Eio_unix.Fd.use_exn "set_tcp_nodelay" fd (fun unix_fd ->
212 Unix.setsockopt unix_fd Unix.TCP_NODELAY true)
213
214(** {1 Body Types} *)
215
216type body_reader = {
217 read : unit -> string;
218 (** Read the entire body as a string. Can only be called once. *)
219 read_stream : unit -> Cstruct.t option;
220 (** Read body in chunks. Returns None when done. *)
221 close : unit -> unit; (** Close the body reader without reading. *)
222}
223(** Lazy body reader - body is only read when [read] is called *)
224
225(** Response body - supports string, bigstring, streaming, and pre-built *)
226type response_body =
227 | Body_string of string
228 | Body_bigstring of Bigstringaf.t
229 | Body_prebuilt of { h1_response : H1.Response.t; body : Bigstringaf.t }
230 | Body_cached_prebuilt of cached_prebuilt
231 | Body_stream of {
232 content_length : int64 option;
233 next : unit -> Cstruct.t option;
234 }
235
236(** {1 Request and Response Types} *)
237
238type request = {
239 meth : H1.Method.t;
240 target : string;
241 headers : H1.Headers.t;
242 body_reader : body_reader;
243 (** Lazy body reader - call [body_reader.read ()] to get the body *)
244}
245(** Request type with lazy body reading *)
246
247(** Read the request body as a string (convenience function) *)
248let read_body req = req.body_reader.read ()
249
250(** Read the request body as a stream (for large bodies) *)
251let read_body_stream req = req.body_reader.read_stream
252
253(** Close the request body without reading (for ignored bodies) *)
254let close_body req = req.body_reader.close ()
255
256type response = {
257 status : H1.Status.t;
258 headers : (string * string) list;
259 response_body : response_body;
260}
261(** Response type with optimized body variants *)
262
263type handler = request -> response
264(** Handler type *)
265
266let respond ?(status = `OK) ?(headers = []) body =
267 { status; headers; response_body = Body_string body }
268
269let respond_bigstring ?(status = `OK) ?(headers = []) bstr =
270 { status; headers; response_body = Body_bigstring bstr }
271
272let respond_stream ?(status = `OK) ?(headers = []) ?content_length next =
273 { status; headers; response_body = Body_stream { content_length; next } }
274
275let respond_prebuilt h1_response body =
276 {
277 status = `OK;
278 headers = [];
279 response_body = Body_prebuilt { h1_response; body };
280 }
281
282let respond_cached_prebuilt cached =
283 { status = `OK; headers = []; response_body = Body_cached_prebuilt cached }
284
285type static_response = response
286
287let make_static_response cached : static_response =
288 { status = `OK; headers = []; response_body = Body_cached_prebuilt cached }
289
290let[@inline always] respond_static (r : static_response) : response = r
291let make_h1_headers headers_list = H1.Headers.of_list headers_list
292
293let make_h1_response ?(status = `OK) headers =
294 H1.Response.create ~headers status
295
296(** {1 Internal Helpers} *)
297
298(** Write all IOVecs to the flow - optimized version *)
299let write_iovecs flow iovecs =
300 match iovecs with
301 | [] -> ()
302 | [ iov ] ->
303 (* Fast path for single iovec - common case *)
304 let cs =
305 Cstruct.of_bigarray ~off:iov.Httpun_types.IOVec.off
306 ~len:iov.Httpun_types.IOVec.len iov.Httpun_types.IOVec.buffer
307 in
308 Eio.Flow.write flow [ cs ]
309 | _ ->
310 (* Multiple iovecs - build list directly *)
311 let cstructs =
312 List.map
313 (fun iov ->
314 Cstruct.of_bigarray ~off:iov.Httpun_types.IOVec.off
315 ~len:iov.Httpun_types.IOVec.len iov.Httpun_types.IOVec.buffer)
316 iovecs
317 in
318 Eio.Flow.write flow cstructs
319
320(** Check if method typically has no body *)
321let[@inline] method_has_no_body = function
322 | `GET | `HEAD | `DELETE | `OPTIONS | `CONNECT | `TRACE -> true
323 | `POST | `PUT | `PATCH | `Other _ -> false
324
325exception Body_too_large
326
327let make_body_reader ?max_body_size (h1_body : H1.Body.Reader.t) : body_reader =
328 let read_called = ref false in
329 let closed = ref false in
330 let body_buffer = Buffer.create 4096 in
331 let current_size = ref 0 in
332 let too_large = ref false in
333 let done_promise, done_resolver = Eio.Promise.create () in
334
335 let check_size len =
336 match max_body_size with
337 | Some max when Int64.of_int (!current_size + len) > max ->
338 too_large := true;
339 H1.Body.Reader.close h1_body;
340 Eio.Promise.resolve done_resolver ();
341 false
342 | _ ->
343 current_size := !current_size + len;
344 true
345 in
346
347 let rec schedule_read () =
348 if (not !closed) && not !too_large then
349 H1.Body.Reader.schedule_read h1_body
350 ~on_eof:(fun () -> Eio.Promise.resolve done_resolver ())
351 ~on_read:(fun buf ~off ~len ->
352 if check_size len then begin
353 Buffer.add_string body_buffer (Bigstringaf.substring buf ~off ~len);
354 schedule_read ()
355 end)
356 in
357
358 {
359 read =
360 (fun () ->
361 if !read_called then failwith "Body already read"
362 else begin
363 read_called := true;
364 if !closed then ""
365 else begin
366 schedule_read ();
367 Eio.Promise.await done_promise;
368 if !too_large then raise Body_too_large
369 else Buffer.contents body_buffer
370 end
371 end);
372 read_stream =
373 (fun () ->
374 if !closed || !too_large then None
375 else begin
376 let chunk_promise, chunk_resolver = Eio.Promise.create () in
377 let got_chunk = ref false in
378 H1.Body.Reader.schedule_read h1_body
379 ~on_eof:(fun () ->
380 if not !got_chunk then Eio.Promise.resolve chunk_resolver None)
381 ~on_read:(fun buf ~off ~len ->
382 got_chunk := true;
383 if check_size len then begin
384 let cs = Cstruct.of_bigarray ~off ~len buf in
385 Eio.Promise.resolve chunk_resolver (Some cs)
386 end
387 else Eio.Promise.resolve chunk_resolver None);
388 Eio.Promise.await chunk_promise
389 end);
390 close =
391 (fun () ->
392 if not !closed then begin
393 closed := true;
394 H1.Body.Reader.close h1_body
395 end);
396 }
397
398(** Create a no-op body reader for methods without bodies *)
399let empty_body_reader () : body_reader =
400 {
401 read = (fun () -> "");
402 read_stream = (fun () -> None);
403 close = (fun () -> ());
404 }
405
406let respond_413 reqd =
407 let body = "Request Entity Too Large" in
408 let headers =
409 H1.Headers.of_list
410 [
411 ("Date", Date_cache.get ());
412 ("Content-Length", string_of_int (String.length body));
413 ("Connection", "close");
414 ]
415 in
416 let resp = H1.Response.create ~headers (`Code 413) in
417 H1.Reqd.respond_with_string reqd resp body
418
419let respond_408 reqd =
420 let body = "Request Timeout" in
421 let headers =
422 H1.Headers.of_list
423 [
424 ("Date", Date_cache.get ());
425 ("Content-Length", string_of_int (String.length body));
426 ("Connection", "close");
427 ]
428 in
429 let resp = H1.Response.create ~headers (`Code 408) in
430 H1.Reqd.respond_with_string reqd resp body
431
432let handle_connection ?clock ?read_timeout ?request_timeout ?max_body_size
433 handler flow =
434 let read_buffer, read_cstruct = Read_buffer_pool.acquire () in
435 Fun.protect ~finally:(fun () -> Read_buffer_pool.release read_buffer)
436 @@ fun () ->
437 let request_handler reqd =
438 let req = H1.Reqd.request reqd in
439 let h1_body = H1.Reqd.request_body reqd in
440
441 let body_reader =
442 if method_has_no_body req.H1.Request.meth then begin
443 H1.Body.Reader.close h1_body;
444 empty_body_reader ()
445 end
446 else make_body_reader ?max_body_size h1_body
447 in
448
449 let request =
450 {
451 meth = req.H1.Request.meth;
452 target = req.target;
453 headers = req.headers;
454 body_reader;
455 }
456 in
457
458 let handle_request () =
459 match (clock, request_timeout) with
460 | Some clock, Some timeout ->
461 Eio.Time.with_timeout clock timeout (fun () ->
462 let response = handler request in
463 body_reader.close ();
464 Ok response)
465 | _ ->
466 let response = handler request in
467 body_reader.close ();
468 Ok response
469 in
470
471 match handle_request () with
472 | Error `Timeout ->
473 body_reader.close ();
474 respond_408 reqd
475 | exception Body_too_large ->
476 body_reader.close ();
477 respond_413 reqd
478 | Ok response -> (
479 let date_header = ("Date", Date_cache.get ()) in
480 let filter_reserved headers =
481 List.filter
482 (fun (k, _) ->
483 let lk = String.lowercase_ascii k in
484 lk <> "content-length" && lk <> "date")
485 headers
486 in
487
488 match response.response_body with
489 | Body_string body ->
490 let content_length = String.length body in
491 let headers =
492 H1.Headers.of_list
493 (date_header
494 :: ("Content-Length", string_of_int content_length)
495 :: filter_reserved response.headers)
496 in
497 let resp = H1.Response.create ~headers response.status in
498 H1.Reqd.respond_with_string reqd resp body
499 | Body_bigstring bstr ->
500 let content_length = Bigstringaf.length bstr in
501 let headers =
502 H1.Headers.of_list
503 (date_header
504 :: ("Content-Length", string_of_int content_length)
505 :: filter_reserved response.headers)
506 in
507 let resp = H1.Response.create ~headers response.status in
508 H1.Reqd.respond_with_bigstring reqd resp bstr
509 | Body_prebuilt { h1_response; body } ->
510 let headers =
511 H1.Headers.add h1_response.headers "Date" (Date_cache.get ())
512 in
513 let resp = { h1_response with H1.Response.headers } in
514 H1.Reqd.respond_with_bigstring reqd resp body
515 | Body_cached_prebuilt cached ->
516 let resp = get_cached_response cached in
517 H1.Reqd.respond_with_bigstring reqd resp cached.body
518 | Body_stream { content_length; next } ->
519 let headers =
520 match content_length with
521 | Some len ->
522 H1.Headers.of_list
523 (date_header
524 :: ("Content-Length", Int64.to_string len)
525 :: filter_reserved response.headers)
526 | None ->
527 H1.Headers.of_list
528 (date_header
529 :: ("Transfer-Encoding", "chunked")
530 :: filter_reserved response.headers)
531 in
532 let resp = H1.Response.create ~headers response.status in
533 let body_writer = H1.Reqd.respond_with_streaming reqd resp in
534 let rec write_chunks () =
535 match next () with
536 | None -> H1.Body.Writer.close body_writer
537 | Some cs ->
538 H1.Body.Writer.write_bigstring body_writer ~off:0
539 ~len:(Cstruct.length cs) (Cstruct.to_bigarray cs);
540 let flushed, resolve = Eio.Promise.create () in
541 H1.Body.Writer.flush body_writer (fun () ->
542 Eio.Promise.resolve resolve ());
543 Eio.Promise.await flushed;
544 write_chunks ()
545 in
546 write_chunks ())
547 in
548
549 let error_handler ?request:_ _error start_response =
550 let resp_body = start_response H1.Headers.empty in
551 H1.Body.Writer.write_string resp_body "Internal Server Error";
552 H1.Body.Writer.close resp_body
553 in
554
555 let conn = H1.Server_connection.create ~error_handler request_handler in
556
557 let shutdown = ref false in
558
559 let do_read () =
560 match (clock, read_timeout) with
561 | Some clock, Some timeout ->
562 Eio.Time.with_timeout clock timeout (fun () ->
563 Ok (Eio.Flow.single_read flow read_cstruct))
564 | _ -> Ok (Eio.Flow.single_read flow read_cstruct)
565 in
566
567 let rec read_loop () =
568 if not !shutdown then
569 match H1.Server_connection.next_read_operation conn with
570 | `Read -> (
571 match do_read () with
572 | Error `Timeout -> shutdown := true
573 | exception End_of_file ->
574 let _ =
575 H1.Server_connection.read_eof conn read_buffer ~off:0 ~len:0
576 in
577 shutdown := true
578 | Ok n ->
579 let _ =
580 H1.Server_connection.read conn read_buffer ~off:0 ~len:n
581 in
582 read_loop ())
583 | `Yield -> H1.Server_connection.yield_reader conn read_loop
584 | `Close | `Upgrade -> shutdown := true
585 in
586
587 let rec write_loop () =
588 if not !shutdown then
589 match H1.Server_connection.next_write_operation conn with
590 | `Write iovecs ->
591 write_iovecs flow iovecs;
592 let len =
593 List.fold_left
594 (fun acc iov -> acc + iov.Httpun_types.IOVec.len)
595 0 iovecs
596 in
597 H1.Server_connection.report_write_result conn (`Ok len);
598 write_loop ()
599 | `Yield ->
600 let continue = Eio.Promise.create () in
601 H1.Server_connection.yield_writer conn (fun () ->
602 Eio.Promise.resolve (snd continue) ());
603 Eio.Promise.await (fst continue);
604 write_loop ()
605 | `Upgrade -> shutdown := true
606 | `Close _ -> shutdown := true
607 in
608
609 Fiber.both read_loop write_loop
610
611let run ~sw ~net ?clock ?(config = default_config) handler =
612 ensure_gc_tuned ();
613 let addr = `Tcp (Eio.Net.Ipaddr.V4.any, config.port) in
614 let socket =
615 Eio.Net.listen ~sw ~backlog:config.backlog ~reuse_addr:config.reuse_addr
616 ~reuse_port:config.reuse_port net addr
617 in
618 traceln "Server listening on port %d" config.port;
619 let max_body_size = config.max_body_size in
620 let read_timeout = Some config.read_timeout in
621 let request_timeout = Some config.request_timeout in
622 let connection_handler flow _addr =
623 if config.tcp_nodelay then set_tcp_nodelay flow;
624 handle_connection ?clock ?read_timeout ?request_timeout ?max_body_size
625 handler flow
626 in
627 let on_error exn = traceln "Connection error: %s" (Printexc.to_string exn) in
628 Eio.Net.run_server socket connection_handler
629 ~max_connections:config.max_connections ~on_error
630
631let run_parallel ~sw ~net ~domain_mgr ?clock ?(config = default_config) handler
632 =
633 ensure_gc_tuned ();
634 let domain_count = max 1 config.domain_count in
635 let addr = `Tcp (Eio.Net.Ipaddr.V4.any, config.port) in
636 let socket =
637 Eio.Net.listen ~sw ~backlog:config.backlog ~reuse_addr:config.reuse_addr
638 ~reuse_port:config.reuse_port net addr
639 in
640 traceln "Server listening on port %d (%d domains)" config.port domain_count;
641 let max_body_size = config.max_body_size in
642 let read_timeout = Some config.read_timeout in
643 let request_timeout = Some config.request_timeout in
644 let connection_handler flow _addr =
645 if config.tcp_nodelay then set_tcp_nodelay flow;
646 handle_connection ?clock ?read_timeout ?request_timeout ?max_body_size
647 handler flow
648 in
649 let on_error exn = traceln "Connection error: %s" (Printexc.to_string exn) in
650 if domain_count <= 1 then
651 Eio.Net.run_server socket connection_handler
652 ~max_connections:config.max_connections ~on_error
653 else
654 Eio.Net.run_server socket connection_handler
655 ~max_connections:config.max_connections ~on_error
656 ~additional_domains:(domain_mgr, domain_count - 1)