OR-1 dataflow CPU sketch
1"""Tests for SM node synthesis in graph JSON output.
2
3Verifies that Structure Memory instances appear as synthesized nodes
4in both dfgraph (static) and monitor (live) graph JSON output.
5"""
6
7from __future__ import annotations
8
9import pytest
10
11from cm_inst import OutputStyle, ArithOp, MemOp, Port, Instruction
12from asm.ir import (
13 IRGraph, IRNode, IREdge, IRDataDef, SourceLoc, SystemConfig, ResolvedDest,
14 collect_all_nodes_and_edges, collect_all_data_defs,
15)
16from dfgraph.graph_json import (
17 SM_NODE_PREFIX,
18 _collect_referenced_sm_ids,
19 _synthesize_sm_nodes,
20 _synthesize_sm_edges,
21 graph_to_json,
22)
23from dfgraph.pipeline import PipelineResult, PipelineStage
24from dfgraph.categories import OpcodeCategory, CATEGORY_COLOURS
25
26
27# ── Helpers ───────────────────────────────────────────────────────────
28
def _loc():
    """Return a throwaway ``SourceLoc(1, 1)`` for fixture nodes, edges and data defs."""
    placeholder = SourceLoc(1, 1)
    return placeholder
31
32
def _make_graph_with_sm():
    """Build an IRGraph fixture whose memory operations target SM 0.

    The graph contains a ``MemOp.WRITE`` node (``&writer``), a
    ``MemOp.READ`` node (``&reader``) whose left destination is
    ``&output``, the ``&output`` ADD node, and a single
    ``&reader`` -> ``&output`` edge on port L.
    """
    # Keyword arguments common to both memory-op nodes.
    mem_common = {"pe": 0, "act_slot": 0, "sm_id": 0, "const": 0}

    sm_writer = IRNode(
        name="&writer",
        opcode=MemOp.WRITE,
        iram_offset=0,
        loc=_loc(),
        **mem_common,
    )
    sm_reader = IRNode(
        name="&reader",
        opcode=MemOp.READ,
        iram_offset=1,
        loc=_loc(),
        dest_l=ResolvedDest(name="&output", addr=None),
        **mem_common,
    )
    sink = IRNode(
        name="&output",
        opcode=ArithOp.ADD,
        pe=0,
        iram_offset=2,
        act_slot=0,
        loc=_loc(),
    )
    reader_to_sink = IREdge(source="&reader", dest="&output", port=Port.L, loc=_loc())

    node_table = {"&writer": sm_writer, "&reader": sm_reader, "&output": sink}
    config = SystemConfig(pe_count=1, sm_count=1)
    return IRGraph(nodes=node_table, edges=[reader_to_sink], system=config)
75
76
def _make_graph_no_sm():
    """Build an IRGraph fixture made only of ADD nodes (no MemOp nodes).

    Two nodes, ``&a`` and ``&b``, are joined by one edge on port L; the
    system config declares zero SMs.
    """
    # iram_offset follows the position of the name in the tuple: &a -> 0, &b -> 1.
    adders = {
        label: IRNode(
            name=label,
            opcode=ArithOp.ADD,
            pe=0,
            iram_offset=slot,
            act_slot=0,
            loc=_loc(),
        )
        for slot, label in enumerate(("&a", "&b"))
    }
    link = IREdge(source="&a", dest="&b", port=Port.L, loc=_loc())
    config = SystemConfig(pe_count=1, sm_count=0)
    return IRGraph(nodes=adders, edges=[link], system=config)
87
88
def _make_graph_with_datadef():
    """Build an IRGraph fixture where only a data definition references SM 0.

    The single node is an ADD (not a MemOp); the data def ``@d0`` places
    the value 42 at cell address 0 of SM 0.
    """
    lone_adder = IRNode(
        name="&a",
        opcode=ArithOp.ADD,
        pe=0,
        iram_offset=0,
        act_slot=0,
        loc=_loc(),
    )
    config = SystemConfig(pe_count=1, sm_count=1)
    initial_cell = IRDataDef(name="@d0", sm_id=0, cell_addr=0, value=42, loc=_loc())
    return IRGraph(
        nodes={"&a": lone_adder},
        edges=[],
        system=config,
        data_defs=[initial_cell],
    )
