personal memory agent
at main 630 lines 20 kB view raw
# SPDX-License-Identifier: AGPL-3.0-only
# Copyright (c) 2026 sol pbc

"""Tests for cortex_client module with Callosum."""

import json
import logging
import shutil
import tempfile
import threading
import time
from pathlib import Path

import pytest

from think.callosum import CallosumConnection, CallosumServer
from think.cortex_client import (
    cortex_agents,
    cortex_request,
    get_agent_end_state,
    get_agent_log_status,
    wait_for_agents,
)
from think.models import GPT_5
from think.utils import now_ms

# Environment variable every test sets to point the code under test at a
# throwaway journal directory.
JOURNAL_ENV = "_SOLSTONE_JOURNAL_OVERRIDE"


def _make_agent_dir(journal: Path, name: str = "unified") -> Path:
    """Create (if needed) and return ``<journal>/agents/<name>``."""
    agent_dir = journal / "agents" / name
    agent_dir.mkdir(parents=True, exist_ok=True)
    return agent_dir


def _write_events(path: Path, events) -> None:
    """Write *events* (an iterable of dicts) to *path*, one JSON object per line."""
    with open(path, "w", encoding="utf-8") as f:
        for event in events:
            f.write(json.dumps(event) + "\n")


def _start_waiter(agent_ids, timeout):
    """Run ``wait_for_agents(agent_ids, timeout=timeout)`` in a background thread.

    Returns ``(thread, result)``. ``result`` is a dict whose ``"completed"`` and
    ``"timed_out"`` entries stay ``None`` until the call returns.
    """
    result = {"completed": None, "timed_out": None}

    def run():
        result["completed"], result["timed_out"] = wait_for_agents(
            agent_ids, timeout=timeout
        )

    # Daemon thread: a hung wait must not keep the test process alive.
    thread = threading.Thread(target=run, daemon=True)
    thread.start()
    return thread, result


def _emit_cortex_event(event, settle=0.2, **fields):
    """Emit one event on the "cortex" tract through a short-lived connection.

    ``settle`` is how long to sleep after emitting before the client is
    stopped; the client is stopped even if emitting raises.
    """
    client = CallosumConnection()
    client.start()
    try:
        time.sleep(0.1)  # Allow connection to establish
        client.emit("cortex", event, **fields)
        # NOTE(review): presumably emit is asynchronous and needs a moment to
        # go out before the connection is closed - confirm against callosum.
        time.sleep(settle)
    finally:
        client.stop()


@pytest.fixture
def callosum_server(monkeypatch):
    """Start a Callosum server for testing.

    Uses a short temp path in /tmp to avoid Unix socket path length limits
    (~104 chars on macOS). pytest's tmp_path creates paths that are too long.

    Yields the journal directory (a ``Path``). The server is stopped and the
    directory removed on teardown, including when startup itself fails.
    """
    # Create short temp dir to avoid Unix socket path length limits
    tmp_dir = tempfile.mkdtemp(dir="/tmp", prefix="callosum_")
    try:
        tmp_path = Path(tmp_dir)

        monkeypatch.setenv(JOURNAL_ENV, str(tmp_path))
        (tmp_path / "agents").mkdir(parents=True, exist_ok=True)

        server = CallosumServer()
        server_thread = threading.Thread(target=server.start, daemon=True)
        server_thread.start()
        try:
            # Wait for server to be ready (up to ~5 seconds)
            socket_path = tmp_path / "health" / "callosum.sock"
            for _ in range(50):
                if socket_path.exists():
                    break
                time.sleep(0.1)
            else:
                pytest.fail("Callosum server did not start in time")

            yield tmp_path
        finally:
            server.stop()
            server_thread.join(timeout=2)
    finally:
        shutil.rmtree(tmp_dir, ignore_errors=True)


