a database layer inspired by caqti and ecto
at main 8.9 kB view raw
open Repodb

(** Fake connection produced by the mock pool configuration.

    [id] comes from a file-wide counter, [server_idx] is the position of the
    originating server in the configured server list, and [closed] is flipped
    to [true] by the configuration's [close] callback. *)
type mock_conn = { id : int; server_idx : int; mutable closed : bool }

(* Shared by every test in this file: ids are unique but their absolute
   values depend on test order, so tests only compare ids with each other. *)
let conn_counter = Atomic.make 0

(** [find_index_in_list elem lst] is [Some i], where [i] is the zero-based
    position of the first element of [lst] structurally equal to [elem], or
    [None] when no element matches. *)
let find_index_in_list elem lst =
  let rec aux i = function
    | [] -> None
    | x :: xs -> if x = elem then Some i else aux (i + 1) xs
  in
  aux 0 lst

(** [mock_multi_config ?servers ?max_size ?fail_servers ()] builds a
    multi-server pool configuration whose [connect] callback fabricates
    {!mock_conn} values instead of opening real connections.

    @param servers connection strings, defaults to [["s1"; "s2"; "s3"]].
    @param max_size value stored in [max_size_per_server], defaults to [2].
    @param fail_servers
      indices (into [servers]) for which [connect] returns [Error];
      defaults to the empty list. *)
let mock_multi_config ?(servers = [ "s1"; "s2"; "s3" ]) ?(max_size = 2)
    ?(fail_servers = []) () =
  Pool.
    {
      servers;
      max_size_per_server = max_size;
      connect =
        (fun conninfo ->
          (* A connection string that is not in [servers] maps to index -1. *)
          let server_idx =
            match find_index_in_list conninfo servers with
            | Some i -> i
            | None -> -1
          in
          if List.mem server_idx fail_servers then
            Error (Printf.sprintf "Failed to connect to %s" conninfo)
          else
            let id = Atomic.fetch_and_add conn_counter 1 in
            Ok { id; server_idx; closed = false });
      close = (fun conn -> conn.closed <- true);
      validate = None;
    }

(** [acquire_or_fail ?msg multi] acquires a connection from [multi] and
    returns it, failing the running test with [msg] when the pool returns an
    error. [msg] defaults to ["Expected successful acquire"]. *)
let acquire_or_fail ?(msg = "Expected successful acquire") multi =
  match Pool.acquire_multi multi with
  | Ok conn -> conn
  | Error _ -> Alcotest.fail msg

(** A freshly created pool reports no connections, is open, and knows its
    three servers. *)
let test_create_multi () =
  let multi = Pool.create_multi (mock_multi_config ()) in
  Alcotest.(check int) "initial size" 0 (Pool.multi_size multi);
  Alcotest.(check int) "initial available" 0 (Pool.multi_available multi);
  Alcotest.(check int) "initial in_use" 0 (Pool.multi_in_use multi);
  Alcotest.(check bool) "not closed" false (Pool.multi_is_closed multi);
  Alcotest.(check int) "server count" 3 (Pool.multi_server_count multi);
  (* Shut down like every other test in this file. *)
  Pool.shutdown_multi multi

(** Creating a pool with an empty server list must raise
    [Invalid_argument]. *)
let test_create_multi_empty_servers () =
  (* The handler is scoped to [create_multi] only, so an exception raised by
     the failure branch can never be mistaken for the expected one. *)
  match Pool.create_multi (mock_multi_config ~servers:[] ()) with
  | _ -> Alcotest.fail "Expected Invalid_argument"
  | exception Invalid_argument _ -> ()

(** Nine acquisitions over three servers are expected to land three on each
    server. *)
let test_round_robin_distribution () =
  let multi = Pool.create_multi (mock_multi_config ~max_size:10 ()) in
  let counts = [| 0; 0; 0 |] in
  for _ = 1 to 9 do
    let conn = acquire_or_fail ~msg:"Unexpected acquire error" multi in
    counts.(conn.server_idx) <- counts.(conn.server_idx) + 1
  done;
  Alcotest.(check int) "server 0 count" 3 counts.(0);
  Alcotest.(check int) "server 1 count" 3 counts.(1);
  Alcotest.(check int) "server 2 count" 3 counts.(2);
  Pool.shutdown_multi multi

(** A server marked unhealthy must receive none of the acquisitions. *)
let test_health_check_skipping () =
  let multi = Pool.create_multi (mock_multi_config ~max_size:10 ()) in
  Pool.mark_unhealthy multi 1;
  let counts = [| 0; 0; 0 |] in
  for _ = 1 to 6 do
    let conn = acquire_or_fail ~msg:"Unexpected acquire error" multi in
    counts.(conn.server_idx) <- counts.(conn.server_idx) + 1
  done;
  Alcotest.(check int) "server 1 should be skipped" 0 counts.(1);
  Alcotest.(check bool)
    "server 0 or 2 got connections" true
    (counts.(0) > 0 || counts.(2) > 0);
  Pool.shutdown_multi multi

(** With every server marked unhealthy, acquiring must yield
    [Pool.Pool_empty]. *)
let test_all_unhealthy () =
  let multi = Pool.create_multi (mock_multi_config ()) in
  Pool.mark_unhealthy multi 0;
  Pool.mark_unhealthy multi 1;
  Pool.mark_unhealthy multi 2;
  (match Pool.acquire_multi multi with
  | Error Pool.Pool_empty -> ()
  | _ -> Alcotest.fail "Expected Pool_empty when all servers unhealthy");
  Pool.shutdown_multi multi

(** After all servers are marked unhealthy, marking server 1 healthy again
    must make the next acquisition succeed on server 1. *)
let test_mark_healthy_recovery () =
  let multi = Pool.create_multi (mock_multi_config ~max_size:10 ()) in
  Pool.mark_unhealthy multi 0;
  Pool.mark_unhealthy multi 1;
  Pool.mark_unhealthy multi 2;
  (match Pool.acquire_multi multi with
  | Error Pool.Pool_empty -> ()
  | _ -> Alcotest.fail "Expected Pool_empty");
  Pool.mark_healthy multi 1;
  let conn =
    acquire_or_fail ~msg:"Expected successful acquire after marking healthy"
      multi
  in
  Alcotest.(check int) "should use server 1" 1 conn.server_idx;
  Pool.shutdown_multi multi

(** [with_connection_multi] holds one connection while the callback runs and
    has released it by the time the result is returned. *)
let test_with_connection_multi () =
  let multi = Pool.create_multi (mock_multi_config ()) in
  let result =
    Pool.with_connection_multi multi (fun conn ->
        Alcotest.(check int) "in_use during work" 1 (Pool.multi_in_use multi);
        conn.id + 100)
  in
  (match result with
  | Ok n ->
      Alcotest.(check bool) "got result" true (n >= 100);
      Alcotest.(check int) "released after" 0 (Pool.multi_in_use multi)
  | Error _ -> Alcotest.fail "Expected successful with_connection_multi");
  Pool.shutdown_multi multi

(** The connection must be released even when the callback raises. *)
let test_with_connection_multi_exception () =
  let multi = Pool.create_multi (mock_multi_config ()) in
  (* Whether the [Failure] propagates or is turned into a result, only the
     in-use count afterwards matters here. *)
  (match
     Pool.with_connection_multi multi (fun _ ->
         Alcotest.(check int) "in_use" 1 (Pool.multi_in_use multi);
         failwith "boom")
   with
  | Ok _ | Error _ -> ()
  | exception Failure _ -> ());
  Alcotest.(check int) "released after exception" 0 (Pool.multi_in_use multi);
  Pool.shutdown_multi multi

(** After shutdown the pool reports closed and acquiring yields
    [Pool.Pool_closed]. *)
let test_shutdown_multi () =
  let multi = Pool.create_multi (mock_multi_config ()) in
  let conn = acquire_or_fail multi in
  Pool.release_multi multi conn;
  Pool.shutdown_multi multi;
  Alcotest.(check bool) "pool closed" true (Pool.multi_is_closed multi);
  match Pool.acquire_multi multi with
  | Error Pool.Pool_closed -> ()
  | _ -> Alcotest.fail "Expected Pool_closed after shutdown"

(** With two connections held, the stats must report three servers, all
    healthy, and aggregate counters of two. *)
let test_stats_multi () =
  let multi = Pool.create_multi (mock_multi_config ~max_size:5 ()) in
  let conn1 = acquire_or_fail multi in
  let conn2 = acquire_or_fail multi in
  let stats = Pool.stats_multi multi in
  Alcotest.(check int) "total servers" 3 stats.total_servers;
  Alcotest.(check int) "healthy servers" 3 stats.healthy_servers;
  Alcotest.(check int) "aggregate in_use" 2 stats.aggregate.in_use;
  Alcotest.(check int) "aggregate total" 2 stats.aggregate.total;
  Pool.release_multi multi conn1;
  Pool.release_multi multi conn2;
  Pool.shutdown_multi multi

(** A pool configured with one server hands out connections for index 0. *)
let test_single_server_multi () =
  let multi = Pool.create_multi (mock_multi_config ~servers:[ "single" ] ()) in
  let conn = acquire_or_fail multi in
  Alcotest.(check int) "single server idx" 0 conn.server_idx;
  Pool.release_multi multi conn;
  Pool.shutdown_multi multi

(** A released connection is handed out again by the next acquisition, as
    shown by its unchanged [id]. *)
let test_connection_reuse () =
  let multi =
    Pool.create_multi (mock_multi_config ~servers:[ "s1" ] ~max_size:2 ())
  in
  let conn1 = acquire_or_fail multi in
  let id1 = conn1.id in
  Pool.release_multi multi conn1;
  let conn2 = acquire_or_fail multi in
  Alcotest.(check int) "same connection reused" id1 conn2.id;
  Pool.release_multi multi conn2;
  Pool.shutdown_multi multi

(** With one server capped at two connections, a third acquisition must
    yield [Pool.Pool_empty]. *)
let test_max_size_per_server () =
  let multi =
    Pool.create_multi (mock_multi_config ~servers:[ "s1" ] ~max_size:2 ())
  in
  let c1 = acquire_or_fail ~msg:"Expected 1st acquire" multi in
  let c2 = acquire_or_fail ~msg:"Expected 2nd acquire" multi in
  (match Pool.acquire_multi multi with
  | Error Pool.Pool_empty -> ()
  | _ -> Alcotest.fail "Expected Pool_empty at capacity");
  Pool.release_multi multi c1;
  Pool.release_multi multi c2;
  Pool.shutdown_multi multi

(** With one slot per server, three acquisitions must succeed and use three
    distinct servers. *)
let test_fallback_on_pool_exhaustion () =
  let multi = Pool.create_multi (mock_multi_config ~max_size:1 ()) in
  let conns = ref [] in
  for _ = 1 to 3 do
    conns := acquire_or_fail ~msg:"Expected acquire" multi :: !conns
  done;
  Alcotest.(check int) "3 connections acquired" 3 (List.length !conns);
  let servers_used =
    !conns
    |> List.map (fun c -> c.server_idx)
    |> List.sort_uniq Int.compare
  in
  Alcotest.(check int) "all 3 servers used" 3 (List.length servers_used);
  List.iter (Pool.release_multi multi) !conns;
  Pool.shutdown_multi multi

(** Alcotest cases for the multi-server pool, in execution order. *)
let tests =
  [
    ("create multi", `Quick, test_create_multi);
    ("create multi empty servers", `Quick, test_create_multi_empty_servers);
    ("round robin distribution", `Quick, test_round_robin_distribution);
    ("health check skipping", `Quick, test_health_check_skipping);
    ("all unhealthy", `Quick, test_all_unhealthy);
    ("mark healthy recovery", `Quick, test_mark_healthy_recovery);
    ("with_connection_multi", `Quick, test_with_connection_multi);
    ( "with_connection_multi exception",
      `Quick,
      test_with_connection_multi_exception );
    ("shutdown_multi", `Quick, test_shutdown_multi);
    ("stats_multi", `Quick, test_stats_multi);
    ("single server multi", `Quick, test_single_server_multi);
    ("connection reuse", `Quick, test_connection_reuse);
    ("max size per server", `Quick, test_max_size_per_server);
    ("fallback on pool exhaustion", `Quick, test_fallback_on_pool_exhaustion);
  ]