ocaml http/1, http/2 and websocket client and server library
at main 16 kB view raw
(** HTTP/2 Client implementation using h2.

    This module provides HTTP/2 client functionality built on the h2 library
    with Eio for structured concurrency and connection pooling. *)

open Eio.Std

(** {1 Types} *)

(** Failures reported by the client through [result] values. *)
type error =
  | Connection_failed of string
  | Tls_error of string
  | Protocol_error of string
  | Timeout
  | Invalid_response of string

(** A fully buffered response: the body is accumulated into one string. *)
type response = { status : H2.Status.t; headers : H2.Headers.t; body : string }

(** Client settings.

    NOTE(review): [buffer_size] is not read anywhere in this file; the read
    buffer used for requests is a fixed 0x4000 bytes. *)
type config = {
  connect_timeout : float;
  read_timeout : float;
  buffer_size : int;
  default_headers : (string * string) list;
  pool : Pool.config;
}

(** Defaults: 30 s for both timeouts and a single [User-Agent] header. *)
let default_config =
  {
    connect_timeout = 30.0;
    read_timeout = 30.0;
    buffer_size = 0x4000;
    default_headers = [ ("User-Agent", "hcs/" ^ Build_info.version) ];
    pool = Pool.default_config;
  }

(** Lower-cased copy of the stock default headers. Requests compare the
    client's headers against this value to skip the general merge when the
    defaults were left untouched. *)
let default_default_headers =
  List.map
    (fun (k, v) -> (String.lowercase_ascii k, v))
    default_config.default_headers

(** [with_timeout timeout config] sets both the connect and the read timeout
    to [timeout]. *)
let with_timeout timeout config =
  { config with connect_timeout = timeout; read_timeout = timeout }

(** [with_default_header name value config] prepends one default header. *)
let with_default_header name value config =
  { config with default_headers = (name, value) :: config.default_headers }

(** [with_default_headers headers config] prepends [headers] to the defaults. *)
let with_default_headers headers config =
  { config with default_headers = headers @ config.default_headers }

(** [with_pool_config pool config] replaces the pool settings. *)
let with_pool_config pool config = { config with pool }

(** {1 Internal connection type} *)

(** A pooled transport. [alive] is cleared by [close_conn]. *)
type conn = { flow : Eio.Flow.two_way_ty r; mutable alive : bool }

(** {1 Client type} *)

(** Client state. The environment-dependent operations (connecting, name
    resolution, clock access, timeouts) are stored as closures built by
    [create]. *)
type t = {
  config : config;
  pool : conn Pool.t;
  mutex : Eio.Mutex.t;
  connect : host:string -> port:int -> is_https:bool -> Eio.Flow.two_way_ty r;
  resolve : string -> bool;
  now : unit -> float;
  with_timeout : 'a. float -> (unit -> 'a) -> 'a option;
}

(** {1 Internal helpers} *)

(** [writev flow iovecs] writes every h2 io-vector to [flow] and returns
    [`Ok total_bytes], or [`Closed] if the write raised [End_of_file]. *)
let writev flow iovecs =
  let lenv, cstructs =
    List.fold_left_map
      (fun acc iov ->
        let len = iov.H2.IOVec.len in
        let cs =
          Cstruct.of_bigarray ~off:iov.H2.IOVec.off ~len iov.H2.IOVec.buffer
        in
        (acc + len, cs))
      0 iovecs
  in
  match Eio.Flow.write flow cstructs with
  | () -> `Ok lenv
  | exception End_of_file -> `Closed

(** [shutdown flow cmd] shuts down one direction of [flow], ignoring the
    "not connected" and "connection reset" errors. *)
let shutdown flow cmd =
  try Eio.Flow.shutdown flow cmd with
  | Unix.Unix_error (Unix.ENOTCONN, _, _) -> ()
  | Eio.Io (Eio.Net.E (Eio.Net.Connection_reset _), _) -> ()

(** Fixed-capacity byte buffer sitting between the socket and the h2 parser.
    Unconsumed bytes are kept and moved to the front before the next read. *)
