# personal memory agent
1# SPDX-License-Identifier: AGPL-3.0-only
2# Copyright (c) 2026 sol pbc
3
4"""CLI for inspecting talent prompt configurations.
5
6Lists all system and app prompts with their frontmatter metadata,
7supports filtering by schedule and source, and provides detail views.
8
9Usage:
10 sol talent List all prompts grouped by schedule
11 sol talent list --schedule daily Filter by schedule type
12 sol talent list --json Output all configs as JSONL
13 sol talent show <name> Show details for a specific prompt
14 sol talent show <name> --json Output a single prompt as JSONL
15 sol talent show <name> --prompt Show full prompt context (dry-run)
16 sol talent logs Show recent agent runs
17 sol talent logs <agent> -c 5 Show last 5 runs for an agent
18 sol talent log <id> Show events for an agent run
19 sol talent log <id> --json Output raw JSONL events
20 sol talent log <id> --full Show expanded event details
21"""
22
23from __future__ import annotations
24
25import argparse
26import json
27import re
28import subprocess
29import sys
30import time
31from collections import deque
32from datetime import datetime, timedelta
33from pathlib import Path
34from typing import Any
35
36import frontmatter
37
38from think.talent import (
39 TALENT_DIR,
40 _load_prompt_metadata,
41 get_talent_configs,
42)
43from think.utils import setup_cli
44
45# Project root for computing relative paths
46_PROJECT_ROOT = Path(__file__).parent.parent
47
48# Internal bookkeeping keys to exclude from JSONL output
49_INTERNAL_KEYS = frozenset({"path", "mtime"})
50
51
def _relative_path(abs_path: str) -> str:
    """Return *abs_path* rewritten relative to the project root.

    Paths outside the project tree are returned unchanged.
    """
    try:
        rel = Path(abs_path).relative_to(_PROJECT_ROOT)
    except ValueError:
        # Not under the project root — keep the absolute form
        return abs_path
    return str(rel)
58
59
def _resolve_md_path(name: str) -> Path:
    """Resolve a prompt name to its .md file path.

    Names of the form ``app:agent`` map into that app's talent directory;
    plain names map into the system TALENT_DIR.
    """
    app, sep, agent_name = name.partition(":")
    if sep:
        return _PROJECT_ROOT / "apps" / app / "talent" / f"{agent_name}.md"
    return TALENT_DIR / f"{name}.md"
66
67
68def _scan_variables(body: str) -> list[str]:
69 """Scan prompt body text for $template variables."""
70 # Match $word or ${word} but not $$ (escaped dollar signs)
71 matches = re.findall(r"(?<!\$)\$\{?([a-zA-Z_]\w*)\}?", body)
72 # Deduplicate preserving order
73 seen: set[str] = set()
74 result: list[str] = []
75 for m in matches:
76 if m not in seen:
77 seen.add(m)
78 result.append(m)
79 return result
80
81
82def _format_last_run(key: str, agents_dir: Path) -> tuple[str, bool]:
83 """Format age of last run with optional runtime duration.
84
85 Returns (display_string, failed) where failed is True if the last
86 event in the log was an error.
87 """
88 safe_name = key.replace(":", "--")
89 link_path = agents_dir / f"{safe_name}.log"
90 if not link_path.exists():
91 return "-", False
92
93 try:
94 with link_path.open() as f:
95 first_line = f.readline()
96 last_line = next(iter(deque(f, maxlen=1)), None)
97
98 first_event = json.loads(first_line)
99 first_ts = first_event["ts"]
100 age_seconds = time.time() - (first_ts / 1000)
101
102 if age_seconds < 60:
103 age = f"{int(age_seconds)}s ago"
104 elif age_seconds < 3600:
105 age = f"{int(age_seconds / 60)}m ago"
106 elif age_seconds < 86400:
107 age = f"{int(age_seconds / 3600)}h ago"
108 else:
109 age = f"{int(age_seconds / 86400)}d ago"
110
111 failed = False
112 if last_line:
113 last_event = json.loads(last_line)
114 failed = last_event.get("event") == "error"
115 last_ts = last_event["ts"]
116 duration_seconds = (last_ts - first_ts) / 1000
117 if duration_seconds < 60:
118 duration = f"{int(duration_seconds)}s"
119 elif duration_seconds < 3600:
120 duration = f"{int(duration_seconds / 60)}m"
121 else:
122 duration = f"{int(duration_seconds / 3600)}h"
123 age = f"{age} ({duration})"
124
125 return age, failed
126 except Exception:
127 return "-", False
128
129
130def _format_tags(info: dict[str, Any], *, failed: bool = False) -> str:
131 """Build compact space-separated tags string."""
132 tags: list[str] = []
133
134 output = info.get("output")
135 if output == "json":
136 tags.append("json")
137 elif output:
138 tags.append("md")
139
140 hook = info.get("hook")
141 if hook:
142 if isinstance(hook, dict):
143 if hook.get("pre"):
144 tags.append("pre")
145 if hook.get("post"):
146 tags.append("post")
147 else:
148 tags.append("hook")
149
150 if info.get("disabled"):
151 tags.append("disabled")
152
153 if failed:
154 tags.append("FAIL")
155
156 return " ".join(tags)
157
158
def _collect_configs(
    *,
    schedule: str | None = None,
    source: str | None = None,
    include_disabled: bool = False,
) -> dict[str, dict[str, Any]]:
    """Collect all talent configs with optional filters applied.

    Always fetches disabled configs from the loader, then filters them
    out locally unless ``include_disabled`` is set.
    """
    configs = get_talent_configs(schedule=schedule, include_disabled=True)

    def _keep(info: dict[str, Any]) -> bool:
        # Drop disabled entries unless explicitly requested
        if info.get("disabled", False) and not include_disabled:
            return False
        # Drop entries from the wrong origin when a source filter is set
        return not source or info.get("source") == source

    return {key: info for key, info in configs.items() if _keep(info)}
