A database layer inspired by Caqti and Ecto

# CQRS (Command Query Responsibility Segregation)

repodb supports CQRS through transparent read/write splitting. Reads are automatically routed to replica servers while writes go to the primary, all without requiring application code changes.

## When to Use CQRS

CQRS is beneficial when:
- Read traffic significantly exceeds write traffic
- You want to scale reads horizontally with replicas
- You want to avoid an external proxy or pooler such as PgBouncer
- Your application can tolerate slight replica lag for reads

## Quick Start

```ocaml
open Repodb

module Cqrs = Cqrs.Make(Repodb_postgresql)
module Repo = Repo.Make(Repodb_postgresql)

let cqrs = Cqrs.create {
  primary_conninfo = "host=primary dbname=myapp user=app";
  primary_max_size = 10;
  replica_conninfos = [
    "host=replica1 dbname=myapp user=app";
    "host=replica2 dbname=myapp user=app";
  ];
  replica_max_size_each = 5;
  replica_selection = RoundRobin;
  validate = None;
}

(* Reads automatically go to replicas *)
let users = Cqrs.with_read cqrs (fun conn ->
  Repo.all conn ~table:users_table ~decode:decode_user
)

(* Writes always go to primary *)
let () = Cqrs.with_write cqrs (fun conn ->
  Repo.insert conn ~table:users_table ~columns ~values
)
```

## Read Operations

Use `with_read` for operations that only query data:

```ocaml
(* Routed to a replica *)
let result = Cqrs.with_read cqrs (fun conn ->
  Repo.all_query conn query ~decode
)
```

When replicas are configured, reads are distributed using the configured selection strategy. If all replicas are unhealthy, reads automatically fall back to the primary.

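The routing rule described above can be pictured with a small, self-contained model. This is only a sketch of the idea (round-robin over healthy replicas, with the primary as a fallback) and not repodb's actual implementation; the `target`, `router`, `make_router`, and `pick_read_target` names are hypothetical and exist only for illustration.

```ocaml
(* Hypothetical model of read routing; not repodb's internal code. *)
type target = Primary | Replica of int

type router = {
  healthy : bool array;  (* healthy.(i) is true when replica i is usable *)
  mutable next : int;    (* round-robin cursor *)
}

let make_router n = { healthy = Array.make n true; next = 0 }

(* Pick the next healthy replica in round-robin order, or fall back to
   the primary when no replica is healthy (or none is configured). *)
let pick_read_target r =
  let n = Array.length r.healthy in
  let rec scan tried =
    if tried >= n then Primary
    else
      let i = (r.next + tried) mod n in
      if r.healthy.(i) then begin
        r.next <- (i + 1) mod n;
        Replica i
      end else scan (tried + 1)
  in
  scan 0

let () =
  let r = make_router 2 in
  assert (pick_read_target r = Replica 0);
  assert (pick_read_target r = Replica 1);
  r.healthy.(0) <- false;                    (* replica 0 goes down *)
  assert (pick_read_target r = Replica 1);
  r.healthy.(1) <- false;                    (* no healthy replica left *)
  assert (pick_read_target r = Primary)
```

In this model, an empty replica array also yields `Primary`, which mirrors the documented behavior that an empty replica list sends all traffic to the primary.
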
### Forcing Read from Primary

For read-after-write consistency, use `with_primary`:

```ocaml
(* Insert a user *)
let user_id = Cqrs.with_write cqrs (fun conn ->
  Repo.insert_returning conn ~table ~columns ~values ~decode
)

(* Read immediately after write - use primary to avoid replica lag *)
let user = Cqrs.with_primary cqrs (fun conn ->
  Repo.get conn ~table ~id:user_id ~decode
)
```

## Write Operations

Writes always go to the primary server:

```ocaml
let () = Cqrs.with_write cqrs (fun conn ->
  Repo.update conn ~table ~columns ~values ~where_column ~where_value
)
```

## Transactions

Transactions are always executed on the primary with a pinned connection:

```ocaml
let result = Cqrs.transaction cqrs (fun conn ->
  (* All operations use the same primary connection *)
  match Repo.insert conn ~table:orders ~columns ~values with
  | Ok () ->
    Repo.update conn ~table:inventory ~columns:["stock"]
      ~values:[Value.int (stock - 1)] ~where_column:"id" ~where_value:(Value.int item_id)
  | Error e -> Error e
)
```

## Replica Selection Strategies

### RoundRobin (Default)

Distributes requests evenly across healthy replicas:

```ocaml
replica_selection = RoundRobin;
```

### Random

Selects a random healthy replica for each request:

```ocaml
replica_selection = Random;
```

### LeastConnections

Routes to the replica with the fewest active connections:

```ocaml
replica_selection = LeastConnections;
```

## Health Management

Mark replicas as unhealthy when they fail:

```ocaml
(* Mark replica 0 as unhealthy *)
Cqrs.mark_replica_unhealthy cqrs 0;

(* Later, mark it healthy again *)
Cqrs.mark_replica_healthy cqrs 0;
```

When all replicas are marked unhealthy, reads automatically fall back to the primary.

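One way to decide when to mark a replica unhealthy is to count consecutive failures and act once a threshold is reached. The sketch below is a standalone illustration of that policy under stated assumptions: the `tracker` type, `make_tracker`, `record`, and the threshold are all hypothetical, and the `mark_unhealthy` callback merely stands in for a call such as `Cqrs.mark_replica_unhealthy cqrs`. It also assumes the caller knows which replica index served a given read, which this page does not specify.

```ocaml
(* Hypothetical failure tracker; the threshold policy is an assumption. *)
type tracker = {
  failures : int array;          (* consecutive failures per replica *)
  threshold : int;               (* failures tolerated before acting *)
  mark_unhealthy : int -> unit;  (* stand-in for Cqrs.mark_replica_unhealthy cqrs *)
}

let make_tracker ~replicas ~threshold ~mark_unhealthy =
  { failures = Array.make replicas 0; threshold; mark_unhealthy }

(* Record the outcome of one read against replica [i]. A success resets
   the streak; the callback fires once, when the streak hits the threshold. *)
let record t i (result : ('a, 'e) result) =
  match result with
  | Ok _ -> t.failures.(i) <- 0
  | Error _ ->
    t.failures.(i) <- t.failures.(i) + 1;
    if t.failures.(i) = t.threshold then t.mark_unhealthy i

let () =
  let marked = ref [] in
  let t =
    make_tracker ~replicas:2 ~threshold:3
      ~mark_unhealthy:(fun i -> marked := i :: !marked)
  in
  record t 0 (Error "timeout");
  record t 0 (Error "timeout");
  record t 0 (Ok ());             (* success resets the streak *)
  record t 0 (Error "timeout");
  assert (!marked = []);
  record t 0 (Error "timeout");
  record t 0 (Error "timeout");   (* third consecutive failure *)
  assert (!marked = [0])
```

Requiring several consecutive failures avoids ejecting a replica over a single transient error; a periodic probe that marks the replica healthy again would be the natural counterpart, but is left out of this sketch.
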
## Configuration Reference

```ocaml
type 'conn config = {
  primary_conninfo : string;              (* Primary server connection string *)
  primary_max_size : int;                 (* Max connections to primary *)
  replica_conninfos : string list;        (* Replica connection strings, empty = no CQRS *)
  replica_max_size_each : int;            (* Max connections per replica *)
  replica_selection : replica_selection;  (* RoundRobin | Random | LeastConnections *)
  validate : ('conn -> bool) option;      (* Optional connection validator *)
}
```

## Monitoring

Check CQRS status with `stats`:

```ocaml
let s = Cqrs.stats cqrs in
Printf.printf "Primary: total=%d in_use=%d available=%d\n"
  s.primary.total s.primary.in_use s.primary.available;
Printf.printf "Healthy replicas: %d\n" s.healthy_replica_count;
```

## Disabling CQRS

To disable CQRS, pass an empty replica list:

```ocaml
let cqrs = Cqrs.create {
  primary_conninfo = "host=localhost dbname=myapp";
  primary_max_size = 10;
  replica_conninfos = [];  (* No replicas = all traffic goes to primary *)
  replica_max_size_each = 0;
  replica_selection = RoundRobin;
  validate = None;
}
```

## Shutdown

Always shut down the CQRS instance when done:

```ocaml
Cqrs.shutdown cqrs
```

## Best Practices

1. **Size pools appropriately**: Primary pool should handle write load; replica pools handle read load
2. **Handle replica lag**: Use `with_primary` for reads that need immediate consistency
3. **Monitor health**: Track unhealthy replica counts and investigate failures
4. **Connection validation**: Use `validate` to detect stale connections

## Next Steps

- [Connection Pool](pool.md) - Lower-level pool management and MultiPool
- [Repo](repo.md) - Database operations
- [Transactions](transactions.md) - Transaction handling