OR-1 dataflow CPU sketch
1"""Tests for built-in macro library (Phase 7).
2
3Tests verify:
4- dfasm-macros.AC8.1: Built-in macros available without explicit import
5- dfasm-macros.AC8.2: User macro with same name shadows built-in
6- dfasm-macros.AC8.3: #loop_counted expands to correct counted loop topology
7- dfasm-macros.AC8.4: Program using built-in macros assembles and runs in emulator
8"""
9
10from pathlib import Path
11
12import simpy
13from lark import Lark
14
15from asm import assemble, run_pipeline
16from asm.expand import expand
17from asm.ir import IRGraph
18from asm.lower import lower
19from emu import build_topology
20
21
def _get_parser():
    """Build a Lark parser for the dfasm grammar.

    The grammar text is read from ``dfasm.lark`` in the parent of this
    file's directory. A new Earley parser with position propagation enabled
    is constructed on every call.
    """
    grammar_text = (Path(__file__).parent.parent / "dfasm.lark").read_text()
    return Lark(grammar_text, parser="earley", propagate_positions=True)
30
31
def parse_and_lower(source: str) -> IRGraph:
    """Parse dfasm ``source`` and lower the parse tree to an IRGraph.

    Macro expansion is not run here; the returned graph is exactly what
    ``lower`` produces for the parsed tree.
    """
    return lower(_get_parser().parse(source))
37
38
def parse_lower_expand(source: str) -> IRGraph:
    """Parse, lower, then expand ``source`` WITHOUT built-in macros.

    The lowered graph is handed straight to ``expand``; ``run_pipeline`` is
    not involved on this path.
    """
    lowered = parse_and_lower(source)
    return expand(lowered)
43
44
def run_program_direct(source: str, until: int = 1000) -> dict:
    """Assemble ``source`` and run it on a freshly built emulator topology.

    Args:
        source: dfasm source code as a string.
        until: Simulation time limit handed to ``env.run`` (default: 1000).

    Returns:
        Dict mapping each PE ID to a list copy of that PE's ``output_log``.
    """
    assembled = assemble(source)
    env = simpy.Environment()
    system = build_topology(env, assembled.pe_configs, assembled.sm_configs)

    # Order matters: every setup token is injected before any seed token.
    for setup_token in assembled.setup_tokens:
        system.inject(setup_token)
    for seed_token in assembled.seed_tokens:
        system.inject(seed_token)

    env.run(until=until)

    # Snapshot each PE's log so callers get plain lists.
    return {pe_id: list(pe.output_log) for pe_id, pe in system.pes.items()}
75
76
class TestAC81_BuiltinAvailable:
    """AC8.1: built-in macros are usable without an explicit import."""

    def test_builtins_loaded_from_constant(self):
        """The built-in macro text is non-empty and names the expected macros."""
        from asm.builtins import _BUILTIN_LINE_COUNT, BUILTIN_MACROS

        assert len(BUILTIN_MACROS) > 0
        assert _BUILTIN_LINE_COUNT > 0

        # Each of these signatures must appear somewhere in the macro text.
        for signature in ("#loop_counted", "#permit_inject", "#reduce_2 op"):
            assert signature in BUILTIN_MACROS

    def test_builtins_prepended_to_pipeline(self):
        """A macro-free program goes through run_pipeline without errors."""
        program = """
        @system pe=1, sm=0
        &c <| const, 42
        """
        result_graph = run_pipeline(program)
        assert result_graph is not None
        assert len(result_graph.errors) == 0

    def test_builtin_line_offset_set(self):
        """run_pipeline returns a graph with a positive builtin_line_offset."""
        program = """
        @system pe=1, sm=0
        &c <| const, 42
        """
        result_graph = run_pipeline(program)
        assert result_graph.builtin_line_offset > 0, "builtin_line_offset should be set"

    def test_builtin_macro_invocation_expands(self):
        """Invoking #permit_inject yields an expanded &p node with const=1."""
        program = """
        @system pe=1, sm=0
        &source <| pass
        &sink <| pass
        #permit_inject &source |> &sink
        """
        graph = run_pipeline(program)
        assert len(graph.errors) == 0

        def is_permit_p(label):
            # Matches names that mention both the &p node and the macro.
            return "&p" in label and "permit_inject" in label

        node_names = list(graph.nodes.keys())
        assert any(is_permit_p(key) for key in node_names), (
            f"Expected &p node from #permit_inject expansion in {node_names}"
        )

        p_node = next(
            node for node in graph.nodes.values() if is_permit_p(node.name)
        )
        assert p_node.const == 1, "permit_inject &p should have const=1"
