+256
driver-postgresql/test_integration.ml
+256
driver-postgresql/test_integration.ml
···
3177
3177
("update_returning", `Quick, test_query_update_returning);
3178
3178
]
3179
3179
3180
+
module Pool = Repodb.Pool
3181
+
3182
+
let pool_config () =
3183
+
Pool.
3184
+
{
3185
+
max_size = 3;
3186
+
connect =
3187
+
(fun () ->
3188
+
Repodb_postgresql.connect conninfo
3189
+
|> Result.map_error Repodb_postgresql.error_message);
3190
+
close = Repodb_postgresql.close;
3191
+
validate =
3192
+
Some
3193
+
(fun conn ->
3194
+
match Repodb_postgresql.exec conn "SELECT 1" ~params:[||] with
3195
+
| Ok () -> true
3196
+
| Error _ -> false);
3197
+
}
3198
+
3199
+
let with_pool f () =
3200
+
let pool = Pool.create (pool_config ()) in
3201
+
(match Pool.acquire pool with
3202
+
| Ok conn ->
3203
+
let _ =
3204
+
Repodb_postgresql.exec conn "DROP TABLE IF EXISTS pool_test" ~params:[||]
3205
+
in
3206
+
let _ =
3207
+
Repodb_postgresql.exec conn "DROP TABLE IF EXISTS counter" ~params:[||]
3208
+
in
3209
+
Pool.release pool conn
3210
+
| Error _ -> ());
3211
+
Fun.protect
3212
+
~finally:(fun () ->
3213
+
(match Pool.acquire pool with
3214
+
| Ok conn ->
3215
+
let _ =
3216
+
Repodb_postgresql.exec conn "DROP TABLE IF EXISTS pool_test"
3217
+
~params:[||]
3218
+
in
3219
+
let _ =
3220
+
Repodb_postgresql.exec conn "DROP TABLE IF EXISTS counter"
3221
+
~params:[||]
3222
+
in
3223
+
Pool.release pool conn
3224
+
| Error _ -> ());
3225
+
Pool.shutdown pool)
3226
+
(fun () -> f pool)
3227
+
3228
+
let test_pool_acquire_release =
3229
+
with_pool (fun pool ->
3230
+
match Pool.acquire pool with
3231
+
| Error e -> Alcotest.fail (Pool.error_to_string e)
3232
+
| Ok conn ->
3233
+
(match Repodb_postgresql.exec conn "SELECT 1" ~params:[||] with
3234
+
| Ok () -> ()
3235
+
| Error e -> Alcotest.fail (Repodb_postgresql.error_message e));
3236
+
Pool.release pool conn;
3237
+
Alcotest.(check int) "in_use after release" 0 (Pool.in_use pool))
3238
+
3239
+
let test_pool_with_connection =
3240
+
with_pool (fun pool ->
3241
+
match Pool.acquire pool with
3242
+
| Error e -> Alcotest.fail (Pool.error_to_string e)
3243
+
| Ok conn -> (
3244
+
let _ =
3245
+
Repodb_postgresql.exec conn
3246
+
"CREATE TABLE pool_test (id SERIAL PRIMARY KEY, value TEXT)"
3247
+
~params:[||]
3248
+
in
3249
+
Pool.release pool conn;
3250
+
3251
+
let result =
3252
+
Pool.with_connection pool (fun conn ->
3253
+
match
3254
+
Repodb_postgresql.exec conn
3255
+
"INSERT INTO pool_test (value) VALUES ($1)"
3256
+
~params:[| Repodb.Driver.Value.text "test" |]
3257
+
with
3258
+
| Ok () -> "inserted"
3259
+
| Error e -> Repodb_postgresql.error_message e)
3260
+
in
3261
+
match result with
3262
+
| Ok "inserted" -> ()
3263
+
| Ok msg -> Alcotest.fail msg
3264
+
| Error e -> Alcotest.fail (Pool.error_to_string e)))
3265
+
3266
+
let test_pool_max_size =
3267
+
with_pool (fun pool ->
3268
+
let c1 =
3269
+
match Pool.acquire pool with
3270
+
| Ok c -> c
3271
+
| Error e -> failwith (Pool.error_to_string e)
3272
+
in
3273
+
let c2 =
3274
+
match Pool.acquire pool with
3275
+
| Ok c -> c
3276
+
| Error e -> failwith (Pool.error_to_string e)
3277
+
in
3278
+
let c3 =
3279
+
match Pool.acquire pool with
3280
+
| Ok c -> c
3281
+
| Error e -> failwith (Pool.error_to_string e)
3282
+
in
3283
+
Alcotest.(check int) "3 in use" 3 (Pool.in_use pool);
3284
+
(match Pool.acquire pool with
3285
+
| Error Pool.Pool_empty -> ()
3286
+
| _ -> Alcotest.fail "Expected Pool_empty");
3287
+
Pool.release pool c1;
3288
+
Pool.release pool c2;
3289
+
Pool.release pool c3)
3290
+
3291
+
let test_pool_reuse =
3292
+
with_pool (fun pool ->
3293
+
let c1 =
3294
+
match Pool.acquire pool with
3295
+
| Ok c -> c
3296
+
| Error e -> failwith (Pool.error_to_string e)
3297
+
in
3298
+
Pool.release pool c1;
3299
+
Alcotest.(check int) "1 available" 1 (Pool.available pool);
3300
+
let _c2 =
3301
+
match Pool.acquire pool with
3302
+
| Ok c -> c
3303
+
| Error e -> failwith (Pool.error_to_string e)
3304
+
in
3305
+
Alcotest.(check int) "still 1 total" 1 (Pool.size pool))
3306
+
3307
+
let test_pool_validation () =
3308
+
let validation_count = Atomic.make 0 in
3309
+
let config =
3310
+
Pool.
3311
+
{
3312
+
max_size = 3;
3313
+
connect =
3314
+
(fun () ->
3315
+
Repodb_postgresql.connect conninfo
3316
+
|> Result.map_error Repodb_postgresql.error_message);
3317
+
close = Repodb_postgresql.close;
3318
+
validate =
3319
+
Some
3320
+
(fun conn ->
3321
+
Atomic.incr validation_count;
3322
+
match Repodb_postgresql.exec conn "SELECT 1" ~params:[||] with
3323
+
| Ok () -> true
3324
+
| Error _ -> false);
3325
+
}
3326
+
in
3327
+
let pool = Pool.create config in
3328
+
let c1 =
3329
+
match Pool.acquire pool with
3330
+
| Ok c -> c
3331
+
| Error e -> failwith (Pool.error_to_string e)
3332
+
in
3333
+
Pool.release pool c1;
3334
+
let _c2 =
3335
+
match Pool.acquire pool with
3336
+
| Ok c -> c
3337
+
| Error e -> failwith (Pool.error_to_string e)
3338
+
in
3339
+
Alcotest.(check bool)
3340
+
"validation was called" true
3341
+
(Atomic.get validation_count > 0);
3342
+
Pool.shutdown pool
3343
+
3344
+
let test_pool_concurrent_domains () =
3345
+
let config =
3346
+
Pool.
3347
+
{
3348
+
max_size = 8;
3349
+
connect =
3350
+
(fun () ->
3351
+
Repodb_postgresql.connect conninfo
3352
+
|> Result.map_error Repodb_postgresql.error_message);
3353
+
close = Repodb_postgresql.close;
3354
+
validate = None;
3355
+
}
3356
+
in
3357
+
let pool = Pool.create config in
3358
+
3359
+
(match Pool.acquire pool with
3360
+
| Error e -> failwith (Pool.error_to_string e)
3361
+
| Ok conn ->
3362
+
let _ =
3363
+
Repodb_postgresql.exec conn "DROP TABLE IF EXISTS counter" ~params:[||]
3364
+
in
3365
+
let _ =
3366
+
Repodb_postgresql.exec conn
3367
+
"CREATE TABLE counter (id INTEGER PRIMARY KEY, value INTEGER)"
3368
+
~params:[||]
3369
+
in
3370
+
let _ =
3371
+
Repodb_postgresql.exec conn
3372
+
"INSERT INTO counter (id, value) VALUES (1, 0)" ~params:[||]
3373
+
in
3374
+
Pool.release pool conn);
3375
+
3376
+
let n_domains = 4 in
3377
+
let n_ops = 25 in
3378
+
let completed = Atomic.make 0 in
3379
+
3380
+
let domain_work () =
3381
+
for _ = 1 to n_ops do
3382
+
match Pool.acquire pool with
3383
+
| Ok conn ->
3384
+
let _ =
3385
+
Repodb_postgresql.exec conn
3386
+
"UPDATE counter SET value = value + 1 WHERE id = 1" ~params:[||]
3387
+
in
3388
+
Pool.release pool conn
3389
+
| Error e -> failwith (Pool.error_to_string e)
3390
+
done;
3391
+
Atomic.incr completed
3392
+
in
3393
+
3394
+
let domains = List.init n_domains (fun _ -> Domain.spawn domain_work) in
3395
+
List.iter Domain.join domains;
3396
+
3397
+
Alcotest.(check int) "all domains completed" n_domains (Atomic.get completed);
3398
+
3399
+
let final =
3400
+
match Pool.acquire pool with
3401
+
| Ok conn ->
3402
+
(match
3403
+
Repodb_postgresql.query conn "SELECT value FROM counter WHERE id = 1"
3404
+
~params:[||]
3405
+
with
3406
+
| Ok [ row ] -> Repodb.Driver.row_int row 0
3407
+
| _ -> failwith "Query failed")
3408
+
|> fun v ->
3409
+
Pool.release pool conn;
3410
+
v
3411
+
| Error e -> failwith (Pool.error_to_string e)
3412
+
in
3413
+
3414
+
Alcotest.(check int) "all increments" (n_domains * n_ops) final;
3415
+
3416
+
(match Pool.acquire pool with
3417
+
| Ok conn ->
3418
+
let _ =
3419
+
Repodb_postgresql.exec conn "DROP TABLE IF EXISTS counter" ~params:[||]
3420
+
in
3421
+
Pool.release pool conn
3422
+
| Error _ -> ());
3423
+
Pool.shutdown pool
3424
+
3425
+
let pool_tests =
3426
+
[
3427
+
("acquire_release", `Quick, test_pool_acquire_release);
3428
+
("with_connection", `Quick, test_pool_with_connection);
3429
+
("max_size", `Quick, test_pool_max_size);
3430
+
("reuse", `Quick, test_pool_reuse);
3431
+
("validation", `Quick, test_pool_validation);
3432
+
("concurrent_domains", `Slow, test_pool_concurrent_domains);
3433
+
]
3434
+
3180
3435
let () =
3181
3436
Alcotest.run "repodb-postgresql"
3182
3437
[
···
3191
3446
("Preload", preload_tests);
3192
3447
("Multi", multi_tests);
3193
3448
("Query-Repo", query_repo_tests);
3449
+
("Pool", pool_tests);
3194
3450
]
+252
driver-sqlite/test_integration.ml
+252
driver-sqlite/test_integration.ml
···
2876
2876
("complex", `Quick, test_query_complex);
2877
2877
]
2878
2878
2879
+
module Pool = Repodb.Pool
2880
+
2881
+
let pool_db_path = "/tmp/repodb_pool_sqlite_test.db"
2882
+
2883
+
let pool_config () =
2884
+
Pool.
2885
+
{
2886
+
max_size = 3;
2887
+
connect =
2888
+
(fun () ->
2889
+
match Repodb_sqlite.connect pool_db_path with
2890
+
| Error e -> Error (Repodb_sqlite.error_message e)
2891
+
| Ok conn ->
2892
+
let _ =
2893
+
Repodb_sqlite.exec conn "PRAGMA journal_mode=WAL" ~params:[||]
2894
+
in
2895
+
let _ =
2896
+
Repodb_sqlite.exec conn "PRAGMA busy_timeout=5000" ~params:[||]
2897
+
in
2898
+
Ok conn);
2899
+
close = Repodb_sqlite.close;
2900
+
validate =
2901
+
Some
2902
+
(fun conn ->
2903
+
match Repodb_sqlite.exec conn "SELECT 1" ~params:[||] with
2904
+
| Ok () -> true
2905
+
| Error _ -> false);
2906
+
}
2907
+
2908
+
let with_pool f () =
2909
+
(try Sys.remove pool_db_path with Sys_error _ -> ());
2910
+
let pool = Pool.create (pool_config ()) in
2911
+
Fun.protect
2912
+
~finally:(fun () ->
2913
+
Pool.shutdown pool;
2914
+
try Sys.remove pool_db_path with Sys_error _ -> ())
2915
+
(fun () -> f pool)
2916
+
2917
+
let test_pool_acquire_release =
2918
+
with_pool (fun pool ->
2919
+
match Pool.acquire pool with
2920
+
| Error e -> Alcotest.fail (Pool.error_to_string e)
2921
+
| Ok conn ->
2922
+
(match Repodb_sqlite.exec conn "SELECT 1" ~params:[||] with
2923
+
| Ok () -> ()
2924
+
| Error e -> Alcotest.fail (Repodb_sqlite.error_message e));
2925
+
Pool.release pool conn;
2926
+
Alcotest.(check int) "in_use after release" 0 (Pool.in_use pool))
2927
+
2928
+
let test_pool_with_connection =
2929
+
with_pool (fun pool ->
2930
+
match Pool.acquire pool with
2931
+
| Error e -> Alcotest.fail (Pool.error_to_string e)
2932
+
| Ok conn -> (
2933
+
let _ =
2934
+
Repodb_sqlite.exec conn
2935
+
"CREATE TABLE pool_test (id INTEGER PRIMARY KEY, value TEXT)"
2936
+
~params:[||]
2937
+
in
2938
+
Pool.release pool conn;
2939
+
2940
+
let result =
2941
+
Pool.with_connection pool (fun conn ->
2942
+
match
2943
+
Repodb_sqlite.exec conn
2944
+
"INSERT INTO pool_test (value) VALUES (?)"
2945
+
~params:[| Repodb.Driver.Value.text "test" |]
2946
+
with
2947
+
| Ok () -> "inserted"
2948
+
| Error e -> Repodb_sqlite.error_message e)
2949
+
in
2950
+
match result with
2951
+
| Ok "inserted" -> ()
2952
+
| Ok msg -> Alcotest.fail msg
2953
+
| Error e -> Alcotest.fail (Pool.error_to_string e)))
2954
+
2955
+
let test_pool_max_size =
2956
+
with_pool (fun pool ->
2957
+
let c1 =
2958
+
match Pool.acquire pool with
2959
+
| Ok c -> c
2960
+
| Error e -> failwith (Pool.error_to_string e)
2961
+
in
2962
+
let c2 =
2963
+
match Pool.acquire pool with
2964
+
| Ok c -> c
2965
+
| Error e -> failwith (Pool.error_to_string e)
2966
+
in
2967
+
let c3 =
2968
+
match Pool.acquire pool with
2969
+
| Ok c -> c
2970
+
| Error e -> failwith (Pool.error_to_string e)
2971
+
in
2972
+
Alcotest.(check int) "3 in use" 3 (Pool.in_use pool);
2973
+
(match Pool.acquire pool with
2974
+
| Error Pool.Pool_empty -> ()
2975
+
| _ -> Alcotest.fail "Expected Pool_empty");
2976
+
Pool.release pool c1;
2977
+
Pool.release pool c2;
2978
+
Pool.release pool c3)
2979
+
2980
+
let test_pool_reuse =
2981
+
with_pool (fun pool ->
2982
+
let c1 =
2983
+
match Pool.acquire pool with
2984
+
| Ok c -> c
2985
+
| Error e -> failwith (Pool.error_to_string e)
2986
+
in
2987
+
Pool.release pool c1;
2988
+
Alcotest.(check int) "1 available" 1 (Pool.available pool);
2989
+
let _c2 =
2990
+
match Pool.acquire pool with
2991
+
| Ok c -> c
2992
+
| Error e -> failwith (Pool.error_to_string e)
2993
+
in
2994
+
Alcotest.(check int) "still 1 total" 1 (Pool.size pool))
2995
+
2996
+
let test_pool_validation () =
2997
+
(try Sys.remove pool_db_path with Sys_error _ -> ());
2998
+
let validation_count = Atomic.make 0 in
2999
+
let config =
3000
+
Pool.
3001
+
{
3002
+
max_size = 3;
3003
+
connect =
3004
+
(fun () ->
3005
+
match Repodb_sqlite.connect pool_db_path with
3006
+
| Error e -> Error (Repodb_sqlite.error_message e)
3007
+
| Ok conn ->
3008
+
let _ =
3009
+
Repodb_sqlite.exec conn "PRAGMA journal_mode=WAL" ~params:[||]
3010
+
in
3011
+
Ok conn);
3012
+
close = Repodb_sqlite.close;
3013
+
validate =
3014
+
Some
3015
+
(fun conn ->
3016
+
Atomic.incr validation_count;
3017
+
match Repodb_sqlite.exec conn "SELECT 1" ~params:[||] with
3018
+
| Ok () -> true
3019
+
| Error _ -> false);
3020
+
}
3021
+
in
3022
+
let pool = Pool.create config in
3023
+
let c1 =
3024
+
match Pool.acquire pool with
3025
+
| Ok c -> c
3026
+
| Error e -> failwith (Pool.error_to_string e)
3027
+
in
3028
+
Pool.release pool c1;
3029
+
let _c2 =
3030
+
match Pool.acquire pool with
3031
+
| Ok c -> c
3032
+
| Error e -> failwith (Pool.error_to_string e)
3033
+
in
3034
+
Alcotest.(check bool)
3035
+
"validation was called" true
3036
+
(Atomic.get validation_count > 0);
3037
+
Pool.shutdown pool;
3038
+
try Sys.remove pool_db_path with Sys_error _ -> ()
3039
+
3040
+
let test_pool_concurrent_domains () =
3041
+
(try Sys.remove pool_db_path with Sys_error _ -> ());
3042
+
let config =
3043
+
Pool.
3044
+
{
3045
+
max_size = 8;
3046
+
connect =
3047
+
(fun () ->
3048
+
match Repodb_sqlite.connect pool_db_path with
3049
+
| Error e -> Error (Repodb_sqlite.error_message e)
3050
+
| Ok conn ->
3051
+
let _ =
3052
+
Repodb_sqlite.exec conn "PRAGMA journal_mode=WAL" ~params:[||]
3053
+
in
3054
+
let _ =
3055
+
Repodb_sqlite.exec conn "PRAGMA busy_timeout=5000" ~params:[||]
3056
+
in
3057
+
Ok conn);
3058
+
close = Repodb_sqlite.close;
3059
+
validate = None;
3060
+
}
3061
+
in
3062
+
let pool = Pool.create config in
3063
+
3064
+
(match Pool.acquire pool with
3065
+
| Error e -> failwith (Pool.error_to_string e)
3066
+
| Ok conn ->
3067
+
let _ =
3068
+
Repodb_sqlite.exec conn
3069
+
"CREATE TABLE counter (id INTEGER PRIMARY KEY, value INTEGER)"
3070
+
~params:[||]
3071
+
in
3072
+
let _ =
3073
+
Repodb_sqlite.exec conn "INSERT INTO counter (id, value) VALUES (1, 0)"
3074
+
~params:[||]
3075
+
in
3076
+
Pool.release pool conn);
3077
+
3078
+
let n_domains = 4 in
3079
+
let n_ops = 25 in
3080
+
let completed = Atomic.make 0 in
3081
+
3082
+
let domain_work () =
3083
+
for _ = 1 to n_ops do
3084
+
match Pool.acquire pool with
3085
+
| Ok conn ->
3086
+
let _ =
3087
+
Repodb_sqlite.exec conn
3088
+
"UPDATE counter SET value = value + 1 WHERE id = 1" ~params:[||]
3089
+
in
3090
+
Pool.release pool conn
3091
+
| Error e -> failwith (Pool.error_to_string e)
3092
+
done;
3093
+
Atomic.incr completed
3094
+
in
3095
+
3096
+
let domains = List.init n_domains (fun _ -> Domain.spawn domain_work) in
3097
+
List.iter Domain.join domains;
3098
+
3099
+
Alcotest.(check int) "all domains completed" n_domains (Atomic.get completed);
3100
+
3101
+
let final =
3102
+
match Pool.acquire pool with
3103
+
| Ok conn ->
3104
+
(match
3105
+
Repodb_sqlite.query conn "SELECT value FROM counter WHERE id = 1"
3106
+
~params:[||]
3107
+
with
3108
+
| Ok [ row ] -> Repodb.Driver.row_int row 0
3109
+
| _ -> failwith "Query failed")
3110
+
|> fun v ->
3111
+
Pool.release pool conn;
3112
+
v
3113
+
| Error e -> failwith (Pool.error_to_string e)
3114
+
in
3115
+
3116
+
Alcotest.(check int) "all increments" (n_domains * n_ops) final;
3117
+
Pool.shutdown pool;
3118
+
try Sys.remove pool_db_path with Sys_error _ -> ()
3119
+
3120
+
let pool_tests =
3121
+
[
3122
+
("acquire_release", `Quick, test_pool_acquire_release);
3123
+
("with_connection", `Quick, test_pool_with_connection);
3124
+
("max_size", `Quick, test_pool_max_size);
3125
+
("reuse", `Quick, test_pool_reuse);
3126
+
("validation", `Quick, test_pool_validation);
3127
+
("concurrent_domains", `Quick, test_pool_concurrent_domains);
3128
+
]
3129
+
2879
3130
let () =
2880
3131
Alcotest.run "repodb-sqlite"
2881
3132
[
···
2889
3140
("Preload", preload_tests);
2890
3141
("Multi", multi_tests);
2891
3142
("Query-Repo", query_repo_tests);
3143
+
("Pool", pool_tests);
2892
3144
]
+6
-1
dune-project
+6
-1
dune-project
···
28
28
(ptime (>= 1.1))
29
29
(re (>= 1.11))
30
30
(uuidm (>= 0.9))
31
+
(kcas (>= 0.7))
32
+
(kcas_data (>= 0.7))
31
33
(alcotest (and (>= 1.7) :with-test))
32
-
(yojson (and (>= 2.0) :with-test)))
34
+
(yojson (and (>= 2.0) :with-test))
35
+
(lwt (and (>= 5.6) :with-test))
36
+
(eio (and (>= 1.0) :with-test))
37
+
(eio_main (and (>= 1.0) :with-test)))
33
38
(tags
34
39
(database postgresql sqlite ecto orm query-builder migrations)))
35
40
+3
-2
lib/dune
+3
-2
lib/dune
+239
lib/pool.ml
+239
lib/pool.ml
···
1
+
(** Lock-free connection pool using kcas for thread-safe operations.
2
+
3
+
Works with any concurrency library (Eio, Lwt, direct-style) because kcas
4
+
provides lock-free data structures independent of any specific scheduler.
5
+
6
+
{[
7
+
let config =
8
+
Pool.
9
+
{
10
+
max_size = 10;
11
+
connect = (fun () -> Repodb_postgresql.connect conninfo);
12
+
close = Repodb_postgresql.close;
13
+
validate = None;
14
+
}
15
+
in
16
+
let pool = Pool.create config in
17
+
Pool.with_connection pool (fun conn -> Repo.all conn ~table:users ~decode)
18
+
]} *)
19
+
20
+
open Kcas
21
+
open Kcas_data
22
+
23
+
type pool_error =
24
+
| Pool_empty
25
+
| Pool_closed
26
+
| Pool_timeout
27
+
| Connection_error of string
28
+
29
+
type 'conn config = {
30
+
max_size : int;
31
+
connect : unit -> ('conn, string) result;
32
+
close : 'conn -> unit;
33
+
validate : ('conn -> bool) option;
34
+
}
35
+
36
+
type 'conn pooled = { conn : 'conn; created_at : float }
37
+
38
+
type 'conn t = {
39
+
config : 'conn config;
40
+
available : 'conn pooled Queue.t;
41
+
total_count : int Loc.t;
42
+
in_use_count : int Loc.t;
43
+
closed : bool Loc.t;
44
+
}
45
+
46
+
let create config =
47
+
{
48
+
config;
49
+
available = Queue.create ();
50
+
total_count = Loc.make 0;
51
+
in_use_count = Loc.make 0;
52
+
closed = Loc.make false;
53
+
}
54
+
55
+
let size t = Loc.get t.total_count
56
+
let in_use t = Loc.get t.in_use_count
57
+
let available t = Queue.length t.available
58
+
let is_closed t = Loc.get t.closed
59
+
60
+
let validate_connection t pooled =
61
+
match t.config.validate with
62
+
| None -> true
63
+
| Some validate -> validate pooled.conn
64
+
65
+
let close_pooled t pooled =
66
+
t.config.close pooled.conn;
67
+
Loc.decr t.total_count
68
+
69
+
let try_create_connection t =
70
+
let tx ~xt =
71
+
if Xt.get ~xt t.closed then `Closed
72
+
else
73
+
let total = Xt.get ~xt t.total_count in
74
+
if total >= t.config.max_size then `At_capacity
75
+
else begin
76
+
Xt.set ~xt t.total_count (total + 1);
77
+
Xt.set ~xt t.in_use_count (Xt.get ~xt t.in_use_count + 1);
78
+
`Can_create
79
+
end
80
+
in
81
+
match Xt.commit { tx } with
82
+
| `Closed -> Error Pool_closed
83
+
| `At_capacity -> Error Pool_empty
84
+
| `Can_create -> (
85
+
match t.config.connect () with
86
+
| Ok conn ->
87
+
let pooled = { conn; created_at = Unix.gettimeofday () } in
88
+
Ok pooled
89
+
| Error msg ->
90
+
Loc.decr t.total_count;
91
+
Loc.decr t.in_use_count;
92
+
Error (Connection_error msg))
93
+
94
+
let rec acquire_from_pool t =
95
+
if Loc.get t.closed then Error Pool_closed
96
+
else
97
+
match Queue.take_opt t.available with
98
+
| None -> Error Pool_empty
99
+
| Some pooled ->
100
+
if validate_connection t pooled then begin
101
+
Loc.incr t.in_use_count;
102
+
Ok pooled.conn
103
+
end
104
+
else begin
105
+
close_pooled t pooled;
106
+
acquire_from_pool t
107
+
end
108
+
109
+
let acquire t =
110
+
match acquire_from_pool t with
111
+
| Ok conn -> Ok conn
112
+
| Error Pool_empty -> (
113
+
match try_create_connection t with
114
+
| Ok pooled -> Ok pooled.conn
115
+
| Error e -> Error e)
116
+
| Error e -> Error e
117
+
118
+
let rec acquire_blocking ?timeoutf t =
119
+
if Loc.get t.closed then Error Pool_closed
120
+
else
121
+
match acquire t with
122
+
| Ok conn -> Ok conn
123
+
| Error Pool_empty -> (
124
+
match timeoutf with
125
+
| Some timeout -> (
126
+
match Queue.take_blocking ~timeoutf:timeout t.available with
127
+
| exception Kcas.Timeout.Timeout -> Error Pool_timeout
128
+
| pooled ->
129
+
if validate_connection t pooled then begin
130
+
Loc.incr t.in_use_count;
131
+
Ok pooled.conn
132
+
end
133
+
else begin
134
+
close_pooled t pooled;
135
+
acquire_blocking ?timeoutf t
136
+
end)
137
+
| None ->
138
+
let pooled = Queue.take_blocking t.available in
139
+
if validate_connection t pooled then begin
140
+
Loc.incr t.in_use_count;
141
+
Ok pooled.conn
142
+
end
143
+
else begin
144
+
close_pooled t pooled;
145
+
acquire_blocking ?timeoutf t
146
+
end)
147
+
| Error e -> Error e
148
+
149
+
let release t conn =
150
+
if Loc.get t.closed then t.config.close conn
151
+
else begin
152
+
Loc.decr t.in_use_count;
153
+
let pooled = { conn; created_at = Unix.gettimeofday () } in
154
+
Queue.add pooled t.available
155
+
end
156
+
157
+
let with_connection t f =
158
+
match acquire t with
159
+
| Error e -> Error e
160
+
| Ok conn -> (
161
+
match f conn with
162
+
| result ->
163
+
release t conn;
164
+
Ok result
165
+
| exception exn ->
166
+
release t conn;
167
+
raise exn)
168
+
169
+
let with_connection_blocking ?timeoutf t f =
170
+
match acquire_blocking ?timeoutf t with
171
+
| Error e -> Error e
172
+
| Ok conn -> (
173
+
match f conn with
174
+
| result ->
175
+
release t conn;
176
+
Ok result
177
+
| exception exn ->
178
+
release t conn;
179
+
raise exn)
180
+
181
+
let drain t =
182
+
let rec loop () =
183
+
match Queue.take_opt t.available with
184
+
| None -> ()
185
+
| Some pooled ->
186
+
close_pooled t pooled;
187
+
loop ()
188
+
in
189
+
loop ()
190
+
191
+
let shutdown t =
192
+
Loc.set t.closed true;
193
+
drain t
194
+
195
+
type stats = { total : int; available : int; in_use : int; closed : bool }
196
+
197
+
let stats t =
198
+
{
199
+
total = size t;
200
+
available = available t;
201
+
in_use = in_use t;
202
+
closed = is_closed t;
203
+
}
204
+
205
+
let error_to_string = function
206
+
| Pool_empty -> "Pool empty: no connections available"
207
+
| Pool_closed -> "Pool closed"
208
+
| Pool_timeout -> "Timeout waiting for connection"
209
+
| Connection_error msg -> Printf.sprintf "Connection error: %s" msg
210
+
211
+
module Make (D : Driver.S) = struct
212
+
type nonrec t = D.connection t
213
+
type conn = D.connection
214
+
215
+
let create ~max_size ~conninfo ?validate () =
216
+
let config =
217
+
{
218
+
max_size;
219
+
connect =
220
+
(fun () -> D.connect conninfo |> Result.map_error D.error_message);
221
+
close = D.close;
222
+
validate;
223
+
}
224
+
in
225
+
create config
226
+
227
+
let acquire = acquire
228
+
let acquire_blocking = acquire_blocking
229
+
let release = release
230
+
let with_connection = with_connection
231
+
let with_connection_blocking = with_connection_blocking
232
+
let drain = drain
233
+
let shutdown = shutdown
234
+
let stats = stats
235
+
let size = size
236
+
let available = available
237
+
let in_use = in_use
238
+
let is_closed = is_closed
239
+
end
+3
-3
lib/stream.ml
+3
-3
lib/stream.ml
···
58
58
D.query_fold conn sql ~params ~init:[] ~f:(fun acc row -> f row :: acc)
59
59
|> Result.map List.rev
60
60
61
-
let cursor_counter = ref 0
61
+
let cursor_counter = Atomic.make 0
62
62
63
63
let generate_cursor_name () =
64
-
incr cursor_counter;
65
-
Printf.sprintf "repodb_cursor_%d" !cursor_counter
64
+
let n = Atomic.fetch_and_add cursor_counter 1 in
65
+
Printf.sprintf "repodb_cursor_%d" n
66
66
67
67
let cursor_fold conn ~config sql ~params ~init ~f =
68
68
let cursor_name = generate_cursor_name () in
+5
repodb.opam
+5
repodb.opam
···
19
19
"ptime" {>= "1.1"}
20
20
"re" {>= "1.11"}
21
21
"uuidm" {>= "0.9"}
22
+
"kcas" {>= "0.7"}
23
+
"kcas_data" {>= "0.7"}
22
24
"alcotest" {>= "1.7" & with-test}
23
25
"yojson" {>= "2.0" & with-test}
26
+
"lwt" {>= "5.6" & with-test}
27
+
"eio" {>= "1.0" & with-test}
28
+
"eio_main" {>= "1.0" & with-test}
24
29
"odoc" {with-doc}
25
30
]
26
31
build: [
+11
test/dune
+11
test/dune
···
16
16
test_migration
17
17
test_multi
18
18
test_stream
19
+
test_pool
19
20
test_repodb))
21
+
22
+
(test
23
+
(name test_pool_lwt)
24
+
(libraries repodb repodb-sqlite lwt lwt.unix)
25
+
(modules test_pool_lwt))
26
+
27
+
(test
28
+
(name test_pool_eio)
29
+
(libraries repodb repodb-sqlite eio eio_main)
30
+
(modules test_pool_eio))
+193
test/test_pool.ml
+193
test/test_pool.ml
···
1
+
open Repodb
2
+
3
+
type mock_conn = { id : int; mutable closed : bool }
4
+
5
+
let conn_counter = Atomic.make 0
6
+
7
+
let mock_config ?(max_size = 3) ?(fail_connect = false) ?(validate = None) () =
8
+
Pool.
9
+
{
10
+
max_size;
11
+
connect =
12
+
(fun () ->
13
+
if fail_connect then Error "Connection failed"
14
+
else
15
+
let id = Atomic.fetch_and_add conn_counter 1 in
16
+
Ok { id; closed = false });
17
+
close = (fun conn -> conn.closed <- true);
18
+
validate;
19
+
}
20
+
21
+
let test_create_pool () =
22
+
let pool = Pool.create (mock_config ()) in
23
+
Alcotest.(check int) "initial size" 0 (Pool.size pool);
24
+
Alcotest.(check int) "initial available" 0 (Pool.available pool);
25
+
Alcotest.(check int) "initial in_use" 0 (Pool.in_use pool);
26
+
Alcotest.(check bool) "not closed" false (Pool.is_closed pool)
27
+
28
+
let test_acquire_creates_connection () =
29
+
let pool = Pool.create (mock_config ()) in
30
+
match Pool.acquire pool with
31
+
| Ok conn ->
32
+
Alcotest.(check int) "total is 1" 1 (Pool.size pool);
33
+
Alcotest.(check int) "in_use is 1" 1 (Pool.in_use pool);
34
+
Alcotest.(check bool) "conn not closed" false conn.closed;
35
+
Pool.release pool conn
36
+
| Error _ -> Alcotest.fail "Expected successful acquire"
37
+
38
+
let test_release_returns_to_pool () =
39
+
let pool = Pool.create (mock_config ()) in
40
+
match Pool.acquire pool with
41
+
| Ok conn ->
42
+
Pool.release pool conn;
43
+
Alcotest.(check int) "available is 1" 1 (Pool.available pool);
44
+
Alcotest.(check int) "in_use is 0" 0 (Pool.in_use pool)
45
+
| Error _ -> Alcotest.fail "Expected successful acquire"
46
+
47
+
let test_acquire_reuses_connection () =
48
+
let pool = Pool.create (mock_config ()) in
49
+
let conn1 =
50
+
match Pool.acquire pool with Ok c -> c | Error _ -> Alcotest.fail "fail"
51
+
in
52
+
let id1 = conn1.id in
53
+
Pool.release pool conn1;
54
+
let conn2 =
55
+
match Pool.acquire pool with Ok c -> c | Error _ -> Alcotest.fail "fail"
56
+
in
57
+
Alcotest.(check int) "same connection reused" id1 conn2.id;
58
+
Alcotest.(check int) "still only 1 total" 1 (Pool.size pool);
59
+
Pool.release pool conn2
60
+
61
+
let test_max_size_enforced () =
62
+
let pool = Pool.create (mock_config ~max_size:2 ()) in
63
+
let c1 =
64
+
match Pool.acquire pool with Ok c -> c | Error _ -> Alcotest.fail "fail"
65
+
in
66
+
let c2 =
67
+
match Pool.acquire pool with Ok c -> c | Error _ -> Alcotest.fail "fail"
68
+
in
69
+
(match Pool.acquire pool with
70
+
| Error Pool.Pool_empty -> ()
71
+
| _ -> Alcotest.fail "Expected Pool_empty");
72
+
Pool.release pool c1;
73
+
Pool.release pool c2
74
+
75
+
let test_with_connection () =
76
+
let pool = Pool.create (mock_config ()) in
77
+
let result =
78
+
Pool.with_connection pool (fun conn ->
79
+
Alcotest.(check int) "in_use during work" 1 (Pool.in_use pool);
80
+
conn.id + 100)
81
+
in
82
+
match result with
83
+
| Ok n ->
84
+
Alcotest.(check bool) "got result" true (n >= 100);
85
+
Alcotest.(check int) "released after" 0 (Pool.in_use pool)
86
+
| Error _ -> Alcotest.fail "Expected successful with_connection"
87
+
88
+
let test_with_connection_releases_on_exception () =
89
+
let pool = Pool.create (mock_config ()) in
90
+
(try
91
+
let _ =
92
+
Pool.with_connection pool (fun _ ->
93
+
Alcotest.(check int) "in_use" 1 (Pool.in_use pool);
94
+
failwith "boom")
95
+
in
96
+
()
97
+
with Failure _ -> ());
98
+
Alcotest.(check int) "released after exception" 0 (Pool.in_use pool)
99
+
100
+
let test_connection_error () =
101
+
let pool = Pool.create (mock_config ~fail_connect:true ()) in
102
+
match Pool.acquire pool with
103
+
| Error (Pool.Connection_error msg) ->
104
+
Alcotest.(check string) "error message" "Connection failed" msg
105
+
| _ -> Alcotest.fail "Expected Connection_error"
106
+
107
+
let test_validation () =
108
+
let call_count = ref 0 in
109
+
let validate conn =
110
+
incr call_count;
111
+
not conn.closed
112
+
in
113
+
let pool = Pool.create (mock_config ~validate:(Some validate) ()) in
114
+
let conn =
115
+
match Pool.acquire pool with Ok c -> c | Error _ -> Alcotest.fail "fail"
116
+
in
117
+
Pool.release pool conn;
118
+
let _ =
119
+
match Pool.acquire pool with Ok c -> c | Error _ -> Alcotest.fail "fail"
120
+
in
121
+
Alcotest.(check bool) "validate called" true (!call_count > 0)
122
+
123
+
let test_shutdown () =
124
+
let pool = Pool.create (mock_config ()) in
125
+
let conn =
126
+
match Pool.acquire pool with Ok c -> c | Error _ -> Alcotest.fail "fail"
127
+
in
128
+
Pool.release pool conn;
129
+
Pool.shutdown pool;
130
+
Alcotest.(check bool) "pool closed" true (Pool.is_closed pool);
131
+
match Pool.acquire pool with
132
+
| Error Pool.Pool_closed -> ()
133
+
| _ -> Alcotest.fail "Expected Pool_closed"
134
+
135
+
let test_drain () =
136
+
let pool = Pool.create (mock_config ()) in
137
+
let c1 =
138
+
match Pool.acquire pool with Ok c -> c | Error _ -> Alcotest.fail "fail"
139
+
in
140
+
let c2 =
141
+
match Pool.acquire pool with Ok c -> c | Error _ -> Alcotest.fail "fail"
142
+
in
143
+
Pool.release pool c1;
144
+
Pool.release pool c2;
145
+
Alcotest.(check int) "2 available before drain" 2 (Pool.available pool);
146
+
Pool.drain pool;
147
+
Alcotest.(check int) "0 available after drain" 0 (Pool.available pool);
148
+
Alcotest.(check int) "0 total after drain" 0 (Pool.size pool)
149
+
150
+
let test_stats () =
151
+
let pool = Pool.create (mock_config ~max_size:5 ()) in
152
+
let conn =
153
+
match Pool.acquire pool with Ok c -> c | Error _ -> Alcotest.fail "fail"
154
+
in
155
+
let stats = Pool.stats pool in
156
+
Alcotest.(check int) "stats.total" 1 stats.total;
157
+
Alcotest.(check int) "stats.in_use" 1 stats.in_use;
158
+
Alcotest.(check int) "stats.available" 0 stats.available;
159
+
Alcotest.(check bool) "stats.closed" false stats.closed;
160
+
Pool.release pool conn
161
+
162
+
let test_error_to_string () =
163
+
Alcotest.(check string)
164
+
"Pool_empty" "Pool empty: no connections available"
165
+
(Pool.error_to_string Pool.Pool_empty);
166
+
Alcotest.(check string)
167
+
"Pool_closed" "Pool closed"
168
+
(Pool.error_to_string Pool.Pool_closed);
169
+
Alcotest.(check string)
170
+
"Pool_timeout" "Timeout waiting for connection"
171
+
(Pool.error_to_string Pool.Pool_timeout);
172
+
Alcotest.(check string)
173
+
"Connection_error" "Connection error: oops"
174
+
(Pool.error_to_string (Pool.Connection_error "oops"))
175
+
176
+
let tests =
177
+
[
178
+
("create pool", `Quick, test_create_pool);
179
+
("acquire creates connection", `Quick, test_acquire_creates_connection);
180
+
("release returns to pool", `Quick, test_release_returns_to_pool);
181
+
("acquire reuses connection", `Quick, test_acquire_reuses_connection);
182
+
("max size enforced", `Quick, test_max_size_enforced);
183
+
("with_connection", `Quick, test_with_connection);
184
+
( "with_connection releases on exception",
185
+
`Quick,
186
+
test_with_connection_releases_on_exception );
187
+
("connection error", `Quick, test_connection_error);
188
+
("validation", `Quick, test_validation);
189
+
("shutdown", `Quick, test_shutdown);
190
+
("drain", `Quick, test_drain);
191
+
("stats", `Quick, test_stats);
192
+
("error_to_string", `Quick, test_error_to_string);
193
+
]
+431
test/test_pool_eio.ml
+431
test/test_pool_eio.ml
···
1
+
(** Example: Using repodb Pool with Eio
2
+
3
+
This demonstrates how to use the lock-free connection pool with Eio's
4
+
direct-style effects. The pool uses kcas internally, which integrates
5
+
naturally with Eio's scheduler.
6
+
7
+
Key patterns shown:
8
+
- Creating a pool for use with Eio fibers
9
+
- Running blocking DB operations in a thread pool
10
+
- Concurrent database access from multiple Eio fibers
11
+
- Using domains for true parallelism *)
12
+
13
+
module Pool = Repodb.Pool
14
+
module Driver = Repodb_sqlite
15
+
16
+
let db_path = "/tmp/repodb_pool_eio_test.db"
17
+
18
+
let pool_config () =
19
+
Pool.
20
+
{
21
+
max_size = 5;
22
+
connect =
23
+
(fun () ->
24
+
match Driver.connect db_path with
25
+
| Error e -> Error (Driver.error_message e)
26
+
| Ok conn ->
27
+
let _ = Driver.exec conn "PRAGMA journal_mode=WAL" ~params:[||] in
28
+
let _ =
29
+
Driver.exec conn "PRAGMA busy_timeout=5000" ~params:[||]
30
+
in
31
+
Ok conn);
32
+
close = Driver.close;
33
+
validate = None;
34
+
}
35
+
36
+
let setup_schema conn =
37
+
let exec sql =
38
+
match Driver.exec conn sql ~params:[||] with
39
+
| Ok () -> ()
40
+
| Error e -> failwith (Driver.error_message e)
41
+
in
42
+
exec
43
+
"CREATE TABLE IF NOT EXISTS counter (id INTEGER PRIMARY KEY, value INTEGER)";
44
+
exec "DELETE FROM counter";
45
+
exec "INSERT INTO counter (id, value) VALUES (1, 0)"
46
+
47
+
let increment conn =
48
+
match
49
+
Driver.exec conn "UPDATE counter SET value = value + 1 WHERE id = 1"
50
+
~params:[||]
51
+
with
52
+
| Ok () -> ()
53
+
| Error e -> failwith (Driver.error_message e)
54
+
55
+
let get_value conn =
56
+
match
57
+
Driver.query conn "SELECT value FROM counter WHERE id = 1" ~params:[||]
58
+
with
59
+
| Ok [ row ] -> Repodb.Driver.row_int row 0
60
+
| Ok _ -> failwith "Expected one row"
61
+
| Error e -> failwith (Driver.error_message e)
62
+
63
+
let test_concurrent_fibers ~executor pool =
64
+
Eio.traceln "=== Test: Concurrent fibers ===";
65
+
66
+
(match Pool.acquire pool with
67
+
| Ok conn ->
68
+
setup_schema conn;
69
+
Pool.release pool conn
70
+
| Error e -> failwith (Pool.error_to_string e));
71
+
72
+
let n_fibers = 50 in
73
+
let n_increments = 10 in
74
+
75
+
Eio.Fiber.all
76
+
(List.init n_fibers (fun i () ->
77
+
for _ = 1 to n_increments do
78
+
Eio.Executor_pool.submit_exn executor ~weight:1.0 (fun () ->
79
+
match Pool.acquire pool with
80
+
| Ok conn ->
81
+
increment conn;
82
+
Pool.release pool conn
83
+
| Error e -> failwith (Pool.error_to_string e))
84
+
done;
85
+
if (i + 1) mod 10 = 0 then Eio.traceln " Fiber %d completed" (i + 1)));
86
+
87
+
let final =
88
+
match Pool.acquire pool with
89
+
| Ok conn ->
90
+
let v = get_value conn in
91
+
Pool.release pool conn;
92
+
v
93
+
| Error e -> failwith (Pool.error_to_string e)
94
+
in
95
+
96
+
let expected = n_fibers * n_increments in
97
+
Eio.traceln " Final counter: %d (expected %d)" final expected;
98
+
assert (final = expected);
99
+
Eio.traceln " PASSED"
100
+
101
+
let test_blocking_acquire ~clock pool =
102
+
Eio.traceln "=== Test: Blocking acquire ===";
103
+
104
+
let acquired = ref [] in
105
+
for _ = 1 to 5 do
106
+
match Pool.acquire pool with
107
+
| Ok conn -> acquired := conn :: !acquired
108
+
| Error e -> failwith (Pool.error_to_string e)
109
+
done;
110
+
Eio.traceln " Acquired all 5 connections";
111
+
112
+
(match Pool.acquire pool with
113
+
| Error Pool.Pool_empty -> Eio.traceln " Pool correctly reports empty"
114
+
| _ -> failwith "Expected Pool_empty");
115
+
116
+
let got_connection = Atomic.make false in
117
+
118
+
let rec poll_acquire timeout =
119
+
if timeout <= 0.0 then None
120
+
else
121
+
match Pool.acquire pool with
122
+
| Ok conn -> Some conn
123
+
| Error Pool.Pool_empty ->
124
+
Eio.Time.sleep clock 0.01;
125
+
poll_acquire (timeout -. 0.01)
126
+
| Error e -> failwith (Pool.error_to_string e)
127
+
in
128
+
129
+
Eio.Fiber.both
130
+
(fun () ->
131
+
Eio.Time.sleep clock 0.05;
132
+
Eio.traceln " Releasing one connection...";
133
+
match !acquired with
134
+
| conn :: rest ->
135
+
Pool.release pool conn;
136
+
acquired := rest
137
+
| [] -> ())
138
+
(fun () ->
139
+
Eio.traceln " Waiting for connection (polling)...";
140
+
match poll_acquire 1.0 with
141
+
| Some conn ->
142
+
Eio.traceln " Got connection!";
143
+
Atomic.set got_connection true;
144
+
Pool.release pool conn
145
+
| None -> failwith "Timeout waiting for connection");
146
+
147
+
assert (Atomic.get got_connection);
148
+
List.iter (Pool.release pool) !acquired;
149
+
Eio.traceln " PASSED"
150
+
151
+
let test_with_connection pool =
152
+
Eio.traceln "=== Test: with_connection pattern ===";
153
+
154
+
(match Pool.acquire pool with
155
+
| Ok conn ->
156
+
setup_schema conn;
157
+
Pool.release pool conn
158
+
| Error e -> failwith (Pool.error_to_string e));
159
+
160
+
let result =
161
+
Pool.with_connection pool (fun conn ->
162
+
increment conn;
163
+
increment conn;
164
+
get_value conn)
165
+
in
166
+
167
+
(match result with
168
+
| Ok value -> Eio.traceln " Counter after 2 increments: %d" value
169
+
| Error e -> failwith (Pool.error_to_string e));
170
+
171
+
let stats = Pool.stats pool in
172
+
Eio.traceln " Stats: total=%d in_use=%d available=%d" stats.total
173
+
stats.in_use stats.available;
174
+
assert (stats.in_use = 0);
175
+
Eio.traceln " PASSED"
176
+
177
+
let test_multi_domain ~executor pool =
178
+
Eio.traceln "=== Test: Multi-domain parallelism ===";
179
+
180
+
(match Pool.acquire pool with
181
+
| Ok conn ->
182
+
setup_schema conn;
183
+
Pool.release pool conn
184
+
| Error e -> failwith (Pool.error_to_string e));
185
+
186
+
let n_domains = 4 in
187
+
let n_ops = 25 in
188
+
189
+
let domain_work domain_id =
190
+
for _ = 1 to n_ops do
191
+
match Pool.acquire pool with
192
+
| Ok conn ->
193
+
increment conn;
194
+
Pool.release pool conn
195
+
| Error e -> failwith (Pool.error_to_string e)
196
+
done;
197
+
Eio.traceln " Domain %d completed %d operations" domain_id n_ops
198
+
in
199
+
200
+
Eio.Fiber.all
201
+
(List.init n_domains (fun i () ->
202
+
Eio.Executor_pool.submit_exn executor ~weight:1.0 (fun () ->
203
+
domain_work i)));
204
+
205
+
let final =
206
+
match Pool.acquire pool with
207
+
| Ok conn ->
208
+
let v = get_value conn in
209
+
Pool.release pool conn;
210
+
v
211
+
| Error e -> failwith (Pool.error_to_string e)
212
+
in
213
+
214
+
let expected = n_domains * n_ops in
215
+
Eio.traceln " Final counter: %d (expected %d)" final expected;
216
+
assert (final = expected);
217
+
Eio.traceln " PASSED"
218
+
219
+
let test_pool_stats pool =
220
+
Eio.traceln "=== Test: Pool statistics ===";
221
+
222
+
let print_stats label =
223
+
let s = Pool.stats pool in
224
+
Eio.traceln " [%s] total=%d in_use=%d available=%d" label s.total s.in_use
225
+
s.available
226
+
in
227
+
228
+
Pool.drain pool;
229
+
print_stats "after drain";
230
+
231
+
let c1 =
232
+
match Pool.acquire pool with
233
+
| Ok c -> c
234
+
| Error e -> failwith (Pool.error_to_string e)
235
+
in
236
+
print_stats "after 1st acquire";
237
+
238
+
let c2 =
239
+
match Pool.acquire pool with
240
+
| Ok c -> c
241
+
| Error e -> failwith (Pool.error_to_string e)
242
+
in
243
+
print_stats "after 2nd acquire";
244
+
245
+
Pool.release pool c1;
246
+
print_stats "after 1st release";
247
+
248
+
Pool.release pool c2;
249
+
print_stats "after 2nd release";
250
+
251
+
Eio.traceln " PASSED"
252
+
253
+
let setup_streaming_data conn =
254
+
let exec sql =
255
+
match Driver.exec conn sql ~params:[||] with
256
+
| Ok () -> ()
257
+
| Error e -> failwith (Driver.error_message e)
258
+
in
259
+
exec
260
+
"CREATE TABLE IF NOT EXISTS numbers (id INTEGER PRIMARY KEY, value INTEGER)";
261
+
exec "DELETE FROM numbers";
262
+
for i = 1 to 1000 do
263
+
let _ =
264
+
Driver.exec conn
265
+
(Printf.sprintf "INSERT INTO numbers (value) VALUES (%d)" i)
266
+
~params:[||]
267
+
in
268
+
()
269
+
done
270
+
271
+
let test_streaming_fold ~executor pool =
272
+
Eio.traceln "=== Test: Streaming fold with Pool ===";
273
+
274
+
(match Pool.acquire pool with
275
+
| Ok conn ->
276
+
setup_streaming_data conn;
277
+
Pool.release pool conn
278
+
| Error e -> failwith (Pool.error_to_string e));
279
+
280
+
let n_fibers = 10 in
281
+
let results = Array.make n_fibers 0 in
282
+
283
+
Eio.Fiber.all
284
+
(List.init n_fibers (fun id () ->
285
+
Eio.Executor_pool.submit_exn executor ~weight:1.0 (fun () ->
286
+
match Pool.acquire pool with
287
+
| Ok conn ->
288
+
let sum =
289
+
match
290
+
Driver.query_fold conn "SELECT value FROM numbers"
291
+
~params:[||] ~init:0 ~f:(fun acc row ->
292
+
acc + Repodb.Driver.row_int row 0)
293
+
with
294
+
| Ok s -> s
295
+
| Error e -> failwith (Driver.error_message e)
296
+
in
297
+
results.(id) <- sum;
298
+
Pool.release pool conn
299
+
| Error e -> failwith (Pool.error_to_string e))));
300
+
301
+
let expected_sum = 1000 * 1001 / 2 in
302
+
Array.iteri
303
+
(fun i sum ->
304
+
if sum <> expected_sum then
305
+
failwith
306
+
(Printf.sprintf "Fiber %d got sum %d, expected %d" i sum expected_sum))
307
+
results;
308
+
309
+
Eio.traceln " All %d fibers computed correct sum: %d" n_fibers expected_sum;
310
+
Eio.traceln " PASSED"
311
+
312
+
let test_streaming_iter ~executor pool =
313
+
Eio.traceln "=== Test: Streaming iter with Pool ===";
314
+
315
+
(match Pool.acquire pool with
316
+
| Ok conn ->
317
+
setup_streaming_data conn;
318
+
Pool.release pool conn
319
+
| Error e -> failwith (Pool.error_to_string e));
320
+
321
+
let count = Atomic.make 0 in
322
+
let sum = Atomic.make 0 in
323
+
let n_fibers = 5 in
324
+
325
+
Eio.Fiber.all
326
+
(List.init n_fibers (fun _id () ->
327
+
Eio.Executor_pool.submit_exn executor ~weight:1.0 (fun () ->
328
+
match Pool.acquire pool with
329
+
| Ok conn ->
330
+
(match
331
+
Driver.query_iter conn
332
+
"SELECT value FROM numbers WHERE value <= 100" ~params:[||]
333
+
~f:(fun row ->
334
+
Atomic.incr count;
335
+
let _ =
336
+
Atomic.fetch_and_add sum (Repodb.Driver.row_int row 0)
337
+
in
338
+
())
339
+
with
340
+
| Ok () -> ()
341
+
| Error e -> failwith (Driver.error_message e));
342
+
Pool.release pool conn
343
+
| Error e -> failwith (Pool.error_to_string e))));
344
+
345
+
let expected_sum = n_fibers * (100 * 101) / 2 in
346
+
let actual_sum = Atomic.get sum in
347
+
Eio.traceln " Total rows processed: %d" (Atomic.get count);
348
+
Eio.traceln " Combined sum: %d (expected %d)" actual_sum expected_sum;
349
+
350
+
if actual_sum <> expected_sum then
351
+
failwith
352
+
(Printf.sprintf "Sum mismatch: got %d, expected %d" actual_sum
353
+
expected_sum);
354
+
355
+
Eio.traceln " PASSED"
356
+
357
+
let test_streaming_concurrent_domains ~executor pool =
358
+
Eio.traceln "=== Test: Streaming across domains ===";
359
+
360
+
(match Pool.acquire pool with
361
+
| Ok conn ->
362
+
setup_streaming_data conn;
363
+
Pool.release pool conn
364
+
| Error e -> failwith (Pool.error_to_string e));
365
+
366
+
let n_domains = 4 in
367
+
let results = Array.make n_domains 0 in
368
+
369
+
Eio.Fiber.all
370
+
(List.init n_domains (fun id () ->
371
+
Eio.Executor_pool.submit_exn executor ~weight:1.0 (fun () ->
372
+
match Pool.acquire pool with
373
+
| Ok conn ->
374
+
let sum =
375
+
match
376
+
Driver.query_fold conn
377
+
"SELECT value FROM numbers WHERE value > 500"
378
+
~params:[||] ~init:0 ~f:(fun acc row ->
379
+
acc + Repodb.Driver.row_int row 0)
380
+
with
381
+
| Ok s -> s
382
+
| Error e -> failwith (Driver.error_message e)
383
+
in
384
+
results.(id) <- sum;
385
+
Pool.release pool conn
386
+
| Error e -> failwith (Pool.error_to_string e))));
387
+
388
+
let expected_sum =
389
+
List.init 500 (fun i -> 501 + i) |> List.fold_left ( + ) 0
390
+
in
391
+
Array.iteri
392
+
(fun i sum ->
393
+
if sum <> expected_sum then
394
+
failwith
395
+
(Printf.sprintf "Domain %d got sum %d, expected %d" i sum expected_sum))
396
+
results;
397
+
398
+
Eio.traceln " All %d domains computed correct sum: %d" n_domains expected_sum;
399
+
Eio.traceln " PASSED"
400
+
401
+
let () =
402
+
Eio.traceln "========================================";
403
+
Eio.traceln "repodb Pool + Eio Integration Tests";
404
+
Eio.traceln "========================================";
405
+
406
+
(try Sys.remove db_path with Sys_error _ -> ());
407
+
408
+
Eio_main.run @@ fun env ->
409
+
let clock = Eio.Stdenv.clock env in
410
+
let domain_mgr = Eio.Stdenv.domain_mgr env in
411
+
412
+
Eio.Switch.run @@ fun sw ->
413
+
let executor = Eio.Executor_pool.create ~sw domain_mgr ~domain_count:4 in
414
+
let pool = Pool.create (pool_config ()) in
415
+
416
+
test_concurrent_fibers ~executor pool;
417
+
test_blocking_acquire ~clock pool;
418
+
test_with_connection pool;
419
+
test_multi_domain ~executor pool;
420
+
test_pool_stats pool;
421
+
test_streaming_fold ~executor pool;
422
+
test_streaming_iter ~executor pool;
423
+
test_streaming_concurrent_domains ~executor pool;
424
+
425
+
Pool.shutdown pool;
426
+
427
+
(try Sys.remove db_path with Sys_error _ -> ());
428
+
429
+
Eio.traceln "========================================";
430
+
Eio.traceln "All Eio pool tests PASSED!";
431
+
Eio.traceln "========================================"
+386
test/test_pool_lwt.ml
+386
test/test_pool_lwt.ml
···
1
+
(** Example: Using repodb Pool with Lwt
2
+
3
+
This demonstrates how to use the lock-free connection pool with Lwt's
4
+
cooperative threading model. The pool uses kcas internally, which is
5
+
scheduler-agnostic, so it works seamlessly with Lwt.
6
+
7
+
Key patterns shown:
8
+
- Creating a pool with Lwt
9
+
- Using Lwt_preemptive.detach for blocking pool operations
10
+
- Concurrent database access from multiple Lwt threads
11
+
- Pool exhaustion and blocking acquire with timeout *)
12
+
13
+
open Lwt.Infix
14
+
module Pool = Repodb.Pool
15
+
module Driver = Repodb_sqlite
16
+
17
+
let db_path = "/tmp/repodb_pool_lwt_test.db"
18
+
19
+
let pool_config () =
20
+
Pool.
21
+
{
22
+
max_size = 5;
23
+
connect =
24
+
(fun () ->
25
+
match Driver.connect db_path with
26
+
| Error e -> Error (Driver.error_message e)
27
+
| Ok conn ->
28
+
let _ = Driver.exec conn "PRAGMA journal_mode=WAL" ~params:[||] in
29
+
let _ =
30
+
Driver.exec conn "PRAGMA busy_timeout=5000" ~params:[||]
31
+
in
32
+
Ok conn);
33
+
close = Driver.close;
34
+
validate = None;
35
+
}
36
+
37
+
let setup_schema conn =
38
+
let exec sql =
39
+
match Driver.exec conn sql ~params:[||] with
40
+
| Ok () -> ()
41
+
| Error e -> failwith (Driver.error_message e)
42
+
in
43
+
exec "PRAGMA journal_mode=WAL";
44
+
exec "PRAGMA busy_timeout=5000";
45
+
exec
46
+
"CREATE TABLE IF NOT EXISTS counter (id INTEGER PRIMARY KEY, value INTEGER)";
47
+
exec "DELETE FROM counter";
48
+
exec "INSERT INTO counter (id, value) VALUES (1, 0)"
49
+
50
+
let increment conn =
51
+
match
52
+
Driver.exec conn "UPDATE counter SET value = value + 1 WHERE id = 1"
53
+
~params:[||]
54
+
with
55
+
| Ok () -> ()
56
+
| Error e -> failwith (Driver.error_message e)
57
+
58
+
let get_value conn =
59
+
match
60
+
Driver.query conn "SELECT value FROM counter WHERE id = 1" ~params:[||]
61
+
with
62
+
| Ok [ row ] -> Repodb.Driver.row_int row 0
63
+
| Ok _ -> failwith "Expected one row"
64
+
| Error e -> failwith (Driver.error_message e)
65
+
66
+
let test_concurrent_increments () =
67
+
Printf.printf "\n=== Test: Concurrent increments with Lwt ===\n%!";
68
+
69
+
let pool = Pool.create (pool_config ()) in
70
+
71
+
(match Pool.acquire pool with
72
+
| Ok conn ->
73
+
setup_schema conn;
74
+
Pool.release pool conn
75
+
| Error e -> failwith (Pool.error_to_string e));
76
+
77
+
let n_tasks = 50 in
78
+
let n_increments = 10 in
79
+
80
+
let worker id =
81
+
let rec loop remaining =
82
+
if remaining <= 0 then Lwt.return_unit
83
+
else
84
+
Lwt_preemptive.detach
85
+
(fun () ->
86
+
match Pool.acquire pool with
87
+
| Ok conn ->
88
+
increment conn;
89
+
Pool.release pool conn
90
+
| Error e -> failwith (Pool.error_to_string e))
91
+
()
92
+
>>= fun () ->
93
+
if remaining mod 5 = 0 then
94
+
Printf.printf " Worker %d: %d increments remaining\n%!" id remaining;
95
+
loop (remaining - 1)
96
+
in
97
+
loop n_increments
98
+
in
99
+
100
+
Lwt_main.run (Lwt.join (List.init n_tasks worker));
101
+
102
+
let final =
103
+
match Pool.acquire pool with
104
+
| Ok conn ->
105
+
let v = get_value conn in
106
+
Pool.release pool conn;
107
+
v
108
+
| Error e -> failwith (Pool.error_to_string e)
109
+
in
110
+
111
+
let expected = n_tasks * n_increments in
112
+
Printf.printf " Final counter value: %d (expected %d)\n%!" final expected;
113
+
assert (final = expected);
114
+
115
+
Pool.shutdown pool;
116
+
Printf.printf " PASSED\n%!"
117
+
118
+
let test_blocking_acquire () =
119
+
Printf.printf "\n=== Test: Blocking acquire with Lwt ===\n%!";
120
+
121
+
let pool = Pool.create (pool_config ()) in
122
+
let acquired = ref [] in
123
+
124
+
for _ = 1 to 5 do
125
+
match Pool.acquire pool with
126
+
| Ok conn -> acquired := conn :: !acquired
127
+
| Error e -> failwith (Pool.error_to_string e)
128
+
done;
129
+
Printf.printf " Acquired all 5 connections\n%!";
130
+
131
+
(match Pool.acquire pool with
132
+
| Error Pool.Pool_empty -> Printf.printf " Pool correctly reports empty\n%!"
133
+
| _ -> failwith "Expected Pool_empty");
134
+
135
+
let success = ref false in
136
+
137
+
let release_task =
138
+
Lwt_unix.sleep 0.05 >>= fun () ->
139
+
Printf.printf " Releasing one connection...\n%!";
140
+
(match !acquired with
141
+
| conn :: rest ->
142
+
Pool.release pool conn;
143
+
acquired := rest
144
+
| [] -> ());
145
+
Lwt.return_unit
146
+
in
147
+
148
+
let rec poll_acquire timeout =
149
+
if timeout <= 0.0 then Lwt.return_none
150
+
else
151
+
match Pool.acquire pool with
152
+
| Ok conn -> Lwt.return_some conn
153
+
| Error Pool.Pool_empty ->
154
+
Lwt_unix.sleep 0.01 >>= fun () -> poll_acquire (timeout -. 0.01)
155
+
| Error e -> failwith (Pool.error_to_string e)
156
+
in
157
+
158
+
let acquire_task =
159
+
Printf.printf " Waiting for connection (polling)...\n%!";
160
+
poll_acquire 1.0 >>= function
161
+
| Some conn ->
162
+
Printf.printf " Got connection after wait!\n%!";
163
+
success := true;
164
+
Pool.release pool conn;
165
+
Lwt.return_unit
166
+
| None -> failwith "Timeout waiting for connection"
167
+
in
168
+
169
+
Lwt_main.run (Lwt.join [ release_task; acquire_task ]);
170
+
assert !success;
171
+
172
+
List.iter (Pool.release pool) !acquired;
173
+
Pool.shutdown pool;
174
+
Printf.printf " PASSED\n%!"
175
+
176
+
let test_with_connection_pattern () =
177
+
Printf.printf "\n=== Test: with_connection pattern ===\n%!";
178
+
179
+
let pool = Pool.create (pool_config ()) in
180
+
181
+
(match Pool.acquire pool with
182
+
| Ok conn ->
183
+
setup_schema conn;
184
+
Pool.release pool conn
185
+
| Error e -> failwith (Pool.error_to_string e));
186
+
187
+
let result =
188
+
Pool.with_connection pool (fun conn ->
189
+
increment conn;
190
+
increment conn;
191
+
get_value conn)
192
+
in
193
+
194
+
(match result with
195
+
| Ok value -> Printf.printf " Counter after 2 increments: %d\n%!" value
196
+
| Error e -> failwith (Pool.error_to_string e));
197
+
198
+
let stats = Pool.stats pool in
199
+
Printf.printf " Pool stats: total=%d, in_use=%d, available=%d\n%!"
200
+
stats.total stats.in_use stats.available;
201
+
assert (stats.in_use = 0);
202
+
203
+
Pool.shutdown pool;
204
+
Printf.printf " PASSED\n%!"
205
+
206
+
let test_pool_stats () =
207
+
Printf.printf "\n=== Test: Pool statistics ===\n%!";
208
+
209
+
let pool = Pool.create (pool_config ()) in
210
+
211
+
let print_stats label =
212
+
let s = Pool.stats pool in
213
+
Printf.printf " [%s] total=%d in_use=%d available=%d\n%!" label s.total
214
+
s.in_use s.available
215
+
in
216
+
217
+
print_stats "initial";
218
+
assert ((Pool.stats pool).total = 0);
219
+
220
+
let c1 =
221
+
match Pool.acquire pool with
222
+
| Ok c -> c
223
+
| Error e -> failwith (Pool.error_to_string e)
224
+
in
225
+
print_stats "after 1st acquire";
226
+
assert ((Pool.stats pool).in_use = 1);
227
+
228
+
let c2 =
229
+
match Pool.acquire pool with
230
+
| Ok c -> c
231
+
| Error e -> failwith (Pool.error_to_string e)
232
+
in
233
+
print_stats "after 2nd acquire";
234
+
assert ((Pool.stats pool).in_use = 2);
235
+
236
+
Pool.release pool c1;
237
+
print_stats "after 1st release";
238
+
assert ((Pool.stats pool).in_use = 1);
239
+
assert ((Pool.stats pool).available = 1);
240
+
241
+
Pool.release pool c2;
242
+
print_stats "after 2nd release";
243
+
assert ((Pool.stats pool).in_use = 0);
244
+
245
+
Pool.shutdown pool;
246
+
Printf.printf " PASSED\n%!"
247
+
248
+
let setup_streaming_data conn =
249
+
let exec sql =
250
+
match Driver.exec conn sql ~params:[||] with
251
+
| Ok () -> ()
252
+
| Error e -> failwith (Driver.error_message e)
253
+
in
254
+
exec
255
+
"CREATE TABLE IF NOT EXISTS numbers (id INTEGER PRIMARY KEY, value INTEGER)";
256
+
exec "DELETE FROM numbers";
257
+
for i = 1 to 1000 do
258
+
let _ =
259
+
Driver.exec conn
260
+
(Printf.sprintf "INSERT INTO numbers (value) VALUES (%d)" i)
261
+
~params:[||]
262
+
in
263
+
()
264
+
done
265
+
266
+
let test_streaming_with_pool () =
267
+
Printf.printf "\n=== Test: Streaming with Pool (Lwt) ===\n%!";
268
+
269
+
let pool = Pool.create (pool_config ()) in
270
+
271
+
(match Pool.acquire pool with
272
+
| Ok conn ->
273
+
setup_streaming_data conn;
274
+
Pool.release pool conn
275
+
| Error e -> failwith (Pool.error_to_string e));
276
+
277
+
let n_workers = 10 in
278
+
let results = Array.make n_workers 0 in
279
+
280
+
let worker id =
281
+
Lwt_preemptive.detach
282
+
(fun () ->
283
+
match Pool.acquire pool with
284
+
| Ok conn ->
285
+
let sum =
286
+
match
287
+
Driver.query_fold conn "SELECT value FROM numbers" ~params:[||]
288
+
~init:0 ~f:(fun acc row -> acc + Repodb.Driver.row_int row 0)
289
+
with
290
+
| Ok s -> s
291
+
| Error e -> failwith (Driver.error_message e)
292
+
in
293
+
results.(id) <- sum;
294
+
Pool.release pool conn
295
+
| Error e -> failwith (Pool.error_to_string e))
296
+
()
297
+
in
298
+
299
+
Lwt_main.run (Lwt.join (List.init n_workers worker));
300
+
301
+
let expected_sum = 1000 * 1001 / 2 in
302
+
Array.iteri
303
+
(fun i sum ->
304
+
if sum <> expected_sum then
305
+
failwith
306
+
(Printf.sprintf "Worker %d got sum %d, expected %d" i sum expected_sum))
307
+
results;
308
+
309
+
Printf.printf " All %d workers computed correct sum: %d\n%!" n_workers
310
+
expected_sum;
311
+
312
+
Pool.shutdown pool;
313
+
Printf.printf " PASSED\n%!"
314
+
315
+
let test_streaming_iter_with_pool () =
316
+
Printf.printf "\n=== Test: Streaming iter with Pool (Lwt) ===\n%!";
317
+
318
+
let pool = Pool.create (pool_config ()) in
319
+
320
+
(match Pool.acquire pool with
321
+
| Ok conn ->
322
+
setup_streaming_data conn;
323
+
Pool.release pool conn
324
+
| Error e -> failwith (Pool.error_to_string e));
325
+
326
+
let count = Atomic.make 0 in
327
+
let sum = Atomic.make 0 in
328
+
329
+
let n_workers = 5 in
330
+
331
+
let worker _id =
332
+
Lwt_preemptive.detach
333
+
(fun () ->
334
+
match Pool.acquire pool with
335
+
| Ok conn ->
336
+
(match
337
+
Driver.query_iter conn
338
+
"SELECT value FROM numbers WHERE value <= 100" ~params:[||]
339
+
~f:(fun row ->
340
+
Atomic.incr count;
341
+
let _ =
342
+
Atomic.fetch_and_add sum (Repodb.Driver.row_int row 0)
343
+
in
344
+
())
345
+
with
346
+
| Ok () -> ()
347
+
| Error e -> failwith (Driver.error_message e));
348
+
Pool.release pool conn
349
+
| Error e -> failwith (Pool.error_to_string e))
350
+
()
351
+
in
352
+
353
+
Lwt_main.run (Lwt.join (List.init n_workers worker));
354
+
355
+
let expected_sum = n_workers * (100 * 101) / 2 in
356
+
let actual_sum = Atomic.get sum in
357
+
Printf.printf " Total rows processed: %d\n%!" (Atomic.get count);
358
+
Printf.printf " Combined sum: %d (expected %d)\n%!" actual_sum expected_sum;
359
+
360
+
if actual_sum <> expected_sum then
361
+
failwith
362
+
(Printf.sprintf "Sum mismatch: got %d, expected %d" actual_sum
363
+
expected_sum);
364
+
365
+
Pool.shutdown pool;
366
+
Printf.printf " PASSED\n%!"
367
+
368
+
let () =
369
+
Printf.printf "========================================\n%!";
370
+
Printf.printf "repodb Pool + Lwt Integration Tests\n%!";
371
+
Printf.printf "========================================\n%!";
372
+
373
+
(try Sys.remove db_path with Sys_error _ -> ());
374
+
375
+
test_concurrent_increments ();
376
+
test_blocking_acquire ();
377
+
test_with_connection_pattern ();
378
+
test_pool_stats ();
379
+
test_streaming_with_pool ();
380
+
test_streaming_iter_with_pool ();
381
+
382
+
(try Sys.remove db_path with Sys_error _ -> ());
383
+
384
+
Printf.printf "\n========================================\n%!";
385
+
Printf.printf "All Lwt pool tests PASSED!\n%!";
386
+
Printf.printf "========================================\n%!"