a database layer insipred by caqti and ecto
1# CQRS (Command Query Responsibility Segregation)
2
3repodb supports CQRS through transparent read/write splitting. Reads are automatically routed to replica servers while writes go to the primary, all without requiring application code changes.
4
5## When to Use CQRS
6
7CQRS is beneficial when:
8- Read traffic significantly exceeds write traffic
9- You want to scale reads horizontally with replicas
10- You want to avoid database load balancers like pgbouncer
11- Your application can tolerate slight replica lag for reads
12
13## Quick Start
14
15```ocaml
16open Repodb
17
18module Cqrs = Cqrs.Make(Repodb_postgresql)
19module Repo = Repo.Make(Repodb_postgresql)
20
21let cqrs = Cqrs.create {
22 primary_conninfo = "host=primary dbname=myapp user=app";
23 primary_max_size = 10;
24 replica_conninfos = [
25 "host=replica1 dbname=myapp user=app";
26 "host=replica2 dbname=myapp user=app";
27 ];
28 replica_max_size_each = 5;
29 replica_selection = RoundRobin;
30 validate = None;
31}
32
33(* Reads automatically go to replicas *)
34let users = Cqrs.with_read cqrs (fun conn ->
35 Repo.all conn ~table:users_table ~decode:decode_user
36)
37
38(* Writes always go to primary *)
39let () = Cqrs.with_write cqrs (fun conn ->
40 Repo.insert conn ~table:users_table ~columns ~values
41)
42```
43
44## Read Operations
45
46Use `with_read` for operations that only query data:
47
48```ocaml
49(* Routed to a replica *)
50let result = Cqrs.with_read cqrs (fun conn ->
51 Repo.all_query conn query ~decode
52)
53```
54
55When replicas are configured, reads are distributed using the configured selection strategy. If all replicas are unhealthy, reads automatically fall back to the primary.
56
57### Forcing Read from Primary
58
59For read-after-write consistency, use `with_primary`:
60
61```ocaml
62(* Insert a user *)
63let user_id = Cqrs.with_write cqrs (fun conn ->
64 Repo.insert_returning conn ~table ~columns ~values ~decode
65)
66
67(* Read immediately after write - use primary to avoid replica lag *)
68let user = Cqrs.with_primary cqrs (fun conn ->
69 Repo.get conn ~table ~id:user_id ~decode
70)
71```
72
73## Write Operations
74
75Writes always go to the primary server:
76
77```ocaml
78let () = Cqrs.with_write cqrs (fun conn ->
79 Repo.update conn ~table ~columns ~values ~where_column ~where_value
80)
81```
82
83## Transactions
84
85Transactions are always executed on the primary with a pinned connection:
86
87```ocaml
88let result = Cqrs.transaction cqrs (fun conn ->
89 (* All operations use the same primary connection *)
90 match Repo.insert conn ~table:orders ~columns ~values with
91 | Ok () ->
92 Repo.update conn ~table:inventory ~columns:["stock"]
93 ~values:[Value.int (stock - 1)] ~where_column:"id" ~where_value:(Value.int item_id)
94 | Error e -> Error e
95)
96```
97
98## Replica Selection Strategies
99
100### RoundRobin (Default)
101
102Distributes requests evenly across healthy replicas:
103
104```ocaml
105replica_selection = RoundRobin;
106```
107
108### Random
109
110Selects a random healthy replica for each request:
111
112```ocaml
113replica_selection = Random;
114```
115
116### LeastConnections
117
118Routes to the replica with the fewest active connections:
119
120```ocaml
121replica_selection = LeastConnections;
122```
123
124## Health Management
125
126Mark replicas as unhealthy when they fail:
127
128```ocaml
129(* Mark replica 0 as unhealthy *)
130Cqrs.mark_replica_unhealthy cqrs 0;
131
132(* Later, mark it healthy again *)
133Cqrs.mark_replica_healthy cqrs 0;
134```
135
136When all replicas are marked unhealthy, reads automatically fall back to the primary.
137
138## Configuration Reference
139
140```ocaml
141type 'conn config = {
142 primary_conninfo : string; (* Primary server connection string *)
143 primary_max_size : int; (* Max connections to primary *)
144 replica_conninfos : string list; (* Replica connection strings, empty = no CQRS *)
145 replica_max_size_each : int; (* Max connections per replica *)
146 replica_selection : replica_selection; (* RoundRobin | Random | LeastConnections *)
147 validate : ('conn -> bool) option; (* Optional connection validator *)
148}
149```
150
151## Monitoring
152
153Check CQRS status with `stats`:
154
155```ocaml
156let s = Cqrs.stats cqrs in
157Printf.printf "Primary: total=%d in_use=%d available=%d\n"
158 s.primary.total s.primary.in_use s.primary.available;
159Printf.printf "Healthy replicas: %d\n" s.healthy_replica_count;
160```
161
162## Disabling CQRS
163
164To disable CQRS, pass an empty replica list:
165
166```ocaml
167let cqrs = Cqrs.create {
168 primary_conninfo = "host=localhost dbname=myapp";
169 primary_max_size = 10;
170 replica_conninfos = []; (* No replicas = all traffic goes to primary *)
171 replica_max_size_each = 0;
172 replica_selection = RoundRobin;
173 validate = None;
174}
175```
176
177## Shutdown
178
179Always shut down the CQRS instance when done:
180
181```ocaml
182Cqrs.shutdown cqrs
183```
184
185## Best Practices
186
1871. **Size pools appropriately**: Primary pool should handle write load; replica pools handle read load
1882. **Handle replica lag**: Use `with_primary` for reads that need immediate consistency
1893. **Monitor health**: Track unhealthy replica counts and investigate failures
1904. **Connection validation**: Use `validate` to detect stale connections
191
192## Next Steps
193
194- [Connection Pool](pool.md) - Lower-level pool management and MultiPool
195- [Repo](repo.md) - Database operations
196- [Transactions](transactions.md) - Transaction handling