177
178
def _to_jsonl_record(key: str, info: dict[str, Any]) -> dict[str, Any]:
    """Build a clean JSONL record from a config entry.

    Puts the project-relative "file" field first and drops internal
    bookkeeping keys.
    """
    record: dict[str, Any] = {"file": _relative_path(str(info["path"]))}
    record.update((k, v) for k, v in info.items() if k not in _INTERNAL_KEYS)
    return record
186
187
def list_prompts(
    *,
    schedule: str | None = None,
    source: str | None = None,
    include_disabled: bool = False,
) -> None:
    """Print prompts grouped by schedule.

    Args:
        schedule: Only show prompts with this schedule type.
        source: Only show prompts from this origin ("system" or "app").
        include_disabled: Also show prompts marked disabled.
    """
    configs = _collect_configs(
        schedule=schedule, source=source, include_disabled=include_disabled
    )
    from think.utils import get_journal

    agents_dir = Path(get_journal()) / "agents"

    if not configs:
        print("No prompts found matching filters.")
        return

    # Group by schedule; anything without a known schedule is "unscheduled"
    groups: dict[str, list[tuple[str, dict[str, Any]]]] = {
        "segment": [],
        "daily": [],
        "weekly": [],
        "activity": [],
        "unscheduled": [],
    }

    for key, info in sorted(configs.items()):
        sched = info.get("schedule")
        if sched in ("segment", "daily", "weekly", "activity"):
            groups[sched].append((key, info))
        else:
            groups["unscheduled"].append((key, info))

    # Compute column widths
    all_names = list(configs.keys())
    name_width = max(len(n) for n in all_names) if all_names else 20
    name_width = max(name_width, 10)

    # Fixed widths for other columns
    title_width = 28
    last_run_width = 18

    # Print column header
    header = (
        f" {'NAME':<{name_width}} {'TITLE':<{title_width}} "
        f"{'LAST RUN':<{last_run_width}} TAGS"
    )
    print(header)
    print()

    # Print each non-empty group.
    # BUG FIX: "weekly" was collected above but missing from this tuple,
    # so weekly-scheduled prompts were silently dropped from the listing.
    for group_name in ("segment", "daily", "weekly", "activity", "unscheduled"):
        items = groups[group_name]
        if not items:
            continue

        # Skip group header if filtering to a single schedule
        if not schedule:
            print(f"{group_name}:")

        for key, info in items:
            title = info.get("title", "")[:title_width]
            last_run_str, failed = _format_last_run(key, agents_dir)
            last_run = last_run_str[:last_run_width]
            tags = _format_tags(info, failed=failed)
            src = ""
            if info.get("source") == "app":
                src = f" [{info.get('app', 'app')}]"

            tag_part = f" {tags}" if tags else ""
            line = (
                f" {key:<{name_width}} {title:<{title_width}} "
                f"{last_run:<{last_run_width}}{tag_part}{src}"
            )
            print(line.rstrip())

        if not schedule:
            print()

    # Show disabled count hint
    if not include_disabled:
        all_configs = _collect_configs(
            schedule=schedule, source=source, include_disabled=True
        )
        disabled_count = len(all_configs) - len(configs)
        if disabled_count:
            print(
                f"{len(configs)} prompts ({disabled_count} disabled hidden, use --disabled)"
            )
278
279
def show_prompt(name: str, *, as_json: bool = False) -> None:
    """Print detailed info for a single prompt.

    Args:
        name: Prompt name, optionally ``app:agent`` for app prompts.
        as_json: Emit a single JSONL record instead of the human view.

    Exits with status 1 when the prompt's .md file does not exist.
    """
    md_path = _resolve_md_path(name)

    if not md_path.exists():
        print(f"Prompt not found: {name}", file=sys.stderr)
        print(f"  looked at: {_relative_path(str(md_path))}", file=sys.stderr)
        sys.exit(1)

    info = _load_prompt_metadata(md_path)
    rel_path = _relative_path(str(md_path))

    # Load body once for variables and line count
    try:
        post = frontmatter.load(md_path)
        body = post.content.strip()
    except Exception:
        # Best-effort: metadata is still shown even if the body fails to parse
        body = None

    if as_json:
        record = _to_jsonl_record(name, info)
        print(json.dumps(record, default=str))
        return

    print(f"\n{rel_path}\n")

    # Display frontmatter fields
    # Order: title, description, key config fields, then alphabetical for the rest
    priority_keys = [
        "title",
        "description",
        "schedule",
        "priority",
        "output",
        "tools",
        "hook",
        "color",
    ]
    skip_keys = {"path", "mtime"}

    label_width = 14

    def print_field(key: str, value: Any) -> None:
        # Render one aligned "key: value" row, condensing where needed
        if key in skip_keys:
            return
        val_str = str(value)
        # Truncate long descriptions for readability
        if key == "description" and len(val_str) > 72:
            val_str = val_str[:72] + "..."
        # Format hook config nicely
        if key == "hook" and isinstance(value, dict):
            post_hook = value.get("post", "")
            if post_hook:
                val_str = f"post: {post_hook}"
        print(f"  {key + ':':<{label_width}} {val_str}")

    printed: set[str] = set()
    for key in priority_keys:
        if key in info and key not in skip_keys:
            print_field(key, info[key])
            printed.add(key)

    # Remaining fields alphabetically
    for key in sorted(info.keys()):
        if key not in printed and key not in skip_keys:
            print_field(key, info[key])

    # Template variables and body line count from single parse
    if body is not None:
        variables = _scan_variables(body)
        if variables:
            vars_str = ", ".join(f"${v}" for v in variables)
            print(f"  {'variables:':<{label_width}} {vars_str}")

        line_count = len(body.splitlines())
        print(f"  {'body:':<{label_width}} {line_count} lines")

    print()
