OR-1 dataflow CPU sketch
at main 254 lines 8.9 kB view raw
"""Tests for SM node synthesis in graph JSON output.

Verifies that Structure Memory (SM) instances appear as synthesized nodes,
together with their synthesized edges, in both dfgraph (static) and monitor
(live) graph JSON output.
"""

from __future__ import annotations

import pytest

from cm_inst import OutputStyle, ArithOp, MemOp, Port, Instruction
from asm.ir import (
    IRGraph, IRNode, IREdge, IRDataDef, SourceLoc, SystemConfig, ResolvedDest,
    collect_all_nodes_and_edges, collect_all_data_defs,
)
from dfgraph.graph_json import (
    SM_NODE_PREFIX,
    _collect_referenced_sm_ids,
    _synthesize_sm_nodes,
    _synthesize_sm_edges,
    graph_to_json,
)
from dfgraph.pipeline import PipelineResult, PipelineStage
from dfgraph.categories import OpcodeCategory, CATEGORY_COLOURS


# ── Helpers ───────────────────────────────────────────────────────────

def _loc():
    """Return a fresh ``SourceLoc(1, 1)`` used as a placeholder location."""
    return SourceLoc(1, 1)


def _make_graph_with_sm():
    """Build an IRGraph whose WRITE and READ MemOp nodes both target SM 0.

    The reader's left destination names ``&output``, an ADD node, and a single
    L-port edge connects ``&reader`` to ``&output``.
    """
    nodes = {
        "&writer": IRNode(
            name="&writer",
            opcode=MemOp.WRITE,
            pe=0,
            iram_offset=0,
            act_slot=0,
            loc=_loc(),
            sm_id=0,
            const=0,
        ),
        "&reader": IRNode(
            name="&reader",
            opcode=MemOp.READ,
            pe=0,
            iram_offset=1,
            act_slot=0,
            loc=_loc(),
            sm_id=0,
            const=0,
            dest_l=ResolvedDest(name="&output", addr=None),
        ),
        "&output": IRNode(
            name="&output",
            opcode=ArithOp.ADD,
            pe=0,
            iram_offset=2,
            act_slot=0,
            loc=_loc(),
        ),
    }
    reader_to_output = IREdge(
        source="&reader",
        dest="&output",
        port=Port.L,
        loc=_loc(),
    )
    return IRGraph(
        nodes=nodes,
        edges=[reader_to_output],
        system=SystemConfig(pe_count=1, sm_count=1),
    )


def _make_graph_no_sm():
    """Build an IRGraph of two chained ADD nodes and no MemOp nodes."""
    first = IRNode(
        name="&a", opcode=ArithOp.ADD, pe=0, iram_offset=0, act_slot=0, loc=_loc(),
    )
    second = IRNode(
        name="&b", opcode=ArithOp.ADD, pe=0, iram_offset=1, act_slot=0, loc=_loc(),
    )
    link = IREdge(source="&a", dest="&b", port=Port.L, loc=_loc())
    return IRGraph(
        nodes={"&a": first, "&b": second},
        edges=[link],
        system=SystemConfig(pe_count=1, sm_count=0),
    )


def _make_graph_with_datadef():
    """Build an IRGraph with a datadef referencing SM 0 but no MemOp nodes."""
    lone_add = IRNode(
        name="&a", opcode=ArithOp.ADD, pe=0, iram_offset=0, act_slot=0, loc=_loc(),
    )
    return IRGraph(
        nodes={"&a": lone_add},
        edges=[],
        system=SystemConfig(pe_count=1, sm_count=1),
        data_defs=[
            IRDataDef(name="@d0", sm_id=0, cell_addr=0, value=42, loc=_loc()),
        ],
    )


def _make_pipeline_result(graph):
    """Wrap *graph* in an error-free PipelineResult at the ALLOCATE stage."""
    return PipelineResult(
        graph=graph,
        stage=PipelineStage.ALLOCATE,
        errors=[],
        parse_error=None,
    )


def _all_nodes(graph):
    """Return only the node part of ``collect_all_nodes_and_edges(graph)``."""
    nodes, _ = collect_all_nodes_and_edges(graph)
    return nodes


def _sm_nodes_for(graph, sm_ids):
    """Synthesize the SM nodes of *graph* for the given set of SM ids."""
    return _synthesize_sm_nodes(sm_ids, _all_nodes(graph), graph)


def _sm_edges_for(graph):
    """Synthesize the SM edges derived from all nodes of *graph*."""
    return _synthesize_sm_edges(_all_nodes(graph))


def _json_for(graph):
    """Run ``graph_to_json`` on *graph* wrapped in a pipeline result."""
    return graph_to_json(_make_pipeline_result(graph))


