(* SWIM protocol in OCaml, interoperable with the membership library and the Serf CLI. *)
1open Swim.Types
2module Buffer_pool = Swim.Buffer_pool
3module Membership = Swim.Membership
4module Member = Membership.Member
5module Pending_acks = Swim.Pending_acks
6
(** Fixture node with id ["node1"], reachable over UDP at 127.0.0.1:7946,
    carrying empty metadata. *)
let node1 =
  let id = node_id_of_string "node1" in
  let addr = `Udp (Eio.Net.Ipaddr.of_raw "\127\000\000\001", 7946) in
  make_node_info ~id ~addr ~meta:""
12
(** Fixture node with id ["node2"], reachable over UDP at 127.0.0.2:7946,
    carrying empty metadata. *)
let node2 =
  let id = node_id_of_string "node2" in
  let addr = `Udp (Eio.Net.Ipaddr.of_raw "\127\000\000\002", 7946) in
  make_node_info ~id ~addr ~meta:""
18
(** Fixture node with id ["node3"], reachable over UDP at 127.0.0.3:7946,
    carrying empty metadata. *)
let node3 =
  let id = node_id_of_string "node3" in
  let addr = `Udp (Eio.Net.Ipaddr.of_raw "\127\000\000\003", 7946) in
  make_node_info ~id ~addr ~meta:""
24
(** Fixed zero-nanosecond span passed as [~now] by the membership tests, so
    that they do not depend on a real clock. *)
let now : Mtime.Span.t = Mtime.Span.of_uint64_ns Int64.zero
26
(** Acquiring a buffer lowers the available count by one, the buffer has the
    configured size, and releasing it restores the count. *)
let test_buffer_pool_acquire_release () =
  let pool = Buffer_pool.create ~size:1024 ~count:4 in
  (* Compare the pool's current availability against [expected]. *)
  let expect_available label expected =
    Alcotest.(check int) label expected (Buffer_pool.available pool)
  in
  expect_available "initial available" 4;
  let borrowed = Buffer_pool.acquire pool in
  expect_available "after acquire" 3;
  Alcotest.(check int) "buffer size" 1024 (Cstruct.length borrowed);
  Buffer_pool.release pool borrowed;
  expect_available "after release" 4
35
(** [Buffer_pool.with_buffer] holds one buffer while the callback runs,
    forwards the callback's result, and leaves the pool full afterwards. *)
let test_buffer_pool_with_buffer () =
  let pool = Buffer_pool.create ~size:512 ~count:2 in
  let expect_available label expected =
    Alcotest.(check int) label expected (Buffer_pool.available pool)
  in
  let length_seen_inside =
    Buffer_pool.with_buffer pool (fun borrowed ->
        expect_available "inside with_buffer" 1;
        Cstruct.length borrowed)
  in
  Alcotest.(check int) "returned value" 512 length_seen_inside;
  expect_available "after with_buffer" 2
45
(** When the callback given to [Buffer_pool.with_buffer] raises, the pool is
    back to full availability once the exception has been handled. *)
let test_buffer_pool_exception_safe () =
  let pool = Buffer_pool.create ~size:256 ~count:2 in
  (* Callback that checks the buffer is held, then fails on purpose. *)
  let failing_body _buf =
    Alcotest.(check int) "during exception" 1 (Buffer_pool.available pool);
    failwith "test exception"
  in
  (* Only [Failure] is absorbed; a failed Alcotest check still propagates. *)
  (match Buffer_pool.with_buffer pool failing_body with
  | () -> ()
  | exception Failure _ -> ());
  Alcotest.(check int) "after exception" 2 (Buffer_pool.available pool)
54
(** Draining every buffer brings availability to zero; releasing them all
    restores the full count. *)
let test_buffer_pool_all_buffers () =
  let pool = Buffer_pool.create ~size:64 ~count:3 in
  (* Acquire three buffers in order, keeping them for the release below. *)
  let held = List.init 3 (fun _ -> Buffer_pool.acquire pool) in
  Alcotest.(check int) "all acquired" 0 (Buffer_pool.available pool);
  (* Release in the same order they were acquired. *)
  List.iter (Buffer_pool.release pool) held;
  Alcotest.(check int) "all released" 3 (Buffer_pool.available pool)
65
(** A member added to the table is counted and can be looked up by its node
    id. *)
let test_membership_add_find () =
  let table = Membership.create () in
  Membership.add table (Member.create ~now node1);
  Alcotest.(check int) "count" 1 (Membership.count table);
  match Membership.find table node1.id with
  | None -> Alcotest.fail "member not found"
  | Some found ->
      let same_id = equal_node_id (Member.node found).id node1.id in
      Alcotest.(check bool) "same node" true same_id
77
(** Removing a present member returns [true] and empties the table; removing
    it a second time returns [false]. *)
let test_membership_remove () =
  let table = Membership.create () in
  Membership.add table (Member.create ~now node1);
  let first_removal = Membership.remove table node1.id in
  Alcotest.(check bool) "remove existing" true first_removal;
  Alcotest.(check int) "count after remove" 0 (Membership.count table);
  let second_removal = Membership.remove table node1.id in
  Alcotest.(check bool) "remove non-existing" false second_removal
89
(** [Membership.to_list] yields one entry per distinct member added. *)
let test_membership_to_list () =
  let table = Membership.create () in
  [ node1; node2; node3 ]
  |> List.iter (fun node -> Membership.add table (Member.create ~now node));
  Membership.to_list table
  |> List.length
  |> Alcotest.(check int) "list length" 3
97
(** [Membership.snapshot_all] on a one-member table returns a single snapshot
    whose node id matches the member that was added. *)
let test_membership_snapshot_consistency () =
  let table = Membership.create () in
  Membership.add table (Member.create ~now node1);
  let snaps = Membership.snapshot_all table in
  snaps |> List.length |> Alcotest.(check int) "snapshot count" 1;
  (* The count check above raises unless there is exactly one snapshot, so
     [List.hd] is only reached on a non-empty list. *)
  let first = List.hd snaps in
  let ids_match = equal_node_id first.node.id node1.id in
  Alcotest.(check bool) "node id matches" true ids_match
107
(** [Membership.count] tracks the table size through two additions and one
    removal. *)
let test_membership_count_accurate () =
  let table = Membership.create () in
  (* Compare the table's current size against [expected]. *)
  let expect_count label expected =
    Alcotest.(check int) label expected (Membership.count table)
  in
  expect_count "empty" 0;
  Membership.add table (Member.create ~now node1);
  expect_count "after add 1" 1;
  Membership.add table (Member.create ~now node2);
  expect_count "after add 2" 2;
  (* The removal result is deliberately unused here; the type annotation makes
     the compiler reject an accidental partial application, which a bare
     [let _ =] would silently accept. *)
  ignore (Membership.remove table node1.id : bool);
  expect_count "after remove" 1
117
(** Adding the same node twice leaves a single entry in the table. *)
let test_membership_no_duplicates () =
  let table = Membership.create () in
  for _attempt = 1 to 2 do
    Membership.add table (Member.create ~now node1)
  done;
  Membership.count table |> Alcotest.(check int) "no duplicate" 1
123
(** Registering a sequence number makes it pending; cancelling it brings the
    pending count back to zero. *)
let test_pending_acks_register_cancel () =
  let acks = Pending_acks.create () in
  let expect_pending label expected =
    Alcotest.(check int) label expected (Pending_acks.pending_count acks)
  in
  (* The value returned by [register] is not needed by this test. *)
  let _registration = Pending_acks.register acks ~seq:123 in
  expect_pending "pending before" 1;
  Pending_acks.cancel acks ~seq:123;
  expect_pending "pending after" 0
130
(** Completing a sequence number that was never registered reports [false]. *)
let test_pending_acks_complete_not_found () =
  let acks = Pending_acks.create () in
  Pending_acks.complete acks ~seq:999 ~payload:None
  |> Alcotest.(check bool) "not found" false
135
(** Marking a member suspect at incarnation 5 via [Membership.update_member]
    reports success and is reflected in the member's snapshot. *)
let test_member_state_transitions () =
  let table = Membership.create () in
  Membership.add table (Member.create ~now node1);
  (* Update callback: move the member to suspect at incarnation 5. *)
  let mark_suspect m ~xt =
    Member.set_suspect m ~incarnation:(incarnation_of_int 5) ~now ~xt
  in
  let was_updated =
    Membership.update_member table node1.id { update = mark_suspect }
  in
  Alcotest.(check bool) "updated" true was_updated;
  match Membership.find table node1.id with
  | None -> Alcotest.fail "member not found"
  | Some found ->
      let snapshot = Member.snapshot_now found in
      let state_name = member_state_to_string snapshot.state in
      Alcotest.(check string) "state suspect" "suspect" state_name;
      snapshot.incarnation
      |> incarnation_to_int
      |> Alcotest.(check int) "incarnation" 5
157
(** A member marked suspect at incarnation 1 and then alive at incarnation 2
    ends up in the ["alive"] state. *)
let test_member_set_alive () =
  let table = Membership.create () in
  Membership.add table (Member.create ~now node1);
  (* The update results are deliberately unused: the final state check below
     is the assertion. The [: bool] annotation keeps the discard type-checked,
     unlike a bare [let _ =], which would also accept a partial application. *)
  ignore
    (Membership.update_member table node1.id
       {
         update =
           (fun m ~xt ->
             Member.set_suspect m ~incarnation:(incarnation_of_int 1) ~now ~xt);
       }
      : bool);
  ignore
    (Membership.update_member table node1.id
       {
         update =
           (fun m ~xt ->
             Member.set_alive m ~incarnation:(incarnation_of_int 2) ~now ~xt);
       }
      : bool);
  match Membership.find table node1.id with
  | None -> Alcotest.fail "member not found"
  | Some found ->
      let snapshot = Member.snapshot_now found in
      Alcotest.(check string)
        "state alive" "alive"
        (member_state_to_string snapshot.state)
185
(** A member marked dead at incarnation 10 via [Membership.update_member]
    reports the ["dead"] state in its snapshot. *)
let test_member_set_dead () =
  let table = Membership.create () in
  Membership.add table (Member.create ~now node1);
  (* The update result is deliberately unused: the state check below is the
     assertion. The [: bool] annotation keeps the discard type-checked, unlike
     a bare [let _ =], which would also accept a partial application. *)
  ignore
    (Membership.update_member table node1.id
       {
         update =
           (fun m ~xt ->
             Member.set_dead m ~incarnation:(incarnation_of_int 10) ~now ~xt);
       }
      : bool);
  match Membership.find table node1.id with
  | None -> Alcotest.fail "member not found"
  | Some found ->
      let snapshot = Member.snapshot_now found in
      Alcotest.(check string)
        "state dead" "dead"
        (member_state_to_string snapshot.state)
205
(** Test entry point: registers the four suites under the Alcotest run name
    ["kcas"] and executes them inside [Eio_main.run]. *)
let () =
  let buffer_pool_suite =
    ( "buffer_pool",
      [
        ("acquire_release", `Quick, test_buffer_pool_acquire_release);
        ("with_buffer", `Quick, test_buffer_pool_with_buffer);
        ("exception_safe", `Quick, test_buffer_pool_exception_safe);
        ("all_buffers", `Quick, test_buffer_pool_all_buffers);
      ] )
  in
  let membership_suite =
    ( "membership",
      [
        ("add_find", `Quick, test_membership_add_find);
        ("remove", `Quick, test_membership_remove);
        ("to_list", `Quick, test_membership_to_list);
        ("snapshot_consistency", `Quick, test_membership_snapshot_consistency);
        ("count_accurate", `Quick, test_membership_count_accurate);
        ("no_duplicates", `Quick, test_membership_no_duplicates);
      ] )
  in
  let pending_acks_suite =
    ( "pending_acks",
      [
        ("register_cancel", `Quick, test_pending_acks_register_cancel);
        ("complete_not_found", `Quick, test_pending_acks_complete_not_found);
      ] )
  in
  let member_transitions_suite =
    ( "member_transitions",
      [
        ("state_transitions", `Quick, test_member_state_transitions);
        ("set_alive", `Quick, test_member_set_alive);
        ("set_dead", `Quick, test_member_set_dead);
      ] )
  in
  Eio_main.run (fun _env ->
      Alcotest.run "kcas"
        [
          buffer_pool_suite;
          membership_suite;
          pending_acks_suite;
          member_transitions_suite;
        ])