a database layer insipred by caqti and ecto
1(** Lock-free connection pool using kcas for thread-safe operations.
2
3 Works with any concurrency library (Eio, Lwt, direct-style) because kcas
4 provides lock-free data structures independent of any specific scheduler.
5
6 {[
7 let config =
8 Pool.
9 {
10 max_size = 10;
11 connect = (fun () -> Repodb_postgresql.connect conninfo);
12 close = Repodb_postgresql.close;
13 validate = None;
14 }
15 in
16 let pool = Pool.create config in
17 Pool.with_connection pool (fun conn -> Repo.all conn ~table:users ~decode)
18 ]} *)
19
20open Kcas
21open Kcas_data
22
23type pool_error =
24 | Pool_empty
25 | Pool_closed
26 | Pool_timeout
27 | Connection_error of string
28
29type 'conn config = {
30 max_size : int;
31 connect : unit -> ('conn, string) result;
32 close : 'conn -> unit;
33 validate : ('conn -> bool) option;
34}
35
36type 'conn pooled = { conn : 'conn; created_at : float }
37
38type 'conn t = {
39 config : 'conn config;
40 available : 'conn pooled Queue.t;
41 total_count : int Loc.t;
42 in_use_count : int Loc.t;
43 closed : bool Loc.t;
44}
45
46let create config =
47 {
48 config;
49 available = Queue.create ();
50 total_count = Loc.make 0;
51 in_use_count = Loc.make 0;
52 closed = Loc.make false;
53 }
54
55let size t = Loc.get t.total_count
56let in_use t = Loc.get t.in_use_count
57let available t = Queue.length t.available
58let is_closed t = Loc.get t.closed
59
60let validate_connection t pooled =
61 match t.config.validate with
62 | None -> true
63 | Some validate -> validate pooled.conn
64
65let close_pooled t pooled =
66 t.config.close pooled.conn;
67 Loc.decr t.total_count
68
69let try_create_connection t =
70 let tx ~xt =
71 if Xt.get ~xt t.closed then `Closed
72 else
73 let total = Xt.get ~xt t.total_count in
74 if total >= t.config.max_size then `At_capacity
75 else begin
76 Xt.set ~xt t.total_count (total + 1);
77 Xt.set ~xt t.in_use_count (Xt.get ~xt t.in_use_count + 1);
78 `Can_create
79 end
80 in
81 match Xt.commit { tx } with
82 | `Closed -> Error Pool_closed
83 | `At_capacity -> Error Pool_empty
84 | `Can_create -> (
85 match t.config.connect () with
86 | Ok conn ->
87 let pooled = { conn; created_at = Unix.gettimeofday () } in
88 Ok pooled
89 | Error msg ->
90 Loc.decr t.total_count;
91 Loc.decr t.in_use_count;
92 Error (Connection_error msg))
93
94let rec acquire_from_pool t =
95 if Loc.get t.closed then Error Pool_closed
96 else
97 match Queue.take_opt t.available with
98 | None -> Error Pool_empty
99 | Some pooled ->
100 if validate_connection t pooled then begin
101 Loc.incr t.in_use_count;
102 Ok pooled.conn
103 end
104 else begin
105 close_pooled t pooled;
106 acquire_from_pool t
107 end
108
109let acquire t =
110 match acquire_from_pool t with
111 | Ok conn -> Ok conn
112 | Error Pool_empty -> (
113 match try_create_connection t with
114 | Ok pooled -> Ok pooled.conn
115 | Error e -> Error e)
116 | Error e -> Error e
117
118let rec acquire_blocking ?timeoutf t =
119 if Loc.get t.closed then Error Pool_closed
120 else
121 match acquire t with
122 | Ok conn -> Ok conn
123 | Error Pool_empty -> (
124 match timeoutf with
125 | Some timeout -> (
126 match Queue.take_blocking ~timeoutf:timeout t.available with
127 | exception Kcas.Timeout.Timeout -> Error Pool_timeout
128 | pooled ->
129 if validate_connection t pooled then begin
130 Loc.incr t.in_use_count;
131 Ok pooled.conn
132 end
133 else begin
134 close_pooled t pooled;
135 acquire_blocking ?timeoutf t
136 end)
137 | None ->
138 let pooled = Queue.take_blocking t.available in
139 if validate_connection t pooled then begin
140 Loc.incr t.in_use_count;
141 Ok pooled.conn
142 end
143 else begin
144 close_pooled t pooled;
145 acquire_blocking ?timeoutf t
146 end)
147 | Error e -> Error e
148
149let release t conn =
150 if Loc.get t.closed then t.config.close conn
151 else begin
152 Loc.decr t.in_use_count;
153 let pooled = { conn; created_at = Unix.gettimeofday () } in
154 Queue.add pooled t.available
155 end
156
157let with_connection t f =
158 match acquire t with
159 | Error e -> Error e
160 | Ok conn -> (
161 match f conn with
162 | result ->
163 release t conn;
164 Ok result
165 | exception exn ->
166 release t conn;
167 raise exn)
168
169let with_connection_blocking ?timeoutf t f =
170 match acquire_blocking ?timeoutf t with
171 | Error e -> Error e
172 | Ok conn -> (
173 match f conn with
174 | result ->
175 release t conn;
176 Ok result
177 | exception exn ->
178 release t conn;
179 raise exn)
180
181let drain t =
182 let rec loop () =
183 match Queue.take_opt t.available with
184 | None -> ()
185 | Some pooled ->
186 close_pooled t pooled;
187 loop ()
188 in
189 loop ()
190
191let shutdown t =
192 Loc.set t.closed true;
193 drain t
194
195type stats = { total : int; available : int; in_use : int; closed : bool }
196
197let stats t =
198 {
199 total = size t;
200 available = available t;
201 in_use = in_use t;
202 closed = is_closed t;
203 }
204
205let error_to_string = function
206 | Pool_empty -> "Pool empty: no connections available"
207 | Pool_closed -> "Pool closed"
208 | Pool_timeout -> "Timeout waiting for connection"
209 | Connection_error msg -> Printf.sprintf "Connection error: %s" msg
210
211(** Multi-server pool with round-robin load balancing.
212
213 Distributes connections across multiple database servers without requiring
214 an external load balancer. Each server maintains its own connection pool.
215
216 {[
217 let config =
218 Pool.
219 {
220 servers = [ "host1"; "host2"; "host3" ];
221 max_size_per_server = 5;
222 connect = (fun conninfo -> Repodb_postgresql.connect conninfo);
223 close = Repodb_postgresql.close;
224 validate = None;
225 }
226 in
227 let multi = Pool.create_multi config in
228 Pool.with_connection_multi multi (fun conn ->
229 Repo.all conn ~table:users ~decode)
230 ]} *)
231
232type 'conn multi_config = {
233 servers : string list;
234 max_size_per_server : int;
235 connect : string -> ('conn, string) result;
236 close : 'conn -> unit;
237 validate : ('conn -> bool) option;
238}
239
240type 'conn multi_t = {
241 pools : 'conn t array;
242 pool_conninfos : string array;
243 conn_to_pool : ('conn, int) Stdlib.Hashtbl.t;
244 healthy : bool Loc.t array;
245 next_index : int Loc.t;
246 closed : bool Loc.t;
247}
248
249let create_multi config =
250 if config.servers = [] then
251 invalid_arg "Pool.create_multi: servers list cannot be empty";
252 let n = List.length config.servers in
253 let pools =
254 Array.of_list
255 (List.map
256 (fun conninfo ->
257 let pool_config =
258 {
259 max_size = config.max_size_per_server;
260 connect = (fun () -> config.connect conninfo);
261 close = config.close;
262 validate = config.validate;
263 }
264 in
265 create pool_config)
266 config.servers)
267 in
268 let healthy = Array.init n (fun _ -> Loc.make true) in
269 {
270 pools;
271 pool_conninfos = Array.of_list config.servers;
272 conn_to_pool = Stdlib.Hashtbl.create (n * config.max_size_per_server);
273 healthy;
274 next_index = Loc.make 0;
275 closed = Loc.make false;
276 }
277
278let multi_size multi =
279 Array.fold_left (fun acc pool -> acc + size pool) 0 multi.pools
280
281let multi_in_use multi =
282 Array.fold_left (fun acc pool -> acc + in_use pool) 0 multi.pools
283
284let multi_available multi =
285 Array.fold_left (fun acc pool -> acc + available pool) 0 multi.pools
286
287let multi_is_closed multi = Loc.get multi.closed
288let multi_server_count multi = Array.length multi.pools
289
290let multi_is_healthy multi idx =
291 if idx < 0 || idx >= Array.length multi.healthy then false
292 else Loc.get multi.healthy.(idx)
293
294let mark_unhealthy multi idx =
295 if idx >= 0 && idx < Array.length multi.healthy then
296 Loc.set multi.healthy.(idx) false
297
298let mark_healthy multi idx =
299 if idx >= 0 && idx < Array.length multi.healthy then
300 Loc.set multi.healthy.(idx) true
301
302let count_healthy multi =
303 Array.fold_left
304 (fun acc h -> if Loc.get h then acc + 1 else acc)
305 0 multi.healthy
306
307(** Find next healthy server using round-robin *)
308let next_healthy_index multi =
309 let n = Array.length multi.pools in
310 let start = Loc.fetch_and_add multi.next_index 1 mod n in
311 let rec find_healthy attempts idx =
312 if attempts >= n then None
313 else if Loc.get multi.healthy.(idx) then Some idx
314 else find_healthy (attempts + 1) ((idx + 1) mod n)
315 in
316 find_healthy 0 start
317
318let acquire_multi multi =
319 if Loc.get multi.closed then Error Pool_closed
320 else
321 match next_healthy_index multi with
322 | None -> Error Pool_empty (* All servers unhealthy *)
323 | Some idx -> (
324 match acquire multi.pools.(idx) with
325 | Ok conn ->
326 Stdlib.Hashtbl.replace multi.conn_to_pool conn idx;
327 Ok conn
328 | Error Pool_empty ->
329 (* This server's pool is exhausted, try others *)
330 let n = Array.length multi.pools in
331 let rec try_others attempts current_idx =
332 if attempts >= n then Error Pool_empty
333 else if current_idx <> idx && Loc.get multi.healthy.(current_idx)
334 then
335 match acquire multi.pools.(current_idx) with
336 | Ok conn ->
337 Stdlib.Hashtbl.replace multi.conn_to_pool conn current_idx;
338 Ok conn
339 | Error Pool_empty ->
340 try_others (attempts + 1) ((current_idx + 1) mod n)
341 | Error e -> Error e
342 else try_others (attempts + 1) ((current_idx + 1) mod n)
343 in
344 try_others 1 ((idx + 1) mod n)
345 | Error e -> Error e)
346
347let release_multi multi conn =
348 match Stdlib.Hashtbl.find_opt multi.conn_to_pool conn with
349 | Some idx ->
350 Stdlib.Hashtbl.remove multi.conn_to_pool conn;
351 if Loc.get multi.closed then multi.pools.(idx).config.close conn
352 else release multi.pools.(idx) conn
353 | None ->
354 (* Connection not tracked, close it directly using first pool's config *)
355 if Array.length multi.pools > 0 then multi.pools.(0).config.close conn
356
357let with_connection_multi multi f =
358 match acquire_multi multi with
359 | Error e -> Error e
360 | Ok conn -> (
361 match f conn with
362 | result ->
363 release_multi multi conn;
364 Ok result
365 | exception exn ->
366 release_multi multi conn;
367 raise exn)
368
369let rec acquire_multi_blocking ?timeoutf multi =
370 if Loc.get multi.closed then Error Pool_closed
371 else
372 match acquire_multi multi with
373 | Ok conn -> Ok conn
374 | Error Pool_empty -> (
375 (* All pools exhausted, wait on a healthy one *)
376 match next_healthy_index multi with
377 | None -> Error Pool_empty
378 | Some idx -> (
379 match acquire_blocking ?timeoutf multi.pools.(idx) with
380 | Ok conn ->
381 Stdlib.Hashtbl.replace multi.conn_to_pool conn idx;
382 Ok conn
383 | Error Pool_timeout -> Error Pool_timeout
384 | Error Pool_closed -> Error Pool_closed
385 | Error Pool_empty -> acquire_multi_blocking ?timeoutf multi
386 | Error e -> Error e))
387 | Error e -> Error e
388
389let with_connection_multi_blocking ?timeoutf multi f =
390 match acquire_multi_blocking ?timeoutf multi with
391 | Error e -> Error e
392 | Ok conn -> (
393 match f conn with
394 | result ->
395 release_multi multi conn;
396 Ok result
397 | exception exn ->
398 release_multi multi conn;
399 raise exn)
400
401let drain_multi multi = Array.iter drain multi.pools
402
403let shutdown_multi multi =
404 Loc.set multi.closed true;
405 Array.iter shutdown multi.pools;
406 Stdlib.Hashtbl.clear multi.conn_to_pool
407
408type multi_stats = {
409 total_servers : int;
410 healthy_servers : int;
411 per_server : (string * stats) list;
412 aggregate : stats;
413}
414
415let stats_multi multi =
416 let per_server =
417 Array.to_list
418 (Array.mapi
419 (fun i pool -> (multi.pool_conninfos.(i), stats pool))
420 multi.pools)
421 in
422 let aggregate =
423 {
424 total = multi_size multi;
425 available = multi_available multi;
426 in_use = multi_in_use multi;
427 closed = multi_is_closed multi;
428 }
429 in
430 {
431 total_servers = Array.length multi.pools;
432 healthy_servers = count_healthy multi;
433 per_server;
434 aggregate;
435 }
436
437module Make (D : Driver.S) = struct
438 type nonrec t = D.connection t
439 type conn = D.connection
440
441 let create ~max_size ~conninfo ?validate () =
442 let config =
443 {
444 max_size;
445 connect =
446 (fun () -> D.connect conninfo |> Result.map_error D.error_message);
447 close = D.close;
448 validate;
449 }
450 in
451 create config
452
453 let acquire = acquire
454 let acquire_blocking = acquire_blocking
455 let release = release
456 let with_connection = with_connection
457 let with_connection_blocking = with_connection_blocking
458 let drain = drain
459 let shutdown = shutdown
460 let stats = stats
461 let size = size
462 let available = available
463 let in_use = in_use
464 let is_closed = is_closed
465
466 module Multi = struct
467 type t = D.connection multi_t
468
469 let create ~servers ?validate () =
470 let cfg =
471 {
472 servers;
473 max_size_per_server = 10;
474 connect =
475 (fun conninfo ->
476 D.connect conninfo |> Result.map_error D.error_message);
477 close = D.close;
478 validate;
479 }
480 in
481 create_multi cfg
482
483 let create_sized ~servers ~max_size_per_server ?validate () =
484 let cfg =
485 {
486 servers;
487 max_size_per_server;
488 connect =
489 (fun conninfo ->
490 D.connect conninfo |> Result.map_error D.error_message);
491 close = D.close;
492 validate;
493 }
494 in
495 create_multi cfg
496
497 let acquire = acquire_multi
498 let acquire_blocking = acquire_multi_blocking
499 let release = release_multi
500 let with_connection = with_connection_multi
501 let with_connection_blocking = with_connection_multi_blocking
502 let drain = drain_multi
503 let shutdown = shutdown_multi
504 let stats = stats_multi
505 let size = multi_size
506 let available = multi_available
507 let in_use = multi_in_use
508 let is_closed = multi_is_closed
509 let server_count = multi_server_count
510 let is_healthy = multi_is_healthy
511 let mark_healthy = mark_healthy
512 let mark_unhealthy = mark_unhealthy
513 end
514end