# OR-1 dataflow CPU sketch
# (repository viewer header: "at pe-frame-redesign · 567 lines · 18 kB · view raw")
"""Tests for monitor/backend.py SimulationBackend.

Tests verify:
- or1-monitor.AC1.1: LoadCmd with valid program assembles, builds topology, injects seeds
- or1-monitor.AC1.2: LoadCmd wires event callbacks into all PEs and SMs
- or1-monitor.AC1.3: LoadCmd with invalid program returns ErrorResult without crashing
- or1-monitor.AC1.4: ResetCmd tears down and leaves ready for new LoadCmd
- or1-monitor.AC1.5: ResetCmd with reload=True reloads program
- or1-monitor.AC5.2: StepTickCmd processes all events at current time before returning
- or1-monitor.AC5.3: StepEventCmd processes exactly one event
- or1-monitor.AC5.4: RunUntilCmd batches events per tick
- or1-monitor.AC5.5: StepResult contains both events and snapshot
- or1-monitor.AC5.6: Stepping when simulation finished returns finished=True

Also covered here: AC7.2 (IRAM populated in the post-load snapshot), InjectCmd,
SendCmd, the threaded start/send_command/stop interface, and a few end-to-end
workflows.

Most tests call the backend's private ``_handle_*`` methods directly on the
calling thread; only ``TestThreadedInterface`` goes through ``start()`` /
``send_command()`` / ``stop()``.
"""

import pytest
import simpy
from threading import Thread
import time

from monitor.backend import SimulationBackend
from monitor.commands import (
    LoadCmd, StepTickCmd, StepEventCmd, RunUntilCmd, InjectCmd, SendCmd,
    ResetCmd, StopCmd, GraphLoaded, StepResult, ErrorResult
)
from monitor.snapshot import StateSnapshot
from tokens import MonadToken


class TestLoadCommand:
    """Tests for LoadCmd acceptance criteria."""

    def test_ac11_valid_program_returns_graphloaded(self):
        """AC1.1: LoadCmd with valid dfasm assembles and returns GraphLoaded."""
        backend = SimulationBackend()
        source = """\
@system pe=1, sm=0
&c|pe0 <| const, 42
"""
        result = backend._handle_load(source)

        assert isinstance(result, GraphLoaded)
        assert result.ir_graph is not None
        assert result.snapshot is not None
        assert isinstance(result.snapshot, StateSnapshot)

    def test_ac11_snapshot_has_seed_tokens(self):
        """AC1.1: Initial snapshot shows injected seed tokens in PE input queue."""
        backend = SimulationBackend()
        source = """\
@system pe=1, sm=0
&c|pe0 <| const, 42
"""
        result = backend._handle_load(source)

        assert isinstance(result, GraphLoaded)
        snapshot = result.snapshot
        # Seed token from const node should be in PE 0's input queue
        assert 0 in snapshot.pes
        pe_snap = snapshot.pes[0]
        assert len(pe_snap.input_queue) >= 1  # At least the seed token

    def test_ac12_callbacks_wired_to_pes_and_sms(self):
        """AC1.2: LoadCmd wires on_event callbacks into all PEs and SMs."""
        backend = SimulationBackend()
        source = """\
@system pe=1, sm=0
&const_val|pe0 <| const, 1
&add_op|pe0 <| add
&const_val|pe0 |> &add_op|pe0:L
"""
        result = backend._handle_load(source)

        assert isinstance(result, GraphLoaded)
        # Run the simulation to capture events (cycle-accurate timing starts events at time 1+)
        step_result = backend._handle_run_until(100)
        # If callbacks are wired, events should be captured
        assert isinstance(step_result, StepResult)
        assert len(step_result.events) > 0, "Expected events to be collected if callbacks are wired"
        # Verify specific event types appear (TokenReceived, Matched, Executed)
        event_types = {type(e).__name__ for e in step_result.events}
        expected_types = {"TokenReceived", "Matched", "Executed"}
        assert event_types & expected_types, \
            f"Expected at least one of TokenReceived/Matched/Executed, got: {event_types}"

    def test_ac13_invalid_program_returns_error(self):
        """AC1.3: LoadCmd with invalid dfasm returns ErrorResult."""
        backend = SimulationBackend()
        # Invalid: references undefined label
        source = """\
@system pe=1, sm=0
&a|pe0 <| const, 5
&a|pe0 |> &undefined|pe0:L
"""
        result = backend._handle_load(source)

        assert isinstance(result, ErrorResult)
        assert result.message  # Should have error message
        assert backend._system is None  # Should not have loaded system

    def test_ac13_backend_still_functional_after_error(self):
        """AC1.3: Backend remains functional after error (can accept new LoadCmd)."""
        backend = SimulationBackend()

        # First: attempt invalid load
        invalid_source = """\
@system pe=1, sm=0
&a|pe0 <| const, 5
&a|pe0 |> &undefined|pe0:L
"""
        result1 = backend._handle_load(invalid_source)
        assert isinstance(result1, ErrorResult)

        # Second: load valid program — should succeed
        valid_source = """\
@system pe=1, sm=0
&c|pe0 <| const, 99
"""
        result2 = backend._handle_load(valid_source)
        assert isinstance(result2, GraphLoaded)

    def test_ac72_backend_iram_populated_by_setup_tokens(self):
        """AC7.2 backend: PE IRAM should be populated in snapshot after _handle_load().

        This verifies that setup_tokens were injected before taking the snapshot,
        proving that IRAM write instructions were executed during seed phase.
        """
        backend = SimulationBackend()
        # Create a program with multiple instructions to populate IRAM
        source = """\
@system pe=1, sm=0
&add|pe0 <| add
&sub|pe0 <| sub
&inc|pe0 <| inc
&add|pe0 |> &sub|pe0:L
&sub|pe0 |> &inc|pe0:L
"""
        result = backend._handle_load(source)

        assert isinstance(result, GraphLoaded)
        snapshot = result.snapshot

        # PE 0 should exist
        assert 0 in snapshot.pes
        pe_snap = snapshot.pes[0]

        # IRAM should be populated (at least some instructions written)
        assert pe_snap.iram is not None, "PE IRAM should not be None"
        assert len(pe_snap.iram) > 0, \
            "PE IRAM should have been populated by setup_tokens"

        # At least one IRAM slot must hold something other than None
        iram_entries = pe_snap.iram
        assert any(v is not None for v in iram_entries.values()), \
            "At least some IRAM entries should contain instructions"


