personal memory agent
at main 405 lines 13 kB view raw
# SPDX-License-Identifier: AGPL-3.0-only
# Copyright (c) 2026 sol pbc

"""Cortex client for managing AI agent requests.

Requests are broadcast over Callosum; agent progress is read back from the
per-agent JSONL logs stored under ``<journal>/agents/<subdir>/``.
"""

import glob
import json
import logging
import threading
from collections import deque
from pathlib import Path
from typing import Any, Dict, Optional

from think.callosum import CallosumConnection, callosum_send
from think.utils import get_journal, now_ms

logger = logging.getLogger(__name__)

# Last timestamp handed out by cortex_request(); agent IDs are strictly
# increasing within this process. Guarded by _ts_lock so concurrent callers
# can never be issued the same ID.
_last_ts = 0
_ts_lock = threading.Lock()

# Number of trailing lines of a completed agent log that cortex_agents()
# scans for the "finish" event used to compute runtime.
_FINISH_SCAN_LINES = 10

# Events that mark the end of an agent run.
_TERMINAL_EVENTS = ("finish", "error")


def _find_agent_file(agents_dir: Path, agent_id: str) -> tuple[Path | None, str]:
    """Find an agent log file in per-agent subdirectories.

    A completed log (``*/<agent_id>.jsonl``) takes precedence over a running
    one (``*/<agent_id>_active.jsonl``).

    The ID is matched literally: glob metacharacters are escaped, and IDs that
    are empty or contain a path separator are rejected, so an ID such as
    ``"*"`` cannot resolve to some other agent's log.

    Returns:
        Tuple of (file_path, status) where status is
        "completed", "running", or "not_found".
    """
    agent_id = str(agent_id)
    if not agent_id or "/" in agent_id or "\\" in agent_id:
        return None, "not_found"

    pattern_id = glob.escape(agent_id)
    for suffix, status in ((".jsonl", "completed"), ("_active.jsonl", "running")):
        for match in agents_dir.glob(f"*/{pattern_id}{suffix}"):
            return match, status
    return None, "not_found"


def cortex_request(
    prompt: str,
    name: str,
    provider: Optional[str] = None,
    config: Optional[Dict[str, Any]] = None,
) -> str | None:
    """Create a Cortex agent request via Callosum broadcast.

    Args:
        prompt: The task or question for the agent
        name: Agent name - system (e.g., "unified") or app-qualified
            (e.g., "entities:entity_assist")
        provider: AI provider - openai, google, or anthropic
        config: Provider-specific configuration (model, max_output_tokens,
            thinking_budget, etc.). Entries are merged flat into the request
            and may override prompt/provider/name, but never the generated
            "ts"/"agent_id".

    Returns:
        Agent ID (timestamp-based string), or None if the Callosum send failed.

    Raises:
        ValueError: If config is given but is not a dictionary.
    """
    # Validate before any side effects (directory creation, ID allocation).
    if config and not isinstance(config, dict):
        raise ValueError("config must be a dictionary")

    # Create the agents directory if it doesn't exist.
    agents_dir = Path(get_journal()) / "agents"
    agents_dir.mkdir(parents=True, exist_ok=True)

    # Monotonic millisecond timestamp: if the clock hasn't advanced (or went
    # backwards) since the last ID, bump past it so every ID is unique.
    global _last_ts
    with _ts_lock:
        ts = max(now_ms(), _last_ts + 1)
        _last_ts = ts
    agent_id = str(ts)

    request: Dict[str, Any] = {
        "ts": ts,
        "agent_id": agent_id,
        "prompt": prompt,
        "provider": provider,
        "name": name,
    }
    if config:
        # Flat schema: config overrides sit alongside the core fields.
        request.update(config)
        # Identity fields must match the ID returned to the caller.
        request["ts"] = ts
        request["agent_id"] = agent_id

    # callosum_send(tract, event, **fields) takes the event positionally, so an
    # "event" key (e.g. from config) must not be forwarded as a field.
    request.pop("event", None)

    if not callosum_send("cortex", "request", **request):
        logger.warning("Failed to send cortex request for agent '%s'", name)
        return None

    return agent_id


def get_agent_log_status(agent_id: str) -> str:
    """Get the status of a specific agent from its log file.

    Args:
        agent_id: The agent ID (timestamp)

    Returns:
        "completed" - Agent finished (*.jsonl exists)
        "running" - Agent still active (*_active.jsonl exists)
        "not_found" - No agent file exists
    """
    agents_dir = Path(get_journal()) / "agents"
    _, status = _find_agent_file(agents_dir, agent_id)
    return status


