# OR-1 dataflow CPU sketch
1"""Convert IRGraph with StateSnapshot to JSON for the monitor frontend.
2
3Extends dfgraph/graph_json.py with execution state overlay: active, matched,
4executed flags on nodes, and token_flow on edges. Synthesizes SM nodes with
5live state from snapshots and event-driven overlay.
6"""
7
8from __future__ import annotations
9
10import logging
11import math
12from typing import Any
13
14from cm_inst import MemOp
15from asm.ir import (
16 IRGraph, IRNode, IREdge, ResolvedDest,
17 collect_all_nodes_and_edges, collect_all_data_defs,
18)
19from asm.opcodes import OP_TO_MNEMONIC
20from dfgraph.categories import OpcodeCategory, categorise, CATEGORY_COLOURS
21from emu.events import (
22 SimEvent, TokenReceived, Matched, Executed, Emitted,
23 CellWritten, DeferredRead as DeferredReadEvent,
24 DeferredSatisfied, ResultSent,
25 FrameAllocated, FrameFreed, FrameSlotWritten, TokenRejected,
26)
27from monitor.snapshot import StateSnapshot
28
# Module-level logger, named after this module per logging convention.
logger = logging.getLogger(__name__)

# Prefix for synthetic structure-memory node IDs so they cannot collide
# with real IR node names.
SM_NODE_PREFIX = "__sm_"
32
33
34def _safe_time(value: float) -> float | None:
35 """Convert non-finite floats to None for JSON serialization."""
36 if math.isinf(value) or math.isnan(value):
37 return None
38 return value
39
40
41def _parse_component(component: str) -> tuple[str, int]:
42 """Parse component string format 'pe:0' or 'sm:1' into (kind, id)."""
43 kind, _, id_str = component.partition(":")
44 if not id_str:
45 raise ValueError(f"Invalid component format: {component}")
46 return kind, int(id_str)
47
48
def _serialise_node(node: IRNode) -> dict[str, Any]:
    """Serialise one IR node into the monitor's JSON node schema."""
    cat = categorise(node.opcode)
    return {
        "id": node.name,
        "opcode": OP_TO_MNEMONIC[node.opcode],
        "category": cat.value,
        "colour": CATEGORY_COLOURS[cat],
        "const": node.const,
        "pe": node.pe,
        "iram_offset": node.iram_offset,
        "act_id": node.act_id,
        "has_error": False,
        # No source positions available here; emit a placeholder location.
        "loc": {"line": 0, "column": 0, "end_line": None, "end_column": None},
        # Execution overlay flags; set later from simulator events.
        "active": False,
        "matched": False,
        "executed": False,
    }
74
75
76def _serialise_slot(slot: Any) -> Any:
77 """Serialize a frame slot value to JSON-compatible format."""
78 if slot is None:
79 return None
80 if isinstance(slot, int):
81 return slot
82 # slot is a FrameDest
83 return {
84 "target_pe": slot.target_pe,
85 "offset": slot.offset,
86 "act_id": slot.act_id,
87 "port": slot.port.name,
88 "token_kind": slot.token_kind.name,
89 }
90
91
92def _serialise_edge(edge: IREdge) -> dict[str, Any]:
93 """Serialize an IR edge to JSON."""
94 return {
95 "source": edge.source,
96 "target": edge.dest,
97 "port": edge.port.name,
98 "source_port": edge.source_port.name if edge.source_port else None,
99 "has_error": False,
100 "token_flow": False,
101 }
102
103
def _serialise_pe_state(pe_id: int, snapshot: StateSnapshot) -> dict[str, Any]:
    """Serialise the state of one PE from the snapshot.

    Returns an empty dict when the snapshot has no entry for *pe_id*.
    """
    pe = snapshot.pes.get(pe_id)
    if not pe:
        return {}

    iram: dict[str, Any] = {}
    for off, inst in pe.iram.items():
        op = inst.opcode
        iram[str(off)] = {
            # Opcodes may be enum-like (with .name) or plain values.
            "opcode": str(op.name) if hasattr(op, 'name') else str(op),
            "offset": off,
        }

    return {
        "pe_id": pe.pe_id,
        "iram": iram,
        "frames": [[_serialise_slot(slot) for slot in frame] for frame in pe.frames],
        "tag_store": {
            str(act): {"frame_id": frame_id, "lane": lane}
            for act, (frame_id, lane) in pe.tag_store.items()
        },
        "lane_count": pe.lane_count,
        "free_frames": list(pe.free_frames),
        "input_queue_size": len(pe.input_queue),
    }
