a database layer inspired by Caqti and Ecto

feat: add CQRS module and MultiPool for read/write splitting

- Add Pool.Make(Driver).Multi submodule with lock-free round-robin load
balancing using kcas, health tracking, and full pool API
- Add Cqrs.Make(Driver) functor for transparent read/write splitting:
  - Automatic routing: reads to replicas, writes to primary
  - Three replica selection strategies: RoundRobin, Random, LeastConnections
  - Transaction pinning to primary connection
  - Automatic fallback when all replicas unhealthy
- Add comprehensive unit tests (test_multi_pool.ml, test_cqrs.ml)
- Add Podman-based integration tests with PostgreSQL replication
- Add documentation (docs/cqrs.md, docs/pool.md MultiPool section)
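A minimal usage sketch of the new Cqrs API, adapted from the quick-start
example in the mlecto-hwq issue description below; the driver module name
(Repodb_postgresql) and the config record fields are taken from that design
note and may not match the final API exactly:

```ocaml
(* Sketch only: names follow the design notes in the issue tracker. *)
module Cqrs = Cqrs.Make (Repodb_postgresql)

let cqrs =
  Cqrs.create
    {
      primary_conninfo = "postgresql://primary/mydb";
      primary_max_size = 10;
      replica_conninfos =
        [ "postgresql://replica1/mydb"; "postgresql://replica2/mydb" ];
      replica_max_size_each = 5;
      replica_selection = RoundRobin;
      validate = None;
    }

(* Reads are routed to a replica (round-robin by default);
   writes always go to the primary. *)
let read_result = Cqrs.with_read cqrs (fun conn -> (* SELECT ... *) query conn)
let write_result = Cqrs.with_write cqrs (fun conn -> (* INSERT ... *) exec conn)
```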

+7 -7
.beads/issues.jsonl
··· 1 {"id":"mlecto-0b6","title":"[EPIC] Repository Module","description":"Database operations gateway. Functor over Caqti/Eio connection. CRUD operations, transactions, error handling.","status":"closed","priority":1,"issue_type":"feature","created_at":"2026-01-04T00:21:10.437260357+01:00","created_by":"gdiazlo","updated_at":"2026-01-04T01:20:44.810893227+01:00","closed_at":"2026-01-04T01:20:44.810893227+01:00","close_reason":"All child tasks completed"} 2 {"id":"mlecto-0hr","title":"Implement Query compilation to SQL","description":"Convert Query.t to SQL string + Caqti request. Parameter binding, SQL escaping, dialect-specific generation (Postgres first).","status":"closed","priority":1,"issue_type":"task","created_at":"2026-01-04T00:21:30.33363594+01:00","created_by":"gdiazlo","updated_at":"2026-01-04T01:12:40.467657348+01:00","closed_at":"2026-01-04T01:12:40.467657348+01:00","close_reason":"Query compilation implemented in Query.to_sql - generates PostgreSQL SQL for SELECT/INSERT/UPDATE/DELETE.","dependencies":[{"issue_id":"mlecto-0hr","depends_on_id":"mlecto-7lc","type":"blocks","created_at":"2026-01-04T00:22:02.555339871+01:00","created_by":"gdiazlo"}]} 3 - {"id":"mlecto-1lq","title":"Create Cqrs module for read/write splitting","description":"Create a new lib/cqrs.ml module that provides transparent CQRS routing.\n\n## New File: lib/cqrs.ml\n\n### Types\n```ocaml\ntype replica_selection = RoundRobin | Random | LeastConnections\n\ntype 'conn config = {\n primary_conninfo: string;\n primary_max_size: int;\n replica_conninfos: string list; (* empty = CQRS disabled *)\n replica_max_size_each: int;\n replica_selection: replica_selection;\n validate: ('conn -\u003e bool) option;\n}\n\ntype 'conn t = {\n primary_pool: 'conn Pool.t;\n replica_pools: 'conn Pool.t array option; (* None when no replicas *)\n replica_index: int Kcas.Loc.t;\n selection: replica_selection;\n mutable in_transaction: bool; (* or use domain-local storage *)\n}\n\ntype intent = Read | 
Write\n```\n\n### Core API\n```ocaml\n(* Create CQRS-aware pool pair *)\nval create : (module Driver.S with type connection = 'conn) -\u003e 'conn config -\u003e 'conn t\n\n(* Auto-routing based on operation type *)\nval with_read : 'conn t -\u003e ('conn -\u003e 'a) -\u003e ('a, Pool.pool_error) result\nval with_write : 'conn t -\u003e ('conn -\u003e 'a) -\u003e ('a, Pool.pool_error) result\n\n(* Explicit routing *)\nval with_primary : 'conn t -\u003e ('conn -\u003e 'a) -\u003e ('a, Pool.pool_error) result\nval with_replica : 'conn t -\u003e ('conn -\u003e 'a) -\u003e ('a, Pool.pool_error) result\n\n(* Transaction - always uses primary, pins connection *)\nval transaction : 'conn t -\u003e ('conn -\u003e ('a, Error.db_error) result) -\u003e ('a, Error.db_error) result\n```\n\n### Functor Interface\n```ocaml\nmodule Make (D : Driver.S) : sig\n type t = D.connection t\n val create : D.connection config -\u003e t\n (* ... rest of API *)\nend\n```\n\n## Routing Logic\n\n### Read Operations (go to replica if available)\n- Repo.get, get_opt\n- Repo.all\n- Repo.all_query, one_query, one_query_opt\n\n### Write Operations (always go to primary)\n- Repo.insert, insert_returning\n- Repo.update\n- Repo.delete\n- Repo.insert_query, update_query, delete_query\n- All transaction operations\n\n### Query-Based Auto-Detection\n```ocaml\nlet route_query query =\n match query.Query.query_type with\n | Query.Select -\u003e Read\n | Query.Insert | Query.Update | Query.Delete -\u003e Write\n```\n\n## Edge Cases\n1. **No replicas configured**: All operations go to primary\n2. **All replicas down**: Fall back to primary with warning\n3. **Read after write**: Provide with_primary for consistency\n4. 
**Transaction context**: Pin all ops to primary connection\n\n## Acceptance Criteria\n- [ ] Cqrs.Make functor works with any Driver.S\n- [ ] Reads go to replicas when available\n- [ ] Writes always go to primary\n- [ ] Transactions pin to single primary connection\n- [ ] Graceful fallback when replicas unavailable\n- [ ] Round-robin across replicas","status":"open","priority":1,"issue_type":"task","created_at":"2026-01-05T11:13:56.804354904+01:00","created_by":"gdiazlo","updated_at":"2026-01-05T11:13:56.804354904+01:00","dependencies":[{"issue_id":"mlecto-1lq","depends_on_id":"mlecto-3wf","type":"blocks","created_at":"2026-01-05T11:15:30.146332625+01:00","created_by":"gdiazlo"},{"issue_id":"mlecto-1lq","depends_on_id":"mlecto-2gn","type":"blocks","created_at":"2026-01-05T11:16:02.678918762+01:00","created_by":"gdiazlo"}]} 4 {"id":"mlecto-1ud","title":"Implement Schema DSL (Mlecto.Schema)","description":"DSL for defining tables: field definitions, constraints (primary_key, not_null, unique, foreign_key, check), table_name generation.","status":"closed","priority":1,"issue_type":"task","created_at":"2026-01-04T00:21:24.765735157+01:00","created_by":"gdiazlo","updated_at":"2026-01-04T01:09:24.538059462+01:00","closed_at":"2026-01-04T01:09:24.538059462+01:00","close_reason":"Schema DSL complete: table definitions, column constraints (PrimaryKey, NotNull, Unique, Default, Check, ForeignKey), foreign key actions, timestamps helper, SQL generation.","dependencies":[{"issue_id":"mlecto-1ud","depends_on_id":"mlecto-z72","type":"blocks","created_at":"2026-01-04T00:22:02.524126267+01:00","created_by":"gdiazlo"}]} 5 {"id":"mlecto-1w9","title":"[EPIC] Core Type System \u0026 Schema DSL","description":"Define SQL types mapped to OCaml, schema definition DSL for tables/fields/constraints. 
Foundation for all other modules.","status":"closed","priority":1,"issue_type":"feature","created_at":"2026-01-04T00:21:05.917266549+01:00","created_by":"gdiazlo","updated_at":"2026-01-04T01:20:44.799339065+01:00","closed_at":"2026-01-04T01:20:44.799339065+01:00","close_reason":"All child tasks completed"} 6 - {"id":"mlecto-2gn","title":"Implement MultiPool in pool.ml","description":"Extend the existing Pool module with multi-server support.\n\n## Changes to lib/pool.ml\n\n### New Types\n```ocaml\ntype server = {\n conninfo: string;\n weight: int; (* for weighted round-robin, default 1 *)\n}\n\ntype 'conn multi_config = {\n servers: server list;\n max_size_per_server: int;\n connect: string -\u003e ('conn, string) result;\n close: 'conn -\u003e unit;\n validate: ('conn -\u003e bool) option;\n}\n\ntype 'conn multi_t = {\n pools: 'conn t array;\n healthy: bool Kcas.Loc.t array;\n next_index: int Kcas.Loc.t;\n}\n```\n\n### New Functions\n- `create_multi : 'conn multi_config -\u003e 'conn multi_t`\n- `acquire_multi : 'conn multi_t -\u003e ('conn, pool_error) result`\n- `release_multi : 'conn multi_t -\u003e 'conn -\u003e unit`\n- `with_connection_multi : 'conn multi_t -\u003e ('conn -\u003e 'a) -\u003e ('a, pool_error) result`\n- `shutdown_multi : 'conn multi_t -\u003e unit`\n- `stats_multi : 'conn multi_t -\u003e stats list`\n- `mark_unhealthy : 'conn multi_t -\u003e int -\u003e unit`\n- `mark_healthy : 'conn multi_t -\u003e int -\u003e unit`\n\n### Implementation Notes\n- Use atomic counter with Kcas.Loc for lock-free round-robin\n- Skip unhealthy servers in round-robin selection\n- When all servers unhealthy, return Pool_empty error\n- Each server gets its own pool with max_size_per_server connections\n\n## Acceptance Criteria\n- [ ] All new types defined\n- [ ] Round-robin distributes evenly across healthy servers\n- [ ] Unhealthy servers are skipped\n- [ ] Backward compatible - existing Pool API unchanged\n- [ ] Unit tests 
pass","status":"open","priority":1,"issue_type":"task","created_at":"2026-01-05T11:13:34.299242118+01:00","created_by":"gdiazlo","updated_at":"2026-01-05T11:13:34.299242118+01:00","dependencies":[{"issue_id":"mlecto-2gn","depends_on_id":"mlecto-3wf","type":"blocks","created_at":"2026-01-05T11:15:25.102411986+01:00","created_by":"gdiazlo"}]} 7 {"id":"mlecto-3ow","title":"cast_assoc and put_assoc for nested changesets","status":"closed","priority":2,"issue_type":"feature","created_at":"2026-01-04T10:32:00.153765675+01:00","created_by":"gdiazlo","updated_at":"2026-01-04T10:38:58.664932555+01:00","closed_at":"2026-01-04T10:38:58.664932555+01:00","close_reason":"Closed","dependencies":[{"issue_id":"mlecto-3ow","depends_on_id":"mlecto-d6f","type":"blocks","created_at":"2026-01-04T10:32:20.574240684+01:00","created_by":"gdiazlo"}]} 8 - {"id":"mlecto-3wf","title":"CQRS and Multi-Server Pool Support","description":"Add CQRS (Command Query Responsibility Segregation) support and multi-server connection pooling to repodb.\n\n## Goals\n1. **CQRS**: Transparent read/write splitting where reads go to replicas and writes go to primary\n2. **Multi-Server Pool**: Round-robin load balancing across multiple database servers\n3. 
**Backward Compatible**: Existing single-server users see no API changes\n\n## Key Features\n- Automatic routing based on query type (SELECT vs INSERT/UPDATE/DELETE)\n- Transaction pinning to primary (all ops in transaction use same connection)\n- Lock-free round-robin using kcas\n- Health checking to skip unhealthy servers\n- Fallback to primary when all replicas down\n\n## Testing\nIntegration tests with real PostgreSQL using Podman containers.","status":"open","priority":1,"issue_type":"epic","created_at":"2026-01-05T11:13:04.912905111+01:00","created_by":"gdiazlo","updated_at":"2026-01-05T11:17:02.560139947+01:00"} 9 {"id":"mlecto-4g7","title":"Constraint error mapping from DB to changeset","status":"closed","priority":2,"issue_type":"feature","created_at":"2026-01-04T10:32:00.520652974+01:00","created_by":"gdiazlo","updated_at":"2026-01-04T10:38:58.665466447+01:00","closed_at":"2026-01-04T10:38:58.665466447+01:00","close_reason":"Closed"} 10 {"id":"mlecto-5km","title":"Implement Changeset validators","description":"validate_required, validate_format (regex), validate_length (min/max/is), validate_inclusion, validate_exclusion, validate_number, validate_acceptance, validate_confirmation, validate_change (custom).","status":"closed","priority":1,"issue_type":"task","created_at":"2026-01-04T00:21:40.78012908+01:00","created_by":"gdiazlo","updated_at":"2026-01-04T01:14:08.654694896+01:00","closed_at":"2026-01-04T01:14:08.654694896+01:00","close_reason":"Validators implemented in Changeset core: validate_required, validate_format, validate_length, validate_inclusion, validate_exclusion, validate_number, validate_acceptance, validate_confirmation, validate_change.","dependencies":[{"issue_id":"mlecto-5km","depends_on_id":"mlecto-m13","type":"blocks","created_at":"2026-01-04T00:22:03.958038572+01:00","created_by":"gdiazlo"}]} 11 {"id":"mlecto-7lc","title":"Implement Query builder (Mlecto.Query)","description":"SELECT, INSERT, UPDATE, DELETE builders. 
WHERE, JOIN, ORDER BY, GROUP BY, HAVING, LIMIT, OFFSET. Composable query pipelines.","status":"closed","priority":1,"issue_type":"task","created_at":"2026-01-04T00:21:28.238632191+01:00","created_by":"gdiazlo","updated_at":"2026-01-04T01:12:33.283719987+01:00","closed_at":"2026-01-04T01:12:33.283719987+01:00","close_reason":"Query builder complete: SELECT, INSERT, UPDATE, DELETE with WHERE, JOIN, ORDER BY, GROUP BY, HAVING, LIMIT, OFFSET. ON CONFLICT for upserts. RETURNING clause. Full SQL generation.","dependencies":[{"issue_id":"mlecto-7lc","depends_on_id":"mlecto-1ud","type":"blocks","created_at":"2026-01-04T00:22:02.538979501+01:00","created_by":"gdiazlo"},{"issue_id":"mlecto-7lc","depends_on_id":"mlecto-nv7","type":"blocks","created_at":"2026-01-04T00:22:02.547726588+01:00","created_by":"gdiazlo"}]} ··· 14 {"id":"mlecto-at2","title":"[EPIC] Migration System","description":"Versioned database migrations with up/down support. DSL for create_table, add_column, create_index, etc.","status":"closed","priority":1,"issue_type":"feature","created_at":"2026-01-04T00:21:12.288036672+01:00","created_by":"gdiazlo","updated_at":"2026-01-04T01:20:44.812043932+01:00","closed_at":"2026-01-04T01:20:44.812043932+01:00","close_reason":"All child tasks completed"} 15 {"id":"mlecto-c1o","title":"Real Repo execution via Caqti/Eio","status":"closed","priority":1,"issue_type":"feature","created_at":"2026-01-04T10:31:58.698847822+01:00","created_by":"gdiazlo","updated_at":"2026-01-04T10:38:58.663072869+01:00","closed_at":"2026-01-04T10:38:58.663072869+01:00","close_reason":"Closed"} 16 {"id":"mlecto-d6f","title":"Schema associations (has_many, belongs_to, has_one, many_to_many)","status":"closed","priority":1,"issue_type":"feature","created_at":"2026-01-04T10:31:59.069873954+01:00","created_by":"gdiazlo","updated_at":"2026-01-04T10:38:58.661985306+01:00","closed_at":"2026-01-04T10:38:58.661985306+01:00","close_reason":"Closed"} 17 - {"id":"mlecto-dg4","title":"Add unit tests for 
MultiPool","description":"Add comprehensive unit tests for the MultiPool functionality.\n\n## New File: test/test_multi_pool.ml\n\n### Test Cases\n\n1. **Round-Robin Distribution**\n - Create multi-pool with 3 servers\n - Acquire N connections, verify even distribution\n - Assert each server gets ~N/3 connections\n\n2. **Health Check Skipping**\n - Create multi-pool with 3 servers\n - Mark one server unhealthy\n - Verify connections only go to healthy servers\n - Mark all unhealthy -\u003e expect Pool_empty error\n\n3. **Mark Healthy/Unhealthy**\n - Test mark_unhealthy and mark_healthy functions\n - Verify atomic updates work correctly\n\n4. **with_connection_multi**\n - Test automatic release on success\n - Test automatic release on exception\n\n5. **shutdown_multi**\n - Verify all pools are closed\n - Verify subsequent acquire returns Pool_closed\n\n6. **stats_multi**\n - Verify stats aggregation across all pools\n\n7. **Edge Cases**\n - Single server multi-pool (degenerate case)\n - Empty server list (should error)\n - All servers fail to connect\n\n## Mock Setup\nUse similar mock pattern to existing test_pool.ml:\n```ocaml\ntype mock_conn = { id : int; server_idx : int; mutable closed : bool }\n\nlet mock_multi_config ?(servers = 3) ?(max_size = 2) () = ...\n```\n\n## Acceptance Criteria\n- [ ] All test cases implemented\n- [ ] Tests pass with `dune test`\n- [ ] Edge cases covered","status":"open","priority":2,"issue_type":"task","created_at":"2026-01-05T11:14:18.326292543+01:00","created_by":"gdiazlo","updated_at":"2026-01-05T11:14:18.326292543+01:00","dependencies":[{"issue_id":"mlecto-dg4","depends_on_id":"mlecto-3wf","type":"blocks","created_at":"2026-01-05T11:15:35.191030964+01:00","created_by":"gdiazlo"},{"issue_id":"mlecto-dg4","depends_on_id":"mlecto-2gn","type":"blocks","created_at":"2026-01-05T11:16:07.722230492+01:00","created_by":"gdiazlo"}]} 18 - {"id":"mlecto-e6g","title":"Set up Podman-based PostgreSQL integration tests","description":"Create 
integration test infrastructure using Podman to run real PostgreSQL instances.\n\n## Goal\nTest CQRS and MultiPool against real PostgreSQL primary + replicas.\n\n## Infrastructure\n\n### Podman Compose File: test/integration/docker-compose.yml\n```yaml\nversion: '3.8'\nservices:\n primary:\n image: postgres:16\n environment:\n POSTGRES_USER: repodb\n POSTGRES_PASSWORD: repodb\n POSTGRES_DB: repodb_test\n ports:\n - '5432:5432'\n command: \u003e\n postgres\n -c wal_level=replica\n -c max_wal_senders=3\n -c max_replication_slots=3\n -c hot_standby=on\n volumes:\n - primary_data:/var/lib/postgresql/data\n healthcheck:\n test: ['CMD-SHELL', 'pg_isready -U repodb']\n interval: 5s\n timeout: 5s\n retries: 5\n\n replica1:\n image: postgres:16\n environment:\n POSTGRES_USER: repodb\n POSTGRES_PASSWORD: repodb\n POSTGRES_DB: repodb_test\n ports:\n - '5433:5432'\n depends_on:\n primary:\n condition: service_healthy\n command: \u003e\n bash -c \"\n until pg_basebackup -h primary -D /var/lib/postgresql/data -U repodb -Fp -Xs -P -R; do\n sleep 1\n done\n chmod 700 /var/lib/postgresql/data\n postgres\n \"\n volumes:\n - replica1_data:/var/lib/postgresql/data\n\n replica2:\n image: postgres:16\n environment:\n POSTGRES_USER: repodb\n POSTGRES_PASSWORD: repodb\n POSTGRES_DB: repodb_test\n ports:\n - '5434:5432'\n depends_on:\n primary:\n condition: service_healthy\n command: \u003e\n bash -c \"\n until pg_basebackup -h primary -D /var/lib/postgresql/data -U repodb -Fp -Xs -P -R; do\n sleep 1\n done\n chmod 700 /var/lib/postgresql/data\n postgres\n \"\n volumes:\n - replica2_data:/var/lib/postgresql/data\n\nvolumes:\n primary_data:\n replica1_data:\n replica2_data:\n```\n\n### Test Runner Script: test/integration/run_tests.sh\n```bash\n#!/bin/bash\nset -e\n\n# Start PostgreSQL cluster\npodman-compose -f test/integration/docker-compose.yml up -d\n\n# Wait for replicas to catch up\nsleep 10\n\n# Run integration tests\ndune exec test/integration/test_cqrs_integration.exe\n\n# 
Cleanup\npodman-compose -f test/integration/docker-compose.yml down -v\n```\n\n### Integration Test File: test/integration/test_cqrs_integration.ml\nTest cases:\n1. Write to primary, read from replica (verify replication)\n2. Transaction isolation (all ops on primary)\n3. Replica lag handling\n4. Connection pooling under load\n5. Failover when replica dies\n\n## Acceptance Criteria\n- [ ] Podman compose file creates working PG cluster\n- [ ] Replication works between primary and replicas\n- [ ] Integration tests pass\n- [ ] Cleanup script removes all containers/volumes","status":"open","priority":2,"issue_type":"task","created_at":"2026-01-05T11:14:57.307024006+01:00","created_by":"gdiazlo","updated_at":"2026-01-05T11:14:57.307024006+01:00","dependencies":[{"issue_id":"mlecto-e6g","depends_on_id":"mlecto-3wf","type":"blocks","created_at":"2026-01-05T11:15:45.282902638+01:00","created_by":"gdiazlo"},{"issue_id":"mlecto-e6g","depends_on_id":"mlecto-2gn","type":"blocks","created_at":"2026-01-05T11:16:17.810402811+01:00","created_by":"gdiazlo"},{"issue_id":"mlecto-e6g","depends_on_id":"mlecto-1lq","type":"blocks","created_at":"2026-01-05T11:16:22.852706838+01:00","created_by":"gdiazlo"}]} 19 {"id":"mlecto-flv","title":"Implement Multi module (Mlecto.Multi)","description":"Chain operations: Multi.new |\u003e Multi.insert |\u003e Multi.update |\u003e Multi.run. 
Named operations, access previous results, atomic execution.","status":"closed","priority":2,"issue_type":"task","created_at":"2026-01-04T00:21:50.583520239+01:00","created_by":"gdiazlo","updated_at":"2026-01-04T01:20:32.435485703+01:00","closed_at":"2026-01-04T01:20:32.435485703+01:00","close_reason":"Multi with operation chaining, named results, merge, validate, execute_sync","dependencies":[{"issue_id":"mlecto-flv","depends_on_id":"mlecto-8ki","type":"blocks","created_at":"2026-01-04T00:22:07.51964609+01:00","created_by":"gdiazlo"},{"issue_id":"mlecto-flv","depends_on_id":"mlecto-hwj","type":"blocks","created_at":"2026-01-04T00:22:07.527438964+01:00","created_by":"gdiazlo"}]} 20 {"id":"mlecto-gfo","title":"[EPIC] Changeset System","description":"Casting, validation, and error accumulation. Separate validation from persistence with composable validators.","status":"closed","priority":1,"issue_type":"feature","created_at":"2026-01-04T00:21:07.363157981+01:00","created_by":"gdiazlo","updated_at":"2026-01-04T01:20:44.808207655+01:00","closed_at":"2026-01-04T01:20:44.808207655+01:00","close_reason":"All child tasks completed"} 21 {"id":"mlecto-gxh","title":"Implement Changeset constraints","description":"unique_constraint, foreign_key_constraint, check_constraint, exclusion_constraint. 
Convert DB errors to changeset errors.","status":"closed","priority":1,"issue_type":"task","created_at":"2026-01-04T00:21:42.297745588+01:00","created_by":"gdiazlo","updated_at":"2026-01-04T01:14:08.663238895+01:00","closed_at":"2026-01-04T01:14:08.663238895+01:00","close_reason":"Constraints implemented in Changeset core: unique_constraint, foreign_key_constraint, check_constraint.","dependencies":[{"issue_id":"mlecto-gxh","depends_on_id":"mlecto-m13","type":"blocks","created_at":"2026-01-04T00:22:03.965402454+01:00","created_by":"gdiazlo"}]} 22 {"id":"mlecto-hw6","title":"[EPIC] mlecto - Ecto-like database toolkit for OCaml","status":"closed","priority":1,"issue_type":"feature","created_at":"2026-01-04T00:20:55.991644266+01:00","created_by":"gdiazlo","updated_at":"2026-01-04T01:20:49.927971061+01:00","closed_at":"2026-01-04T01:20:49.927971061+01:00","close_reason":"Core mlecto library complete: Type, Schema, Expr, Query, Changeset, Repo, Migration, Multi modules implemented"} 23 {"id":"mlecto-hwj","title":"Implement Repo transactions","description":"transaction/1 function wrapping Caqti transactions. Automatic rollback on error. Nested transaction support (savepoints).","status":"closed","priority":1,"issue_type":"task","created_at":"2026-01-04T00:21:46.455864854+01:00","created_by":"gdiazlo","updated_at":"2026-01-04T01:19:20.791036484+01:00","closed_at":"2026-01-04T01:19:20.791036484+01:00","close_reason":"Transaction state machine with BEGIN/COMMIT/ROLLBACK and nested savepoint support","dependencies":[{"issue_id":"mlecto-hwj","depends_on_id":"mlecto-8ki","type":"blocks","created_at":"2026-01-04T00:22:05.217260647+01:00","created_by":"gdiazlo"}]} 24 - {"id":"mlecto-hwq","title":"Write CQRS documentation","description":"Create documentation for the CQRS and MultiPool features.\n\n## New File: docs/cqrs.md\n\n### Sections\n\n1. **Overview**\n - What is CQRS\n - When to use it\n - How repodb implements it\n\n2. 
**Quick Start**\n ```ocaml\n (* Basic CQRS setup *)\n module Cqrs = Cqrs.Make(Repodb_postgresql)\n\n let cqrs = Cqrs.create {\n primary_conninfo = \"postgresql://primary/mydb\";\n primary_max_size = 10;\n replica_conninfos = [\n \"postgresql://replica1/mydb\";\n \"postgresql://replica2/mydb\";\n ];\n replica_max_size_each = 5;\n replica_selection = RoundRobin;\n validate = None;\n }\n ```\n\n3. **Read Operations**\n - Which operations are routed to replicas\n - How to force read from primary\n\n4. **Write Operations**\n - Always go to primary\n - Transaction handling\n\n5. **Explicit Routing**\n - with_primary for read-after-write consistency\n - with_replica for specific use cases\n\n6. **Configuration Options**\n - Full config reference\n - Replica selection strategies\n\n7. **Best Practices**\n - When to use CQRS\n - Handling replica lag\n - Monitoring\n\n## Update: docs/pool.md\n\nAdd section on MultiPool:\n- Multi-server configuration\n- Round-robin behavior\n- Health checking\n\n## Acceptance Criteria\n- [ ] docs/cqrs.md complete with examples\n- [ ] docs/pool.md updated for MultiPool\n- [ ] README.md mentions new features\n- [ ] Examples compile and work","status":"open","priority":3,"issue_type":"task","created_at":"2026-01-05T11:15:13.481713538+01:00","created_by":"gdiazlo","updated_at":"2026-01-05T11:15:13.481713538+01:00","dependencies":[{"issue_id":"mlecto-hwq","depends_on_id":"mlecto-3wf","type":"blocks","created_at":"2026-01-05T11:15:50.325885062+01:00","created_by":"gdiazlo"},{"issue_id":"mlecto-hwq","depends_on_id":"mlecto-1lq","type":"blocks","created_at":"2026-01-05T11:16:27.893913307+01:00","created_by":"gdiazlo"},{"issue_id":"mlecto-hwq","depends_on_id":"mlecto-2gn","type":"blocks","created_at":"2026-01-05T11:16:32.936486931+01:00","created_by":"gdiazlo"}]} 25 {"id":"mlecto-l6s","title":"[EPIC] Query DSL","description":"Type-safe SQL query builder using GADTs and phantom types. 
Composable queries as first-class values.","status":"closed","priority":1,"issue_type":"feature","created_at":"2026-01-04T00:21:08.537372834+01:00","created_by":"gdiazlo","updated_at":"2026-01-04T01:20:44.809745142+01:00","closed_at":"2026-01-04T01:20:44.809745142+01:00","close_reason":"All child tasks completed"} 26 - {"id":"mlecto-lg9","title":"Add unit tests for Cqrs module","description":"Add comprehensive unit tests for the CQRS routing functionality.\n\n## New File: test/test_cqrs.ml\n\n### Test Cases\n\n1. **Read Routing**\n - Configure CQRS with primary + replicas\n - Execute with_read -\u003e verify goes to replica\n - Execute Repo.all -\u003e verify goes to replica\n\n2. **Write Routing**\n - Execute with_write -\u003e verify goes to primary\n - Execute Repo.insert -\u003e verify goes to primary\n\n3. **Query-Based Routing**\n - SELECT query -\u003e replica\n - INSERT query -\u003e primary\n - UPDATE query -\u003e primary\n - DELETE query -\u003e primary\n\n4. **Transaction Pinning**\n - Start transaction -\u003e all ops go to primary\n - Even read operations inside transaction go to primary\n - Same connection used throughout transaction\n\n5. **No Replicas Mode**\n - Configure with empty replica list\n - All operations go to primary\n - No errors, works like single-server pool\n\n6. **Replica Fallback**\n - All replicas unhealthy\n - Reads fall back to primary\n - Warning logged (if logging added)\n\n7. **with_primary / with_replica**\n - Explicit routing works as expected\n - with_primary always uses primary even for reads\n - with_replica errors if no replicas configured\n\n8. 
**Round-Robin Across Replicas**\n - Multiple replicas configured\n - Reads distributed evenly\n\n## Mock Setup\n```ocaml\n(* Track which pool each operation went to *)\ntype routing_tracker = {\n mutable primary_calls: int;\n mutable replica_calls: int list; (* per-replica counts *)\n}\n```\n\n## Acceptance Criteria\n- [ ] All test cases implemented\n- [ ] Tests pass with `dune test`\n- [ ] Transaction isolation tested\n- [ ] Fallback behavior tested","status":"open","priority":2,"issue_type":"task","created_at":"2026-01-05T11:14:37.578081717+01:00","created_by":"gdiazlo","updated_at":"2026-01-05T11:14:37.578081717+01:00","dependencies":[{"issue_id":"mlecto-lg9","depends_on_id":"mlecto-3wf","type":"blocks","created_at":"2026-01-05T11:15:40.237975143+01:00","created_by":"gdiazlo"},{"issue_id":"mlecto-lg9","depends_on_id":"mlecto-1lq","type":"blocks","created_at":"2026-01-05T11:16:12.765564629+01:00","created_by":"gdiazlo"}]} 27 {"id":"mlecto-m13","title":"Implement Changeset core (Mlecto.Changeset)","description":"Changeset record type: data, changes, errors, valid?, params, required. Cast function for external params. 
Change function for internal data.","status":"closed","priority":1,"issue_type":"task","created_at":"2026-01-04T00:21:38.875652686+01:00","created_by":"gdiazlo","updated_at":"2026-01-04T01:13:59.358336893+01:00","closed_at":"2026-01-04T01:13:59.358336893+01:00","close_reason":"Changeset core complete: type-safe field handling, validations (required, format, length, inclusion, exclusion, number, acceptance, confirmation, custom), constraints (unique, foreign_key, check), error handling.","dependencies":[{"issue_id":"mlecto-m13","depends_on_id":"mlecto-z72","type":"blocks","created_at":"2026-01-04T00:22:03.941717052+01:00","created_by":"gdiazlo"},{"issue_id":"mlecto-m13","depends_on_id":"mlecto-1ud","type":"blocks","created_at":"2026-01-04T00:22:03.95059939+01:00","created_by":"gdiazlo"}]} 28 {"id":"mlecto-nv7","title":"Implement Expression types (Mlecto.Expr)","description":"GADTs for SQL expressions: literals, columns, operators (+, -, =, \u003c\u003e, AND, OR), function calls, casts. Type-safe expression composition.","status":"closed","priority":1,"issue_type":"task","created_at":"2026-01-04T00:21:26.662064082+01:00","created_by":"gdiazlo","updated_at":"2026-01-04T01:11:24.632729849+01:00","closed_at":"2026-01-04T01:11:24.632729849+01:00","close_reason":"Expression types complete: GADT for all SQL expressions, operators (comparison, logical, arithmetic), functions (aggregate, string, date, math), CASE, BETWEEN, IN, CAST, SQL generation.","dependencies":[{"issue_id":"mlecto-nv7","depends_on_id":"mlecto-z72","type":"blocks","created_at":"2026-01-04T00:22:02.531940181+01:00","created_by":"gdiazlo"}]} 29 {"id":"mlecto-pvs","title":"Implement Migration runner","description":"schema_migrations table, version tracking, up/down execution, migrate/rollback commands, migration status 
reporting.","status":"closed","priority":1,"issue_type":"task","created_at":"2026-01-04T00:21:48.916746145+01:00","created_by":"gdiazlo","updated_at":"2026-01-04T01:18:41.847908163+01:00","closed_at":"2026-01-04T01:18:41.847908163+01:00","close_reason":"Migration runner with version tracking, plan_migrate, plan_rollback, format_status","dependencies":[{"issue_id":"mlecto-pvs","depends_on_id":"mlecto-uek","type":"blocks","created_at":"2026-01-04T00:22:06.667807096+01:00","created_by":"gdiazlo"},{"issue_id":"mlecto-pvs","depends_on_id":"mlecto-8ki","type":"blocks","created_at":"2026-01-04T00:22:06.6757586+01:00","created_by":"gdiazlo"}]}
··· 1 {"id":"mlecto-0b6","title":"[EPIC] Repository Module","description":"Database operations gateway. Functor over Caqti/Eio connection. CRUD operations, transactions, error handling.","status":"closed","priority":1,"issue_type":"feature","created_at":"2026-01-04T00:21:10.437260357+01:00","created_by":"gdiazlo","updated_at":"2026-01-04T01:20:44.810893227+01:00","closed_at":"2026-01-04T01:20:44.810893227+01:00","close_reason":"All child tasks completed"} 2 {"id":"mlecto-0hr","title":"Implement Query compilation to SQL","description":"Convert Query.t to SQL string + Caqti request. Parameter binding, SQL escaping, dialect-specific generation (Postgres first).","status":"closed","priority":1,"issue_type":"task","created_at":"2026-01-04T00:21:30.33363594+01:00","created_by":"gdiazlo","updated_at":"2026-01-04T01:12:40.467657348+01:00","closed_at":"2026-01-04T01:12:40.467657348+01:00","close_reason":"Query compilation implemented in Query.to_sql - generates PostgreSQL SQL for SELECT/INSERT/UPDATE/DELETE.","dependencies":[{"issue_id":"mlecto-0hr","depends_on_id":"mlecto-7lc","type":"blocks","created_at":"2026-01-04T00:22:02.555339871+01:00","created_by":"gdiazlo"}]} 3 + {"id":"mlecto-1lq","title":"Create Cqrs module for read/write splitting","description":"Create a new lib/cqrs.ml module that provides transparent CQRS routing.\n\n## New File: lib/cqrs.ml\n\n### Types\n```ocaml\ntype replica_selection = RoundRobin | Random | LeastConnections\n\ntype 'conn config = {\n primary_conninfo: string;\n primary_max_size: int;\n replica_conninfos: string list; (* empty = CQRS disabled *)\n replica_max_size_each: int;\n replica_selection: replica_selection;\n validate: ('conn -\u003e bool) option;\n}\n\ntype 'conn t = {\n primary_pool: 'conn Pool.t;\n replica_pools: 'conn Pool.t array option; (* None when no replicas *)\n replica_index: int Kcas.Loc.t;\n selection: replica_selection;\n mutable in_transaction: bool; (* or use domain-local storage *)\n}\n\ntype intent = Read | 
Write\n```\n\n### Core API\n```ocaml\n(* Create CQRS-aware pool pair *)\nval create : (module Driver.S with type connection = 'conn) -\u003e 'conn config -\u003e 'conn t\n\n(* Auto-routing based on operation type *)\nval with_read : 'conn t -\u003e ('conn -\u003e 'a) -\u003e ('a, Pool.pool_error) result\nval with_write : 'conn t -\u003e ('conn -\u003e 'a) -\u003e ('a, Pool.pool_error) result\n\n(* Explicit routing *)\nval with_primary : 'conn t -\u003e ('conn -\u003e 'a) -\u003e ('a, Pool.pool_error) result\nval with_replica : 'conn t -\u003e ('conn -\u003e 'a) -\u003e ('a, Pool.pool_error) result\n\n(* Transaction - always uses primary, pins connection *)\nval transaction : 'conn t -\u003e ('conn -\u003e ('a, Error.db_error) result) -\u003e ('a, Error.db_error) result\n```\n\n### Functor Interface\n```ocaml\nmodule Make (D : Driver.S) : sig\n type t = D.connection t\n val create : D.connection config -\u003e t\n (* ... rest of API *)\nend\n```\n\n## Routing Logic\n\n### Read Operations (go to replica if available)\n- Repo.get, get_opt\n- Repo.all\n- Repo.all_query, one_query, one_query_opt\n\n### Write Operations (always go to primary)\n- Repo.insert, insert_returning\n- Repo.update\n- Repo.delete\n- Repo.insert_query, update_query, delete_query\n- All transaction operations\n\n### Query-Based Auto-Detection\n```ocaml\nlet route_query query =\n match query.Query.query_type with\n | Query.Select -\u003e Read\n | Query.Insert | Query.Update | Query.Delete -\u003e Write\n```\n\n## Edge Cases\n1. **No replicas configured**: All operations go to primary\n2. **All replicas down**: Fall back to primary with warning\n3. **Read after write**: Provide with_primary for consistency\n4. 
**Transaction context**: Pin all ops to primary connection\n\n## Acceptance Criteria\n- [ ] Cqrs.Make functor works with any Driver.S\n- [ ] Reads go to replicas when available\n- [ ] Writes always go to primary\n- [ ] Transactions pin to single primary connection\n- [ ] Graceful fallback when replicas unavailable\n- [ ] Round-robin across replicas","status":"closed","priority":1,"issue_type":"task","created_at":"2026-01-05T11:13:56.804354904+01:00","created_by":"gdiazlo","updated_at":"2026-01-05T11:25:09.834278286+01:00","closed_at":"2026-01-05T11:25:09.834278286+01:00","close_reason":"Implemented Cqrs module with read/write splitting, replica selection strategies, and transaction support","dependencies":[{"issue_id":"mlecto-1lq","depends_on_id":"mlecto-3wf","type":"blocks","created_at":"2026-01-05T11:15:30.146332625+01:00","created_by":"gdiazlo"},{"issue_id":"mlecto-1lq","depends_on_id":"mlecto-2gn","type":"blocks","created_at":"2026-01-05T11:16:02.678918762+01:00","created_by":"gdiazlo"}]} 4 {"id":"mlecto-1ud","title":"Implement Schema DSL (Mlecto.Schema)","description":"DSL for defining tables: field definitions, constraints (primary_key, not_null, unique, foreign_key, check), table_name generation.","status":"closed","priority":1,"issue_type":"task","created_at":"2026-01-04T00:21:24.765735157+01:00","created_by":"gdiazlo","updated_at":"2026-01-04T01:09:24.538059462+01:00","closed_at":"2026-01-04T01:09:24.538059462+01:00","close_reason":"Schema DSL complete: table definitions, column constraints (PrimaryKey, NotNull, Unique, Default, Check, ForeignKey), foreign key actions, timestamps helper, SQL generation.","dependencies":[{"issue_id":"mlecto-1ud","depends_on_id":"mlecto-z72","type":"blocks","created_at":"2026-01-04T00:22:02.524126267+01:00","created_by":"gdiazlo"}]} 5 {"id":"mlecto-1w9","title":"[EPIC] Core Type System \u0026 Schema DSL","description":"Define SQL types mapped to OCaml, schema definition DSL for tables/fields/constraints. 
Foundation for all other modules.","status":"closed","priority":1,"issue_type":"feature","created_at":"2026-01-04T00:21:05.917266549+01:00","created_by":"gdiazlo","updated_at":"2026-01-04T01:20:44.799339065+01:00","closed_at":"2026-01-04T01:20:44.799339065+01:00","close_reason":"All child tasks completed"} 6 + {"id":"mlecto-2gn","title":"Implement MultiPool in pool.ml","description":"Extend the existing Pool module with multi-server support.\n\n## Changes to lib/pool.ml\n\n### New Types\n```ocaml\ntype server = {\n conninfo: string;\n weight: int; (* for weighted round-robin, default 1 *)\n}\n\ntype 'conn multi_config = {\n servers: server list;\n max_size_per_server: int;\n connect: string -\u003e ('conn, string) result;\n close: 'conn -\u003e unit;\n validate: ('conn -\u003e bool) option;\n}\n\ntype 'conn multi_t = {\n pools: 'conn t array;\n healthy: bool Kcas.Loc.t array;\n next_index: int Kcas.Loc.t;\n}\n```\n\n### New Functions\n- `create_multi : 'conn multi_config -\u003e 'conn multi_t`\n- `acquire_multi : 'conn multi_t -\u003e ('conn, pool_error) result`\n- `release_multi : 'conn multi_t -\u003e 'conn -\u003e unit`\n- `with_connection_multi : 'conn multi_t -\u003e ('conn -\u003e 'a) -\u003e ('a, pool_error) result`\n- `shutdown_multi : 'conn multi_t -\u003e unit`\n- `stats_multi : 'conn multi_t -\u003e stats list`\n- `mark_unhealthy : 'conn multi_t -\u003e int -\u003e unit`\n- `mark_healthy : 'conn multi_t -\u003e int -\u003e unit`\n\n### Implementation Notes\n- Use atomic counter with Kcas.Loc for lock-free round-robin\n- Skip unhealthy servers in round-robin selection\n- When all servers unhealthy, return Pool_empty error\n- Each server gets its own pool with max_size_per_server connections\n\n## Acceptance Criteria\n- [ ] All new types defined\n- [ ] Round-robin distributes evenly across healthy servers\n- [ ] Unhealthy servers are skipped\n- [ ] Backward compatible - existing Pool API unchanged\n- [ ] Unit tests 
pass","status":"closed","priority":1,"issue_type":"task","created_at":"2026-01-05T11:13:34.299242118+01:00","created_by":"gdiazlo","updated_at":"2026-01-05T11:23:43.1577121+01:00","closed_at":"2026-01-05T11:23:43.1577121+01:00","close_reason":"Implemented MultiPool with round-robin, health tracking, and Make functor","dependencies":[{"issue_id":"mlecto-2gn","depends_on_id":"mlecto-3wf","type":"blocks","created_at":"2026-01-05T11:15:25.102411986+01:00","created_by":"gdiazlo"}]} 7 {"id":"mlecto-3ow","title":"cast_assoc and put_assoc for nested changesets","status":"closed","priority":2,"issue_type":"feature","created_at":"2026-01-04T10:32:00.153765675+01:00","created_by":"gdiazlo","updated_at":"2026-01-04T10:38:58.664932555+01:00","closed_at":"2026-01-04T10:38:58.664932555+01:00","close_reason":"Closed","dependencies":[{"issue_id":"mlecto-3ow","depends_on_id":"mlecto-d6f","type":"blocks","created_at":"2026-01-04T10:32:20.574240684+01:00","created_by":"gdiazlo"}]} 8 + {"id":"mlecto-3wf","title":"CQRS and Multi-Server Pool Support","description":"Add CQRS (Command Query Responsibility Segregation) support and multi-server connection pooling to repodb.\n\n## Goals\n1. **CQRS**: Transparent read/write splitting where reads go to replicas and writes go to primary\n2. **Multi-Server Pool**: Round-robin load balancing across multiple database servers\n3. 
**Backward Compatible**: Existing single-server users see no API changes\n\n## Key Features\n- Automatic routing based on query type (SELECT vs INSERT/UPDATE/DELETE)\n- Transaction pinning to primary (all ops in transaction use same connection)\n- Lock-free round-robin using kcas\n- Health checking to skip unhealthy servers\n- Fallback to primary when all replicas down\n\n## Testing\nIntegration tests with real PostgreSQL using Podman containers.","status":"closed","priority":1,"issue_type":"epic","created_at":"2026-01-05T11:13:04.912905111+01:00","created_by":"gdiazlo","updated_at":"2026-01-05T11:34:13.477207902+01:00","closed_at":"2026-01-05T11:34:13.477207902+01:00","close_reason":"All subtasks completed: MultiPool, CQRS module, unit tests, integration tests, and documentation"} 9 {"id":"mlecto-4g7","title":"Constraint error mapping from DB to changeset","status":"closed","priority":2,"issue_type":"feature","created_at":"2026-01-04T10:32:00.520652974+01:00","created_by":"gdiazlo","updated_at":"2026-01-04T10:38:58.665466447+01:00","closed_at":"2026-01-04T10:38:58.665466447+01:00","close_reason":"Closed"} 10 {"id":"mlecto-5km","title":"Implement Changeset validators","description":"validate_required, validate_format (regex), validate_length (min/max/is), validate_inclusion, validate_exclusion, validate_number, validate_acceptance, validate_confirmation, validate_change (custom).","status":"closed","priority":1,"issue_type":"task","created_at":"2026-01-04T00:21:40.78012908+01:00","created_by":"gdiazlo","updated_at":"2026-01-04T01:14:08.654694896+01:00","closed_at":"2026-01-04T01:14:08.654694896+01:00","close_reason":"Validators implemented in Changeset core: validate_required, validate_format, validate_length, validate_inclusion, validate_exclusion, validate_number, validate_acceptance, validate_confirmation, 
validate_change.","dependencies":[{"issue_id":"mlecto-5km","depends_on_id":"mlecto-m13","type":"blocks","created_at":"2026-01-04T00:22:03.958038572+01:00","created_by":"gdiazlo"}]} 11 {"id":"mlecto-7lc","title":"Implement Query builder (Mlecto.Query)","description":"SELECT, INSERT, UPDATE, DELETE builders. WHERE, JOIN, ORDER BY, GROUP BY, HAVING, LIMIT, OFFSET. Composable query pipelines.","status":"closed","priority":1,"issue_type":"task","created_at":"2026-01-04T00:21:28.238632191+01:00","created_by":"gdiazlo","updated_at":"2026-01-04T01:12:33.283719987+01:00","closed_at":"2026-01-04T01:12:33.283719987+01:00","close_reason":"Query builder complete: SELECT, INSERT, UPDATE, DELETE with WHERE, JOIN, ORDER BY, GROUP BY, HAVING, LIMIT, OFFSET. ON CONFLICT for upserts. RETURNING clause. Full SQL generation.","dependencies":[{"issue_id":"mlecto-7lc","depends_on_id":"mlecto-1ud","type":"blocks","created_at":"2026-01-04T00:22:02.538979501+01:00","created_by":"gdiazlo"},{"issue_id":"mlecto-7lc","depends_on_id":"mlecto-nv7","type":"blocks","created_at":"2026-01-04T00:22:02.547726588+01:00","created_by":"gdiazlo"}]} ··· 14 {"id":"mlecto-at2","title":"[EPIC] Migration System","description":"Versioned database migrations with up/down support. 
DSL for create_table, add_column, create_index, etc.","status":"closed","priority":1,"issue_type":"feature","created_at":"2026-01-04T00:21:12.288036672+01:00","created_by":"gdiazlo","updated_at":"2026-01-04T01:20:44.812043932+01:00","closed_at":"2026-01-04T01:20:44.812043932+01:00","close_reason":"All child tasks completed"} 15 {"id":"mlecto-c1o","title":"Real Repo execution via Caqti/Eio","status":"closed","priority":1,"issue_type":"feature","created_at":"2026-01-04T10:31:58.698847822+01:00","created_by":"gdiazlo","updated_at":"2026-01-04T10:38:58.663072869+01:00","closed_at":"2026-01-04T10:38:58.663072869+01:00","close_reason":"Closed"} 16 {"id":"mlecto-d6f","title":"Schema associations (has_many, belongs_to, has_one, many_to_many)","status":"closed","priority":1,"issue_type":"feature","created_at":"2026-01-04T10:31:59.069873954+01:00","created_by":"gdiazlo","updated_at":"2026-01-04T10:38:58.661985306+01:00","closed_at":"2026-01-04T10:38:58.661985306+01:00","close_reason":"Closed"} 17 + {"id":"mlecto-dg4","title":"Add unit tests for MultiPool","description":"Add comprehensive unit tests for the MultiPool functionality.\n\n## New File: test/test_multi_pool.ml\n\n### Test Cases\n\n1. **Round-Robin Distribution**\n - Create multi-pool with 3 servers\n - Acquire N connections, verify even distribution\n - Assert each server gets ~N/3 connections\n\n2. **Health Check Skipping**\n - Create multi-pool with 3 servers\n - Mark one server unhealthy\n - Verify connections only go to healthy servers\n - Mark all unhealthy -\u003e expect Pool_empty error\n\n3. **Mark Healthy/Unhealthy**\n - Test mark_unhealthy and mark_healthy functions\n - Verify atomic updates work correctly\n\n4. **with_connection_multi**\n - Test automatic release on success\n - Test automatic release on exception\n\n5. **shutdown_multi**\n - Verify all pools are closed\n - Verify subsequent acquire returns Pool_closed\n\n6. **stats_multi**\n - Verify stats aggregation across all pools\n\n7. 
**Edge Cases**\n - Single server multi-pool (degenerate case)\n - Empty server list (should error)\n - All servers fail to connect\n\n## Mock Setup\nUse similar mock pattern to existing test_pool.ml:\n```ocaml\ntype mock_conn = { id : int; server_idx : int; mutable closed : bool }\n\nlet mock_multi_config ?(servers = 3) ?(max_size = 2) () = ...\n```\n\n## Acceptance Criteria\n- [ ] All test cases implemented\n- [ ] Tests pass with `dune test`\n- [ ] Edge cases covered","status":"closed","priority":2,"issue_type":"task","created_at":"2026-01-05T11:14:18.326292543+01:00","created_by":"gdiazlo","updated_at":"2026-01-05T11:29:05.161622523+01:00","closed_at":"2026-01-05T11:29:05.161622523+01:00","close_reason":"Added 14 unit tests for MultiPool covering round-robin, health checks, and edge cases","dependencies":[{"issue_id":"mlecto-dg4","depends_on_id":"mlecto-3wf","type":"blocks","created_at":"2026-01-05T11:15:35.191030964+01:00","created_by":"gdiazlo"},{"issue_id":"mlecto-dg4","depends_on_id":"mlecto-2gn","type":"blocks","created_at":"2026-01-05T11:16:07.722230492+01:00","created_by":"gdiazlo"}]} 18 + {"id":"mlecto-e6g","title":"Set up Podman-based PostgreSQL integration tests","description":"Create integration test infrastructure using Podman to run real PostgreSQL instances.\n\n## Goal\nTest CQRS and MultiPool against real PostgreSQL primary + replicas.\n\n## Infrastructure\n\n### Podman Compose File: test/integration/docker-compose.yml\n```yaml\nversion: '3.8'\nservices:\n primary:\n image: postgres:16\n environment:\n POSTGRES_USER: repodb\n POSTGRES_PASSWORD: repodb\n POSTGRES_DB: repodb_test\n ports:\n - '5432:5432'\n command: \u003e\n postgres\n -c wal_level=replica\n -c max_wal_senders=3\n -c max_replication_slots=3\n -c hot_standby=on\n volumes:\n - primary_data:/var/lib/postgresql/data\n healthcheck:\n test: ['CMD-SHELL', 'pg_isready -U repodb']\n interval: 5s\n timeout: 5s\n retries: 5\n\n replica1:\n image: postgres:16\n environment:\n POSTGRES_USER: 
repodb\n POSTGRES_PASSWORD: repodb\n POSTGRES_DB: repodb_test\n ports:\n - '5433:5432'\n depends_on:\n primary:\n condition: service_healthy\n command: \u003e\n bash -c \"\n until pg_basebackup -h primary -D /var/lib/postgresql/data -U repodb -Fp -Xs -P -R; do\n sleep 1\n done\n chmod 700 /var/lib/postgresql/data\n postgres\n \"\n volumes:\n - replica1_data:/var/lib/postgresql/data\n\n replica2:\n image: postgres:16\n environment:\n POSTGRES_USER: repodb\n POSTGRES_PASSWORD: repodb\n POSTGRES_DB: repodb_test\n ports:\n - '5434:5432'\n depends_on:\n primary:\n condition: service_healthy\n command: \u003e\n bash -c \"\n until pg_basebackup -h primary -D /var/lib/postgresql/data -U repodb -Fp -Xs -P -R; do\n sleep 1\n done\n chmod 700 /var/lib/postgresql/data\n postgres\n \"\n volumes:\n - replica2_data:/var/lib/postgresql/data\n\nvolumes:\n primary_data:\n replica1_data:\n replica2_data:\n```\n\n### Test Runner Script: test/integration/run_tests.sh\n```bash\n#!/bin/bash\nset -e\n\n# Start PostgreSQL cluster\npodman-compose -f test/integration/docker-compose.yml up -d\n\n# Wait for replicas to catch up\nsleep 10\n\n# Run integration tests\ndune exec test/integration/test_cqrs_integration.exe\n\n# Cleanup\npodman-compose -f test/integration/docker-compose.yml down -v\n```\n\n### Integration Test File: test/integration/test_cqrs_integration.ml\nTest cases:\n1. Write to primary, read from replica (verify replication)\n2. Transaction isolation (all ops on primary)\n3. Replica lag handling\n4. Connection pooling under load\n5. 
Failover when replica dies\n\n## Acceptance Criteria\n- [ ] Podman compose file creates working PG cluster\n- [ ] Replication works between primary and replicas\n- [ ] Integration tests pass\n- [ ] Cleanup script removes all containers/volumes","status":"closed","priority":2,"issue_type":"task","created_at":"2026-01-05T11:14:57.307024006+01:00","created_by":"gdiazlo","updated_at":"2026-01-05T11:32:01.121669503+01:00","closed_at":"2026-01-05T11:32:01.121669503+01:00","close_reason":"Created Podman compose setup with primary+2 replicas, init script, test runner, and integration test suite","dependencies":[{"issue_id":"mlecto-e6g","depends_on_id":"mlecto-3wf","type":"blocks","created_at":"2026-01-05T11:15:45.282902638+01:00","created_by":"gdiazlo"},{"issue_id":"mlecto-e6g","depends_on_id":"mlecto-2gn","type":"blocks","created_at":"2026-01-05T11:16:17.810402811+01:00","created_by":"gdiazlo"},{"issue_id":"mlecto-e6g","depends_on_id":"mlecto-1lq","type":"blocks","created_at":"2026-01-05T11:16:22.852706838+01:00","created_by":"gdiazlo"}]} 19 {"id":"mlecto-flv","title":"Implement Multi module (Mlecto.Multi)","description":"Chain operations: Multi.new |\u003e Multi.insert |\u003e Multi.update |\u003e Multi.run. 
Named operations, access previous results, atomic execution.","status":"closed","priority":2,"issue_type":"task","created_at":"2026-01-04T00:21:50.583520239+01:00","created_by":"gdiazlo","updated_at":"2026-01-04T01:20:32.435485703+01:00","closed_at":"2026-01-04T01:20:32.435485703+01:00","close_reason":"Multi with operation chaining, named results, merge, validate, execute_sync","dependencies":[{"issue_id":"mlecto-flv","depends_on_id":"mlecto-8ki","type":"blocks","created_at":"2026-01-04T00:22:07.51964609+01:00","created_by":"gdiazlo"},{"issue_id":"mlecto-flv","depends_on_id":"mlecto-hwj","type":"blocks","created_at":"2026-01-04T00:22:07.527438964+01:00","created_by":"gdiazlo"}]} 20 {"id":"mlecto-gfo","title":"[EPIC] Changeset System","description":"Casting, validation, and error accumulation. Separate validation from persistence with composable validators.","status":"closed","priority":1,"issue_type":"feature","created_at":"2026-01-04T00:21:07.363157981+01:00","created_by":"gdiazlo","updated_at":"2026-01-04T01:20:44.808207655+01:00","closed_at":"2026-01-04T01:20:44.808207655+01:00","close_reason":"All child tasks completed"} 21 {"id":"mlecto-gxh","title":"Implement Changeset constraints","description":"unique_constraint, foreign_key_constraint, check_constraint, exclusion_constraint. 
Convert DB errors to changeset errors.","status":"closed","priority":1,"issue_type":"task","created_at":"2026-01-04T00:21:42.297745588+01:00","created_by":"gdiazlo","updated_at":"2026-01-04T01:14:08.663238895+01:00","closed_at":"2026-01-04T01:14:08.663238895+01:00","close_reason":"Constraints implemented in Changeset core: unique_constraint, foreign_key_constraint, check_constraint.","dependencies":[{"issue_id":"mlecto-gxh","depends_on_id":"mlecto-m13","type":"blocks","created_at":"2026-01-04T00:22:03.965402454+01:00","created_by":"gdiazlo"}]} 22 {"id":"mlecto-hw6","title":"[EPIC] mlecto - Ecto-like database toolkit for OCaml","status":"closed","priority":1,"issue_type":"feature","created_at":"2026-01-04T00:20:55.991644266+01:00","created_by":"gdiazlo","updated_at":"2026-01-04T01:20:49.927971061+01:00","closed_at":"2026-01-04T01:20:49.927971061+01:00","close_reason":"Core mlecto library complete: Type, Schema, Expr, Query, Changeset, Repo, Migration, Multi modules implemented"} 23 {"id":"mlecto-hwj","title":"Implement Repo transactions","description":"transaction/1 function wrapping Caqti transactions. Automatic rollback on error. Nested transaction support (savepoints).","status":"closed","priority":1,"issue_type":"task","created_at":"2026-01-04T00:21:46.455864854+01:00","created_by":"gdiazlo","updated_at":"2026-01-04T01:19:20.791036484+01:00","closed_at":"2026-01-04T01:19:20.791036484+01:00","close_reason":"Transaction state machine with BEGIN/COMMIT/ROLLBACK and nested savepoint support","dependencies":[{"issue_id":"mlecto-hwj","depends_on_id":"mlecto-8ki","type":"blocks","created_at":"2026-01-04T00:22:05.217260647+01:00","created_by":"gdiazlo"}]} 24 + {"id":"mlecto-hwq","title":"Write CQRS documentation","description":"Create documentation for the CQRS and MultiPool features.\n\n## New File: docs/cqrs.md\n\n### Sections\n\n1. **Overview**\n - What is CQRS\n - When to use it\n - How repodb implements it\n\n2. 
**Quick Start**\n ```ocaml\n (* Basic CQRS setup *)\n module Cqrs = Cqrs.Make(Repodb_postgresql)\n\n let cqrs = Cqrs.create {\n primary_conninfo = \"postgresql://primary/mydb\";\n primary_max_size = 10;\n replica_conninfos = [\n \"postgresql://replica1/mydb\";\n \"postgresql://replica2/mydb\";\n ];\n replica_max_size_each = 5;\n replica_selection = RoundRobin;\n validate = None;\n }\n ```\n\n3. **Read Operations**\n - Which operations are routed to replicas\n - How to force read from primary\n\n4. **Write Operations**\n - Always go to primary\n - Transaction handling\n\n5. **Explicit Routing**\n - with_primary for read-after-write consistency\n - with_replica for specific use cases\n\n6. **Configuration Options**\n - Full config reference\n - Replica selection strategies\n\n7. **Best Practices**\n - When to use CQRS\n - Handling replica lag\n - Monitoring\n\n## Update: docs/pool.md\n\nAdd section on MultiPool:\n- Multi-server configuration\n- Round-robin behavior\n- Health checking\n\n## Acceptance Criteria\n- [ ] docs/cqrs.md complete with examples\n- [ ] docs/pool.md updated for MultiPool\n- [ ] README.md mentions new features\n- [ ] Examples compile and work","status":"closed","priority":3,"issue_type":"task","created_at":"2026-01-05T11:15:13.481713538+01:00","created_by":"gdiazlo","updated_at":"2026-01-05T11:34:08.431613146+01:00","closed_at":"2026-01-05T11:34:08.431613146+01:00","close_reason":"Created docs/cqrs.md, updated docs/pool.md with MultiPool section, updated README.md","dependencies":[{"issue_id":"mlecto-hwq","depends_on_id":"mlecto-3wf","type":"blocks","created_at":"2026-01-05T11:15:50.325885062+01:00","created_by":"gdiazlo"},{"issue_id":"mlecto-hwq","depends_on_id":"mlecto-1lq","type":"blocks","created_at":"2026-01-05T11:16:27.893913307+01:00","created_by":"gdiazlo"},{"issue_id":"mlecto-hwq","depends_on_id":"mlecto-2gn","type":"blocks","created_at":"2026-01-05T11:16:32.936486931+01:00","created_by":"gdiazlo"}]} 25 {"id":"mlecto-l6s","title":"[EPIC] 
Query DSL","description":"Type-safe SQL query builder using GADTs and phantom types. Composable queries as first-class values.","status":"closed","priority":1,"issue_type":"feature","created_at":"2026-01-04T00:21:08.537372834+01:00","created_by":"gdiazlo","updated_at":"2026-01-04T01:20:44.809745142+01:00","closed_at":"2026-01-04T01:20:44.809745142+01:00","close_reason":"All child tasks completed"} 26 + {"id":"mlecto-lg9","title":"Add unit tests for Cqrs module","description":"Add comprehensive unit tests for the CQRS routing functionality.\n\n## New File: test/test_cqrs.ml\n\n### Test Cases\n\n1. **Read Routing**\n - Configure CQRS with primary + replicas\n - Execute with_read -\u003e verify goes to replica\n - Execute Repo.all -\u003e verify goes to replica\n\n2. **Write Routing**\n - Execute with_write -\u003e verify goes to primary\n - Execute Repo.insert -\u003e verify goes to primary\n\n3. **Query-Based Routing**\n - SELECT query -\u003e replica\n - INSERT query -\u003e primary\n - UPDATE query -\u003e primary\n - DELETE query -\u003e primary\n\n4. **Transaction Pinning**\n - Start transaction -\u003e all ops go to primary\n - Even read operations inside transaction go to primary\n - Same connection used throughout transaction\n\n5. **No Replicas Mode**\n - Configure with empty replica list\n - All operations go to primary\n - No errors, works like single-server pool\n\n6. **Replica Fallback**\n - All replicas unhealthy\n - Reads fall back to primary\n - Warning logged (if logging added)\n\n7. **with_primary / with_replica**\n - Explicit routing works as expected\n - with_primary always uses primary even for reads\n - with_replica errors if no replicas configured\n\n8. 
**Round-Robin Across Replicas**\n - Multiple replicas configured\n - Reads distributed evenly\n\n## Mock Setup\n```ocaml\n(* Track which pool each operation went to *)\ntype routing_tracker = {\n mutable primary_calls: int;\n mutable replica_calls: int list; (* per-replica counts *)\n}\n```\n\n## Acceptance Criteria\n- [ ] All test cases implemented\n- [ ] Tests pass with `dune test`\n- [ ] Transaction isolation tested\n- [ ] Fallback behavior tested","status":"closed","priority":2,"issue_type":"task","created_at":"2026-01-05T11:14:37.578081717+01:00","created_by":"gdiazlo","updated_at":"2026-01-05T11:29:10.203623799+01:00","closed_at":"2026-01-05T11:29:10.203623799+01:00","close_reason":"Added 13 unit tests for Cqrs covering routing, replicas, fallback, and transactions","dependencies":[{"issue_id":"mlecto-lg9","depends_on_id":"mlecto-3wf","type":"blocks","created_at":"2026-01-05T11:15:40.237975143+01:00","created_by":"gdiazlo"},{"issue_id":"mlecto-lg9","depends_on_id":"mlecto-1lq","type":"blocks","created_at":"2026-01-05T11:16:12.765564629+01:00","created_by":"gdiazlo"}]} 27 {"id":"mlecto-m13","title":"Implement Changeset core (Mlecto.Changeset)","description":"Changeset record type: data, changes, errors, valid?, params, required. Cast function for external params. 
Change function for internal data.","status":"closed","priority":1,"issue_type":"task","created_at":"2026-01-04T00:21:38.875652686+01:00","created_by":"gdiazlo","updated_at":"2026-01-04T01:13:59.358336893+01:00","closed_at":"2026-01-04T01:13:59.358336893+01:00","close_reason":"Changeset core complete: type-safe field handling, validations (required, format, length, inclusion, exclusion, number, acceptance, confirmation, custom), constraints (unique, foreign_key, check), error handling.","dependencies":[{"issue_id":"mlecto-m13","depends_on_id":"mlecto-z72","type":"blocks","created_at":"2026-01-04T00:22:03.941717052+01:00","created_by":"gdiazlo"},{"issue_id":"mlecto-m13","depends_on_id":"mlecto-1ud","type":"blocks","created_at":"2026-01-04T00:22:03.95059939+01:00","created_by":"gdiazlo"}]} 28 {"id":"mlecto-nv7","title":"Implement Expression types (Mlecto.Expr)","description":"GADTs for SQL expressions: literals, columns, operators (+, -, =, \u003c\u003e, AND, OR), function calls, casts. Type-safe expression composition.","status":"closed","priority":1,"issue_type":"task","created_at":"2026-01-04T00:21:26.662064082+01:00","created_by":"gdiazlo","updated_at":"2026-01-04T01:11:24.632729849+01:00","closed_at":"2026-01-04T01:11:24.632729849+01:00","close_reason":"Expression types complete: GADT for all SQL expressions, operators (comparison, logical, arithmetic), functions (aggregate, string, date, math), CASE, BETWEEN, IN, CAST, SQL generation.","dependencies":[{"issue_id":"mlecto-nv7","depends_on_id":"mlecto-z72","type":"blocks","created_at":"2026-01-04T00:22:02.531940181+01:00","created_by":"gdiazlo"}]} 29 {"id":"mlecto-pvs","title":"Implement Migration runner","description":"schema_migrations table, version tracking, up/down execution, migrate/rollback commands, migration status 
reporting.","status":"closed","priority":1,"issue_type":"task","created_at":"2026-01-04T00:21:48.916746145+01:00","created_by":"gdiazlo","updated_at":"2026-01-04T01:18:41.847908163+01:00","closed_at":"2026-01-04T01:18:41.847908163+01:00","close_reason":"Migration runner with version tracking, plan_migrate, plan_rollback, format_status","dependencies":[{"issue_id":"mlecto-pvs","depends_on_id":"mlecto-uek","type":"blocks","created_at":"2026-01-04T00:22:06.667807096+01:00","created_by":"gdiazlo"},{"issue_id":"mlecto-pvs","depends_on_id":"mlecto-8ki","type":"blocks","created_at":"2026-01-04T00:22:06.6757586+01:00","created_by":"gdiazlo"}]}
+3
README.md
··· 15 - **Multi** - Compose multiple operations in a single transaction 16 - **Migrations** - Version your database schema 17 - **Connection Pool** - Scheduler-agnostic, lock-free connection pooling 18 19 ## Quick Example 20 ··· 59 - [Transactions](docs/transactions.md) - Multi and transaction handling 60 - [Migrations](docs/migrations.md) - Database schema versioning 61 - [Connection Pool](docs/pool.md) - Scheduler-agnostic connection pooling 62 63 ## Packages 64
··· 15 - **Multi** - Compose multiple operations in a single transaction 16 - **Migrations** - Version your database schema 17 - **Connection Pool** - Scheduler-agnostic, lock-free connection pooling 18 + - **CQRS** - Transparent read/write splitting with replica support 19 + - **Multi-Server Pool** - Round-robin load balancing across database servers 20 21 ## Quick Example 22 ··· 61 - [Transactions](docs/transactions.md) - Multi and transaction handling 62 - [Migrations](docs/migrations.md) - Database schema versioning 63 - [Connection Pool](docs/pool.md) - Scheduler-agnostic connection pooling 64 + - [CQRS](docs/cqrs.md) - Read/write splitting with replicas 65 66 ## Packages 67
+196
docs/cqrs.md
···
··· 1 + # CQRS (Command Query Responsibility Segregation) 2 + 3 + repodb supports CQRS through transparent read/write splitting. Reads are automatically routed to replica servers while writes go to the primary, all without requiring application code changes. 4 + 5 + ## When to Use CQRS 6 + 7 + CQRS is beneficial when: 8 + - Read traffic significantly exceeds write traffic 9 + - You want to scale reads horizontally with replicas 10 + - You want to avoid an external connection proxy such as PgBouncer 11 + - Your application can tolerate slight replica lag for reads 12 + 13 + ## Quick Start 14 + 15 + ```ocaml 16 + open Repodb 17 + 18 + module Cqrs = Cqrs.Make(Repodb_postgresql) 19 + module Repo = Repo.Make(Repodb_postgresql) 20 + 21 + let cqrs = Cqrs.create { 22 + primary_conninfo = "host=primary dbname=myapp user=app"; 23 + primary_max_size = 10; 24 + replica_conninfos = [ 25 + "host=replica1 dbname=myapp user=app"; 26 + "host=replica2 dbname=myapp user=app"; 27 + ]; 28 + replica_max_size_each = 5; 29 + replica_selection = RoundRobin; 30 + validate = None; 31 + } 32 + 33 + (* Reads automatically go to replicas *) 34 + let users = Cqrs.with_read cqrs (fun conn -> 35 + Repo.all conn ~table:users_table ~decode:decode_user 36 + ) 37 + 38 + (* Writes always go to primary; with_write returns a result, so bind it *) 39 + let _ = Cqrs.with_write cqrs (fun conn -> 40 + Repo.insert conn ~table:users_table ~columns ~values 41 + ) 42 + ``` 43 + 44 + ## Read Operations 45 + 46 + Use `with_read` for operations that only query data: 47 + 48 + ```ocaml 49 + (* Routed to a replica *) 50 + let result = Cqrs.with_read cqrs (fun conn -> 51 + Repo.all_query conn query ~decode 52 + ) 53 + ``` 54 + 55 + When replicas are configured, reads are distributed across them using the selected strategy. If all replicas are unhealthy, reads automatically fall back to the primary. 
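That fallback rule ("no healthy replica → use the primary") can be modeled in a few lines. This is an illustrative sketch, not the repodb API: `choose_read_pool` and `pool_id` are hypothetical names, and the real module tracks health with kcas rather than a plain array:

```ocaml
(* Hypothetical model of read routing: first healthy replica wins,
   otherwise fall back to the primary. *)
type pool_id = Primary | Replica of int

let choose_read_pool (replica_healthy : bool array) : pool_id =
  let n = Array.length replica_healthy in
  let rec find i =
    if i >= n then Primary                      (* no healthy replica: fall back *)
    else if replica_healthy.(i) then Replica i
    else find (i + 1)
  in
  find 0

let () =
  assert (choose_read_pool [| false; true |] = Replica 1);
  assert (choose_read_pool [| false; false |] = Primary)
```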
56 + 57 + ### Forcing Read from Primary 58 + 59 + For read-after-write consistency, use `with_primary`: 60 + 61 + ```ocaml 62 + (* Insert a user *) 63 + let user_id = Cqrs.with_write cqrs (fun conn -> 64 + Repo.insert_returning conn ~table ~columns ~values ~decode 65 + ) 66 + 67 + (* Read immediately after write - use primary to avoid replica lag *) 68 + let user = Cqrs.with_primary cqrs (fun conn -> 69 + Repo.get conn ~table ~id:user_id ~decode 70 + ) 71 + ``` 72 + 73 + ## Write Operations 74 + 75 + Writes always go to the primary server: 76 + 77 + ```ocaml 78 + let _ = Cqrs.with_write cqrs (fun conn -> 79 + Repo.update conn ~table ~columns ~values ~where_column ~where_value 80 + ) 81 + ``` 82 + 83 + ## Transactions 84 + 85 + Transactions are always executed on the primary with a pinned connection: 86 + 87 + ```ocaml 88 + let result = Cqrs.transaction cqrs (fun conn -> 89 + (* All operations use the same primary connection *) 90 + match Repo.insert conn ~table:orders ~columns ~values with 91 + | Ok () -> 92 + Repo.update conn ~table:inventory ~columns:["stock"] 93 + ~values:[Value.int (stock - 1)] ~where_column:"id" ~where_value:(Value.int item_id) 94 + | Error e -> Error e 95 + ) 96 + ``` 97 + 98 + ## Replica Selection Strategies 99 + 100 + ### RoundRobin (Default) 101 + 102 + Distributes requests evenly across healthy replicas: 103 + 104 + ```ocaml 105 + replica_selection = RoundRobin; 106 + ``` 107 + 108 + ### Random 109 + 110 + Selects a random healthy replica for each request: 111 + 112 + ```ocaml 113 + replica_selection = Random; 114 + ``` 115 + 116 + ### LeastConnections 117 + 118 + Routes to the replica with the fewest active connections: 119 + 120 + ```ocaml 121 + replica_selection = LeastConnections; 122 + ``` 123 + 124 + ## Health Management 125 + 126 + Mark replicas as unhealthy when they fail: 127 + 128 + ```ocaml 129 + (* Mark replica 0 as unhealthy *) 130 + Cqrs.mark_replica_unhealthy cqrs 0; 131 + 132 + (* Later, mark it healthy again *) 133 + 
Cqrs.mark_replica_healthy cqrs 0; 134 + ``` 135 + 136 + When all replicas are marked unhealthy, reads automatically fall back to the primary. 137 + 138 + ## Configuration Reference 139 + 140 + ```ocaml 141 + type 'conn config = { 142 + primary_conninfo : string; (* Primary server connection string *) 143 + primary_max_size : int; (* Max connections to primary *) 144 + replica_conninfos : string list; (* Replica connection strings, empty = no CQRS *) 145 + replica_max_size_each : int; (* Max connections per replica *) 146 + replica_selection : replica_selection; (* RoundRobin | Random | LeastConnections *) 147 + validate : ('conn -> bool) option; (* Optional connection validator *) 148 + } 149 + ``` 150 + 151 + ## Monitoring 152 + 153 + Check CQRS status with `stats`: 154 + 155 + ```ocaml 156 + let s = Cqrs.stats cqrs in 157 + Printf.printf "Primary: total=%d in_use=%d available=%d\n" 158 + s.primary.total s.primary.in_use s.primary.available; 159 + Printf.printf "Healthy replicas: %d\n" s.healthy_replica_count; 160 + ``` 161 + 162 + ## Disabling CQRS 163 + 164 + To disable CQRS, pass an empty replica list: 165 + 166 + ```ocaml 167 + let cqrs = Cqrs.create { 168 + primary_conninfo = "host=localhost dbname=myapp"; 169 + primary_max_size = 10; 170 + replica_conninfos = []; (* No replicas = all traffic goes to primary *) 171 + replica_max_size_each = 0; 172 + replica_selection = RoundRobin; 173 + validate = None; 174 + } 175 + ``` 176 + 177 + ## Shutdown 178 + 179 + Always shut down the CQRS instance when done: 180 + 181 + ```ocaml 182 + Cqrs.shutdown cqrs 183 + ``` 184 + 185 + ## Best Practices 186 + 187 + 1. **Size pools appropriately**: Primary pool should handle write load; replica pools handle read load 188 + 2. **Handle replica lag**: Use `with_primary` for reads that need immediate consistency 189 + 3. **Monitor health**: Track unhealthy replica counts and investigate failures 190 + 4. 
**Connection validation**: Use `validate` to detect stale connections 191 + 192 + ## Next Steps 193 + 194 + - [Connection Pool](pool.md) - Lower-level pool management and MultiPool 195 + - [Repo](repo.md) - Database operations 196 + - [Transactions](transactions.md) - Transaction handling
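The routing rule behind the read/write split can be stated compactly. A minimal sketch, assuming the top-level `Cqrs.route_query_type` and `intent` type from this change are exported as written in `lib/cqrs.ml` (`Select` maps to `Read`; `Insert`, `Update`, and `Delete` map to `Write`):

```ocaml
(* Sketch only: mirrors route_query_type in lib/cqrs.ml.
   SELECTs are eligible for replica routing; every other statement
   kind is sent to the primary. *)
let describe_route q =
  match Cqrs.route_query_type q with
  | Cqrs.Read -> "replica (primary fallback if none healthy)"
  | Cqrs.Write -> "primary"
```

This is the same rule `with_read` and `with_write` encode implicitly; the function is mainly useful when routing ad-hoc `Query.t` values yourself.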
+56
docs/pool.md
··· 356 Ok conn 357 ``` 358 359 ## Next Steps 360 361 - [Repo](repo.md) - Database operations 362 - [Transactions](transactions.md) - Multi and transaction handling 363 - [Queries](queries.md) - Building complex queries
··· 356 Ok conn 357 ``` 358 359 + ## Multi-Server Pool 360 + 361 + For load balancing across multiple database servers without an external proxy, use `Pool.Make(Driver).Multi`: 362 + 363 + ### Setup 364 + 365 + ```ocaml 366 + module Pool = Pool.Make(Repodb_postgresql) 367 + 368 + let multi = Pool.Multi.create_sized 369 + ~servers:[ 370 + "host=server1 dbname=myapp"; 371 + "host=server2 dbname=myapp"; 372 + "host=server3 dbname=myapp"; 373 + ] 374 + ~max_size_per_server:5 375 + () 376 + ``` 377 + 378 + ### Usage 379 + 380 + ```ocaml 381 + (* Connections distributed via round-robin *) 382 + let result = Pool.Multi.with_connection multi (fun conn -> 383 + Repo.all conn ~table:users ~decode 384 + ) 385 + ``` 386 + 387 + ### Health Management 388 + 389 + ```ocaml 390 + (* Mark a server as unhealthy (skipped in round-robin) *) 391 + Pool.Multi.mark_unhealthy multi 1; 392 + 393 + (* Mark the same server healthy again *) 394 + Pool.Multi.mark_healthy multi 1; 395 + ``` 396 + 397 + ### Statistics 398 + 399 + ```ocaml 400 + let stats = Pool.Multi.stats multi in 401 + Printf.printf "Total servers: %d\n" stats.total_servers; 402 + Printf.printf "Healthy servers: %d\n" stats.healthy_servers; 403 + Printf.printf "Total connections: %d\n" stats.aggregate.total; 404 + ``` 405 + 406 + ### Shutdown 407 + 408 + ```ocaml 409 + Pool.Multi.shutdown multi 410 + ``` 411 + 412 + For read/write splitting with replicas, see [CQRS](cqrs.md). 413 + 414 ## Next Steps 415 416 + - [CQRS](cqrs.md) - Read/write splitting with replicas 417 - [Repo](repo.md) - Database operations 418 - [Transactions](transactions.md) - Multi and transaction handling 419 - [Queries](queries.md) - Building complex queries
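Besides `with_connection`, the `Multi` submodule in this change also exposes a blocking variant for the moment when every per-server pool is exhausted. A minimal sketch (the optional `?timeoutf` argument is forwarded to the single-pool `acquire_blocking`; its exact semantics are an assumption here):

```ocaml
(* Sketch: the blocking variant waits for a connection (optionally
   bounded by ?timeoutf) instead of returning an error immediately
   when all healthy pools are exhausted. *)
let result =
  Pool.Multi.with_connection_blocking multi (fun conn ->
      Repo.all conn ~table:users ~decode)
```

As with `with_connection`, the callback's own result is wrapped in the pool's `result` type, so a failed acquisition and a failed query remain distinguishable.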
+275
lib/cqrs.ml
···
··· 1 + open Kcas 2 + 3 + type replica_selection = RoundRobin | Random | LeastConnections 4 + 5 + type 'conn config = { 6 + primary_conninfo : string; 7 + primary_max_size : int; 8 + replica_conninfos : string list; 9 + replica_max_size_each : int; 10 + replica_selection : replica_selection; 11 + validate : ('conn -> bool) option; 12 + } 13 + 14 + type 'conn t = { 15 + primary_pool : 'conn Pool.t; 16 + replica_pools : 'conn Pool.t array option; 17 + replica_healthy : bool Loc.t array option; 18 + replica_index : int Loc.t; 19 + selection : replica_selection; 20 + } 21 + 22 + type intent = Read | Write 23 + 24 + let route_query_type = function 25 + | Query.Select -> Read 26 + | Query.Insert | Query.Update | Query.Delete -> Write 27 + 28 + let has_replicas t = Option.is_some t.replica_pools 29 + 30 + let count_healthy_replicas t = 31 + match t.replica_healthy with 32 + | None -> 0 33 + | Some healthy -> 34 + Array.fold_left 35 + (fun acc h -> if Loc.get h then acc + 1 else acc) 36 + 0 healthy 37 + 38 + let next_replica_round_robin t = 39 + match (t.replica_pools, t.replica_healthy) with 40 + | None, _ | _, None -> None 41 + | Some pools, Some healthy -> 42 + let n = Array.length pools in 43 + let start = Loc.fetch_and_add t.replica_index 1 mod n in 44 + let rec find_healthy attempts idx = 45 + if attempts >= n then None 46 + else if Loc.get healthy.(idx) then Some pools.(idx) 47 + else find_healthy (attempts + 1) ((idx + 1) mod n) 48 + in 49 + find_healthy 0 start 50 + 51 + let next_replica_random t = 52 + match (t.replica_pools, t.replica_healthy) with 53 + | None, _ | _, None -> None 54 + | Some pools, Some healthy -> 55 + let healthy_indices = 56 + Array.to_list (Array.mapi (fun i h -> (i, h)) healthy) 57 + |> List.filter (fun (_, h) -> Loc.get h) 58 + |> List.map fst 59 + in 60 + if healthy_indices = [] then None 61 + else 62 + let idx = 63 + List.nth healthy_indices (Random.int (List.length healthy_indices)) 64 + in 65 + Some pools.(idx) 66 + 67 + let 
next_replica_least_connections t = 68 + match (t.replica_pools, t.replica_healthy) with 69 + | None, _ | _, None -> None 70 + | Some pools, Some healthy -> 71 + let n = Array.length pools in 72 + let rec find_min best_idx best_count idx = 73 + if idx >= n then if best_idx < 0 then None else Some pools.(best_idx) 74 + else if Loc.get healthy.(idx) then 75 + let count = Pool.in_use pools.(idx) in 76 + if best_idx < 0 || count < best_count then find_min idx count (idx + 1) 77 + else find_min best_idx best_count (idx + 1) 78 + else find_min best_idx best_count (idx + 1) 79 + in 80 + find_min (-1) max_int 0 81 + 82 + let select_replica t = 83 + match t.selection with 84 + | RoundRobin -> next_replica_round_robin t 85 + | Random -> next_replica_random t 86 + | LeastConnections -> next_replica_least_connections t 87 + 88 + let mark_replica_unhealthy t idx = 89 + match t.replica_healthy with 90 + | None -> () 91 + | Some healthy -> 92 + if idx >= 0 && idx < Array.length healthy then Loc.set healthy.(idx) false 93 + 94 + let mark_replica_healthy t idx = 95 + match t.replica_healthy with 96 + | None -> () 97 + | Some healthy -> 98 + if idx >= 0 && idx < Array.length healthy then Loc.set healthy.(idx) true 99 + 100 + let acquire_read t = 101 + match select_replica t with 102 + | Some pool -> ( 103 + match Pool.acquire pool with 104 + | Ok conn -> Ok conn 105 + | Error Pool.Pool_empty | Error Pool.Pool_closed -> 106 + Pool.acquire t.primary_pool 107 + | Error e -> Error e) 108 + | None -> Pool.acquire t.primary_pool 109 + 110 + let acquire_write t = Pool.acquire t.primary_pool 111 + 112 + let release_read t conn = 113 + match t.replica_pools with 114 + | Some pools -> 115 + let released = ref false in 116 + Array.iter 117 + (fun pool -> 118 + if not !released then 119 + try 120 + Pool.release pool conn; 121 + released := true 122 + with _ -> ()) 123 + pools; 124 + if not !released then Pool.release t.primary_pool conn 125 + | None -> Pool.release t.primary_pool conn 126 + 127 
+ let release_write t conn = Pool.release t.primary_pool conn 128 + 129 + let with_read t f = 130 + match acquire_read t with 131 + | Error e -> Error e 132 + | Ok conn -> ( 133 + match f conn with 134 + | result -> 135 + release_read t conn; 136 + Ok result 137 + | exception exn -> 138 + release_read t conn; 139 + raise exn) 140 + 141 + let with_write t f = 142 + match acquire_write t with 143 + | Error e -> Error e 144 + | Ok conn -> ( 145 + match f conn with 146 + | result -> 147 + release_write t conn; 148 + Ok result 149 + | exception exn -> 150 + release_write t conn; 151 + raise exn) 152 + 153 + let with_primary t f = with_write t f 154 + 155 + let with_replica t f = 156 + match t.replica_pools with 157 + | None -> Error Pool.Pool_empty 158 + | Some _ -> with_read t f 159 + 160 + type stats = { 161 + primary : Pool.stats; 162 + replicas : Pool.stats list option; 163 + healthy_replica_count : int; 164 + } 165 + 166 + let stats t = 167 + let replicas = 168 + Option.map 169 + (fun pools -> Array.to_list (Array.map Pool.stats pools)) 170 + t.replica_pools 171 + in 172 + { 173 + primary = Pool.stats t.primary_pool; 174 + replicas; 175 + healthy_replica_count = count_healthy_replicas t; 176 + } 177 + 178 + let shutdown t = 179 + Pool.shutdown t.primary_pool; 180 + Option.iter (Array.iter Pool.shutdown) t.replica_pools 181 + 182 + module Make (D : Driver.S) = struct 183 + type nonrec t = D.connection t 184 + 185 + let create config = 186 + let primary_pool = 187 + let pool_config = 188 + Pool. 
189 + { 190 + max_size = config.primary_max_size; 191 + connect = 192 + (fun () -> 193 + D.connect config.primary_conninfo 194 + |> Result.map_error D.error_message); 195 + close = D.close; 196 + validate = config.validate; 197 + } 198 + in 199 + Pool.create pool_config 200 + in 201 + let replica_pools, replica_healthy = 202 + if config.replica_conninfos = [] then (None, None) 203 + else 204 + let pools = 205 + Array.of_list 206 + (List.map 207 + (fun conninfo -> 208 + let pool_config = 209 + Pool. 210 + { 211 + max_size = config.replica_max_size_each; 212 + connect = 213 + (fun () -> 214 + D.connect conninfo 215 + |> Result.map_error D.error_message); 216 + close = D.close; 217 + validate = config.validate; 218 + } 219 + in 220 + Pool.create pool_config) 221 + config.replica_conninfos) 222 + in 223 + let healthy = 224 + Array.init (Array.length pools) (fun _ -> Loc.make true) 225 + in 226 + (Some pools, Some healthy) 227 + in 228 + { 229 + primary_pool; 230 + replica_pools; 231 + replica_healthy; 232 + replica_index = Loc.make 0; 233 + selection = config.replica_selection; 234 + } 235 + 236 + let with_read = with_read 237 + let with_write = with_write 238 + let with_primary = with_primary 239 + let with_replica = with_replica 240 + 241 + let transaction t f = 242 + match Pool.acquire t.primary_pool with 243 + | Error e -> Error (Error.Query_failed (Pool.error_to_string e)) 244 + | Ok conn -> ( 245 + match D.exec conn "BEGIN" ~params:[||] with 246 + | Error e -> 247 + Pool.release t.primary_pool conn; 248 + Error (Error.Query_failed (D.error_message e)) 249 + | Ok () -> ( 250 + match f conn with 251 + | Ok v -> ( 252 + match D.exec conn "COMMIT" ~params:[||] with 253 + | Ok () -> 254 + Pool.release t.primary_pool conn; 255 + Ok v 256 + | Error e -> 257 + Pool.release t.primary_pool conn; 258 + Error (Error.Query_failed (D.error_message e))) 259 + | Error err -> 260 + let _ = D.exec conn "ROLLBACK" ~params:[||] in 261 + Pool.release t.primary_pool conn; 262 + Error 
err 263 + | exception exn -> 264 + let _ = D.exec conn "ROLLBACK" ~params:[||] in 265 + Pool.release t.primary_pool conn; 266 + raise exn)) 267 + 268 + let has_replicas = has_replicas 269 + let count_healthy_replicas = count_healthy_replicas 270 + let mark_replica_healthy = mark_replica_healthy 271 + let mark_replica_unhealthy = mark_replica_unhealthy 272 + let stats = stats 273 + let shutdown = shutdown 274 + let route_query_type = route_query_type 275 + end
+2 -1
lib/dune
··· 17 migration 18 multi 19 stream 20 - pool))
··· 17 migration 18 multi 19 stream 20 + pool 21 + cqrs))
+275
lib/pool.ml
··· 208 | Pool_timeout -> "Timeout waiting for connection" 209 | Connection_error msg -> Printf.sprintf "Connection error: %s" msg 210 211 module Make (D : Driver.S) = struct 212 type nonrec t = D.connection t 213 type conn = D.connection ··· 236 let available = available 237 let in_use = in_use 238 let is_closed = is_closed 239 end
··· 208 | Pool_timeout -> "Timeout waiting for connection" 209 | Connection_error msg -> Printf.sprintf "Connection error: %s" msg 210 211 + (** Multi-server pool with round-robin load balancing. 212 + 213 + Distributes connections across multiple database servers without requiring 214 + an external load balancer. Each server maintains its own connection pool. 215 + 216 + {[ 217 + let config = 218 + Pool. 219 + { 220 + servers = [ "host1"; "host2"; "host3" ]; 221 + max_size_per_server = 5; 222 + connect = (fun conninfo -> Repodb_postgresql.connect conninfo); 223 + close = Repodb_postgresql.close; 224 + validate = None; 225 + } 226 + in 227 + let multi = Pool.create_multi config in 228 + Pool.with_connection_multi multi (fun conn -> 229 + Repo.all conn ~table:users ~decode) 230 + ]} *) 231 + 232 + type 'conn multi_config = { 233 + servers : string list; 234 + max_size_per_server : int; 235 + connect : string -> ('conn, string) result; 236 + close : 'conn -> unit; 237 + validate : ('conn -> bool) option; 238 + } 239 + 240 + type 'conn multi_t = { 241 + pools : 'conn t array; 242 + pool_conninfos : string array; 243 + conn_to_pool : ('conn, int) Stdlib.Hashtbl.t; 244 + healthy : bool Loc.t array; 245 + next_index : int Loc.t; 246 + closed : bool Loc.t; 247 + } 248 + 249 + let create_multi config = 250 + if config.servers = [] then 251 + invalid_arg "Pool.create_multi: servers list cannot be empty"; 252 + let n = List.length config.servers in 253 + let pools = 254 + Array.of_list 255 + (List.map 256 + (fun conninfo -> 257 + let pool_config = 258 + { 259 + max_size = config.max_size_per_server; 260 + connect = (fun () -> config.connect conninfo); 261 + close = config.close; 262 + validate = config.validate; 263 + } 264 + in 265 + create pool_config) 266 + config.servers) 267 + in 268 + let healthy = Array.init n (fun _ -> Loc.make true) in 269 + { 270 + pools; 271 + pool_conninfos = Array.of_list config.servers; 272 + conn_to_pool = Stdlib.Hashtbl.create (n * 
config.max_size_per_server); 273 + healthy; 274 + next_index = Loc.make 0; 275 + closed = Loc.make false; 276 + } 277 + 278 + let multi_size multi = 279 + Array.fold_left (fun acc pool -> acc + size pool) 0 multi.pools 280 + 281 + let multi_in_use multi = 282 + Array.fold_left (fun acc pool -> acc + in_use pool) 0 multi.pools 283 + 284 + let multi_available multi = 285 + Array.fold_left (fun acc pool -> acc + available pool) 0 multi.pools 286 + 287 + let multi_is_closed multi = Loc.get multi.closed 288 + let multi_server_count multi = Array.length multi.pools 289 + 290 + let multi_is_healthy multi idx = 291 + if idx < 0 || idx >= Array.length multi.healthy then false 292 + else Loc.get multi.healthy.(idx) 293 + 294 + let mark_unhealthy multi idx = 295 + if idx >= 0 && idx < Array.length multi.healthy then 296 + Loc.set multi.healthy.(idx) false 297 + 298 + let mark_healthy multi idx = 299 + if idx >= 0 && idx < Array.length multi.healthy then 300 + Loc.set multi.healthy.(idx) true 301 + 302 + let count_healthy multi = 303 + Array.fold_left 304 + (fun acc h -> if Loc.get h then acc + 1 else acc) 305 + 0 multi.healthy 306 + 307 + (** Find next healthy server using round-robin *) 308 + let next_healthy_index multi = 309 + let n = Array.length multi.pools in 310 + let start = Loc.fetch_and_add multi.next_index 1 mod n in 311 + let rec find_healthy attempts idx = 312 + if attempts >= n then None 313 + else if Loc.get multi.healthy.(idx) then Some idx 314 + else find_healthy (attempts + 1) ((idx + 1) mod n) 315 + in 316 + find_healthy 0 start 317 + 318 + let acquire_multi multi = 319 + if Loc.get multi.closed then Error Pool_closed 320 + else 321 + match next_healthy_index multi with 322 + | None -> Error Pool_empty (* All servers unhealthy *) 323 + | Some idx -> ( 324 + match acquire multi.pools.(idx) with 325 + | Ok conn -> 326 + Stdlib.Hashtbl.replace multi.conn_to_pool conn idx; 327 + Ok conn 328 + | Error Pool_empty -> 329 + (* This server's pool is exhausted, try 
others *) 330 + let n = Array.length multi.pools in 331 + let rec try_others attempts current_idx = 332 + if attempts >= n then Error Pool_empty 333 + else if current_idx <> idx && Loc.get multi.healthy.(current_idx) 334 + then 335 + match acquire multi.pools.(current_idx) with 336 + | Ok conn -> 337 + Stdlib.Hashtbl.replace multi.conn_to_pool conn current_idx; 338 + Ok conn 339 + | Error Pool_empty -> 340 + try_others (attempts + 1) ((current_idx + 1) mod n) 341 + | Error e -> Error e 342 + else try_others (attempts + 1) ((current_idx + 1) mod n) 343 + in 344 + try_others 1 ((idx + 1) mod n) 345 + | Error e -> Error e) 346 + 347 + let release_multi multi conn = 348 + match Stdlib.Hashtbl.find_opt multi.conn_to_pool conn with 349 + | Some idx -> 350 + Stdlib.Hashtbl.remove multi.conn_to_pool conn; 351 + if Loc.get multi.closed then multi.pools.(idx).config.close conn 352 + else release multi.pools.(idx) conn 353 + | None -> 354 + (* Connection not tracked, close it directly using first pool's config *) 355 + if Array.length multi.pools > 0 then multi.pools.(0).config.close conn 356 + 357 + let with_connection_multi multi f = 358 + match acquire_multi multi with 359 + | Error e -> Error e 360 + | Ok conn -> ( 361 + match f conn with 362 + | result -> 363 + release_multi multi conn; 364 + Ok result 365 + | exception exn -> 366 + release_multi multi conn; 367 + raise exn) 368 + 369 + let rec acquire_multi_blocking ?timeoutf multi = 370 + if Loc.get multi.closed then Error Pool_closed 371 + else 372 + match acquire_multi multi with 373 + | Ok conn -> Ok conn 374 + | Error Pool_empty -> ( 375 + (* All pools exhausted, wait on a healthy one *) 376 + match next_healthy_index multi with 377 + | None -> Error Pool_empty 378 + | Some idx -> ( 379 + match acquire_blocking ?timeoutf multi.pools.(idx) with 380 + | Ok conn -> 381 + Stdlib.Hashtbl.replace multi.conn_to_pool conn idx; 382 + Ok conn 383 + | Error Pool_timeout -> Error Pool_timeout 384 + | Error Pool_closed -> Error 
Pool_closed 385 + | Error Pool_empty -> acquire_multi_blocking ?timeoutf multi 386 + | Error e -> Error e)) 387 + | Error e -> Error e 388 + 389 + let with_connection_multi_blocking ?timeoutf multi f = 390 + match acquire_multi_blocking ?timeoutf multi with 391 + | Error e -> Error e 392 + | Ok conn -> ( 393 + match f conn with 394 + | result -> 395 + release_multi multi conn; 396 + Ok result 397 + | exception exn -> 398 + release_multi multi conn; 399 + raise exn) 400 + 401 + let drain_multi multi = Array.iter drain multi.pools 402 + 403 + let shutdown_multi multi = 404 + Loc.set multi.closed true; 405 + Array.iter shutdown multi.pools; 406 + Stdlib.Hashtbl.clear multi.conn_to_pool 407 + 408 + type multi_stats = { 409 + total_servers : int; 410 + healthy_servers : int; 411 + per_server : (string * stats) list; 412 + aggregate : stats; 413 + } 414 + 415 + let stats_multi multi = 416 + let per_server = 417 + Array.to_list 418 + (Array.mapi 419 + (fun i pool -> (multi.pool_conninfos.(i), stats pool)) 420 + multi.pools) 421 + in 422 + let aggregate = 423 + { 424 + total = multi_size multi; 425 + available = multi_available multi; 426 + in_use = multi_in_use multi; 427 + closed = multi_is_closed multi; 428 + } 429 + in 430 + { 431 + total_servers = Array.length multi.pools; 432 + healthy_servers = count_healthy multi; 433 + per_server; 434 + aggregate; 435 + } 436 + 437 module Make (D : Driver.S) = struct 438 type nonrec t = D.connection t 439 type conn = D.connection ··· 462 let available = available 463 let in_use = in_use 464 let is_closed = is_closed 465 + 466 + module Multi = struct 467 + type t = D.connection multi_t 468 + 469 + let create ~servers ?validate () = 470 + let cfg = 471 + { 472 + servers; 473 + max_size_per_server = 10; 474 + connect = 475 + (fun conninfo -> 476 + D.connect conninfo |> Result.map_error D.error_message); 477 + close = D.close; 478 + validate; 479 + } 480 + in 481 + create_multi cfg 482 + 483 + let create_sized ~servers 
~max_size_per_server ?validate () = 484 + let cfg = 485 + { 486 + servers; 487 + max_size_per_server; 488 + connect = 489 + (fun conninfo -> 490 + D.connect conninfo |> Result.map_error D.error_message); 491 + close = D.close; 492 + validate; 493 + } 494 + in 495 + create_multi cfg 496 + 497 + let acquire = acquire_multi 498 + let acquire_blocking = acquire_multi_blocking 499 + let release = release_multi 500 + let with_connection = with_connection_multi 501 + let with_connection_blocking = with_connection_multi_blocking 502 + let drain = drain_multi 503 + let shutdown = shutdown_multi 504 + let stats = stats_multi 505 + let size = multi_size 506 + let available = multi_available 507 + let in_use = multi_in_use 508 + let is_closed = multi_is_closed 509 + let server_count = multi_server_count 510 + let is_healthy = multi_is_healthy 511 + let mark_healthy = mark_healthy 512 + let mark_unhealthy = mark_unhealthy 513 + end 514 end
+1 -1
repodb-postgresql.opam
··· 1 # This file is generated by dune, edit dune-project instead 2 opam-version: "2.0" 3 - version: "0.2.0" 4 synopsis: "PostgreSQL driver for repodb" 5 description: 6 "PostgreSQL driver implementation for repodb using the postgresql-ocaml library."
··· 1 # This file is generated by dune, edit dune-project instead 2 opam-version: "2.0" 3 + version: "0.2.1" 4 synopsis: "PostgreSQL driver for repodb" 5 description: 6 "PostgreSQL driver implementation for repodb using the postgresql-ocaml library."
+1 -1
repodb-sqlite.opam
··· 1 # This file is generated by dune, edit dune-project instead 2 opam-version: "2.0" 3 - version: "0.2.0" 4 synopsis: "SQLite driver for repodb" 5 description: 6 "SQLite driver implementation for repodb using the sqlite3-ocaml library."
··· 1 # This file is generated by dune, edit dune-project instead 2 opam-version: "2.0" 3 + version: "0.2.1" 4 synopsis: "SQLite driver for repodb" 5 description: 6 "SQLite driver implementation for repodb using the sqlite3-ocaml library."
+1 -1
repodb.opam
··· 1 # This file is generated by dune, edit dune-project instead 2 opam-version: "2.0" 3 - version: "0.2.0" 4 synopsis: "Ecto-like database toolkit for OCaml" 5 description: 6 "repodb is a database toolkit inspired by Elixir's Ecto. It provides type-safe query building, schema definitions, changesets for data validation, and database migrations."
··· 1 # This file is generated by dune, edit dune-project instead 2 opam-version: "2.0" 3 + version: "0.2.1" 4 synopsis: "Ecto-like database toolkit for OCaml" 5 description: 6 "repodb is a database toolkit inspired by Elixir's Ecto. It provides type-safe query building, schema definitions, changesets for data validation, and database migrations."
+2
test/dune
··· 17 test_multi 18 test_stream 19 test_pool 20 test_repodb)) 21 22 (test
··· 17 test_multi 18 test_stream 19 test_pool 20 + test_multi_pool 21 + test_cqrs 22 test_repodb)) 23 24 (test
+86
test/integration/docker-compose.yml
···
··· 1 + version: '3.8' 2 + 3 + services: 4 + primary: 5 + image: postgres:16 6 + container_name: repodb_primary 7 + environment: 8 + POSTGRES_USER: repodb 9 + POSTGRES_PASSWORD: repodb 10 + POSTGRES_DB: repodb_test 11 + ports: 12 + - '5432:5432' 13 + command: > 14 + postgres 15 + -c wal_level=replica 16 + -c max_wal_senders=3 17 + -c max_replication_slots=3 18 + -c hot_standby=on 19 + -c listen_addresses='*' 20 + volumes: 21 + - ./init-primary.sh:/docker-entrypoint-initdb.d/init-primary.sh 22 + healthcheck: 23 + test: ['CMD-SHELL', 'pg_isready -U repodb -d repodb_test'] 24 + interval: 5s 25 + timeout: 5s 26 + retries: 10 27 + 28 + replica1: 29 + image: postgres:16 30 + container_name: repodb_replica1 31 + environment: 32 + POSTGRES_USER: repodb 33 + POSTGRES_PASSWORD: repodb 34 + PGUSER: repodb 35 + PGPASSWORD: repodb 36 + ports: 37 + - '5433:5432' 38 + depends_on: 39 + primary: 40 + condition: service_healthy 41 + entrypoint: /bin/bash 42 + command: 43 + - -c 44 + - | 45 + rm -rf /var/lib/postgresql/data/* 46 + until PGPASSWORD=repodb pg_basebackup -h primary -D /var/lib/postgresql/data -U repodb -Fp -Xs -P -R; do 47 + echo "Waiting for primary..." 48 + sleep 2 49 + done 50 + chmod 700 /var/lib/postgresql/data 51 + exec postgres 52 + healthcheck: 53 + test: ['CMD-SHELL', 'pg_isready -U repodb'] 54 + interval: 5s 55 + timeout: 5s 56 + retries: 10 57 + 58 + replica2: 59 + image: postgres:16 60 + container_name: repodb_replica2 61 + environment: 62 + POSTGRES_USER: repodb 63 + POSTGRES_PASSWORD: repodb 64 + PGUSER: repodb 65 + PGPASSWORD: repodb 66 + ports: 67 + - '5434:5432' 68 + depends_on: 69 + primary: 70 + condition: service_healthy 71 + entrypoint: /bin/bash 72 + command: 73 + - -c 74 + - | 75 + rm -rf /var/lib/postgresql/data/* 76 + until PGPASSWORD=repodb pg_basebackup -h primary -D /var/lib/postgresql/data -U repodb -Fp -Xs -P -R; do 77 + echo "Waiting for primary..." 
78 + sleep 2 79 + done 80 + chmod 700 /var/lib/postgresql/data 81 + exec postgres 82 + healthcheck: 83 + test: ['CMD-SHELL', 'pg_isready -U repodb'] 84 + interval: 5s 85 + timeout: 5s 86 + retries: 10
+3
test/integration/dune
···
··· 1 + (executable 2 + (name test_cqrs_integration) 3 + (libraries repodb repodb-postgresql unix))
+16
test/integration/init-primary.sh
···
··· 1 + #!/bin/bash 2 + set -e 3 + 4 + # Configure pg_hba.conf to allow replication connections 5 + echo "host replication repodb 0.0.0.0/0 md5" >> "$PGDATA/pg_hba.conf" 6 + echo "host all repodb 0.0.0.0/0 md5" >> "$PGDATA/pg_hba.conf" 7 + 8 + # Create test table 9 + psql -v ON_ERROR_STOP=1 --username "$POSTGRES_USER" --dbname "$POSTGRES_DB" <<-EOSQL 10 + CREATE TABLE IF NOT EXISTS test_users ( 11 + id SERIAL PRIMARY KEY, 12 + name VARCHAR(100) NOT NULL, 13 + email VARCHAR(255) NOT NULL, 14 + created_at TIMESTAMP DEFAULT NOW() 15 + ); 16 + EOSQL
+51
test/integration/run_tests.sh
···
··· 1 + #!/bin/bash 2 + set -e 3 + 4 + SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)" 5 + PROJECT_ROOT="$(cd "$SCRIPT_DIR/../.." && pwd)" 6 + 7 + cleanup() { 8 + echo "Cleaning up..." 9 + cd "$SCRIPT_DIR" 10 + podman-compose down -v 2>/dev/null || docker-compose down -v 2>/dev/null || true 11 + } 12 + 13 + trap cleanup EXIT 14 + 15 + echo "=== Starting PostgreSQL cluster ===" 16 + cd "$SCRIPT_DIR" 17 + chmod +x init-primary.sh 18 + 19 + if command -v podman-compose &> /dev/null; then 20 + COMPOSE_CMD="podman-compose" 21 + elif command -v docker-compose &> /dev/null; then 22 + COMPOSE_CMD="docker-compose" 23 + else 24 + echo "Error: Neither podman-compose nor docker-compose found" 25 + exit 1 26 + fi 27 + 28 + $COMPOSE_CMD down -v 2>/dev/null || true 29 + $COMPOSE_CMD up -d 30 + 31 + echo "=== Waiting for primary to be ready ===" 32 + for i in {1..30}; do 33 + if $COMPOSE_CMD exec -T primary pg_isready -U repodb -d repodb_test 2>/dev/null; then 34 + echo "Primary is ready" 35 + break 36 + fi 37 + echo "Waiting for primary... ($i/30)" 38 + sleep 2 39 + done 40 + 41 + echo "=== Waiting for replicas to sync ===" 42 + sleep 15 43 + 44 + echo "=== Verifying replication ===" 45 + $COMPOSE_CMD exec -T primary psql -U repodb -d repodb_test -c "SELECT client_addr, state FROM pg_stat_replication;" || true 46 + 47 + echo "=== Running integration tests ===" 48 + cd "$PROJECT_ROOT" 49 + dune exec test/integration/test_cqrs_integration.exe 50 + 51 + echo "=== Tests completed successfully ==="
+250
test/integration/test_cqrs_integration.ml
···
··· 1 + open Repodb 2 + module PgCqrs = Cqrs.Make (Repodb_postgresql) 3 + module Repo = Repo.Make (Repodb_postgresql) 4 + 5 + let primary_conninfo = 6 + "host=localhost port=5432 dbname=repodb_test user=repodb password=repodb" 7 + 8 + let replica1_conninfo = 9 + "host=localhost port=5433 dbname=repodb_test user=repodb password=repodb" 10 + 11 + let replica2_conninfo = 12 + "host=localhost port=5434 dbname=repodb_test user=repodb password=repodb" 13 + 14 + let _users_table = Schema.table "test_users" 15 + 16 + type user = { id : int; name : string; email : string [@warning "-69"] } 17 + 18 + let decode_user row = 19 + { 20 + id = Driver.row_int row 0; 21 + name = Driver.row_text row 1; 22 + email = Driver.row_text row 2; 23 + } 24 + 25 + let test_write_to_primary_read_from_replica () = 26 + Printf.printf "=== Test: Write to primary, read from replica ===\n%!"; 27 + let cqrs = 28 + PgCqrs.create 29 + { 30 + primary_conninfo; 31 + primary_max_size = 5; 32 + replica_conninfos = [ replica1_conninfo; replica2_conninfo ]; 33 + replica_max_size_each = 5; 34 + replica_selection = Cqrs.RoundRobin; 35 + validate = None; 36 + } 37 + in 38 + let test_name = Printf.sprintf "test_user_%d" (Random.int 10000) in 39 + let test_email = Printf.sprintf "%s@example.com" test_name in 40 + 41 + let insert_result = 42 + PgCqrs.with_write cqrs (fun conn -> 43 + let sql = 44 + Printf.sprintf 45 + "INSERT INTO test_users (name, email) VALUES ('%s', '%s') \ 46 + RETURNING id" 47 + test_name test_email 48 + in 49 + Repodb_postgresql.query_one conn sql ~params:[||]) 50 + in 51 + (match insert_result with 52 + | Ok (Ok (Some row)) -> 53 + let id = Driver.row_int row 0 in 54 + Printf.printf " Inserted user with id=%d\n%!" 
id 55 + | _ -> failwith "Insert failed"); 56 + 57 + Unix.sleepf 2.0; 58 + 59 + let read_result = 60 + PgCqrs.with_read cqrs (fun conn -> 61 + let sql = 62 + Printf.sprintf 63 + "SELECT id, name, email FROM test_users WHERE name = '%s'" test_name 64 + in 65 + Repodb_postgresql.query_one conn sql ~params:[||]) 66 + in 67 + (match read_result with 68 + | Ok (Ok (Some row)) -> 69 + let user = decode_user row in 70 + Printf.printf " Read user from replica: id=%d, name=%s\n%!" user.id 71 + user.name; 72 + assert (user.name = test_name) 73 + | _ -> failwith "Read from replica failed"); 74 + 75 + PgCqrs.shutdown cqrs; 76 + Printf.printf " PASSED\n%!" 77 + 78 + let test_transaction_uses_primary () = 79 + Printf.printf "=== Test: Transaction uses primary ===\n%!"; 80 + let cqrs = 81 + PgCqrs.create 82 + { 83 + primary_conninfo; 84 + primary_max_size = 5; 85 + replica_conninfos = [ replica1_conninfo ]; 86 + replica_max_size_each = 5; 87 + replica_selection = Cqrs.RoundRobin; 88 + validate = None; 89 + } 90 + in 91 + let test_name = Printf.sprintf "tx_user_%d" (Random.int 10000) in 92 + 93 + let tx_result = 94 + PgCqrs.transaction cqrs (fun conn -> 95 + let insert_sql = 96 + Printf.sprintf 97 + "INSERT INTO test_users (name, email) VALUES ('%s', '%s@tx.com')" 98 + test_name test_name 99 + in 100 + match Repodb_postgresql.exec conn insert_sql ~params:[||] with 101 + | Ok () -> ( 102 + let select_sql = 103 + Printf.sprintf 104 + "SELECT id, name, email FROM test_users WHERE name = '%s'" 105 + test_name 106 + in 107 + match Repodb_postgresql.query_one conn select_sql ~params:[||] with 108 + | Ok (Some row) -> 109 + let user = decode_user row in 110 + Ok user 111 + | Ok None -> Error Error.Not_found 112 + | Error e -> 113 + Error (Error.Query_failed (Repodb_postgresql.error_message e))) 114 + | Error e -> 115 + Error (Error.Query_failed (Repodb_postgresql.error_message e))) 116 + in 117 + (match tx_result with 118 + | Ok user -> 119 + Printf.printf " Transaction completed: id=%d, 
name=%s\n%!" user.id
        user.name;
      assert (user.name = test_name)
  | Error e ->
      Printf.printf " Error: %s\n%!" (Error.show_db_error e);
      failwith "Transaction failed");

  PgCqrs.shutdown cqrs;
  Printf.printf " PASSED\n%!"

module TestPool = Pool.Make (Repodb_postgresql)

let test_multi_pool_round_robin () =
  Printf.printf "=== Test: MultiPool round-robin ===\n%!";
  let multi =
    TestPool.Multi.create_sized
      ~servers:[ primary_conninfo; replica1_conninfo; replica2_conninfo ]
      ~max_size_per_server:3 ()
  in
  let successful_queries = ref 0 in
  for _ = 1 to 9 do
    match
      TestPool.Multi.with_connection multi (fun conn ->
          match Repodb_postgresql.query_one conn "SELECT 1" ~params:[||] with
          | Ok _ -> incr successful_queries
          | Error _ -> ())
    with
    | Ok () -> ()
    | Error e -> Printf.printf " Warning: %s\n%!" (Pool.error_to_string e)
  done;
  Printf.printf " Successful queries: %d/9\n%!" !successful_queries;
  assert (!successful_queries >= 6);
  TestPool.Multi.shutdown multi;
  Printf.printf " PASSED\n%!"

let test_cqrs_fallback_when_replica_down () =
  Printf.printf "=== Test: CQRS fallback when replicas marked unhealthy ===\n%!";
  let cqrs =
    PgCqrs.create
      {
        primary_conninfo;
        primary_max_size = 5;
        replica_conninfos = [ replica1_conninfo; replica2_conninfo ];
        replica_max_size_each = 5;
        replica_selection = Cqrs.RoundRobin;
        validate = None;
      }
  in
  PgCqrs.mark_replica_unhealthy cqrs 0;
  PgCqrs.mark_replica_unhealthy cqrs 1;

  let read_result =
    PgCqrs.with_read cqrs (fun conn ->
        Repodb_postgresql.query_one conn "SELECT 1" ~params:[||])
  in
  (match read_result with
  | Ok (Ok _) -> Printf.printf " Read succeeded via fallback to primary\n%!"
  | _ -> failwith "Fallback read failed");

  PgCqrs.shutdown cqrs;
  Printf.printf " PASSED\n%!"

let test_concurrent_load () =
  Printf.printf "=== Test: Concurrent load ===\n%!";
  let cqrs =
    PgCqrs.create
      {
        primary_conninfo;
        primary_max_size = 10;
        replica_conninfos = [ replica1_conninfo; replica2_conninfo ];
        replica_max_size_each = 5;
        replica_selection = Cqrs.RoundRobin;
        validate = None;
      }
  in
  let success_count = Atomic.make 0 in
  let error_count = Atomic.make 0 in
  let num_domains = 4 in
  let ops_per_domain = 25 in

  let domains =
    Array.init num_domains (fun i ->
        Domain.spawn (fun () ->
            for j = 1 to ops_per_domain do
              let op = ((i * ops_per_domain) + j) mod 3 in
              (* Parenthesize the first match so the [else] binds to the [if],
                 not to the last match arm. *)
              if op = 0 then (
                match
                  PgCqrs.with_write cqrs (fun conn ->
                      Repodb_postgresql.exec conn
                        (Printf.sprintf
                           "INSERT INTO test_users (name, email) VALUES \
                            ('load_%d_%d', 'load@test.com')"
                           i j)
                        ~params:[||])
                with
                | Ok (Ok ()) -> Atomic.incr success_count
                | _ -> Atomic.incr error_count)
              else
                match
                  PgCqrs.with_read cqrs (fun conn ->
                      Repodb_postgresql.query_one conn
                        "SELECT COUNT(*) FROM test_users" ~params:[||])
                with
                | Ok (Ok _) -> Atomic.incr success_count
                | _ -> Atomic.incr error_count
            done))
  in
  Array.iter Domain.join domains;

  let total_success = Atomic.get success_count in
  let total_errors = Atomic.get error_count in
  Printf.printf " Successful: %d, Errors: %d\n%!" total_success total_errors;
  assert (total_success > num_domains * ops_per_domain * 8 / 10);

  PgCqrs.shutdown cqrs;
  Printf.printf " PASSED\n%!"

let () =
  Random.self_init ();
  Printf.printf "\n========================================\n";
  Printf.printf "repodb CQRS Integration Tests\n";
  Printf.printf "========================================\n\n%!";

  test_write_to_primary_read_from_replica ();
  test_transaction_uses_primary ();
  test_multi_pool_round_robin ();
  test_cqrs_fallback_when_replica_down ();
  test_concurrent_load ();

  Printf.printf "\n========================================\n";
  Printf.printf "All integration tests PASSED!\n";
  Printf.printf "========================================\n%!"
test/test_cqrs.ml (+337)
open Repodb

type mock_conn = { id : int; mutable closed : bool; server : string }

let conn_counter = Atomic.make 0

module MockDriver : Driver.S with type connection = mock_conn = struct
  type connection = mock_conn
  type error = string

  let dialect = Driver.PostgreSQL
  let error_message e = e

  let connect conninfo =
    let id = Atomic.fetch_and_add conn_counter 1 in
    Ok { id; closed = false; server = conninfo }

  let close conn = conn.closed <- true

  let with_connection conninfo f =
    match connect conninfo with
    | Error e -> Error e
    | Ok conn ->
        let result = f conn in
        close conn;
        result

  let exec _conn _sql ~params:_ = Ok ()
  let query _conn _sql ~params:_ = Ok []
  let query_one _conn _sql ~params:_ = Ok None
  let query_fold _conn _sql ~params:_ ~init ~f:_ = Ok init
  let query_iter _conn _sql ~params:_ ~f:_ = Ok ()
  let transaction conn f = f conn
  let placeholder n = Printf.sprintf "$%d" n
  let returning_supported = true
  let upsert_syntax = `PostgreSQL
  let last_insert_id _conn = Error "Use RETURNING"
end

module TestCqrs = Cqrs.Make (MockDriver)

let test_create_no_replicas () =
  let cqrs =
    TestCqrs.create
      {
        primary_conninfo = "primary";
        primary_max_size = 5;
        replica_conninfos = [];
        replica_max_size_each = 5;
        replica_selection = Cqrs.RoundRobin;
        validate = None;
      }
  in
  Alcotest.(check bool) "no replicas" false (TestCqrs.has_replicas cqrs);
  Alcotest.(check int) "healthy count" 0 (TestCqrs.count_healthy_replicas cqrs);
  TestCqrs.shutdown cqrs

let test_create_with_replicas () =
  let cqrs =
    TestCqrs.create
      {
        primary_conninfo = "primary";
        primary_max_size = 5;
        replica_conninfos = [ "replica1"; "replica2" ];
        replica_max_size_each = 5;
        replica_selection = Cqrs.RoundRobin;
        validate = None;
      }
  in
  Alcotest.(check bool) "has replicas" true (TestCqrs.has_replicas cqrs);
  Alcotest.(check int) "healthy count" 2 (TestCqrs.count_healthy_replicas cqrs);
  TestCqrs.shutdown cqrs

let test_with_read_uses_replica () =
  let cqrs =
    TestCqrs.create
      {
        primary_conninfo = "primary";
        primary_max_size = 5;
        replica_conninfos = [ "replica1" ];
        replica_max_size_each = 5;
        replica_selection = Cqrs.RoundRobin;
        validate = None;
      }
  in
  let server_used = ref "" in
  let result =
    TestCqrs.with_read cqrs (fun conn ->
        server_used := conn.server;
        42)
  in
  (match result with Ok 42 -> () | _ -> Alcotest.fail "Expected Ok 42");
  Alcotest.(check string) "read used replica" "replica1" !server_used;
  TestCqrs.shutdown cqrs

let test_with_write_uses_primary () =
  let cqrs =
    TestCqrs.create
      {
        primary_conninfo = "primary";
        primary_max_size = 5;
        replica_conninfos = [ "replica1" ];
        replica_max_size_each = 5;
        replica_selection = Cqrs.RoundRobin;
        validate = None;
      }
  in
  let server_used = ref "" in
  let result =
    TestCqrs.with_write cqrs (fun conn ->
        server_used := conn.server;
        42)
  in
  (match result with Ok 42 -> () | _ -> Alcotest.fail "Expected Ok 42");
  Alcotest.(check string) "write used primary" "primary" !server_used;
  TestCqrs.shutdown cqrs

let test_with_primary_uses_primary () =
  let cqrs =
    TestCqrs.create
      {
        primary_conninfo = "primary";
        primary_max_size = 5;
        replica_conninfos = [ "replica1" ];
        replica_max_size_each = 5;
        replica_selection = Cqrs.RoundRobin;
        validate = None;
      }
  in
  let server_used = ref "" in
  let result =
    TestCqrs.with_primary cqrs (fun conn ->
        server_used := conn.server;
        42)
  in
  (match result with Ok 42 -> () | _ -> Alcotest.fail "Expected Ok 42");
  Alcotest.(check string) "with_primary used primary" "primary" !server_used;
  TestCqrs.shutdown cqrs

let test_no_replicas_read_falls_back_to_primary () =
  let cqrs =
    TestCqrs.create
      {
        primary_conninfo = "primary";
        primary_max_size = 5;
        replica_conninfos = [];
        replica_max_size_each = 5;
        replica_selection = Cqrs.RoundRobin;
        validate = None;
      }
  in
  let server_used = ref "" in
  let result =
    TestCqrs.with_read cqrs (fun conn ->
        server_used := conn.server;
        42)
  in
  (match result with Ok 42 -> () | _ -> Alcotest.fail "Expected Ok 42");
  Alcotest.(check string) "read fallback to primary" "primary" !server_used;
  TestCqrs.shutdown cqrs

let test_round_robin_replicas () =
  let cqrs =
    TestCqrs.create
      {
        primary_conninfo = "primary";
        primary_max_size = 5;
        replica_conninfos = [ "r1"; "r2"; "r3" ];
        replica_max_size_each = 5;
        replica_selection = Cqrs.RoundRobin;
        validate = None;
      }
  in
  let servers = ref [] in
  for _ = 1 to 6 do
    let _ =
      TestCqrs.with_read cqrs (fun conn ->
          servers := conn.server :: !servers;
          ())
    in
    ()
  done;
  let counts =
    [ "r1"; "r2"; "r3" ]
    |> List.map (fun s -> List.length (List.filter (( = ) s) !servers))
  in
  Alcotest.(check int) "r1 count" 2 (List.nth counts 0);
  Alcotest.(check int) "r2 count" 2 (List.nth counts 1);
  Alcotest.(check int) "r3 count" 2 (List.nth counts 2);
  TestCqrs.shutdown cqrs

let test_unhealthy_replica_skipped () =
  let cqrs =
    TestCqrs.create
      {
        primary_conninfo = "primary";
        primary_max_size = 5;
        replica_conninfos = [ "r1"; "r2" ];
        replica_max_size_each = 5;
        replica_selection = Cqrs.RoundRobin;
        validate = None;
      }
  in
  TestCqrs.mark_replica_unhealthy cqrs 0;
  let servers = ref [] in
  for _ = 1 to 3 do
    let _ =
      TestCqrs.with_read cqrs (fun conn ->
          servers := conn.server :: !servers;
          ())
    in
    ()
  done;
  let r1_count = List.length (List.filter (( = ) "r1") !servers) in
  let r2_count = List.length (List.filter (( = ) "r2") !servers) in
  Alcotest.(check int) "r1 should be skipped" 0 r1_count;
  Alcotest.(check int) "r2 should handle all" 3 r2_count;
  TestCqrs.shutdown cqrs

let test_all_replicas_unhealthy_fallback () =
  let cqrs =
    TestCqrs.create
      {
        primary_conninfo = "primary";
        primary_max_size = 5;
        replica_conninfos = [ "r1"; "r2" ];
        replica_max_size_each = 5;
        replica_selection = Cqrs.RoundRobin;
        validate = None;
      }
  in
  TestCqrs.mark_replica_unhealthy cqrs 0;
  TestCqrs.mark_replica_unhealthy cqrs 1;
  let server_used = ref "" in
  let result =
    TestCqrs.with_read cqrs (fun conn ->
        server_used := conn.server;
        42)
  in
  (match result with Ok 42 -> () | _ -> Alcotest.fail "Expected Ok 42");
  Alcotest.(check string) "fallback to primary" "primary" !server_used;
  TestCqrs.shutdown cqrs

let test_transaction_uses_primary () =
  let cqrs =
    TestCqrs.create
      {
        primary_conninfo = "primary";
        primary_max_size = 5;
        replica_conninfos = [ "r1" ];
        replica_max_size_each = 5;
        replica_selection = Cqrs.RoundRobin;
        validate = None;
      }
  in
  let server_used = ref "" in
  let result =
    TestCqrs.transaction cqrs (fun conn ->
        server_used := conn.server;
        Ok 42)
  in
  (match result with Ok 42 -> () | _ -> Alcotest.fail "Expected Ok 42");
  Alcotest.(check string) "transaction used primary" "primary" !server_used;
  TestCqrs.shutdown cqrs

let test_stats () =
  let cqrs =
    TestCqrs.create
      {
        primary_conninfo = "primary";
        primary_max_size = 5;
        replica_conninfos = [ "r1"; "r2" ];
        replica_max_size_each = 5;
        replica_selection = Cqrs.RoundRobin;
        validate = None;
      }
  in
  let stats = TestCqrs.stats cqrs in
  Alcotest.(check int) "healthy count" 2 stats.healthy_replica_count;
  (match stats.replicas with
  | Some replicas -> Alcotest.(check int) "replica pools" 2 (List.length replicas)
  | None -> Alcotest.fail "Expected replicas");
  TestCqrs.shutdown cqrs

let test_route_query_type () =
  Alcotest.(check bool)
    "SELECT is Read" true
    (TestCqrs.route_query_type Query.Select = Cqrs.Read);
  Alcotest.(check bool)
    "INSERT is Write" true
    (TestCqrs.route_query_type Query.Insert = Cqrs.Write);
  Alcotest.(check bool)
    "UPDATE is Write" true
    (TestCqrs.route_query_type Query.Update = Cqrs.Write);
  Alcotest.(check bool)
    "DELETE is Write" true
    (TestCqrs.route_query_type Query.Delete = Cqrs.Write)

let test_with_replica_no_replicas () =
  let cqrs =
    TestCqrs.create
      {
        primary_conninfo = "primary";
        primary_max_size = 5;
        replica_conninfos = [];
        replica_max_size_each = 5;
        replica_selection = Cqrs.RoundRobin;
        validate = None;
      }
  in
  let result = TestCqrs.with_replica cqrs (fun _ -> 42) in
  (match result with
  | Error Pool.Pool_empty -> ()
  | _ -> Alcotest.fail "Expected Pool_empty when no replicas");
  TestCqrs.shutdown cqrs

let tests =
  [
    ("create no replicas", `Quick, test_create_no_replicas);
    ("create with replicas", `Quick, test_create_with_replicas);
    ("with_read uses replica", `Quick, test_with_read_uses_replica);
    ("with_write uses primary", `Quick, test_with_write_uses_primary);
    ("with_primary uses primary", `Quick, test_with_primary_uses_primary);
    ( "no replicas read fallback",
      `Quick,
      test_no_replicas_read_falls_back_to_primary );
    ("round robin replicas", `Quick, test_round_robin_replicas);
    ("unhealthy replica skipped", `Quick, test_unhealthy_replica_skipped);
    ( "all replicas unhealthy fallback",
      `Quick,
      test_all_replicas_unhealthy_fallback );
    ("transaction uses primary", `Quick, test_transaction_uses_primary);
    ("stats", `Quick, test_stats);
    ("route_query_type", `Quick, test_route_query_type);
    ("with_replica no replicas", `Quick, test_with_replica_no_replicas);
  ]
test/test_multi_pool.ml (+250)
open Repodb

type mock_conn = { id : int; server_idx : int; mutable closed : bool }

let conn_counter = Atomic.make 0

let find_index_in_list elem lst =
  let rec aux i = function
    | [] -> None
    | x :: xs -> if x = elem then Some i else aux (i + 1) xs
  in
  aux 0 lst

let mock_multi_config ?(servers = [ "s1"; "s2"; "s3" ]) ?(max_size = 2)
    ?(fail_servers = []) () =
  Pool.
    {
      servers;
      max_size_per_server = max_size;
      connect =
        (fun conninfo ->
          let server_idx =
            match find_index_in_list conninfo servers with
            | Some i -> i
            | None -> -1
          in
          if List.mem server_idx fail_servers then
            Error (Printf.sprintf "Failed to connect to %s" conninfo)
          else
            let id = Atomic.fetch_and_add conn_counter 1 in
            Ok { id; server_idx; closed = false });
      close = (fun conn -> conn.closed <- true);
      validate = None;
    }

let test_create_multi () =
  let multi = Pool.create_multi (mock_multi_config ()) in
  Alcotest.(check int) "initial size" 0 (Pool.multi_size multi);
  Alcotest.(check int) "initial available" 0 (Pool.multi_available multi);
  Alcotest.(check int) "initial in_use" 0 (Pool.multi_in_use multi);
  Alcotest.(check bool) "not closed" false (Pool.multi_is_closed multi);
  Alcotest.(check int) "server count" 3 (Pool.multi_server_count multi)

let test_create_multi_empty_servers () =
  try
    let _ = Pool.create_multi (mock_multi_config ~servers:[] ()) in
    Alcotest.fail "Expected Invalid_argument"
  with Invalid_argument _ -> ()

let test_round_robin_distribution () =
  let multi = Pool.create_multi (mock_multi_config ~max_size:10 ()) in
  let counts = [| 0; 0; 0 |] in
  for _ = 1 to 9 do
    match Pool.acquire_multi multi with
    | Ok conn -> counts.(conn.server_idx) <- counts.(conn.server_idx) + 1
    | Error _ -> Alcotest.fail "Unexpected acquire error"
  done;
  Alcotest.(check int) "server 0 count" 3 counts.(0);
  Alcotest.(check int) "server 1 count" 3 counts.(1);
  Alcotest.(check int) "server 2 count" 3 counts.(2);
  Pool.shutdown_multi multi

let test_health_check_skipping () =
  let multi = Pool.create_multi (mock_multi_config ~max_size:10 ()) in
  Pool.mark_unhealthy multi 1;
  let counts = [| 0; 0; 0 |] in
  for _ = 1 to 6 do
    match Pool.acquire_multi multi with
    | Ok conn -> counts.(conn.server_idx) <- counts.(conn.server_idx) + 1
    | Error _ -> Alcotest.fail "Unexpected acquire error"
  done;
  Alcotest.(check int) "server 1 should be skipped" 0 counts.(1);
  Alcotest.(check bool)
    "server 0 or 2 got connections" true
    (counts.(0) > 0 || counts.(2) > 0);
  Pool.shutdown_multi multi

let test_all_unhealthy () =
  let multi = Pool.create_multi (mock_multi_config ()) in
  Pool.mark_unhealthy multi 0;
  Pool.mark_unhealthy multi 1;
  Pool.mark_unhealthy multi 2;
  (match Pool.acquire_multi multi with
  | Error Pool.Pool_empty -> ()
  | _ -> Alcotest.fail "Expected Pool_empty when all servers unhealthy");
  Pool.shutdown_multi multi

let test_mark_healthy_recovery () =
  let multi = Pool.create_multi (mock_multi_config ~max_size:10 ()) in
  Pool.mark_unhealthy multi 0;
  Pool.mark_unhealthy multi 1;
  Pool.mark_unhealthy multi 2;
  (match Pool.acquire_multi multi with
  | Error Pool.Pool_empty -> ()
  | _ -> Alcotest.fail "Expected Pool_empty");
  Pool.mark_healthy multi 1;
  (match Pool.acquire_multi multi with
  | Ok conn -> Alcotest.(check int) "should use server 1" 1 conn.server_idx
  | Error _ -> Alcotest.fail "Expected successful acquire after marking healthy");
  Pool.shutdown_multi multi

let test_with_connection_multi () =
  let multi = Pool.create_multi (mock_multi_config ()) in
  let result =
    Pool.with_connection_multi multi (fun conn ->
        Alcotest.(check int) "in_use during work" 1 (Pool.multi_in_use multi);
        conn.id + 100)
  in
  (match result with
  | Ok n ->
      Alcotest.(check bool) "got result" true (n >= 100);
      Alcotest.(check int) "released after" 0 (Pool.multi_in_use multi)
  | Error _ -> Alcotest.fail "Expected successful with_connection_multi");
  Pool.shutdown_multi multi

let test_with_connection_multi_exception () =
  let multi = Pool.create_multi (mock_multi_config ()) in
  (try
     let _ =
       Pool.with_connection_multi multi (fun _ ->
           Alcotest.(check int) "in_use" 1 (Pool.multi_in_use multi);
           failwith "boom")
     in
     ()
   with Failure _ -> ());
  Alcotest.(check int) "released after exception" 0 (Pool.multi_in_use multi);
  Pool.shutdown_multi multi

let test_shutdown_multi () =
  let multi = Pool.create_multi (mock_multi_config ()) in
  let conn =
    match Pool.acquire_multi multi with
    | Ok c -> c
    | Error _ -> Alcotest.fail "Expected successful acquire"
  in
  Pool.release_multi multi conn;
  Pool.shutdown_multi multi;
  Alcotest.(check bool) "pool closed" true (Pool.multi_is_closed multi);
  match Pool.acquire_multi multi with
  | Error Pool.Pool_closed -> ()
  | _ -> Alcotest.fail "Expected Pool_closed after shutdown"

let test_stats_multi () =
  let multi = Pool.create_multi (mock_multi_config ~max_size:5 ()) in
  let conn1 =
    match Pool.acquire_multi multi with
    | Ok c -> c
    | Error _ -> Alcotest.fail "Expected successful acquire"
  in
  let conn2 =
    match Pool.acquire_multi multi with
    | Ok c -> c
    | Error _ -> Alcotest.fail "Expected successful acquire"
  in
  let stats = Pool.stats_multi multi in
  Alcotest.(check int) "total servers" 3 stats.total_servers;
  Alcotest.(check int) "healthy servers" 3 stats.healthy_servers;
  Alcotest.(check int) "aggregate in_use" 2 stats.aggregate.in_use;
  Alcotest.(check int) "aggregate total" 2 stats.aggregate.total;
  Pool.release_multi multi conn1;
  Pool.release_multi multi conn2;
  Pool.shutdown_multi multi

let test_single_server_multi () =
  let multi = Pool.create_multi (mock_multi_config ~servers:[ "single" ] ()) in
  let conn =
    match Pool.acquire_multi multi with
    | Ok c -> c
    | Error _ -> Alcotest.fail "Expected successful acquire"
  in
  Alcotest.(check int) "single server idx" 0 conn.server_idx;
  Pool.release_multi multi conn;
  Pool.shutdown_multi multi

let test_connection_reuse () =
  let multi =
    Pool.create_multi (mock_multi_config ~servers:[ "s1" ] ~max_size:2 ())
  in
  let conn1 =
    match Pool.acquire_multi multi with
    | Ok c -> c
    | Error _ -> Alcotest.fail "Expected successful acquire"
  in
  let id1 = conn1.id in
  Pool.release_multi multi conn1;
  let conn2 =
    match Pool.acquire_multi multi with
    | Ok c -> c
    | Error _ -> Alcotest.fail "Expected successful acquire"
  in
  Alcotest.(check int) "same connection reused" id1 conn2.id;
  Pool.release_multi multi conn2;
  Pool.shutdown_multi multi

let test_max_size_per_server () =
  let multi =
    Pool.create_multi (mock_multi_config ~servers:[ "s1" ] ~max_size:2 ())
  in
  let c1 =
    match Pool.acquire_multi multi with
    | Ok c -> c
    | Error _ -> Alcotest.fail "Expected 1st acquire"
  in
  let c2 =
    match Pool.acquire_multi multi with
    | Ok c -> c
    | Error _ -> Alcotest.fail "Expected 2nd acquire"
  in
  (match Pool.acquire_multi multi with
  | Error Pool.Pool_empty -> ()
  | _ -> Alcotest.fail "Expected Pool_empty at capacity");
  Pool.release_multi multi c1;
  Pool.release_multi multi c2;
  Pool.shutdown_multi multi

let test_fallback_on_pool_exhaustion () =
  let multi = Pool.create_multi (mock_multi_config ~max_size:1 ()) in
  let conns = ref [] in
  for _ = 1 to 3 do
    match Pool.acquire_multi multi with
    | Ok c -> conns := c :: !conns
    | Error _ -> Alcotest.fail "Expected acquire"
  done;
  Alcotest.(check int) "3 connections acquired" 3 (List.length !conns);
  let servers_used =
    List.sort_uniq compare (List.map (fun c -> c.server_idx) !conns)
  in
  Alcotest.(check int) "all 3 servers used" 3 (List.length servers_used);
  List.iter (Pool.release_multi multi) !conns;
  Pool.shutdown_multi multi

let tests =
  [
    ("create multi", `Quick, test_create_multi);
    ("create multi empty servers", `Quick, test_create_multi_empty_servers);
    ("round robin distribution", `Quick, test_round_robin_distribution);
    ("health check skipping", `Quick, test_health_check_skipping);
    ("all unhealthy", `Quick, test_all_unhealthy);
    ("mark healthy recovery", `Quick, test_mark_healthy_recovery);
    ("with_connection_multi", `Quick, test_with_connection_multi);
    ( "with_connection_multi exception",
      `Quick,
      test_with_connection_multi_exception );
    ("shutdown_multi", `Quick, test_shutdown_multi);
    ("stats_multi", `Quick, test_stats_multi);
    ("single server multi", `Quick, test_single_server_multi);
    ("connection reuse", `Quick, test_connection_reuse);
    ("max size per server", `Quick, test_max_size_per_server);
    ("fallback on pool exhaustion", `Quick, test_fallback_on_pool_exhaustion);
  ]
test/test_repodb.ml (+2)
     ("Multi", Test_multi.tests);
     ("Stream", Test_stream.tests);
     ("Pool", Test_pool.tests);
+    ("MultiPool", Test_multi_pool.tests);
+    ("Cqrs", Test_cqrs.tests);
   ]