personal memory agent
1# SPDX-License-Identifier: AGPL-3.0-only
2# Copyright (c) 2026 sol pbc
3
4import importlib
5import io
6import os
7import subprocess
8import sys
9from unittest.mock import MagicMock
10
11import pytest
12
13
@pytest.mark.asyncio
async def test_send_notification(monkeypatch):
    """send_notification() forwards to the notifier and records the returned ID.

    A fake notifier captures the arguments passed to ``send`` and returns a
    fixed ID. The test checks the forwarded title/message and that the ID is
    stored in ``_notification_ids`` under the supplied ``alert_key``.
    """
    mod = importlib.import_module("think.supervisor")
    alert_key = ("test", "key")
    called = []

    class FakeNotifier:
        async def send(self, title, message, urgency):
            called.append({"title": title, "message": message, "urgency": urgency})
            return "test-notification-id"

    # The class itself serves as the zero-argument factory.
    monkeypatch.setattr(mod, "_get_notifier", FakeNotifier)

    try:
        await mod.send_notification("test message", alert_key=alert_key)

        assert len(called) == 1
        assert called[0]["message"] == "test message"
        assert called[0]["title"] == "solstone Supervisor"
        assert alert_key in mod._notification_ids
        assert mod._notification_ids[alert_key] == "test-notification-id"
    finally:
        # _notification_ids is module-level state; remove our entry so it does
        # not leak into tests that run afterwards (pass or fail).
        mod._notification_ids.pop(alert_key, None)
34
35
@pytest.mark.asyncio
async def test_clear_notification(monkeypatch):
    """clear_notification() clears a tracked ID once and ignores unknown keys."""
    mod = importlib.import_module("think.supervisor")
    tracked_key = ("test", "key")
    cleared = []

    class FakeNotifier:
        async def send(self, title, message, urgency):
            return "test-notification-id"

        async def clear(self, notification_id):
            cleared.append(notification_id)

    monkeypatch.setattr(mod, "_get_notifier", lambda: FakeNotifier())

    # Send first so there is a tracked notification to clear.
    await mod.send_notification("test message", alert_key=tracked_key)
    assert tracked_key in mod._notification_ids

    # Clearing forwards the stored ID to the notifier and forgets the key.
    await mod.clear_notification(tracked_key)
    assert cleared == ["test-notification-id"]
    assert tracked_key not in mod._notification_ids

    # An unknown key must not trigger another clear() call.
    await mod.clear_notification(("nonexistent", "key"))
    assert len(cleared) == 1
66
67
def test_start_sense(tmp_path, mock_callosum, monkeypatch):
    """start_sense() launches ``sol sense -v`` with piped stdout and stderr."""
    mod = importlib.import_module("think.supervisor")

    launches = []

    class _StubProcess:
        """Minimal stand-in for the object returned by subprocess.Popen."""

        def __init__(self):
            self.stdout = io.StringIO()
            self.stderr = io.StringIO()
            self.pid = 12345

        def terminate(self):
            pass

        def wait(self, timeout=None):
            pass

    def fake_popen(
        cmd,
        stdout=None,
        stderr=None,
        text=False,
        bufsize=-1,
        start_new_session=False,
        env=None,
    ):
        # Record only what the assertions below inspect.
        launches.append((cmd, stdout, stderr))
        return _StubProcess()

    monkeypatch.setattr(mod.subprocess, "Popen", fake_popen)
    monkeypatch.setenv("_SOLSTONE_JOURNAL_OVERRIDE", str(tmp_path))

    sense_proc = mod.start_sense()
    assert sense_proc is not None

    launched_cmds = [cmd for cmd, _, _ in launches]
    assert ["sol", "sense", "-v"] in launched_cmds

    # Every launch must capture both output streams through pipes.
    assert all(
        out == subprocess.PIPE and err == subprocess.PIPE for _, out, err in launches
    )
