SWIM protocol in OCaml, interoperable with the membership lib and the Serf CLI
at main 3.7 kB view raw
open Types

(** Per-member mutable state, stored in [Kcas.Loc.t] cells so that several
    fields can be read or written together inside one [Kcas.Xt] transaction. *)
module Member = struct
  type t = {
    node : node_info;  (** Immutable description of the node. *)
    state : member_state Kcas.Loc.t;
    incarnation : incarnation Kcas.Loc.t;
    state_change_time : Mtime.span Kcas.Loc.t;
        (** Timestamp passed to the most recent {!set_state}. *)
    last_ack_time : Mtime.span Kcas.Loc.t;
        (** Timestamp passed to the most recent {!record_ack}. *)
  }

  (** [create ?initial_state ?initial_incarnation ~now node] builds a fresh
      member for [node]. The state defaults to [Types.Alive] and the
      incarnation to [zero_incarnation]; both timestamps start at [now]. *)
  let create ?(initial_state : member_state = Types.Alive)
      ?(initial_incarnation = zero_incarnation) ~(now : Mtime.span)
      (node : node_info) =
    {
      node;
      state = Kcas.Loc.make initial_state;
      incarnation = Kcas.Loc.make initial_incarnation;
      state_change_time = Kcas.Loc.make now;
      last_ack_time = Kcas.Loc.make now;
    }

  (** [node t] is the node description [t] was created with. *)
  let node t = t.node

  (** Transactional readers: each reads one field within the transaction
      identified by [xt]. *)

  let get_state ~xt t = Kcas.Xt.get ~xt t.state
  let get_incarnation ~xt t = Kcas.Xt.get ~xt t.incarnation
  let get_state_change_time ~xt t = Kcas.Xt.get ~xt t.state_change_time
  let get_last_ack_time ~xt t = Kcas.Xt.get ~xt t.last_ack_time

  (** [set_state ~xt t state ~now] writes [state] and records [now] as the
      state-change time, both within the transaction [xt]. *)
  let set_state ~xt t state ~now =
    Kcas.Xt.set ~xt t.state state;
    Kcas.Xt.set ~xt t.state_change_time now

  (** [set_incarnation ~xt t inc] overwrites the incarnation with [inc]. *)
  let set_incarnation ~xt t inc = Kcas.Xt.set ~xt t.incarnation inc

  (** [set_alive ~xt t ~incarnation ~now] marks [t] as [Alive] at time [now]
      and stores [incarnation], all within [xt]. *)
  let set_alive ~xt t ~incarnation ~now =
    set_state ~xt t Alive ~now;
    set_incarnation ~xt t incarnation

  (** Same as {!set_alive}, but the state written is [Suspect]. *)
  let set_suspect ~xt t ~incarnation ~now =
    set_state ~xt t Suspect ~now;
    set_incarnation ~xt t incarnation

  (** Same as {!set_alive}, but the state written is [Dead]. *)
  let set_dead ~xt t ~incarnation ~now =
    set_state ~xt t Dead ~now;
    set_incarnation ~xt t incarnation

  (** [record_ack ~xt t ~now] stores [now] as the last-ack time. *)
  let record_ack ~xt t ~now = Kcas.Xt.set ~xt t.last_ack_time now

  (** [snapshot ~xt t] reads the state, incarnation and state-change time of
      [t] within [xt] and returns them, together with the node, as an
      immutable [member_snapshot]. *)
  let snapshot ~xt t : member_snapshot =
    {
      node = t.node;
      state = get_state ~xt t;
      incarnation = get_incarnation ~xt t;
      state_change = get_state_change_time ~xt t;
    }

  (** [snapshot_now t] runs {!snapshot} in its own committed transaction. *)
  let snapshot_now t : member_snapshot =
    Kcas.Xt.commit { tx = (fun ~xt -> snapshot ~xt t) }
end

(** Membership table: members keyed by the string inside their [Node_id],
    plus a separately maintained element count. *)
type t = {
  table : (string, Member.t) Kcas_data.Hashtbl.t;
  count : int Kcas.Loc.t;
      (** Incremented by {!add} and decremented by {!remove} in the same
          transaction as the table change. *)
}

