personal memory agent
1# SPDX-License-Identifier: AGPL-3.0-only
2# Copyright (c) 2026 sol pbc
3
4"""Integration tests for the Cortex agent system with Callosum."""
5
6import json
7import os
8import threading
9import time
10
11import pytest
12
13from think.callosum import CallosumServer
14from think.cortex import CortexService
15from think.cortex_client import cortex_agents, cortex_request
16from think.utils import now_ms
17
18
@pytest.fixture
def callosum_server(integration_journal_path):
    """Start a Callosum server for integration testing.

    Points ``_SOLSTONE_JOURNAL_OVERRIDE`` at the integration journal, runs
    ``CallosumServer.start`` on a daemon thread, and polls (50 tries, 0.1s
    apart) for ``health/callosum.sock`` to appear under the journal before
    yielding the server to the test. The test fails if the socket never
    shows up.

    On teardown the server is stopped and the thread is joined with a
    2 second timeout. The environment variable is then put back to the value
    it had before the fixture ran, so the override does not leak into tests
    that run afterwards.
    """
    env_key = "_SOLSTONE_JOURNAL_OVERRIDE"
    previous_override = os.environ.get(env_key)
    os.environ[env_key] = str(integration_journal_path)

    try:
        server = CallosumServer()
        server_thread = threading.Thread(target=server.start, daemon=True)
        server_thread.start()

        # Wait for server to be ready
        socket_path = integration_journal_path / "health" / "callosum.sock"
        for _ in range(50):
            if socket_path.exists():
                break
            time.sleep(0.1)
        else:
            pytest.fail("Callosum server did not start in time")

        yield server

        # Stop while the override is still in place; restore it afterwards.
        server.stop()
        server_thread.join(timeout=2)
    finally:
        # Restore the caller's environment even if startup or shutdown failed.
        if previous_override is None:
            os.environ.pop(env_key, None)
        else:
            os.environ[env_key] = previous_override
41
42
@pytest.mark.integration
def test_cortex_service_startup(integration_journal_path, callosum_server):
    """Check the state of a freshly constructed Cortex service.

    After building a ``CortexService`` for the integration journal, the
    journal must contain an ``agents`` directory and the service status must
    report no running agents.
    """
    journal_root = str(integration_journal_path)
    service = CortexService(journal_path=journal_root)

    # The agents directory is expected to exist once the service is built.
    expected_dir = integration_journal_path / "agents"
    assert expected_dir.exists()
    assert expected_dir.is_dir()

    # A new service has nothing running yet.
    snapshot = service.get_status()
    assert snapshot["running_agents"] == 0
    assert snapshot["agent_ids"] == []
58
59
@pytest.mark.integration
def test_cortex_request_creation(integration_journal_path, callosum_server):
    """Test creating a Cortex agent request via Callosum.

    A ``CallosumConnection`` listener records every message it receives.
    The test then issues ``cortex_request`` and checks that a message with
    ``event == "request"`` was observed carrying the same prompt, name,
    provider and the agent id returned by ``cortex_request``.

    The listener is always stopped, even when an assertion fails, so a
    failing run does not leave a live connection behind for later tests.
    """
    os.environ["_SOLSTONE_JOURNAL_OVERRIDE"] = str(integration_journal_path)

    # Listen for broadcasts
    received_messages = []

    def callback(msg):
        received_messages.append(msg)

    from think.callosum import CallosumConnection

    listener = CallosumConnection()
    listener.start(callback=callback)
    try:
        time.sleep(0.1)  # Give the listener a moment before sending

        # Create a request
        agent_id = cortex_request(
            prompt="Test prompt", name="default", provider="openai"
        )

        time.sleep(0.2)  # Give the request time to arrive

        # Verify request was broadcast
        assert len(received_messages) >= 1
        request_messages = [
            m for m in received_messages if m.get("event") == "request"
        ]
        # Fail with a readable assertion instead of an IndexError.
        assert request_messages, "no 'request' event was received"
        request = request_messages[0]
        assert request["prompt"] == "Test prompt"
        assert request["name"] == "default"
        assert request["provider"] == "openai"
        assert request["agent_id"] == agent_id
    finally:
        listener.stop()
