A database layer inspired by Caqti and Ecto

Connection Pool#

repodb provides a scheduler-agnostic connection pool built on lock-free data structures. It works unmodified with Eio, Lwt, or plain direct-style OCaml.

Features#

  • Lock-free: Uses kcas for thread-safe operations without locks
  • Scheduler-agnostic: Works with Eio, Lwt, or direct-style OCaml
  • Connection validation: Optional health checks before reuse
  • Graceful shutdown: Clean connection cleanup
  • Statistics: Monitor pool health and usage

Setup#

Using the Driver Functor#

The simplest way to create a pool is using the driver-specific functor:

open Repodb

(* Apply Pool.Make to the driver you use, and pick exactly one: two modules
   named Pool in the same structure do not compile. *)

(* For SQLite *)
(* module Pool = Pool.Make(Repodb_sqlite) *)

(* For PostgreSQL *)
module Pool = Pool.Make(Repodb_postgresql)

(* The format of ~conninfo depends on the driver: a PostgreSQL connection
   string here, a database file path such as "mydb.sqlite3" for SQLite. *)
let pool = Pool.create
  ~max_size:10
  ~conninfo:"host=localhost dbname=myapp"
  ()

Manual Configuration#

For more control, build the configuration record yourself and pass it to Pool.create:

(* Pool configuration record. [conninfo] is assumed to be bound already,
   e.g. to "host=localhost dbname=myapp". *)
let config = Pool.{
  (* Maximum number of connections the pool may hold *)
  max_size = 10;
  (* Opens a new connection when the pool needs one *)
  connect = (fun () -> Repodb_postgresql.connect conninfo);
  (* Closes a connection the pool discards (failed validation, shutdown) *)
  close = Repodb_postgresql.close;
  (* Optional health check run before a pooled connection is reused;
     None skips the check *)
  validate = Some (fun conn ->
    (* Return true if connection is healthy *)
    match Repodb_postgresql.exec conn "SELECT 1" ~params:[] with
    | Ok _ -> true
    | Error _ -> false
  );
}

let pool = Pool.create config

Basic Usage#

Acquire and Release#

(* Manual acquire/release. Fun.protect guarantees the connection goes back
   to the pool even if the query raises an exception. *)
match Pool.acquire pool with
| Ok conn ->
    Fun.protect
      ~finally:(fun () -> Pool.release pool conn)
      (fun () -> Repo.all conn ~table:users_table ~decode:decode_user)
| Error Pool.Pool_empty ->
    (* No connections available and at max capacity *)
    Error (Error.Connection_error "Pool exhausted")
| Error Pool.Pool_closed ->
    Error (Error.Connection_error "Pool is closed")
| Error Pool.Pool_timeout ->
    (* Handled so the match is exhaustive over Pool.pool_error *)
    Error (Error.Connection_error "Timed out waiting for connection")
| Error (Pool.Connection_error msg) ->
    Error (Error.Connection_error msg)

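With Connection#

Pool.with_connection acquires a connection, passes it to your function, and releases it automatically, even if the function raises an exception: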

(* Fetch every user on a borrowed pooled connection. The query's own result
   is passed through unchanged; pool failures become Error.Connection_error. *)
let get_all_users pool =
  let fetch conn = Repo.all conn ~table:users_table ~decode:decode_user in
  match Pool.with_connection pool fetch with
  | Ok repo_result -> repo_result
  | Error Pool.Pool_empty -> Error (Error.Connection_error "Pool exhausted")
  | Error other -> Error (Error.Connection_error (Pool.error_to_string other))

Blocking Acquire#

Wait for a connection when the pool is exhausted:

(* Wait indefinitely *)
match Pool.acquire_blocking pool with
| Ok conn -> ... (* use conn, then Pool.release pool conn *)
| Error e -> ...

(* Wait with timeout (5 seconds) *)
match Pool.acquire_blocking ~timeoutf:5.0 pool with
| Ok conn -> ... (* use conn, then Pool.release pool conn *)
| Error Pool.Pool_timeout -> ... (* Timed out waiting *)
| Error e -> ...

(* with_connection variant: the outer result reports pool errors (including
   Pool_timeout); the inner result is whatever the callback returns *)
match Pool.with_connection_blocking ~timeoutf:5.0 pool (fun conn ->
  Repo.all conn ~table:users_table ~decode:decode_user
) with
| Ok result -> result
| Error Pool.Pool_timeout -> Error (Error.Connection_error "Timeout")
| Error e -> Error (Error.Connection_error (Pool.error_to_string e))

Concurrency Examples#

With Eio#

let run_with_eio () =
  (* The Eio environment is not needed here; [_env] avoids the
     unused-variable warning (an error under dune's dev profile). *)
  Eio_main.run @@ fun _env ->
  let pool = Pool.create
    ~max_size:10
    ~conninfo:"mydb.sqlite3"
    ()
  in
  (* Shut the pool down even if one of the fibers raises. *)
  Fun.protect ~finally:(fun () -> Pool.shutdown pool) @@ fun () ->
  (* Run concurrent fibers; each borrows its own connection. *)
  Eio.Fiber.all [
    (fun () ->
      match Pool.with_connection pool (fun conn ->
        Repo.all conn ~table:users_table ~decode:decode_user
      ) with
      | Ok (Ok users) -> process_users users
      | _ -> ());
    (fun () ->
      match Pool.with_connection pool (fun conn ->
        Repo.all conn ~table:posts_table ~decode:decode_post
      ) with
      | Ok (Ok posts) -> process_posts posts
      | _ -> ());
  ]

With Lwt#

(* Lwt example. let%lwt requires the lwt_ppx preprocessor. *)
let run_with_lwt () =
  let pool = Pool.create
    ~max_size:10
    ~conninfo:"mydb.sqlite3"
    ()
  in
  
  (* Run concurrent promises. Lwt_preemptive.detach runs each pool call in a
     separate system thread, so blocking queries do not stall the Lwt
     scheduler. *)
  let%lwt () = Lwt.join [
    (Lwt_preemptive.detach (fun () ->
      match Pool.with_connection pool (fun conn ->
        Repo.all conn ~table:users_table ~decode:decode_user
      ) with
      | Ok (Ok users) -> process_users users
      | _ -> ()
    ) ());
    (Lwt_preemptive.detach (fun () ->
      match Pool.with_connection pool (fun conn ->
        Repo.all conn ~table:posts_table ~decode:decode_post
      ) with
      | Ok (Ok posts) -> process_posts posts
      | _ -> ()
    ) ());
  ] in
  
  (* Reached only when both promises resolve: a rejection propagates through
     let%lwt and skips this shutdown. Use Lwt.finalize to always shut down. *)
  Pool.shutdown pool;
  Lwt.return ()

Multi-Domain Parallelism#

The pool is safe to use across multiple domains:

let run_parallel () =
  let pool = Pool.create
    ~max_size:20
    ~conninfo:"host=localhost dbname=myapp"
    ()
  in
  
  (* Four domains each run 100 lookups through the shared pool. The domain
     index is not used, hence [_] (a named unused parameter triggers the
     unused-variable warning). *)
  let domains = Array.init 4 (fun _ ->
    Domain.spawn (fun () ->
      for _ = 1 to 100 do
        match Pool.with_connection pool (fun conn ->
          Repo.get conn ~table:users_table ~id:(Random.int 1000) ~decode:decode_user
        ) with
        | Ok _ -> ()
        | Error _ -> ()
      done
    )
  ) in
  
  (* Wait for every domain to finish before closing the pool. *)
  Array.iter Domain.join domains;
  Pool.shutdown pool

Pool Statistics#

Monitor pool health:

(* Pool.stats returns a record with the counters printed below *)
let stats = Pool.stats pool in
Printf.printf "Total connections: %d\n" stats.total;
Printf.printf "In use: %d\n" stats.in_use;
Printf.printf "Available: %d\n" stats.available;
Printf.printf "Closed: %b\n" stats.closed;

(* Individual accessors *)
let total = Pool.size pool in
let in_use = Pool.in_use pool in
let available = Pool.available pool in
let closed = Pool.is_closed pool in

Connection Validation#

Validate connections before reuse to handle stale connections:

(* validate runs before a pooled connection is reused; returning false makes
   the pool close that connection and try another. [conninfo] is assumed to
   be bound already. *)
let config = Pool.{
  max_size = 10;
  connect = (fun () -> Repodb_postgresql.connect conninfo);
  close = Repodb_postgresql.close;
  validate = Some (fun conn ->
    (* Simple ping *)
    match Repodb_postgresql.exec conn "SELECT 1" ~params:[] with
    | Ok _ -> true
    | Error _ -> false
  );
}

When validation fails, the pool automatically:

  1. Closes the invalid connection
  2. Decrements the total count
  3. Tries the next available connection (or creates a new one)

Lifecycle Management#

Drain#

Remove all idle (available) connections without shutting down the pool:

(* Remove idle connections to free resources. Only available connections are
   removed; connections in use are untouched and the pool stays open. *)
Pool.drain pool

Shutdown#

Gracefully close the pool and all connections:

(* After shutdown, all acquire calls return Pool_closed.
   Connections still in use are closed when they are released. *)
Pool.shutdown pool

Note: connections that are still in use when shutdown is called are closed when they are released back to the pool.

Error Handling#

type pool_error =
  | Pool_empty      (* No connections available, at max capacity *)
  | Pool_closed     (* Pool has been shut down *)
  | Pool_timeout    (* Timed out waiting for connection *)
  | Connection_error of string  (* Failed to create new connection *)

(* Convert to string; [error] is any Pool.pool_error value *)
let msg = Pool.error_to_string error

Best Practices#

1. Size Your Pool Appropriately#

(* Rule of thumb: connections = (cores * 2) + spindle count *)
(* For SSDs, start with cores * 2 to cores * 4 *)
(* Domain.recommended_domain_count () serves as the core-count estimate *)
let max_size = Domain.recommended_domain_count () * 2

2. Always Use with_connection#

(* Good: connection always released *)
Pool.with_connection pool (fun conn -> ...)

(* Risky: must remember to release. Pool.acquire returns a result, so the
   connection has to be matched out of it first. *)
match Pool.acquire pool with
| Ok conn ->
    (* ... what if an exception is raised here? Then release never runs. *)
    Pool.release pool conn
| Error _ -> ()

3. Handle Pool Exhaustion#

(* Run [f] on a pooled connection, retrying up to [max_retries] times with a
   fixed 0.1 s pause while the pool reports Pool_empty. Any other outcome,
   success or error, is returned as is. Note that Unix.sleepf blocks the
   calling thread. *)
let with_retry ~max_retries pool f =
  let rec attempt tries =
    match Pool.with_connection pool f with
    | Error Pool.Pool_empty when tries < max_retries ->
        Unix.sleepf 0.1;  (* Back off *)
        attempt (tries + 1)
    | outcome -> outcome
  in
  attempt 0

4. Monitor Pool Health#

(* Log a warning when more than 90% of capacity is in use, and an error when
   every connection is busy at capacity.

   The pool opens connections on demand up to max_size, so "all current
   connections busy" only means exhaustion once total has reached max_size.
   Pass ?max_size (the value given to Pool.create) to measure against it;
   without it, the current total is used as the capacity. A pool currently
   holding no connections (e.g. right after Pool.drain) is not reported. *)
let check_pool_health ?max_size pool =
  let stats = Pool.stats pool in
  let capacity = Option.value max_size ~default:stats.total in
  if capacity > 0 && stats.in_use * 10 > capacity * 9 then
    log_warning "Pool nearly exhausted: %d/%d in use" stats.in_use capacity;
  if capacity > 0 && stats.available = 0 && stats.in_use >= capacity then
    log_error "Pool exhausted!"

5. Use Validation for Long-Running Apps#

(* Connections can go stale after network issues, server restarts, etc. *)
(* Any cheap query works as a probe; an Error result marks the connection
   as unhealthy. *)
let validate conn =
  match Repo.exec conn "SELECT 1" ~params:[] with
  | Ok _ -> true
  | Error _ -> false

(* [...] stands for the remaining fields: max_size, connect, close *)
let config = Pool.{ ...; validate = Some validate }

SQLite Considerations#

When several connections use the same SQLite database concurrently, configure each connection as it is opened:

(* Enable WAL mode for better concurrency. Use this function as the
   [connect] field of the pool configuration. *)
let connect () =
  match Repodb_sqlite.connect "mydb.sqlite3" with
  | Error e -> Error e
  | Ok conn ->
      (* Set the busy timeout first, so every later statement (including the
         journal-mode switch below) waits up to 5000 ms for a lock instead of
         failing immediately with SQLITE_BUSY. *)
      ignore (Repodb_sqlite.exec conn "PRAGMA busy_timeout=5000" ~params:[]);
      (* WAL mode allows readers to run concurrently with a writer. Changing
         the journal mode can itself hit a locked database when other
         connections are active, hence the ordering above. A failure is
         ignored: the connection keeps working in its current journal mode. *)
      ignore (Repodb_sqlite.exec conn "PRAGMA journal_mode=WAL" ~params:[]);
      Ok conn

Multi-Server Pool#

For load balancing across multiple database servers without an external proxy, use Pool.Make(Driver).Multi:

Setup#

module Pool = Pool.Make(Repodb_postgresql)

(* One conninfo string per database server.
   ~max_size_per_server presumably caps the connections opened to each
   server — confirm against the Pool.Multi interface. *)
let multi = Pool.Multi.create_sized
  ~servers:[
    "host=server1 dbname=myapp";
    "host=server2 dbname=myapp";
    "host=server3 dbname=myapp";
  ]
  ~max_size_per_server:5
  ()

Usage#

(* Connections are distributed across servers via round-robin; the callback
   gets a connection exactly as with the single-server pool. *)
let result = Pool.Multi.with_connection multi (fun conn ->
  Repo.all conn ~table:users_table ~decode:decode_user
)

Health Management#

(* Servers are identified by an integer index (presumably the 0-based
   position in the ~servers list). *)

(* Mark a server as unhealthy (skipped in round-robin) *)
Pool.Multi.mark_unhealthy multi 1;

(* Mark the same server healthy again *)
Pool.Multi.mark_healthy multi 1;

Statistics#

(* stats.aggregate has the single-pool stats fields (e.g. total),
   presumably summed over all servers — confirm against Pool.Multi *)
let stats = Pool.Multi.stats multi in
Printf.printf "Total servers: %d\n" stats.total_servers;
Printf.printf "Healthy servers: %d\n" stats.healthy_servers;
Printf.printf "Total connections: %d\n" stats.aggregate.total;

Shutdown#

(* Shut down the multi-server pool when the application is done with it *)
Pool.Multi.shutdown multi

For read/write splitting with replicas, see CQRS.

Next Steps#

  • CQRS - Read/write splitting with replicas
  • Repo - Database operations
  • Transactions - Multi and transaction handling
  • Queries - Building complex queries