# personal memory agent
# at main 674 lines 20 kB view raw
# SPDX-License-Identifier: AGPL-3.0-only
# Copyright (c) 2026 sol pbc

"""Tests for ``think.supervisor``.

Covers notification bookkeeping, launching of the sense/sync child processes,
argument parsing, per-command task queueing, and the singleton lock taken by
``main()``. Child processes and worker threads are never really started:
``subprocess.Popen`` and ``threading.Thread.start`` are replaced with
recording fakes.
"""

import importlib
import io
import os
import subprocess
import sys
from unittest.mock import MagicMock

import pytest


# ---------------------------------------------------------------------------
# Shared test helpers
# ---------------------------------------------------------------------------


class _DummyProc:
    """Minimal stand-in for the object returned by ``subprocess.Popen``."""

    def __init__(self):
        self.stdout = io.StringIO()
        self.stderr = io.StringIO()
        self.pid = 12345

    def terminate(self):
        pass

    def wait(self, timeout=None):
        pass


def _patch_popen(monkeypatch, mod):
    """Replace ``mod.subprocess.Popen`` with a fake that records its calls.

    Returns the list that receives one ``(cmd, stdout, stderr)`` tuple per
    launch attempt.
    """
    started = []

    def fake_popen(
        cmd,
        stdout=None,
        stderr=None,
        text=False,
        bufsize=-1,
        start_new_session=False,
        env=None,
    ):
        proc = _DummyProc()
        started.append((cmd, stdout, stderr))
        return proc

    monkeypatch.setattr(mod.subprocess, "Popen", fake_popen)
    return started


def _thread_target_name(thread):
    """Return the name of the callable a ``Thread`` was created to run."""
    return thread._target.__name__


def _thread_args(thread):
    """Return the positional args a ``Thread`` was created with."""
    return thread._args


def _stub_thread_start(monkeypatch, mod, capture):
    """Replace ``Thread.start`` so no worker thread actually runs.

    Each ``start()`` call appends ``capture(thread)`` to the returned list.
    """
    spawned = []

    def fake_thread_start(self):
        spawned.append(capture(self))

    monkeypatch.setattr(mod.threading.Thread, "start", fake_thread_start)
    return spawned


def _fresh_task_queue(monkeypatch, mod):
    """Install a new ``TaskQueue`` as ``mod._task_queue`` and return it.

    No ``on_queue_change`` callback is given, to avoid callosum events. The
    attribute is set through ``monkeypatch`` so the previous value is restored
    when the test finishes instead of leaking into later tests.
    """
    queue = mod.TaskQueue(on_queue_change=None)
    monkeypatch.setattr(mod, "_task_queue", queue, raising=False)
    return queue


def _task_request(cmd, ref=None):
    """Build a supervisor task-request message for ``cmd``.

    The ``ref`` key is only included when a caller-provided ref is given.
    """
    msg = {"tract": "supervisor", "event": "request", "cmd": cmd}
    if ref is not None:
        msg["ref"] = ref
    return msg


# ---------------------------------------------------------------------------
# Notifications
# ---------------------------------------------------------------------------


@pytest.mark.asyncio
async def test_send_notification(monkeypatch):
    """send_notification calls the notifier and records the returned id."""
    mod = importlib.import_module("think.supervisor")
    alert_key = ("test", "key")
    called = []

    class FakeNotifier:
        async def send(self, title, message, urgency):
            called.append({"title": title, "message": message, "urgency": urgency})
            return "test-notification-id"

    def fake_get_notifier():
        return FakeNotifier()

    monkeypatch.setattr(mod, "_get_notifier", fake_get_notifier)

    try:
        await mod.send_notification("test message", alert_key=alert_key)

        assert len(called) == 1
        assert called[0]["message"] == "test message"
        assert called[0]["title"] == "solstone Supervisor"
        assert alert_key in mod._notification_ids
        assert mod._notification_ids[alert_key] == "test-notification-id"
    finally:
        # Do not leave the tracked id behind in module-level state.
        mod._notification_ids.pop(alert_key, None)


