swim protocol in ocaml interoperable with membership lib and serf cli
1module Types = Types
2module Codec = Codec
3module Crypto = Crypto
4module Lzw = Lzw
5module Buffer_pool = Buffer_pool
6module Protocol_pure = Protocol_pure
7module Membership = Membership
8module Dissemination = Dissemination
9module Pending_acks = Pending_acks
10module Transport = Transport
11module Protocol = Protocol
12
13module Cluster = struct
14 type t = { protocol : Protocol.t; sw : Eio.Switch.t }
15
16 let create ~sw ~(env : _ Types.env) ~config =
17 let net = env.stdenv#net in
18 let clock = env.stdenv#clock in
19 let mono_clock = env.stdenv#mono_clock in
20 let secure_random = env.stdenv#secure_random in
21
22 let node_name =
23 match config.Types.node_name with
24 | Some name -> name
25 | None -> Printf.sprintf "node-%d" (Random.int 100000)
26 in
27 let self_id = Types.node_id_of_string node_name in
28
29 let udp_sock =
30 Transport.create_udp_socket net ~sw ~addr:config.bind_addr
31 ~port:config.bind_port
32 in
33
34 let tcp_listener =
35 Transport.create_tcp_listener net ~sw ~addr:config.bind_addr
36 ~port:config.bind_port ~backlog:10
37 in
38
39 let self_addr =
40 `Udp (Eio.Net.Ipaddr.of_raw config.bind_addr, config.bind_port)
41 in
42 let self = Types.make_node_info ~id:self_id ~addr:self_addr ~meta:"" in
43
44 match
45 Protocol.create ~sw ~config ~self ~udp_sock ~tcp_listener ~clock
46 ~mono_clock ~secure_random
47 with
48 | Error `Invalid_key -> Error `Invalid_key
49 | Ok protocol -> Ok { protocol; sw }
50
51 let start t =
52 Eio.Fiber.fork_daemon ~sw:t.sw (fun () ->
53 Protocol.run_protocol t.protocol;
54 `Stop_daemon);
55 Eio.Fiber.fork_daemon ~sw:t.sw (fun () ->
56 Protocol.run_udp_receiver t.protocol;
57 `Stop_daemon);
58 Eio.Fiber.fork_daemon ~sw:t.sw (fun () ->
59 Protocol.run_tcp_listener t.protocol;
60 `Stop_daemon)
61
62 let shutdown t = Protocol.shutdown t.protocol
63 let local_node t = Protocol.local_node t.protocol
64 let members t = Protocol.members t.protocol |> List.map Membership.Member.node
65 let member_count t = Protocol.member_count t.protocol
66 let events t = Protocol.events t.protocol
67 let stats t = Protocol.stats t.protocol
68 let add_member t node_info = Protocol.add_member t.protocol node_info
69 let remove_member t node_id = Protocol.remove_member t.protocol node_id
70
71 let join t ~seed_nodes =
72 let parse_and_try seed =
73 match Transport.parse_udp_addr seed with
74 | Error _ -> false
75 | Ok addr ->
76 let node_id = Types.node_id_of_string seed in
77 let node = Types.make_node_info ~id:node_id ~addr ~meta:"" in
78 Protocol.add_member t.protocol node;
79 true
80 in
81 let any_success = List.exists parse_and_try seed_nodes in
82 if any_success then Ok () else Error `No_seeds_reachable
83
84 let broadcast t ~topic ~payload =
85 Protocol.broadcast t.protocol ~topic ~payload
86
87 let send t ~target ~topic ~payload =
88 Protocol.send_direct t.protocol ~target ~topic ~payload
89
90 let send_to_addr t ~addr ~topic ~payload =
91 Protocol.send_to_addr t.protocol ~addr ~topic ~payload
92
93 let on_message t handler = Protocol.on_message t.protocol handler
94
95 let is_alive t node_id =
96 match
97 Membership.find
98 ( Protocol.members t.protocol |> fun _ ->
99 let p = t.protocol in
100 p.Protocol.members )
101 node_id
102 with
103 | None -> false
104 | Some member ->
105 let snap = Membership.Member.snapshot_now member in
106 snap.Types.state = Types.Alive
107
108 let find_node t node_id =
109 match Membership.find t.protocol.Protocol.members node_id with
110 | None -> None
111 | Some member -> Some (Membership.Member.node member)
112
113 let is_healthy t =
114 let s = stats t in
115 s.Types.nodes_alive > 0
116end
117
118let default_config = Types.default_config
119let version = "0.1.0"