swim protocol in ocaml interoperable with membership lib and serf cli
1open Types
2
3module Member = struct
4 type t = {
5 node : node_info;
6 state : member_state Kcas.Loc.t;
7 incarnation : incarnation Kcas.Loc.t;
8 state_change_time : Mtime.span Kcas.Loc.t;
9 last_ack_time : Mtime.span Kcas.Loc.t;
10 }
11
12 let create ?(initial_state : member_state = Types.Alive)
13 ?(initial_incarnation = zero_incarnation) ~(now : Mtime.span)
14 (node : node_info) =
15 {
16 node;
17 state = Kcas.Loc.make initial_state;
18 incarnation = Kcas.Loc.make initial_incarnation;
19 state_change_time = Kcas.Loc.make now;
20 last_ack_time = Kcas.Loc.make now;
21 }
22
23 let node t = t.node
24 let get_state ~xt t = Kcas.Xt.get ~xt t.state
25 let get_incarnation ~xt t = Kcas.Xt.get ~xt t.incarnation
26 let get_state_change_time ~xt t = Kcas.Xt.get ~xt t.state_change_time
27 let get_last_ack_time ~xt t = Kcas.Xt.get ~xt t.last_ack_time
28
29 let set_state ~xt t state ~now =
30 Kcas.Xt.set ~xt t.state state;
31 Kcas.Xt.set ~xt t.state_change_time now
32
33 let set_incarnation ~xt t inc = Kcas.Xt.set ~xt t.incarnation inc
34
35 let set_alive ~xt t ~incarnation ~now =
36 set_state ~xt t Alive ~now;
37 set_incarnation ~xt t incarnation
38
39 let set_suspect ~xt t ~incarnation ~now =
40 set_state ~xt t Suspect ~now;
41 set_incarnation ~xt t incarnation
42
43 let set_dead ~xt t ~incarnation ~now =
44 set_state ~xt t Dead ~now;
45 set_incarnation ~xt t incarnation
46
47 let record_ack ~xt t ~now = Kcas.Xt.set ~xt t.last_ack_time now
48
49 let snapshot ~xt t : member_snapshot =
50 {
51 node = t.node;
52 state = get_state ~xt t;
53 incarnation = get_incarnation ~xt t;
54 state_change = get_state_change_time ~xt t;
55 }
56
57 let snapshot_now t : member_snapshot =
58 Kcas.Xt.commit { tx = (fun ~xt -> snapshot ~xt t) }
59end
60
61type t = {
62 table : (string, Member.t) Kcas_data.Hashtbl.t;
63 count : int Kcas.Loc.t;
64}
65
66let create () = { table = Kcas_data.Hashtbl.create (); count = Kcas.Loc.make 0 }
67let key_of_id (Node_id s) = s
68
69let add t (member : Member.t) =
70 Kcas.Xt.commit
71 {
72 tx =
73 (fun ~xt ->
74 let key = key_of_id member.node.id in
75 match Kcas_data.Hashtbl.Xt.find_opt ~xt t.table key with
76 | Some _ -> ()
77 | None ->
78 Kcas_data.Hashtbl.Xt.replace ~xt t.table key member;
79 Kcas.Xt.modify ~xt t.count succ);
80 }
81
82let remove t id =
83 Kcas.Xt.commit
84 {
85 tx =
86 (fun ~xt ->
87 let key = key_of_id id in
88 match Kcas_data.Hashtbl.Xt.find_opt ~xt t.table key with
89 | None -> false
90 | Some _ ->
91 Kcas_data.Hashtbl.Xt.remove ~xt t.table key;
92 Kcas.Xt.modify ~xt t.count pred;
93 true);
94 }
95
96let find t id =
97 Kcas.Xt.commit
98 {
99 tx = (fun ~xt -> Kcas_data.Hashtbl.Xt.find_opt ~xt t.table (key_of_id id));
100 }
101
102let mem t id =
103 Kcas.Xt.commit
104 {
105 tx =
106 (fun ~xt ->
107 Option.is_some
108 (Kcas_data.Hashtbl.Xt.find_opt ~xt t.table (key_of_id id)));
109 }
110
111let to_list t = Kcas_data.Hashtbl.to_seq t.table |> Seq.map snd |> List.of_seq
112let to_node_list t = to_list t |> List.map Member.node
113let count t = Kcas.Xt.commit { tx = (fun ~xt -> Kcas.Xt.get ~xt t.count) }
114
115type member_updater = { update : 'x. Member.t -> xt:'x Kcas.Xt.t -> unit }
116
117let update_member t id updater =
118 match Kcas_data.Hashtbl.find_opt t.table (key_of_id id) with
119 | None -> false
120 | Some member ->
121 Kcas.Xt.commit { tx = (fun ~xt -> updater.update member ~xt) };
122 true
123
124let iter_alive t f =
125 to_list t
126 |> List.iter (fun m ->
127 let snap = Member.snapshot_now m in
128 if snap.state = Alive then f m snap)
129
130let snapshot_all t = to_list t |> List.map Member.snapshot_now