@pytest.fixture
def callosum_listener(callosum_server):
    """Provide a CallosumConnection listener that collects received messages.

    Yields ``messages``, a list that accumulates all broadcast messages
    received during the test. The listener is stopped on teardown.
    """
    messages = []

    def callback(msg):
        messages.append(msg)

    listener = CallosumConnection()
    listener.start(callback=callback)
    time.sleep(0.1)  # Allow connection to establish

    yield messages

    listener.stop()


def test_cortex_request_broadcasts_to_callosum(callosum_listener):
    """Test that cortex_request broadcasts request to Callosum."""
    messages = callosum_listener

    # Create a request
    agent_id = cortex_request(
        prompt="Test prompt",
        name="unified",
        provider="openai",
        config={"model": GPT_5},
    )

    time.sleep(0.2)

    # Verify broadcast was received
    assert len(messages) == 1
    msg = messages[0]
    assert msg["tract"] == "cortex"
    assert msg["event"] == "request"
    assert msg["prompt"] == "Test prompt"
    assert msg["name"] == "unified"
    assert msg["provider"] == "openai"
    assert msg["model"] == GPT_5
    assert msg["agent_id"] == agent_id
    assert "ts" in msg


def test_cortex_request_returns_agent_id(callosum_server):
    """Test that cortex_request returns agent_id string."""
    _ = callosum_server  # Needed for side effects only

    agent_id = cortex_request(prompt="Test", name="unified", provider="openai")

    # Verify agent_id is a string timestamp
    assert isinstance(agent_id, str)
    assert agent_id.isdigit()
    assert len(agent_id) == 13  # Millisecond timestamp


def test_cortex_request_unique_agent_ids(callosum_server):
    """Test that cortex_request generates unique agent IDs."""
    _ = callosum_server  # Needed for side effects only

    agent_ids = []
    for i in range(3):
        agent_id = cortex_request(prompt=f"Test {i}", name="unified", provider="openai")
        agent_ids.append(agent_id)
        time.sleep(0.002)

    # All agent IDs should be unique
    assert len(set(agent_ids)) == 3


def test_cortex_request_returns_none_on_send_failure(callosum_server, monkeypatch):
    """Test cortex_request returns None when callosum_send fails."""
    monkeypatch.setattr("think.cortex_client.callosum_send", lambda *a, **kw: False)

    agent_id = cortex_request(prompt="Test", name="unified", provider="openai")

    assert agent_id is None


def test_cortex_request_empty_journal(tmp_path, monkeypatch):
    """Test cortex_request works with an empty journal directory."""
    monkeypatch.setattr("think.cortex_client.callosum_send", lambda *a, **kw: True)
    monkeypatch.setenv(JOURNAL_ENV, str(tmp_path))

    agent_id = cortex_request("test", "unified", "openai")
    assert agent_id is not None
    assert len(agent_id) > 0


# Tests for cortex_agents remain mostly unchanged as they read from files


def test_cortex_agents_empty(tmp_path, monkeypatch):
    """Test cortex_agents with no agents."""
    monkeypatch.setenv(JOURNAL_ENV, str(tmp_path))

    result = cortex_agents()

    assert result["agents"] == []
    assert result["pagination"]["total"] == 0
    assert result["pagination"]["has_more"] is False
    assert result["live_count"] == 0
    assert result["historical_count"] == 0


def test_cortex_agents_with_active(tmp_path, monkeypatch):
    """Test cortex_agents with active (running) agents."""
    monkeypatch.setenv(JOURNAL_ENV, str(tmp_path))
    unified_dir = _make_agent_dir(tmp_path, "unified")
    tester_dir = _make_agent_dir(tmp_path, "tester")

    # Create active agent files
    ts1 = now_ms()
    ts2 = ts1 + 1000

    _write_events(
        unified_dir / f"{ts1}_active.jsonl",
        [
            {
                "event": "request",
                "ts": ts1,
                "prompt": "Task 1",
                "name": "unified",
                "provider": "openai",
            },
        ],
    )
    _write_events(
        tester_dir / f"{ts2}_active.jsonl",
        [
            {
                "event": "request",
                "ts": ts2,
                "prompt": "Task 2",
                "name": "tester",
                "provider": "google",
            },
        ],
    )

    result = cortex_agents()

    assert len(result["agents"]) == 2
    assert result["live_count"] == 2
    assert result["historical_count"] == 0