131
132
class TestAC82_UserMacroShadows:
    """AC8.2: a user macro with the same name shadows the built-in one."""

    def test_user_defined_macro_shadows_builtin(self):
        """A user-supplied #permit_inject replaces the built-in definition.

        The program defines its own #permit_inject whose body creates
        &custom_node with const 99, then invokes it. The expanded graph must
        contain that node with that constant.
        """
        program = """
        @system pe=1, sm=0

        ; User defines #permit_inject with a custom body
        #permit_inject *targets |> {
        &custom_node <| const, 99
        }

        ; Invoke the user-defined macro
        &sink <| pass
        #permit_inject &sink
        """
        graph = run_pipeline(program)

        node_names = list(graph.nodes.keys())
        assert any("&custom_node" in key for key in node_names), (
            f"Expected &custom_node from user macro shadowing built-in in {node_names}"
        )

        custom_node = next(
            node for node in graph.nodes.values() if "&custom_node" in node.name
        )
        assert custom_node.const == 99, "User's shadowing macro should use const=99"

    def test_builtin_available_when_not_shadowed(self):
        """Without a user definition, #reduce_2 resolves and expands to an &r node."""
        program = """
        @system pe=1, sm=0
        &sink <| pass
        #reduce_2 add |> &sink
        """
        graph = run_pipeline(program)
        assert graph is not None
        assert len(graph.errors) == 0

        node_names = list(graph.nodes.keys())
        assert any("&r" in key for key in node_names), (
            f"Expected &r node from #reduce_2 expansion in {node_names}"
        )
180
181
class TestAC83_LoopCountedTopology:
    """AC8.3: #loop_counted macro defines correct loop topology."""

    # Both tests expand the same program, so it is defined once.
    _LOOP_SOURCE = """
    @system pe=1, sm=0
    &init <| const, 0
    &limit <| const, 10
    &body <| pass
    &exit <| pass
    #loop_counted &init, &limit |> body=&body, exit=&exit
    """

    def _expanded_graph(self):
        """Run the shared program through run_pipeline and require no errors.

        Topology assertions are only meaningful on a cleanly expanded graph,
        so a failed expansion is reported here instead of surfacing later as
        a confusing "missing node" failure.
        """
        graph = run_pipeline(self._LOOP_SOURCE)
        assert len(graph.errors) == 0, f"Unexpected errors: {graph.errors}"
        return graph

    def test_loop_counted_invoked_expands_with_required_opcodes(self):
        """#loop_counted invoked through pipeline expands with correct opcodes.

        Verifies expanded graph contains nodes with:
        - add (counter arithmetic)
        - brgt (greater-than comparison)
        - inc (increment)
        """
        from cm_inst import ArithOp, RoutingOp

        graph = self._expanded_graph()

        # Nodes without an opcode (None) fail the isinstance test and are skipped.
        opcode_names = {
            node.opcode.name.lower()
            for node in graph.nodes.values()
            if isinstance(node.opcode, (ArithOp, RoutingOp))
        }

        assert "add" in opcode_names, f"Expected 'add' opcode, got: {opcode_names}"
        assert "brgt" in opcode_names, f"Expected 'brgt' opcode, got: {opcode_names}"
        assert "inc" in opcode_names, f"Expected 'inc' opcode, got: {opcode_names}"

    def test_loop_counted_invoked_creates_feedback_topology(self):
        """#loop_counted invoked creates feedback arc from increment to counter.

        Verifies the graph has a feedback edge from an ``inc`` node into the
        ``R`` port of an ``add`` node. Every add/inc candidate is considered,
        so the result does not depend on node iteration order.
        """
        from cm_inst import ArithOp

        graph = self._expanded_graph()

        # The isinstance guard stays in front of the equality check, as in the
        # original, so only ArithOp members are compared against ADD / INC.
        add_nodes = [
            name
            for name, node in graph.nodes.items()
            if isinstance(node.opcode, ArithOp) and node.opcode == ArithOp.ADD
        ]
        inc_nodes = [
            name
            for name, node in graph.nodes.items()
            if isinstance(node.opcode, ArithOp) and node.opcode == ArithOp.INC
        ]

        assert len(add_nodes) >= 1, (
            f"Expected at least 1 'add' node, got {len(add_nodes)}"
        )
        assert len(inc_nodes) >= 1, (
            f"Expected at least 1 'inc' node, got {len(inc_nodes)}"
        )

        edge_pairs = [(edge.source, edge.dest, edge.port) for edge in graph.edges]

        # A port may be an object exposing ``.name`` or a bare value; compare
        # whichever form is present against "R".
        has_feedback = any(
            src in inc_nodes
            and dst in add_nodes
            and getattr(port, "name", port) == "R"
            for src, dst, port in edge_pairs
        )
        assert has_feedback, (
            f"Expected feedback edge from inc to counter, edges: {edge_pairs}"
        )
