swim protocol in ocaml interoperable with membership lib and serf cli
1open Swim.Types
2
3let gen_node_id : node_id QCheck.Gen.t =
4 let open QCheck.Gen in
5 let+ id =
6 oneof_weighted
7 [
8 (3, string_size ~gen:printable (int_range 1 64));
9 (1, return "");
10 (1, string_size ~gen:printable (return 255));
11 ]
12 in
13 node_id_of_string id
14
15let gen_incarnation : incarnation QCheck.Gen.t =
16 let open QCheck.Gen in
17 let+ i =
18 oneof_weighted
19 [ (5, int_range 0 1000); (2, int_range 0 max_int); (1, return 0) ]
20 in
21 incarnation_of_int i
22
23let gen_member_state : member_state QCheck.Gen.t =
24 let open QCheck.Gen in
25 let alive : member_state = Alive in
26 let suspect : member_state = Suspect in
27 let dead : member_state = Dead in
28 oneof [ return alive; return suspect; return dead ]
29
30let gen_ipv4 : string QCheck.Gen.t =
31 let open QCheck.Gen in
32 let+ a = int_range 0 255
33 and+ b = int_range 0 255
34 and+ c = int_range 0 255
35 and+ d = int_range 0 255 in
36 Printf.sprintf "%d.%d.%d.%d" a b c d
37
38let gen_port : int QCheck.Gen.t =
39 let open QCheck.Gen in
40 oneof_weighted
41 [ (3, int_range 1024 65535); (1, int_range 1 1023); (1, return 7946) ]
42
43let gen_addr : addr QCheck.Gen.t =
44 let open QCheck.Gen in
45 let+ ip = gen_ipv4 and+ port = gen_port in
46 let ipaddr =
47 match Ipaddr.V4.of_string ip with
48 | Ok v4 -> v4
49 | Error _ -> Ipaddr.V4.localhost
50 in
51 `Udp (Eio.Net.Ipaddr.of_raw (Ipaddr.V4.to_octets ipaddr), port)
52
53let gen_meta : string QCheck.Gen.t =
54 let open QCheck.Gen in
55 oneof_weighted
56 [
57 (3, string_size ~gen:printable (int_range 0 256));
58 (1, return "");
59 (1, return (String.make 1024 'x'));
60 ]
61
62let gen_node_info : node_info QCheck.Gen.t =
63 let open QCheck.Gen in
64 let+ id = gen_node_id and+ addr = gen_addr and+ meta = gen_meta in
65 make_node_info ~id ~addr ~meta
66
67let gen_seq : int QCheck.Gen.t =
68 let open QCheck.Gen in
69 oneof_weighted
70 [ (5, int_range 0 10000); (2, int_range 0 max_int); (1, return 0) ]
71
72let gen_ping : protocol_msg QCheck.Gen.t =
73 let open QCheck.Gen in
74 let+ seq = gen_seq and+ target = gen_node_id and+ sender = gen_node_info in
75 Ping { seq; target; sender }
76
77let gen_ping_req : protocol_msg QCheck.Gen.t =
78 let open QCheck.Gen in
79 let+ seq = gen_seq and+ target = gen_node_id and+ sender = gen_node_info in
80 Ping_req { seq; target; sender }
81
82let gen_payload : string option QCheck.Gen.t =
83 let open QCheck.Gen in
84 oneof_weighted
85 [
86 (2, return None);
87 (3, map Option.some (string_size ~gen:printable (int_range 0 512)));
88 ]
89
90let gen_ack : protocol_msg QCheck.Gen.t =
91 let open QCheck.Gen in
92 let+ seq = gen_seq
93 and+ responder = gen_node_info
94 and+ payload = gen_payload in
95 Ack { seq; responder; payload }
96
97let gen_alive : protocol_msg QCheck.Gen.t =
98 let open QCheck.Gen in
99 let+ node = gen_node_info and+ incarnation = gen_incarnation in
100 Alive { node; incarnation }
101
102let gen_suspect : protocol_msg QCheck.Gen.t =
103 let open QCheck.Gen in
104 let+ node = gen_node_id
105 and+ incarnation = gen_incarnation
106 and+ suspector = gen_node_id in
107 Suspect { node; incarnation; suspector }
108
109let gen_dead : protocol_msg QCheck.Gen.t =
110 let open QCheck.Gen in
111 let+ node = gen_node_id
112 and+ incarnation = gen_incarnation
113 and+ declarator = gen_node_id in
114 Dead { node; incarnation; declarator }
115
116let gen_topic : string QCheck.Gen.t =
117 QCheck.Gen.string_size ~gen:QCheck.Gen.printable (QCheck.Gen.int_range 1 64)
118
119let gen_user_payload : string QCheck.Gen.t =
120 QCheck.Gen.string_size ~gen:QCheck.Gen.printable (QCheck.Gen.int_range 0 1024)
121
122let gen_user_msg : protocol_msg QCheck.Gen.t =
123 let open QCheck.Gen in
124 let+ topic = gen_topic
125 and+ payload = gen_user_payload
126 and+ origin = gen_node_id in
127 User_msg { topic; payload; origin }
128
129let gen_protocol_msg : protocol_msg QCheck.Gen.t =
130 QCheck.Gen.oneof
131 [
132 gen_ping;
133 gen_ping_req;
134 gen_ack;
135 gen_alive;
136 gen_suspect;
137 gen_dead;
138 gen_user_msg;
139 ]
140
141let gen_cluster_name : string QCheck.Gen.t =
142 let open QCheck.Gen in
143 oneof_weighted
144 [
145 (3, string_size ~gen:printable (int_range 1 32));
146 (1, return "default");
147 (1, return "test-cluster");
148 ]
149
150let gen_piggyback : protocol_msg list QCheck.Gen.t =
151 let open QCheck.Gen in
152 let piggyback_msg =
153 oneof [ gen_alive; gen_suspect; gen_dead; gen_user_msg ]
154 in
155 list_size (int_range 0 8) piggyback_msg
156
157let gen_packet : packet QCheck.Gen.t =
158 let open QCheck.Gen in
159 let+ cluster = gen_cluster_name
160 and+ primary = gen_protocol_msg
161 and+ piggyback = gen_piggyback in
162 { cluster; primary; piggyback }
163
164let gen_cstruct : Cstruct.t QCheck.Gen.t =
165 let open QCheck.Gen in
166 let+ len =
167 oneof_weighted
168 [ (3, int_range 0 1024); (1, return 0); (1, int_range 1024 4096) ]
169 and+ fill = char in
170 let cs = Cstruct.create len in
171 Cstruct.memset cs (Char.code fill);
172 cs
173
174let gen_cstruct_sized (size : int) : Cstruct.t QCheck.Gen.t =
175 let open QCheck.Gen in
176 let+ bytes = string_size ~gen:char (return size) in
177 Cstruct.of_string bytes
178
179let gen_config : config QCheck.Gen.t =
180 let open QCheck.Gen in
181 let+ bind_addr = gen_ipv4
182 and+ bind_port = gen_port
183 and+ node_name =
184 oneof_weighted [ (2, return None); (3, map Option.some gen_topic) ]
185 and+ protocol_interval = float_range 0.1 10.0
186 and+ probe_timeout = float_range 0.1 5.0
187 and+ indirect_checks = int_range 1 10
188 and+ suspicion_mult = int_range 1 10
189 and+ suspicion_max_timeout = float_range 10.0 120.0
190 and+ retransmit_mult = int_range 1 10
191 and+ udp_buffer_size =
192 oneof [ return 1400; return 1500; return 8192; return 65507 ]
193 and+ tcp_timeout = float_range 1.0 30.0
194 and+ send_buffer_count = int_range 4 64
195 and+ recv_buffer_count = int_range 4 64
196 and+ secret_key = gen_cstruct_sized 16
197 and+ cluster_name = gen_cluster_name
198 and+ label = oneof [ return ""; gen_topic ]
199 and+ encryption_enabled = bool
200 and+ gossip_verify_incoming = bool
201 and+ gossip_verify_outgoing = bool
202 and+ max_gossip_queue_depth = int_range 10 10000 in
203 {
204 bind_addr;
205 bind_port;
206 node_name;
207 protocol_interval;
208 probe_timeout;
209 indirect_checks;
210 suspicion_mult;
211 suspicion_max_timeout;
212 retransmit_mult;
213 udp_buffer_size;
214 tcp_timeout;
215 send_buffer_count;
216 recv_buffer_count;
217 secret_key = Cstruct.to_string secret_key;
218 cluster_name;
219 label;
220 encryption_enabled;
221 gossip_verify_incoming;
222 gossip_verify_outgoing;
223 max_gossip_queue_depth;
224 }
225
226let gen_decode_error : decode_error QCheck.Gen.t =
227 let open QCheck.Gen in
228 oneof
229 [
230 return Invalid_magic;
231 map (fun v -> Unsupported_version v) (int_range 0 255);
232 return Truncated_message;
233 map (fun t -> Invalid_tag t) (int_range 0 255);
234 return Decryption_failed;
235 ]
236
237let gen_send_error : send_error QCheck.Gen.t =
238 let open QCheck.Gen in
239 oneof [ return Node_unreachable; return Timeout; return Connection_reset ]
240
241let gen_mtime_span : Mtime.span QCheck.Gen.t =
242 let open QCheck.Gen in
243 let+ ns = map Int64.of_int (int_range 0 1_000_000_000) in
244 Mtime.Span.of_uint64_ns ns
245
246let gen_member_snapshot : member_snapshot QCheck.Gen.t =
247 let open QCheck.Gen in
248 let+ node = gen_node_info
249 and+ state = gen_member_state
250 and+ incarnation = gen_incarnation
251 and+ state_change = gen_mtime_span in
252 { node; state; incarnation; state_change }
253
254let arb_node_id : node_id QCheck.arbitrary =
255 QCheck.make ~print:(fun id -> node_id_to_string id) gen_node_id
256
257let arb_incarnation : incarnation QCheck.arbitrary =
258 QCheck.make
259 ~print:(fun inc -> string_of_int (incarnation_to_int inc))
260 ~shrink:(fun inc ->
261 let i = incarnation_to_int inc in
262 QCheck.Shrink.int i |> QCheck.Iter.map incarnation_of_int)
263 gen_incarnation
264
265let arb_member_state : member_state QCheck.arbitrary =
266 QCheck.make ~print:member_state_to_string gen_member_state
267
268let format_addr (addr : addr) : string =
269 match addr with
270 | `Udp (ip, port) -> Fmt.str "%a:%d" Eio.Net.Ipaddr.pp ip port
271 | `Unix path -> Printf.sprintf "unix:%s" path
272
273let format_node_info (ni : node_info) : string =
274 Printf.sprintf "{ id=%s; addr=%s; meta=%S }" (node_id_to_string ni.id)
275 (format_addr ni.addr) ni.meta
276
277let arb_node_info : node_info QCheck.arbitrary =
278 QCheck.make ~print:format_node_info gen_node_info
279
280let format_protocol_msg (msg : protocol_msg) : string =
281 match msg with
282 | Ping { seq; target; sender } ->
283 Printf.sprintf "Ping { seq=%d; target=%s; sender=%s }" seq
284 (node_id_to_string target) (format_node_info sender)
285 | Ping_req { seq; target; sender } ->
286 Printf.sprintf "Ping_req { seq=%d; target=%s; sender=%s }" seq
287 (node_id_to_string target) (format_node_info sender)
288 | Ack { seq; responder; payload } ->
289 Printf.sprintf "Ack { seq=%d; responder=%s; payload=%s }" seq
290 (format_node_info responder)
291 (match payload with
292 | None -> "None"
293 | Some p -> Printf.sprintf "Some %S" p)
294 | Alive { node; incarnation } ->
295 Printf.sprintf "Alive { node=%s; incarnation=%d }" (format_node_info node)
296 (incarnation_to_int incarnation)
297 | Suspect { node; incarnation; suspector } ->
298 Printf.sprintf "Suspect { node=%s; incarnation=%d; suspector=%s }"
299 (node_id_to_string node)
300 (incarnation_to_int incarnation)
301 (node_id_to_string suspector)
302 | Dead { node; incarnation; declarator } ->
303 Printf.sprintf "Dead { node=%s; incarnation=%d; declarator=%s }"
304 (node_id_to_string node)
305 (incarnation_to_int incarnation)
306 (node_id_to_string declarator)
307 | User_msg { topic; payload; origin } ->
308 Printf.sprintf "User_msg { topic=%S; payload=%S; origin=%s }" topic
309 payload (node_id_to_string origin)
310
311let arb_protocol_msg : protocol_msg QCheck.arbitrary =
312 QCheck.make ~print:format_protocol_msg gen_protocol_msg
313
314let format_packet (p : packet) : string =
315 Printf.sprintf "{ cluster=%S; primary=%s; piggyback=[%d msgs] }" p.cluster
316 (format_protocol_msg p.primary)
317 (List.length p.piggyback)
318
319let arb_packet : packet QCheck.arbitrary =
320 QCheck.make ~print:format_packet gen_packet
321
322let arb_cstruct : Cstruct.t QCheck.arbitrary =
323 QCheck.make
324 ~print:(fun cs -> Printf.sprintf "<cstruct len=%d>" (Cstruct.length cs))
325 gen_cstruct
326
327let arb_decode_error : decode_error QCheck.arbitrary =
328 QCheck.make ~print:decode_error_to_string gen_decode_error
329
330let arb_send_error : send_error QCheck.arbitrary =
331 QCheck.make ~print:send_error_to_string gen_send_error
332
333let arb_member_snapshot : member_snapshot QCheck.arbitrary =
334 QCheck.make
335 ~print:(fun ms ->
336 Printf.sprintf "{ node=%s; state=%s; incarnation=%d }"
337 (format_node_info ms.node)
338 (member_state_to_string ms.state)
339 (incarnation_to_int ms.incarnation))
340 gen_member_snapshot