personal memory agent
1# SPDX-License-Identifier: AGPL-3.0-only
2# Copyright (c) 2026 sol pbc
3
4"""Tests for the file-based Cortex agent manager."""
5
6import json
7import os
8from pathlib import Path
9from unittest.mock import MagicMock, patch
10
11import pytest
12
13from think.models import GPT_5
14
15
16class MockPipe:
17 """Mock for subprocess stdout/stderr that supports context manager protocol."""
18
19 def __init__(self, lines: list[str]):
20 self._lines = lines
21 self._iter = None
22
23 def __enter__(self):
24 self._iter = iter(self._lines)
25 return self
26
27 def __exit__(self, *args):
28 pass
29
30 def __iter__(self):
31 return self._iter or iter(self._lines)
32
33 def __next__(self):
34 if self._iter is None:
35 self._iter = iter(self._lines)
36 return next(self._iter)
37
38
39@pytest.fixture
40def mock_journal(tmp_path, monkeypatch):
41 """Set up a temporary journal directory."""
42 journal_path = tmp_path / "journal"
43 journal_path.mkdir()
44 agents_path = journal_path / "agents"
45 agents_path.mkdir()
46
47 monkeypatch.setenv("_SOLSTONE_JOURNAL_OVERRIDE", str(journal_path))
48 return journal_path
49
50
51@pytest.fixture
52def cortex_service(mock_journal):
53 """Create a CortexService instance for testing."""
54 from think.cortex import CortexService
55
56 return CortexService(str(mock_journal))
57
58
59def test_agent_process_creation():
60 """Test AgentProcess class initialization and methods."""
61 from think.cortex import AgentProcess
62
63 mock_process = MagicMock()
64 mock_process.poll.return_value = None # Running
65 mock_process.pid = 12345
66
67 log_path = Path("/tmp/test.jsonl")
68 agent = AgentProcess("123456789", mock_process, log_path)
69
70 assert agent.agent_id == "123456789"
71 assert agent.process == mock_process
72 assert agent.log_path == log_path
73 assert agent.is_running() is True
74
75 # Test stop
76 agent.stop()
77 mock_process.terminate.assert_called_once()
78 assert agent.stop_event.is_set()
79
80
81def test_cortex_service_initialization(cortex_service, mock_journal):
82 """Test CortexService initialization."""
83 assert cortex_service.journal_path == mock_journal
84 assert cortex_service.agents_dir == mock_journal / "agents"
85 assert cortex_service.running_agents == {}
86 assert cortex_service.agents_dir.exists()
87
88
89@patch("think.cortex.subprocess.Popen")
90@patch("think.cortex.threading.Thread")
91@patch("think.cortex.threading.Timer")
92def test_spawn_subprocess(
93 mock_timer, mock_thread, mock_popen, cortex_service, mock_journal
94):
95 """Test spawning an agent subprocess."""
96 mock_process = MagicMock()
97 mock_process.pid = 12345
98 mock_process.poll.return_value = None
99 mock_process.stdin = MagicMock()
100 mock_process.stdout = MagicMock()
101 mock_process.stderr = MagicMock()
102 mock_popen.return_value = mock_process
103
104 # Setup mock timer
105 mock_timer_instance = MagicMock()
106 mock_timer.return_value = mock_timer_instance
107
108 agent_id = "123456789"
109 file_path = mock_journal / "agents" / f"{agent_id}_active.jsonl"
110
111 request = {
112 "event": "request",
113 "ts": 123456789,
114 "prompt": "Test prompt",
115 "provider": "openai",
116 "name": "unified",
117 "model": GPT_5,
118 }
119
120 cortex_service._spawn_subprocess(
121 agent_id,
122 file_path,
123 request,
124 ["sol", "agents"],
125 "agent",
126 )
127
128 # Check subprocess was called
129 mock_popen.assert_called_once()
130 call_args = mock_popen.call_args
131 assert call_args[0][0] == ["sol", "agents"]
132 assert call_args[1]["stdin"] is not None
133 assert call_args[1]["stdout"] is not None
134 assert call_args[1]["stderr"] is not None
135
136 # Check NDJSON was written to stdin
137 mock_process.stdin.write.assert_called_once()
138 written_data = mock_process.stdin.write.call_args[0][0]
139 ndjson = json.loads(written_data.strip())
140 assert ndjson["event"] == "request"
141 assert ndjson["prompt"] == "Test prompt"
142 assert ndjson["provider"] == "openai"
143 assert ndjson["name"] == "unified"
144 assert ndjson["model"] == GPT_5
145
146 # Check stdin was closed
147 mock_process.stdin.close.assert_called_once()
148
149 # Check agent was tracked
150 assert agent_id in cortex_service.running_agents
151 agent = cortex_service.running_agents[agent_id]
152 assert agent.agent_id == agent_id
153 assert agent.log_path == file_path
154
155 # Check monitoring threads were started
156 assert mock_thread.call_count == 2 # stdout and stderr
157
158 # Check timer was created and started
159 mock_timer.assert_called_once()
160 mock_timer_instance.start.assert_called_once()
161
162
163@patch("think.cortex.subprocess.Popen")
164@patch("think.cortex.threading.Thread")
165@patch("think.cortex.threading.Timer")
166def test_spawn_generator_via_subprocess(
167 mock_timer, mock_thread, mock_popen, cortex_service, mock_journal
168):
169 """Test spawning a generator subprocess via _spawn_subprocess."""
170 mock_process = MagicMock()
171 mock_process.pid = 54321
172 mock_process.poll.return_value = None
173 mock_process.stdin = MagicMock()
174 mock_process.stdout = MagicMock()
175 mock_process.stderr = MagicMock()
176 mock_popen.return_value = mock_process
177
178 # Setup mock timer
179 mock_timer_instance = MagicMock()
180 mock_timer.return_value = mock_timer_instance
181
182 agent_id = "987654321"
183 file_path = mock_journal / "agents" / f"{agent_id}_active.jsonl"
184
185 # Generator config has "output" instead of "tools"
186 config = {
187 "event": "request",
188 "ts": 987654321,
189 "name": "activity",
190 "day": "20240101",
191 "output": "md",
192 }
193
194 # Generators route through _spawn_subprocess
195 cortex_service._spawn_subprocess(
196 agent_id,
197 file_path,
198 config,
199 ["sol", "agents"],
200 "agent",
201 )
202
203 # Check subprocess was called with agents command (generators route through agents)
204 mock_popen.assert_called_once()
205 call_args = mock_popen.call_args
206 assert call_args[0][0] == ["sol", "agents"]
207 assert call_args[1]["stdin"] is not None
208 assert call_args[1]["stdout"] is not None
209 assert call_args[1]["stderr"] is not None
210
211 # Check NDJSON was written to stdin
212 mock_process.stdin.write.assert_called_once()
213 written_data = mock_process.stdin.write.call_args[0][0]
214 ndjson = json.loads(written_data.strip())
215 assert ndjson["event"] == "request"
216 assert ndjson["name"] == "activity"
217 assert ndjson["day"] == "20240101"
218 assert ndjson["output"] == "md"
219
220 # Check stdin was closed
221 mock_process.stdin.close.assert_called_once()
222
223 # Check generator was tracked
224 assert agent_id in cortex_service.running_agents
225 agent = cortex_service.running_agents[agent_id]
226 assert agent.agent_id == agent_id
227 assert agent.log_path == file_path
228
229 # Check monitoring threads were started
230 assert mock_thread.call_count == 2 # stdout and stderr
231
232 # Check timer was created and started
233 mock_timer.assert_called_once()
234 mock_timer_instance.start.assert_called_once()
235
236
237def test_monitor_stdout_json_events(cortex_service, mock_journal):
238 """Test monitoring stdout with JSON events."""
239 from io import StringIO
240
241 from think.cortex import AgentProcess
242
243 agent_id = "123456789"
244 log_path = mock_journal / "agents" / f"{agent_id}_active.jsonl"
245
246 mock_process = MagicMock()
247 mock_process.poll.return_value = 0 # Process exits
248 mock_process.stdout = StringIO(
249 '{"event": "start", "ts": 1234567890}\n'
250 '{"event": "finish", "ts": 1234567891, "result": "Done"}\n'
251 )
252
253 agent = AgentProcess(agent_id, mock_process, log_path)
254 cortex_service.running_agents[agent_id] = agent
255
256 with patch.object(cortex_service, "_complete_agent_file") as mock_complete:
257 cortex_service._monitor_stdout(agent)
258
259 # Check events were written to file
260 assert log_path.exists()
261 lines = log_path.read_text().strip().split("\n")
262 assert len(lines) == 2
263 assert json.loads(lines[0])["event"] == "start"
264 assert json.loads(lines[1])["event"] == "finish"
265
266 # Check file was completed
267 mock_complete.assert_called_once_with(agent_id, log_path)
268
269 # Check agent was removed
270 assert agent_id not in cortex_service.running_agents
271
272
273def test_monitor_stdout_non_json_output(cortex_service, mock_journal):
274 """Test monitoring stdout with non-JSON output."""
275 from io import StringIO
276
277 from think.cortex import AgentProcess
278
279 agent_id = "123456789"
280 log_path = mock_journal / "agents" / f"{agent_id}_active.jsonl"
281
282 mock_process = MagicMock()
283 mock_process.poll.return_value = 0
284 mock_process.stdout = StringIO(
285 'Plain text output\n{"event": "finish", "ts": 1234567890}\n'
286 )
287
288 agent = AgentProcess(agent_id, mock_process, log_path)
289 cortex_service.running_agents[agent_id] = agent
290
291 with patch.object(cortex_service, "_complete_agent_file"):
292 cortex_service._monitor_stdout(agent)
293
294 # Check info event was created for non-JSON
295 lines = log_path.read_text().strip().split("\n")
296 assert len(lines) == 2
297
298 info_event = json.loads(lines[0])
299 assert info_event["event"] == "info"
300 assert info_event["message"] == "Plain text output"
301 assert "ts" in info_event
302
303
304def test_monitor_stdout_no_finish_event(cortex_service, mock_journal):
305 """Test monitoring stdout when process exits without finish event."""
306 from io import StringIO
307
308 from think.cortex import AgentProcess
309
310 agent_id = "123456789"
311 log_path = mock_journal / "agents" / f"{agent_id}_active.jsonl"
312
313 mock_process = MagicMock()
314 mock_process.wait.return_value = 1 # Non-zero exit
315 mock_process.stdout = StringIO('{"event": "start", "ts": 1234567890}\n')
316
317 agent = AgentProcess(agent_id, mock_process, log_path)
318 cortex_service.running_agents[agent_id] = agent
319
320 with patch.object(cortex_service, "_complete_agent_file"):
321 cortex_service._monitor_stdout(agent)
322
323 # Check error event was added
324 lines = log_path.read_text().strip().split("\n")
325 assert len(lines) == 2
326
327 error_event = json.loads(lines[1])
328 assert error_event["event"] == "error"
329 assert "exit_code" in error_event
330 assert error_event["exit_code"] == 1
331
332
333def test_monitor_stderr(cortex_service, mock_journal):
334 """Test monitoring stderr for errors."""
335 from io import StringIO
336
337 from think.cortex import AgentProcess
338
339 agent_id = "123456789"
340 log_path = mock_journal / "agents" / f"{agent_id}_active.jsonl"
341
342 mock_process = MagicMock()
343 mock_process.poll.return_value = 1 # Error exit
344 mock_process.stderr = StringIO(
345 "Error: Something went wrong\nStack trace line 1\nStack trace line 2\n"
346 )
347
348 agent = AgentProcess(agent_id, mock_process, log_path)
349
350 cortex_service._monitor_stderr(agent)
351
352 # Check error event was written
353 assert log_path.exists()
354 lines = log_path.read_text().strip().split("\n")
355 assert len(lines) == 1
356
357 error_event = json.loads(lines[0])
358 assert error_event["event"] == "error"
359 assert "trace" in error_event
360 assert "Error: Something went wrong" in error_event["trace"]
361 assert error_event["exit_code"] == 1
362
363
364def test_has_finish_event(cortex_service, mock_journal):
365 """Test checking for finish event in JSONL file."""
366 file_path = mock_journal / "agents" / "test.jsonl"
367
368 # File with finish event
369 file_path.write_text(
370 '{"event": "start", "ts": 123}\n{"event": "finish", "ts": 124}\n'
371 )
372 assert cortex_service._has_finish_event(file_path) is True
373
374 # File with error event
375 file_path.write_text(
376 '{"event": "start", "ts": 123}\n{"event": "error", "ts": 124}\n'
377 )
378 assert cortex_service._has_finish_event(file_path) is True
379
380 # File without finish/error
381 file_path.write_text('{"event": "start", "ts": 123}\n')
382 assert cortex_service._has_finish_event(file_path) is False
383
384 # Empty file
385 file_path.write_text("")
386 assert cortex_service._has_finish_event(file_path) is False
387
388
389def test_complete_agent_file(cortex_service, mock_journal):
390 """Test completing an agent file (rename from active to completed)."""
391 agent_id = "123456789"
392 unified_dir = mock_journal / "agents" / "unified"
393 unified_dir.mkdir()
394 active_path = unified_dir / f"{agent_id}_active.jsonl"
395 active_path.touch()
396 cortex_service.agent_requests[agent_id] = {"name": "unified", "agent_id": agent_id}
397
398 cortex_service._complete_agent_file(agent_id, active_path)
399
400 # Check file was renamed
401 assert not active_path.exists()
402 completed_path = unified_dir / f"{agent_id}.jsonl"
403 assert completed_path.exists()
404 symlink_path = mock_journal / "agents" / "unified.log"
405 assert symlink_path.is_symlink()
406 assert os.readlink(symlink_path) == f"unified/{agent_id}.jsonl"
407
408
409def test_complete_agent_file_replaces_symlink(cortex_service, mock_journal):
410 """Test completing agent file replaces convenience symlink for same name."""
411 unified_dir = mock_journal / "agents" / "unified"
412 unified_dir.mkdir()
413
414 first_agent_id = "111"
415 first_active_path = unified_dir / f"{first_agent_id}_active.jsonl"
416 first_active_path.touch()
417 cortex_service.agent_requests[first_agent_id] = {"name": "unified"}
418
419 cortex_service._complete_agent_file(first_agent_id, first_active_path)
420
421 second_agent_id = "222"
422 second_active_path = unified_dir / f"{second_agent_id}_active.jsonl"
423 second_active_path.touch()
424 cortex_service.agent_requests[second_agent_id] = {"name": "unified"}
425
426 cortex_service._complete_agent_file(second_agent_id, second_active_path)
427
428 symlink_path = mock_journal / "agents" / "unified.log"
429 assert symlink_path.is_symlink()
430 assert os.readlink(symlink_path) == f"unified/{second_agent_id}.jsonl"
431
432
433def test_complete_agent_file_colon_name(cortex_service, mock_journal):
434 """Test completing agent file sanitizes colon in convenience symlink name."""
435 agent_id = "123456789"
436 entities_dir = mock_journal / "agents" / "entities--entity_assist"
437 entities_dir.mkdir()
438 active_path = entities_dir / f"{agent_id}_active.jsonl"
439 active_path.touch()
440 cortex_service.agent_requests[agent_id] = {"name": "entities:entity_assist"}
441
442 cortex_service._complete_agent_file(agent_id, active_path)
443
444 symlink_path = mock_journal / "agents" / "entities--entity_assist.log"
445 assert symlink_path.is_symlink()
446 assert os.readlink(symlink_path) == f"entities--entity_assist/{agent_id}.jsonl"
447
448
449def test_complete_agent_file_no_name(cortex_service, mock_journal):
450 """Test completing agent file skips symlink when request name is missing."""
451 agent_id = "123456789"
452 active_path = mock_journal / "agents" / f"{agent_id}_active.jsonl"
453 active_path.touch()
454
455 cortex_service._complete_agent_file(agent_id, active_path)
456
457 completed_path = mock_journal / "agents" / f"{agent_id}.jsonl"
458 assert completed_path.exists()
459 assert not any(path.is_symlink() for path in (mock_journal / "agents").iterdir())
460
461
462def test_write_error_and_complete(cortex_service, mock_journal):
463 """Test writing error and completing file."""
464 agent_id = "123456789"
465 file_path = mock_journal / "agents" / f"{agent_id}_active.jsonl"
466 file_path.touch()
467
468 cortex_service._write_error_and_complete(file_path, "Test error message")
469
470 # Check error was written
471 completed_path = mock_journal / "agents" / f"{agent_id}.jsonl"
472 assert completed_path.exists()
473 assert not file_path.exists()
474
475 content = completed_path.read_text()
476 error_event = json.loads(content)
477 assert error_event["event"] == "error"
478 assert error_event["error"] == "Test error message"
479 assert "ts" in error_event
480
481
482def test_get_status(cortex_service):
483 """Test getting service status."""
484 from think.cortex import AgentProcess
485
486 # Empty status
487 status = cortex_service.get_status()
488 assert status["running_agents"] == 0
489 assert status["agent_ids"] == []
490
491 # Add running agents
492 mock_process = MagicMock()
493 agent1 = AgentProcess("111", mock_process, Path("/tmp/1.jsonl"))
494 agent2 = AgentProcess("222", mock_process, Path("/tmp/2.jsonl"))
495
496 cortex_service.running_agents["111"] = agent1
497 cortex_service.running_agents["222"] = agent2
498
499 status = cortex_service.get_status()
500 assert status["running_agents"] == 2
501 assert set(status["agent_ids"]) == {"111", "222"}
502
503
504def test_write_output(cortex_service, mock_journal):
505 """Test writing agent output using explicit output_path."""
506 agent_id = "test_agent"
507 result = "This is the agent result content"
508 expected_path = mock_journal / "20240115" / "agents" / "my_agent.md"
509 config = {"output": "md", "name": "my_agent", "output_path": str(expected_path)}
510
511 cortex_service._write_output(agent_id, result, config)
512
513 assert expected_path.exists()
514 assert expected_path.read_text() == result
515 assert expected_path.parent.is_dir()
516
517
518def test_write_output_with_error(cortex_service, mock_journal, caplog):
519 """Test write output handles errors gracefully."""
520 import logging
521
522 output_path = mock_journal / "20240115" / "agents" / "test.md"
523 with patch("builtins.open", side_effect=PermissionError("Cannot write")):
524 with caplog.at_level(logging.ERROR):
525 config = {"output": "md", "name": "test", "output_path": str(output_path)}
526 cortex_service._write_output("agent_id", "result", config)
527
528 # Check error was logged but didn't raise
529 assert "Failed to write agent agent_id output" in caplog.text
530
531
532def test_write_output_missing_path_skips(cortex_service, mock_journal, caplog):
533 """Test write output skips when output_path is missing."""
534 config = {"output": "md", "name": "test"}
535 cortex_service._write_output("agent_id", "result", config)
536
537 # No output written, no error — silent skip is expected
538 assert "Failed to write" not in caplog.text
539
540
541def test_write_output_with_day_parameter(cortex_service, mock_journal):
542 """Test writing agent output to a specific day directory."""
543 agent_id = "test_agent"
544 result = "This is the agent result content"
545 specified_day = "20240201"
546 expected_path = mock_journal / specified_day / "agents" / "reporter.md"
547 config = {
548 "output": "md",
549 "name": "reporter",
550 "day": specified_day,
551 "output_path": str(expected_path),
552 }
553
554 cortex_service._write_output(agent_id, result, config)
555
556 assert expected_path.exists()
557 assert expected_path.read_text() == result
558 assert expected_path.parent.is_dir()
559
560
561def test_write_output_with_segment(cortex_service, mock_journal):
562 """Test writing segment agent output to segment agents directory."""
563 agent_id = "segment_agent"
564 result = "Segment analysis content"
565 expected_path = mock_journal / "20240115" / "143000_600" / "agents" / "analyzer.md"
566 config = {
567 "output": "md",
568 "name": "analyzer",
569 "segment": "143000_600",
570 "output_path": str(expected_path),
571 }
572
573 cortex_service._write_output(agent_id, result, config)
574
575 assert expected_path.exists()
576 assert expected_path.read_text() == result
577
578
579def test_write_output_json_format(cortex_service, mock_journal):
580 """Test writing agent output in JSON format."""
581 agent_id = "json_agent"
582 result = '{"key": "value"}'
583 expected_path = mock_journal / "20240115" / "agents" / "data_agent.json"
584 config = {
585 "output": "json",
586 "name": "data_agent",
587 "output_path": str(expected_path),
588 }
589
590 cortex_service._write_output(agent_id, result, config)
591
592 assert expected_path.exists()
593 assert expected_path.read_text() == result
594
595
596def test_monitor_stdout_with_output(cortex_service, mock_journal):
597 """Test monitor_stdout writes output when output_path is present."""
598 from think.cortex import AgentProcess
599
600 agent_id = "output_test"
601 active_path = mock_journal / "agents" / f"{agent_id}_active.jsonl"
602 output_path = mock_journal / "20240115" / "agents" / "test_agent.md"
603
604 # Store request with explicit output_path
605 cortex_service.agent_requests = {
606 agent_id: {
607 "event": "request",
608 "prompt": "test",
609 "output": "md",
610 "name": "test_agent",
611 "output_path": str(output_path),
612 }
613 }
614
615 mock_process = MagicMock()
616 mock_stdout = [
617 '{"event": "start", "ts": 1000}\n',
618 '{"event": "finish", "ts": 2000, "result": "Test result"}\n',
619 ]
620 mock_process.stdout = MockPipe(mock_stdout)
621 mock_process.wait.return_value = 0
622
623 agent = AgentProcess(agent_id, mock_process, active_path)
624
625 with patch.object(cortex_service, "_complete_agent_file"):
626 with patch.object(cortex_service, "_has_finish_event", return_value=True):
627 cortex_service._monitor_stdout(agent)
628
629 assert output_path.exists()
630 assert output_path.read_text() == "Test result"
631
632
633def test_monitor_stdout_with_output_and_day(cortex_service, mock_journal):
634 """Test monitor_stdout writes output to specific day via output_path."""
635 from think.cortex import AgentProcess
636
637 agent_id = "output_day_test"
638 active_path = mock_journal / "agents" / f"{agent_id}_active.jsonl"
639 specified_day = "20240220"
640 output_path = mock_journal / specified_day / "agents" / "daily_reporter.md"
641
642 # Store request with explicit output_path and day
643 cortex_service.agent_requests = {
644 agent_id: {
645 "event": "request",
646 "prompt": "test",
647 "output": "md",
648 "name": "daily_reporter",
649 "day": specified_day,
650 "output_path": str(output_path),
651 }
652 }
653
654 mock_process = MagicMock()
655 mock_stdout = [
656 '{"event": "start", "ts": 1000}\n',
657 '{"event": "finish", "ts": 2000, "result": "Daily report content"}\n',
658 ]
659 mock_process.stdout = MockPipe(mock_stdout)
660 mock_process.wait.return_value = 0
661
662 agent = AgentProcess(agent_id, mock_process, active_path)
663
664 with patch.object(cortex_service, "_complete_agent_file"):
665 with patch.object(cortex_service, "_has_finish_event", return_value=True):
666 cortex_service._monitor_stdout(agent)
667
668 assert output_path.exists()
669 assert output_path.read_text() == "Daily report content"
670
671
672def test_recover_orphaned_agents(cortex_service, mock_journal):
673 """Test recovery of orphaned active agent files."""
674 # Create orphaned active files
675 agents_dir = mock_journal / "agents"
676 unified_dir = agents_dir / "unified"
677 unified_dir.mkdir()
678 agent1_active = unified_dir / "111_active.jsonl"
679 agent2_active = unified_dir / "222_active.jsonl"
680
681 agent1_active.write_text('{"event": "start", "ts": 1000}\n')
682 agent2_active.write_text('{"event": "start", "ts": 2000}\n')
683
684 active_files = [agent1_active, agent2_active]
685 cortex_service._recover_orphaned_agents(active_files)
686
687 # Check active files were renamed to completed
688 assert not agent1_active.exists()
689 assert not agent2_active.exists()
690 assert (unified_dir / "111.jsonl").exists()
691 assert (unified_dir / "222.jsonl").exists()
692
693 # Check error events were appended
694 content1 = (unified_dir / "111.jsonl").read_text()
695 lines1 = content1.strip().split("\n")
696 assert len(lines1) == 2
697 error_event = json.loads(lines1[1])
698 assert error_event["event"] == "error"
699 assert "Recovered" in error_event["error"]
700 assert error_event["agent_id"] == "111"
701
702 content2 = (unified_dir / "222.jsonl").read_text()
703 lines2 = content2.strip().split("\n")
704 assert len(lines2) == 2
705 assert json.loads(lines2[1])["event"] == "error"