# personal memory agent
1# SPDX-License-Identifier: AGPL-3.0-only
2# Copyright (c) 2026 sol pbc
3
4"""Tests for cortex_client module with Callosum."""
5
6import json
7import shutil
8import tempfile
9import threading
10import time
11from pathlib import Path
12
13import pytest
14
15from think.callosum import CallosumConnection, CallosumServer
16from think.cortex_client import (
17 cortex_agents,
18 cortex_request,
19 get_agent_end_state,
20 get_agent_log_status,
21 wait_for_agents,
22)
23from think.models import GPT_5
24from think.utils import now_ms
25
26
@pytest.fixture
def callosum_server(monkeypatch):
    """Start a Callosum server for testing.

    Uses a short temp path in /tmp to avoid Unix socket path length limits
    (~104 chars on macOS). pytest's tmp_path creates paths that are too long.
    """
    # mkdtemp under /tmp keeps the socket path well below the OS limit.
    temp_root = tempfile.mkdtemp(dir="/tmp", prefix="callosum_")
    journal = Path(temp_root)

    monkeypatch.setenv("_SOLSTONE_JOURNAL_OVERRIDE", str(journal))
    (journal / "agents").mkdir(parents=True, exist_ok=True)

    server = CallosumServer()
    server_thread = threading.Thread(target=server.start, daemon=True)
    server_thread.start()

    # Poll for the socket file as the readiness signal (up to ~5 seconds).
    socket_path = journal / "health" / "callosum.sock"
    started = False
    for _ in range(50):
        if socket_path.exists():
            started = True
            break
        time.sleep(0.1)
    if not started:
        pytest.fail("Callosum server did not start in time")

    yield journal

    # Teardown: stop the server, reap the thread, and remove the temp dir.
    server.stop()
    server_thread.join(timeout=2)
    shutil.rmtree(temp_root, ignore_errors=True)
59
60
@pytest.fixture
def callosum_listener(callosum_server):
    """Provide a CallosumConnection listener that collects received messages.

    Depends on the ``callosum_server`` fixture so a server is running first.
    Yields the ``messages`` list, which accumulates all broadcast messages
    received during the test.  The listener itself is stopped on teardown.
    """
    messages = []

    def callback(msg):
        # Invoked by the listener thread for every broadcast message.
        messages.append(msg)

    listener = CallosumConnection()
    listener.start(callback=callback)
    time.sleep(0.1)  # Allow connection to establish

    yield messages

    listener.stop()
80
81
def test_cortex_request_broadcasts_to_callosum(callosum_listener):
    """Test that cortex_request broadcasts request to Callosum."""
    received = callosum_listener

    # Fire a request, then give the broadcast time to reach the listener.
    agent_id = cortex_request(
        prompt="Test prompt",
        name="unified",
        provider="openai",
        config={"model": GPT_5},
    )

    time.sleep(0.2)

    # Exactly one broadcast should have arrived, carrying the request fields.
    assert len(received) == 1
    broadcast = received[0]
    expected = {
        "tract": "cortex",
        "event": "request",
        "prompt": "Test prompt",
        "name": "unified",
        "provider": "openai",
        "model": GPT_5,
        "agent_id": agent_id,
    }
    for key, value in expected.items():
        assert broadcast[key] == value
    assert "ts" in broadcast
107
108
def test_cortex_request_returns_agent_id(callosum_server):
    """Test that cortex_request returns agent_id string."""
    _ = callosum_server  # Needed for side effects only

    agent_id = cortex_request(prompt="Test", name="unified", provider="openai")

    # The id is a millisecond timestamp rendered as a 13-digit string.
    assert isinstance(agent_id, str)
    assert agent_id.isdigit()
    assert len(agent_id) == 13
119
120
def test_cortex_request_unique_agent_ids(callosum_server):
    """Test that cortex_request generates unique agent IDs."""
    _ = callosum_server  # Needed for side effects only

    seen = set()
    for i in range(3):
        seen.add(cortex_request(prompt=f"Test {i}", name="unified", provider="openai"))
        # Space requests out so millisecond timestamps cannot collide.
        time.sleep(0.002)

    # Three requests must yield three distinct ids.
    assert len(seen) == 3
133
134
def test_cortex_request_returns_none_on_send_failure(callosum_server, monkeypatch):
    """Test cortex_request returns None when callosum_send fails."""
    # Simulate a failed broadcast by forcing callosum_send to report False.
    monkeypatch.setattr("think.cortex_client.callosum_send", lambda *a, **kw: False)

    assert cortex_request(prompt="Test", name="unified", provider="openai") is None
