SWIM protocol in OCaml, interoperable with membership lib and Serf CLI
at main 11 kB view raw
(** QCheck generators, printers and arbitraries for [Swim.Types] values. *)

open Swim.Types

(** Node identifiers: mostly 1-64 printable characters, plus the two edge
    cases of an empty id and a 255-character id (weights 3:1:1). *)
let gen_node_id : node_id QCheck.Gen.t =
  let open QCheck.Gen in
  let+ id =
    oneof_weighted
      [
        (3, string_size ~gen:printable (int_range 1 64));
        (1, return "");
        (1, string_size ~gen:printable (return 255));
      ]
  in
  node_id_of_string id

(** Incarnation numbers: biased towards small values (0-1000), with the full
    non-negative [int] range and the constant 0 mixed in (weights 5:2:1). *)
let gen_incarnation : incarnation QCheck.Gen.t =
  let open QCheck.Gen in
  let+ i =
    oneof_weighted
      [ (5, int_range 0 1000); (2, int_range 0 max_int); (1, return 0) ]
  in
  incarnation_of_int i

(** One of [Alive], [Suspect] or [Dead], chosen uniformly. *)
let gen_member_state : member_state QCheck.Gen.t =
  let open QCheck.Gen in
  (* The annotations pick the [member_state] constructors; [protocol_msg]
     declares constructors with the same names. *)
  let alive : member_state = Alive in
  let suspect : member_state = Suspect in
  let dead : member_state = Dead in
  oneof [ return alive; return suspect; return dead ]

(** Dotted-quad IPv4 address string with each octet drawn from 0-255. *)
let gen_ipv4 : string QCheck.Gen.t =
  let open QCheck.Gen in
  let+ a = int_range 0 255
  and+ b = int_range 0 255
  and+ c = int_range 0 255
  and+ d = int_range 0 255 in
  Printf.sprintf "%d.%d.%d.%d" a b c d

(** Port numbers: 1024-65535, 1-1023, or the fixed port 7946
    (weights 3:1:1). *)
let gen_port : int QCheck.Gen.t =
  let open QCheck.Gen in
  oneof_weighted
    [ (3, int_range 1024 65535); (1, int_range 1 1023); (1, return 7946) ]

