ocaml http/1, http/2 and websocket client and server library
1(** HTTP/1.1 Client implementation using h1.
2
3 This module provides HTTP/1.1 client functionality built on Eio with
4 connection pooling for high performance. *)
5
6(** {1 Configuration} *)
7
type config = {
  (* Timeouts *)
  connect_timeout : float;  (** Connection timeout in seconds. Default: 30.0 *)
  read_timeout : float;  (** Read timeout in seconds. Default: 30.0 *)
  write_timeout : float;
      (** Write timeout in seconds. Default: 30.0.
          NOTE(review): not read by any request path visible in this file —
          confirm where it is enforced. *)
  (* Behavior *)
  follow_redirects : int option;
      (** Max redirects to follow. [None] = don't follow. Default: [Some 10].
          NOTE(review): not read by the request functions visible in this
          file — confirm where redirects are handled. *)
  (* Buffers *)
  buffer_size : int;
      (** Read buffer size in bytes; each new connection allocates one read
          buffer of this size. Default: 16384 *)
  max_response_body : int64 option;
      (** Max response body size. [None] = unlimited. Default: [None].
          NOTE(review): not read by the response reader visible in this
          file — confirm where the limit is applied. *)
  (* TLS *)
  tls : Tls_config.Client.t;  (** TLS configuration used for [https] URLs *)
  (* Headers *)
  default_headers : (string * string) list;
      (** Headers to add to every request. A header generated for the request
          itself replaces a default with the same name (case-insensitive). *)
  (* Pool settings *)
  pool : Pool.config;  (** Connection pool configuration *)
}
(** Client configuration. Build one from {!default_config} using the
    [with_*] helpers below. *)
29
(** Baseline configuration: 30 second connect/read/write timeouts, up to 10
    redirects, a 16 KiB read buffer, no response-size cap, the default TLS and
    pool settings, and a [User-Agent] header carrying the build version. *)
let default_config =
  let thirty_seconds = 30.0 in
  let user_agent = ("User-Agent", "hcs/" ^ Build_info.version) in
  {
    connect_timeout = thirty_seconds;
    read_timeout = thirty_seconds;
    write_timeout = thirty_seconds;
    follow_redirects = Some 10;
    buffer_size = 16384;
    max_response_body = None;
    tls = Tls_config.Client.default;
    default_headers = [ user_agent ];
    pool = Pool.default_config;
  }
42
43(** {2 Config builders} *)
44
(** [with_timeout seconds cfg] sets both [connect_timeout] and [read_timeout]
    to [seconds]. [write_timeout] is left unchanged; use
    [with_write_timeout] for that. *)
let with_timeout seconds cfg =
  { cfg with read_timeout = seconds; connect_timeout = seconds }
47
(** [with_connect_timeout seconds cfg] replaces only the connection timeout. *)
let with_connect_timeout seconds cfg = { cfg with connect_timeout = seconds }
50
(** [with_read_timeout seconds cfg] replaces only the read timeout. *)
let with_read_timeout seconds cfg = { cfg with read_timeout = seconds }
let with_write_timeout seconds cfg = { cfg with write_timeout = seconds }
(** [with_write_timeout seconds cfg] replaces only the write timeout. *)
53
(** [with_redirects limit cfg] sets [follow_redirects] to [Some limit]. *)
let with_redirects limit cfg =
  let follow_redirects = Some limit in
  { cfg with follow_redirects }
56
(** [without_redirects cfg] sets [follow_redirects] to [None]. *)
let without_redirects cfg = { cfg with follow_redirects = None }
let with_buffer_size bytes cfg = { cfg with buffer_size = bytes }
(** [with_buffer_size bytes cfg] sets the per-connection read buffer size. *)
59
(** [with_max_response_body limit cfg] sets [max_response_body] to
    [Some limit]. *)
let with_max_response_body limit cfg =
  let max_response_body = Some limit in
  { cfg with max_response_body }
62
(** [with_tls settings cfg] replaces the TLS configuration. *)
let with_tls settings cfg = { cfg with tls = settings }
let with_insecure_tls cfg =
  let tls = Tls_config.Client.insecure in
  { cfg with tls }
(** [with_insecure_tls cfg] switches the TLS configuration to
    [Tls_config.Client.insecure]. *)
65
(** [with_default_header name value cfg] prepends [(name, value)] to the
    default headers. Existing entries with the same name are kept. *)
