"""Tests for the Resource allocation pass.

Tests verify:
- or1-asm.AC6.1: Dyadic instructions are assigned IRAM offsets starting at 0
- or1-asm.AC6.2: Monadic/SM instructions are assigned IRAM offsets above dyadic range
- or1-asm.AC6.3: Each function body on a PE gets a distinct context slot
- or1-asm.AC6.4: All NameRef destinations resolve to ResolvedDest with correct Addr
- or1-asm.AC6.5: Local edges (same PE) produce Addr with dest PE = source PE
- or1-asm.AC6.6: Cross-PE edges produce Addr with dest PE = target PE
- or1-asm.AC6.7: IRAM overflow produces error
- or1-asm.AC6.8: Context slot overflow produces error
"""

from asm.allocate import allocate
from asm.ir import (
    IRGraph,
    IRNode,
    IREdge,
    SystemConfig,
    SourceLoc,
    NameRef,
    ResolvedDest,
    CallSite,
)
from asm.errors import ErrorCategory, ErrorSeverity
from cm_inst import ArithOp, MemOp, Port, RoutingOp


class TestIRAMPacking:
    """AC6.1, AC6.2: IRAM offset assignment (dyadic first, then monadic)."""

    def test_mixed_dyadic_and_monadic(self):
        """PE with 2 dyadic (ADD, SUB) and 2 monadic (INC, CONST)."""
        nodes = {
            "&add": IRNode(
                name="&add",
                opcode=ArithOp.ADD,
                pe=0,
                loc=SourceLoc(1, 1),
            ),
            "&sub": IRNode(
                name="&sub",
                opcode=ArithOp.SUB,
                pe=0,
                loc=SourceLoc(2, 1),
            ),
            "&inc": IRNode(
                name="&inc",
                opcode=ArithOp.INC,
                pe=0,
                loc=SourceLoc(3, 1),
            ),
            "&const_1": IRNode(
                name="&const_1",
                opcode=ArithOp.ADD,  # Using const operand makes it monadic
                pe=0,
                const=1,
                loc=SourceLoc(4, 1),
            ),
        }
        system = SystemConfig(pe_count=1, sm_count=1)
        graph = IRGraph(nodes, system=system)

        result = allocate(graph)

        assert len(result.errors) == 0

        # Dyadic nodes should get offsets 0, 1
        add_node = result.nodes["&add"]
        sub_node = result.nodes["&sub"]
        assert add_node.iram_offset == 0
        assert sub_node.iram_offset == 1

        # Monadic nodes should get offsets starting at 2.
        # NOTE(review): &const_1 is expected before &inc even though &inc is
        # inserted first -- presumably allocate() does not order monadic nodes
        # purely by insertion order; confirm against the implementation.
        inc_node = result.nodes["&inc"]
        const_node = result.nodes["&const_1"]
        assert const_node.iram_offset == 2
        assert inc_node.iram_offset == 3

    def test_only_monadic(self):
        """PE with only monadic instructions."""
        nodes = {
            "&inc": IRNode(
                name="&inc",
                opcode=ArithOp.INC,
                pe=0,
                loc=SourceLoc(1, 1),
            ),
            "&dec": IRNode(
                name="&dec",
                opcode=ArithOp.DEC,
                pe=0,
                loc=SourceLoc(2, 1),
            ),
        }
        system = SystemConfig(pe_count=1, sm_count=1)
        graph = IRGraph(nodes, system=system)

        result = allocate(graph)

        assert len(result.errors) == 0
        inc_node = result.nodes["&inc"]
        dec_node = result.nodes["&dec"]
        assert inc_node.iram_offset == 0
        assert dec_node.iram_offset == 1

    def test_only_dyadic(self):
        """PE with only dyadic instructions."""
        nodes = {
            "&add": IRNode(
                name="&add",
                opcode=ArithOp.ADD,
                pe=0,
                loc=SourceLoc(1, 1),
            ),
            "&sub": IRNode(
                name="&sub",
                opcode=ArithOp.SUB,
                pe=0,
                loc=SourceLoc(2, 1),
            ),
        }
        system = SystemConfig(pe_count=1, sm_count=1)
        graph = IRGraph(nodes, system=system)

        result = allocate(graph)

        assert len(result.errors) == 0
        add_node = result.nodes["&add"]
        sub_node = result.nodes["&sub"]
        assert add_node.iram_offset == 0
        assert sub_node.iram_offset == 1