module Read_buffer : sig
  type t

  val create : int -> t
  (** [create size] allocates a buffer holding at most [size] bytes. *)

  val read : t -> _ Eio.Flow.source -> int
  (** [read t flow] performs one read from [flow] into the free space and
      returns the number of bytes added, or [0] when the buffer is full. *)

  val get : t -> f:(Bigstringaf.t -> off:int -> len:int -> int) -> int
  (** [get t ~f] passes the buffered bytes to [f] and drops the number of
      bytes [f] reports as consumed; that number is returned. *)
end = struct
  type t = {
    buffer : Bigstringaf.t;
    mutable off : int;
    mutable len : int;
    cap : int;
  }

  let create size =
    { buffer = Bigstringaf.create size; off = 0; len = 0; cap = size }

  (* Move pending bytes to offset 0 so all free space is contiguous. *)
  let compress t =
    if t.len = 0 then begin
      t.off <- 0;
      t.len <- 0
    end
    else if t.off > 0 then begin
      Bigstringaf.blit t.buffer ~src_off:t.off t.buffer ~dst_off:0 ~len:t.len;
      t.off <- 0
    end

  let read t flow =
    compress t;
    let off = t.off + t.len in
    let available = t.cap - t.len - t.off in
    if available > 0 then begin
      let cs = Cstruct.of_bigarray t.buffer ~off ~len:available in
      let n = Eio.Flow.single_read flow cs in
      t.len <- t.len + n;
      n
    end
    else 0

  let get t ~f =
    let n = f t.buffer ~off:t.off ~len:t.len in
    t.off <- t.off + n;
    t.len <- t.len - n;
    if t.len = 0 then t.off <- 0;
    n
end

(** [close_conn c] marks [c] dead and closes its flow. It does nothing if [c]
    is already dead. Closing is best-effort: any exception raised by the close
    itself is ignored. *)
