OR-1 dataflow CPU sketch
1"""Unit tests for monitor REPL functionality.
2
3Tests cover all acceptance criteria for AC4.1-AC4.9, verifying REPL
4commands work correctly with a synchronized backend and proper
5state management.
6"""
7
8from __future__ import annotations
9
10import io
11import sys
12from pathlib import Path
13from tempfile import NamedTemporaryFile
14
15import pytest
16
17from cm_inst import MemOp
18from monitor.backend import SimulationBackend
19from monitor.repl import MonitorREPL
20from tokens import MonadToken, SMToken
21
22
23@pytest.fixture
24def temp_dfasm_file():
25 """Create a temporary dfasm file for testing.
26
27 A minimal valid dfasm program that loads without errors.
28 """
29 content = """@system pe=1, sm=0
30&c1 <| const, 3
31&c2 <| const, 7
32&result <| add
33&c1 |> &result:L
34&c2 |> &result:R
35"""
36 with NamedTemporaryFile(mode="w", suffix=".dfasm", delete=False) as f:
37 f.write(content)
38 f.flush()
39 path = Path(f.name)
40
41 yield path
42
43 # Cleanup
44 path.unlink()
45
46
47@pytest.fixture
48def backend():
49 """Create and start a simulation backend for testing."""
50 b = SimulationBackend()
51 b.start()
52 yield b
53 b.stop()
54
55
56@pytest.fixture
57def repl(backend):
58 """Create a REPL instance with a test backend."""
59 return MonitorREPL(backend)
60
61
62class TestREPLLoad:
63 """Tests for AC4.1: load command assembles and loads dfasm."""
64
65 def test_load_valid_file(self, repl, temp_dfasm_file):
66 """load <path> should load a valid dfasm file."""
67 # Capture output
68 output = io.StringIO()
69 repl.stdout = output
70
71 # Call do_load
72 repl.do_load(str(temp_dfasm_file))
73
74 # Check state
75 assert repl._loaded is True
76 assert repl._last_snapshot is not None
77 assert "Loaded" in output.getvalue()
78
79 def test_load_nonexistent_file(self, repl):
80 """load with nonexistent path should fail gracefully."""
81 output = io.StringIO()
82 repl.stdout = output
83
84 repl.do_load("/nonexistent/path.dfasm")
85
86 assert repl._loaded is False
87 assert "not found" in output.getvalue()
88
89 def test_load_invalid_syntax(self, repl):
90 """load with invalid dfasm syntax should report error."""
91 # Create invalid file
92 with NamedTemporaryFile(mode="w", suffix=".dfasm", delete=False) as f:
93 f.write("@@@ invalid syntax @@@")
94 f.flush()
95 path = Path(f.name)
96
97 try:
98 output = io.StringIO()
99 repl.stdout = output
100
101 repl.do_load(str(path))
102
103 assert repl._loaded is False
104 out = output.getvalue()
105 assert "Error" in out
106 finally:
107 path.unlink()
108
109 def test_load_without_args(self, repl):
110 """load without args should print usage."""
111 output = io.StringIO()
112 repl.stdout = output
113
114 repl.do_load("")
115
116 assert "Usage" in output.getvalue()
117
118
119class TestREPLStep:
120 """Tests for AC4.2: step and stepe commands advance simulation."""
121
122 def test_step_without_load(self, repl):
123 """step without loaded simulation should error (AC4.8)."""
124 output = io.StringIO()
125 repl.stdout = output
126
127 repl.do_step("")
128
129 assert "No simulation loaded" in output.getvalue()
130
131 def test_step_after_load(self, repl, temp_dfasm_file):
132 """step after loading should advance simulation."""
133 # Load first
134 output = io.StringIO()
135 repl.stdout = output
136
137 repl.do_load(str(temp_dfasm_file))
138 assert repl._loaded
139
140 # Clear and step
141 output = io.StringIO()
142 repl.stdout = output
143
144 repl.do_step("")
145
146 # Should produce output (snapshot and/or events)
147 assert len(output.getvalue()) > 0
148
149 def test_stepe_after_load(self, repl, temp_dfasm_file):
150 """stepe after loading should advance by one event."""
151 # Load first
152 output = io.StringIO()
153 repl.stdout = output
154
155 repl.do_load(str(temp_dfasm_file))
156 assert repl._loaded
157
158 # Clear and step
159 output = io.StringIO()
160 repl.stdout = output
161
162 repl.do_stepe("")
163
164 # Should produce output
165 assert len(output.getvalue()) > 0
166
167 def test_stepe_without_load(self, repl):
168 """stepe without loaded simulation should error (AC4.8)."""
169 output = io.StringIO()
170 repl.stdout = output
171
172 repl.do_stepe("")
173
174 assert "No simulation loaded" in output.getvalue()
175
176
177class TestREPLRun:
178 """Tests for AC4.3: run command runs until target time."""
179
180 def test_run_without_load(self, repl):
181 """run without loaded simulation should error (AC4.8)."""
182 output = io.StringIO()
183 repl.stdout = output
184
185 repl.do_run("10")
186
187 assert "No simulation loaded" in output.getvalue()
188
189 def test_run_after_load(self, repl, temp_dfasm_file):
190 """run <until> should run until target time."""
191 # Load first
192 output = io.StringIO()
193 repl.stdout = output
194
195 repl.do_load(str(temp_dfasm_file))
196 assert repl._loaded
197
198 # Clear and run
199 output = io.StringIO()
200 repl.stdout = output
201
202 repl.do_run("10")
203
204 # Should produce output
205 assert len(output.getvalue()) > 0
206
207 def test_run_invalid_time(self, repl, temp_dfasm_file):
208 """run with invalid time should error."""
209 # Load first
210 output = io.StringIO()
211 repl.stdout = output
212
213 repl.do_load(str(temp_dfasm_file))
214
215 # Try invalid time
216 output = io.StringIO()
217 repl.stdout = output
218
219 repl.do_run("not_a_number")
220
221 assert "Invalid time" in output.getvalue()
222
223 def test_run_without_args(self, repl, temp_dfasm_file):
224 """run without args should print usage."""
225 # Load first
226 output = io.StringIO()
227 repl.stdout = output
228
229 repl.do_load(str(temp_dfasm_file))
230
231 # Try without args
232 output = io.StringIO()
233 repl.stdout = output
234
235 repl.do_run("")
236
237 assert "Usage" in output.getvalue()
238
239
240class TestREPLSend:
241 """Tests for AC4.4: send command injects token via FIFO."""
242
243 def test_send_monad_without_load(self, repl):
244 """send without loaded simulation should error (AC4.8)."""
245 output = io.StringIO()
246 repl.stdout = output
247
248 repl.do_send("0 0 0 42")
249
250 assert "No simulation loaded" in output.getvalue()
251
252 def test_send_monad_token(self, repl, temp_dfasm_file):
253 """send <target> <offset> <ctx> <data> should send MonadToken."""
254 # Load first
255 output = io.StringIO()
256 repl.stdout = output
257
258 repl.do_load(str(temp_dfasm_file))
259 assert repl._loaded
260
261 # Clear and send
262 output = io.StringIO()
263 repl.stdout = output
264
265 repl.do_send("0 0 0 42")
266
267 # Should produce output (no exception)
268 assert "Sent" in output.getvalue()
269
270 def test_send_sm_token(self, repl, temp_dfasm_file):
271 """send <target> --sm <addr> <op> should send SMToken."""
272 # Load first
273 output = io.StringIO()
274 repl.stdout = output
275
276 repl.do_load(str(temp_dfasm_file))
277 assert repl._loaded
278
279 # Clear and send SM token
280 output = io.StringIO()
281 repl.stdout = output
282
283 repl.do_send("0 --sm 5 READ")
284
285 # Should produce output (no exception)
286 assert "Sent" in output.getvalue()
287
288 def test_send_sm_token_with_data(self, repl, temp_dfasm_file):
289 """send <target> --sm <addr> <op> <data> should work."""
290 # Load first
291 output = io.StringIO()
292 repl.stdout = output
293
294 repl.do_load(str(temp_dfasm_file))
295 assert repl._loaded
296
297 # Send SM token with data
298 output = io.StringIO()
299 repl.stdout = output
300
301 repl.do_send("0 --sm 5 WRITE 123")
302
303 assert "Sent" in output.getvalue()
304
305 def test_send_invalid_args(self, repl, temp_dfasm_file):
306 """send with invalid args should error gracefully."""
307 # Load first
308 output = io.StringIO()
309 repl.stdout = output
310
311 repl.do_load(str(temp_dfasm_file))
312
313 # Try invalid args
314 output = io.StringIO()
315 repl.stdout = output
316
317 repl.do_send("not_an_int")
318
319 assert "Error" in output.getvalue()
320
321 def test_send_unknown_memop(self, repl, temp_dfasm_file):
322 """send with unknown MemOp should error."""
323 # Load first
324 output = io.StringIO()
325 repl.stdout = output
326
327 repl.do_load(str(temp_dfasm_file))
328
329 # Try unknown MemOp
330 output = io.StringIO()
331 repl.stdout = output
332
333 repl.do_send("0 --sm 5 UNKNOWN_OP")
334
335 assert "Unknown MemOp" in output.getvalue()
336
337 def test_send_without_args(self, repl, temp_dfasm_file):
338 """send without args should print usage."""
339 # Load first
340 output = io.StringIO()
341 repl.stdout = output
342
343 repl.do_load(str(temp_dfasm_file))
344
345 # Try without args
346 output = io.StringIO()
347 repl.stdout = output
348
349 repl.do_send("")
350
351 assert "Usage" in output.getvalue()
352
353
354class TestREPLInject:
355 """Tests for AC4.5: inject command does direct injection."""
356
357 def test_inject_monad_without_load(self, repl):
358 """inject without loaded simulation should error (AC4.8)."""
359 output = io.StringIO()
360 repl.stdout = output
361
362 repl.do_inject("0 0 0 42")
363
364 assert "No simulation loaded" in output.getvalue()
365
366 def test_inject_monad_token(self, repl, temp_dfasm_file):
367 """inject <target> <offset> <ctx> <data> should inject MonadToken."""
368 # Load first
369 output = io.StringIO()
370 repl.stdout = output
371
372 repl.do_load(str(temp_dfasm_file))
373 assert repl._loaded
374
375 # Clear and inject
376 output = io.StringIO()
377 repl.stdout = output
378
379 repl.do_inject("0 0 0 42")
380
381 # Should produce output (no exception)
382 assert "Injected" in output.getvalue()
383
384 def test_inject_sm_token(self, repl):
385 """inject <target> --sm <addr> <op> should inject SMToken without throwing."""
386 # For this test, we just verify token parsing works
387 # Actual injection will fail without SM, but parsing should succeed
388 output = io.StringIO()
389 repl.stdout = output
390
391 # Set loaded manually just to bypass the check
392 repl._loaded = True
393
394 # Just verify the token parsing happens without exception
395 token = repl._parse_token_args("0 --sm 5 WRITE")
396 assert token.addr == 5
397
398 def test_inject_invalid_args(self, repl, temp_dfasm_file):
399 """inject with invalid args should error gracefully."""
400 # Load first
401 output = io.StringIO()
402 repl.stdout = output
403
404 repl.do_load(str(temp_dfasm_file))
405
406 # Try invalid args
407 output = io.StringIO()
408 repl.stdout = output
409
410 repl.do_inject("invalid")
411
412 assert "Error" in output.getvalue()
413
414
415class TestREPLState:
416 """Tests for AC4.6: state, pe, sm commands display readable state."""
417
418 def test_state_without_load(self, repl):
419 """state without loaded simulation should error (AC4.8)."""
420 output = io.StringIO()
421 repl.stdout = output
422
423 repl.do_state("")
424
425 assert "No simulation loaded" in output.getvalue()
426
427 def test_state_after_load(self, repl, temp_dfasm_file):
428 """state should display system state summary."""
429 # Load first
430 output = io.StringIO()
431 repl.stdout = output
432
433 repl.do_load(str(temp_dfasm_file))
434 assert repl._loaded
435
436 # Clear and call state
437 output = io.StringIO()
438 repl.stdout = output
439
440 repl.do_state("")
441
442 out = output.getvalue()
443 # Should show PE/SM counts and time info
444 assert len(out) > 0
445 assert ("PE" in out or "SM" in out or "Time" in out)
446
447 def test_pe_without_load(self, repl):
448 """pe without loaded simulation should error (AC4.8)."""
449 output = io.StringIO()
450 repl.stdout = output
451
452 repl.do_pe("0")
453
454 assert "No simulation loaded" in output.getvalue()
455
456 def test_pe_after_load(self, repl, temp_dfasm_file):
457 """pe <id> should display PE state."""
458 # Load first
459 output = io.StringIO()
460 repl.stdout = output
461
462 repl.do_load(str(temp_dfasm_file))
463 assert repl._loaded
464
465 # Clear and call pe
466 output = io.StringIO()
467 repl.stdout = output
468
469 repl.do_pe("0")
470
471 out = output.getvalue()
472 # Should show PE state or not found message
473 assert len(out) > 0
474 # Verify formatting includes lane information or empty tag store marker (case-insensitive)
475 assert "lane" in out.lower() or "tag store: (empty)" in out.lower()
476
477 def test_pe_invalid_id(self, repl, temp_dfasm_file):
478 """pe with non-integer ID should error."""
479 # Load first
480 output = io.StringIO()
481 repl.stdout = output
482
483 repl.do_load(str(temp_dfasm_file))
484
485 # Try invalid ID
486 output = io.StringIO()
487 repl.stdout = output
488
489 repl.do_pe("invalid_id")
490
491 assert "Invalid PE ID" in output.getvalue()
492
493 def test_sm_without_load(self, repl):
494 """sm without loaded simulation should error (AC4.8)."""
495 output = io.StringIO()
496 repl.stdout = output
497
498 repl.do_sm("0")
499
500 assert "No simulation loaded" in output.getvalue()
501
502 def test_sm_after_load(self, repl, temp_dfasm_file):
503 """sm <id> should display SM state."""
504 # Load first
505 output = io.StringIO()
506 repl.stdout = output
507
508 repl.do_load(str(temp_dfasm_file))
509 assert repl._loaded
510
511 # Clear and call sm
512 output = io.StringIO()
513 repl.stdout = output
514
515 repl.do_sm("0")
516
517 out = output.getvalue()
518 # Should show SM state or not found message
519 assert len(out) > 0
520
521 def test_sm_invalid_id(self, repl, temp_dfasm_file):
522 """sm with non-integer ID should error."""
523 # Load first
524 output = io.StringIO()
525 repl.stdout = output
526
527 repl.do_load(str(temp_dfasm_file))
528
529 # Try invalid ID
530 output = io.StringIO()
531 repl.stdout = output
532
533 repl.do_sm("invalid_id")
534
535 assert "Invalid SM ID" in output.getvalue()
536
537
538class TestREPLLog:
539 """Tests for AC4.7: log command shows recent events."""
540
541 def test_log_without_events(self, repl):
542 """log with no events should indicate no events."""
543 output = io.StringIO()
544 repl.stdout = output
545
546 repl.do_log("")
547
548 assert "No events" in output.getvalue()
549
550 def test_log_after_step(self, repl, temp_dfasm_file):
551 """log after stepping should show events."""
552 # Load and step
553 output = io.StringIO()
554 repl.stdout = output
555
556 repl.do_load(str(temp_dfasm_file))
557 repl.do_step("")
558
559 # Now log
560 output = io.StringIO()
561 repl.stdout = output
562
563 repl.do_log("")
564
565 # Output depends on whether there were events
566 out = output.getvalue()
567 assert len(out) > 0
568
569 def test_log_filter_pe(self, repl, temp_dfasm_file):
570 """log pe:<id> should filter by PE component."""
571 # Load and step a program with multiple components
572 output = io.StringIO()
573 repl.stdout = output
574
575 # Create a dfasm with multiple PEs to test filtering
576 import tempfile
577 from pathlib import Path
578 with tempfile.NamedTemporaryFile(mode='w', suffix='.dfasm', delete=False) as f:
579 f.write("""\
580@system pe=2, sm=1
581&c0|pe0 <| const, 1
582&c1|pe1 <| const, 2
583&pass0|pe0 <| pass
584&pass1|pe1 <| pass
585&c0|pe0 |> &pass0|pe0:L
586&c1|pe1 |> &pass1|pe1:L
587""")
588 temp_file = f.name
589
590 try:
591 repl.do_load(temp_file)
592 # Run simulation to capture events (cycle-accurate timing starts events at time 1+)
593 repl.do_run("100")
594
595 # Filter by PE 0
596 output = io.StringIO()
597 repl.stdout = output
598 repl.do_log("pe:0")
599
600 # Should complete without exception and contain pe:0 events
601 output_str = output.getvalue()
602 assert len(output_str) > 0, "Expected output from filtering"
603 # The filtered output should contain references to pe:0
604 assert "pe:0" in output_str or "PE 0" in output_str, \
605 "Expected filtered output to contain pe:0"
606 # Verify that filtering actually excludes pe:1
607 assert "pe:1" not in output_str and "PE 1" not in output_str, \
608 "Expected filtered output to exclude pe:1"
609 finally:
610 Path(temp_file).unlink(missing_ok=True)
611
612 def test_log_filter_sm(self, repl, temp_dfasm_file):
613 """log sm:<id> should filter by SM component."""
614 # Load and step a program with SM operations
615 output = io.StringIO()
616 repl.stdout = output
617
618 # Create a dfasm with SM operations to generate SM events
619 import tempfile
620 from pathlib import Path
621 with tempfile.NamedTemporaryFile(mode='w', suffix='.dfasm', delete=False) as f:
622 f.write("""\
623@system pe=2, sm=2
624&c0|pe0 <| const, 100
625&write_op|pe0 <| write
626&c1|pe1 <| const, 200
627&read_op|pe1 <| read
628&c0|pe0 |> &write_op|pe0:L
629&c1|pe1 |> &read_op|pe1:L
630""")
631 temp_file = f.name
632
633 try:
634 repl.do_load(temp_file)
635 # Step multiple times to ensure events are generated
636 for _ in range(5):
637 repl.do_step("")
638
639 # Filter by SM 0
640 output = io.StringIO()
641 repl.stdout = output
642 repl.do_log("sm:0")
643
644 # Should complete without exception
645 output_str = output.getvalue()
646 assert len(output_str) > 0, "Expected output from SM filtering"
647 # Verify that sm:1 is NOT in the filtered output (exclusion check)
648 assert "sm:1" not in output_str, \
649 "Expected filtered output (sm:0) to exclude sm:1 events"
650 finally:
651 Path(temp_file).unlink(missing_ok=True)
652
653 def test_log_filter_by_type(self, repl, temp_dfasm_file):
654 """log <type> should filter by event type name."""
655 # Load and step
656 output = io.StringIO()
657 repl.stdout = output
658
659 repl.do_load(str(temp_dfasm_file))
660 repl.do_step("")
661
662 # Filter by type
663 output = io.StringIO()
664 repl.stdout = output
665
666 repl.do_log("TokenReceived")
667
668 # Should complete without exception
669 assert len(output.getvalue()) > 0
670
671
672class TestREPLTime:
673 """Tests for time command."""
674
675 def test_time_shows_current_time(self, repl, temp_dfasm_file):
676 """time should show current simulation time."""
677 # Load first
678 output = io.StringIO()
679 repl.stdout = output
680
681 repl.do_load(str(temp_dfasm_file))
682
683 # Check time
684 output = io.StringIO()
685 repl.stdout = output
686
687 repl.do_time("")
688
689 assert "Simulation time" in output.getvalue()
690
691
692class TestREPLReset:
693 """Tests for AC4.9: reset command clears simulation state."""
694
695 def test_reset_clears_state(self, repl, temp_dfasm_file):
696 """reset should clear loaded state and event history."""
697 # Load first
698 output = io.StringIO()
699 repl.stdout = output
700
701 repl.do_load(str(temp_dfasm_file))
702 assert repl._loaded
703
704 # Reset
705 output = io.StringIO()
706 repl.stdout = output
707
708 repl.do_reset("")
709
710 assert repl._loaded is False
711 assert len(repl._event_history) == 0
712 assert "reset" in output.getvalue()
713
714 def test_step_after_reset_errors(self, repl, temp_dfasm_file):
715 """step after reset should fail (AC4.8)."""
716 # Load and reset
717 output = io.StringIO()
718 repl.stdout = output
719
720 repl.do_load(str(temp_dfasm_file))
721 repl.do_reset("")
722
723 # Try step
724 output = io.StringIO()
725 repl.stdout = output
726
727 repl.do_step("")
728
729 assert "No simulation loaded" in output.getvalue()
730
731
732class TestREPLQuit:
733 """Tests for AC4.9: quit command exits cleanly."""
734
735 def test_quit_returns_true(self, repl):
736 """do_quit should return True to exit cmdloop."""
737 result = repl.do_quit("")
738 assert result is True
739
740 def test_eof_returns_true(self, repl):
741 """do_EOF should return True for Ctrl-D."""
742 result = repl.do_EOF("")
743 assert result is True
744
745
746class TestTokenParsing:
747 """Tests for token parsing in send/inject."""
748
749 def test_parse_monad_token(self, repl):
750 """Parse MonadToken from args."""
751 token = repl._parse_token_args("0 10 5 42")
752 assert isinstance(token, MonadToken)
753 assert token.target == 0
754 assert token.offset == 10
755 assert token.act_id == 5
756 assert token.data == 42
757 assert token.inline is False
758
759 def test_parse_sm_token_read(self, repl):
760 """Parse SMToken with READ op."""
761 token = repl._parse_token_args("1 --sm 20 READ")
762 assert isinstance(token, SMToken)
763 assert token.target == 1
764 assert token.addr == 20
765 assert token.op == MemOp.READ
766 assert token.data is None
767
768 def test_parse_sm_token_with_data(self, repl):
769 """Parse SMToken with data."""
770 token = repl._parse_token_args("1 --sm 20 WRITE 999")
771 assert isinstance(token, SMToken)
772 assert token.target == 1
773 assert token.addr == 20
774 assert token.op == MemOp.WRITE
775 assert token.data == 999
776
777 def test_parse_invalid_target(self, repl):
778 """Parse should fail on invalid target."""
779 with pytest.raises(ValueError):
780 repl._parse_token_args("not_a_number 0 0 0")
781
782 def test_parse_monad_missing_args(self, repl):
783 """Parse should fail if MonadToken args are incomplete."""
784 with pytest.raises(ValueError):
785 repl._parse_token_args("0 10 5") # Missing data
786
787 def test_parse_sm_missing_args(self, repl):
788 """Parse should fail if SMToken args are incomplete."""
789 with pytest.raises(ValueError):
790 repl._parse_token_args("0 --sm 20") # Missing op
791
792 def test_parse_sm_unknown_op(self, repl):
793 """Parse should fail on unknown MemOp."""
794 with pytest.raises(ValueError, match="Unknown MemOp"):
795 repl._parse_token_args("0 --sm 20 INVALID_OP")