ocaml http/1, http/2 and websocket client and server library
1(** HTTP/2 Client implementation using h2.
2
3 This module provides HTTP/2 client functionality built on the h2 library
4 with Eio for structured concurrency and connection pooling. *)
5
6open Eio.Std
7
8(** {1 Types} *)
9
10type error =
11 | Connection_failed of string
12 | Tls_error of string
13 | Protocol_error of string
14 | Timeout
15 | Invalid_response of string
16
17type response = { status : H2.Status.t; headers : H2.Headers.t; body : string }
18
19type config = {
20 connect_timeout : float;
21 read_timeout : float;
22 buffer_size : int;
23 default_headers : (string * string) list;
24 pool : Pool.config;
25}
26
27let default_config =
28 {
29 connect_timeout = 30.0;
30 read_timeout = 30.0;
31 buffer_size = 0x4000;
32 default_headers = [ ("User-Agent", "hcs/" ^ Build_info.version) ];
33 pool = Pool.default_config;
34 }
35
36let default_default_headers =
37 List.map
38 (fun (k, v) -> (String.lowercase_ascii k, v))
39 default_config.default_headers
40
41let with_timeout timeout config =
42 { config with connect_timeout = timeout; read_timeout = timeout }
43
44let with_default_header name value config =
45 { config with default_headers = (name, value) :: config.default_headers }
46
47let with_default_headers headers config =
48 { config with default_headers = headers @ config.default_headers }
49
50let with_pool_config pool config = { config with pool }
51
52(** {1 Internal connection type} *)
53
54type conn = { flow : Eio.Flow.two_way_ty r; mutable alive : bool }
55
56(** {1 Client type} *)
57
58type t = {
59 config : config;
60 pool : conn Pool.t;
61 mutex : Eio.Mutex.t;
62 connect : host:string -> port:int -> is_https:bool -> Eio.Flow.two_way_ty r;
63 resolve : string -> bool;
64 now : unit -> float;
65 with_timeout : 'a. float -> (unit -> 'a) -> 'a option;
66}
67
68(** {1 Internal helpers} *)
69
70let writev flow iovecs =
71 let lenv, cstructs =
72 List.fold_left_map
73 (fun acc iov ->
74 let len = iov.H2.IOVec.len in
75 let cs =
76 Cstruct.of_bigarray ~off:iov.H2.IOVec.off ~len iov.H2.IOVec.buffer
77 in
78 (acc + len, cs))
79 0 iovecs
80 in
81 match Eio.Flow.write flow cstructs with
82 | () -> `Ok lenv
83 | exception End_of_file -> `Closed
84
85let shutdown flow cmd =
86 try Eio.Flow.shutdown flow cmd with
87 | Unix.Unix_error (Unix.ENOTCONN, _, _) -> ()
88 | Eio.Io (Eio.Net.E (Eio.Net.Connection_reset _), _) -> ()
89
90module Read_buffer : sig
91 type t
92
93 val create : int -> t
94 val read : t -> _ Eio.Flow.source -> int
95 val get : t -> f:(Bigstringaf.t -> off:int -> len:int -> int) -> int
96end = struct
97 type t = {
98 buffer : Bigstringaf.t;
99 mutable off : int;
100 mutable len : int;
101 cap : int;
102 }
103
104 let create size =
105 { buffer = Bigstringaf.create size; off = 0; len = 0; cap = size }
106
107 let compress t =
108 if t.len = 0 then begin
109 t.off <- 0;
110 t.len <- 0
111 end
112 else if t.off > 0 then begin
113 Bigstringaf.blit t.buffer ~src_off:t.off t.buffer ~dst_off:0 ~len:t.len;
114 t.off <- 0
115 end
116
117 let read t flow =
118 compress t;
119 let off = t.off + t.len in
120 let available = t.cap - t.len - t.off in
121 if available > 0 then begin
122 let cs = Cstruct.of_bigarray t.buffer ~off ~len:available in
123 let n = Eio.Flow.single_read flow cs in
124 t.len <- t.len + n;
125 n
126 end
127 else 0
128
129 let get t ~f =
130 let n = f t.buffer ~off:t.off ~len:t.len in
131 t.off <- t.off + n;
132 t.len <- t.len - n;
133 if t.len = 0 then t.off <- 0;
134 n
135end
136
137let close_conn c =
138 if c.alive then begin
139 c.alive <- false;
140 try Eio.Flow.close (Obj.magic c.flow) with _ -> ()
141 end
142
143let do_request ?(body = "") flow req =
144 let response_received = Eio.Promise.create () in
145 let body_buffer = Buffer.create 4096 in
146 let resolved = ref false in
147
148 let resolve_once result =
149 if not !resolved then begin
150 resolved := true;
151 Eio.Promise.resolve (snd response_received) result
152 end
153 in
154
155 let response_handler resp body_reader =
156 let rec read_body () =
157 H2.Body.Reader.schedule_read body_reader
158 ~on_eof:(fun () ->
159 let body = Buffer.contents body_buffer in
160 resolve_once
161 (Ok
162 {
163 status = resp.H2.Response.status;
164 headers = resp.headers;
165 body;
166 }))
167 ~on_read:(fun buf ~off ~len ->
168 Buffer.add_string body_buffer (Bigstringaf.substring buf ~off ~len);
169 read_body ())
170 in
171 read_body ()
172 in
173
174 let error_handler err =
175 let msg =
176 match err with
177 | `Malformed_response s -> "Malformed response: " ^ s
178 | `Invalid_response_body_length _ -> "Invalid response body length"
179 | `Protocol_error (code, msg) ->
180 let code = H2.Error_code.to_string code in
181 Buf.build64 (fun b ->
182 Buf.string b "Protocol error ";
183 Buf.string b code;
184 Buf.string b ": ";
185 Buf.string b msg)
186 | `Exn exn -> Printexc.to_string exn
187 in
188 resolve_once (Error (Invalid_response msg))
189 in
190
191 let conn = H2.Client_connection.create ~error_handler () in
192
193 let request_sent = ref false in
194 let send_request () =
195 if not !request_sent then begin
196 request_sent := true;
197 let body_writer =
198 H2.Client_connection.request conn req ~flush_headers_immediately:true
199 ~error_handler ~response_handler
200 in
201 if String.length body > 0 then begin
202 H2.Body.Writer.write_string body_writer body;
203 H2.Body.Writer.flush body_writer (fun _result ->
204 H2.Body.Writer.close body_writer)
205 end
206 else H2.Body.Writer.close body_writer
207 end
208 in
209
210 let read_buffer = Read_buffer.create 0x4000 in
211 let request_queued = ref false in
212
213 let read_loop () =
214 let rec read_loop_step () =
215 match H2.Client_connection.next_read_operation conn with
216 | `Read -> (
217 match Read_buffer.read read_buffer flow with
218 | _n ->
219 let _consumed =
220 Read_buffer.get read_buffer ~f:(fun buf ~off ~len ->
221 H2.Client_connection.read conn buf ~off ~len)
222 in
223 read_loop_step ()
224 | exception End_of_file ->
225 let _ =
226 Read_buffer.get read_buffer ~f:(fun buf ~off ~len ->
227 H2.Client_connection.read_eof conn buf ~off ~len)
228 in
229 ())
230 | `Yield ->
231 let p, u = Eio.Promise.create () in
232 H2.Client_connection.yield_reader conn (fun () ->
233 Eio.Promise.resolve u ());
234 Eio.Promise.await p;
235 read_loop_step ()
236 | `Close -> shutdown flow `Receive
237 in
238 try read_loop_step () with exn -> H2.Client_connection.report_exn conn exn
239 in
240
241 let write_loop () =
242 let rec loop () =
243 match H2.Client_connection.next_write_operation conn with
244 | `Write iovecs -> (
245 match writev flow iovecs with
246 | `Ok len ->
247 H2.Client_connection.report_write_result conn (`Ok len);
248 if not !request_queued then begin
249 request_queued := true;
250 send_request ()
251 end;
252 loop ()
253 | `Closed -> H2.Client_connection.report_write_result conn `Closed)
254 | `Yield ->
255 let p, u = Eio.Promise.create () in
256 H2.Client_connection.yield_writer conn (fun () ->
257 Eio.Promise.resolve u ());
258 Eio.Promise.await p;
259 loop ()
260 | `Close _ -> shutdown flow `Send
261 in
262 try loop () with exn -> H2.Client_connection.report_exn conn exn
263 in
264
265 let io_loops () =
266 Eio.Fiber.both read_loop write_loop;
267 Error (Protocol_error "Connection closed before response")
268 in
269
270 let wait_for_response () =
271 let result = Eio.Promise.await (fst response_received) in
272 H2.Client_connection.shutdown conn;
273 result
274 in
275
276 Eio.Fiber.any [ io_loops; wait_for_response ]
277
278let create_connection t ~host ~port ~is_https =
279 if not (t.resolve host) then
280 Error (Connection_failed ("Cannot resolve host: " ^ host))
281 else
282 try
283 let flow = t.connect ~host ~port ~is_https in
284 Ok { flow; alive = true }
285 with
286 | Tls_eio.Tls_failure failure ->
287 Error (Tls_error (Tls_config.failure_to_string failure))
288 | exn -> Error (Connection_failed (Printexc.to_string exn))
289
290let acquire_connection t ~host ~port ~is_https =
291 let key = Pool.make_key ~host ~port ~tls:is_https in
292 Eio.Mutex.use_rw ~protect:true t.mutex (fun () ->
293 match Pool.acquire t.pool key ~now:(t.now ()) with
294 | Some conn when conn.alive -> Ok conn
295 | Some conn ->
296 Pool.remove t.pool key conn;
297 create_connection t ~host ~port ~is_https
298 | None -> create_connection t ~host ~port ~is_https)
299
300let release_connection t ~host ~port ~is_https conn ~keep_alive =
301 let key = Pool.make_key ~host ~port ~tls:is_https in
302 Eio.Mutex.use_rw ~protect:true t.mutex (fun () ->
303 if keep_alive && conn.alive then begin
304 if not (Pool.add t.pool key conn ~now:(t.now ())) then
305 Pool.release t.pool key conn ~now:(t.now ())
306 end
307 else begin
308 Pool.remove t.pool key conn;
309 close_conn conn
310 end)
311
312(** {1 Public API} *)
313
314let create ~sw ~net ~clock ?(config = default_config) () =
315 let connect ~host ~port ~is_https =
316 let addrs = Eio.Net.getaddrinfo_stream net host in
317 match addrs with
318 | [] -> failwith ("Cannot resolve host: " ^ host)
319 | addr_info :: _ ->
320 let addr =
321 match addr_info with
322 | `Tcp (ip, _) -> `Tcp (ip, port)
323 | `Unix p -> `Unix p
324 in
325 let tcp_flow = Eio.Net.connect ~sw net addr in
326 if is_https then begin
327 let h2_tls = Tls_config.Client.h2 in
328 match Tls_config.Client.to_tls_config h2_tls ~host with
329 | Error msg -> failwith msg
330 | Ok tls_config ->
331 let host_domain =
332 match Domain_name.of_string host with
333 | Ok dn -> (
334 match Domain_name.host dn with
335 | Ok h -> Some h
336 | Error _ -> None)
337 | Error _ -> None
338 in
339 let tls_flow =
340 Tls_eio.client_of_flow tls_config ?host:host_domain
341 (Obj.magic tcp_flow)
342 in
343 (Obj.magic tls_flow : Eio.Flow.two_way_ty r)
344 end
345 else (tcp_flow :> Eio.Flow.two_way_ty r)
346 in
347 let resolve host =
348 match Eio.Net.getaddrinfo_stream net host with
349 | _ :: _ -> true
350 | [] -> false
351 in
352 let now () = Eio.Time.now clock in
353 let with_timeout timeout f =
354 try Some (Eio.Time.with_timeout_exn clock timeout f)
355 with Eio.Time.Timeout -> None
356 in
357 let config =
358 {
359 config with
360 default_headers =
361 List.map
362 (fun (k, v) -> (String.lowercase_ascii k, v))
363 config.default_headers;
364 }
365 in
366 {
367 config;
368 pool = Pool.create ~config:config.pool ();
369 mutex = Eio.Mutex.create ();
370 connect;
371 resolve;
372 now;
373 with_timeout;
374 }
375
376let close t =
377 Eio.Mutex.use_rw ~protect:true t.mutex (fun () ->
378 Pool.close_all t.pool ~close:close_conn)
379
380let merge_headers ~default_headers ~headers =
381 let is_pseudo name = String.length name > 0 && name.[0] = ':' in
382 let pseudo, regular =
383 List.partition (fun (name, _value) -> is_pseudo name) headers
384 in
385
386 let key name = String.lowercase_ascii name in
387 let overridden = Hashtbl.create 8 in
388 List.iter
389 (fun (name, _value) -> Hashtbl.replace overridden (key name) ())
390 regular;
391
392 let default_headers =
393 List.filter
394 (fun (name, _value) ->
395 String.length name > 0
396 && (not (is_pseudo name))
397 && not (Hashtbl.mem overridden (key name)))
398 default_headers
399 in
400 pseudo @ default_headers @ regular
401
402let get t url =
403 let uri = Uri.of_string url in
404 let scheme = Uri.scheme uri |> Option.value ~default:"https" in
405 let is_https = String.equal scheme "https" in
406 let host = Uri.host uri |> Option.value ~default:"localhost" in
407 let default_port = if is_https then 443 else 80 in
408 let port = Uri.port uri |> Option.value ~default:default_port in
409 let path = Uri.path_and_query uri in
410 let path = if path = "" then "/" else path in
411
412 let total_timeout = t.config.connect_timeout +. t.config.read_timeout in
413 match
414 t.with_timeout total_timeout (fun () ->
415 match acquire_connection t ~host ~port ~is_https with
416 | Error e -> Error e
417 | Ok conn -> (
418 let base_headers = [ (":authority", host) ] in
419 let headers =
420 if t.config.default_headers = [] then base_headers
421 else if t.config.default_headers = default_default_headers then
422 base_headers @ t.config.default_headers
423 else
424 merge_headers ~default_headers:t.config.default_headers
425 ~headers:base_headers
426 in
427 let headers = H2.Headers.of_list headers in
428 let req = H2.Request.create ~headers ~scheme `GET path in
429 match do_request conn.flow req with
430 | Ok resp ->
431 release_connection t ~host ~port ~is_https conn ~keep_alive:true;
432 Ok resp
433 | Error e ->
434 release_connection t ~host ~port ~is_https conn
435 ~keep_alive:false;
436 Error e))
437 with
438 | Some result -> result
439 | None -> Error Timeout
440
441let post t url ~body:request_body =
442 let uri = Uri.of_string url in
443 let scheme = Uri.scheme uri |> Option.value ~default:"https" in
444 let is_https = String.equal scheme "https" in
445 let host = Uri.host uri |> Option.value ~default:"localhost" in
446 let default_port = if is_https then 443 else 80 in
447 let port = Uri.port uri |> Option.value ~default:default_port in
448 let path = Uri.path_and_query uri in
449 let path = if path = "" then "/" else path in
450
451 let total_timeout = t.config.connect_timeout +. t.config.read_timeout in
452 match
453 t.with_timeout total_timeout (fun () ->
454 match acquire_connection t ~host ~port ~is_https with
455 | Error e -> Error e
456 | Ok conn -> (
457 let content_length = String.length request_body in
458 let base_pseudo = [ (":authority", host) ] in
459 let base_regular =
460 [
461 ("content-length", string_of_int content_length);
462 ("content-type", "application/octet-stream");
463 ]
464 in
465 let headers =
466 if t.config.default_headers = [] then base_pseudo @ base_regular
467 else if t.config.default_headers = default_default_headers then
468 base_pseudo @ t.config.default_headers @ base_regular
469 else
470 merge_headers ~default_headers:t.config.default_headers
471 ~headers:(base_pseudo @ base_regular)
472 in
473 let headers = H2.Headers.of_list headers in
474 let req = H2.Request.create ~headers ~scheme `POST path in
475 match do_request ~body:request_body conn.flow req with
476 | Ok resp ->
477 release_connection t ~host ~port ~is_https conn ~keep_alive:true;
478 Ok resp
479 | Error e ->
480 release_connection t ~host ~port ~is_https conn
481 ~keep_alive:false;
482 Error e))
483 with
484 | Some result -> result
485 | None -> Error Timeout
486
487let post' ~sw ~net ~clock ?config url ~body =
488 let t = create ~sw ~net ~clock ?config () in
489 let result = post t url ~body in
490 close t;
491 result
492
493(** {1 Backward-compatible stateless API} *)
494
495let get' ~sw ~net ~clock ?config url =
496 let t = create ~sw ~net ~clock ?config () in
497 let result = get t url in
498 close t;
499 result