267
268
class TestAC84_EndToEnd:
    """AC8.4: programs using built-in macros assemble and run in the emulator."""

    def test_builtin_reduce_2_invoked_assembles(self):
        """A #reduce_2 add invocation assembles and yields PE configs."""
        program = """
        @system pe=1, sm=0
        &out <| pass
        #reduce_2 add |> &out
        """
        assembled = assemble(program)
        assert assembled is not None, "assemble() should succeed"
        assert len(assembled.pe_configs) > 0, "Should have PE configs"

    def test_builtin_reduce_2_runs_in_emulator(self):
        """A #reduce_2 add program runs in the emulator and reports PE 0."""
        program = """
        @system pe=1, sm=0
        &out <| pass
        #reduce_2 add |> &out
        """
        per_pe = run_program_direct(program, until=500)
        assert 0 in per_pe, "PE 0 should exist in outputs"

    def test_builtin_reduce_2_produces_output_when_wired(self):
        """#reduce_2 add emits 7 when fed constants 3 and 4.

        The two constants are routed into the L and R ports of the expanded
        ``#reduce_2_0.&r`` node; the sum is looked for among the ``data``
        fields of every output token on every PE.
        """
        program = """
        @system pe=1, sm=0
        &a <| const, 3
        &b <| const, 4
        &out <| pass
        #reduce_2 add |> &out
        &a |> #reduce_2_0.&r:L
        &b |> #reduce_2_0.&r:R
        """
        per_pe = run_program_direct(program, until=500)

        # Tokens without a ``data`` attribute are ignored.
        all_values = [
            token.data
            for pe_tokens in per_pe.values()
            for token in pe_tokens
            if hasattr(token, "data")
        ]
        assert 7 in all_values, f"Expected 3+4=7 in outputs, got {all_values}"

    def test_builtin_permit_inject_assembles_and_runs(self):
        """A #permit_inject program assembles and then runs in the emulator."""
        program = """
        @system pe=1, sm=0
        &source <| pass
        &sink <| pass
        #permit_inject &source |> &sink
        """
        assembled = assemble(program)
        assert assembled is not None
        assert len(assembled.pe_configs) > 0

        per_pe = run_program_direct(program, until=500)
        assert 0 in per_pe, "PE 0 should exist in outputs"

    def test_builtin_reduce_3_assembles(self):
        """A #reduce_3 add invocation assembles and yields PE configs."""
        program = """
        @system pe=1, sm=0
        &out <| pass
        #reduce_3 add |> &out
        """
        assembled = assemble(program)
        assert assembled is not None, "assemble() should succeed"
        assert len(assembled.pe_configs) > 0, "Should have PE configs"

    def test_builtin_reduce_3_runs_in_emulator(self):
        """A #reduce_3 add program runs in the emulator and reports PE 0."""
        program = """
        @system pe=1, sm=0
        &out <| pass
        #reduce_3 add |> &out
        """
        per_pe = run_program_direct(program, until=500)
        assert 0 in per_pe, "PE 0 should exist in outputs"
