OR-1 dataflow CPU sketch
1"""Tests for monitor/snapshot.py state capture functionality."""
2
3import pytest
4import simpy
5
6from cm_inst import Instruction, ArithOp, MemOp, Port, RoutingOp, OutputStyle
7from emu.network import build_topology
8from emu.types import PEConfig, SMConfig
9from monitor.snapshot import capture, PESnapshot, SMSnapshot, SMCellSnapshot
10from sm_mod import Presence
11from tokens import DyadToken, MonadToken
12
13
class TestCaptureWithSinglePEAndSM:
    """Tests for ``capture()`` on empty, PE-only, SM-only and mixed systems.

    Every test builds a fresh ``simpy.Environment`` and topology, optionally
    pokes state directly into the live PE/SM objects, and then checks that the
    snapshot returned by ``capture()`` mirrors that state.
    """

    @staticmethod
    def _const_sink_iram():
        """Return a fresh one-entry IRAM: offset 0 -> CONST with OutputStyle.SINK.

        A new dict and a new ``Instruction`` are built on every call so that
        PEs configured from it never share mutable state.
        """
        return {
            0: Instruction(
                opcode=RoutingOp.CONST,
                output=OutputStyle.SINK,
                has_const=True,
                dest_count=0,
                wide=False,
                fref=0,
            )
        }

    def test_capture_empty_system(self):
        """Test capturing state of a completely empty system (no PEs, no SMs)."""
        env = simpy.Environment()
        system = build_topology(env, [], [])

        snapshot = capture(system)

        assert snapshot.sim_time == 0.0
        # Nothing is scheduled in an empty system.
        assert snapshot.next_time == float('inf')
        assert snapshot.pes == {}
        assert snapshot.sms == {}

    def test_capture_system_with_seed_token(self):
        """Test capturing state after seed token injection."""
        env = simpy.Environment()

        # One PE holding a single CONST instruction, no SMs.
        iram = self._const_sink_iram()
        system = build_topology(env, [PEConfig(pe_id=0, iram=iram)], [])

        # Inject a seed token without running the simulation.
        seed = MonadToken(target=0, offset=0, act_id=0, data=0, inline=True)
        system.inject(seed)

        snapshot = capture(system)

        # Basic snapshot structure.
        assert snapshot.sim_time == 0.0
        assert 0 in snapshot.pes
        pe_snap = snapshot.pes[0]
        assert pe_snap.pe_id == 0

        # The seed token must be visible in the PE's input queue.
        assert len(pe_snap.input_queue) == 1
        assert pe_snap.input_queue[0] == seed

        # IRAM is captured as configured.
        assert pe_snap.iram == iram

        # Frame-based state fields are present on the snapshot.
        assert hasattr(pe_snap, 'frames')
        assert hasattr(pe_snap, 'tag_store')

    def test_capture_sm_with_written_cells(self):
        """Test capturing SM state with written cells."""
        env = simpy.Environment()
        system = build_topology(env, [], [SMConfig(sm_id=0, cell_count=256)])

        # Manually mark cell 10 as FULL and give it data.
        sm = system.sms[0]
        sm.cells[10].pres = Presence.FULL
        sm.cells[10].data_l = 100
        sm.cells[10].data_r = 200

        snapshot = capture(system)

        assert 0 in snapshot.sms
        sm_snap = snapshot.sms[0]
        assert sm_snap.sm_id == 0

        # The written cell is captured with its presence state and both words.
        assert 10 in sm_snap.cells
        cell_snap = sm_snap.cells[10]
        assert cell_snap.pres == Presence.FULL
        assert cell_snap.data_l == 100
        assert cell_snap.data_r == 200

    def test_capture_sm_empty_cells_not_included(self):
        """Test that empty, unwritten cells are not included in snapshot."""
        env = simpy.Environment()
        system = build_topology(env, [], [SMConfig(sm_id=0, cell_count=512)])

        snapshot = capture(system)

        # No cell was written, so the captured cells mapping must be empty.
        sm_snap = snapshot.sms[0]
        assert sm_snap.cells == {}

    def test_capture_pe_frame_structure(self):
        """Test that PE frame state is captured with correct structure."""
        env = simpy.Environment()

        iram = {
            0: Instruction(
                opcode=ArithOp.ADD,
                output=OutputStyle.INHERIT,
                has_const=False,
                dest_count=2,
                wide=False,
                fref=0,
            )
        }
        pe_configs = [PEConfig(pe_id=0, iram=iram, frame_count=4, frame_slots=64)]
        system = build_topology(env, pe_configs, [])

        snapshot = capture(system)
        pe_snap = snapshot.pes[0]

        # frames: a tuple with one entry per configured frame, each entry
        # itself a tuple (the frame's slots).
        assert isinstance(pe_snap.frames, tuple)
        assert len(pe_snap.frames) == 4  # 4 frames
        for frame in pe_snap.frames:
            assert isinstance(frame, tuple)

        # tag_store: dict mapping act_id to (frame_id, lane) tuple.
        assert isinstance(pe_snap.tag_store, dict)

        # presence: tuple of tuples (frame_count x matchable_offsets).
        assert isinstance(pe_snap.presence, tuple)
        assert len(pe_snap.presence) == 4  # 4 frames

        # port_store: tuple of tuples, one entry per frame.
        assert isinstance(pe_snap.port_store, tuple)
        assert len(pe_snap.port_store) == 4  # 4 frames

        # free_frames: tuple of frame IDs.
        assert isinstance(pe_snap.free_frames, tuple)

    def test_capture_pe_output_log(self):
        """Test that PE output_log is captured."""
        env = simpy.Environment()
        pe_configs = [PEConfig(pe_id=0, iram=self._const_sink_iram())]
        system = build_topology(env, pe_configs, [])

        # Append directly to output_log to simulate an emission without
        # running the simulation.
        output_token = MonadToken(target=0, offset=1, act_id=0, data=99, inline=True)
        system.pes[0].output_log.append(output_token)

        snapshot = capture(system)

        pe_snap = snapshot.pes[0]
        assert len(pe_snap.output_log) == 1
        assert pe_snap.output_log[0] == output_token

    def test_capture_snapshot_is_frozen(self):
        """Test that captured snapshots are frozen (immutable)."""
        env = simpy.Environment()
        system = build_topology(env, [], [])

        snapshot = capture(system)

        # Assigning to a field of a frozen dataclass raises
        # dataclasses.FrozenInstanceError, which subclasses AttributeError.
        with pytest.raises(AttributeError):
            snapshot.sim_time = 1.0

        with pytest.raises(AttributeError):
            snapshot.pes = {}

    def test_capture_pe_tag_store(self):
        """Test that PE tag_store is captured."""
        env = simpy.Environment()
        pe_configs = [PEConfig(pe_id=0, iram=self._const_sink_iram(), frame_count=8)]
        system = build_topology(env, pe_configs, [])

        snapshot = capture(system)

        pe_snap = snapshot.pes[0]
        # tag_store: dict mapping act_id to (frame_id, lane) tuple.
        assert isinstance(pe_snap.tag_store, dict)
        assert hasattr(pe_snap, 'frames')
        assert hasattr(pe_snap, 'free_frames')

    def test_capture_sm_deferred_read(self):
        """Test that SM deferred_read state is captured."""
        from emu.types import DeferredRead

        env = simpy.Environment()
        system = build_topology(env, [], [SMConfig(sm_id=0, cell_count=256)])

        # Install a deferred read by hand (in real code this happens during
        # simulation).
        sm = system.sms[0]
        return_token = MonadToken(target=0, offset=0, act_id=0, data=0, inline=False)
        sm.deferred_read = DeferredRead(cell_addr=42, return_route=return_token)

        snapshot = capture(system)

        # The snapshot exposes the deferred read as a mapping.
        sm_snap = snapshot.sms[0]
        assert sm_snap.deferred_read is not None
        assert sm_snap.deferred_read["cell_addr"] == 42
        assert "return_route" in sm_snap.deferred_read

    def test_capture_sm_t0_store(self):
        """Test that SM T0 store is captured."""
        env = simpy.Environment()
        sm_configs = [SMConfig(sm_id=0, cell_count=256, tier_boundary=256)]
        system = build_topology(env, [], sm_configs)

        # Add some int values to the T0 store.
        sm = system.sms[0]
        sm.t0_store.append(777)
        sm.t0_store.append(888)

        snapshot = capture(system)

        # Values are captured in insertion order.
        sm_snap = snapshot.sms[0]
        assert len(sm_snap.t0_store) == 2
        assert sm_snap.t0_store[0] == 777
        assert sm_snap.t0_store[1] == 888

    def test_capture_multiple_pes_and_sms(self):
        """Test capturing state of a system with multiple PEs and SMs."""
        env = simpy.Environment()

        # Each PE gets its own IRAM dict and Instruction instance.
        pe_configs = [
            PEConfig(pe_id=0, iram=self._const_sink_iram(), frame_count=4),
            PEConfig(pe_id=1, iram=self._const_sink_iram(), frame_count=4),
        ]
        sm_configs = [
            SMConfig(sm_id=0, cell_count=256),
            SMConfig(sm_id=1, cell_count=256),
        ]

        system = build_topology(env, pe_configs, sm_configs)

        # Write one distinct cell in each SM.
        system.sms[0].cells[5].pres = Presence.FULL
        system.sms[0].cells[5].data_l = 50
        system.sms[1].cells[10].pres = Presence.FULL
        system.sms[1].cells[10].data_l = 100

        snapshot = capture(system)

        # All PEs and SMs are captured.
        assert len(snapshot.pes) == 2
        assert len(snapshot.sms) == 2

        # PE frame state is captured per PE.
        assert len(snapshot.pes[0].frames) == 4
        assert len(snapshot.pes[1].frames) == 4
        assert isinstance(snapshot.pes[0].tag_store, dict)
        assert isinstance(snapshot.pes[1].tag_store, dict)

        # Each SM snapshot carries only its own written cell.
        assert snapshot.sms[0].cells[5].pres == Presence.FULL
        assert snapshot.sms[0].cells[5].data_l == 50
        assert snapshot.sms[1].cells[10].pres == Presence.FULL
        assert snapshot.sms[1].cells[10].data_l == 100

    def test_ac72_pe_snapshot_lacks_matching_store(self):
        """AC7.2: PESnapshot must NOT have matching_store attribute.

        Frame-based PE no longer uses matching_store; matching happens via
        presence bits and frame slots. This test verifies the snapshot
        correctly excludes this legacy field.
        """
        env = simpy.Environment()
        config = PEConfig(frame_count=4, frame_slots=32, matchable_offsets=4)

        # Build a minimal system for capture; build_topology creates the PE
        # from the config, so no standalone PE needs to be constructed here.
        system = build_topology(env, [config], [], fifo_capacity=10)

        snapshot = capture(system)

        # Check PE snapshot
        assert 0 in snapshot.pes
        pe_snap = snapshot.pes[0]

        # Verify NO matching_store attribute
        assert not hasattr(pe_snap, 'matching_store'), \
            "PESnapshot should NOT have matching_store (legacy field)"

    def test_ac72_pe_snapshot_lacks_gen_counters(self):
        """AC7.2: PESnapshot must NOT have gen_counters attribute.

        Frame-based PE no longer uses generation counters; frame validity is
        managed via tag_store and frame allocation/deallocation. This test
        verifies the snapshot correctly excludes this legacy field.
        """
        env = simpy.Environment()
        config = PEConfig(frame_count=4, frame_slots=32, matchable_offsets=4)

        # Build a minimal system for capture; build_topology creates the PE
        # from the config, so no standalone PE needs to be constructed here.
        system = build_topology(env, [config], [], fifo_capacity=10)

        snapshot = capture(system)

        # Check PE snapshot
        assert 0 in snapshot.pes
        pe_snap = snapshot.pes[0]

        # Verify NO gen_counters attribute
        assert not hasattr(pe_snap, 'gen_counters'), \
            "PESnapshot should NOT have gen_counters (legacy field)"

    def test_capture_sim_time_and_next_time(self):
        """Test that sim_time and next_time are correctly captured."""
        # Case 1: empty system (no PEs/SMs) -> nothing scheduled.
        env = simpy.Environment()
        system = build_topology(env, [], [])

        assert env.now == 0.0
        initial_snapshot = capture(system)
        assert initial_snapshot.sim_time == 0.0
        assert initial_snapshot.next_time == float('inf')

        # Case 2: a separate environment containing one PE. The simulation is
        # never run, so sim_time stays at 0.
        pe_configs = [PEConfig(pe_id=0, iram=self._const_sink_iram())]
        env2 = simpy.Environment()
        system2 = build_topology(env2, pe_configs, [])

        snapshot_with_pe = capture(system2)
        assert snapshot_with_pe.sim_time == 0.0
        # PE process starts at time 0, so next_time is 0
        assert snapshot_with_pe.next_time == 0.0