+8
-59
lib/buffer_pool.ml
+8
-59
lib/buffer_pool.ml
···
1
-
(** Lock-free buffer pool using Kcas and Eio.
2
-
3
-
Provides pre-allocated buffers for zero-copy I/O operations. Uses
4
-
Kcas_data.Queue for lock-free buffer storage and Eio.Semaphore for blocking
5
-
acquire when pool is exhausted. *)
6
-
7
-
type t = {
8
-
buffers : Cstruct.t Kcas_data.Queue.t;
9
-
buf_size : int;
10
-
total : int;
11
-
semaphore : Eio.Semaphore.t;
12
-
}
1
+
type t = { pool : Cstruct.t Eio.Stream.t; buf_size : int; capacity : int }
13
2
14
3
let create ~size ~count =
15
-
let buffers = Kcas_data.Queue.create () in
4
+
let pool = Eio.Stream.create count in
16
5
for _ = 1 to count do
17
-
Kcas.Xt.commit
18
-
{
19
-
tx =
20
-
(fun ~xt -> Kcas_data.Queue.Xt.add ~xt (Cstruct.create size) buffers);
21
-
}
6
+
Eio.Stream.add pool (Cstruct.create size)
22
7
done;
23
-
{
24
-
buffers;
25
-
buf_size = size;
26
-
total = count;
27
-
semaphore = Eio.Semaphore.make count;
28
-
}
8
+
{ pool; buf_size = size; capacity = count }
29
9
30
-
let acquire t =
31
-
Eio.Semaphore.acquire t.semaphore;
32
-
let buf_opt =
33
-
Kcas.Xt.commit
34
-
{ tx = (fun ~xt -> Kcas_data.Queue.Xt.take_opt ~xt t.buffers) }
35
-
in
36
-
match buf_opt with
37
-
| Some buf -> buf
38
-
| None ->
39
-
(* Should not happen if semaphore is properly synchronized,
40
-
but handle gracefully by allocating a new buffer *)
41
-
Cstruct.create t.buf_size
42
-
43
-
let try_acquire t =
44
-
(* Check if semaphore has available permits without blocking *)
45
-
if Eio.Semaphore.get_value t.semaphore > 0 then begin
46
-
(* Race condition possible here - another fiber might acquire between
47
-
get_value and acquire. In that case, acquire will block briefly.
48
-
For truly non-blocking behavior, we'd need atomic CAS on semaphore. *)
49
-
Eio.Semaphore.acquire t.semaphore;
50
-
let buf_opt =
51
-
Kcas.Xt.commit
52
-
{ tx = (fun ~xt -> Kcas_data.Queue.Xt.take_opt ~xt t.buffers) }
53
-
in
54
-
match buf_opt with
55
-
| Some buf -> Some buf
56
-
| None -> Some (Cstruct.create t.buf_size)
57
-
end
58
-
else None
59
-
60
-
let release t buf =
61
-
Kcas.Xt.commit { tx = (fun ~xt -> Kcas_data.Queue.Xt.add ~xt buf t.buffers) };
62
-
Eio.Semaphore.release t.semaphore
10
+
let acquire t = Eio.Stream.take t.pool
11
+
let release t buf = Eio.Stream.add t.pool buf
63
12
64
13
let with_buffer t f =
65
14
let buf = acquire t in
66
15
Fun.protect ~finally:(fun () -> release t buf) (fun () -> f buf)
67
16
68
-
let available t = Eio.Semaphore.get_value t.semaphore
69
-
let total t = t.total
17
+
let available t = Eio.Stream.length t.pool
18
+
let total t = t.capacity
70
19
let size t = t.buf_size