class TestResetCommand:
    """Tests for ResetCmd acceptance criteria."""

    def test_ac14_reset_tears_down_system(self):
        """AC1.4: ResetCmd tears down current simulation."""
        backend = SimulationBackend()
        source = """\
@system pe=1, sm=0
&c|pe0 <| const, 42
"""
        # Load a program
        backend._handle_load(source)
        assert backend._system is not None

        # Reset
        result = backend._handle_reset(reload=False)

        assert backend._system is None
        assert backend._env is None
        assert isinstance(result, StepResult)

    def test_ac14_reset_ready_for_new_load(self):
        """AC1.4: After reset, backend is ready for new LoadCmd."""
        backend = SimulationBackend()
        source1 = """\
@system pe=1, sm=0
&c|pe0 <| const, 42
"""
        # Load first program
        backend._handle_load(source1)

        # Reset
        backend._handle_reset(reload=False)

        # Load second program — should succeed
        source2 = """\
@system pe=2, sm=0
&a|pe0 <| const, 1
&b|pe1 <| pass
"""
        result = backend._handle_load(source2)
        assert isinstance(result, GraphLoaded)

    def test_ac15_reset_with_reload_reloads_program(self):
        """AC1.5: ResetCmd with reload=True reloads the last program."""
        backend = SimulationBackend()
        source = """\
@system pe=1, sm=0
&c|pe0 <| const, 42
"""
        # Load program
        result1 = backend._handle_load(source)
        assert isinstance(result1, GraphLoaded)

        # Reset with reload
        result2 = backend._handle_reset(reload=True)

        assert isinstance(result2, GraphLoaded)
        assert backend._system is not None
        assert result2.ir_graph is not None


class TestStepTickCommand:
    """Tests for StepTickCmd acceptance criteria."""

    def test_ac52_processes_all_events_at_current_time(self):
        """AC5.2: StepTickCmd processes all events at current simulation time."""
        backend = SimulationBackend()
        source = """\
@system pe=1, sm=0
&c1|pe0 <| const, 1
&c2|pe0 <| const, 2
"""
        backend._handle_load(source)

        # Run simulation to capture events (cycle-accurate timing starts events at time 1+)
        # NOTE(review): this drives _handle_run_until rather than _handle_step_tick,
        # so the tick-stepping path itself is not exercised here — confirm intended.
        result = backend._handle_run_until(100)

        assert isinstance(result, StepResult)
        assert result.snapshot is not None
        # Verify events were collected
        assert len(result.events) > 0, "Expected events to be processed"
        # After stepping, peek should advance or reach infinity
        assert result.finished or result.snapshot.next_time > 0, \
            "Expected simulation to progress"

    def test_ac55_result_contains_events_and_snapshot(self):
        """AC5.5: StepResult contains both events and snapshot."""
        backend = SimulationBackend()
        source = """\
@system pe=1, sm=0
&c|pe0 <| const, 42
"""
        backend._handle_load(source)

        result = backend._handle_step_tick()

        assert isinstance(result, StepResult)
        assert result.snapshot is not None
        assert isinstance(result.snapshot, StateSnapshot)
        assert result.events is not None

    def test_ac56_finished_simulation_returns_finished_true(self):
        """AC5.6: Stepping when finished returns finished=True without error."""
        backend = SimulationBackend()
        source = """\
@system pe=1, sm=0
&c|pe0 <| const, 42
"""
        backend._handle_load(source)

        # Step until finished
        while True:
            result = backend._handle_step_tick()
            if result.finished:
                break
            # Safety check to prevent infinite loop
            if backend._env.now > 1000:
                pytest.fail("Simulation did not finish within 1000 time units")

        # Verify finished state
        assert result.finished is True
        assert result.snapshot is not None
        assert backend._env.peek() == float('inf')