(** [create ()] is an empty table with a count of [0]. *)
let create () = { table = Kcas_data.Hashtbl.create (); count = Kcas.Loc.make 0 }

(** [key_of_id id] unwraps the string carried by a [Node_id]. *)
let key_of_id (Node_id s) = s

(** [add t member] inserts [member] under its node id and increments the
    count, in a single transaction. If the id is already present the table
    is left unchanged (the existing member is kept). *)
let add t (member : Member.t) =
  Kcas.Xt.commit
    {
      tx =
        (fun ~xt ->
          let key = key_of_id member.node.id in
          match Kcas_data.Hashtbl.Xt.find_opt ~xt t.table key with
          | Some _ -> ()
          | None ->
              Kcas_data.Hashtbl.Xt.replace ~xt t.table key member;
              Kcas.Xt.modify ~xt t.count succ);
    }

(** [remove t id] deletes the member stored under [id] and decrements the
    count, in a single transaction.
    @return [true] if a member was removed, [false] if [id] was absent. *)
let remove t id =
  Kcas.Xt.commit
    {
      tx =
        (fun ~xt ->
          let key = key_of_id id in
          match Kcas_data.Hashtbl.Xt.find_opt ~xt t.table key with
          | None -> false
          | Some _ ->
              Kcas_data.Hashtbl.Xt.remove ~xt t.table key;
              Kcas.Xt.modify ~xt t.count pred;
              true);
    }

(** [find t id] is [Some member] if [id] is in the table, [None] otherwise. *)
let find t id =
  Kcas.Xt.commit
    {
      tx = (fun ~xt -> Kcas_data.Hashtbl.Xt.find_opt ~xt t.table (key_of_id id));
    }

(** [mem t id] is [true] iff a member is stored under [id]. *)
let mem t id =
  Kcas.Xt.commit
    {
      tx =
        (fun ~xt ->
          Option.is_some
            (Kcas_data.Hashtbl.Xt.find_opt ~xt t.table (key_of_id id)));
    }

(** [to_list t] is the list of members obtained from
    [Kcas_data.Hashtbl.to_seq]; no ordering is imposed here. *)
let to_list t = Kcas_data.Hashtbl.to_seq t.table |> Seq.map snd |> List.of_seq

(** [to_node_list t] is [to_list t] projected to each member's node. *)
let to_node_list t = to_list t |> List.map Member.node

(** [count t] is the current value of the count cell. *)
let count t = Kcas.Xt.commit { tx = (fun ~xt -> Kcas.Xt.get ~xt t.count) }

(** A member update that can run inside any transaction. The explicit
    universal quantification over ['x] is what allows [update] to be called
    from within the polymorphic [tx] field expected by [Kcas.Xt.commit]. *)
type member_updater = { update : 'x. Member.t -> xt:'x Kcas.Xt.t -> unit }

(** [update_member t id updater] applies [updater] to the member stored under
    [id].

    The lookup and the update run in the same transaction, so the update is
    only committed if the member is still in the table at commit time. (A
    lookup performed outside the transaction could race with {!remove} and
    report success for a member that had already been dropped.)

    @return [true] if the member was found and updated, [false] otherwise. *)
let update_member t id updater =
  Kcas.Xt.commit
    {
      tx =
        (fun ~xt ->
          match Kcas_data.Hashtbl.Xt.find_opt ~xt t.table (key_of_id id) with
          | None -> false
          | Some member ->
              updater.update member ~xt;
              true);
    }

(** [iter_alive t f] calls [f member snapshot] for every member whose
    snapshot state equals [Alive]. Each member is snapshotted in its own
    transaction, after the member list has been collected, so the traversal
    as a whole is not one atomic view of the table. *)
let iter_alive t f =
  to_list t
  |> List.iter (fun m ->
         let snap = Member.snapshot_now m in
         if snap.state = Alive then f m snap)

(** [snapshot_all t] is the list of per-member snapshots, each taken in its
    own transaction (see {!iter_alive} for the consistency caveat). *)
let snapshot_all t = to_list t |> List.map Member.snapshot_now