134
135
136def _serialise_sm_state(sm_id: int, snapshot: StateSnapshot) -> dict[str, Any]:
137 """Serialize SM state from snapshot."""
138 sm_snap = snapshot.sms.get(sm_id)
139 if not sm_snap:
140 return {}
141
142 cells = {}
143 for addr, cell_snap in sm_snap.cells.items():
144 cells[str(addr)] = {
145 "presence": cell_snap.pres.name,
146 "data_l": cell_snap.data_l,
147 "data_r": cell_snap.data_r,
148 }
149
150 deferred_read = None
151 if sm_snap.deferred_read:
152 deferred_read = {
153 "cell_addr": sm_snap.deferred_read["cell_addr"],
154 "return_route": str(sm_snap.deferred_read["return_route"]),
155 }
156
157 return {
158 "cells": cells,
159 "deferred_read": deferred_read,
160 "t0_store_size": len(sm_snap.t0_store),
161 "input_queue_depth": len(sm_snap.input_queue),
162 }
163
164
def _serialise_event(event: SimEvent) -> dict[str, Any]:
    """Serialise a SimEvent into a JSON record.

    Every event gets type/time/component; a per-type 'details' payload is
    attached for the event classes the frontend knows how to display.
    Unknown event types get an empty details dict.
    """
    details: dict[str, Any] = {}
    if isinstance(event, TokenReceived):
        details = {"token": str(event.token)}
    elif isinstance(event, Matched):
        details = {
            "left": event.left,
            "right": event.right,
            "act_id": event.act_id,
            "frame_id": event.frame_id,
            "offset": event.offset,
        }
    elif isinstance(event, Executed):
        details = {
            "op": str(event.op),
            "result": event.result,
            "bool_out": event.bool_out,
        }
    elif isinstance(event, Emitted):
        details = {"token": str(event.token)}
    elif isinstance(event, FrameAllocated):
        details = {
            "act_id": event.act_id,
            "frame_id": event.frame_id,
            "lane": event.lane,
        }
    elif isinstance(event, FrameFreed):
        details = {
            "act_id": event.act_id,
            "frame_id": event.frame_id,
            "lane": event.lane,
            "frame_freed": event.frame_freed,
        }
    elif isinstance(event, FrameSlotWritten):
        details = {
            "frame_id": event.frame_id,
            "slot": event.slot,
            "value": event.value,
        }
    elif isinstance(event, TokenRejected):
        details = {"reason": event.reason}

    return {
        "type": type(event).__name__,
        "time": event.time,
        "component": event.component,
        "details": details,
    }
218
219
220# ---------------------------------------------------------------------------
221# SM node/edge synthesis
222# ---------------------------------------------------------------------------
223
224
def _collect_referenced_sm_ids(
    all_nodes: dict[str, IRNode],
    ir_graph: IRGraph,
) -> set[int]:
    """Collect SM IDs referenced by MemOp nodes or data definitions."""
    from_nodes = {
        n.sm_id
        for n in all_nodes.values()
        if isinstance(n.opcode, MemOp) and n.sm_id is not None
    }
    from_data = {
        d.sm_id for d in collect_all_data_defs(ir_graph) if d.sm_id is not None
    }
    return from_nodes | from_data
238
239
240def _build_sm_node_label(sm_id: int, snapshot: StateSnapshot) -> str:
241 """Build a compact text label for an SM node showing cell contents and deferred reads."""
242 sm_snap = snapshot.sms.get(sm_id)
243 if not sm_snap:
244 return f"SM {sm_id}"
245
246 lines = [f"SM {sm_id}"]
247
248 for addr in sorted(sm_snap.cells.keys()):
249 cell = sm_snap.cells[addr]
250 pres = cell.pres.name
251 if cell.data_l is not None:
252 lines.append(f"[{addr}] {pres} {cell.data_l}")
253 else:
254 lines.append(f"[{addr}] {pres}")
255
256 if sm_snap.deferred_read:
257 dr = sm_snap.deferred_read
258 lines.append(f"DR: [{dr['cell_addr']}]")
259
260 return "\n".join(lines)
261
262
def _synthesize_sm_nodes(
    sm_ids: set[int],
    snapshot: StateSnapshot,
) -> list[dict[str, Any]]:
    """Create synthetic monitor graph nodes for each referenced SM instance.

    Each node carries a text label plus an 'sm_state' payload (cells,
    pending deferred read, T0 store size) when the SM is present in the
    snapshot, and the usual execution-overlay flags.
    """
    cat = OpcodeCategory.STRUCTURE_MEMORY
    out: list[dict[str, Any]] = []
    for sm_id in sorted(sm_ids):
        sm = snapshot.sms.get(sm_id)

        state = None
        if sm:
            deferred = None
            if sm.deferred_read:
                deferred = {
                    "cell_addr": sm.deferred_read["cell_addr"],
                    "return_route": str(sm.deferred_read["return_route"]),
                }
            state = {
                "cells": {
                    str(addr): {
                        "presence": cell.pres.name,
                        "data_l": cell.data_l,
                        "data_r": cell.data_r,
                    }
                    for addr, cell in sm.cells.items()
                },
                "deferred_read": deferred,
                "t0_store_size": len(sm.t0_store),
            }

        out.append({
            "id": f"{SM_NODE_PREFIX}{sm_id}",
            "opcode": "sm",
            "label": _build_sm_node_label(sm_id, snapshot),
            "category": cat.value,
            "colour": CATEGORY_COLOURS[cat],
            "const": None,
            "pe": None,
            "iram_offset": None,
            "act_id": None,
            "has_error": False,
            "loc": {"line": 0, "column": 0, "end_line": None, "end_column": None},
            "sm_id": sm_id,
            "synthetic": True,
            "sm_state": state,
            # Execution overlay flags, filled in later from SM events.
            "active": False,
            "matched": False,
            "executed": False,
            "cell_written": False,
        })
    return out
