OR-1 dataflow CPU sketch
1"""ANSI formatting helpers for monitor REPL output.
2
3Provides functions to format SimEvent, snapshot state, and step results
4with ANSI colour codes for readable terminal display.
5
6Key features:
7- NO_COLOUR flag to disable all colouring (for testing and pipes)
8- Distinct colours for event types, component IDs, data values, errors, and time
9- Compact and detailed formatting modes for different use cases
10"""
11
12from __future__ import annotations
13
14from emu.events import (
15 CellWritten,
16 DeferredRead,
17 DeferredSatisfied,
18 Emitted,
19 Executed,
20 IRAMWritten,
21 Matched,
22 ResultSent,
23 SimEvent,
24 TokenReceived,
25)
26from monitor.commands import StepResult
27from monitor.snapshot import PESnapshot, SMSnapshot, StateSnapshot
28
# Module-wide switch read by colour(): when truthy, text is returned without
# any escape sequences (for testing and pipes).
NO_COLOUR = False

# ANSI escape sequences, keyed by the names callers pass to colour().
COLOURS = dict(
    reset="\033[0m",
    bright_cyan="\033[96m",  # Event types
    bright_yellow="\033[93m",  # PE/SM identifiers
    white="\033[97m",  # Data values
    bright_red="\033[91m",  # Errors
    bright_green="\033[92m",  # Sim time
    dim="\033[2m",  # Dim (for secondary info)
)
42
43
def colour(text: str, code: str) -> str:
    """Return ``text`` wrapped in an ANSI escape sequence plus a reset.

    Args:
        text: Text to wrap.
        code: Key into the module-level COLOURS table.

    Returns:
        ``text`` unchanged when the module-level NO_COLOUR flag is truthy;
        otherwise the escape sequence for ``code``, then ``text``, then the
        ``"reset"`` sequence.

    Raises:
        KeyError: If colouring is enabled and ``code`` is not a COLOURS key.
    """
    if not NO_COLOUR:
        start = COLOURS[code]
        end = COLOURS["reset"]
        return f"{start}{text}{end}"
    # Colouring disabled: the code is never looked up on this path.
    return text
