"""Tests for SM node synthesis in graph JSON output. Verifies that Structure Memory instances appear as synthesized nodes in both dfgraph (static) and monitor (live) graph JSON output. """ from __future__ import annotations import pytest from cm_inst import OutputStyle, ArithOp, MemOp, Port, Instruction from asm.ir import ( IRGraph, IRNode, IREdge, IRDataDef, SourceLoc, SystemConfig, ResolvedDest, collect_all_nodes_and_edges, collect_all_data_defs, ) from dfgraph.graph_json import ( SM_NODE_PREFIX, _collect_referenced_sm_ids, _synthesize_sm_nodes, _synthesize_sm_edges, graph_to_json, ) from dfgraph.pipeline import PipelineResult, PipelineStage from dfgraph.categories import OpcodeCategory, CATEGORY_COLOURS # ── Helpers ─────────────────────────────────────────────────────────── def _loc(): return SourceLoc(1, 1) def _make_graph_with_sm(): """Create an IRGraph with a MemOp node targeting SM 0.""" writer = IRNode( name="&writer", opcode=MemOp.WRITE, pe=0, iram_offset=0, act_slot=0, loc=_loc(), sm_id=0, const=0, ) reader = IRNode( name="&reader", opcode=MemOp.READ, pe=0, iram_offset=1, act_slot=0, loc=_loc(), sm_id=0, const=0, dest_l=ResolvedDest(name="&output", addr=None), ) output = IRNode( name="&output", opcode=ArithOp.ADD, pe=0, iram_offset=2, act_slot=0, loc=_loc(), ) edge = IREdge( source="&reader", dest="&output", port=Port.L, loc=_loc(), ) return IRGraph( nodes={"&writer": writer, "&reader": reader, "&output": output}, edges=[edge], system=SystemConfig(pe_count=1, sm_count=1), ) def _make_graph_no_sm(): """Create an IRGraph with no MemOp nodes.""" a = IRNode(name="&a", opcode=ArithOp.ADD, pe=0, iram_offset=0, act_slot=0, loc=_loc()) b = IRNode(name="&b", opcode=ArithOp.ADD, pe=0, iram_offset=1, act_slot=0, loc=_loc()) edge = IREdge(source="&a", dest="&b", port=Port.L, loc=_loc()) return IRGraph( nodes={"&a": a, "&b": b}, edges=[edge], system=SystemConfig(pe_count=1, sm_count=0), ) def _make_graph_with_datadef(): """Create an IRGraph with a datadef referencing SM 0 but no MemOp nodes.""" a = IRNode(name="&a", opcode=ArithOp.ADD, pe=0, iram_offset=0, act_slot=0, loc=_loc()) graph = IRGraph( nodes={"&a": a}, edges=[], system=SystemConfig(pe_count=1, sm_count=1), data_defs=[IRDataDef(name="@d0", sm_id=0, cell_addr=0, value=42, loc=_loc())], ) return graph def _make_pipeline_result(graph): """Wrap an IRGraph in a PipelineResult.""" return PipelineResult( graph=graph, stage=PipelineStage.ALLOCATE, errors=[], parse_error=None, ) # ── Tests: _collect_referenced_sm_ids ───────────────────────────────── class TestCollectReferencedSmIds: def test_from_memop_nodes(self): graph = _make_graph_with_sm() all_nodes, _ = collect_all_nodes_and_edges(graph) sm_ids = _collect_referenced_sm_ids(all_nodes, graph) assert 0 in sm_ids def test_from_datadefs(self): graph = _make_graph_with_datadef() all_nodes, _ = collect_all_nodes_and_edges(graph) sm_ids = _collect_referenced_sm_ids(all_nodes, graph) assert 0 in sm_ids def test_no_sm_returns_empty(self): graph = _make_graph_no_sm() all_nodes, _ = collect_all_nodes_and_edges(graph) sm_ids = _collect_referenced_sm_ids(all_nodes, graph) assert len(sm_ids) == 0 # ── Tests: _synthesize_sm_nodes ─────────────────────────────────────── class TestSynthesizeSmNodes: def test_node_structure(self): graph = _make_graph_with_sm() all_nodes, _ = collect_all_nodes_and_edges(graph) nodes = _synthesize_sm_nodes({0}, all_nodes, graph) assert len(nodes) == 1 node = nodes[0] assert node["id"] == f"{SM_NODE_PREFIX}0" assert node["opcode"] == "sm" assert node["category"] == "structure_memory" assert node["synthetic"] is True assert node["sm_id"] == 0 assert node["has_error"] is False assert node["pe"] is None def test_label_includes_cell_addresses(self): graph = _make_graph_with_sm() all_nodes, _ = collect_all_nodes_and_edges(graph) nodes = _synthesize_sm_nodes({0}, all_nodes, graph) label = nodes[0]["label"] assert "SM 0" in label assert "[0]" in label def test_multiple_sms_sorted(self): graph = _make_graph_with_sm() all_nodes, _ = collect_all_nodes_and_edges(graph) nodes = _synthesize_sm_nodes({2, 0, 1}, all_nodes, graph) assert [n["sm_id"] for n in nodes] == [0, 1, 2] def test_empty_set(self): graph = _make_graph_no_sm() all_nodes, _ = collect_all_nodes_and_edges(graph) nodes = _synthesize_sm_nodes(set(), all_nodes, graph) assert nodes == [] def test_colour_matches_category(self): graph = _make_graph_with_sm() all_nodes, _ = collect_all_nodes_and_edges(graph) nodes = _synthesize_sm_nodes({0}, all_nodes, graph) expected = CATEGORY_COLOURS[OpcodeCategory.STRUCTURE_MEMORY] assert nodes[0]["colour"] == expected def test_label_includes_datadef(self): graph = _make_graph_with_datadef() all_nodes, _ = collect_all_nodes_and_edges(graph) nodes = _synthesize_sm_nodes({0}, all_nodes, graph) label = nodes[0]["label"] assert "SM 0" in label assert "init=42" in label # ── Tests: _synthesize_sm_edges ─────────────────────────────────────── class TestSynthesizeSmEdges: def test_request_edges_created(self): graph = _make_graph_with_sm() all_nodes, _ = collect_all_nodes_and_edges(graph) edges = _synthesize_sm_edges(all_nodes) request_edges = [e for e in edges if e["port"] == "REQ"] assert len(request_edges) >= 1 for edge in request_edges: assert edge["target"].startswith(SM_NODE_PREFIX) assert edge["synthetic"] is True def test_return_edges_for_read_with_dest(self): graph = _make_graph_with_sm() all_nodes, _ = collect_all_nodes_and_edges(graph) edges = _synthesize_sm_edges(all_nodes) return_edges = [e for e in edges if e["source"].startswith(SM_NODE_PREFIX)] assert len(return_edges) >= 1 for edge in return_edges: assert edge["synthetic"] is True assert edge["target"] == "&reader" assert edge["port"] == "RET" def test_no_edges_without_sm(self): graph = _make_graph_no_sm() all_nodes, _ = collect_all_nodes_and_edges(graph) edges = _synthesize_sm_edges(all_nodes) assert edges == [] # ── Tests: graph_to_json integration ────────────────────────────────── class TestGraphToJsonIntegration: def test_sm_nodes_in_output(self): result = _make_pipeline_result(_make_graph_with_sm()) json_data = graph_to_json(result) sm_nodes = [n for n in json_data["nodes"] if n.get("synthetic")] assert len(sm_nodes) >= 1 assert sm_nodes[0]["category"] == "structure_memory" def test_sm_edges_in_output(self): result = _make_pipeline_result(_make_graph_with_sm()) json_data = graph_to_json(result) sm_edges = [e for e in json_data["edges"] if e.get("synthetic")] assert len(sm_edges) >= 1 def test_no_sm_nodes_without_sm(self): result = _make_pipeline_result(_make_graph_no_sm()) json_data = graph_to_json(result) sm_nodes = [n for n in json_data["nodes"] if n.get("synthetic")] assert sm_nodes == [] def test_no_sm_edges_without_sm(self): result = _make_pipeline_result(_make_graph_no_sm()) json_data = graph_to_json(result) sm_edges = [e for e in json_data["edges"] if e.get("synthetic")] assert len(sm_edges) == 0 def test_datadef_only_creates_sm_node(self): result = _make_pipeline_result(_make_graph_with_datadef()) json_data = graph_to_json(result) sm_nodes = [n for n in json_data["nodes"] if n.get("synthetic")] assert len(sm_nodes) == 1 assert sm_nodes[0]["sm_id"] == 0