111
112
def test_start_sync(tmp_path, mock_callosum, monkeypatch):
    """start_sync() launches ``sol sync -v --remote <url>`` exactly once."""
    mod = importlib.import_module("think.supervisor")

    launches = []

    class _StubProcess:
        """Minimal stand-in for the object returned by subprocess.Popen."""

        def __init__(self):
            self.stdout = io.StringIO()
            self.stderr = io.StringIO()
            self.pid = 12345

        def terminate(self):
            pass

        def wait(self, timeout=None):
            pass

    def fake_popen(
        cmd,
        stdout=None,
        stderr=None,
        text=False,
        bufsize=-1,
        start_new_session=False,
        env=None,
    ):
        launches.append((cmd, stdout, stderr))
        return _StubProcess()

    monkeypatch.setattr(mod.subprocess, "Popen", fake_popen)
    monkeypatch.setenv("_SOLSTONE_JOURNAL_OVERRIDE", str(tmp_path))

    remote_url = "https://server:5000/app/observer/ingest/abc123"
    assert mod.start_sync(remote_url) is not None

    # Exactly one recorded command mentions "sync", and it carries the URL.
    sync_cmds = [entry[0] for entry in launches if "sync" in entry[0]]
    assert len(sync_cmds) == 1
    assert sync_cmds[0] == ["sol", "sync", "-v", "--remote", remote_url]
157
158
def test_parse_args_remote_flag():
    """The parser returned by parse_args() accepts ``--remote <url>``."""
    supervisor = importlib.import_module("think.supervisor")
    mod = importlib.reload(supervisor)

    url = "https://server/ingest/key"
    namespace = mod.parse_args().parse_args(["--remote", url])

    assert namespace.remote == "https://server/ingest/key"
167
168
def test_parse_args_remote_flag_optional():
    """Omitting ``--remote`` leaves ``args.remote`` as None."""
    supervisor = importlib.import_module("think.supervisor")
    mod = importlib.reload(supervisor)

    namespace = mod.parse_args().parse_args([])

    assert namespace.remote is None
177
178
def test_shutdown_stops_in_reverse_order(monkeypatch):
    """Shutdown stops services in reverse order.

    Each service is terminated, waited on, then cleaned up, starting with the
    last one in the list.
    """
    # NOTE(review): the loop below is written inline in this test; nothing
    # from ``mod`` is called, so this pins the expected sequence rather than
    # the supervisor's own shutdown code -- confirm whether that is intended.
    mod = importlib.import_module("think.supervisor")

    events = []

    class MockProc:
        def __init__(self, name):
            self._name = name

        def terminate(self):
            events.append(("terminate", self._name))

        def wait(self, timeout=None):
            events.append(("wait", self._name))

        def kill(self):
            pass

        def poll(self):
            return None

    class MockManaged:
        def __init__(self, name):
            self.name = name
            self.process = MockProc(name)
            self.shutdown_timeout = 15

        def cleanup(self):
            events.append(("cleanup", self.name))

    services = [MockManaged(name) for name in ("convey", "sense", "cortex")]

    for service in services[::-1]:
        process = service.process
        try:
            process.terminate()
        except Exception:
            pass
        try:
            process.wait(timeout=service.shutdown_timeout)
        except Exception:
            pass
        service.cleanup()

    expected = [
        (step, name)
        for name in ("cortex", "sense", "convey")
        for step in ("terminate", "wait", "cleanup")
    ]
    assert events == expected
239
240
def test_get_command_name():
    """TaskQueue.get_command_name() maps a command list to its queue name."""
    mod = importlib.import_module("think.supervisor")
    extract = mod.TaskQueue.get_command_name

    cases = [
        # "sol X ..." -> X
        (["sol", "indexer", "--rescan"], "indexer"),
        (["sol", "insight", "20240101"], "insight"),
        (["sol", "dream", "--day", "20240101"], "dream"),
        # Any other command -> basename of the executable
        (["/usr/bin/python", "script.py"], "python"),
        (["custom-tool"], "custom-tool"),
        # Empty command -> "unknown"
        ([], "unknown"),
    ]

    for cmd, expected in cases:
        assert extract(cmd) == expected
