swim protocol in ocaml interoperable with membership lib and serf cli
1type node_id = Node_id of string [@@unboxed]
2
3let node_id_to_string (Node_id s) = s
4let node_id_of_string s = Node_id s
5let equal_node_id (Node_id a) (Node_id b) = String.equal a b
6let compare_node_id (Node_id a) (Node_id b) = String.compare a b
7
8type incarnation = Incarnation of int [@@unboxed]
9
10let incarnation_to_int (Incarnation i) = i
11let incarnation_of_int i = Incarnation i
12let zero_incarnation = Incarnation 0
13let compare_incarnation (Incarnation a) (Incarnation b) = Int.compare a b
14let incr_incarnation (Incarnation i) = Incarnation (i + 1)
15
16type addr = Eio.Net.Sockaddr.datagram
17type node_info = { id : node_id; addr : addr; meta : string }
18
19let make_node_info ~id ~addr ~meta = { id; addr; meta }
20
21type member_state = Alive | Suspect | Dead | Left
22
23let member_state_to_string = function
24 | Alive -> "alive"
25 | Suspect -> "suspect"
26 | Dead -> "dead"
27 | Left -> "left"
28
29let member_state_to_int = function
30 | Alive -> 0
31 | Suspect -> 1
32 | Dead -> 2
33 | Left -> 3
34
35let member_state_of_int = function
36 | 0 -> Alive
37 | 1 -> Suspect
38 | 2 -> Dead
39 | _ -> Left
40
41type member_snapshot = {
42 node : node_info;
43 state : member_state;
44 incarnation : incarnation;
45 state_change : Mtime.span;
46}
47
48type protocol_msg =
49 | Ping of { seq : int; target : node_id; sender : node_info }
50 | Ping_req of { seq : int; target : node_id; sender : node_info }
51 | Ack of { seq : int; responder : node_info; payload : string option }
52 | Alive of { node : node_info; incarnation : incarnation }
53 | Suspect of {
54 node : node_id;
55 incarnation : incarnation;
56 suspector : node_id;
57 }
58 | Dead of { node : node_id; incarnation : incarnation; declarator : node_id }
59 | User_msg of { topic : string; payload : string; origin : node_id }
60
61type packet = {
62 cluster : string;
63 primary : protocol_msg;
64 piggyback : protocol_msg list;
65}
66
67type decode_error =
68 | Invalid_magic
69 | Unsupported_version of int
70 | Truncated_message
71 | Invalid_tag of int
72 | Decryption_failed
73 | Msgpack_error of string
74 | Invalid_crc
75
76let decode_error_to_string = function
77 | Invalid_magic -> "invalid magic bytes"
78 | Unsupported_version v -> Printf.sprintf "unsupported version: %d" v
79 | Truncated_message -> "truncated message"
80 | Invalid_tag t -> Printf.sprintf "invalid tag: %d" t
81 | Decryption_failed -> "decryption failed"
82 | Msgpack_error s -> Printf.sprintf "msgpack error: %s" s
83 | Invalid_crc -> "invalid CRC checksum"
84
85type send_error = Node_unreachable | Timeout | Connection_reset
86
87let send_error_to_string = function
88 | Node_unreachable -> "node unreachable"
89 | Timeout -> "timeout"
90 | Connection_reset -> "connection reset"
91
92type node_event =
93 | Join of node_info
94 | Leave of node_info
95 | Update of node_info
96 | Suspect_event of node_info
97 | Alive_event of node_info
98
99type config = {
100 bind_addr : string;
101 bind_port : int;
102 node_name : string option;
103 protocol_interval : float;
104 probe_timeout : float;
105 indirect_checks : int;
106 suspicion_mult : int;
107 suspicion_max_timeout : float;
108 retransmit_mult : int;
109 udp_buffer_size : int;
110 tcp_timeout : float;
111 send_buffer_count : int;
112 recv_buffer_count : int;
113 secret_key : string;
114 cluster_name : string;
115 label : string;
116 encryption_enabled : bool;
117 gossip_verify_incoming : bool;
118 gossip_verify_outgoing : bool;
119 max_gossip_queue_depth : int;
120}
121
122let default_config =
123 {
124 bind_addr = "0.0.0.0";
125 bind_port = 7946;
126 node_name = None;
127 protocol_interval = 1.0;
128 probe_timeout = 0.5;
129 indirect_checks = 3;
130 suspicion_mult = 4;
131 suspicion_max_timeout = 60.0;
132 retransmit_mult = 4;
133 udp_buffer_size = 1400;
134 tcp_timeout = 10.0;
135 send_buffer_count = 16;
136 recv_buffer_count = 16;
137 secret_key = String.make 16 '\x00';
138 cluster_name = "default";
139 label = "";
140 encryption_enabled = false;
141 gossip_verify_incoming = true;
142 gossip_verify_outgoing = true;
143 max_gossip_queue_depth = 5000;
144 }
145
146type 'a env = {
147 stdenv : 'a;
148 sw : Eio.Switch.t;
149}
150 constraint
151 'a =
152 < clock : _ Eio.Time.clock
153 ; mono_clock : _ Eio.Time.Mono.t
154 ; net : _ Eio.Net.t
155 ; secure_random : _ Eio.Flow.source
156 ; .. >
157
158type stats = {
159 nodes_alive : int;
160 nodes_suspect : int;
161 nodes_dead : int;
162 msgs_sent : int;
163 msgs_received : int;
164 msgs_dropped : int;
165 queue_depth : int;
166 buffers_available : int;
167 buffers_total : int;
168}
169
170let empty_stats =
171 {
172 nodes_alive = 0;
173 nodes_suspect = 0;
174 nodes_dead = 0;
175 msgs_sent = 0;
176 msgs_received = 0;
177 msgs_dropped = 0;
178 queue_depth = 0;
179 buffers_available = 0;
180 buffers_total = 0;
181 }
182
183module Wire = struct
184 type message_type =
185 | Ping_msg
186 | Indirect_ping_msg
187 | Ack_resp_msg
188 | Suspect_msg
189 | Alive_msg
190 | Dead_msg
191 | Push_pull_msg
192 | Compound_msg
193 | User_msg
194 | Compress_msg
195 | Encrypt_msg
196 | Nack_resp_msg
197 | Has_crc_msg
198 | Err_msg
199 | Has_label_msg
200
201 let message_type_to_int = function
202 | Ping_msg -> 0
203 | Indirect_ping_msg -> 1
204 | Ack_resp_msg -> 2
205 | Suspect_msg -> 3
206 | Alive_msg -> 4
207 | Dead_msg -> 5
208 | Push_pull_msg -> 6
209 | Compound_msg -> 7
210 | User_msg -> 8
211 | Compress_msg -> 9
212 | Encrypt_msg -> 10
213 | Nack_resp_msg -> 11
214 | Has_crc_msg -> 12
215 | Err_msg -> 13
216 | Has_label_msg -> 244
217
218 let message_type_of_int = function
219 | 0 -> Ok Ping_msg
220 | 1 -> Ok Indirect_ping_msg
221 | 2 -> Ok Ack_resp_msg
222 | 3 -> Ok Suspect_msg
223 | 4 -> Ok Alive_msg
224 | 5 -> Ok Dead_msg
225 | 6 -> Ok Push_pull_msg
226 | 7 -> Ok Compound_msg
227 | 8 -> Ok User_msg
228 | 9 -> Ok Compress_msg
229 | 10 -> Ok Encrypt_msg
230 | 11 -> Ok Nack_resp_msg
231 | 12 -> Ok Has_crc_msg
232 | 13 -> Ok Err_msg
233 | 244 -> Ok Has_label_msg
234 | n -> Error n
235
236 type ping = {
237 seq_no : int;
238 node : string;
239 source_addr : string;
240 source_port : int;
241 source_node : string;
242 }
243
244 type indirect_ping_req = {
245 seq_no : int;
246 target : string;
247 port : int;
248 node : string;
249 nack : bool;
250 source_addr : string;
251 source_port : int;
252 source_node : string;
253 }
254
255 type ack_resp = { seq_no : int; payload : string }
256 type nack_resp = { seq_no : int }
257 type suspect = { incarnation : int; node : string; from : string }
258
259 type alive = {
260 incarnation : int;
261 node : string;
262 addr : string;
263 port : int;
264 meta : string;
265 vsn : int list;
266 }
267
268 type dead = { incarnation : int; node : string; from : string }
269 type compress = { algo : int; buf : string }
270
271 type push_pull_header = {
272 pp_nodes : int;
273 pp_user_state_len : int;
274 pp_join : bool;
275 }
276
277 type push_node_state = {
278 pns_name : string;
279 pns_addr : string;
280 pns_port : int;
281 pns_meta : string;
282 pns_incarnation : int;
283 pns_state : int;
284 pns_vsn : int list;
285 }
286
287 type protocol_msg =
288 | Ping of ping
289 | Indirect_ping of indirect_ping_req
290 | Ack of ack_resp
291 | Nack of nack_resp
292 | Suspect of suspect
293 | Alive of alive
294 | Dead of dead
295 | User_data of string
296 | Compound of string list
297 | Compressed of compress
298 | Err of string
299end
300
301let ip_to_bytes ip =
302 let s = Fmt.to_to_string Eio.Net.Ipaddr.pp ip in
303 if String.contains s ':' then (
304 let parts = String.split_on_char ':' s in
305 let buf = Bytes.create 16 in
306 let rec fill idx = function
307 | [] -> ()
308 | "" :: rest when List.exists (( = ) "") rest ->
309 let tail_len = List.length (List.filter (( <> ) "") rest) in
310 let zeros = 8 - idx - tail_len in
311 for i = 0 to (zeros * 2) - 1 do
312 Bytes.set_uint8 buf ((idx * 2) + i) 0
313 done;
314 fill (idx + zeros) rest
315 | "" :: rest -> fill idx rest
316 | h :: rest ->
317 let v = int_of_string ("0x" ^ h) in
318 Bytes.set_uint8 buf (idx * 2) (v lsr 8);
319 Bytes.set_uint8 buf ((idx * 2) + 1) (v land 0xff);
320 fill (idx + 1) rest
321 in
322 fill 0 parts;
323 Bytes.to_string buf)
324 else
325 Scanf.sscanf s "%d.%d.%d.%d" (fun a b c d ->
326 let buf = Bytes.create 4 in
327 Bytes.set_uint8 buf 0 a;
328 Bytes.set_uint8 buf 1 b;
329 Bytes.set_uint8 buf 2 c;
330 Bytes.set_uint8 buf 3 d;
331 Bytes.to_string buf)
332
333let ip_of_bytes s =
334 let len = String.length s in
335 if len = 4 then Eio.Net.Ipaddr.of_raw s
336 else if len = 16 then Eio.Net.Ipaddr.of_raw s
337 else failwith "invalid IP address length"
338
339let default_vsn = [ 1; 5; 5; 0; 0; 0 ]
340
341let node_info_to_wire (info : node_info) ~source_node :
342 string * int * string * string =
343 match info.addr with
344 | `Udp (ip, port) -> (ip_to_bytes ip, port, info.meta, source_node)
345 | `Unix _ -> failwith "Unix sockets not supported"
346
347let node_info_of_wire ~name ~addr ~port ~meta : node_info =
348 let ip = ip_of_bytes addr in
349 { id = node_id_of_string name; addr = `Udp (ip, port); meta }
350
351let msg_to_wire ~self_name ~self_port (msg : protocol_msg) : Wire.protocol_msg =
352 match msg with
353 | Ping { seq; target; sender } ->
354 let addr, port, _, _ = node_info_to_wire sender ~source_node:"" in
355 Wire.Ping
356 {
357 seq_no = seq;
358 node = node_id_to_string target;
359 source_addr = addr;
360 source_port = port;
361 source_node = self_name;
362 }
363 | Ping_req { seq; target; sender } ->
364 let addr, port, _, _ = node_info_to_wire sender ~source_node:"" in
365 let target_addr =
366 match sender.addr with `Udp (ip, _) -> ip_to_bytes ip | `Unix _ -> ""
367 in
368 Wire.Indirect_ping
369 {
370 seq_no = seq;
371 target = target_addr;
372 port = self_port;
373 node = node_id_to_string target;
374 nack = true;
375 source_addr = addr;
376 source_port = port;
377 source_node = self_name;
378 }
379 | Ack { seq; responder = _; payload } ->
380 Wire.Ack { seq_no = seq; payload = Option.value payload ~default:"" }
381 | Alive { node; incarnation } ->
382 let addr, port, meta, _ = node_info_to_wire node ~source_node:"" in
383 Wire.Alive
384 {
385 incarnation = incarnation_to_int incarnation;
386 node = node_id_to_string node.id;
387 addr;
388 port;
389 meta;
390 vsn = default_vsn;
391 }
392 | Suspect { node; incarnation; suspector } ->
393 Wire.Suspect
394 {
395 incarnation = incarnation_to_int incarnation;
396 node = node_id_to_string node;
397 from = node_id_to_string suspector;
398 }
399 | Dead { node; incarnation; declarator } ->
400 Wire.Dead
401 {
402 incarnation = incarnation_to_int incarnation;
403 node = node_id_to_string node;
404 from = node_id_to_string declarator;
405 }
406 | User_msg { topic; payload; origin } ->
407 let origin_str = node_id_to_string origin in
408 let topic_len = String.length topic in
409 let origin_len = String.length origin_str in
410 let encoded =
411 String.concat ""
412 [
413 string_of_int topic_len;
414 ":";
415 topic;
416 string_of_int origin_len;
417 ":";
418 origin_str;
419 payload;
420 ]
421 in
422 Wire.User_data encoded
423
424let msg_of_wire ~default_port (wmsg : Wire.protocol_msg) : protocol_msg option =
425 match wmsg with
426 | Wire.Ping { seq_no; node; source_addr; source_port; source_node } ->
427 let port = if source_port > 0 then source_port else default_port in
428 let ip =
429 if String.length source_addr > 0 then ip_of_bytes source_addr
430 else Eio.Net.Ipaddr.of_raw "\000\000\000\000"
431 in
432 let sender =
433 {
434 id =
435 node_id_of_string (if source_node <> "" then source_node else node);
436 addr = `Udp (ip, port);
437 meta = "";
438 }
439 in
440 Some (Ping { seq = seq_no; target = node_id_of_string node; sender })
441 | Wire.Indirect_ping
442 { seq_no; target; port; node; source_addr; source_port; source_node; _ }
443 ->
444 let src_port = if source_port > 0 then source_port else default_port in
445 let ip =
446 if String.length source_addr > 0 then ip_of_bytes source_addr
447 else Eio.Net.Ipaddr.of_raw "\000\000\000\000"
448 in
449 let sender =
450 {
451 id = node_id_of_string (if source_node <> "" then source_node else "");
452 addr = `Udp (ip, src_port);
453 meta = "";
454 }
455 in
456 let _ = target in
457 let _ = port in
458 Some (Ping_req { seq = seq_no; target = node_id_of_string node; sender })
459 | Wire.Ack { seq_no; payload } ->
460 let responder =
461 {
462 id = node_id_of_string "";
463 addr = `Udp (Eio.Net.Ipaddr.of_raw "\000\000\000\000", 0);
464 meta = "";
465 }
466 in
467 let payload = if payload = "" then None else Some payload in
468 Some (Ack { seq = seq_no; responder; payload })
469 | Wire.Alive { incarnation; node; addr; port; meta; _ } ->
470 let ip =
471 if String.length addr > 0 then ip_of_bytes addr
472 else Eio.Net.Ipaddr.of_raw "\000\000\000\000"
473 in
474 let node_info =
475 { id = node_id_of_string node; addr = `Udp (ip, port); meta }
476 in
477 Some
478 (Alive
479 { node = node_info; incarnation = incarnation_of_int incarnation })
480 | Wire.Suspect { incarnation; node; from } ->
481 Some
482 (Suspect
483 {
484 node = node_id_of_string node;
485 incarnation = incarnation_of_int incarnation;
486 suspector = node_id_of_string from;
487 })
488 | Wire.Dead { incarnation; node; from } ->
489 Some
490 (Dead
491 {
492 node = node_id_of_string node;
493 incarnation = incarnation_of_int incarnation;
494 declarator = node_id_of_string from;
495 })
496 | Wire.User_data encoded -> (
497 let parse_length s start =
498 let rec find_colon i =
499 if i >= String.length s then None
500 else if s.[i] = ':' then Some i
501 else find_colon (i + 1)
502 in
503 match find_colon start with
504 | None -> None
505 | Some colon_pos -> (
506 let len_str = String.sub s start (colon_pos - start) in
507 match int_of_string_opt len_str with
508 | None -> None
509 | Some len -> Some (len, colon_pos + 1))
510 in
511 match parse_length encoded 0 with
512 | None ->
513 Some
514 (User_msg
515 { topic = ""; payload = encoded; origin = node_id_of_string "" })
516 | Some (topic_len, topic_start) -> (
517 if topic_start + topic_len > String.length encoded then
518 Some
519 (User_msg
520 {
521 topic = "";
522 payload = encoded;
523 origin = node_id_of_string "";
524 })
525 else
526 let topic = String.sub encoded topic_start topic_len in
527 let origin_start = topic_start + topic_len in
528 match parse_length encoded origin_start with
529 | None ->
530 Some
531 (User_msg
532 { topic; payload = ""; origin = node_id_of_string "" })
533 | Some (origin_len, payload_start) ->
534 if payload_start + origin_len > String.length encoded then
535 Some
536 (User_msg
537 { topic; payload = ""; origin = node_id_of_string "" })
538 else
539 let origin = String.sub encoded payload_start origin_len in
540 let data_start = payload_start + origin_len in
541 let payload =
542 String.sub encoded data_start
543 (String.length encoded - data_start)
544 in
545 Some
546 (User_msg
547 { topic; payload; origin = node_id_of_string origin })))
548 | Wire.Nack _ -> None
549 | Wire.Compound _ -> None
550 | Wire.Compressed _ -> None
551 | Wire.Err _ -> None