personal memory agent
1# SPDX-License-Identifier: AGPL-3.0-only
2# Copyright (c) 2026 sol pbc
3
4"""Unified agent CLI for solstone.
5
6Spawned by cortex for all agent types:
7- Tool-using agents (with configured tools)
8- Generators (transcript analysis, no tools)
9
10Both paths share unified config preparation and execution flow.
11Reads NDJSON config from stdin, emits JSONL events to stdout.
12"""
13
14from __future__ import annotations
15
16import argparse
17import asyncio
18import json
19import logging
20import os
21import sys
22import time
23import traceback
24from datetime import datetime, timezone
25from pathlib import Path
26from string import Template
27from typing import Any, Callable, Optional
28
29from think.cluster import cluster, cluster_period, cluster_span
30from think.providers.shared import Event
31from think.talent import (
32 get_agent_filter,
33 get_output_path,
34 get_talent_configs,
35 load_post_hook,
36 load_pre_hook,
37 load_prompt,
38 source_is_enabled,
39 source_is_required,
40)
41from think.utils import (
42 day_log,
43 day_path,
44 format_day,
45 format_segment_times,
46 get_journal,
47 now_ms,
48 segment_parse,
49 setup_cli,
50)
51
# Module-level logger shared by all helpers in this CLI.
LOG = logging.getLogger("think.agents")

# Minimum content length (in characters) for transcript-based generation;
# shorter transcripts cause prepare_config to set skip_reason="no_input".
MIN_INPUT_CHARS = 50
56
57
def setup_logging(verbose: bool = False) -> logging.Logger:
    """Configure logging for the agent CLI and return the module logger.

    Args:
        verbose: If True, log at DEBUG level; otherwise INFO.

    Returns:
        The module-level ``think.agents`` logger.
    """
    level = logging.DEBUG if verbose else logging.INFO
    # Log to stderr: stdout is reserved for the JSONL event protocol that
    # cortex consumes (see module docstring and JSONEventWriter), so log
    # lines written to stdout would interleave with and corrupt the
    # machine-readable event stream.
    logging.basicConfig(level=level, stream=sys.stderr)
    return LOG
63
64
class JSONEventWriter:
    """Emit JSONL events to stdout, mirroring each line to an optional file.

    stdout is the primary channel consumed by cortex; the file copy is
    strictly best-effort and never raises.
    """

    def __init__(self, path: Optional[str] = None) -> None:
        self.path = path
        self.file = None
        if not path:
            return
        try:
            Path(path).parent.mkdir(parents=True, exist_ok=True)
            self.file = open(path, "a", encoding="utf-8")
        except Exception:
            pass  # Best-effort: continue without a file copy

    def emit(self, data: Event) -> None:
        serialized = json.dumps(data, ensure_ascii=False)
        print(serialized)
        sys.stdout.flush()  # Cortex needs each event immediately
        if self.file is None:
            return
        try:
            self.file.write(serialized + "\n")
            self.file.flush()
        except Exception:
            pass  # Best-effort: swallow file write failures

    def close(self) -> None:
        if self.file is None:
            return
        try:
            self.file.close()
        except Exception:
            pass
