OR-1 dataflow CPU sketch
1"""Tests for monitor/backend.py SimulationBackend.
2
3Tests verify:
4- or1-monitor.AC1.1: LoadCmd with valid program assembles, builds topology, injects seeds
5- or1-monitor.AC1.2: LoadCmd wires event callbacks into all PEs and SMs
6- or1-monitor.AC1.3: LoadCmd with invalid program returns ErrorResult without crashing
7- or1-monitor.AC1.4: ResetCmd tears down and leaves ready for new LoadCmd
8- or1-monitor.AC1.5: ResetCmd with reload=True reloads program
9- or1-monitor.AC5.2: StepTickCmd processes all events at current time before returning
10- or1-monitor.AC5.3: StepEventCmd processes exactly one event
11- or1-monitor.AC5.4: RunUntilCmd batches events per tick
12- or1-monitor.AC5.5: StepResult contains both events and snapshot
13- or1-monitor.AC5.6: Stepping when simulation finished returns finished=True
14"""
15
16import pytest
17import simpy
18from threading import Thread
19import time
20
21from monitor.backend import SimulationBackend
22from monitor.commands import (
23 LoadCmd, StepTickCmd, StepEventCmd, RunUntilCmd, InjectCmd, SendCmd,
24 ResetCmd, StopCmd, GraphLoaded, StepResult, ErrorResult
25)
26from monitor.snapshot import StateSnapshot
27from tokens import MonadToken
28
29
30class TestLoadCommand:
31 """Tests for LoadCmd acceptance criteria."""
32
33 def test_ac11_valid_program_returns_graphloaded(self):
34 """AC1.1: LoadCmd with valid dfasm assembles and returns GraphLoaded."""
35 backend = SimulationBackend()
36 source = """\
37@system pe=1, sm=0
38&c|pe0 <| const, 42
39"""
40 result = backend._handle_load(source)
41
42 assert isinstance(result, GraphLoaded)
43 assert result.ir_graph is not None
44 assert result.snapshot is not None
45 assert isinstance(result.snapshot, StateSnapshot)
46
47 def test_ac11_snapshot_has_seed_tokens(self):
48 """AC1.1: Initial snapshot shows injected seed tokens in PE input queue."""
49 backend = SimulationBackend()
50 source = """\
51@system pe=1, sm=0
52&c|pe0 <| const, 42
53"""
54 result = backend._handle_load(source)
55
56 assert isinstance(result, GraphLoaded)
57 snapshot = result.snapshot
58 # Seed token from const node should be in PE 0's input queue
59 assert 0 in snapshot.pes
60 pe_snap = snapshot.pes[0]
61 assert len(pe_snap.input_queue) >= 1 # At least the seed token
62
63 def test_ac12_callbacks_wired_to_pes_and_sms(self):
64 """AC1.2: LoadCmd wires on_event callbacks into all PEs and SMs."""
65 backend = SimulationBackend()
66 source = """\
67@system pe=1, sm=0
68&const_val|pe0 <| const, 1
69&add_op|pe0 <| add
70&const_val|pe0 |> &add_op|pe0:L
71"""
72 result = backend._handle_load(source)
73
74 assert isinstance(result, GraphLoaded)
75 # Run the simulation to capture events (cycle-accurate timing starts events at time 1+)
76 step_result = backend._handle_run_until(100)
77 # If callbacks are wired, events should be captured
78 assert isinstance(step_result, StepResult)
79 assert len(step_result.events) > 0, "Expected events to be collected if callbacks are wired"
80 # Verify specific event types appear (TokenReceived, Matched, Executed)
81 event_types = {type(e).__name__ for e in step_result.events}
82 assert "TokenReceived" in event_types or "Matched" in event_types or "Executed" in event_types, \
83 f"Expected at least one of TokenReceived/Matched/Executed, got: {event_types}"
84
85 def test_ac13_invalid_program_returns_error(self):
86 """AC1.3: LoadCmd with invalid dfasm returns ErrorResult."""
87 backend = SimulationBackend()
88 # Invalid: references undefined label
89 source = """\
90@system pe=1, sm=0
91&a|pe0 <| const, 5
92&a|pe0 |> &undefined|pe0:L
93"""
94 result = backend._handle_load(source)
95
96 assert isinstance(result, ErrorResult)
97 assert result.message # Should have error message
98 assert backend._system is None # Should not have loaded system
99
100 def test_ac13_backend_still_functional_after_error(self):
101 """AC1.3: Backend remains functional after error (can accept new LoadCmd)."""
102 backend = SimulationBackend()
103
104 # First: attempt invalid load
105 invalid_source = """\
106@system pe=1, sm=0
107&a|pe0 <| const, 5
108&a|pe0 |> &undefined|pe0:L
109"""
110 result1 = backend._handle_load(invalid_source)
111 assert isinstance(result1, ErrorResult)
112
113 # Second: load valid program — should succeed
114 valid_source = """\
115@system pe=1, sm=0
116&c|pe0 <| const, 99
117"""
118 result2 = backend._handle_load(valid_source)
119 assert isinstance(result2, GraphLoaded)
120
121 def test_ac72_backend_iram_populated_by_setup_tokens(self):
122 """AC7.2 backend: PE IRAM should be populated in snapshot after _handle_load().
123
124 This verifies that setup_tokens were injected before taking the snapshot,
125 proving that IRAM write instructions were executed during seed phase.
126 """
127 backend = SimulationBackend()
128 # Create a program with multiple instructions to populate IRAM
129 source = """\
130@system pe=1, sm=0
131&add|pe0 <| add
132&sub|pe0 <| sub
133&inc|pe0 <| inc
134&add|pe0 |> &sub|pe0:L
135&sub|pe0 |> &inc|pe0:L
136"""
137 result = backend._handle_load(source)
138
139 assert isinstance(result, GraphLoaded)
140 snapshot = result.snapshot
141
142 # PE 0 should exist
143 assert 0 in snapshot.pes
144 pe_snap = snapshot.pes[0]
145
146 # IRAM should be populated (at least some instructions written)
147 assert pe_snap.iram is not None, "PE IRAM should not be None"
148 assert len(pe_snap.iram) > 0, \
149 "PE IRAM should have been populated by setup_tokens"
150
151 # Verify some IRAM entries are valid Instruction objects
152 iram_entries = pe_snap.iram
153 assert any(v is not None for v in iram_entries.values()), \
154 "At least some IRAM entries should contain instructions"
155
156
157class TestResetCommand:
158 """Tests for ResetCmd acceptance criteria."""
159
160 def test_ac14_reset_tears_down_system(self):
161 """AC1.4: ResetCmd tears down current simulation."""
162 backend = SimulationBackend()
163 source = """\
164@system pe=1, sm=0
165&c|pe0 <| const, 42
166"""
167 # Load a program
168 backend._handle_load(source)
169 assert backend._system is not None
170
171 # Reset
172 result = backend._handle_reset(reload=False)
173
174 assert backend._system is None
175 assert backend._env is None
176 assert isinstance(result, StepResult)
177
178 def test_ac14_reset_ready_for_new_load(self):
179 """AC1.4: After reset, backend is ready for new LoadCmd."""
180 backend = SimulationBackend()
181 source1 = """\
182@system pe=1, sm=0
183&c|pe0 <| const, 42
184"""
185 # Load first program
186 backend._handle_load(source1)
187
188 # Reset
189 backend._handle_reset(reload=False)
190
191 # Load second program — should succeed
192 source2 = """\
193@system pe=2, sm=0
194&a|pe0 <| const, 1
195&b|pe1 <| pass
196"""
197 result = backend._handle_load(source2)
198 assert isinstance(result, GraphLoaded)
199
200 def test_ac15_reset_with_reload_reloads_program(self):
201 """AC1.5: ResetCmd with reload=True reloads the last program."""
202 backend = SimulationBackend()
203 source = """\
204@system pe=1, sm=0
205&c|pe0 <| const, 42
206"""
207 # Load program
208 result1 = backend._handle_load(source)
209 assert isinstance(result1, GraphLoaded)
210 ir_graph1 = result1.ir_graph
211
212 # Reset with reload
213 result2 = backend._handle_reset(reload=True)
214
215 assert isinstance(result2, GraphLoaded)
216 assert backend._system is not None
217 assert result2.ir_graph is not None
218
219
220class TestStepTickCommand:
221 """Tests for StepTickCmd acceptance criteria."""
222
223 def test_ac52_processes_all_events_at_current_time(self):
224 """AC5.2: StepTickCmd processes all events at current simulation time."""
225 backend = SimulationBackend()
226 source = """\
227@system pe=1, sm=0
228&c1|pe0 <| const, 1
229&c2|pe0 <| const, 2
230"""
231 backend._handle_load(source)
232
233 # Run simulation to capture events (cycle-accurate timing starts events at time 1+)
234 result = backend._handle_run_until(100)
235
236 assert isinstance(result, StepResult)
237 assert result.snapshot is not None
238 # Verify events were collected
239 assert len(result.events) > 0, "Expected events to be processed"
240 # After stepping, peek should advance or reach infinity
241 assert result.finished or result.snapshot.next_time > 0, \
242 f"Expected simulation to progress"
243
244 def test_ac55_result_contains_events_and_snapshot(self):
245 """AC5.5: StepResult contains both events and snapshot."""
246 backend = SimulationBackend()
247 source = """\
248@system pe=1, sm=0
249&c|pe0 <| const, 42
250"""
251 backend._handle_load(source)
252
253 result = backend._handle_step_tick()
254
255 assert isinstance(result, StepResult)
256 assert result.snapshot is not None
257 assert isinstance(result.snapshot, StateSnapshot)
258 assert result.events is not None
259
260 def test_ac56_finished_simulation_returns_finished_true(self):
261 """AC5.6: Stepping when finished returns finished=True without error."""
262 backend = SimulationBackend()
263 source = """\
264@system pe=1, sm=0
265&c|pe0 <| const, 42
266"""
267 backend._handle_load(source)
268
269 # Step until finished
270 while True:
271 result = backend._handle_step_tick()
272 if result.finished:
273 break
274 # Safety check to prevent infinite loop
275 if backend._env.now > 1000:
276 pytest.fail("Simulation did not finish within 1000 time units")
277
278 # Verify finished state
279 assert result.finished is True
280 assert result.snapshot is not None
281 assert backend._env.peek() == float('inf')
282
283
284class TestStepEventCommand:
285 """Tests for StepEventCmd acceptance criteria."""
286
287 def test_ac53_processes_exactly_one_event(self):
288 """AC5.3: StepEventCmd processes exactly one event."""
289 backend = SimulationBackend()
290 source = """\
291@system pe=1, sm=0
292&c1|pe0 <| const, 1
293&c2|pe0 <| const, 2
294&result|pe0 <| add
295&c1|pe0 |> &result|pe0:L
296&c2|pe0 |> &result|pe0:R
297"""
298 backend._handle_load(source)
299
300 # Collect events across multiple steps - at least some steps should have events
301 all_events = []
302 for _ in range(10):
303 result = backend._handle_step_event()
304 assert isinstance(result, StepResult)
305 all_events.extend(result.events)
306 if result.finished:
307 break
308
309 # After stepping multiple times, we should have collected some events
310 assert len(all_events) >= 1, f"Expected at least one event across 10 steps, got {len(all_events)}"
311
312 def test_ac53_repeated_events_make_progress(self):
313 """AC5.3: Multiple StepEventCmd calls process each event separately."""
314 backend = SimulationBackend()
315 source = """\
316@system pe=1, sm=0
317&c1|pe0 <| const, 1
318&c2|pe0 <| const, 2
319&result|pe0 <| add
320&c1|pe0 |> &result|pe0:L
321&c2|pe0 |> &result|pe0:R
322"""
323 backend._handle_load(source)
324
325 # Collect time values after each step
326 times = []
327 event_count = 0
328 for _ in range(10):
329 result = backend._handle_step_event()
330 times.append(backend._env.now)
331 # Track total events processed
332 event_count += len(result.events)
333 if result.finished:
334 break
335
336 # Times should be non-decreasing (verifies events are stepped individually)
337 assert times == sorted(times), f"Times not monotonic: {times}"
338 # Verify that at least some events were processed (not a crash)
339 assert event_count >= 1, f"Expected at least one event across steps, got {event_count}"
340
341
342class TestRunUntilCommand:
343 """Tests for RunUntilCmd acceptance criteria."""
344
345 def test_ac54_batches_events_per_tick(self):
346 """AC5.4: RunUntilCmd batches events per tick."""
347 backend = SimulationBackend()
348 source = """\
349@system pe=1, sm=0
350&c1|pe0 <| const, 1
351&c2|pe0 <| const, 2
352&result|pe0 <| add
353&c1|pe0 |> &result|pe0:L
354&c2|pe0 |> &result|pe0:R
355"""
356 backend._handle_load(source)
357
358 # Run until time 10
359 result = backend._handle_run_until(10.0)
360
361 assert isinstance(result, StepResult)
362 assert result.snapshot is not None
363 # Verify events were collected and sim time <= target or finished
364 assert len(result.events) > 0, "Expected events to be batched"
365 assert result.sim_time <= 10.0 or result.finished, \
366 f"Expected sim_time <= 10.0 or finished, got {result.sim_time} finished={result.finished}"
367
368 def test_ac54_stops_at_target_time(self):
369 """AC5.4: RunUntilCmd stops at or before target time."""
370 backend = SimulationBackend()
371 source = """\
372@system pe=1, sm=0
373&c|pe0 <| const, 42
374"""
375 backend._handle_load(source)
376
377 target = 50.0
378 result = backend._handle_run_until(target)
379
380 # Sim time should be <= target (or finished)
381 assert backend._env.now <= target or result.finished
382
383
384class TestInjectCommand:
385 """Tests for InjectCmd."""
386
387 def test_inject_token_appears_in_snapshot(self):
388 """InjectCmd injects token into correct PE."""
389 backend = SimulationBackend()
390 source = """\
391@system pe=1, sm=0
392&c|pe0 <| const, 42
393"""
394 backend._handle_load(source)
395
396 # Inject a token
397 token = MonadToken(target=0, offset=0, act_id=0, data=99, inline=True)
398 result = backend._handle_inject(token)
399
400 assert isinstance(result, StepResult)
401 # Token should be in PE 0's input queue
402 snapshot = result.snapshot
403 assert 0 in snapshot.pes
404 pe_snap = snapshot.pes[0]
405 assert token in pe_snap.input_queue
406
407
408class TestSendCommand:
409 """Tests for SendCmd."""
410
411 def test_send_token_respects_backpressure(self):
412 """SendCmd sends token via SimPy store.put()."""
413 backend = SimulationBackend()
414 source = """\
415@system pe=1, sm=0
416&c|pe0 <| const, 42
417"""
418 backend._handle_load(source)
419
420 # Send a token (should go through SimPy backpressure mechanism)
421 token = MonadToken(target=0, offset=0, act_id=0, data=77, inline=True)
422 result = backend._handle_send(token)
423
424 assert isinstance(result, StepResult)
425 assert result.snapshot is not None
426
427
428class TestThreadedInterface:
429 """Tests for the threaded interface (start, send_command, stop)."""
430
431 def test_start_stop_threading(self):
432 """Backend threading interface starts and stops cleanly."""
433 backend = SimulationBackend()
434 backend.start()
435
436 # Send a command
437 source = """\
438@system pe=1, sm=0
439&c|pe0 <| const, 42
440"""
441 result = backend.send_command(LoadCmd(source=source), timeout=5.0)
442
443 assert isinstance(result, GraphLoaded)
444
445 # Stop
446 backend.stop()
447
448 def test_send_command_timeout(self):
449 """send_command respects timeout parameter."""
450 backend = SimulationBackend()
451 backend.start()
452
453 source = """\
454@system pe=1, sm=0
455&c|pe0 <| const, 42
456"""
457 # This should succeed within timeout
458 result = backend.send_command(LoadCmd(source=source), timeout=5.0)
459 assert isinstance(result, GraphLoaded)
460
461 backend.stop()
462
463 def test_threaded_step_commands(self):
464 """Multiple step commands work in threaded mode."""
465 backend = SimulationBackend()
466 backend.start()
467
468 source = """\
469@system pe=1, sm=0
470&c|pe0 <| const, 42
471"""
472 backend.send_command(LoadCmd(source=source), timeout=5.0)
473
474 # Step a few times
475 for _ in range(3):
476 result = backend.send_command(StepTickCmd(), timeout=5.0)
477 assert isinstance(result, StepResult)
478 if result.finished:
479 break
480
481 backend.stop()
482
483 def test_error_handling_in_thread(self):
484 """Backend catches and returns errors from thread."""
485 backend = SimulationBackend()
486 backend.start()
487
488 # Send invalid program
489 result = backend.send_command(
490 LoadCmd(source="@system pe=1, sm=0\n&a|pe0 |> &undefined|pe0:L"),
491 timeout=5.0
492 )
493
494 assert isinstance(result, ErrorResult)
495 assert result.message
496
497 backend.stop()
498
499
500class TestSequentialWorkflow:
501 """Integration tests for typical workflows."""
502
503 def test_load_step_reset_reload_workflow(self):
504 """Workflow: Load → Step → Reset with reload → Step again."""
505 backend = SimulationBackend()
506
507 # Load
508 source = """\
509@system pe=1, sm=0
510&c|pe0 <| const, 42
511"""
512 result1 = backend._handle_load(source)
513 assert isinstance(result1, GraphLoaded)
514
515 # Step
516 result2 = backend._handle_step_tick()
517 assert isinstance(result2, StepResult)
518
519 # Reset with reload
520 result3 = backend._handle_reset(reload=True)
521 assert isinstance(result3, GraphLoaded)
522
523 # Step again
524 result4 = backend._handle_step_tick()
525 assert isinstance(result4, StepResult)
526
527 def test_load_with_multiple_pes_and_sms(self):
528 """Load a program with multiple PEs and SMs."""
529 backend = SimulationBackend()
530 # Create a program that actually uses the SM (with a write operation)
531 source = """\
532@system pe=2, sm=1
533&const_val|pe0 <| const, 42
534&write_op|pe0 <| write
535&relay|pe1 <| pass
536&const_val|pe0 |> &write_op|pe0:L
537&write_op|pe0 |> &relay|pe1:L
538"""
539 result = backend._handle_load(source)
540
541 assert isinstance(result, GraphLoaded)
542 snapshot = result.snapshot
543
544 # Verify multiple PEs are present
545 assert len(snapshot.pes) > 0
546 # SMs may or may not have state depending on program execution
547 # The system should have been set up correctly
548 assert backend._system is not None
549
550 def test_run_until_completion(self):
551 """Run simulation until completion."""
552 backend = SimulationBackend()
553 source = """\
554@system pe=1, sm=0
555&c|pe0 <| const, 42
556"""
557 backend._handle_load(source)
558
559 # Run until completion
560 while True:
561 result = backend._handle_step_tick()
562 if result.finished:
563 break
564 if backend._env.now > 1000:
565 pytest.fail("Simulation did not complete")
566
567 assert result.finished is True