personal memory agent
at main 219 lines 6.5 kB view raw
# SPDX-License-Identifier: AGPL-3.0-only
# Copyright (c) 2026 sol pbc

"""Integration tests for the Cortex agent system with Callosum."""

import json
import threading
import time
from contextlib import contextmanager

import pytest

from think.callosum import CallosumConnection, CallosumServer
from think.cortex import CortexService
from think.cortex_client import cortex_agents, cortex_request
from think.utils import now_ms

# Environment variable that the tests in this module set to the integration
# journal path before touching Callosum or Cortex.
JOURNAL_ENV = "_SOLSTONE_JOURNAL_OVERRIDE"


def _wait_until(predicate, timeout=2.0, interval=0.05):
    """Poll *predicate* until it is truthy or *timeout* seconds elapse.

    Returns True as soon as the predicate holds, False on timeout. Polling
    replaces fixed sleeps so tests finish early on fast machines and tolerate
    slow ones.
    """
    deadline = time.monotonic() + timeout
    while True:
        if predicate():
            return True
        if time.monotonic() >= deadline:
            return False
        time.sleep(interval)


def _events_named(messages, name):
    """Return the messages whose ``"event"`` field equals *name*.

    The list is snapshotted first because callbacks append to it from
    another thread while the test thread reads it.
    """
    return [m for m in list(messages) if m.get("event") == name]


@contextmanager
def _callosum_listener(callback, settle=0.1):
    """Run a CallosumConnection delivering messages to *callback*.

    The connection is always stopped on exit, even when the body raises, so
    a failing assertion does not leak a live listener into later tests.
    *settle* is a short pause after start, kept from the original tests,
    before the body runs.
    """
    connection = CallosumConnection()
    connection.start(callback=callback)
    try:
        time.sleep(settle)
        yield connection
    finally:
        connection.stop()


@pytest.fixture
def callosum_server(integration_journal_path, monkeypatch):
    """Start a Callosum server for integration testing."""
    # monkeypatch restores the previous value on teardown, so the override
    # does not leak into unrelated tests.
    monkeypatch.setenv(JOURNAL_ENV, str(integration_journal_path))

    server = CallosumServer()
    server_thread = threading.Thread(target=server.start, daemon=True)
    server_thread.start()

    # Wait for the server socket to appear (up to 5 seconds).
    socket_path = integration_journal_path / "health" / "callosum.sock"
    if not _wait_until(socket_path.exists, timeout=5.0, interval=0.1):
        pytest.fail("Callosum server did not start in time")

    yield server

    server.stop()
    server_thread.join(timeout=2)


@pytest.mark.integration
def test_cortex_service_startup(integration_journal_path, callosum_server):
    """Test that Cortex service starts up and creates necessary directories."""
    # Initialize the service
    cortex = CortexService(journal_path=str(integration_journal_path))

    # Verify agents directory was created
    agents_dir = integration_journal_path / "agents"
    assert agents_dir.exists()
    assert agents_dir.is_dir()

    # Verify service initializes correctly
    status = cortex.get_status()
    assert status["running_agents"] == 0
    assert status["agent_ids"] == []


@pytest.mark.integration
def test_cortex_request_creation(
    integration_journal_path, callosum_server, monkeypatch
):
    """Test creating a Cortex agent request via Callosum."""
    monkeypatch.setenv(JOURNAL_ENV, str(integration_journal_path))

    # Listen for broadcasts
    received_messages = []

    def callback(msg):
        received_messages.append(msg)

    with _callosum_listener(callback):
        # Create a request
        agent_id = cortex_request(
            prompt="Test prompt", name="default", provider="openai"
        )

        # Wait for the broadcast instead of sleeping a fixed amount.
        _wait_until(lambda: _events_named(received_messages, "request"))

        # Verify request was broadcast
        requests = _events_named(received_messages, "request")
        assert requests, "no 'request' event was broadcast"
        request = requests[0]
        assert request["prompt"] == "Test prompt"
        assert request["name"] == "default"
        assert request["provider"] == "openai"
        assert request["agent_id"] == agent_id