@pytest.mark.asyncio
async def test_clear_notification(monkeypatch):
    """clear_notification clears a tracked id once and ignores unknown keys."""
    mod = importlib.import_module("think.supervisor")
    alert_key = ("test", "key")
    cleared = []

    class FakeNotifier:
        async def send(self, title, message, urgency):
            return "test-notification-id"

        async def clear(self, notification_id):
            cleared.append(notification_id)

    def fake_get_notifier():
        return FakeNotifier()

    monkeypatch.setattr(mod, "_get_notifier", fake_get_notifier)

    # First send a notification to track
    await mod.send_notification("test message", alert_key=alert_key)
    assert alert_key in mod._notification_ids

    # Now clear it
    await mod.clear_notification(alert_key)
    assert len(cleared) == 1
    assert cleared[0] == "test-notification-id"
    assert alert_key not in mod._notification_ids

    # Clearing a non-existent notification should be a no-op
    await mod.clear_notification(("nonexistent", "key"))
    assert len(cleared) == 1  # Still just one clear call


# ---------------------------------------------------------------------------
# Child process launch
# ---------------------------------------------------------------------------


def test_start_sense(tmp_path, mock_callosum, monkeypatch):
    """Test that sense launches correctly."""
    mod = importlib.import_module("think.supervisor")
    started = _patch_popen(monkeypatch, mod)
    monkeypatch.setenv("_SOLSTONE_JOURNAL_OVERRIDE", str(tmp_path))

    # Test start_sense()
    sense_proc = mod.start_sense()
    assert sense_proc is not None
    assert any(cmd == ["sol", "sense", "-v"] for cmd, _, _ in started)

    # Check that stdout and stderr capture pipes
    for _, stdout, stderr in started:
        assert stdout == subprocess.PIPE
        assert stderr == subprocess.PIPE


def test_start_sync(tmp_path, mock_callosum, monkeypatch):
    """Test that start_sync() launches sol sync with remote URL."""
    mod = importlib.import_module("think.supervisor")
    started = _patch_popen(monkeypatch, mod)
    monkeypatch.setenv("_SOLSTONE_JOURNAL_OVERRIDE", str(tmp_path))

    # Test start_sync()
    remote_url = "https://server:5000/app/observer/ingest/abc123"
    sync_proc = mod.start_sync(remote_url)
    assert sync_proc is not None

    # Verify the command includes --remote with the URL
    sync_cmds = [cmd for cmd, _, _ in started if "sync" in cmd]
    assert len(sync_cmds) == 1
    assert sync_cmds[0] == ["sol", "sync", "-v", "--remote", remote_url]


# ---------------------------------------------------------------------------
# Argument parsing
# ---------------------------------------------------------------------------


def test_parse_args_remote_flag():
    """Test that parse_args includes --remote flag."""
    mod = importlib.reload(importlib.import_module("think.supervisor"))

    parser = mod.parse_args()
    args = parser.parse_args(["--remote", "https://server/ingest/key"])

    assert args.remote == "https://server/ingest/key"


def test_parse_args_remote_flag_optional():
    """Test that --remote is optional."""
    mod = importlib.reload(importlib.import_module("think.supervisor"))

    parser = mod.parse_args()
    args = parser.parse_args([])

    assert args.remote is None


# ---------------------------------------------------------------------------
# Shutdown ordering
# ---------------------------------------------------------------------------


def test_shutdown_stops_in_reverse_order(monkeypatch):
    """Shutdown stops services in reverse order."""
    # NOTE(review): this test drives a local copy of the shutdown loop rather
    # than calling into think.supervisor, so it only pins the expected
    # terminate -> wait -> cleanup ordering. Presumably it mirrors the
    # supervisor's own loop - confirm and consider calling the real code.
    importlib.import_module("think.supervisor")

    operations = []

    class MockProc:
        def __init__(self, name):
            self._name = name

        def terminate(self):
            operations.append(("terminate", self._name))

        def wait(self, timeout=None):
            operations.append(("wait", self._name))

        def kill(self):
            pass

        def poll(self):
            return None

    class MockManaged:
        def __init__(self, name):
            self.name = name
            self.process = MockProc(name)
            self.shutdown_timeout = 15

        def cleanup(self):
            operations.append(("cleanup", self.name))

    procs = [
        MockManaged("convey"),
        MockManaged("sense"),
        MockManaged("cortex"),
    ]

    for managed in reversed(procs):
        proc = managed.process
        # Best-effort terminate/wait: errors are ignored so cleanup still runs.
        try:
            proc.terminate()
        except Exception:
            pass
        try:
            proc.wait(timeout=managed.shutdown_timeout)
        except Exception:
            pass
        managed.cleanup()

    assert operations == [
        ("terminate", "cortex"),
        ("wait", "cortex"),
        ("cleanup", "cortex"),
        ("terminate", "sense"),
        ("wait", "sense"),
        ("cleanup", "sense"),
        ("terminate", "convey"),
        ("wait", "convey"),
        ("cleanup", "convey"),
    ]