96
97# =============================================================================
98# Unified Config Preparation
99# =============================================================================
100
101
102def _stream_content_description(stream: str | None) -> str:
103 """Return a human-readable content description for a stream.
104
105 Used in preamble templates so agents know what kind of content they're
106 analyzing (live capture vs imported conversations, notes, etc.).
107 """
108 if not stream:
109 return "audio transcription and screen recording"
110
111 STREAM_DESCRIPTIONS = {
112 "archon": "audio transcription and screen recording",
113 "import.chatgpt": "an imported ChatGPT conversation",
114 "import.claude": "an imported Claude conversation",
115 "import.gemini": "an imported Gemini conversation",
116 "import.ics": "an imported calendar event",
117 "import.obsidian": "an imported note from Obsidian",
118 "import.document": "an imported document (PDF)",
119 "import.kindle": "imported Kindle reading highlights",
120 }
121
122 if stream in STREAM_DESCRIPTIONS:
123 return STREAM_DESCRIPTIONS[stream]
124
125 # Fallback for unknown import streams
126 if stream.startswith("import."):
127 source = stream.split(".", 1)[1]
128 return f"imported content from {source}"
129
130 return "captured content"
131
132
133def _stream_import_guidance(stream: str | None) -> str:
134 """Return stream-conditional guidance for the activity agent.
135
136 For live capture, returns guidance about frame comparison and spoken audio.
137 For imports, returns content-type-specific analysis instructions.
138 Returns empty string for unknown streams.
139 """
140 if not stream or stream == "archon":
141 return (
142 "## Live Capture Guidance\n\n"
143 "ONLY report what CHANGED between screenshots or was SPOKEN in audio. "
144 "If content looks the same across frames, skip it entirely.\n\n"
145 "### Your Inputs\n\n"
146 "- **Screenshots**: Sampled across this segment. Compare frames — what's different?\n"
147 "- **Audio**: Transcript of speech. What was said?\n\n"
148 "### SKIP Entirely\n\n"
149 "- Windows that look identical in first and last frame\n"
150 "- Apps open but showing same content throughout\n"
151 "- Background windows never brought to focus\n"
152 '- Anything you\'d describe as "had open" or "was visible"'
153 )
154
155 IMPORT_GUIDANCE = {
156 "import.chatgpt": (
157 "This is an AI conversation. Summarize the key topics discussed, "
158 "questions asked, solutions proposed, and decisions reached. "
159 "Focus on what the human was trying to accomplish and what they learned or decided."
160 ),
161 "import.claude": (
162 "This is an AI conversation. Summarize the key topics discussed, "
163 "questions asked, solutions proposed, and decisions reached. "
164 "Focus on what the human was trying to accomplish and what they learned or decided."
165 ),
166 "import.gemini": (
167 "This is an AI conversation. Summarize the key topics discussed, "
168 "questions asked, solutions proposed, and decisions reached. "
169 "Focus on what the human was trying to accomplish and what they learned or decided."
170 ),
171 "import.ics": (
172 "This is a calendar event. Describe the event: its purpose, "
173 "participants, and any context from the description about why it was scheduled."
174 ),
175 "import.obsidian": (
176 "This is a note. Summarize the key ideas, references, and connections. "
177 "What was the author thinking about and working through?"
178 ),
179 "import.document": (
180 "This is an imported document (legal, financial, medical, or personal). "
181 "Extract all named parties and their roles (grantor, trustee, beneficiary, "
182 "attorney, witness, agent, etc.). Produce a plain-language summary that a "
183 "non-expert could understand. Identify key provisions, dates, conditions, "
184 "obligations, and deadlines. Note any time-sensitive requirements (renewal "
185 "dates, filing deadlines, review periods)."
186 ),
187 "import.kindle": (
188 "These are reading highlights. Describe what was being read and what "
189 "the reader found noteworthy. What themes or ideas do these highlights capture?"
190 ),
191 }
192
193 if stream in IMPORT_GUIDANCE:
194 return f"## Content Guidance\n\n{IMPORT_GUIDANCE[stream]}"
195
196 if stream.startswith("import."):
197 return (
198 "## Content Guidance\n\n"
199 "This is imported content. Summarize the key topics, actions, "
200 "and takeaways present in this segment."
201 )
202
203 return ""
204
205
206def _build_prompt_context(
207 day: str | None,
208 segment: str | None,
209 span: list[str] | None,
210 activity: dict | None = None,
211) -> dict[str, str]:
212 """Build context dict for prompt template substitution.
213
214 Args:
215 day: Day in YYYYMMDD format
216 segment: Segment key (HHMMSS_LEN)
217 span: List of segment keys
218 activity: Optional activity record dict for activity-scheduled agents
219
220 Returns:
221 Dict with template variables:
222 - day: Friendly format (e.g., "Sunday, February 2, 2025")
223 - day_YYYYMMDD: Raw day string (e.g., "20250202")
224 - segment_start, segment_end: Time strings if segment/span provided
225 - stream, content_description: Stream name and human-readable description
226 - activity_*: Activity fields if activity record provided
227 """
228 context: dict[str, str] = {}
229 if not day:
230 return context
231
232 context["day"] = format_day(day)
233 context["day_YYYYMMDD"] = day
234
235 # Stream-aware content description and import guidance
236 stream = os.environ.get("SOL_STREAM")
237 context["stream"] = stream or "archon"
238 context["content_description"] = _stream_content_description(stream)
239 context["import_guidance"] = _stream_import_guidance(stream)
240
241 if segment:
242 start_str, end_str = format_segment_times(segment)
243 if start_str and end_str:
244 context["segment"] = segment
245 context["segment_start"] = start_str
246 context["segment_end"] = end_str
247 elif span:
248 all_times = []
249 for seg in span:
250 start_time, end_time = segment_parse(seg)
251 if start_time and end_time:
252 all_times.append((start_time, end_time))
253
254 if all_times:
255 earliest_start = min(t[0] for t in all_times)
256 latest_end = max(t[1] for t in all_times)
257 context["segment_start"] = (
258 datetime.combine(datetime.today(), earliest_start)
259 .strftime("%I:%M %p")
260 .lstrip("0")
261 )
262 context["segment_end"] = (
263 datetime.combine(datetime.today(), latest_end)
264 .strftime("%I:%M %p")
265 .lstrip("0")
266 )
267
268 # Activity template variables
269 if activity:
270 from think.activities import estimate_duration_minutes
271
272 context["activity_id"] = activity.get("id", "")
273 context["activity_type"] = activity.get("activity", "")
274 context["activity_description"] = activity.get("description", "")
275 context["activity_level"] = str(activity.get("level_avg", 0.5))
276 entities = activity.get("active_entities", [])
277 context["activity_entities"] = ", ".join(entities) if entities else ""
278 segments = activity.get("segments", [])
279 context["activity_segments"] = ", ".join(segments) if segments else ""
280 context["activity_duration"] = str(estimate_duration_minutes(segments))
281
282 return context
283
284
def _build_activity_context(
    activity: dict,
    span: list[str],
    facet: str,
    day: str,
) -> str | None:
    """Build activity context sections for $activity_context.

    Assembles up to three markdown sections: activity metadata, per-segment
    activity state, and focusing instructions that tell the agent to ignore
    content unrelated to this activity.

    Args:
        activity: Activity record dict (from activity records JSONL)
        span: List of segment keys in the activity's span
        facet: Facet name
        day: Day in YYYYMMDD format

    Returns:
        Formatted string for the $activity_context template variable, or
        None if no sections were produced.
    """
    # All three sections are currently always enabled; presumably this dict
    # is a placeholder for future per-agent configuration — TODO confirm.
    activity_cfg = {"context": True, "state": True, "focus": True}

    parts: list[str] = []
    activity_type = activity.get("activity", "unknown")

    # --- activity.context: Activity metadata section ---
    if activity_cfg.get("context"):
        from think.activities import estimate_duration_minutes

        level_avg = activity.get("level_avg", 0.5)
        # Bucket the numeric engagement level into a coarse label.
        level_label = (
            "high" if level_avg >= 0.75 else "medium" if level_avg >= 0.4 else "low"
        )
        segments = activity.get("segments", [])
        duration = estimate_duration_minutes(segments)
        entities = activity.get("active_entities", [])
        entities_str = ", ".join(entities) if entities else "none detected"

        parts.append(
            f"## Activity Context\n"
            f"- **Type:** {activity_type}\n"
            f"- **Description:** {activity.get('description', '')}\n"
            f"- **Engagement Level:** {level_avg} ({level_label})\n"
            f"- **Duration:** ~{duration} minutes ({len(segments)} segments)\n"
            f"- **Active Entities:** {entities_str}"
        )

    # --- activity.state: Per-segment activity descriptions ---
    if activity_cfg.get("state"):
        from think.activities import load_segment_activity_state

        state_lines: list[str] = []
        for seg in span:
            # Segments with no recorded state for this activity are skipped.
            entry = load_segment_activity_state(day, seg, facet, activity_type)
            if entry:
                level = entry.get("level", "")
                desc = entry.get("description", "")
                # Format segment time for readability
                start_str, end_str = format_segment_times(seg)
                time_label = (
                    f" ({start_str} - {end_str})" if start_str and end_str else ""
                )
                state_lines.append(
                    f"### {seg}{time_label}\n{activity_type} [{level}]: {desc}"
                )

        if state_lines:
            parts.append("## Activity State Per Segment\n\n" + "\n\n".join(state_lines))

    # --- activity.focus: Focusing instructions ---
    if activity_cfg.get("focus"):
        parts.append(
            f"## Analysis Focus\n"
            f"You are analyzing ONLY the **{activity_type}** activity within the "
            f"**{facet}** facet. The transcript segments may contain content from "
            f"other concurrent activities (e.g., background meetings, messaging). "
            f"Use the Activity State Per Segment section above to identify which "
            f"content relates to this activity, and ignore unrelated content. "
            f"Your analysis should only cover what happened within this specific activity."
        )

    if not parts:
        return None

    return "\n\n".join(parts)