class TestActivationIDs:
    """AC5.3: Activation ID assignment per function scope per PE."""

    def test_single_function_scope(self):
        """PE with nodes from only $main."""
        nodes = {
            "$main.&add": IRNode(
                name="$main.&add",
                opcode=ArithOp.ADD,
                pe=0,
                loc=SourceLoc(1, 1),
            ),
            "$main.&sub": IRNode(
                name="$main.&sub",
                opcode=ArithOp.SUB,
                pe=0,
                loc=SourceLoc(2, 1),
            ),
        }
        system = SystemConfig(pe_count=1, sm_count=1)
        graph = IRGraph(nodes, system=system)

        result = allocate(graph)

        assert len(result.errors) == 0
        add_node = result.nodes["$main.&add"]
        sub_node = result.nodes["$main.&sub"]

        # Both should have act_id=0 (same function scope)
        assert add_node.act_id == 0
        assert sub_node.act_id == 0

    def test_multiple_function_scopes(self):
        """PE with nodes from $main and $helper."""
        nodes = {
            "$main.&add": IRNode(
                name="$main.&add",
                opcode=ArithOp.ADD,
                pe=0,
                loc=SourceLoc(1, 1),
            ),
            "$main.&sub": IRNode(
                name="$main.&sub",
                opcode=ArithOp.SUB,
                pe=0,
                loc=SourceLoc(2, 1),
            ),
            "$helper.&inc": IRNode(
                name="$helper.&inc",
                opcode=ArithOp.INC,
                pe=0,
                loc=SourceLoc(3, 1),
            ),
        }
        system = SystemConfig(pe_count=1, sm_count=1)
        graph = IRGraph(nodes, system=system)

        result = allocate(graph)

        assert len(result.errors) == 0
        add_node = result.nodes["$main.&add"]
        helper_node = result.nodes["$helper.&inc"]

        # $main should get ctx=0, $helper should get ctx=1
        assert add_node.act_id == 0
        assert helper_node.act_id == 1

    def test_toplevel_nodes(self):
        """Top-level nodes (no function scope) get ctx=0."""
        nodes = {
            "&add": IRNode(
                name="&add",
                opcode=ArithOp.ADD,
                pe=0,
                loc=SourceLoc(1, 1),
            ),
            "&sub": IRNode(
                name="&sub",
                opcode=ArithOp.SUB,
                pe=0,
                loc=SourceLoc(2, 1),
            ),
        }
        system = SystemConfig(pe_count=1, sm_count=1)
        graph = IRGraph(nodes, system=system)

        result = allocate(graph)

        assert len(result.errors) == 0
        add_node = result.nodes["&add"]
        sub_node = result.nodes["&sub"]
        assert add_node.act_id == 0
        assert sub_node.act_id == 0

    def test_multiple_functions_order_preserved(self):
        """Context slots assigned in order of first appearance."""
        nodes = {
            "$fib.&a": IRNode(
                name="$fib.&a",
                opcode=ArithOp.ADD,
                pe=0,
                loc=SourceLoc(1, 1),
            ),
            "$main.&b": IRNode(
                name="$main.&b",
                opcode=ArithOp.SUB,
                pe=0,
                loc=SourceLoc(2, 1),
            ),
            "$helper.&c": IRNode(
                name="$helper.&c",
                opcode=ArithOp.INC,
                pe=0,
                loc=SourceLoc(3, 1),
            ),
            "$fib.&d": IRNode(
                name="$fib.&d",
                opcode=ArithOp.DEC,
                pe=0,
                loc=SourceLoc(4, 1),
            ),
        }
        system = SystemConfig(pe_count=1, sm_count=1)
        graph = IRGraph(nodes, system=system)

        result = allocate(graph)

        assert len(result.errors) == 0
        a_node = result.nodes["$fib.&a"]
        b_node = result.nodes["$main.&b"]
        c_node = result.nodes["$helper.&c"]
        d_node = result.nodes["$fib.&d"]

        # First appearance order: $fib (ctx=0), $main (ctx=1), $helper (ctx=2)
        assert a_node.act_id == 0
        assert d_node.act_id == 0  # Same function as first
        assert b_node.act_id == 1
        assert c_node.act_id == 2