def wait_for_agents(
    agent_ids: list[str],
    timeout: int | None = 600,
) -> tuple[dict[str, str], list[str]]:
    """Wait for agents to complete via Callosum events.

    Listens for cortex.finish and cortex.error events. The event listener is
    started first, then an initial file check catches agents that had already
    completed, and a final file check after the wait acts as a backstop for
    any missed events.

    Args:
        agent_ids: List of agent IDs to wait for
        timeout: Maximum wait time in seconds (default 600 = 10 minutes);
            None waits until every agent has completed.

    Returns:
        Tuple of (completed, timed_out) where completed is a dict mapping
        agent_id to end state ("finish" or "error"), and timed_out lists, in
        input order, the agent IDs that did not complete within the timeout.
    """
    agent_ids = list(agent_ids)
    pending = set(agent_ids)
    completed: dict[str, str] = {}
    if not pending:
        # Nothing to wait for; don't open a Callosum connection at all.
        return completed, []

    lock = threading.Lock()
    all_done = threading.Event()

    def on_message(msg: dict) -> None:
        if msg.get("tract") != "cortex":
            return
        agent_id = msg.get("agent_id")
        event_type = msg.get("event")
        if not agent_id or event_type not in _TERMINAL_EVENTS:
            return
        with lock:
            if agent_id in pending:
                completed[agent_id] = event_type
                pending.discard(agent_id)
                if not pending:
                    all_done.set()

    # Start the listener BEFORE the initial check so an agent finishing in
    # between is still observed.
    listener = CallosumConnection()
    listener.start(callback=on_message)

    try:
        # Initial file check; locked because the callback may run concurrently.
        with lock:
            for agent_id in list(pending):
                end_state = get_agent_end_state(agent_id)
                if end_state in _TERMINAL_EVENTS:
                    completed[agent_id] = end_state
                    pending.discard(agent_id)

        if not pending:
            return completed, []

        all_done.wait(timeout=timeout)
    finally:
        listener.stop()

    # Final file check (backstop for missed events). The listener is stopped,
    # so no lock is needed.
    for agent_id in list(pending):
        end_state = get_agent_end_state(agent_id)
        if end_state in _TERMINAL_EVENTS:
            logger.info(
                "Agent %s completion event not received but agent completed",
                agent_id,
            )
            completed[agent_id] = end_state
            pending.discard(agent_id)

    timed_out = [agent_id for agent_id in dict.fromkeys(agent_ids) if agent_id in pending]
    return completed, timed_out


def get_agent_end_state(agent_id: str) -> str:
    """Get how an agent ended (finish or error).

    Checks file contents for terminal events even if the file is still
    _active.jsonl, since Callosum broadcasts happen before the file rename.

    Args:
        agent_id: The agent ID (timestamp)

    Returns:
        "finish" - Agent completed successfully
        "error" - Agent ended with an error
        "running" - Agent is still active (no terminal event in file)
        "unknown" - Agent file not found
    """
    try:
        events = read_agent_events(agent_id)
    except FileNotFoundError:
        return "unknown"

    # The most recent terminal event decides the outcome.
    for event in reversed(events):
        event_type = event.get("event")
        if event_type in _TERMINAL_EVENTS:
            return event_type
    return "running"


def _read_jsonl_events(path: Path) -> list[Dict[str, Any]]:
    """Parse a JSONL file into a list of JSON-object events.

    Blank lines, malformed JSON and non-object JSON values are skipped (logged
    at debug level). Undecodable bytes are replaced rather than aborting the
    read.

    Raises:
        FileNotFoundError: If the file does not exist.
    """
    events: list[Dict[str, Any]] = []
    with open(path, "r", encoding="utf-8", errors="replace") as f:
        for raw_line in f:
            line = raw_line.strip()
            if not line:
                continue
            try:
                event = json.loads(line)
            except json.JSONDecodeError:
                logger.debug("Skipping malformed JSON in %s", path)
                continue
            if not isinstance(event, dict):
                logger.debug("Skipping non-object JSON line in %s", path)
                continue
            events.append(event)
    return events


def read_agent_events(agent_id: str) -> list[Dict[str, Any]]:
    """Read all events from an agent's JSONL log file.

    Args:
        agent_id: The agent ID (timestamp)

    Returns:
        List of event dictionaries in chronological order

    Raises:
        FileNotFoundError: If agent log doesn't exist
    """
    agents_dir = Path(get_journal()) / "agents"
    agent_file, _status = _find_agent_file(agents_dir, agent_id)
    if agent_file is None:
        raise FileNotFoundError(f"Agent log not found: {agent_id}")

    try:
        return _read_jsonl_events(agent_file)
    except FileNotFoundError:
        # The log may have been renamed (*_active.jsonl -> *.jsonl) between
        # the lookup and the open; resolve it once more before giving up.
        retry_file, _status = _find_agent_file(agents_dir, agent_id)
        if retry_file is None or retry_file == agent_file:
            raise
        return _read_jsonl_events(retry_file)


