# Personal memory agent
1# SPDX-License-Identifier: AGPL-3.0-only
2# Copyright (c) 2026 sol pbc
3
4"""Callosum-based agent process manager for solstone.
5
6Cortex listens for agent requests via the Callosum message bus and manages
7agent process lifecycle:
8- Receives requests via Callosum (tract="cortex", event="request")
9- Creates <agent>/<timestamp>_active.jsonl files to track active agents
10- Spawns agent processes and captures their stdout events
11- Broadcasts all agent events back to Callosum
12- Renames to <agent>/<timestamp>.jsonl when complete
13
14Agent files provide persistence and historical record, while Callosum provides
15real-time event distribution to all interested services.
16"""
17
18from __future__ import annotations
19
20import json
21import logging
22import os
23import subprocess
24import sys
25import threading
26import time
27from pathlib import Path
28from typing import Any, Dict, Optional
29
30from think.callosum import CallosumConnection
31from think.runner import _atomic_symlink
32from think.utils import get_journal, get_rev, now_ms
33
34
class AgentProcess:
    """Wrapper around one spawned agent subprocess and its lifecycle state."""

    def __init__(self, agent_id: str, process: subprocess.Popen, log_path: Path):
        self.agent_id = agent_id
        self.process = process
        self.log_path = log_path
        # Set once the agent has been asked to stop (or has timed out).
        self.stop_event = threading.Event()
        # threading.Timer armed by the service to enforce a timeout.
        self.timeout_timer = None
        # Wall-clock start, used for elapsed-time status reporting.
        self.start_time = time.time()

    def is_running(self) -> bool:
        """Return True while the subprocess is alive and not asked to stop."""
        alive = self.process.poll() is None
        return alive and not self.stop_event.is_set()

    def stop(self) -> None:
        """Stop the agent: SIGTERM first, escalate to SIGKILL after 10s."""
        self.stop_event.set()

        timer = self.timeout_timer
        if timer:
            timer.cancel()

        if self.process.poll() is not None:
            return  # Already exited; nothing to terminate.

        # First try SIGTERM for graceful shutdown.
        self.process.terminate()
        try:
            self.process.wait(timeout=10)  # Give more time for graceful shutdown
        except subprocess.TimeoutExpired:
            logging.getLogger(__name__).warning(
                f"Agent {self.agent_id} didn't stop gracefully, killing"
            )
            self.process.kill()
            self.process.wait()  # Ensure zombie is reaped
69
70
71class CortexService:
72 """Callosum-based agent process manager."""
73
    def __init__(self, journal_path: Optional[str] = None):
        """Initialize the service.

        Args:
            journal_path: Root journal directory; defaults to get_journal().

        Creates <journal>/agents if missing and constructs (but does not
        start) the Callosum connection.
        """
        self.journal_path = Path(journal_path or get_journal())
        self.agents_dir = self.journal_path / "agents"
        self.agents_dir.mkdir(parents=True, exist_ok=True)

        self.logger = logging.getLogger(__name__)
        # agent_id -> AgentProcess for every live subprocess.
        self.running_agents: Dict[str, AgentProcess] = {}
        self.agent_requests: Dict[str, Dict[str, Any]] = {}  # Store agent requests
        # Re-entrant lock guarding running_agents and agent_requests.
        self.lock = threading.RLock()
        self.stop_event = threading.Event()
        # Set on Ctrl-C; start() exits once no agents remain.
        self.shutdown_requested = threading.Event()

        # Callosum connection for receiving requests and broadcasting events
        self.callosum = CallosumConnection(defaults={"rev": get_rev()})