358
359
def json_output(
    *,
    schedule: str | None = None,
    source: str | None = None,
    include_disabled: bool = False,
) -> None:
    """Print JSONL output with one config per line, including filename.

    Same filters as list_prompts; records are emitted in sorted key order.
    """
    configs = _collect_configs(
        schedule=schedule, source=source, include_disabled=include_disabled
    )

    for key in sorted(configs):
        print(json.dumps(_to_jsonl_record(key, configs[key]), default=str))
373
374
375def _truncate_content(text: str, max_lines: int = 100) -> tuple[str, int]:
376 """Truncate text to max_lines, returning (text, omitted_count)."""
377 lines = text.splitlines()
378 if len(lines) <= max_lines:
379 return text, 0
380 # Show first half and last half
381 half = max_lines // 2
382 truncated = (
383 lines[:half]
384 + ["", f"... ({len(lines) - max_lines} lines omitted)"]
385 + lines[-half:]
386 )
387 return "\n".join(truncated), len(lines) - max_lines
388
389
390def _format_section(title: str, content: str, full: bool = False) -> None:
391 """Print a section with header and content."""
392 print(f"\n{'=' * 60}")
393 print(f" {title}")
394 print(f"{'=' * 60}\n")
395 if not content or not content.strip():
396 print("(empty)")
397 elif full:
398 print(content)
399 else:
400 truncated, omitted = _truncate_content(content)
401 print(truncated)
402 if omitted:
403 print(f"\n(use --full to see all {omitted + 100} lines)")
404
405
406def _yesterday() -> str:
407 """Return yesterday's date in YYYYMMDD format."""
408 return (datetime.now() - timedelta(days=1)).strftime("%Y%m%d")
409
410
def show_prompt_context(
    name: str,
    *,
    day: str | None = None,
    segment: str | None = None,
    facet: str | None = None,
    activity: str | None = None,
    query: str | None = None,
    full: bool = False,
) -> None:
    """Show full prompt context via dry-run.

    Builds config and pipes to `sol agents --dry-run` to show exactly
    what would be sent to the LLM provider.

    Args:
        name: Prompt name as known to get_talent_configs().
        day: Day in YYYYMMDD form; defaults to yesterday where one is needed.
        segment: Segment identifier for segment-scheduled generators.
        facet: Facet name for activity-scheduled or multi-facet prompts.
        activity: Activity record ID for activity-scheduled prompts.
        query: User prompt text for cogitate-style agents.
        full: Show untruncated section contents.

    Exits with status 1 on any validation or dry-run failure.
    """
    # Load prompt metadata
    configs = get_talent_configs(include_disabled=True)
    if name not in configs:
        print(f"Prompt not found: {name}", file=sys.stderr)
        sys.exit(1)

    info = configs[name]
    prompt_type = info.get("type", "prompt")
    schedule = info.get("schedule")
    is_multi_facet = info.get("multi_facet", False)

    # Validate day format if provided
    if day and (len(day) != 8 or not day.isdigit()):
        print(f"Invalid --day format: {day}. Expected YYYYMMDD.", file=sys.stderr)
        sys.exit(1)

    # Validate arguments based on type and schedule
    if prompt_type == "generate":
        # Generators need day, and segment-scheduled need segment
        if schedule == "segment" and not segment:
            print(
                f"Prompt '{name}' is segment-scheduled. Use --segment HHMMSS_LEN",
                file=sys.stderr,
            )
            sys.exit(1)
        if not day:
            day = _yesterday()
            print(f"Using day: {day} (yesterday)")
    elif prompt_type == "prompt":
        # Plain "prompt" types only run as hooks attached to other agents
        print(
            f"Prompt '{name}' is a hook prompt and cannot be run directly.",
            file=sys.stderr,
        )
        sys.exit(1)

    # Activity-scheduled agents need --facet and --activity
    if schedule == "activity":
        if not facet:
            # No facet given: list the available (unmuted) facets and bail
            try:
                from think.facets import get_facets

                facets = get_facets()
                facet_names = [
                    k for k, v in facets.items() if not v.get("muted", False)
                ]
                print(
                    f"Prompt '{name}' is activity-scheduled. Use --facet NAME",
                    file=sys.stderr,
                )
                print(f"Available facets: {', '.join(facet_names)}", file=sys.stderr)
            except Exception:
                # Facet listing is best-effort; still print the usage hint
                print(
                    f"Prompt '{name}' is activity-scheduled. Use --facet NAME",
                    file=sys.stderr,
                )
            sys.exit(1)

        if not day:
            day = _yesterday()
            print(f"Using day: {day} (yesterday)")

        if not activity:
            # No activity ID given: list that day's activities and bail
            from think.activities import load_activity_records

            records = load_activity_records(facet, day)
            if not records:
                print(
                    f"No activity records for facet '{facet}' on {day}",
                    file=sys.stderr,
                )
                sys.exit(1)
            print(
                f"Prompt '{name}' is activity-scheduled. Use --activity ID",
                file=sys.stderr,
            )
            print(f"Activities for {facet} on {day}:", file=sys.stderr)
            for r in records:
                desc = r.get("description", "")
                if len(desc) > 50:
                    desc = desc[:50] + "..."
                print(
                    f"  {r['id']} ({r.get('activity', '?')}) {desc}", file=sys.stderr
                )
            sys.exit(1)

    if is_multi_facet and not facet:
        # List available facets
        try:
            from think.facets import get_facets

            facets = get_facets()
            facet_names = [k for k, v in facets.items() if not v.get("muted", False)]
            print(
                f"Prompt '{name}' is multi-facet. Use --facet NAME",
                file=sys.stderr,
            )
            print(f"Available facets: {', '.join(facet_names)}", file=sys.stderr)
        except Exception:
            print(
                f"Prompt '{name}' is multi-facet. Use --facet NAME",
                file=sys.stderr,
            )
        sys.exit(1)

    # Build config for dry-run
    config: dict[str, Any] = {"name": name}

    if schedule == "activity":
        # Build activity config matching dream.py:run_activity_prompts()
        from think.activities import get_activity_output_path, load_activity_records

        records = load_activity_records(facet, day)
        record = None
        for r in records:
            if r.get("id") == activity:
                record = r
                break

        if not record:
            print(
                f"Activity '{activity}' not found in facet '{facet}' on {day}",
                file=sys.stderr,
            )
            sys.exit(1)

        segments = record.get("segments", [])
        if not segments:
            print(f"Activity '{activity}' has no segments", file=sys.stderr)
            sys.exit(1)

        output_format = info.get("output", "md")
        config["day"] = day
        config["facet"] = facet
        config["span"] = segments
        config["activity"] = record
        config["output"] = output_format
        config["output_path"] = str(
            get_activity_output_path(facet, day, activity, name, output_format)
        )
    elif prompt_type == "generate":
        config["day"] = day
        config["output"] = info.get("output", "md")
        if segment:
            config["segment"] = segment
        if facet:
            config["facet"] = facet
    else:
        # Cogitate prompt - use get_agent() to build full config with instructions
        from think.talent import get_agent

        try:
            agent_config = get_agent(name, facet=facet)
            config.update(agent_config)
        except Exception as e:
            print(f"Failed to load agent config: {e}", file=sys.stderr)
            sys.exit(1)

        # Override prompt with user query
        if query:
            config["prompt"] = query
        else:
            config["prompt"] = "(no --query provided)"

    # Run sol agents --dry-run
    config_json = json.dumps(config)
    try:
        result = subprocess.run(
            ["sol", "agents", "--dry-run"],
            input=config_json + "\n",
            capture_output=True,
            text=True,
            timeout=30,
        )
    except subprocess.TimeoutExpired:
        print("Dry-run timed out", file=sys.stderr)
        sys.exit(1)
    except FileNotFoundError:
        print("Could not find 'sol' command", file=sys.stderr)
        sys.exit(1)

    if result.returncode != 0:
        print(f"Dry-run failed: {result.stderr}", file=sys.stderr)
        sys.exit(1)

    # Parse JSONL output to find dry_run event
    dry_run_event = None
    for line in result.stdout.strip().splitlines():
        if not line:
            continue
        try:
            event = json.loads(line)
            if event.get("event") == "dry_run":
                dry_run_event = event
                break
            elif event.get("event") == "error":
                print(f"Error: {event.get('error')}", file=sys.stderr)
                sys.exit(1)
        except json.JSONDecodeError:
            # Ignore non-JSON lines interleaved in the output
            continue

    if not dry_run_event:
        print("No dry_run event received", file=sys.stderr)
        if result.stderr:
            print(result.stderr, file=sys.stderr)
        sys.exit(1)

    # Format and display output
    print(f"\n Dry-run for: {name} ({dry_run_event.get('type', 'unknown')})")
    print(f" Provider: {dry_run_event.get('provider')} / {dry_run_event.get('model')}")
    if dry_run_event.get("day"):
        print(f" Day: {dry_run_event.get('day')}")
    if dry_run_event.get("segment"):
        print(f" Segment: {dry_run_event.get('segment')}")
    if activity:
        act_type = config.get("activity", {}).get("activity", "unknown")
        span = config.get("span", [])
        print(f" Activity: {activity} ({act_type}, {len(span)} segments)")
        print(f" Facet: {facet}")
    if dry_run_event.get("output_path"):
        print(f" Output: {dry_run_event.get('output_path')}")

    # Pre-hook info
    if dry_run_event.get("pre_hook"):
        mods = dry_run_event.get("pre_hook_modifications", [])
        print(
            f" Pre-hook: {dry_run_event.get('pre_hook')} (modified: {', '.join(mods) or 'none'})"
        )

    # System instruction (show before first if pre-hook modified it)
    if dry_run_event.get("system_instruction_before"):
        _format_section(
            "SYSTEM INSTRUCTION (before pre-hook)",
            dry_run_event.get("system_instruction_before", ""),
            full=full,
        )
    _format_section(
        f"SYSTEM INSTRUCTION (source: {dry_run_event.get('system_instruction_source', 'unknown')})",
        dry_run_event.get("system_instruction", ""),
        full=full,
    )

    # User instruction (agents only, show before first if pre-hook modified it)
    if dry_run_event.get("user_instruction"):
        if dry_run_event.get("user_instruction_before"):
            _format_section(
                "USER INSTRUCTION (before pre-hook)",
                dry_run_event.get("user_instruction_before", ""),
                full=full,
            )
        _format_section(
            "USER INSTRUCTION", dry_run_event.get("user_instruction", ""), full=full
        )

    # Extra context (agents only)
    if dry_run_event.get("extra_context"):
        _format_section(
            "EXTRA CONTEXT", dry_run_event.get("extra_context", ""), full=full
        )

    # Prompt (show before first if pre-hook modified it)
    prompt_source = dry_run_event.get("prompt_source", "")
    if prompt_source:
        prompt_source = f" (source: {_relative_path(prompt_source)})"
    if dry_run_event.get("prompt_before"):
        _format_section(
            "PROMPT (before pre-hook)",
            dry_run_event.get("prompt_before", ""),
            full=full,
        )
    _format_section(
        f"PROMPT{prompt_source}", dry_run_event.get("prompt", ""), full=full
    )

    # Transcript (generators only, show before first if pre-hook modified it)
    if "transcript" in dry_run_event:
        chars = dry_run_event.get("transcript_chars", 0)
        files = dry_run_event.get("transcript_files", 0)
        if dry_run_event.get("transcript_before"):
            before_chars = dry_run_event.get("transcript_before_chars", 0)
            _format_section(
                f"TRANSCRIPT (before pre-hook, {before_chars:,} chars)",
                dry_run_event.get("transcript_before", ""),
                full=full,
            )
        _format_section(
            f"TRANSCRIPT ({chars:,} chars from {files} files)",
            dry_run_event.get("transcript", ""),
            full=full,
        )

    # Tools (agents only)
    if dry_run_event.get("tools"):
        tools = dry_run_event.get("tools", [])
        if isinstance(tools, list):
            tools_str = ", ".join(tools)
        else:
            tools_str = str(tools)
        print(f"\n{'=' * 60}")
        print(" TOOLS")
        print(f"{'=' * 60}\n")
        print(tools_str)

    print()
