personal memory agent
at main · 1235 lines · 41 kB · view raw
1# SPDX-License-Identifier: AGPL-3.0-only 2# Copyright (c) 2026 sol pbc 3 4"""CLI commands for journal search and browsing. 5 6Provides human-friendly CLI access to journal operations, paralleling the 7tool functions in ``think/tools/search.py`` and ``think/tools/facets.py`` but 8optimized for terminal use. 9 10Mounted by ``think.call`` as ``sol call journal ...``. 11""" 12 13import json 14import shutil 15import subprocess 16import sys 17from datetime import datetime, timezone 18from pathlib import Path 19 20import typer 21 22from think.entities import scan_facet_relationships 23from think.facets import ( 24 create_facet, 25 delete_facet, 26 facet_summary, 27 get_enabled_facets, 28 get_facet_news, 29 get_facets, 30 log_call_action, 31 rename_facet, 32 set_facet_muted, 33 update_facet, 34) 35from think.importers.utils import ( 36 build_import_info, 37 get_import_details, 38 list_import_timestamps, 39) 40from think.indexer.journal import get_events as get_events_impl 41from think.indexer.journal import search_counts as search_counts_impl 42from think.indexer.journal import search_journal as search_journal_impl 43from think.utils import ( 44 get_journal, 45 iter_segments, 46 resolve_sol_day, 47 resolve_sol_facet, 48 resolve_sol_segment, 49 truncated_echo, 50) 51 52app = typer.Typer(help="Journal search and browsing.") 53facet_app = typer.Typer(help="Facet management.") 54app.add_typer(facet_app, name="facet") 55retention_app = typer.Typer(help="Media retention management.") 56app.add_typer(retention_app, name="retention") 57 58 59@app.command() 60def search( 61 query: str = typer.Argument("", help="Search query (FTS5 syntax)."), 62 limit: int = typer.Option(10, "--limit", "-n", help="Max results."), 63 offset: int = typer.Option(0, "--offset", help="Skip N results."), 64 day: str | None = typer.Option(None, "--day", "-d", help="Filter by day YYYYMMDD."), 65 day_from: str | None = typer.Option( 66 None, "--day-from", help="Date range start YYYYMMDD." 
67 ), 68 day_to: str | None = typer.Option( 69 None, "--day-to", help="Date range end YYYYMMDD." 70 ), 71 facet: str | None = typer.Option(None, "--facet", "-f", help="Filter by facet."), 72 agent: str | None = typer.Option(None, "--agent", "-a", help="Filter by agent."), 73 stream: str | None = typer.Option( 74 None, "--stream", help="Filter by stream (e.g. import.ics, archon)." 75 ), 76) -> None: 77 """Search the journal index.""" 78 kwargs = {} 79 if day is not None: 80 kwargs["day"] = day 81 if day_from is not None: 82 kwargs["day_from"] = day_from 83 if day_to is not None: 84 kwargs["day_to"] = day_to 85 if facet is not None: 86 kwargs["facet"] = facet 87 if agent is not None: 88 kwargs["agent"] = agent 89 if stream is not None: 90 kwargs["stream"] = stream 91 92 total, results = search_journal_impl(query, limit, offset, **kwargs) 93 94 # Counts summary 95 counts = search_counts_impl(query, **kwargs) 96 typer.echo(f"{total} results") 97 98 facet_counts = counts.get("facets", {}) 99 if facet_counts: 100 parts = [f"{f}:{c}" for f, c in facet_counts.most_common(10)] 101 typer.echo(f"Facets: {', '.join(parts)}") 102 103 agent_counts = counts.get("agents", {}) 104 if agent_counts: 105 parts = [f"{a}:{c}" for a, c in agent_counts.most_common(10)] 106 typer.echo(f"Agents: {', '.join(parts)}") 107 108 day_counts = counts.get("days", {}) 109 if day_counts: 110 top_days = sorted(day_counts.items(), key=lambda x: (-x[1], x[0]))[:10] 111 parts = [f"{d}:{c}" for d, c in top_days] 112 typer.echo(f"Top days: {', '.join(parts)}") 113 114 # Results 115 for r in results: 116 meta = r["metadata"] 117 stream_tag = f" | {meta['stream']}" if meta.get("stream") else "" 118 typer.echo( 119 f"\n--- {meta['day']} | {meta['facet']} | {meta['agent']}{stream_tag} | {r['id']} ---" 120 ) 121 typer.echo(r["text"].strip()) 122 123 124@app.command() 125def events( 126 day: str | None = typer.Argument( 127 default=None, help="Day YYYYMMDD (default: SOL_DAY env)." 
128 ), 129 facet: str | None = typer.Option(None, "--facet", "-f", help="Filter by facet."), 130) -> None: 131 """List events for a day.""" 132 day = resolve_sol_day(day) 133 items = get_events_impl(day, facet) 134 if not items: 135 typer.echo("No events found.") 136 return 137 for ev in items: 138 time_range = "" 139 if ev.get("start"): 140 time_range = ev["start"] 141 if ev.get("end"): 142 time_range += f"-{ev['end']}" 143 time_range = f" ({time_range})" 144 facet_tag = f" [{ev.get('facet', '')}]" if ev.get("facet") else "" 145 typer.echo(f"- {ev.get('title', 'Untitled')}{time_range}{facet_tag}") 146 if ev.get("summary"): 147 typer.echo(f" {ev['summary']}") 148 participants = ev.get("participants", []) 149 if participants: 150 typer.echo(f" Participants: {', '.join(participants)}") 151 if ev.get("details"): 152 typer.echo(f" Details: {ev['details']}") 153 154 155@facet_app.command() 156def show( 157 name: str | None = typer.Argument( 158 default=None, help="Facet name (default: SOL_FACET env)." 
159 ), 160) -> None: 161 """Show facet summary.""" 162 name = resolve_sol_facet(name) 163 try: 164 summary = facet_summary(name) 165 except FileNotFoundError: 166 typer.echo(f"Facet '{name}' not found.", err=True) 167 raise typer.Exit(1) 168 typer.echo(summary) 169 170 171@app.command() 172def facets( 173 all_: bool = typer.Option(False, "--all", help="Include muted facets."), 174) -> None: 175 """List facets.""" 176 if all_: 177 all_facets = get_facets() 178 else: 179 all_facets = get_enabled_facets() 180 if not all_facets: 181 typer.echo("No facets found.") 182 return 183 for name, info in sorted(all_facets.items()): 184 title = info.get("title", name) 185 emoji = info.get("emoji", "") 186 desc = info.get("description", "") 187 muted = info.get("muted", False) 188 189 parts = [] 190 if emoji: 191 parts.append(f"{emoji} {title} ({name})") 192 else: 193 parts.append(f"{title} ({name})") 194 if desc: 195 parts.append(f": {desc}") 196 if muted: 197 parts.append(" [muted]") 198 typer.echo(f"- {''.join(parts)}") 199 200 201@facet_app.command() 202def create( 203 title: str = typer.Argument(help="Display title for the new facet."), 204 emoji: str = typer.Option("📦", "--emoji", help="Icon emoji."), 205 color: str = typer.Option("#667eea", "--color", help="Hex color."), 206 description: str = typer.Option("", "--description", help="Facet description."), 207 consent: bool = typer.Option( 208 False, 209 "--consent", 210 help="Assert that explicit user approval was obtained before calling this command (agent audit trail).", 211 ), 212) -> None: 213 """Create a new facet.""" 214 try: 215 slug = create_facet( 216 title, 217 emoji=emoji, 218 color=color, 219 description=description, 220 consent=consent, 221 ) 222 except ValueError as e: 223 typer.echo(f"Error: {e}", err=True) 224 raise typer.Exit(1) 225 typer.echo(f"Created facet '{slug}'.") 226 227 228@facet_app.command() 229def update( 230 name: str = typer.Argument(help="Facet name to update."), 231 title: str | None = 
typer.Option(None, "--title", help="New display title."), 232 description: str | None = typer.Option( 233 None, "--description", help="New description." 234 ), 235 emoji: str | None = typer.Option(None, "--emoji", help="New icon emoji."), 236 color: str | None = typer.Option(None, "--color", help="New hex color."), 237) -> None: 238 """Update facet configuration.""" 239 kwargs = {} 240 if title is not None: 241 kwargs["title"] = title 242 if description is not None: 243 kwargs["description"] = description 244 if emoji is not None: 245 kwargs["emoji"] = emoji 246 if color is not None: 247 kwargs["color"] = color 248 249 if not kwargs: 250 typer.echo( 251 "Error: No fields to update. Use --title, --description, --emoji, or --color.", 252 err=True, 253 ) 254 raise typer.Exit(1) 255 256 try: 257 changed = update_facet(name, **kwargs) 258 except FileNotFoundError: 259 typer.echo(f"Error: Facet '{name}' not found.", err=True) 260 raise typer.Exit(1) 261 except ValueError as e: 262 typer.echo(f"Error: {e}", err=True) 263 raise typer.Exit(1) 264 265 if changed: 266 fields = ", ".join(changed.keys()) 267 typer.echo(f"Updated {fields} for facet '{name}'.") 268 else: 269 typer.echo(f"No changes for facet '{name}'.") 270 271 272@facet_app.command() 273def rename( 274 name: str = typer.Argument(help="Current facet name."), 275 new_name: str = typer.Argument(help="New facet name."), 276 consent: bool = typer.Option( 277 False, 278 "--consent", 279 help="Assert that explicit user approval was obtained before calling this command (agent audit trail).", 280 ), 281) -> None: 282 """Rename a facet.""" 283 try: 284 rename_facet(name, new_name) 285 except ValueError as e: 286 typer.echo(f"Error: {e}", err=True) 287 raise typer.Exit(1) 288 params: dict = {"old_name": name, "new_name": new_name} 289 if consent: 290 params["consent"] = True 291 log_call_action(facet=new_name, action="facet_rename", params=params) 292 293 294@facet_app.command() 295def mute(name: str = 
typer.Argument(help="Facet name to mute.")) -> None: 296 """Mute a facet (hide from default listings).""" 297 try: 298 set_facet_muted(name, True) 299 except FileNotFoundError: 300 typer.echo(f"Error: Facet '{name}' not found.", err=True) 301 raise typer.Exit(1) 302 typer.echo(f"Facet '{name}' muted.") 303 304 305@facet_app.command() 306def unmute(name: str = typer.Argument(help="Facet name to unmute.")) -> None: 307 """Unmute a facet (show in default listings).""" 308 try: 309 set_facet_muted(name, False) 310 except FileNotFoundError: 311 typer.echo(f"Error: Facet '{name}' not found.", err=True) 312 raise typer.Exit(1) 313 typer.echo(f"Facet '{name}' unmuted.") 314 315 316@facet_app.command() 317def delete( 318 name: str = typer.Argument(help="Facet name to delete."), 319 yes: bool = typer.Option(False, "--yes", help="Skip confirmation."), 320 consent: bool = typer.Option( 321 False, 322 "--consent", 323 help="Assert that explicit user approval was obtained before calling this command (agent audit trail).", 324 ), 325) -> None: 326 """Delete a facet and all its data.""" 327 if not yes: 328 typer.echo( 329 f"This will permanently delete facet '{name}' and all its data " 330 "(entities, todos, events, logs, news).\n" 331 "Use --yes to confirm." 
332 ) 333 raise typer.Exit(1) 334 335 try: 336 delete_facet(name, consent=consent) 337 except FileNotFoundError: 338 typer.echo(f"Error: Facet '{name}' not found.", err=True) 339 raise typer.Exit(1) 340 typer.echo(f"Deleted facet '{name}'.") 341 342 343@facet_app.command("merge") 344def merge( 345 source: str = typer.Argument(help="Source facet to merge from (will be deleted)."), 346 dest: str = typer.Option(..., "--into", help="Destination facet to merge into."), 347 consent: bool = typer.Option( 348 False, 349 "--consent", 350 help="Assert that explicit user approval was obtained before calling this command (agent audit trail).", 351 ), 352) -> None: 353 """Merge all data from SOURCE facet into DEST facet, then delete SOURCE.""" 354 from apps.calendar import event as event_module 355 from apps.todos import todo as todo_module 356 from think.entities.observations import load_observations, save_observations 357 from think.entities.relationships import ( 358 load_facet_relationship, 359 save_facet_relationship, 360 ) 361 362 if source == dest: 363 typer.echo("Error: Source and destination facets must be different.", err=True) 364 raise typer.Exit(1) 365 366 journal = Path(get_journal()) 367 src_path = journal / "facets" / source 368 dst_path = journal / "facets" / dest 369 370 if not src_path.is_dir(): 371 typer.echo(f"Error: Facet '{source}' not found.", err=True) 372 raise typer.Exit(1) 373 if not dst_path.is_dir(): 374 typer.echo(f"Error: Facet '{dest}' not found.", err=True) 375 raise typer.Exit(1) 376 377 entity_slugs = scan_facet_relationships(source) 378 379 open_todos: list[tuple[str, int, todo_module.TodoItem]] = [] 380 todos_dir = src_path / "todos" 381 if todos_dir.is_dir(): 382 for todo_file in sorted(todos_dir.glob("*.jsonl")): 383 checklist = todo_module.TodoChecklist.load(todo_file.stem, source) 384 for item in checklist.items: 385 if not item.completed and not item.cancelled: 386 open_todos.append((todo_file.stem, item.index, item)) 387 388 
open_events: list[tuple[str, int, event_module.CalendarEvent]] = [] 389 calendar_dir = src_path / "calendar" 390 if calendar_dir.is_dir(): 391 for calendar_file in sorted(calendar_dir.glob("*.jsonl")): 392 event_day = event_module.EventDay.load(calendar_file.stem, source) 393 for item in event_day.items: 394 if not item.cancelled: 395 open_events.append((calendar_file.stem, item.index, item)) 396 397 news_to_copy: list[tuple[Path, Path]] = [] 398 src_news_dir = src_path / "news" 399 dst_news_dir = dst_path / "news" 400 if src_news_dir.is_dir(): 401 for news_file in sorted(src_news_dir.glob("*.md")): 402 dest_file = dst_news_dir / news_file.name 403 if not dest_file.exists(): 404 news_to_copy.append((news_file, dest_file)) 405 406 typer.echo( 407 f"Merging '{source}' into '{dest}': " 408 f"{len(entity_slugs)} entities, {len(open_todos)} open todos, " 409 f"{len(open_events)} calendar events, {len(news_to_copy)} news files. " 410 f"This cannot be undone. Proceeding..." 411 ) 412 413 for entity_id in entity_slugs: 414 src_dir = src_path / "entities" / entity_id 415 dst_dir = dst_path / "entities" / entity_id 416 if dst_dir.exists(): 417 src_rel = load_facet_relationship(source, entity_id) 418 dst_rel = load_facet_relationship(dest, entity_id) 419 if src_rel is not None or dst_rel is not None: 420 merged_rel = {**(src_rel or {}), **(dst_rel or {})} 421 save_facet_relationship(dest, entity_id, merged_rel) 422 423 src_obs = load_observations(source, entity_id) 424 dst_obs = load_observations(dest, entity_id) 425 seen = {(o.get("content", ""), o.get("observed_at")) for o in dst_obs} 426 merged_obs = list(dst_obs) 427 for observation in src_obs: 428 key = ( 429 observation.get("content", ""), 430 observation.get("observed_at"), 431 ) 432 if key not in seen: 433 merged_obs.append(observation) 434 seen.add(key) 435 save_observations(dest, entity_id, merged_obs) 436 shutil.rmtree(str(src_dir)) 437 else: 438 dst_dir.parent.mkdir(parents=True, exist_ok=True) 439 
shutil.move(str(src_dir), str(dst_dir)) 440 441 for day, line_number, item in open_todos: 442 captured_item = item 443 444 def _append_todo( 445 checklist: todo_module.TodoChecklist, 446 ) -> tuple[todo_module.TodoChecklist, todo_module.TodoItem]: 447 new_item = checklist.append_entry( 448 captured_item.text, 449 captured_item.nudge, 450 created_at=captured_item.created_at, 451 ) 452 return checklist, new_item 453 454 captured_line_number = line_number 455 captured_dest = dest 456 457 def _cancel_todo( 458 checklist: todo_module.TodoChecklist, 459 ) -> tuple[todo_module.TodoChecklist, todo_module.TodoItem]: 460 cancelled_item = checklist.cancel_entry( 461 captured_line_number, 462 cancelled_reason="moved_to_facet", 463 moved_to=captured_dest, 464 ) 465 return checklist, cancelled_item 466 467 todo_module.TodoChecklist.locked_modify(day, dest, _append_todo) 468 todo_module.TodoChecklist.locked_modify(day, source, _cancel_todo) 469 470 for day, line_number, item in open_events: 471 captured_item = item 472 473 def _append_event( 474 event_day: event_module.EventDay, 475 ) -> tuple[event_module.EventDay, event_module.CalendarEvent]: 476 new_item = event_day.append_event( 477 captured_item.title, 478 captured_item.start, 479 captured_item.end, 480 captured_item.summary, 481 captured_item.participants, 482 created_at=captured_item.created_at, 483 ) 484 return event_day, new_item 485 486 captured_line_number = line_number 487 captured_dest = dest 488 489 def _cancel_event( 490 event_day: event_module.EventDay, 491 ) -> tuple[event_module.EventDay, event_module.CalendarEvent]: 492 cancelled_item = event_day.cancel_event( 493 captured_line_number, 494 cancelled_reason="moved_to_facet", 495 moved_to=captured_dest, 496 ) 497 return event_day, cancelled_item 498 499 event_module.EventDay.locked_modify(day, dest, _append_event) 500 event_module.EventDay.locked_modify(day, source, _cancel_event) 501 502 if news_to_copy: 503 dst_news_dir.mkdir(parents=True, exist_ok=True) 504 
for src_file, dest_file in news_to_copy: 505 shutil.copy2(src_file, dest_file) 506 507 params: dict[str, object] = { 508 "source": source, 509 "dest": dest, 510 "entity_count": len(entity_slugs), 511 "todo_count": len(open_todos), 512 "calendar_count": len(open_events), 513 "news_count": len(news_to_copy), 514 } 515 if consent: 516 params["consent"] = True 517 log_call_action(facet=None, action="facet_merge", params=params) 518 519 delete_facet(source) 520 521 subprocess.run( 522 ["sol", "indexer", "--rescan-full"], 523 check=False, 524 capture_output=True, 525 ) 526 527 typer.echo(f"Merged '{source}' into '{dest}'. Index rebuild started.") 528 529 530@app.command() 531def news( 532 name: str | None = typer.Argument( 533 default=None, help="Facet name (default: SOL_FACET env)." 534 ), 535 day: str | None = typer.Option(None, "--day", "-d", help="Specific day YYYYMMDD."), 536 limit: int = typer.Option(5, "--limit", "-n", help="Max days to show."), 537 cursor: str | None = typer.Option(None, "--cursor", help="Pagination cursor."), 538 write: bool = typer.Option(False, "--write", "-w", help="Write news from stdin."), 539) -> None: 540 """Read or write facet news.""" 541 name = resolve_sol_facet(name) 542 if write: 543 day = resolve_sol_day(day) 544 elif day is None: 545 from think.utils import get_sol_day 546 547 day = get_sol_day() 548 if write: 549 # Read markdown from stdin 550 markdown = sys.stdin.read() 551 if not markdown.strip(): 552 typer.echo("Error: no content provided on stdin.", err=True) 553 raise typer.Exit(1) 554 555 journal_path = Path(get_journal()) 556 facet_path = journal_path / "facets" / name 557 if not facet_path.exists(): 558 typer.echo(f"Error: facet '{name}' not found.", err=True) 559 raise typer.Exit(1) 560 561 news_dir = facet_path / "news" 562 news_dir.mkdir(exist_ok=True) 563 news_file = news_dir / f"{day}.md" 564 news_file.write_text(markdown, encoding="utf-8") 565 typer.echo(f"News for {day} saved to {name}.") 566 return 567 568 result = 
get_facet_news(name, cursor=cursor, limit=limit, day=day) 569 days = result.get("days", []) 570 if not days: 571 typer.echo("No news found.") 572 return 573 for entry in days: 574 typer.echo(entry.get("raw_content", "")) 575 576 577@app.command() 578def agents( 579 day: str | None = typer.Argument( 580 default=None, help="Day YYYYMMDD (default: SOL_DAY env)." 581 ), 582 segment: str | None = typer.Option( 583 None, 584 "--segment", 585 "-s", 586 help="Segment key (HHMMSS_LEN, default: SOL_SEGMENT env).", 587 ), 588) -> None: 589 """List available agent outputs for a day.""" 590 day = resolve_sol_day(day) 591 segment = resolve_sol_segment(segment) 592 journal = get_journal() 593 day_path = Path(journal) / day 594 595 if not day_path.is_dir(): 596 typer.echo(f"No data for {day}.") 597 return 598 599 if segment: 600 # List outputs in a specific segment directory 601 seg_path = day_path / segment / "agents" 602 if not seg_path.is_dir(): 603 typer.echo(f"Segment {segment} not found for {day}.") 604 return 605 _list_outputs(seg_path, f"Segment {segment}") 606 return 607 608 # List daily agent outputs 609 agents_path = day_path / "agents" 610 if agents_path.is_dir(): 611 _list_outputs(agents_path, "Daily agents") 612 613 # List segments and their outputs (across all streams) 614 seg_list = iter_segments(day) 615 if seg_list: 616 typer.echo(f"\nSegments: {len(seg_list)}") 617 for stream_name, seg_key, seg_path_obj in seg_list: 618 agents_dir = seg_path_obj / "agents" 619 outputs = _get_output_names(agents_dir) 620 label = f" {stream_name}/{seg_key}" if stream_name else f" {seg_key}" 621 if outputs: 622 typer.echo(f"{label}: {', '.join(outputs)}") 623 else: 624 typer.echo(f"{label}: (no outputs)") 625 626 627def _get_output_names(directory: Path) -> list[str]: 628 """Get sorted list of output file basenames in a directory.""" 629 names = [] 630 if not directory.is_dir(): 631 return names 632 633 for f in sorted(directory.iterdir()): 634 if f.is_file() and f.suffix in 
(".md", ".json", ".jsonl"): 635 names.append(f.name) 636 elif f.is_dir(): 637 for nested in sorted(f.iterdir()): 638 if nested.is_file() and nested.suffix in (".md", ".json", ".jsonl"): 639 names.append(f"{f.name}/{nested.name}") 640 return names 641 642 643def _list_outputs(directory: Path, label: str) -> None: 644 """Print output files in a directory.""" 645 outputs = _get_output_names(directory) 646 if not outputs: 647 typer.echo(f"{label}: (none)") 648 return 649 typer.echo(f"{label}:") 650 for name in outputs: 651 size = (directory / name).stat().st_size 652 typer.echo(f" {name} ({size:,} bytes)") 653 654 655@app.command() 656def read( 657 agent: str = typer.Argument(help="Agent name (e.g., flow, meetings, activity)."), 658 day: str | None = typer.Option( 659 None, "--day", "-d", help="Day YYYYMMDD (default: SOL_DAY env)." 660 ), 661 segment: str | None = typer.Option( 662 None, 663 "--segment", 664 "-s", 665 help="Segment key (HHMMSS_LEN, default: SOL_SEGMENT env).", 666 ), 667 max_bytes: int = typer.Option( 668 16384, "--max", help="Max output bytes (0 = unlimited)." 
669 ), 670) -> None: 671 """Read full content of an agent output.""" 672 day = resolve_sol_day(day) 673 segment = resolve_sol_segment(segment) 674 journal = get_journal() 675 day_path = Path(journal) / day 676 677 if not day_path.is_dir(): 678 typer.echo(f"No data for {day}.", err=True) 679 raise typer.Exit(1) 680 681 if segment: 682 base_dir = day_path / segment / "agents" 683 else: 684 base_dir = day_path / "agents" 685 686 if not base_dir.is_dir(): 687 location = f"segment {segment}" if segment else "agents" 688 typer.echo(f"No {location} directory for {day}.", err=True) 689 raise typer.Exit(1) 690 691 # Try common extensions 692 for ext in (".md", ".json", ".jsonl"): 693 candidate = base_dir / f"{agent}{ext}" 694 if candidate.is_file(): 695 truncated_echo(candidate.read_text(encoding="utf-8"), max_bytes) 696 return 697 698 # List what is available 699 available = _get_output_names(base_dir) 700 if available: 701 typer.echo( 702 f"Agent '{agent}' not found. Available: {', '.join(available)}", err=True 703 ) 704 else: 705 typer.echo(f"Agent '{agent}' not found and no outputs exist.", err=True) 706 raise typer.Exit(1) 707 708 709# ============================================================================ 710# Import Commands 711# ============================================================================ 712 713 714def _derive_status(info: dict) -> str: 715 """Derive import status from info dict fields.""" 716 if info.get("error"): 717 return "failed" 718 if info.get("processed"): 719 return "success" 720 if info.get("task_id"): 721 return "running" 722 return "pending" 723 724 725def _get_source_type(journal_root: Path, timestamp: str, info: dict) -> str: 726 """Get source type from manifest.json or infer from mime_type.""" 727 manifest_path = journal_root / "imports" / timestamp / "manifest.json" 728 if manifest_path.exists(): 729 try: 730 manifest = json.loads(manifest_path.read_text(encoding="utf-8")) 731 if manifest.get("source_type"): 732 return 
manifest["source_type"] 733 except Exception: 734 pass 735 736 # Infer from mime_type 737 mime = info.get("mime_type", "") 738 if "calendar" in mime or "ics" in mime: 739 return "ics" 740 if "zip" in mime: 741 return "archive" 742 if "audio" in mime: 743 return "audio" 744 if "text" in mime: 745 return "text" 746 return "unknown" 747 748 749def _get_entry_count(journal_root: Path, timestamp: str, info: dict) -> int | None: 750 """Get entry count from manifest.json or total_files_created.""" 751 manifest_path = journal_root / "imports" / timestamp / "manifest.json" 752 if manifest_path.exists(): 753 try: 754 manifest = json.loads(manifest_path.read_text(encoding="utf-8")) 755 if manifest.get("entry_count") is not None: 756 return manifest["entry_count"] 757 except Exception: 758 pass 759 count = info.get("total_files_created") 760 if count is not None and count > 0: 761 return count 762 return None 763 764 765def _match_import_id(timestamps: list[str], prefix: str) -> str | None: 766 """Match a partial import ID prefix to a full timestamp.""" 767 matches = [t for t in timestamps if t.startswith(prefix)] 768 if len(matches) == 1: 769 return matches[0] 770 if len(matches) > 1: 771 # Check for exact match first 772 if prefix in matches: 773 return prefix 774 typer.echo( 775 f"Ambiguous prefix '{prefix}' matches {len(matches)} imports. " 776 "Be more specific.", 777 err=True, 778 ) 779 raise typer.Exit(1) 780 return None 781 782 783@app.command(name="imports") 784def imports_list( 785 limit: int = typer.Option(20, "--limit", "-n", help="Max results."), 786 source: str | None = typer.Option( 787 None, "--source", "-s", help="Filter by source type." 
788 ), 789 json_output: bool = typer.Option(False, "--json", help="Output as JSON."), 790) -> None: 791 """List recent imports with metadata.""" 792 journal_root = Path(get_journal()) 793 timestamps = list_import_timestamps(journal_root) 794 795 if not timestamps: 796 typer.echo("No imports found.") 797 return 798 799 # Reverse chronological 800 timestamps.sort(reverse=True) 801 802 # Build info for each import 803 rows = [] 804 for ts in timestamps: 805 info = build_import_info(journal_root, ts) 806 source_type = _get_source_type(journal_root, ts, info) 807 808 if source and source_type != source: 809 continue 810 811 status = _derive_status(info) 812 filename = info.get("original_filename", "unknown") 813 entry_count = _get_entry_count(journal_root, ts, info) 814 815 rows.append( 816 { 817 "timestamp": ts, 818 "status": status, 819 "source_type": source_type, 820 "filename": filename, 821 "entry_count": entry_count, 822 "error": info.get("error"), 823 } 824 ) 825 826 if len(rows) >= limit: 827 break 828 829 if not rows: 830 typer.echo("No imports found.") 831 return 832 833 if json_output: 834 typer.echo(json.dumps(rows, indent=2)) 835 return 836 837 for row in rows: 838 parts = [f"{row['timestamp']} [{row['status']}]"] 839 parts.append(f"{row['source_type']:8s}") 840 parts.append(row["filename"]) 841 if row["status"] == "failed" and row.get("error"): 842 parts.append(f"— error: {row['error']}") 843 elif row.get("entry_count") is not None: 844 parts.append(f"({row['entry_count']} entries)") 845 typer.echo(" ".join(parts)) 846 847 848@app.command(name="import") 849def import_detail( 850 id: str = typer.Argument(help="Import ID or prefix (e.g. 
20260309_143000)."), 851) -> None: 852 """Show full metadata for a single import.""" 853 journal_root = Path(get_journal()) 854 timestamps = list_import_timestamps(journal_root) 855 856 if not timestamps: 857 typer.echo("No imports found.", err=True) 858 raise typer.Exit(1) 859 860 # Match partial prefix 861 matched = _match_import_id(timestamps, id) 862 if matched is None: 863 typer.echo(f"Import '{id}' not found.", err=True) 864 raise typer.Exit(1) 865 866 info = build_import_info(journal_root, matched) 867 info["status"] = _derive_status(info) 868 info["source_type"] = _get_source_type(journal_root, matched, info) 869 870 # Merge manifest and detail data 871 try: 872 details = get_import_details(journal_root, matched) 873 if details.get("import_json"): 874 info["import_metadata"] = details["import_json"] 875 if details.get("imported_json"): 876 info["imported_results"] = details["imported_json"] 877 if details.get("segments_json"): 878 info["segments"] = details["segments_json"] 879 except FileNotFoundError: 880 pass 881 882 typer.echo(json.dumps(info, indent=2, default=str)) 883 884 885# ============================================================================ 886# Retention Commands 887# ============================================================================ 888 889 890def _parse_age(value: str) -> int: 891 """Parse age string like '30d' or '30' to number of days.""" 892 value = value.strip().lower() 893 if value.endswith("d"): 894 return int(value[:-1]) 895 return int(value) 896 897 898@retention_app.command() 899def purge( 900 older_than: str | None = typer.Option( 901 None, "--older-than", help="Age threshold (e.g. 30d, 7d)." 902 ), 903 stream: str | None = typer.Option( 904 None, "--stream", help="Only purge from this stream." 905 ), 906 dry_run: bool = typer.Option( 907 False, "--dry-run", help="Show what would be deleted." 
908 ), 909) -> None: 910 """Purge raw media from completed segments.""" 911 from think.retention import _human_bytes, load_retention_config 912 from think.retention import purge as run_purge 913 914 older_than_days = _parse_age(older_than) if older_than else None 915 config = load_retention_config() 916 917 if dry_run: 918 typer.echo("DRY RUN — no files will be deleted.\n") 919 920 result = run_purge( 921 older_than_days=older_than_days, 922 stream_filter=stream, 923 dry_run=dry_run, 924 config=config, 925 ) 926 927 if result.details: 928 for detail in result.details: 929 typer.echo( 930 f" {detail['day']}/{detail['stream']}/{detail['segment']}: " 931 f"{len(detail['files'])} files, {_human_bytes(detail['bytes_freed'])}" 932 ) 933 typer.echo("") 934 935 action = "Would delete" if dry_run else "Deleted" 936 typer.echo( 937 f"{action} {result.files_deleted} files, " 938 f"freeing {_human_bytes(result.bytes_freed)}" 939 ) 940 941 if result.segments_skipped_incomplete: 942 typer.echo( 943 f"Skipped {result.segments_skipped_incomplete} incomplete segments " 944 "(processing not finished)." 945 ) 946 947 if result.segments_skipped_policy: 948 typer.echo( 949 f"Skipped {result.segments_skipped_policy} segments " 950 "(not yet eligible under retention policy)." 951 ) 952 953 954@retention_app.command() 955def config( 956 mode: str | None = typer.Option( 957 None, "--mode", help="Retention mode: keep, days, or processed." 958 ), 959 days: int | None = typer.Option( 960 None, "--days", help="Days to retain (required when mode is 'days')." 961 ), 962 stream: str | None = typer.Option( 963 None, "--stream", help="Apply to a specific stream instead of global." 964 ), 965 clear: bool = typer.Option( 966 False, "--clear", help="Clear per-stream override (requires --stream)." 
967 ), 968) -> None: 969 """Show or update retention configuration.""" 970 import os 971 972 from think.retention import load_retention_config 973 from think.utils import get_config, get_journal 974 975 if mode is None and days is None and not clear: 976 cfg = load_retention_config() 977 result = { 978 "default": {"mode": cfg.default.mode, "days": cfg.default.days}, 979 "per_stream": { 980 name: {"mode": policy.mode, "days": policy.days} 981 for name, policy in cfg.per_stream.items() 982 }, 983 } 984 typer.echo(json.dumps(result, indent=2)) 985 return 986 987 if clear: 988 if not stream: 989 typer.echo("--clear requires --stream", err=True) 990 raise typer.Exit(1) 991 if mode is not None or days is not None: 992 typer.echo("--clear cannot be combined with --mode or --days", err=True) 993 raise typer.Exit(1) 994 995 if mode is not None and mode not in ("keep", "days", "processed"): 996 typer.echo( 997 f"Invalid mode: {mode}. Must be keep, days, or processed.", err=True 998 ) 999 raise typer.Exit(1) 1000 1001 if mode == "days" and days is None: 1002 typer.echo("--days is required when mode is 'days'.", err=True) 1003 raise typer.Exit(1) 1004 1005 if days is not None and days < 1: 1006 typer.echo("--days must be a positive integer.", err=True) 1007 raise typer.Exit(1) 1008 1009 journal_config = get_config() 1010 retention = journal_config.setdefault("retention", {}) 1011 1012 if clear: 1013 ps = retention.get("per_stream", {}) 1014 if stream in ps: 1015 del ps[stream] 1016 if not ps: 1017 retention.pop("per_stream", None) 1018 log_call_action( 1019 facet=None, 1020 action="retention_config", 1021 params={"stream": stream, "clear": True}, 1022 ) 1023 elif stream: 1024 ps = retention.setdefault("per_stream", {}) 1025 entry = ps.setdefault(stream, {}) 1026 if mode is not None: 1027 entry["raw_media"] = mode 1028 if days is not None: 1029 entry["raw_media_days"] = days 1030 log_call_action( 1031 facet=None, 1032 action="retention_config", 1033 params={"stream": stream, 
"mode": mode, "days": days}, 1034 ) 1035 else: 1036 if mode is not None: 1037 retention["raw_media"] = mode 1038 if days is not None: 1039 retention["raw_media_days"] = days 1040 log_call_action( 1041 facet=None, 1042 action="retention_config", 1043 params={"mode": mode, "days": days}, 1044 ) 1045 1046 config_dir = Path(get_journal()) / "config" 1047 config_dir.mkdir(parents=True, exist_ok=True) 1048 config_path = config_dir / "journal.json" 1049 1050 with open(config_path, "w", encoding="utf-8") as f: 1051 json.dump(journal_config, f, indent=2, ensure_ascii=False) 1052 f.write("\n") 1053 os.chmod(config_path, 0o600) 1054 1055 cfg = load_retention_config() 1056 result = { 1057 "default": {"mode": cfg.default.mode, "days": cfg.default.days}, 1058 "per_stream": { 1059 name: {"mode": policy.mode, "days": policy.days} 1060 for name, policy in cfg.per_stream.items() 1061 }, 1062 } 1063 typer.echo(json.dumps(result, indent=2)) 1064 1065 1066@app.command(name="storage-summary") 1067def storage_summary( 1068 json_output: bool = typer.Option(False, "--json", help="Output as JSON."), 1069 check: bool = typer.Option( 1070 False, "--check", help="Check storage health thresholds." 
1071 ), 1072) -> None: 1073 """Show journal storage summary.""" 1074 from think.retention import compute_storage_summary 1075 1076 summary = compute_storage_summary() 1077 1078 if check: 1079 from think.retention import check_storage_health 1080 from think.utils import get_journal 1081 1082 journal_path = get_journal() 1083 warnings = check_storage_health(summary, journal_path) 1084 1085 if json_output: 1086 typer.echo( 1087 json.dumps( 1088 { 1089 "raw_media_bytes": summary.raw_media_bytes, 1090 "derived_bytes": summary.derived_bytes, 1091 "total_segments": summary.total_segments, 1092 "segments_with_raw": summary.segments_with_raw, 1093 "segments_purged": summary.segments_purged, 1094 "warnings": warnings, 1095 }, 1096 indent=2, 1097 ) 1098 ) 1099 else: 1100 typer.echo(f"Raw media: {summary.raw_media_human}") 1101 typer.echo(f"AI-processed content: {summary.derived_human}") 1102 typer.echo( 1103 f"Segments: {summary.total_segments} total, " 1104 f"{summary.segments_with_raw} with raw media, " 1105 f"{summary.segments_purged} purged" 1106 ) 1107 if warnings: 1108 typer.echo("") 1109 for w in warnings: 1110 typer.echo(f"{w['message']}") 1111 else: 1112 typer.echo("\nAll storage thresholds OK.") 1113 return 1114 1115 if json_output: 1116 typer.echo( 1117 json.dumps( 1118 { 1119 "raw_media_bytes": summary.raw_media_bytes, 1120 "derived_bytes": summary.derived_bytes, 1121 "total_segments": summary.total_segments, 1122 "segments_with_raw": summary.segments_with_raw, 1123 "segments_purged": summary.segments_purged, 1124 }, 1125 indent=2, 1126 ) 1127 ) 1128 return 1129 1130 typer.echo(f"Raw media: {summary.raw_media_human}") 1131 typer.echo(f"AI-processed content: {summary.derived_human}") 1132 typer.echo( 1133 f"Segments: {summary.total_segments} total, " 1134 f"{summary.segments_with_raw} with raw media, " 1135 f"{summary.segments_purged} purged" 1136 ) 1137 1138 1139@app.command("merge") 1140def journal_merge( 1141 source: str = typer.Argument( 1142 help="Path to 
source journal directory to merge from." 1143 ), 1144 dry_run: bool = typer.Option( 1145 False, 1146 "--dry-run", 1147 help="Show what would be merged without making changes.", 1148 ), 1149) -> None: 1150 """Merge segments, entities, facets, and imports from a source journal.""" 1151 from think.merge import MergeSummary, merge_journals 1152 from think.utils import get_journal 1153 1154 source_path = Path(source).resolve() 1155 1156 if not source_path.is_dir(): 1157 typer.echo(f"Error: '{source}' is not a directory.", err=True) 1158 raise typer.Exit(1) 1159 1160 import re 1161 1162 has_day = any( 1163 entry.is_dir() and re.match(r"^\d{8}$", entry.name) 1164 for entry in source_path.iterdir() 1165 ) 1166 if not has_day: 1167 typer.echo( 1168 f"Error: '{source}' does not appear to be a journal (no YYYYMMDD directories found).", 1169 err=True, 1170 ) 1171 raise typer.Exit(1) 1172 1173 target_path = Path(get_journal()) 1174 run_id = datetime.now(timezone.utc).strftime("%Y%m%dT%H%M%SZ") 1175 artifact_root = target_path.parent / f"{target_path.name}.merge" / run_id 1176 log_path = artifact_root / "decisions.jsonl" 1177 staging_path = artifact_root / "staging" 1178 1179 summary: MergeSummary = merge_journals( 1180 source_path, 1181 target_path, 1182 dry_run=dry_run, 1183 log_path=log_path, 1184 staging_path=staging_path, 1185 ) 1186 1187 action = "Would merge" if dry_run else "Merged" 1188 typer.echo(f"\n{action}:") 1189 typer.echo( 1190 f" Segments: {summary.segments_copied} copied, {summary.segments_skipped} skipped, {summary.segments_errored} errored" 1191 ) 1192 typer.echo( 1193 f" Entities: {summary.entities_created} created, {summary.entities_merged} merged, {summary.entities_staged} staged, {summary.entities_skipped} skipped" 1194 ) 1195 typer.echo( 1196 f" Facets: {summary.facets_created} created, {summary.facets_merged} merged" 1197 ) 1198 typer.echo( 1199 f" Imports: {summary.imports_copied} copied, {summary.imports_skipped} skipped" 1200 ) 1201 1202 if 
summary.errors: 1203 typer.echo(f"\n{len(summary.errors)} errors:") 1204 for error in summary.errors: 1205 typer.echo(f" - {error}") 1206 1207 if log_path.exists(): 1208 typer.echo(f"\nDecision log: {log_path}") 1209 if summary.entities_staged > 0: 1210 typer.echo(f"Staged entities: {staging_path}") 1211 1212 if not dry_run: 1213 subprocess.run( 1214 ["sol", "indexer", "--rescan-full"], 1215 check=False, 1216 capture_output=True, 1217 ) 1218 1219 log_call_action( 1220 facet=None, 1221 action="journal_merge", 1222 params={ 1223 "source": str(source_path), 1224 "segments_copied": summary.segments_copied, 1225 "entities_created": summary.entities_created, 1226 "entities_merged": summary.entities_merged, 1227 "entities_staged": summary.entities_staged, 1228 "facets_created": summary.facets_created, 1229 "facets_merged": summary.facets_merged, 1230 "imports_copied": summary.imports_copied, 1231 "errors": len(summary.errors), 1232 }, 1233 ) 1234 1235 typer.echo("Index rebuild started.")