88
89 def _create_error_event(
90 self,
91 agent_id: str,
92 error: str,
93 trace: Optional[str] = None,
94 exit_code: Optional[int] = None,
95 ) -> Dict[str, Any]:
96 """Create standardized error event."""
97 event = {
98 "event": "error",
99 "ts": now_ms(),
100 "agent_id": agent_id,
101 "error": error,
102 }
103 if trace:
104 event["trace"] = trace
105 if exit_code is not None:
106 event["exit_code"] = exit_code
107 return event
108
109 def _recover_orphaned_agents(self, active_files: list) -> None:
110 """Recover orphaned active agent files from a previous crash.
111
112 Appends an error event to each file and renames to completed.
113 """
114 for file_path in active_files:
115 agent_id = file_path.stem.replace("_active", "")
116 try:
117 error_event = self._create_error_event(
118 agent_id, "Recovered: Cortex restarted while agent was running"
119 )
120 with open(file_path, "a") as f:
121 f.write(json.dumps(error_event) + "\n")
122
123 completed_path = file_path.parent / f"{agent_id}.jsonl"
124 file_path.rename(completed_path)
125 self.logger.warning(f"Recovered orphaned agent: {agent_id}")
126 except Exception as e:
127 self.logger.error(f"Failed to recover agent {agent_id}: {e}")
128
    def start(self) -> None:
        """Start listening for agent requests via Callosum.

        Blocks until shutdown: recovers orphaned files from a previous run,
        connects to the bus, launches the status thread, then idles.  A
        Ctrl-C requests shutdown; the service returns once no agents remain.
        Exits the process with code 1 if Callosum cannot be reached.
        """
        # Recover any orphaned active files from previous crash
        active_files = list(self.agents_dir.glob("*/*_active.jsonl"))
        if active_files:
            self.logger.warning(
                f"Found {len(active_files)} orphaned agent(s), recovering..."
            )
            self._recover_orphaned_agents(active_files)

        # Connect to Callosum to receive requests
        try:
            self.callosum.start(callback=self._handle_callosum_message)
            self.logger.info("Connected to Callosum message bus")
            self.callosum.emit("supervisor", "request", cmd=["sol", "agents", "check"])
            self.logger.info("Requested agents health check via supervisor")
        except Exception as e:
            self.logger.error(f"Failed to connect to Callosum: {e}")
            sys.exit(1)

        # Start status emission thread
        threading.Thread(
            target=self._emit_periodic_status,
            name="cortex-status",
            daemon=True,
        ).start()

        self.logger.info("Cortex service started, listening for agent requests")

        # Idle loop.  The outer while re-arms the KeyboardInterrupt handler so
        # the first Ctrl-C only *requests* shutdown instead of killing agents.
        while True:
            try:
                while not self.stop_event.is_set():
                    time.sleep(1)
                    # Exit when idle during shutdown
                    if self.shutdown_requested.is_set():
                        with self.lock:
                            if len(self.running_agents) == 0:
                                self.logger.info(
                                    "No agents running, exiting gracefully"
                                )
                                return
                # stop_event was set: leave the outer loop as well.
                break
            except KeyboardInterrupt:
                self.logger.info("Shutdown requested, will exit when idle")
                self.shutdown_requested.set()
