"""Tests for monitor/snapshot.py state capture functionality.""" import pytest import simpy from cm_inst import Instruction, ArithOp, MemOp, Port, RoutingOp, OutputStyle from emu.network import build_topology from emu.types import PEConfig, SMConfig from monitor.snapshot import capture, PESnapshot, SMSnapshot, SMCellSnapshot from sm_mod import Presence from tokens import DyadToken, MonadToken class TestCaptureWithSinglePEAndSM: """Tests for capturing state of a system with one PE and one SM.""" def test_capture_empty_system(self): """Test capturing state of a completely empty system (no PEs, no SMs).""" env = simpy.Environment() system = build_topology(env, [], []) snapshot = capture(system) assert snapshot.sim_time == 0.0 assert snapshot.next_time == float('inf') assert snapshot.pes == {} assert snapshot.sms == {} def test_capture_system_with_seed_token(self): """Test capturing state after seed token injection.""" env = simpy.Environment() # Create one PE with a simple CONST instruction iram = {0: Instruction(opcode=RoutingOp.CONST, output=OutputStyle.SINK, has_const=True, dest_count=0, wide=False, fref=0)} pe_configs = [PEConfig(pe_id=0, iram=iram)] sm_configs = [] system = build_topology(env, pe_configs, sm_configs) # Inject a seed token seed = MonadToken(target=0, offset=0, act_id=0, data=0, inline=True) system.inject(seed) snapshot = capture(system) # Verify basic snapshot structure assert snapshot.sim_time == 0.0 assert 0 in snapshot.pes pe_snap = snapshot.pes[0] assert pe_snap.pe_id == 0 # Verify the seed token is in the input queue assert len(pe_snap.input_queue) == 1 assert pe_snap.input_queue[0] == seed # Verify IRAM is captured assert pe_snap.iram == iram # Verify frame-based state is initialized assert hasattr(pe_snap, 'frames') assert hasattr(pe_snap, 'tag_store') def test_capture_sm_with_written_cells(self): """Test capturing SM state with written cells.""" env = simpy.Environment() pe_configs = [] sm_configs = [SMConfig(sm_id=0, cell_count=256)] system = build_topology(env, pe_configs, sm_configs) # Manually set a cell to FULL sm = system.sms[0] sm.cells[10].pres = Presence.FULL sm.cells[10].data_l = 100 sm.cells[10].data_r = 200 snapshot = capture(system) assert 0 in snapshot.sms sm_snap = snapshot.sms[0] assert sm_snap.sm_id == 0 # Verify the written cell is captured assert 10 in sm_snap.cells cell_snap = sm_snap.cells[10] assert cell_snap.pres == Presence.FULL assert cell_snap.data_l == 100 assert cell_snap.data_r == 200 def test_capture_sm_empty_cells_not_included(self): """Test that empty, unwritten cells are not included in snapshot.""" env = simpy.Environment() pe_configs = [] sm_configs = [SMConfig(sm_id=0, cell_count=512)] system = build_topology(env, pe_configs, sm_configs) snapshot = capture(system) sm_snap = snapshot.sms[0] # Since no cells were written, cells dict should be empty assert sm_snap.cells == {} def test_capture_pe_frame_structure(self): """Test that PE frame state is captured with correct structure.""" env = simpy.Environment() iram = {0: Instruction(opcode=ArithOp.ADD, output=OutputStyle.INHERIT, has_const=False, dest_count=2, wide=False, fref=0)} pe_configs = [PEConfig(pe_id=0, iram=iram, frame_count=4, frame_slots=64)] sm_configs = [] system = build_topology(env, pe_configs, sm_configs) snapshot = capture(system) pe_snap = snapshot.pes[0] # frames should be a tuple of tuples (frozen dataclass) assert isinstance(pe_snap.frames, tuple) assert len(pe_snap.frames) == 4 # 4 frames for frame in pe_snap.frames: assert isinstance(frame, tuple) # Each frame has slots # tag_store should be dict mapping act_id to (frame_id, lane) tuple assert isinstance(pe_snap.tag_store, dict) # presence should be a tuple of tuples (frame_count x matchable_offsets) assert isinstance(pe_snap.presence, tuple) assert len(pe_snap.presence) == 4 # 4 frames # port_store should be a tuple of tuples assert isinstance(pe_snap.port_store, tuple) assert len(pe_snap.port_store) == 4 # 4 frames # free_frames should be a tuple of frame IDs assert isinstance(pe_snap.free_frames, tuple) def test_capture_pe_output_log(self): """Test that PE output_log is captured.""" env = simpy.Environment() iram = {0: Instruction(opcode=RoutingOp.CONST, output=OutputStyle.SINK, has_const=True, dest_count=0, wide=False, fref=0)} pe_configs = [PEConfig(pe_id=0, iram=iram)] sm_configs = [] system = build_topology(env, pe_configs, sm_configs) # Manually add a token to output_log to simulate emission output_token = MonadToken(target=0, offset=1, act_id=0, data=99, inline=True) system.pes[0].output_log.append(output_token) snapshot = capture(system) pe_snap = snapshot.pes[0] assert len(pe_snap.output_log) == 1 assert pe_snap.output_log[0] == output_token def test_capture_snapshot_is_frozen(self): """Test that captured snapshots are frozen (immutable).""" env = simpy.Environment() system = build_topology(env, [], []) snapshot = capture(system) # Verify the snapshot dataclass is frozen with pytest.raises(AttributeError): snapshot.sim_time = 1.0 with pytest.raises(AttributeError): snapshot.pes = {} def test_capture_pe_tag_store(self): """Test that PE tag_store is captured.""" env = simpy.Environment() iram = {0: Instruction(opcode=RoutingOp.CONST, output=OutputStyle.SINK, has_const=True, dest_count=0, wide=False, fref=0)} pe_configs = [PEConfig(pe_id=0, iram=iram, frame_count=8)] system = build_topology(env, pe_configs, []) # Verify tag_store is captured snapshot = capture(system) pe_snap = snapshot.pes[0] # tag_store should be a dict mapping act_id to (frame_id, lane) tuple assert isinstance(pe_snap.tag_store, dict) assert hasattr(pe_snap, 'frames') assert hasattr(pe_snap, 'free_frames') def test_capture_sm_deferred_read(self): """Test that SM deferred_read state is captured.""" env = simpy.Environment() sm_configs = [SMConfig(sm_id=0, cell_count=256)] system = build_topology(env, [], sm_configs) sm = system.sms[0] # Manually set up a deferred read (in real code this happens during simulation) return_token = MonadToken(target=0, offset=0, act_id=0, data=0, inline=False) from emu.types import DeferredRead sm.deferred_read = DeferredRead(cell_addr=42, return_route=return_token) snapshot = capture(system) sm_snap = snapshot.sms[0] assert sm_snap.deferred_read is not None assert sm_snap.deferred_read["cell_addr"] == 42 assert "return_route" in sm_snap.deferred_read def test_capture_sm_t0_store(self): """Test that SM T0 store is captured.""" env = simpy.Environment() sm_configs = [SMConfig(sm_id=0, cell_count=256, tier_boundary=256)] system = build_topology(env, [], sm_configs) sm = system.sms[0] # Add some int values to T0 store sm.t0_store.append(777) sm.t0_store.append(888) snapshot = capture(system) sm_snap = snapshot.sms[0] assert len(sm_snap.t0_store) == 2 assert sm_snap.t0_store[0] == 777 assert sm_snap.t0_store[1] == 888 def test_capture_multiple_pes_and_sms(self): """Test capturing state of a system with multiple PEs and SMs.""" env = simpy.Environment() pe_configs = [ PEConfig(pe_id=0, iram={0: Instruction(opcode=RoutingOp.CONST, output=OutputStyle.SINK, has_const=True, dest_count=0, wide=False, fref=0)}, frame_count=4), PEConfig(pe_id=1, iram={0: Instruction(opcode=RoutingOp.CONST, output=OutputStyle.SINK, has_const=True, dest_count=0, wide=False, fref=0)}, frame_count=4), ] sm_configs = [ SMConfig(sm_id=0, cell_count=256), SMConfig(sm_id=1, cell_count=256), ] system = build_topology(env, pe_configs, sm_configs) # Set up some state system.sms[0].cells[5].pres = Presence.FULL system.sms[0].cells[5].data_l = 50 system.sms[1].cells[10].pres = Presence.FULL system.sms[1].cells[10].data_l = 100 snapshot = capture(system) # Verify all PEs and SMs are captured assert len(snapshot.pes) == 2 assert len(snapshot.sms) == 2 # Verify PE frame state is captured assert len(snapshot.pes[0].frames) == 4 assert len(snapshot.pes[1].frames) == 4 assert isinstance(snapshot.pes[0].tag_store, dict) assert isinstance(snapshot.pes[1].tag_store, dict) # Verify SM state assert snapshot.sms[0].cells[5].pres == Presence.FULL assert snapshot.sms[0].cells[5].data_l == 50 assert snapshot.sms[1].cells[10].pres == Presence.FULL assert snapshot.sms[1].cells[10].data_l == 100 def test_ac72_pe_snapshot_lacks_matching_store(self): """AC7.2: PESnapshot must NOT have matching_store attribute. Frame-based PE no longer uses matching_store; matching happens via presence bits and frame slots. This test verifies the snapshot correctly excludes this legacy field. """ import simpy from emu.pe import ProcessingElement from emu.types import PEConfig from monitor.snapshot import capture env = simpy.Environment() config = PEConfig(frame_count=4, frame_slots=32, matchable_offsets=4) pe = ProcessingElement(env=env, pe_id=0, config=config) # Build a minimal system for capture from emu.network import build_topology system = build_topology(env, [config], [], fifo_capacity=10) snapshot = capture(system) # Check PE snapshot assert 0 in snapshot.pes pe_snap = snapshot.pes[0] # Verify NO matching_store attribute assert not hasattr(pe_snap, 'matching_store'), \ "PESnapshot should NOT have matching_store (legacy field)" def test_ac72_pe_snapshot_lacks_gen_counters(self): """AC7.2: PESnapshot must NOT have gen_counters attribute. Frame-based PE no longer uses generation counters; frame validity is managed via tag_store and frame allocation/deallocation. This test verifies the snapshot correctly excludes this legacy field. """ import simpy from emu.pe import ProcessingElement from emu.types import PEConfig from monitor.snapshot import capture env = simpy.Environment() config = PEConfig(frame_count=4, frame_slots=32, matchable_offsets=4) pe = ProcessingElement(env=env, pe_id=0, config=config) # Build a minimal system for capture from emu.network import build_topology system = build_topology(env, [config], [], fifo_capacity=10) snapshot = capture(system) # Check PE snapshot assert 0 in snapshot.pes pe_snap = snapshot.pes[0] # Verify NO gen_counters attribute assert not hasattr(pe_snap, 'gen_counters'), \ "PESnapshot should NOT have gen_counters (legacy field)" def test_capture_sim_time_and_next_time(self): """Test that sim_time and next_time are correctly captured.""" env = simpy.Environment() # Create an empty system (no PEs/SMs) system = build_topology(env, [], []) assert env.now == 0.0 initial_snapshot = capture(system) assert initial_snapshot.sim_time == 0.0 assert initial_snapshot.next_time == float('inf') # Create a PE and add it to trigger time advancement iram = {0: Instruction(opcode=RoutingOp.CONST, output=OutputStyle.SINK, has_const=True, dest_count=0, wide=False, fref=0)} pe_configs = [PEConfig(pe_id=0, iram=iram)] env2 = simpy.Environment() system2 = build_topology(env2, pe_configs, []) snapshot_with_pe = capture(system2) assert snapshot_with_pe.sim_time == 0.0 # PE process starts at time 0, so next_time is 0 assert snapshot_with_pe.next_time == 0.0