def test_cortex_agents_with_completed(tmp_path, monkeypatch):
    """Test cortex_agents with completed (historical) agents."""
    monkeypatch.setenv(JOURNAL_ENV, str(tmp_path))
    reviewer_dir = _make_agent_dir(tmp_path, "reviewer")

    # Create completed agent files
    ts1 = now_ms()
    _write_events(
        reviewer_dir / f"{ts1}.jsonl",
        [
            {
                "event": "request",
                "ts": ts1,
                "prompt": "Old task",
                "name": "reviewer",
                "provider": "anthropic",
            },
            {"event": "finish", "ts": ts1 + 100, "result": "Done"},
        ],
    )

    result = cortex_agents()

    assert len(result["agents"]) == 1
    assert result["live_count"] == 0
    assert result["historical_count"] == 1
    assert result["agents"][0]["status"] == "completed"


def test_cortex_agents_pagination(tmp_path, monkeypatch):
    """Test cortex_agents pagination."""
    monkeypatch.setenv(JOURNAL_ENV, str(tmp_path))
    unified_dir = _make_agent_dir(tmp_path)

    # Create multiple agents
    base_ts = now_ms()
    for i in range(5):
        ts = base_ts + (i * 1000)
        _write_events(
            unified_dir / f"{ts}.jsonl",
            [
                {
                    "event": "request",
                    "ts": ts,
                    "prompt": f"Task {i}",
                    "name": "unified",
                },
            ],
        )

    # Test limit
    result = cortex_agents(limit=2)
    assert len(result["agents"]) == 2
    assert result["pagination"]["limit"] == 2
    assert result["pagination"]["total"] == 5
    assert result["pagination"]["has_more"] is True


def test_cortex_agents_empty_journal(tmp_path, monkeypatch):
    """Test cortex_agents works with an empty journal directory."""
    monkeypatch.setenv(JOURNAL_ENV, str(tmp_path))

    result = cortex_agents()
    assert "agents" in result
    assert "pagination" in result
    assert isinstance(result["agents"], list)


def test_get_agent_log_status_completed(tmp_path, monkeypatch):
    """Test get_agent_log_status returns 'completed' for finished agents."""
    monkeypatch.setenv(JOURNAL_ENV, str(tmp_path))
    unified_dir = _make_agent_dir(tmp_path)

    agent_id = "1234567890123"
    (unified_dir / f"{agent_id}.jsonl").write_text('{"event": "finish"}\n')

    assert get_agent_log_status(agent_id) == "completed"


def test_get_agent_log_status_running(tmp_path, monkeypatch):
    """Test get_agent_log_status returns 'running' for active agents."""
    monkeypatch.setenv(JOURNAL_ENV, str(tmp_path))
    unified_dir = _make_agent_dir(tmp_path)

    agent_id = "1234567890123"
    (unified_dir / f"{agent_id}_active.jsonl").write_text('{"event": "start"}\n')

    assert get_agent_log_status(agent_id) == "running"


def test_get_agent_log_status_not_found(tmp_path, monkeypatch):
    """Test get_agent_log_status returns 'not_found' for missing agents."""
    monkeypatch.setenv(JOURNAL_ENV, str(tmp_path))
    (tmp_path / "agents").mkdir()

    assert get_agent_log_status("nonexistent") == "not_found"


def test_get_agent_log_status_prefers_completed(tmp_path, monkeypatch):
    """Test get_agent_log_status returns 'completed' when both files exist."""
    monkeypatch.setenv(JOURNAL_ENV, str(tmp_path))
    unified_dir = _make_agent_dir(tmp_path)

    # Edge case: both files exist (shouldn't happen, but check precedence)
    agent_id = "1234567890123"
    (unified_dir / f"{agent_id}.jsonl").write_text('{"event": "finish"}\n')
    (unified_dir / f"{agent_id}_active.jsonl").write_text('{"event": "start"}\n')

    assert get_agent_log_status(agent_id) == "completed"


