this repo has no description
at master 148 lines 4.4 kB view raw
1defmodule Hobbes.Cache do 2 use GenServer 3 alias Trinity.{SimProcess, SimServer, SimPersistentTerm} 4 5 alias Hobbes.Servers.{Coordinator, Manager} 6 alias Hobbes.Structs.Cluster 7 8 @type cluster_name :: binary 9 10 @clusters_table_key :_hobbes_clusters_table 11 12 @spec get_clusters_table :: :ets.table 13 defp get_clusters_table do 14 case SimPersistentTerm.get(@clusters_table_key, nil) do 15 nil -> raise "Hobbes.Cache has not been started" 16 table -> table 17 end 18 end 19 20 defmodule CachedCluster do 21 @type t :: %__MODULE__{ 22 coordinators: list, 23 generation: non_neg_integer | -1, 24 manager_pid: pid | nil, 25 cluster: Cluster.t | nil, 26 } 27 @enforce_keys [ 28 :coordinators, 29 :generation, 30 :manager_pid, 31 :cluster, 32 ] 33 defstruct @enforce_keys 34 end 35 36 defmodule State do 37 @enforce_keys [ 38 :clusters_table, 39 ] 40 defstruct @enforce_keys 41 end 42 43 @spec start_link(nil) :: GenServer.on_start 44 def start_link(_arg \\ nil) do 45 SimServer.start_link(__MODULE__, nil, name: __MODULE__) 46 end 47 48 @spec lookup_cluster(cluster_name) :: {:ok, Cluster.t} | {:error, atom} 49 def lookup_cluster(name) do 50 clusters_table = get_clusters_table() 51 52 case :ets.lookup(clusters_table, name) do 53 [{^name, %CachedCluster{cluster: %Cluster{} = cluster}}] -> {:ok, cluster} 54 [{^name, %CachedCluster{}}] -> {:error, :cluster_not_loaded} 55 [] -> {:error, :unknown_cluster} 56 end 57 end 58 59 @spec write_coordinators(cluster_name, list) :: :ok 60 def write_coordinators(name, coordinators) when is_list(coordinators) do 61 try do 62 SimServer.call(__MODULE__, {:write_coordinators, name, coordinators}, 1000) 63 catch 64 :exit, {:timeout, _mfa} -> {:error, :timeout} 65 end 66 end 67 68 def init(nil) do 69 clusters_table = :ets.new(__MODULE__, [:set, :protected]) 70 SimPersistentTerm.put(@clusters_table_key, clusters_table) 71 72 state = %State{ 73 clusters_table: clusters_table, 74 } 75 76 SimProcess.send_after(self(), :tick, 0) 77 {:ok, state} 78 end 79 80 def handle_call({:write_coordinators, name, coordinators}, _from, %State{} = state) do 81 %{clusters_table: clusters_table} = state 82 83 case :ets.member(clusters_table, name) do 84 false -> 85 cached_cluster = %CachedCluster{ 86 coordinators: coordinators, 87 generation: -1, 88 manager_pid: nil, 89 cluster: nil, 90 } 91 :ets.insert(clusters_table, {name, cached_cluster}) 92 {:reply, :ok, state} 93 94 true -> 95 {:reply, {:error, :name_already_taken}, state} 96 end 97 end 98 99 def handle_info({:response_cluster_manager, response}, %State{} = state) do 100 %{clusters_table: clusters_table} = state 101 %{cluster_name: cluster_name, generation: generation, manager_pid: manager_pid} = response 102 103 [{^cluster_name, %CachedCluster{} = cached_cluster}] = :ets.lookup(clusters_table, cluster_name) 104 if generation > cached_cluster.generation do 105 cached_cluster = %{cached_cluster | generation: generation, manager_pid: manager_pid, cluster: nil} 106 :ets.insert(clusters_table, {cluster_name, cached_cluster}) 107 end 108 109 {:noreply, state} 110 end 111 112 def handle_info({:response_cluster, response}, %State{} = state) do 113 %{clusters_table: clusters_table} = state 114 %{cluster_name: cluster_name, cluster: cluster} = response 115 116 [{^cluster_name, %CachedCluster{} = cached_cluster}] = :ets.lookup(clusters_table, cluster_name) 117 if cached_cluster.cluster == nil and cached_cluster.generation == cluster.generation do 118 cached_cluster = %{cached_cluster | cluster: cluster} 119 :ets.insert(clusters_table, {cluster_name, cached_cluster}) 120 end 121 122 {:noreply, state} 123 end 124 125 def handle_info(:tick, %State{} = state) do 126 state = tick(state) 127 SimProcess.send_after(self(), :tick, 100) 128 {:noreply, state} 129 end 130 131 defp tick(%State{} = state) do 132 %{clusters_table: clusters_table} = state 133 134 :ets.tab2list(clusters_table) 135 |> Enum.each(fn 136 {cluster_name, %CachedCluster{} = cached_cluster} -> 137 Enum.each(cached_cluster.coordinators, fn coordinator -> 138 Coordinator.request_cluster_manager(coordinator, cluster_name) 139 end) 140 141 if !cached_cluster.cluster && cached_cluster.manager_pid do 142 Manager.request_cluster(cached_cluster.manager_pid, cluster_name) 143 end 144 end) 145 146 state 147 end 148end