OR-1 dataflow CPU sketch
at main 511 lines 16 kB view raw
1"""Interactive CLI REPL for the OR1 Monitor using cmd.Cmd. 2 3Provides a command-line interface for interactive simulation control, 4with commands for loading programs, stepping/running simulation, 5injecting tokens, and inspecting state. 6""" 7 8from __future__ import annotations 9 10import cmd 11import sys 12from io import StringIO 13from pathlib import Path 14 15from cm_inst import MemOp 16from monitor.backend import SimulationBackend 17from monitor.commands import ( 18 ErrorResult, 19 GraphLoaded, 20 InjectCmd, 21 LoadCmd, 22 ResetCmd, 23 RunUntilCmd, 24 SendCmd, 25 StepEventCmd, 26 StepResult, 27 StepTickCmd, 28) 29from monitor.formatting import ( 30 format_event, 31 format_pe_state, 32 format_sm_state, 33 format_snapshot_summary, 34 format_step_result, 35) 36from tokens import MonadToken, SMToken 37 38 39class MonitorREPL(cmd.Cmd): 40 """Interactive REPL for simulation control. 41 42 Subclasses cmd.Cmd to provide a user-friendly interface to the 43 SimulationBackend. Holds reference to backend and maintains 44 event history and state snapshots. 45 46 Attributes: 47 backend: SimulationBackend instance 48 _event_history: List of all SimEvents received so far 49 _loaded: True if a simulation is currently loaded 50 _last_snapshot: Most recent StateSnapshot (from load or step) 51 _last_sim_time: Current simulation time 52 """ 53 54 intro = "OR1 Monitor. Type 'help' for commands." 55 prompt = "or1> " 56 57 def __init__(self, backend: SimulationBackend, **kwargs): 58 super().__init__(**kwargs) 59 self.backend = backend 60 self._event_history: list = [] 61 self._loaded = False 62 self._last_snapshot = None 63 self._last_sim_time = 0.0 64 65 def _check_loaded(self) -> bool: 66 """Check if simulation is loaded, print error if not. 67 68 Returns: 69 True if loaded, False otherwise 70 """ 71 if not self._loaded: 72 print("No simulation loaded. Use 'load <path>' first.", file=self.stdout) 73 return False 74 return True 75 76 def do_load(self, arg: str) -> None: 77 """Load a dfasm program: load <path> 78 79 Reads the specified file and assembles it into a fresh simulation. 80 On success, sets _loaded=True and displays initial state. 81 On error, displays error messages. 82 83 Args: 84 arg: Path to .dfasm file 85 """ 86 if not arg: 87 print("Usage: load <path>", file=self.stdout) 88 return 89 90 path = Path(arg) 91 if not path.exists(): 92 print(f"Error: file not found: {path}", file=self.stdout) 93 return 94 95 try: 96 source = path.read_text() 97 except Exception as e: 98 print(f"Error reading file: {e}", file=self.stdout) 99 return 100 101 try: 102 result = self.backend.send_command(LoadCmd(source=source)) 103 except Exception as e: 104 print(f"Error sending command: {e}", file=self.stdout) 105 return 106 107 if isinstance(result, ErrorResult): 108 print(f"Error loading program: {result.message}", file=self.stdout) 109 if result.errors: 110 for err in result.errors: 111 print(f" {err}", file=self.stdout) 112 return 113 114 if isinstance(result, GraphLoaded): 115 self._loaded = True 116 self._last_snapshot = result.snapshot 117 self._last_sim_time = result.snapshot.sim_time 118 self._event_history.clear() 119 print(f"Loaded {path}", file=self.stdout) 120 print("", file=self.stdout) 121 # Print initial state summary 122 print(format_snapshot_summary(result.snapshot), file=self.stdout) 123 124 def do_step(self, arg: str) -> None: 125 """Step by one tick (all events at current time): step 126 127 Processes all events at the current simulation time and returns. 128 129 Args: 130 arg: Unused (for cmd.Cmd compatibility) 131 """ 132 if not self._check_loaded(): 133 return 134 135 try: 136 result = self.backend.send_command(StepTickCmd()) 137 except Exception as e: 138 print(f"Error: {e}", file=self.stdout) 139 return 140 141 if isinstance(result, StepResult): 142 self._event_history.extend(result.events) 143 if result.snapshot: 144 self._last_snapshot = result.snapshot 145 self._last_sim_time = result.snapshot.sim_time 146 print(format_step_result(result), file=self.stdout) 147 148 def do_stepe(self, arg: str) -> None: 149 """Step by one event: stepe 150 151 Processes exactly one event and returns. 152 153 Args: 154 arg: Unused (for cmd.Cmd compatibility) 155 """ 156 if not self._check_loaded(): 157 return 158 159 try: 160 result = self.backend.send_command(StepEventCmd()) 161 except Exception as e: 162 print(f"Error: {e}", file=self.stdout) 163 return 164 165 if isinstance(result, StepResult): 166 self._event_history.extend(result.events) 167 if result.snapshot: 168 self._last_snapshot = result.snapshot 169 self._last_sim_time = result.snapshot.sim_time 170 print(format_step_result(result), file=self.stdout) 171 172 def do_run(self, arg: str) -> None: 173 """Run until target time: run <until> 174 175 Runs the simulation continuously until reaching the specified 176 simulation time, batching events per tick. 177 178 Args: 179 arg: Target simulation time (float) 180 """ 181 if not self._check_loaded(): 182 return 183 184 if not arg: 185 print("Usage: run <until>", file=self.stdout) 186 return 187 188 try: 189 until = float(arg) 190 except ValueError: 191 print(f"Invalid time: {arg}", file=self.stdout) 192 return 193 194 try: 195 result = self.backend.send_command(RunUntilCmd(until=until)) 196 except Exception as e: 197 print(f"Error: {e}", file=self.stdout) 198 return 199 200 if isinstance(result, StepResult): 201 self._event_history.extend(result.events) 202 if result.snapshot: 203 self._last_snapshot = result.snapshot 204 self._last_sim_time = result.snapshot.sim_time 205 206 # Print summary with event count 207 event_count = len(result.events) 208 print(f"Ran {event_count} events", file=self.stdout) 209 print(format_step_result(result), file=self.stdout) 210 211 def do_send(self, arg: str) -> None: 212 """Send token to target: send <target> <offset> <act_id> <data> [--sm <addr> <op>] 213 214 For MonadToken: send <target> <offset> <act_id> <data> 215 For SMToken: send <target> --sm <addr> <op> [<data>] 216 217 The --sm flag switches to SMToken mode. op is parsed from MemOp names (READ, WRITE, etc). 218 219 Args: 220 arg: Command arguments as described above 221 """ 222 if not self._check_loaded(): 223 return 224 225 if not arg: 226 print("Usage: send <target> <offset> <act_id> <data> [--sm <addr> <op>]", file=self.stdout) 227 return 228 229 try: 230 token = self._parse_token_args(arg) 231 except ValueError as e: 232 print(f"Error parsing token: {e}", file=self.stdout) 233 return 234 235 try: 236 result = self.backend.send_command(SendCmd(token=token)) 237 except Exception as e: 238 print(f"Error: {e}", file=self.stdout) 239 return 240 241 if isinstance(result, StepResult): 242 self._event_history.extend(result.events) 243 if result.snapshot: 244 self._last_snapshot = result.snapshot 245 self._last_sim_time = result.snapshot.sim_time 246 print(f"Sent {token}", file=self.stdout) 247 248 def do_inject(self, arg: str) -> None: 249 """Inject token directly (no backpressure): inject <target> <offset> <act_id> <data> [--sm <addr> <op>] 250 251 For MonadToken: inject <target> <offset> <act_id> <data> 252 For SMToken: inject <target> --sm <addr> <op> [<data>] 253 254 The --sm flag switches to SMToken mode. op is parsed from MemOp names (READ, WRITE, etc). 255 256 Args: 257 arg: Command arguments as described above 258 """ 259 if not self._check_loaded(): 260 return 261 262 if not arg: 263 print("Usage: inject <target> <offset> <act_id> <data> [--sm <addr> <op>]", file=self.stdout) 264 return 265 266 try: 267 token = self._parse_token_args(arg) 268 except ValueError as e: 269 print(f"Error parsing token: {e}", file=self.stdout) 270 return 271 272 try: 273 result = self.backend.send_command(InjectCmd(token=token)) 274 except Exception as e: 275 print(f"Error: {e}", file=self.stdout) 276 return 277 278 if isinstance(result, StepResult): 279 self._event_history.extend(result.events) 280 if result.snapshot: 281 self._last_snapshot = result.snapshot 282 self._last_sim_time = result.snapshot.sim_time 283 print(f"Injected {token}", file=self.stdout) 284 285 def _parse_token_args(self, arg: str) -> MonadToken | SMToken: 286 """Parse token arguments from command line. 287 288 Supports two modes: 289 - MonadToken: <target> <offset> <act_id> <data> [--sm ...] 290 - SMToken: <target> --sm <addr> <op> [<data>] 291 292 Args: 293 arg: Command arguments string 294 295 Returns: 296 Constructed token (MonadToken or SMToken) 297 298 Raises: 299 ValueError: If parsing fails 300 """ 301 parts = arg.split() 302 if not parts: 303 raise ValueError("Expected at least target") 304 305 target = int(parts[0]) 306 307 # Check for --sm flag 308 if "--sm" in parts: 309 sm_idx = parts.index("--sm") 310 if sm_idx + 2 >= len(parts): 311 raise ValueError("--sm requires <addr> <op> [<data>]") 312 313 addr = int(parts[sm_idx + 1]) 314 op_name = parts[sm_idx + 2].upper() 315 316 # Parse MemOp by name 317 try: 318 op = MemOp[op_name] 319 except KeyError: 320 raise ValueError(f"Unknown MemOp: {op_name}") 321 322 # Optional data 323 data = None 324 if sm_idx + 3 < len(parts): 325 data = int(parts[sm_idx + 3]) 326 327 return SMToken(target=target, addr=addr, op=op, flags=None, data=data, ret=None) 328 else: 329 # MonadToken mode 330 if len(parts) < 4: 331 raise ValueError("MonadToken requires <target> <offset> <act_id> <data>") 332 333 offset = int(parts[1]) 334 act_id = int(parts[2]) 335 data = int(parts[3]) 336 337 return MonadToken(target=target, offset=offset, act_id=act_id, data=data, inline=False) 338 339 def do_state(self, arg: str) -> None: 340 """Display current simulation state: state 341 342 Shows full summary of current system state from the last snapshot. 343 344 Args: 345 arg: Unused (for cmd.Cmd compatibility) 346 """ 347 if not self._check_loaded(): 348 return 349 350 if self._last_snapshot is None: 351 print("No snapshot available", file=self.stdout) 352 return 353 354 print(format_snapshot_summary(self._last_snapshot), file=self.stdout) 355 356 def do_pe(self, arg: str) -> None: 357 """Display PE state: pe <id> 358 359 Shows detailed state for the specified Processing Element. 360 361 Args: 362 arg: PE ID (integer) 363 """ 364 if not self._check_loaded(): 365 return 366 367 if not arg: 368 print("Usage: pe <id>", file=self.stdout) 369 return 370 371 try: 372 pe_id = int(arg) 373 except ValueError: 374 print(f"Invalid PE ID: {arg}", file=self.stdout) 375 return 376 377 if self._last_snapshot is None: 378 print("No snapshot available", file=self.stdout) 379 return 380 381 if pe_id not in self._last_snapshot.pes: 382 print(f"PE {pe_id} not found", file=self.stdout) 383 return 384 385 pe_snapshot = self._last_snapshot.pes[pe_id] 386 print(format_pe_state(pe_snapshot), file=self.stdout) 387 388 def do_sm(self, arg: str) -> None: 389 """Display SM state: sm <id> 390 391 Shows detailed state for the specified Structure Memory. 392 393 Args: 394 arg: SM ID (integer) 395 """ 396 if not self._check_loaded(): 397 return 398 399 if not arg: 400 print("Usage: sm <id>", file=self.stdout) 401 return 402 403 try: 404 sm_id = int(arg) 405 except ValueError: 406 print(f"Invalid SM ID: {arg}", file=self.stdout) 407 return 408 409 if self._last_snapshot is None: 410 print("No snapshot available", file=self.stdout) 411 return 412 413 if sm_id not in self._last_snapshot.sms: 414 print(f"SM {sm_id} not found", file=self.stdout) 415 return 416 417 sm_snapshot = self._last_snapshot.sms[sm_id] 418 print(format_sm_state(sm_snapshot), file=self.stdout) 419 420 def do_log(self, arg: str) -> None: 421 """Display recent events: log [filter] 422 423 Shows all events from history, optionally filtered by component or type. 424 Filter format: 'pe:0', 'sm:1', 'Matched', etc. 425 426 Args: 427 arg: Optional filter string 428 """ 429 if not self._event_history: 430 print("No events in history", file=self.stdout) 431 return 432 433 filter_str = arg.strip() if arg else None 434 events_to_show = self._event_history 435 436 if filter_str: 437 filtered = [] 438 for event in self._event_history: 439 # Filter by component (pe:0, sm:1) 440 if ":" in filter_str: 441 comp_type, comp_id = filter_str.split(":", 1) 442 if comp_type.lower() in event.component.lower(): 443 try: 444 if str(int(comp_id)) in event.component: 445 filtered.append(event) 446 except ValueError: 447 pass 448 # Filter by event type name 449 elif filter_str in type(event).__name__: 450 filtered.append(event) 451 events_to_show = filtered 452 453 for event in events_to_show: 454 print(format_event(event), file=self.stdout) 455 456 if not events_to_show and filter_str: 457 print(f"No events matching '{filter_str}'", file=self.stdout) 458 459 def do_time(self, arg: str) -> None: 460 """Display current simulation time: time 461 462 Shows the current simulation time from the last snapshot. 463 464 Args: 465 arg: Unused (for cmd.Cmd compatibility) 466 """ 467 print(f"Simulation time: {self._last_sim_time}", file=self.stdout) 468 469 def do_reset(self, arg: str) -> None: 470 """Reset simulation state: reset 471 472 Clears the current simulation and resets all state. The backend 473 remains ready to load a new program. 474 475 Args: 476 arg: Unused (for cmd.Cmd compatibility) 477 """ 478 try: 479 result = self.backend.send_command(ResetCmd(reload=False)) 480 except Exception as e: 481 print(f"Error: {e}", file=self.stdout) 482 return 483 484 self._loaded = False 485 self._event_history.clear() 486 self._last_snapshot = None 487 self._last_sim_time = 0.0 488 print("Simulation reset", file=self.stdout) 489 490 def do_quit(self, arg: str) -> bool: 491 """Quit the monitor: quit 492 493 Exits the REPL cleanly. Also triggered by Ctrl-D. 494 495 Args: 496 arg: Unused (for cmd.Cmd compatibility) 497 498 Returns: 499 True to exit cmdloop 500 """ 501 return True 502 503 # Alias for Ctrl-D 504 def do_EOF(self, arg: str) -> bool: 505 """Exit on Ctrl-D""" 506 return True 507 508 def default(self, line: str) -> None: 509 """Handle unknown commands.""" 510 if line.strip(): 511 print(f"Unknown command: {line}", file=self.stdout)