personal memory agent
1# SPDX-License-Identifier: AGPL-3.0-only
2# Copyright (c) 2026 sol pbc
3
4"""Cortex client for managing AI agent requests."""
5
import json
import logging
import threading
from collections import deque
from pathlib import Path
from typing import Any, Dict, Optional

from think.callosum import CallosumConnection, callosum_send
from think.utils import get_journal, now_ms
14
logger: logging.Logger = logging.getLogger(__name__)

# Most recent timestamp (ms) handed out as an agent ID by cortex_request().
# Kept at module level so successive IDs in this process stay strictly
# increasing even when the clock does not advance between calls.
_last_ts: int = 0
19
20
21def _find_agent_file(agents_dir: Path, agent_id: str) -> tuple[Path | None, str]:
22 """Find an agent log file in per-agent subdirectories.
23
24 Returns:
25 Tuple of (file_path, status) where status is
26 "completed", "running", or "not_found".
27 """
28 for match in agents_dir.glob(f"*/{agent_id}.jsonl"):
29 return match, "completed"
30 for match in agents_dir.glob(f"*/{agent_id}_active.jsonl"):
31 return match, "running"
32 return None, "not_found"
33
34
def cortex_request(
    prompt: str,
    name: str,
    provider: Optional[str] = None,
    config: Optional[Dict[str, Any]] = None,
) -> str | None:
    """Create a Cortex agent request via Callosum broadcast.

    Ensures ``<journal>/agents`` exists and allocates a new timestamp-based
    agent ID. It then broadcasts a flat ``cortex``/``request`` message
    containing the request fields merged with ``config``.

    Args:
        prompt: The task or question for the agent
        name: Agent name - system (e.g., "unified") or app-qualified (e.g., "entities:entity_assist")
        provider: AI provider - openai, google, or anthropic
        config: Provider-specific configuration (model, max_output_tokens,
            thinking_budget, etc.). Its keys are merged directly into the
            broadcast message and may override ``prompt``, ``provider`` and
            ``name``. They cannot override ``agent_id`` or ``ts``, which
            identify the request. Any ``event`` key is dropped.

    Returns:
        Agent ID (timestamp-based string), or None if the Callosum send failed.

    Raises:
        ValueError: If ``config`` is given but is not a dictionary.
    """
    global _last_ts

    # Validate up front so a bad config neither touches the filesystem nor
    # consumes a timestamp. Previously a falsy non-dict (e.g. []) slipped
    # past this check.
    if config is not None and not isinstance(config, dict):
        raise ValueError("config must be a dictionary")

    # Ensure the journal's agents directory exists. No check against
    # existing files is performed here; uniqueness comes from _last_ts below.
    agents_dir = Path(get_journal()) / "agents"
    agents_dir.mkdir(parents=True, exist_ok=True)

    # Allocate a strictly increasing millisecond timestamp. If the clock has
    # not advanced (or went backwards) since the previous request, step just
    # past the last value handed out.
    # NOTE(review): this read-modify-write of a module global is not locked,
    # and uniqueness is per-process only. Confirm callers never invoke this
    # concurrently from multiple threads.
    ts = now_ms()
    if ts <= _last_ts:
        ts = _last_ts + 1
    _last_ts = ts
    agent_id = str(ts)

    request: Dict[str, Any] = {
        "ts": ts,
        "agent_id": agent_id,
        "prompt": prompt,
        "provider": provider,
        "name": name,
    }

    if config:
        # Merge config overrides directly into the request for a flat schema.
        request.update(config)
        # Keep the identifying fields authoritative. Otherwise the ID returned
        # to the caller (and later waited on) could differ from the one
        # actually broadcast.
        request["ts"] = ts
        request["agent_id"] = agent_id
        # callosum_send() signature is send(tract, event, **fields). An
        # "event" key would collide with the positional event argument.
        request.pop("event", None)

    sent = callosum_send("cortex", "request", **request)

    if not sent:
        logger.info("Failed to send cortex request for agent '%s'", name)
        return None

    return agent_id
98
99
def get_agent_log_status(agent_id: str) -> str:
    """Report from the log files whether an agent is completed, running, or missing.

    Args:
        agent_id: The agent ID (timestamp)

    Returns:
        "completed" when ``<agent_id>.jsonl`` exists, "running" when only
        ``<agent_id>_active.jsonl`` exists, and "not_found" when neither does.
    """
    journal_agents = Path(get_journal()) / "agents"
    return _find_agent_file(journal_agents, agent_id)[1]