class TestStepEventCommand:
    """Tests for StepEventCmd acceptance criteria."""

    def test_ac53_processes_exactly_one_event(self):
        """AC5.3: StepEventCmd processes exactly one event."""
        backend = SimulationBackend()
        source = """\
@system pe=1, sm=0
&c1|pe0 <| const, 1
&c2|pe0 <| const, 2
&result|pe0 <| add
&c1|pe0 |> &result|pe0:L
&c2|pe0 |> &result|pe0:R
"""
        backend._handle_load(source)

        # Collect events across multiple steps - at least some steps should have events
        all_events = []
        for _ in range(10):
            result = backend._handle_step_event()
            assert isinstance(result, StepResult)
            all_events.extend(result.events)
            if result.finished:
                break

        # After stepping multiple times, we should have collected some events
        assert len(all_events) >= 1, f"Expected at least one event across 10 steps, got {len(all_events)}"

    def test_ac53_repeated_events_make_progress(self):
        """AC5.3: Multiple StepEventCmd calls process each event separately."""
        backend = SimulationBackend()
        source = """\
@system pe=1, sm=0
&c1|pe0 <| const, 1
&c2|pe0 <| const, 2
&result|pe0 <| add
&c1|pe0 |> &result|pe0:L
&c2|pe0 |> &result|pe0:R
"""
        backend._handle_load(source)

        # Collect time values after each step
        times = []
        event_count = 0
        for _ in range(10):
            result = backend._handle_step_event()
            times.append(backend._env.now)
            # Track total events processed
            event_count += len(result.events)
            if result.finished:
                break

        # Simulation time must never go backwards between steps
        assert times == sorted(times), f"Times not monotonic: {times}"
        # Verify that at least some events were processed (not a crash)
        assert event_count >= 1, f"Expected at least one event across steps, got {event_count}"


class TestRunUntilCommand:
    """Tests for RunUntilCmd acceptance criteria."""

    def test_ac54_batches_events_per_tick(self):
        """AC5.4: RunUntilCmd batches events per tick."""
        backend = SimulationBackend()
        source = """\
@system pe=1, sm=0
&c1|pe0 <| const, 1
&c2|pe0 <| const, 2
&result|pe0 <| add
&c1|pe0 |> &result|pe0:L
&c2|pe0 |> &result|pe0:R
"""
        backend._handle_load(source)

        # Run until time 10
        result = backend._handle_run_until(10.0)

        assert isinstance(result, StepResult)
        assert result.snapshot is not None
        # Verify events were collected and sim time <= target or finished
        assert len(result.events) > 0, "Expected events to be batched"
        assert result.sim_time <= 10.0 or result.finished, \
            f"Expected sim_time <= 10.0 or finished, got {result.sim_time} finished={result.finished}"

    def test_ac54_stops_at_target_time(self):
        """AC5.4: RunUntilCmd stops at or before target time."""
        backend = SimulationBackend()
        source = """\
@system pe=1, sm=0
&c|pe0 <| const, 42
"""
        backend._handle_load(source)

        target = 50.0
        result = backend._handle_run_until(target)

        # Sim time should be <= target (or finished)
        assert backend._env.now <= target or result.finished


class TestInjectCommand:
    """Tests for InjectCmd."""

    def test_inject_token_appears_in_snapshot(self):
        """InjectCmd injects token into correct PE."""
        backend = SimulationBackend()
        source = """\
@system pe=1, sm=0
&c|pe0 <| const, 42
"""
        backend._handle_load(source)

        # Inject a token
        token = MonadToken(target=0, offset=0, act_id=0, data=99, inline=True)
        result = backend._handle_inject(token)

        assert isinstance(result, StepResult)
        # Token should be in PE 0's input queue
        snapshot = result.snapshot
        assert 0 in snapshot.pes
        pe_snap = snapshot.pes[0]
        assert token in pe_snap.input_queue