def _scan_agent_log(
    agent_file: Path, want_tail: bool
) -> tuple[Dict[str, Any] | None, list[str]]:
    """Read an agent log's request line and, optionally, its trailing lines.

    Returns:
        (request, tail): request is the parsed first-line "request" event, or
        None if the first line is empty or is not a request; tail holds up to
        _FINISH_SCAN_LINES raw lines following the first line (empty unless
        want_tail).

    Raises:
        OSError: If the file cannot be opened or read.
        ValueError: If the first line is not valid JSON, or the text is not
            valid UTF-8.
    """
    with open(agent_file, "r", encoding="utf-8") as f:
        first_line = f.readline().strip()
        if not first_line:
            return None, []
        request = json.loads(first_line)
        if not isinstance(request, dict) or request.get("event") != "request":
            return None, []
        # Only the tail is kept in memory, however long the log is.
        tail = list(deque(f, maxlen=_FINISH_SCAN_LINES)) if want_tail else []
    return request, tail


def _finish_runtime(tail_lines: list[str], start: Any) -> float | None:
    """Return seconds between start and the last "finish" event in tail_lines.

    Returns None when no finish event is found, or when either timestamp is
    missing/zero. Malformed and non-object lines are ignored.
    """
    for raw_line in reversed(tail_lines):
        line = raw_line.strip()
        if not line:
            continue
        try:
            event = json.loads(line)
        except json.JSONDecodeError:
            continue
        if isinstance(event, dict) and event.get("event") == "finish":
            end_ts = event.get("ts", 0)
            if end_ts and start:
                # Timestamps are in milliseconds.
                return (end_ts - start) / 1000.0
            return None
    return None


def cortex_agents(
    limit: int = 10,
    offset: int = 0,
    agent_type: str = "all",
    facet: Optional[str] = None,
) -> Dict[str, Any]:
    """List agents from the journal with pagination and filtering.

    Args:
        limit: Maximum number of agents to return (clamped to 1-100)
        offset: Number of agents to skip (negative values are treated as 0)
        agent_type: Filter by "live", "historical", or "all" (any other value
            behaves like "all")
        facet: Optional facet to filter by. If provided, only returns agents
            that were run in this facet context. None means no filtering.

    Returns:
        Dictionary with:
          - "agents": the requested page, newest first. Each entry has id,
            name, start, status ("running"/"completed"), prompt, provider,
            facet and, for completed agents with a finish event,
            runtime_seconds.
          - "pagination": limit, offset, total (matching agents before
            paging) and has_more.
          - "live_count" / "historical_count": running / completed agents that
            pass the facet filter, regardless of agent_type. Logs whose first
            line is not a readable "request" event are not counted or listed.
    """
    limit = max(1, min(limit, 100))
    offset = max(0, offset)

    all_agents: list[Dict[str, Any]] = []
    live_count = 0
    historical_count = 0

    agents_dir = Path(get_journal()) / "agents"
    log_files = agents_dir.glob("*/*.jsonl") if agents_dir.exists() else ()

    for agent_file in log_files:
        file_name = agent_file.name
        # Pending requests are not yet agents.
        if "_pending.jsonl" in file_name:
            continue
        is_active = "_active.jsonl" in file_name

        if agent_type == "live":
            wanted = is_active
        elif agent_type == "historical":
            wanted = not is_active
        else:
            wanted = True

        try:
            # The tail is only needed to compute runtime for listed,
            # completed agents.
            request, tail = _scan_agent_log(
                agent_file, want_tail=wanted and not is_active
            )
        except (ValueError, OSError):
            # Malformed, undecodable or unreadable log: skip it.
            continue
        if request is None:
            continue

        agent_facet = request.get("facet")
        if facet is not None and agent_facet != facet:
            continue

        if is_active:
            live_count += 1
        else:
            historical_count += 1

        if not wanted:
            continue

        agent_info: Dict[str, Any] = {
            "id": agent_file.stem.removesuffix("_active"),
            "name": request.get("name", "unified"),
            "start": request.get("ts", 0),
            "status": "running" if is_active else "completed",
            "prompt": request.get("prompt", ""),
            "provider": request.get("provider", "openai"),
            "facet": agent_facet,
        }

        if tail:
            runtime = _finish_runtime(tail, agent_info["start"])
            if runtime is not None:
                agent_info["runtime_seconds"] = runtime

        all_agents.append(agent_info)

    # Newest first, then paginate.
    all_agents.sort(key=lambda agent: agent["start"], reverse=True)
    total = len(all_agents)
    paginated = all_agents[offset : offset + limit]

    return {
        "agents": paginated,
        "pagination": {
            "limit": limit,
            "offset": offset,
            "total": total,
            "has_more": (offset + limit) < total,
        },
        "live_count": live_count,
        "historical_count": historical_count,
    }