class TestDestinationResolution:
    """AC6.4, AC6.5, AC6.6: NameRef resolution to Addr with local/cross-PE edges."""

    def test_single_dest_l_local_edge(self):
        """Local edge: dest_l has FrameDest with target_pe matching source PE."""
        nodes = {
            "&add": IRNode(
                name="&add",
                opcode=ArithOp.ADD,
                pe=0,
                dest_l=NameRef(name="&sub", port=Port.L),
                loc=SourceLoc(1, 1),
            ),
            "&sub": IRNode(
                name="&sub",
                opcode=ArithOp.SUB,
                pe=0,
                loc=SourceLoc(2, 1),
            ),
        }
        edges = [
            IREdge(source="&add", dest="&sub", port=Port.L, loc=SourceLoc(1, 5))
        ]
        system = SystemConfig(pe_count=1, sm_count=1)
        graph = IRGraph(nodes, edges=edges, system=system)

        result = allocate(graph)

        assert len(result.errors) == 0
        add_node = result.nodes["&add"]
        sub_node = result.nodes["&sub"]

        # dest_l should be resolved with FrameDest
        assert isinstance(add_node.dest_l, ResolvedDest)
        assert add_node.dest_l.frame_dest is not None
        assert add_node.dest_l.frame_dest.offset == sub_node.iram_offset
        assert add_node.dest_l.frame_dest.target_pe == 0  # Same PE
        assert add_node.dest_l.frame_dest.act_id == sub_node.act_id

    def test_cross_pe_edge(self):
        """Cross-PE edge: FrameDest.target_pe = destination PE."""
        nodes = {
            "&add": IRNode(
                name="&add",
                opcode=ArithOp.ADD,
                pe=0,
                dest_l=NameRef(name="&sub", port=Port.L),
                loc=SourceLoc(1, 1),
            ),
            "&sub": IRNode(
                name="&sub",
                opcode=ArithOp.SUB,
                pe=1,
                loc=SourceLoc(2, 1),
            ),
        }
        edges = [
            IREdge(source="&add", dest="&sub", port=Port.L, loc=SourceLoc(1, 5))
        ]
        system = SystemConfig(pe_count=2, sm_count=1)
        graph = IRGraph(nodes, edges=edges, system=system)

        result = allocate(graph)

        assert len(result.errors) == 0
        add_node = result.nodes["&add"]
        sub_node = result.nodes["&sub"]

        # dest_l should be resolved with FrameDest for destination PE
        assert isinstance(add_node.dest_l, ResolvedDest)
        assert add_node.dest_l.frame_dest is not None
        assert add_node.dest_l.frame_dest.offset == sub_node.iram_offset
        assert add_node.dest_l.frame_dest.target_pe == 1  # Destination PE

    def test_dual_destinations_with_source_ports(self):
        """Dual destinations with source port qualifiers (source_port L and R)."""
        # The node name "&not_taken" follows the "&"-prefixed label convention
        # used by every other node in this module.
        nodes = {
            "&branch": IRNode(
                name="&branch",
                opcode=ArithOp.ADD,
                pe=0,
                dest_l=NameRef(name="&taken", port=Port.L),
                dest_r=NameRef(name="&not_taken", port=Port.L),
                loc=SourceLoc(1, 1),
            ),
            "&taken": IRNode(
                name="&taken",
                opcode=ArithOp.SUB,
                pe=0,
                loc=SourceLoc(2, 1),
            ),
            "&not_taken": IRNode(
                name="&not_taken",
                opcode=ArithOp.INC,
                pe=0,
                loc=SourceLoc(3, 1),
            ),
        }
        edges = [
            IREdge(
                source="&branch",
                dest="&taken",
                port=Port.L,
                source_port=Port.L,
                loc=SourceLoc(1, 5),
            ),
            IREdge(
                source="&branch",
                dest="&not_taken",
                port=Port.L,
                source_port=Port.R,
                loc=SourceLoc(1, 15),
            ),
        ]
        system = SystemConfig(pe_count=1, sm_count=1)
        graph = IRGraph(nodes, edges=edges, system=system)

        result = allocate(graph)

        assert len(result.errors) == 0
        branch_node = result.nodes["&branch"]

        # Both dest_l and dest_r should be resolved
        assert isinstance(branch_node.dest_l, ResolvedDest)
        assert isinstance(branch_node.dest_r, ResolvedDest)

    def test_single_implicit_edge_maps_to_dest_l(self):
        """Single edge without source_port → dest_l."""
        nodes = {
            "&a": IRNode(
                name="&a",
                opcode=ArithOp.ADD,
                pe=0,
                dest_l=NameRef(name="&b", port=Port.L),
                loc=SourceLoc(1, 1),
            ),
            "&b": IRNode(
                name="&b",
                opcode=ArithOp.SUB,
                pe=0,
                loc=SourceLoc(2, 1),
            ),
        }
        edges = [
            IREdge(source="&a", dest="&b", port=Port.L, loc=SourceLoc(1, 5))
        ]
        system = SystemConfig(pe_count=1, sm_count=1)
        graph = IRGraph(nodes, edges=edges, system=system)

        result = allocate(graph)

        assert len(result.errors) == 0
        a_node = result.nodes["&a"]
        assert isinstance(a_node.dest_l, ResolvedDest)


