ocaml http/1, http/2 and websocket client and server library
1(** HTTP/2 Server implementation using h2.
2
3 This module provides HTTP/2 server functionality built on the h2 library
4 with Eio for structured concurrency. *)
5
6open Eio.Std
7
8(** {1 Types} *)
9
10type request = {
11 meth : H2.Method.t;
12 target : string;
13 headers : H2.Headers.t;
14 body : string;
15}
16(** Request type exposed to handlers - same as Server.request for compatibility
17*)
18
19(** {1 Response Types} *)
20
21(** Response body - supports string, bigstring, streaming, and pre-built *)
22type response_body =
23 | Body_string of string
24 | Body_bigstring of Bigstringaf.t
25 | Body_prebuilt of { h2_response : H2.Response.t; body : Bigstringaf.t }
26 | Body_stream of {
27 content_length : int64 option;
28 next : unit -> Cstruct.t option;
29 }
30
31type response = {
32 status : H2.Status.t;
33 headers : (string * string) list;
34 response_body : response_body;
35}
36(** Response type with body variants *)
37
38type handler = request -> response
39
40(** {2 Optimized Response Constructors} *)
41
42let respond_opt ?(status = `OK) ?(headers = []) body =
43 { status; headers; response_body = Body_string body }
44
45let respond_bigstring ?(status = `OK) ?(headers = []) bstr =
46 { status; headers; response_body = Body_bigstring bstr }
47
48let respond_stream ?(status = `OK) ?(headers = []) ?content_length next =
49 { status; headers; response_body = Body_stream { content_length; next } }
50
51let respond_prebuilt h2_response body =
52 {
53 status = `OK;
54 headers = [];
55 response_body = Body_prebuilt { h2_response; body };
56 }
57
58let make_h2_headers headers_list = H2.Headers.of_list headers_list
59
60let make_h2_response ?(status = `OK) headers =
61 H2.Response.create ~headers status
62
63(** Lock-free buffer pool using Treiber stack via Kcas for thread-safe pooling.
64*)
65module Read_buffer_pool : sig
66 val acquire : unit -> Bigstringaf.t * Cstruct.t
67 val release : Bigstringaf.t -> unit
68end = struct
69 let buffer_size = 0x4000
70 let max_pooled = 256
71 let pool : Bigstringaf.t list Kcas.Loc.t = Kcas.Loc.make []
72 let pool_size : int Kcas.Loc.t = Kcas.Loc.make 0
73
74 let acquire () =
75 let buf =
76 Kcas.Xt.commit
77 {
78 tx =
79 (fun ~xt ->
80 match Kcas.Xt.get ~xt pool with
81 | [] -> None
82 | buf :: rest ->
83 Kcas.Xt.set ~xt pool rest;
84 Kcas.Xt.set ~xt pool_size (Kcas.Xt.get ~xt pool_size - 1);
85 Some buf);
86 }
87 in
88 let buf =
89 match buf with Some b -> b | None -> Bigstringaf.create buffer_size
90 in
91 (buf, Cstruct.of_bigarray buf ~off:0 ~len:buffer_size)
92
93 let release buf =
94 Kcas.Xt.commit
95 {
96 tx =
97 (fun ~xt ->
98 let size = Kcas.Xt.get ~xt pool_size in
99 if size < max_pooled then begin
100 Kcas.Xt.set ~xt pool (buf :: Kcas.Xt.get ~xt pool);
101 Kcas.Xt.set ~xt pool_size (size + 1)
102 end);
103 }
104end
105
106let set_tcp_nodelay flow =
107 match Eio_unix.Resource.fd_opt flow with
108 | None -> ()
109 | Some fd ->
110 Eio_unix.Fd.use_exn "set_tcp_nodelay" fd (fun unix_fd ->
111 Unix.setsockopt unix_fd Unix.TCP_NODELAY true)
112
113let write_iovecs flow iovecs =
114 let cstructs =
115 List.map
116 (fun iov ->
117 Cstruct.of_bigarray ~off:iov.H2.IOVec.off ~len:iov.H2.IOVec.len
118 iov.H2.IOVec.buffer)
119 iovecs
120 in
121 Eio.Flow.write flow cstructs
122
123(** {1 Connection handling} *)
124
125type body_result = Ok_body of string | Body_too_large | Missing_path
126
127let respond_error reqd status body =
128 let headers =
129 H2.Headers.of_list
130 [
131 ("content-length", string_of_int (String.length body));
132 ("date", Date_cache.get ());
133 ]
134 in
135 let resp = H2.Response.create ~headers status in
136 H2.Reqd.respond_with_string reqd resp body
137
138let handle_connection ?clock ?read_timeout ?request_timeout ?max_body_size
139 handler flow =
140 let read_buffer, read_cstruct = Read_buffer_pool.acquire () in
141 Fun.protect ~finally:(fun () -> Read_buffer_pool.release read_buffer)
142 @@ fun () ->
143 let request_handler reqd =
144 let req = H2.Reqd.request reqd in
145 let body_reader = H2.Reqd.request_body reqd in
146
147 let body_result =
148 match req.meth with
149 | `GET | `HEAD ->
150 H2.Body.Reader.close body_reader;
151 Ok_body ""
152 | `POST | `PUT | `DELETE | `CONNECT | `OPTIONS | `TRACE | `Other _ ->
153 let body_buffer = Buffer.create 4096 in
154 let current_size = ref 0 in
155 let too_large = ref false in
156 let body_done_promise, body_done_resolver = Eio.Promise.create () in
157 let rec read_body () =
158 H2.Body.Reader.schedule_read body_reader
159 ~on_eof:(fun () -> Eio.Promise.resolve body_done_resolver ())
160 ~on_read:(fun buf ~off ~len ->
161 let new_size = !current_size + len in
162 match max_body_size with
163 | Some max when Int64.of_int new_size > max ->
164 too_large := true;
165 H2.Body.Reader.close body_reader;
166 Eio.Promise.resolve body_done_resolver ()
167 | _ ->
168 current_size := new_size;
169 Buffer.add_string body_buffer
170 (Bigstringaf.substring buf ~off ~len);
171 read_body ())
172 in
173 read_body ();
174 Eio.Promise.await body_done_promise;
175 if !too_large then Body_too_large
176 else Ok_body (Buffer.contents body_buffer)
177 in
178
179 match body_result with
180 | Body_too_large ->
181 respond_error reqd (`Code 413) "Request Entity Too Large"
182 | Missing_path -> respond_error reqd `Bad_request "Missing :path header"
183 | Ok_body body -> (
184 let target =
185 match H2.Headers.get req.headers ":path" with
186 | Some p -> Some p
187 | None -> None
188 in
189 match target with
190 | None -> respond_error reqd `Bad_request "Missing :path header"
191 | Some target -> (
192 let request =
193 { meth = req.meth; target; headers = req.headers; body }
194 in
195 let handler_result =
196 match (clock, request_timeout) with
197 | Some clock, Some timeout ->
198 Eio.Time.with_timeout clock timeout (fun () ->
199 Ok (handler request))
200 | _ -> Ok (handler request)
201 in
202 match handler_result with
203 | Error `Timeout -> respond_error reqd (`Code 408) "Request Timeout"
204 | Ok response -> (
205 let date_header = ("date", Date_cache.get ()) in
206 match response.response_body with
207 | Body_string body ->
208 let headers =
209 H2.Headers.of_list
210 (date_header
211 :: ("content-length", string_of_int (String.length body))
212 :: response.headers)
213 in
214 let resp = H2.Response.create ~headers response.status in
215 H2.Reqd.respond_with_string reqd resp body
216 | Body_bigstring bstr ->
217 let headers =
218 H2.Headers.of_list
219 (date_header
220 :: ( "content-length",
221 string_of_int (Bigstringaf.length bstr) )
222 :: response.headers)
223 in
224 let resp = H2.Response.create ~headers response.status in
225 H2.Reqd.respond_with_bigstring reqd resp bstr
226 | Body_prebuilt { h2_response; body } ->
227 let headers =
228 H2.Headers.add h2_response.H2.Response.headers "date"
229 (Date_cache.get ())
230 in
231 let resp = { h2_response with H2.Response.headers } in
232 H2.Reqd.respond_with_bigstring reqd resp body
233 | Body_stream { content_length; next } ->
234 let headers =
235 match content_length with
236 | Some len ->
237 H2.Headers.of_list
238 (date_header
239 :: ("content-length", Int64.to_string len)
240 :: response.headers)
241 | None ->
242 H2.Headers.of_list (date_header :: response.headers)
243 in
244 let resp = H2.Response.create ~headers response.status in
245 let body_writer =
246 H2.Reqd.respond_with_streaming reqd resp
247 in
248 let rec write_chunks () =
249 match next () with
250 | None -> H2.Body.Writer.close body_writer
251 | Some cs ->
252 H2.Body.Writer.write_bigstring body_writer ~off:0
253 ~len:(Cstruct.length cs) (Cstruct.to_bigarray cs);
254 let flushed, resolve = Eio.Promise.create () in
255 H2.Body.Writer.flush body_writer (fun _result ->
256 Eio.Promise.resolve resolve ());
257 Eio.Promise.await flushed;
258 write_chunks ()
259 in
260 write_chunks ())))
261 in
262
263 let error_handler ?request:_ _error start_response =
264 let resp_body =
265 start_response (H2.Headers.of_list [ ("date", Date_cache.get ()) ])
266 in
267 H2.Body.Writer.write_string resp_body "Internal Server Error";
268 H2.Body.Writer.close resp_body
269 in
270
271 let conn = H2.Server_connection.create ~error_handler request_handler in
272
273 let shutdown = ref false in
274
275 let do_read () =
276 match (clock, read_timeout) with
277 | Some clock, Some timeout ->
278 Eio.Time.with_timeout clock timeout (fun () ->
279 Ok (Eio.Flow.single_read flow read_cstruct))
280 | _ -> Ok (Eio.Flow.single_read flow read_cstruct)
281 in
282
283 let read_loop () =
284 let rec loop () =
285 if not !shutdown then
286 match H2.Server_connection.next_read_operation conn with
287 | `Read -> (
288 match do_read () with
289 | Error `Timeout -> shutdown := true
290 | exception End_of_file ->
291 let _ =
292 H2.Server_connection.read_eof conn read_buffer ~off:0 ~len:0
293 in
294 shutdown := true
295 | Ok n ->
296 let _ =
297 H2.Server_connection.read conn read_buffer ~off:0 ~len:n
298 in
299 loop ())
300 | `Close -> shutdown := true
301 in
302 loop ()
303 in
304
305 let write_loop () =
306 let rec loop () =
307 if not !shutdown then
308 match H2.Server_connection.next_write_operation conn with
309 | `Write iovecs ->
310 write_iovecs flow iovecs;
311 let len =
312 List.fold_left (fun acc iov -> acc + iov.H2.IOVec.len) 0 iovecs
313 in
314 H2.Server_connection.report_write_result conn (`Ok len);
315 loop ()
316 | `Yield ->
317 let continue = Eio.Promise.create () in
318 H2.Server_connection.yield_writer conn (fun () ->
319 Eio.Promise.resolve (snd continue) ());
320 Eio.Promise.await (fst continue);
321 loop ()
322 | `Close _ -> shutdown := true
323 in
324 loop ()
325 in
326
327 Fiber.both read_loop write_loop
328
329(** {1 Public API} *)
330
331let run ~sw ~net ?clock ?(config = H1_server.default_config) handler =
332 let addr = `Tcp (Eio.Net.Ipaddr.V4.any, config.port) in
333 let socket =
334 Eio.Net.listen ~sw ~backlog:config.backlog ~reuse_addr:config.reuse_addr
335 ~reuse_port:config.reuse_port net addr
336 in
337 traceln "HTTP/2 Server listening on port %d" config.port;
338 let max_body_size = config.max_body_size in
339 let read_timeout = Some config.read_timeout in
340 let request_timeout = Some config.request_timeout in
341 let connection_handler flow _addr =
342 if config.tcp_nodelay then set_tcp_nodelay flow;
343 handle_connection ?clock ?read_timeout ?request_timeout ?max_body_size
344 handler flow
345 in
346 let on_error exn = traceln "Connection error: %s" (Printexc.to_string exn) in
347 Eio.Net.run_server socket connection_handler
348 ~max_connections:config.max_connections ~on_error
349
350let run_tls ~sw ~net ?clock ?(config = H1_server.default_config) ~tls_config
351 handler =
352 let addr = `Tcp (Eio.Net.Ipaddr.V4.any, config.port) in
353 let socket =
354 Eio.Net.listen ~sw ~backlog:config.backlog ~reuse_addr:config.reuse_addr
355 ~reuse_port:config.reuse_port net addr
356 in
357 traceln "HTTP/2 Server (TLS) listening on port %d" config.port;
358 let max_body_size = config.max_body_size in
359 let read_timeout = Some config.read_timeout in
360 let request_timeout = Some config.request_timeout in
361 let connection_handler flow _addr =
362 if config.tcp_nodelay then set_tcp_nodelay flow;
363 match Tls_config.Server.to_tls_config tls_config with
364 | Error (`Msg msg) -> traceln "TLS config error: %s" msg
365 | Ok tls_cfg -> (
366 try
367 let tls_flow = Tls_eio.server_of_flow tls_cfg flow in
368 handle_connection ?clock ?read_timeout ?request_timeout ?max_body_size
369 handler tls_flow
370 with
371 | Tls_eio.Tls_failure failure ->
372 traceln "TLS error: %s" (Tls_config.failure_to_string failure)
373 | exn -> traceln "TLS error: %s" (Printexc.to_string exn))
374 in
375 let on_error exn = traceln "Connection error: %s" (Printexc.to_string exn) in
376 Eio.Net.run_server socket connection_handler
377 ~max_connections:config.max_connections ~on_error
378
379let run_parallel ~sw ~net ~domain_mgr ?clock
380 ?(config = H1_server.default_config) handler =
381 let domain_count = max 1 config.domain_count in
382 let addr = `Tcp (Eio.Net.Ipaddr.V4.any, config.port) in
383 let socket =
384 Eio.Net.listen ~sw ~backlog:config.backlog ~reuse_addr:config.reuse_addr
385 ~reuse_port:config.reuse_port net addr
386 in
387 traceln "HTTP/2 Server listening on port %d (%d domains)" config.port
388 domain_count;
389 let max_body_size = config.max_body_size in
390 let read_timeout = Some config.read_timeout in
391 let request_timeout = Some config.request_timeout in
392 let connection_handler flow _addr =
393 if config.tcp_nodelay then set_tcp_nodelay flow;
394 handle_connection ?clock ?read_timeout ?request_timeout ?max_body_size
395 handler flow
396 in
397 let on_error exn = traceln "Connection error: %s" (Printexc.to_string exn) in
398 if domain_count <= 1 then
399 Eio.Net.run_server socket connection_handler
400 ~max_connections:config.max_connections ~on_error
401 else
402 Eio.Net.run_server socket connection_handler
403 ~max_connections:config.max_connections ~on_error
404 ~additional_domains:(domain_mgr, domain_count - 1)