SWIM protocol in OCaml, interoperable with the membership library and the Serf CLI
at main 16 kB view raw
(** Name of a cluster member. The constructor keeps node names distinct from
    other strings at the type level; [@@unboxed] makes it a plain string at
    runtime. *)
type node_id = Node_id of string [@@unboxed]

let node_id_to_string (Node_id s) = s
let node_id_of_string s = Node_id s
let equal_node_id (Node_id a) (Node_id b) = String.equal a b
let compare_node_id (Node_id a) (Node_id b) = String.compare a b

(** Incarnation counter attached to a member's state; wrapped like
    [node_id] and represented as a plain [int] at runtime. *)
type incarnation = Incarnation of int [@@unboxed]

let incarnation_to_int (Incarnation i) = i
let incarnation_of_int i = Incarnation i
let zero_incarnation = Incarnation 0
let compare_incarnation (Incarnation a) (Incarnation b) = Int.compare a b

(** [incr_incarnation i] is the incarnation one greater than [i]. *)
let incr_incarnation (Incarnation i) = Incarnation (i + 1)

(** Address of a member: a UDP address or a Unix socket path. The wire
    encoder ([node_info_to_wire]) only supports the UDP form. *)
type addr = Eio.Net.Sockaddr.datagram

(** A member's name, address and opaque metadata string. *)
type node_info = { id : node_id; addr : addr; meta : string }

let make_node_info ~id ~addr ~meta = { id; addr; meta }

(** Membership state of a node. [Alive], [Suspect] and [Dead] are shadowed by
    the [protocol_msg] constructors of the same names defined below. *)
type member_state = Alive | Suspect | Dead | Left

let member_state_to_string = function
  | Alive -> "alive"
  | Suspect -> "suspect"
  | Dead -> "dead"
  | Left -> "left"

(** Integer code of a state: 0 alive, 1 suspect, 2 dead, 3 left. *)
let member_state_to_int = function
  | Alive -> 0
  | Suspect -> 1
  | Dead -> 2
  | Left -> 3

(** Inverse of [member_state_to_int]. Every code other than 0, 1 or 2 —
    including unknown or negative codes — decodes as [Left]. *)
let member_state_of_int = function
  | 0 -> Alive
  | 1 -> Suspect
  | 2 -> Dead
  | _ -> Left

(** Point-in-time view of one member. [state_change] is presumably the
    monotonic time of the last state transition — TODO confirm against the
    code that fills it in. *)
type member_snapshot = {
  node : node_info;
  state : member_state;
  incarnation : incarnation;
  state_change : Mtime.span;
}

(** Protocol messages in this library's own representation. See [Wire] for
    the wire-level shapes and [msg_to_wire] / [msg_of_wire] for the mapping
    between the two. *)
type protocol_msg =
  | Ping of { seq : int; target : node_id; sender : node_info }
  | Ping_req of { seq : int; target : node_id; sender : node_info }
  | Ack of { seq : int; responder : node_info; payload : string option }
  | Alive of { node : node_info; incarnation : incarnation }
  | Suspect of {
      node : node_id;
      incarnation : incarnation;
      suspector : node_id;
    }
  | Dead of { node : node_id; incarnation : incarnation; declarator : node_id }
  | User_msg of { topic : string; payload : string; origin : node_id }

(** One packet: a primary message plus a list of additional [piggyback]
    messages, tagged with the cluster name. *)
type packet = {
  cluster : string;
  primary : protocol_msg;
  piggyback : protocol_msg list;
}

(** Reasons a received packet could not be decoded. *)
type decode_error =
  | Invalid_magic
  | Unsupported_version of int
  | Truncated_message
  | Invalid_tag of int
  | Decryption_failed
  | Msgpack_error of string
  | Invalid_crc

let decode_error_to_string = function
  | Invalid_magic -> "invalid magic bytes"
  | Unsupported_version v -> Printf.sprintf "unsupported version: %d" v
  | Truncated_message -> "truncated message"
  | Invalid_tag t -> Printf.sprintf "invalid tag: %d" t
  | Decryption_failed -> "decryption failed"
  | Msgpack_error s -> Printf.sprintf "msgpack error: %s" s
  | Invalid_crc -> "invalid CRC checksum"

(** Reasons an outgoing message could not be delivered. *)
type send_error = Node_unreachable | Timeout | Connection_reset

let send_error_to_string = function
  | Node_unreachable -> "node unreachable"
  | Timeout -> "timeout"
  | Connection_reset -> "connection reset"

(** Membership change notifications. *)
type node_event =
  | Join of node_info
  | Leave of node_info
  | Update of node_info
  | Suspect_event of node_info
  | Alive_event of node_info

(** Node configuration. Time-valued fields are [float]s (presumably seconds —
    confirm against the scheduler that consumes them). *)
type config = {
  bind_addr : string;
  bind_port : int;
  node_name : string option;
  protocol_interval : float;
  probe_timeout : float;
  indirect_checks : int;
  suspicion_mult : int;
  suspicion_max_timeout : float;
  retransmit_mult : int;
  udp_buffer_size : int;
  tcp_timeout : float;
  send_buffer_count : int;
  recv_buffer_count : int;
  secret_key : string;
  cluster_name : string;
  label : string;
  encryption_enabled : bool;
  gossip_verify_incoming : bool;
  gossip_verify_outgoing : bool;
  max_gossip_queue_depth : int;
}

(** Defaults: bind 0.0.0.0:7946, encryption disabled. [secret_key] is a
    placeholder of 16 zero bytes and must be replaced before enabling
    encryption. *)
let default_config =
  {
    bind_addr = "0.0.0.0";
    bind_port = 7946;
    node_name = None;
    protocol_interval = 1.0;
    probe_timeout = 0.5;
    indirect_checks = 3;
    suspicion_mult = 4;
    suspicion_max_timeout = 60.0;
    retransmit_mult = 4;
    udp_buffer_size = 1400;
    tcp_timeout = 10.0;
    send_buffer_count = 16;
    recv_buffer_count = 16;
    secret_key = String.make 16 '\x00';
    cluster_name = "default";
    label = "";
    encryption_enabled = false;
    gossip_verify_incoming = true;
    gossip_verify_outgoing = true;
    max_gossip_queue_depth = 5000;
  }

(** Capabilities the protocol needs: an Eio standard environment exposing
    clocks, network and secure randomness, plus the switch that owns the
    node's fibers and resources. *)
type 'a env = {
  stdenv : 'a;
  sw : Eio.Switch.t;
}
  constraint
    'a =
    < clock : _ Eio.Time.clock
    ; mono_clock : _ Eio.Time.Mono.t
    ; net : _ Eio.Net.t
    ; secure_random : _ Eio.Flow.source
    ; .. >

(** Runtime counters. *)
type stats = {
  nodes_alive : int;
  nodes_suspect : int;
  nodes_dead : int;
  msgs_sent : int;
  msgs_received : int;
  msgs_dropped : int;
  queue_depth : int;
  buffers_available : int;
  buffers_total : int;
}

let empty_stats =
  {
    nodes_alive = 0;
    nodes_suspect = 0;
    nodes_dead = 0;
    msgs_sent = 0;
    msgs_received = 0;
    msgs_dropped = 0;
    queue_depth = 0;
    buffers_available = 0;
    buffers_total = 0;
  }

(** Wire-level message types and payload records, mirroring the field layout
    exchanged with peers. *)
module Wire = struct
  type message_type =
    | Ping_msg
    | Indirect_ping_msg
    | Ack_resp_msg
    | Suspect_msg
    | Alive_msg
    | Dead_msg
    | Push_pull_msg
    | Compound_msg
    | User_msg
    | Compress_msg
    | Encrypt_msg
    | Nack_resp_msg
    | Has_crc_msg
    | Err_msg
    | Has_label_msg

  (** Numeric type tag: 0-13 in declaration order, and 244 for
      [Has_label_msg]. *)
  let message_type_to_int = function
    | Ping_msg -> 0
    | Indirect_ping_msg -> 1
    | Ack_resp_msg -> 2
    | Suspect_msg -> 3
    | Alive_msg -> 4
    | Dead_msg -> 5
    | Push_pull_msg -> 6
    | Compound_msg -> 7
    | User_msg -> 8
    | Compress_msg -> 9
    | Encrypt_msg -> 10
    | Nack_resp_msg -> 11
    | Has_crc_msg -> 12
    | Err_msg -> 13
    | Has_label_msg -> 244

  (** Inverse of [message_type_to_int]; an unknown tag is returned as
      [Error tag]. *)
  let message_type_of_int = function
    | 0 -> Ok Ping_msg
    | 1 -> Ok Indirect_ping_msg
    | 2 -> Ok Ack_resp_msg
    | 3 -> Ok Suspect_msg
    | 4 -> Ok Alive_msg
    | 5 -> Ok Dead_msg
    | 6 -> Ok Push_pull_msg
    | 7 -> Ok Compound_msg
    | 8 -> Ok User_msg
    | 9 -> Ok Compress_msg
    | 10 -> Ok Encrypt_msg
    | 11 -> Ok Nack_resp_msg
    | 12 -> Ok Has_crc_msg
    | 13 -> Ok Err_msg
    | 244 -> Ok Has_label_msg
    | n -> Error n

  type ping = {
    seq_no : int;
    node : string;
    source_addr : string;
    source_port : int;
    source_node : string;
  }

  type indirect_ping_req = {
    seq_no : int;
    target : string;
    port : int;
    node : string;
    nack : bool;
    source_addr : string;
    source_port : int;
    source_node : string;
  }

  type ack_resp = { seq_no : int; payload : string }
  type nack_resp = { seq_no : int }
  type suspect = { incarnation : int; node : string; from : string }

  type alive = {
    incarnation : int;
    node : string;
    addr : string;
    port : int;
    meta : string;
    vsn : int list;
  }

  type dead = { incarnation : int; node : string; from : string }
  type compress = { algo : int; buf : string }

  type push_pull_header = {
    pp_nodes : int;
    pp_user_state_len : int;
    pp_join : bool;
  }

  type push_node_state = {
    pns_name : string;
    pns_addr : string;
    pns_port : int;
    pns_meta : string;
    pns_incarnation : int;
    pns_state : int;
    pns_vsn : int list;
  }

  type protocol_msg =
    | Ping of ping
    | Indirect_ping of indirect_ping_req
    | Ack of ack_resp
    | Nack of nack_resp
    | Suspect of suspect
    | Alive of alive
    | Dead of dead
    | User_data of string
    | Compound of string list
    | Compressed of compress
    | Err of string
end

(** [ip_to_bytes ip] is the raw address bytes of [ip]: 4 bytes for IPv4,
    16 for IPv6. [Eio.Net.Ipaddr.t] is a private alias of exactly that byte
    string, so a coercion suffices. This avoids re-parsing the pretty-printed
    form, which mis-placed groups after a "::" in mid-address (e.g.
    "fe80::1") and left the remaining bytes uninitialised. *)
let ip_to_bytes ip = (ip : [< `V4 | `V6 ] Eio.Net.Ipaddr.t :> string)

(** [ip_of_bytes s] builds an address from 4 (IPv4) or 16 (IPv6) raw bytes.
    @raise Failure if [s] has any other length. *)
let ip_of_bytes s =
  match String.length s with
  | 4 | 16 -> Eio.Net.Ipaddr.of_raw s
  | _ -> failwith "invalid IP address length"

(* 0.0.0.0 — substituted when a wire message leaves an address empty. *)
let unspecified_ip = Eio.Net.Ipaddr.of_raw "\000\000\000\000"

(* Decodes an address field received from a peer. An empty field means "not
   supplied" and becomes [unspecified_ip]; any length other than 4 or 16 is
   rejected with [None] instead of raising, since the bytes are untrusted. *)
let decode_wire_ip s =
  match String.length s with
  | 0 -> Some unspecified_ip
  | 4 | 16 -> Some (Eio.Net.Ipaddr.of_raw s)
  | _ -> None

(** Version list sent in the [vsn] field of outgoing [Wire.Alive] messages. *)
let default_vsn = [ 1; 5; 5; 0; 0; 0 ]

(** [node_info_to_wire info ~source_node] is
    [(address bytes, port, info.meta, source_node)]; [source_node] is passed
    through unchanged.
    @raise Failure if [info.addr] is a Unix socket. *)
let node_info_to_wire (info : node_info) ~source_node :
    string * int * string * string =
  match info.addr with
  | `Udp (ip, port) -> (ip_to_bytes ip, port, info.meta, source_node)
  | `Unix _ -> failwith "Unix sockets not supported"

(** Builds a [node_info] from wire fields.
    @raise Failure if [addr] is not 4 or 16 bytes long. *)
let node_info_of_wire ~name ~addr ~port ~meta : node_info =
  let ip = ip_of_bytes addr in
  { id = node_id_of_string name; addr = `Udp (ip, port); meta }

(** [msg_to_wire ~self_name ~self_port msg] converts an internal message to
    its [Wire] form. [self_name] is written as the source node of pings and
    indirect pings; [self_port] as the [port] of indirect pings.
    @raise Failure if a sender/node address involved is a Unix socket. *)
let msg_to_wire ~self_name ~self_port (msg : protocol_msg) : Wire.protocol_msg =
  match msg with
  | Ping { seq; target; sender } ->
      let addr, port, _, _ = node_info_to_wire sender ~source_node:"" in
      Wire.Ping
        {
          seq_no = seq;
          node = node_id_to_string target;
          source_addr = addr;
          source_port = port;
          source_node = self_name;
        }
  | Ping_req { seq; target; sender } ->
      let addr, port, _, _ = node_info_to_wire sender ~source_node:"" in
      (* NOTE(review): [Ping_req] carries only the target's name, so the wire
         [target] field is filled with the *sender's* IP bytes (identical to
         [addr]) and [port] with [self_port]. [msg_of_wire] ignores both; a
         peer that reads them would see the sender's address, not the
         target's — confirm the intended interop behaviour. *)
      Wire.Indirect_ping
        {
          seq_no = seq;
          target = addr;
          port = self_port;
          node = node_id_to_string target;
          nack = true;
          source_addr = addr;
          source_port = port;
          source_node = self_name;
        }
  | Ack { seq; responder = _; payload } ->
      Wire.Ack { seq_no = seq; payload = Option.value payload ~default:"" }
  | Alive { node; incarnation } ->
      let addr, port, meta, _ = node_info_to_wire node ~source_node:"" in
      Wire.Alive
        {
          incarnation = incarnation_to_int incarnation;
          node = node_id_to_string node.id;
          addr;
          port;
          meta;
          vsn = default_vsn;
        }
  | Suspect { node; incarnation; suspector } ->
      Wire.Suspect
        {
          incarnation = incarnation_to_int incarnation;
          node = node_id_to_string node;
          from = node_id_to_string suspector;
        }
  | Dead { node; incarnation; declarator } ->
      Wire.Dead
        {
          incarnation = incarnation_to_int incarnation;
          node = node_id_to_string node;
          from = node_id_to_string declarator;
        }
  | User_msg { topic; payload; origin } ->
      (* Framing: "<topic len>:<topic><origin len>:<origin><payload>" with
         lengths in decimal bytes; parsed back by [split_user_data]. *)
      let origin = node_id_to_string origin in
      Wire.User_data
        (Printf.sprintf "%d:%s%d:%s%s" (String.length topic) topic
           (String.length origin) origin payload)

(* Reads "<digits>:" at [start] in [s]; returns the decoded length and the
   index just past the ':'. Only ASCII digits are accepted: [int_of_string_opt]
   alone would also accept "-1", "+3", "0x10" or "1_0", and a negative length
   would later make [String.sub] raise. Values too large for [int] are
   rejected by [int_of_string_opt]. *)
let parse_user_data_length s start =
  match String.index_from_opt s start ':' with
  | None -> None
  | Some colon ->
      let digits = String.sub s start (colon - start) in
      let is_digit = function '0' .. '9' -> true | _ -> false in
      if digits = "" || not (String.for_all is_digit digits) then None
      else Option.map (fun len -> (len, colon + 1)) (int_of_string_opt digits)

(* Splits a framed user-data body into (topic, origin, payload), or [None]
   if the framing does not parse. Lengths are compared against the remaining
   input ([len > total - start]) so that a huge length cannot overflow
   [start + len] and slip past the bounds check. *)
let split_user_data encoded =
  let total = String.length encoded in
  match parse_user_data_length encoded 0 with
  | None -> None
  | Some (topic_len, topic_start) -> (
      if topic_len > total - topic_start then None
      else
        let topic = String.sub encoded topic_start topic_len in
        match parse_user_data_length encoded (topic_start + topic_len) with
        | None -> None
        | Some (origin_len, origin_start) ->
            if origin_len > total - origin_start then None
            else
              let origin = String.sub encoded origin_start origin_len in
              let data_start = origin_start + origin_len in
              let payload =
                String.sub encoded data_start (total - data_start)
              in
              Some (topic, origin, payload))

(* Decodes a [Wire.User_data] body. If the framing is absent or malformed the
   whole body is kept as the payload with empty topic and origin, so no data
   is discarded (previously a bad origin header yielded an empty payload). *)
let decode_user_data encoded : protocol_msg =
  match split_user_data encoded with
  | Some (topic, origin, payload) ->
      User_msg { topic; payload; origin = node_id_of_string origin }
  | None ->
      User_msg { topic = ""; payload = encoded; origin = node_id_of_string "" }

(** [msg_of_wire ~default_port wmsg] converts a wire message to the internal
    form. Returns [None] for message kinds with no internal counterpart
    ([Nack], [Compound], [Compressed], [Err]) and for messages carrying an
    address field that is neither empty nor 4/16 bytes long. Never raises on
    malformed peer input.

    A zero/negative source port in pings is replaced by [default_port]; an
    empty address becomes 0.0.0.0. [Ack] responders are placeholders (empty
    name, 0.0.0.0:0) because the wire ack carries no responder. *)
let msg_of_wire ~default_port (wmsg : Wire.protocol_msg) : protocol_msg option =
  match wmsg with
  | Wire.Ping { seq_no; node; source_addr; source_port; source_node } -> (
      match decode_wire_ip source_addr with
      | None -> None
      | Some ip ->
          let port = if source_port > 0 then source_port else default_port in
          (* Without a source name, the sender is labelled with the target's
             name, as before. *)
          let name = if source_node <> "" then source_node else node in
          let sender =
            { id = node_id_of_string name; addr = `Udp (ip, port); meta = "" }
          in
          Some (Ping { seq = seq_no; target = node_id_of_string node; sender }))
  | Wire.Indirect_ping
      { seq_no; node; source_addr; source_port; source_node; _ } -> (
      (* The wire [target]/[port] fields are not used: [Ping_req] identifies
         the target by name only. *)
      match decode_wire_ip source_addr with
      | None -> None
      | Some ip ->
          let port = if source_port > 0 then source_port else default_port in
          let sender =
            {
              id = node_id_of_string source_node;
              addr = `Udp (ip, port);
              meta = "";
            }
          in
          Some
            (Ping_req { seq = seq_no; target = node_id_of_string node; sender }))
  | Wire.Ack { seq_no; payload } ->
      let responder =
        {
          id = node_id_of_string "";
          addr = `Udp (unspecified_ip, 0);
          meta = "";
        }
      in
      let payload = if payload = "" then None else Some payload in
      Some (Ack { seq = seq_no; responder; payload })
  | Wire.Alive { incarnation; node; addr; port; meta; _ } -> (
      match decode_wire_ip addr with
      | None -> None
      | Some ip ->
          let node_info =
            { id = node_id_of_string node; addr = `Udp (ip, port); meta }
          in
          Some
            (Alive
               { node = node_info; incarnation = incarnation_of_int incarnation }))
  | Wire.Suspect { incarnation; node; from } ->
      Some
        (Suspect
           {
             node = node_id_of_string node;
             incarnation = incarnation_of_int incarnation;
             suspector = node_id_of_string from;
           })
  | Wire.Dead { incarnation; node; from } ->
      Some
        (Dead
           {
             node = node_id_of_string node;
             incarnation = incarnation_of_int incarnation;
             declarator = node_id_of_string from;
           })
  | Wire.User_data encoded -> Some (decode_user_data encoded)
  | Wire.Nack _ -> None
  | Wire.Compound _ -> None
  | Wire.Compressed _ -> None
  | Wire.Err _ -> None