a database layer inspired by caqti and ecto
at main 15 kB view raw
(** Lock-free connection pool using kcas for thread-safe operations.

    Works with any concurrency library (Eio, Lwt, direct-style) because kcas
    provides lock-free data structures independent of any specific scheduler.

    {[
      let config =
        Pool.
          {
            max_size = 10;
            connect = (fun () -> Repodb_postgresql.connect conninfo);
            close = Repodb_postgresql.close;
            validate = None;
          }
      in
      let pool = Pool.create config in
      Pool.with_connection pool (fun conn -> Repo.all conn ~table:users ~decode)
    ]} *)

open Kcas
open Kcas_data

(** Reasons a connection could not be handed out. *)
type pool_error =
  | Pool_empty  (** No idle connection and no capacity left to open one. *)
  | Pool_closed  (** The pool has been shut down. *)
  | Pool_timeout  (** A blocking acquire gave up waiting. *)
  | Connection_error of string  (** [connect] returned this error message. *)

(** Callbacks and limits for a single-server pool. *)
type 'conn config = {
  max_size : int;  (** Upper bound on connections counted by the pool. *)
  connect : unit -> ('conn, string) result;  (** Opens a new connection. *)
  close : 'conn -> unit;  (** Disposes of a connection. *)
  validate : ('conn -> bool) option;
      (** Optional health check run when an idle connection is handed out;
          [None] means every idle connection is accepted. *)
}

(** An idle connection together with the time it was (re)inserted. The
    timestamp is refreshed on every {!release}; nothing in this file reads it
    back. *)
type 'conn pooled = { conn : 'conn; created_at : float }

(** Pool state. [total_count] counts idle plus checked-out connections,
    [in_use_count] only the checked-out ones. *)
type 'conn t = {
  config : 'conn config;
  available : 'conn pooled Queue.t;
  total_count : int Loc.t;
  in_use_count : int Loc.t;
  closed : bool Loc.t;
}

(** [create config] builds an empty, open pool. No connection is opened until
    the first acquire. *)
let create config =
  {
    config;
    available = Queue.create ();
    total_count = Loc.make 0;
    in_use_count = Loc.make 0;
    closed = Loc.make false;
  }

(** Number of connections currently counted by the pool (idle + in use). *)
let size t = Loc.get t.total_count

(** Number of connections currently checked out. *)
let in_use t = Loc.get t.in_use_count

(** Number of idle connections waiting in the queue. *)
let available t = Queue.length t.available

(** Whether {!shutdown} has been called. *)
let is_closed t = Loc.get t.closed

(** [validate_connection t pooled] runs the configured health check, if any;
    without one every connection is considered valid. *)
let validate_connection t pooled =
  Option.fold ~none:true
    ~some:(fun is_valid -> is_valid pooled.conn)
    t.config.validate

(** [close_pooled t pooled] closes an idle connection and removes it from the
    total count. *)
let close_pooled t pooled =
  t.config.close pooled.conn;
  Loc.decr t.total_count

(** [check_out t pooled] hands out an idle connection taken from the queue:
    [Some conn] (and the in-use count is bumped) when it validates, [None]
    after closing it when it does not. *)
let check_out t pooled =
  if validate_connection t pooled then begin
    Loc.incr t.in_use_count;
    Some pooled.conn
  end
  else begin
    close_pooled t pooled;
    None
  end

(** [try_create_connection t] atomically reserves a slot (bumping both
    counters) and then opens a connection outside the transaction.

    Returns [Error Pool_closed] if the pool is closed, [Error Pool_empty] if
    [max_size] is reached, and [Error (Connection_error msg)] if [connect]
    fails. The reserved slot is given back on every failure path, including
    when [connect] raises; such an exception is re-raised unchanged. *)
let try_create_connection t =
  let tx ~xt =
    if Xt.get ~xt t.closed then `Closed
    else
      let total = Xt.get ~xt t.total_count in
      if total >= t.config.max_size then `At_capacity
      else begin
        Xt.set ~xt t.total_count (total + 1);
        Xt.set ~xt t.in_use_count (Xt.get ~xt t.in_use_count + 1);
        `Can_create
      end
  in
  let give_back_slot () =
    Loc.decr t.total_count;
    Loc.decr t.in_use_count
  in
  match Xt.commit { tx } with
  | `Closed -> Error Pool_closed
  | `At_capacity -> Error Pool_empty
  | `Can_create -> (
      match t.config.connect () with
      | Ok conn -> Ok { conn; created_at = Unix.gettimeofday () }
      | Error msg ->
          give_back_slot ();
          Error (Connection_error msg)
      | exception exn ->
          (* Without this the slot reserved above would be lost for good and
             the pool would permanently shrink by one. *)
          let bt = Printexc.get_raw_backtrace () in
          give_back_slot ();
          Printexc.raise_with_backtrace exn bt)