174
175 def _handle_callosum_message(self, message: Dict[str, Any]) -> None:
176 """Handle incoming Callosum messages (callback)."""
177 # Filter for cortex tract and request event
178 if message.get("tract") != "cortex" or message.get("event") != "request":
179 return
180
181 # Handle the request
182 try:
183 self._handle_request(message)
184 except Exception as e:
185 self.logger.exception(f"Error handling request: {e}")
186
    def _handle_request(self, request: Dict[str, Any]) -> None:
        """Handle a new agent request from Callosum.

        Cortex is a minimal process manager - it only handles:
        - File lifecycle (<agent>/<id>_active.jsonl -> <agent>/<id>.jsonl)
        - Process spawning and monitoring
        - Event relay to Callosum

        All config loading, validation, and hydration is done by agents.py.
        """
        agent_id = request.get("agent_id")
        if not agent_id:
            self.logger.error("Received request without agent_id")
            return

        # Skip if this agent is already being processed
        with self.lock:
            if agent_id in self.running_agents:
                self.logger.debug(f"Agent {agent_id} already running, skipping")
                return

        # Create _active.jsonl file (exclusive creation to prevent race conditions)
        # NOTE(review): name is only sanitized for ":"; this assumes callers
        # never put path separators in "name" — confirm upstream validation.
        name = request.get("name", "unified")
        safe_name = name.replace(":", "--")
        agent_subdir = self.agents_dir / safe_name
        agent_subdir.mkdir(parents=True, exist_ok=True)
        file_path = agent_subdir / f"{agent_id}_active.jsonl"
        # Cheap pre-check (with a log line); the authoritative race guard is
        # the exclusive "x" open below.
        if file_path.exists():
            self.logger.debug(f"Agent {agent_id} already claimed by another process")
            return

        try:
            with open(file_path, "x") as f:  # 'x' mode fails if file exists
                f.write(json.dumps(request) + "\n")
        except FileExistsError:
            # Another process claimed this agent between exists() and open();
            # silently yield to it.
            return

        self.logger.info(f"Processing agent request: {agent_id}")

        # Store request for later use (output writing)
        with self.lock:
            self.agent_requests[agent_id] = request

        # Spawn agent process - it handles all validation/hydration
        try:
            self._spawn_subprocess(
                agent_id, file_path, request, ["sol", "agents"], "agent"
            )
        except Exception as e:
            self.logger.exception(f"Failed to spawn agent {agent_id}: {e}")
            self._write_error_and_complete(file_path, f"Failed to spawn agent: {e}")
238
    def _spawn_subprocess(
        self,
        agent_id: str,
        file_path: Path,
        config: Dict[str, Any],
        cmd: list[str],
        process_type: str,
    ) -> None:
        """Spawn a subprocess and monitor its output.

        Args:
            agent_id: Unique identifier for this process
            file_path: Path to the JSONL log file
            config: Configuration dict to pass via NDJSON stdin
            cmd: Command to run (e.g., ["sol", "agents"])
            process_type: Label for logging ("agent")

        On any failure an error event is written and the log is finalized.
        """
        try:
            # Store the config for later use - thread safe
            with self.lock:
                self.agent_requests[agent_id] = config

            # Pass the full config through as NDJSON
            ndjson_input = json.dumps(config)

            # Prepare environment
            env = os.environ.copy()

            # Promote top-level config keys to environment so tools can read
            # them as defaults (e.g., sol call todos add uses SOL_FACET).
            # Explicit env overrides below take precedence.
            if config.get("facet"):
                env["SOL_FACET"] = str(config["facet"])
            if config.get("day"):
                env["SOL_DAY"] = str(config["day"])

            # Apply explicit env overrides (from dream.py etc.) — these win
            env_overrides = config.get("env")
            if env_overrides and isinstance(env_overrides, dict):
                env.update({k: str(v) for k, v in env_overrides.items()})

            # Spawn the subprocess
            self.logger.info(f"Spawning {process_type} {agent_id}: {cmd}")
            self.logger.debug(f"NDJSON input: {ndjson_input}")

            # bufsize=1 means line-buffered (valid because text=True).
            process = subprocess.Popen(
                cmd,
                stdin=subprocess.PIPE,
                stdout=subprocess.PIPE,
                stderr=subprocess.PIPE,
                text=True,
                env=env,
                bufsize=1,
            )

            # Send input and close stdin so the child sees EOF
            process.stdin.write(ndjson_input + "\n")
            process.stdin.close()

            # Track the running process
            agent = AgentProcess(agent_id, process, file_path)
            with self.lock:
                self.running_agents[agent_id] = agent

            # Set up timeout (default to 10 minutes if not specified)
            timeout_seconds = config.get("timeout_seconds", 600)
            agent.timeout_timer = threading.Timer(
                timeout_seconds,
                lambda: self._timeout_agent(agent_id, agent, timeout_seconds),
            )
            agent.timeout_timer.start()

            # Start monitoring threads (stdout drives completion/cleanup)
            threading.Thread(
                target=self._monitor_stdout, args=(agent,), daemon=True
            ).start()

            threading.Thread(
                target=self._monitor_stderr, args=(agent,), daemon=True
            ).start()

            self.logger.info(
                f"{process_type.capitalize()} {agent_id} spawned successfully "
                f"(PID: {process.pid})"
            )

        except Exception as e:
            self.logger.exception(f"Failed to spawn {process_type} {agent_id}: {e}")
            self._write_error_and_complete(
                file_path, f"Failed to spawn {process_type}: {e}"
            )