def test_get_agent_end_state_finish(tmp_path, monkeypatch):
    """Test get_agent_end_state returns 'finish' for successful agents."""
    monkeypatch.setenv(JOURNAL_ENV, str(tmp_path))
    unified_dir = _make_agent_dir(tmp_path)

    agent_id = "1234567890123"
    (unified_dir / f"{agent_id}.jsonl").write_text(
        '{"event": "request", "prompt": "hello"}\n'
        '{"event": "finish", "result": "done"}\n'
    )

    assert get_agent_end_state(agent_id) == "finish"


def test_get_agent_end_state_error(tmp_path, monkeypatch):
    """Test get_agent_end_state returns 'error' for failed agents."""
    monkeypatch.setenv(JOURNAL_ENV, str(tmp_path))
    unified_dir = _make_agent_dir(tmp_path)

    agent_id = "1234567890123"
    (unified_dir / f"{agent_id}.jsonl").write_text(
        '{"event": "request", "prompt": "hello"}\n'
        '{"event": "error", "error": "something went wrong"}\n'
    )

    assert get_agent_end_state(agent_id) == "error"


def test_get_agent_end_state_running(tmp_path, monkeypatch):
    """Test get_agent_end_state returns 'running' for active agents."""
    monkeypatch.setenv(JOURNAL_ENV, str(tmp_path))
    unified_dir = _make_agent_dir(tmp_path)

    agent_id = "1234567890123"
    (unified_dir / f"{agent_id}_active.jsonl").write_text(
        '{"event": "request", "prompt": "hello"}\n'
    )

    assert get_agent_end_state(agent_id) == "running"


def test_get_agent_end_state_unknown(tmp_path, monkeypatch):
    """Test get_agent_end_state returns 'unknown' for missing agents."""
    monkeypatch.setenv(JOURNAL_ENV, str(tmp_path))
    (tmp_path / "agents").mkdir()

    assert get_agent_end_state("nonexistent") == "unknown"


# Tests for wait_for_agents


def test_wait_for_agents_already_complete(tmp_path, monkeypatch):
    """Test wait_for_agents returns immediately if agents already completed."""
    monkeypatch.setenv(JOURNAL_ENV, str(tmp_path))
    unified_dir = _make_agent_dir(tmp_path)
    (tmp_path / "health").mkdir()

    # Create completed agents
    agent_ids = ["1000", "2000"]
    for agent_id in agent_ids:
        (unified_dir / f"{agent_id}.jsonl").write_text('{"event": "finish"}\n')

    completed, timed_out = wait_for_agents(agent_ids, timeout=1)

    assert set(completed.keys()) == set(agent_ids)
    assert all(v == "finish" for v in completed.values())
    assert timed_out == []


def test_wait_for_agents_event_completion(callosum_server):
    """Test wait_for_agents completes when finish event is received."""
    unified_dir = _make_agent_dir(callosum_server)
    agent_id = "1234567890123"

    # Start wait in background thread
    waiter, result = _start_waiter([agent_id], timeout=5)

    # Give the waiter time to set up listener
    time.sleep(0.2)

    # Create the completed file and emit finish event via Callosum
    (unified_dir / f"{agent_id}.jsonl").write_text('{"event": "finish"}\n')
    _emit_cortex_event("finish", agent_id=agent_id, result="done")

    waiter.join(timeout=3)

    assert not waiter.is_alive(), "wait_for_agents did not return after finish event"
    assert result["completed"] == {agent_id: "finish"}
    assert result["timed_out"] == []


