OR-1 dataflow CPU sketch
1"""Interactive CLI REPL for the OR1 Monitor using cmd.Cmd.
2
3Provides a command-line interface for interactive simulation control,
4with commands for loading programs, stepping/running simulation,
5injecting tokens, and inspecting state.
6"""
7
8from __future__ import annotations
9
10import cmd
11import sys
12from io import StringIO
13from pathlib import Path
14
15from cm_inst import MemOp
16from monitor.backend import SimulationBackend
17from monitor.commands import (
18 ErrorResult,
19 GraphLoaded,
20 InjectCmd,
21 LoadCmd,
22 ResetCmd,
23 RunUntilCmd,
24 SendCmd,
25 StepEventCmd,
26 StepResult,
27 StepTickCmd,
28)
29from monitor.formatting import (
30 format_event,
31 format_pe_state,
32 format_sm_state,
33 format_snapshot_summary,
34 format_step_result,
35)
36from tokens import MonadToken, SMToken
37
38
39class MonitorREPL(cmd.Cmd):
40 """Interactive REPL for simulation control.
41
42 Subclasses cmd.Cmd to provide a user-friendly interface to the
43 SimulationBackend. Holds reference to backend and maintains
44 event history and state snapshots.
45
46 Attributes:
47 backend: SimulationBackend instance
48 _event_history: List of all SimEvents received so far
49 _loaded: True if a simulation is currently loaded
50 _last_snapshot: Most recent StateSnapshot (from load or step)
51 _last_sim_time: Current simulation time
52 """
53
54 intro = "OR1 Monitor. Type 'help' for commands."
55 prompt = "or1> "
56
57 def __init__(self, backend: SimulationBackend, **kwargs):
58 super().__init__(**kwargs)
59 self.backend = backend
60 self._event_history: list = []
61 self._loaded = False
62 self._last_snapshot = None
63 self._last_sim_time = 0.0
64
65 def _check_loaded(self) -> bool:
66 """Check if simulation is loaded, print error if not.
67
68 Returns:
69 True if loaded, False otherwise
70 """
71 if not self._loaded:
72 print("No simulation loaded. Use 'load <path>' first.", file=self.stdout)
73 return False
74 return True
75
76 def do_load(self, arg: str) -> None:
77 """Load a dfasm program: load <path>
78
79 Reads the specified file and assembles it into a fresh simulation.
80 On success, sets _loaded=True and displays initial state.
81 On error, displays error messages.
82
83 Args:
84 arg: Path to .dfasm file
85 """
86 if not arg:
87 print("Usage: load <path>", file=self.stdout)
88 return
89
90 path = Path(arg)
91 if not path.exists():
92 print(f"Error: file not found: {path}", file=self.stdout)
93 return
94
95 try:
96 source = path.read_text()
97 except Exception as e:
98 print(f"Error reading file: {e}", file=self.stdout)
99 return
100
101 try:
102 result = self.backend.send_command(LoadCmd(source=source))
103 except Exception as e:
104 print(f"Error sending command: {e}", file=self.stdout)
105 return
106
107 if isinstance(result, ErrorResult):
108 print(f"Error loading program: {result.message}", file=self.stdout)
109 if result.errors:
110 for err in result.errors:
111 print(f" {err}", file=self.stdout)
112 return
113
114 if isinstance(result, GraphLoaded):
115 self._loaded = True
116 self._last_snapshot = result.snapshot
117 self._last_sim_time = result.snapshot.sim_time
118 self._event_history.clear()
119 print(f"Loaded {path}", file=self.stdout)
120 print("", file=self.stdout)
121 # Print initial state summary
122 print(format_snapshot_summary(result.snapshot), file=self.stdout)
123
124 def do_step(self, arg: str) -> None:
125 """Step by one tick (all events at current time): step
126
127 Processes all events at the current simulation time and returns.
128
129 Args:
130 arg: Unused (for cmd.Cmd compatibility)
131 """
132 if not self._check_loaded():
133 return
134
135 try:
136 result = self.backend.send_command(StepTickCmd())
137 except Exception as e:
138 print(f"Error: {e}", file=self.stdout)
139 return
140
141 if isinstance(result, StepResult):
142 self._event_history.extend(result.events)
143 if result.snapshot:
144 self._last_snapshot = result.snapshot
145 self._last_sim_time = result.snapshot.sim_time
146 print(format_step_result(result), file=self.stdout)
147
148 def do_stepe(self, arg: str) -> None:
149 """Step by one event: stepe
150
151 Processes exactly one event and returns.
152
153 Args:
154 arg: Unused (for cmd.Cmd compatibility)
155 """
156 if not self._check_loaded():
157 return
158
159 try:
160 result = self.backend.send_command(StepEventCmd())
161 except Exception as e:
162 print(f"Error: {e}", file=self.stdout)
163 return
164
165 if isinstance(result, StepResult):
166 self._event_history.extend(result.events)
167 if result.snapshot:
168 self._last_snapshot = result.snapshot
169 self._last_sim_time = result.snapshot.sim_time
170 print(format_step_result(result), file=self.stdout)
171
172 def do_run(self, arg: str) -> None:
173 """Run until target time: run <until>
174
175 Runs the simulation continuously until reaching the specified
176 simulation time, batching events per tick.
177
178 Args:
179 arg: Target simulation time (float)
180 """
181 if not self._check_loaded():
182 return
183
184 if not arg:
185 print("Usage: run <until>", file=self.stdout)
186 return
187
188 try:
189 until = float(arg)
190 except ValueError:
191 print(f"Invalid time: {arg}", file=self.stdout)
192 return
193
194 try:
195 result = self.backend.send_command(RunUntilCmd(until=until))
196 except Exception as e:
197 print(f"Error: {e}", file=self.stdout)
198 return
199
200 if isinstance(result, StepResult):
201 self._event_history.extend(result.events)
202 if result.snapshot:
203 self._last_snapshot = result.snapshot
204 self._last_sim_time = result.snapshot.sim_time
205
206 # Print summary with event count
207 event_count = len(result.events)
208 print(f"Ran {event_count} events", file=self.stdout)
209 print(format_step_result(result), file=self.stdout)
210
211 def do_send(self, arg: str) -> None:
212 """Send token to target: send <target> <offset> <act_id> <data> [--sm <addr> <op>]
213
214 For MonadToken: send <target> <offset> <act_id> <data>
215 For SMToken: send <target> --sm <addr> <op> [<data>]
216
217 The --sm flag switches to SMToken mode. op is parsed from MemOp names (READ, WRITE, etc).
218
219 Args:
220 arg: Command arguments as described above
221 """
222 if not self._check_loaded():
223 return
224
225 if not arg:
226 print("Usage: send <target> <offset> <act_id> <data> [--sm <addr> <op>]", file=self.stdout)
227 return
228
229 try:
230 token = self._parse_token_args(arg)
231 except ValueError as e:
232 print(f"Error parsing token: {e}", file=self.stdout)
233 return
234
235 try:
236 result = self.backend.send_command(SendCmd(token=token))
237 except Exception as e:
238 print(f"Error: {e}", file=self.stdout)
239 return
240
241 if isinstance(result, StepResult):
242 self._event_history.extend(result.events)
243 if result.snapshot:
244 self._last_snapshot = result.snapshot
245 self._last_sim_time = result.snapshot.sim_time
246 print(f"Sent {token}", file=self.stdout)
247
248 def do_inject(self, arg: str) -> None:
249 """Inject token directly (no backpressure): inject <target> <offset> <act_id> <data> [--sm <addr> <op>]
250
251 For MonadToken: inject <target> <offset> <act_id> <data>
252 For SMToken: inject <target> --sm <addr> <op> [<data>]
253
254 The --sm flag switches to SMToken mode. op is parsed from MemOp names (READ, WRITE, etc).
255
256 Args:
257 arg: Command arguments as described above
258 """
259 if not self._check_loaded():
260 return
261
262 if not arg:
263 print("Usage: inject <target> <offset> <act_id> <data> [--sm <addr> <op>]", file=self.stdout)
264 return
265
266 try:
267 token = self._parse_token_args(arg)
268 except ValueError as e:
269 print(f"Error parsing token: {e}", file=self.stdout)
270 return
271
272 try:
273 result = self.backend.send_command(InjectCmd(token=token))
274 except Exception as e:
275 print(f"Error: {e}", file=self.stdout)
276 return
277
278 if isinstance(result, StepResult):
279 self._event_history.extend(result.events)
280 if result.snapshot:
281 self._last_snapshot = result.snapshot
282 self._last_sim_time = result.snapshot.sim_time
283 print(f"Injected {token}", file=self.stdout)
284
285 def _parse_token_args(self, arg: str) -> MonadToken | SMToken:
286 """Parse token arguments from command line.
287
288 Supports two modes:
289 - MonadToken: <target> <offset> <act_id> <data> [--sm ...]
290 - SMToken: <target> --sm <addr> <op> [<data>]
291
292 Args:
293 arg: Command arguments string
294
295 Returns:
296 Constructed token (MonadToken or SMToken)
297
298 Raises:
299 ValueError: If parsing fails
300 """
301 parts = arg.split()
302 if not parts:
303 raise ValueError("Expected at least target")
304
305 target = int(parts[0])
306
307 # Check for --sm flag
308 if "--sm" in parts:
309 sm_idx = parts.index("--sm")
310 if sm_idx + 2 >= len(parts):
311 raise ValueError("--sm requires <addr> <op> [<data>]")
312
313 addr = int(parts[sm_idx + 1])
314 op_name = parts[sm_idx + 2].upper()
315
316 # Parse MemOp by name
317 try:
318 op = MemOp[op_name]
319 except KeyError:
320 raise ValueError(f"Unknown MemOp: {op_name}")
321
322 # Optional data
323 data = None
324 if sm_idx + 3 < len(parts):
325 data = int(parts[sm_idx + 3])
326
327 return SMToken(target=target, addr=addr, op=op, flags=None, data=data, ret=None)
328 else:
329 # MonadToken mode
330 if len(parts) < 4:
331 raise ValueError("MonadToken requires <target> <offset> <act_id> <data>")
332
333 offset = int(parts[1])
334 act_id = int(parts[2])
335 data = int(parts[3])
336
337 return MonadToken(target=target, offset=offset, act_id=act_id, data=data, inline=False)
338
339 def do_state(self, arg: str) -> None:
340 """Display current simulation state: state
341
342 Shows full summary of current system state from the last snapshot.
343
344 Args:
345 arg: Unused (for cmd.Cmd compatibility)
346 """
347 if not self._check_loaded():
348 return
349
350 if self._last_snapshot is None:
351 print("No snapshot available", file=self.stdout)
352 return
353
354 print(format_snapshot_summary(self._last_snapshot), file=self.stdout)
355
356 def do_pe(self, arg: str) -> None:
357 """Display PE state: pe <id>
358
359 Shows detailed state for the specified Processing Element.
360
361 Args:
362 arg: PE ID (integer)
363 """
364 if not self._check_loaded():
365 return
366
367 if not arg:
368 print("Usage: pe <id>", file=self.stdout)
369 return
370
371 try:
372 pe_id = int(arg)
373 except ValueError:
374 print(f"Invalid PE ID: {arg}", file=self.stdout)
375 return
376
377 if self._last_snapshot is None:
378 print("No snapshot available", file=self.stdout)
379 return
380
381 if pe_id not in self._last_snapshot.pes:
382 print(f"PE {pe_id} not found", file=self.stdout)
383 return
384
385 pe_snapshot = self._last_snapshot.pes[pe_id]
386 print(format_pe_state(pe_snapshot), file=self.stdout)
387
388 def do_sm(self, arg: str) -> None:
389 """Display SM state: sm <id>
390
391 Shows detailed state for the specified Structure Memory.
392
393 Args:
394 arg: SM ID (integer)
395 """
396 if not self._check_loaded():
397 return
398
399 if not arg:
400 print("Usage: sm <id>", file=self.stdout)
401 return
402
403 try:
404 sm_id = int(arg)
405 except ValueError:
406 print(f"Invalid SM ID: {arg}", file=self.stdout)
407 return
408
409 if self._last_snapshot is None:
410 print("No snapshot available", file=self.stdout)
411 return
412
413 if sm_id not in self._last_snapshot.sms:
414 print(f"SM {sm_id} not found", file=self.stdout)
415 return
416
417 sm_snapshot = self._last_snapshot.sms[sm_id]
418 print(format_sm_state(sm_snapshot), file=self.stdout)
419
420 def do_log(self, arg: str) -> None:
421 """Display recent events: log [filter]
422
423 Shows all events from history, optionally filtered by component or type.
424 Filter format: 'pe:0', 'sm:1', 'Matched', etc.
425
426 Args:
427 arg: Optional filter string
428 """
429 if not self._event_history:
430 print("No events in history", file=self.stdout)
431 return
432
433 filter_str = arg.strip() if arg else None
434 events_to_show = self._event_history
435
436 if filter_str:
437 filtered = []
438 for event in self._event_history:
439 # Filter by component (pe:0, sm:1)
440 if ":" in filter_str:
441 comp_type, comp_id = filter_str.split(":", 1)
442 if comp_type.lower() in event.component.lower():
443 try:
444 if str(int(comp_id)) in event.component:
445 filtered.append(event)
446 except ValueError:
447 pass
448 # Filter by event type name
449 elif filter_str in type(event).__name__:
450 filtered.append(event)
451 events_to_show = filtered
452
453 for event in events_to_show:
454 print(format_event(event), file=self.stdout)
455
456 if not events_to_show and filter_str:
457 print(f"No events matching '{filter_str}'", file=self.stdout)
458
459 def do_time(self, arg: str) -> None:
460 """Display current simulation time: time
461
462 Shows the current simulation time from the last snapshot.
463
464 Args:
465 arg: Unused (for cmd.Cmd compatibility)
466 """
467 print(f"Simulation time: {self._last_sim_time}", file=self.stdout)
468
469 def do_reset(self, arg: str) -> None:
470 """Reset simulation state: reset
471
472 Clears the current simulation and resets all state. The backend
473 remains ready to load a new program.
474
475 Args:
476 arg: Unused (for cmd.Cmd compatibility)
477 """
478 try:
479 result = self.backend.send_command(ResetCmd(reload=False))
480 except Exception as e:
481 print(f"Error: {e}", file=self.stdout)
482 return
483
484 self._loaded = False
485 self._event_history.clear()
486 self._last_snapshot = None
487 self._last_sim_time = 0.0
488 print("Simulation reset", file=self.stdout)
489
490 def do_quit(self, arg: str) -> bool:
491 """Quit the monitor: quit
492
493 Exits the REPL cleanly. Also triggered by Ctrl-D.
494
495 Args:
496 arg: Unused (for cmd.Cmd compatibility)
497
498 Returns:
499 True to exit cmdloop
500 """
501 return True
502
503 # Alias for Ctrl-D
504 def do_EOF(self, arg: str) -> bool:
505 """Exit on Ctrl-D"""
506 return True
507
508 def default(self, line: str) -> None:
509 """Handle unknown commands."""
510 if line.strip():
511 print(f"Unknown command: {line}", file=self.stdout)