personal memory agent
at main 1241 lines 40 kB view raw
1# SPDX-License-Identifier: AGPL-3.0-only 2# Copyright (c) 2026 sol pbc 3 4"""CLI for inspecting talent prompt configurations. 5 6Lists all system and app prompts with their frontmatter metadata, 7supports filtering by schedule and source, and provides detail views. 8 9Usage: 10 sol talent List all prompts grouped by schedule 11 sol talent list --schedule daily Filter by schedule type 12 sol talent list --json Output all configs as JSONL 13 sol talent show <name> Show details for a specific prompt 14 sol talent show <name> --json Output a single prompt as JSONL 15 sol talent show <name> --prompt Show full prompt context (dry-run) 16 sol talent logs Show recent agent runs 17 sol talent logs <agent> -c 5 Show last 5 runs for an agent 18 sol talent log <id> Show events for an agent run 19 sol talent log <id> --json Output raw JSONL events 20 sol talent log <id> --full Show expanded event details 21""" 22 23from __future__ import annotations 24 25import argparse 26import json 27import re 28import subprocess 29import sys 30import time 31from collections import deque 32from datetime import datetime, timedelta 33from pathlib import Path 34from typing import Any 35 36import frontmatter 37 38from think.talent import ( 39 TALENT_DIR, 40 _load_prompt_metadata, 41 get_talent_configs, 42) 43from think.utils import setup_cli 44 45# Project root for computing relative paths 46_PROJECT_ROOT = Path(__file__).parent.parent 47 48# Internal bookkeeping keys to exclude from JSONL output 49_INTERNAL_KEYS = frozenset({"path", "mtime"}) 50 51 52def _relative_path(abs_path: str) -> str: 53 """Convert absolute path to project-relative path.""" 54 try: 55 return str(Path(abs_path).relative_to(_PROJECT_ROOT)) 56 except ValueError: 57 return abs_path 58 59 60def _resolve_md_path(name: str) -> Path: 61 """Resolve a prompt name to its .md file path.""" 62 if ":" in name: 63 app, agent_name = name.split(":", 1) 64 return _PROJECT_ROOT / "apps" / app / "talent" / f"{agent_name}.md" 65 return TALENT_DIR / f"{name}.md" 66 67 68def _scan_variables(body: str) -> list[str]: 69 """Scan prompt body text for $template variables.""" 70 # Match $word or ${word} but not $$ (escaped dollar signs) 71 matches = re.findall(r"(?<!\$)\$\{?([a-zA-Z_]\w*)\}?", body) 72 # Deduplicate preserving order 73 seen: set[str] = set() 74 result: list[str] = [] 75 for m in matches: 76 if m not in seen: 77 seen.add(m) 78 result.append(m) 79 return result 80 81 82def _format_last_run(key: str, agents_dir: Path) -> tuple[str, bool]: 83 """Format age of last run with optional runtime duration. 84 85 Returns (display_string, failed) where failed is True if the last 86 event in the log was an error. 87 """ 88 safe_name = key.replace(":", "--") 89 link_path = agents_dir / f"{safe_name}.log" 90 if not link_path.exists(): 91 return "-", False 92 93 try: 94 with link_path.open() as f: 95 first_line = f.readline() 96 last_line = next(iter(deque(f, maxlen=1)), None) 97 98 first_event = json.loads(first_line) 99 first_ts = first_event["ts"] 100 age_seconds = time.time() - (first_ts / 1000) 101 102 if age_seconds < 60: 103 age = f"{int(age_seconds)}s ago" 104 elif age_seconds < 3600: 105 age = f"{int(age_seconds / 60)}m ago" 106 elif age_seconds < 86400: 107 age = f"{int(age_seconds / 3600)}h ago" 108 else: 109 age = f"{int(age_seconds / 86400)}d ago" 110 111 failed = False 112 if last_line: 113 last_event = json.loads(last_line) 114 failed = last_event.get("event") == "error" 115 last_ts = last_event["ts"] 116 duration_seconds = (last_ts - first_ts) / 1000 117 if duration_seconds < 60: 118 duration = f"{int(duration_seconds)}s" 119 elif duration_seconds < 3600: 120 duration = f"{int(duration_seconds / 60)}m" 121 else: 122 duration = f"{int(duration_seconds / 3600)}h" 123 age = f"{age} ({duration})" 124 125 return age, failed 126 except Exception: 127 return "-", False 128 129 130def _format_tags(info: dict[str, Any], *, failed: bool = False) -> str: 131 """Build compact space-separated tags string.""" 132 tags: list[str] = [] 133 134 output = info.get("output") 135 if output == "json": 136 tags.append("json") 137 elif output: 138 tags.append("md") 139 140 hook = info.get("hook") 141 if hook: 142 if isinstance(hook, dict): 143 if hook.get("pre"): 144 tags.append("pre") 145 if hook.get("post"): 146 tags.append("post") 147 else: 148 tags.append("hook") 149 150 if info.get("disabled"): 151 tags.append("disabled") 152 153 if failed: 154 tags.append("FAIL") 155 156 return " ".join(tags) 157 158 159def _collect_configs( 160 *, 161 schedule: str | None = None, 162 source: str | None = None, 163 include_disabled: bool = False, 164) -> dict[str, dict[str, Any]]: 165 """Collect all talent configs with optional filters applied.""" 166 configs = get_talent_configs(schedule=schedule, include_disabled=True) 167 168 filtered: dict[str, dict[str, Any]] = {} 169 for key, info in configs.items(): 170 if not include_disabled and info.get("disabled", False): 171 continue 172 if source and info.get("source") != source: 173 continue 174 filtered[key] = info 175 176 return filtered 177 178 179def _to_jsonl_record(key: str, info: dict[str, Any]) -> dict[str, Any]: 180 """Build a clean JSONL record from a config entry.""" 181 record: dict[str, Any] = {"file": _relative_path(str(info["path"]))} 182 for k, v in info.items(): 183 if k not in _INTERNAL_KEYS: 184 record[k] = v 185 return record 186 187 188def list_prompts( 189 *, 190 schedule: str | None = None, 191 source: str | None = None, 192 include_disabled: bool = False, 193) -> None: 194 """Print prompts grouped by schedule.""" 195 configs = _collect_configs( 196 schedule=schedule, source=source, include_disabled=include_disabled 197 ) 198 from think.utils import get_journal 199 200 agents_dir = Path(get_journal()) / "agents" 201 202 if not configs: 203 print("No prompts found matching filters.") 204 return 205 206 # Group by schedule 207 groups: dict[str, list[tuple[str, dict[str, Any]]]] = { 208 "segment": [], 209 "daily": [], 210 "weekly": [], 211 "activity": [], 212 "unscheduled": [], 213 } 214 215 for key, info in sorted(configs.items()): 216 sched = info.get("schedule") 217 if sched in ("segment", "daily", "weekly", "activity"): 218 groups[sched].append((key, info)) 219 else: 220 groups["unscheduled"].append((key, info)) 221 222 # Compute column widths 223 all_names = list(configs.keys()) 224 name_width = max(len(n) for n in all_names) if all_names else 20 225 name_width = max(name_width, 10) 226 227 # Fixed widths for other columns 228 title_width = 28 229 last_run_width = 18 230 231 # Print column header 232 header = ( 233 f" {'NAME':<{name_width}} {'TITLE':<{title_width}} " 234 f"{'LAST RUN':<{last_run_width}} TAGS" 235 ) 236 print(header) 237 print() 238 239 # Print each non-empty group 240 for group_name in ("segment", "daily", "activity", "unscheduled"): 241 items = groups[group_name] 242 if not items: 243 continue 244 245 # Skip group header if filtering to a single schedule 246 if not schedule: 247 print(f"{group_name}:") 248 249 for key, info in items: 250 title = info.get("title", "")[:title_width] 251 last_run_str, failed = _format_last_run(key, agents_dir) 252 last_run = last_run_str[:last_run_width] 253 tags = _format_tags(info, failed=failed) 254 src = "" 255 if info.get("source") == "app": 256 src = f" [{info.get('app', 'app')}]" 257 258 tag_part = f" {tags}" if tags else "" 259 line = ( 260 f" {key:<{name_width}} {title:<{title_width}} " 261 f"{last_run:<{last_run_width}}{tag_part}{src}" 262 ) 263 print(line.rstrip()) 264 265 if not schedule: 266 print() 267 268 # Show disabled count hint 269 if not include_disabled: 270 all_configs = _collect_configs( 271 schedule=schedule, source=source, include_disabled=True 272 ) 273 disabled_count = len(all_configs) - len(configs) 274 if disabled_count: 275 print( 276 f"{len(configs)} prompts ({disabled_count} disabled hidden, use --disabled)" 277 ) 278 279 280def show_prompt(name: str, *, as_json: bool = False) -> None: 281 """Print detailed info for a single prompt.""" 282 md_path = _resolve_md_path(name) 283 284 if not md_path.exists(): 285 print(f"Prompt not found: {name}", file=sys.stderr) 286 print(f" looked at: {_relative_path(str(md_path))}", file=sys.stderr) 287 sys.exit(1) 288 289 info = _load_prompt_metadata(md_path) 290 rel_path = _relative_path(str(md_path)) 291 292 # Load body once for variables and line count 293 try: 294 post = frontmatter.load(md_path) 295 body = post.content.strip() 296 except Exception: 297 body = None 298 299 if as_json: 300 record = _to_jsonl_record(name, info) 301 print(json.dumps(record, default=str)) 302 return 303 304 print(f"\n{rel_path}\n") 305 306 # Display frontmatter fields 307 # Order: title, description, key config fields, then alphabetical for the rest 308 priority_keys = [ 309 "title", 310 "description", 311 "schedule", 312 "priority", 313 "output", 314 "tools", 315 "hook", 316 "color", 317 ] 318 skip_keys = {"path", "mtime"} 319 320 label_width = 14 321 322 def print_field(key: str, value: Any) -> None: 323 if key in skip_keys: 324 return 325 val_str = str(value) 326 # Truncate long descriptions for readability 327 if key == "description" and len(val_str) > 72: 328 val_str = val_str[:72] + "..." 329 # Format hook config nicely 330 if key == "hook" and isinstance(value, dict): 331 post_hook = value.get("post", "") 332 if post_hook: 333 val_str = f"post: {post_hook}" 334 print(f" {key + ':':<{label_width}} {val_str}") 335 336 printed: set[str] = set() 337 for key in priority_keys: 338 if key in info and key not in skip_keys: 339 print_field(key, info[key]) 340 printed.add(key) 341 342 # Remaining fields alphabetically 343 for key in sorted(info.keys()): 344 if key not in printed and key not in skip_keys: 345 print_field(key, info[key]) 346 347 # Template variables and body line count from single parse 348 if body is not None: 349 variables = _scan_variables(body) 350 if variables: 351 vars_str = ", ".join(f"${v}" for v in variables) 352 print(f" {'variables:':<{label_width}} {vars_str}") 353 354 line_count = len(body.splitlines()) 355 print(f" {'body:':<{label_width}} {line_count} lines") 356 357 print() 358 359 360def json_output( 361 *, 362 schedule: str | None = None, 363 source: str | None = None, 364 include_disabled: bool = False, 365) -> None: 366 """Print JSONL output with one config per line, including filename.""" 367 configs = _collect_configs( 368 schedule=schedule, source=source, include_disabled=include_disabled 369 ) 370 371 for key, info in sorted(configs.items()): 372 print(json.dumps(_to_jsonl_record(key, info), default=str)) 373 374 375def _truncate_content(text: str, max_lines: int = 100) -> tuple[str, int]: 376 """Truncate text to max_lines, returning (text, omitted_count).""" 377 lines = text.splitlines() 378 if len(lines) <= max_lines: 379 return text, 0 380 # Show first half and last half 381 half = max_lines // 2 382 truncated = ( 383 lines[:half] 384 + ["", f"... ({len(lines) - max_lines} lines omitted)"] 385 + lines[-half:] 386 ) 387 return "\n".join(truncated), len(lines) - max_lines 388 389 390def _format_section(title: str, content: str, full: bool = False) -> None: 391 """Print a section with header and content.""" 392 print(f"\n{'=' * 60}") 393 print(f" {title}") 394 print(f"{'=' * 60}\n") 395 if not content or not content.strip(): 396 print("(empty)") 397 elif full: 398 print(content) 399 else: 400 truncated, omitted = _truncate_content(content) 401 print(truncated) 402 if omitted: 403 print(f"\n(use --full to see all {omitted + 100} lines)") 404 405 406def _yesterday() -> str: 407 """Return yesterday's date in YYYYMMDD format.""" 408 return (datetime.now() - timedelta(days=1)).strftime("%Y%m%d") 409 410 411def show_prompt_context( 412 name: str, 413 *, 414 day: str | None = None, 415 segment: str | None = None, 416 facet: str | None = None, 417 activity: str | None = None, 418 query: str | None = None, 419 full: bool = False, 420) -> None: 421 """Show full prompt context via dry-run. 422 423 Builds config and pipes to `sol agents --dry-run` to show exactly 424 what would be sent to the LLM provider. 425 """ 426 # Load prompt metadata 427 configs = get_talent_configs(include_disabled=True) 428 if name not in configs: 429 print(f"Prompt not found: {name}", file=sys.stderr) 430 sys.exit(1) 431 432 info = configs[name] 433 prompt_type = info.get("type", "prompt") 434 schedule = info.get("schedule") 435 is_multi_facet = info.get("multi_facet", False) 436 437 # Validate day format if provided 438 if day and (len(day) != 8 or not day.isdigit()): 439 print(f"Invalid --day format: {day}. Expected YYYYMMDD.", file=sys.stderr) 440 sys.exit(1) 441 442 # Validate arguments based on type and schedule 443 if prompt_type == "generate": 444 # Generators need day, and segment-scheduled need segment 445 if schedule == "segment" and not segment: 446 print( 447 f"Prompt '{name}' is segment-scheduled. Use --segment HHMMSS_LEN", 448 file=sys.stderr, 449 ) 450 sys.exit(1) 451 if not day: 452 day = _yesterday() 453 print(f"Using day: {day} (yesterday)") 454 elif prompt_type == "prompt": 455 print( 456 f"Prompt '{name}' is a hook prompt and cannot be run directly.", 457 file=sys.stderr, 458 ) 459 sys.exit(1) 460 461 # Activity-scheduled agents need --facet and --activity 462 if schedule == "activity": 463 if not facet: 464 try: 465 from think.facets import get_facets 466 467 facets = get_facets() 468 facet_names = [ 469 k for k, v in facets.items() if not v.get("muted", False) 470 ] 471 print( 472 f"Prompt '{name}' is activity-scheduled. Use --facet NAME", 473 file=sys.stderr, 474 ) 475 print(f"Available facets: {', '.join(facet_names)}", file=sys.stderr) 476 except Exception: 477 print( 478 f"Prompt '{name}' is activity-scheduled. Use --facet NAME", 479 file=sys.stderr, 480 ) 481 sys.exit(1) 482 483 if not day: 484 day = _yesterday() 485 print(f"Using day: {day} (yesterday)") 486 487 if not activity: 488 from think.activities import load_activity_records 489 490 records = load_activity_records(facet, day) 491 if not records: 492 print( 493 f"No activity records for facet '{facet}' on {day}", 494 file=sys.stderr, 495 ) 496 sys.exit(1) 497 print( 498 f"Prompt '{name}' is activity-scheduled. Use --activity ID", 499 file=sys.stderr, 500 ) 501 print(f"Activities for {facet} on {day}:", file=sys.stderr) 502 for r in records: 503 desc = r.get("description", "") 504 if len(desc) > 50: 505 desc = desc[:50] + "..." 506 print( 507 f" {r['id']} ({r.get('activity', '?')}) {desc}", file=sys.stderr 508 ) 509 sys.exit(1) 510 511 if is_multi_facet and not facet: 512 # List available facets 513 try: 514 from think.facets import get_facets 515 516 facets = get_facets() 517 facet_names = [k for k, v in facets.items() if not v.get("muted", False)] 518 print( 519 f"Prompt '{name}' is multi-facet. Use --facet NAME", 520 file=sys.stderr, 521 ) 522 print(f"Available facets: {', '.join(facet_names)}", file=sys.stderr) 523 except Exception: 524 print( 525 f"Prompt '{name}' is multi-facet. Use --facet NAME", 526 file=sys.stderr, 527 ) 528 sys.exit(1) 529 530 # Build config for dry-run 531 config: dict[str, Any] = {"name": name} 532 533 if schedule == "activity": 534 # Build activity config matching dream.py:run_activity_prompts() 535 from think.activities import get_activity_output_path, load_activity_records 536 537 records = load_activity_records(facet, day) 538 record = None 539 for r in records: 540 if r.get("id") == activity: 541 record = r 542 break 543 544 if not record: 545 print( 546 f"Activity '{activity}' not found in facet '{facet}' on {day}", 547 file=sys.stderr, 548 ) 549 sys.exit(1) 550 551 segments = record.get("segments", []) 552 if not segments: 553 print(f"Activity '{activity}' has no segments", file=sys.stderr) 554 sys.exit(1) 555 556 output_format = info.get("output", "md") 557 config["day"] = day 558 config["facet"] = facet 559 config["span"] = segments 560 config["activity"] = record 561 config["output"] = output_format 562 config["output_path"] = str( 563 get_activity_output_path(facet, day, activity, name, output_format) 564 ) 565 elif prompt_type == "generate": 566 config["day"] = day 567 config["output"] = info.get("output", "md") 568 if segment: 569 config["segment"] = segment 570 if facet: 571 config["facet"] = facet 572 else: 573 # Cogitate prompt - use get_agent() to build full config with instructions 574 from think.talent import get_agent 575 576 try: 577 agent_config = get_agent(name, facet=facet) 578 config.update(agent_config) 579 except Exception as e: 580 print(f"Failed to load agent config: {e}", file=sys.stderr) 581 sys.exit(1) 582 583 # Override prompt with user query 584 if query: 585 config["prompt"] = query 586 else: 587 config["prompt"] = "(no --query provided)" 588 589 # Run sol agents --dry-run 590 config_json = json.dumps(config) 591 try: 592 result = subprocess.run( 593 ["sol", "agents", "--dry-run"], 594 input=config_json + "\n", 595 capture_output=True, 596 text=True, 597 timeout=30, 598 ) 599 except subprocess.TimeoutExpired: 600 print("Dry-run timed out", file=sys.stderr) 601 sys.exit(1) 602 except FileNotFoundError: 603 print("Could not find 'sol' command", file=sys.stderr) 604 sys.exit(1) 605 606 if result.returncode != 0: 607 print(f"Dry-run failed: {result.stderr}", file=sys.stderr) 608 sys.exit(1) 609 610 # Parse JSONL output to find dry_run event 611 dry_run_event = None 612 for line in result.stdout.strip().splitlines(): 613 if not line: 614 continue 615 try: 616 event = json.loads(line) 617 if event.get("event") == "dry_run": 618 dry_run_event = event 619 break 620 elif event.get("event") == "error": 621 print(f"Error: {event.get('error')}", file=sys.stderr) 622 sys.exit(1) 623 except json.JSONDecodeError: 624 continue 625 626 if not dry_run_event: 627 print("No dry_run event received", file=sys.stderr) 628 if result.stderr: 629 print(result.stderr, file=sys.stderr) 630 sys.exit(1) 631 632 # Format and display output 633 print(f"\n Dry-run for: {name} ({dry_run_event.get('type', 'unknown')})") 634 print(f" Provider: {dry_run_event.get('provider')} / {dry_run_event.get('model')}") 635 if dry_run_event.get("day"): 636 print(f" Day: {dry_run_event.get('day')}") 637 if dry_run_event.get("segment"): 638 print(f" Segment: {dry_run_event.get('segment')}") 639 if activity: 640 act_type = config.get("activity", {}).get("activity", "unknown") 641 span = config.get("span", []) 642 print(f" Activity: {activity} ({act_type}, {len(span)} segments)") 643 print(f" Facet: {facet}") 644 if dry_run_event.get("output_path"): 645 print(f" Output: {dry_run_event.get('output_path')}") 646 647 # Pre-hook info 648 if dry_run_event.get("pre_hook"): 649 mods = dry_run_event.get("pre_hook_modifications", []) 650 print( 651 f" Pre-hook: {dry_run_event.get('pre_hook')} (modified: {', '.join(mods) or 'none'})" 652 ) 653 654 # System instruction (show before first if pre-hook modified it) 655 if dry_run_event.get("system_instruction_before"): 656 _format_section( 657 "SYSTEM INSTRUCTION (before pre-hook)", 658 dry_run_event.get("system_instruction_before", ""), 659 full=full, 660 ) 661 _format_section( 662 f"SYSTEM INSTRUCTION (source: {dry_run_event.get('system_instruction_source', 'unknown')})", 663 dry_run_event.get("system_instruction", ""), 664 full=full, 665 ) 666 667 # User instruction (agents only, show before first if pre-hook modified it) 668 if dry_run_event.get("user_instruction"): 669 if dry_run_event.get("user_instruction_before"): 670 _format_section( 671 "USER INSTRUCTION (before pre-hook)", 672 dry_run_event.get("user_instruction_before", ""), 673 full=full, 674 ) 675 _format_section( 676 "USER INSTRUCTION", dry_run_event.get("user_instruction", ""), full=full 677 ) 678 679 # Extra context (agents only) 680 if dry_run_event.get("extra_context"): 681 _format_section( 682 "EXTRA CONTEXT", dry_run_event.get("extra_context", ""), full=full 683 ) 684 685 # Prompt (show before first if pre-hook modified it) 686 prompt_source = dry_run_event.get("prompt_source", "") 687 if prompt_source: 688 prompt_source = f" (source: {_relative_path(prompt_source)})" 689 if dry_run_event.get("prompt_before"): 690 _format_section( 691 "PROMPT (before pre-hook)", 692 dry_run_event.get("prompt_before", ""), 693 full=full, 694 ) 695 _format_section( 696 f"PROMPT{prompt_source}", dry_run_event.get("prompt", ""), full=full 697 ) 698 699 # Transcript (generators only, show before first if pre-hook modified it) 700 if "transcript" in dry_run_event: 701 chars = dry_run_event.get("transcript_chars", 0) 702 files = dry_run_event.get("transcript_files", 0) 703 if dry_run_event.get("transcript_before"): 704 before_chars = dry_run_event.get("transcript_before_chars", 0) 705 _format_section( 706 f"TRANSCRIPT (before pre-hook, {before_chars:,} chars)", 707 dry_run_event.get("transcript_before", ""), 708 full=full, 709 ) 710 _format_section( 711 f"TRANSCRIPT ({chars:,} chars from {files} files)", 712 dry_run_event.get("transcript", ""), 713 full=full, 714 ) 715 716 # Tools (agents only) 717 if dry_run_event.get("tools"): 718 tools = dry_run_event.get("tools", []) 719 if isinstance(tools, list): 720 tools_str = ", ".join(tools) 721 else: 722 tools_str = str(tools) 723 print(f"\n{'=' * 60}") 724 print(" TOOLS") 725 print(f"{'=' * 60}\n") 726 print(tools_str) 727 728 print() 729 730 731def _find_run_file(agents_dir: Path, agent_id: str) -> Path | None: 732 """Locate an agent run JSONL file by ID.""" 733 for match in agents_dir.glob(f"*/{agent_id}.jsonl"): 734 return match 735 for match in agents_dir.glob(f"*/{agent_id}_active.jsonl"): 736 return match 737 return None 738 739 740def _parse_run_stats(jsonl_path: Path) -> dict[str, Any]: 741 """Parse an agent JSONL file for summary statistics.""" 742 stats: dict[str, Any] = { 743 "event_count": 0, 744 "tool_count": 0, 745 "model": None, 746 "usage": None, 747 "request": None, 748 } 749 for line in jsonl_path.read_text().splitlines(): 750 if not line.strip(): 751 continue 752 try: 753 event = json.loads(line) 754 except json.JSONDecodeError: 755 continue 756 etype = event.get("event") 757 if etype == "request": 758 stats["request"] = event 759 continue 760 stats["event_count"] += 1 761 if etype == "tool_start": 762 stats["tool_count"] += 1 763 elif etype == "start": 764 stats["model"] = event.get("model") 765 elif etype == "finish": 766 stats["usage"] = event.get("usage") 767 return stats 768 769 770def _format_bytes(n: int) -> str: 771 """Format byte count as human-readable string.""" 772 if n < 1000: 773 return str(n) 774 elif n < 1_000_000: 775 return f"{n / 1000:.1f}K" 776 else: 777 return f"{n / 1_000_000:.1f}M" 778 779 780def _format_cost(cost_usd: float | None) -> str: 781 """Format USD cost as rounded cents.""" 782 if cost_usd is None: 783 return "-" 784 cents = round(cost_usd * 100) 785 if cents == 0 and cost_usd > 0: 786 return "<1¢" 787 return f"{cents}¢" 788 789 790def _get_output_size(request_event: dict[str, Any], journal_root: str) -> int | None: 791 """Get output file size in bytes from a request event, or None.""" 792 from think.talent import get_output_path 793 794 req_output = request_event.get("output") 795 if not req_output: 796 return None 797 798 # Prefer explicit output_path (set for activity agents, custom paths) 799 if request_event.get("output_path"): 800 out_path = Path(request_event["output_path"]) 801 else: 802 req_day = request_event.get("day") 803 if not req_day: 804 return None 805 req_segment = request_event.get("segment") 806 req_facet = request_event.get("facet") 807 req_name = request_event.get("name", "unified") 808 req_env = request_event.get("env") or {} 809 req_stream = req_env.get("SOL_STREAM") if req_env else None 810 day_dir = Path(journal_root) / req_day 811 out_path = get_output_path( 812 day_dir, 813 req_name, 814 segment=req_segment, 815 output_format=req_output, 816 facet=req_facet, 817 stream=req_stream, 818 ) 819 820 if out_path.exists(): 821 return out_path.stat().st_size 822 return None 823 824 825def _print_summary(records: list[dict[str, Any]]) -> None: 826 """Print grouped summary of agent runs.""" 827 from collections import defaultdict 828 829 groups: dict[str, list[dict[str, Any]]] = defaultdict(list) 830 for r in records: 831 groups[r.get("name", "unknown")].append(r) 832 833 total_pass = 0 834 total_fail = 0 835 total_runtime = 0.0 836 837 for name in sorted(groups): 838 runs = groups[name] 839 passed = sum(1 for r in runs if r.get("status") == "completed") 840 failed = len(runs) - passed 841 runtimes = [r.get("runtime_seconds") or 0 for r in runs] 842 min_rt = min(runtimes) 843 max_rt = max(runtimes) 844 total_rt = sum(runtimes) 845 846 total_pass += passed 847 total_fail += failed 848 total_runtime += total_rt 849 850 if min_rt == max_rt: 851 rt_str = f"{min_rt:.1f}s" 852 else: 853 rt_str = f"{min_rt:.1f}s–{max_rt:.1f}s" 854 855 status_str = f"{passed}" 856 if failed: 857 status_str += f" {failed}" 858 859 print(f" {name:<20} {status_str:<10} {rt_str}") 860 861 print(f" {'' * 40}") 862 status_str = f"{total_pass}" 863 if total_fail: 864 status_str += f" {total_fail}" 865 print(f" {'total':<20} {status_str:<10} {total_runtime:.1f}s") 866 867 868def logs_runs( 869 *, 870 agent: str | None = None, 871 count: int | None = None, 872 day: str | None = None, 873 daily: bool = False, 874 errors: bool = False, 875 summary: bool = False, 876) -> None: 877 """Print one-line summaries of recent agent runs from day-index files.""" 878 from think.models import calc_agent_cost 879 from think.utils import get_journal 880 881 journal_root = get_journal() 882 agents_dir = Path(journal_root) / "agents" 883 if not agents_dir.is_dir(): 884 return 885 886 # Validate --day format 887 if day and (len(day) != 8 or not day.isdigit()): 888 print(f"Invalid --day format: {day}. Expected YYYYMMDD.", file=sys.stderr) 889 sys.exit(1) 890 891 # Resolve default count: 50 for --daily, 20 otherwise 892 if count is None: 893 count = 50 if daily else 20 894 895 # Find day-index files, most recent first 896 if day: 897 day_file = agents_dir / f"{day}.jsonl" 898 day_files = [day_file] if day_file.is_file() else [] 899 else: 900 day_files = sorted(agents_dir.glob("????????.jsonl"), reverse=True) 901 if not day_files: 902 return 903 904 # Collect records across day files 905 records: list[dict[str, Any]] = [] 906 _schedule_lookup: dict[str, str | None] | None = None 907 for day_file in day_files: 908 for line in day_file.read_text().splitlines(): 909 if not line.strip(): 910 continue 911 try: 912 record = json.loads(line) 913 except json.JSONDecodeError: 914 continue 915 if agent and record.get("name") != agent: 916 continue 917 if errors and record.get("status") != "error": 918 continue 919 if daily: 920 rec_schedule = record.get("schedule") 921 if rec_schedule is None: 922 if _schedule_lookup is None: 923 all_configs = get_talent_configs(include_disabled=True) 924 _schedule_lookup = { 925 key: info.get("schedule") 926 for key, info in all_configs.items() 927 } 928 rec_schedule = _schedule_lookup.get(record.get("name")) 929 if rec_schedule != "daily": 930 continue 931 records.append(record) 932 if len(records) >= count: 933 break 934 935 if not records: 936 return 937 938 # Sort by timestamp descending and trim 939 records.sort(key=lambda r: r.get("ts", 0), reverse=True) 940 records = records[:count] 941 942 if summary: 943 _print_summary(records) 944 return 945 946 # Compute column widths 947 name_width = max((len(r.get("name", "")) for r in records), default=10) 948 name_width = max(name_width, 10) 949 950 for r in records: 951 agent_id = r.get("agent_id") 952 run_file = ( 953 _find_run_file(agents_dir, agent_id) if isinstance(agent_id, str) else None 954 ) 955 stats: dict[str, Any] = { 956 "event_count": 0, 957 "tool_count": 0, 958 "model": None, 959 "usage": None, 960 "request": None, 961 } 962 cost_usd: float | None = None 963 output_size: int | None = None 964 if run_file: 965 stats = _parse_run_stats(run_file) 966 cost_usd = calc_agent_cost(stats["model"] or r.get("model"), stats["usage"]) 967 request_event = stats.get("request") 968 if isinstance(request_event, dict): 969 output_size = _get_output_size(request_event, journal_root) 970 r["_run_file"] = run_file 971 r["_stats"] = stats 972 r["_cost_usd"] = cost_usd 973 r["_output_size"] = output_size 974 975 today = datetime.now().strftime("%Y%m%d") 976 use_color = sys.stdout.isatty() 977 978 for r in records: 979 run_file = r.get("_run_file") 980 stats = r.get("_stats") or {} 981 cost_usd = r.get("_cost_usd") 982 output_size = r.get("_output_size") 983 agent_id = r.get("agent_id", "") 984 985 ts = r.get("ts", 0) 986 dt = datetime.fromtimestamp(ts / 1000) 987 day = r.get("day", dt.strftime("%Y%m%d")) 988 989 # Time column 990 if day == today: 991 time_str = dt.strftime("%H:%M") 992 else: 993 time_str = dt.strftime("%b %d %H:%M") 994 995 name = r.get("name", "unknown") 996 status = r.get("status", "") 997 status_sym = "\u2713" if status == "completed" else "\u2717" 998 runtime = r.get("runtime_seconds") or 0 999 1000 # Format runtime 1001 if runtime < 60: 1002 runtime_str = f"{runtime:.1f}s" 1003 else: 1004 mins = int(runtime // 60) 1005 secs = int(runtime % 60) 1006 runtime_str = f"{mins}m {secs:02d}s" 1007 1008 model = r.get("model", "") 1009 facet = r.get("facet") or "" 1010 cost_str = _format_cost(cost_usd) if run_file else "-" 1011 events_str = str(stats["event_count"]) if run_file else "-" 1012 tools_str = str(stats["tool_count"]) if run_file else "-" 1013 output_str = _format_bytes(output_size) if output_size is not None else "-" 1014 1015 facet_part = f" {facet}" if facet else "" 1016 line = ( 1017 f"{agent_id:<15}{time_str:>12} {name:<{name_width}} {status_sym} " 1018 f"{runtime_str:>7} {cost_str:>4} {events_str:>3} {tools_str:>3} " 1019 f"{output_str:>5} {model}{facet_part}" 1020 ) 1021 1022 if use_color and status != "completed": 1023 line = f"\033[31m{line}\033[0m" 1024 1025 print(line) 1026 1027 1028def _event_detail(event: dict[str, Any], etype: str) -> str: 1029 """Extract detail string for an event.""" 1030 if etype == "request": 1031 return event.get("prompt", "") or "" 1032 elif etype == "start": 1033 model = event.get("model", "") 1034 prompt = event.get("prompt", "") 1035 return f'{model} "{prompt}"' 1036 elif etype == "thinking": 1037 return event.get("summary") or event.get("content") or "" 1038 elif etype == "tool_start": 1039 tool = event.get("tool", "") 1040 args = event.get("args") 1041 if isinstance(args, dict): 1042 parts = [f"{k}={json.dumps(v)}" for k, v in args.items()] 1043 return f"{tool}({', '.join(parts)})" 1044 return tool 1045 elif etype == "tool_end": 1046 tool = event.get("tool", "") 1047 result = event.get("result", "") 1048 return f"{tool}{result}" 1049 elif etype == "agent_updated": 1050 return event.get("agent", "") 1051 elif etype == "finish": 1052 result = event.get("result", "") 1053 usage = event.get("usage") 1054 if usage: 1055 inp = usage.get("input_tokens", 0) 1056 out = usage.get("output_tokens", 0) 1057 return f"{result} [{inp}in/{out}out]" 1058 return result 1059 elif etype == "error": 1060 return event.get("error", "") 1061 return "" 1062 1063 1064def _format_event_line(event: dict[str, Any], *, full: bool = False) -> str: 1065 """Format a single JSONL event as a one-line summary.""" 1066 ts = event.get("ts", 0) 1067 dt = datetime.fromtimestamp(ts / 1000) 1068 time_str = dt.strftime("%H:%M:%S") + f".{ts % 1000:03d}" 1069 1070 etype = event.get("event", "?") 1071 label_map = { 1072 "request": "request", 1073 "start": "start", 1074 "thinking": "think", 1075 "tool_start": "tool", 1076 "tool_end": "tool_end", 1077 "agent_updated": "updated", 1078 "finish": "finish", 1079 "error": "error", 1080 } 1081 label = label_map.get(etype, etype) 1082 1083 detail = _event_detail(event, etype) 1084 1085 if full: 1086 detail = detail.replace("\n", "\\n") 1087 else: 1088 detail = detail.replace("\n", " ") 1089 max_detail = 100 - 24 1090 if len(detail) > max_detail: 1091 detail = detail[: max_detail - 1] + "" 1092 1093 return f"{time_str} {label:<8} {detail}" 1094 1095 1096def log_run(agent_id: str, *, json_mode: bool = False, full: bool = False) -> None: 1097 """Show events for a single agent run.""" 1098 from think.utils import get_journal 1099 1100 agents_dir = Path(get_journal()) / "agents" 1101 run_file = _find_run_file(agents_dir, agent_id) 1102 if run_file is None: 1103 print(f"Agent run not found: {agent_id}", file=sys.stderr) 1104 sys.exit(1) 1105 1106 if json_mode: 1107 print(run_file.read_text(), end="") 1108 return 1109 1110 for line in run_file.read_text().splitlines(): 1111 if not line.strip(): 1112 continue 1113 try: 1114 event = json.loads(line) 1115 except json.JSONDecodeError: 1116 continue 1117 print(_format_event_line(event, full=full)) 1118 1119 1120def main() -> None: 1121 """Entry point for sol talent.""" 1122 parser = argparse.ArgumentParser(description="Inspect talent prompt configurations") 1123 subparsers = parser.add_subparsers(dest="subcommand") 1124 1125 # --- list subcommand --- 1126 list_parser = subparsers.add_parser("list", help="List prompts grouped by schedule") 1127 list_parser.add_argument( 1128 "--schedule", 1129 choices=["daily", "segment", "activity"], 1130 help="Filter by schedule type", 1131 ) 1132 list_parser.add_argument( 1133 "--source", choices=["system", "app"], help="Filter by origin" 1134 ) 1135 list_parser.add_argument( 1136 "--disabled", action="store_true", help="Include disabled prompts" 1137 ) 1138 list_parser.add_argument("--json", action="store_true", help="Output as JSONL") 1139 1140 # --- show subcommand --- 1141 show_parser = subparsers.add_parser( 1142 "show", help="Show details for a specific prompt" 1143 ) 1144 show_parser.add_argument("name", help="Prompt name") 1145 show_parser.add_argument("--json", action="store_true", help="Output as JSONL") 1146 show_parser.add_argument( 1147 "--prompt", action="store_true", help="Show full prompt context (dry-run mode)" 1148 ) 1149 show_parser.add_argument("--day", metavar="YYYYMMDD", help="Day for prompt context") 1150 show_parser.add_argument( 1151 "--segment", metavar="HHMMSS_LEN", help="Segment for segment-scheduled prompts" 1152 ) 1153 show_parser.add_argument( 1154 "--facet", metavar="NAME", help="Facet for multi-facet prompts" 1155 ) 1156 show_parser.add_argument( 1157 "--activity", metavar="ID", help="Activity ID for activity-scheduled prompts" 1158 ) 1159 show_parser.add_argument( 1160 "--query", metavar="TEXT", help="Sample query for tool agents" 1161 ) 1162 show_parser.add_argument( 1163 "--full", action="store_true", help="Show full content without truncation" 1164 ) 1165 1166 # --- logs subcommand --- 1167 logs_parser = subparsers.add_parser("logs", help="Show recent agent run log") 1168 logs_parser.add_argument("agent", nargs="?", help="Filter to a specific agent") 1169 logs_parser.add_argument( 1170 "-c", 1171 "--count", 1172 type=int, 1173 default=None, 1174 help="Number of runs to show (default: 20)", 1175 ) 1176 logs_parser.add_argument( 1177 "--day", metavar="YYYYMMDD", help="Show only runs from this day" 1178 ) 1179 logs_parser.add_argument( 1180 "--daily", action="store_true", help="Show only daily-scheduled runs" 1181 ) 1182 logs_parser.add_argument( 1183 "--errors", action="store_true", help="Show only error runs" 1184 ) 1185 logs_parser.add_argument( 1186 "--summary", action="store_true", help="Show grouped summary" 1187 ) 1188 1189 # --- log subcommand --- 1190 log_parser = subparsers.add_parser("log", help="Show events for an agent run") 1191 log_parser.add_argument("id", help="Agent ID") 1192 log_parser.add_argument( 1193 "--json", action="store_true", dest="json_mode", help="Output raw JSONL" 1194 ) 1195 log_parser.add_argument("--full", action="store_true", help="Expand event details") 1196 1197 args = setup_cli(parser) 1198 1199 if args.subcommand == "show": 1200 if args.prompt: 1201 show_prompt_context( 1202 args.name, 1203 day=args.day, 1204 segment=args.segment, 1205 facet=args.facet, 1206 activity=args.activity, 1207 query=args.query, 1208 full=args.full, 1209 ) 1210 else: 1211 show_prompt(args.name, as_json=args.json) 1212 elif args.subcommand == "logs": 1213 logs_runs( 1214 agent=args.agent, 1215 count=args.count, 1216 day=args.day, 1217 daily=args.daily, 1218 errors=args.errors, 1219 summary=args.summary, 1220 ) 1221 elif args.subcommand == "log": 1222 log_run(args.id, json_mode=args.json_mode, full=args.full) 1223 elif args.subcommand == "list" and args.json: 1224 json_output( 1225 schedule=args.schedule, 1226 source=args.source, 1227 include_disabled=args.disabled, 1228 ) 1229 elif args.subcommand == "list": 1230 list_prompts( 1231 schedule=args.schedule, 1232 source=args.source, 1233 include_disabled=args.disabled, 1234 ) 1235 else: 1236 # Default: no subcommand given -> list all prompts 1237 list_prompts() 1238 1239 1240if __name__ == "__main__": 1241 main()