317
318
def _synthesize_sm_edges(
    all_nodes: dict[str, IRNode],
) -> list[dict[str, Any]]:
    """Create synthetic edges between MemOp nodes and their target SM nodes.

    Every MemOp with a resolved sm_id gets a REQ edge (instruction -> SM);
    MemOps whose dest_l is resolved additionally get a RET edge (SM ->
    instruction) for the data flowing back to the reader.
    """

    def _edge(src: str, dst: str, port: str) -> dict[str, Any]:
        # All synthetic edges share the same shape; only endpoints/port vary.
        return {
            "source": src,
            "target": dst,
            "port": port,
            "source_port": None,
            "has_error": False,
            "token_flow": False,
            "synthetic": True,
        }

    edges: list[dict[str, Any]] = []
    for node in all_nodes.values():
        if not isinstance(node.opcode, MemOp) or node.sm_id is None:
            continue
        sm_node_id = f"{SM_NODE_PREFIX}{node.sm_id}"
        edges.append(_edge(node.name, sm_node_id, "REQ"))
        if isinstance(node.dest_l, ResolvedDest):
            edges.append(_edge(sm_node_id, node.name, "RET"))
    return edges
354
355
def _apply_sm_event_overlay(
    events: list[SimEvent],
    node_by_id: dict[str, dict[str, Any]],
    edge_by_key: dict[tuple[str, str], dict[str, Any]],
) -> None:
    """Apply execution overlay from SM events onto synthesized SM nodes/edges.

    Mutates the node and edge dicts in place: token arrival and deferred-read
    activity set 'active', cell writes set 'cell_written', and a sent result
    marks token_flow on every outgoing edge of that SM.
    """
    for ev in events:
        try:
            kind, comp_id = _parse_component(ev.component)
        except ValueError:
            continue
        if kind != "sm":
            continue

        sm_node_id = f"{SM_NODE_PREFIX}{comp_id}"
        sm_node = node_by_id.get(sm_node_id)
        if not sm_node:
            continue

        if isinstance(ev, TokenReceived):
            sm_node["active"] = True
        elif isinstance(ev, CellWritten):
            sm_node["cell_written"] = True
        elif isinstance(ev, (DeferredReadEvent, DeferredSatisfied)):
            sm_node["active"] = True
        elif isinstance(ev, ResultSent):
            # Coarse: we cannot tell which edge carried the result, so mark
            # every return edge originating at this SM.
            for (src, _dst), edge in edge_by_key.items():
                if src == sm_node_id:
                    edge["token_flow"] = True
389
390
391# ---------------------------------------------------------------------------
392# Public API
393# ---------------------------------------------------------------------------
394
395
def _apply_pe_event_overlay(
    events: list[SimEvent],
    all_nodes: dict[str, IRNode],
    node_by_id: dict[str, dict[str, Any]],
) -> None:
    """Apply execution overlay from PE events onto serialized nodes.

    TokenReceived marks every node on the event's PE 'active'; Matched marks
    the node at the matching IRAM offset 'matched'; Executed marks every node
    on the PE 'executed'. Events with malformed component strings are skipped.
    """
    for event in events:
        if not isinstance(event, (TokenReceived, Matched, Executed)):
            continue
        # Keep the try narrow: only the component parse is expected to fail.
        # (Previously the whole node loop was inside try/except ValueError,
        # which could silently swallow unrelated errors.)
        try:
            kind, pe_id = _parse_component(event.component)
        except ValueError:
            continue
        if kind != "pe":
            continue
        for node in all_nodes.values():
            if node.pe != pe_id:
                continue
            entry = node_by_id.get(node.name)
            if entry is None:
                continue
            if isinstance(event, TokenReceived):
                entry["active"] = True
            elif isinstance(event, Matched):
                if node.iram_offset == event.offset:
                    entry["matched"] = True
            else:  # Executed
                entry["executed"] = True


