Connection Pool#
repodb provides a scheduler-agnostic connection pool using lock-free data structures. It works with any OCaml concurrency library (Eio, Lwt, direct-style) without modifications.
Features#
- Lock-free: Uses kcas for thread-safe operations without locks
- Scheduler-agnostic: Works with Eio, Lwt, or direct-style OCaml
- Connection validation: Optional health checks before reuse
- Graceful shutdown: Clean connection cleanup
- Statistics: Monitor pool health and usage
Setup#
Using the Driver Functor#
The simplest way to create a pool is using the driver-specific functor:
open Repodb
(* For SQLite *)
module Pool = Pool.Make(Repodb_sqlite)
(* For PostgreSQL *)
module Pool = Pool.Make(Repodb_postgresql)
let pool = Pool.create
~max_size:10
~conninfo:"host=localhost dbname=myapp"
()
Manual Configuration#
For more control, use the generic pool with custom configuration:
let config = Pool.{
max_size = 10;
connect = (fun () -> Repodb_postgresql.connect conninfo);
close = Repodb_postgresql.close;
validate = Some (fun conn ->
(* Return true if connection is healthy *)
match Repodb_postgresql.exec conn "SELECT 1" ~params:[] with
| Ok _ -> true
| Error _ -> false
);
}
let pool = Pool.create config
Basic Usage#
Acquire and Release#
match Pool.acquire pool with
| Ok conn ->
(* Use connection *)
let result = Repo.all conn ~table:users_table ~decode:decode_user in
Pool.release pool conn;
result
| Error Pool.Pool_empty ->
(* No connections available and at max capacity *)
Error (Error.Connection_error "Pool exhausted")
| Error Pool.Pool_closed ->
Error (Error.Connection_error "Pool is closed")
| Error (Pool.Connection_error msg) ->
Error (Error.Connection_error msg)
with_connection (Recommended)#
Automatically releases the connection, even on exceptions:
let get_all_users pool =
match Pool.with_connection pool (fun conn ->
Repo.all conn ~table:users_table ~decode:decode_user
) with
| Ok (Ok users) -> Ok users
| Ok (Error e) -> Error e
| Error Pool.Pool_empty -> Error (Error.Connection_error "Pool exhausted")
| Error e -> Error (Error.Connection_error (Pool.error_to_string e))
Blocking Acquire#
Wait for a connection when the pool is exhausted:
(* Wait indefinitely *)
match Pool.acquire_blocking pool with
| Ok conn -> ...
| Error e -> ...
(* Wait with timeout (5 seconds) *)
match Pool.acquire_blocking ~timeoutf:5.0 pool with
| Ok conn -> ...
| Error Pool.Pool_timeout -> (* Timed out waiting *)
| Error e -> ...
(* with_connection variant *)
match Pool.with_connection_blocking ~timeoutf:5.0 pool (fun conn ->
Repo.all conn ~table:users_table ~decode:decode_user
) with
| Ok result -> result
| Error Pool.Pool_timeout -> Error (Error.Connection_error "Timeout")
| Error e -> Error (Error.Connection_error (Pool.error_to_string e))
Concurrency Examples#
With Eio#
let run_with_eio () =
Eio_main.run @@ fun env ->
let pool = Pool.create
~max_size:10
~conninfo:"mydb.sqlite3"
()
in
(* Run concurrent fibers *)
Eio.Fiber.all [
(fun () ->
match Pool.with_connection pool (fun conn ->
Repo.all conn ~table:users_table ~decode:decode_user
) with
| Ok (Ok users) -> process_users users
| _ -> ());
(fun () ->
match Pool.with_connection pool (fun conn ->
Repo.all conn ~table:posts_table ~decode:decode_post
) with
| Ok (Ok posts) -> process_posts posts
| _ -> ());
];
Pool.shutdown pool
With Lwt#
let run_with_lwt () =
let pool = Pool.create
~max_size:10
~conninfo:"mydb.sqlite3"
()
in
(* Run concurrent promises *)
let%lwt () = Lwt.join [
(Lwt_preemptive.detach (fun () ->
match Pool.with_connection pool (fun conn ->
Repo.all conn ~table:users_table ~decode:decode_user
) with
| Ok (Ok users) -> process_users users
| _ -> ()
) ());
(Lwt_preemptive.detach (fun () ->
match Pool.with_connection pool (fun conn ->
Repo.all conn ~table:posts_table ~decode:decode_post
) with
| Ok (Ok posts) -> process_posts posts
| _ -> ()
) ());
] in
Pool.shutdown pool;
Lwt.return ()
Multi-Domain Parallelism#
The pool is safe to use across multiple domains:
let run_parallel () =
let pool = Pool.create
~max_size:20
~conninfo:"host=localhost dbname=myapp"
()
in
let domains = Array.init 4 (fun i ->
Domain.spawn (fun () ->
for _ = 1 to 100 do
match Pool.with_connection pool (fun conn ->
Repo.get conn ~table:users_table ~id:(Random.int 1000) ~decode:decode_user
) with
| Ok _ -> ()
| Error _ -> ()
done
)
) in
Array.iter Domain.join domains;
Pool.shutdown pool
Pool Statistics#
Monitor pool health:
let stats = Pool.stats pool in
Printf.printf "Total connections: %d\n" stats.total;
Printf.printf "In use: %d\n" stats.in_use;
Printf.printf "Available: %d\n" stats.available;
Printf.printf "Closed: %b\n" stats.closed;
(* Individual accessors *)
let total = Pool.size pool in
let in_use = Pool.in_use pool in
let available = Pool.available pool in
let closed = Pool.is_closed pool in
Connection Validation#
Validate connections before reuse to handle stale connections:
let config = Pool.{
max_size = 10;
connect = (fun () -> Repodb_postgresql.connect conninfo);
close = Repodb_postgresql.close;
validate = Some (fun conn ->
(* Simple ping *)
match Repodb_postgresql.exec conn "SELECT 1" ~params:[] with
| Ok _ -> true
| Error _ -> false
);
}
When validation fails, the pool automatically:
- Closes the invalid connection
- Decrements the total count
- Tries the next available connection (or creates a new one)
Lifecycle Management#
Drain#
Remove all available connections without closing the pool:
(* Remove idle connections to free resources *)
Pool.drain pool
Shutdown#
Gracefully close the pool and all connections:
(* After shutdown, all acquire calls return Pool_closed *)
Pool.shutdown pool
Note: Connections currently in use are closed when released back to the pool.
Error Handling#
type pool_error =
| Pool_empty (* No connections available, at max capacity *)
| Pool_closed (* Pool has been shut down *)
| Pool_timeout (* Timed out waiting for connection *)
| Connection_error of string (* Failed to create new connection *)
(* Convert to string *)
let msg = Pool.error_to_string error
Best Practices#
1. Size Your Pool Appropriately#
(* Rule of thumb: connections = (cores * 2) + spindle count *)
(* For SSDs, start with cores * 2 to cores * 4 *)
let max_size = Domain.recommended_domain_count () * 2
2. Always Use with_connection#
(* Good: connection always released *)
Pool.with_connection pool (fun conn -> ...)
(* Risky: must remember to release *)
let conn = Pool.acquire pool in
(* ... what if an exception is raised? *)
Pool.release pool conn
3. Handle Pool Exhaustion#
let with_retry ~max_retries pool f =
let rec loop n =
match Pool.with_connection pool f with
| Ok result -> Ok result
| Error Pool.Pool_empty when n < max_retries ->
Unix.sleepf 0.1; (* Back off *)
loop (n + 1)
| Error e -> Error e
in
loop 0
4. Monitor Pool Health#
let check_pool_health pool =
let stats = Pool.stats pool in
if stats.in_use > stats.total * 9 / 10 then
log_warning "Pool nearly exhausted: %d/%d in use" stats.in_use stats.total;
if stats.available = 0 && stats.in_use = stats.total then
log_error "Pool exhausted!"
5. Use Validation for Long-Running Apps#
(* Connections can go stale after network issues, server restarts, etc. *)
let validate conn =
match Repo.exec conn "SELECT 1" ~params:[] with
| Ok _ -> true
| Error _ -> false
let config = Pool.{ ...; validate = Some validate }
SQLite Considerations#
When using SQLite with concurrent access:
(* Enable WAL mode for better concurrency *)
let connect () =
match Repodb_sqlite.connect "mydb.sqlite3" with
| Error e -> Error e
| Ok conn ->
(* WAL mode allows concurrent readers *)
ignore (Repodb_sqlite.exec conn "PRAGMA journal_mode=WAL" ~params:[]);
(* Set busy timeout to wait instead of failing immediately *)
ignore (Repodb_sqlite.exec conn "PRAGMA busy_timeout=5000" ~params:[]);
Ok conn
Multi-Server Pool#
For load balancing across multiple database servers without an external proxy, use Pool.Make(Driver).Multi:
Setup#
module Pool = Pool.Make(Repodb_postgresql)
let multi = Pool.Multi.create_sized
~servers:[
"host=server1 dbname=myapp";
"host=server2 dbname=myapp";
"host=server3 dbname=myapp";
]
~max_size_per_server:5
()
Usage#
(* Connections distributed via round-robin *)
let result = Pool.Multi.with_connection multi (fun conn ->
Repo.all conn ~table:users ~decode
)
Health Management#
(* Mark a server as unhealthy (skipped in round-robin) *)
Pool.Multi.mark_unhealthy multi 1;
(* Mark healthy again *)
Pool.Multi.mark_healthy multi 0;
Statistics#
let stats = Pool.Multi.stats multi in
Printf.printf "Total servers: %d\n" stats.total_servers;
Printf.printf "Healthy servers: %d\n" stats.healthy_servers;
Printf.printf "Total connections: %d\n" stats.aggregate.total;
Shutdown#
Pool.Multi.shutdown multi
For read/write splitting with replicas, see CQRS.
Next Steps#
- CQRS - Read/write splitting with replicas
- Repo - Database operations
- Transactions - Multi and transaction handling
- Queries - Building complex queries