367
368
def _load_transcript(
    day: str,
    segment: str | None,
    span: list[str] | None,
    sources: dict,
) -> tuple[str, dict[str, int]]:
    """Load and cluster the transcript for a day, segment, or span.

    Args:
        day: Day in YYYYMMDD format
        segment: Optional segment key
        span: Optional list of segment keys
        sources: Source config dict from frontmatter load

    Returns:
        Tuple of (transcript text, source_counts dict)
    """
    # Record which segment token-usage logging should attribute to.
    if segment:
        os.environ["SOL_SEGMENT"] = segment
    elif span:
        os.environ["SOL_SEGMENT"] = span[0]

    # Translate the frontmatter source config into clustering flags. The
    # "agents" source may carry an agent filter instead of a plain bool.
    cluster_sources: dict = {}
    for source_type, mode in sources.items():
        if source_type != "agents":
            cluster_sources[source_type] = source_is_enabled(mode)
            continue
        agent_filter = get_agent_filter(mode)
        if agent_filter is None:
            cluster_sources[source_type] = source_is_enabled(mode)
        elif agent_filter:
            cluster_sources[source_type] = agent_filter
        else:
            cluster_sources[source_type] = False

    # Dispatch to the appropriate clustering scope.
    stream = os.environ.get("SOL_STREAM")
    if span:
        return cluster_span(day, span, sources=cluster_sources, stream=stream)
    if segment:
        return cluster_period(day, segment, sources=cluster_sources, stream=stream)
    return cluster(day, sources=cluster_sources)
414
415
def prepare_config(request: dict) -> dict:
    """Prepare complete agent config from request.

    Single unified preparation path for all agent types. Takes raw request
    from cortex and returns fully prepared config ready for execution.

    Config fields produced:
    - name: Agent name
    - provider, model: Resolved from context/request
    - user_instruction: Agent instruction from .md file
    - prompt: User's runtime query/request
    - transcript: Clustered transcript (if day provided)
    - output_path: Where to write output (if output format set)
    - skip_reason: Why to skip (if applicable). Values produced here:
      "disabled", "missing_required_<source>", "no_input".

    Args:
        request: Raw request dict from cortex

    Returns:
        Fully prepared config dict
    """
    from think.models import resolve_model_for_provider, resolve_provider
    from think.talent import get_agent, key_to_context

    name = request.get("name", "unified")
    facet = request.get("facet")
    day = request.get("day")
    segment = request.get("segment")
    span = request.get("span")
    activity = request.get("activity")
    output_format = request.get("output")
    output_path_override = request.get("output_path")
    user_prompt = request.get("prompt", "")

    # Load complete agent config
    config = get_agent(name, facet=facet, analysis_day=day)

    # Config now contains all frontmatter fields plus:
    # - path: Path to the .md file
    # - sources: Source config for transcript loading
    # - All frontmatter: tools, hook, disabled, thinking_budget, max_output_tokens, etc.

    # Convert path string to Path object for convenience
    agent_path = Path(config["path"]) if config.get("path") else None
    sources = config.get("sources", {})

    # Merge request values (request overrides agent defaults); None values
    # in the request never clobber agent-provided fields.
    config.update({k: v for k, v in request.items() if v is not None})

    # Populate stream from env if not already in config (dream passes it as
    # SOL_STREAM env var but not as a top-level request key — hooks need it)
    if "stream" not in config:
        sol_stream = os.environ.get("SOL_STREAM")
        if sol_stream:
            config["stream"] = sol_stream

    # Track additional state
    config["span_mode"] = bool(span)
    config["source_counts"] = {}

    # Resolve provider and model from context
    context = key_to_context(name)
    agent_type = config["type"]
    default_provider, default_model = resolve_provider(context, agent_type)

    # Explicit provider/model from config win; a non-default provider needs
    # its model re-resolved for that provider.
    provider = config.get("provider") or default_provider
    model = config.get("model")
    if not model:
        if provider != default_provider:
            model = resolve_model_for_provider(context, provider, agent_type)
        else:
            model = default_model

    config["provider"] = provider
    config["model"] = model
    config["context"] = context

    # --- Provider fallback: preflight swap if primary is unhealthy ---
    from think.models import (
        get_backup_provider,
        is_provider_healthy,
        load_health_status,
        should_recheck_health,
    )
    from think.providers import PROVIDER_METADATA

    health_data = load_health_status()
    config["health_stale"] = should_recheck_health(health_data)

    # Only swap when a distinct backup exists AND its API key env var is set.
    if not is_provider_healthy(provider, health_data):
        backup = get_backup_provider(agent_type)
        if backup and backup != provider:
            env_key = PROVIDER_METADATA.get(backup, {}).get("env_key")
            if env_key and os.getenv(env_key):
                config["fallback_from"] = provider
                config["provider"] = backup
                config["model"] = resolve_model_for_provider(
                    context, backup, agent_type
                )

    # Check if disabled
    if config.get("disabled"):
        config["skip_reason"] = "disabled"
        return config

    # Day-based processing: load transcript and apply template substitution
    if day:
        # Load transcript (only when agent has enabled sources to consume)
        if any(source_is_enabled(v) for v in sources.values()):
            transcript, source_counts = _load_transcript(day, segment, span, sources)
            config["transcript"] = transcript
            config["source_counts"] = source_counts
            total_count = sum(source_counts.values())

            # Check required sources
            for source_type, mode in sources.items():
                if source_is_required(mode) and source_counts.get(source_type, 0) == 0:
                    config["skip_reason"] = f"missing_required_{source_type}"
                    return config

            # Skip if no content (fewer than MIN_INPUT_CHARS after stripping)
            if total_count == 0 or len(transcript.strip()) < MIN_INPUT_CHARS:
                config["skip_reason"] = "no_input"
                return config

            # Note for limited recordings
            if total_count < 3:
                config["transcript"] = (
                    "**Input Note:** Limited recordings for this day. "
                    "Scale analysis to available input.\n\n" + transcript
                )

        # Reload agent instruction with template substitution for day/segment context
        if agent_path and agent_path.exists():
            from think.prompts import _resolve_facets

            prompt_context = _build_prompt_context(
                day, segment, span, activity=activity
            )
            prompt_context["facets"] = _resolve_facets(facet)

            # Activity-scheduled agents additionally get $activity_context.
            if activity and span and facet:
                activity_ctx = _build_activity_context(activity, span, facet, day)
                if activity_ctx:
                    prompt_context["activity_context"] = activity_ctx

            agent_prompt_obj = load_prompt(
                agent_path.stem, base_dir=agent_path.parent, context=prompt_context
            )
            config["user_instruction"] = agent_prompt_obj.text

    # Set prompt (user's runtime query)
    # For tool agents: prompt is the user's question
    # For generators: prompt is typically empty (instruction is in user_instruction)
    config["prompt"] = user_prompt

    # Determine output path (explicit override wins over derived path)
    if output_format:
        if output_path_override:
            config["output_path"] = Path(output_path_override)
        elif day:
            stream = os.environ.get("SOL_STREAM")
            day_dir = str(day_path(day))
            config["output_path"] = get_output_path(
                day_dir,
                name,
                segment=segment,
                output_format=output_format,
                facet=facet,
                stream=stream,
            )

    return config
