OCaml HTTP/1, HTTP/2 and WebSocket client and server library
(** Limits and timeouts that govern a connection pool. *)
type config = {
  max_connections_per_host : int;
      (** Largest number of entries kept under a single [key]; [add] refuses
          to go past it. *)
  max_total_connections : int;
      (** Largest number of entries across all keys; [add] refuses to go
          past it. *)
  idle_timeout : float;
      (** How long an unused connection stays eligible for reuse, expressed
          in the same unit as the [~now] timestamps (presumably seconds --
          confirm with callers). *)
  connection_timeout : float;
      (** Not read by the pool operations in this part of the file;
          presumably a connect deadline applied by callers -- confirm. *)
}
7
(** Configuration used by [create] when none is supplied: at most 10
    connections per key, 100 in total, an idle timeout of 60.0 and a
    connection timeout of 30.0. *)
let default_config : config =
  {
    max_connections_per_host = 10;
    max_total_connections = 100;
    idle_timeout = 60.;
    connection_timeout = 30.;
  }
15
(** Key under which pooled connections are grouped. *)
type key = {
  host : string;  (** Host component of the endpoint. *)
  port : int;  (** Port component of the endpoint. *)
  tls : bool;  (** TLS flag; keys that differ only here are distinct. *)
}
17
(** [make_key ~host ~port ~tls] builds the pool key for the given endpoint. *)
let make_key ~host ~port ~tls : key = { host; port; tls }
19
(** One pooled connection together with its bookkeeping. *)
type 'conn entry = {
  conn : 'conn;  (** The pooled connection value itself. *)
  created_at : float;
      (** Timestamp passed as [~now] to [add] when the entry was created. *)
  last_used : float Kcas.Loc.t;
      (** Timestamp of the most recent [add], [acquire] or [release]. *)
  in_use : bool Kcas.Loc.t;
      (** [true] while the connection is handed out; cleared by [release]. *)
}
26
(** A connection pool holding connections of type ['conn]. *)
type 'conn t = {
  config : config;  (** Limits applied by the pool operations. *)
  connections : (key, 'conn entry list Kcas.Loc.t) Kcas_data.Hashtbl.t;
      (** Per-key list of entries, each list held in its own location. *)
  total_count : int Kcas.Loc.t;
      (** Number of entries across all keys, maintained by [add], [remove],
          [evict_idle] and [close_all]. *)
}
32
(** [create ?config ()] returns an empty pool. [config] defaults to
    [default_config]. *)
let create ?(config = default_config) () =
  let connections = Kcas_data.Hashtbl.create () in
  let total_count = Kcas.Loc.make 0 in
  { config; connections; total_count }
39
(** [get_entries_loc ~xt pool key] returns the location holding the entry
    list for [key]. When [key] has no binding yet, a location containing the
    empty list is registered under it, as part of the transaction [xt], and
    returned. *)
let get_entries_loc ~xt pool key =
  let table = pool.connections in
  match Kcas_data.Hashtbl.Xt.find_opt ~xt table key with
  | Some existing -> existing
  | None ->
      let fresh = Kcas.Loc.make [] in
      Kcas_data.Hashtbl.Xt.replace ~xt table key fresh;
      fresh
47
(** [count_for_key pool key] is the number of entries (idle or in use)
    currently stored under [key], or [0] when [key] has no binding. *)
let count_for_key pool key =
  let tx ~xt =
    Kcas_data.Hashtbl.Xt.find_opt ~xt pool.connections key
    |> Option.fold ~none:0 ~some:(fun entries_loc ->
           List.length (Kcas.Xt.get ~xt entries_loc))
  in
  Kcas.Xt.commit { tx }
57
(** [acquire pool key ~now] hands out a pooled connection for [key].

    The first entry, in list order, that is not in use and whose last use
    is less than [idle_timeout] before [now] is marked as in use, stamped
    with [now] and returned as [Some conn]. Returns [None] when [key] has no
    binding or no entry qualifies. Entries that have idled too long are
    skipped, not removed. *)
let acquire pool key ~now =
  let tx ~xt =
    match Kcas_data.Hashtbl.Xt.find_opt ~xt pool.connections key with
    | None -> None
    | Some entries_loc ->
        let reusable entry =
          let busy = Kcas.Xt.get ~xt entry.in_use in
          let stamp = Kcas.Xt.get ~xt entry.last_used in
          (not busy) && now -. stamp < pool.config.idle_timeout
        in
        Kcas.Xt.get ~xt entries_loc
        |> List.find_opt reusable
        |> Option.map (fun entry ->
               Kcas.Xt.set ~xt entry.in_use true;
               Kcas.Xt.set ~xt entry.last_used now;
               entry.conn)
  in
  Kcas.Xt.commit { tx }
84
(** [release pool key conn ~now] returns [conn] to the pool: every entry
    under [key] whose connection is physically equal ([==]) to [conn] is
    marked as not in use and stamped with [now]. Does nothing when [key] has
    no binding or no entry matches. *)
let release pool key conn ~now =
  let mark_idle ~xt entry =
    Kcas.Xt.set ~xt entry.in_use false;
    Kcas.Xt.set ~xt entry.last_used now
  in
  let tx ~xt =
    match Kcas_data.Hashtbl.Xt.find_opt ~xt pool.connections key with
    | None -> ()
    | Some entries_loc ->
        Kcas.Xt.get ~xt entries_loc
        |> List.filter (fun entry -> entry.conn == conn)
        |> List.iter (mark_idle ~xt)
  in
  Kcas.Xt.commit { tx }
102
(** [add pool key conn ~now] registers [conn] under [key] as an entry that
    is already in use, created and last used at [now].

    Returns [true] when the entry was stored. Returns [false], storing
    nothing, when the pool already holds [max_total_connections] entries or
    [key] already holds [max_connections_per_host] entries. The total limit
    is checked first. *)
let add pool key conn ~now =
  let { max_total_connections; max_connections_per_host; _ } = pool.config in
  let tx ~xt =
    let total = Kcas.Xt.get ~xt pool.total_count in
    if total < max_total_connections then begin
      let entries_loc = get_entries_loc ~xt pool key in
      let entries = Kcas.Xt.get ~xt entries_loc in
      let has_room = List.length entries < max_connections_per_host in
      if has_room then begin
        let entry =
          {
            conn;
            created_at = now;
            last_used = Kcas.Loc.make now;
            in_use = Kcas.Loc.make true;
          }
        in
        Kcas.Xt.set ~xt entries_loc (entry :: entries);
        Kcas.Xt.set ~xt pool.total_count (succ total)
      end;
      has_room
    end
    else false
  in
  Kcas.Xt.commit { tx }
129
(** [remove pool key conn] drops every entry stored under [key] whose
    connection is physically equal ([==]) to [conn], and lowers the
    pool-wide count by the number of entries dropped. Does nothing when
    [key] has no binding or [conn] is not stored under it.

    When no entry is left under [key], the key itself is deleted from the
    table, so hosts without connections do not pile up as empty lists.
    The connection is not closed here. *)
let remove pool key conn =
  let tx ~xt =
    match Kcas_data.Hashtbl.Xt.find_opt ~xt pool.connections key with
    | None -> ()
    | Some entries_loc ->
        let entries = Kcas.Xt.get ~xt entries_loc in
        let kept, dropped =
          List.partition (fun entry -> entry.conn != conn) entries
        in
        (match dropped with
        | [] ->
            (* Nothing matched: avoid writing back unchanged state. *)
            ()
        | _ :: _ ->
            Kcas.Xt.set ~xt entries_loc kept;
            let total = Kcas.Xt.get ~xt pool.total_count in
            Kcas.Xt.set ~xt pool.total_count (total - List.length dropped));
        (match kept with
        | [] ->
            (* Last entry for this key is gone: forget the key as well. *)
            Kcas_data.Hashtbl.Xt.remove ~xt pool.connections key
        | _ :: _ -> ())
  in
  Kcas.Xt.commit { tx }
148
(** [evict_idle pool ~now ~close] removes, key by key, every entry that is
    not in use and whose last use is at least [idle_timeout] before [now],
    then passes each removed connection to [close].

    Each key is handled in its own transaction over a snapshot of the keys
    taken at the start. [close] runs only after that key's transaction has
    committed, never inside it. Keys left without entries are deleted from
    the table, and a key with nothing to evict is left unwritten.

    If [close] raises, the exception propagates: connections already taken
    out of the pool for the current key but not yet closed stay unclosed,
    and later keys are not visited. *)
let evict_idle pool ~now ~close =
  let still_wanted ~xt entry =
    let busy = Kcas.Xt.get ~xt entry.in_use in
    let stamp = Kcas.Xt.get ~xt entry.last_used in
    busy || now -. stamp < pool.config.idle_timeout
  in
  let evict_for_key key =
    let tx ~xt =
      match Kcas_data.Hashtbl.Xt.find_opt ~xt pool.connections key with
      | None -> []
      | Some entries_loc ->
          let entries = Kcas.Xt.get ~xt entries_loc in
          let kept, evicted = List.partition (still_wanted ~xt) entries in
          (match evicted with
          | [] ->
              (* Nothing expired: avoid writing back unchanged state. *)
              ()
          | _ :: _ ->
              Kcas.Xt.set ~xt entries_loc kept;
              let total = Kcas.Xt.get ~xt pool.total_count in
              Kcas.Xt.set ~xt pool.total_count (total - List.length evicted));
          (match kept with
          | [] ->
              (* No entry left for this key: forget the key as well. *)
              Kcas_data.Hashtbl.Xt.remove ~xt pool.connections key
          | _ :: _ -> ());
          List.map (fun entry -> entry.conn) evicted
    in
    Kcas.Xt.commit { tx }
  in
  Kcas_data.Hashtbl.to_seq_keys pool.connections
  |> List.of_seq
  |> List.iter (fun key -> List.iter close (evict_for_key key))
183
(** [close_all pool ~close] empties the pool: for each key in a snapshot
    taken at the start, all of its entries (idle or in use) are removed and
    the pool-wide count lowered in one transaction, the key is deleted from
    the table, and each removed connection is then passed to [close].

    [close] runs only after that key's transaction has committed, never
    inside it. If [close] raises, the exception propagates: connections
    already taken out of the pool for the current key but not yet closed
    stay unclosed, and later keys are not visited. *)
let close_all pool ~close =
  let drain key =
    let tx ~xt =
      match Kcas_data.Hashtbl.Xt.find_opt ~xt pool.connections key with
      | None -> []
      | Some entries_loc ->
          let entries = Kcas.Xt.get ~xt entries_loc in
          (match entries with
          | [] -> ()
          | _ :: _ ->
              Kcas.Xt.set ~xt entries_loc [];
              let total = Kcas.Xt.get ~xt pool.total_count in
              Kcas.Xt.set ~xt pool.total_count (total - List.length entries));
          (* The key holds nothing any more: drop it so the table does not
             keep one empty list per host ever seen. *)
          Kcas_data.Hashtbl.Xt.remove ~xt pool.connections key;
          List.map (fun entry -> entry.conn) entries
    in
    Kcas.Xt.commit { tx }
  in
  Kcas_data.Hashtbl.to_seq_keys pool.connections
  |> List.of_seq
  |> List.iter (fun key -> List.iter close (drain key))
210
(** Point-in-time counters reported by the [stats] function. *)
type stats = {
  total_connections : int;  (** Value of the pool-wide entry counter. *)
  idle_connections : int;  (** Entries whose in-use flag was clear. *)
  in_use_connections : int;  (** Entries whose in-use flag was set. *)
  hosts : int;  (** Number of keys present in the table. *)
}
217
(** [stats pool] reports the current counters of [pool].

    The keys are snapshotted first, then each key's entries are read with
    plain (non-transactional) reads, and the total counter is read last.
    The figures are therefore not taken atomically: under concurrent
    updates [idle_connections + in_use_connections] may differ from
    [total_connections]. *)
let stats pool =
  let keys = Kcas_data.Hashtbl.to_seq_keys pool.connections |> List.of_seq in
  let tally (idle, busy) entry =
    if Kcas.Loc.get entry.in_use then (idle, busy + 1) else (idle + 1, busy)
  in
  let tally_key acc key =
    match Kcas_data.Hashtbl.find_opt pool.connections key with
    | None -> acc
    | Some entries_loc -> List.fold_left tally acc (Kcas.Loc.get entries_loc)
  in
  let idle_connections, in_use_connections =
    List.fold_left tally_key (0, 0) keys
  in
  let total_connections = Kcas.Loc.get pool.total_count in
  {
    total_connections;
    idle_connections;
    in_use_connections;
    hosts = List.length keys;
  }