let close_conn c =
  if c.alive then begin
    c.alive <- false;
    (* The flow type does not advertise [`Close], hence the cast. *)
    try Eio.Flow.close (Obj.magic c.flow) with _ -> ()
  end

(** [do_request ?body flow req] runs one request/response exchange over
    [flow] and returns the buffered response or an error.

    A fresh [H2.Client_connection] is created for the call. The request is
    submitted after the first successful write, and the h2 connection is shut
    down as soon as a result is available.

    NOTE(review): callers return [flow] to the pool after a successful call,
    and the next call creates another [H2.Client_connection] on the same
    flow. Confirm that peers accept this; it looks like it restarts the
    HTTP/2 connection on an already-used transport. *)
let do_request ?(body = "") flow req =
  let response_received = Eio.Promise.create () in
  let body_buffer = Buffer.create 4096 in
  let resolved = ref false in

  (* Both the response path and the error path may fire; only the first
     outcome is kept. *)
  let resolve_once result =
    if not !resolved then begin
      resolved := true;
      Eio.Promise.resolve (snd response_received) result
    end
  in

  let response_handler resp body_reader =
    let rec read_body () =
      H2.Body.Reader.schedule_read body_reader
        ~on_eof:(fun () ->
          let body = Buffer.contents body_buffer in
          resolve_once
            (Ok
               {
                 status = resp.H2.Response.status;
                 headers = resp.headers;
                 body;
               }))
        ~on_read:(fun buf ~off ~len ->
          Buffer.add_string body_buffer (Bigstringaf.substring buf ~off ~len);
          read_body ())
    in
    read_body ()
  in

  let error_handler err =
    let msg =
      match err with
      | `Malformed_response s -> "Malformed response: " ^ s
      | `Invalid_response_body_length _ -> "Invalid response body length"
      | `Protocol_error (code, msg) ->
          let code = H2.Error_code.to_string code in
          Buf.build64 (fun b ->
              Buf.string b "Protocol error ";
              Buf.string b code;
              Buf.string b ": ";
              Buf.string b msg)
      | `Exn exn -> Printexc.to_string exn
    in
    resolve_once (Error (Invalid_response msg))
  in

  let conn = H2.Client_connection.create ~error_handler () in

  let request_sent = ref false in
  let send_request () =
    if not !request_sent then begin
      request_sent := true;
      let body_writer =
        H2.Client_connection.request conn req ~flush_headers_immediately:true
          ~error_handler ~response_handler
      in
      if String.length body > 0 then begin
        H2.Body.Writer.write_string body_writer body;
        H2.Body.Writer.flush body_writer (fun _result ->
            H2.Body.Writer.close body_writer)
      end
      else H2.Body.Writer.close body_writer
    end
  in

  let read_buffer = Read_buffer.create 0x4000 in
  let request_queued = ref false in

  let read_loop () =
    let rec read_loop_step () =
      match H2.Client_connection.next_read_operation conn with
      | `Read -> (
          match Read_buffer.read read_buffer flow with
          | _n ->
              let _consumed =
                Read_buffer.get read_buffer ~f:(fun buf ~off ~len ->
                    H2.Client_connection.read conn buf ~off ~len)
              in
              read_loop_step ()
          | exception End_of_file ->
              let _ =
                Read_buffer.get read_buffer ~f:(fun buf ~off ~len ->
                    H2.Client_connection.read_eof conn buf ~off ~len)
              in
              ())
      | `Yield ->
          let p, u = Eio.Promise.create () in
          H2.Client_connection.yield_reader conn (fun () ->
              Eio.Promise.resolve u ());
          Eio.Promise.await p;
          read_loop_step ()
      | `Close -> shutdown flow `Receive
    in
    try read_loop_step () with exn -> H2.Client_connection.report_exn conn exn
  in

  let write_loop () =
    let rec loop () =
      match H2.Client_connection.next_write_operation conn with
      | `Write iovecs -> (
          match writev flow iovecs with
          | `Ok len ->
              H2.Client_connection.report_write_result conn (`Ok len);
              (* Submit the request once the first write has gone out. *)
              if not !request_queued then begin
                request_queued := true;
                send_request ()
              end;
              loop ()
          | `Closed -> H2.Client_connection.report_write_result conn `Closed)
      | `Yield ->
          let p, u = Eio.Promise.create () in
          H2.Client_connection.yield_writer conn (fun () ->
              Eio.Promise.resolve u ());
          Eio.Promise.await p;
          loop ()
      | `Close _ -> shutdown flow `Send
    in
    try loop () with exn -> H2.Client_connection.report_exn conn exn
  in

  (* If both loops finish without a result, the exchange has failed. *)
  let io_loops () =
    Eio.Fiber.both read_loop write_loop;
    Error (Protocol_error "Connection closed before response")
  in

  let wait_for_response () =
    let result = Eio.Promise.await (fst response_received) in
    H2.Client_connection.shutdown conn;
    result
  in

  Eio.Fiber.any [ io_loops; wait_for_response ]

(** [create_connection t ~host ~port ~is_https] opens a new transport using
    the client's [resolve] and [connect] closures.

    Failures are returned as [Connection_failed] or [Tls_error]. Cancellation
    is re-raised rather than reported as a connection failure, so an enclosing
    timeout still takes effect. *)
let create_connection t ~host ~port ~is_https =
  if not (t.resolve host) then
    Error (Connection_failed ("Cannot resolve host: " ^ host))
  else
    try
      let flow = t.connect ~host ~port ~is_https in
      Ok { flow; alive = true }
    with
    | Eio.Cancel.Cancelled _ as exn -> raise exn
    | Tls_eio.Tls_failure failure ->
        Error (Tls_error (Tls_config.failure_to_string failure))
    | exn -> Error (Connection_failed (Printexc.to_string exn))

(** [acquire_connection t ~host ~port ~is_https] returns a live pooled
    connection for the endpoint, or opens a new one.

    Only the pool lookup runs under the client mutex. Opening a connection
    happens outside the lock so that it stays cancellable (the locked section
    uses [~protect:true]) and so that connects do not queue behind each
    other. *)
let acquire_connection t ~host ~port ~is_https =
  let key = Pool.make_key ~host ~port ~tls:is_https in
  let pooled =
    Eio.Mutex.use_rw ~protect:true t.mutex (fun () ->
        match Pool.acquire t.pool key ~now:(t.now ()) with
        | Some conn when conn.alive -> Some conn
        | Some conn ->
            (* Dead entry: drop it and fall through to a fresh connection. *)
            Pool.remove t.pool key conn;
            None
        | None -> None)
  in
  match pooled with
  | Some conn -> Ok conn
  | None -> create_connection t ~host ~port ~is_https

(** [release_connection t ~host ~port ~is_https conn ~keep_alive] hands
    [conn] back to the pool when [keep_alive] is set and [conn] is still
    alive; otherwise it removes [conn] from the pool and closes it. *)
let release_connection t ~host ~port ~is_https conn ~keep_alive =
  let key = Pool.make_key ~host ~port ~tls:is_https in
  Eio.Mutex.use_rw ~protect:true t.mutex (fun () ->
      if keep_alive && conn.alive then begin
        if not (Pool.add t.pool key conn ~now:(t.now ())) then
          Pool.release t.pool key conn ~now:(t.now ())
      end
      else begin
        Pool.remove t.pool key conn;
        close_conn conn
      end)

(** {1 Public API} *)

(** [create ~sw ~net ~clock ?config ()] builds a client. Sockets are attached
    to [sw]; [net] and [clock] supply networking and time. Default header
    names are lower-cased once here. *)
let create ~sw ~net ~clock ?(config = default_config) () =
  (* Wrap an established TCP flow in TLS using the h2 client settings.
     Raises [Failure] if the TLS configuration cannot be built. *)
  let upgrade_to_tls ~host tcp_flow =
    let h2_tls = Tls_config.Client.h2 in
    match Tls_config.Client.to_tls_config h2_tls ~host with
    | Error msg -> failwith msg
    | Ok tls_config ->
        let host_domain =
          match Domain_name.of_string host with
          | Ok dn -> (
              match Domain_name.host dn with
              | Ok h -> Some h
              | Error _ -> None)
          | Error _ -> None
        in
        let tls_flow =
          Tls_eio.client_of_flow tls_config ?host:host_domain
            (Obj.magic tcp_flow)
        in
        (Obj.magic tls_flow : Eio.Flow.two_way_ty r)
  in
  let connect ~host ~port ~is_https =
    let addrs = Eio.Net.getaddrinfo_stream net host in
    match addrs with
    | [] -> failwith ("Cannot resolve host: " ^ host)
    | addr_info :: _ ->
        let addr =
          match addr_info with
          | `Tcp (ip, _) -> `Tcp (ip, port)
          | `Unix p -> `Unix p
        in
        let tcp_flow = Eio.Net.connect ~sw net addr in
        if is_https then begin
          match upgrade_to_tls ~host tcp_flow with
          | tls_flow -> tls_flow
          | exception exn ->
              (* Do not leave the socket open until [sw] finishes. *)
              let bt = Printexc.get_raw_backtrace () in
              Eio.Flow.close tcp_flow;
              Printexc.raise_with_backtrace exn bt
        end
        else (tcp_flow :> Eio.Flow.two_way_ty r)
  in
  let resolve host =
    match Eio.Net.getaddrinfo_stream net host with
    | _ :: _ -> true
    | [] -> false
  in
  let now () = Eio.Time.now clock in
  let with_timeout timeout f =
    try Some (Eio.Time.with_timeout_exn clock timeout f)
    with Eio.Time.Timeout -> None
  in
  let config =
    {
      config with
      default_headers =
        List.map
          (fun (k, v) -> (String.lowercase_ascii k, v))
          config.default_headers;
    }
  in
  {
    config;
    pool = Pool.create ~config:config.pool ();
    mutex = Eio.Mutex.create ();
    connect;
    resolve;
    now;
    with_timeout;
  }

(** [close t] closes every pooled connection. *)
let close t =
  Eio.Mutex.use_rw ~protect:true t.mutex (fun () ->
      Pool.close_all t.pool ~close:close_conn)

(** [merge_headers ~default_headers ~headers] combines request headers with
    client defaults. The result lists the pseudo-headers (names starting with
    [':']) from [headers] first, then the defaults that are not overridden,
    then the regular request headers. A default is dropped if its name is
    empty, is a pseudo-header, or matches a regular request header
    case-insensitively. *)
let merge_headers ~default_headers ~headers =
  let is_pseudo name = String.length name > 0 && name.[0] = ':' in
  let pseudo, regular =
    List.partition (fun (name, _value) -> is_pseudo name) headers
  in

  let key name = String.lowercase_ascii name in
  let overridden = Hashtbl.create 8 in
  List.iter
    (fun (name, _value) -> Hashtbl.replace overridden (key name) ())
    regular;

  let default_headers =
    List.filter
      (fun (name, _value) ->
        String.length name > 0
        && (not (is_pseudo name))
        && not (Hashtbl.mem overridden (key name)))
      default_headers
  in
  pseudo @ default_headers @ regular

(** [perform_request t url ~meth ~regular_headers ~body] is the shared
    implementation behind {!get} and {!post}.

    The scheme defaults to ["https"], the host to ["localhost"], the port to
    443 or 80 depending on the scheme, and an empty path to ["/"]. Acquiring
    the connection and the exchange together are bounded by
    [connect_timeout +. read_timeout]; [Error Timeout] is returned when that
    budget runs out.

    The connection is released on every exit path. It goes back to the pool
    only after a successful response. If the exchange fails or raises
    (including cancellation by the timeout) it is removed and closed. *)
let perform_request t url ~meth ~regular_headers ~body =
  let uri = Uri.of_string url in
  let scheme = Uri.scheme uri |> Option.value ~default:"https" in
  let is_https = String.equal scheme "https" in
  let host = Uri.host uri |> Option.value ~default:"localhost" in
  let default_port = if is_https then 443 else 80 in
  let port = Uri.port uri |> Option.value ~default:default_port in
  let path =
    match Uri.path_and_query uri with
    | "" -> "/"
    | path -> path
  in
  let pseudo_headers = [ (":authority", host) ] in
  let headers =
    let defaults = t.config.default_headers in
    if defaults = [] then pseudo_headers @ regular_headers
    else if defaults = default_default_headers then
      (* Stock defaults cannot clash with the headers built here, so the
         general merge is skipped. *)
      pseudo_headers @ defaults @ regular_headers
    else
      merge_headers ~default_headers:defaults
        ~headers:(pseudo_headers @ regular_headers)
  in
  let req =
    H2.Request.create ~headers:(H2.Headers.of_list headers) ~scheme meth path
  in
  let exchange () =
    match acquire_connection t ~host ~port ~is_https with
    | Error e -> Error e
    | Ok conn -> (
        let release ~keep_alive =
          release_connection t ~host ~port ~is_https conn ~keep_alive
        in
        match do_request ~body conn.flow req with
        | Ok _ as ok ->
            release ~keep_alive:true;
            ok
        | Error _ as err ->
            release ~keep_alive:false;
            err
        | exception exn ->
            (* The fiber may already be cancelled here, so the cleanup is
               shielded to make sure it runs. *)
            let bt = Printexc.get_raw_backtrace () in
            Eio.Cancel.protect (fun () -> release ~keep_alive:false);
            Printexc.raise_with_backtrace exn bt)
  in
  let total_timeout = t.config.connect_timeout +. t.config.read_timeout in
  match t.with_timeout total_timeout exchange with
  | Some result -> result
  | None -> Error Timeout

(** [get t url] performs a GET request and returns the buffered response. *)
let get t url = perform_request t url ~meth:`GET ~regular_headers:[] ~body:""

(** [post t url ~body] performs a POST request with [body] as the payload. It
    sends [content-length] and a [content-type] of
    [application/octet-stream]. *)
let post t url ~body:request_body =
  let regular_headers =
    [
      ("content-length", string_of_int (String.length request_body));
      ("content-type", "application/octet-stream");
    ]
  in
  perform_request t url ~meth:`POST ~regular_headers ~body:request_body

(** {1 Backward-compatible stateless API} *)

(** [with_client ~sw ~net ~clock ?config f] runs [f] with a temporary client
    and closes the client afterwards, including when [f] raises. *)
let with_client ~sw ~net ~clock ?config f =
  let t = create ~sw ~net ~clock ?config () in
  Fun.protect
    ~finally:(fun () -> Eio.Cancel.protect (fun () -> close t))
    (fun () -> f t)

(** [post' ~sw ~net ~clock ?config url ~body] is {!post} on a one-shot
    client. *)
let post' ~sw ~net ~clock ?config url ~body =
  with_client ~sw ~net ~clock ?config (fun t -> post t url ~body)

(** [get' ~sw ~net ~clock ?config url] is {!get} on a one-shot client. *)
let get' ~sw ~net ~clock ?config url =
  with_client ~sw ~net ~clock ?config (fun t -> get t url)