personal memory agent
at main 705 lines 24 kB view raw
1# SPDX-License-Identifier: AGPL-3.0-only 2# Copyright (c) 2026 sol pbc 3 4"""Tests for the file-based Cortex agent manager.""" 5 6import json 7import os 8from pathlib import Path 9from unittest.mock import MagicMock, patch 10 11import pytest 12 13from think.models import GPT_5 14 15 16class MockPipe: 17 """Mock for subprocess stdout/stderr that supports context manager protocol.""" 18 19 def __init__(self, lines: list[str]): 20 self._lines = lines 21 self._iter = None 22 23 def __enter__(self): 24 self._iter = iter(self._lines) 25 return self 26 27 def __exit__(self, *args): 28 pass 29 30 def __iter__(self): 31 return self._iter or iter(self._lines) 32 33 def __next__(self): 34 if self._iter is None: 35 self._iter = iter(self._lines) 36 return next(self._iter) 37 38 39@pytest.fixture 40def mock_journal(tmp_path, monkeypatch): 41 """Set up a temporary journal directory.""" 42 journal_path = tmp_path / "journal" 43 journal_path.mkdir() 44 agents_path = journal_path / "agents" 45 agents_path.mkdir() 46 47 monkeypatch.setenv("_SOLSTONE_JOURNAL_OVERRIDE", str(journal_path)) 48 return journal_path 49 50 51@pytest.fixture 52def cortex_service(mock_journal): 53 """Create a CortexService instance for testing.""" 54 from think.cortex import CortexService 55 56 return CortexService(str(mock_journal)) 57 58 59def test_agent_process_creation(): 60 """Test AgentProcess class initialization and methods.""" 61 from think.cortex import AgentProcess 62 63 mock_process = MagicMock() 64 mock_process.poll.return_value = None # Running 65 mock_process.pid = 12345 66 67 log_path = Path("/tmp/test.jsonl") 68 agent = AgentProcess("123456789", mock_process, log_path) 69 70 assert agent.agent_id == "123456789" 71 assert agent.process == mock_process 72 assert agent.log_path == log_path 73 assert agent.is_running() is True 74 75 # Test stop 76 agent.stop() 77 mock_process.terminate.assert_called_once() 78 assert agent.stop_event.is_set() 79 80 81def test_cortex_service_initialization(cortex_service, mock_journal): 82 """Test CortexService initialization.""" 83 assert cortex_service.journal_path == mock_journal 84 assert cortex_service.agents_dir == mock_journal / "agents" 85 assert cortex_service.running_agents == {} 86 assert cortex_service.agents_dir.exists() 87 88 89@patch("think.cortex.subprocess.Popen") 90@patch("think.cortex.threading.Thread") 91@patch("think.cortex.threading.Timer") 92def test_spawn_subprocess( 93 mock_timer, mock_thread, mock_popen, cortex_service, mock_journal 94): 95 """Test spawning an agent subprocess.""" 96 mock_process = MagicMock() 97 mock_process.pid = 12345 98 mock_process.poll.return_value = None 99 mock_process.stdin = MagicMock() 100 mock_process.stdout = MagicMock() 101 mock_process.stderr = MagicMock() 102 mock_popen.return_value = mock_process 103 104 # Setup mock timer 105 mock_timer_instance = MagicMock() 106 mock_timer.return_value = mock_timer_instance 107 108 agent_id = "123456789" 109 file_path = mock_journal / "agents" / f"{agent_id}_active.jsonl" 110 111 request = { 112 "event": "request", 113 "ts": 123456789, 114 "prompt": "Test prompt", 115 "provider": "openai", 116 "name": "unified", 117 "model": GPT_5, 118 } 119 120 cortex_service._spawn_subprocess( 121 agent_id, 122 file_path, 123 request, 124 ["sol", "agents"], 125 "agent", 126 ) 127 128 # Check subprocess was called 129 mock_popen.assert_called_once() 130 call_args = mock_popen.call_args 131 assert call_args[0][0] == ["sol", "agents"] 132 assert call_args[1]["stdin"] is not None 133 assert call_args[1]["stdout"] is not None 134 assert call_args[1]["stderr"] is not None 135 136 # Check NDJSON was written to stdin 137 mock_process.stdin.write.assert_called_once() 138 written_data = mock_process.stdin.write.call_args[0][0] 139 ndjson = json.loads(written_data.strip()) 140 assert ndjson["event"] == "request" 141 assert ndjson["prompt"] == "Test prompt" 142 assert ndjson["provider"] == "openai" 143 assert ndjson["name"] == "unified" 144 assert ndjson["model"] == GPT_5 145 146 # Check stdin was closed 147 mock_process.stdin.close.assert_called_once() 148 149 # Check agent was tracked 150 assert agent_id in cortex_service.running_agents 151 agent = cortex_service.running_agents[agent_id] 152 assert agent.agent_id == agent_id 153 assert agent.log_path == file_path 154 155 # Check monitoring threads were started 156 assert mock_thread.call_count == 2 # stdout and stderr 157 158 # Check timer was created and started 159 mock_timer.assert_called_once() 160 mock_timer_instance.start.assert_called_once() 161 162 163@patch("think.cortex.subprocess.Popen") 164@patch("think.cortex.threading.Thread") 165@patch("think.cortex.threading.Timer") 166def test_spawn_generator_via_subprocess( 167 mock_timer, mock_thread, mock_popen, cortex_service, mock_journal 168): 169 """Test spawning a generator subprocess via _spawn_subprocess.""" 170 mock_process = MagicMock() 171 mock_process.pid = 54321 172 mock_process.poll.return_value = None 173 mock_process.stdin = MagicMock() 174 mock_process.stdout = MagicMock() 175 mock_process.stderr = MagicMock() 176 mock_popen.return_value = mock_process 177 178 # Setup mock timer 179 mock_timer_instance = MagicMock() 180 mock_timer.return_value = mock_timer_instance 181 182 agent_id = "987654321" 183 file_path = mock_journal / "agents" / f"{agent_id}_active.jsonl" 184 185 # Generator config has "output" instead of "tools" 186 config = { 187 "event": "request", 188 "ts": 987654321, 189 "name": "activity", 190 "day": "20240101", 191 "output": "md", 192 } 193 194 # Generators route through _spawn_subprocess 195 cortex_service._spawn_subprocess( 196 agent_id, 197 file_path, 198 config, 199 ["sol", "agents"], 200 "agent", 201 ) 202 203 # Check subprocess was called with agents command (generators route through agents) 204 mock_popen.assert_called_once() 205 call_args = mock_popen.call_args 206 assert call_args[0][0] == ["sol", "agents"] 207 assert call_args[1]["stdin"] is not None 208 assert call_args[1]["stdout"] is not None 209 assert call_args[1]["stderr"] is not None 210 211 # Check NDJSON was written to stdin 212 mock_process.stdin.write.assert_called_once() 213 written_data = mock_process.stdin.write.call_args[0][0] 214 ndjson = json.loads(written_data.strip()) 215 assert ndjson["event"] == "request" 216 assert ndjson["name"] == "activity" 217 assert ndjson["day"] == "20240101" 218 assert ndjson["output"] == "md" 219 220 # Check stdin was closed 221 mock_process.stdin.close.assert_called_once() 222 223 # Check generator was tracked 224 assert agent_id in cortex_service.running_agents 225 agent = cortex_service.running_agents[agent_id] 226 assert agent.agent_id == agent_id 227 assert agent.log_path == file_path 228 229 # Check monitoring threads were started 230 assert mock_thread.call_count == 2 # stdout and stderr 231 232 # Check timer was created and started 233 mock_timer.assert_called_once() 234 mock_timer_instance.start.assert_called_once() 235 236 237def test_monitor_stdout_json_events(cortex_service, mock_journal): 238 """Test monitoring stdout with JSON events.""" 239 from io import StringIO 240 241 from think.cortex import AgentProcess 242 243 agent_id = "123456789" 244 log_path = mock_journal / "agents" / f"{agent_id}_active.jsonl" 245 246 mock_process = MagicMock() 247 mock_process.poll.return_value = 0 # Process exits 248 mock_process.stdout = StringIO( 249 '{"event": "start", "ts": 1234567890}\n' 250 '{"event": "finish", "ts": 1234567891, "result": "Done"}\n' 251 ) 252 253 agent = AgentProcess(agent_id, mock_process, log_path) 254 cortex_service.running_agents[agent_id] = agent 255 256 with patch.object(cortex_service, "_complete_agent_file") as mock_complete: 257 cortex_service._monitor_stdout(agent) 258 259 # Check events were written to file 260 assert log_path.exists() 261 lines = log_path.read_text().strip().split("\n") 262 assert len(lines) == 2 263 assert json.loads(lines[0])["event"] == "start" 264 assert json.loads(lines[1])["event"] == "finish" 265 266 # Check file was completed 267 mock_complete.assert_called_once_with(agent_id, log_path) 268 269 # Check agent was removed 270 assert agent_id not in cortex_service.running_agents 271 272 273def test_monitor_stdout_non_json_output(cortex_service, mock_journal): 274 """Test monitoring stdout with non-JSON output.""" 275 from io import StringIO 276 277 from think.cortex import AgentProcess 278 279 agent_id = "123456789" 280 log_path = mock_journal / "agents" / f"{agent_id}_active.jsonl" 281 282 mock_process = MagicMock() 283 mock_process.poll.return_value = 0 284 mock_process.stdout = StringIO( 285 'Plain text output\n{"event": "finish", "ts": 1234567890}\n' 286 ) 287 288 agent = AgentProcess(agent_id, mock_process, log_path) 289 cortex_service.running_agents[agent_id] = agent 290 291 with patch.object(cortex_service, "_complete_agent_file"): 292 cortex_service._monitor_stdout(agent) 293 294 # Check info event was created for non-JSON 295 lines = log_path.read_text().strip().split("\n") 296 assert len(lines) == 2 297 298 info_event = json.loads(lines[0]) 299 assert info_event["event"] == "info" 300 assert info_event["message"] == "Plain text output" 301 assert "ts" in info_event 302 303 304def test_monitor_stdout_no_finish_event(cortex_service, mock_journal): 305 """Test monitoring stdout when process exits without finish event.""" 306 from io import StringIO 307 308 from think.cortex import AgentProcess 309 310 agent_id = "123456789" 311 log_path = mock_journal / "agents" / f"{agent_id}_active.jsonl" 312 313 mock_process = MagicMock() 314 mock_process.wait.return_value = 1 # Non-zero exit 315 mock_process.stdout = StringIO('{"event": "start", "ts": 1234567890}\n') 316 317 agent = AgentProcess(agent_id, mock_process, log_path) 318 cortex_service.running_agents[agent_id] = agent 319 320 with patch.object(cortex_service, "_complete_agent_file"): 321 cortex_service._monitor_stdout(agent) 322 323 # Check error event was added 324 lines = log_path.read_text().strip().split("\n") 325 assert len(lines) == 2 326 327 error_event = json.loads(lines[1]) 328 assert error_event["event"] == "error" 329 assert "exit_code" in error_event 330 assert error_event["exit_code"] == 1 331 332 333def test_monitor_stderr(cortex_service, mock_journal): 334 """Test monitoring stderr for errors.""" 335 from io import StringIO 336 337 from think.cortex import AgentProcess 338 339 agent_id = "123456789" 340 log_path = mock_journal / "agents" / f"{agent_id}_active.jsonl" 341 342 mock_process = MagicMock() 343 mock_process.poll.return_value = 1 # Error exit 344 mock_process.stderr = StringIO( 345 "Error: Something went wrong\nStack trace line 1\nStack trace line 2\n" 346 ) 347 348 agent = AgentProcess(agent_id, mock_process, log_path) 349 350 cortex_service._monitor_stderr(agent) 351 352 # Check error event was written 353 assert log_path.exists() 354 lines = log_path.read_text().strip().split("\n") 355 assert len(lines) == 1 356 357 error_event = json.loads(lines[0]) 358 assert error_event["event"] == "error" 359 assert "trace" in error_event 360 assert "Error: Something went wrong" in error_event["trace"] 361 assert error_event["exit_code"] == 1 362 363 364def test_has_finish_event(cortex_service, mock_journal): 365 """Test checking for finish event in JSONL file.""" 366 file_path = mock_journal / "agents" / "test.jsonl" 367 368 # File with finish event 369 file_path.write_text( 370 '{"event": "start", "ts": 123}\n{"event": "finish", "ts": 124}\n' 371 ) 372 assert cortex_service._has_finish_event(file_path) is True 373 374 # File with error event 375 file_path.write_text( 376 '{"event": "start", "ts": 123}\n{"event": "error", "ts": 124}\n' 377 ) 378 assert cortex_service._has_finish_event(file_path) is True 379 380 # File without finish/error 381 file_path.write_text('{"event": "start", "ts": 123}\n') 382 assert cortex_service._has_finish_event(file_path) is False 383 384 # Empty file 385 file_path.write_text("") 386 assert cortex_service._has_finish_event(file_path) is False 387 388 389def test_complete_agent_file(cortex_service, mock_journal): 390 """Test completing an agent file (rename from active to completed).""" 391 agent_id = "123456789" 392 unified_dir = mock_journal / "agents" / "unified" 393 unified_dir.mkdir() 394 active_path = unified_dir / f"{agent_id}_active.jsonl" 395 active_path.touch() 396 cortex_service.agent_requests[agent_id] = {"name": "unified", "agent_id": agent_id} 397 398 cortex_service._complete_agent_file(agent_id, active_path) 399 400 # Check file was renamed 401 assert not active_path.exists() 402 completed_path = unified_dir / f"{agent_id}.jsonl" 403 assert completed_path.exists() 404 symlink_path = mock_journal / "agents" / "unified.log" 405 assert symlink_path.is_symlink() 406 assert os.readlink(symlink_path) == f"unified/{agent_id}.jsonl" 407 408 409def test_complete_agent_file_replaces_symlink(cortex_service, mock_journal): 410 """Test completing agent file replaces convenience symlink for same name.""" 411 unified_dir = mock_journal / "agents" / "unified" 412 unified_dir.mkdir() 413 414 first_agent_id = "111" 415 first_active_path = unified_dir / f"{first_agent_id}_active.jsonl" 416 first_active_path.touch() 417 cortex_service.agent_requests[first_agent_id] = {"name": "unified"} 418 419 cortex_service._complete_agent_file(first_agent_id, first_active_path) 420 421 second_agent_id = "222" 422 second_active_path = unified_dir / f"{second_agent_id}_active.jsonl" 423 second_active_path.touch() 424 cortex_service.agent_requests[second_agent_id] = {"name": "unified"} 425 426 cortex_service._complete_agent_file(second_agent_id, second_active_path) 427 428 symlink_path = mock_journal / "agents" / "unified.log" 429 assert symlink_path.is_symlink() 430 assert os.readlink(symlink_path) == f"unified/{second_agent_id}.jsonl" 431 432 433def test_complete_agent_file_colon_name(cortex_service, mock_journal): 434 """Test completing agent file sanitizes colon in convenience symlink name.""" 435 agent_id = "123456789" 436 entities_dir = mock_journal / "agents" / "entities--entity_assist" 437 entities_dir.mkdir() 438 active_path = entities_dir / f"{agent_id}_active.jsonl" 439 active_path.touch() 440 cortex_service.agent_requests[agent_id] = {"name": "entities:entity_assist"} 441 442 cortex_service._complete_agent_file(agent_id, active_path) 443 444 symlink_path = mock_journal / "agents" / "entities--entity_assist.log" 445 assert symlink_path.is_symlink() 446 assert os.readlink(symlink_path) == f"entities--entity_assist/{agent_id}.jsonl" 447 448 449def test_complete_agent_file_no_name(cortex_service, mock_journal): 450 """Test completing agent file skips symlink when request name is missing.""" 451 agent_id = "123456789" 452 active_path = mock_journal / "agents" / f"{agent_id}_active.jsonl" 453 active_path.touch() 454 455 cortex_service._complete_agent_file(agent_id, active_path) 456 457 completed_path = mock_journal / "agents" / f"{agent_id}.jsonl" 458 assert completed_path.exists() 459 assert not any(path.is_symlink() for path in (mock_journal / "agents").iterdir()) 460 461 462def test_write_error_and_complete(cortex_service, mock_journal): 463 """Test writing error and completing file.""" 464 agent_id = "123456789" 465 file_path = mock_journal / "agents" / f"{agent_id}_active.jsonl" 466 file_path.touch() 467 468 cortex_service._write_error_and_complete(file_path, "Test error message") 469 470 # Check error was written 471 completed_path = mock_journal / "agents" / f"{agent_id}.jsonl" 472 assert completed_path.exists() 473 assert not file_path.exists() 474 475 content = completed_path.read_text() 476 error_event = json.loads(content) 477 assert error_event["event"] == "error" 478 assert error_event["error"] == "Test error message" 479 assert "ts" in error_event 480 481 482def test_get_status(cortex_service): 483 """Test getting service status.""" 484 from think.cortex import AgentProcess 485 486 # Empty status 487 status = cortex_service.get_status() 488 assert status["running_agents"] == 0 489 assert status["agent_ids"] == [] 490 491 # Add running agents 492 mock_process = MagicMock() 493 agent1 = AgentProcess("111", mock_process, Path("/tmp/1.jsonl")) 494 agent2 = AgentProcess("222", mock_process, Path("/tmp/2.jsonl")) 495 496 cortex_service.running_agents["111"] = agent1 497 cortex_service.running_agents["222"] = agent2 498 499 status = cortex_service.get_status() 500 assert status["running_agents"] == 2 501 assert set(status["agent_ids"]) == {"111", "222"} 502 503 504def test_write_output(cortex_service, mock_journal): 505 """Test writing agent output using explicit output_path.""" 506 agent_id = "test_agent" 507 result = "This is the agent result content" 508 expected_path = mock_journal / "20240115" / "agents" / "my_agent.md" 509 config = {"output": "md", "name": "my_agent", "output_path": str(expected_path)} 510 511 cortex_service._write_output(agent_id, result, config) 512 513 assert expected_path.exists() 514 assert expected_path.read_text() == result 515 assert expected_path.parent.is_dir() 516 517 518def test_write_output_with_error(cortex_service, mock_journal, caplog): 519 """Test write output handles errors gracefully.""" 520 import logging 521 522 output_path = mock_journal / "20240115" / "agents" / "test.md" 523 with patch("builtins.open", side_effect=PermissionError("Cannot write")): 524 with caplog.at_level(logging.ERROR): 525 config = {"output": "md", "name": "test", "output_path": str(output_path)} 526 cortex_service._write_output("agent_id", "result", config) 527 528 # Check error was logged but didn't raise 529 assert "Failed to write agent agent_id output" in caplog.text 530 531 532def test_write_output_missing_path_skips(cortex_service, mock_journal, caplog): 533 """Test write output skips when output_path is missing.""" 534 config = {"output": "md", "name": "test"} 535 cortex_service._write_output("agent_id", "result", config) 536 537 # No output written, no error — silent skip is expected 538 assert "Failed to write" not in caplog.text 539 540 541def test_write_output_with_day_parameter(cortex_service, mock_journal): 542 """Test writing agent output to a specific day directory.""" 543 agent_id = "test_agent" 544 result = "This is the agent result content" 545 specified_day = "20240201" 546 expected_path = mock_journal / specified_day / "agents" / "reporter.md" 547 config = { 548 "output": "md", 549 "name": "reporter", 550 "day": specified_day, 551 "output_path": str(expected_path), 552 } 553 554 cortex_service._write_output(agent_id, result, config) 555 556 assert expected_path.exists() 557 assert expected_path.read_text() == result 558 assert expected_path.parent.is_dir() 559 560 561def test_write_output_with_segment(cortex_service, mock_journal): 562 """Test writing segment agent output to segment agents directory.""" 563 agent_id = "segment_agent" 564 result = "Segment analysis content" 565 expected_path = mock_journal / "20240115" / "143000_600" / "agents" / "analyzer.md" 566 config = { 567 "output": "md", 568 "name": "analyzer", 569 "segment": "143000_600", 570 "output_path": str(expected_path), 571 } 572 573 cortex_service._write_output(agent_id, result, config) 574 575 assert expected_path.exists() 576 assert expected_path.read_text() == result 577 578 579def test_write_output_json_format(cortex_service, mock_journal): 580 """Test writing agent output in JSON format.""" 581 agent_id = "json_agent" 582 result = '{"key": "value"}' 583 expected_path = mock_journal / "20240115" / "agents" / "data_agent.json" 584 config = { 585 "output": "json", 586 "name": "data_agent", 587 "output_path": str(expected_path), 588 } 589 590 cortex_service._write_output(agent_id, result, config) 591 592 assert expected_path.exists() 593 assert expected_path.read_text() == result 594 595 596def test_monitor_stdout_with_output(cortex_service, mock_journal): 597 """Test monitor_stdout writes output when output_path is present.""" 598 from think.cortex import AgentProcess 599 600 agent_id = "output_test" 601 active_path = mock_journal / "agents" / f"{agent_id}_active.jsonl" 602 output_path = mock_journal / "20240115" / "agents" / "test_agent.md" 603 604 # Store request with explicit output_path 605 cortex_service.agent_requests = { 606 agent_id: { 607 "event": "request", 608 "prompt": "test", 609 "output": "md", 610 "name": "test_agent", 611 "output_path": str(output_path), 612 } 613 } 614 615 mock_process = MagicMock() 616 mock_stdout = [ 617 '{"event": "start", "ts": 1000}\n', 618 '{"event": "finish", "ts": 2000, "result": "Test result"}\n', 619 ] 620 mock_process.stdout = MockPipe(mock_stdout) 621 mock_process.wait.return_value = 0 622 623 agent = AgentProcess(agent_id, mock_process, active_path) 624 625 with patch.object(cortex_service, "_complete_agent_file"): 626 with patch.object(cortex_service, "_has_finish_event", return_value=True): 627 cortex_service._monitor_stdout(agent) 628 629 assert output_path.exists() 630 assert output_path.read_text() == "Test result" 631 632 633def test_monitor_stdout_with_output_and_day(cortex_service, mock_journal): 634 """Test monitor_stdout writes output to specific day via output_path.""" 635 from think.cortex import AgentProcess 636 637 agent_id = "output_day_test" 638 active_path = mock_journal / "agents" / f"{agent_id}_active.jsonl" 639 specified_day = "20240220" 640 output_path = mock_journal / specified_day / "agents" / "daily_reporter.md" 641 642 # Store request with explicit output_path and day 643 cortex_service.agent_requests = { 644 agent_id: { 645 "event": "request", 646 "prompt": "test", 647 "output": "md", 648 "name": "daily_reporter", 649 "day": specified_day, 650 "output_path": str(output_path), 651 } 652 } 653 654 mock_process = MagicMock() 655 mock_stdout = [ 656 '{"event": "start", "ts": 1000}\n', 657 '{"event": "finish", "ts": 2000, "result": "Daily report content"}\n', 658 ] 659 mock_process.stdout = MockPipe(mock_stdout) 660 mock_process.wait.return_value = 0 661 662 agent = AgentProcess(agent_id, mock_process, active_path) 663 664 with patch.object(cortex_service, "_complete_agent_file"): 665 with patch.object(cortex_service, "_has_finish_event", return_value=True): 666 cortex_service._monitor_stdout(agent) 667 668 assert output_path.exists() 669 assert output_path.read_text() == "Daily report content" 670 671 672def test_recover_orphaned_agents(cortex_service, mock_journal): 673 """Test recovery of orphaned active agent files.""" 674 # Create orphaned active files 675 agents_dir = mock_journal / "agents" 676 unified_dir = agents_dir / "unified" 677 unified_dir.mkdir() 678 agent1_active = unified_dir / "111_active.jsonl" 679 agent2_active = unified_dir / "222_active.jsonl" 680 681 agent1_active.write_text('{"event": "start", "ts": 1000}\n') 682 agent2_active.write_text('{"event": "start", "ts": 2000}\n') 683 684 active_files = [agent1_active, agent2_active] 685 cortex_service._recover_orphaned_agents(active_files) 686 687 # Check active files were renamed to completed 688 assert not agent1_active.exists() 689 assert not agent2_active.exists() 690 assert (unified_dir / "111.jsonl").exists() 691 assert (unified_dir / "222.jsonl").exists() 692 693 # Check error events were appended 694 content1 = (unified_dir / "111.jsonl").read_text() 695 lines1 = content1.strip().split("\n") 696 assert len(lines1) == 2 697 error_event = json.loads(lines1[1]) 698 assert error_event["event"] == "error" 699 assert "Recovered" in error_event["error"] 700 assert error_event["agent_id"] == "111" 701 702 content2 = (unified_dir / "222.jsonl").read_text() 703 lines2 = content2.strip().split("\n") 704 assert len(lines2) == 2 705 assert json.loads(lines2[1])["event"] == "error"