57
58
59def format_event(event: SimEvent) -> str:
60 """Format a single SimEvent as a coloured one-line string.
61
62 Shows event type and key fields in distinct colours.
63
64 Args:
65 event: SimEvent instance
66
67 Returns:
68 Formatted event string with ANSI colours
69 """
70 time_str = colour(f"[{event.time:.1f}]", "bright_green")
71 component_str = colour(event.component, "bright_yellow")
72
73 match event:
74 case TokenReceived(token=token):
75 event_type = colour("TokenReceived", "bright_cyan")
76 return f"{time_str} {event_type} {component_str}: {token}"
77
78 case Matched(left=left, right=right, act_id=act_id, frame_id=frame_id):
79 event_type = colour("Matched", "bright_cyan")
80 return (
81 f"{time_str} {event_type} {component_str}: "
82 f"left={colour(str(left), 'white')} "
83 f"right={colour(str(right), 'white')} "
84 f"act_id={colour(str(act_id), 'white')} "
85 f"frame_id={colour(str(frame_id), 'white')}"
86 )
87
88 case Executed(op=op, result=result, bool_out=bool_out):
89 event_type = colour("Executed", "bright_cyan")
90 return (
91 f"{time_str} {event_type} {component_str}: "
92 f"op={colour(op.name, 'white')} "
93 f"result={colour(str(result), 'white')} "
94 f"bool={colour(str(bool_out), 'white')}"
95 )
96
97 case Emitted(token=token):
98 event_type = colour("Emitted", "bright_cyan")
99 return f"{time_str} {event_type} {component_str}: {token}"
100
101 case IRAMWritten(offset=offset, count=count):
102 event_type = colour("IRAMWritten", "bright_cyan")
103 return (
104 f"{time_str} {event_type} {component_str}: "
105 f"offset={colour(str(offset), 'white')} "
106 f"count={colour(str(count), 'white')}"
107 )
108
109 case CellWritten(addr=addr, old_pres=old_pres, new_pres=new_pres):
110 event_type = colour("CellWritten", "bright_cyan")
111 return (
112 f"{time_str} {event_type} {component_str}: "
113 f"addr={colour(str(addr), 'white')} "
114 f"{colour(old_pres.name, 'dim')}→{colour(new_pres.name, 'white')}"
115 )
116
117 case DeferredRead(addr=addr):
118 event_type = colour("DeferredRead", "bright_cyan")
119 return (
120 f"{time_str} {event_type} {component_str}: "
121 f"addr={colour(str(addr), 'white')}"
122 )
123
124 case DeferredSatisfied(addr=addr, data=data):
125 event_type = colour("DeferredSatisfied", "bright_cyan")
126 return (
127 f"{time_str} {event_type} {component_str}: "
128 f"addr={colour(str(addr), 'white')} "
129 f"data={colour(str(data), 'white')}"
130 )
131
132 case ResultSent(token=token):
133 event_type = colour("ResultSent", "bright_cyan")
134 return f"{time_str} {event_type} {component_str}: {token}"
135
136 case _:
137 return f"{time_str} {colour('Unknown', 'bright_red')} {component_str}"
138
139
def format_snapshot_summary(snapshot: StateSnapshot) -> str:
    """Build a three-line overview of a simulation snapshot.

    Line 1 gives the number of PEs and SMs, line 2 the current sim time and
    the next event time (both to one decimal place), and line 3 the total
    number of items in the input queues of all PEs and SMs combined.

    Args:
        snapshot: StateSnapshot instance; ``pes`` and ``sms`` are read as
            mappings whose values expose ``input_queue``.

    Returns:
        The three lines joined with newlines.
    """

    def as_time(t: float) -> str:
        # Times are shown with one decimal place in the "sim time" colour.
        return colour(f"{t:.1f}", "bright_green")

    counts_line = (
        "Simulation: "
        + colour(f"{len(snapshot.pes)} PEs", "bright_yellow")
        + ", "
        + colour(f"{len(snapshot.sms)} SMs", "bright_yellow")
    )
    time_line = (
        f"Time: {as_time(snapshot.sim_time)} "
        f"(next event: {as_time(snapshot.next_time)})"
    )

    # One pass over the PE queues followed by the SM queues.
    queued = sum(
        len(unit.input_queue)
        for group in (snapshot.pes, snapshot.sms)
        for unit in group.values()
    )
    depth_line = f"Input queue depth: {colour(str(queued), 'white')}"

    return "\n".join((counts_line, time_line, depth_line))
167
168
def format_pe_state(pe_snapshot: PESnapshot) -> str:
    """Render a PE snapshot as a multi-line report.

    Sections, in order: a ``PE <id>`` header, IRAM entries sorted by address,
    the tag store (key -> frame and lane), the free-frame list, each frame's
    slots, the input queue length and the output log length. An empty IRAM,
    tag store, free-frame list or frame list is reported with a placeholder
    line instead.

    Args:
        pe_snapshot: PESnapshot instance.

    Returns:
        The report lines joined with newlines.
    """

    def white(obj: object) -> str:
        return colour(str(obj), "white")

    def show_slot(slot: object) -> str:
        # None and ints are shown literally; any other slot content is
        # abbreviated (the original comment calls it a FrameDest object).
        if slot is None:
            return "None"
        if isinstance(slot, int):
            return white(slot)
        return "FrameDest(...)"

    out = [colour(f"PE {pe_snapshot.pe_id}", "bright_yellow")]

    # IRAM
    iram = pe_snapshot.iram
    if not iram:
        out.append(" IRAM: (empty)")
    else:
        out.append(" IRAM:")
        out.extend(f" [{white(addr)}] {iram[addr]}" for addr in sorted(iram))

    # Tag store
    if not pe_snapshot.tag_store:
        out.append(" Tag store: (empty)")
    else:
        entries = [
            f"{white(key)}: frame {white(fid)} lane {white(lane)}"
            for key, (fid, lane) in sorted(pe_snapshot.tag_store.items())
        ]
        out.append(" Tag store: {" + ", ".join(entries) + "}")

    # Free frames
    if not pe_snapshot.free_frames:
        out.append(" Free frames: (none)")
    else:
        free = ", ".join(white(f) for f in pe_snapshot.free_frames)
        out.append(f" Free frames: [{free}]")

    # Frames
    if not pe_snapshot.frames:
        out.append(" Frames: (empty)")
    else:
        out.append(" Frames:")
        for index, frame in enumerate(pe_snapshot.frames):
            slots = ", ".join(show_slot(slot) for slot in frame)
            out.append(f" Frame {white(index)}: [{slots}]")

    # Queue / log sizes
    out.append(f" Input queue: {white(len(pe_snapshot.input_queue))} tokens")
    out.append(f" Output log: {white(len(pe_snapshot.output_log))} entries")

    return "\n".join(out)
