"""Interactive CLI REPL for the OR1 Monitor using cmd.Cmd. Provides a command-line interface for interactive simulation control, with commands for loading programs, stepping/running simulation, injecting tokens, and inspecting state. """ from __future__ import annotations import cmd import sys from io import StringIO from pathlib import Path from cm_inst import MemOp from monitor.backend import SimulationBackend from monitor.commands import ( ErrorResult, GraphLoaded, InjectCmd, LoadCmd, ResetCmd, RunUntilCmd, SendCmd, StepEventCmd, StepResult, StepTickCmd, ) from monitor.formatting import ( format_event, format_pe_state, format_sm_state, format_snapshot_summary, format_step_result, ) from tokens import MonadToken, SMToken class MonitorREPL(cmd.Cmd): """Interactive REPL for simulation control. Subclasses cmd.Cmd to provide a user-friendly interface to the SimulationBackend. Holds reference to backend and maintains event history and state snapshots. Attributes: backend: SimulationBackend instance _event_history: List of all SimEvents received so far _loaded: True if a simulation is currently loaded _last_snapshot: Most recent StateSnapshot (from load or step) _last_sim_time: Current simulation time """ intro = "OR1 Monitor. Type 'help' for commands." prompt = "or1> " def __init__(self, backend: SimulationBackend, **kwargs): super().__init__(**kwargs) self.backend = backend self._event_history: list = [] self._loaded = False self._last_snapshot = None self._last_sim_time = 0.0 def _check_loaded(self) -> bool: """Check if simulation is loaded, print error if not. Returns: True if loaded, False otherwise """ if not self._loaded: print("No simulation loaded. Use 'load ' first.", file=self.stdout) return False return True def do_load(self, arg: str) -> None: """Load a dfasm program: load Reads the specified file and assembles it into a fresh simulation. On success, sets _loaded=True and displays initial state. On error, displays error messages. Args: arg: Path to .dfasm file """ if not arg: print("Usage: load ", file=self.stdout) return path = Path(arg) if not path.exists(): print(f"Error: file not found: {path}", file=self.stdout) return try: source = path.read_text() except Exception as e: print(f"Error reading file: {e}", file=self.stdout) return try: result = self.backend.send_command(LoadCmd(source=source)) except Exception as e: print(f"Error sending command: {e}", file=self.stdout) return if isinstance(result, ErrorResult): print(f"Error loading program: {result.message}", file=self.stdout) if result.errors: for err in result.errors: print(f" {err}", file=self.stdout) return if isinstance(result, GraphLoaded): self._loaded = True self._last_snapshot = result.snapshot self._last_sim_time = result.snapshot.sim_time self._event_history.clear() print(f"Loaded {path}", file=self.stdout) print("", file=self.stdout) # Print initial state summary print(format_snapshot_summary(result.snapshot), file=self.stdout) def do_step(self, arg: str) -> None: """Step by one tick (all events at current time): step Processes all events at the current simulation time and returns. Args: arg: Unused (for cmd.Cmd compatibility) """ if not self._check_loaded(): return try: result = self.backend.send_command(StepTickCmd()) except Exception as e: print(f"Error: {e}", file=self.stdout) return if isinstance(result, StepResult): self._event_history.extend(result.events) if result.snapshot: self._last_snapshot = result.snapshot self._last_sim_time = result.snapshot.sim_time print(format_step_result(result), file=self.stdout) def do_stepe(self, arg: str) -> None: """Step by one event: stepe Processes exactly one event and returns. Args: arg: Unused (for cmd.Cmd compatibility) """ if not self._check_loaded(): return try: result = self.backend.send_command(StepEventCmd()) except Exception as e: print(f"Error: {e}", file=self.stdout) return if isinstance(result, StepResult): self._event_history.extend(result.events) if result.snapshot: self._last_snapshot = result.snapshot self._last_sim_time = result.snapshot.sim_time print(format_step_result(result), file=self.stdout) def do_run(self, arg: str) -> None: """Run until target time: run Runs the simulation continuously until reaching the specified simulation time, batching events per tick. Args: arg: Target simulation time (float) """ if not self._check_loaded(): return if not arg: print("Usage: run ", file=self.stdout) return try: until = float(arg) except ValueError: print(f"Invalid time: {arg}", file=self.stdout) return try: result = self.backend.send_command(RunUntilCmd(until=until)) except Exception as e: print(f"Error: {e}", file=self.stdout) return if isinstance(result, StepResult): self._event_history.extend(result.events) if result.snapshot: self._last_snapshot = result.snapshot self._last_sim_time = result.snapshot.sim_time # Print summary with event count event_count = len(result.events) print(f"Ran {event_count} events", file=self.stdout) print(format_step_result(result), file=self.stdout) def do_send(self, arg: str) -> None: """Send token to target: send [--sm ] For MonadToken: send For SMToken: send --sm [] The --sm flag switches to SMToken mode. op is parsed from MemOp names (READ, WRITE, etc). Args: arg: Command arguments as described above """ if not self._check_loaded(): return if not arg: print("Usage: send [--sm ]", file=self.stdout) return try: token = self._parse_token_args(arg) except ValueError as e: print(f"Error parsing token: {e}", file=self.stdout) return try: result = self.backend.send_command(SendCmd(token=token)) except Exception as e: print(f"Error: {e}", file=self.stdout) return if isinstance(result, StepResult): self._event_history.extend(result.events) if result.snapshot: self._last_snapshot = result.snapshot self._last_sim_time = result.snapshot.sim_time print(f"Sent {token}", file=self.stdout) def do_inject(self, arg: str) -> None: """Inject token directly (no backpressure): inject [--sm ] For MonadToken: inject For SMToken: inject --sm [] The --sm flag switches to SMToken mode. op is parsed from MemOp names (READ, WRITE, etc). Args: arg: Command arguments as described above """ if not self._check_loaded(): return if not arg: print("Usage: inject [--sm ]", file=self.stdout) return try: token = self._parse_token_args(arg) except ValueError as e: print(f"Error parsing token: {e}", file=self.stdout) return try: result = self.backend.send_command(InjectCmd(token=token)) except Exception as e: print(f"Error: {e}", file=self.stdout) return if isinstance(result, StepResult): self._event_history.extend(result.events) if result.snapshot: self._last_snapshot = result.snapshot self._last_sim_time = result.snapshot.sim_time print(f"Injected {token}", file=self.stdout) def _parse_token_args(self, arg: str) -> MonadToken | SMToken: """Parse token arguments from command line. Supports two modes: - MonadToken: [--sm ...] - SMToken: --sm [] Args: arg: Command arguments string Returns: Constructed token (MonadToken or SMToken) Raises: ValueError: If parsing fails """ parts = arg.split() if not parts: raise ValueError("Expected at least target") target = int(parts[0]) # Check for --sm flag if "--sm" in parts: sm_idx = parts.index("--sm") if sm_idx + 2 >= len(parts): raise ValueError("--sm requires []") addr = int(parts[sm_idx + 1]) op_name = parts[sm_idx + 2].upper() # Parse MemOp by name try: op = MemOp[op_name] except KeyError: raise ValueError(f"Unknown MemOp: {op_name}") # Optional data data = None if sm_idx + 3 < len(parts): data = int(parts[sm_idx + 3]) return SMToken(target=target, addr=addr, op=op, flags=None, data=data, ret=None) else: # MonadToken mode if len(parts) < 4: raise ValueError("MonadToken requires ") offset = int(parts[1]) act_id = int(parts[2]) data = int(parts[3]) return MonadToken(target=target, offset=offset, act_id=act_id, data=data, inline=False) def do_state(self, arg: str) -> None: """Display current simulation state: state Shows full summary of current system state from the last snapshot. Args: arg: Unused (for cmd.Cmd compatibility) """ if not self._check_loaded(): return if self._last_snapshot is None: print("No snapshot available", file=self.stdout) return print(format_snapshot_summary(self._last_snapshot), file=self.stdout) def do_pe(self, arg: str) -> None: """Display PE state: pe Shows detailed state for the specified Processing Element. Args: arg: PE ID (integer) """ if not self._check_loaded(): return if not arg: print("Usage: pe ", file=self.stdout) return try: pe_id = int(arg) except ValueError: print(f"Invalid PE ID: {arg}", file=self.stdout) return if self._last_snapshot is None: print("No snapshot available", file=self.stdout) return if pe_id not in self._last_snapshot.pes: print(f"PE {pe_id} not found", file=self.stdout) return pe_snapshot = self._last_snapshot.pes[pe_id] print(format_pe_state(pe_snapshot), file=self.stdout) def do_sm(self, arg: str) -> None: """Display SM state: sm Shows detailed state for the specified Structure Memory. Args: arg: SM ID (integer) """ if not self._check_loaded(): return if not arg: print("Usage: sm ", file=self.stdout) return try: sm_id = int(arg) except ValueError: print(f"Invalid SM ID: {arg}", file=self.stdout) return if self._last_snapshot is None: print("No snapshot available", file=self.stdout) return if sm_id not in self._last_snapshot.sms: print(f"SM {sm_id} not found", file=self.stdout) return sm_snapshot = self._last_snapshot.sms[sm_id] print(format_sm_state(sm_snapshot), file=self.stdout) def do_log(self, arg: str) -> None: """Display recent events: log [filter] Shows all events from history, optionally filtered by component or type. Filter format: 'pe:0', 'sm:1', 'Matched', etc. Args: arg: Optional filter string """ if not self._event_history: print("No events in history", file=self.stdout) return filter_str = arg.strip() if arg else None events_to_show = self._event_history if filter_str: filtered = [] for event in self._event_history: # Filter by component (pe:0, sm:1) if ":" in filter_str: comp_type, comp_id = filter_str.split(":", 1) if comp_type.lower() in event.component.lower(): try: if str(int(comp_id)) in event.component: filtered.append(event) except ValueError: pass # Filter by event type name elif filter_str in type(event).__name__: filtered.append(event) events_to_show = filtered for event in events_to_show: print(format_event(event), file=self.stdout) if not events_to_show and filter_str: print(f"No events matching '{filter_str}'", file=self.stdout) def do_time(self, arg: str) -> None: """Display current simulation time: time Shows the current simulation time from the last snapshot. Args: arg: Unused (for cmd.Cmd compatibility) """ print(f"Simulation time: {self._last_sim_time}", file=self.stdout) def do_reset(self, arg: str) -> None: """Reset simulation state: reset Clears the current simulation and resets all state. The backend remains ready to load a new program. Args: arg: Unused (for cmd.Cmd compatibility) """ try: result = self.backend.send_command(ResetCmd(reload=False)) except Exception as e: print(f"Error: {e}", file=self.stdout) return self._loaded = False self._event_history.clear() self._last_snapshot = None self._last_sim_time = 0.0 print("Simulation reset", file=self.stdout) def do_quit(self, arg: str) -> bool: """Quit the monitor: quit Exits the REPL cleanly. Also triggered by Ctrl-D. Args: arg: Unused (for cmd.Cmd compatibility) Returns: True to exit cmdloop """ return True # Alias for Ctrl-D def do_EOF(self, arg: str) -> bool: """Exit on Ctrl-D""" return True def default(self, line: str) -> None: """Handle unknown commands.""" if line.strip(): print(f"Unknown command: {line}", file=self.stdout)