729
730
731def _find_run_file(agents_dir: Path, agent_id: str) -> Path | None:
732 """Locate an agent run JSONL file by ID."""
733 for match in agents_dir.glob(f"*/{agent_id}.jsonl"):
734 return match
735 for match in agents_dir.glob(f"*/{agent_id}_active.jsonl"):
736 return match
737 return None
738
739
740def _parse_run_stats(jsonl_path: Path) -> dict[str, Any]:
741 """Parse an agent JSONL file for summary statistics."""
742 stats: dict[str, Any] = {
743 "event_count": 0,
744 "tool_count": 0,
745 "model": None,
746 "usage": None,
747 "request": None,
748 }
749 for line in jsonl_path.read_text().splitlines():
750 if not line.strip():
751 continue
752 try:
753 event = json.loads(line)
754 except json.JSONDecodeError:
755 continue
756 etype = event.get("event")
757 if etype == "request":
758 stats["request"] = event
759 continue
760 stats["event_count"] += 1
761 if etype == "tool_start":
762 stats["tool_count"] += 1
763 elif etype == "start":
764 stats["model"] = event.get("model")
765 elif etype == "finish":
766 stats["usage"] = event.get("usage")
767 return stats
768
769
770def _format_bytes(n: int) -> str:
771 """Format byte count as human-readable string."""
772 if n < 1000:
773 return str(n)
774 elif n < 1_000_000:
775 return f"{n / 1000:.1f}K"
776 else:
777 return f"{n / 1_000_000:.1f}M"
778
779
780def _format_cost(cost_usd: float | None) -> str:
781 """Format USD cost as rounded cents."""
782 if cost_usd is None:
783 return "-"
784 cents = round(cost_usd * 100)
785 if cents == 0 and cost_usd > 0:
786 return "<1¢"
787 return f"{cents}¢"
788
789
def _get_output_size(request_event: dict[str, Any], journal_root: str) -> int | None:
    """Get output file size in bytes from a request event, or None.

    Returns None when the request declares no output, lacks the info
    needed to derive a path, or the output file does not exist.
    """
    from think.talent import get_output_path

    output_format = request_event.get("output")
    if not output_format:
        return None

    # Prefer explicit output_path (set for activity agents, custom paths)
    explicit = request_event.get("output_path")
    if explicit:
        out_path = Path(explicit)
    else:
        run_day = request_event.get("day")
        if not run_day:
            return None
        run_env = request_event.get("env") or {}
        out_path = get_output_path(
            Path(journal_root) / run_day,
            request_event.get("name", "unified"),
            segment=request_event.get("segment"),
            output_format=output_format,
            facet=request_event.get("facet"),
            stream=run_env.get("SOL_STREAM") if run_env else None,
        )

    return out_path.stat().st_size if out_path.exists() else None
