this repo has no description
at master 106 lines 3.1 kB view raw
defmodule Hobbes.ClusterNode do
  @moduledoc """
  Process that boots the Hobbes components hosted on one node.

  During `init/1` it, in order:

    1. parses its options into a `NodeConfig`,
    2. builds the initial `ClusterConfig` from `:initial_cluster_opts`,
    3. starts a `Coordinator` if a `:coordinator_id` was given,
    4. starts the `ServerSupervisor`,
    5. spawns a background process that tries to write the initial cluster
       config through the local coordinator (skipped when this node has none),
    6. records the coordinator names for the cluster via `Cache.write_coordinators/2`.
  """

  use GenServer
  alias Trinity.{SimServer, SimProcess}

  alias Hobbes.{ClusterConfig, Cache}
  alias Hobbes.Servers.{Coordinator, ServerSupervisor}

  import ExUnit.Assertions, only: [assert: 1]

  @doc """
  Starts the cluster node through `SimServer.start_link/2`.

  `config` is a keyword list; see `NodeConfig.from_opts/1` for the keys read.
  """
  @spec start_link(keyword) :: term
  def start_link(config) when is_list(config), do: SimServer.start_link(__MODULE__, config)

  defmodule NodeConfig do
    @moduledoc """
    Static, per-node settings parsed from the options given to
    `Hobbes.ClusterNode.start_link/1`.
    """

    @enforce_keys [
      :cluster_name,
      :coordinator_names,
      :coordinator_id,
      :initial_cluster_opts,
      :slots
    ]
    defstruct @enforce_keys

    @doc """
    Builds a `NodeConfig` from a keyword list.

    Option key to struct field:

      * `:cluster` (required) -> `:cluster_name`
      * `:coordinators` (required) -> `:coordinator_names`
      * `:coordinator_id` (optional, defaults to `nil`) -> `:coordinator_id`
      * `:initial_cluster_opts` (required) -> `:initial_cluster_opts`
      * `:slots` (required) -> `:slots`

    Raises `KeyError` when a required key is missing.
    """
    def from_opts(opts) do
      # TODO: validate config with helpful error messages
      %__MODULE__{
        cluster_name: Keyword.fetch!(opts, :cluster),
        coordinator_names: Keyword.fetch!(opts, :coordinators),
        coordinator_id: Keyword.get(opts, :coordinator_id),
        initial_cluster_opts: Keyword.fetch!(opts, :initial_cluster_opts),
        slots: Keyword.fetch!(opts, :slots)
      }
    end
  end

  defmodule State do
    @moduledoc """
    Runtime state of `Hobbes.ClusterNode`.

    `:coordinator_pid` is `nil` when this node does not host a coordinator.
    """

    @enforce_keys [
      :node_config,
      :initial_cluster_config,
      :coordinator_pid,
      :server_supervisor_pid
    ]
    defstruct @enforce_keys
  end

  @impl true
  def init(opts) do
    # SimProcess.flag(:trap_exit, true)
    node_config = NodeConfig.from_opts(opts)
    initial_cluster_config = ClusterConfig.from_opts(node_config.initial_cluster_opts)

    coordinator_pid = start_coordinator(node_config)
    server_supervisor_pid = start_server_supervisor(node_config)

    state = %State{
      node_config: node_config,
      initial_cluster_config: initial_cluster_config,
      coordinator_pid: coordinator_pid,
      server_supervisor_pid: server_supervisor_pid
    }

    :ok = ensure_cluster_config(state)
    Cache.write_coordinators(node_config.cluster_name, node_config.coordinator_names)
    {:ok, state}
  end

  # Starts the local coordinator, or returns `nil` when no `:coordinator_id`
  # was configured for this node.
  defp start_coordinator(%{coordinator_id: nil}), do: nil

  defp start_coordinator(%{coordinator_id: id, coordinator_names: coordinator_names})
       when is_integer(id) and is_list(coordinator_names) do
    # The id must be a valid zero-based position in `coordinator_names`.
    # A negative id used to slip through the upper-bound check alone.
    assert id >= 0
    assert id < length(coordinator_names)

    # TODO: use cluster name
    # The atom is built from a validated, bounded integer, so the set of atoms
    # created here is limited by the number of coordinators.
    name = String.to_atom("coordinator-#{id}")
    {:ok, pid} = Coordinator.start_link(id, coordinator_names, name: name)
    pid
  end

  # Starts the server supervisor with the coordinator names and slots.
  defp start_server_supervisor(%{coordinator_names: coordinators, slots: slots}) do
    {:ok, pid} = ServerSupervisor.start_link(%{coordinators: coordinators, slots: slots})
    pid
  end

  # Kicks off the (asynchronous) write of the initial cluster config.
  # Always returns `:ok`; the outcome of the write itself is not reported back.
  #
  # When this node hosts no coordinator there is no pid to write through, so
  # no bootstrap process is spawned at all.
  defp ensure_cluster_config(%State{coordinator_pid: nil}), do: :ok

  defp ensure_cluster_config(%State{} = state) do
    %{
      initial_cluster_config: initial_cluster_config,
      coordinator_pid: coordinator_pid
    } = state

    # NOTE(review): the helper is linked and exits are not trapped (see the
    # commented-out flag in init/1), so presumably a crash in it also takes
    # this process down - confirm against SimProcess semantics.
    SimProcess.spawn_link(fn ->
      do_ensure_cluster_config(coordinator_pid, initial_cluster_config)
    end)

    :ok
  end

  # Tries to write the initial config through the coordinator.
  # Success, "not primary" and "already written" all end the attempt; a
  # timeout is retried after a 100 (presumably ms) sleep, with no retry limit.
  # Any other result raises `CaseClauseError` in the calling process.
  defp do_ensure_cluster_config(coordinator_pid, %ClusterConfig{} = config) do
    case Coordinator.write_initial_config(coordinator_pid, config) do
      :ok ->
        :noop

      {:error, {:not_primary, _}} ->
        :noop

      {:error, :config_already_written} ->
        :noop

      {:error, :timeout} ->
        SimProcess.sleep(100)
        do_ensure_cluster_config(coordinator_pid, config)
    end
  end
end