142
143
def test_cortex_request_empty_journal(tmp_path, monkeypatch):
    """Test cortex_request works with an empty journal directory."""
    # Stub out the broadcast so only the journal handling is exercised.
    monkeypatch.setattr("think.cortex_client.callosum_send", lambda *a, **kw: True)
    monkeypatch.setenv("_SOLSTONE_JOURNAL_OVERRIDE", str(tmp_path))

    agent_id = cortex_request("test", "unified", "openai")

    # A non-None, non-empty agent id means the request was accepted.
    assert agent_id is not None
    assert len(agent_id) > 0
152
153
154# Tests for cortex_agents remain mostly unchanged as they read from files
155
156
def test_cortex_agents_empty(tmp_path, monkeypatch):
    """Test cortex_agents with no agents."""
    monkeypatch.setenv("_SOLSTONE_JOURNAL_OVERRIDE", str(tmp_path))

    listing = cortex_agents()

    # With nothing on disk, every count is zero and the list is empty.
    pagination = listing["pagination"]
    assert listing["agents"] == []
    assert pagination["total"] == 0
    assert pagination["has_more"] is False
    assert listing["live_count"] == 0
    assert listing["historical_count"] == 0
168
169
def test_cortex_agents_with_active(tmp_path, monkeypatch):
    """Test cortex_agents with active (running) agents."""
    monkeypatch.setenv("_SOLSTONE_JOURNAL_OVERRIDE", str(tmp_path))
    agents_dir = tmp_path / "agents"
    agents_dir.mkdir()

    # Two running agents: an *_active.jsonl log marks an agent as live.
    ts1 = now_ms()
    ts2 = ts1 + 1000
    specs = [
        ("unified", ts1, "Task 1", "openai"),
        ("tester", ts2, "Task 2", "google"),
    ]
    for agent_name, ts, prompt, provider in specs:
        agent_dir = agents_dir / agent_name
        agent_dir.mkdir()
        record = {
            "event": "request",
            "ts": ts,
            "prompt": prompt,
            "name": agent_name,
            "provider": provider,
        }
        (agent_dir / f"{ts}_active.jsonl").write_text(json.dumps(record) + "\n")

    result = cortex_agents()

    # Both agents show up, counted as live rather than historical.
    assert len(result["agents"]) == 2
    assert result["live_count"] == 2
    assert result["historical_count"] == 0
218
219
def test_cortex_agents_with_completed(tmp_path, monkeypatch):
    """Test cortex_agents with completed (historical) agents."""
    monkeypatch.setenv("_SOLSTONE_JOURNAL_OVERRIDE", str(tmp_path))
    agents_dir = tmp_path / "agents"
    agents_dir.mkdir()

    ts1 = now_ms()
    reviewer_dir = agents_dir / "reviewer"
    reviewer_dir.mkdir()

    # A completed log: request then finish event, filename without _active.
    request_line = json.dumps(
        {
            "event": "request",
            "ts": ts1,
            "prompt": "Old task",
            "name": "reviewer",
            "provider": "anthropic",
        }
    )
    finish_line = json.dumps({"event": "finish", "ts": ts1 + 100, "result": "Done"})
    (reviewer_dir / f"{ts1}.jsonl").write_text(request_line + "\n" + finish_line + "\n")

    result = cortex_agents()

    # The agent is listed once, counted as historical, and marked completed.
    assert len(result["agents"]) == 1
    assert result["live_count"] == 0
    assert result["historical_count"] == 1
    assert result["agents"][0]["status"] == "completed"
253
254
def test_cortex_agents_pagination(tmp_path, monkeypatch):
    """Test cortex_agents pagination."""
    monkeypatch.setenv("_SOLSTONE_JOURNAL_OVERRIDE", str(tmp_path))
    agents_dir = tmp_path / "agents"
    agents_dir.mkdir()

    # Five completed agent logs, one second apart.
    base_ts = now_ms()
    unified_dir = agents_dir / "unified"
    unified_dir.mkdir()
    for i in range(5):
        ts = base_ts + (i * 1000)
        record = {
            "event": "request",
            "ts": ts,
            "prompt": f"Task {i}",
            "name": "unified",
        }
        (unified_dir / f"{ts}.jsonl").write_text(json.dumps(record) + "\n")

    # A limit smaller than the total should page and report has_more.
    result = cortex_agents(limit=2)
    assert len(result["agents"]) == 2
    assert result["pagination"]["limit"] == 2
    assert result["pagination"]["total"] == 5
    assert result["pagination"]["has_more"] is True
