OR-1 dataflow CPU sketch
at main 388 lines 13 kB view raw
1"""Simulation backend with command/result protocol and threading support. 2 3The SimulationBackend runs a dedicated thread that owns the SimPy environment. 4Commands arrive via queue.Queue, results are returned via another queue.Queue. 5 6Key features: 7- Thread-safe command/result protocol 8- Event collection via on_event callbacks 9- Tick-level and event-level stepping 10- Token injection and simulation control 11- State snapshots at any point 12""" 13 14from __future__ import annotations 15 16import logging 17import queue 18import threading 19from dataclasses import replace 20from typing import Optional 21 22import simpy 23 24from asm import run_pipeline 25from asm.codegen import generate_direct 26from asm.ir import IRGraph 27from emu.events import EventCallback, SimEvent 28from emu.network import System, build_topology 29from emu.types import PEConfig, SMConfig 30from monitor.commands import ( 31 ErrorResult, GraphLoaded, InjectCmd, LoadCmd, ResetCmd, RunUntilCmd, 32 SendCmd, SimCommand, StepEventCmd, StepResult, StepTickCmd, StopCmd, 33) 34from monitor.snapshot import StateSnapshot, capture 35from tokens import Token 36 37logger = logging.getLogger(__name__) 38 39 40class SimulationBackend: 41 """Manages a SimPy simulation environment in a dedicated thread. 42 43 Commands are sent via send_command() and processed in a dedicated thread. 44 Results are returned synchronously via queue communication. 45 46 Attributes: 47 _cmd_queue: Queue for incoming commands from caller 48 _result_queue: Queue for outgoing results to caller 49 _thread: The dedicated simulation thread 50 _env: Current SimPy environment (None if not loaded) 51 _system: Current System instance (None if not loaded) 52 _ir_graph: Current IRGraph (None if not loaded) 53 _last_source: Last loaded dfasm source (for reload on ResetCmd) 54 _events: Thread-local event buffer for collecting SimEvent objects 55 """ 56 57 def __init__(self): 58 self._cmd_queue: queue.Queue[SimCommand] = queue.Queue() 59 self._result_queue: queue.Queue = queue.Queue() 60 self._thread: Optional[threading.Thread] = None 61 self._env: Optional[simpy.Environment] = None 62 self._system: Optional[System] = None 63 self._ir_graph: Optional[IRGraph] = None 64 self._last_source: Optional[str] = None 65 self._events: list[SimEvent] = [] 66 67 def start(self): 68 """Start the background simulation thread. 69 70 Must be called before send_command(). 71 """ 72 self._thread = threading.Thread(target=self._run_loop, daemon=True) 73 self._thread.start() 74 75 def stop(self): 76 """Stop the background simulation thread gracefully. 77 78 Sends StopCmd and waits for thread to exit (timeout 5 seconds). 79 """ 80 self._cmd_queue.put(StopCmd()) 81 if self._thread is not None: 82 self._thread.join(timeout=5.0) 83 84 def send_command(self, cmd: SimCommand, timeout: float | None = None): 85 """Send a command to the backend and wait for result. 86 87 Args: 88 cmd: SimCommand instance (LoadCmd, StepTickCmd, etc.) 89 timeout: Optional timeout in seconds for result retrieval 90 91 Returns: 92 Result dataclass (GraphLoaded, StepResult, or ErrorResult) 93 94 Raises: 95 queue.Empty: If timeout expires before result arrives 96 """ 97 self._cmd_queue.put(cmd) 98 return self._result_queue.get(timeout=timeout) 99 100 def _on_event(self, event: SimEvent): 101 """Event callback registered with all PEs and SMs. 102 103 Appends events to the thread-local buffer for collection during steps. 104 105 Args: 106 event: SimEvent from a PE or SM 107 """ 108 self._events.append(event) 109 110 def _run_loop(self): 111 """Main loop of the simulation thread. 112 113 Processes commands from _cmd_queue until StopCmd is received. 114 Exceptions are caught and wrapped in ErrorResult. 115 """ 116 while True: 117 cmd = self._cmd_queue.get() 118 if isinstance(cmd, StopCmd): 119 break 120 try: 121 result = self._dispatch(cmd) 122 except Exception as e: 123 logger.exception("Backend error processing %s", type(cmd).__name__) 124 result = ErrorResult(message=str(e)) 125 self._result_queue.put(result) 126 127 def _dispatch(self, cmd: SimCommand): 128 """Dispatch command to appropriate handler. 129 130 Args: 131 cmd: Command instance 132 133 Returns: 134 Result dataclass 135 """ 136 match cmd: 137 case LoadCmd(source=source): 138 return self._handle_load(source) 139 case StepTickCmd(): 140 return self._handle_step_tick() 141 case StepEventCmd(): 142 return self._handle_step_event() 143 case RunUntilCmd(until=until): 144 return self._handle_run_until(until) 145 case InjectCmd(token=token): 146 return self._handle_inject(token) 147 case SendCmd(token=token): 148 return self._handle_send(token) 149 case ResetCmd(reload=reload): 150 return self._handle_reset(reload) 151 case _: 152 return ErrorResult(message=f"unknown command type: {type(cmd).__name__}") 153 154 def _handle_load(self, source: str) -> GraphLoaded | ErrorResult: 155 """Load a dfasm program and set up the simulation. 156 157 Runs the assembly pipeline, generates direct-mode configuration, 158 builds the topology, and injects seed tokens. 159 160 Args: 161 source: dfasm source code as a string 162 163 Returns: 164 GraphLoaded on success, ErrorResult on failure 165 166 Acceptance criteria: 167 - or1-monitor.AC1.1: Valid program → GraphLoaded with IR and snapshot 168 - or1-monitor.AC1.2: Callbacks wired into all PEs and SMs 169 - or1-monitor.AC1.3: Invalid program → ErrorResult, backend still functional 170 """ 171 try: 172 ir_graph = run_pipeline(source) 173 except ValueError as e: 174 return ErrorResult(message=str(e)) 175 176 try: 177 result = generate_direct(ir_graph) 178 env = simpy.Environment() 179 180 # Wire on_event callback into all PE and SM configs 181 pe_configs = [replace(cfg, on_event=self._on_event) for cfg in result.pe_configs] 182 sm_configs = [replace(cfg, on_event=self._on_event) for cfg in result.sm_configs] 183 184 system = build_topology(env, pe_configs, sm_configs) 185 186 # Inject setup tokens (IRAM writes, ALLOC, frame slot writes) 187 for token in result.setup_tokens: 188 system.inject(token) 189 190 # Inject seed tokens 191 for seed in result.seed_tokens: 192 system.inject(seed) 193 except Exception as e: 194 return ErrorResult(message=str(e)) 195 196 # Commit state atomically — only after everything succeeds 197 self._events.clear() 198 self._env = env 199 self._system = system 200 self._last_source = source 201 self._ir_graph = ir_graph 202 203 snapshot = capture(self._system) 204 return GraphLoaded(ir_graph=ir_graph, snapshot=snapshot) 205 206 def _handle_step_tick(self) -> StepResult: 207 """Step the simulation by one tick (all events at current simulation time). 208 209 Loops env.step() while env.peek() == env.now to process all events 210 at the current simulation time before returning. 211 212 Returns: 213 StepResult with events, snapshot, sim_time, and finished flag 214 215 Acceptance criteria: 216 - or1-monitor.AC5.2: Processes all events at current time before returning 217 - or1-monitor.AC5.5: Result contains events and snapshot 218 - or1-monitor.AC5.6: Finished simulation handled without error 219 """ 220 if self._env is None or self._system is None: 221 return StepResult(finished=True) 222 223 self._events.clear() 224 225 if self._env.peek() == float('inf'): 226 return StepResult( 227 snapshot=capture(self._system), 228 sim_time=self._env.now, 229 finished=True, 230 ) 231 232 current_time = self._env.peek() 233 while self._env.peek() == current_time: 234 self._env.step() 235 236 return StepResult( 237 events=tuple(self._events), 238 snapshot=capture(self._system), 239 sim_time=self._env.now, 240 finished=self._env.peek() == float('inf'), 241 ) 242 243 def _handle_step_event(self) -> StepResult: 244 """Step the simulation by exactly one event. 245 246 Calls env.step() once and returns the result. 247 248 Returns: 249 StepResult with events, snapshot, sim_time, and finished flag 250 251 Acceptance criteria: 252 - or1-monitor.AC5.3: Processes exactly one env.step() 253 - or1-monitor.AC5.5: Result contains events and snapshot 254 """ 255 if self._env is None or self._system is None: 256 return StepResult(finished=True) 257 258 self._events.clear() 259 260 if self._env.peek() == float('inf'): 261 return StepResult( 262 snapshot=capture(self._system), 263 sim_time=self._env.now, 264 finished=True, 265 ) 266 267 self._env.step() 268 269 return StepResult( 270 events=tuple(self._events), 271 snapshot=capture(self._system), 272 sim_time=self._env.now, 273 finished=self._env.peek() == float('inf'), 274 ) 275 276 def _handle_run_until(self, until: float) -> StepResult: 277 """Run the simulation continuously until reaching a target simulation time. 278 279 Batches events per tick to avoid flooding the result. Loops while 280 env.peek() <= until, stepping all events at each time point. 281 282 Args: 283 until: Target simulation time 284 285 Returns: 286 StepResult with all accumulated events, final snapshot, and finished flag 287 288 Acceptance criteria: 289 - or1-monitor.AC5.4: Batches events per tick 290 - or1-monitor.AC5.5: Result contains events and snapshot 291 """ 292 if self._env is None or self._system is None: 293 return StepResult(finished=True) 294 295 self._events.clear() 296 all_events: list[SimEvent] = [] 297 298 while self._env.peek() <= until and self._env.peek() != float('inf'): 299 current_time = self._env.peek() 300 self._events.clear() 301 while self._env.peek() == current_time: 302 self._env.step() 303 all_events.extend(self._events) 304 305 return StepResult( 306 events=tuple(all_events), 307 snapshot=capture(self._system), 308 sim_time=self._env.now, 309 finished=self._env.peek() == float('inf'), 310 ) 311 312 def _handle_inject(self, token: Token) -> StepResult: 313 """Inject a token directly into the simulation (no backpressure). 314 315 Args: 316 token: Token to inject 317 318 Returns: 319 StepResult with snapshot and current state 320 """ 321 if self._system is None: 322 return StepResult(finished=True) 323 324 self._events.clear() 325 self._system.inject(token) 326 327 return StepResult( 328 events=tuple(self._events), 329 snapshot=capture(self._system), 330 sim_time=self._env.now if self._env else 0.0, 331 finished=self._env.peek() == float('inf') if self._env else True, 332 ) 333 334 def _handle_send(self, token: Token) -> StepResult: 335 """Send a token via SimPy store.put() (respects backpressure). 336 337 Creates a one-shot SimPy process and steps once to allow processing. 338 339 Args: 340 token: Token to send 341 342 Returns: 343 StepResult with events, snapshot, and state 344 """ 345 if self._env is None or self._system is None: 346 return StepResult(finished=True) 347 348 self._events.clear() 349 350 def _sender(): 351 yield from self._system.send(token) 352 353 self._env.process(_sender()) 354 # Step once to process the send event 355 if self._env.peek() != float('inf'): 356 self._env.step() 357 358 return StepResult( 359 events=tuple(self._events), 360 snapshot=capture(self._system), 361 sim_time=self._env.now, 362 finished=self._env.peek() == float('inf'), 363 ) 364 365 def _handle_reset(self, reload: bool) -> StepResult | GraphLoaded | ErrorResult: 366 """Reset the simulation (tear down current topology). 367 368 Clears all state. If reload=True, reloads the last program. 369 370 Args: 371 reload: If True, reload the last program after reset 372 373 Returns: 374 StepResult if reload=False, GraphLoaded if reload=True, or ErrorResult 375 376 Acceptance criteria: 377 - or1-monitor.AC1.4: Reset tears down and leaves ready for new LoadCmd 378 - or1-monitor.AC1.5: Reset with reload=True reloads program 379 """ 380 self._env = None 381 self._system = None 382 self._events.clear() 383 self._ir_graph = None 384 385 if reload and self._last_source is not None: 386 return self._handle_load(self._last_source) 387 388 return StepResult(sim_time=0.0, finished=True)