345
346
class TestBuiltinSyntaxValidation:
    """Check that the built-in macro text is syntactically valid dfasm."""

    def test_all_builtins_parse(self):
        """The whole built-in macro text parses with the dfasm grammar."""
        from asm.builtins import BUILTIN_MACROS

        tree = _get_parser().parse(BUILTIN_MACROS)
        assert tree is not None

    def test_permit_inject_defined_in_builtins(self):
        """The variadic #permit_inject signature appears in the built-ins."""
        from asm.builtins import BUILTIN_MACROS

        assert "#permit_inject *targets" in BUILTIN_MACROS, (
            "Expected variadic #permit_inject definition in BUILTIN_MACROS"
        )

    def test_reduce_variants_defined_in_builtins(self):
        """#reduce_2 through #reduce_4 each take ``op`` and use ``${op}``."""
        from asm.builtins import BUILTIN_MACROS

        for arity in (2, 3, 4):
            macro_name = f"#reduce_{arity}"
            signature = f"{macro_name} op"
            assert signature in BUILTIN_MACROS, (
                f"Expected '{signature}' definition in BUILTIN_MACROS"
            )

            # The macro's text runs from its signature up to the next line
            # that starts with '#', or to the end of the text if none follows.
            begin = BUILTIN_MACROS.find(signature)
            next_macro = BUILTIN_MACROS.find("\n#", begin + 1)
            if next_macro == -1:
                macro_body = BUILTIN_MACROS[begin:]
            else:
                macro_body = BUILTIN_MACROS[begin:next_macro]

            assert "${op}" in macro_body, (
                f"{macro_name} should use '${{op}}' parameter for opcode"
            )
385
386
class TestBuiltinComposition:
    """Built-in macros combined with user-written code and macros."""

    def test_builtins_dont_interfere_with_user_code(self):
        """A macro-free adder program passes through run_pipeline cleanly."""
        program = """
        @system pe=1, sm=0
        &a <| const, 10
        &b <| const, 20
        &sum <| add
        &a |> &sum:L
        &b |> &sum:R
        &out <| pass
        &sum |> &out:L
        """
        graph = run_pipeline(program)
        assert len(graph.errors) == 0, f"Unexpected errors: {graph.errors}"

    def test_user_macros_work_alongside_builtins(self):
        """A user-defined #helper macro expands to #helper_0-scoped nodes.

        NOTE(review): this goes through parse_lower_expand, which is
        documented as running WITHOUT built-in macros, so built-ins are
        presumably not loaded here despite the test name; confirm intent.
        """
        program = """
        @system pe=1, sm=0

        #helper |> {
        &internal <| pass
        }

        #helper
        """
        graph = parse_lower_expand(program)

        node_names = list(graph.nodes.keys())
        assert any("#helper_0" in key for key in node_names), (
            f"Expected helper_0 nodes in {node_names}"
        )

    def test_builtin_and_user_macros_in_same_program(self):
        """A program mixing a user macro and #permit_inject expands cleanly."""
        program = """
        @system pe=1, sm=0

        #my_const |> {
        &val <| const, 77
        }

        &sink <| pass
        &source <| add, #my_const

        #permit_inject &source |> &sink
        """
        graph = run_pipeline(program)
        assert len(graph.errors) == 0

        node_names = list(graph.nodes.keys())
        # NOTE(review): this looks for &source (a plain user node), not the
        # &val node declared inside #my_const; confirm that is intended.
        has_source = any("&source" in key for key in node_names)
        has_permit_p = any(
            "&p" in key and "permit_inject" in key for key in node_names
        )
        assert has_source, f"Expected user macro &source in {node_names}"
        assert has_permit_p, f"Expected builtin &p in {node_names}"
444
445
class TestLineNumberOffset:
    """The graph exposes how many lines the prepended built-ins occupy."""

    def test_builtin_line_offset_is_tracked(self):
        """run_pipeline reports a positive builtin_line_offset."""
        program = """
        @system pe=1, sm=0
        &const|pe0 <| const, 42
        """
        result_graph = run_pipeline(program)
        assert result_graph.builtin_line_offset > 0, "builtin_line_offset should be > 0"

    def test_builtin_offset_allows_error_line_adjustment(self):
        """builtin_line_offset exists on the graph and is an int."""
        program = """
        @system pe=1, sm=0
        &c <| const, 42
        """
        result_graph = run_pipeline(program)

        assert hasattr(result_graph, "builtin_line_offset")
        assert isinstance(result_graph.builtin_line_offset, int)