(** [`Udp] addresses with a random IPv4 address and a port from {!gen_port}.

    The address is built directly from four octets rather than by formatting
    and re-parsing a string, so there is no parse-failure branch that could
    silently substitute a default address. *)
let gen_addr : addr QCheck.Gen.t =
  let open QCheck.Gen in
  let octet = int_range 0 255 in
  let+ a = octet
  and+ b = octet
  and+ c = octet
  and+ d = octet
  and+ port = gen_port in
  let raw = Ipaddr.V4.to_octets (Ipaddr.V4.make a b c d) in
  `Udp (Eio.Net.Ipaddr.of_raw raw, port)

(** Node metadata: printable strings of 0-256 characters, the empty string,
    or a 1024-character string of ['x'] (weights 3:1:1). *)
let gen_meta : string QCheck.Gen.t =
  let open QCheck.Gen in
  oneof_weighted
    [
      (3, string_size ~gen:printable (int_range 0 256));
      (1, return "");
      (1, return (String.make 1024 'x'));
    ]

(** A [node_info] assembled from independently generated id, address and
    metadata. *)
let gen_node_info : node_info QCheck.Gen.t =
  let open QCheck.Gen in
  let+ id = gen_node_id and+ addr = gen_addr and+ meta = gen_meta in
  make_node_info ~id ~addr ~meta

(** Sequence numbers: biased towards 0-10000, with the full non-negative
    [int] range and the constant 0 mixed in (weights 5:2:1). *)
let gen_seq : int QCheck.Gen.t =
  let open QCheck.Gen in
  oneof_weighted
    [ (5, int_range 0 10000); (2, int_range 0 max_int); (1, return 0) ]

(** [Ping] messages. *)
let gen_ping : protocol_msg QCheck.Gen.t =
  let open QCheck.Gen in
  let+ seq = gen_seq and+ target = gen_node_id and+ sender = gen_node_info in
  Ping { seq; target; sender }

(** [Ping_req] messages. *)
let gen_ping_req : protocol_msg QCheck.Gen.t =
  let open QCheck.Gen in
  let+ seq = gen_seq and+ target = gen_node_id and+ sender = gen_node_info in
  Ping_req { seq; target; sender }

(** Optional ack payload: [None], or [Some] printable string of 0-512
    characters (weights 2:3). *)
let gen_payload : string option QCheck.Gen.t =
  let open QCheck.Gen in
  oneof_weighted
    [
      (2, return None);
      (3, map Option.some (string_size ~gen:printable (int_range 0 512)));
    ]

(** [Ack] messages. *)
let gen_ack : protocol_msg QCheck.Gen.t =
  let open QCheck.Gen in
  let+ seq = gen_seq
  and+ responder = gen_node_info
  and+ payload = gen_payload in
  Ack { seq; responder; payload }

(** [Alive] messages. *)
let gen_alive : protocol_msg QCheck.Gen.t =
  let open QCheck.Gen in
  let+ node = gen_node_info and+ incarnation = gen_incarnation in
  Alive { node; incarnation }

(** [Suspect] messages. *)
let gen_suspect : protocol_msg QCheck.Gen.t =
  let open QCheck.Gen in
  let+ node = gen_node_id
  and+ incarnation = gen_incarnation
  and+ suspector = gen_node_id in
  Suspect { node; incarnation; suspector }

(** [Dead] messages. *)
let gen_dead : protocol_msg QCheck.Gen.t =
  let open QCheck.Gen in
  let+ node = gen_node_id
  and+ incarnation = gen_incarnation
  and+ declarator = gen_node_id in
  Dead { node; incarnation; declarator }

(** Non-empty printable topic names of 1-64 characters. *)
let gen_topic : string QCheck.Gen.t =
  QCheck.Gen.string_size ~gen:QCheck.Gen.printable (QCheck.Gen.int_range 1 64)

(** Printable user payloads of 0-1024 characters. *)
let gen_user_payload : string QCheck.Gen.t =
  QCheck.Gen.string_size ~gen:QCheck.Gen.printable (QCheck.Gen.int_range 0 1024)

(** [User_msg] messages. *)
let gen_user_msg : protocol_msg QCheck.Gen.t =
  let open QCheck.Gen in
  let+ topic = gen_topic
  and+ payload = gen_user_payload
  and+ origin = gen_node_id in
  User_msg { topic; payload; origin }

(** Any protocol message; each of the seven message generators is equally
    likely. *)
let gen_protocol_msg : protocol_msg QCheck.Gen.t =
  QCheck.Gen.oneof
    [
      gen_ping;
      gen_ping_req;
      gen_ack;
      gen_alive;
      gen_suspect;
      gen_dead;
      gen_user_msg;
    ]

(** Cluster names: printable strings of 1-32 characters, ["default"], or
    ["test-cluster"] (weights 3:1:1). *)
let gen_cluster_name : string QCheck.Gen.t =
  let open QCheck.Gen in
  oneof_weighted
    [
      (3, string_size ~gen:printable (int_range 1 32));
      (1, return "default");
      (1, return "test-cluster");
    ]

(** Piggyback lists of 0-8 messages. Only [Alive], [Suspect], [Dead] and
    [User_msg] messages are produced; pings and acks are never included. *)
let gen_piggyback : protocol_msg list QCheck.Gen.t =
  let open QCheck.Gen in
  let piggyback_msg =
    oneof [ gen_alive; gen_suspect; gen_dead; gen_user_msg ]
  in
  list_size (int_range 0 8) piggyback_msg

(** A packet with a random cluster name, primary message and piggyback
    list. *)
let gen_packet : packet QCheck.Gen.t =
  let open QCheck.Gen in
  let+ cluster = gen_cluster_name
  and+ primary = gen_protocol_msg
  and+ piggyback = gen_piggyback in
  { cluster; primary; piggyback }

(** Buffers of 0-4096 bytes in which every byte is the same randomly chosen
    value. Lengths are drawn from 0-1024, exactly 0, or 1024-4096
    (weights 3:1:1). *)
let gen_cstruct : Cstruct.t QCheck.Gen.t =
  let open QCheck.Gen in
  let+ len =
    oneof_weighted
      [ (3, int_range 0 1024); (1, return 0); (1, int_range 1024 4096) ]
  and+ fill = char in
  let cs = Cstruct.create len in
  Cstruct.memset cs (Char.code fill);
  cs

(** [gen_cstruct_sized size] generates buffers of exactly [size] bytes, each
    byte drawn independently. *)
let gen_cstruct_sized (size : int) : Cstruct.t QCheck.Gen.t =
  let open QCheck.Gen in
  let+ bytes = string_size ~gen:char (return size) in
  Cstruct.of_string bytes

(** A [config] record with every field generated independently. *)
let gen_config : config QCheck.Gen.t =
  let open QCheck.Gen in
  let+ bind_addr = gen_ipv4
  and+ bind_port = gen_port
  and+ node_name =
    oneof_weighted [ (2, return None); (3, map Option.some gen_topic) ]
  and+ protocol_interval = float_range 0.1 10.0
  and+ probe_timeout = float_range 0.1 5.0
  and+ indirect_checks = int_range 1 10
  and+ suspicion_mult = int_range 1 10
  and+ suspicion_max_timeout = float_range 10.0 120.0
  and+ retransmit_mult = int_range 1 10
  and+ udp_buffer_size =
    oneof [ return 1400; return 1500; return 8192; return 65507 ]
  and+ tcp_timeout = float_range 1.0 30.0
  and+ send_buffer_count = int_range 4 64
  and+ recv_buffer_count = int_range 4 64
  (* 16 arbitrary bytes, generated directly as a string: the record field
     holds a string, so no Cstruct round trip is needed. *)
  and+ secret_key = string_size ~gen:char (return 16)
  and+ cluster_name = gen_cluster_name
  and+ label = oneof [ return ""; gen_topic ]
  and+ encryption_enabled = bool
  and+ gossip_verify_incoming = bool
  and+ gossip_verify_outgoing = bool
  and+ max_gossip_queue_depth = int_range 10 10000 in
  {
    bind_addr;
    bind_port;
    node_name;
    protocol_interval;
    probe_timeout;
    indirect_checks;
    suspicion_mult;
    suspicion_max_timeout;
    retransmit_mult;
    udp_buffer_size;
    tcp_timeout;
    send_buffer_count;
    recv_buffer_count;
    secret_key;
    cluster_name;
    label;
    encryption_enabled;
    gossip_verify_incoming;
    gossip_verify_outgoing;
    max_gossip_queue_depth;
  }

(** One of the listed [decode_error] constructors, chosen uniformly; version
    and tag arguments are drawn from 0-255. *)
let gen_decode_error : decode_error QCheck.Gen.t =
  let open QCheck.Gen in
  oneof
    [
      return Invalid_magic;
      map (fun v -> Unsupported_version v) (int_range 0 255);
      return Truncated_message;
      map (fun t -> Invalid_tag t) (int_range 0 255);
      return Decryption_failed;
    ]

(** One of [Node_unreachable], [Timeout] or [Connection_reset], chosen
    uniformly. *)
let gen_send_error : send_error QCheck.Gen.t =
  let open QCheck.Gen in
  oneof [ return Node_unreachable; return Timeout; return Connection_reset ]

(** Time spans between 0 and 1_000_000_000 ns inclusive (0 to 1 second). *)
let gen_mtime_span : Mtime.span QCheck.Gen.t =
  let open QCheck.Gen in
  let+ ns = map Int64.of_int (int_range 0 1_000_000_000) in
  Mtime.Span.of_uint64_ns ns

(** A [member_snapshot] with every field generated independently. *)
let gen_member_snapshot : member_snapshot QCheck.Gen.t =
  let open QCheck.Gen in
  let+ node = gen_node_info
  and+ state = gen_member_state
  and+ incarnation = gen_incarnation
  and+ state_change = gen_mtime_span in
  { node; state; incarnation; state_change }

(** Arbitrary node ids, printed with [node_id_to_string]. *)
let arb_node_id : node_id QCheck.arbitrary =
  QCheck.make ~print:(fun id -> node_id_to_string id) gen_node_id

(** Arbitrary incarnations, printed as integers and shrunk with the integer
    shrinker. *)
let arb_incarnation : incarnation QCheck.arbitrary =
  QCheck.make
    ~print:(fun inc -> string_of_int (incarnation_to_int inc))
    ~shrink:(fun inc ->
      let i = incarnation_to_int inc in
      QCheck.Shrink.int i |> QCheck.Iter.map incarnation_of_int)
    gen_incarnation

(** Arbitrary member states, printed with [member_state_to_string]. *)
let arb_member_state : member_state QCheck.arbitrary =
  QCheck.make ~print:member_state_to_string gen_member_state

(** [format_addr addr] renders a UDP address as ["<ip>:<port>"] and a Unix
    address as ["unix:<path>"]. *)
let format_addr (addr : addr) : string =
  match addr with
  | `Udp (ip, port) -> Fmt.str "%a:%d" Eio.Net.Ipaddr.pp ip port
  | `Unix path -> Printf.sprintf "unix:%s" path

(** [format_node_info ni] renders the id, address and (escaped) metadata of
    [ni] in record-like syntax. *)
let format_node_info (ni : node_info) : string =
  Printf.sprintf "{ id=%s; addr=%s; meta=%S }" (node_id_to_string ni.id)
    (format_addr ni.addr) ni.meta

(** Arbitrary node infos, printed with {!format_node_info}. *)
let arb_node_info : node_info QCheck.arbitrary =
  QCheck.make ~print:format_node_info gen_node_info

(** [format_protocol_msg msg] renders [msg] as its constructor name followed
    by its fields in record-like syntax. *)
let format_protocol_msg (msg : protocol_msg) : string =
  match msg with
  | Ping { seq; target; sender } ->
      Printf.sprintf "Ping { seq=%d; target=%s; sender=%s }" seq
        (node_id_to_string target) (format_node_info sender)
  | Ping_req { seq; target; sender } ->
      Printf.sprintf "Ping_req { seq=%d; target=%s; sender=%s }" seq
        (node_id_to_string target) (format_node_info sender)
  | Ack { seq; responder; payload } ->
      Printf.sprintf "Ack { seq=%d; responder=%s; payload=%s }" seq
        (format_node_info responder)
        (match payload with
        | None -> "None"
        | Some p -> Printf.sprintf "Some %S" p)
  | Alive { node; incarnation } ->
      Printf.sprintf "Alive { node=%s; incarnation=%d }" (format_node_info node)
        (incarnation_to_int incarnation)
  | Suspect { node; incarnation; suspector } ->
      Printf.sprintf "Suspect { node=%s; incarnation=%d; suspector=%s }"
        (node_id_to_string node)
        (incarnation_to_int incarnation)
        (node_id_to_string suspector)
  | Dead { node; incarnation; declarator } ->
      Printf.sprintf "Dead { node=%s; incarnation=%d; declarator=%s }"
        (node_id_to_string node)
        (incarnation_to_int incarnation)
        (node_id_to_string declarator)
  | User_msg { topic; payload; origin } ->
      Printf.sprintf "User_msg { topic=%S; payload=%S; origin=%s }" topic
        payload (node_id_to_string origin)

(** Arbitrary protocol messages, printed with {!format_protocol_msg}. *)
let arb_protocol_msg : protocol_msg QCheck.arbitrary =
  QCheck.make ~print:format_protocol_msg gen_protocol_msg

(** [format_packet p] renders the cluster name and primary message of [p];
    the piggyback list is summarised by its length only. *)
let format_packet (p : packet) : string =
  Printf.sprintf "{ cluster=%S; primary=%s; piggyback=[%d msgs] }" p.cluster
    (format_protocol_msg p.primary)
    (List.length p.piggyback)

(** Arbitrary packets, printed with {!format_packet}. *)
let arb_packet : packet QCheck.arbitrary =
  QCheck.make ~print:format_packet gen_packet

(** Arbitrary buffers; the printer shows only the length, not the
    contents. *)
let arb_cstruct : Cstruct.t QCheck.arbitrary =
  QCheck.make
    ~print:(fun cs -> Printf.sprintf "<cstruct len=%d>" (Cstruct.length cs))
    gen_cstruct

(** Arbitrary decode errors, printed with [decode_error_to_string]. *)
let arb_decode_error : decode_error QCheck.arbitrary =
  QCheck.make ~print:decode_error_to_string gen_decode_error

(** Arbitrary send errors, printed with [send_error_to_string]. *)
let arb_send_error : send_error QCheck.arbitrary =
  QCheck.make ~print:send_error_to_string gen_send_error

(** Arbitrary member snapshots; the printer shows the node, state and
    incarnation, and omits [state_change]. *)
let arb_member_snapshot : member_snapshot QCheck.arbitrary =
  QCheck.make
    ~print:(fun ms ->
      Printf.sprintf "{ node=%s; state=%s; incarnation=%d }"
        (format_node_info ms.node)
        (member_state_to_string ms.state)
        (incarnation_to_int ms.incarnation))
    gen_member_snapshot