# ---------------------------------------------------------------------------
# Task queue
# ---------------------------------------------------------------------------


def test_get_command_name():
    """Test command name extraction for queue serialization."""
    mod = importlib.import_module("think.supervisor")
    get = mod.TaskQueue.get_command_name

    # sol X -> X
    assert get(["sol", "indexer", "--rescan"]) == "indexer"
    assert get(["sol", "insight", "20240101"]) == "insight"
    assert get(["sol", "dream", "--day", "20240101"]) == "dream"

    # Other commands -> basename
    assert get(["/usr/bin/python", "script.py"]) == "python"
    assert get(["custom-tool"]) == "custom-tool"

    # Empty -> unknown
    assert get([]) == "unknown"


def test_task_queue_same_command_queued(monkeypatch):
    """Test that same command is queued when already running."""
    mod = importlib.import_module("think.supervisor")
    queue = _fresh_task_queue(monkeypatch, mod)
    spawned = _stub_thread_start(monkeypatch, mod, _thread_target_name)

    # First request - should run immediately
    mod._handle_task_request(_task_request(["sol", "indexer", "--rescan"]))

    assert "indexer" in queue._running
    assert len(spawned) == 1

    # Second request (different args) - should be queued
    mod._handle_task_request(_task_request(["sol", "indexer", "--rescan-full"]))

    assert len(spawned) == 1  # No new spawn
    assert "indexer" in queue._queues
    queued = queue._queues["indexer"]
    assert len(queued) == 1
    # Queue entries are {refs, cmd} dicts (refs is a list for coalescing)
    assert queued[0]["cmd"] == ["sol", "indexer", "--rescan-full"]
    assert len(queued[0]["refs"]) == 1


def test_task_queue_dedupe_exact_match(monkeypatch):
    """Test that exact same command is deduped in queue."""
    mod = importlib.import_module("think.supervisor")
    queue = _fresh_task_queue(monkeypatch, mod)
    _stub_thread_start(monkeypatch, mod, _thread_target_name)

    same_cmd = ["sol", "indexer", "--rescan"]

    # First request - runs
    mod._handle_task_request(_task_request(list(same_cmd)))

    # Second request (same cmd) - queued
    mod._handle_task_request(_task_request(list(same_cmd)))

    assert len(queue._queues["indexer"]) == 1

    # Third request (same cmd again) - deduped, not added
    mod._handle_task_request(_task_request(list(same_cmd)))

    assert len(queue._queues["indexer"]) == 1  # Still just 1


def test_task_queue_different_commands_independent(monkeypatch):
    """Test that different commands have independent queues."""
    mod = importlib.import_module("think.supervisor")
    queue = _fresh_task_queue(monkeypatch, mod)
    spawned = _stub_thread_start(monkeypatch, mod, _thread_target_name)

    # Indexer request - runs
    mod._handle_task_request(_task_request(["sol", "indexer", "--rescan"]))

    # Insight request - also runs (different command)
    mod._handle_task_request(_task_request(["sol", "insight", "20240101"]))

    assert len(spawned) == 2  # Both spawned
    assert "indexer" in queue._running
    assert "insight" in queue._running


