OCaml HTTP/1, HTTP/2 and WebSocket client and server library
at main 7.4 kB view raw
(* Connection pool keyed by host, port and TLS flag.

   Pool state is stored in [Kcas] locations. The mutating operations go
   through [Kcas.Xt.commit]; [evict_idle] and [close_all] commit once per
   key, and [stats] reads without a transaction.

   Invariant maintained by this module: a key is bound in [connections]
   only while it holds at least one entry. *)

(** Limits and timeouts applied by the pool. *)
type config = {
  max_connections_per_host : int;
      (** {!add} refuses a connection once a key holds this many entries. *)
  max_total_connections : int;
      (** {!add} refuses a connection once the pool holds this many
          entries in total. *)
  idle_timeout : float;
      (** An idle entry is reusable only while [now -. last_used] is
          strictly below this value. Expressed in the same unit as the
          [~now] arguments (presumably seconds -- confirm with callers). *)
  connection_timeout : float;
      (** Not read by the code in this module. *)
}

(** Configuration used by {!create} when no [?config] is supplied. *)
let default_config =
  {
    max_connections_per_host = 10;
    max_total_connections = 100;
    idle_timeout = 60.0;
    connection_timeout = 30.0;
  }

(** Identity of a pooled endpoint. *)
type key = { host : string; port : int; tls : bool }

(** [make_key ~host ~port ~tls] builds a {!key} from its three components. *)
let make_key ~host ~port ~tls = { host; port; tls }

(** One pooled connection together with its bookkeeping. *)
type 'conn entry = {
  conn : 'conn;
  created_at : float;  (** The [~now] value passed to {!add}. *)
  last_used : float Kcas.Loc.t;
      (** Overwritten with [~now] by {!acquire} and {!release}. *)
  in_use : bool Kcas.Loc.t;
      (** [true] from {!add} or {!acquire} until {!release}. *)
}

