swim protocol in ocaml interoperable with membership lib and serf cli
1open Types
2
3type item = {
4 msg : protocol_msg;
5 transmits : int Kcas.Loc.t;
6 created : Mtime.span;
7}
8
9type t = { queue : item Kcas_data.Queue.t; depth : int Kcas.Loc.t }
10
11let create () = { queue = Kcas_data.Queue.create (); depth = Kcas.Loc.make 0 }
12
13let enqueue t msg ~transmits ~created ~limit =
14 let item = { msg; transmits = Kcas.Loc.make transmits; created } in
15 Kcas.Xt.commit
16 {
17 tx =
18 (fun ~xt ->
19 let d = Kcas.Xt.get ~xt t.depth in
20 if d >= limit then ignore (Kcas_data.Queue.Xt.take_opt ~xt t.queue)
21 else Kcas.Xt.set ~xt t.depth (d + 1);
22 Kcas_data.Queue.Xt.add ~xt item t.queue);
23 }
24
25let depth t = Kcas.Xt.commit { tx = (fun ~xt -> Kcas.Xt.get ~xt t.depth) }
26
27let drain t ~max_bytes ~encode_size =
28 let rec loop acc bytes_used =
29 let result =
30 Kcas.Xt.commit
31 {
32 tx =
33 (fun ~xt ->
34 match Kcas_data.Queue.Xt.take_opt ~xt t.queue with
35 | None -> `Done (List.rev acc)
36 | Some item ->
37 let msg_size = encode_size item.msg in
38 if bytes_used + msg_size > max_bytes && acc <> [] then begin
39 Kcas_data.Queue.Xt.add ~xt item t.queue;
40 `Done (List.rev acc)
41 end
42 else begin
43 let remaining = Kcas.Xt.get ~xt item.transmits - 1 in
44 if remaining > 0 then begin
45 Kcas.Xt.set ~xt item.transmits remaining;
46 Kcas_data.Queue.Xt.add ~xt item t.queue
47 end
48 else Kcas.Xt.modify ~xt t.depth pred;
49 `Continue (item.msg, msg_size)
50 end);
51 }
52 in
53 match result with
54 | `Done msgs -> msgs
55 | `Continue (msg, msg_size) -> loop (msg :: acc) (bytes_used + msg_size)
56 in
57 loop [] 0
58
59let invalidate t ~invalidates newer_msg =
60 let items =
61 Kcas.Xt.commit
62 { tx = (fun ~xt -> Kcas_data.Queue.Xt.to_seq ~xt t.queue |> List.of_seq) }
63 in
64 let valid_items, removed_count =
65 List.fold_left
66 (fun (valid, removed) item ->
67 if invalidates ~newer:newer_msg ~older:item.msg then (valid, removed + 1)
68 else (item :: valid, removed))
69 ([], 0) items
70 in
71 if removed_count > 0 then begin
72 Kcas.Xt.commit
73 {
74 tx =
75 (fun ~xt ->
76 Kcas_data.Queue.Xt.clear ~xt t.queue;
77 List.iter
78 (fun item -> Kcas_data.Queue.Xt.add ~xt item t.queue)
79 (List.rev valid_items);
80 Kcas.Xt.modify ~xt t.depth (fun d -> d - removed_count));
81 }
82 end