class TestSendCommand:
    """Tests for SendCmd."""

    def test_send_token_respects_backpressure(self):
        """SendCmd sends token via SimPy store.put()."""
        backend = SimulationBackend()
        source = """\
@system pe=1, sm=0
&c|pe0 <| const, 42
"""
        backend._handle_load(source)

        # Send a token (should go through SimPy backpressure mechanism)
        token = MonadToken(target=0, offset=0, act_id=0, data=77, inline=True)
        result = backend._handle_send(token)

        assert isinstance(result, StepResult)
        assert result.snapshot is not None


class TestThreadedInterface:
    """Tests for the threaded interface (start, send_command, stop).

    Every test wraps its body in ``try/finally`` so that ``backend.stop()``
    runs even when an assertion fails; otherwise a failing test would leave
    the started backend running for the rest of the session.
    """

    def test_start_stop_threading(self):
        """Backend threading interface starts and stops cleanly."""
        backend = SimulationBackend()
        backend.start()
        try:
            # Send a command
            source = """\
@system pe=1, sm=0
&c|pe0 <| const, 42
"""
            result = backend.send_command(LoadCmd(source=source), timeout=5.0)

            assert isinstance(result, GraphLoaded)
        finally:
            # Stop
            backend.stop()

    def test_send_command_timeout(self):
        """send_command respects timeout parameter."""
        backend = SimulationBackend()
        backend.start()
        try:
            source = """\
@system pe=1, sm=0
&c|pe0 <| const, 42
"""
            # This should succeed within timeout
            # NOTE(review): only the success-within-timeout path is checked;
            # timeout expiry is not exercised here.
            result = backend.send_command(LoadCmd(source=source), timeout=5.0)
            assert isinstance(result, GraphLoaded)
        finally:
            backend.stop()

    def test_threaded_step_commands(self):
        """Multiple step commands work in threaded mode."""
        backend = SimulationBackend()
        backend.start()
        try:
            source = """\
@system pe=1, sm=0
&c|pe0 <| const, 42
"""
            backend.send_command(LoadCmd(source=source), timeout=5.0)

            # Step a few times
            for _ in range(3):
                result = backend.send_command(StepTickCmd(), timeout=5.0)
                assert isinstance(result, StepResult)
                if result.finished:
                    break
        finally:
            backend.stop()

    def test_error_handling_in_thread(self):
        """Backend catches and returns errors from thread."""
        backend = SimulationBackend()
        backend.start()
        try:
            # Send invalid program
            result = backend.send_command(
                LoadCmd(source="@system pe=1, sm=0\n&a|pe0 |> &undefined|pe0:L"),
                timeout=5.0
            )

            assert isinstance(result, ErrorResult)
            assert result.message
        finally:
            backend.stop()


class TestSequentialWorkflow:
    """Integration tests for typical workflows."""

    def test_load_step_reset_reload_workflow(self):
        """Workflow: Load → Step → Reset with reload → Step again."""
        backend = SimulationBackend()

        # Load
        source = """\
@system pe=1, sm=0
&c|pe0 <| const, 42
"""
        result1 = backend._handle_load(source)
        assert isinstance(result1, GraphLoaded)

        # Step
        result2 = backend._handle_step_tick()
        assert isinstance(result2, StepResult)

        # Reset with reload
        result3 = backend._handle_reset(reload=True)
        assert isinstance(result3, GraphLoaded)

        # Step again
        result4 = backend._handle_step_tick()
        assert isinstance(result4, StepResult)

    def test_load_with_multiple_pes_and_sms(self):
        """Load a program with multiple PEs and SMs."""
        backend = SimulationBackend()
        # Create a program that actually uses the SM (with a write operation)
        source = """\
@system pe=2, sm=1
&const_val|pe0 <| const, 42
&write_op|pe0 <| write
&relay|pe1 <| pass
&const_val|pe0 |> &write_op|pe0:L
&write_op|pe0 |> &relay|pe1:L
"""
        result = backend._handle_load(source)

        assert isinstance(result, GraphLoaded)
        snapshot = result.snapshot

        # The snapshot must report at least one PE
        assert len(snapshot.pes) > 0
        # SMs may or may not have state depending on program execution
        # The system should have been set up correctly
        assert backend._system is not None

    def test_run_until_completion(self):
        """Run simulation until completion."""
        backend = SimulationBackend()
        source = """\
@system pe=1, sm=0
&c|pe0 <| const, 42
"""
        backend._handle_load(source)

        # Run until completion
        while True:
            result = backend._handle_step_tick()
            if result.finished:
                break
            # Safety check to prevent infinite loop
            if backend._env.now > 1000:
                pytest.fail("Simulation did not complete")

        assert result.finished is True