1# SPDX-License-Identifier: AGPL-3.0-only 2# Copyright (c) 2026 sol pbc 3 4"""Unified agent CLI for solstone. 5 6Spawned by cortex for all agent types: 7- Tool-using agents (with configured tools) 8- Generators (transcript analysis, no tools) 9 10Both paths share unified config preparation and execution flow. 11Reads NDJSON config from stdin, emits JSONL events to stdout. 12""" 13 14from __future__ import annotations 15 16import argparse 17import asyncio 18import json 19import logging 20import os 21import sys 22import time 23import traceback 24from datetime import datetime, timezone 25from pathlib import Path 26from string import Template 27from typing import Any, Callable, Optional 28 29from think.cluster import cluster, cluster_period, cluster_span 30from think.providers.shared import Event 31from think.talent import ( 32 get_agent_filter, 33 get_output_path, 34 get_talent_configs, 35 load_post_hook, 36 load_pre_hook, 37 load_prompt, 38 source_is_enabled, 39 source_is_required, 40) 41from think.utils import ( 42 day_log, 43 day_path, 44 format_day, 45 format_segment_times, 46 get_journal, 47 now_ms, 48 segment_parse, 49 setup_cli, 50) 51 52LOG = logging.getLogger("think.agents") 53 54# Minimum content length for transcript-based generation 55MIN_INPUT_CHARS = 50 56 57 58def setup_logging(verbose: bool = False) -> logging.Logger: 59 """Configure logging for agent CLI.""" 60 level = logging.DEBUG if verbose else logging.INFO 61 logging.basicConfig(level=level, stream=sys.stdout) 62 return LOG 63 64 65class JSONEventWriter: 66 """Write JSONL events to stdout and optionally to a file.""" 67 68 def __init__(self, path: Optional[str] = None) -> None: 69 self.path = path 70 self.file = None 71 if path: 72 try: 73 Path(path).parent.mkdir(parents=True, exist_ok=True) 74 self.file = open(path, "a", encoding="utf-8") 75 except Exception: 76 pass # Fail silently if can't open file 77 78 def emit(self, data: Event) -> None: 79 line = json.dumps(data, ensure_ascii=False) 80 
print(line) 81 sys.stdout.flush() # Ensure immediate output for cortex 82 if self.file: 83 try: 84 self.file.write(line + "\n") 85 self.file.flush() 86 except Exception: 87 pass # Fail silently on write errors 88 89 def close(self) -> None: 90 if self.file: 91 try: 92 self.file.close() 93 except Exception: 94 pass 95 96 97# ============================================================================= 98# Unified Config Preparation 99# ============================================================================= 100 101 102def _stream_content_description(stream: str | None) -> str: 103 """Return a human-readable content description for a stream. 104 105 Used in preamble templates so agents know what kind of content they're 106 analyzing (live capture vs imported conversations, notes, etc.). 107 """ 108 if not stream: 109 return "audio transcription and screen recording" 110 111 STREAM_DESCRIPTIONS = { 112 "archon": "audio transcription and screen recording", 113 "import.chatgpt": "an imported ChatGPT conversation", 114 "import.claude": "an imported Claude conversation", 115 "import.gemini": "an imported Gemini conversation", 116 "import.ics": "an imported calendar event", 117 "import.obsidian": "an imported note from Obsidian", 118 "import.document": "an imported document (PDF)", 119 "import.kindle": "imported Kindle reading highlights", 120 } 121 122 if stream in STREAM_DESCRIPTIONS: 123 return STREAM_DESCRIPTIONS[stream] 124 125 # Fallback for unknown import streams 126 if stream.startswith("import."): 127 source = stream.split(".", 1)[1] 128 return f"imported content from {source}" 129 130 return "captured content" 131 132 133def _stream_import_guidance(stream: str | None) -> str: 134 """Return stream-conditional guidance for the activity agent. 135 136 For live capture, returns guidance about frame comparison and spoken audio. 137 For imports, returns content-type-specific analysis instructions. 138 Returns empty string for unknown streams. 
139 """ 140 if not stream or stream == "archon": 141 return ( 142 "## Live Capture Guidance\n\n" 143 "ONLY report what CHANGED between screenshots or was SPOKEN in audio. " 144 "If content looks the same across frames, skip it entirely.\n\n" 145 "### Your Inputs\n\n" 146 "- **Screenshots**: Sampled across this segment. Compare frames — what's different?\n" 147 "- **Audio**: Transcript of speech. What was said?\n\n" 148 "### SKIP Entirely\n\n" 149 "- Windows that look identical in first and last frame\n" 150 "- Apps open but showing same content throughout\n" 151 "- Background windows never brought to focus\n" 152 '- Anything you\'d describe as "had open" or "was visible"' 153 ) 154 155 IMPORT_GUIDANCE = { 156 "import.chatgpt": ( 157 "This is an AI conversation. Summarize the key topics discussed, " 158 "questions asked, solutions proposed, and decisions reached. " 159 "Focus on what the human was trying to accomplish and what they learned or decided." 160 ), 161 "import.claude": ( 162 "This is an AI conversation. Summarize the key topics discussed, " 163 "questions asked, solutions proposed, and decisions reached. " 164 "Focus on what the human was trying to accomplish and what they learned or decided." 165 ), 166 "import.gemini": ( 167 "This is an AI conversation. Summarize the key topics discussed, " 168 "questions asked, solutions proposed, and decisions reached. " 169 "Focus on what the human was trying to accomplish and what they learned or decided." 170 ), 171 "import.ics": ( 172 "This is a calendar event. Describe the event: its purpose, " 173 "participants, and any context from the description about why it was scheduled." 174 ), 175 "import.obsidian": ( 176 "This is a note. Summarize the key ideas, references, and connections. " 177 "What was the author thinking about and working through?" 178 ), 179 "import.document": ( 180 "This is an imported document (legal, financial, medical, or personal). 
" 181 "Extract all named parties and their roles (grantor, trustee, beneficiary, " 182 "attorney, witness, agent, etc.). Produce a plain-language summary that a " 183 "non-expert could understand. Identify key provisions, dates, conditions, " 184 "obligations, and deadlines. Note any time-sensitive requirements (renewal " 185 "dates, filing deadlines, review periods)." 186 ), 187 "import.kindle": ( 188 "These are reading highlights. Describe what was being read and what " 189 "the reader found noteworthy. What themes or ideas do these highlights capture?" 190 ), 191 } 192 193 if stream in IMPORT_GUIDANCE: 194 return f"## Content Guidance\n\n{IMPORT_GUIDANCE[stream]}" 195 196 if stream.startswith("import."): 197 return ( 198 "## Content Guidance\n\n" 199 "This is imported content. Summarize the key topics, actions, " 200 "and takeaways present in this segment." 201 ) 202 203 return "" 204 205 206def _build_prompt_context( 207 day: str | None, 208 segment: str | None, 209 span: list[str] | None, 210 activity: dict | None = None, 211) -> dict[str, str]: 212 """Build context dict for prompt template substitution. 
213 214 Args: 215 day: Day in YYYYMMDD format 216 segment: Segment key (HHMMSS_LEN) 217 span: List of segment keys 218 activity: Optional activity record dict for activity-scheduled agents 219 220 Returns: 221 Dict with template variables: 222 - day: Friendly format (e.g., "Sunday, February 2, 2025") 223 - day_YYYYMMDD: Raw day string (e.g., "20250202") 224 - segment_start, segment_end: Time strings if segment/span provided 225 - stream, content_description: Stream name and human-readable description 226 - activity_*: Activity fields if activity record provided 227 """ 228 context: dict[str, str] = {} 229 if not day: 230 return context 231 232 context["day"] = format_day(day) 233 context["day_YYYYMMDD"] = day 234 235 # Stream-aware content description and import guidance 236 stream = os.environ.get("SOL_STREAM") 237 context["stream"] = stream or "archon" 238 context["content_description"] = _stream_content_description(stream) 239 context["import_guidance"] = _stream_import_guidance(stream) 240 241 if segment: 242 start_str, end_str = format_segment_times(segment) 243 if start_str and end_str: 244 context["segment"] = segment 245 context["segment_start"] = start_str 246 context["segment_end"] = end_str 247 elif span: 248 all_times = [] 249 for seg in span: 250 start_time, end_time = segment_parse(seg) 251 if start_time and end_time: 252 all_times.append((start_time, end_time)) 253 254 if all_times: 255 earliest_start = min(t[0] for t in all_times) 256 latest_end = max(t[1] for t in all_times) 257 context["segment_start"] = ( 258 datetime.combine(datetime.today(), earliest_start) 259 .strftime("%I:%M %p") 260 .lstrip("0") 261 ) 262 context["segment_end"] = ( 263 datetime.combine(datetime.today(), latest_end) 264 .strftime("%I:%M %p") 265 .lstrip("0") 266 ) 267 268 # Activity template variables 269 if activity: 270 from think.activities import estimate_duration_minutes 271 272 context["activity_id"] = activity.get("id", "") 273 context["activity_type"] = 
activity.get("activity", "") 274 context["activity_description"] = activity.get("description", "") 275 context["activity_level"] = str(activity.get("level_avg", 0.5)) 276 entities = activity.get("active_entities", []) 277 context["activity_entities"] = ", ".join(entities) if entities else "" 278 segments = activity.get("segments", []) 279 context["activity_segments"] = ", ".join(segments) if segments else "" 280 context["activity_duration"] = str(estimate_duration_minutes(segments)) 281 282 return context 283 284 285def _build_activity_context( 286 activity: dict, 287 span: list[str], 288 facet: str, 289 day: str, 290) -> str | None: 291 """Build activity context sections for $activity_context. 292 293 Args: 294 activity: Activity record dict (from activity records JSONL) 295 span: List of segment keys in the activity's span 296 facet: Facet name 297 day: Day in YYYYMMDD format 298 299 Returns: 300 Formatted string for the $activity_context template variable. 301 """ 302 activity_cfg = {"context": True, "state": True, "focus": True} 303 304 parts: list[str] = [] 305 activity_type = activity.get("activity", "unknown") 306 307 # --- activity.context: Activity metadata section --- 308 if activity_cfg.get("context"): 309 from think.activities import estimate_duration_minutes 310 311 level_avg = activity.get("level_avg", 0.5) 312 level_label = ( 313 "high" if level_avg >= 0.75 else "medium" if level_avg >= 0.4 else "low" 314 ) 315 segments = activity.get("segments", []) 316 duration = estimate_duration_minutes(segments) 317 entities = activity.get("active_entities", []) 318 entities_str = ", ".join(entities) if entities else "none detected" 319 320 parts.append( 321 f"## Activity Context\n" 322 f"- **Type:** {activity_type}\n" 323 f"- **Description:** {activity.get('description', '')}\n" 324 f"- **Engagement Level:** {level_avg} ({level_label})\n" 325 f"- **Duration:** ~{duration} minutes ({len(segments)} segments)\n" 326 f"- **Active Entities:** {entities_str}" 327 ) 
328 329 # --- activity.state: Per-segment activity descriptions --- 330 if activity_cfg.get("state"): 331 from think.activities import load_segment_activity_state 332 333 state_lines: list[str] = [] 334 for seg in span: 335 entry = load_segment_activity_state(day, seg, facet, activity_type) 336 if entry: 337 level = entry.get("level", "") 338 desc = entry.get("description", "") 339 # Format segment time for readability 340 start_str, end_str = format_segment_times(seg) 341 time_label = ( 342 f" ({start_str} - {end_str})" if start_str and end_str else "" 343 ) 344 state_lines.append( 345 f"### {seg}{time_label}\n{activity_type} [{level}]: {desc}" 346 ) 347 348 if state_lines: 349 parts.append("## Activity State Per Segment\n\n" + "\n\n".join(state_lines)) 350 351 # --- activity.focus: Focusing instructions --- 352 if activity_cfg.get("focus"): 353 parts.append( 354 f"## Analysis Focus\n" 355 f"You are analyzing ONLY the **{activity_type}** activity within the " 356 f"**{facet}** facet. The transcript segments may contain content from " 357 f"other concurrent activities (e.g., background meetings, messaging). " 358 f"Use the Activity State Per Segment section above to identify which " 359 f"content relates to this activity, and ignore unrelated content. " 360 f"Your analysis should only cover what happened within this specific activity." 361 ) 362 363 if not parts: 364 return None 365 366 return "\n\n".join(parts) 367 368 369def _load_transcript( 370 day: str, 371 segment: str | None, 372 span: list[str] | None, 373 sources: dict, 374) -> tuple[str, dict[str, int]]: 375 """Load and cluster transcript for day/segment/span. 
376 377 Args: 378 day: Day in YYYYMMDD format 379 segment: Optional segment key 380 span: Optional list of segment keys 381 sources: Source config dict from frontmatter load 382 383 Returns: 384 Tuple of (transcript text, source_counts dict) 385 """ 386 # Set segment key for token usage logging 387 if segment: 388 os.environ["SOL_SEGMENT"] = segment 389 elif span: 390 os.environ["SOL_SEGMENT"] = span[0] 391 392 # Convert sources config for clustering 393 cluster_sources: dict = {} 394 for k, v in sources.items(): 395 if k == "agents": 396 agent_filter = get_agent_filter(v) 397 if agent_filter is None: 398 cluster_sources[k] = source_is_enabled(v) 399 elif not agent_filter: 400 cluster_sources[k] = False 401 else: 402 cluster_sources[k] = agent_filter 403 else: 404 cluster_sources[k] = source_is_enabled(v) 405 406 # Build transcript via clustering 407 stream = os.environ.get("SOL_STREAM") 408 if span: 409 return cluster_span(day, span, sources=cluster_sources, stream=stream) 410 elif segment: 411 return cluster_period(day, segment, sources=cluster_sources, stream=stream) 412 else: 413 return cluster(day, sources=cluster_sources) 414 415 416def prepare_config(request: dict) -> dict: 417 """Prepare complete agent config from request. 418 419 Single unified preparation path for all agent types. Takes raw request 420 from cortex and returns fully prepared config ready for execution. 
421 422 Config fields produced: 423 - name: Agent name 424 - provider, model: Resolved from context/request 425 - user_instruction: Agent instruction from .md file 426 - prompt: User's runtime query/request 427 - transcript: Clustered transcript (if day provided) 428 - output_path: Where to write output (if output format set) 429 - skip_reason: Why to skip (if applicable) 430 431 Args: 432 request: Raw request dict from cortex 433 434 Returns: 435 Fully prepared config dict 436 """ 437 from think.models import resolve_model_for_provider, resolve_provider 438 from think.talent import get_agent, key_to_context 439 440 name = request.get("name", "unified") 441 facet = request.get("facet") 442 day = request.get("day") 443 segment = request.get("segment") 444 span = request.get("span") 445 activity = request.get("activity") 446 output_format = request.get("output") 447 output_path_override = request.get("output_path") 448 user_prompt = request.get("prompt", "") 449 450 # Load complete agent config 451 config = get_agent(name, facet=facet, analysis_day=day) 452 453 # Config now contains all frontmatter fields plus: 454 # - path: Path to the .md file 455 # - sources: Source config for transcript loading 456 # - All frontmatter: tools, hook, disabled, thinking_budget, max_output_tokens, etc. 
457 458 # Convert path string to Path object for convenience 459 agent_path = Path(config["path"]) if config.get("path") else None 460 sources = config.get("sources", {}) 461 462 # Merge request values (request overrides agent defaults) 463 config.update({k: v for k, v in request.items() if v is not None}) 464 465 # Populate stream from env if not already in config (dream passes it as 466 # SOL_STREAM env var but not as a top-level request key — hooks need it) 467 if "stream" not in config: 468 sol_stream = os.environ.get("SOL_STREAM") 469 if sol_stream: 470 config["stream"] = sol_stream 471 472 # Track additional state 473 config["span_mode"] = bool(span) 474 config["source_counts"] = {} 475 476 # Resolve provider and model from context 477 context = key_to_context(name) 478 agent_type = config["type"] 479 default_provider, default_model = resolve_provider(context, agent_type) 480 481 provider = config.get("provider") or default_provider 482 model = config.get("model") 483 if not model: 484 if provider != default_provider: 485 model = resolve_model_for_provider(context, provider, agent_type) 486 else: 487 model = default_model 488 489 config["provider"] = provider 490 config["model"] = model 491 config["context"] = context 492 493 # --- Provider fallback: preflight swap if primary is unhealthy --- 494 from think.models import ( 495 get_backup_provider, 496 is_provider_healthy, 497 load_health_status, 498 should_recheck_health, 499 ) 500 from think.providers import PROVIDER_METADATA 501 502 health_data = load_health_status() 503 config["health_stale"] = should_recheck_health(health_data) 504 505 if not is_provider_healthy(provider, health_data): 506 backup = get_backup_provider(agent_type) 507 if backup and backup != provider: 508 env_key = PROVIDER_METADATA.get(backup, {}).get("env_key") 509 if env_key and os.getenv(env_key): 510 config["fallback_from"] = provider 511 config["provider"] = backup 512 config["model"] = resolve_model_for_provider( 513 context, 
backup, agent_type 514 ) 515 516 # Check if disabled 517 if config.get("disabled"): 518 config["skip_reason"] = "disabled" 519 return config 520 521 # Day-based processing: load transcript and apply template substitution 522 if day: 523 # Load transcript (only when agent has enabled sources to consume) 524 if any(source_is_enabled(v) for v in sources.values()): 525 transcript, source_counts = _load_transcript(day, segment, span, sources) 526 config["transcript"] = transcript 527 config["source_counts"] = source_counts 528 total_count = sum(source_counts.values()) 529 530 # Check required sources 531 for source_type, mode in sources.items(): 532 if source_is_required(mode) and source_counts.get(source_type, 0) == 0: 533 config["skip_reason"] = f"missing_required_{source_type}" 534 return config 535 536 # Skip if no content 537 if total_count == 0 or len(transcript.strip()) < MIN_INPUT_CHARS: 538 config["skip_reason"] = "no_input" 539 return config 540 541 # Note for limited recordings 542 if total_count < 3: 543 config["transcript"] = ( 544 "**Input Note:** Limited recordings for this day. 
" 545 "Scale analysis to available input.\n\n" + transcript 546 ) 547 548 # Reload agent instruction with template substitution for day/segment context 549 if agent_path and agent_path.exists(): 550 from think.prompts import _resolve_facets 551 552 prompt_context = _build_prompt_context( 553 day, segment, span, activity=activity 554 ) 555 prompt_context["facets"] = _resolve_facets(facet) 556 557 if activity and span and facet: 558 activity_ctx = _build_activity_context(activity, span, facet, day) 559 if activity_ctx: 560 prompt_context["activity_context"] = activity_ctx 561 562 agent_prompt_obj = load_prompt( 563 agent_path.stem, base_dir=agent_path.parent, context=prompt_context 564 ) 565 config["user_instruction"] = agent_prompt_obj.text 566 567 # Set prompt (user's runtime query) 568 # For tool agents: prompt is the user's question 569 # For generators: prompt is typically empty (instruction is in user_instruction) 570 config["prompt"] = user_prompt 571 572 # Determine output path 573 if output_format: 574 if output_path_override: 575 config["output_path"] = Path(output_path_override) 576 elif day: 577 stream = os.environ.get("SOL_STREAM") 578 day_dir = str(day_path(day)) 579 config["output_path"] = get_output_path( 580 day_dir, 581 name, 582 segment=segment, 583 output_format=output_format, 584 facet=facet, 585 stream=stream, 586 ) 587 588 return config 589 590 591def validate_config(config: dict) -> str | None: 592 """Validate prepared config. 
593 594 Args: 595 config: Prepared config dict 596 597 Returns: 598 Error message string if invalid, None if valid 599 """ 600 is_cogitate = config["type"] == "cogitate" 601 has_prompt = bool(config.get("prompt")) 602 has_user_instruction = bool(config.get("user_instruction")) 603 has_day = bool(config.get("day")) 604 605 # Cogitate agents need a prompt (user's question) 606 if is_cogitate and not has_prompt: 607 return "Missing 'prompt' field for cogitate agent" 608 609 # Generate prompts need either day (transcript) or user_instruction 610 if not is_cogitate and not has_day and not has_user_instruction and not has_prompt: 611 return "Invalid config: must have 'type', 'day', or 'prompt'" 612 613 # Segment/span requires day 614 if (config.get("segment") or config.get("span")) and not has_day: 615 return "Invalid config: 'segment' or 'span' requires 'day'" 616 617 return None 618 619 620# ============================================================================= 621# Hook Execution 622# ============================================================================= 623 624 625def _run_pre_hooks(config: dict) -> dict: 626 """Run pre-processing hooks, return dict of modifications. 627 628 Args: 629 config: Full config dict (hooks receive this directly) 630 631 Returns: 632 Dict of field modifications to apply to config 633 """ 634 pre_hook = load_pre_hook(config) 635 if not pre_hook: 636 return {} 637 638 try: 639 modifications = pre_hook(config) 640 if modifications: 641 LOG.info("Pre-hook returned modifications: %s", list(modifications.keys())) 642 return modifications 643 except Exception as exc: 644 LOG.error("Pre-hook failed: %s", exc) 645 646 return {} 647 648 649def _apply_template_vars(config: dict, template_vars: dict) -> None: 650 """Substitute template_vars into text fields of config in-place. 
651 652 Expands each key with auto-capitalize convention (matching load_prompt): 653 {"foo": "bar"} -> $foo="bar", $Foo="Bar" 654 """ 655 expanded = {} 656 for key, value in template_vars.items(): 657 str_value = str(value) 658 expanded[key] = str_value 659 expanded[key.capitalize()] = str_value.capitalize() 660 661 for field in ("user_instruction", "transcript", "prompt"): 662 text = config.get(field) 663 if text: 664 config[field] = Template(text).safe_substitute(expanded) 665 666 667def _run_post_hooks(result: str, config: dict) -> str: 668 """Run post-processing hooks, return transformed result. 669 670 Args: 671 result: LLM output text 672 config: Full config dict (hooks receive this directly) 673 674 Returns: 675 Transformed result (or original if no hook) 676 """ 677 post_hook = load_post_hook(config) 678 if not post_hook: 679 return result 680 681 try: 682 hook_result = post_hook(result, config) 683 if hook_result is not None: 684 LOG.info("Post-hook transformed result") 685 return hook_result 686 except Exception as exc: 687 LOG.error("Post-hook failed: %s", exc) 688 689 return result 690 691 692# ============================================================================= 693# Unified Agent Execution 694# ============================================================================= 695 696 697def _write_output(output_path: Path, result: str) -> None: 698 """Write result to output file.""" 699 output_path.parent.mkdir(parents=True, exist_ok=True) 700 with open(output_path, "w", encoding="utf-8") as f: 701 f.write(result) 702 LOG.info("Wrote output to %s", output_path) 703 704 705def _build_dry_run_event(config: dict, before_values: dict) -> dict: 706 """Build a dry-run event with all context.""" 707 agent_type = config["type"] 708 709 event: dict[str, Any] = { 710 "event": "dry_run", 711 "ts": now_ms(), 712 "type": agent_type, 713 "name": config.get("name", "unified"), 714 "provider": config.get("provider", ""), 715 "model": config.get("model") or 
"unknown", 716 "system_instruction": config.get("system_instruction", ""), 717 "user_instruction": config.get("user_instruction", ""), 718 "prompt": config.get("prompt", ""), 719 } 720 721 extra_context = config.get("extra_context", "") 722 if extra_context: 723 event["extra_context"] = extra_context 724 725 # Day-based fields 726 if config.get("day"): 727 event["day"] = config["day"] 728 event["segment"] = config.get("segment") 729 transcript = config.get("transcript", "") 730 if transcript: 731 event["transcript"] = transcript 732 event["transcript_chars"] = len(transcript) 733 event["transcript_files"] = sum(config.get("source_counts", {}).values()) 734 output_path = Path(config["output_path"]) if config.get("output_path") else None 735 if output_path: 736 event["output_path"] = str(output_path) 737 738 # Show before values for comparison 739 for key, before_val in before_values.items(): 740 current_val = config.get(key, "") 741 if current_val != before_val: 742 if key == "transcript": 743 event["transcript_before_chars"] = len(before_val) 744 else: 745 event[f"{key}_before"] = before_val 746 747 return event 748 749 750_NON_RETRYABLE_ERRORS = ( 751 ValueError, 752 json.JSONDecodeError, 753 KeyError, 754 TypeError, 755 AttributeError, 756 FileNotFoundError, 757 PermissionError, 758 NotImplementedError, 759) 760 761 762def _is_retryable_error(exc: Exception) -> bool: 763 """Check if an exception is likely a provider error worth retrying. 764 765 Returns False for local/code errors (ValueError, KeyError, etc.). 766 Returns True for everything else (SDK connection, timeout, server errors). 767 """ 768 return not isinstance(exc, _NON_RETRYABLE_ERRORS) 769 770 771async def _execute_with_tools( 772 config: dict, 773 emit_event: Callable[[dict], None], 774) -> None: 775 """Execute tool-using agent via provider's run_cogitate. 
776 777 Args: 778 config: Prepared config dict 779 emit_event: Event emission callback 780 """ 781 from .providers import PROVIDER_REGISTRY, get_provider_module 782 783 provider = config.get("provider", "google") 784 output_path = Path(config["output_path"]) if config.get("output_path") else None 785 786 if provider not in PROVIDER_REGISTRY: 787 valid = ", ".join(sorted(PROVIDER_REGISTRY.keys())) 788 raise ValueError(f"Unknown provider: {provider!r}. Valid providers: {valid}") 789 790 provider_mod = get_provider_module(provider) 791 792 # Wrapper to intercept finish event for post-processing 793 def agent_emit_event(data: Event) -> None: 794 if data.get("event") == "finish": 795 result = data.get("result", "") 796 result = _run_post_hooks(result, config) 797 if result != data.get("result", ""): 798 data = {**data, "result": result} 799 if output_path and result: 800 _write_output(output_path, result) 801 802 # Filter out start events from providers (we already emitted ours) 803 if data.get("event") == "start": 804 return 805 806 emit_event(data) 807 808 try: 809 await provider_mod.run_cogitate(config=config, on_event=agent_emit_event) 810 except Exception as exc: 811 if not _is_retryable_error(exc) or config.get("fallback_from"): 812 raise 813 from think.models import ( 814 get_backup_provider, 815 resolve_model_for_provider, 816 ) 817 from think.providers import PROVIDER_METADATA 818 819 backup = get_backup_provider("cogitate") 820 if not backup or backup == provider: 821 raise 822 env_key = PROVIDER_METADATA.get(backup, {}).get("env_key") 823 if not env_key or not os.getenv(env_key): 824 raise 825 826 context = config.get("context") 827 if not context: 828 from think.talent import key_to_context 829 830 context = key_to_context(config.get("name", "unified")) 831 backup_model = resolve_model_for_provider(context, backup, "cogitate") 832 833 emit_event( 834 { 835 "event": "fallback", 836 "ts": now_ms(), 837 "original_provider": provider, 838 "backup_provider": 
backup, 839 "reason": "on_failure", 840 "error": str(exc), 841 } 842 ) 843 844 config["fallback_from"] = provider 845 config["provider"] = backup 846 config["model"] = backup_model 847 848 backup_mod = get_provider_module(backup) 849 850 # Suppress error events from backup provider — if backup also fails 851 # we report the original error, not the backup's error. 852 def backup_emit(data: Event) -> None: 853 if data.get("event") == "error": 854 return 855 agent_emit_event(data) 856 857 try: 858 await backup_mod.run_cogitate(config=config, on_event=backup_emit) 859 except Exception: 860 # Ensure the original error is reported by the caller even if the 861 # primary provider already emitted its own error event (_evented). 862 if hasattr(exc, "_evented"): 863 delattr(exc, "_evented") 864 raise exc 865 finally: 866 if config.get("health_stale"): 867 from think.models import request_health_recheck 868 869 request_health_recheck() 870 config["health_stale"] = False 871 872 873async def _execute_generate( 874 config: dict, 875 emit_event: Callable[[dict], None], 876) -> None: 877 """Execute single-shot generation (no tools). 
878 879 Args: 880 config: Prepared config dict 881 emit_event: Event emission callback 882 """ 883 from think.models import generate_with_result 884 from think.talent import key_to_context 885 886 name = config.get("name", "unified") 887 transcript = config.get("transcript", "") 888 user_instruction = config.get("user_instruction", "") 889 prompt = config.get("prompt", "") 890 system_instruction = config.get("system_instruction") or None 891 output_path = Path(config["output_path"]) if config.get("output_path") else None 892 output_format = config.get("output") 893 894 # Get generation parameters from config (set in frontmatter) 895 thinking_budget = config.get("thinking_budget") or 8192 * 3 896 max_output_tokens = config.get("max_output_tokens") or 8192 * 6 897 is_json_output = output_format == "json" 898 899 # Derive LLM request timeout from token budget: scale with output size, 900 # floor at 120s, cap at 480s (well under cortex's 600s subprocess kill). 901 timeout_s = config.get("timeout_s") or min( 902 480, max(120, (max_output_tokens + thinking_budget) // 100) 903 ) 904 905 # Build contents: transcript + instruction + prompt 906 contents = [] 907 if transcript: 908 contents.append(transcript) 909 if user_instruction: 910 contents.append(user_instruction) 911 if prompt: 912 contents.append(prompt) 913 914 # Fallback if no contents 915 if not contents: 916 contents = ["No input provided."] 917 918 context = key_to_context(name) 919 try: 920 gen_result = generate_with_result( 921 contents=contents, 922 context=context, 923 temperature=0.3, 924 max_output_tokens=max_output_tokens, 925 thinking_budget=thinking_budget, 926 system_instruction=system_instruction, 927 json_output=is_json_output, 928 timeout_s=timeout_s, 929 ) 930 except Exception as exc: 931 if not _is_retryable_error(exc) or config.get("fallback_from"): 932 raise 933 from think.models import ( 934 get_backup_provider, 935 resolve_model_for_provider, 936 ) 937 from think.providers import 
async def _run_agent(
    config: dict,
    emit_event: Callable[[dict], None],
    dry_run: bool = False,
) -> None:
    """Execute agent based on config.

    Unified execution path for all agent types. Handles:
    - Skip conditions (disabled, no input, etc.)
    - Output existence checking (skip if exists unless refresh)
    - Pre/post hooks
    - Dry-run mode
    - Routing to tool or generate execution

    Args:
        config: Fully prepared config dict.
        emit_event: Callback to emit JSONL events.
        dry_run: If True, emit dry_run event instead of calling the LLM.
    """
    name = config.get("name", "unified")
    provider = config.get("provider", "google")
    model = config.get("model")
    is_cogitate = config["type"] == "cogitate"
    refresh = config.get("refresh", False)
    output_path = Path(config["output_path"]) if config.get("output_path") else None

    def _emit_skip(reason: str, via_pre_hook: bool) -> None:
        # Shared skip path: log, emit an empty finish event, note in day journal.
        if via_pre_hook:
            LOG.info("Config %s skipped by pre-hook: %s", name, reason)
        else:
            LOG.info("Config %s skipped: %s", name, reason)
        emit_event(
            {
                "event": "finish",
                "ts": now_ms(),
                "result": "",
                "skipped": reason,
            }
        )
        if config.get("day"):
            day_log(config["day"], f"agent {name} skipped ({reason})")

    # Emit start event
    start_event: dict[str, Any] = {
        "event": "start",
        "ts": now_ms(),
        "prompt": config.get("prompt", ""),
        "name": name,
        "model": model or "unknown",
        "provider": provider,
    }
    if config.get("session_id"):
        start_event["session_id"] = config["session_id"]
    if config.get("chat_id"):
        start_event["chat_id"] = config["chat_id"]
    emit_event(start_event)

    # Emit preflight fallback event if provider was swapped before execution
    if config.get("fallback_from"):
        emit_event(
            {
                "event": "fallback",
                "ts": now_ms(),
                "original_provider": config["fallback_from"],
                "backup_provider": config["provider"],
                "reason": "preflight",
            }
        )

    # Handle skip conditions set during config preparation
    skip_reason = config.get("skip_reason")
    if skip_reason:
        _emit_skip(skip_reason, via_pre_hook=False)
        return

    # Check if output already exists (applies to both tool agents and generators)
    if output_path and not refresh and not dry_run:
        if output_path.exists() and output_path.stat().st_size > 0:
            LOG.info("Output exists, loading: %s", output_path)
            # Explicit encoding: outputs are written as UTF-8 elsewhere in this
            # module, so don't depend on the platform default here.
            with open(output_path, "r", encoding="utf-8") as f:
                result = f.read()
            emit_event(
                {
                    "event": "finish",
                    "ts": now_ms(),
                    "result": result,
                }
            )
            return

    # Capture state before pre-hooks so dry-run can show what they changed
    before_values = {
        "prompt": config.get("prompt", ""),
        "system_instruction": config.get("system_instruction", ""),
        "user_instruction": config.get("user_instruction", ""),
        "transcript": config.get("transcript", ""),
        "extra_context": config.get("extra_context", ""),
    }

    # Run pre-hooks; they may mutate config keys and supply template vars
    modifications = _run_pre_hooks(config)
    template_vars = modifications.pop("template_vars", None)
    for key, value in modifications.items():
        config[key] = value
    if template_vars:
        LOG.info("Pre-hook template_vars: %s", list(template_vars.keys()))
        _apply_template_vars(config, template_vars)

    # Handle skip conditions set by pre-hooks
    skip_reason = config.get("skip_reason")
    if skip_reason:
        _emit_skip(skip_reason, via_pre_hook=True)
        return

    # Dry-run mode: report what would be sent instead of calling the LLM
    if dry_run:
        emit_event(_build_dry_run_event(config, before_values))
        return

    # Execute based on agent type
    if is_cogitate:
        await _execute_with_tools(config, emit_event)
    else:
        await _execute_generate(config, emit_event)

    # Log completion to the day journal
    if config.get("day"):
        day_log(config["day"], f"agent {name} ok")
def scan_day(day: str) -> dict[str, list[str]]:
    """Return lists of processed and pending daily generator output files.

    Only scans daily generators (schedule='daily'). Segment generators are
    stored within segment directories and are not included here.

    Args:
        day: Day identifier accepted by ``day_path``.

    Returns:
        Dict with ``processed`` (outputs that already exist) and
        ``repairable`` (outputs not yet generated), each a sorted list of
        ``agents/<filename>`` relative paths.
    """
    day_dir = day_path(day)
    daily_generators = get_talent_configs(
        type="generate", schedule="daily", include_disabled=True
    )
    processed: list[str] = []
    pending: list[str] = []
    for key, meta in sorted(daily_generators.items()):
        output_format = meta.get("output")
        output_file = get_output_path(day_dir, key, output_format=output_format)
        if output_file.exists():
            processed.append(os.path.join("agents", output_file.name))
        else:
            pending.append(os.path.join("agents", output_file.name))
    return {"processed": sorted(processed), "repairable": sorted(pending)}


def _check_generate(provider_name: str, tier: int, timeout: int) -> tuple[bool, str]:
    """Check generate interface for a provider.

    Sends a tiny real prompt ("Say OK") to the provider's default model for
    *tier* and reports whether a non-empty response came back.

    Args:
        provider_name: Key into the provider registry/metadata tables.
        tier: Model tier index (1=pro, 2=flash, 3=lite).
        timeout: Request timeout in seconds.

    Returns:
        (ok, message) where message is "OK" or a "FAIL: ..." reason.
    """
    from think.models import PROVIDER_DEFAULTS
    from think.providers import PROVIDER_METADATA, get_provider_module

    env_key = PROVIDER_METADATA[provider_name]["env_key"]
    if not os.getenv(env_key):
        return False, f"FAIL: {env_key} not set"

    try:
        module = get_provider_module(provider_name)
        model = PROVIDER_DEFAULTS[provider_name][tier]
        result = module.run_generate(
            contents="Say OK",
            model=model,
            temperature=0,
            max_output_tokens=16,
            system_instruction=None,
            json_output=False,
            thinking_budget=None,
            timeout_s=timeout,
        )
        text = result.get("text", "") if isinstance(result, dict) else ""
        if text:
            usage = result.get("usage") if isinstance(result, dict) else None
            if usage:
                from think.models import log_token_usage

                # Reuse the already-resolved model name instead of deriving it
                # again from PROVIDER_DEFAULTS.
                log_token_usage(
                    model=model,
                    usage=usage,
                    context="health.check.generate",
                    type="generate",
                )
            return True, "OK"
        return False, "FAIL: empty response text"
    except Exception as exc:
        return False, f"FAIL: {exc}"
async def _check_cogitate(
    provider_name: str, tier: int, timeout: int
) -> tuple[bool, str]:
    """Check cogitate interface for a provider by running a real prompt."""
    from think.models import PROVIDER_DEFAULTS
    from think.providers import get_provider_module

    try:
        provider_mod = get_provider_module(provider_name)
        tier_model = PROVIDER_DEFAULTS[provider_name][tier]
        probe = {"prompt": "Say OK", "model": tier_model}
        response = await asyncio.wait_for(
            provider_mod.run_cogitate(config=probe, on_event=None),
            timeout=timeout,
        )
        # A non-empty result counts as a pass.
        return (True, "OK") if response else (False, "FAIL: empty response")
    except asyncio.TimeoutError:
        return False, f"FAIL: timed out after {timeout}s"
    except Exception as exc:
        return False, f"FAIL: {exc}"
async def _run_check(args: argparse.Namespace) -> None:
    """Run connectivity checks against AI providers.

    Probes each selected provider/tier/interface combination with a tiny real
    request, prints an aligned text report (or JSON with --json), persists
    results to ``<journal>/health/agents.json``, and exits non-zero when any
    provider fails every one of its checks.
    """
    from think.models import PROVIDER_DEFAULTS, TIER_FLASH, TIER_LITE, TIER_PRO
    from think.providers import PROVIDER_REGISTRY

    # --targeted: only check configured provider+tier pairs
    targeted_pairs = None
    if args.targeted and not args.provider and not args.tier:
        import fcntl

        from think.models import TYPE_DEFAULTS, get_backup_provider
        from think.utils import get_config

        targeted_pairs = set()
        config = get_config()
        providers_config = config.get("providers", {})
        for agent_type, defaults in TYPE_DEFAULTS.items():
            type_config = providers_config.get(agent_type, {})
            provider = type_config.get("provider", defaults["provider"])
            tier = type_config.get("tier", defaults["tier"])
            targeted_pairs.add((provider, tier))
            backup = get_backup_provider(agent_type)
            if backup:
                targeted_pairs.add((backup, tier))

        # flock dedup: only one targeted check runs at a time. The fd is
        # intentionally left open so the lock is held until process exit.
        lock_dir = Path(get_journal()) / "health"
        lock_dir.mkdir(parents=True, exist_ok=True)
        lock_fd = open(lock_dir / "recheck.lock", "w")
        try:
            fcntl.flock(lock_fd, fcntl.LOCK_EX | fcntl.LOCK_NB)
        except OSError:
            # Another targeted recheck already holds the lock; bail quietly.
            lock_fd.close()
            return

    if args.provider:
        providers = args.provider
        for name in providers:
            if name not in PROVIDER_REGISTRY:
                available = ", ".join(PROVIDER_REGISTRY.keys())
                print(
                    f"Unknown provider: {name}. Available providers: {available}",
                    file=sys.stderr,
                )
                sys.exit(1)
    else:
        providers = list(PROVIDER_REGISTRY.keys())

    interfaces = [args.interface] if args.interface else ["generate", "cogitate"]

    tier_names = {1: "pro", 2: "flash", 3: "lite"}
    tiers = [args.tier] if args.tier else [TIER_PRO, TIER_FLASH, TIER_LITE]

    # Pre-compute column widths for the aligned text report
    provider_width = max(len(n) for n in providers) if providers else 0
    tier_width = max(len(tier_names[t]) for t in tiers)
    # Resolve all model names to get max width
    model_names = set()
    for p in providers:
        for t in tiers:
            model_names.add(PROVIDER_DEFAULTS[p][t])
    model_width = max(len(m) for m in model_names) if model_names else 0
    interface_width = max(len(n) for n in interfaces) if interfaces else 0

    total = 0
    passed = 0
    failed = 0
    results = []
    cache = {}  # (provider, model, interface) -> (ok, message, source_tier)

    for provider_name in providers:
        for tier in tiers:
            if (
                targeted_pairs is not None
                and (provider_name, tier) not in targeted_pairs
            ):
                continue
            model = PROVIDER_DEFAULTS[provider_name][tier]
            for interface_name in interfaces:
                # Tiers sharing the same model reuse the first check's outcome.
                cache_key = (provider_name, model, interface_name)
                if cache_key in cache:
                    ok, message, source_tier = cache[cache_key]
                    elapsed_s = 0.0
                    elapsed_s_rounded = 0.0
                    reused_from = source_tier
                else:
                    start = time.perf_counter()
                    if interface_name == "generate":
                        ok, message = _check_generate(
                            provider_name, tier, args.timeout
                        )
                    else:
                        ok, message = await _check_cogitate(
                            provider_name, tier, args.timeout
                        )
                    elapsed_s = time.perf_counter() - start
                    elapsed_s_rounded = round(elapsed_s, 1)
                    cache[cache_key] = (ok, message, tier_names[tier])
                    reused_from = None

                result = {
                    "provider": provider_name,
                    "tier": tier_names[tier],
                    "model": model,
                    "interface": interface_name,
                    "ok": bool(ok),
                    "message": str(message),
                    "elapsed_s": elapsed_s_rounded,
                }
                if reused_from:
                    result["reused_from"] = reused_from
                results.append(result)

                if not args.json:
                    if reused_from:
                        mark = "="
                        display_message = f"{message} (={reused_from})"
                    else:
                        # Fix: pass and fail must render distinct marks; both
                        # branches previously produced an empty string.
                        mark = "\u2713" if ok else "\u2717"
                        display_message = str(message)
                    print(
                        f"{mark} "
                        f"{provider_name:<{provider_width}} "
                        f"{tier_names[tier]:<{tier_width}} "
                        f"{model:<{model_width}} "
                        f"{interface_name:<{interface_width}} "
                        f"{display_message} ({elapsed_s:.1f}s)"
                    )

                total += 1
                if ok:
                    passed += 1
                else:
                    failed += 1

    # Determine if any provider is fully failed (all checks failed)
    provider_failed = False
    providers_seen: dict[str, list[bool]] = {}
    for r in results:
        providers_seen.setdefault(r["provider"], []).append(r["ok"])
    for ok_values in providers_seen.values():
        if ok_values and not any(ok_values):
            provider_failed = True
            break

    # Write results to health file
    payload = {
        "results": results,
        "summary": {"total": total, "passed": passed, "failed": failed},
        "checked_at": datetime.now(timezone.utc).isoformat(),
    }
    health_dir = Path(get_journal()) / "health"
    health_dir.mkdir(parents=True, exist_ok=True)
    (health_dir / "agents.json").write_text(json.dumps(payload, indent=2))

    if args.json:
        print(
            json.dumps(
                {
                    "results": results,
                    "summary": {"total": total, "passed": passed, "failed": failed},
                },
                indent=2,
            )
        )
    else:
        print(f"{total} checks: {passed} passed, {failed} failed")
    sys.exit(1 if provider_failed else 0)
async def main_async() -> None:
    """NDJSON-based CLI for agents.

    Parses CLI flags, dispatches the ``check`` subcommand to provider health
    probing, and otherwise processes one JSON agent config per stdin line,
    emitting JSONL events to stdout for each.
    """
    from think.providers import PROVIDER_REGISTRY

    parser = argparse.ArgumentParser(
        description="solstone Agent CLI - Accepts NDJSON input via stdin"
    )
    parser.add_argument(
        "--dry-run",
        action="store_true",
        help="Show what would be sent to the provider without calling the LLM",
    )
    subparsers = parser.add_subparsers(dest="subcommand")
    check_parser = subparsers.add_parser("check", help="Check AI provider connectivity")
    check_parser.add_argument(
        "--provider",
        action="append",
        help=f"Provider to check (repeatable). Available: {', '.join(PROVIDER_REGISTRY.keys())}",
    )
    check_parser.add_argument(
        "--interface",
        choices=["generate", "cogitate"],
        default=None,
        help="Interface to check (default: both)",
    )
    check_parser.add_argument(
        "--timeout",
        type=int,
        default=30,
        help="Timeout in seconds for generate checks (default: 30)",
    )
    check_parser.add_argument(
        "--tier",
        type=int,
        choices=[1, 2, 3],
        default=None,
        help="Tier to check (1=pro, 2=flash, 3=lite; default: all)",
    )
    check_parser.add_argument(
        "--json", action="store_true", help="Output results as JSON"
    )
    check_parser.add_argument(
        "--targeted",
        action="store_true",
        help="Only check configured provider+tier pairs (used by automated rechecks)",
    )

    args = setup_cli(parser)
    if args.subcommand == "check":
        await _run_check(args)
        return

    dry_run = args.dry_run

    app_logger = setup_logging(args.verbose)
    event_writer = JSONEventWriter(None)

    def emit_event(data: Event) -> None:
        # Stamp a timestamp if the caller didn't provide one.
        if "ts" not in data:
            data["ts"] = now_ms()
        event_writer.emit(data)

    try:
        app_logger.info("Processing NDJSON input from stdin")
        for line in sys.stdin:
            line = line.strip()
            if not line:
                continue

            # Reset per line: the error handler below inspects `config`, and
            # without this a failure in json.loads/prepare_config would hit an
            # unbound name on the first line, or attribute the error to a
            # stale config left over from a previous line.
            config = None
            try:
                request = json.loads(line)
                config = prepare_config(request)

                error = validate_config(config)
                if error:
                    emit_event({"event": "error", "error": error, "ts": now_ms()})
                    continue

                await _run_agent(config, emit_event, dry_run=dry_run)

            except json.JSONDecodeError as e:
                emit_event(
                    {
                        "event": "error",
                        "error": f"Invalid JSON: {str(e)}",
                        "ts": now_ms(),
                    }
                )
            except Exception as e:
                # Errors already surfaced as events are not re-reported.
                if getattr(e, "_evented", False):
                    continue
                from think.models import IncompleteJSONError

                event = {
                    "event": "error",
                    "error": str(e),
                    "trace": traceback.format_exc(),
                    "ts": now_ms(),
                }
                if isinstance(e, IncompleteJSONError):
                    from think.hooks import log_extraction_failure

                    event["partial_text_length"] = len(e.partial_text)
                    event["partial_text_tail"] = e.partial_text[-500:]
                    name = config.get("name", "unknown") if config else "unknown"
                    log_extraction_failure(e, name)
                emit_event(event)

    except Exception as exc:
        err = {
            "event": "error",
            "error": str(exc),
            "trace": traceback.format_exc(),
        }
        if not getattr(exc, "_evented", False):
            emit_event(err)
        raise
    finally:
        event_writer.close()


def main() -> None:
    """Entry point wrapper: run the async CLI under asyncio."""
    asyncio.run(main_async())


__all__ = [
    "prepare_config",
    "validate_config",
    "scan_day",
]