def _only_synthetic(items):
    """Keep the entries of *items* whose ``"synthetic"`` value is truthy."""
    return [item for item in items if item.get("synthetic")]


# ── Tests: _collect_referenced_sm_ids ─────────────────────────────────


class TestCollectReferencedSmIds:
    def test_from_memop_nodes(self):
        graph = _make_graph_with_sm()
        assert 0 in _collect_referenced_sm_ids(_all_nodes(graph), graph)

    def test_from_datadefs(self):
        graph = _make_graph_with_datadef()
        assert 0 in _collect_referenced_sm_ids(_all_nodes(graph), graph)

    def test_no_sm_returns_empty(self):
        graph = _make_graph_no_sm()
        referenced = _collect_referenced_sm_ids(_all_nodes(graph), graph)
        assert len(referenced) == 0


# ── Tests: _synthesize_sm_nodes ───────────────────────────────────────


class TestSynthesizeSmNodes:
    def test_node_structure(self):
        synthesized = _sm_nodes_for(_make_graph_with_sm(), {0})
        assert len(synthesized) == 1
        sm_node = synthesized[0]

        # Fields compared by equality.
        expected_fields = {
            "id": f"{SM_NODE_PREFIX}0",
            "opcode": "sm",
            "category": "structure_memory",
            "sm_id": 0,
        }
        for key, expected in expected_fields.items():
            assert sm_node[key] == expected

        # Fields that must be the exact singletons, not merely truthy/falsy.
        assert sm_node["synthetic"] is True
        assert sm_node["has_error"] is False
        assert sm_node["pe"] is None

    def test_label_includes_cell_addresses(self):
        text = _sm_nodes_for(_make_graph_with_sm(), {0})[0]["label"]
        assert "SM 0" in text
        assert "[0]" in text

    def test_multiple_sms_sorted(self):
        synthesized = _sm_nodes_for(_make_graph_with_sm(), {2, 0, 1})
        assert [entry["sm_id"] for entry in synthesized] == [0, 1, 2]

    def test_empty_set(self):
        assert _sm_nodes_for(_make_graph_no_sm(), set()) == []

    def test_colour_matches_category(self):
        synthesized = _sm_nodes_for(_make_graph_with_sm(), {0})
        wanted = CATEGORY_COLOURS[OpcodeCategory.STRUCTURE_MEMORY]
        assert synthesized[0]["colour"] == wanted

    def test_label_includes_datadef(self):
        text = _sm_nodes_for(_make_graph_with_datadef(), {0})[0]["label"]
        assert "SM 0" in text
        assert "init=42" in text


# ── Tests: _synthesize_sm_edges ───────────────────────────────────────


class TestSynthesizeSmEdges:
    def test_request_edges_created(self):
        synthesized = _sm_edges_for(_make_graph_with_sm())
        requests = [edge for edge in synthesized if edge["port"] == "REQ"]
        assert len(requests) >= 1
        for request in requests:
            assert request["target"].startswith(SM_NODE_PREFIX)
            assert request["synthetic"] is True

    def test_return_edges_for_read_with_dest(self):
        synthesized = _sm_edges_for(_make_graph_with_sm())
        returns = [
            edge for edge in synthesized
            if edge["source"].startswith(SM_NODE_PREFIX)
        ]
        assert len(returns) >= 1
        for returned in returns:
            assert returned["synthetic"] is True
            assert returned["target"] == "&reader"
            assert returned["port"] == "RET"

    def test_no_edges_without_sm(self):
        assert _sm_edges_for(_make_graph_no_sm()) == []


# ── Tests: graph_to_json integration ──────────────────────────────────


class TestGraphToJsonIntegration:
    def test_sm_nodes_in_output(self):
        payload = _json_for(_make_graph_with_sm())
        synthetic_nodes = _only_synthetic(payload["nodes"])
        assert len(synthetic_nodes) >= 1
        assert synthetic_nodes[0]["category"] == "structure_memory"

    def test_sm_edges_in_output(self):
        payload = _json_for(_make_graph_with_sm())
        assert len(_only_synthetic(payload["edges"])) >= 1

    def test_no_sm_nodes_without_sm(self):
        payload = _json_for(_make_graph_no_sm())
        assert _only_synthetic(payload["nodes"]) == []

    def test_no_sm_edges_without_sm(self):
        payload = _json_for(_make_graph_no_sm())
        assert len(_only_synthetic(payload["edges"])) == 0

    def test_datadef_only_creates_sm_node(self):
        payload = _json_for(_make_graph_with_datadef())
        synthetic_nodes = _only_synthetic(payload["nodes"])
        assert len(synthetic_nodes) == 1
        assert synthetic_nodes[0]["sm_id"] == 0