def test_wait_for_agents_error_event(callosum_server):
    """Test wait_for_agents completes on error event too."""
    unified_dir = _make_agent_dir(callosum_server)
    agent_id = "1234567890124"

    waiter, result = _start_waiter([agent_id], timeout=5)
    time.sleep(0.2)

    # Create completed file and emit error event
    (unified_dir / f"{agent_id}.jsonl").write_text('{"event": "error"}\n')
    _emit_cortex_event("error", agent_id=agent_id, error="something failed")

    waiter.join(timeout=3)

    assert not waiter.is_alive(), "wait_for_agents did not return after error event"
    assert result["completed"] == {agent_id: "error"}
    assert result["timed_out"] == []


def test_wait_for_agents_initial_file_check(tmp_path, monkeypatch):
    """Test wait_for_agents finds already-completed agents via initial file check."""
    monkeypatch.setenv(JOURNAL_ENV, str(tmp_path))
    unified_dir = _make_agent_dir(tmp_path)
    (tmp_path / "health").mkdir()

    agent_id = "1234567890125"

    # Agent already completed before we start waiting
    (unified_dir / f"{agent_id}.jsonl").write_text('{"event": "finish"}\n')

    completed, timed_out = wait_for_agents([agent_id], timeout=1)

    # Should find via initial file check
    assert completed == {agent_id: "finish"}
    assert timed_out == []


def test_wait_for_agents_timeout_actual(tmp_path, monkeypatch):
    """Test wait_for_agents times out for agents that never complete."""
    monkeypatch.setenv(JOURNAL_ENV, str(tmp_path))
    unified_dir = _make_agent_dir(tmp_path)
    (tmp_path / "health").mkdir()

    agent_id = "1234567890126"
    # Create active file (not completed)
    (unified_dir / f"{agent_id}_active.jsonl").write_text('{"event": "start"}\n')

    completed, timed_out = wait_for_agents([agent_id], timeout=1)

    assert completed == {}
    assert timed_out == [agent_id]


def test_wait_for_agents_partial(callosum_server):
    """Test wait_for_agents with some completing and some timing out."""
    unified_dir = _make_agent_dir(callosum_server)

    completing_agent = "1111"
    timeout_agent = "2222"

    # Create active file for timeout agent
    (unified_dir / f"{timeout_agent}_active.jsonl").write_text('{"event": "start"}\n')

    waiter, result = _start_waiter([completing_agent, timeout_agent], timeout=2)
    time.sleep(0.2)

    # Complete one agent
    (unified_dir / f"{completing_agent}.jsonl").write_text('{"event": "finish"}\n')
    _emit_cortex_event("finish", settle=0.1, agent_id=completing_agent, result="done")

    waiter.join(timeout=5)

    assert not waiter.is_alive(), "wait_for_agents did not return after its timeout"
    assert result["completed"] == {completing_agent: "finish"}
    assert result["timed_out"] == [timeout_agent]


def test_wait_for_agents_missed_event_recovery(tmp_path, monkeypatch, caplog):
    """Test that missed events are recovered via final file check with INFO log."""
    monkeypatch.setenv(JOURNAL_ENV, str(tmp_path))
    unified_dir = _make_agent_dir(tmp_path)
    (tmp_path / "health").mkdir()

    agent_id = "1234567890127"

    # Start with active file
    active_file = unified_dir / f"{agent_id}_active.jsonl"
    active_file.write_text('{"event": "start"}\n')

    def wait_and_complete():
        # Wait a bit, then "complete" the agent on disk only (no Callosum
        # event is emitted): drop the active log and write the completed one.
        time.sleep(0.3)
        active_file.unlink()
        (unified_dir / f"{agent_id}.jsonl").write_text('{"event": "finish"}\n')

    completer = threading.Thread(target=wait_and_complete)
    completer.start()
    try:
        with caplog.at_level(logging.INFO):
            completed, timed_out = wait_for_agents([agent_id], timeout=1)
    finally:
        # Always reap the helper thread, even if wait_for_agents raised.
        completer.join()

    # Should recover via final file check
    assert completed == {agent_id: "finish"}
    assert timed_out == []

    # Should log about missed event
    assert any(
        "completion event not received but agent completed" in record.message
        for record in caplog.records
    )