a database layer insipred by caqti and ecto
1open Repodb
2
3type mock_conn = { id : int; server_idx : int; mutable closed : bool }
4
5let conn_counter = Atomic.make 0
6
7let find_index_in_list elem lst =
8 let rec aux i = function
9 | [] -> None
10 | x :: xs -> if x = elem then Some i else aux (i + 1) xs
11 in
12 aux 0 lst
13
14let mock_multi_config ?(servers = [ "s1"; "s2"; "s3" ]) ?(max_size = 2)
15 ?(fail_servers = []) () =
16 Pool.
17 {
18 servers;
19 max_size_per_server = max_size;
20 connect =
21 (fun conninfo ->
22 let server_idx =
23 match find_index_in_list conninfo servers with
24 | Some i -> i
25 | None -> -1
26 in
27 if List.mem server_idx fail_servers then
28 Error (Printf.sprintf "Failed to connect to %s" conninfo)
29 else
30 let id = Atomic.fetch_and_add conn_counter 1 in
31 Ok { id; server_idx; closed = false });
32 close = (fun conn -> conn.closed <- true);
33 validate = None;
34 }
35
36let test_create_multi () =
37 let multi = Pool.create_multi (mock_multi_config ()) in
38 Alcotest.(check int) "initial size" 0 (Pool.multi_size multi);
39 Alcotest.(check int) "initial available" 0 (Pool.multi_available multi);
40 Alcotest.(check int) "initial in_use" 0 (Pool.multi_in_use multi);
41 Alcotest.(check bool) "not closed" false (Pool.multi_is_closed multi);
42 Alcotest.(check int) "server count" 3 (Pool.multi_server_count multi)
43
44let test_create_multi_empty_servers () =
45 try
46 let _ = Pool.create_multi (mock_multi_config ~servers:[] ()) in
47 Alcotest.fail "Expected Invalid_argument"
48 with Invalid_argument _ -> ()
49
50let test_round_robin_distribution () =
51 let multi = Pool.create_multi (mock_multi_config ~max_size:10 ()) in
52 let counts = [| 0; 0; 0 |] in
53 for _ = 1 to 9 do
54 match Pool.acquire_multi multi with
55 | Ok conn -> counts.(conn.server_idx) <- counts.(conn.server_idx) + 1
56 | Error _ -> Alcotest.fail "Unexpected acquire error"
57 done;
58 Alcotest.(check int) "server 0 count" 3 counts.(0);
59 Alcotest.(check int) "server 1 count" 3 counts.(1);
60 Alcotest.(check int) "server 2 count" 3 counts.(2);
61 Pool.shutdown_multi multi
62
63let test_health_check_skipping () =
64 let multi = Pool.create_multi (mock_multi_config ~max_size:10 ()) in
65 Pool.mark_unhealthy multi 1;
66 let counts = [| 0; 0; 0 |] in
67 for _ = 1 to 6 do
68 match Pool.acquire_multi multi with
69 | Ok conn -> counts.(conn.server_idx) <- counts.(conn.server_idx) + 1
70 | Error _ -> Alcotest.fail "Unexpected acquire error"
71 done;
72 Alcotest.(check int) "server 1 should be skipped" 0 counts.(1);
73 Alcotest.(check bool)
74 "server 0 or 2 got connections" true
75 (counts.(0) > 0 || counts.(2) > 0);
76 Pool.shutdown_multi multi
77
78let test_all_unhealthy () =
79 let multi = Pool.create_multi (mock_multi_config ()) in
80 Pool.mark_unhealthy multi 0;
81 Pool.mark_unhealthy multi 1;
82 Pool.mark_unhealthy multi 2;
83 (match Pool.acquire_multi multi with
84 | Error Pool.Pool_empty -> ()
85 | _ -> Alcotest.fail "Expected Pool_empty when all servers unhealthy");
86 Pool.shutdown_multi multi
87
88let test_mark_healthy_recovery () =
89 let multi = Pool.create_multi (mock_multi_config ~max_size:10 ()) in
90 Pool.mark_unhealthy multi 0;
91 Pool.mark_unhealthy multi 1;
92 Pool.mark_unhealthy multi 2;
93 (match Pool.acquire_multi multi with
94 | Error Pool.Pool_empty -> ()
95 | _ -> Alcotest.fail "Expected Pool_empty");
96 Pool.mark_healthy multi 1;
97 (match Pool.acquire_multi multi with
98 | Ok conn -> Alcotest.(check int) "should use server 1" 1 conn.server_idx
99 | Error _ -> Alcotest.fail "Expected successful acquire after marking healthy");
100 Pool.shutdown_multi multi
101
102let test_with_connection_multi () =
103 let multi = Pool.create_multi (mock_multi_config ()) in
104 let result =
105 Pool.with_connection_multi multi (fun conn ->
106 Alcotest.(check int) "in_use during work" 1 (Pool.multi_in_use multi);
107 conn.id + 100)
108 in
109 (match result with
110 | Ok n ->
111 Alcotest.(check bool) "got result" true (n >= 100);
112 Alcotest.(check int) "released after" 0 (Pool.multi_in_use multi)
113 | Error _ -> Alcotest.fail "Expected successful with_connection_multi");
114 Pool.shutdown_multi multi
115
116let test_with_connection_multi_exception () =
117 let multi = Pool.create_multi (mock_multi_config ()) in
118 (try
119 let _ =
120 Pool.with_connection_multi multi (fun _ ->
121 Alcotest.(check int) "in_use" 1 (Pool.multi_in_use multi);
122 failwith "boom")
123 in
124 ()
125 with Failure _ -> ());
126 Alcotest.(check int) "released after exception" 0 (Pool.multi_in_use multi);
127 Pool.shutdown_multi multi
128
129let test_shutdown_multi () =
130 let multi = Pool.create_multi (mock_multi_config ()) in
131 let conn =
132 match Pool.acquire_multi multi with
133 | Ok c -> c
134 | Error _ -> Alcotest.fail "Expected successful acquire"
135 in
136 Pool.release_multi multi conn;
137 Pool.shutdown_multi multi;
138 Alcotest.(check bool) "pool closed" true (Pool.multi_is_closed multi);
139 match Pool.acquire_multi multi with
140 | Error Pool.Pool_closed -> ()
141 | _ -> Alcotest.fail "Expected Pool_closed after shutdown"
142
143let test_stats_multi () =
144 let multi = Pool.create_multi (mock_multi_config ~max_size:5 ()) in
145 let conn1 =
146 match Pool.acquire_multi multi with
147 | Ok c -> c
148 | Error _ -> Alcotest.fail "Expected successful acquire"
149 in
150 let conn2 =
151 match Pool.acquire_multi multi with
152 | Ok c -> c
153 | Error _ -> Alcotest.fail "Expected successful acquire"
154 in
155 let stats = Pool.stats_multi multi in
156 Alcotest.(check int) "total servers" 3 stats.total_servers;
157 Alcotest.(check int) "healthy servers" 3 stats.healthy_servers;
158 Alcotest.(check int) "aggregate in_use" 2 stats.aggregate.in_use;
159 Alcotest.(check int) "aggregate total" 2 stats.aggregate.total;
160 Pool.release_multi multi conn1;
161 Pool.release_multi multi conn2;
162 Pool.shutdown_multi multi
163
164let test_single_server_multi () =
165 let multi = Pool.create_multi (mock_multi_config ~servers:[ "single" ] ()) in
166 let conn =
167 match Pool.acquire_multi multi with
168 | Ok c -> c
169 | Error _ -> Alcotest.fail "Expected successful acquire"
170 in
171 Alcotest.(check int) "single server idx" 0 conn.server_idx;
172 Pool.release_multi multi conn;
173 Pool.shutdown_multi multi
174
175let test_connection_reuse () =
176 let multi =
177 Pool.create_multi (mock_multi_config ~servers:[ "s1" ] ~max_size:2 ())
178 in
179 let conn1 =
180 match Pool.acquire_multi multi with
181 | Ok c -> c
182 | Error _ -> Alcotest.fail "Expected successful acquire"
183 in
184 let id1 = conn1.id in
185 Pool.release_multi multi conn1;
186 let conn2 =
187 match Pool.acquire_multi multi with
188 | Ok c -> c
189 | Error _ -> Alcotest.fail "Expected successful acquire"
190 in
191 Alcotest.(check int) "same connection reused" id1 conn2.id;
192 Pool.release_multi multi conn2;
193 Pool.shutdown_multi multi
194
195let test_max_size_per_server () =
196 let multi =
197 Pool.create_multi (mock_multi_config ~servers:[ "s1" ] ~max_size:2 ())
198 in
199 let c1 =
200 match Pool.acquire_multi multi with
201 | Ok c -> c
202 | Error _ -> Alcotest.fail "Expected 1st acquire"
203 in
204 let c2 =
205 match Pool.acquire_multi multi with
206 | Ok c -> c
207 | Error _ -> Alcotest.fail "Expected 2nd acquire"
208 in
209 (match Pool.acquire_multi multi with
210 | Error Pool.Pool_empty -> ()
211 | _ -> Alcotest.fail "Expected Pool_empty at capacity");
212 Pool.release_multi multi c1;
213 Pool.release_multi multi c2;
214 Pool.shutdown_multi multi
215
216let test_fallback_on_pool_exhaustion () =
217 let multi = Pool.create_multi (mock_multi_config ~max_size:1 ()) in
218 let conns = ref [] in
219 for _ = 1 to 3 do
220 match Pool.acquire_multi multi with
221 | Ok c -> conns := c :: !conns
222 | Error _ -> Alcotest.fail "Expected acquire"
223 done;
224 Alcotest.(check int) "3 connections acquired" 3 (List.length !conns);
225 let servers_used =
226 List.sort_uniq compare (List.map (fun c -> c.server_idx) !conns)
227 in
228 Alcotest.(check int) "all 3 servers used" 3 (List.length servers_used);
229 List.iter (Pool.release_multi multi) !conns;
230 Pool.shutdown_multi multi
231
232let tests =
233 [
234 ("create multi", `Quick, test_create_multi);
235 ("create multi empty servers", `Quick, test_create_multi_empty_servers);
236 ("round robin distribution", `Quick, test_round_robin_distribution);
237 ("health check skipping", `Quick, test_health_check_skipping);
238 ("all unhealthy", `Quick, test_all_unhealthy);
239 ("mark healthy recovery", `Quick, test_mark_healthy_recovery);
240 ("with_connection_multi", `Quick, test_with_connection_multi);
241 ( "with_connection_multi exception",
242 `Quick,
243 test_with_connection_multi_exception );
244 ("shutdown_multi", `Quick, test_shutdown_multi);
245 ("stats_multi", `Quick, test_stats_multi);
246 ("single server multi", `Quick, test_single_server_multi);
247 ("connection reuse", `Quick, test_connection_reuse);
248 ("max size per server", `Quick, test_max_size_per_server);
249 ("fallback on pool exhaustion", `Quick, test_fallback_on_pool_exhaustion);
250 ]