OR-1 dataflow CPU sketch
at main 795 lines 22 kB view raw
1"""Unit tests for monitor REPL functionality. 2 3Tests cover all acceptance criteria for AC4.1-AC4.9, verifying REPL 4commands work correctly with a synchronized backend and proper 5state management. 6""" 7 8from __future__ import annotations 9 10import io 11import sys 12from pathlib import Path 13from tempfile import NamedTemporaryFile 14 15import pytest 16 17from cm_inst import MemOp 18from monitor.backend import SimulationBackend 19from monitor.repl import MonitorREPL 20from tokens import MonadToken, SMToken 21 22 23@pytest.fixture 24def temp_dfasm_file(): 25 """Create a temporary dfasm file for testing. 26 27 A minimal valid dfasm program that loads without errors. 28 """ 29 content = """@system pe=1, sm=0 30&c1 <| const, 3 31&c2 <| const, 7 32&result <| add 33&c1 |> &result:L 34&c2 |> &result:R 35""" 36 with NamedTemporaryFile(mode="w", suffix=".dfasm", delete=False) as f: 37 f.write(content) 38 f.flush() 39 path = Path(f.name) 40 41 yield path 42 43 # Cleanup 44 path.unlink() 45 46 47@pytest.fixture 48def backend(): 49 """Create and start a simulation backend for testing.""" 50 b = SimulationBackend() 51 b.start() 52 yield b 53 b.stop() 54 55 56@pytest.fixture 57def repl(backend): 58 """Create a REPL instance with a test backend.""" 59 return MonitorREPL(backend) 60 61 62class TestREPLLoad: 63 """Tests for AC4.1: load command assembles and loads dfasm.""" 64 65 def test_load_valid_file(self, repl, temp_dfasm_file): 66 """load <path> should load a valid dfasm file.""" 67 # Capture output 68 output = io.StringIO() 69 repl.stdout = output 70 71 # Call do_load 72 repl.do_load(str(temp_dfasm_file)) 73 74 # Check state 75 assert repl._loaded is True 76 assert repl._last_snapshot is not None 77 assert "Loaded" in output.getvalue() 78 79 def test_load_nonexistent_file(self, repl): 80 """load with nonexistent path should fail gracefully.""" 81 output = io.StringIO() 82 repl.stdout = output 83 84 repl.do_load("/nonexistent/path.dfasm") 85 86 assert repl._loaded is False 87 assert "not found" in output.getvalue() 88 89 def test_load_invalid_syntax(self, repl): 90 """load with invalid dfasm syntax should report error.""" 91 # Create invalid file 92 with NamedTemporaryFile(mode="w", suffix=".dfasm", delete=False) as f: 93 f.write("@@@ invalid syntax @@@") 94 f.flush() 95 path = Path(f.name) 96 97 try: 98 output = io.StringIO() 99 repl.stdout = output 100 101 repl.do_load(str(path)) 102 103 assert repl._loaded is False 104 out = output.getvalue() 105 assert "Error" in out 106 finally: 107 path.unlink() 108 109 def test_load_without_args(self, repl): 110 """load without args should print usage.""" 111 output = io.StringIO() 112 repl.stdout = output 113 114 repl.do_load("") 115 116 assert "Usage" in output.getvalue() 117 118 119class TestREPLStep: 120 """Tests for AC4.2: step and stepe commands advance simulation.""" 121 122 def test_step_without_load(self, repl): 123 """step without loaded simulation should error (AC4.8).""" 124 output = io.StringIO() 125 repl.stdout = output 126 127 repl.do_step("") 128 129 assert "No simulation loaded" in output.getvalue() 130 131 def test_step_after_load(self, repl, temp_dfasm_file): 132 """step after loading should advance simulation.""" 133 # Load first 134 output = io.StringIO() 135 repl.stdout = output 136 137 repl.do_load(str(temp_dfasm_file)) 138 assert repl._loaded 139 140 # Clear and step 141 output = io.StringIO() 142 repl.stdout = output 143 144 repl.do_step("") 145 146 # Should produce output (snapshot and/or events) 147 assert len(output.getvalue()) > 0 148 149 def test_stepe_after_load(self, repl, temp_dfasm_file): 150 """stepe after loading should advance by one event.""" 151 # Load first 152 output = io.StringIO() 153 repl.stdout = output 154 155 repl.do_load(str(temp_dfasm_file)) 156 assert repl._loaded 157 158 # Clear and step 159 output = io.StringIO() 160 repl.stdout = output 161 162 repl.do_stepe("") 163 164 # Should produce output 165 assert len(output.getvalue()) > 0 166 167 def test_stepe_without_load(self, repl): 168 """stepe without loaded simulation should error (AC4.8).""" 169 output = io.StringIO() 170 repl.stdout = output 171 172 repl.do_stepe("") 173 174 assert "No simulation loaded" in output.getvalue() 175 176 177class TestREPLRun: 178 """Tests for AC4.3: run command runs until target time.""" 179 180 def test_run_without_load(self, repl): 181 """run without loaded simulation should error (AC4.8).""" 182 output = io.StringIO() 183 repl.stdout = output 184 185 repl.do_run("10") 186 187 assert "No simulation loaded" in output.getvalue() 188 189 def test_run_after_load(self, repl, temp_dfasm_file): 190 """run <until> should run until target time.""" 191 # Load first 192 output = io.StringIO() 193 repl.stdout = output 194 195 repl.do_load(str(temp_dfasm_file)) 196 assert repl._loaded 197 198 # Clear and run 199 output = io.StringIO() 200 repl.stdout = output 201 202 repl.do_run("10") 203 204 # Should produce output 205 assert len(output.getvalue()) > 0 206 207 def test_run_invalid_time(self, repl, temp_dfasm_file): 208 """run with invalid time should error.""" 209 # Load first 210 output = io.StringIO() 211 repl.stdout = output 212 213 repl.do_load(str(temp_dfasm_file)) 214 215 # Try invalid time 216 output = io.StringIO() 217 repl.stdout = output 218 219 repl.do_run("not_a_number") 220 221 assert "Invalid time" in output.getvalue() 222 223 def test_run_without_args(self, repl, temp_dfasm_file): 224 """run without args should print usage.""" 225 # Load first 226 output = io.StringIO() 227 repl.stdout = output 228 229 repl.do_load(str(temp_dfasm_file)) 230 231 # Try without args 232 output = io.StringIO() 233 repl.stdout = output 234 235 repl.do_run("") 236 237 assert "Usage" in output.getvalue() 238 239 240class TestREPLSend: 241 """Tests for AC4.4: send command injects token via FIFO.""" 242 243 def test_send_monad_without_load(self, repl): 244 """send without loaded simulation should error (AC4.8).""" 245 output = io.StringIO() 246 repl.stdout = output 247 248 repl.do_send("0 0 0 42") 249 250 assert "No simulation loaded" in output.getvalue() 251 252 def test_send_monad_token(self, repl, temp_dfasm_file): 253 """send <target> <offset> <ctx> <data> should send MonadToken.""" 254 # Load first 255 output = io.StringIO() 256 repl.stdout = output 257 258 repl.do_load(str(temp_dfasm_file)) 259 assert repl._loaded 260 261 # Clear and send 262 output = io.StringIO() 263 repl.stdout = output 264 265 repl.do_send("0 0 0 42") 266 267 # Should produce output (no exception) 268 assert "Sent" in output.getvalue() 269 270 def test_send_sm_token(self, repl, temp_dfasm_file): 271 """send <target> --sm <addr> <op> should send SMToken.""" 272 # Load first 273 output = io.StringIO() 274 repl.stdout = output 275 276 repl.do_load(str(temp_dfasm_file)) 277 assert repl._loaded 278 279 # Clear and send SM token 280 output = io.StringIO() 281 repl.stdout = output 282 283 repl.do_send("0 --sm 5 READ") 284 285 # Should produce output (no exception) 286 assert "Sent" in output.getvalue() 287 288 def test_send_sm_token_with_data(self, repl, temp_dfasm_file): 289 """send <target> --sm <addr> <op> <data> should work.""" 290 # Load first 291 output = io.StringIO() 292 repl.stdout = output 293 294 repl.do_load(str(temp_dfasm_file)) 295 assert repl._loaded 296 297 # Send SM token with data 298 output = io.StringIO() 299 repl.stdout = output 300 301 repl.do_send("0 --sm 5 WRITE 123") 302 303 assert "Sent" in output.getvalue() 304 305 def test_send_invalid_args(self, repl, temp_dfasm_file): 306 """send with invalid args should error gracefully.""" 307 # Load first 308 output = io.StringIO() 309 repl.stdout = output 310 311 repl.do_load(str(temp_dfasm_file)) 312 313 # Try invalid args 314 output = io.StringIO() 315 repl.stdout = output 316 317 repl.do_send("not_an_int") 318 319 assert "Error" in output.getvalue() 320 321 def test_send_unknown_memop(self, repl, temp_dfasm_file): 322 """send with unknown MemOp should error.""" 323 # Load first 324 output = io.StringIO() 325 repl.stdout = output 326 327 repl.do_load(str(temp_dfasm_file)) 328 329 # Try unknown MemOp 330 output = io.StringIO() 331 repl.stdout = output 332 333 repl.do_send("0 --sm 5 UNKNOWN_OP") 334 335 assert "Unknown MemOp" in output.getvalue() 336 337 def test_send_without_args(self, repl, temp_dfasm_file): 338 """send without args should print usage.""" 339 # Load first 340 output = io.StringIO() 341 repl.stdout = output 342 343 repl.do_load(str(temp_dfasm_file)) 344 345 # Try without args 346 output = io.StringIO() 347 repl.stdout = output 348 349 repl.do_send("") 350 351 assert "Usage" in output.getvalue() 352 353 354class TestREPLInject: 355 """Tests for AC4.5: inject command does direct injection.""" 356 357 def test_inject_monad_without_load(self, repl): 358 """inject without loaded simulation should error (AC4.8).""" 359 output = io.StringIO() 360 repl.stdout = output 361 362 repl.do_inject("0 0 0 42") 363 364 assert "No simulation loaded" in output.getvalue() 365 366 def test_inject_monad_token(self, repl, temp_dfasm_file): 367 """inject <target> <offset> <ctx> <data> should inject MonadToken.""" 368 # Load first 369 output = io.StringIO() 370 repl.stdout = output 371 372 repl.do_load(str(temp_dfasm_file)) 373 assert repl._loaded 374 375 # Clear and inject 376 output = io.StringIO() 377 repl.stdout = output 378 379 repl.do_inject("0 0 0 42") 380 381 # Should produce output (no exception) 382 assert "Injected" in output.getvalue() 383 384 def test_inject_sm_token(self, repl): 385 """inject <target> --sm <addr> <op> should inject SMToken without throwing.""" 386 # For this test, we just verify token parsing works 387 # Actual injection will fail without SM, but parsing should succeed 388 output = io.StringIO() 389 repl.stdout = output 390 391 # Set loaded manually just to bypass the check 392 repl._loaded = True 393 394 # Just verify the token parsing happens without exception 395 token = repl._parse_token_args("0 --sm 5 WRITE") 396 assert token.addr == 5 397 398 def test_inject_invalid_args(self, repl, temp_dfasm_file): 399 """inject with invalid args should error gracefully.""" 400 # Load first 401 output = io.StringIO() 402 repl.stdout = output 403 404 repl.do_load(str(temp_dfasm_file)) 405 406 # Try invalid args 407 output = io.StringIO() 408 repl.stdout = output 409 410 repl.do_inject("invalid") 411 412 assert "Error" in output.getvalue() 413 414 415class TestREPLState: 416 """Tests for AC4.6: state, pe, sm commands display readable state.""" 417 418 def test_state_without_load(self, repl): 419 """state without loaded simulation should error (AC4.8).""" 420 output = io.StringIO() 421 repl.stdout = output 422 423 repl.do_state("") 424 425 assert "No simulation loaded" in output.getvalue() 426 427 def test_state_after_load(self, repl, temp_dfasm_file): 428 """state should display system state summary.""" 429 # Load first 430 output = io.StringIO() 431 repl.stdout = output 432 433 repl.do_load(str(temp_dfasm_file)) 434 assert repl._loaded 435 436 # Clear and call state 437 output = io.StringIO() 438 repl.stdout = output 439 440 repl.do_state("") 441 442 out = output.getvalue() 443 # Should show PE/SM counts and time info 444 assert len(out) > 0 445 assert ("PE" in out or "SM" in out or "Time" in out) 446 447 def test_pe_without_load(self, repl): 448 """pe without loaded simulation should error (AC4.8).""" 449 output = io.StringIO() 450 repl.stdout = output 451 452 repl.do_pe("0") 453 454 assert "No simulation loaded" in output.getvalue() 455 456 def test_pe_after_load(self, repl, temp_dfasm_file): 457 """pe <id> should display PE state.""" 458 # Load first 459 output = io.StringIO() 460 repl.stdout = output 461 462 repl.do_load(str(temp_dfasm_file)) 463 assert repl._loaded 464 465 # Clear and call pe 466 output = io.StringIO() 467 repl.stdout = output 468 469 repl.do_pe("0") 470 471 out = output.getvalue() 472 # Should show PE state or not found message 473 assert len(out) > 0 474 # Verify formatting includes lane information or empty tag store marker (case-insensitive) 475 assert "lane" in out.lower() or "tag store: (empty)" in out.lower() 476 477 def test_pe_invalid_id(self, repl, temp_dfasm_file): 478 """pe with non-integer ID should error.""" 479 # Load first 480 output = io.StringIO() 481 repl.stdout = output 482 483 repl.do_load(str(temp_dfasm_file)) 484 485 # Try invalid ID 486 output = io.StringIO() 487 repl.stdout = output 488 489 repl.do_pe("invalid_id") 490 491 assert "Invalid PE ID" in output.getvalue() 492 493 def test_sm_without_load(self, repl): 494 """sm without loaded simulation should error (AC4.8).""" 495 output = io.StringIO() 496 repl.stdout = output 497 498 repl.do_sm("0") 499 500 assert "No simulation loaded" in output.getvalue() 501 502 def test_sm_after_load(self, repl, temp_dfasm_file): 503 """sm <id> should display SM state.""" 504 # Load first 505 output = io.StringIO() 506 repl.stdout = output 507 508 repl.do_load(str(temp_dfasm_file)) 509 assert repl._loaded 510 511 # Clear and call sm 512 output = io.StringIO() 513 repl.stdout = output 514 515 repl.do_sm("0") 516 517 out = output.getvalue() 518 # Should show SM state or not found message 519 assert len(out) > 0 520 521 def test_sm_invalid_id(self, repl, temp_dfasm_file): 522 """sm with non-integer ID should error.""" 523 # Load first 524 output = io.StringIO() 525 repl.stdout = output 526 527 repl.do_load(str(temp_dfasm_file)) 528 529 # Try invalid ID 530 output = io.StringIO() 531 repl.stdout = output 532 533 repl.do_sm("invalid_id") 534 535 assert "Invalid SM ID" in output.getvalue() 536 537 538class TestREPLLog: 539 """Tests for AC4.7: log command shows recent events.""" 540 541 def test_log_without_events(self, repl): 542 """log with no events should indicate no events.""" 543 output = io.StringIO() 544 repl.stdout = output 545 546 repl.do_log("") 547 548 assert "No events" in output.getvalue() 549 550 def test_log_after_step(self, repl, temp_dfasm_file): 551 """log after stepping should show events.""" 552 # Load and step 553 output = io.StringIO() 554 repl.stdout = output 555 556 repl.do_load(str(temp_dfasm_file)) 557 repl.do_step("") 558 559 # Now log 560 output = io.StringIO() 561 repl.stdout = output 562 563 repl.do_log("") 564 565 # Output depends on whether there were events 566 out = output.getvalue() 567 assert len(out) > 0 568 569 def test_log_filter_pe(self, repl, temp_dfasm_file): 570 """log pe:<id> should filter by PE component.""" 571 # Load and step a program with multiple components 572 output = io.StringIO() 573 repl.stdout = output 574 575 # Create a dfasm with multiple PEs to test filtering 576 import tempfile 577 from pathlib import Path 578 with tempfile.NamedTemporaryFile(mode='w', suffix='.dfasm', delete=False) as f: 579 f.write("""\ 580@system pe=2, sm=1 581&c0|pe0 <| const, 1 582&c1|pe1 <| const, 2 583&pass0|pe0 <| pass 584&pass1|pe1 <| pass 585&c0|pe0 |> &pass0|pe0:L 586&c1|pe1 |> &pass1|pe1:L 587""") 588 temp_file = f.name 589 590 try: 591 repl.do_load(temp_file) 592 # Run simulation to capture events (cycle-accurate timing starts events at time 1+) 593 repl.do_run("100") 594 595 # Filter by PE 0 596 output = io.StringIO() 597 repl.stdout = output 598 repl.do_log("pe:0") 599 600 # Should complete without exception and contain pe:0 events 601 output_str = output.getvalue() 602 assert len(output_str) > 0, "Expected output from filtering" 603 # The filtered output should contain references to pe:0 604 assert "pe:0" in output_str or "PE 0" in output_str, \ 605 "Expected filtered output to contain pe:0" 606 # Verify that filtering actually excludes pe:1 607 assert "pe:1" not in output_str and "PE 1" not in output_str, \ 608 "Expected filtered output to exclude pe:1" 609 finally: 610 Path(temp_file).unlink(missing_ok=True) 611 612 def test_log_filter_sm(self, repl, temp_dfasm_file): 613 """log sm:<id> should filter by SM component.""" 614 # Load and step a program with SM operations 615 output = io.StringIO() 616 repl.stdout = output 617 618 # Create a dfasm with SM operations to generate SM events 619 import tempfile 620 from pathlib import Path 621 with tempfile.NamedTemporaryFile(mode='w', suffix='.dfasm', delete=False) as f: 622 f.write("""\ 623@system pe=2, sm=2 624&c0|pe0 <| const, 100 625&write_op|pe0 <| write 626&c1|pe1 <| const, 200 627&read_op|pe1 <| read 628&c0|pe0 |> &write_op|pe0:L 629&c1|pe1 |> &read_op|pe1:L 630""") 631 temp_file = f.name 632 633 try: 634 repl.do_load(temp_file) 635 # Step multiple times to ensure events are generated 636 for _ in range(5): 637 repl.do_step("") 638 639 # Filter by SM 0 640 output = io.StringIO() 641 repl.stdout = output 642 repl.do_log("sm:0") 643 644 # Should complete without exception 645 output_str = output.getvalue() 646 assert len(output_str) > 0, "Expected output from SM filtering" 647 # Verify that sm:1 is NOT in the filtered output (exclusion check) 648 assert "sm:1" not in output_str, \ 649 "Expected filtered output (sm:0) to exclude sm:1 events" 650 finally: 651 Path(temp_file).unlink(missing_ok=True) 652 653 def test_log_filter_by_type(self, repl, temp_dfasm_file): 654 """log <type> should filter by event type name.""" 655 # Load and step 656 output = io.StringIO() 657 repl.stdout = output 658 659 repl.do_load(str(temp_dfasm_file)) 660 repl.do_step("") 661 662 # Filter by type 663 output = io.StringIO() 664 repl.stdout = output 665 666 repl.do_log("TokenReceived") 667 668 # Should complete without exception 669 assert len(output.getvalue()) > 0 670 671 672class TestREPLTime: 673 """Tests for time command.""" 674 675 def test_time_shows_current_time(self, repl, temp_dfasm_file): 676 """time should show current simulation time.""" 677 # Load first 678 output = io.StringIO() 679 repl.stdout = output 680 681 repl.do_load(str(temp_dfasm_file)) 682 683 # Check time 684 output = io.StringIO() 685 repl.stdout = output 686 687 repl.do_time("") 688 689 assert "Simulation time" in output.getvalue() 690 691 692class TestREPLReset: 693 """Tests for AC4.9: reset command clears simulation state.""" 694 695 def test_reset_clears_state(self, repl, temp_dfasm_file): 696 """reset should clear loaded state and event history.""" 697 # Load first 698 output = io.StringIO() 699 repl.stdout = output 700 701 repl.do_load(str(temp_dfasm_file)) 702 assert repl._loaded 703 704 # Reset 705 output = io.StringIO() 706 repl.stdout = output 707 708 repl.do_reset("") 709 710 assert repl._loaded is False 711 assert len(repl._event_history) == 0 712 assert "reset" in output.getvalue() 713 714 def test_step_after_reset_errors(self, repl, temp_dfasm_file): 715 """step after reset should fail (AC4.8).""" 716 # Load and reset 717 output = io.StringIO() 718 repl.stdout = output 719 720 repl.do_load(str(temp_dfasm_file)) 721 repl.do_reset("") 722 723 # Try step 724 output = io.StringIO() 725 repl.stdout = output 726 727 repl.do_step("") 728 729 assert "No simulation loaded" in output.getvalue() 730 731 732class TestREPLQuit: 733 """Tests for AC4.9: quit command exits cleanly.""" 734 735 def test_quit_returns_true(self, repl): 736 """do_quit should return True to exit cmdloop.""" 737 result = repl.do_quit("") 738 assert result is True 739 740 def test_eof_returns_true(self, repl): 741 """do_EOF should return True for Ctrl-D.""" 742 result = repl.do_EOF("") 743 assert result is True 744 745 746class TestTokenParsing: 747 """Tests for token parsing in send/inject.""" 748 749 def test_parse_monad_token(self, repl): 750 """Parse MonadToken from args.""" 751 token = repl._parse_token_args("0 10 5 42") 752 assert isinstance(token, MonadToken) 753 assert token.target == 0 754 assert token.offset == 10 755 assert token.act_id == 5 756 assert token.data == 42 757 assert token.inline is False 758 759 def test_parse_sm_token_read(self, repl): 760 """Parse SMToken with READ op.""" 761 token = repl._parse_token_args("1 --sm 20 READ") 762 assert isinstance(token, SMToken) 763 assert token.target == 1 764 assert token.addr == 20 765 assert token.op == MemOp.READ 766 assert token.data is None 767 768 def test_parse_sm_token_with_data(self, repl): 769 """Parse SMToken with data.""" 770 token = repl._parse_token_args("1 --sm 20 WRITE 999") 771 assert isinstance(token, SMToken) 772 assert token.target == 1 773 assert token.addr == 20 774 assert token.op == MemOp.WRITE 775 assert token.data == 999 776 777 def test_parse_invalid_target(self, repl): 778 """Parse should fail on invalid target.""" 779 with pytest.raises(ValueError): 780 repl._parse_token_args("not_a_number 0 0 0") 781 782 def test_parse_monad_missing_args(self, repl): 783 """Parse should fail if MonadToken args are incomplete.""" 784 with pytest.raises(ValueError): 785 repl._parse_token_args("0 10 5") # Missing data 786 787 def test_parse_sm_missing_args(self, repl): 788 """Parse should fail if SMToken args are incomplete.""" 789 with pytest.raises(ValueError): 790 repl._parse_token_args("0 --sm 20") # Missing op 791 792 def test_parse_sm_unknown_op(self, repl): 793 """Parse should fail on unknown MemOp.""" 794 with pytest.raises(ValueError, match="Unknown MemOp"): 795 repl._parse_token_args("0 --sm 20 INVALID_OP")