(** [acquire_from_pool t] takes the first idle connection that validates,
    closing the ones that do not. Returns [Error Pool_empty] when the queue
    runs dry and [Error Pool_closed] when the pool is closed. *)
let rec acquire_from_pool t =
  if Loc.get t.closed then Error Pool_closed
  else
    match Queue.take_opt t.available with
    | None -> Error Pool_empty
    | Some pooled -> (
        match check_out t pooled with
        | Some conn -> Ok conn
        | None -> acquire_from_pool t)

(** [acquire t] returns an idle connection if there is one, otherwise tries
    to open a new one. Never blocks. *)
let acquire t =
  match acquire_from_pool t with
  | Error Pool_empty ->
      try_create_connection t |> Result.map (fun pooled -> pooled.conn)
  | (Ok _ | Error (Pool_closed | Pool_timeout | Connection_error _)) as outcome
    ->
      outcome

(** [acquire_blocking ?timeoutf t] is like {!acquire} but, when the pool is
    exhausted, waits on the idle queue until a connection is released.

    [timeoutf] is forwarded to [Queue.take_blocking]; when it expires the
    result is [Error Pool_timeout]. If the connection obtained after waiting
    fails validation it is closed and the whole procedure restarts with the
    same [timeoutf].

    NOTE(review): {!release} on a closed pool does not enqueue anything and
    {!shutdown} only drains the queue, so a caller blocked here without a
    timeout does not appear to be woken by a shutdown — confirm. *)
let rec acquire_blocking ?timeoutf t =
  if Loc.get t.closed then Error Pool_closed
  else
    match acquire t with
    | Ok conn -> Ok conn
    | Error Pool_empty -> (
        (* Passing the optional through covers both the timed and the
           untimed wait with a single branch. *)
        match Queue.take_blocking ?timeoutf t.available with
        | exception Kcas.Timeout.Timeout -> Error Pool_timeout
        | pooled -> (
            match check_out t pooled with
            | Some conn -> Ok conn
            | None -> acquire_blocking ?timeoutf t))
    | Error ((Pool_closed | Pool_timeout | Connection_error _) as e) -> Error e

(** [release t conn] gives a checked-out connection back.

    On an open pool the connection is re-queued with a fresh timestamp. On a
    closed pool it is closed and removed from both counters, so {!size} and
    {!in_use} stay accurate after {!shutdown}. *)
let release t conn =
  Loc.decr t.in_use_count;
  if Loc.get t.closed then begin
    t.config.close conn;
    Loc.decr t.total_count
  end
  else Queue.add { conn; created_at = Unix.gettimeofday () } t.available

(** [run_and_release t conn f] applies [f] to [conn] and releases [conn]
    afterwards, whether [f] returns or raises. An exception from [f] is
    re-raised with its original backtrace. *)
let run_and_release t conn f =
  match f conn with
  | result ->
      release t conn;
      Ok result
  | exception exn ->
      let bt = Printexc.get_raw_backtrace () in
      release t conn;
      Printexc.raise_with_backtrace exn bt

(** [with_connection t f] acquires without blocking, runs [f] and always
    releases the connection. Returns [Ok (f conn)] or the acquisition error;
    exceptions raised by [f] propagate. *)
let with_connection t f =
  match acquire t with
  | Error e -> Error e
  | Ok conn -> run_and_release t conn f

(** Same as {!with_connection} but acquires with {!acquire_blocking}. *)
let with_connection_blocking ?timeoutf t f =
  match acquire_blocking ?timeoutf t with
  | Error e -> Error e
  | Ok conn -> run_and_release t conn f

(** [drain t] closes every idle connection. Checked-out connections are not
    touched. *)
let rec drain t =
  match Queue.take_opt t.available with
  | Some pooled ->
      close_pooled t pooled;
      drain t
  | None -> ()

(** [shutdown t] marks the pool closed, then closes all idle connections. *)
let shutdown t =
  Loc.set t.closed true;
  drain t

(** Point-in-time snapshot of a pool. The four fields are read one after the
    other, not atomically. *)
type stats = { total : int; available : int; in_use : int; closed : bool }

(** [stats t] collects the current counters of [t]. *)
let stats t =
  {
    total = size t;
    available = available t;
    in_use = in_use t;
    closed = is_closed t;
  }