(** A pool of ['conn] connections. *)
type 'conn t = {
  config : config;
  connections : (key, 'conn entry list Kcas.Loc.t) Kcas_data.Hashtbl.t;
  total_count : int Kcas.Loc.t;
      (** Number of entries across all keys; adjusted in the same
          transaction as every list update. *)
}

(** [create ?config ()] returns an empty pool. [config] defaults to
    {!default_config}. *)
let create ?(config = default_config) () =
  {
    config;
    connections = Kcas_data.Hashtbl.create ();
    total_count = Kcas.Loc.make 0;
  }

(** [get_entries_loc ~xt pool key] returns the entry-list location bound to
    [key], first binding a fresh empty one when [key] is absent. Runs
    inside the transaction [xt]. *)
let get_entries_loc ~xt pool key =
  match Kcas_data.Hashtbl.Xt.find_opt ~xt pool.connections key with
  | Some entries_loc -> entries_loc
  | None ->
      let entries_loc = Kcas.Loc.make [] in
      Kcas_data.Hashtbl.Xt.replace ~xt pool.connections key entries_loc;
      entries_loc

(** [count_for_key pool key] is the number of entries (idle or in use)
    currently stored under [key]; [0] when [key] is not bound. *)
let count_for_key pool key =
  Kcas.Xt.commit
    {
      tx =
        (fun ~xt ->
          match Kcas_data.Hashtbl.Xt.find_opt ~xt pool.connections key with
          | None -> 0
          | Some entries_loc -> List.length (Kcas.Xt.get ~xt entries_loc));
    }

(* [prune_entries ~xt pool key entries_loc ~keep] keeps, within [xt], only
   the entries of [entries_loc] accepted by [keep], lowers [total_count] by
   the number dropped, and unbinds [key] when no entry remains so the table
   does not accumulate keys for hosts that hold no connection. Returns the
   dropped entries in their original order. *)
let prune_entries ~xt pool key entries_loc ~keep =
  let kept, dropped = List.partition keep (Kcas.Xt.get ~xt entries_loc) in
  (match dropped with
  | [] -> ()
  | _ :: _ ->
      Kcas.Xt.set ~xt entries_loc kept;
      let total = Kcas.Xt.get ~xt pool.total_count in
      Kcas.Xt.set ~xt pool.total_count (total - List.length dropped));
  (match kept with
  | [] -> Kcas_data.Hashtbl.Xt.remove ~xt pool.connections key
  | _ :: _ -> ());
  dropped

(** [acquire pool key ~now] hands out the first entry under [key] that is
    not in use and whose [last_used] is less than [idle_timeout] before
    [now]. The entry is marked in use and its [last_used] set to [now].
    Returns [None] when [key] is unbound or no entry qualifies. *)
let acquire pool key ~now =
  Kcas.Xt.commit
    {
      tx =
        (fun ~xt ->
          match Kcas_data.Hashtbl.Xt.find_opt ~xt pool.connections key with
          | None -> None
          | Some entries_loc ->
              let reusable entry =
                (not (Kcas.Xt.get ~xt entry.in_use))
                && now -. Kcas.Xt.get ~xt entry.last_used
                   < pool.config.idle_timeout
              in
              Kcas.Xt.get ~xt entries_loc
              |> List.find_opt reusable
              |> Option.map (fun entry ->
                     Kcas.Xt.set ~xt entry.in_use true;
                     Kcas.Xt.set ~xt entry.last_used now;
                     entry.conn));
    }

(** [release pool key conn ~now] marks every entry under [key] whose
    connection is physically equal ([==]) to [conn] as idle and stamps its
    [last_used] with [now]. Does nothing when [key] or [conn] is unknown. *)
let release pool key conn ~now =
  Kcas.Xt.commit
    {
      tx =
        (fun ~xt ->
          match Kcas_data.Hashtbl.Xt.find_opt ~xt pool.connections key with
          | None -> ()
          | Some entries_loc ->
              Kcas.Xt.get ~xt entries_loc
              |> List.iter (fun entry ->
                     (* Physical equality: connections are identified by
                        identity, not by structure. *)
                     if entry.conn == conn then begin
                       Kcas.Xt.set ~xt entry.in_use false;
                       Kcas.Xt.set ~xt entry.last_used now
                     end));
    }

(** [add pool key conn ~now] registers [conn] under [key], already marked
    in use, with [created_at] and [last_used] set to [now]. Returns [false]
    and leaves the pool untouched when either [max_total_connections] or
    [max_connections_per_host] has been reached; [true] otherwise. *)
let add pool key conn ~now =
  Kcas.Xt.commit
    {
      tx =
        (fun ~xt ->
          let total = Kcas.Xt.get ~xt pool.total_count in
          if total >= pool.config.max_total_connections then false
          else
            (* Look the key up without binding it, so a rejected [add]
               never leaves an empty entry list behind. *)
            let existing =
              match
                Kcas_data.Hashtbl.Xt.find_opt ~xt pool.connections key
              with
              | Some entries_loc -> Kcas.Xt.get ~xt entries_loc
              | None -> []
            in
            if
              List.compare_length_with existing
                pool.config.max_connections_per_host
              >= 0
            then false
            else begin
              let entries_loc = get_entries_loc ~xt pool key in
              let entry =
                {
                  conn;
                  created_at = now;
                  last_used = Kcas.Loc.make now;
                  in_use = Kcas.Loc.make true;
                }
              in
              Kcas.Xt.set ~xt entries_loc (entry :: existing);
              Kcas.Xt.set ~xt pool.total_count (total + 1);
              true
            end);
    }

(** [remove pool key conn] drops every entry under [key] whose connection
    is physically equal to [conn], lowers the total count accordingly, and
    unbinds [key] when it ends up with no entries. The connection itself
    is not closed. Does nothing when [key] is unbound. *)
let remove pool key conn =
  Kcas.Xt.commit
    {
      tx =
        (fun ~xt ->
          match Kcas_data.Hashtbl.Xt.find_opt ~xt pool.connections key with
          | None -> ()
          | Some entries_loc ->
              let keep entry = entry.conn != conn in
              ignore (prune_entries ~xt pool key entries_loc ~keep : _ list));
    }

(** [evict_idle pool ~now ~close] removes, key by key, every entry that is
    not in use and whose [last_used] is at least [idle_timeout] before
    [now], then calls [close] on each removed connection. [close] runs
    after the key's transaction has committed. Keys left without entries
    are unbound. *)
let evict_idle pool ~now ~close =
  Kcas_data.Hashtbl.to_seq_keys pool.connections
  |> List.of_seq
  |> List.iter (fun key ->
         let to_close =
           Kcas.Xt.commit
             {
               tx =
                 (fun ~xt ->
                   match
                     Kcas_data.Hashtbl.Xt.find_opt ~xt pool.connections key
                   with
                   | None -> []
                   | Some entries_loc ->
                       let keep entry =
                         Kcas.Xt.get ~xt entry.in_use
                         || now -. Kcas.Xt.get ~xt entry.last_used
                            < pool.config.idle_timeout
                       in
                       prune_entries ~xt pool key entries_loc ~keep
                       |> List.map (fun entry -> entry.conn));
             }
         in
         List.iter close to_close)

(** [close_all pool ~close] removes every entry of every key, in use or
    not, unbinds the keys, and calls [close] on each removed connection
    after the corresponding key's transaction has committed. *)
let close_all pool ~close =
  Kcas_data.Hashtbl.to_seq_keys pool.connections
  |> List.of_seq
  |> List.iter (fun key ->
         let to_close =
           Kcas.Xt.commit
             {
               tx =
                 (fun ~xt ->
                   match
                     Kcas_data.Hashtbl.Xt.find_opt ~xt pool.connections key
                   with
                   | None -> []
                   | Some entries_loc ->
                       prune_entries ~xt pool key entries_loc
                         ~keep:(fun _ -> false)
                       |> List.map (fun entry -> entry.conn));
             }
         in
         List.iter close to_close)

(** Counters reported by {!stats}. *)
type stats = {
  total_connections : int;  (** Value of the pool's total counter. *)
  idle_connections : int;  (** Entries whose [in_use] flag was [false]. *)
  in_use_connections : int;  (** Entries whose [in_use] flag was [true]. *)
  hosts : int;  (** Keys holding at least one entry when read. *)
}

(** [stats pool] reports the pool's counters. The fields are read one
    location at a time, outside any transaction, so under concurrent
    updates they may not describe a single consistent instant. *)
let stats pool =
  let tally (idle, in_use, hosts) key =
    match Kcas_data.Hashtbl.find_opt pool.connections key with
    | None -> (idle, in_use, hosts)
    | Some entries_loc -> (
        match Kcas.Loc.get entries_loc with
        | [] -> (idle, in_use, hosts)
        | entries ->
            let busy =
              List.fold_left
                (fun n entry -> if Kcas.Loc.get entry.in_use then n + 1 else n)
                0 entries
            in
            (idle + List.length entries - busy, in_use + busy, hosts + 1))
  in
  let idle, in_use, hosts =
    Kcas_data.Hashtbl.to_seq_keys pool.connections
    |> List.of_seq
    |> List.fold_left tally (0, 0, 0)
  in
  {
    total_connections = Kcas.Loc.get pool.total_count;
    idle_connections = idle;
    in_use_connections = in_use;
    hosts;
  }