class TestOverflow:
    """AC6.7, AC6.8: IRAM and context slot overflow errors."""

    def test_iram_overflow_default_capacity(self):
        """PE with 65 nodes exceeds an IRAM capacity of 64 slots."""
        nodes = {}
        for i in range(65):
            nodes[f"&node_{i}"] = IRNode(
                name=f"&node_{i}",
                opcode=ArithOp.ADD,
                pe=0,
                loc=SourceLoc(i + 1, 1),
            )
        system = SystemConfig(pe_count=1, sm_count=1, iram_capacity=64)
        graph = IRGraph(nodes, system=system)

        result = allocate(graph)

        assert len(result.errors) > 0
        error = result.errors[0]
        assert error.category == ErrorCategory.RESOURCE
        assert "IRAM" in error.message or "overflow" in error.message.lower()

    def test_iram_overflow_custom_capacity(self):
        """PE with 9 nodes exceeds custom IRAM limit of 8."""
        nodes = {}
        for i in range(9):
            nodes[f"&node_{i}"] = IRNode(
                name=f"&node_{i}",
                opcode=ArithOp.ADD,
                pe=0,
                loc=SourceLoc(i + 1, 1),
            )
        system = SystemConfig(pe_count=1, sm_count=1, iram_capacity=8)
        graph = IRGraph(nodes, system=system)

        result = allocate(graph)

        assert len(result.errors) > 0
        error = result.errors[0]
        assert error.category == ErrorCategory.RESOURCE

    def test_act_id_overflow_default_capacity(self):
        """PE with 5 function bodies exceeds a frame_count of 4.

        Despite the test name, frame_count is set explicitly to 4 here rather
        than relying on the SystemConfig default.
        """
        nodes = {
            "$main.&a": IRNode(
                name="$main.&a",
                opcode=ArithOp.ADD,
                pe=0,
                loc=SourceLoc(1, 1),
            ),
            "$fib.&b": IRNode(
                name="$fib.&b",
                opcode=ArithOp.SUB,
                pe=0,
                loc=SourceLoc(2, 1),
            ),
            "$helper.&c": IRNode(
                name="$helper.&c",
                opcode=ArithOp.INC,
                pe=0,
                loc=SourceLoc(3, 1),
            ),
            "$util.&d": IRNode(
                name="$util.&d",
                opcode=ArithOp.DEC,
                pe=0,
                loc=SourceLoc(4, 1),
            ),
            "$extra.&e": IRNode(
                name="$extra.&e",
                opcode=ArithOp.ADD,
                pe=0,
                loc=SourceLoc(5, 1),
            ),
        }
        system = SystemConfig(pe_count=1, sm_count=1, frame_count=4)
        graph = IRGraph(nodes, system=system)

        result = allocate(graph)

        assert len(result.errors) > 0
        error = result.errors[0]
        assert error.category == ErrorCategory.FRAME
        assert (
            "activation" in error.message.lower()
            or "frame" in error.message.lower()
        )

    def test_act_id_overflow_custom_capacity(self):
        """PE with 3 function bodies exceeds custom frame_count of 2."""
        nodes = {
            "$main.&a": IRNode(
                name="$main.&a",
                opcode=ArithOp.ADD,
                pe=0,
                loc=SourceLoc(1, 1),
            ),
            "$helper.&b": IRNode(
                name="$helper.&b",
                opcode=ArithOp.SUB,
                pe=0,
                loc=SourceLoc(2, 1),
            ),
            "$util.&c": IRNode(
                name="$util.&c",
                opcode=ArithOp.INC,
                pe=0,
                loc=SourceLoc(3, 1),
            ),
        }
        system = SystemConfig(pe_count=1, sm_count=1, frame_count=2)
        graph = IRGraph(nodes, system=system)

        result = allocate(graph)

        assert len(result.errors) > 0
        error = result.errors[0]
        assert error.category == ErrorCategory.FRAME