330
    def _timeout_agent(
        self, agent_id: str, agent: AgentProcess, timeout_seconds: int
    ) -> None:
        """Handle agent timeout (runs on the threading.Timer thread).

        Writes a timeout error event to the log, broadcasts it on Callosum,
        and stops the process.  No-op if the agent already finished.
        """
        if agent.is_running():
            self.logger.warning(
                f"Agent {agent_id} timed out after {timeout_seconds} seconds"
            )
            error_event = self._create_error_event(
                agent_id, f"Agent timed out after {timeout_seconds} seconds"
            )
            try:
                with open(agent.log_path, "a") as f:
                    f.write(json.dumps(error_event) + "\n")
            except Exception as e:
                self.logger.error(f"Failed to write timeout event: {e}")

            # Broadcast to callosum so wait_for_agents detects immediately
            try:
                event_copy = error_event.copy()
                event_type = event_copy.pop("event", "error")
                self.callosum.emit("cortex", event_type, **event_copy)
            except Exception:
                # Best-effort broadcast; the file record above is authoritative.
                pass

            agent.stop()
357
    def _monitor_stdout(self, agent: AgentProcess) -> None:
        """Monitor agent stdout and append events to the JSONL file.

        Runs in a daemon thread per agent.  Each stdout line is parsed as a
        JSON event, persisted to the log file, and re-broadcast on Callosum;
        non-JSON lines become "info" events.  A "finish"/"error" event ends
        monitoring.  The finally block reaps the process, finalizes the log
        file, and drops the agent's bookkeeping entries.
        """
        if not agent.process.stdout:
            return

        try:
            with agent.process.stdout:
                for line in agent.process.stdout:
                    if not line:
                        continue

                    line = line.strip()
                    if not line:
                        continue

                    try:
                        # Parse JSON event
                        event = json.loads(line)

                        # Ensure event has timestamp and agent_id
                        if "ts" not in event:
                            event["ts"] = now_ms()
                        if "agent_id" not in event:
                            event["agent_id"] = agent.agent_id

                        # Inject agent name for WebSocket consumers
                        with self.lock:
                            _req = self.agent_requests.get(agent.agent_id)
                            if _req and "name" not in event:
                                event["name"] = _req.get("name", "")

                        # Append to JSONL file
                        with open(agent.log_path, "a") as f:
                            f.write(json.dumps(event) + "\n")

                        # Broadcast event to Callosum (best-effort; the file
                        # write above already persisted the event)
                        try:
                            event_copy = event.copy()
                            event_type = event_copy.pop("event", "unknown")
                            self.callosum.emit("cortex", event_type, **event_copy)
                        except Exception as e:
                            self.logger.info(
                                f"Failed to broadcast event to Callosum: {e}"
                            )

                        # Handle start event
                        if event.get("event") == "start":
                            # Capture model and provider for status reporting
                            with self.lock:
                                if agent.agent_id in self.agent_requests:
                                    model = event.get("model")
                                    if model:
                                        self.agent_requests[agent.agent_id]["model"] = (
                                            model
                                        )
                                    provider = event.get("provider")
                                    if provider:
                                        self.agent_requests[agent.agent_id]["provider"] = (
                                            provider
                                        )

                        # Handle finish or error event
                        if event.get("event") in ["finish", "error"]:
                            # Check for output (only on finish)
                            if event.get("event") == "finish":
                                result = event.get("result", "")

                                # Get original request (thread-safe access)
                                with self.lock:
                                    original_request = self.agent_requests.get(
                                        agent.agent_id
                                    )

                                # Log token usage if available
                                usage_data = event.get("usage")
                                if usage_data and original_request:
                                    try:
                                        from think.models import log_token_usage
                                        from think.talent import key_to_context

                                        model = original_request.get("model", "unknown")
                                        name = original_request.get("name", "unknown")
                                        context = key_to_context(name)

                                        # Extract segment from env if set (flat merge puts env at top level)
                                        env_config = original_request.get("env", {})
                                        segment = (
                                            env_config.get("SOL_SEGMENT")
                                            if env_config
                                            else None
                                        )

                                        log_token_usage(
                                            model=model,
                                            usage=usage_data,
                                            context=context,
                                            segment=segment,
                                            type="cogitate",
                                        )
                                    except Exception as e:
                                        self.logger.warning(
                                            f"Failed to log token usage for agent {agent.agent_id}: {e}"
                                        )

                                # Write output if requested
                                if original_request and original_request.get("output"):
                                    self._write_output(
                                        agent.agent_id,
                                        result,
                                        original_request,
                                    )

                            # Break to trigger cleanup
                            break

                    except json.JSONDecodeError:
                        # Non-JSON output becomes info event
                        info_event = {
                            "event": "info",
                            "ts": now_ms(),
                            "message": line,
                            "agent_id": agent.agent_id,
                        }
                        with open(agent.log_path, "a") as f:
                            f.write(json.dumps(info_event) + "\n")

        except Exception as e:
            self.logger.error(
                f"Error monitoring stdout for agent {agent.agent_id}: {e}"
            )
        finally:
            # Wait for process to fully exit (reaps zombie)
            exit_code = agent.process.wait()
            self.logger.info(f"Agent {agent.agent_id} exited with code {exit_code}")

            # Check if finish event was emitted
            has_finish = self._has_finish_event(agent.log_path)

            if not has_finish:
                # Write error event if no finish using standardized format
                error_event = self._create_error_event(
                    agent.agent_id,
                    f"Agent exited with code {exit_code} without finish event",
                    exit_code=exit_code,
                )
                with open(agent.log_path, "a") as f:
                    f.write(json.dumps(error_event) + "\n")

            # Complete the file (rename from _active.jsonl to .jsonl).
            # Must happen before agent_requests is purged below — completion
            # reads the stored request for the symlink and day index.
            self._complete_agent_file(agent.agent_id, agent.log_path)

            # Remove from running agents and clean up stored request (thread-safe)
            with self.lock:
                if agent.agent_id in self.running_agents:
                    del self.running_agents[agent.agent_id]
                # Clean up stored request
                if agent.agent_id in self.agent_requests:
                    del self.agent_requests[agent.agent_id]
