"""Convert IRGraph to JSON-serialisable structure for the frontend. Produces a flat graph representation with all nodes, edges, regions, errors, and metadata needed for both logical and physical views. Synthesizes SM nodes and edges from MemOp instructions and data definitions. """ from __future__ import annotations from typing import Any from cm_inst import FrameDest, MemOp from asm.ir import ( IRNode, IREdge, IRGraph, IRRegion, RegionKind, SourceLoc, ResolvedDest, collect_all_nodes_and_edges, collect_all_data_defs, ) from asm.errors import AssemblyError from asm.opcodes import OP_TO_MNEMONIC from dfgraph.pipeline import PipelineResult from dfgraph.categories import OpcodeCategory, CATEGORY_COLOURS SM_NODE_PREFIX = "__sm_" def _serialise_loc(loc: SourceLoc) -> dict[str, Any]: return { "line": loc.line, "column": loc.column, "end_line": loc.end_line, "end_column": loc.end_column, } def _serialise_frame_dest(dest: FrameDest) -> dict[str, Any]: return { "target_pe": dest.target_pe, "offset": dest.offset, "act_id": dest.act_id, "port": dest.port.name, "token_kind": dest.token_kind.name, } def _serialise_node(node: IRNode, error_node_names: set[str]) -> dict[str, Any]: from dfgraph.categories import categorise category = categorise(node.opcode) mnemonic = OP_TO_MNEMONIC[node.opcode] return { "id": node.name, "opcode": mnemonic, "category": category.value, "colour": CATEGORY_COLOURS[category], "const": node.const, "pe": node.pe, "iram_offset": node.iram_offset, "act_id": node.act_id, "has_error": node.name in error_node_names, "loc": _serialise_loc(node.loc), } def _serialise_edge(edge: IREdge, all_nodes: dict[str, IRNode], error_lines: set[int]) -> dict[str, Any]: result: dict[str, Any] = { "source": edge.source, "target": edge.dest, "port": edge.port.name, "source_port": edge.source_port.name if edge.source_port else None, "has_error": edge.loc.line in error_lines, } source_node = all_nodes.get(edge.source) if source_node: if (isinstance(source_node.dest_l, ResolvedDest) and source_node.dest_l.name == edge.dest and source_node.dest_l.frame_dest is not None): result["frame_dest"] = _serialise_frame_dest(source_node.dest_l.frame_dest) elif (isinstance(source_node.dest_r, ResolvedDest) and source_node.dest_r.name == edge.dest and source_node.dest_r.frame_dest is not None): result["frame_dest"] = _serialise_frame_dest(source_node.dest_r.frame_dest) return result def _serialise_error(error: AssemblyError) -> dict[str, Any]: return { "line": error.loc.line, "column": error.loc.column, "category": error.category.value, "message": error.message, "suggestions": error.suggestions, } def _serialise_region(region: IRRegion) -> dict[str, Any]: node_ids = list(region.body.nodes.keys()) for sub_region in region.body.regions: node_ids.extend(sub_region.body.nodes.keys()) return { "tag": region.tag, "kind": region.kind.value, "node_ids": node_ids, } def _collect_error_node_names(errors: list[AssemblyError], all_nodes: dict[str, IRNode]) -> set[str]: error_lines: set[int] = {e.loc.line for e in errors} return { name for name, node in all_nodes.items() if node.loc.line in error_lines } def _collect_referenced_sm_ids( all_nodes: dict[str, IRNode], graph: IRGraph, ) -> set[int]: """Collect SM IDs referenced by MemOp nodes or data definitions.""" sm_ids: set[int] = set() for node in all_nodes.values(): if isinstance(node.opcode, MemOp) and node.sm_id is not None: sm_ids.add(node.sm_id) for data_def in collect_all_data_defs(graph): if data_def.sm_id is not None: sm_ids.add(data_def.sm_id) return sm_ids def _build_sm_label( sm_id: int, all_nodes: dict[str, IRNode], graph: IRGraph, ) -> str: """Build a label for an SM node showing referenced cell addresses.""" lines = [f"SM {sm_id}"] # Collect cell addresses referenced by MemOp nodes targeting this SM cell_ops: dict[int, list[str]] = {} for node in all_nodes.values(): if isinstance(node.opcode, MemOp) and node.sm_id == sm_id and node.const is not None: addr = node.const mnemonic = OP_TO_MNEMONIC[node.opcode] cell_ops.setdefault(addr, []).append(mnemonic) # Collect data definitions for this SM for data_def in collect_all_data_defs(graph): if data_def.sm_id == sm_id and data_def.cell_addr is not None: addr = data_def.cell_addr cell_ops.setdefault(addr, []).append(f"init={data_def.value}") for addr in sorted(cell_ops): ops = ", ".join(cell_ops[addr]) lines.append(f"[{addr}] {ops}") return "\n".join(lines) def _synthesize_sm_nodes( sm_ids: set[int], all_nodes: dict[str, IRNode], graph: IRGraph, ) -> list[dict[str, Any]]: """Create synthetic graph nodes for each referenced SM instance.""" category = OpcodeCategory.STRUCTURE_MEMORY return [ { "id": f"{SM_NODE_PREFIX}{sm_id}", "opcode": "sm", "label": _build_sm_label(sm_id, all_nodes, graph), "category": category.value, "colour": CATEGORY_COLOURS[category], "const": None, "pe": None, "iram_offset": None, "act_id": None, "has_error": False, "loc": {"line": 0, "column": 0, "end_line": None, "end_column": None}, "sm_id": sm_id, "synthetic": True, } for sm_id in sorted(sm_ids) ] def _synthesize_sm_edges( all_nodes: dict[str, IRNode], ) -> list[dict[str, Any]]: """Create synthetic edges between MemOp nodes and their target SM nodes. Produces: - Request edge: MemOp node → SM node (the memory operation request) - Return edge: SM node → destination node (if a return route exists) """ edges: list[dict[str, Any]] = [] for node in all_nodes.values(): if not isinstance(node.opcode, MemOp) or node.sm_id is None: continue sm_node_id = f"{SM_NODE_PREFIX}{node.sm_id}" # Request edge: instruction → SM edges.append({ "source": node.name, "target": sm_node_id, "port": "REQ", "source_port": None, "has_error": False, "synthetic": True, }) # Return edge: SM → requesting node (data flows back to the reader) if isinstance(node.dest_l, ResolvedDest): edges.append({ "source": sm_node_id, "target": node.name, "port": "RET", "source_port": None, "has_error": False, "synthetic": True, }) return edges def graph_to_json(result: PipelineResult) -> dict[str, Any]: if result.graph is None: return { "type": "graph_update", "stage": result.stage.value, "nodes": [], "edges": [], "regions": [], "errors": [], "parse_error": result.parse_error, "metadata": { "stage": result.stage.value, "pe_count": 0, "sm_count": 0, }, } graph = result.graph all_nodes, all_edges = collect_all_nodes_and_edges(graph) error_lines: set[int] = {e.loc.line for e in result.errors} error_node_names = _collect_error_node_names(result.errors, all_nodes) nodes_json = [ _serialise_node(node, error_node_names) for node in all_nodes.values() ] edges_json = [ _serialise_edge(edge, all_nodes, error_lines) for edge in all_edges ] # Synthesize SM nodes and edges sm_ids = _collect_referenced_sm_ids(all_nodes, graph) nodes_json.extend(_synthesize_sm_nodes(sm_ids, all_nodes, graph)) edges_json.extend(_synthesize_sm_edges(all_nodes)) regions_json = [] for region in graph.regions: if region.kind == RegionKind.FUNCTION: regions_json.append(_serialise_region(region)) errors_json = [_serialise_error(e) for e in result.errors] pe_count = graph.system.pe_count if graph.system else 0 sm_count = graph.system.sm_count if graph.system else 0 return { "type": "graph_update", "stage": result.stage.value, "nodes": nodes_json, "edges": edges_json, "regions": regions_json, "errors": errors_json, "parse_error": None, "metadata": { "stage": result.stage.value, "pe_count": pe_count, "sm_count": sm_count, }, }