OR-1 dataflow CPU sketch
at main 350 lines 13 kB view raw
"""Tests for monitor/snapshot.py state capture functionality."""

import pytest
import simpy

from cm_inst import Instruction, ArithOp, MemOp, Port, RoutingOp, OutputStyle
from emu.network import build_topology
from emu.types import DeferredRead, PEConfig, SMConfig
from monitor.snapshot import capture, PESnapshot, SMSnapshot, SMCellSnapshot
from sm_mod import Presence
from tokens import DyadToken, MonadToken


def _const_sink_iram():
    """Return a fresh one-entry IRAM holding a CONST/SINK instruction at offset 0.

    A new dict and a new ``Instruction`` are built on every call so that
    tests (and individual PE configs within one test) never share state.
    """
    return {
        0: Instruction(
            opcode=RoutingOp.CONST,
            output=OutputStyle.SINK,
            has_const=True,
            dest_count=0,
            wide=False,
            fref=0,
        )
    }


class TestCaptureWithSinglePEAndSM:
    """Tests for capturing state of a system with one PE and one SM."""

    def test_capture_empty_system(self):
        """Test capturing state of a completely empty system (no PEs, no SMs)."""
        env = simpy.Environment()
        system = build_topology(env, [], [])

        snapshot = capture(system)

        assert snapshot.sim_time == 0.0
        assert snapshot.next_time == float('inf')
        assert snapshot.pes == {}
        assert snapshot.sms == {}

    def test_capture_system_with_seed_token(self):
        """Test capturing state after seed token injection."""
        env = simpy.Environment()

        # Create one PE with a simple CONST instruction
        iram = _const_sink_iram()
        pe_configs = [PEConfig(pe_id=0, iram=iram)]
        sm_configs = []

        system = build_topology(env, pe_configs, sm_configs)

        # Inject a seed token
        seed = MonadToken(target=0, offset=0, act_id=0, data=0, inline=True)
        system.inject(seed)

        snapshot = capture(system)

        # Verify basic snapshot structure
        assert snapshot.sim_time == 0.0
        assert 0 in snapshot.pes
        pe_snap = snapshot.pes[0]
        assert pe_snap.pe_id == 0

        # Verify the seed token is in the input queue
        assert len(pe_snap.input_queue) == 1
        assert pe_snap.input_queue[0] == seed

        # Verify IRAM is captured
        assert pe_snap.iram == iram

        # Verify frame-based state is initialized
        assert hasattr(pe_snap, 'frames')
        assert hasattr(pe_snap, 'tag_store')

    def test_capture_sm_with_written_cells(self):
        """Test capturing SM state with written cells."""
        env = simpy.Environment()

        pe_configs = []
        sm_configs = [SMConfig(sm_id=0, cell_count=256)]

        system = build_topology(env, pe_configs, sm_configs)

        # Manually set a cell to FULL
        sm = system.sms[0]
        sm.cells[10].pres = Presence.FULL
        sm.cells[10].data_l = 100
        sm.cells[10].data_r = 200

        snapshot = capture(system)

        assert 0 in snapshot.sms
        sm_snap = snapshot.sms[0]
        assert sm_snap.sm_id == 0

        # Verify the written cell is captured
        assert 10 in sm_snap.cells
        cell_snap = sm_snap.cells[10]
        assert cell_snap.pres == Presence.FULL
        assert cell_snap.data_l == 100
        assert cell_snap.data_r == 200

    def test_capture_sm_empty_cells_not_included(self):
        """Test that empty, unwritten cells are not included in snapshot."""
        env = simpy.Environment()

        pe_configs = []
        sm_configs = [SMConfig(sm_id=0, cell_count=512)]

        system = build_topology(env, pe_configs, sm_configs)

        snapshot = capture(system)

        sm_snap = snapshot.sms[0]
        # Since no cells were written, cells dict should be empty
        assert sm_snap.cells == {}

    def test_capture_pe_frame_structure(self):
        """Test that PE frame state is captured with correct structure."""
        env = simpy.Environment()

        iram = {
            0: Instruction(
                opcode=ArithOp.ADD,
                output=OutputStyle.INHERIT,
                has_const=False,
                dest_count=2,
                wide=False,
                fref=0,
            )
        }
        pe_configs = [PEConfig(pe_id=0, iram=iram, frame_count=4, frame_slots=64)]
        sm_configs = []

        system = build_topology(env, pe_configs, sm_configs)

        snapshot = capture(system)

        pe_snap = snapshot.pes[0]
        # frames should be a tuple of tuples (frozen dataclass)
        assert isinstance(pe_snap.frames, tuple)
        assert len(pe_snap.frames) == 4  # 4 frames
        for frame in pe_snap.frames:
            # Each frame has slots
            assert isinstance(frame, tuple)

        # tag_store should be dict mapping act_id to (frame_id, lane) tuple
        assert isinstance(pe_snap.tag_store, dict)

        # presence should be a tuple of tuples (frame_count x matchable_offsets)
        assert isinstance(pe_snap.presence, tuple)
        assert len(pe_snap.presence) == 4  # 4 frames

        # port_store should be a tuple of tuples
        assert isinstance(pe_snap.port_store, tuple)
        assert len(pe_snap.port_store) == 4  # 4 frames

        # free_frames should be a tuple of frame IDs
        assert isinstance(pe_snap.free_frames, tuple)

    def test_capture_pe_output_log(self):
        """Test that PE output_log is captured."""
        env = simpy.Environment()

        pe_configs = [PEConfig(pe_id=0, iram=_const_sink_iram())]
        sm_configs = []

        system = build_topology(env, pe_configs, sm_configs)

        # Manually add a token to output_log to simulate emission
        output_token = MonadToken(target=0, offset=1, act_id=0, data=99, inline=True)
        system.pes[0].output_log.append(output_token)

        snapshot = capture(system)

        pe_snap = snapshot.pes[0]
        assert len(pe_snap.output_log) == 1
        assert pe_snap.output_log[0] == output_token

    def test_capture_snapshot_is_frozen(self):
        """Test that captured snapshots are frozen (immutable)."""
        env = simpy.Environment()
        system = build_topology(env, [], [])

        snapshot = capture(system)

        # Verify the snapshot dataclass is frozen
        with pytest.raises(AttributeError):
            snapshot.sim_time = 1.0

        with pytest.raises(AttributeError):
            snapshot.pes = {}

    def test_capture_pe_tag_store(self):
        """Test that PE tag_store is captured."""
        env = simpy.Environment()

        pe_configs = [PEConfig(pe_id=0, iram=_const_sink_iram(), frame_count=8)]
        system = build_topology(env, pe_configs, [])

        # Verify tag_store is captured
        snapshot = capture(system)

        pe_snap = snapshot.pes[0]
        # tag_store should be a dict mapping act_id to (frame_id, lane) tuple
        assert isinstance(pe_snap.tag_store, dict)
        assert hasattr(pe_snap, 'frames')
        assert hasattr(pe_snap, 'free_frames')

    def test_capture_sm_deferred_read(self):
        """Test that SM deferred_read state is captured."""
        env = simpy.Environment()

        sm_configs = [SMConfig(sm_id=0, cell_count=256)]
        system = build_topology(env, [], sm_configs)

        sm = system.sms[0]
        # Manually set up a deferred read (in real code this happens during simulation)
        return_token = MonadToken(target=0, offset=0, act_id=0, data=0, inline=False)
        sm.deferred_read = DeferredRead(cell_addr=42, return_route=return_token)

        snapshot = capture(system)

        sm_snap = snapshot.sms[0]
        assert sm_snap.deferred_read is not None
        assert sm_snap.deferred_read["cell_addr"] == 42
        assert "return_route" in sm_snap.deferred_read

    def test_capture_sm_t0_store(self):
        """Test that SM T0 store is captured."""
        env = simpy.Environment()

        sm_configs = [SMConfig(sm_id=0, cell_count=256, tier_boundary=256)]
        system = build_topology(env, [], sm_configs)

        sm = system.sms[0]
        # Add some int values to T0 store
        sm.t0_store.append(777)
        sm.t0_store.append(888)

        snapshot = capture(system)

        sm_snap = snapshot.sms[0]
        assert len(sm_snap.t0_store) == 2
        assert sm_snap.t0_store[0] == 777
        assert sm_snap.t0_store[1] == 888

    def test_capture_multiple_pes_and_sms(self):
        """Test capturing state of a system with multiple PEs and SMs."""
        env = simpy.Environment()

        # Each PE gets its own IRAM dict / Instruction instance.
        pe_configs = [
            PEConfig(pe_id=0, iram=_const_sink_iram(), frame_count=4),
            PEConfig(pe_id=1, iram=_const_sink_iram(), frame_count=4),
        ]
        sm_configs = [
            SMConfig(sm_id=0, cell_count=256),
            SMConfig(sm_id=1, cell_count=256),
        ]

        system = build_topology(env, pe_configs, sm_configs)

        # Set up some state
        system.sms[0].cells[5].pres = Presence.FULL
        system.sms[0].cells[5].data_l = 50
        system.sms[1].cells[10].pres = Presence.FULL
        system.sms[1].cells[10].data_l = 100

        snapshot = capture(system)

        # Verify all PEs and SMs are captured
        assert len(snapshot.pes) == 2
        assert len(snapshot.sms) == 2

        # Verify PE frame state is captured
        assert len(snapshot.pes[0].frames) == 4
        assert len(snapshot.pes[1].frames) == 4
        assert isinstance(snapshot.pes[0].tag_store, dict)
        assert isinstance(snapshot.pes[1].tag_store, dict)

        # Verify SM state
        assert snapshot.sms[0].cells[5].pres == Presence.FULL
        assert snapshot.sms[0].cells[5].data_l == 50
        assert snapshot.sms[1].cells[10].pres == Presence.FULL
        assert snapshot.sms[1].cells[10].data_l == 100

    def test_ac72_pe_snapshot_lacks_matching_store(self):
        """AC7.2: PESnapshot must NOT have matching_store attribute.

        Frame-based PE no longer uses matching_store; matching happens via
        presence bits and frame slots. This test verifies the snapshot
        correctly excludes this legacy field.
        """
        env = simpy.Environment()
        config = PEConfig(frame_count=4, frame_slots=32, matchable_offsets=4)

        # Build a minimal system for capture; build_topology creates the PE
        # itself, so no standalone ProcessingElement is constructed here.
        system = build_topology(env, [config], [], fifo_capacity=10)

        snapshot = capture(system)

        # Check PE snapshot
        assert 0 in snapshot.pes
        pe_snap = snapshot.pes[0]

        # Verify NO matching_store attribute
        assert not hasattr(pe_snap, 'matching_store'), \
            "PESnapshot should NOT have matching_store (legacy field)"

    def test_ac72_pe_snapshot_lacks_gen_counters(self):
        """AC7.2: PESnapshot must NOT have gen_counters attribute.

        Frame-based PE no longer uses generation counters; frame validity is
        managed via tag_store and frame allocation/deallocation. This test
        verifies the snapshot correctly excludes this legacy field.
        """
        env = simpy.Environment()
        config = PEConfig(frame_count=4, frame_slots=32, matchable_offsets=4)

        # Build a minimal system for capture; build_topology creates the PE
        # itself, so no standalone ProcessingElement is constructed here.
        system = build_topology(env, [config], [], fifo_capacity=10)

        snapshot = capture(system)

        # Check PE snapshot
        assert 0 in snapshot.pes
        pe_snap = snapshot.pes[0]

        # Verify NO gen_counters attribute
        assert not hasattr(pe_snap, 'gen_counters'), \
            "PESnapshot should NOT have gen_counters (legacy field)"

    def test_capture_sim_time_and_next_time(self):
        """Test that sim_time and next_time are correctly captured."""
        env = simpy.Environment()

        # Create an empty system (no PEs/SMs)
        system = build_topology(env, [], [])

        assert env.now == 0.0
        initial_snapshot = capture(system)
        assert initial_snapshot.sim_time == 0.0
        assert initial_snapshot.next_time == float('inf')

        # Build a second, independent system that contains one PE
        pe_configs = [PEConfig(pe_id=0, iram=_const_sink_iram())]
        env2 = simpy.Environment()
        system2 = build_topology(env2, pe_configs, [])

        snapshot_with_pe = capture(system2)
        assert snapshot_with_pe.sim_time == 0.0
        # PE process starts at time 0, so next_time is 0
        assert snapshot_with_pe.next_time == 0.0