257
258
def test_task_queue_same_command_queued(monkeypatch):
    """A request for a command that is already running is queued, not spawned."""
    mod = importlib.import_module("think.supervisor")

    # Install a fresh queue (no callback, to avoid callosum events) through
    # monkeypatch so the module-level ``_task_queue`` is restored afterwards
    # instead of leaking this test's state into later tests.
    queue = mod.TaskQueue(on_queue_change=None)
    monkeypatch.setattr(mod, "_task_queue", queue, raising=False)

    spawned = []

    def fake_thread_start(self):
        # Record the worker instead of actually starting a thread.
        spawned.append(self._target.__name__)

    monkeypatch.setattr(mod.threading.Thread, "start", fake_thread_start)

    # First request - should run immediately
    mod._handle_task_request(
        {
            "tract": "supervisor",
            "event": "request",
            "cmd": ["sol", "indexer", "--rescan"],
        }
    )
    assert "indexer" in queue._running
    assert len(spawned) == 1

    # Second request (same command name, different args) - should be queued
    mod._handle_task_request(
        {
            "tract": "supervisor",
            "event": "request",
            "cmd": ["sol", "indexer", "--rescan-full"],
        }
    )
    assert len(spawned) == 1  # No new spawn
    assert "indexer" in queue._queues

    pending = queue._queues["indexer"]
    assert len(pending) == 1
    # Queue entries are {refs, cmd} dicts (refs is a list for coalescing)
    assert pending[0]["cmd"] == ["sol", "indexer", "--rescan-full"]
    assert len(pending[0]["refs"]) == 1
302
303
def test_task_queue_dedupe_exact_match(monkeypatch):
    """An identical command already waiting in the queue is not queued twice."""
    mod = importlib.import_module("think.supervisor")

    # Install a fresh queue (no callback, to avoid callosum events) through
    # monkeypatch so the module-level ``_task_queue`` is restored afterwards.
    queue = mod.TaskQueue(on_queue_change=None)
    monkeypatch.setattr(mod, "_task_queue", queue, raising=False)

    # Threads must not really start; nothing here inspects what was spawned.
    monkeypatch.setattr(mod.threading.Thread, "start", lambda self: None)

    def request():
        # A new dict per call so the handler never sees a shared message.
        return {
            "tract": "supervisor",
            "event": "request",
            "cmd": ["sol", "indexer", "--rescan"],
        }

    # First request - runs
    mod._handle_task_request(request())

    # Second request (same cmd) - queued
    mod._handle_task_request(request())
    assert len(queue._queues["indexer"]) == 1

    # Third request (same cmd again) - deduped, not added
    mod._handle_task_request(request())
    assert len(queue._queues["indexer"]) == 1  # Still just 1
345
346
def test_task_queue_different_commands_independent(monkeypatch):
    """Requests for different command names both run; neither blocks the other."""
    mod = importlib.import_module("think.supervisor")

    # Install a fresh queue (no callback, to avoid callosum events) through
    # monkeypatch so the module-level ``_task_queue`` is restored afterwards.
    queue = mod.TaskQueue(on_queue_change=None)
    monkeypatch.setattr(mod, "_task_queue", queue, raising=False)

    spawned = []

    def fake_thread_start(self):
        # Record the worker instead of actually starting a thread.
        spawned.append(self._target.__name__)

    monkeypatch.setattr(mod.threading.Thread, "start", fake_thread_start)

    # Indexer request - runs
    mod._handle_task_request(
        {
            "tract": "supervisor",
            "event": "request",
            "cmd": ["sol", "indexer", "--rescan"],
        }
    )

    # Insight request - also runs (different command)
    mod._handle_task_request(
        {
            "tract": "supervisor",
            "event": "request",
            "cmd": ["sol", "insight", "20240101"],
        }
    )

    assert len(spawned) == 2  # Both spawned
    assert "indexer" in queue._running
    assert "insight" in queue._running