241
242
def format_sm_state(sm_snapshot: SMSnapshot) -> str:
    """Format detailed SM state information (multi-line).

    Sections, in order: an ``SM <id>`` header, the cells sorted by address
    (presence name plus ``L=``/``R=`` data when present), the deferred read
    (only when one is set), the T0 store size and the input queue length.

    Cell lines are assembled from only the parts that exist, so a cell
    without data does not end in a trailing space and a cell with only
    right-hand data does not get a doubled separator.

    Args:
        sm_snapshot: SMSnapshot instance. ``deferred_read``, when truthy, is
            indexed with the keys ``'cell_addr'`` and ``'return_route'``.

    Returns:
        Detailed multi-line SM state
    """
    lines = [colour(f"SM {sm_snapshot.sm_id}", "bright_yellow")]

    # Cells
    if sm_snapshot.cells:
        lines.append(" Cells:")
        for addr in sorted(sm_snapshot.cells.keys()):
            cell = sm_snapshot.cells[addr]
            parts = [
                f" [{colour(str(addr), 'white')}]",
                colour(cell.pres.name, "white"),
            ]
            # Only append data fields that are actually populated.
            if cell.data_l is not None:
                parts.append(f"L={cell.data_l}")
            if cell.data_r is not None:
                parts.append(f"R={cell.data_r}")
            lines.append(" ".join(parts))
    else:
        lines.append(" Cells: (empty)")

    # Deferred read
    if sm_snapshot.deferred_read:
        dr = sm_snapshot.deferred_read
        lines.append(
            f" Deferred read: addr={colour(str(dr['cell_addr']), 'white')} "
            f"route={colour(str(dr['return_route']), 'white')}"
        )

    # T0 store
    lines.append(
        f" T0 store: {colour(str(len(sm_snapshot.t0_store)), 'white')} entries"
    )

    # Input queue
    lines.append(
        f" Input queue: {colour(str(len(sm_snapshot.input_queue)), 'white')} tokens"
    )

    return "\n".join(lines)
289
290
def format_step_result(result: StepResult) -> str:
    """Render a step result: its events, then an optional summary and status.

    The output starts with an ``Events (<n>):`` header followed by one
    formatted line per event, or ``Events: (none)`` when there are no events.
    If ``result.snapshot`` is truthy, a blank line and the snapshot summary
    follow. If ``result.finished`` is truthy, a blank line and a
    "Simulation finished." notice are appended.

    Args:
        result: StepResult instance.

    Returns:
        All output lines joined with newlines.
    """
    events = result.events
    if events:
        out = [f"Events ({colour(str(len(events)), 'white')}):"]
        out.extend(f" {format_event(ev)}" for ev in events)
    else:
        out = ["Events: (none)"]

    # Snapshot summary, separated from the events by a blank line.
    if result.snapshot:
        out += ["", format_snapshot_summary(result.snapshot)]

    # Completion notice, also preceded by a blank line.
    if result.finished:
        out += ["", colour("Simulation finished.", "bright_green")]

    return "\n".join(out)
323 return "\n".join(lines)