286
287
def test_cortex_agents_empty_journal(tmp_path, monkeypatch):
    """Test cortex_agents works with an empty journal directory."""
    monkeypatch.setenv("_SOLSTONE_JOURNAL_OVERRIDE", str(tmp_path))

    result = cortex_agents()

    # The result shape stays intact even with nothing on disk.
    assert "agents" in result
    assert "pagination" in result
    assert isinstance(result["agents"], list)
296
297
def test_get_agent_log_status_completed(tmp_path, monkeypatch):
    """Test get_agent_log_status returns 'completed' for finished agents."""
    monkeypatch.setenv("_SOLSTONE_JOURNAL_OVERRIDE", str(tmp_path))
    unified_dir = tmp_path / "agents" / "unified"
    unified_dir.mkdir(parents=True)

    # A plain {agent_id}.jsonl file (no _active suffix) marks completion.
    agent_id = "1234567890123"
    (unified_dir / f"{agent_id}.jsonl").write_text('{"event": "finish"}\n')

    assert get_agent_log_status(agent_id) == "completed"
310
311
def test_get_agent_log_status_running(tmp_path, monkeypatch):
    """Test get_agent_log_status returns 'running' for active agents."""
    monkeypatch.setenv("_SOLSTONE_JOURNAL_OVERRIDE", str(tmp_path))
    unified_dir = tmp_path / "agents" / "unified"
    unified_dir.mkdir(parents=True)

    # The _active suffix marks an agent that is still running.
    agent_id = "1234567890123"
    (unified_dir / f"{agent_id}_active.jsonl").write_text('{"event": "start"}\n')

    assert get_agent_log_status(agent_id) == "running"
324
325
def test_get_agent_log_status_not_found(tmp_path, monkeypatch):
    """Test get_agent_log_status returns 'not_found' for missing agents."""
    monkeypatch.setenv("_SOLSTONE_JOURNAL_OVERRIDE", str(tmp_path))
    # Empty agents directory: no log files exist at all.
    (tmp_path / "agents").mkdir()

    status = get_agent_log_status("nonexistent")
    assert status == "not_found"
332
333
def test_get_agent_log_status_prefers_completed(tmp_path, monkeypatch):
    """Test get_agent_log_status returns 'completed' when both files exist."""
    monkeypatch.setenv("_SOLSTONE_JOURNAL_OVERRIDE", str(tmp_path))
    unified_dir = tmp_path / "agents" / "unified"
    unified_dir.mkdir(parents=True)

    # Edge case: both files exist (shouldn't happen, but check precedence)
    agent_id = "1234567890123"
    for suffix, event in ((".jsonl", "finish"), ("_active.jsonl", "start")):
        (unified_dir / f"{agent_id}{suffix}").write_text(
            json.dumps({"event": event}) + "\n"
        )

    assert get_agent_log_status(agent_id) == "completed"
348
349
def test_get_agent_end_state_finish(tmp_path, monkeypatch):
    """Test get_agent_end_state returns 'finish' for successful agents."""
    monkeypatch.setenv("_SOLSTONE_JOURNAL_OVERRIDE", str(tmp_path))
    unified_dir = tmp_path / "agents" / "unified"
    unified_dir.mkdir(parents=True)

    # The completed log ends with a finish event.
    agent_id = "1234567890123"
    lines = [
        '{"event": "request", "prompt": "hello"}',
        '{"event": "finish", "result": "done"}',
    ]
    (unified_dir / f"{agent_id}.jsonl").write_text("\n".join(lines) + "\n")

    assert get_agent_end_state(agent_id) == "finish"
365
366
def test_get_agent_end_state_error(tmp_path, monkeypatch):
    """Test get_agent_end_state returns 'error' for failed agents."""
    monkeypatch.setenv("_SOLSTONE_JOURNAL_OVERRIDE", str(tmp_path))
    unified_dir = tmp_path / "agents" / "unified"
    unified_dir.mkdir(parents=True)

    # The completed log ends with an error event instead of finish.
    agent_id = "1234567890123"
    lines = [
        '{"event": "request", "prompt": "hello"}',
        '{"event": "error", "error": "something went wrong"}',
    ]
    (unified_dir / f"{agent_id}.jsonl").write_text("\n".join(lines) + "\n")

    assert get_agent_end_state(agent_id) == "error"