let with_default_header name value cfg =
  let extended = (name, value) :: cfg.default_headers in
  { cfg with default_headers = extended }
68
(** [with_default_headers headers cfg] puts [headers] in front of the default
    headers already present in [cfg]. *)
let with_default_headers headers cfg =
  let combined = List.append headers cfg.default_headers in
  { cfg with default_headers = combined }
71
(** [with_pool_config settings cfg] replaces the whole pool configuration. *)
let with_pool_config settings cfg = { cfg with pool = settings }
73
(** [with_max_connections limit cfg] sets the pool's
    [max_connections_per_host] to [limit], leaving other pool settings
    untouched. *)
let with_max_connections limit cfg =
  { cfg with pool = { cfg.pool with max_connections_per_host = limit } }
79
(** [with_idle_timeout seconds cfg] sets the pool's [idle_timeout] to
    [seconds], leaving other pool settings untouched. *)
let with_idle_timeout seconds cfg =
  { cfg with pool = { cfg.pool with idle_timeout = seconds } }
82
(** Error type for client operations *)
type error =
  | Connection_failed of string
      (** Host resolution or connection establishment failed; the payload is
          a human-readable reason. *)
  | Tls_error of string
      (** TLS configuration or handshake failed; the payload is the reason. *)
  | Timeout  (** The request did not finish within the allotted time. *)
  | Invalid_response of string
      (** The response could not be parsed; the payload describes why. *)
  | Too_many_redirects
      (** NOTE(review): never constructed in the code visible in this file —
          presumably reserved for redirect handling; confirm. *)
90
type response = { status : H1.Status.t; headers : H1.Headers.t; body : string }
(** Response type: status and headers as parsed by h1, plus the complete
    response body accumulated in memory as a string. *)
93
94(** {1 Connection type for pooling} *)
95
type conn = {
  flow : Eio.Flow.two_way_ty Eio.Std.r;
      (** Byte stream to the server (plain TCP, or TLS-wrapped for https). *)
  read_buffer : Bigstringaf.t;
      (** Scratch buffer for incoming bytes, allocated once per connection
          with the configured [buffer_size]. *)
  mutable alive : bool;
      (** [true] while the connection is considered usable. Cleared when the
          connection is closed, hits end-of-file, or a request on it fails. *)
}
(** Internal connection state *)
102
103(** {1 Client type} *)
104
type t = {
  config : config;  (** Settings this client was created with. *)
  pool : conn Pool.t;  (** Connections, keyed with [Pool.make_key]. *)
  mutex : Eio.Mutex.t;  (** Taken around the pool operations in this file. *)
  (* Functions that capture the Eio environment *)
  connect : host:string -> port:int -> Eio.Flow.two_way_ty Eio.Std.r;
      (** Opens a stream connection to [host] on [port]; raises on failure. *)
  resolve : string -> bool; (* returns true if host can be resolved *)
  now : unit -> float;  (** Current time according to the captured clock. *)
  with_timeout : 'a. float -> (unit -> 'a) -> 'a option;
      (** Runs the thunk with a time limit in seconds; [None] means the limit
          was hit before the thunk returned. *)
}
(** HTTP/1.1 client with connection pooling. Uses closures to capture Eio
    environment. *)
117
118(** {1 Internal helpers} *)
119
(** Convert each IOVec into a [Cstruct] view over the same buffer region and
    hand the whole list to [Eio.Flow.write] in one call. *)
let write_iovecs flow iovecs =
  let module V = Httpun_types.IOVec in
  let as_cstruct iov = Cstruct.of_bigarray ~off:iov.V.off ~len:iov.V.len iov.V.buffer in
  iovecs |> List.map as_cstruct |> Eio.Flow.write flow
130
(** Perform one read from [flow] into [buf], writing at most [len] bytes
    starting at offset [off]. Returns [`Ok n] with the number of bytes read,
    or [`Eof] when the flow reports end-of-file. *)
let read_into_bigstring flow buf ~off ~len =
  let window = Cstruct.of_bigarray ~off ~len buf in
  match Eio.Flow.single_read flow window with
  | count -> `Ok count
  | exception End_of_file -> `Eof