516
    def _monitor_stderr(self, agent: AgentProcess) -> None:
        """Monitor agent stderr for errors.

        Runs in a daemon thread.  Stderr lines are mirrored to cortex's own
        stderr for traceability; if the process has exited non-zero by the
        time the stream closes, an error event carrying the captured lines
        is appended to the log.
        """
        if not agent.process.stderr:
            return

        stderr_lines = []
        try:
            with agent.process.stderr:
                for line in agent.process.stderr:
                    if not line:
                        continue
                    stripped = line.strip()
                    if stripped:
                        stderr_lines.append(stripped)
                        # Pass through to cortex stderr with agent prefix for traceability
                        print(
                            f"[agent:{agent.agent_id}:stderr] {stripped}",
                            file=sys.stderr,
                            flush=True,
                        )

        except Exception as e:
            self.logger.error(
                f"Error monitoring stderr for agent {agent.agent_id}: {e}"
            )
        finally:
            # If process failed with stderr output, write error event.
            # NOTE(review): poll() may run before the process has exited, in
            # which case no stderr error event is written — confirm intended.
            if stderr_lines:
                exit_code = agent.process.poll()
                if exit_code is not None and exit_code != 0:
                    error_event = self._create_error_event(
                        agent.agent_id,
                        "Process failed with stderr output",
                        trace="\n".join(stderr_lines),
                        exit_code=exit_code,
                    )
                    try:
                        with open(agent.log_path, "a") as f:
                            f.write(json.dumps(error_event) + "\n")
                    except Exception as e:
                        self.logger.warning(f"Failed to write stderr event: {e}")
