tangled
alpha
login
or
join now
corporate.fm
/
trinity
2
fork
atom
this repo has no description
2
fork
atom
overview
issues
pulls
pipelines
Propagate exit through links
garrison.corporate.fm
2 months ago
669af4d6
4b6adf80
+126
-23
2 changed files
expand all
collapse all
unified
split
lib
trinity
scheduler.ex
test
trinity_test.exs
+115
-18
lib/trinity/scheduler.ex
reviewed
···
2
2
defmodule Simulation do
3
3
@enforce_keys [
4
4
:queue,
5
5
+
:proc_queue_keys,
5
6
:proc_links,
6
7
:now,
7
8
:supervisor_pid,
···
12
13
defmodule SimulationSupervisor do
13
14
use GenServer
14
15
16
16
+
defmodule State do
17
17
+
@enforce_keys [:sim, :monitors]
18
18
+
defstruct @enforce_keys
19
19
+
end
20
20
+
15
21
def start_link(%Simulation{} = sim) do
16
16
-
GenServer.start_link(__MODULE__, sim)
22
22
+
GenServer.start(__MODULE__, sim)
17
23
end
18
24
19
25
def register_self(server) do
···
21
27
end
22
28
23
29
def init(%Simulation{} = sim) do
24
24
-
{:ok, sim}
30
30
+
state = %State{
31
31
+
sim: sim,
32
32
+
monitors: %{},
33
33
+
}
34
34
+
{:ok, state}
25
35
end
26
36
27
27
-
def handle_cast({:register, pid}, state) do
28
28
-
Process.monitor(pid)
37
37
+
def handle_cast({:register, pid}, %State{} = state) do
38
38
+
mref = Process.monitor(pid)
39
39
+
state = %{state | monitors: Map.put(state.monitors, pid, mref)}
29
40
{:noreply, state}
30
41
end
31
42
32
32
-
def handle_info({:DOWN, _ref, :process, _pid, _reason}, state) do
33
33
-
Trinity.Scheduler.perform_next_task(state)
43
43
+
def handle_info({:DOWN, _ref, :process, pid, reason}, %State{} = state) do
44
44
+
killed = Trinity.Scheduler.handle_down(state.sim, pid, reason)
45
45
+
killed = [pid | killed]
46
46
+
47
47
+
monitors =
48
48
+
Enum.reduce(killed, state.monitors, fn p, acc ->
49
49
+
{mref, acc} = Map.pop!(acc, p)
50
50
+
Process.demonitor(mref, [:flush])
51
51
+
acc
52
52
+
end)
53
53
+
54
54
+
state = %{state | monitors: monitors}
34
55
{:noreply, state}
35
56
end
36
57
end
···
44
65
def start do
45
66
sim = %Simulation{
46
67
queue: :ets.new(__MODULE__, [:ordered_set, :public]),
68
68
+
proc_queue_keys: :ets.new(__MODULE__, [:set, :public]),
47
69
proc_links: :ets.new(__MODULE__, [:set, :public]),
48
70
now: :atomics.new(1, signed: false),
49
71
supervisor_pid: nil,
···
60
82
61
83
@spec yield(non_neg_integer) :: :ok
62
84
def yield(delay \\ 0) do
63
63
-
%Simulation{queue: queue, now: now} = get_sim()
64
64
-
yield_ref = enqueue_self(queue, now, delay)
85
85
+
%Simulation{queue: queue, proc_queue_keys: proc_queue_keys, now: now} = get_sim()
86
86
+
yield_ref = enqueue_self(queue, proc_queue_keys, now, delay)
65
87
66
88
perform_next(queue, now)
67
89
···
85
107
# We then enqueue the new process and unsuspend the parent
86
108
# so that it can register any links before yielding back
87
109
# to us (eventually, if there are others in queue at `time=now`)
88
88
-
yield_ref = enqueue_self(sim.queue, sim.now, 0)
110
110
+
yield_ref = enqueue_self(sim.queue, sim.proc_queue_keys, sim.now, 0)
89
111
send parent_pid, spawn_ref
90
112
receive do
91
113
^yield_ref -> :noop
···
138
160
:ok
139
161
end
140
162
163
163
+
def handle_down(%Simulation{} = sim, _pid, :normal) do
164
164
+
perform_next(sim.queue, sim.now)
165
165
+
[]
166
166
+
end
167
167
+
168
168
+
def handle_down(%Simulation{} = sim, pid, reason) do
169
169
+
%Simulation{proc_links: proc_links} = sim
170
170
+
linked = gather_linked(pid, proc_links)
171
171
+
172
172
+
Enum.each(linked, fn pid ->
173
173
+
destroy_process(sim, pid)
174
174
+
Process.exit(pid, reason)
175
175
+
end)
176
176
+
177
177
+
perform_next(sim.queue, sim.now)
178
178
+
linked
179
179
+
end
180
180
+
141
181
defp perform_next(queue, now) do
142
182
case pop_next(queue) do
143
183
{time, {:resume, pid, ref}} ->
···
156
196
end
157
197
end
158
198
159
159
-
defp enqueue_self(queue, now, delay) do
199
199
+
defp enqueue_self(queue, proc_queue_keys, now, delay) do
160
200
ref = make_ref()
161
201
time = read_now(now) + delay
162
202
163
163
-
entry = {:resume, self(), ref}
203
203
+
pid = self()
204
204
+
entry = {:resume, pid, ref}
164
205
165
206
i = case :ets.prev(queue, {time, :infinity}) do
166
166
-
[{^time, prev}] -> prev + 1
207
207
+
{^time, prev} -> prev + 1
167
208
_ -> 0
168
209
end
210
210
+
queue_key = {time, i}
169
211
:ets.insert(queue, {{time, i}, entry})
212
212
+
:ets.insert(proc_queue_keys, {pid, queue_key})
170
213
171
214
ref
172
215
end
173
216
174
174
-
defp read_now(now), do: :atomics.get(now, 1)
175
175
-
defp set_now(now, time), do: :atomics.put(now, 1, time)
217
217
+
defp destroy_process(%Simulation{} = sim, pid) do
218
218
+
%Simulation{
219
219
+
queue: queue,
220
220
+
proc_links: proc_links,
221
221
+
proc_queue_keys: proc_queue_keys,
222
222
+
} = sim
223
223
+
224
224
+
destroy_links(proc_links, pid)
176
225
177
177
-
# Used by SimulationSupervisor
178
178
-
@doc false
179
179
-
def perform_next_task(%Simulation{queue: queue, now: now}) do
180
180
-
perform_next(queue, now)
226
226
+
case :ets.lookup(proc_queue_keys, pid) do
227
227
+
[{^pid, qk}] ->
228
228
+
:ets.delete(queue, qk)
229
229
+
:ets.delete(proc_queue_keys, pid)
230
230
+
_ -> :noop
231
231
+
end
181
232
end
233
233
+
234
234
+
defp destroy_links(proc_links, from_pid) do
235
235
+
case :ets.lookup(proc_links, from_pid) do
236
236
+
[{^from_pid, from_links}] ->
237
237
+
# Remove `from_pid` from each of its links' links
238
238
+
Enum.each(from_links, fn to_pid ->
239
239
+
[{^to_pid, to_links}] = :ets.lookup(proc_links, to_pid)
240
240
+
:ets.insert(proc_links, {to_pid, List.delete(to_links, from_pid)})
241
241
+
end)
242
242
+
243
243
+
# Delete `from_pid`'s links entry
244
244
+
:ets.delete(proc_links, from_pid)
245
245
+
246
246
+
# No links, do nothing
247
247
+
_ -> :noop
248
248
+
end
249
249
+
end
250
250
+
251
251
+
defp gather_linked(pid, proc_links) do
252
252
+
# TODO: there must be a nicer way to structure this
253
253
+
# (especially the uniq)
254
254
+
gather_linked(pid, proc_links, [pid])
255
255
+
|> Enum.uniq()
256
256
+
|> List.delete(pid)
257
257
+
end
258
258
+
259
259
+
defp gather_linked(pid, proc_links, acc) do
260
260
+
case :ets.lookup(proc_links, pid) do
261
261
+
[{^pid, links}] ->
262
262
+
links = Enum.filter(links, &(&1 not in acc))
263
263
+
acc = links ++ acc
264
264
+
265
265
+
links_of_links =
266
266
+
Enum.reduce(links, acc, fn p, acc ->
267
267
+
gather_linked(p, proc_links, acc)
268
268
+
|> Enum.uniq()
269
269
+
end)
270
270
+
271
271
+
links_of_links ++ acc
272
272
+
273
273
+
_ -> acc
274
274
+
end
275
275
+
end
276
276
+
277
277
+
defp read_now(now), do: :atomics.get(now, 1)
278
278
+
defp set_now(now, time), do: :atomics.put(now, 1, time)
182
279
183
280
@doc false
184
281
def dump do
+11
-5
test/trinity_test.exs
reviewed
···
6
6
test "scheduler" do
7
7
Scheduler.start()
8
8
9
9
-
pid1 = Scheduler.spawn_and_yield(fn -> dbg "hello 1" end, true)
10
10
-
pid2 = Scheduler.spawn_and_yield(fn -> dbg "hello 2" end, false)
11
11
-
pid3 = Scheduler.spawn_and_yield(fn -> dbg "hello 3" end, true)
12
12
-
dbg {pid1, pid2, pid3}
9
9
+
Scheduler.spawn_and_yield(fn ->
10
10
+
Enum.each(1..100, fn i ->
11
11
+
Scheduler.spawn_and_yield(fn ->
12
12
+
dbg {self(), "proc #{i}"}
13
13
+
Scheduler.yield(1_000_000)
14
14
+
end, true)
15
15
+
end)
16
16
+
Scheduler.yield(1000)
17
17
+
raise "foobar"
18
18
+
end, false)
13
19
14
14
-
Scheduler.yield(100)
20
20
+
Scheduler.yield(2000)
15
21
dbg Scheduler.dump()
16
22
end
17
23
end