138
(** Close a connection safely.

    Marks [c] as dead and closes its flow. The flow is closed even when [c]
    was already marked dead: the request loop clears [alive] on end-of-file
    and on errors without closing anything, so skipping the close in that
    case (as an [if c.alive] guard would) leaks the socket. Closing twice is
    harmless because Eio documents closing an already-closed resource as a
    no-op.

    Never raises: any exception from the close is deliberately discarded,
    since this is best-effort cleanup. NOTE(review): that also hides the case
    where the flow has no close handler at all — confirm that TLS-wrapped
    flows really are closed here. *)
let close_conn c =
  c.alive <- false;
  (* Use Obj.magic to close - the flow supports close even if type doesn't show it *)
  try Eio.Flow.close (Obj.magic c.flow) with _ -> ()
146
(** Perform an HTTP/1.1 request on a connected flow.

    Sends [req] (followed by [request_body] when it is non-empty) over
    [conn.flow], then alternates between the h1 state machine's write and read
    operations until both sides report they are finished.

    Returns [Ok (response, keep_alive)] once the whole body has been read;
    [keep_alive] is [false] when the response's [Connection] header lists
    [close]. Returns [Error (Invalid_response _)] when h1 reports a parse
    error, when the peer closes before a full response arrived, or when the
    response head does not fit in the read buffer, and
    [Error (Connection_failed _)] when an I/O exception interrupts the
    exchange. In every error case [conn.alive] is cleared so the connection is
    not reused.

    The result is always resolved before waiting on it, so this function
    cannot block forever on a dead connection.

    @raise Eio.Cancel.Cancelled if the calling fiber is cancelled during I/O. *)
let do_request ?(request_body = "") conn req =
  let flow = conn.flow in
  (* Set up response handling *)
  let response_promise, response_resolver = Eio.Promise.create () in
  let body_buffer = Buffer.create 4096 in
  let resolved = ref false in
  let keep_alive = ref true in

  (* h1 may invoke several callbacks; only the first outcome counts. *)
  let resolve_once result =
    if not !resolved then begin
      resolved := true;
      Eio.Promise.resolve response_resolver result
    end
  in

  (* [Connection] is a comma-separated token list, so look for a [close]
     token rather than comparing the whole header value. *)
  let lists_close value =
    String.split_on_char ',' value
    |> List.exists (fun token ->
           String.equal (String.lowercase_ascii (String.trim token)) "close")
  in

  let response_handler resp body_reader =
    (* Check Connection header for keep-alive *)
    (match H1.Headers.get resp.H1.Response.headers "connection" with
    | Some value when lists_close value -> keep_alive := false
    | Some _ | None -> ());

    (* Keep rescheduling reads until h1 signals the end of the body. *)
    let rec read_body () =
      H1.Body.Reader.schedule_read body_reader
        ~on_eof:(fun () ->
          let body = Buffer.contents body_buffer in
          resolve_once
            (Ok
               ( {
                   status = resp.H1.Response.status;
                   headers = resp.H1.Response.headers;
                   body;
                 },
                 !keep_alive )))
        ~on_read:(fun buf ~off ~len ->
          Buffer.add_string body_buffer (Bigstringaf.substring buf ~off ~len);
          read_body ())
    in
    read_body ()
  in

  let error_handler err =
    keep_alive := false;
    conn.alive <- false;
    let msg =
      match err with
      | `Malformed_response s -> s
      | `Invalid_response_body_length _ -> "Invalid response body length"
      | `Exn exn -> Printexc.to_string exn
    in
    resolve_once (Error (Invalid_response msg))
  in

  (* Create the client connection *)
  let body_writer, h1_conn =
    H1.Client_connection.request req ~error_handler ~response_handler
  in

  (* Write request body if provided, then close *)
  if String.length request_body > 0 then begin
    H1.Body.Writer.write_string body_writer request_body;
    H1.Body.Writer.flush body_writer (fun () -> ())
  end;
  H1.Body.Writer.close body_writer;

  (* Buffer for reading - track unconsumed bytes between reads *)
  let read_buffer = conn.read_buffer in
  let read_buffer_size = Bigstringaf.length read_buffer in
  let unconsumed = ref 0 in

  (* Connection loop - handle both read and write operations *)
  let rec loop () =
    (* First, handle any pending writes *)
    let write_done =
      match H1.Client_connection.next_write_operation h1_conn with
      | `Write iovecs ->
          write_iovecs flow iovecs;
          let len =
            List.fold_left
              (fun acc iov -> acc + iov.Httpun_types.IOVec.len)
              0 iovecs
          in
          H1.Client_connection.report_write_result h1_conn (`Ok len);
          false
      | `Yield -> true
      | `Close _ -> true
    in

    (* Then handle reads *)
    let read_done =
      match H1.Client_connection.next_read_operation h1_conn with
      | `Read -> (
          let available = read_buffer_size - !unconsumed in
          if available = 0 then begin
            (* The buffer is full and the parser consumed nothing, so no
               further progress is possible; fail instead of issuing a
               zero-length read. *)
            conn.alive <- false;
            resolve_once
              (Error (Invalid_response "Response does not fit in the read buffer"));
            true
          end
          else
            match
              read_into_bigstring flow read_buffer ~off:!unconsumed
                ~len:available
            with
            | `Ok n ->
                let total = !unconsumed + n in
                let consumed =
                  H1.Client_connection.read h1_conn read_buffer ~off:0
                    ~len:total
                in
                (* Shift unconsumed bytes to start of buffer *)
                let remaining = total - consumed in
                if remaining > 0 && consumed > 0 then
                  Bigstringaf.blit read_buffer ~src_off:consumed read_buffer
                    ~dst_off:0 ~len:remaining;
                unconsumed := remaining;
                false
            | `Eof ->
                conn.alive <- false;
                ignore
                  (H1.Client_connection.read_eof h1_conn read_buffer ~off:0
                     ~len:!unconsumed);
                true)
      | `Close -> true
    in

    (* Continue until both read and write are done *)
    if not (write_done && read_done) then loop ()
  in

  (match loop () with
  | () ->
      (* Nothing drives h1 after the loop, so an unresolved promise here
         would never be resolved. *)
      if not !resolved then begin
        conn.alive <- false;
        resolve_once
          (Error
             (Invalid_response
                "Connection closed before a complete response was received"))
      end
  | exception (Eio.Cancel.Cancelled _ as ex) ->
      (* Cancellation must propagate; never turn it into a result. *)
      conn.alive <- false;
      raise ex
  | exception ex ->
      conn.alive <- false;
      resolve_once (Error (Connection_failed (Printexc.to_string ex))));
  Eio.Promise.await response_promise
271
(** Open a fresh connection to [host]:[port], wrapping it in TLS when
    [is_https] is set.

    Returns [Error (Connection_failed _)] when the host cannot be resolved or
    the TCP connection fails, and [Error (Tls_error _)] when building the TLS
    configuration or the TLS handshake fails. If a step after the TCP connect
    fails, the TCP flow is closed before returning so it is not leaked.

    @raise Eio.Cancel.Cancelled if the calling fiber is cancelled; the TCP
    flow, if already open, is closed first. *)
let create_connection t ~host ~port ~is_https =
  if not (t.resolve host) then
    Error (Connection_failed ("Cannot resolve host: " ^ host))
  else
    (* Remember the raw TCP flow so it can be closed if a later step fails. *)
    let tcp = ref None in
    let close_tcp () =
      match !tcp with
      | None -> ()
      | Some raw -> ( try Eio.Flow.close (Obj.magic raw) with _ -> ())
    in
    try
      let tcp_flow = t.connect ~host ~port in
      tcp := Some tcp_flow;
      let flow =
        if is_https then
          match Tls_config.Client.to_tls_config t.config.tls ~host with
          | Error msg -> raise (Failure msg)
          | Ok tls_config ->
              (* Only pass a host name to TLS when [host] parses as one. *)
              let host_domain =
                match Domain_name.of_string host with
                | Ok dn -> (
                    match Domain_name.host dn with
                    | Ok h -> Some h
                    | Error _ -> None)
                | Error _ -> None
              in
              let tls_flow =
                Tls_eio.client_of_flow tls_config ?host:host_domain
                  (Obj.magic tcp_flow)
              in
              (Obj.magic tls_flow : Eio.Flow.two_way_ty Eio.Std.r)
        else tcp_flow
      in
      Ok
        {
          flow;
          read_buffer = Bigstringaf.create t.config.buffer_size;
          alive = true;
        }
    with exn -> (
      let connected = Option.is_some !tcp in
      close_tcp ();
      match exn with
      | Eio.Cancel.Cancelled _ -> raise exn
      | Tls_eio.Tls_failure failure ->
          Error (Tls_error (Tls_config.failure_to_string failure))
      (* A [Failure] is a TLS problem only once the TCP connect succeeded;
         before that it comes from the connect step itself. *)
      | Failure msg when connected -> Error (Tls_error msg)
      | Failure msg -> Error (Connection_failed msg)
      | _ -> Error (Connection_failed (Printexc.to_string exn)))
309
(** Acquire a connection from pool or create new one.

    The pool mutex is held only while the pool itself is consulted. A new
    connection is established after the lock is released: [use_rw
    ~protect:true] shields its critical section from cancellation, so
    connecting under the lock would stop the request timeout from
    interrupting a slow connect or TLS handshake, and would make every other
    fiber wait for it.

    Returns [Ok conn] for a live pooled connection or a newly created one, or
    the error from [create_connection]. *)
let acquire_connection t ~host ~port ~is_https =
  let key = Pool.make_key ~host ~port ~tls:is_https in
  let pooled =
    Eio.Mutex.use_rw ~protect:true t.mutex (fun () ->
        (* Try to get idle connection from pool *)
        match Pool.acquire t.pool key ~now:(t.now ()) with
        | Some conn when conn.alive -> Some conn
        | Some conn ->
            (* Dead connection: drop it and fall through to a fresh one *)
            Pool.remove t.pool key conn;
            None
        | None -> None)
  in
  match pooled with
  | Some conn -> Ok conn
  | None -> create_connection t ~host ~port ~is_https
324
(** Hand [conn] back after a request.

    While holding the pool mutex: if the caller asked for [keep_alive] and the
    connection is still marked alive, it is offered to the pool with
    [Pool.add], falling back to [Pool.release] when [Pool.add] returns
    [false]. Otherwise the connection is removed from the pool and closed. *)
let release_connection t ~host ~port ~is_https conn ~keep_alive =
  let key = Pool.make_key ~host ~port ~tls:is_https in
  let discard () =
    (* Connection not reusable *)
    Pool.remove t.pool key conn;
    close_conn conn
  in
  let give_back () =
    let added = Pool.add t.pool key conn ~now:(t.now ()) in
    (* Pool full, just mark for release *)
    if not added then Pool.release t.pool key conn ~now:(t.now ())
  in
  Eio.Mutex.use_rw ~protect:true t.mutex (fun () ->
      if keep_alive && conn.alive then give_back () else discard ())
340
341(** {1 Public API} *)
342
(** Create a new HTTP/1.1 client with connection pooling.

    [sw] owns every socket the client opens, [net] is used for name resolution
    and connecting, and [clock] drives timeouts and pool timestamps. [config]
    defaults to {!default_config}.

    When a host resolves to several addresses they are tried in the order
    returned by the resolver until one connects, so an unreachable first
    address (for example IPv6 on an IPv4-only network) no longer fails the
    request. If every address fails, the exception from the last attempt is
    raised. *)
let create ~sw ~net ~clock ?(config = default_config) () =
  let connect ~host ~port =
    (* The resolver is queried without a service, so substitute our port. *)
    let with_port = function
      | `Tcp (ip, _) -> `Tcp (ip, port)
      | `Unix path -> `Unix path
    in
    let rec try_each = function
      | [] -> failwith ("Cannot resolve host: " ^ host)
      | [ only ] -> Eio.Net.connect ~sw net (with_port only)
      | candidate :: rest -> (
          match Eio.Net.connect ~sw net (with_port candidate) with
          | flow -> flow
          | exception (Eio.Cancel.Cancelled _ as ex) -> raise ex
          | exception _ -> try_each rest)
    in
    let flow = try_each (Eio.Net.getaddrinfo_stream net host) in
    (flow :> Eio.Flow.two_way_ty Eio.Std.r)
  in
  let resolve host =
    match Eio.Net.getaddrinfo_stream net host with
    | _ :: _ -> true
    | [] -> false
  in
  let now () = Eio.Time.now clock in
  (* [None] signals that [timeout] seconds elapsed before [f] returned. *)
  let with_timeout timeout f =
    try Some (Eio.Time.with_timeout_exn clock timeout f)
    with Eio.Time.Timeout -> None
  in
  {
    config;
    pool = Pool.create ~config:config.pool ();
    mutex = Eio.Mutex.create ();
    connect;
    resolve;
    now;
    with_timeout;
  }
377
(** Close every pooled connection. Runs under the pool mutex and closes each
    connection with [close_conn]. *)
let close t =
  let lock = t.mutex in
  let close_pooled () = Pool.close_all ~close:close_conn t.pool in
  Eio.Mutex.use_rw ~protect:true lock close_pooled
382
(** [merge_headers ~default_headers ~headers] returns the entries of
    [default_headers] whose name does not occur in [headers] (names compared
    case-insensitively, ASCII only), followed by all of [headers]. Relative
    order within each list is preserved. *)
let merge_headers ~default_headers ~headers =
  let overriding_names =
    List.map (fun (name, _) -> String.lowercase_ascii name) headers
  in
  let survives (name, _) =
    not (List.mem (String.lowercase_ascii name) overriding_names)
  in
  List.filter survives default_headers @ headers
395
(** Shared implementation of {!get} and {!post}.

    Parses [url] (scheme defaults to [http], host to [localhost], port to 443
    for https and 80 otherwise, empty path to [/]), acquires a connection,
    sends one request with method [meth] and returns the response. When
    [request_body] is given, a [Content-Length] header is added and the body
    is sent.

    The [Host] header includes the port whenever it differs from the scheme's
    default, as HTTP requires. Configured default headers are merged in;
    headers generated here win over defaults with the same name.

    The whole exchange is limited to [connect_timeout + read_timeout] seconds
    and yields [Error Timeout] when that elapses. A connection that was
    acquired before the timeout fired is released as non-reusable, so it is
    not left behind in the pool's bookkeeping.

    NOTE(review): [follow_redirects] is not consulted here; redirect
    responses are returned to the caller as-is. *)
let pooled_request t ~meth ?request_body url =
  let uri = Uri.of_string url in
  let scheme = Uri.scheme uri |> Option.value ~default:"http" in
  let is_https = String.equal scheme "https" in
  let host = Uri.host uri |> Option.value ~default:"localhost" in
  let default_port = if is_https then 443 else 80 in
  let port = Uri.port uri |> Option.value ~default:default_port in
  let path = match Uri.path_and_query uri with "" -> "/" | p -> p in

  let host_header =
    if port = default_port then host else Printf.sprintf "%s:%d" host port
  in
  let body_headers =
    match request_body with
    | Some body -> [ ("Content-Length", string_of_int (String.length body)) ]
    | None -> []
  in
  let base_headers =
    [ ("Host", host_header); ("Connection", "keep-alive") ] @ body_headers
  in
  let headers =
    merge_headers ~default_headers:t.config.default_headers
      ~headers:base_headers
  in
  let req = H1.Request.create ~headers:(H1.Headers.of_list headers) meth path in

  (* Connection currently checked out by this request, if any. It stays set
     until the connection has been released, so the timeout path below can
     release it when the exchange was interrupted. *)
  let in_flight = ref None in
  let exchange () =
    match acquire_connection t ~host ~port ~is_https with
    | Error e -> Error e
    | Ok conn ->
        in_flight := Some conn;
        let outcome =
          match do_request ?request_body conn req with
          | Ok (resp, keep_alive) ->
              release_connection t ~host ~port ~is_https conn ~keep_alive;
              Ok resp
          | Error e ->
              release_connection t ~host ~port ~is_https conn
                ~keep_alive:false;
              Error e
        in
        in_flight := None;
        outcome
  in

  let total_timeout = t.config.connect_timeout +. t.config.read_timeout in
  match t.with_timeout total_timeout exchange with
  | Some result -> result
  | None ->
      (match !in_flight with
      | Some conn ->
          release_connection t ~host ~port ~is_https conn ~keep_alive:false
      | None -> ());
      Error Timeout

(** Perform a GET request using pooled connections *)
let get t url = pooled_request t ~meth:`GET url

(** Perform a POST request using pooled connections *)
let post t url ~body:request_body =
  pooled_request t ~meth:`POST ~request_body url
486
487(** {1 Backward-compatible stateless API} *)
488
(** One-shot GET: builds a temporary client, performs the request, closes the
    client and returns the result. No pooling benefit. The client is not
    closed if the request raises. *)
let get' ~sw ~net ~clock ?(config = default_config) url =
  let client = create ~sw ~net ~clock ~config () in
  let outcome = get client url in
  let () = close client in
  outcome
495
(** One-shot POST: builds a temporary client, sends [body], closes the client
    and returns the result. No pooling benefit. The client is not closed if
    the request raises. *)
let post' ~sw ~net ~clock ?(config = default_config) url ~body =
  let client = create ~sw ~net ~clock ~config () in
  let outcome = post client url ~body in
  let () = close client in
  outcome