@pytest.mark.integration
def test_cortex_end_to_end_with_echo_agent(
    integration_journal_path, callosum_server, monkeypatch
):
    """Test that a request is broadcast on the cortex tract while Cortex runs."""
    monkeypatch.setenv(JOURNAL_ENV, str(integration_journal_path))

    # Make sure the agents directory exists (no agent script is created).
    agents_dir = integration_journal_path / "agents"
    agents_dir.mkdir(parents=True, exist_ok=True)

    # Start Cortex service in background
    cortex = CortexService(journal_path=str(integration_journal_path))
    service_thread = threading.Thread(target=cortex.start, daemon=True)
    service_thread.start()

    try:
        time.sleep(0.5)  # Let service start

        # Collect events
        received_events = []

        def callback(message):
            # Filter for cortex tract
            if message.get("tract") != "cortex":
                return
            received_events.append(message)

        with _callosum_listener(callback, settle=0.2):
            # No real agent backs this request; only the broadcast of the
            # request event is verified.
            agent_id = cortex_request(
                prompt="Test end-to-end", name="default", provider="openai"
            )

            # Wait for at least the request event
            _wait_until(lambda: _events_named(received_events, "request"))

            request_events = _events_named(received_events, "request")
            assert request_events, "no 'request' event seen on the cortex tract"
            assert request_events[0]["agent_id"] == agent_id
    finally:
        # Always shut the service down, even if an assertion above failed.
        cortex.stop()
        service_thread.join(timeout=2)


@pytest.mark.integration
def test_cortex_agents_listing(integration_journal_path, monkeypatch):
    """Test listing agents from the cortex_agents function."""
    monkeypatch.setenv(JOURNAL_ENV, str(integration_journal_path))

    agents_dir = integration_journal_path / "agents"
    agents_dir.mkdir(parents=True, exist_ok=True)

    # Get initial count
    initial_count = len(cortex_agents()["agents"])

    ts = now_ms()

    # Create completed agent (inside a date subdirectory, matching cortex layout)
    agent_subdir = agents_dir / "20260214"
    agent_subdir.mkdir(parents=True, exist_ok=True)
    events = [
        {
            "event": "request",
            "ts": ts,
            "prompt": "Test",
            "name": "default",
            "provider": "openai",
        },
        {"event": "finish", "ts": ts + 100, "result": "Done"},
    ]
    completed_file = agent_subdir / f"{ts}.jsonl"
    with open(completed_file, "w", encoding="utf-8") as f:
        # One JSON object per line (JSONL).
        f.writelines(json.dumps(event) + "\n" for event in events)

    # List agents
    result = cortex_agents()

    # Should have one more than before
    assert len(result["agents"]) == initial_count + 1

    # Find our agent
    our_agent = next((a for a in result["agents"] if a["id"] == str(ts)), None)
    assert our_agent is not None, f"agent {ts} missing from listing"
    assert our_agent["status"] == "completed"
    assert our_agent["prompt"] == "Test"


@pytest.mark.integration
def test_cortex_error_handling(
    integration_journal_path, callosum_server, monkeypatch
):
    """Test that a request naming an unknown agent is still broadcast."""
    monkeypatch.setenv(JOURNAL_ENV, str(integration_journal_path))

    # Listen for events
    received_events = []

    def callback(msg):
        received_events.append(msg)

    with _callosum_listener(callback):
        # Make a request
        cortex_request(
            prompt="Test error handling",
            name="nonexistent_agent",  # This may cause issues
            provider="openai",
        )

        _wait_until(lambda: _events_named(received_events, "request"))

        # Should have at least received the request
        request_events = _events_named(received_events, "request")
        assert request_events, "no 'request' event was broadcast"