380
381
def test_process_queue_spawns_next(monkeypatch):
    """_process_next() spawns the next queued task and removes it from the queue."""
    mod = importlib.import_module("think.supervisor")

    # Pre-set state: one task running and one waiting behind it. The queue is
    # installed through monkeypatch so the module-level ``_task_queue`` is
    # restored afterwards instead of leaking into later tests.
    queue = mod.TaskQueue(on_queue_change=None)
    queue._running = {"indexer": "ref123"}
    queue._queues = {
        "indexer": [
            {"refs": ["queued-ref"], "cmd": ["sol", "indexer", "--rescan-full"]}
        ]
    }
    monkeypatch.setattr(mod, "_task_queue", queue, raising=False)

    spawned = []

    def fake_thread_start(self):
        spawned.append(self._args)  # Capture args (refs, cmd, cmd_name, callosum)

    monkeypatch.setattr(mod.threading.Thread, "start", fake_thread_start)

    queue._process_next("indexer")

    # Should have spawned the queued task with its refs list
    assert len(spawned) == 1
    thread_args = spawned[0]
    assert thread_args[0] == ["queued-ref"]  # refs list preserved from queue
    assert thread_args[1] == ["sol", "indexer", "--rescan-full"]  # cmd
    assert thread_args[2] == "indexer"  # cmd_name

    # Queue should be empty now
    assert queue._queues["indexer"] == []
413
414
def test_process_queue_clears_running_when_empty(monkeypatch):
    """_process_next() with nothing queued spawns nothing and clears running state."""
    mod = importlib.import_module("think.supervisor")

    # Pre-set state: a running task and an empty queue behind it. The queue is
    # installed through monkeypatch so the module-level ``_task_queue`` is
    # restored afterwards instead of leaking into later tests.
    queue = mod.TaskQueue(on_queue_change=None)
    queue._running = {"indexer": "ref123"}
    queue._queues = {"indexer": []}
    monkeypatch.setattr(mod, "_task_queue", queue, raising=False)

    spawned = []

    def fake_thread_start(self):
        spawned.append(True)

    monkeypatch.setattr(mod.threading.Thread, "start", fake_thread_start)

    queue._process_next("indexer")

    # No spawn (queue was empty)
    assert not spawned

    # Running state should be cleared
    assert "indexer" not in queue._running
439
440
def test_task_request_uses_caller_provided_ref(monkeypatch):
    """A ``ref`` supplied in the request is used as the running task's ref."""
    mod = importlib.import_module("think.supervisor")

    # Install a fresh queue (no callback, to avoid callosum events) through
    # monkeypatch so the module-level ``_task_queue`` is restored afterwards.
    queue = mod.TaskQueue(on_queue_change=None)
    monkeypatch.setattr(mod, "_task_queue", queue, raising=False)

    spawned = []

    def fake_thread_start(self):
        spawned.append(self._args)  # Capture args (refs, cmd, cmd_name, callosum)

    monkeypatch.setattr(mod.threading.Thread, "start", fake_thread_start)

    # Request with caller-provided ref
    mod._handle_task_request(
        {
            "tract": "supervisor",
            "event": "request",
            "cmd": ["sol", "indexer", "--rescan"],
            "ref": "my-custom-ref-123",
        }
    )

    # Should use the provided ref
    assert queue._running["indexer"] == "my-custom-ref-123"
    assert spawned[0][0] == ["my-custom-ref-123"]  # refs is a list