class TestMultiplePEs:
    """Multiple PEs with independent allocation."""

    def test_separate_pe_allocations(self):
        """Different PEs get independent IRAM offsets."""
        nodes = {
            "&a0": IRNode(name="&a0", opcode=ArithOp.ADD, pe=0, loc=SourceLoc(1, 1)),
            "&b0": IRNode(name="&b0", opcode=ArithOp.SUB, pe=0, loc=SourceLoc(2, 1)),
            "&a1": IRNode(name="&a1", opcode=ArithOp.ADD, pe=1, loc=SourceLoc(3, 1)),
            "&b1": IRNode(name="&b1", opcode=ArithOp.SUB, pe=1, loc=SourceLoc(4, 1)),
        }
        system = SystemConfig(pe_count=2, sm_count=1)
        graph = IRGraph(nodes, system=system)

        result = allocate(graph)

        assert len(result.errors) == 0

        # Each PE should have its own offset space
        a0 = result.nodes["&a0"]
        b0 = result.nodes["&b0"]
        a1 = result.nodes["&a1"]
        b1 = result.nodes["&b1"]

        # PE0 nodes start at 0
        assert a0.iram_offset == 0
        assert b0.iram_offset == 1

        # PE1 nodes also start at 0
        assert a1.iram_offset == 0
        assert b1.iram_offset == 1


class TestMemoryOps:
    """Memory operations (SM instructions) are monadic."""

    def test_read_is_monadic(self):
        """READ instruction is monadic."""
        nodes = {
            "&add": IRNode(
                name="&add",
                opcode=ArithOp.ADD,
                pe=0,
                loc=SourceLoc(1, 1),
            ),
            "&read": IRNode(
                name="&read",
                opcode=MemOp.READ,
                pe=0,
                loc=SourceLoc(2, 1),
            ),
        }
        system = SystemConfig(pe_count=1, sm_count=1)
        graph = IRGraph(nodes, system=system)

        result = allocate(graph)

        assert len(result.errors) == 0
        add_node = result.nodes["&add"]
        read_node = result.nodes["&read"]

        # Dyadic ADD gets offset 0, monadic READ gets offset 1
        assert add_node.iram_offset == 0
        assert read_node.iram_offset == 1

    def test_write_with_const_is_monadic(self):
        """WRITE with const is monadic."""
        nodes = {
            "&add": IRNode(
                name="&add",
                opcode=ArithOp.ADD,
                pe=0,
                loc=SourceLoc(1, 1),
            ),
            "&write": IRNode(
                name="&write",
                opcode=MemOp.WRITE,
                const=0x10,
                pe=0,
                loc=SourceLoc(2, 1),
            ),
        }
        system = SystemConfig(pe_count=1, sm_count=1)
        graph = IRGraph(nodes, system=system)

        result = allocate(graph)

        assert len(result.errors) == 0
        add_node = result.nodes["&add"]
        write_node = result.nodes["&write"]

        # Dyadic ADD gets offset 0, monadic WRITE gets offset 1
        assert add_node.iram_offset == 0
        assert write_node.iram_offset == 1

    def test_write_without_const_is_dyadic(self):
        """WRITE without const is dyadic."""
        nodes = {
            "&add": IRNode(
                name="&add",
                opcode=ArithOp.ADD,
                pe=0,
                loc=SourceLoc(1, 1),
            ),
            "&write": IRNode(
                name="&write",
                opcode=MemOp.WRITE,
                const=None,
                pe=0,
                loc=SourceLoc(2, 1),
            ),
        }
        system = SystemConfig(pe_count=1, sm_count=1)
        graph = IRGraph(nodes, system=system)

        result = allocate(graph)

        assert len(result.errors) == 0
        add_node = result.nodes["&add"]
        write_node = result.nodes["&write"]

        # Both are dyadic, so ADD gets 0, WRITE gets 1
        assert add_node.iram_offset == 0
        assert write_node.iram_offset == 1