558
559 def _has_finish_event(self, file_path: Path) -> bool:
560 """Check if the JSONL file contains a finish or error event."""
561 try:
562 with open(file_path, "r") as f:
563 for line in f:
564 try:
565 event = json.loads(line)
566 if event.get("event") in ["finish", "error"]:
567 return True
568 except json.JSONDecodeError:
569 continue
570 except Exception:
571 pass
572 return False
573
574 def _complete_agent_file(self, agent_id: str, file_path: Path) -> None:
575 """Complete an agent by renaming the file from _active.jsonl to .jsonl."""
576 try:
577 completed_path = file_path.parent / f"{agent_id}.jsonl"
578 file_path.rename(completed_path)
579 self.logger.info(f"Completed agent {agent_id}: {completed_path}")
580
581 # Create convenience symlink: {name}.log -> {name}/{agent_id}.jsonl
582 request = self.agent_requests.get(agent_id)
583 if request:
584 name = request.get("name")
585 if name:
586 safe_name = name.replace(":", "--")
587 link_path = self.agents_dir / f"{safe_name}.log"
588 _atomic_symlink(link_path, f"{safe_name}/{agent_id}.jsonl")
589 self.logger.debug(
590 f"Symlinked {safe_name}.log -> {safe_name}/{agent_id}.jsonl"
591 )
592
593 # Append summary to day index
594 self._append_day_index(agent_id, request, completed_path)
595 else:
596 self.logger.debug(
597 f"No name in request for {agent_id}, skipping symlink"
598 )
599 except Exception as e:
600 self.logger.error(f"Failed to complete agent file {agent_id}: {e}")
601
602 def _append_day_index(
603 self, agent_id: str, request: Dict[str, Any], completed_path: Path
604 ) -> None:
605 """Append agent summary to day index file."""
606 try:
607 # Determine day from request or agent_id timestamp
608 day = request.get("day")
609 if not day:
610 from datetime import datetime
611
612 ts_seconds = int(agent_id) / 1000
613 day = datetime.fromtimestamp(ts_seconds).strftime("%Y%m%d")
614
615 start_ts = request.get("ts", 0)
616
617 # Read last few lines to find finish/error event for runtime
618 runtime_seconds = None
619 status = "completed"
620 try:
621 with open(completed_path, "r") as f:
622 lines = f.readlines()
623 for line in reversed(lines[-10:]):
624 line = line.strip()
625 if not line:
626 continue
627 try:
628 event = json.loads(line)
629 event_type = event.get("event")
630 if event_type == "finish":
631 end_ts = event.get("ts", 0)
632 if end_ts and start_ts:
633 runtime_seconds = round((end_ts - start_ts) / 1000.0, 1)
634 break
635 if event_type == "error":
636 status = "error"
637 end_ts = event.get("ts", 0)
638 if end_ts and start_ts:
639 runtime_seconds = round((end_ts - start_ts) / 1000.0, 1)
640 break
641 except json.JSONDecodeError:
642 continue
643 except Exception:
644 pass
645
646 summary = {
647 "agent_id": agent_id,
648 "name": request.get("name", "unified"),
649 "day": day,
650 "facet": request.get("facet"),
651 "ts": start_ts,
652 "status": status,
653 "runtime_seconds": runtime_seconds,
654 "provider": request.get("provider"),
655 "model": request.get("model"),
656 "schedule": request.get("schedule"),
657 }
658
659 day_index_path = self.agents_dir / f"{day}.jsonl"
660 with open(day_index_path, "a") as f:
661 f.write(json.dumps(summary) + "\n")
662 f.flush()
663
664 except Exception as e:
665 self.logger.error(f"Failed to append day index for {agent_id}: {e}")
666
667 def _write_error_and_complete(self, file_path: Path, error_message: str) -> None:
668 """Write an error event to the file and mark it as complete."""
669 try:
670 agent_id = file_path.stem.replace("_active", "")
671 error_event = self._create_error_event(agent_id, error_message)
672 with open(file_path, "a") as f:
673 f.write(json.dumps(error_event) + "\n")
674
675 # Complete the file
676 self._complete_agent_file(agent_id, file_path)
677 except Exception as e:
678 self.logger.error(f"Failed to write error and complete: {e}")
679
680 def _write_output(self, agent_id: str, result: str, config: Dict[str, Any]) -> None:
681 """Write agent output to config["output_path"].
682
683 The output path is set by the caller — either derived by
684 prepare_config in agents.py (day/segment agents) or computed
685 by dream.py via get_activity_output_path (activity agents).
686 Cortex does not derive paths itself.
687 """
688 output_path_str = config.get("output_path")
689 if not output_path_str:
690 return
691
692 try:
693 output_path = Path(output_path_str)
694 output_path.parent.mkdir(parents=True, exist_ok=True)
695
696 with open(output_path, "w", encoding="utf-8") as f:
697 f.write(result)
698
699 self.logger.info(f"Wrote agent {agent_id} output to {output_path}")
700
701 except Exception as e:
702 self.logger.error(f"Failed to write agent {agent_id} output: {e}")
703
704 def stop(self) -> None:
705 """Stop the Cortex service."""
706 self.stop_event.set()
707
708 # Close Callosum connection
709 if self.callosum:
710 self.callosum.stop()
711
712 # Stop all running agents
713 with self.lock:
714 for agent in self.running_agents.values():
715 agent.stop()
716
717 def _emit_periodic_status(self) -> None:
718 """Emit status events every 5 seconds (runs in background thread)."""
719 while not self.stop_event.is_set():
720 try:
721 with self.lock:
722 agents = []
723 for agent_id, agent_proc in self.running_agents.items():
724 config = self.agent_requests.get(agent_id, {})
725 agents.append(
726 {
727 "agent_id": agent_id,
728 "name": config.get("name", "unknown"),
729 "provider": config.get("provider", "unknown"),
730 "elapsed_seconds": int(
731 time.time() - agent_proc.start_time
732 ),
733 }
734 )
735
736 # Only emit status when there are active agents
737 if agents:
738 self.callosum.emit(
739 "cortex",
740 "status",
741 running_agents=len(agents),
742 agents=agents,
743 )
744 except Exception as e:
745 self.logger.debug(f"Status emission failed: {e}")
746
747 time.sleep(5)
748
749 def get_status(self) -> Dict[str, Any]:
750 """Get service status information."""
751 with self.lock:
752 return {
753 "running_agents": len(self.running_agents),
754 "agent_ids": list(self.running_agents.keys()),
755 }
756
757
def main() -> None:
    """CLI entry point for the Cortex service.

    Parses shared CLI flags via setup_cli, configures logging (DEBUG when
    --verbose), then runs the service until interrupted; Ctrl-C triggers a
    clean stop().
    """
    import argparse

    from think.utils import setup_cli

    parser = argparse.ArgumentParser(description="solstone Cortex Agent Manager")
    args = setup_cli(parser)

    # Set up logging
    logging.basicConfig(
        level=logging.INFO if not args.verbose else logging.DEBUG,
        format="%(asctime)s - %(name)s - %(levelname)s - %(message)s",
    )

    # Start the service
    cortex = CortexService()

    try:
        cortex.start()
    except KeyboardInterrupt:
        logging.getLogger(__name__).info("Shutting down Cortex service")
        cortex.stop()
781
782
# Allow running this module directly as a script.
if __name__ == "__main__":
    main()