589
590
591def validate_config(config: dict) -> str | None:
592 """Validate prepared config.
593
594 Args:
595 config: Prepared config dict
596
597 Returns:
598 Error message string if invalid, None if valid
599 """
600 is_cogitate = config["type"] == "cogitate"
601 has_prompt = bool(config.get("prompt"))
602 has_user_instruction = bool(config.get("user_instruction"))
603 has_day = bool(config.get("day"))
604
605 # Cogitate agents need a prompt (user's question)
606 if is_cogitate and not has_prompt:
607 return "Missing 'prompt' field for cogitate agent"
608
609 # Generate prompts need either day (transcript) or user_instruction
610 if not is_cogitate and not has_day and not has_user_instruction and not has_prompt:
611 return "Invalid config: must have 'type', 'day', or 'prompt'"
612
613 # Segment/span requires day
614 if (config.get("segment") or config.get("span")) and not has_day:
615 return "Invalid config: 'segment' or 'span' requires 'day'"
616
617 return None
618
619
620# =============================================================================
621# Hook Execution
622# =============================================================================
623
624
def _run_pre_hooks(config: dict) -> dict:
    """Run the agent's pre-processing hook, if one is configured.

    Args:
        config: Full config dict (passed directly to the hook)

    Returns:
        Dict of field modifications to apply to config; empty when there is
        no hook, the hook returns nothing, or the hook raises.
    """
    hook = load_pre_hook(config)
    if not hook:
        return {}

    try:
        changes = hook(config)
        if changes:
            LOG.info("Pre-hook returned modifications: %s", list(changes.keys()))
            return changes
    except Exception as exc:
        # Hooks are best-effort: a failing hook must not kill the run.
        LOG.error("Pre-hook failed: %s", exc)

    return {}
647
648
649def _apply_template_vars(config: dict, template_vars: dict) -> None:
650 """Substitute template_vars into text fields of config in-place.
651
652 Expands each key with auto-capitalize convention (matching load_prompt):
653 {"foo": "bar"} -> $foo="bar", $Foo="Bar"
654 """
655 expanded = {}
656 for key, value in template_vars.items():
657 str_value = str(value)
658 expanded[key] = str_value
659 expanded[key.capitalize()] = str_value.capitalize()
660
661 for field in ("user_instruction", "transcript", "prompt"):
662 text = config.get(field)
663 if text:
664 config[field] = Template(text).safe_substitute(expanded)
665
666
def _run_post_hooks(result: str, config: dict) -> str:
    """Run the agent's post-processing hook, if one is configured.

    Args:
        result: LLM output text
        config: Full config dict (passed directly to the hook)

    Returns:
        The hook-transformed result, or the original text when there is no
        hook, the hook returns None, or the hook raises.
    """
    hook = load_post_hook(config)
    if not hook:
        return result

    try:
        transformed = hook(result, config)
        if transformed is not None:
            LOG.info("Post-hook transformed result")
            return transformed
    except Exception as exc:
        # Hooks are best-effort: a failing hook must not lose the result.
        LOG.error("Post-hook failed: %s", exc)

    return result
690
691
692# =============================================================================
693# Unified Agent Execution
694# =============================================================================
695
696
def _write_output(output_path: Path, result: str) -> None:
    """Write result text to output_path, creating parent dirs as needed."""
    output_path.parent.mkdir(parents=True, exist_ok=True)
    output_path.write_text(result, encoding="utf-8")
    LOG.info("Wrote output to %s", output_path)
703
704
def _build_dry_run_event(config: dict, before_values: dict) -> dict:
    """Assemble the dry_run event payload without invoking any LLM.

    Includes the prepared instructions/prompt, day/transcript details when
    present, and *_before fields for any value that pre-hooks changed.
    """
    event: dict[str, Any] = {
        "event": "dry_run",
        "ts": now_ms(),
        "type": config["type"],
        "name": config.get("name", "unified"),
        "provider": config.get("provider", ""),
        "model": config.get("model") or "unknown",
        "system_instruction": config.get("system_instruction", ""),
        "user_instruction": config.get("user_instruction", ""),
        "prompt": config.get("prompt", ""),
    }

    if config.get("extra_context", ""):
        event["extra_context"] = config["extra_context"]

    # Day-based fields
    if config.get("day"):
        event["day"] = config["day"]
        event["segment"] = config.get("segment")
        transcript = config.get("transcript", "")
        if transcript:
            event["transcript"] = transcript
            event["transcript_chars"] = len(transcript)
            event["transcript_files"] = sum(config.get("source_counts", {}).values())

    if config.get("output_path"):
        event["output_path"] = str(Path(config["output_path"]))

    # Surface before/after deltas so hook effects are visible; transcripts
    # are summarized by length rather than echoed in full.
    for key, previous in before_values.items():
        if config.get(key, "") == previous:
            continue
        if key == "transcript":
            event["transcript_before_chars"] = len(previous)
        else:
            event[f"{key}_before"] = previous

    return event