382
383
def test_get_agent_end_state_running(tmp_path, monkeypatch):
    """Test get_agent_end_state returns 'running' for active agents."""
    monkeypatch.setenv("_SOLSTONE_JOURNAL_OVERRIDE", str(tmp_path))
    unified_dir = tmp_path / "agents" / "unified"
    unified_dir.mkdir(parents=True)

    # Only the _active log exists, so the agent is still in flight.
    agent_id = "1234567890123"
    active_log = unified_dir / f"{agent_id}_active.jsonl"
    active_log.write_text('{"event": "request", "prompt": "hello"}\n')

    assert get_agent_end_state(agent_id) == "running"
398
399
def test_get_agent_end_state_unknown(tmp_path, monkeypatch):
    """Test get_agent_end_state returns 'unknown' for missing agents."""
    monkeypatch.setenv("_SOLSTONE_JOURNAL_OVERRIDE", str(tmp_path))
    # No log files exist for any agent.
    (tmp_path / "agents").mkdir()

    state = get_agent_end_state("nonexistent")
    assert state == "unknown"
406
407
408# Tests for wait_for_agents
409
410
def test_wait_for_agents_already_complete(tmp_path, monkeypatch):
    """Test wait_for_agents returns immediately if agents already completed."""
    monkeypatch.setenv("_SOLSTONE_JOURNAL_OVERRIDE", str(tmp_path))
    unified_dir = tmp_path / "agents" / "unified"
    unified_dir.mkdir(parents=True)
    (tmp_path / "health").mkdir()

    # Both agents already have completed logs on disk before we wait.
    agent_ids = ["1000", "2000"]
    for agent_id in agent_ids:
        (unified_dir / f"{agent_id}.jsonl").write_text('{"event": "finish"}\n')

    completed, timed_out = wait_for_agents(agent_ids, timeout=1)

    # Every agent resolves to "finish" and nothing times out.
    assert timed_out == []
    assert set(completed.keys()) == set(agent_ids)
    assert all(state == "finish" for state in completed.values())
430
431
def test_wait_for_agents_event_completion(callosum_server):
    """Test wait_for_agents completes when finish event is received."""
    tmp_path = callosum_server
    agents_dir = tmp_path / "agents"
    unified_dir = agents_dir / "unified"
    unified_dir.mkdir(exist_ok=True)

    agent_id = "1234567890123"

    # Start wait in background thread
    result = {"completed": None, "timed_out": None}

    def wait_thread():
        # Runs in a background thread; stores both outcome values so the
        # main thread can assert on them after join().
        result["completed"], result["timed_out"] = wait_for_agents(
            [agent_id], timeout=5
        )

    waiter = threading.Thread(target=wait_thread)
    waiter.start()

    # Give the waiter time to set up listener
    time.sleep(0.2)

    # Create the completed file and emit finish event
    (unified_dir / f"{agent_id}.jsonl").write_text('{"event": "finish"}\n')

    # Emit finish event via Callosum
    client = CallosumConnection()
    client.start()
    time.sleep(0.1)
    client.emit("cortex", "finish", agent_id=agent_id, result="done")
    time.sleep(0.2)
    client.stop()

    waiter.join(timeout=3)

    # The waiter should have observed the finish event well before its
    # 5-second timeout, so nothing lands in the timed_out list.
    assert result["completed"] == {agent_id: "finish"}
    assert result["timed_out"] == []
470
471
def test_wait_for_agents_error_event(callosum_server):
    """Test wait_for_agents completes on error event too."""
    tmp_path = callosum_server
    agents_dir = tmp_path / "agents"
    unified_dir = agents_dir / "unified"
    unified_dir.mkdir(exist_ok=True)

    agent_id = "1234567890124"

    result = {"completed": None, "timed_out": None}

    def wait_thread():
        # Background waiter; records both outcome values for the main thread.
        result["completed"], result["timed_out"] = wait_for_agents(
            [agent_id], timeout=5
        )

    waiter = threading.Thread(target=wait_thread)
    waiter.start()
    time.sleep(0.2)  # let the waiter attach its event listener first

    # Create completed file and emit error event
    (unified_dir / f"{agent_id}.jsonl").write_text('{"event": "error"}\n')

    client = CallosumConnection()
    client.start()
    time.sleep(0.1)  # allow the client connection to establish
    client.emit("cortex", "error", agent_id=agent_id, error="something failed")
    time.sleep(0.2)
    client.stop()

    waiter.join(timeout=3)

    # An error event also counts as completion, with end state "error".
    assert result["completed"] == {agent_id: "error"}
    assert result["timed_out"] == []
