"""Tests for monitor/backend.py SimulationBackend. Tests verify: - or1-monitor.AC1.1: LoadCmd with valid program assembles, builds topology, injects seeds - or1-monitor.AC1.2: LoadCmd wires event callbacks into all PEs and SMs - or1-monitor.AC1.3: LoadCmd with invalid program returns ErrorResult without crashing - or1-monitor.AC1.4: ResetCmd tears down and leaves ready for new LoadCmd - or1-monitor.AC1.5: ResetCmd with reload=True reloads program - or1-monitor.AC5.2: StepTickCmd processes all events at current time before returning - or1-monitor.AC5.3: StepEventCmd processes exactly one event - or1-monitor.AC5.4: RunUntilCmd batches events per tick - or1-monitor.AC5.5: StepResult contains both events and snapshot - or1-monitor.AC5.6: Stepping when simulation finished returns finished=True """ import pytest import simpy from threading import Thread import time from monitor.backend import SimulationBackend from monitor.commands import ( LoadCmd, StepTickCmd, StepEventCmd, RunUntilCmd, InjectCmd, SendCmd, ResetCmd, StopCmd, GraphLoaded, StepResult, ErrorResult ) from monitor.snapshot import StateSnapshot from tokens import MonadToken class TestLoadCommand: """Tests for LoadCmd acceptance criteria.""" def test_ac11_valid_program_returns_graphloaded(self): """AC1.1: LoadCmd with valid dfasm assembles and returns GraphLoaded.""" backend = SimulationBackend() source = """\ @system pe=1, sm=0 &c|pe0 <| const, 42 """ result = backend._handle_load(source) assert isinstance(result, GraphLoaded) assert result.ir_graph is not None assert result.snapshot is not None assert isinstance(result.snapshot, StateSnapshot) def test_ac11_snapshot_has_seed_tokens(self): """AC1.1: Initial snapshot shows injected seed tokens in PE input queue.""" backend = SimulationBackend() source = """\ @system pe=1, sm=0 &c|pe0 <| const, 42 """ result = backend._handle_load(source) assert isinstance(result, GraphLoaded) snapshot = result.snapshot # Seed token from const node should be in PE 0's input queue assert 0 in snapshot.pes pe_snap = snapshot.pes[0] assert len(pe_snap.input_queue) >= 1 # At least the seed token def test_ac12_callbacks_wired_to_pes_and_sms(self): """AC1.2: LoadCmd wires on_event callbacks into all PEs and SMs.""" backend = SimulationBackend() source = """\ @system pe=1, sm=0 &const_val|pe0 <| const, 1 &add_op|pe0 <| add &const_val|pe0 |> &add_op|pe0:L """ result = backend._handle_load(source) assert isinstance(result, GraphLoaded) # Run the simulation to capture events (cycle-accurate timing starts events at time 1+) step_result = backend._handle_run_until(100) # If callbacks are wired, events should be captured assert isinstance(step_result, StepResult) assert len(step_result.events) > 0, "Expected events to be collected if callbacks are wired" # Verify specific event types appear (TokenReceived, Matched, Executed) event_types = {type(e).__name__ for e in step_result.events} assert "TokenReceived" in event_types or "Matched" in event_types or "Executed" in event_types, \ f"Expected at least one of TokenReceived/Matched/Executed, got: {event_types}" def test_ac13_invalid_program_returns_error(self): """AC1.3: LoadCmd with invalid dfasm returns ErrorResult.""" backend = SimulationBackend() # Invalid: references undefined label source = """\ @system pe=1, sm=0 &a|pe0 <| const, 5 &a|pe0 |> &undefined|pe0:L """ result = backend._handle_load(source) assert isinstance(result, ErrorResult) assert result.message # Should have error message assert backend._system is None # Should not have loaded system def test_ac13_backend_still_functional_after_error(self): """AC1.3: Backend remains functional after error (can accept new LoadCmd).""" backend = SimulationBackend() # First: attempt invalid load invalid_source = """\ @system pe=1, sm=0 &a|pe0 <| const, 5 &a|pe0 |> &undefined|pe0:L """ result1 = backend._handle_load(invalid_source) assert isinstance(result1, ErrorResult) # Second: load valid program — should succeed valid_source = """\ @system pe=1, sm=0 &c|pe0 <| const, 99 """ result2 = backend._handle_load(valid_source) assert isinstance(result2, GraphLoaded) def test_ac72_backend_iram_populated_by_setup_tokens(self): """AC7.2 backend: PE IRAM should be populated in snapshot after _handle_load(). This verifies that setup_tokens were injected before taking the snapshot, proving that IRAM write instructions were executed during seed phase. """ backend = SimulationBackend() # Create a program with multiple instructions to populate IRAM source = """\ @system pe=1, sm=0 &add|pe0 <| add &sub|pe0 <| sub &inc|pe0 <| inc &add|pe0 |> &sub|pe0:L &sub|pe0 |> &inc|pe0:L """ result = backend._handle_load(source) assert isinstance(result, GraphLoaded) snapshot = result.snapshot # PE 0 should exist assert 0 in snapshot.pes pe_snap = snapshot.pes[0] # IRAM should be populated (at least some instructions written) assert pe_snap.iram is not None, "PE IRAM should not be None" assert len(pe_snap.iram) > 0, \ "PE IRAM should have been populated by setup_tokens" # Verify some IRAM entries are valid Instruction objects iram_entries = pe_snap.iram assert any(v is not None for v in iram_entries.values()), \ "At least some IRAM entries should contain instructions" class TestResetCommand: """Tests for ResetCmd acceptance criteria.""" def test_ac14_reset_tears_down_system(self): """AC1.4: ResetCmd tears down current simulation.""" backend = SimulationBackend() source = """\ @system pe=1, sm=0 &c|pe0 <| const, 42 """ # Load a program backend._handle_load(source) assert backend._system is not None # Reset result = backend._handle_reset(reload=False) assert backend._system is None assert backend._env is None assert isinstance(result, StepResult) def test_ac14_reset_ready_for_new_load(self): """AC1.4: After reset, backend is ready for new LoadCmd.""" backend = SimulationBackend() source1 = """\ @system pe=1, sm=0 &c|pe0 <| const, 42 """ # Load first program backend._handle_load(source1) # Reset backend._handle_reset(reload=False) # Load second program — should succeed source2 = """\ @system pe=2, sm=0 &a|pe0 <| const, 1 &b|pe1 <| pass """ result = backend._handle_load(source2) assert isinstance(result, GraphLoaded) def test_ac15_reset_with_reload_reloads_program(self): """AC1.5: ResetCmd with reload=True reloads the last program.""" backend = SimulationBackend() source = """\ @system pe=1, sm=0 &c|pe0 <| const, 42 """ # Load program result1 = backend._handle_load(source) assert isinstance(result1, GraphLoaded) ir_graph1 = result1.ir_graph # Reset with reload result2 = backend._handle_reset(reload=True) assert isinstance(result2, GraphLoaded) assert backend._system is not None assert result2.ir_graph is not None class TestStepTickCommand: """Tests for StepTickCmd acceptance criteria.""" def test_ac52_processes_all_events_at_current_time(self): """AC5.2: StepTickCmd processes all events at current simulation time.""" backend = SimulationBackend() source = """\ @system pe=1, sm=0 &c1|pe0 <| const, 1 &c2|pe0 <| const, 2 """ backend._handle_load(source) # Run simulation to capture events (cycle-accurate timing starts events at time 1+) result = backend._handle_run_until(100) assert isinstance(result, StepResult) assert result.snapshot is not None # Verify events were collected assert len(result.events) > 0, "Expected events to be processed" # After stepping, peek should advance or reach infinity assert result.finished or result.snapshot.next_time > 0, \ f"Expected simulation to progress" def test_ac55_result_contains_events_and_snapshot(self): """AC5.5: StepResult contains both events and snapshot.""" backend = SimulationBackend() source = """\ @system pe=1, sm=0 &c|pe0 <| const, 42 """ backend._handle_load(source) result = backend._handle_step_tick() assert isinstance(result, StepResult) assert result.snapshot is not None assert isinstance(result.snapshot, StateSnapshot) assert result.events is not None def test_ac56_finished_simulation_returns_finished_true(self): """AC5.6: Stepping when finished returns finished=True without error.""" backend = SimulationBackend() source = """\ @system pe=1, sm=0 &c|pe0 <| const, 42 """ backend._handle_load(source) # Step until finished while True: result = backend._handle_step_tick() if result.finished: break # Safety check to prevent infinite loop if backend._env.now > 1000: pytest.fail("Simulation did not finish within 1000 time units") # Verify finished state assert result.finished is True assert result.snapshot is not None assert backend._env.peek() == float('inf') class TestStepEventCommand: """Tests for StepEventCmd acceptance criteria.""" def test_ac53_processes_exactly_one_event(self): """AC5.3: StepEventCmd processes exactly one event.""" backend = SimulationBackend() source = """\ @system pe=1, sm=0 &c1|pe0 <| const, 1 &c2|pe0 <| const, 2 &result|pe0 <| add &c1|pe0 |> &result|pe0:L &c2|pe0 |> &result|pe0:R """ backend._handle_load(source) # Collect events across multiple steps - at least some steps should have events all_events = [] for _ in range(10): result = backend._handle_step_event() assert isinstance(result, StepResult) all_events.extend(result.events) if result.finished: break # After stepping multiple times, we should have collected some events assert len(all_events) >= 1, f"Expected at least one event across 10 steps, got {len(all_events)}" def test_ac53_repeated_events_make_progress(self): """AC5.3: Multiple StepEventCmd calls process each event separately.""" backend = SimulationBackend() source = """\ @system pe=1, sm=0 &c1|pe0 <| const, 1 &c2|pe0 <| const, 2 &result|pe0 <| add &c1|pe0 |> &result|pe0:L &c2|pe0 |> &result|pe0:R """ backend._handle_load(source) # Collect time values after each step times = [] event_count = 0 for _ in range(10): result = backend._handle_step_event() times.append(backend._env.now) # Track total events processed event_count += len(result.events) if result.finished: break # Times should be non-decreasing (verifies events are stepped individually) assert times == sorted(times), f"Times not monotonic: {times}" # Verify that at least some events were processed (not a crash) assert event_count >= 1, f"Expected at least one event across steps, got {event_count}" class TestRunUntilCommand: """Tests for RunUntilCmd acceptance criteria.""" def test_ac54_batches_events_per_tick(self): """AC5.4: RunUntilCmd batches events per tick.""" backend = SimulationBackend() source = """\ @system pe=1, sm=0 &c1|pe0 <| const, 1 &c2|pe0 <| const, 2 &result|pe0 <| add &c1|pe0 |> &result|pe0:L &c2|pe0 |> &result|pe0:R """ backend._handle_load(source) # Run until time 10 result = backend._handle_run_until(10.0) assert isinstance(result, StepResult) assert result.snapshot is not None # Verify events were collected and sim time <= target or finished assert len(result.events) > 0, "Expected events to be batched" assert result.sim_time <= 10.0 or result.finished, \ f"Expected sim_time <= 10.0 or finished, got {result.sim_time} finished={result.finished}" def test_ac54_stops_at_target_time(self): """AC5.4: RunUntilCmd stops at or before target time.""" backend = SimulationBackend() source = """\ @system pe=1, sm=0 &c|pe0 <| const, 42 """ backend._handle_load(source) target = 50.0 result = backend._handle_run_until(target) # Sim time should be <= target (or finished) assert backend._env.now <= target or result.finished class TestInjectCommand: """Tests for InjectCmd.""" def test_inject_token_appears_in_snapshot(self): """InjectCmd injects token into correct PE.""" backend = SimulationBackend() source = """\ @system pe=1, sm=0 &c|pe0 <| const, 42 """ backend._handle_load(source) # Inject a token token = MonadToken(target=0, offset=0, act_id=0, data=99, inline=True) result = backend._handle_inject(token) assert isinstance(result, StepResult) # Token should be in PE 0's input queue snapshot = result.snapshot assert 0 in snapshot.pes pe_snap = snapshot.pes[0] assert token in pe_snap.input_queue class TestSendCommand: """Tests for SendCmd.""" def test_send_token_respects_backpressure(self): """SendCmd sends token via SimPy store.put().""" backend = SimulationBackend() source = """\ @system pe=1, sm=0 &c|pe0 <| const, 42 """ backend._handle_load(source) # Send a token (should go through SimPy backpressure mechanism) token = MonadToken(target=0, offset=0, act_id=0, data=77, inline=True) result = backend._handle_send(token) assert isinstance(result, StepResult) assert result.snapshot is not None class TestThreadedInterface: """Tests for the threaded interface (start, send_command, stop).""" def test_start_stop_threading(self): """Backend threading interface starts and stops cleanly.""" backend = SimulationBackend() backend.start() # Send a command source = """\ @system pe=1, sm=0 &c|pe0 <| const, 42 """ result = backend.send_command(LoadCmd(source=source), timeout=5.0) assert isinstance(result, GraphLoaded) # Stop backend.stop() def test_send_command_timeout(self): """send_command respects timeout parameter.""" backend = SimulationBackend() backend.start() source = """\ @system pe=1, sm=0 &c|pe0 <| const, 42 """ # This should succeed within timeout result = backend.send_command(LoadCmd(source=source), timeout=5.0) assert isinstance(result, GraphLoaded) backend.stop() def test_threaded_step_commands(self): """Multiple step commands work in threaded mode.""" backend = SimulationBackend() backend.start() source = """\ @system pe=1, sm=0 &c|pe0 <| const, 42 """ backend.send_command(LoadCmd(source=source), timeout=5.0) # Step a few times for _ in range(3): result = backend.send_command(StepTickCmd(), timeout=5.0) assert isinstance(result, StepResult) if result.finished: break backend.stop() def test_error_handling_in_thread(self): """Backend catches and returns errors from thread.""" backend = SimulationBackend() backend.start() # Send invalid program result = backend.send_command( LoadCmd(source="@system pe=1, sm=0\n&a|pe0 |> &undefined|pe0:L"), timeout=5.0 ) assert isinstance(result, ErrorResult) assert result.message backend.stop() class TestSequentialWorkflow: """Integration tests for typical workflows.""" def test_load_step_reset_reload_workflow(self): """Workflow: Load → Step → Reset with reload → Step again.""" backend = SimulationBackend() # Load source = """\ @system pe=1, sm=0 &c|pe0 <| const, 42 """ result1 = backend._handle_load(source) assert isinstance(result1, GraphLoaded) # Step result2 = backend._handle_step_tick() assert isinstance(result2, StepResult) # Reset with reload result3 = backend._handle_reset(reload=True) assert isinstance(result3, GraphLoaded) # Step again result4 = backend._handle_step_tick() assert isinstance(result4, StepResult) def test_load_with_multiple_pes_and_sms(self): """Load a program with multiple PEs and SMs.""" backend = SimulationBackend() # Create a program that actually uses the SM (with a write operation) source = """\ @system pe=2, sm=1 &const_val|pe0 <| const, 42 &write_op|pe0 <| write &relay|pe1 <| pass &const_val|pe0 |> &write_op|pe0:L &write_op|pe0 |> &relay|pe1:L """ result = backend._handle_load(source) assert isinstance(result, GraphLoaded) snapshot = result.snapshot # Verify multiple PEs are present assert len(snapshot.pes) > 0 # SMs may or may not have state depending on program execution # The system should have been set up correctly assert backend._system is not None def test_run_until_completion(self): """Run simulation until completion.""" backend = SimulationBackend() source = """\ @system pe=1, sm=0 &c|pe0 <| const, 42 """ backend._handle_load(source) # Run until completion while True: result = backend._handle_step_tick() if result.finished: break if backend._env.now > 1000: pytest.fail("Simulation did not complete") assert result.finished is True