"""Simulation backend with command/result protocol and threading support. The SimulationBackend runs a dedicated thread that owns the SimPy environment. Commands arrive via queue.Queue, results are returned via another queue.Queue. Key features: - Thread-safe command/result protocol - Event collection via on_event callbacks - Tick-level and event-level stepping - Token injection and simulation control - State snapshots at any point """ from __future__ import annotations import logging import queue import threading from dataclasses import replace from typing import Optional import simpy from asm import run_pipeline from asm.codegen import generate_direct from asm.ir import IRGraph from emu.events import EventCallback, SimEvent from emu.network import System, build_topology from emu.types import PEConfig, SMConfig from monitor.commands import ( ErrorResult, GraphLoaded, InjectCmd, LoadCmd, ResetCmd, RunUntilCmd, SendCmd, SimCommand, StepEventCmd, StepResult, StepTickCmd, StopCmd, ) from monitor.snapshot import StateSnapshot, capture from tokens import Token logger = logging.getLogger(__name__) class SimulationBackend: """Manages a SimPy simulation environment in a dedicated thread. Commands are sent via send_command() and processed in a dedicated thread. Results are returned synchronously via queue communication. Attributes: _cmd_queue: Queue for incoming commands from caller _result_queue: Queue for outgoing results to caller _thread: The dedicated simulation thread _env: Current SimPy environment (None if not loaded) _system: Current System instance (None if not loaded) _ir_graph: Current IRGraph (None if not loaded) _last_source: Last loaded dfasm source (for reload on ResetCmd) _events: Thread-local event buffer for collecting SimEvent objects """ def __init__(self): self._cmd_queue: queue.Queue[SimCommand] = queue.Queue() self._result_queue: queue.Queue = queue.Queue() self._thread: Optional[threading.Thread] = None self._env: Optional[simpy.Environment] = None self._system: Optional[System] = None self._ir_graph: Optional[IRGraph] = None self._last_source: Optional[str] = None self._events: list[SimEvent] = [] def start(self): """Start the background simulation thread. Must be called before send_command(). """ self._thread = threading.Thread(target=self._run_loop, daemon=True) self._thread.start() def stop(self): """Stop the background simulation thread gracefully. Sends StopCmd and waits for thread to exit (timeout 5 seconds). """ self._cmd_queue.put(StopCmd()) if self._thread is not None: self._thread.join(timeout=5.0) def send_command(self, cmd: SimCommand, timeout: float | None = None): """Send a command to the backend and wait for result. Args: cmd: SimCommand instance (LoadCmd, StepTickCmd, etc.) timeout: Optional timeout in seconds for result retrieval Returns: Result dataclass (GraphLoaded, StepResult, or ErrorResult) Raises: queue.Empty: If timeout expires before result arrives """ self._cmd_queue.put(cmd) return self._result_queue.get(timeout=timeout) def _on_event(self, event: SimEvent): """Event callback registered with all PEs and SMs. Appends events to the thread-local buffer for collection during steps. Args: event: SimEvent from a PE or SM """ self._events.append(event) def _run_loop(self): """Main loop of the simulation thread. Processes commands from _cmd_queue until StopCmd is received. Exceptions are caught and wrapped in ErrorResult. """ while True: cmd = self._cmd_queue.get() if isinstance(cmd, StopCmd): break try: result = self._dispatch(cmd) except Exception as e: logger.exception("Backend error processing %s", type(cmd).__name__) result = ErrorResult(message=str(e)) self._result_queue.put(result) def _dispatch(self, cmd: SimCommand): """Dispatch command to appropriate handler. Args: cmd: Command instance Returns: Result dataclass """ match cmd: case LoadCmd(source=source): return self._handle_load(source) case StepTickCmd(): return self._handle_step_tick() case StepEventCmd(): return self._handle_step_event() case RunUntilCmd(until=until): return self._handle_run_until(until) case InjectCmd(token=token): return self._handle_inject(token) case SendCmd(token=token): return self._handle_send(token) case ResetCmd(reload=reload): return self._handle_reset(reload) case _: return ErrorResult(message=f"unknown command type: {type(cmd).__name__}") def _handle_load(self, source: str) -> GraphLoaded | ErrorResult: """Load a dfasm program and set up the simulation. Runs the assembly pipeline, generates direct-mode configuration, builds the topology, and injects seed tokens. Args: source: dfasm source code as a string Returns: GraphLoaded on success, ErrorResult on failure Acceptance criteria: - or1-monitor.AC1.1: Valid program → GraphLoaded with IR and snapshot - or1-monitor.AC1.2: Callbacks wired into all PEs and SMs - or1-monitor.AC1.3: Invalid program → ErrorResult, backend still functional """ try: ir_graph = run_pipeline(source) except ValueError as e: return ErrorResult(message=str(e)) try: result = generate_direct(ir_graph) env = simpy.Environment() # Wire on_event callback into all PE and SM configs pe_configs = [replace(cfg, on_event=self._on_event) for cfg in result.pe_configs] sm_configs = [replace(cfg, on_event=self._on_event) for cfg in result.sm_configs] system = build_topology(env, pe_configs, sm_configs) # Inject setup tokens (IRAM writes, ALLOC, frame slot writes) for token in result.setup_tokens: system.inject(token) # Inject seed tokens for seed in result.seed_tokens: system.inject(seed) except Exception as e: return ErrorResult(message=str(e)) # Commit state atomically — only after everything succeeds self._events.clear() self._env = env self._system = system self._last_source = source self._ir_graph = ir_graph snapshot = capture(self._system) return GraphLoaded(ir_graph=ir_graph, snapshot=snapshot) def _handle_step_tick(self) -> StepResult: """Step the simulation by one tick (all events at current simulation time). Loops env.step() while env.peek() == env.now to process all events at the current simulation time before returning. Returns: StepResult with events, snapshot, sim_time, and finished flag Acceptance criteria: - or1-monitor.AC5.2: Processes all events at current time before returning - or1-monitor.AC5.5: Result contains events and snapshot - or1-monitor.AC5.6: Finished simulation handled without error """ if self._env is None or self._system is None: return StepResult(finished=True) self._events.clear() if self._env.peek() == float('inf'): return StepResult( snapshot=capture(self._system), sim_time=self._env.now, finished=True, ) current_time = self._env.peek() while self._env.peek() == current_time: self._env.step() return StepResult( events=tuple(self._events), snapshot=capture(self._system), sim_time=self._env.now, finished=self._env.peek() == float('inf'), ) def _handle_step_event(self) -> StepResult: """Step the simulation by exactly one event. Calls env.step() once and returns the result. Returns: StepResult with events, snapshot, sim_time, and finished flag Acceptance criteria: - or1-monitor.AC5.3: Processes exactly one env.step() - or1-monitor.AC5.5: Result contains events and snapshot """ if self._env is None or self._system is None: return StepResult(finished=True) self._events.clear() if self._env.peek() == float('inf'): return StepResult( snapshot=capture(self._system), sim_time=self._env.now, finished=True, ) self._env.step() return StepResult( events=tuple(self._events), snapshot=capture(self._system), sim_time=self._env.now, finished=self._env.peek() == float('inf'), ) def _handle_run_until(self, until: float) -> StepResult: """Run the simulation continuously until reaching a target simulation time. Batches events per tick to avoid flooding the result. Loops while env.peek() <= until, stepping all events at each time point. Args: until: Target simulation time Returns: StepResult with all accumulated events, final snapshot, and finished flag Acceptance criteria: - or1-monitor.AC5.4: Batches events per tick - or1-monitor.AC5.5: Result contains events and snapshot """ if self._env is None or self._system is None: return StepResult(finished=True) self._events.clear() all_events: list[SimEvent] = [] while self._env.peek() <= until and self._env.peek() != float('inf'): current_time = self._env.peek() self._events.clear() while self._env.peek() == current_time: self._env.step() all_events.extend(self._events) return StepResult( events=tuple(all_events), snapshot=capture(self._system), sim_time=self._env.now, finished=self._env.peek() == float('inf'), ) def _handle_inject(self, token: Token) -> StepResult: """Inject a token directly into the simulation (no backpressure). Args: token: Token to inject Returns: StepResult with snapshot and current state """ if self._system is None: return StepResult(finished=True) self._events.clear() self._system.inject(token) return StepResult( events=tuple(self._events), snapshot=capture(self._system), sim_time=self._env.now if self._env else 0.0, finished=self._env.peek() == float('inf') if self._env else True, ) def _handle_send(self, token: Token) -> StepResult: """Send a token via SimPy store.put() (respects backpressure). Creates a one-shot SimPy process and steps once to allow processing. Args: token: Token to send Returns: StepResult with events, snapshot, and state """ if self._env is None or self._system is None: return StepResult(finished=True) self._events.clear() def _sender(): yield from self._system.send(token) self._env.process(_sender()) # Step once to process the send event if self._env.peek() != float('inf'): self._env.step() return StepResult( events=tuple(self._events), snapshot=capture(self._system), sim_time=self._env.now, finished=self._env.peek() == float('inf'), ) def _handle_reset(self, reload: bool) -> StepResult | GraphLoaded | ErrorResult: """Reset the simulation (tear down current topology). Clears all state. If reload=True, reloads the last program. Args: reload: If True, reload the last program after reset Returns: StepResult if reload=False, GraphLoaded if reload=True, or ErrorResult Acceptance criteria: - or1-monitor.AC1.4: Reset tears down and leaves ready for new LoadCmd - or1-monitor.AC1.5: Reset with reload=True reloads program """ self._env = None self._system = None self._events.clear() self._ir_graph = None if reload and self._last_source is not None: return self._handle_load(self._last_source) return StepResult(sim_time=0.0, finished=True)