114
115
116def wait_for_agents(
117 agent_ids: list[str],
118 timeout: int | None = 600,
119) -> tuple[dict[str, str], list[str]]:
120 """Wait for agents to complete via Callosum events.
121
122 Listens for cortex.finish and cortex.error events. Sets up the event
123 listener first, then does an initial file check for agents that may have
124 already completed, and a final file check at timeout as a backstop for
125 any missed events.
126
127 Args:
128 agent_ids: List of agent IDs to wait for
129 timeout: Maximum wait time in seconds (default 600 = 10 minutes)
130
131 Returns:
132 Tuple of (completed, timed_out) where completed is a dict mapping
133 agent_id to end state ("finish" or "error"), and timed_out is a
134 list of agent IDs that did not complete within the timeout.
135 """
136 pending = set(agent_ids)
137 completed: dict[str, str] = {}
138 lock = threading.Lock()
139 all_done = threading.Event()
140
141 def on_message(msg: dict) -> None:
142 if msg.get("tract") != "cortex":
143 return
144 agent_id = msg.get("agent_id")
145 if not agent_id:
146 return
147
148 event_type = msg.get("event")
149 if event_type in ("finish", "error"):
150 with lock:
151 if agent_id in pending:
152 completed[agent_id] = event_type
153 pending.discard(agent_id)
154 if not pending:
155 all_done.set()
156
157 # Start listener BEFORE initial check to avoid race condition
158 listener = CallosumConnection()
159 listener.start(callback=on_message)
160
161 try:
162 # Initial file check (with lock since callback may be running)
163 with lock:
164 for agent_id in list(pending):
165 end_state = get_agent_end_state(agent_id)
166 if end_state in ("finish", "error"):
167 completed[agent_id] = end_state
168 pending.discard(agent_id)
169
170 if not pending:
171 return completed, []
172
173 # Wait for all completions or timeout
174 all_done.wait(timeout=timeout)
175
176 finally:
177 listener.stop()
178
179 # Final file check for any remaining (backstop for missed events)
180 # Listener is stopped, so no lock needed
181 for agent_id in list(pending):
182 end_state = get_agent_end_state(agent_id)
183 if end_state in ("finish", "error"):
184 logger.info(
185 f"Agent {agent_id} completion event not received but agent completed"
186 )
187 completed[agent_id] = end_state
188 pending.discard(agent_id)
189
190 return completed, list(pending)
191
192
def get_agent_end_state(agent_id: str) -> str:
    """Get how a completed agent ended (finish or error).

    Checks file contents for terminal events even if file is still _active.jsonl,
    since Callosum broadcasts happen before file rename.

    Args:
        agent_id: The agent ID (timestamp)

    Returns:
        "finish" - Agent completed successfully
        "error" - Agent ended with an error
        "running" - Agent is still active (no terminal event in file)
        "unknown" - Agent file not found
    """
    # read_agent_events() locates the log itself and raises FileNotFoundError
    # when there is none. A separate get_agent_log_status() lookup would only
    # repeat the same directory scan.
    try:
        events = read_agent_events(agent_id)
    except FileNotFoundError:
        return "unknown"

    # Scan newest-first: the most recent terminal event decides the outcome.
    # This covers "_active" files too, which may already hold a finish event.
    for event in reversed(events):
        if not isinstance(event, dict):
            continue
        event_type = event.get("event")
        if event_type in ("finish", "error"):
            return event_type

    # No terminal event found, so the agent is still running.
    return "running"
227
228
def read_agent_events(agent_id: str) -> list[Dict[str, Any]]:
    """Read all events from an agent's JSONL log file.

    Blank lines are skipped. Lines that are not valid JSON, or whose JSON
    value is not an object, are also skipped and logged at debug level.

    Args:
        agent_id: The agent ID (timestamp)

    Returns:
        List of event dictionaries in chronological order

    Raises:
        FileNotFoundError: If agent log doesn't exist
    """
    agents_dir = Path(get_journal()) / "agents"
    agent_file, _status = _find_agent_file(agents_dir, agent_id)
    if agent_file is None:
        raise FileNotFoundError(f"Agent log not found: {agent_id}")

    try:
        f = open(agent_file, "r", encoding="utf-8")
    except FileNotFoundError:
        # A running agent's "<id>_active.jsonl" can be renamed to its final
        # name between the lookup above and open(). Resolve it once more
        # instead of reporting a finished agent as missing.
        agent_file, _status = _find_agent_file(agents_dir, agent_id)
        if agent_file is None:
            raise FileNotFoundError(f"Agent log not found: {agent_id}") from None
        f = open(agent_file, "r", encoding="utf-8")

    events: list[Dict[str, Any]] = []
    with f:
        for raw_line in f:
            line = raw_line.strip()
            if not line:
                continue
            try:
                event = json.loads(line)
            except json.JSONDecodeError:
                logger.debug("Skipping malformed JSON in %s", agent_file)
                continue
            # Callers call .get() on each event, so only JSON objects are kept.
            if isinstance(event, dict):
                events.append(event)
            else:
                logger.debug("Skipping non-object JSON in %s", agent_file)

    return events