98 return graph
99
100
def _make_pipeline_result(graph):
    """Wrap *graph* in an error-free PipelineResult at the ALLOCATE stage."""
    wrapped = PipelineResult(
        graph=graph,
        stage=PipelineStage.ALLOCATE,
        errors=[],
        parse_error=None,
    )
    return wrapped
109
110
111# ── Tests: _collect_referenced_sm_ids ─────────────────────────────────
112
113
class TestCollectReferencedSmIds:
    """Checks which SM ids ``_collect_referenced_sm_ids`` reports per fixture graph."""

    @staticmethod
    def _referenced_ids(graph):
        """Gather all nodes of *graph* and run the SM-id collector on them."""
        gathered_nodes, _unused_edges = collect_all_nodes_and_edges(graph)
        return _collect_referenced_sm_ids(gathered_nodes, graph)

    def test_from_memop_nodes(self):
        """MemOp nodes with ``sm_id=0`` make SM 0 referenced."""
        found = self._referenced_ids(_make_graph_with_sm())
        assert 0 in found

    def test_from_datadefs(self):
        """A data definition on SM 0 makes SM 0 referenced even without MemOps."""
        found = self._referenced_ids(_make_graph_with_datadef())
        assert 0 in found

    def test_no_sm_returns_empty(self):
        """A graph with neither MemOps nor data defs references no SM."""
        found = self._referenced_ids(_make_graph_no_sm())
        assert len(found) == 0
132
133
134# ── Tests: _synthesize_sm_nodes ───────────────────────────────────────
135
136
class TestSynthesizeSmNodes:
    """Checks the JSON node dicts produced by ``_synthesize_sm_nodes``."""

    @staticmethod
    def _sm_nodes(graph, sm_ids):
        """Synthesize SM nodes for *sm_ids* using every node gathered from *graph*."""
        gathered_nodes, _unused_edges = collect_all_nodes_and_edges(graph)
        return _synthesize_sm_nodes(sm_ids, gathered_nodes, graph)

    def test_node_structure(self):
        """A single SM yields one node dict with the expected fixed fields."""
        produced = self._sm_nodes(_make_graph_with_sm(), {0})
        assert len(produced) == 1
        sm_node = produced[0]
        assert sm_node["id"] == f"{SM_NODE_PREFIX}0"
        assert sm_node["opcode"] == "sm"
        assert sm_node["category"] == "structure_memory"
        assert sm_node["synthetic"] is True
        assert sm_node["sm_id"] == 0
        assert sm_node["has_error"] is False
        assert sm_node["pe"] is None

    def test_label_includes_cell_addresses(self):
        """The label names the SM and shows the referenced cell address."""
        produced = self._sm_nodes(_make_graph_with_sm(), {0})
        text = produced[0]["label"]
        assert "SM 0" in text
        assert "[0]" in text

    def test_multiple_sms_sorted(self):
        """Nodes come back ordered by ascending ``sm_id`` whatever the input order."""
        produced = self._sm_nodes(_make_graph_with_sm(), {2, 0, 1})
        ordered_ids = [entry["sm_id"] for entry in produced]
        assert ordered_ids == [0, 1, 2]

    def test_empty_set(self):
        """No SM ids in means no nodes out."""
        produced = self._sm_nodes(_make_graph_no_sm(), set())
        assert produced == []

    def test_colour_matches_category(self):
        """The node colour is the one registered for the STRUCTURE_MEMORY category."""
        produced = self._sm_nodes(_make_graph_with_sm(), {0})
        wanted_colour = CATEGORY_COLOURS[OpcodeCategory.STRUCTURE_MEMORY]
        assert produced[0]["colour"] == wanted_colour

    def test_label_includes_datadef(self):
        """A data definition's initial value appears in the label."""
        produced = self._sm_nodes(_make_graph_with_datadef(), {0})
        text = produced[0]["label"]
        assert "SM 0" in text
        assert "init=42" in text