748
749
750_NON_RETRYABLE_ERRORS = (
751 ValueError,
752 json.JSONDecodeError,
753 KeyError,
754 TypeError,
755 AttributeError,
756 FileNotFoundError,
757 PermissionError,
758 NotImplementedError,
759)
760
761
762def _is_retryable_error(exc: Exception) -> bool:
763 """Check if an exception is likely a provider error worth retrying.
764
765 Returns False for local/code errors (ValueError, KeyError, etc.).
766 Returns True for everything else (SDK connection, timeout, server errors).
767 """
768 return not isinstance(exc, _NON_RETRYABLE_ERRORS)
769
770
async def _execute_with_tools(
    config: dict,
    emit_event: Callable[[dict], None],
) -> None:
    """Execute tool-using agent via provider's run_cogitate.

    On a retryable failure of the primary provider, performs a one-shot
    fallback to the configured backup provider (if distinct, and its API
    key env var is set). If the backup also fails, the ORIGINAL error is
    re-raised, not the backup's.

    Args:
        config: Prepared config dict
        emit_event: Event emission callback
    """
    from .providers import PROVIDER_REGISTRY, get_provider_module

    provider = config.get("provider", "google")
    output_path = Path(config["output_path"]) if config.get("output_path") else None

    if provider not in PROVIDER_REGISTRY:
        valid = ", ".join(sorted(PROVIDER_REGISTRY.keys()))
        raise ValueError(f"Unknown provider: {provider!r}. Valid providers: {valid}")

    provider_mod = get_provider_module(provider)

    # Wrapper to intercept finish event for post-processing
    def agent_emit_event(data: Event) -> None:
        if data.get("event") == "finish":
            result = data.get("result", "")
            result = _run_post_hooks(result, config)
            # Only rebuild the event if the post-hook actually changed it.
            if result != data.get("result", ""):
                data = {**data, "result": result}
            if output_path and result:
                _write_output(output_path, result)

        # Filter out start events from providers (we already emitted ours)
        if data.get("event") == "start":
            return

        emit_event(data)

    try:
        await provider_mod.run_cogitate(config=config, on_event=agent_emit_event)
    except Exception as exc:
        # No fallback for local/code errors, or if we already fell back once
        # (fallback_from set by prepare_config's preflight swap or here).
        if not _is_retryable_error(exc) or config.get("fallback_from"):
            raise
        from think.models import (
            get_backup_provider,
            resolve_model_for_provider,
        )
        from think.providers import PROVIDER_METADATA

        backup = get_backup_provider("cogitate")
        if not backup or backup == provider:
            raise
        env_key = PROVIDER_METADATA.get(backup, {}).get("env_key")
        if not env_key or not os.getenv(env_key):
            raise

        context = config.get("context")
        if not context:
            from think.talent import key_to_context

            context = key_to_context(config.get("name", "unified"))
        backup_model = resolve_model_for_provider(context, backup, "cogitate")

        # Tell cortex a fallback is happening before retrying.
        emit_event(
            {
                "event": "fallback",
                "ts": now_ms(),
                "original_provider": provider,
                "backup_provider": backup,
                "reason": "on_failure",
                "error": str(exc),
            }
        )

        config["fallback_from"] = provider
        config["provider"] = backup
        config["model"] = backup_model

        backup_mod = get_provider_module(backup)

        # Suppress error events from backup provider — if backup also fails
        # we report the original error, not the backup's error.
        def backup_emit(data: Event) -> None:
            if data.get("event") == "error":
                return
            agent_emit_event(data)

        try:
            await backup_mod.run_cogitate(config=config, on_event=backup_emit)
        except Exception:
            # Ensure the original error is reported by the caller even if the
            # primary provider already emitted its own error event (_evented).
            if hasattr(exc, "_evented"):
                delattr(exc, "_evented")
            raise exc
    finally:
        # A stale health file triggers a background recheck regardless of
        # how this run ended; clear the flag so it fires at most once.
        if config.get("health_stale"):
            from think.models import request_health_recheck

            request_health_recheck()
            config["health_stale"] = False
