this repo has no description
1defmodule Hobbes.Cache do
2 use GenServer
3 alias Trinity.{SimProcess, SimServer, SimPersistentTerm}
4
5 alias Hobbes.Servers.{Coordinator, Manager}
6 alias Hobbes.Structs.Cluster
7
8 @type cluster_name :: binary
9
10 @clusters_table_key :_hobbes_clusters_table
11
12 @spec get_clusters_table :: :ets.table
13 defp get_clusters_table do
14 case SimPersistentTerm.get(@clusters_table_key, nil) do
15 nil -> raise "Hobbes.Cache has not been started"
16 table -> table
17 end
18 end
19
20 defmodule CachedCluster do
21 @type t :: %__MODULE__{
22 coordinators: list,
23 generation: non_neg_integer | -1,
24 manager_pid: pid | nil,
25 cluster: Cluster.t | nil,
26 }
27 @enforce_keys [
28 :coordinators,
29 :generation,
30 :manager_pid,
31 :cluster,
32 ]
33 defstruct @enforce_keys
34 end
35
36 defmodule State do
37 @enforce_keys [
38 :clusters_table,
39 ]
40 defstruct @enforce_keys
41 end
42
43 @spec start_link(nil) :: GenServer.on_start
44 def start_link(_arg \\ nil) do
45 SimServer.start_link(__MODULE__, nil, name: __MODULE__)
46 end
47
48 @spec lookup_cluster(cluster_name) :: {:ok, Cluster.t} | {:error, atom}
49 def lookup_cluster(name) do
50 clusters_table = get_clusters_table()
51
52 case :ets.lookup(clusters_table, name) do
53 [{^name, %CachedCluster{cluster: %Cluster{} = cluster}}] -> {:ok, cluster}
54 [{^name, %CachedCluster{}}] -> {:error, :cluster_not_loaded}
55 [] -> {:error, :unknown_cluster}
56 end
57 end
58
59 @spec write_coordinators(cluster_name, list) :: :ok
60 def write_coordinators(name, coordinators) when is_list(coordinators) do
61 try do
62 SimServer.call(__MODULE__, {:write_coordinators, name, coordinators}, 1000)
63 catch
64 :exit, {:timeout, _mfa} -> {:error, :timeout}
65 end
66 end
67
68 def init(nil) do
69 clusters_table = :ets.new(__MODULE__, [:set, :protected])
70 SimPersistentTerm.put(@clusters_table_key, clusters_table)
71
72 state = %State{
73 clusters_table: clusters_table,
74 }
75
76 SimProcess.send_after(self(), :tick, 0)
77 {:ok, state}
78 end
79
80 def handle_call({:write_coordinators, name, coordinators}, _from, %State{} = state) do
81 %{clusters_table: clusters_table} = state
82
83 case :ets.member(clusters_table, name) do
84 false ->
85 cached_cluster = %CachedCluster{
86 coordinators: coordinators,
87 generation: -1,
88 manager_pid: nil,
89 cluster: nil,
90 }
91 :ets.insert(clusters_table, {name, cached_cluster})
92 {:reply, :ok, state}
93
94 true ->
95 {:reply, {:error, :name_already_taken}, state}
96 end
97 end
98
99 def handle_info({:response_cluster_manager, response}, %State{} = state) do
100 %{clusters_table: clusters_table} = state
101 %{cluster_name: cluster_name, generation: generation, manager_pid: manager_pid} = response
102
103 [{^cluster_name, %CachedCluster{} = cached_cluster}] = :ets.lookup(clusters_table, cluster_name)
104 if generation > cached_cluster.generation do
105 cached_cluster = %{cached_cluster | generation: generation, manager_pid: manager_pid, cluster: nil}
106 :ets.insert(clusters_table, {cluster_name, cached_cluster})
107 end
108
109 {:noreply, state}
110 end
111
112 def handle_info({:response_cluster, response}, %State{} = state) do
113 %{clusters_table: clusters_table} = state
114 %{cluster_name: cluster_name, cluster: cluster} = response
115
116 [{^cluster_name, %CachedCluster{} = cached_cluster}] = :ets.lookup(clusters_table, cluster_name)
117 if cached_cluster.cluster == nil and cached_cluster.generation == cluster.generation do
118 cached_cluster = %{cached_cluster | cluster: cluster}
119 :ets.insert(clusters_table, {cluster_name, cached_cluster})
120 end
121
122 {:noreply, state}
123 end
124
125 def handle_info(:tick, %State{} = state) do
126 state = tick(state)
127 SimProcess.send_after(self(), :tick, 100)
128 {:noreply, state}
129 end
130
131 defp tick(%State{} = state) do
132 %{clusters_table: clusters_table} = state
133
134 :ets.tab2list(clusters_table)
135 |> Enum.each(fn
136 {cluster_name, %CachedCluster{} = cached_cluster} ->
137 Enum.each(cached_cluster.coordinators, fn coordinator ->
138 Coordinator.request_cluster_manager(coordinator, cluster_name)
139 end)
140
141 if !cached_cluster.cluster && cached_cluster.manager_pid do
142 Manager.request_cluster(cached_cluster.manager_pid, cluster_name)
143 end
144 end)
145
146 state
147 end
148end