823
824
825def _print_summary(records: list[dict[str, Any]]) -> None:
826 """Print grouped summary of agent runs."""
827 from collections import defaultdict
828
829 groups: dict[str, list[dict[str, Any]]] = defaultdict(list)
830 for r in records:
831 groups[r.get("name", "unknown")].append(r)
832
833 total_pass = 0
834 total_fail = 0
835 total_runtime = 0.0
836
837 for name in sorted(groups):
838 runs = groups[name]
839 passed = sum(1 for r in runs if r.get("status") == "completed")
840 failed = len(runs) - passed
841 runtimes = [r.get("runtime_seconds") or 0 for r in runs]
842 min_rt = min(runtimes)
843 max_rt = max(runtimes)
844 total_rt = sum(runtimes)
845
846 total_pass += passed
847 total_fail += failed
848 total_runtime += total_rt
849
850 if min_rt == max_rt:
851 rt_str = f"{min_rt:.1f}s"
852 else:
853 rt_str = f"{min_rt:.1f}s–{max_rt:.1f}s"
854
855 status_str = f"{passed}✓"
856 if failed:
857 status_str += f" {failed}✗"
858
859 print(f" {name:<20} {status_str:<10} {rt_str}")
860
861 print(f" {'—' * 40}")
862 status_str = f"{total_pass}✓"
863 if total_fail:
864 status_str += f" {total_fail}✗"
865 print(f" {'total':<20} {status_str:<10} {total_runtime:.1f}s")
866
867
def logs_runs(
    *,
    agent: str | None = None,
    count: int | None = None,
    day: str | None = None,
    daily: bool = False,
    errors: bool = False,
    summary: bool = False,
) -> None:
    """Print one-line summaries of recent agent runs from day-index files.

    Args:
        agent: Only show runs for this agent name.
        count: Max runs to show (defaults to 50 with ``daily``, else 20).
        day: Restrict to one YYYYMMDD day-index file.
        daily: Only show runs of daily-scheduled agents.
        errors: Only show runs whose status is "error".
        summary: Print a grouped pass/fail summary instead of rows.

    Exits with status 1 on an invalid --day value.
    """
    from think.models import calc_agent_cost
    from think.utils import get_journal

    journal_root = get_journal()
    agents_dir = Path(journal_root) / "agents"
    if not agents_dir.is_dir():
        return

    # Validate --day format
    if day and (len(day) != 8 or not day.isdigit()):
        print(f"Invalid --day format: {day}. Expected YYYYMMDD.", file=sys.stderr)
        sys.exit(1)

    # Resolve default count: 50 for --daily, 20 otherwise
    if count is None:
        count = 50 if daily else 20

    # Find day-index files, most recent first
    if day:
        day_file = agents_dir / f"{day}.jsonl"
        day_files = [day_file] if day_file.is_file() else []
    else:
        day_files = sorted(agents_dir.glob("????????.jsonl"), reverse=True)
    if not day_files:
        return

    # Collect records across day files
    records: list[dict[str, Any]] = []
    # Lazily-built name -> schedule map, only needed for --daily on records
    # that predate the "schedule" field
    _schedule_lookup: dict[str, str | None] | None = None
    for day_file in day_files:
        for line in day_file.read_text().splitlines():
            if not line.strip():
                continue
            try:
                record = json.loads(line)
            except json.JSONDecodeError:
                continue
            if agent and record.get("name") != agent:
                continue
            if errors and record.get("status") != "error":
                continue
            if daily:
                rec_schedule = record.get("schedule")
                if rec_schedule is None:
                    # Fall back to the current talent configs to classify
                    # records that carry no schedule field
                    if _schedule_lookup is None:
                        all_configs = get_talent_configs(include_disabled=True)
                        _schedule_lookup = {
                            key: info.get("schedule")
                            for key, info in all_configs.items()
                        }
                    rec_schedule = _schedule_lookup.get(record.get("name"))
                if rec_schedule != "daily":
                    continue
            records.append(record)
            if len(records) >= count:
                # NOTE: only breaks out of this file's loop; the outer loop
                # continues, but the sort-and-trim below enforces `count`
                break

    if not records:
        return

    # Sort by timestamp descending and trim
    records.sort(key=lambda r: r.get("ts", 0), reverse=True)
    records = records[:count]

    if summary:
        _print_summary(records)
        return

    # Compute column widths
    name_width = max((len(r.get("name", "")) for r in records), default=10)
    name_width = max(name_width, 10)

    # First pass: enrich each record with stats from its per-run JSONL file
    for r in records:
        agent_id = r.get("agent_id")
        run_file = (
            _find_run_file(agents_dir, agent_id) if isinstance(agent_id, str) else None
        )
        stats: dict[str, Any] = {
            "event_count": 0,
            "tool_count": 0,
            "model": None,
            "usage": None,
            "request": None,
        }
        cost_usd: float | None = None
        output_size: int | None = None
        if run_file:
            stats = _parse_run_stats(run_file)
            cost_usd = calc_agent_cost(stats["model"] or r.get("model"), stats["usage"])
            request_event = stats.get("request")
            if isinstance(request_event, dict):
                output_size = _get_output_size(request_event, journal_root)
        r["_run_file"] = run_file
        r["_stats"] = stats
        r["_cost_usd"] = cost_usd
        r["_output_size"] = output_size

    today = datetime.now().strftime("%Y%m%d")
    use_color = sys.stdout.isatty()

    # Second pass: render one line per record
    for r in records:
        run_file = r.get("_run_file")
        stats = r.get("_stats") or {}
        cost_usd = r.get("_cost_usd")
        output_size = r.get("_output_size")
        agent_id = r.get("agent_id", "")

        ts = r.get("ts", 0)
        dt = datetime.fromtimestamp(ts / 1000)
        # NOTE: rebinds the `day` parameter; safe because day filtering
        # already happened above
        day = r.get("day", dt.strftime("%Y%m%d"))

        # Time column: drop the date part for today's runs
        if day == today:
            time_str = dt.strftime("%H:%M")
        else:
            time_str = dt.strftime("%b %d %H:%M")

        name = r.get("name", "unknown")
        status = r.get("status", "")
        status_sym = "\u2713" if status == "completed" else "\u2717"
        runtime = r.get("runtime_seconds") or 0

        # Format runtime
        if runtime < 60:
            runtime_str = f"{runtime:.1f}s"
        else:
            mins = int(runtime // 60)
            secs = int(runtime % 60)
            runtime_str = f"{mins}m {secs:02d}s"

        model = r.get("model", "")
        facet = r.get("facet") or ""
        cost_str = _format_cost(cost_usd) if run_file else "-"
        events_str = str(stats["event_count"]) if run_file else "-"
        tools_str = str(stats["tool_count"]) if run_file else "-"
        output_str = _format_bytes(output_size) if output_size is not None else "-"

        facet_part = f" {facet}" if facet else ""
        line = (
            f"{agent_id:<15}{time_str:>12} {name:<{name_width}} {status_sym} "
            f"{runtime_str:>7} {cost_str:>4} {events_str:>3} {tools_str:>3} "
            f"{output_str:>5} {model}{facet_part}"
        )

        # Render non-completed runs in red when writing to a terminal
        if use_color and status != "completed":
            line = f"\033[31m{line}\033[0m"

        print(line)
1026
1027
1028def _event_detail(event: dict[str, Any], etype: str) -> str:
1029 """Extract detail string for an event."""
1030 if etype == "request":
1031 return event.get("prompt", "") or ""
1032 elif etype == "start":
1033 model = event.get("model", "")
1034 prompt = event.get("prompt", "")
1035 return f'{model} "{prompt}"'
1036 elif etype == "thinking":
1037 return event.get("summary") or event.get("content") or ""
1038 elif etype == "tool_start":
1039 tool = event.get("tool", "")
1040 args = event.get("args")
1041 if isinstance(args, dict):
1042 parts = [f"{k}={json.dumps(v)}" for k, v in args.items()]
1043 return f"{tool}({', '.join(parts)})"
1044 return tool
1045 elif etype == "tool_end":
1046 tool = event.get("tool", "")
1047 result = event.get("result", "")
1048 return f"{tool} → {result}"
1049 elif etype == "agent_updated":
1050 return event.get("agent", "")
1051 elif etype == "finish":
1052 result = event.get("result", "")
1053 usage = event.get("usage")
1054 if usage:
1055 inp = usage.get("input_tokens", 0)
1056 out = usage.get("output_tokens", 0)
1057 return f"{result} [{inp}in/{out}out]"
1058 return result
1059 elif etype == "error":
1060 return event.get("error", "")
1061 return ""
1062
1063
def _format_event_line(event: dict[str, Any], *, full: bool = False) -> str:
    """Format a single JSONL event as a one-line summary."""
    ts = event.get("ts", 0)
    stamp = datetime.fromtimestamp(ts / 1000)
    # HH:MM:SS plus the millisecond remainder of the epoch-ms timestamp
    time_str = stamp.strftime("%H:%M:%S") + f".{ts % 1000:03d}"

    etype = event.get("event", "?")
    # Short display labels for known event types; unknown types pass through
    labels = {
        "request": "request",
        "start": "start",
        "thinking": "think",
        "tool_start": "tool",
        "tool_end": "tool_end",
        "agent_updated": "updated",
        "finish": "finish",
        "error": "error",
    }
    label = labels.get(etype, etype)

    detail = _event_detail(event, etype)
    if full:
        # Keep one physical line but make embedded newlines visible
        detail = detail.replace("\n", "\\n")
    else:
        detail = detail.replace("\n", " ")
        limit = 100 - 24
        if len(detail) > limit:
            detail = detail[: limit - 1] + "…"

    return f"{time_str} {label:<8} {detail}"
1094
1095
def log_run(agent_id: str, *, json_mode: bool = False, full: bool = False) -> None:
    """Show events for a single agent run.

    With json_mode the raw JSONL file is echoed verbatim; otherwise each
    parseable event is printed as a formatted one-line summary. Exits with
    status 1 when no run file matches agent_id.
    """
    from think.utils import get_journal

    agents_dir = Path(get_journal()) / "agents"
    run_file = _find_run_file(agents_dir, agent_id)
    if run_file is None:
        print(f"Agent run not found: {agent_id}", file=sys.stderr)
        sys.exit(1)

    raw = run_file.read_text()
    if json_mode:
        print(raw, end="")
        return

    for raw_line in raw.splitlines():
        if not raw_line.strip():
            continue
        try:
            parsed = json.loads(raw_line)
        except json.JSONDecodeError:
            # Skip malformed lines rather than aborting the whole dump.
            continue
        print(_format_event_line(parsed, full=full))
1118
1119
1120def main() -> None:
1121 """Entry point for sol talent."""
1122 parser = argparse.ArgumentParser(description="Inspect talent prompt configurations")
1123 subparsers = parser.add_subparsers(dest="subcommand")
1124
1125 # --- list subcommand ---
1126 list_parser = subparsers.add_parser("list", help="List prompts grouped by schedule")
1127 list_parser.add_argument(
1128 "--schedule",
1129 choices=["daily", "segment", "activity"],
1130 help="Filter by schedule type",
1131 )
1132 list_parser.add_argument(
1133 "--source", choices=["system", "app"], help="Filter by origin"
1134 )
1135 list_parser.add_argument(
1136 "--disabled", action="store_true", help="Include disabled prompts"
1137 )
1138 list_parser.add_argument("--json", action="store_true", help="Output as JSONL")
1139
1140 # --- show subcommand ---
1141 show_parser = subparsers.add_parser(
1142 "show", help="Show details for a specific prompt"
1143 )
1144 show_parser.add_argument("name", help="Prompt name")
1145 show_parser.add_argument("--json", action="store_true", help="Output as JSONL")
1146 show_parser.add_argument(
1147 "--prompt", action="store_true", help="Show full prompt context (dry-run mode)"
1148 )
1149 show_parser.add_argument("--day", metavar="YYYYMMDD", help="Day for prompt context")
1150 show_parser.add_argument(
1151 "--segment", metavar="HHMMSS_LEN", help="Segment for segment-scheduled prompts"
1152 )
1153 show_parser.add_argument(
1154 "--facet", metavar="NAME", help="Facet for multi-facet prompts"
1155 )
1156 show_parser.add_argument(
1157 "--activity", metavar="ID", help="Activity ID for activity-scheduled prompts"
1158 )
1159 show_parser.add_argument(
1160 "--query", metavar="TEXT", help="Sample query for tool agents"
1161 )
1162 show_parser.add_argument(
1163 "--full", action="store_true", help="Show full content without truncation"
1164 )
1165
1166 # --- logs subcommand ---
1167 logs_parser = subparsers.add_parser("logs", help="Show recent agent run log")
1168 logs_parser.add_argument("agent", nargs="?", help="Filter to a specific agent")
1169 logs_parser.add_argument(
1170 "-c",
1171 "--count",
1172 type=int,
1173 default=None,
1174 help="Number of runs to show (default: 20)",
1175 )
1176 logs_parser.add_argument(
1177 "--day", metavar="YYYYMMDD", help="Show only runs from this day"
1178 )
1179 logs_parser.add_argument(
1180 "--daily", action="store_true", help="Show only daily-scheduled runs"
1181 )
1182 logs_parser.add_argument(
1183 "--errors", action="store_true", help="Show only error runs"
1184 )
1185 logs_parser.add_argument(
1186 "--summary", action="store_true", help="Show grouped summary"
1187 )
1188
1189 # --- log subcommand ---
1190 log_parser = subparsers.add_parser("log", help="Show events for an agent run")
1191 log_parser.add_argument("id", help="Agent ID")
1192 log_parser.add_argument(
1193 "--json", action="store_true", dest="json_mode", help="Output raw JSONL"
1194 )
1195 log_parser.add_argument("--full", action="store_true", help="Expand event details")
1196
1197 args = setup_cli(parser)
1198
1199 if args.subcommand == "show":
1200 if args.prompt:
1201 show_prompt_context(
1202 args.name,
1203 day=args.day,
1204 segment=args.segment,
1205 facet=args.facet,
1206 activity=args.activity,
1207 query=args.query,
1208 full=args.full,
1209 )
1210 else:
1211 show_prompt(args.name, as_json=args.json)
1212 elif args.subcommand == "logs":
1213 logs_runs(
1214 agent=args.agent,
1215 count=args.count,
1216 day=args.day,
1217 daily=args.daily,
1218 errors=args.errors,
1219 summary=args.summary,
1220 )
1221 elif args.subcommand == "log":
1222 log_run(args.id, json_mode=args.json_mode, full=args.full)
1223 elif args.subcommand == "list" and args.json:
1224 json_output(
1225 schedule=args.schedule,
1226 source=args.source,
1227 include_disabled=args.disabled,
1228 )
1229 elif args.subcommand == "list":
1230 list_prompts(
1231 schedule=args.schedule,
1232 source=args.source,
1233 include_disabled=args.disabled,
1234 )
1235 else:
1236 # Default: no subcommand given -> list all prompts
1237 list_prompts()
1238
1239
# Allow direct execution in addition to the console-script entry point.
if __name__ == "__main__":
    main()