class TestErrorValidation:
    """Test edge-to-destination validation errors."""

    def test_more_than_two_outgoing_edges(self):
        """Node with 3+ outgoing edges produces error."""
        nodes = {
            "&a": IRNode(
                name="&a",
                opcode=ArithOp.ADD,
                pe=0,
                dest_l=NameRef(name="&b", port=Port.L),
                dest_r=NameRef(name="&c", port=Port.L),
                loc=SourceLoc(1, 1),
            ),
            "&b": IRNode(name="&b", opcode=ArithOp.SUB, pe=0, loc=SourceLoc(2, 1)),
            "&c": IRNode(name="&c", opcode=ArithOp.INC, pe=0, loc=SourceLoc(3, 1)),
            "&d": IRNode(name="&d", opcode=ArithOp.DEC, pe=0, loc=SourceLoc(4, 1)),
        }
        edges = [
            IREdge(source="&a", dest="&b", port=Port.L, loc=SourceLoc(1, 5)),
            IREdge(source="&a", dest="&c", port=Port.L, loc=SourceLoc(1, 10)),
            IREdge(source="&a", dest="&d", port=Port.L, loc=SourceLoc(1, 15)),
        ]
        system = SystemConfig(pe_count=1, sm_count=1)
        graph = IRGraph(nodes, edges=edges, system=system)

        result = allocate(graph)

        # Should have error about too many edges
        assert len(result.errors) > 0

    def test_conflicting_source_ports(self):
        """Two edges with same source_port produces error."""
        nodes = {
            "&a": IRNode(
                name="&a",
                opcode=ArithOp.ADD,
                pe=0,
                dest_l=NameRef(name="&b", port=Port.L),
                dest_r=NameRef(name="&c", port=Port.L),
                loc=SourceLoc(1, 1),
            ),
            "&b": IRNode(name="&b", opcode=ArithOp.SUB, pe=0, loc=SourceLoc(2, 1)),
            "&c": IRNode(name="&c", opcode=ArithOp.INC, pe=0, loc=SourceLoc(3, 1)),
        }
        edges = [
            IREdge(source="&a", dest="&b", port=Port.L, source_port=Port.L),
            IREdge(source="&a", dest="&c", port=Port.L, source_port=Port.L),
        ]
        system = SystemConfig(pe_count=1, sm_count=1)
        graph = IRGraph(nodes, edges=edges, system=system)

        result = allocate(graph)

        # Should have error about conflicting ports
        assert len(result.errors) > 0


class TestSMReturnRoutes:
    """SM read instructions use dest_l as return route."""

    def test_read_with_return_address(self):
        """READ instruction's dest_l is resolved as FrameDest return address."""
        nodes = {
            "&read": IRNode(
                name="&read",
                opcode=MemOp.READ,
                pe=0,
                dest_l=NameRef(name="&next", port=Port.L),
                loc=SourceLoc(1, 1),
            ),
            "&next": IRNode(
                name="&next",
                opcode=ArithOp.ADD,
                pe=0,
                loc=SourceLoc(2, 1),
            ),
        }
        edges = [
            IREdge(source="&read", dest="&next", port=Port.L)
        ]
        system = SystemConfig(pe_count=1, sm_count=1)
        graph = IRGraph(nodes, edges=edges, system=system)

        result = allocate(graph)

        assert len(result.errors) == 0
        read_node = result.nodes["&read"]
        next_node = result.nodes["&next"]

        # dest_l should be resolved with FrameDest
        assert isinstance(read_node.dest_l, ResolvedDest)
        assert read_node.dest_l.frame_dest is not None
        assert read_node.dest_l.frame_dest.offset == next_node.iram_offset