def test_process_queue_spawns_next(monkeypatch):
    """Test that _process_next spawns next queued task."""
    mod = importlib.import_module("think.supervisor")

    # Create task queue with pre-set state
    queue = _fresh_task_queue(monkeypatch, mod)
    queue._running = {"indexer": "ref123"}
    queue._queues = {
        "indexer": [
            {"refs": ["queued-ref"], "cmd": ["sol", "indexer", "--rescan-full"]}
        ]
    }

    # Captured args are (refs, cmd, cmd_name, callosum)
    spawned = _stub_thread_start(monkeypatch, mod, _thread_args)

    # Process queue
    queue._process_next("indexer")

    # Should have spawned the queued task with its refs list
    assert len(spawned) == 1
    assert spawned[0][0] == ["queued-ref"]  # refs list preserved from queue
    assert spawned[0][1] == ["sol", "indexer", "--rescan-full"]  # cmd
    assert spawned[0][2] == "indexer"  # cmd_name

    # Queue should be empty now
    assert queue._queues["indexer"] == []


def test_process_queue_clears_running_when_empty(monkeypatch):
    """Test that _process_next clears running state when queue is empty."""
    mod = importlib.import_module("think.supervisor")

    # Create task queue with pre-set state (no queued tasks)
    queue = _fresh_task_queue(monkeypatch, mod)
    queue._running = {"indexer": "ref123"}
    queue._queues = {"indexer": []}

    spawned = _stub_thread_start(monkeypatch, mod, _thread_args)

    # Process queue
    queue._process_next("indexer")

    # No spawn (queue was empty)
    assert len(spawned) == 0

    # Running state should be cleared
    assert "indexer" not in queue._running


def test_task_request_uses_caller_provided_ref(monkeypatch):
    """Test that caller-provided ref is used and preserved through queue."""
    mod = importlib.import_module("think.supervisor")
    queue = _fresh_task_queue(monkeypatch, mod)

    # Captured args are (refs, cmd, cmd_name, callosum)
    spawned = _stub_thread_start(monkeypatch, mod, _thread_args)

    # Request with caller-provided ref
    mod._handle_task_request(
        _task_request(["sol", "indexer", "--rescan"], ref="my-custom-ref-123")
    )

    # Should use the provided ref
    assert queue._running["indexer"] == "my-custom-ref-123"
    assert spawned[0][0] == ["my-custom-ref-123"]  # refs is a list


def test_task_queue_preserves_caller_ref(monkeypatch):
    """Test that queued requests preserve their caller-provided ref."""
    mod = importlib.import_module("think.supervisor")
    queue = _fresh_task_queue(monkeypatch, mod)
    _stub_thread_start(monkeypatch, mod, _thread_args)

    # First request runs immediately
    mod._handle_task_request(
        _task_request(["sol", "indexer", "--rescan"], ref="first-ref")
    )

    # Second request gets queued with its own ref
    mod._handle_task_request(
        _task_request(["sol", "indexer", "--rescan-full"], ref="second-ref")
    )

    # Verify queued entry has the caller's ref in refs list
    queued = queue._queues["indexer"]
    assert len(queued) == 1
    assert queued[0]["refs"] == ["second-ref"]
    assert queued[0]["cmd"] == ["sol", "indexer", "--rescan-full"]


def test_task_queue_coalesces_refs_on_dedupe(monkeypatch):
    """Test that duplicate queued requests coalesce their refs."""
    mod = importlib.import_module("think.supervisor")
    queue = _fresh_task_queue(monkeypatch, mod)
    _stub_thread_start(monkeypatch, mod, _thread_args)

    # First request runs immediately
    mod._handle_task_request(
        _task_request(["sol", "indexer", "--rescan"], ref="first-ref")
    )

    # Second request (same cmd) gets queued
    mod._handle_task_request(
        _task_request(["sol", "indexer", "--rescan"], ref="second-ref")
    )

    # Third request (same cmd) should coalesce its ref into existing queue entry
    mod._handle_task_request(
        _task_request(["sol", "indexer", "--rescan"], ref="third-ref")
    )

    # Should still be just one queue entry
    queued = queue._queues["indexer"]
    assert len(queued) == 1
    # But it should have both refs
    assert queued[0]["refs"] == ["second-ref", "third-ref"]


