SWIM protocol in OCaml, interoperable with membership lib and Serf CLI
at main 3.8 kB view raw
(* Re-export the library's submodules through this entry-point module. *)
module Types = Types
module Codec = Codec
module Crypto = Crypto
module Lzw = Lzw
module Buffer_pool = Buffer_pool
module Protocol_pure = Protocol_pure
module Membership = Membership
module Dissemination = Dissemination
module Pending_acks = Pending_acks
module Transport = Transport
module Protocol = Protocol

(** High-level handle over a {!Protocol.t}: construction, start-up, membership
    queries and messaging. Most functions simply delegate to {!Protocol}. *)
module Cluster = struct
  (** A cluster handle: the protocol state plus the switch on which
      {!start} forks its fibers. *)
  type t = { protocol : Protocol.t; sw : Eio.Switch.t }

  (** [create ~sw ~env ~config] opens a UDP socket and a TCP listener
      (backlog 10), both on [config.bind_addr] / [config.bind_port], builds
      the local node's info and hands everything to [Protocol.create].

      The node name is [config.node_name] when set; otherwise a name of the
      form ["node-N"] with [N] drawn at random below 100000.

      @return [Ok t] on success, or [Error `Invalid_key] when
      [Protocol.create] reports that error. *)
  let create ~sw ~(env : _ Types.env) ~config =
    let net = env.stdenv#net in
    let clock = env.stdenv#clock in
    let mono_clock = env.stdenv#mono_clock in
    let secure_random = env.stdenv#secure_random in

    let node_name =
      match config.Types.node_name with
      | Some name -> name
      | None ->
          (* The global [Random] generator starts from a fixed seed unless the
             program calls [Random.self_init], which would give every unnamed
             node in every process the same default name. Draw from a
             self-seeded state instead. *)
          let rng = Random.State.make_self_init () in
          Printf.sprintf "node-%d" (Random.State.int rng 100000)
    in
    let self_id = Types.node_id_of_string node_name in

    let udp_sock =
      Transport.create_udp_socket net ~sw ~addr:config.bind_addr
        ~port:config.bind_port
    in

    let tcp_listener =
      Transport.create_tcp_listener net ~sw ~addr:config.bind_addr
        ~port:config.bind_port ~backlog:10
    in

    let self_addr =
      `Udp (Eio.Net.Ipaddr.of_raw config.bind_addr, config.bind_port)
    in
    let self = Types.make_node_info ~id:self_id ~addr:self_addr ~meta:"" in

    match
      Protocol.create ~sw ~config ~self ~udp_sock ~tcp_listener ~clock
        ~mono_clock ~secure_random
    with
    | Error `Invalid_key -> Error `Invalid_key
    | Ok protocol -> Ok { protocol; sw }

  (** [start t] forks three daemon fibers on [t.sw], running
      [Protocol.run_protocol], [Protocol.run_udp_receiver] and
      [Protocol.run_tcp_listener] respectively. Each fiber yields
      [`Stop_daemon] if its loop ever returns. *)
  let start t =
    Eio.Fiber.fork_daemon ~sw:t.sw (fun () ->
        Protocol.run_protocol t.protocol;
        `Stop_daemon);
    Eio.Fiber.fork_daemon ~sw:t.sw (fun () ->
        Protocol.run_udp_receiver t.protocol;
        `Stop_daemon);
    Eio.Fiber.fork_daemon ~sw:t.sw (fun () ->
        Protocol.run_tcp_listener t.protocol;
        `Stop_daemon)

  (** [shutdown t] delegates to [Protocol.shutdown]. *)
  let shutdown t = Protocol.shutdown t.protocol

  (** [local_node t] delegates to [Protocol.local_node]. *)
  let local_node t = Protocol.local_node t.protocol

  (** [members t] is the node info of every member returned by
      [Protocol.members]. *)
  let members t = Protocol.members t.protocol |> List.map Membership.Member.node

  (** [member_count t] delegates to [Protocol.member_count]. *)
  let member_count t = Protocol.member_count t.protocol

  (** [events t] delegates to [Protocol.events]. *)
  let events t = Protocol.events t.protocol

  (** [stats t] delegates to [Protocol.stats]. *)
  let stats t = Protocol.stats t.protocol

  (** [add_member t node_info] delegates to [Protocol.add_member]. *)
  let add_member t node_info = Protocol.add_member t.protocol node_info

  (** [remove_member t node_id] delegates to [Protocol.remove_member]. *)
  let remove_member t node_id = Protocol.remove_member t.protocol node_id

  (** [join t ~seed_nodes] registers every seed whose string parses with
      [Transport.parse_udp_addr] as a member of [t]. The seed string itself
      is used as the node id. Seeds that fail to parse are skipped.

      @return [Ok ()] if at least one seed was registered, otherwise
      [Error `No_seeds_reachable] (including when [seed_nodes] is empty). *)
  let join t ~seed_nodes =
    let register seed =
      match Transport.parse_udp_addr seed with
      | Error _ -> false
      | Ok addr ->
          let node_id = Types.node_id_of_string seed in
          let node = Types.make_node_info ~id:node_id ~addr ~meta:"" in
          Protocol.add_member t.protocol node;
          true
    in
    (* Deliberately a fold and not [List.exists]: [exists] short-circuits on
       the first seed that parses, so the remaining seeds would never be
       registered. [register seed] is on the left of [||] so it always
       runs. *)
    let any_registered =
      List.fold_left (fun acc seed -> register seed || acc) false seed_nodes
    in
    if any_registered then Ok () else Error `No_seeds_reachable

  (** [broadcast t ~topic ~payload] delegates to [Protocol.broadcast]. *)
  let broadcast t ~topic ~payload =
    Protocol.broadcast t.protocol ~topic ~payload

  (** [send t ~target ~topic ~payload] delegates to [Protocol.send_direct]. *)
  let send t ~target ~topic ~payload =
    Protocol.send_direct t.protocol ~target ~topic ~payload

  (** [send_to_addr t ~addr ~topic ~payload] delegates to
      [Protocol.send_to_addr]. *)
  let send_to_addr t ~addr ~topic ~payload =
    Protocol.send_to_addr t.protocol ~addr ~topic ~payload

  (** [on_message t handler] delegates to [Protocol.on_message]. *)
  let on_message t handler = Protocol.on_message t.protocol handler

  (** [is_alive t node_id] is [true] iff [node_id] is present in the
      protocol's membership table and its current snapshot has state
      [Types.Alive]. Unknown nodes yield [false]. *)
  let is_alive t node_id =
    match Membership.find t.protocol.Protocol.members node_id with
    | None -> false
    | Some member ->
        let snap = Membership.Member.snapshot_now member in
        snap.Types.state = Types.Alive

  (** [find_node t node_id] is the node info of [node_id] if it is present in
      the protocol's membership table, [None] otherwise. *)
  let find_node t node_id =
    match Membership.find t.protocol.Protocol.members node_id with
    | None -> None
    | Some member -> Some (Membership.Member.node member)

  (** [is_healthy t] is [true] iff the current stats report at least one
      alive node. *)
  let is_healthy t =
    let s = stats t in
    s.Types.nodes_alive > 0
end

(** Default configuration, re-exported from {!Types}. *)
let default_config = Types.default_config

(** Library version string. *)
let version = "0.1.0"