class TestMacroScopeHandling:
    """Task 1: Macro scope segments are ignored during context allocation."""

    def test_extract_function_scope_with_macro_segment(self):
        """Macro segment (#loop_0) is stripped from node name."""
        from asm.allocate import _extract_function_scope

        # Macro segment in middle of qualified name
        assert _extract_function_scope("$main.#loop_0.&counter") == "$main"

    def test_extract_function_scope_macro_at_root(self):
        """Macro at root scope yields empty function scope."""
        from asm.allocate import _extract_function_scope

        assert _extract_function_scope("#loop_0.&counter") == ""

    def test_extract_function_scope_multiple_macro_segments(self):
        """Multiple macro segments are all stripped."""
        from asm.allocate import _extract_function_scope

        assert _extract_function_scope("$func.#outer_1.#inner_2.&label") == "$func"

    def test_extract_function_scope_macro_only(self):
        """Node with only macro segments yields root scope."""
        from asm.allocate import _extract_function_scope

        # All segments are macro segments
        assert _extract_function_scope("#macro1.#macro2") == ""

    def test_macro_scope_nodes_same_ctx_as_function(self):
        """Nodes with macro scope segments get same ctx as function scope nodes."""
        nodes = {
            "$main.&add": IRNode(
                name="$main.&add",
                opcode=ArithOp.ADD,
                pe=0,
                loc=SourceLoc(1, 1),
            ),
            "$main.#loop_0.&counter": IRNode(
                name="$main.#loop_0.&counter",
                opcode=ArithOp.INC,
                pe=0,
                loc=SourceLoc(2, 1),
            ),
        }
        system = SystemConfig(pe_count=1, sm_count=1)
        graph = IRGraph(nodes, system=system)

        result = allocate(graph)

        assert len(result.errors) == 0
        add_node = result.nodes["$main.&add"]
        counter_node = result.nodes["$main.#loop_0.&counter"]

        # Both should have ctx=0 (same function scope, macro segment ignored)
        assert add_node.act_id == 0
        assert counter_node.act_id == 0

    def test_macro_scope_distinguishes_from_different_functions(self):
        """Macro segments don't prevent distinguishing different functions."""
        nodes = {
            "$main.#loop_0.&counter": IRNode(
                name="$main.#loop_0.&counter",
                opcode=ArithOp.INC,
                pe=0,
                loc=SourceLoc(1, 1),
            ),
            "$helper.#loop_0.&counter": IRNode(
                name="$helper.#loop_0.&counter",
                opcode=ArithOp.DEC,
                pe=0,
                loc=SourceLoc(2, 1),
            ),
        }
        system = SystemConfig(pe_count=1, sm_count=1)
        graph = IRGraph(nodes, system=system)

        result = allocate(graph)

        assert len(result.errors) == 0
        main_counter = result.nodes["$main.#loop_0.&counter"]
        helper_counter = result.nodes["$helper.#loop_0.&counter"]

        # Different functions get different ctx values
        assert main_counter.act_id == 0
        assert helper_counter.act_id == 1

    def test_macro_scope_with_root_and_function(self):
        """Macro scope at root and function scope get different ctx slots."""
        nodes = {
            "#loop_0.&counter": IRNode(
                name="#loop_0.&counter",
                opcode=ArithOp.ADD,  # dyadic, like $main.&add
                pe=0,
                loc=SourceLoc(1, 1),
            ),
            "$main.&add": IRNode(
                name="$main.&add",
                opcode=ArithOp.SUB,  # dyadic
                pe=0,
                loc=SourceLoc(2, 1),
            ),
        }
        system = SystemConfig(pe_count=1, sm_count=1)
        graph = IRGraph(nodes, system=system)

        result = allocate(graph)

        assert len(result.errors) == 0
        macro_counter = result.nodes["#loop_0.&counter"]
        main_add = result.nodes["$main.&add"]

        # Macro scope at root extracts to "" (root scope)
        # $main extracts to "$main" (function scope)
        # First appearance gets ctx=0, next gets ctx=1
        # macro_counter appears first in the PE's node list after IRAM packing
        assert macro_counter.act_id == 0  # First scope seen
        assert main_add.act_id == 1  # Second scope seen


