ocaml http/1, http/2 and websocket client and server library
at main 17 kB view raw
(** HTTP/1.1 Client implementation using h1.

    This module provides HTTP/1.1 client functionality built on Eio with
    connection pooling for high performance. *)

(** {1 Configuration} *)

type config = {
  (* Timeouts *)
  connect_timeout : float;  (** Connection timeout in seconds. Default: 30.0 *)
  read_timeout : float;  (** Read timeout in seconds. Default: 30.0 *)
  write_timeout : float;  (** Write timeout in seconds. Default: 30.0 *)
  (* Behavior *)
  follow_redirects : int option;
      (** Max redirects to follow. None = don't follow. Default: Some 10 *)
  (* Buffers *)
  buffer_size : int;  (** Read buffer size. Default: 16384 *)
  max_response_body : int64 option;
      (** Max response body size. None = unlimited. Default: None *)
  (* TLS *)
  tls : Tls_config.Client.t;  (** TLS configuration *)
  (* Headers *)
  default_headers : (string * string) list;
      (** Headers to add to every request *)
  (* Pool settings *)
  pool : Pool.config;  (** Connection pool configuration *)
}
(** Client configuration *)

(** Configuration used when the caller does not supply one. *)
let default_config =
  {
    connect_timeout = 30.0;
    read_timeout = 30.0;
    write_timeout = 30.0;
    follow_redirects = Some 10;
    buffer_size = 16384;
    max_response_body = None;
    tls = Tls_config.Client.default;
    default_headers = [ ("User-Agent", "hcs/" ^ Build_info.version) ];
    pool = Pool.default_config;
  }

(** {2 Config builders} *)

(** Set the connect and read timeouts to [timeout]. [write_timeout] is left
    unchanged. *)
let with_timeout timeout config =
  { config with connect_timeout = timeout; read_timeout = timeout }

(** Set only the connect timeout. *)
let with_connect_timeout timeout config =
  { config with connect_timeout = timeout }

(** Set only the read timeout. *)
let with_read_timeout timeout config = { config with read_timeout = timeout }

(** Set only the write timeout. *)
let with_write_timeout timeout config = { config with write_timeout = timeout }

(** Set [follow_redirects] to [Some max_redirects]. *)
let with_redirects max_redirects config =
  { config with follow_redirects = Some max_redirects }

(** Set [follow_redirects] to [None]. *)
let without_redirects config = { config with follow_redirects = None }

(** Set the size of the per-connection read buffer. *)
let with_buffer_size size config = { config with buffer_size = size }

(** Set [max_response_body] to [Some max_size]. *)
let with_max_response_body max_size config =
  { config with max_response_body = Some max_size }

(** Replace the TLS configuration. *)
let with_tls tls config = { config with tls }

(** Use [Tls_config.Client.insecure] as the TLS configuration. *)
let with_insecure_tls config = { config with tls = Tls_config.Client.insecure }

(** Prepend one [(name, value)] pair to the default headers. *)
let with_default_header name value config =
  { config with default_headers = (name, value) :: config.default_headers }

(** Prepend [headers] to the default headers. *)
let with_default_headers headers config =
  { config with default_headers = headers @ config.default_headers }

(** Replace the whole pool configuration. *)
let with_pool_config pool config = { config with pool }

(** Set the pool's [max_connections_per_host]. *)
let with_max_connections max_conn config =
  {
    config with
    pool = { config.pool with max_connections_per_host = max_conn };
  }

(** Set the pool's [idle_timeout]. *)
let with_idle_timeout timeout config =
  { config with pool = { config.pool with idle_timeout = timeout } }

(** Error type for client operations *)
type error =
  | Connection_failed of string
  | Tls_error of string
  | Timeout
  | Invalid_response of string
  | Too_many_redirects

type response = { status : H1.Status.t; headers : H1.Headers.t; body : string }
(** Response type *)

(** {1 Connection type for pooling} *)

type conn = {
  flow : Eio.Flow.two_way_ty Eio.Std.r;
  read_buffer : Bigstringaf.t;
  mutable alive : bool;
}
(** Internal connection state. [alive] is cleared as soon as the connection is
    known not to be reusable (protocol error, EOF, I/O exception, or close). *)

(** {1 Client type} *)

type t = {
  config : config;
  pool : conn Pool.t;
  mutex : Eio.Mutex.t;
  (* Functions that capture the Eio environment *)
  connect : host:string -> port:int -> Eio.Flow.two_way_ty Eio.Std.r;
  resolve : string -> bool; (* returns true if host can be resolved *)
  now : unit -> float;
  with_timeout : 'a. float -> (unit -> 'a) -> 'a option;
}
(** HTTP/1.1 client with connection pooling. Uses closures to capture Eio
    environment. *)

(** {1 Internal helpers} *)

(** Write all IOVecs to the flow *)
let write_iovecs flow iovecs =
  let cstructs =
    List.map
      (fun iov ->
        Cstruct.of_bigarray ~off:iov.Httpun_types.IOVec.off
          ~len:iov.Httpun_types.IOVec.len iov.Httpun_types.IOVec.buffer)
      iovecs
  in
  Eio.Flow.write flow cstructs

(** Read from flow into bigstring buffer. Returns [`Ok n] with the number of
    bytes read, or [`Eof] when the flow raises [End_of_file]. *)
let read_into_bigstring flow buf ~off ~len =
  let cs = Cstruct.of_bigarray ~off ~len buf in
  try
    let n = Eio.Flow.single_read flow cs in
    `Ok n
  with End_of_file -> `Eof

(** Best-effort close of a raw flow. Any exception raised by the close is
    deliberately ignored: this is cleanup and there is nothing useful the
    caller could do with the failure. *)
let close_flow (flow : Eio.Flow.two_way_ty Eio.Std.r) =
  (* Use Obj.magic to close - the flow supports close even if type doesn't
     show it.
     NOTE(review): whether every flow stored here (in particular the TLS
     wrapper) really provides a close handler is not visible in this file -
     confirm. *)
  try Eio.Flow.close (Obj.magic flow) with _ -> ()

(** Close a connection safely.

    The flow is closed even when [alive] is already [false]: the error paths
    clear [alive] before the connection is released, so gating the close on
    [alive] would leave those sockets open. A repeated close is harmless
    because [close_flow] ignores failures. *)
let close_conn c =
  c.alive <- false;
  close_flow c.flow

(** Perform an HTTP/1.1 request on a connected flow.

    Drives the h1 state machine until both the read and the write side are
    finished, accumulating the whole response body in memory.

    @param request_body bytes sent as the request body (default: empty)
    @return
      [Ok (response, keep_alive)], where [keep_alive] is [false] when the
      response carried [Connection: close]; [Error (Invalid_response _)] on a
      protocol error or when the connection ended before a response was
      produced; [Error (Connection_failed _)] when reading or writing the flow
      raised.
    @raise Eio.Cancel.Cancelled if the calling fiber is cancelled. *)
let do_request ?(request_body = "") conn req =
  let flow = conn.flow in
  (* Set up response handling *)
  let response_promise, response_resolver = Eio.Promise.create () in
  let body_buffer = Buffer.create 4096 in
  let resolved = ref false in
  let keep_alive = ref true in

  (* The promise may only be resolved once; later outcomes are dropped. *)
  let resolve_once result =
    if not !resolved then begin
      resolved := true;
      Eio.Promise.resolve response_resolver result
    end
  in

  let response_handler resp body_reader =
    (* Check Connection header for keep-alive *)
    (match H1.Headers.get resp.H1.Response.headers "connection" with
    | Some v when String.lowercase_ascii v = "close" -> keep_alive := false
    | _ -> ());

    let rec read_body () =
      H1.Body.Reader.schedule_read body_reader
        ~on_eof:(fun () ->
          let body = Buffer.contents body_buffer in
          resolve_once
            (Ok
               ( {
                   status = resp.H1.Response.status;
                   headers = resp.H1.Response.headers;
                   body;
                 },
                 !keep_alive )))
        ~on_read:(fun buf ~off ~len ->
          Buffer.add_string body_buffer (Bigstringaf.substring buf ~off ~len);
          read_body ())
    in
    read_body ()
  in

  let error_handler err =
    keep_alive := false;
    conn.alive <- false;
    let msg =
      match err with
      | `Malformed_response s -> s
      | `Invalid_response_body_length _ -> "Invalid response body length"
      | `Exn exn -> Printexc.to_string exn
    in
    resolve_once (Error (Invalid_response msg))
  in

  (* Create the client connection *)
  let body_writer, h1_conn =
    H1.Client_connection.request req ~error_handler ~response_handler
  in

  (* Write request body if provided, then close *)
  if String.length request_body > 0 then begin
    H1.Body.Writer.write_string body_writer request_body;
    H1.Body.Writer.flush body_writer (fun () -> ())
  end;
  H1.Body.Writer.close body_writer;

  (* Buffer for reading - track unconsumed bytes between reads *)
  let read_buffer = conn.read_buffer in
  let read_buffer_size = Bigstringaf.length read_buffer in
  let unconsumed = ref 0 in

  (* Connection loop - handle both read and write operations *)
  let rec loop () =
    (* First, handle any pending writes *)
    let write_done =
      match H1.Client_connection.next_write_operation h1_conn with
      | `Write iovecs ->
          write_iovecs flow iovecs;
          let len =
            List.fold_left
              (fun acc iov -> acc + iov.Httpun_types.IOVec.len)
              0 iovecs
          in
          H1.Client_connection.report_write_result h1_conn (`Ok len);
          false
      | `Yield -> true
      | `Close _ -> true
    in

    (* Then handle reads *)
    let read_done =
      match H1.Client_connection.next_read_operation h1_conn with
      | `Read -> (
          (* NOTE(review): if the parser never consumes anything, [available]
             can reach 0 once the buffer is full; what a zero-length read
             does then is not visible here - confirm. *)
          let available = read_buffer_size - !unconsumed in
          match
            read_into_bigstring flow read_buffer ~off:!unconsumed ~len:available
          with
          | `Ok n ->
              let total = !unconsumed + n in
              let consumed =
                H1.Client_connection.read h1_conn read_buffer ~off:0 ~len:total
              in
              (* Shift unconsumed bytes to start of buffer *)
              let remaining = total - consumed in
              if remaining > 0 && consumed > 0 then
                Bigstringaf.blit read_buffer ~src_off:consumed read_buffer
                  ~dst_off:0 ~len:remaining;
              unconsumed := remaining;
              false
          | `Eof ->
              conn.alive <- false;
              ignore
                (H1.Client_connection.read_eof h1_conn read_buffer ~off:0
                   ~len:!unconsumed);
              true)
      | `Close -> true
    in

    (* Continue until both read and write are done *)
    if not (write_done && read_done) then loop ()
  in

  (match loop () with
  | () ->
      (* Nothing drives the state machine once the loop has finished, so an
         unresolved promise here would block forever. *)
      if not !resolved then begin
        conn.alive <- false;
        resolve_once
          (Error
             (Invalid_response
                "Connection closed before a complete response was received"))
      end
  | exception (Eio.Cancel.Cancelled _ as exn) ->
      (* Cancellation (e.g. the request timeout) must reach the caller. *)
      conn.alive <- false;
      raise exn
  | exception exn ->
      (* An I/O failure ends the request immediately instead of leaving the
         caller waiting on a promise nobody will resolve. *)
      conn.alive <- false;
      resolve_once (Error (Connection_failed (Printexc.to_string exn))));
  Eio.Promise.await response_promise

(** Open a new connection to [host]:[port], wrapping it in TLS when
    [is_https] is set.

    @return
      [Error (Connection_failed _)] when the host cannot be resolved or the
      TCP connection fails; [Error (Tls_error _)] when the TLS configuration
      cannot be built or TLS setup raises [Tls_eio.Tls_failure] or [Failure].
      The TCP flow is closed again whenever TLS setup fails.
    @raise Eio.Cancel.Cancelled if the calling fiber is cancelled. *)
let create_connection t ~host ~port ~is_https =
  let fresh flow =
    Ok
      {
        flow;
        read_buffer = Bigstringaf.create t.config.buffer_size;
        alive = true;
      }
  in
  if not (t.resolve host) then
    Error (Connection_failed ("Cannot resolve host: " ^ host))
  else
    match t.connect ~host ~port with
    | exception (Eio.Cancel.Cancelled _ as exn) -> raise exn
    (* [connect] reports resolution problems with [failwith]; that is a
       connection error, not a TLS one. *)
    | exception Failure msg -> Error (Connection_failed msg)
    | exception exn -> Error (Connection_failed (Printexc.to_string exn))
    | tcp_flow -> (
        if not is_https then fresh tcp_flow
        else
          match Tls_config.Client.to_tls_config t.config.tls ~host with
          | Error msg ->
              close_flow tcp_flow;
              Error (Tls_error msg)
          | Ok tls_config -> (
              let host_domain =
                match Domain_name.of_string host with
                | Ok dn -> (
                    match Domain_name.host dn with
                    | Ok h -> Some h
                    | Error _ -> None)
                | Error _ -> None
              in
              match
                Tls_eio.client_of_flow tls_config ?host:host_domain
                  (Obj.magic tcp_flow)
              with
              | tls_flow ->
                  fresh (Obj.magic tls_flow : Eio.Flow.two_way_ty Eio.Std.r)
              | exception (Eio.Cancel.Cancelled _ as exn) ->
                  close_flow tcp_flow;
                  raise exn
              | exception Tls_eio.Tls_failure failure ->
                  close_flow tcp_flow;
                  Error (Tls_error (Tls_config.failure_to_string failure))
              | exception Failure msg ->
                  close_flow tcp_flow;
                  Error (Tls_error msg)
              | exception exn ->
                  close_flow tcp_flow;
                  Error (Connection_failed (Printexc.to_string exn))))

(** Acquire a connection from pool or create new one.

    Only the pool lookup runs under the client mutex. Opening a new
    connection happens outside it, so a slow connect or TLS setup neither
    blocks other fibers using the client nor runs inside the cancellation
    protection requested with [~protect:true]. *)
let acquire_connection t ~host ~port ~is_https =
  let key = Pool.make_key ~host ~port ~tls:is_https in
  let pooled =
    Eio.Mutex.use_rw ~protect:true t.mutex (fun () ->
        (* Try to get idle connection from pool *)
        match Pool.acquire t.pool key ~now:(t.now ()) with
        | Some conn when conn.alive -> `Reuse conn
        | Some conn ->
            (* Dead connection, remove and create new *)
            Pool.remove t.pool key conn;
            `Stale conn
        | None -> `Miss)
  in
  match pooled with
  | `Reuse conn -> Ok conn
  | `Stale conn ->
      close_conn conn;
      create_connection t ~host ~port ~is_https
  | `Miss ->
      (* No idle connection, create new *)
      create_connection t ~host ~port ~is_https

(** Release connection back to pool.

    With [keep_alive] set and the connection still alive, the connection is
    offered to the pool with [Pool.add], falling back to [Pool.release] when
    that returns [false]. Otherwise it is removed from the pool and closed. *)
let release_connection t ~host ~port ~is_https conn ~keep_alive =
  let key = Pool.make_key ~host ~port ~tls:is_https in
  Eio.Mutex.use_rw ~protect:true t.mutex (fun () ->
      if keep_alive && conn.alive then begin
        (* Try to add back to pool *)
        if not (Pool.add t.pool key conn ~now:(t.now ())) then
          (* Pool full, just mark for release *)
          Pool.release t.pool key conn ~now:(t.now ())
      end
      else begin
        (* Connection not reusable *)
        Pool.remove t.pool key conn;
        close_conn conn
      end)

(** {1 Public API} *)

(** Create a new HTTP/1.1 client with connection pooling.

    Flows opened by the client are attached to [sw]. [net] is used for name
    resolution and connecting, [clock] for timestamps and timeouts. *)
let create ~sw ~net ~clock ?(config = default_config) () =
  let connect ~host ~port =
    let addrs = Eio.Net.getaddrinfo_stream net host in
    match addrs with
    | [] -> failwith ("Cannot resolve host: " ^ host)
    | addr_info :: _ ->
        (* Only the first resolved address is tried. *)
        let addr =
          match addr_info with
          | `Tcp (ip, _) -> `Tcp (ip, port)
          | `Unix p -> `Unix p
        in
        let flow = Eio.Net.connect ~sw net addr in
        (flow :> Eio.Flow.two_way_ty Eio.Std.r)
  in
  let resolve host =
    match Eio.Net.getaddrinfo_stream net host with
    | _ :: _ -> true
    | [] -> false
  in
  let now () = Eio.Time.now clock in
  let with_timeout timeout f =
    try Some (Eio.Time.with_timeout_exn clock timeout f)
    with Eio.Time.Timeout -> None
  in
  {
    config;
    pool = Pool.create ~config:config.pool ();
    mutex = Eio.Mutex.create ();
    connect;
    resolve;
    now;
    with_timeout;
  }

(** Close all connections and cleanup *)
let close t =
  Eio.Mutex.use_rw ~protect:true t.mutex (fun () ->
      Pool.close_all t.pool ~close:close_conn)

(** [merge_headers ~default_headers ~headers] returns [default_headers]
    followed by [headers], after dropping every default whose name also
    appears in [headers]. Names are compared case-insensitively (ASCII). *)
let merge_headers ~default_headers ~headers =
  let key name = String.lowercase_ascii name in
  let overridden = Hashtbl.create 8 in
  List.iter
    (fun (name, _value) -> Hashtbl.replace overridden (key name) ())
    headers;
  let default_headers =
    List.filter
      (fun (name, _value) -> not (Hashtbl.mem overridden (key name)))
      default_headers
  in
  default_headers @ headers

(** Perform a GET request using pooled connections.

    A missing scheme defaults to ["http"], a missing host to ["localhost"],
    a missing port to 443 for https and 80 otherwise, and an empty path to
    ["/"]. The whole exchange (acquiring the connection and performing the
    request) is bounded by [connect_timeout +. read_timeout].

    @return
      [Ok response] on success, [Error Timeout] when the time budget is
      exhausted, or the error reported while connecting or performing the
      request. *)
let get t url =
  let uri = Uri.of_string url in
  let scheme = Uri.scheme uri |> Option.value ~default:"http" in
  let is_https = String.equal scheme "https" in
  let host = Uri.host uri |> Option.value ~default:"localhost" in
  let default_port = if is_https then 443 else 80 in
  let port = Uri.port uri |> Option.value ~default:default_port in
  let path = match Uri.path_and_query uri with "" -> "/" | path -> path in
  (* The Host header must carry the port whenever it is not the scheme's
     default, otherwise the server cannot tell which authority was meant. *)
  let host_header =
    if port = default_port then host else Printf.sprintf "%s:%d" host port
  in
  let release conn ~keep_alive =
    release_connection t ~host ~port ~is_https conn ~keep_alive
  in
  (* Connection currently checked out by this request. It is still set when
     the timeout fires mid-request, so that the connection can be handed to
     [release_connection] instead of being forgotten. *)
  let in_flight = ref None in
  let total_timeout = t.config.connect_timeout +. t.config.read_timeout in
  let outcome =
    t.with_timeout total_timeout (fun () ->
        match acquire_connection t ~host ~port ~is_https with
        | Error e -> Error e
        | Ok conn ->
            in_flight := Some conn;
            (* Request-specific headers take precedence over the configured
               defaults that share their name. *)
            let headers =
              merge_headers ~default_headers:t.config.default_headers
                ~headers:[ ("Host", host_header); ("Connection", "keep-alive") ]
            in
            let req =
              H1.Request.create ~headers:(H1.Headers.of_list headers) `GET path
            in
            let result =
              match do_request conn req with
              | Ok (resp, keep_alive) ->
                  release conn ~keep_alive;
                  Ok resp
              | Error e ->
                  release conn ~keep_alive:false;
                  Error e
            in
            (* Only reached once the release above has completed. *)
            in_flight := None;
            result)
  in
  match outcome with
  | Some result -> result
  | None ->
      (* Timed out: the connection is in an unknown protocol state, so it is
         released as non-reusable. *)
      Option.iter (fun conn -> release conn ~keep_alive:false) !in_flight;
      Error Timeout

(** Perform a POST request using pooled connections *)
let post t url ~body:request_body =
  (* Sends [request_body] to [url] with a matching Content-Length header.
     A missing scheme defaults to "http", a missing host to "localhost", a
     missing port to 443 for https and 80 otherwise, and an empty path to
     "/". The whole exchange is bounded by connect_timeout + read_timeout;
     [Error Timeout] is returned when that budget is exhausted. *)
  let uri = Uri.of_string url in
  let scheme = Uri.scheme uri |> Option.value ~default:"http" in
  let is_https = String.equal scheme "https" in
  let host = Uri.host uri |> Option.value ~default:"localhost" in
  let default_port = if is_https then 443 else 80 in
  let port = Uri.port uri |> Option.value ~default:default_port in
  let path = match Uri.path_and_query uri with "" -> "/" | path -> path in
  (* The Host header must carry the port whenever it is not the scheme's
     default, otherwise the server cannot tell which authority was meant. *)
  let host_header =
    if port = default_port then host else Printf.sprintf "%s:%d" host port
  in
  let release conn ~keep_alive =
    release_connection t ~host ~port ~is_https conn ~keep_alive
  in
  (* Connection currently checked out by this request. It is still set when
     the timeout fires mid-request, so that the connection can be handed to
     [release_connection] instead of being forgotten. *)
  let in_flight = ref None in
  let total_timeout = t.config.connect_timeout +. t.config.read_timeout in
  let outcome =
    t.with_timeout total_timeout (fun () ->
        match acquire_connection t ~host ~port ~is_https with
        | Error e -> Error e
        | Ok conn ->
            in_flight := Some conn;
            let content_length = String.length request_body in
            (* Request-specific headers take precedence over the configured
               defaults that share their name. *)
            let headers =
              merge_headers ~default_headers:t.config.default_headers
                ~headers:
                  [
                    ("Host", host_header);
                    ("Connection", "keep-alive");
                    ("Content-Length", string_of_int content_length);
                  ]
            in
            let req =
              H1.Request.create ~headers:(H1.Headers.of_list headers) `POST path
            in
            let result =
              match do_request ~request_body conn req with
              | Ok (resp, keep_alive) ->
                  release conn ~keep_alive;
                  Ok resp
              | Error e ->
                  release conn ~keep_alive:false;
                  Error e
            in
            (* Only reached once the release above has completed. *)
            in_flight := None;
            result)
  in
  match outcome with
  | Some result -> result
  | None ->
      (* Timed out: the connection is in an unknown protocol state, so it is
         released as non-reusable. *)
      Option.iter (fun conn -> release conn ~keep_alive:false) !in_flight;
      Error Timeout

(** {1 Backward-compatible stateless API} *)

(** Perform a GET request (creates temporary client, no pooling benefit).

    The temporary client is closed when the request returns and also when it
    raises. *)
let get' ~sw ~net ~clock ?(config = default_config) url =
  let t = create ~sw ~net ~clock ~config () in
  Fun.protect
    (* Shield the cleanup from cancellation so it always runs to completion. *)
    ~finally:(fun () -> Eio.Cancel.protect (fun () -> close t))
    (fun () -> get t url)

(** Perform a POST request (creates temporary client, no pooling benefit).

    The temporary client is closed when the request returns and also when it
    raises. *)
let post' ~sw ~net ~clock ?(config = default_config) url ~body =
  let t = create ~sw ~net ~clock ~config () in
  Fun.protect
    (* Shield the cleanup from cancellation so it always runs to completion. *)
    ~finally:(fun () -> Eio.Cancel.protect (fun () -> close t))
    (fun () -> post t url ~body)