260
261
def _load_agent_summary(
    agent_file: Path, agent_id: str, status: str
) -> Optional[Dict[str, Any]]:
    """Build the listing entry for one agent log, or return None if the log is unusable."""
    try:
        with open(agent_file, "r", encoding="utf-8") as f:
            first_line = f.readline().strip()
            # Only the last few lines are needed to find the finish event.
            # Keep a bounded tail instead of the whole (possibly large) log.
            tail = deque(f, maxlen=10)
    except (OSError, UnicodeDecodeError):
        # Unreadable or undecodable file: skip it rather than fail the listing.
        return None

    # The first line must be the JSON "request" event.
    if not first_line:
        return None
    try:
        request = json.loads(first_line)
    except json.JSONDecodeError:
        return None
    if not isinstance(request, dict) or request.get("event") != "request":
        return None

    info: Dict[str, Any] = {
        "id": agent_id,
        "name": request.get("name", "unified"),
        "start": request.get("ts", 0),
        "status": status,
        "prompt": request.get("prompt", ""),
        "provider": request.get("provider", "openai"),
        "facet": request.get("facet"),
    }

    # For completed agents, the newest "finish" event near the end of the
    # log gives the runtime.
    if status == "completed":
        for raw_line in reversed(tail):
            line = raw_line.strip()
            if not line:
                continue
            try:
                event = json.loads(line)
            except json.JSONDecodeError:
                continue
            if isinstance(event, dict) and event.get("event") == "finish":
                end_ts = event.get("ts", 0)
                if end_ts and info["start"]:
                    # Timestamps are milliseconds; report seconds.
                    info["runtime_seconds"] = (end_ts - info["start"]) / 1000.0
                break

    return info


def cortex_agents(
    limit: int = 10,
    offset: int = 0,
    agent_type: str = "all",
    facet: Optional[str] = None,
) -> Dict[str, Any]:
    """List agents from the journal with pagination and filtering.

    Args:
        limit: Maximum number of agents to return (clamped to 1-100)
        offset: Number of agents to skip (negative values become 0)
        agent_type: Filter by "live", "historical", or "all"
        facet: Optional facet to filter by. If provided, only returns agents
            that were run in this facet context. None means no filtering.

    Returns:
        Dictionary with these keys:

        - "agents": list of entries, newest first. Each entry has id, name,
          start, status, prompt, provider, facet and, for completed agents
          with a finish event, runtime_seconds.
        - "pagination": dict with limit, offset, total and has_more.
        - "live_count" and "historical_count": these count every non-pending
          log file, before the agent_type/facet filters and before parsing.
          NOTE(review): confirm the counts are meant to ignore the facet
          filter.

        Unreadable or malformed logs are skipped.
    """
    # Validate parameters.
    limit = max(1, min(limit, 100))
    offset = max(0, offset)

    agents_dir = Path(get_journal()) / "agents"
    all_agents: list[Dict[str, Any]] = []
    live_count = 0
    historical_count = 0

    if agents_dir.exists():
        for agent_file in agents_dir.glob("*/*.jsonl"):
            file_name = agent_file.name
            # Pending requests are not listed or counted.
            if "_pending.jsonl" in file_name:
                continue

            is_active = "_active.jsonl" in file_name
            status = "running" if is_active else "completed"
            if is_active:
                live_count += 1
            else:
                historical_count += 1

            if agent_type == "live" and status != "running":
                continue
            if agent_type == "historical" and status != "completed":
                continue

            agent_id = agent_file.stem.replace("_active", "")
            info = _load_agent_summary(agent_file, agent_id, status)
            if info is None:
                continue
            if facet is not None and info["facet"] != facet:
                continue
            all_agents.append(info)

    # Newest first.
    all_agents.sort(key=lambda entry: entry["start"], reverse=True)

    total = len(all_agents)
    return {
        "agents": all_agents[offset : offset + limit],
        "pagination": {
            "limit": limit,
            "offset": offset,
            "total": total,
            "has_more": (offset + limit) < total,
        },
        "live_count": live_count,
        "historical_count": historical_count,
    }