186
187
188# ── Tests: _synthesize_sm_edges ───────────────────────────────────────
189
190
class TestSynthesizeSmEdges:
    """Checks the synthetic request/return edges built by ``_synthesize_sm_edges``."""

    @staticmethod
    def _sm_edges(graph):
        """Synthesize SM edges from every node gathered from *graph*."""
        gathered_nodes, _unused_edges = collect_all_nodes_and_edges(graph)
        return _synthesize_sm_edges(gathered_nodes)

    def test_request_edges_created(self):
        """At least one REQ edge exists and each one points at a synthetic SM node."""
        produced = self._sm_edges(_make_graph_with_sm())
        requests = [item for item in produced if item["port"] == "REQ"]
        assert len(requests) >= 1
        for req in requests:
            assert req["target"].startswith(SM_NODE_PREFIX)
            assert req["synthetic"] is True

    def test_return_edges_for_read_with_dest(self):
        """Edges leaving an SM node are synthetic RET edges targeting ``&reader``."""
        produced = self._sm_edges(_make_graph_with_sm())
        returns = [item for item in produced if item["source"].startswith(SM_NODE_PREFIX)]
        assert len(returns) >= 1
        for ret in returns:
            assert ret["synthetic"] is True
            assert ret["target"] == "&reader"
            assert ret["port"] == "RET"

    def test_no_edges_without_sm(self):
        """A graph without MemOp nodes yields no synthetic edges."""
        produced = self._sm_edges(_make_graph_no_sm())
        assert produced == []
218
219
220# ── Tests: graph_to_json integration ──────────────────────────────────
221
222
class TestGraphToJsonIntegration:
    """End-to-end checks that ``graph_to_json`` emits synthetic SM nodes and edges."""

    @staticmethod
    def _render(graph):
        """Wrap *graph* in a pipeline result and convert it to graph JSON."""
        return graph_to_json(_make_pipeline_result(graph))

    @staticmethod
    def _synthetic(entries):
        """Keep only the entries whose ``synthetic`` key is present and truthy."""
        return [entry for entry in entries if entry.get("synthetic")]

    def test_sm_nodes_in_output(self):
        """A graph with MemOps yields at least one synthetic structure-memory node."""
        payload = self._render(_make_graph_with_sm())
        synthetic_nodes = self._synthetic(payload["nodes"])
        assert len(synthetic_nodes) >= 1
        assert synthetic_nodes[0]["category"] == "structure_memory"

    def test_sm_edges_in_output(self):
        """A graph with MemOps yields at least one synthetic edge."""
        payload = self._render(_make_graph_with_sm())
        synthetic_edges = self._synthetic(payload["edges"])
        assert len(synthetic_edges) >= 1

    def test_no_sm_nodes_without_sm(self):
        """Without any SM reference there are no synthetic nodes."""
        payload = self._render(_make_graph_no_sm())
        synthetic_nodes = self._synthetic(payload["nodes"])
        assert synthetic_nodes == []

    def test_no_sm_edges_without_sm(self):
        """Without any SM reference there are no synthetic edges."""
        payload = self._render(_make_graph_no_sm())
        synthetic_edges = self._synthetic(payload["edges"])
        assert len(synthetic_edges) == 0

    def test_datadef_only_creates_sm_node(self):
        """A data definition alone is enough to produce exactly one SM node."""
        payload = self._render(_make_graph_with_datadef())
        synthetic_nodes = self._synthetic(payload["nodes"])
        assert len(synthetic_nodes) == 1
        assert synthetic_nodes[0]["sm_id"] == 0