(** Human-readable rendering of a {!pool_error}. *)
let error_to_string = function
  | Pool_empty -> "Pool empty: no connections available"
  | Pool_closed -> "Pool closed"
  | Pool_timeout -> "Timeout waiting for connection"
  | Connection_error msg -> Printf.sprintf "Connection error: %s" msg

(** Multi-server pool with round-robin load balancing.

    Distributes connections across multiple database servers without requiring
    an external load balancer. Each server maintains its own connection pool.

    {[
      let config =
        Pool.
          {
            servers = [ "host1"; "host2"; "host3" ];
            max_size_per_server = 5;
            connect = (fun conninfo -> Repodb_postgresql.connect conninfo);
            close = Repodb_postgresql.close;
            validate = None;
          }
      in
      let multi = Pool.create_multi config in
      Pool.with_connection_multi multi (fun conn ->
          Repo.all conn ~table:users ~decode)
    ]} *)

type 'conn multi_config = {
  servers : string list;  (** One connection string per server. *)
  max_size_per_server : int;  (** [max_size] of each per-server pool. *)
  connect : string -> ('conn, string) result;
      (** Opens a connection to the server named by the given string. *)
  close : 'conn -> unit;
  validate : ('conn -> bool) option;
}

(** State of a multi-server pool: one pool, one connection string and one
    health flag per server, all indexed alike.

    NOTE(review): [conn_to_pool] is the unsynchronised [Stdlib.Hashtbl] and
    relies on polymorphic hashing/equality of connection values; concurrent
    use from several threads or domains looks unsafe — confirm. *)
type 'conn multi_t = {
  pools : 'conn t array;
  pool_conninfos : string array;
  conn_to_pool : ('conn, int) Stdlib.Hashtbl.t;
  healthy : bool Loc.t array;
  next_index : int Loc.t;
  closed : bool Loc.t;
}

(** [create_multi config] builds one pool per entry of [config.servers], all
    initially marked healthy.

    @raise Invalid_argument if [config.servers] is empty. *)
let create_multi config =
  (match config.servers with
  | [] -> invalid_arg "Pool.create_multi: servers list cannot be empty"
  | _ :: _ -> ());
  let pool_conninfos = Array.of_list config.servers in
  let n = Array.length pool_conninfos in
  let pool_for conninfo =
    let pool_config =
      {
        max_size = config.max_size_per_server;
        connect = (fun () -> config.connect conninfo);
        close = config.close;
        validate = config.validate;
      }
    in
    create pool_config
  in
  {
    pools = Array.map pool_for pool_conninfos;
    pool_conninfos;
    conn_to_pool = Stdlib.Hashtbl.create (n * config.max_size_per_server);
    healthy = Array.init n (fun _ -> Loc.make true);
    next_index = Loc.make 0;
    closed = Loc.make false;
  }

(** Sum of {!size} over all per-server pools. *)
let multi_size multi =
  Array.fold_left (fun sum pool -> sum + size pool) 0 multi.pools

(** Sum of {!in_use} over all per-server pools. *)
let multi_in_use multi =
  Array.fold_left (fun sum pool -> sum + in_use pool) 0 multi.pools

(** Sum of {!available} over all per-server pools. *)
let multi_available multi =
  Array.fold_left (fun sum pool -> sum + available pool) 0 multi.pools

(** Whether the multi-server pool has been marked closed. *)
let multi_is_closed multi = Loc.get multi.closed

(** Number of servers (and therefore per-server pools). *)
let multi_server_count multi = Array.length multi.pools

(** [multi_is_healthy multi idx] is the health flag of server [idx]; an
    out-of-range [idx] is reported as unhealthy. *)
let multi_is_healthy multi idx =
  idx >= 0 && idx < Array.length multi.healthy && Loc.get multi.healthy.(idx)

(** [set_health multi idx flag] updates the health flag of server [idx] and
    silently ignores an out-of-range [idx]. *)
let set_health multi idx flag =
  if idx >= 0 && idx < Array.length multi.healthy then
    Loc.set multi.healthy.(idx) flag

(** Exclude server [idx] from selection until it is marked healthy again. *)
let mark_unhealthy multi idx = set_health multi idx false

(** Make server [idx] eligible for selection again. *)
let mark_healthy multi idx = set_health multi idx true

(** Number of servers currently flagged healthy. *)
let count_healthy multi =
  Array.fold_left
    (fun count flag -> if Loc.get flag then count + 1 else count)
    0 multi.healthy

(** Find next healthy server using round-robin.

    Each call advances the shared counter by one and scans at most one full
    lap starting from that slot. Returns [None] when no server is healthy (or
    when there are no servers at all). *)
let next_healthy_index multi =
  let n = Array.length multi.pools in
  if n = 0 then None
  else
    let ticket = Loc.fetch_and_add multi.next_index 1 in
    (* Drop the sign bit: once the counter wraps past [max_int] a plain [mod]
       would yield a negative slot and index out of bounds. *)
    let start = (ticket land max_int) mod n in
    let rec scan visited idx =
      if visited >= n then None
      else if Loc.get multi.healthy.(idx) then Some idx
      else scan (visited + 1) ((idx + 1) mod n)
    in
    scan 0 start

(** [acquire_multi multi] picks the next healthy server round-robin and
    acquires from its pool without blocking. If that pool is exhausted the
    remaining healthy servers are tried in order.

    Returns [Error Pool_closed] when the multi-pool is closed and
    [Error Pool_empty] when every server is unhealthy or exhausted. Any other
    error from a pool (e.g. [Connection_error]) is returned immediately
    without trying further servers. Successful acquisitions are recorded in
    [conn_to_pool] so that the connection can be routed back on release. *)
let acquire_multi multi =
  if Loc.get multi.closed then Error Pool_closed
  else
    match next_healthy_index multi with
    | None -> Error Pool_empty (* All servers unhealthy *)
    | Some first -> (
        let n = Array.length multi.pools in
        (* Acquire from one server and remember where the connection came
           from. *)
        let acquire_tracked idx =
          match acquire multi.pools.(idx) with
          | Ok conn ->
              Stdlib.Hashtbl.replace multi.conn_to_pool conn idx;
              Ok conn
          | Error _ as failure -> failure
        in
        (* [tried] starts at 1 and stops at [n], so the walk covers the
           [n - 1] slots after [first] and never comes back to it. *)
        let rec try_others tried idx =
          if tried >= n then Error Pool_empty
          else if not (Loc.get multi.healthy.(idx)) then
            try_others (tried + 1) ((idx + 1) mod n)
          else
            match acquire_tracked idx with
            | Error Pool_empty -> try_others (tried + 1) ((idx + 1) mod n)
            | outcome -> outcome
        in
        match acquire_tracked first with
        | Error Pool_empty ->
            (* This server's pool is exhausted, try others *)
            try_others 1 ((first + 1) mod n)
        | outcome -> outcome)

(** [release_multi multi conn] returns [conn] to the pool it was acquired
    from. If the multi-pool is already closed the connection is closed
    instead. A connection that is not tracked (for instance because
    {!shutdown_multi} cleared the table) is closed with the first pool's
    [close] callback. *)
let release_multi multi conn =
  match Stdlib.Hashtbl.find_opt multi.conn_to_pool conn with
  | Some idx ->
      Stdlib.Hashtbl.remove multi.conn_to_pool conn;
      let pool = multi.pools.(idx) in
      if Loc.get multi.closed then pool.config.close conn
      else release pool conn
  | None ->
      (* Connection not tracked, close it directly using first pool's config *)
      if Array.length multi.pools > 0 then multi.pools.(0).config.close conn

(** [run_and_release_multi multi conn f] applies [f] to [conn] and releases
    [conn] afterwards, whether [f] returns or raises. An exception from [f]
    is re-raised with its original backtrace. *)
let run_and_release_multi multi conn f =
  match f conn with
  | result ->
      release_multi multi conn;
      Ok result
  | exception exn ->
      let bt = Printexc.get_raw_backtrace () in
      release_multi multi conn;
      Printexc.raise_with_backtrace exn bt

(** [with_connection_multi multi f] acquires without blocking, runs [f] and
    always releases the connection. Returns [Ok (f conn)] or the acquisition
    error; exceptions raised by [f] propagate. *)
let with_connection_multi multi f =
  match acquire_multi multi with
  | Error e -> Error e
  | Ok conn -> run_and_release_multi multi conn f

(** [acquire_multi_blocking ?timeoutf multi] first tries {!acquire_multi}.
    If every pool is exhausted it picks the next healthy server and waits on
    that single pool with [acquire_blocking ?timeoutf]. Returns
    [Error Pool_empty] when no server is healthy. *)
let rec acquire_multi_blocking ?timeoutf multi =
  if Loc.get multi.closed then Error Pool_closed
  else
    match acquire_multi multi with
    | Ok conn -> Ok conn
    | Error Pool_empty -> (
        (* All pools exhausted, wait on a healthy one *)
        match next_healthy_index multi with
        | None -> Error Pool_empty
        | Some idx -> (
            match acquire_blocking ?timeoutf multi.pools.(idx) with
            | Ok conn ->
                Stdlib.Hashtbl.replace multi.conn_to_pool conn idx;
                Ok conn
            | Error Pool_empty -> acquire_multi_blocking ?timeoutf multi
            | Error ((Pool_timeout | Pool_closed | Connection_error _) as e) ->
                Error e))
    | Error ((Pool_closed | Pool_timeout | Connection_error _) as e) -> Error e

(** Same as {!with_connection_multi} but acquires with
    {!acquire_multi_blocking}. *)
let with_connection_multi_blocking ?timeoutf multi f =
  match acquire_multi_blocking ?timeoutf multi with
  | Error e -> Error e
  | Ok conn -> run_and_release_multi multi conn f

(** Close the idle connections of every per-server pool. *)
let drain_multi multi = Array.iter drain multi.pools

(** [shutdown_multi multi] marks the multi-pool closed, shuts down every
    per-server pool and forgets all connection-to-pool bindings. *)
let shutdown_multi multi =
  Loc.set multi.closed true;
  Array.iter shutdown multi.pools;
  Stdlib.Hashtbl.clear multi.conn_to_pool

(** Snapshot of a multi-server pool: per-server stats keyed by connection
    string, plus totals across all servers. *)
type multi_stats = {
  total_servers : int;
  healthy_servers : int;
  per_server : (string * stats) list;
  aggregate : stats;
}

(** [stats_multi multi] collects {!multi_stats} for [multi]. The individual
    counters are read one after the other, not atomically. *)
let stats_multi multi =
  let server_count = Array.length multi.pools in
  let per_server =
    List.init server_count (fun i ->
        (multi.pool_conninfos.(i), stats multi.pools.(i)))
  in
  let aggregate =
    {
      total = multi_size multi;
      available = multi_available multi;
      in_use = multi_in_use multi;
      closed = multi_is_closed multi;
    }
  in
  {
    total_servers = server_count;
    healthy_servers = count_healthy multi;
    per_server;
    aggregate;
  }

(** [Make (D)] specialises the pool to the connections of driver [D]:
    [D.connect] opens connections (its errors are rendered with
    [D.error_message]) and [D.close] disposes of them. *)
module Make (D : Driver.S) = struct
  type nonrec t = D.connection t
  type conn = D.connection

  (* Adapts the driver's [connect] to the [string]-error shape the pool
     expects. *)
  let connect_to conninfo =
    D.connect conninfo |> Result.map_error D.error_message

  (** [create ~max_size ~conninfo ?validate ()] builds a single-server pool
      that connects to [conninfo]. *)
  let create ~max_size ~conninfo ?validate () =
    let config =
      {
        max_size;
        connect = (fun () -> connect_to conninfo);
        close = D.close;
        validate;
      }
    in
    create config

  let acquire = acquire
  let acquire_blocking = acquire_blocking
  let release = release
  let with_connection = with_connection
  let with_connection_blocking = with_connection_blocking
  let drain = drain
  let shutdown = shutdown
  let stats = stats
  let size = size
  let available = available
  let in_use = in_use
  let is_closed = is_closed

  module Multi = struct
    type t = D.connection multi_t

    (* Per-server capacity used by [create], which takes no size argument. *)
    let default_max_size_per_server = 10

    (** [create_sized ~servers ~max_size_per_server ?validate ()] builds a
        multi-server pool with an explicit per-server capacity.

        @raise Invalid_argument if [servers] is empty. *)
    let create_sized ~servers ~max_size_per_server ?validate () =
      let cfg =
        {
          servers;
          max_size_per_server;
          connect = connect_to;
          close = D.close;
          validate;
        }
      in
      create_multi cfg

    (** [create ~servers ?validate ()] is {!create_sized} with a per-server
        capacity of 10.

        @raise Invalid_argument if [servers] is empty. *)
    let create ~servers ?validate () =
      create_sized ~servers
        ~max_size_per_server:default_max_size_per_server ?validate ()

    let acquire = acquire_multi
    let acquire_blocking = acquire_multi_blocking
    let release = release_multi
    let with_connection = with_connection_multi
    let with_connection_blocking = with_connection_multi_blocking
    let drain = drain_multi
    let shutdown = shutdown_multi
    let stats = stats_multi
    let size = multi_size
    let available = multi_available
    let in_use = multi_in_use
    let is_closed = multi_is_closed
    let server_count = multi_server_count
    let is_healthy = multi_is_healthy
    let mark_healthy = mark_healthy
    let mark_unhealthy = mark_unhealthy
  end
end