467
468
def test_task_queue_preserves_caller_ref(monkeypatch):
    """A queued request keeps the ``ref`` its caller supplied."""
    mod = importlib.import_module("think.supervisor")

    # Install a fresh queue (no callback, to avoid callosum events) through
    # monkeypatch so the module-level ``_task_queue`` is restored afterwards.
    queue = mod.TaskQueue(on_queue_change=None)
    monkeypatch.setattr(mod, "_task_queue", queue, raising=False)

    # Threads must not really start; nothing here inspects what was spawned.
    monkeypatch.setattr(mod.threading.Thread, "start", lambda self: None)

    # First request runs immediately
    mod._handle_task_request(
        {
            "tract": "supervisor",
            "event": "request",
            "cmd": ["sol", "indexer", "--rescan"],
            "ref": "first-ref",
        }
    )

    # Second request gets queued with its own ref
    mod._handle_task_request(
        {
            "tract": "supervisor",
            "event": "request",
            "cmd": ["sol", "indexer", "--rescan-full"],
            "ref": "second-ref",
        }
    )

    # Verify queued entry has the caller's ref in refs list
    pending = queue._queues["indexer"]
    assert len(pending) == 1
    assert pending[0]["refs"] == ["second-ref"]
    assert pending[0]["cmd"] == ["sol", "indexer", "--rescan-full"]
509
510
def test_task_queue_coalesces_refs_on_dedupe(monkeypatch):
    """Duplicate queued requests share one queue entry that collects their refs."""
    mod = importlib.import_module("think.supervisor")

    # Install a fresh queue (no callback, to avoid callosum events) through
    # monkeypatch so the module-level ``_task_queue`` is restored afterwards.
    queue = mod.TaskQueue(on_queue_change=None)
    monkeypatch.setattr(mod, "_task_queue", queue, raising=False)

    # Threads must not really start; nothing here inspects what was spawned.
    monkeypatch.setattr(mod.threading.Thread, "start", lambda self: None)

    def request(ref):
        # Same command every time; only the caller's ref differs.
        return {
            "tract": "supervisor",
            "event": "request",
            "cmd": ["sol", "indexer", "--rescan"],
            "ref": ref,
        }

    # First request runs immediately
    mod._handle_task_request(request("first-ref"))

    # Second request (same cmd) gets queued
    mod._handle_task_request(request("second-ref"))

    # Third request (same cmd) should coalesce its ref into existing queue entry
    mod._handle_task_request(request("third-ref"))

    # Should still be just one queue entry
    pending = queue._queues["indexer"]
    assert len(pending) == 1
    # But it should have both refs
    assert pending[0]["refs"] == ["second-ref", "third-ref"]
559
560
def test_process_queue_spawns_with_multiple_refs(monkeypatch):
    """A dequeued task is spawned with every ref coalesced into its entry."""
    mod = importlib.import_module("think.supervisor")

    # Pre-set state: one running task plus a queued entry carrying three refs.
    # The queue is installed through monkeypatch so the module-level
    # ``_task_queue`` is restored afterwards instead of leaking into later tests.
    queue = mod.TaskQueue(on_queue_change=None)
    queue._running = {"indexer": "running-ref"}
    queue._queues = {
        "indexer": [
            {
                "refs": ["ref-A", "ref-B", "ref-C"],
                "cmd": ["sol", "indexer", "--rescan"],
            }
        ]
    }
    monkeypatch.setattr(mod, "_task_queue", queue, raising=False)

    spawned = []

    def fake_thread_start(self):
        # Record the thread's args instead of actually starting it.
        spawned.append(self._args)

    monkeypatch.setattr(mod.threading.Thread, "start", fake_thread_start)

    queue._process_next("indexer")

    # Should spawn with all three refs
    assert len(spawned) == 1
    assert spawned[0][0] == ["ref-A", "ref-B", "ref-C"]  # all refs passed
    assert spawned[0][1] == ["sol", "indexer", "--rescan"]
