"""Tests for built-in macro library (Phase 7). Tests verify: - dfasm-macros.AC8.1: Built-in macros available without explicit import - dfasm-macros.AC8.2: User macro with same name shadows built-in - dfasm-macros.AC8.3: #loop_counted expands to correct counted loop topology - dfasm-macros.AC8.4: Program using built-in macros assembles and runs in emulator """ from pathlib import Path import simpy from lark import Lark from asm import assemble, run_pipeline from asm.expand import expand from asm.ir import IRGraph from asm.lower import lower from emu import build_topology def _get_parser(): """Get the dfasm parser.""" grammar_path = Path(__file__).parent.parent / "dfasm.lark" return Lark( grammar_path.read_text(), parser="earley", propagate_positions=True, ) def parse_and_lower(source: str) -> IRGraph: """Parse source and lower to IRGraph (before expansion).""" parser = _get_parser() tree = parser.parse(source) return lower(tree) def parse_lower_expand(source: str) -> IRGraph: """Parse, lower, and expand WITHOUT built-in macros.""" graph = parse_and_lower(source) return expand(graph) def run_program_direct(source: str, until: int = 1000) -> dict: """Assemble source in direct mode, run through emulator. Args: source: dfasm source code as a string until: Simulation timeout in time units (default: 1000) Returns: Dict mapping PE ID to list of output tokens from that PE """ result = assemble(source) env = simpy.Environment() sys = build_topology(env, result.pe_configs, result.sm_configs) # Inject setup tokens first (frame/IRAM initialization) for setup in result.setup_tokens: sys.inject(setup) # Then inject seed tokens for seed in result.seed_tokens: sys.inject(seed) env.run(until=until) # Collect output from each PE's output_log outputs = {} for pe_id, pe in sys.pes.items(): outputs[pe_id] = list(pe.output_log) return outputs class TestAC81_BuiltinAvailable: """AC8.1: Built-in macros available without explicit import.""" def test_builtins_loaded_from_constant(self): """Verify that built-in macros are available as constant.""" from asm.builtins import _BUILTIN_LINE_COUNT, BUILTIN_MACROS assert len(BUILTIN_MACROS) > 0 assert _BUILTIN_LINE_COUNT > 0 assert "#loop_counted" in BUILTIN_MACROS assert "#permit_inject" in BUILTIN_MACROS assert "#reduce_2 op" in BUILTIN_MACROS def test_builtins_prepended_to_pipeline(self): """Verify that built-in macros are prepended in run_pipeline.""" source = """ @system pe=1, sm=0 &c <| const, 42 """ graph = run_pipeline(source) assert graph is not None assert len(graph.errors) == 0 def test_builtin_line_offset_set(self): """Verify that builtin_line_offset is set on returned graph.""" source = """ @system pe=1, sm=0 &c <| const, 42 """ graph = run_pipeline(source) assert graph.builtin_line_offset > 0, "builtin_line_offset should be set" def test_builtin_macro_invocation_expands(self): """Invoking a built-in macro through pipeline produces expanded nodes.""" source = """ @system pe=1, sm=0 &source <| pass &sink <| pass #permit_inject &source |> &sink """ graph = run_pipeline(source) assert len(graph.errors) == 0 node_names = list(graph.nodes.keys()) has_p = any("&p" in n and "permit_inject" in n for n in node_names) assert has_p, f"Expected &p node from #permit_inject expansion in {node_names}" p_node = next( n for n in graph.nodes.values() if "&p" in n.name and "permit_inject" in n.name ) assert p_node.const == 1, "permit_inject &p should have const=1" class TestAC82_UserMacroShadows: """AC8.2: User macro with same name shadows built-in.""" def test_user_defined_macro_shadows_builtin(self): """User-defined macro with same name shadows built-in. Verifies that when a user defines #permit_inject with custom body, their definition is used instead of the built-in version. """ source = """ @system pe=1, sm=0 ; User defines #permit_inject with a custom body #permit_inject *targets |> { &custom_node <| const, 99 } ; Invoke the user-defined macro &sink <| pass #permit_inject &sink """ graph = run_pipeline(source) node_names = list(graph.nodes.keys()) has_custom_node = any("&custom_node" in n for n in node_names) assert has_custom_node, ( f"Expected &custom_node from user macro shadowing built-in in {node_names}" ) custom_node = next(n for n in graph.nodes.values() if "&custom_node" in n.name) assert custom_node.const == 99, "User's shadowing macro should use const=99" def test_builtin_available_when_not_shadowed(self): """Built-in macro is used when user doesn't define it.""" source = """ @system pe=1, sm=0 &sink <| pass #reduce_2 add |> &sink """ graph = run_pipeline(source) assert graph is not None assert len(graph.errors) == 0 node_names = list(graph.nodes.keys()) has_r = any("&r" in n for n in node_names) assert has_r, f"Expected &r node from #reduce_2 expansion in {node_names}" class TestAC83_LoopCountedTopology: """AC8.3: #loop_counted macro defines correct loop topology.""" def test_loop_counted_invoked_expands_with_required_opcodes(self): """#loop_counted invoked through pipeline expands with correct opcodes. Verifies expanded graph contains nodes with: - add (counter arithmetic) - brgt (greater-than comparison) - inc (increment) """ source = """ @system pe=1, sm=0 &init <| const, 0 &limit <| const, 10 &body <| pass &exit <| pass #loop_counted &init, &limit |> body=&body, exit=&exit """ graph = run_pipeline(source) assert len(graph.errors) == 0 from cm_inst import ArithOp, RoutingOp opcode_names = set() for node in graph.nodes.values(): if node.opcode is not None: if isinstance(node.opcode, ArithOp): opcode_names.add(node.opcode.name.lower()) elif isinstance(node.opcode, RoutingOp): opcode_names.add(node.opcode.name.lower()) assert "add" in opcode_names, f"Expected 'add' opcode, got: {opcode_names}" assert "brgt" in opcode_names, f"Expected 'brgt' opcode, got: {opcode_names}" assert "inc" in opcode_names, f"Expected 'inc' opcode, got: {opcode_names}" def test_loop_counted_invoked_creates_feedback_topology(self): """#loop_counted invoked creates feedback arc from increment to counter. Verifies the graph has correct loop feedback topology: counter -> compare, compare -> inc, inc -> counter (feedback). """ source = """ @system pe=1, sm=0 &init <| const, 0 &limit <| const, 10 &body <| pass &exit <| pass #loop_counted &init, &limit |> body=&body, exit=&exit """ graph = run_pipeline(source) from cm_inst import ArithOp add_nodes = [ n for n, node in graph.nodes.items() if isinstance(node.opcode, ArithOp) and node.opcode == ArithOp.ADD ] inc_nodes = [ n for n, node in graph.nodes.items() if isinstance(node.opcode, ArithOp) and node.opcode == ArithOp.INC ] assert len(add_nodes) >= 1, ( f"Expected at least 1 'add' node, got {len(add_nodes)}" ) assert len(inc_nodes) >= 1, ( f"Expected at least 1 'inc' node, got {len(inc_nodes)}" ) edge_pairs = [(edge.source, edge.dest, edge.port) for edge in graph.edges] add_node = add_nodes[0] inc_node = inc_nodes[0] has_feedback = any( src == inc_node and dst == add_node and (port.name == "R" if hasattr(port, "name") else port == "R") for src, dst, port in edge_pairs ) assert has_feedback, ( f"Expected feedback edge from inc to counter, edges: {edge_pairs}" ) class TestAC84_EndToEnd: """AC8.4: Program using built-in macros assembles and runs in emulator.""" def test_builtin_reduce_2_invoked_assembles(self): """#reduce_2 add invocation assembles through the full pipeline.""" source = """ @system pe=1, sm=0 &out <| pass #reduce_2 add |> &out """ result = assemble(source) assert result is not None, "assemble() should succeed" assert len(result.pe_configs) > 0, "Should have PE configs" def test_builtin_reduce_2_runs_in_emulator(self): """#reduce_2 add runs through emulator without error.""" source = """ @system pe=1, sm=0 &out <| pass #reduce_2 add |> &out """ outputs = run_program_direct(source, until=500) assert 0 in outputs, "PE 0 should exist in outputs" def test_builtin_reduce_2_produces_output_when_wired(self): """#reduce_2 add produces correct sum when inputs and output are wired via @ret.""" source = """ @system pe=1, sm=0 &a <| const, 3 &b <| const, 4 &out <| pass #reduce_2 add |> &out &a |> #reduce_2_0.&r:L &b |> #reduce_2_0.&r:R """ outputs = run_program_direct(source, until=500) all_values = [] for pe_outputs in outputs.values(): all_values.extend([t.data for t in pe_outputs if hasattr(t, "data")]) assert 7 in all_values, f"Expected 3+4=7 in outputs, got {all_values}" def test_builtin_permit_inject_assembles_and_runs(self): """#permit_inject assembles and expands to const nodes.""" source = """ @system pe=1, sm=0 &source <| pass &sink <| pass #permit_inject &source |> &sink """ result = assemble(source) assert result is not None assert len(result.pe_configs) > 0 outputs = run_program_direct(source, until=500) assert 0 in outputs, "PE 0 should exist in outputs" def test_builtin_reduce_3_assembles(self): """#reduce_3 add invocation assembles through the full pipeline.""" source = """ @system pe=1, sm=0 &out <| pass #reduce_3 add |> &out """ result = assemble(source) assert result is not None, "assemble() should succeed" assert len(result.pe_configs) > 0, "Should have PE configs" def test_builtin_reduce_3_runs_in_emulator(self): """#reduce_3 add runs through emulator without error.""" source = """ @system pe=1, sm=0 &out <| pass #reduce_3 add |> &out """ outputs = run_program_direct(source, until=500) assert 0 in outputs, "PE 0 should exist in outputs" class TestBuiltinSyntaxValidation: """Verify that all built-in macros have valid syntax.""" def test_all_builtins_parse(self): """All built-in macros parse without syntax errors.""" from asm.builtins import BUILTIN_MACROS parser = _get_parser() tree = parser.parse(BUILTIN_MACROS) assert tree is not None def test_permit_inject_defined_in_builtins(self): """#permit_inject variadic macro is defined.""" from asm.builtins import BUILTIN_MACROS assert "#permit_inject *targets" in BUILTIN_MACROS, ( "Expected variadic #permit_inject definition in BUILTIN_MACROS" ) def test_reduce_variants_defined_in_builtins(self): """All #reduce_2 through #reduce_4 are defined with op parameter.""" from asm.builtins import BUILTIN_MACROS for i in range(2, 5): macro_name = f"#reduce_{i}" assert f"{macro_name} op" in BUILTIN_MACROS, ( f"Expected '{macro_name} op' definition in BUILTIN_MACROS" ) start = BUILTIN_MACROS.find(f"{macro_name} op") end = BUILTIN_MACROS.find("\n#", start + 1) if end == -1: end = len(BUILTIN_MACROS) macro_body = BUILTIN_MACROS[start:end] assert "${op}" in macro_body, ( f"{macro_name} should use '${{op}}' parameter for opcode" ) class TestBuiltinComposition: """Test composition of built-in macros with user-defined code.""" def test_builtins_dont_interfere_with_user_code(self): """Built-in macros don't cause errors in simple user code.""" source = """ @system pe=1, sm=0 &a <| const, 10 &b <| const, 20 &sum <| add &a |> &sum:L &b |> &sum:R &out <| pass &sum |> &out:L """ graph = run_pipeline(source) assert len(graph.errors) == 0, f"Unexpected errors: {graph.errors}" def test_user_macros_work_alongside_builtins(self): """User-defined macros work in same program as built-ins.""" source = """ @system pe=1, sm=0 #helper |> { &internal <| pass } #helper """ graph = parse_lower_expand(source) node_names = list(graph.nodes.keys()) has_helper = any("#helper_0" in n for n in node_names) assert has_helper, f"Expected helper_0 nodes in {node_names}" def test_builtin_and_user_macros_in_same_program(self): """A program using both built-in and user macros works correctly.""" source = """ @system pe=1, sm=0 #my_const |> { &val <| const, 77 } &sink <| pass &source <| add, #my_const #permit_inject &source |> &sink """ graph = run_pipeline(source) assert len(graph.errors) == 0 node_names = list(graph.nodes.keys()) has_val = any("&source" in n for n in node_names) has_p = any("&p" in n and "permit_inject" in n for n in node_names) assert has_val, f"Expected user macro &source in {node_names}" assert has_p, f"Expected builtin &p in {node_names}" class TestLineNumberOffset: """Test that line numbers in user code are correct despite built-in prepending.""" def test_builtin_line_offset_is_tracked(self): """Verify that builtin_line_offset is tracked on IRGraph.""" source = """ @system pe=1, sm=0 &const|pe0 <| const, 42 """ graph = run_pipeline(source) assert graph.builtin_line_offset > 0, "builtin_line_offset should be > 0" def test_builtin_offset_allows_error_line_adjustment(self): """Verify builtin_line_offset is available for error message adjustment.""" source = """ @system pe=1, sm=0 &c <| const, 42 """ graph = run_pipeline(source) assert hasattr(graph, "builtin_line_offset") assert isinstance(graph.builtin_line_offset, int)