SWIM protocol in OCaml, interoperable with the membership lib and the Serf CLI
at main 8.4 kB view raw
(* Alcotest suite covering the Swim buffer pool, the membership table, the
   pending-ack registry and member state transitions. *)

open Swim.Types
module Buffer_pool = Swim.Buffer_pool
module Membership = Swim.Membership
module Member = Membership.Member
module Pending_acks = Swim.Pending_acks

(** [test_node name raw_ip] is a fixture node named [name], reachable over UDP
    at [raw_ip] port 7946, with empty metadata. [raw_ip] is handed unchanged to
    [Eio.Net.Ipaddr.of_raw]. *)
let test_node name raw_ip =
  make_node_info
    ~id:(node_id_of_string name)
    ~addr:(`Udp (Eio.Net.Ipaddr.of_raw raw_ip, 7946))
    ~meta:""

(* Raw byte strings 127.0.0.1, 127.0.0.2 and 127.0.0.3. *)
let node1 = test_node "node1" "\127\000\000\001"
let node2 = test_node "node2" "\127\000\000\002"
let node3 = test_node "node3" "\127\000\000\003"

(** Fixed zero timestamp used wherever a [~now] argument is required, so the
    tests do not depend on a clock. *)
let now = Mtime.Span.of_uint64_ns 0L

(** [table_with nodes] is a fresh membership table to which one member per
    element of [nodes] has been added, in list order. Each member is created
    with [Member.create ~now]. *)
let table_with nodes =
  let table = Membership.create () in
  List.iter (fun node -> Membership.add table (Member.create ~now node)) nodes;
  table

(** [snapshot_exn table id] is [Member.snapshot_now] of the member that
    [Membership.find table id] returns. Fails the running test with
    ["member not found"] when the lookup returns [None]. *)
let snapshot_exn table id =
  match Membership.find table id with
  | Some member -> Member.snapshot_now member
  | None -> Alcotest.fail "member not found"

(* Acquiring lowers the available count by one, the buffer has the configured
   size, and releasing restores the count. *)
let test_buffer_pool_acquire_release () =
  let pool = Buffer_pool.create ~size:1024 ~count:4 in
  Alcotest.(check int) "initial available" 4 (Buffer_pool.available pool);
  let buf1 = Buffer_pool.acquire pool in
  Alcotest.(check int) "after acquire" 3 (Buffer_pool.available pool);
  Alcotest.(check int) "buffer size" 1024 (Cstruct.length buf1);
  Buffer_pool.release pool buf1;
  Alcotest.(check int) "after release" 4 (Buffer_pool.available pool)

(* [with_buffer] lends a buffer for the duration of the callback, returns the
   callback's result, and the available count is back to full afterwards. *)
let test_buffer_pool_with_buffer () =
  let pool = Buffer_pool.create ~size:512 ~count:2 in
  let result =
    Buffer_pool.with_buffer pool (fun buf ->
        Alcotest.(check int) "inside with_buffer" 1 (Buffer_pool.available pool);
        Cstruct.length buf)
  in
  Alcotest.(check int) "returned value" 512 result;
  Alcotest.(check int) "after with_buffer" 2 (Buffer_pool.available pool)

(* The available count is back to full even when the callback raises. *)
let test_buffer_pool_exception_safe () =
  let pool = Buffer_pool.create ~size:256 ~count:2 in
  (try
     Buffer_pool.with_buffer pool (fun _buf ->
         Alcotest.(check int) "during exception" 1 (Buffer_pool.available pool);
         failwith "test exception")
   with Failure _ -> ());
  Alcotest.(check int) "after exception" 2 (Buffer_pool.available pool)

(* Every buffer can be taken at once, and releasing them all refills the
   pool. *)
let test_buffer_pool_all_buffers () =
  let pool = Buffer_pool.create ~size:64 ~count:3 in
  let b1 = Buffer_pool.acquire pool in
  let b2 = Buffer_pool.acquire pool in
  let b3 = Buffer_pool.acquire pool in
  Alcotest.(check int) "all acquired" 0 (Buffer_pool.available pool);
  Buffer_pool.release pool b1;
  Buffer_pool.release pool b2;
  Buffer_pool.release pool b3;
  Alcotest.(check int) "all released" 3 (Buffer_pool.available pool)

(* A member that was added is counted and can be looked up by node id. *)
let test_membership_add_find () =
  let table = table_with [ node1 ] in
  Alcotest.(check int) "count" 1 (Membership.count table);
  match Membership.find table node1.id with
  | Some m ->
      Alcotest.(check bool)
        "same node" true
        (equal_node_id (Member.node m).id node1.id)
  | None -> Alcotest.fail "member not found"

(* [remove] reports [true] for a present member, [false] for an absent one. *)
let test_membership_remove () =
  let table = table_with [ node1 ] in
  Alcotest.(check bool)
    "remove existing" true
    (Membership.remove table node1.id);
  Alcotest.(check int) "count after remove" 0 (Membership.count table);
  Alcotest.(check bool)
    "remove non-existing" false
    (Membership.remove table node1.id)

(* [to_list] yields one entry per added member. *)
let test_membership_to_list () =
  let table = table_with [ node1; node2; node3 ] in
  let members = Membership.to_list table in
  Alcotest.(check int) "list length" 3 (List.length members)

(* [snapshot_all] on a one-member table yields a single snapshot carrying that
   member's node id. *)
let test_membership_snapshot_consistency () =
  let table = table_with [ node1 ] in
  let snapshots = Membership.snapshot_all table in
  Alcotest.(check int) "snapshot count" 1 (List.length snapshots);
  (* Pattern match instead of [List.hd]: an empty list becomes a test failure
     rather than an uncaught [Failure "hd"]. *)
  match snapshots with
  | snap :: _ ->
      Alcotest.(check bool)
        "node id matches" true
        (equal_node_id snap.node.id node1.id)
  | [] -> Alcotest.fail "no snapshot returned"

(* [count] follows every add and remove. *)
let test_membership_count_accurate () =
  let table = Membership.create () in
  Alcotest.(check int) "empty" 0 (Membership.count table);
  Membership.add table (Member.create ~now node1);
  Alcotest.(check int) "after add 1" 1 (Membership.count table);
  Membership.add table (Member.create ~now node2);
  Alcotest.(check int) "after add 2" 2 (Membership.count table);
  (* The returned flag is not under test here; the count check below is. The
     annotation keeps this from silently discarding a value of another type. *)
  ignore (Membership.remove table node1.id : bool);
  Alcotest.(check int) "after remove" 1 (Membership.count table)

(* Adding the same node twice leaves a single entry. *)
let test_membership_no_duplicates () =
  let table = table_with [ node1; node1 ] in
  Alcotest.(check int) "no duplicate" 1 (Membership.count table)

(* A registered sequence number is pending until it is cancelled. *)
let test_pending_acks_register_cancel () =
  let pa = Pending_acks.create () in
  (* The value returned by [register] is not needed by this test. *)
  let _registration = Pending_acks.register pa ~seq:123 in
  Alcotest.(check int) "pending before" 1 (Pending_acks.pending_count pa);
  Pending_acks.cancel pa ~seq:123;
  Alcotest.(check int) "pending after" 0 (Pending_acks.pending_count pa)

(* Completing a sequence number that was never registered reports [false]. *)
let test_pending_acks_complete_not_found () =
  let pa = Pending_acks.create () in
  let completed = Pending_acks.complete pa ~seq:999 ~payload:None in
  Alcotest.(check bool) "not found" false completed

(* NOTE: in the three tests below the [{ update = ... }] record is written
   inline at each [update_member] call on purpose. The field may be
   polymorphic and is resolved from the expected argument type, so it is not
   factored out into a helper taking the function as a parameter. *)

(* [set_suspect] through [update_member] leaves the member suspect with the
   requested incarnation. *)
let test_member_state_transitions () =
  let table = table_with [ node1 ] in
  let updated =
    Membership.update_member table node1.id
      {
        update =
          (fun m ~xt ->
            Member.set_suspect m ~incarnation:(incarnation_of_int 5) ~now ~xt);
      }
  in
  Alcotest.(check bool) "updated" true updated;
  let snap = snapshot_exn table node1.id in
  Alcotest.(check string)
    "state suspect" "suspect"
    (member_state_to_string snap.state);
  Alcotest.(check int) "incarnation" 5 (incarnation_to_int snap.incarnation)

(* Suspecting a member and then marking it alive ends in the alive state. Both
   [update_member] results are asserted, as in [test_member_state_transitions],
   so that updates which were not applied cannot go unnoticed. *)
let test_member_set_alive () =
  let table = table_with [ node1 ] in
  let suspected =
    Membership.update_member table node1.id
      {
        update =
          (fun m ~xt ->
            Member.set_suspect m ~incarnation:(incarnation_of_int 1) ~now ~xt);
      }
  in
  Alcotest.(check bool) "suspect update applied" true suspected;
  let revived =
    Membership.update_member table node1.id
      {
        update =
          (fun m ~xt ->
            Member.set_alive m ~incarnation:(incarnation_of_int 2) ~now ~xt);
      }
  in
  Alcotest.(check bool) "alive update applied" true revived;
  let snap = snapshot_exn table node1.id in
  Alcotest.(check string)
    "state alive" "alive"
    (member_state_to_string snap.state)

(* [set_dead] through [update_member] leaves the member dead; the update
   result is asserted for the same reason as in [test_member_set_alive]. *)
let test_member_set_dead () =
  let table = table_with [ node1 ] in
  let killed =
    Membership.update_member table node1.id
      {
        update =
          (fun m ~xt ->
            Member.set_dead m ~incarnation:(incarnation_of_int 10) ~now ~xt);
      }
  in
  Alcotest.(check bool) "dead update applied" true killed;
  let snap = snapshot_exn table node1.id in
  Alcotest.(check string)
    "state dead" "dead"
    (member_state_to_string snap.state)

(* Entry point: runs every suite under the Alcotest name "kcas" from within
   [Eio_main.run]; the Eio environment itself is not used. *)
let () =
  Eio_main.run @@ fun _env ->
  Alcotest.run "kcas"
    [
      ( "buffer_pool",
        [
          ("acquire_release", `Quick, test_buffer_pool_acquire_release);
          ("with_buffer", `Quick, test_buffer_pool_with_buffer);
          ("exception_safe", `Quick, test_buffer_pool_exception_safe);
          ("all_buffers", `Quick, test_buffer_pool_all_buffers);
        ] );
      ( "membership",
        [
          ("add_find", `Quick, test_membership_add_find);
          ("remove", `Quick, test_membership_remove);
          ("to_list", `Quick, test_membership_to_list);
          ("snapshot_consistency", `Quick, test_membership_snapshot_consistency);
          ("count_accurate", `Quick, test_membership_count_accurate);
          ("no_duplicates", `Quick, test_membership_no_duplicates);
        ] );
      ( "pending_acks",
        [
          ("register_cancel", `Quick, test_pending_acks_register_cancel);
          ("complete_not_found", `Quick, test_pending_acks_complete_not_found);
        ] );
      ( "member_transitions",
        [
          ("state_transitions", `Quick, test_member_state_transitions);
          ("set_alive", `Quick, test_member_set_alive);
          ("set_dead", `Quick, test_member_set_dead);
        ] );
    ]