"""Convert IRGraph with StateSnapshot to JSON for the monitor frontend. Extends dfgraph/graph_json.py with execution state overlay: active, matched, executed flags on nodes, and token_flow on edges. Synthesizes SM nodes with live state from snapshots and event-driven overlay. """ from __future__ import annotations import logging import math from typing import Any from cm_inst import MemOp from asm.ir import ( IRGraph, IRNode, IREdge, ResolvedDest, collect_all_nodes_and_edges, collect_all_data_defs, ) from asm.opcodes import OP_TO_MNEMONIC from dfgraph.categories import OpcodeCategory, categorise, CATEGORY_COLOURS from emu.events import ( SimEvent, TokenReceived, Matched, Executed, Emitted, CellWritten, DeferredRead as DeferredReadEvent, DeferredSatisfied, ResultSent, FrameAllocated, FrameFreed, FrameSlotWritten, TokenRejected, ) from monitor.snapshot import StateSnapshot logger = logging.getLogger(__name__) SM_NODE_PREFIX = "__sm_" def _safe_time(value: float) -> float | None: """Convert non-finite floats to None for JSON serialization.""" if math.isinf(value) or math.isnan(value): return None return value def _parse_component(component: str) -> tuple[str, int]: """Parse component string format 'pe:0' or 'sm:1' into (kind, id).""" kind, _, id_str = component.partition(":") if not id_str: raise ValueError(f"Invalid component format: {component}") return kind, int(id_str) def _serialise_node(node: IRNode) -> dict[str, Any]: """Serialize an IR node to JSON.""" category = categorise(node.opcode) mnemonic = OP_TO_MNEMONIC[node.opcode] return { "id": node.name, "opcode": mnemonic, "category": category.value, "colour": CATEGORY_COLOURS[category], "const": node.const, "pe": node.pe, "iram_offset": node.iram_offset, "act_id": node.act_id, "has_error": False, "loc": { "line": 0, "column": 0, "end_line": None, "end_column": None, }, "active": False, "matched": False, "executed": False, } def _serialise_slot(slot: Any) -> Any: """Serialize a frame slot value to JSON-compatible format.""" if slot is None: return None if isinstance(slot, int): return slot # slot is a FrameDest return { "target_pe": slot.target_pe, "offset": slot.offset, "act_id": slot.act_id, "port": slot.port.name, "token_kind": slot.token_kind.name, } def _serialise_edge(edge: IREdge) -> dict[str, Any]: """Serialize an IR edge to JSON.""" return { "source": edge.source, "target": edge.dest, "port": edge.port.name, "source_port": edge.source_port.name if edge.source_port else None, "has_error": False, "token_flow": False, } def _serialise_pe_state(pe_id: int, snapshot: StateSnapshot) -> dict[str, Any]: """Serialize PE state from snapshot.""" pe_snap = snapshot.pes.get(pe_id) if not pe_snap: return {} iram_json = {} for offset, inst in pe_snap.iram.items(): opcode_str = str(inst.opcode.name) if hasattr(inst.opcode, 'name') else str(inst.opcode) iram_json[str(offset)] = { "opcode": opcode_str, "offset": offset, } frames_json = [[_serialise_slot(s) for s in frame] for frame in pe_snap.frames] tag_store_json = { str(act_id): {"frame_id": fid, "lane": lane} for act_id, (fid, lane) in pe_snap.tag_store.items() } return { "pe_id": pe_snap.pe_id, "iram": iram_json, "frames": frames_json, "tag_store": tag_store_json, "lane_count": pe_snap.lane_count, "free_frames": list(pe_snap.free_frames), "input_queue_size": len(pe_snap.input_queue), } def _serialise_sm_state(sm_id: int, snapshot: StateSnapshot) -> dict[str, Any]: """Serialize SM state from snapshot.""" sm_snap = snapshot.sms.get(sm_id) if not sm_snap: return {} cells = {} for addr, cell_snap in sm_snap.cells.items(): cells[str(addr)] = { "presence": cell_snap.pres.name, "data_l": cell_snap.data_l, "data_r": cell_snap.data_r, } deferred_read = None if sm_snap.deferred_read: deferred_read = { "cell_addr": sm_snap.deferred_read["cell_addr"], "return_route": str(sm_snap.deferred_read["return_route"]), } return { "cells": cells, "deferred_read": deferred_read, "t0_store_size": len(sm_snap.t0_store), "input_queue_depth": len(sm_snap.input_queue), } def _serialise_event(event: SimEvent) -> dict[str, Any]: """Serialize a SimEvent to JSON.""" base = { "type": type(event).__name__, "time": event.time, "component": event.component, } if isinstance(event, TokenReceived): base["details"] = {"token": str(event.token)} elif isinstance(event, Matched): base["details"] = { "left": event.left, "right": event.right, "act_id": event.act_id, "frame_id": event.frame_id, "offset": event.offset, } elif isinstance(event, Executed): base["details"] = { "op": str(event.op), "result": event.result, "bool_out": event.bool_out, } elif isinstance(event, Emitted): base["details"] = {"token": str(event.token)} elif isinstance(event, FrameAllocated): base["details"] = { "act_id": event.act_id, "frame_id": event.frame_id, "lane": event.lane, } elif isinstance(event, FrameFreed): base["details"] = { "act_id": event.act_id, "frame_id": event.frame_id, "lane": event.lane, "frame_freed": event.frame_freed, } elif isinstance(event, FrameSlotWritten): base["details"] = { "frame_id": event.frame_id, "slot": event.slot, "value": event.value, } elif isinstance(event, TokenRejected): base["details"] = { "reason": event.reason, } else: base["details"] = {} return base # --------------------------------------------------------------------------- # SM node/edge synthesis # --------------------------------------------------------------------------- def _collect_referenced_sm_ids( all_nodes: dict[str, IRNode], ir_graph: IRGraph, ) -> set[int]: """Collect SM IDs referenced by MemOp nodes or data definitions.""" sm_ids: set[int] = set() for node in all_nodes.values(): if isinstance(node.opcode, MemOp) and node.sm_id is not None: sm_ids.add(node.sm_id) for data_def in collect_all_data_defs(ir_graph): if data_def.sm_id is not None: sm_ids.add(data_def.sm_id) return sm_ids def _build_sm_node_label(sm_id: int, snapshot: StateSnapshot) -> str: """Build a compact text label for an SM node showing cell contents and deferred reads.""" sm_snap = snapshot.sms.get(sm_id) if not sm_snap: return f"SM {sm_id}" lines = [f"SM {sm_id}"] for addr in sorted(sm_snap.cells.keys()): cell = sm_snap.cells[addr] pres = cell.pres.name if cell.data_l is not None: lines.append(f"[{addr}] {pres} {cell.data_l}") else: lines.append(f"[{addr}] {pres}") if sm_snap.deferred_read: dr = sm_snap.deferred_read lines.append(f"DR: [{dr['cell_addr']}]") return "\n".join(lines) def _synthesize_sm_nodes( sm_ids: set[int], snapshot: StateSnapshot, ) -> list[dict[str, Any]]: """Create synthetic monitor graph nodes for each referenced SM instance.""" category = OpcodeCategory.STRUCTURE_MEMORY nodes = [] for sm_id in sorted(sm_ids): label = _build_sm_node_label(sm_id, snapshot) sm_snap = snapshot.sms.get(sm_id) sm_state = None if sm_snap: cells = {} for addr, cell in sm_snap.cells.items(): cells[str(addr)] = { "presence": cell.pres.name, "data_l": cell.data_l, "data_r": cell.data_r, } deferred_read = None if sm_snap.deferred_read: deferred_read = { "cell_addr": sm_snap.deferred_read["cell_addr"], "return_route": str(sm_snap.deferred_read["return_route"]), } sm_state = { "cells": cells, "deferred_read": deferred_read, "t0_store_size": len(sm_snap.t0_store), } nodes.append({ "id": f"{SM_NODE_PREFIX}{sm_id}", "opcode": "sm", "label": label, "category": category.value, "colour": CATEGORY_COLOURS[category], "const": None, "pe": None, "iram_offset": None, "act_id": None, "has_error": False, "loc": {"line": 0, "column": 0, "end_line": None, "end_column": None}, "sm_id": sm_id, "synthetic": True, "sm_state": sm_state, # Execution overlay "active": False, "matched": False, "executed": False, "cell_written": False, }) return nodes def _synthesize_sm_edges( all_nodes: dict[str, IRNode], ) -> list[dict[str, Any]]: """Create synthetic edges between MemOp nodes and their target SM nodes.""" edges: list[dict[str, Any]] = [] for node in all_nodes.values(): if not isinstance(node.opcode, MemOp) or node.sm_id is None: continue sm_node_id = f"{SM_NODE_PREFIX}{node.sm_id}" # Request edge: instruction → SM edges.append({ "source": node.name, "target": sm_node_id, "port": "REQ", "source_port": None, "has_error": False, "token_flow": False, "synthetic": True, }) # Return edge: SM → requesting node (data flows back to the reader) if isinstance(node.dest_l, ResolvedDest): edges.append({ "source": sm_node_id, "target": node.name, "port": "RET", "source_port": None, "has_error": False, "token_flow": False, "synthetic": True, }) return edges def _apply_sm_event_overlay( events: list[SimEvent], node_by_id: dict[str, dict[str, Any]], edge_by_key: dict[tuple[str, str], dict[str, Any]], ) -> None: """Apply execution overlay from SM events onto synthesized SM nodes and edges.""" for event in events: try: kind, comp_id = _parse_component(event.component) except ValueError: continue if kind != "sm": continue sm_node_id = f"{SM_NODE_PREFIX}{comp_id}" sm_node = node_by_id.get(sm_node_id) if not sm_node: continue if isinstance(event, TokenReceived): sm_node["active"] = True elif isinstance(event, CellWritten): sm_node["cell_written"] = True elif isinstance(event, (DeferredReadEvent, DeferredSatisfied)): sm_node["active"] = True elif isinstance(event, ResultSent): # Mark return edges from this SM as having token flow for key, edge in edge_by_key.items(): if key[0] == sm_node_id: edge["token_flow"] = True # --------------------------------------------------------------------------- # Public API # --------------------------------------------------------------------------- def graph_to_monitor_json( ir_graph: IRGraph, snapshot: StateSnapshot, events: list[SimEvent], ) -> dict[str, Any]: """Convert IRGraph + StateSnapshot + events to monitor JSON.""" all_nodes, all_edges = collect_all_nodes_and_edges(ir_graph) # Serialize base nodes and edges nodes_json = [_serialise_node(node) for node in all_nodes.values()] edges_json = [_serialise_edge(edge) for edge in all_edges] # Synthesize SM nodes and edges sm_ids = _collect_referenced_sm_ids(all_nodes, ir_graph) sm_nodes = _synthesize_sm_nodes(sm_ids, snapshot) sm_edges = _synthesize_sm_edges(all_nodes) nodes_json.extend(sm_nodes) edges_json.extend(sm_edges) # Create lookup dicts for quick updates node_by_id = {n["id"]: n for n in nodes_json} edge_by_key = {(e["source"], e["target"]): e for e in edges_json} # Apply PE execution overlay from events for event in events: if isinstance(event, TokenReceived): try: kind, pe_id = _parse_component(event.component) if kind == "pe": for node in all_nodes.values(): if node.pe == pe_id: if node.name in node_by_id: node_by_id[node.name]["active"] = True except ValueError: pass elif isinstance(event, Matched): try: kind, pe_id = _parse_component(event.component) if kind == "pe": for node in all_nodes.values(): if node.pe == pe_id and node.iram_offset == event.offset: if node.name in node_by_id: node_by_id[node.name]["matched"] = True except ValueError: pass elif isinstance(event, Executed): try: kind, pe_id = _parse_component(event.component) if kind == "pe": for node in all_nodes.values(): if node.pe == pe_id: if node.name in node_by_id: node_by_id[node.name]["executed"] = True except ValueError: pass # Mark token flow on edges based on emitted tokens for event in events: if isinstance(event, Emitted): token = event.token if hasattr(token, 'target'): for edge in edges_json: dest_id = edge["target"] dest_node = node_by_id.get(dest_id) if dest_node and dest_node.get("pe") == token.target: edge["token_flow"] = True # Apply SM event overlay _apply_sm_event_overlay(events, node_by_id, edge_by_key) # Serialize state state_json = { "pes": { str(pe_id): _serialise_pe_state(pe_id, snapshot) for pe_id in snapshot.pes.keys() }, "sms": { str(sm_id): _serialise_sm_state(sm_id, snapshot) for sm_id in snapshot.sms.keys() }, } events_json = [_serialise_event(e) for e in events] return { "type": "monitor_update", "graph": { "nodes": nodes_json, "edges": edges_json, "regions": [], }, "state": state_json, "events": events_json, "sim_time": snapshot.sim_time, "next_time": _safe_time(snapshot.next_time), "finished": snapshot.next_time == float('inf'), } def graph_loaded_json(ir_graph: IRGraph, snapshot: StateSnapshot) -> dict[str, Any]: """Initial graph load (no events, no execution overlay flags set).""" all_nodes, all_edges = collect_all_nodes_and_edges(ir_graph) nodes_json = [_serialise_node(node) for node in all_nodes.values()] edges_json = [_serialise_edge(edge) for edge in all_edges] # Synthesize SM nodes and edges sm_ids = _collect_referenced_sm_ids(all_nodes, ir_graph) sm_nodes = _synthesize_sm_nodes(sm_ids, snapshot) sm_edges = _synthesize_sm_edges(all_nodes) nodes_json.extend(sm_nodes) edges_json.extend(sm_edges) state_json = { "pes": { str(pe_id): _serialise_pe_state(pe_id, snapshot) for pe_id in snapshot.pes.keys() }, "sms": { str(sm_id): _serialise_sm_state(sm_id, snapshot) for sm_id in snapshot.sms.keys() }, } return { "type": "graph_loaded", "graph": { "nodes": nodes_json, "edges": edges_json, "regions": [], }, "state": state_json, "sim_time": snapshot.sim_time, "next_time": _safe_time(snapshot.next_time), "finished": snapshot.next_time == float('inf'), }