91
92
@pytest.mark.integration
def test_cortex_end_to_end_with_echo_agent(integration_journal_path, callosum_server):
    """Test the Cortex request flow with the service running in the background.

    Starts ``CortexService.start`` on a daemon thread, attaches a
    ``CallosumConnection`` watcher that keeps only messages whose ``tract``
    is ``"cortex"``, issues a request, and asserts that a ``request`` event
    with the returned agent id was observed.

    The watcher and the service are stopped in ``finally`` blocks, so they
    are shut down even when an assertion fails.
    """
    os.environ["_SOLSTONE_JOURNAL_OVERRIDE"] = str(integration_journal_path)

    # Make sure the agents directory exists (no agent script is created here)
    agents_dir = integration_journal_path / "agents"
    agents_dir.mkdir(parents=True, exist_ok=True)

    # Start Cortex service in background
    cortex = CortexService(journal_path=str(integration_journal_path))
    service_thread = threading.Thread(target=cortex.start, daemon=True)
    service_thread.start()

    try:
        time.sleep(0.5)  # Let service start

        # Collect events
        received_events = []

        def callback(message):
            # Filter for cortex tract
            if message.get("tract") != "cortex":
                return
            received_events.append(message)

        # Start watching with CallosumConnection
        from think.callosum import CallosumConnection

        watcher = CallosumConnection()
        watcher.start(callback=callback)
        try:
            time.sleep(0.2)

            # Make a request (this will fail because no real agent, but we
            # can verify the flow)
            agent_id = cortex_request(
                prompt="Test end-to-end", name="default", provider="openai"
            )

            # Wait for at least request event
            time.sleep(1.0)

            # Should have received the request event
            request_events = [
                e for e in received_events if e.get("event") == "request"
            ]
            assert len(request_events) >= 1
            assert request_events[0]["agent_id"] == agent_id
        finally:
            # Same shutdown order as before: watcher first, then the service.
            watcher.stop()
    finally:
        cortex.stop()
141
142
@pytest.mark.integration
def test_cortex_agents_listing(integration_journal_path):
    """Verify that ``cortex_agents`` picks up a newly written agent log.

    Writes a two-event JSONL log (a ``request`` followed by a ``finish``)
    into a date subdirectory of the journal's ``agents`` directory. The
    listing must then grow by exactly one entry, and that entry must be
    reported as completed with the original prompt.
    """
    os.environ["_SOLSTONE_JOURNAL_OVERRIDE"] = str(integration_journal_path)

    agents_root = integration_journal_path / "agents"
    agents_root.mkdir(parents=True, exist_ok=True)

    # Number of agents already listed before the new log is added
    baseline = cortex_agents()
    baseline_count = len(baseline["agents"])

    ts = now_ms()

    # The log lives inside a date subdirectory, matching the cortex layout
    day_dir = agents_root / "20260214"
    day_dir.mkdir(parents=True, exist_ok=True)

    request_event = {
        "event": "request",
        "ts": ts,
        "prompt": "Test",
        "name": "default",
        "provider": "openai",
    }
    finish_event = {"event": "finish", "ts": ts + 100, "result": "Done"}

    log_path = day_dir / f"{ts}.jsonl"
    with open(log_path, "w") as f:
        # One JSON document per line: request first, then finish
        for event in (request_event, finish_event):
            f.write(json.dumps(event))
            f.write("\n")

    listing = cortex_agents()

    # Exactly one more agent than before
    assert len(listing["agents"]) == baseline_count + 1

    # Locate the entry whose id is the timestamp used for the file name
    wanted_id = str(ts)
    matches = [entry for entry in listing["agents"] if entry["id"] == wanted_id]
    ours = matches[0]
    assert ours["status"] == "completed"
    assert ours["prompt"] == "Test"
187
188
@pytest.mark.integration
def test_cortex_error_handling(integration_journal_path, callosum_server):
    """Test that a request naming an unknown agent is still broadcast.

    Issues ``cortex_request`` with ``name="nonexistent_agent"`` while a
    ``CallosumConnection`` listener records incoming messages. The test only
    asserts that at least one ``request`` event was observed.

    The listener is stopped in a ``finally`` block so it is shut down even
    when the assertion fails.
    """
    os.environ["_SOLSTONE_JOURNAL_OVERRIDE"] = str(integration_journal_path)

    # Listen for events
    received_events = []

    def callback(msg):
        received_events.append(msg)

    from think.callosum import CallosumConnection

    listener = CallosumConnection()
    listener.start(callback=callback)
    try:
        time.sleep(0.1)  # Give the listener a moment before sending

        # Make a request
        cortex_request(
            prompt="Test error handling",
            name="nonexistent_agent",  # This may cause issues
            provider="openai",
        )

        time.sleep(0.2)  # Give the request time to arrive

        # Should have at least received the request
        request_events = [e for e in received_events if e.get("event") == "request"]
        assert len(request_events) >= 1
    finally:
        listener.stop()