871
872
async def _execute_generate(
    config: dict,
    emit_event: Callable[[dict], None],
) -> None:
    """Execute single-shot generation (no tools).

    Builds the LLM contents from transcript + instruction + prompt, calls
    generate_with_result, and emits a "finish" event with the (post-hooked)
    result. On a retryable failure, retries once on the configured backup
    provider; if that also fails, the ORIGINAL error is re-raised.

    Args:
        config: Prepared config dict
        emit_event: Event emission callback
    """
    from think.models import generate_with_result
    from think.talent import key_to_context

    name = config.get("name", "unified")
    transcript = config.get("transcript", "")
    user_instruction = config.get("user_instruction", "")
    prompt = config.get("prompt", "")
    system_instruction = config.get("system_instruction") or None
    output_path = Path(config["output_path"]) if config.get("output_path") else None
    output_format = config.get("output")

    # Get generation parameters from config (set in frontmatter)
    thinking_budget = config.get("thinking_budget") or 8192 * 3
    max_output_tokens = config.get("max_output_tokens") or 8192 * 6
    is_json_output = output_format == "json"

    # Derive LLM request timeout from token budget: scale with output size,
    # floor at 120s, cap at 480s (well under cortex's 600s subprocess kill).
    timeout_s = config.get("timeout_s") or min(
        480, max(120, (max_output_tokens + thinking_budget) // 100)
    )

    # Build contents: transcript + instruction + prompt
    contents = []
    if transcript:
        contents.append(transcript)
    if user_instruction:
        contents.append(user_instruction)
    if prompt:
        contents.append(prompt)

    # Fallback if no contents
    if not contents:
        contents = ["No input provided."]

    context = key_to_context(name)
    try:
        gen_result = generate_with_result(
            contents=contents,
            context=context,
            temperature=0.3,
            max_output_tokens=max_output_tokens,
            thinking_budget=thinking_budget,
            system_instruction=system_instruction,
            json_output=is_json_output,
            timeout_s=timeout_s,
        )
    except Exception as exc:
        # No fallback for local/code errors, or if a fallback already
        # happened (preflight swap in prepare_config sets fallback_from).
        if not _is_retryable_error(exc) or config.get("fallback_from"):
            raise
        from think.models import (
            get_backup_provider,
            resolve_model_for_provider,
        )
        from think.providers import PROVIDER_METADATA

        provider = config.get("provider", "google")
        backup = get_backup_provider("generate")
        if not backup or backup == provider:
            raise
        env_key = PROVIDER_METADATA.get(backup, {}).get("env_key")
        if not env_key or not os.getenv(env_key):
            raise

        backup_model = resolve_model_for_provider(context, backup, "generate")

        # Tell cortex a fallback is happening before retrying.
        emit_event(
            {
                "event": "fallback",
                "ts": now_ms(),
                "original_provider": provider,
                "backup_provider": backup,
                "reason": "on_failure",
                "error": str(exc),
            }
        )

        config["fallback_from"] = provider
        config["provider"] = backup
        config["model"] = backup_model

        try:
            gen_result = generate_with_result(
                contents=contents,
                context=context,
                temperature=0.3,
                max_output_tokens=max_output_tokens,
                thinking_budget=thinking_budget,
                system_instruction=system_instruction,
                json_output=is_json_output,
                timeout_s=timeout_s,
                provider=backup,
                model=backup_model,
            )
        except Exception:
            # Report the original provider's error, not the backup's.
            raise exc
    finally:
        # A stale health file triggers a background recheck regardless of
        # how this run ended; clear the flag so it fires at most once.
        if config.get("health_stale"):
            from think.models import request_health_recheck

            request_health_recheck()
            config["health_stale"] = False

    result = gen_result["text"]
    usage_data = gen_result.get("usage")

    # Run post-hooks
    result = _run_post_hooks(result, config)

    # Write output
    if output_path and result:
        _write_output(output_path, result)

    # Emit finish event
    finish_event: dict[str, Any] = {
        "event": "finish",
        "ts": now_ms(),
        "result": result,
    }
    if usage_data:
        finish_event["usage"] = usage_data
    emit_event(finish_event)
1005
1006
def _emit_skip(
    config: dict,
    emit_event: Callable[[dict], None],
    name: str,
    skip_reason: str,
) -> None:
    """Emit an empty finish event for a skipped agent and record it in the day log."""
    emit_event(
        {
            "event": "finish",
            "ts": now_ms(),
            "result": "",
            "skipped": skip_reason,
        }
    )
    if config.get("day"):
        day_log(config["day"], f"agent {name} skipped ({skip_reason})")


async def _run_agent(
    config: dict,
    emit_event: Callable[[dict], None],
    dry_run: bool = False,
) -> None:
    """Execute agent based on config.

    Unified execution path for all agent types. Handles:
    - Skip conditions (disabled, no input, etc.)
    - Output existence checking (skip if exists unless refresh)
    - Pre/post hooks
    - Dry-run mode
    - Routing to tool or generate execution

    Args:
        config: Fully prepared config dict
        emit_event: Callback to emit JSONL events
        dry_run: If True, emit dry_run event instead of calling LLM
    """
    name = config.get("name", "unified")
    provider = config.get("provider", "google")
    model = config.get("model")
    is_cogitate = config["type"] == "cogitate"
    refresh = config.get("refresh", False)
    output_path = Path(config["output_path"]) if config.get("output_path") else None

    # Emit start event
    start_event: dict[str, Any] = {
        "event": "start",
        "ts": now_ms(),
        "prompt": config.get("prompt", ""),
        "name": name,
        "model": model or "unknown",
        "provider": provider,
    }
    if config.get("session_id"):
        start_event["session_id"] = config["session_id"]
    if config.get("chat_id"):
        start_event["chat_id"] = config["chat_id"]
    emit_event(start_event)

    # Emit preflight fallback event if provider was swapped
    if config.get("fallback_from"):
        emit_event(
            {
                "event": "fallback",
                "ts": now_ms(),
                "original_provider": config["fallback_from"],
                "backup_provider": config["provider"],
                "reason": "preflight",
            }
        )

    # Handle skip conditions set during config preparation
    skip_reason = config.get("skip_reason")
    if skip_reason:
        LOG.info("Config %s skipped: %s", name, skip_reason)
        _emit_skip(config, emit_event, name, skip_reason)
        return

    # Check if output already exists (applies to both tool agents and generators)
    if output_path and not refresh and not dry_run:
        if output_path.exists() and output_path.stat().st_size > 0:
            LOG.info("Output exists, loading: %s", output_path)
            # Explicit UTF-8 for consistency with JSONEventWriter's output encoding
            result = output_path.read_text(encoding="utf-8")
            emit_event(
                {
                    "event": "finish",
                    "ts": now_ms(),
                    "result": result,
                }
            )
            return

    # Capture state before pre-hooks (used for dry-run diffing)
    before_values = {
        "prompt": config.get("prompt", ""),
        "system_instruction": config.get("system_instruction", ""),
        "user_instruction": config.get("user_instruction", ""),
        "transcript": config.get("transcript", ""),
    }
    before_values["extra_context"] = config.get("extra_context", "")

    # Run pre-hooks; they may overwrite config keys and/or supply template vars
    modifications = _run_pre_hooks(config)
    template_vars = modifications.pop("template_vars", None)
    for key, value in modifications.items():
        config[key] = value
    if template_vars:
        LOG.info("Pre-hook template_vars: %s", list(template_vars.keys()))
        _apply_template_vars(config, template_vars)

    # Handle skip conditions set by pre-hooks
    skip_reason = config.get("skip_reason")
    if skip_reason:
        LOG.info("Config %s skipped by pre-hook: %s", name, skip_reason)
        _emit_skip(config, emit_event, name, skip_reason)
        return

    # Dry-run mode: report what would be sent without calling the LLM
    if dry_run:
        emit_event(_build_dry_run_event(config, before_values))
        return

    # Execute based on agent type
    if is_cogitate:
        await _execute_with_tools(config, emit_event)
    else:
        await _execute_generate(config, emit_event)

    # Log completion
    if config.get("day"):
        day_log(config["day"], f"agent {name} ok")
1139
1140
1141# =============================================================================
1142# Utility Functions
1143# =============================================================================
1144
1145
def scan_day(day: str) -> dict[str, list[str]]:
    """Return lists of processed and pending daily generator output files.

    Only scans daily generators (schedule='daily'). Segment generators are
    stored within segment directories and are not included here.
    """
    base_dir = day_path(day)
    generators = get_talent_configs(
        type="generate", schedule="daily", include_disabled=True
    )
    done: list[str] = []
    todo: list[str] = []
    for key in sorted(generators):
        meta = generators[key]
        out_file = get_output_path(base_dir, key, output_format=meta.get("output"))
        bucket = done if out_file.exists() else todo
        bucket.append(os.path.join("agents", out_file.name))
    return {"processed": sorted(done), "repairable": sorted(todo)}
1166
1167
def _check_generate(provider_name: str, tier: int, timeout: int) -> tuple[bool, str]:
    """Check generate interface for a provider."""
    from think.models import PROVIDER_DEFAULTS
    from think.providers import PROVIDER_METADATA, get_provider_module

    env_key = PROVIDER_METADATA[provider_name]["env_key"]
    if not os.getenv(env_key):
        return False, f"FAIL: {env_key} not set"

    try:
        module = get_provider_module(provider_name)
        model = PROVIDER_DEFAULTS[provider_name][tier]
        result = module.run_generate(
            contents="Say OK",
            model=model,
            temperature=0,
            max_output_tokens=16,
            system_instruction=None,
            json_output=False,
            thinking_budget=None,
            timeout_s=timeout,
        )
        is_dict = isinstance(result, dict)
        text = result.get("text", "") if is_dict else ""
        if not text:
            return False, "FAIL: empty response text"
        usage = result.get("usage") if is_dict else None
        if usage:
            from think.models import log_token_usage

            log_token_usage(
                model=PROVIDER_DEFAULTS[provider_name][tier],
                usage=usage,
                context="health.check.generate",
                type="generate",
            )
        return True, "OK"
    except Exception as exc:
        return False, f"FAIL: {exc}"
1206
1207
async def _check_cogitate(
    provider_name: str, tier: int, timeout: int
) -> tuple[bool, str]:
    """Check cogitate interface for a provider by running a real prompt."""
    from think.models import PROVIDER_DEFAULTS
    from think.providers import get_provider_module

    try:
        module = get_provider_module(provider_name)
        probe_config = {
            "prompt": "Say OK",
            "model": PROVIDER_DEFAULTS[provider_name][tier],
        }
        coro = module.run_cogitate(config=probe_config, on_event=None)
        result = await asyncio.wait_for(coro, timeout=timeout)
    except asyncio.TimeoutError:
        return False, f"FAIL: timed out after {timeout}s"
    except Exception as exc:
        return False, f"FAIL: {exc}"
    return (True, "OK") if result else (False, "FAIL: empty response")
1230
1231
async def _run_check(args: argparse.Namespace) -> None:
    """Run connectivity checks against AI providers.

    Probes each selected provider/tier/interface combination, prints a
    per-check line (or JSON with --json), writes the full result payload to
    ``<journal>/health/agents.json``, and exits 1 if any provider failed
    every one of its checks.
    """
    from think.models import PROVIDER_DEFAULTS, TIER_FLASH, TIER_LITE, TIER_PRO
    from think.providers import PROVIDER_REGISTRY

    # --targeted: only check configured provider+tier pairs
    targeted_pairs = None
    if args.targeted and not args.provider and not args.tier:
        import fcntl

        from think.models import TYPE_DEFAULTS, get_backup_provider
        from think.utils import get_config

        # Collect (provider, tier) pairs actually in use: the configured (or
        # default) pair per agent type, plus that type's backup provider.
        targeted_pairs = set()
        config = get_config()
        providers_config = config.get("providers", {})
        for agent_type, defaults in TYPE_DEFAULTS.items():
            type_config = providers_config.get(agent_type, {})
            provider = type_config.get("provider", defaults["provider"])
            tier = type_config.get("tier", defaults["tier"])
            targeted_pairs.add((provider, tier))
            backup = get_backup_provider(agent_type)
            if backup:
                targeted_pairs.add((backup, tier))

        # flock dedup: only one targeted check runs at a time
        lock_dir = Path(get_journal()) / "health"
        lock_dir.mkdir(parents=True, exist_ok=True)
        lock_fd = open(lock_dir / "recheck.lock", "w")
        try:
            fcntl.flock(lock_fd, fcntl.LOCK_EX | fcntl.LOCK_NB)
        except OSError:
            # Another targeted check holds the lock; bail out silently.
            lock_fd.close()
            return
        # NOTE(review): on success lock_fd is never explicitly closed —
        # presumably intentional so the flock is held until process exit;
        # confirm before changing.

    if args.provider:
        providers = args.provider
        for name in providers:
            if name not in PROVIDER_REGISTRY:
                available = ", ".join(PROVIDER_REGISTRY.keys())
                print(
                    f"Unknown provider: {name}. Available providers: {available}",
                    file=sys.stderr,
                )
                sys.exit(1)
    else:
        providers = list(PROVIDER_REGISTRY.keys())

    interfaces = [args.interface] if args.interface else ["generate", "cogitate"]

    tier_names = {1: "pro", 2: "flash", 3: "lite"}
    tiers = [args.tier] if args.tier else [TIER_PRO, TIER_FLASH, TIER_LITE]

    # Pre-compute column widths
    provider_width = max(len(n) for n in providers) if providers else 0
    tier_width = max(len(tier_names[t]) for t in tiers)
    # Resolve all model names to get max width
    model_names = set()
    for p in providers:
        for t in tiers:
            model_names.add(PROVIDER_DEFAULTS[p][t])
    model_width = max(len(m) for m in model_names) if model_names else 0
    interface_width = max(len(n) for n in interfaces) if interfaces else 0

    total = 0
    passed = 0
    failed = 0
    results = []
    # Dedup: two tiers that resolve to the same model name reuse one result.
    cache = {}  # (provider, model, interface) -> (ok, message, source_tier)

    for provider_name in providers:
        for tier in tiers:
            if (
                targeted_pairs is not None
                and (provider_name, tier) not in targeted_pairs
            ):
                continue
            model = PROVIDER_DEFAULTS[provider_name][tier]
            for interface_name in interfaces:
                cache_key = (provider_name, model, interface_name)
                if cache_key in cache:
                    # Reused result: zero elapsed time, remember origin tier.
                    ok, message, source_tier = cache[cache_key]
                    elapsed_s = 0.0
                    elapsed_s_rounded = 0.0
                    reused_from = source_tier
                else:
                    start = time.perf_counter()
                    if interface_name == "generate":
                        ok, message = _check_generate(provider_name, tier, args.timeout)
                    else:
                        ok, message = await _check_cogitate(
                            provider_name, tier, args.timeout
                        )
                    elapsed_s = time.perf_counter() - start
                    elapsed_s_rounded = round(elapsed_s, 1)
                    cache[cache_key] = (ok, message, tier_names[tier])
                    reused_from = None

                result = {
                    "provider": provider_name,
                    "tier": tier_names[tier],
                    "model": model,
                    "interface": interface_name,
                    "ok": bool(ok),
                    "message": str(message),
                    "elapsed_s": elapsed_s_rounded,
                }
                if reused_from:
                    result["reused_from"] = reused_from
                results.append(result)

                if not args.json:
                    # "=" marks a result reused from another tier's check.
                    if reused_from:
                        mark = "="
                        display_message = f"{message} (={reused_from})"
                    else:
                        mark = "✓" if ok else "✗"
                        display_message = str(message)
                    print(
                        f"{mark} "
                        f"{provider_name:<{provider_width}} "
                        f"{tier_names[tier]:<{tier_width}} "
                        f"{model:<{model_width}} "
                        f"{interface_name:<{interface_width}} "
                        f"{display_message} ({elapsed_s:.1f}s)"
                    )

                total += 1
                if ok:
                    passed += 1
                else:
                    failed += 1

    # Determine if any provider is fully failed (all checks failed)
    provider_failed = False
    providers_seen: dict[str, list[bool]] = {}
    for r in results:
        providers_seen.setdefault(r["provider"], []).append(r["ok"])
    for ok_values in providers_seen.values():
        if ok_values and not any(ok_values):
            provider_failed = True
            break

    # Write results to health file
    payload = {
        "results": results,
        "summary": {"total": total, "passed": passed, "failed": failed},
        "checked_at": datetime.now(timezone.utc).isoformat(),
    }
    health_dir = Path(get_journal()) / "health"
    health_dir.mkdir(parents=True, exist_ok=True)
    (health_dir / "agents.json").write_text(json.dumps(payload, indent=2))

    if args.json:
        print(
            json.dumps(
                {
                    "results": results,
                    "summary": {"total": total, "passed": passed, "failed": failed},
                },
                indent=2,
            )
        )
    else:
        print(f"{total} checks: {passed} passed, {failed} failed")
    # Exit non-zero only when some provider failed across the board, not on
    # isolated per-check failures.
    sys.exit(1 if provider_failed else 0)
1398
1399
1400# =============================================================================
1401# Main Entry Point
1402# =============================================================================
1403
1404
async def main_async() -> None:
    """NDJSON-based CLI for agents.

    Reads one JSON request per line from stdin, prepares and validates the
    config, then runs the agent, emitting JSONL events to stdout. The
    ``check`` subcommand instead probes provider connectivity and returns.
    """
    from think.providers import PROVIDER_REGISTRY

    parser = argparse.ArgumentParser(
        description="solstone Agent CLI - Accepts NDJSON input via stdin"
    )
    parser.add_argument(
        "--dry-run",
        action="store_true",
        help="Show what would be sent to the provider without calling the LLM",
    )
    subparsers = parser.add_subparsers(dest="subcommand")
    check_parser = subparsers.add_parser("check", help="Check AI provider connectivity")
    check_parser.add_argument(
        "--provider",
        action="append",
        help=f"Provider to check (repeatable). Available: {', '.join(PROVIDER_REGISTRY.keys())}",
    )
    check_parser.add_argument(
        "--interface",
        choices=["generate", "cogitate"],
        default=None,
        help="Interface to check (default: both)",
    )
    check_parser.add_argument(
        "--timeout",
        type=int,
        default=30,
        help="Timeout in seconds for generate checks (default: 30)",
    )
    check_parser.add_argument(
        "--tier",
        type=int,
        choices=[1, 2, 3],
        default=None,
        help="Tier to check (1=pro, 2=flash, 3=lite; default: all)",
    )
    check_parser.add_argument(
        "--json", action="store_true", help="Output results as JSON"
    )
    check_parser.add_argument(
        "--targeted",
        action="store_true",
        help="Only check configured provider+tier pairs (used by automated rechecks)",
    )

    args = setup_cli(parser)
    if args.subcommand == "check":
        await _run_check(args)
        return

    dry_run = args.dry_run

    app_logger = setup_logging(args.verbose)
    event_writer = JSONEventWriter(None)

    def emit_event(data: Event) -> None:
        # Stamp a timestamp if the producer didn't supply one
        if "ts" not in data:
            data["ts"] = now_ms()
        event_writer.emit(data)

    try:
        app_logger.info("Processing NDJSON input from stdin")
        for line in sys.stdin:
            line = line.strip()
            if not line:
                continue

            # Reset per request: without this, the generic handler below would
            # raise NameError if prepare_config failed on the very first line,
            # and would report a previous request's name on later lines.
            config: Optional[dict] = None
            try:
                request = json.loads(line)
                config = prepare_config(request)

                error = validate_config(config)
                if error:
                    emit_event({"event": "error", "error": error, "ts": now_ms()})
                    continue

                await _run_agent(config, emit_event, dry_run=dry_run)

            except json.JSONDecodeError as e:
                emit_event(
                    {
                        "event": "error",
                        "error": f"Invalid JSON: {str(e)}",
                        "ts": now_ms(),
                    }
                )
            except Exception as e:
                # Exceptions flagged _evented were already reported via events
                if getattr(e, "_evented", False):
                    continue
                from think.models import IncompleteJSONError

                event = {
                    "event": "error",
                    "error": str(e),
                    "trace": traceback.format_exc(),
                    "ts": now_ms(),
                }
                if isinstance(e, IncompleteJSONError):
                    from think.hooks import log_extraction_failure

                    event["partial_text_length"] = len(e.partial_text)
                    event["partial_text_tail"] = e.partial_text[-500:]
                    name = config.get("name", "unknown") if config else "unknown"
                    log_extraction_failure(e, name)
                emit_event(event)

    except Exception as exc:
        err = {
            "event": "error",
            "error": str(exc),
            "trace": traceback.format_exc(),
        }
        if not getattr(exc, "_evented", False):
            emit_event(err)
        raise
    finally:
        event_writer.close()
1524
1525
def main() -> None:
    """Synchronous entry point: run the async CLI on a fresh event loop."""
    asyncio.run(main_async())
1529
1530
# Names re-exported via star-import. NOTE(review): main/main_async are
# omitted — presumably deliberate (CLI entry points, not library API); confirm.
__all__ = [
    "prepare_config",
    "validate_config",
    "scan_day",
]