591
592
def test_supervisor_singleton_lock_acquired(tmp_path, monkeypatch):
    """main() creates the lock file and writes its own PID before starting callosum."""
    supervisor = importlib.import_module("think.supervisor")
    mod = importlib.reload(supervisor)

    health_dir = tmp_path / "health"
    monkeypatch.setenv("_SOLSTONE_JOURNAL_OVERRIDE", str(tmp_path))
    health_dir.mkdir(parents=True, exist_ok=True)
    monkeypatch.setattr(sys, "argv", ["supervisor"])

    def stop_after_lock():
        # Abort main() as soon as it tries to start callosum.
        raise SystemExit(0)

    monkeypatch.setattr(mod, "start_callosum_in_process", stop_after_lock)

    with pytest.raises(SystemExit) as exc:
        mod.main()

    assert exc.value.code == 0
    assert (health_dir / "supervisor.lock").exists()

    recorded_pid = (health_dir / "supervisor.pid").read_text().strip()
    assert recorded_pid == str(os.getpid())
613
614
def test_supervisor_singleton_lock_blocked(tmp_path, monkeypatch, capsys):
    """main() exits with status 1 when another holder already owns the lock.

    The test takes an exclusive flock on ``health/supervisor.lock`` and writes
    PID 12345 to ``health/supervisor.pid``. main() must then report the
    running supervisor with that PID and never start callosum.
    """
    import fcntl

    mod = importlib.reload(importlib.import_module("think.supervisor"))

    monkeypatch.setenv("_SOLSTONE_JOURNAL_OVERRIDE", str(tmp_path))
    health_dir = tmp_path / "health"
    health_dir.mkdir(parents=True, exist_ok=True)

    start_mock = MagicMock()

    # The context manager closes the handle even if setup fails before main()
    # runs; closing it also releases the flock.
    with open(health_dir / "supervisor.lock", "w") as lock_file:
        fcntl.flock(lock_file, fcntl.LOCK_EX | fcntl.LOCK_NB)
        (health_dir / "supervisor.pid").write_text("12345")
        monkeypatch.setattr(sys, "argv", ["supervisor"])
        monkeypatch.setattr(mod, "start_callosum_in_process", start_mock)

        with pytest.raises(SystemExit) as exc:
            mod.main()

    assert exc.value.code == 1
    output = capsys.readouterr().out
    assert "Supervisor already running" in output
    assert "PID 12345" in output
    start_mock.assert_not_called()
642
643
def test_supervisor_singleton_lock_blocked_with_health(tmp_path, monkeypatch, capsys):
    """With the lock held and a callosum socket file present, main() runs the health check.

    Setup: an exclusive flock on ``health/supervisor.lock``, PID 12345 in
    ``health/supervisor.pid`` and an empty ``health/callosum.sock``. main()
    must exit with status 1, report the running supervisor, call
    ``think.health_cli.health_check`` once with no arguments, and never start
    callosum.
    """
    import fcntl

    mod = importlib.reload(importlib.import_module("think.supervisor"))

    monkeypatch.setenv("_SOLSTONE_JOURNAL_OVERRIDE", str(tmp_path))
    health_dir = tmp_path / "health"
    health_dir.mkdir(parents=True, exist_ok=True)

    start_mock = MagicMock()
    health_mock = MagicMock(return_value=0)

    # The context manager closes the handle even if setup fails before main()
    # runs; closing it also releases the flock.
    with open(health_dir / "supervisor.lock", "w") as lock_file:
        fcntl.flock(lock_file, fcntl.LOCK_EX | fcntl.LOCK_NB)
        (health_dir / "supervisor.pid").write_text("12345")
        (health_dir / "callosum.sock").touch()
        monkeypatch.setattr(sys, "argv", ["supervisor"])
        monkeypatch.setattr(mod, "start_callosum_in_process", start_mock)
        monkeypatch.setattr("think.health_cli.health_check", health_mock)

        with pytest.raises(SystemExit) as exc:
            mod.main()

    assert exc.value.code == 1
    output = capsys.readouterr().out
    assert "Supervisor already running" in output
    assert "PID 12345" in output
    health_mock.assert_called_once_with()
    start_mock.assert_not_called()
674 start_mock.assert_not_called()