def graph_to_monitor_json(
    ir_graph: IRGraph,
    snapshot: StateSnapshot,
    events: list[SimEvent],
) -> dict[str, Any]:
    """Convert IRGraph + StateSnapshot + events to monitor JSON.

    Returns a 'monitor_update' message containing the serialized graph
    (IR nodes plus synthetic SM nodes/edges), per-component state, the
    serialized event log, and timing fields. Execution overlay flags
    (active/matched/executed on nodes, token_flow on edges) are derived
    from *events*.
    """
    all_nodes, all_edges = collect_all_nodes_and_edges(ir_graph)

    # Serialize base nodes and edges.
    nodes_json = [_serialise_node(node) for node in all_nodes.values()]
    edges_json = [_serialise_edge(edge) for edge in all_edges]

    # Synthesize SM nodes and edges so memory traffic is visible in the graph.
    sm_ids = _collect_referenced_sm_ids(all_nodes, ir_graph)
    nodes_json.extend(_synthesize_sm_nodes(sm_ids, snapshot))
    edges_json.extend(_synthesize_sm_edges(all_nodes))

    # Lookup tables for overlay application.
    node_by_id = {n["id"]: n for n in nodes_json}
    edge_by_key = {(e["source"], e["target"]): e for e in edges_json}

    # Apply PE execution overlay from events.
    _apply_pe_event_overlay(events, all_nodes, node_by_id)

    # Mark token flow on any edge whose destination node lives on the PE an
    # emitted token targets (coarse: we cannot recover the exact edge).
    for event in events:
        if isinstance(event, Emitted) and hasattr(event.token, 'target'):
            for edge in edges_json:
                dest_node = node_by_id.get(edge["target"])
                if dest_node and dest_node.get("pe") == event.token.target:
                    edge["token_flow"] = True

    # Apply SM event overlay.
    _apply_sm_event_overlay(events, node_by_id, edge_by_key)

    state_json = {
        "pes": {
            str(pe_id): _serialise_pe_state(pe_id, snapshot)
            for pe_id in snapshot.pes.keys()
        },
        "sms": {
            str(sm_id): _serialise_sm_state(sm_id, snapshot)
            for sm_id in snapshot.sms.keys()
        },
    }

    return {
        "type": "monitor_update",
        "graph": {
            "nodes": nodes_json,
            "edges": edges_json,
            "regions": [],
        },
        "state": state_json,
        "events": [_serialise_event(e) for e in events],
        "sim_time": snapshot.sim_time,
        # next_time may be +inf when the event queue is drained; JSON has no
        # representation for that, so it is mapped to null.
        "next_time": _safe_time(snapshot.next_time),
        "finished": snapshot.next_time == float('inf'),
    }
495
496
def graph_loaded_json(ir_graph: IRGraph, snapshot: StateSnapshot) -> dict[str, Any]:
    """Initial graph load message (no events, no execution overlay flags set).

    Same graph/state shape as the 'monitor_update' message, but typed
    'graph_loaded' and without an event log.
    """
    all_nodes, all_edges = collect_all_nodes_and_edges(ir_graph)

    nodes = [_serialise_node(n) for n in all_nodes.values()]
    edges = [_serialise_edge(e) for e in all_edges]

    # Append synthetic SM nodes/edges so memory traffic is visible up front.
    referenced = _collect_referenced_sm_ids(all_nodes, ir_graph)
    nodes += _synthesize_sm_nodes(referenced, snapshot)
    edges += _synthesize_sm_edges(all_nodes)

    pes = {str(i): _serialise_pe_state(i, snapshot) for i in snapshot.pes}
    sms = {str(i): _serialise_sm_state(i, snapshot) for i in snapshot.sms}

    return {
        "type": "graph_loaded",
        "graph": {"nodes": nodes, "edges": edges, "regions": []},
        "state": {"pes": pes, "sms": sms},
        "sim_time": snapshot.sim_time,
        # +inf next_time (drained sim) becomes null for JSON.
        "next_time": _safe_time(snapshot.next_time),
        "finished": snapshot.next_time == float('inf'),
    }