506
507
def test_wait_for_agents_initial_file_check(tmp_path, monkeypatch):
    """Test wait_for_agents finds already-completed agents via initial file check."""
    monkeypatch.setenv("_SOLSTONE_JOURNAL_OVERRIDE", str(tmp_path))
    unified_dir = tmp_path / "agents" / "unified"
    unified_dir.mkdir(parents=True)
    (tmp_path / "health").mkdir()

    # The agent finished before we started waiting; no event will arrive.
    agent_id = "1234567890125"
    (unified_dir / f"{agent_id}.jsonl").write_text('{"event": "finish"}\n')

    completed, timed_out = wait_for_agents([agent_id], timeout=1)

    # The initial on-disk scan should detect the finished log immediately.
    assert completed == {agent_id: "finish"}
    assert timed_out == []
527
528
def test_wait_for_agents_timeout_actual(tmp_path, monkeypatch):
    """Test wait_for_agents times out for agents that never complete."""
    monkeypatch.setenv("_SOLSTONE_JOURNAL_OVERRIDE", str(tmp_path))
    unified_dir = tmp_path / "agents" / "unified"
    unified_dir.mkdir(parents=True)
    (tmp_path / "health").mkdir()

    # Only an active log exists and no completion event ever arrives.
    agent_id = "1234567890126"
    (unified_dir / f"{agent_id}_active.jsonl").write_text('{"event": "start"}\n')

    completed, timed_out = wait_for_agents([agent_id], timeout=1)

    # Nothing completes, so the agent lands in the timed_out list.
    assert completed == {}
    assert timed_out == [agent_id]
546
547
def test_wait_for_agents_partial(callosum_server):
    """Test wait_for_agents with some completing and some timing out."""
    tmp_path = callosum_server
    agents_dir = tmp_path / "agents"
    unified_dir = agents_dir / "unified"
    unified_dir.mkdir(exist_ok=True)

    completing_agent = "1111"
    timeout_agent = "2222"

    # Create active file for timeout agent
    (unified_dir / f"{timeout_agent}_active.jsonl").write_text('{"event": "start"}\n')

    result = {"completed": None, "timed_out": None}

    def wait_thread():
        # Background waiter watching both agents; records both outcomes.
        result["completed"], result["timed_out"] = wait_for_agents(
            [completing_agent, timeout_agent], timeout=2
        )

    waiter = threading.Thread(target=wait_thread)
    waiter.start()
    time.sleep(0.2)  # let the waiter attach its listener first

    # Complete one agent
    (unified_dir / f"{completing_agent}.jsonl").write_text('{"event": "finish"}\n')

    client = CallosumConnection()
    client.start()
    time.sleep(0.1)  # allow the client connection to establish
    client.emit("cortex", "finish", agent_id=completing_agent, result="done")
    time.sleep(0.1)
    client.stop()

    waiter.join(timeout=5)

    # One agent finished via the event; the other hits the 2s timeout.
    assert result["completed"] == {completing_agent: "finish"}
    assert result["timed_out"] == [timeout_agent]
586
587
def test_wait_for_agents_missed_event_recovery(tmp_path, monkeypatch, caplog):
    """Test that missed events are recovered via final file check with INFO log."""
    import logging

    monkeypatch.setenv("_SOLSTONE_JOURNAL_OVERRIDE", str(tmp_path))
    agents_dir = tmp_path / "agents"
    agents_dir.mkdir()
    unified_dir = agents_dir / "unified"
    unified_dir.mkdir()
    (tmp_path / "health").mkdir()

    agent_id = "1234567890127"

    # Start with active file
    (unified_dir / f"{agent_id}_active.jsonl").write_text('{"event": "start"}\n')

    result = {"completed": None, "timed_out": None}

    def wait_and_complete():
        # Wait a bit then "complete" the agent by renaming file
        # No Callosum event is emitted, so the waiter can only discover the
        # completion through its final on-disk check.
        time.sleep(0.3)
        (unified_dir / f"{agent_id}_active.jsonl").unlink()
        (unified_dir / f"{agent_id}.jsonl").write_text('{"event": "finish"}\n')

    completer = threading.Thread(target=wait_and_complete)
    completer.start()

    with caplog.at_level(logging.INFO):
        result["completed"], result["timed_out"] = wait_for_agents(
            [agent_id], timeout=1
        )

    completer.join()

    # Should recover via final file check
    assert result["completed"] == {agent_id: "finish"}
    assert result["timed_out"] == []

    # Should log about missed event
    assert any(
        "completion event not received but agent completed" in record.message
        for record in caplog.records
    )