A database layer inspired by Caqti and Ecto

CQRS (Command Query Responsibility Segregation)#

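repodb supports CQRS through read/write splitting. Operations run with `Cqrs.with_read` are routed to replica servers, while operations run with `Cqrs.with_write` (and transactions) go to the primary. You choose the routing once per operation; the `Repo` query functions you call inside the callback stay exactly the same.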

When to Use CQRS#

CQRS is beneficial when:

  • Read traffic significantly exceeds write traffic
  • You want to scale reads horizontally with replicas
  • You want read/write splitting without running an external proxy or connection pooler (such as PgBouncer) in front of the database
  • Your application can tolerate a small amount of replication lag on reads

Quick Start#

open Repodb

module Cqrs = Cqrs.Make(Repodb_postgresql)
module Repo = Repo.Make(Repodb_postgresql)

let cqrs = Cqrs.create {
  primary_conninfo = "host=primary dbname=myapp user=app";
  primary_max_size = 10;
  replica_conninfos = [
    "host=replica1 dbname=myapp user=app";
    "host=replica2 dbname=myapp user=app";
  ];
  replica_max_size_each = 5;
  replica_selection = RoundRobin;
  validate = None;
}

(* Reads automatically go to replicas *)
let users = Cqrs.with_read cqrs (fun conn ->
  Repo.all conn ~table:users_table ~decode:decode_user
)

(* Writes always go to primary *)
let () = Cqrs.with_write cqrs (fun conn ->
  Repo.insert conn ~table:users_table ~columns ~values
)

Read Operations#

Use with_read for operations that only query data:

(* Routed to a replica *)
let result = Cqrs.with_read cqrs (fun conn ->
  Repo.all_query conn query ~decode
)

When replicas are configured, reads are distributed among the healthy replicas according to `replica_selection`. If all replicas are unhealthy, reads automatically fall back to the primary.

Forcing Read from Primary#

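A replica may not yet reflect a write that was just committed on the primary. When a read must see your own recent writes (read-after-write consistency), route it to the primary with `with_primary`: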

(* Insert a user *)
let user_id = Cqrs.with_write cqrs (fun conn ->
  Repo.insert_returning conn ~table ~columns ~values ~decode
)

(* Read immediately after write - use primary to avoid replica lag *)
let user = Cqrs.with_primary cqrs (fun conn ->
  Repo.get conn ~table ~id:user_id ~decode
)

Write Operations#

Writes always go to the primary server:

let () = Cqrs.with_write cqrs (fun conn ->
  Repo.update conn ~table ~columns ~values ~where_column ~where_value
)

Transactions#

Transactions always run on the primary. The connection is pinned for the duration of the callback, so every operation inside it uses the same primary connection:

let result = Cqrs.transaction cqrs (fun conn ->
  (* All operations use the same primary connection *)
  match Repo.insert conn ~table:orders ~columns ~values with
  | Ok () ->
      Repo.update conn ~table:inventory ~columns:["stock"] 
        ~values:[Value.int (stock - 1)] ~where_column:"id" ~where_value:(Value.int item_id)
  | Error e -> Error e
)

Replica Selection Strategies#

RoundRobin (Default)#

Distributes requests evenly across healthy replicas:

replica_selection = RoundRobin;

Random#

Selects a random healthy replica for each request:

replica_selection = Random;

LeastConnections#

Routes to the replica with the fewest active connections:

replica_selection = LeastConnections;

Health Management#

Mark a replica as unhealthy when it fails, and as healthy again once it has recovered:

(* Mark replica 0 as unhealthy *)
Cqrs.mark_replica_unhealthy cqrs 0;

(* Later, mark it healthy again *)
Cqrs.mark_replica_healthy cqrs 0;

When all replicas are marked unhealthy, reads automatically fall back to the primary.

Configuration Reference#

type 'conn config = {
  primary_conninfo : string;       (* Primary server connection string *)
  primary_max_size : int;          (* Max connections to primary *)
  replica_conninfos : string list; (* Replica connection strings, empty = no CQRS *)
  replica_max_size_each : int;     (* Max connections per replica *)
  replica_selection : replica_selection; (* RoundRobin | Random | LeastConnections *)
  validate : ('conn -> bool) option;     (* Optional connection validator *)
}

Monitoring#

Check CQRS status with stats:

let s = Cqrs.stats cqrs in
Printf.printf "Primary: total=%d in_use=%d available=%d\n"
  s.primary.total s.primary.in_use s.primary.available;
Printf.printf "Healthy replicas: %d\n" s.healthy_replica_count;

Disabling CQRS#

To disable CQRS, pass an empty replica list. All traffic, including `with_read` operations, then goes to the primary:

let cqrs = Cqrs.create {
  primary_conninfo = "host=localhost dbname=myapp";
  primary_max_size = 10;
  replica_conninfos = [];  (* No replicas = all traffic goes to primary *)
  replica_max_size_each = 0;
  replica_selection = RoundRobin;
  validate = None;
}

Shutdown#

Always shut down the CQRS instance when done:

Cqrs.shutdown cqrs

Best Practices#

  1. Size pools appropriately: the primary pool must handle the write load, plus transactions and `with_primary` reads; the replica pools must handle the read load
  2. Handle replica lag: Use with_primary for reads that need immediate consistency
  3. Monitor health: track the healthy replica count reported by `Cqrs.stats` and investigate any replica failures
  4. Connection validation: Use validate to detect stale connections

Next Steps#