OR-1 dataflow CPU sketch
1"""Simulation backend with command/result protocol and threading support.
2
3The SimulationBackend runs a dedicated thread that owns the SimPy environment.
4Commands arrive via queue.Queue, results are returned via another queue.Queue.
5
6Key features:
7- Thread-safe command/result protocol
8- Event collection via on_event callbacks
9- Tick-level and event-level stepping
10- Token injection and simulation control
11- State snapshots at any point
12"""
13
14from __future__ import annotations
15
16import logging
17import queue
18import threading
19from dataclasses import replace
20from typing import Optional
21
22import simpy
23
24from asm import run_pipeline
25from asm.codegen import generate_direct
26from asm.ir import IRGraph
27from emu.events import EventCallback, SimEvent
28from emu.network import System, build_topology
29from emu.types import PEConfig, SMConfig
30from monitor.commands import (
31 ErrorResult, GraphLoaded, InjectCmd, LoadCmd, ResetCmd, RunUntilCmd,
32 SendCmd, SimCommand, StepEventCmd, StepResult, StepTickCmd, StopCmd,
33)
34from monitor.snapshot import StateSnapshot, capture
35from tokens import Token
36
37logger = logging.getLogger(__name__)
38
39
40class SimulationBackend:
41 """Manages a SimPy simulation environment in a dedicated thread.
42
43 Commands are sent via send_command() and processed in a dedicated thread.
44 Results are returned synchronously via queue communication.
45
46 Attributes:
47 _cmd_queue: Queue for incoming commands from caller
48 _result_queue: Queue for outgoing results to caller
49 _thread: The dedicated simulation thread
50 _env: Current SimPy environment (None if not loaded)
51 _system: Current System instance (None if not loaded)
52 _ir_graph: Current IRGraph (None if not loaded)
53 _last_source: Last loaded dfasm source (for reload on ResetCmd)
54 _events: Thread-local event buffer for collecting SimEvent objects
55 """
56
57 def __init__(self):
58 self._cmd_queue: queue.Queue[SimCommand] = queue.Queue()
59 self._result_queue: queue.Queue = queue.Queue()
60 self._thread: Optional[threading.Thread] = None
61 self._env: Optional[simpy.Environment] = None
62 self._system: Optional[System] = None
63 self._ir_graph: Optional[IRGraph] = None
64 self._last_source: Optional[str] = None
65 self._events: list[SimEvent] = []
66
67 def start(self):
68 """Start the background simulation thread.
69
70 Must be called before send_command().
71 """
72 self._thread = threading.Thread(target=self._run_loop, daemon=True)
73 self._thread.start()
74
75 def stop(self):
76 """Stop the background simulation thread gracefully.
77
78 Sends StopCmd and waits for thread to exit (timeout 5 seconds).
79 """
80 self._cmd_queue.put(StopCmd())
81 if self._thread is not None:
82 self._thread.join(timeout=5.0)
83
84 def send_command(self, cmd: SimCommand, timeout: float | None = None):
85 """Send a command to the backend and wait for result.
86
87 Args:
88 cmd: SimCommand instance (LoadCmd, StepTickCmd, etc.)
89 timeout: Optional timeout in seconds for result retrieval
90
91 Returns:
92 Result dataclass (GraphLoaded, StepResult, or ErrorResult)
93
94 Raises:
95 queue.Empty: If timeout expires before result arrives
96 """
97 self._cmd_queue.put(cmd)
98 return self._result_queue.get(timeout=timeout)
99
100 def _on_event(self, event: SimEvent):
101 """Event callback registered with all PEs and SMs.
102
103 Appends events to the thread-local buffer for collection during steps.
104
105 Args:
106 event: SimEvent from a PE or SM
107 """
108 self._events.append(event)
109
110 def _run_loop(self):
111 """Main loop of the simulation thread.
112
113 Processes commands from _cmd_queue until StopCmd is received.
114 Exceptions are caught and wrapped in ErrorResult.
115 """
116 while True:
117 cmd = self._cmd_queue.get()
118 if isinstance(cmd, StopCmd):
119 break
120 try:
121 result = self._dispatch(cmd)
122 except Exception as e:
123 logger.exception("Backend error processing %s", type(cmd).__name__)
124 result = ErrorResult(message=str(e))
125 self._result_queue.put(result)
126
127 def _dispatch(self, cmd: SimCommand):
128 """Dispatch command to appropriate handler.
129
130 Args:
131 cmd: Command instance
132
133 Returns:
134 Result dataclass
135 """
136 match cmd:
137 case LoadCmd(source=source):
138 return self._handle_load(source)
139 case StepTickCmd():
140 return self._handle_step_tick()
141 case StepEventCmd():
142 return self._handle_step_event()
143 case RunUntilCmd(until=until):
144 return self._handle_run_until(until)
145 case InjectCmd(token=token):
146 return self._handle_inject(token)
147 case SendCmd(token=token):
148 return self._handle_send(token)
149 case ResetCmd(reload=reload):
150 return self._handle_reset(reload)
151 case _:
152 return ErrorResult(message=f"unknown command type: {type(cmd).__name__}")
153
154 def _handle_load(self, source: str) -> GraphLoaded | ErrorResult:
155 """Load a dfasm program and set up the simulation.
156
157 Runs the assembly pipeline, generates direct-mode configuration,
158 builds the topology, and injects seed tokens.
159
160 Args:
161 source: dfasm source code as a string
162
163 Returns:
164 GraphLoaded on success, ErrorResult on failure
165
166 Acceptance criteria:
167 - or1-monitor.AC1.1: Valid program → GraphLoaded with IR and snapshot
168 - or1-monitor.AC1.2: Callbacks wired into all PEs and SMs
169 - or1-monitor.AC1.3: Invalid program → ErrorResult, backend still functional
170 """
171 try:
172 ir_graph = run_pipeline(source)
173 except ValueError as e:
174 return ErrorResult(message=str(e))
175
176 try:
177 result = generate_direct(ir_graph)
178 env = simpy.Environment()
179
180 # Wire on_event callback into all PE and SM configs
181 pe_configs = [replace(cfg, on_event=self._on_event) for cfg in result.pe_configs]
182 sm_configs = [replace(cfg, on_event=self._on_event) for cfg in result.sm_configs]
183
184 system = build_topology(env, pe_configs, sm_configs)
185
186 # Inject setup tokens (IRAM writes, ALLOC, frame slot writes)
187 for token in result.setup_tokens:
188 system.inject(token)
189
190 # Inject seed tokens
191 for seed in result.seed_tokens:
192 system.inject(seed)
193 except Exception as e:
194 return ErrorResult(message=str(e))
195
196 # Commit state atomically — only after everything succeeds
197 self._events.clear()
198 self._env = env
199 self._system = system
200 self._last_source = source
201 self._ir_graph = ir_graph
202
203 snapshot = capture(self._system)
204 return GraphLoaded(ir_graph=ir_graph, snapshot=snapshot)
205
206 def _handle_step_tick(self) -> StepResult:
207 """Step the simulation by one tick (all events at current simulation time).
208
209 Loops env.step() while env.peek() == env.now to process all events
210 at the current simulation time before returning.
211
212 Returns:
213 StepResult with events, snapshot, sim_time, and finished flag
214
215 Acceptance criteria:
216 - or1-monitor.AC5.2: Processes all events at current time before returning
217 - or1-monitor.AC5.5: Result contains events and snapshot
218 - or1-monitor.AC5.6: Finished simulation handled without error
219 """
220 if self._env is None or self._system is None:
221 return StepResult(finished=True)
222
223 self._events.clear()
224
225 if self._env.peek() == float('inf'):
226 return StepResult(
227 snapshot=capture(self._system),
228 sim_time=self._env.now,
229 finished=True,
230 )
231
232 current_time = self._env.peek()
233 while self._env.peek() == current_time:
234 self._env.step()
235
236 return StepResult(
237 events=tuple(self._events),
238 snapshot=capture(self._system),
239 sim_time=self._env.now,
240 finished=self._env.peek() == float('inf'),
241 )
242
243 def _handle_step_event(self) -> StepResult:
244 """Step the simulation by exactly one event.
245
246 Calls env.step() once and returns the result.
247
248 Returns:
249 StepResult with events, snapshot, sim_time, and finished flag
250
251 Acceptance criteria:
252 - or1-monitor.AC5.3: Processes exactly one env.step()
253 - or1-monitor.AC5.5: Result contains events and snapshot
254 """
255 if self._env is None or self._system is None:
256 return StepResult(finished=True)
257
258 self._events.clear()
259
260 if self._env.peek() == float('inf'):
261 return StepResult(
262 snapshot=capture(self._system),
263 sim_time=self._env.now,
264 finished=True,
265 )
266
267 self._env.step()
268
269 return StepResult(
270 events=tuple(self._events),
271 snapshot=capture(self._system),
272 sim_time=self._env.now,
273 finished=self._env.peek() == float('inf'),
274 )
275
276 def _handle_run_until(self, until: float) -> StepResult:
277 """Run the simulation continuously until reaching a target simulation time.
278
279 Batches events per tick to avoid flooding the result. Loops while
280 env.peek() <= until, stepping all events at each time point.
281
282 Args:
283 until: Target simulation time
284
285 Returns:
286 StepResult with all accumulated events, final snapshot, and finished flag
287
288 Acceptance criteria:
289 - or1-monitor.AC5.4: Batches events per tick
290 - or1-monitor.AC5.5: Result contains events and snapshot
291 """
292 if self._env is None or self._system is None:
293 return StepResult(finished=True)
294
295 self._events.clear()
296 all_events: list[SimEvent] = []
297
298 while self._env.peek() <= until and self._env.peek() != float('inf'):
299 current_time = self._env.peek()
300 self._events.clear()
301 while self._env.peek() == current_time:
302 self._env.step()
303 all_events.extend(self._events)
304
305 return StepResult(
306 events=tuple(all_events),
307 snapshot=capture(self._system),
308 sim_time=self._env.now,
309 finished=self._env.peek() == float('inf'),
310 )
311
312 def _handle_inject(self, token: Token) -> StepResult:
313 """Inject a token directly into the simulation (no backpressure).
314
315 Args:
316 token: Token to inject
317
318 Returns:
319 StepResult with snapshot and current state
320 """
321 if self._system is None:
322 return StepResult(finished=True)
323
324 self._events.clear()
325 self._system.inject(token)
326
327 return StepResult(
328 events=tuple(self._events),
329 snapshot=capture(self._system),
330 sim_time=self._env.now if self._env else 0.0,
331 finished=self._env.peek() == float('inf') if self._env else True,
332 )
333
334 def _handle_send(self, token: Token) -> StepResult:
335 """Send a token via SimPy store.put() (respects backpressure).
336
337 Creates a one-shot SimPy process and steps once to allow processing.
338
339 Args:
340 token: Token to send
341
342 Returns:
343 StepResult with events, snapshot, and state
344 """
345 if self._env is None or self._system is None:
346 return StepResult(finished=True)
347
348 self._events.clear()
349
350 def _sender():
351 yield from self._system.send(token)
352
353 self._env.process(_sender())
354 # Step once to process the send event
355 if self._env.peek() != float('inf'):
356 self._env.step()
357
358 return StepResult(
359 events=tuple(self._events),
360 snapshot=capture(self._system),
361 sim_time=self._env.now,
362 finished=self._env.peek() == float('inf'),
363 )
364
365 def _handle_reset(self, reload: bool) -> StepResult | GraphLoaded | ErrorResult:
366 """Reset the simulation (tear down current topology).
367
368 Clears all state. If reload=True, reloads the last program.
369
370 Args:
371 reload: If True, reload the last program after reset
372
373 Returns:
374 StepResult if reload=False, GraphLoaded if reload=True, or ErrorResult
375
376 Acceptance criteria:
377 - or1-monitor.AC1.4: Reset tears down and leaves ready for new LoadCmd
378 - or1-monitor.AC1.5: Reset with reload=True reloads program
379 """
380 self._env = None
381 self._system = None
382 self._events.clear()
383 self._ir_graph = None
384
385 if reload and self._last_source is not None:
386 return self._handle_load(self._last_source)
387
388 return StepResult(sim_time=0.0, finished=True)