class TestPerCallSiteAllocation:
    """Task 2: Per-call-site context allocation and budget warnings."""

    def test_two_call_sites_to_same_function_get_different_ctx(self):
        """Two call sites to same function get two distinct ctx values."""
        nodes = {
            "&trampoline_1": IRNode(
                name="&trampoline_1",
                opcode=RoutingOp.PASS,
                pe=0,
                loc=SourceLoc(1, 1),
            ),
            "&trampoline_2": IRNode(
                name="&trampoline_2",
                opcode=RoutingOp.PASS,
                pe=0,
                loc=SourceLoc(2, 1),
            ),
            "$func.&add": IRNode(
                name="$func.&add",
                opcode=ArithOp.ADD,
                pe=1,
                loc=SourceLoc(3, 1),
            ),
            "&free_ctx_1": IRNode(
                name="&free_ctx_1",
                opcode=RoutingOp.FREE_FRAME,
                pe=0,
                loc=SourceLoc(4, 1),
            ),
            "&free_ctx_2": IRNode(
                name="&free_ctx_2",
                opcode=RoutingOp.FREE_FRAME,
                pe=0,
                loc=SourceLoc(5, 1),
            ),
        }
        call_sites = [
            CallSite(
                func_name="$func",
                call_id=1,
                trampoline_nodes=("&trampoline_1",),
                free_frame_nodes=("&free_ctx_1",),
                loc=SourceLoc(1, 1),
            ),
            CallSite(
                func_name="$func",
                call_id=2,
                trampoline_nodes=("&trampoline_2",),
                free_frame_nodes=("&free_ctx_2",),
                loc=SourceLoc(2, 1),
            ),
        ]
        system = SystemConfig(pe_count=2, sm_count=1)
        graph = IRGraph(nodes, system=system, call_sites=call_sites)

        result = allocate(graph)

        assert len(result.errors) == 0

        # Each call site gets its own ctx slot on PE1 (where $func lives)
        # Trampoline and free_ctx on PE0 should also get the call site's ctx
        trampoline_1 = result.nodes["&trampoline_1"]
        trampoline_2 = result.nodes["&trampoline_2"]

        # The free-frame and function-body nodes are not inspected further;
        # only check that allocation kept them in the result.
        for name in ("&free_ctx_1", "&free_ctx_2", "$func.&add"):
            assert name in result.nodes

        # Both trampoline nodes should have ctx values assigned (per-call-site)
        # They should be different
        assert trampoline_1.act_id is not None
        assert trampoline_2.act_id is not None
        assert trampoline_1.act_id != trampoline_2.act_id

    def test_context_overflow_produces_resource_error(self):
        """Activation ID overflow produces FRAME error with per-PE breakdown."""
        # Create 20 call sites on PE0 (frame_count is set explicitly to 8)
        nodes = {}
        call_sites = []
        for i in range(20):
            node_name = f"&trampoline_{i}"
            nodes[node_name] = IRNode(
                name=node_name,
                opcode=RoutingOp.PASS,
                pe=0,
                loc=SourceLoc(i + 1, 1),
            )
            call_sites.append(
                CallSite(
                    func_name=f"$func_{i}",
                    call_id=i,
                    trampoline_nodes=(node_name,),
                    free_frame_nodes=(),
                    loc=SourceLoc(i + 1, 1),
                )
            )
        system = SystemConfig(pe_count=1, sm_count=1, frame_count=8)
        graph = IRGraph(nodes, system=system, call_sites=call_sites)

        result = allocate(graph)

        # Should have FRAME errors for activation ID overflow
        frame_errors = [e for e in result.errors if e.category == ErrorCategory.FRAME]
        assert len(frame_errors) > 0

        # Error should mention overflow or exhaustion
        error_msg = " ".join(e.message for e in frame_errors)
        assert (
            "overflow" in error_msg.lower()
            or "exceed" in error_msg.lower()
            or "exhaustion" in error_msg.lower()
        )

    def test_budget_warning_at_75_percent(self):
        """Budget warning emitted at 75% utilisation."""
        # Create 13 call sites on PE0 with frame_count=16 (13/16 = 81% > 75%)
        nodes = {}
        call_sites = []
        for i in range(13):
            node_name = f"&trampoline_{i}"
            nodes[node_name] = IRNode(
                name=node_name,
                opcode=RoutingOp.PASS,
                pe=0,
                loc=SourceLoc(i + 1, 1),
            )
            call_sites.append(
                CallSite(
                    func_name=f"$func_{i}",
                    call_id=i,
                    trampoline_nodes=(node_name,),
                    free_frame_nodes=(),
                    loc=SourceLoc(i + 1, 1),
                )
            )
        system = SystemConfig(pe_count=1, sm_count=1, frame_count=16)
        graph = IRGraph(nodes, system=system, call_sites=call_sites)

        result = allocate(graph)

        # Should succeed but have WARNING errors for budget
        assert len(result.errors) > 0
        warnings = [e for e in result.errors if e.severity == ErrorSeverity.WARNING]
        assert len(warnings) > 0

        warning_msg = " ".join(w.message for w in warnings)

        # NOTE(review): "87%" corresponds to 14 of 16 slots, i.e. one slot more
        # than the 13 call sites created above -- presumably allocate() uses an
        # extra slot; confirm. The alternative "context slots used" wording
        # keeps the check from depending on the exact figure.
        assert (
            "87%" in warning_msg or "context slots used" in warning_msg.lower()
        ) and ("PE0" in warning_msg or "context" in warning_msg.lower())