CQRS (Command Query Responsibility Segregation)#
repodb supports CQRS through transparent read/write splitting. Reads are automatically routed to replica servers while writes go to the primary, all without requiring application code changes.
When to Use CQRS#
CQRS is beneficial when:
- Read traffic significantly exceeds write traffic
- You want to scale reads horizontally with replicas
- You want to avoid database load balancers like pgbouncer
- Your application can tolerate slight replica lag for reads
Quick Start#
open Repodb
module Cqrs = Cqrs.Make(Repodb_postgresql)
module Repo = Repo.Make(Repodb_postgresql)
let cqrs = Cqrs.create {
primary_conninfo = "host=primary dbname=myapp user=app";
primary_max_size = 10;
replica_conninfos = [
"host=replica1 dbname=myapp user=app";
"host=replica2 dbname=myapp user=app";
];
replica_max_size_each = 5;
replica_selection = RoundRobin;
validate = None;
}
(* Reads automatically go to replicas *)
let users = Cqrs.with_read cqrs (fun conn ->
Repo.all conn ~table:users_table ~decode:decode_user
)
(* Writes always go to primary *)
let () = Cqrs.with_write cqrs (fun conn ->
Repo.insert conn ~table:users_table ~columns ~values
)
Read Operations#
Use with_read for operations that only query data:
(* Routed to a replica *)
let result = Cqrs.with_read cqrs (fun conn ->
Repo.all_query conn query ~decode
)
When replicas are configured, reads are distributed using the configured selection strategy. If all replicas are unhealthy, reads automatically fall back to the primary.
Forcing Read from Primary#
For read-after-write consistency, use with_primary:
(* Insert a user *)
let user_id = Cqrs.with_write cqrs (fun conn ->
Repo.insert_returning conn ~table ~columns ~values ~decode
)
(* Read immediately after write - use primary to avoid replica lag *)
let user = Cqrs.with_primary cqrs (fun conn ->
Repo.get conn ~table ~id:user_id ~decode
)
Write Operations#
Writes always go to the primary server:
let () = Cqrs.with_write cqrs (fun conn ->
Repo.update conn ~table ~columns ~values ~where_column ~where_value
)
Transactions#
Transactions are always executed on the primary with a pinned connection:
let result = Cqrs.transaction cqrs (fun conn ->
(* All operations use the same primary connection *)
match Repo.insert conn ~table:orders ~columns ~values with
| Ok () ->
Repo.update conn ~table:inventory ~columns:["stock"]
~values:[Value.int (stock - 1)] ~where_column:"id" ~where_value:(Value.int item_id)
| Error e -> Error e
)
Replica Selection Strategies#
RoundRobin (Default)#
Distributes requests evenly across healthy replicas:
replica_selection = RoundRobin;
Random#
Selects a random healthy replica for each request:
replica_selection = Random;
LeastConnections#
Routes to the replica with the fewest active connections:
replica_selection = LeastConnections;
Health Management#
Mark replicas as unhealthy when they fail:
(* Mark replica 0 as unhealthy *)
Cqrs.mark_replica_unhealthy cqrs 0;
(* Later, mark it healthy again *)
Cqrs.mark_replica_healthy cqrs 0;
When all replicas are marked unhealthy, reads automatically fall back to the primary.
Configuration Reference#
type 'conn config = {
primary_conninfo : string; (* Primary server connection string *)
primary_max_size : int; (* Max connections to primary *)
replica_conninfos : string list; (* Replica connection strings, empty = no CQRS *)
replica_max_size_each : int; (* Max connections per replica *)
replica_selection : replica_selection; (* RoundRobin | Random | LeastConnections *)
validate : ('conn -> bool) option; (* Optional connection validator *)
}
Monitoring#
Check CQRS status with stats:
let s = Cqrs.stats cqrs in
Printf.printf "Primary: total=%d in_use=%d available=%d\n"
s.primary.total s.primary.in_use s.primary.available;
Printf.printf "Healthy replicas: %d\n" s.healthy_replica_count;
Disabling CQRS#
To disable CQRS, pass an empty replica list:
let cqrs = Cqrs.create {
primary_conninfo = "host=localhost dbname=myapp";
primary_max_size = 10;
replica_conninfos = []; (* No replicas = all traffic goes to primary *)
replica_max_size_each = 0;
replica_selection = RoundRobin;
validate = None;
}
Shutdown#
Always shut down the CQRS instance when done:
Cqrs.shutdown cqrs
Best Practices#
- Size pools appropriately: Primary pool should handle write load; replica pools handle read load
- Handle replica lag: Use
with_primaryfor reads that need immediate consistency - Monitor health: Track unhealthy replica counts and investigate failures
- Connection validation: Use
validateto detect stale connections
Next Steps#
- Connection Pool - Lower-level pool management and MultiPool
- Repo - Database operations
- Transactions - Transaction handling