def test_process_queue_spawns_with_multiple_refs(monkeypatch):
    """Test that dequeued task has all coalesced refs."""
    mod = importlib.import_module("think.supervisor")

    # Create task queue with pre-set state (queued task with multiple refs)
    queue = _fresh_task_queue(monkeypatch, mod)
    queue._running = {"indexer": "running-ref"}
    queue._queues = {
        "indexer": [
            {
                "refs": ["ref-A", "ref-B", "ref-C"],
                "cmd": ["sol", "indexer", "--rescan"],
            }
        ]
    }

    spawned = _stub_thread_start(monkeypatch, mod, _thread_args)

    # Process queue
    queue._process_next("indexer")

    # Should spawn with all three refs
    assert len(spawned) == 1
    assert spawned[0][0] == ["ref-A", "ref-B", "ref-C"]  # all refs passed
    assert spawned[0][1] == ["sol", "indexer", "--rescan"]


# ---------------------------------------------------------------------------
# Singleton lock
# ---------------------------------------------------------------------------


def test_supervisor_singleton_lock_acquired(tmp_path, monkeypatch):
    """main() creates the lock file and writes this process's PID."""
    mod = importlib.reload(importlib.import_module("think.supervisor"))

    monkeypatch.setenv("_SOLSTONE_JOURNAL_OVERRIDE", str(tmp_path))
    health_dir = tmp_path / "health"
    health_dir.mkdir(parents=True, exist_ok=True)
    monkeypatch.setattr(sys, "argv", ["supervisor"])

    def stop_after_lock():
        # Abort main() at the callosum start so the rest of startup is skipped.
        raise SystemExit(0)

    monkeypatch.setattr(mod, "start_callosum_in_process", stop_after_lock)

    with pytest.raises(SystemExit) as exc:
        mod.main()

    assert exc.value.code == 0
    assert (health_dir / "supervisor.lock").exists()
    assert (health_dir / "supervisor.pid").read_text().strip() == str(os.getpid())


def test_supervisor_singleton_lock_blocked(tmp_path, monkeypatch, capsys):
    """main() exits with code 1 and reports the PID when the lock is held."""
    import fcntl

    mod = importlib.reload(importlib.import_module("think.supervisor"))

    monkeypatch.setenv("_SOLSTONE_JOURNAL_OVERRIDE", str(tmp_path))
    health_dir = tmp_path / "health"
    health_dir.mkdir(parents=True, exist_ok=True)
    (health_dir / "supervisor.pid").write_text("12345")
    monkeypatch.setattr(sys, "argv", ["supervisor"])

    start_mock = MagicMock()
    monkeypatch.setattr(mod, "start_callosum_in_process", start_mock)

    # Hold an exclusive lock for the duration of main(); the context manager
    # guarantees the handle is closed even if setup or main() fails.
    with open(health_dir / "supervisor.lock", "w") as lock_file:
        fcntl.flock(lock_file, fcntl.LOCK_EX | fcntl.LOCK_NB)
        with pytest.raises(SystemExit) as exc:
            mod.main()

    assert exc.value.code == 1
    output = capsys.readouterr().out
    assert "Supervisor already running" in output
    assert "PID 12345" in output
    start_mock.assert_not_called()


def test_supervisor_singleton_lock_blocked_with_health(tmp_path, monkeypatch, capsys):
    """With the lock held and a callosum socket present, health_check is run."""
    import fcntl

    mod = importlib.reload(importlib.import_module("think.supervisor"))

    monkeypatch.setenv("_SOLSTONE_JOURNAL_OVERRIDE", str(tmp_path))
    health_dir = tmp_path / "health"
    health_dir.mkdir(parents=True, exist_ok=True)
    (health_dir / "supervisor.pid").write_text("12345")
    (health_dir / "callosum.sock").touch()
    monkeypatch.setattr(sys, "argv", ["supervisor"])

    start_mock = MagicMock()
    health_mock = MagicMock(return_value=0)
    monkeypatch.setattr(mod, "start_callosum_in_process", start_mock)
    monkeypatch.setattr("think.health_cli.health_check", health_mock)

    # Hold an exclusive lock for the duration of main(); the context manager
    # guarantees the handle is closed even if setup or main() fails.
    with open(health_dir / "supervisor.lock", "w") as lock_file:
        fcntl.flock(lock_file, fcntl.LOCK_EX | fcntl.LOCK_NB)
        with pytest.raises(SystemExit) as exc:
            mod.main()

    assert exc.value.code == 1
    output = capsys.readouterr().out
    assert "Supervisor already running" in output
    assert "PID 12345" in output
    health_mock.assert_called_once_with()
    start_mock.assert_not_called()