# Connection Pool repodb provides a scheduler-agnostic connection pool using lock-free data structures. It works with any OCaml concurrency library (Eio, Lwt, direct-style) without modifications. ## Features - **Lock-free**: Uses kcas for thread-safe operations without locks - **Scheduler-agnostic**: Works with Eio, Lwt, or direct-style OCaml - **Connection validation**: Optional health checks before reuse - **Graceful shutdown**: Clean connection cleanup - **Statistics**: Monitor pool health and usage ## Setup ### Using the Driver Functor The simplest way to create a pool is using the driver-specific functor: ```ocaml open Repodb (* For SQLite *) module Pool = Pool.Make(Repodb_sqlite) (* For PostgreSQL *) module Pool = Pool.Make(Repodb_postgresql) let pool = Pool.create ~max_size:10 ~conninfo:"host=localhost dbname=myapp" () ``` ### Manual Configuration For more control, use the generic pool with custom configuration: ```ocaml let config = Pool.{ max_size = 10; connect = (fun () -> Repodb_postgresql.connect conninfo); close = Repodb_postgresql.close; validate = Some (fun conn -> (* Return true if connection is healthy *) match Repodb_postgresql.exec conn "SELECT 1" ~params:[] with | Ok _ -> true | Error _ -> false ); } let pool = Pool.create config ``` ## Basic Usage ### Acquire and Release ```ocaml match Pool.acquire pool with | Ok conn -> (* Use connection *) let result = Repo.all conn ~table:users_table ~decode:decode_user in Pool.release pool conn; result | Error Pool.Pool_empty -> (* No connections available and at max capacity *) Error (Error.Connection_error "Pool exhausted") | Error Pool.Pool_closed -> Error (Error.Connection_error "Pool is closed") | Error (Pool.Connection_error msg) -> Error (Error.Connection_error msg) ``` ### with_connection (Recommended) Automatically releases the connection, even on exceptions: ```ocaml let get_all_users pool = match Pool.with_connection pool (fun conn -> Repo.all conn ~table:users_table ~decode:decode_user ) with | Ok (Ok users) -> Ok users | Ok (Error e) -> Error e | Error Pool.Pool_empty -> Error (Error.Connection_error "Pool exhausted") | Error e -> Error (Error.Connection_error (Pool.error_to_string e)) ``` ### Blocking Acquire Wait for a connection when the pool is exhausted: ```ocaml (* Wait indefinitely *) match Pool.acquire_blocking pool with | Ok conn -> ... | Error e -> ... (* Wait with timeout (5 seconds) *) match Pool.acquire_blocking ~timeoutf:5.0 pool with | Ok conn -> ... | Error Pool.Pool_timeout -> (* Timed out waiting *) | Error e -> ... (* with_connection variant *) match Pool.with_connection_blocking ~timeoutf:5.0 pool (fun conn -> Repo.all conn ~table:users_table ~decode:decode_user ) with | Ok result -> result | Error Pool.Pool_timeout -> Error (Error.Connection_error "Timeout") | Error e -> Error (Error.Connection_error (Pool.error_to_string e)) ``` ## Concurrency Examples ### With Eio ```ocaml let run_with_eio () = Eio_main.run @@ fun env -> let pool = Pool.create ~max_size:10 ~conninfo:"mydb.sqlite3" () in (* Run concurrent fibers *) Eio.Fiber.all [ (fun () -> match Pool.with_connection pool (fun conn -> Repo.all conn ~table:users_table ~decode:decode_user ) with | Ok (Ok users) -> process_users users | _ -> ()); (fun () -> match Pool.with_connection pool (fun conn -> Repo.all conn ~table:posts_table ~decode:decode_post ) with | Ok (Ok posts) -> process_posts posts | _ -> ()); ]; Pool.shutdown pool ``` ### With Lwt ```ocaml let run_with_lwt () = let pool = Pool.create ~max_size:10 ~conninfo:"mydb.sqlite3" () in (* Run concurrent promises *) let%lwt () = Lwt.join [ (Lwt_preemptive.detach (fun () -> match Pool.with_connection pool (fun conn -> Repo.all conn ~table:users_table ~decode:decode_user ) with | Ok (Ok users) -> process_users users | _ -> () ) ()); (Lwt_preemptive.detach (fun () -> match Pool.with_connection pool (fun conn -> Repo.all conn ~table:posts_table ~decode:decode_post ) with | Ok (Ok posts) -> process_posts posts | _ -> () ) ()); ] in Pool.shutdown pool; Lwt.return () ``` ### Multi-Domain Parallelism The pool is safe to use across multiple domains: ```ocaml let run_parallel () = let pool = Pool.create ~max_size:20 ~conninfo:"host=localhost dbname=myapp" () in let domains = Array.init 4 (fun i -> Domain.spawn (fun () -> for _ = 1 to 100 do match Pool.with_connection pool (fun conn -> Repo.get conn ~table:users_table ~id:(Random.int 1000) ~decode:decode_user ) with | Ok _ -> () | Error _ -> () done ) ) in Array.iter Domain.join domains; Pool.shutdown pool ``` ## Pool Statistics Monitor pool health: ```ocaml let stats = Pool.stats pool in Printf.printf "Total connections: %d\n" stats.total; Printf.printf "In use: %d\n" stats.in_use; Printf.printf "Available: %d\n" stats.available; Printf.printf "Closed: %b\n" stats.closed; (* Individual accessors *) let total = Pool.size pool in let in_use = Pool.in_use pool in let available = Pool.available pool in let closed = Pool.is_closed pool in ``` ## Connection Validation Validate connections before reuse to handle stale connections: ```ocaml let config = Pool.{ max_size = 10; connect = (fun () -> Repodb_postgresql.connect conninfo); close = Repodb_postgresql.close; validate = Some (fun conn -> (* Simple ping *) match Repodb_postgresql.exec conn "SELECT 1" ~params:[] with | Ok _ -> true | Error _ -> false ); } ``` When validation fails, the pool automatically: 1. Closes the invalid connection 2. Decrements the total count 3. Tries the next available connection (or creates a new one) ## Lifecycle Management ### Drain Remove all available connections without closing the pool: ```ocaml (* Remove idle connections to free resources *) Pool.drain pool ``` ### Shutdown Gracefully close the pool and all connections: ```ocaml (* After shutdown, all acquire calls return Pool_closed *) Pool.shutdown pool ``` Note: Connections currently in use are closed when released back to the pool. ## Error Handling ```ocaml type pool_error = | Pool_empty (* No connections available, at max capacity *) | Pool_closed (* Pool has been shut down *) | Pool_timeout (* Timed out waiting for connection *) | Connection_error of string (* Failed to create new connection *) (* Convert to string *) let msg = Pool.error_to_string error ``` ## Best Practices ### 1. Size Your Pool Appropriately ```ocaml (* Rule of thumb: connections = (cores * 2) + spindle count *) (* For SSDs, start with cores * 2 to cores * 4 *) let max_size = Domain.recommended_domain_count () * 2 ``` ### 2. Always Use with_connection ```ocaml (* Good: connection always released *) Pool.with_connection pool (fun conn -> ...) (* Risky: must remember to release *) let conn = Pool.acquire pool in (* ... what if an exception is raised? *) Pool.release pool conn ``` ### 3. Handle Pool Exhaustion ```ocaml let with_retry ~max_retries pool f = let rec loop n = match Pool.with_connection pool f with | Ok result -> Ok result | Error Pool.Pool_empty when n < max_retries -> Unix.sleepf 0.1; (* Back off *) loop (n + 1) | Error e -> Error e in loop 0 ``` ### 4. Monitor Pool Health ```ocaml let check_pool_health pool = let stats = Pool.stats pool in if stats.in_use > stats.total * 9 / 10 then log_warning "Pool nearly exhausted: %d/%d in use" stats.in_use stats.total; if stats.available = 0 && stats.in_use = stats.total then log_error "Pool exhausted!" ``` ### 5. Use Validation for Long-Running Apps ```ocaml (* Connections can go stale after network issues, server restarts, etc. *) let validate conn = match Repo.exec conn "SELECT 1" ~params:[] with | Ok _ -> true | Error _ -> false let config = Pool.{ ...; validate = Some validate } ``` ## SQLite Considerations When using SQLite with concurrent access: ```ocaml (* Enable WAL mode for better concurrency *) let connect () = match Repodb_sqlite.connect "mydb.sqlite3" with | Error e -> Error e | Ok conn -> (* WAL mode allows concurrent readers *) ignore (Repodb_sqlite.exec conn "PRAGMA journal_mode=WAL" ~params:[]); (* Set busy timeout to wait instead of failing immediately *) ignore (Repodb_sqlite.exec conn "PRAGMA busy_timeout=5000" ~params:[]); Ok conn ``` ## Multi-Server Pool For load balancing across multiple database servers without an external proxy, use `Pool.Make(Driver).Multi`: ### Setup ```ocaml module Pool = Pool.Make(Repodb_postgresql) let multi = Pool.Multi.create_sized ~servers:[ "host=server1 dbname=myapp"; "host=server2 dbname=myapp"; "host=server3 dbname=myapp"; ] ~max_size_per_server:5 () ``` ### Usage ```ocaml (* Connections distributed via round-robin *) let result = Pool.Multi.with_connection multi (fun conn -> Repo.all conn ~table:users ~decode ) ``` ### Health Management ```ocaml (* Mark a server as unhealthy (skipped in round-robin) *) Pool.Multi.mark_unhealthy multi 1; (* Mark healthy again *) Pool.Multi.mark_healthy multi 0; ``` ### Statistics ```ocaml let stats = Pool.Multi.stats multi in Printf.printf "Total servers: %d\n" stats.total_servers; Printf.printf "Healthy servers: %d\n" stats.healthy_servers; Printf.printf "Total connections: %d\n" stats.aggregate.total; ``` ### Shutdown ```ocaml Pool.Multi.shutdown multi ``` For read/write splitting with replicas, see [CQRS](cqrs.md). ## Next Steps - [CQRS](cqrs.md) - Read/write splitting with replicas - [Repo](repo.md) - Database operations - [Transactions](transactions.md) - Multi and transaction handling - [Queries](queries.md) - Building complex queries