# personal memory agent
1# SPDX-License-Identifier: AGPL-3.0-only
2# Copyright (c) 2026 sol pbc
3
4"""CLI commands for journal search and browsing.
5
6Provides human-friendly CLI access to journal operations, paralleling the
7tool functions in ``think/tools/search.py`` and ``think/tools/facets.py`` but
8optimized for terminal use.
9
10Mounted by ``think.call`` as ``sol call journal ...``.
11"""
12
13import json
14import shutil
15import subprocess
16import sys
17from datetime import datetime, timezone
18from pathlib import Path
19
20import typer
21
22from think.entities import scan_facet_relationships
23from think.facets import (
24 create_facet,
25 delete_facet,
26 facet_summary,
27 get_enabled_facets,
28 get_facet_news,
29 get_facets,
30 log_call_action,
31 rename_facet,
32 set_facet_muted,
33 update_facet,
34)
35from think.importers.utils import (
36 build_import_info,
37 get_import_details,
38 list_import_timestamps,
39)
40from think.indexer.journal import get_events as get_events_impl
41from think.indexer.journal import search_counts as search_counts_impl
42from think.indexer.journal import search_journal as search_journal_impl
43from think.utils import (
44 get_journal,
45 iter_segments,
46 resolve_sol_day,
47 resolve_sol_facet,
48 resolve_sol_segment,
49 truncated_echo,
50)
51
# Root Typer application mounted by ``think.call`` as ``sol call journal ...``.
# Sub-apps are mounted as ``journal facet ...`` and ``journal retention ...``.
app = typer.Typer(help="Journal search and browsing.")
facet_app = typer.Typer(help="Facet management.")
app.add_typer(facet_app, name="facet")
retention_app = typer.Typer(help="Media retention management.")
app.add_typer(retention_app, name="retention")
57
58
@app.command()
def search(
    query: str = typer.Argument("", help="Search query (FTS5 syntax)."),
    limit: int = typer.Option(10, "--limit", "-n", help="Max results."),
    offset: int = typer.Option(0, "--offset", help="Skip N results."),
    day: str | None = typer.Option(None, "--day", "-d", help="Filter by day YYYYMMDD."),
    day_from: str | None = typer.Option(
        None, "--day-from", help="Date range start YYYYMMDD."
    ),
    day_to: str | None = typer.Option(
        None, "--day-to", help="Date range end YYYYMMDD."
    ),
    facet: str | None = typer.Option(None, "--facet", "-f", help="Filter by facet."),
    agent: str | None = typer.Option(None, "--agent", "-a", help="Filter by agent."),
    stream: str | None = typer.Option(
        None, "--stream", help="Filter by stream (e.g. import.ics, archon)."
    ),
) -> None:
    """Search the journal index."""
    # Forward only the filters the caller actually supplied.
    filters = {
        key: value
        for key, value in (
            ("day", day),
            ("day_from", day_from),
            ("day_to", day_to),
            ("facet", facet),
            ("agent", agent),
            ("stream", stream),
        )
        if value is not None
    }

    total, results = search_journal_impl(query, limit, offset, **filters)

    # Aggregate counts give a quick overview before the hits themselves.
    counts = search_counts_impl(query, **filters)
    typer.echo(f"{total} results")

    facet_counts = counts.get("facets", {})
    if facet_counts:
        summary = ", ".join(f"{f}:{c}" for f, c in facet_counts.most_common(10))
        typer.echo(f"Facets: {summary}")

    agent_counts = counts.get("agents", {})
    if agent_counts:
        summary = ", ".join(f"{a}:{c}" for a, c in agent_counts.most_common(10))
        typer.echo(f"Agents: {summary}")

    day_counts = counts.get("days", {})
    if day_counts:
        # Most hits first; ties broken by earliest day.
        ranked = sorted(day_counts.items(), key=lambda pair: (-pair[1], pair[0]))
        summary = ", ".join(f"{d}:{c}" for d, c in ranked[:10])
        typer.echo(f"Top days: {summary}")

    # Print each hit with a provenance header line.
    for hit in results:
        meta = hit["metadata"]
        stream_tag = f" | {meta['stream']}" if meta.get("stream") else ""
        typer.echo(
            f"\n--- {meta['day']} | {meta['facet']} | {meta['agent']}{stream_tag} | {hit['id']} ---"
        )
        typer.echo(hit["text"].strip())
122
123
@app.command()
def events(
    day: str | None = typer.Argument(
        default=None, help="Day YYYYMMDD (default: SOL_DAY env)."
    ),
    facet: str | None = typer.Option(None, "--facet", "-f", help="Filter by facet."),
) -> None:
    """List events for a day."""
    day = resolve_sol_day(day)
    items = get_events_impl(day, facet)
    if not items:
        typer.echo("No events found.")
        return
    for ev in items:
        # Build " (start-end)" when a start time is present; end is optional.
        start = ev.get("start")
        if start:
            span = start
            end = ev.get("end")
            if end:
                span = f"{span}-{end}"
            when = f" ({span})"
        else:
            when = ""
        facet_tag = f" [{ev['facet']}]" if ev.get("facet") else ""
        typer.echo(f"- {ev.get('title', 'Untitled')}{when}{facet_tag}")
        if ev.get("summary"):
            typer.echo(f"  {ev['summary']}")
        participants = ev.get("participants", [])
        if participants:
            typer.echo(f"  Participants: {', '.join(participants)}")
        if ev.get("details"):
            typer.echo(f"  Details: {ev['details']}")
153
154
@facet_app.command()
def show(
    name: str | None = typer.Argument(
        default=None, help="Facet name (default: SOL_FACET env)."
    ),
) -> None:
    """Show facet summary."""
    facet_name = resolve_sol_facet(name)
    try:
        summary = facet_summary(facet_name)
    except FileNotFoundError:
        # Unknown facet: report on stderr and exit non-zero.
        typer.echo(f"Facet '{facet_name}' not found.", err=True)
        raise typer.Exit(1)
    typer.echo(summary)
169
170
@app.command()
def facets(
    all_: bool = typer.Option(False, "--all", help="Include muted facets."),
) -> None:
    """List facets."""
    listing = get_facets() if all_ else get_enabled_facets()
    if not listing:
        typer.echo("No facets found.")
        return
    for name, info in sorted(listing.items()):
        title = info.get("title", name)
        emoji = info.get("emoji", "")
        # Header is "emoji title (slug)" when an emoji exists, else "title (slug)".
        line = f"{emoji} {title} ({name})" if emoji else f"{title} ({name})"
        desc = info.get("description", "")
        if desc:
            line += f": {desc}"
        if info.get("muted", False):
            line += " [muted]"
        typer.echo(f"- {line}")
199
200
@facet_app.command()
def create(
    title: str = typer.Argument(help="Display title for the new facet."),
    emoji: str = typer.Option("📦", "--emoji", help="Icon emoji."),
    color: str = typer.Option("#667eea", "--color", help="Hex color."),
    description: str = typer.Option("", "--description", help="Facet description."),
    consent: bool = typer.Option(
        False,
        "--consent",
        help="Assert that explicit user approval was obtained before calling this command (agent audit trail).",
    ),
) -> None:
    """Create a new facet."""
    try:
        # create_facet derives the slug from the title and returns it.
        slug = create_facet(
            title,
            emoji=emoji,
            color=color,
            description=description,
            consent=consent,
        )
    except ValueError as exc:
        typer.echo(f"Error: {exc}", err=True)
        raise typer.Exit(1)
    typer.echo(f"Created facet '{slug}'.")
226
227
@facet_app.command()
def update(
    name: str = typer.Argument(help="Facet name to update."),
    title: str | None = typer.Option(None, "--title", help="New display title."),
    description: str | None = typer.Option(
        None, "--description", help="New description."
    ),
    emoji: str | None = typer.Option(None, "--emoji", help="New icon emoji."),
    color: str | None = typer.Option(None, "--color", help="New hex color."),
) -> None:
    """Update facet configuration."""
    # Collect only the fields the caller provided.
    updates = {
        field: value
        for field, value in (
            ("title", title),
            ("description", description),
            ("emoji", emoji),
            ("color", color),
        )
        if value is not None
    }

    if not updates:
        typer.echo(
            "Error: No fields to update. Use --title, --description, --emoji, or --color.",
            err=True,
        )
        raise typer.Exit(1)

    try:
        changed = update_facet(name, **updates)
    except FileNotFoundError:
        typer.echo(f"Error: Facet '{name}' not found.", err=True)
        raise typer.Exit(1)
    except ValueError as exc:
        typer.echo(f"Error: {exc}", err=True)
        raise typer.Exit(1)

    # update_facet reports which fields actually changed.
    if changed:
        typer.echo(f"Updated {', '.join(changed.keys())} for facet '{name}'.")
    else:
        typer.echo(f"No changes for facet '{name}'.")
270
271
@facet_app.command()
def rename(
    name: str = typer.Argument(help="Current facet name."),
    new_name: str = typer.Argument(help="New facet name."),
    consent: bool = typer.Option(
        False,
        "--consent",
        help="Assert that explicit user approval was obtained before calling this command (agent audit trail).",
    ),
) -> None:
    """Rename a facet.

    Delegates to :func:`rename_facet`, records the action in the call log
    (with the consent flag when asserted), and confirms on stdout.
    """
    try:
        rename_facet(name, new_name)
    except ValueError as e:
        typer.echo(f"Error: {e}", err=True)
        raise typer.Exit(1)
    # Audit trail: log under the *new* facet name so the entry stays reachable.
    params: dict = {"old_name": name, "new_name": new_name}
    if consent:
        params["consent"] = True
    log_call_action(facet=new_name, action="facet_rename", params=params)
    # Fix: confirm success on stdout, consistent with create/mute/unmute/delete,
    # which all echo a confirmation; previously this command was silent.
    typer.echo(f"Renamed facet '{name}' to '{new_name}'.")
292
293
@facet_app.command()
def mute(name: str = typer.Argument(help="Facet name to mute.")) -> None:
    """Mute a facet (hide from default listings)."""
    try:
        set_facet_muted(name, True)
    except FileNotFoundError:
        typer.echo(f"Error: Facet '{name}' not found.", err=True)
        raise typer.Exit(1)
    else:
        # Confirmation only when the mute actually applied.
        typer.echo(f"Facet '{name}' muted.")
303
304
@facet_app.command()
def unmute(name: str = typer.Argument(help="Facet name to unmute.")) -> None:
    """Unmute a facet (show in default listings)."""
    try:
        set_facet_muted(name, False)
    except FileNotFoundError:
        typer.echo(f"Error: Facet '{name}' not found.", err=True)
        raise typer.Exit(1)
    else:
        # Confirmation only when the unmute actually applied.
        typer.echo(f"Facet '{name}' unmuted.")
314
315
@facet_app.command()
def delete(
    name: str = typer.Argument(help="Facet name to delete."),
    yes: bool = typer.Option(False, "--yes", help="Skip confirmation."),
    consent: bool = typer.Option(
        False,
        "--consent",
        help="Assert that explicit user approval was obtained before calling this command (agent audit trail).",
    ),
) -> None:
    """Delete a facet and all its data."""
    # Guard clause: destructive operation requires an explicit --yes.
    if not yes:
        typer.echo(
            f"This will permanently delete facet '{name}' and all its data "
            "(entities, todos, events, logs, news).\n"
            "Use --yes to confirm."
        )
        raise typer.Exit(1)

    try:
        delete_facet(name, consent=consent)
    except FileNotFoundError:
        typer.echo(f"Error: Facet '{name}' not found.", err=True)
        raise typer.Exit(1)
    else:
        typer.echo(f"Deleted facet '{name}'.")
341
342
@facet_app.command("merge")
def merge(
    source: str = typer.Argument(help="Source facet to merge from (will be deleted)."),
    dest: str = typer.Option(..., "--into", help="Destination facet to merge into."),
    consent: bool = typer.Option(
        False,
        "--consent",
        help="Assert that explicit user approval was obtained before calling this command (agent audit trail).",
    ),
) -> None:
    """Merge all data from SOURCE facet into DEST facet, then delete SOURCE.

    Moves entities (merging relationships and deduplicating observations when
    the entity exists in both facets), re-creates open todos and non-cancelled
    calendar events under DEST (cancelling them in SOURCE with
    ``moved_to_facet``), copies news files that do not already exist in DEST,
    logs the action, deletes SOURCE, and kicks off a full index rescan.
    """
    # Lazy imports: keep heavy app modules out of CLI startup.
    from apps.calendar import event as event_module
    from apps.todos import todo as todo_module
    from think.entities.observations import load_observations, save_observations
    from think.entities.relationships import (
        load_facet_relationship,
        save_facet_relationship,
    )

    if source == dest:
        typer.echo("Error: Source and destination facets must be different.", err=True)
        raise typer.Exit(1)

    journal = Path(get_journal())
    src_path = journal / "facets" / source
    dst_path = journal / "facets" / dest

    if not src_path.is_dir():
        typer.echo(f"Error: Facet '{source}' not found.", err=True)
        raise typer.Exit(1)
    if not dst_path.is_dir():
        typer.echo(f"Error: Facet '{dest}' not found.", err=True)
        raise typer.Exit(1)

    entity_slugs = scan_facet_relationships(source)

    # Collect open (not completed, not cancelled) todos from SOURCE, keyed by
    # (day, line index, item) so they can be re-created in DEST and cancelled
    # in SOURCE afterwards.
    open_todos: list[tuple[str, int, todo_module.TodoItem]] = []
    todos_dir = src_path / "todos"
    if todos_dir.is_dir():
        for todo_file in sorted(todos_dir.glob("*.jsonl")):
            checklist = todo_module.TodoChecklist.load(todo_file.stem, source)
            for item in checklist.items:
                if not item.completed and not item.cancelled:
                    open_todos.append((todo_file.stem, item.index, item))

    # Same collection pass for non-cancelled calendar events.
    open_events: list[tuple[str, int, event_module.CalendarEvent]] = []
    calendar_dir = src_path / "calendar"
    if calendar_dir.is_dir():
        for calendar_file in sorted(calendar_dir.glob("*.jsonl")):
            event_day = event_module.EventDay.load(calendar_file.stem, source)
            for item in event_day.items:
                if not item.cancelled:
                    open_events.append((calendar_file.stem, item.index, item))

    # News files are copied only when DEST does not already have that day.
    news_to_copy: list[tuple[Path, Path]] = []
    src_news_dir = src_path / "news"
    dst_news_dir = dst_path / "news"
    if src_news_dir.is_dir():
        for news_file in sorted(src_news_dir.glob("*.md")):
            dest_file = dst_news_dir / news_file.name
            if not dest_file.exists():
                news_to_copy.append((news_file, dest_file))

    typer.echo(
        f"Merging '{source}' into '{dest}': "
        f"{len(entity_slugs)} entities, {len(open_todos)} open todos, "
        f"{len(open_events)} calendar events, {len(news_to_copy)} news files. "
        f"This cannot be undone. Proceeding..."
    )

    # Entities: move wholesale when absent in DEST; otherwise merge
    # relationships (DEST values win on key conflicts) and append any
    # observations DEST does not already have, then remove the SOURCE copy.
    for entity_id in entity_slugs:
        src_dir = src_path / "entities" / entity_id
        dst_dir = dst_path / "entities" / entity_id
        if dst_dir.exists():
            src_rel = load_facet_relationship(source, entity_id)
            dst_rel = load_facet_relationship(dest, entity_id)
            if src_rel is not None or dst_rel is not None:
                merged_rel = {**(src_rel or {}), **(dst_rel or {})}
                save_facet_relationship(dest, entity_id, merged_rel)

            src_obs = load_observations(source, entity_id)
            dst_obs = load_observations(dest, entity_id)
            # Observations are deduplicated by (content, observed_at).
            seen = {(o.get("content", ""), o.get("observed_at")) for o in dst_obs}
            merged_obs = list(dst_obs)
            for observation in src_obs:
                key = (
                    observation.get("content", ""),
                    observation.get("observed_at"),
                )
                if key not in seen:
                    merged_obs.append(observation)
                    seen.add(key)
            save_observations(dest, entity_id, merged_obs)
            shutil.rmtree(str(src_dir))
        else:
            dst_dir.parent.mkdir(parents=True, exist_ok=True)
            shutil.move(str(src_dir), str(dst_dir))

    # Todos: append a copy in DEST, then cancel the original in SOURCE.
    # The captured_* locals pin loop values for the closures; each closure is
    # invoked within the same iteration it is defined in.
    for day, line_number, item in open_todos:
        captured_item = item

        def _append_todo(
            checklist: todo_module.TodoChecklist,
        ) -> tuple[todo_module.TodoChecklist, todo_module.TodoItem]:
            new_item = checklist.append_entry(
                captured_item.text,
                captured_item.nudge,
                created_at=captured_item.created_at,
            )
            return checklist, new_item

        captured_line_number = line_number
        captured_dest = dest

        def _cancel_todo(
            checklist: todo_module.TodoChecklist,
        ) -> tuple[todo_module.TodoChecklist, todo_module.TodoItem]:
            cancelled_item = checklist.cancel_entry(
                captured_line_number,
                cancelled_reason="moved_to_facet",
                moved_to=captured_dest,
            )
            return checklist, cancelled_item

        todo_module.TodoChecklist.locked_modify(day, dest, _append_todo)
        todo_module.TodoChecklist.locked_modify(day, source, _cancel_todo)

    # Calendar events: same append-then-cancel pattern as todos.
    for day, line_number, item in open_events:
        captured_item = item

        def _append_event(
            event_day: event_module.EventDay,
        ) -> tuple[event_module.EventDay, event_module.CalendarEvent]:
            new_item = event_day.append_event(
                captured_item.title,
                captured_item.start,
                captured_item.end,
                captured_item.summary,
                captured_item.participants,
                created_at=captured_item.created_at,
            )
            return event_day, new_item

        captured_line_number = line_number
        captured_dest = dest

        def _cancel_event(
            event_day: event_module.EventDay,
        ) -> tuple[event_module.EventDay, event_module.CalendarEvent]:
            cancelled_item = event_day.cancel_event(
                captured_line_number,
                cancelled_reason="moved_to_facet",
                moved_to=captured_dest,
            )
            return event_day, cancelled_item

        event_module.EventDay.locked_modify(day, dest, _append_event)
        event_module.EventDay.locked_modify(day, source, _cancel_event)

    if news_to_copy:
        dst_news_dir.mkdir(parents=True, exist_ok=True)
        for src_file, dest_file in news_to_copy:
            shutil.copy2(src_file, dest_file)

    # Audit trail with merge statistics.
    params: dict[str, object] = {
        "source": source,
        "dest": dest,
        "entity_count": len(entity_slugs),
        "todo_count": len(open_todos),
        "calendar_count": len(open_events),
        "news_count": len(news_to_copy),
    }
    if consent:
        params["consent"] = True
    log_call_action(facet=None, action="facet_merge", params=params)

    delete_facet(source)

    # Fire-and-forget full rescan so the index reflects the merge.
    subprocess.run(
        ["sol", "indexer", "--rescan-full"],
        check=False,
        capture_output=True,
    )

    typer.echo(f"Merged '{source}' into '{dest}'. Index rebuild started.")
528
529
@app.command()
def news(
    name: str | None = typer.Argument(
        default=None, help="Facet name (default: SOL_FACET env)."
    ),
    day: str | None = typer.Option(None, "--day", "-d", help="Specific day YYYYMMDD."),
    limit: int = typer.Option(5, "--limit", "-n", help="Max days to show."),
    cursor: str | None = typer.Option(None, "--cursor", help="Pagination cursor."),
    write: bool = typer.Option(False, "--write", "-w", help="Write news from stdin."),
) -> None:
    """Read or write facet news."""
    name = resolve_sol_facet(name)

    if write:
        # Write mode: persist stdin markdown as the day's news file.
        day = resolve_sol_day(day)
        markdown = sys.stdin.read()
        if not markdown.strip():
            typer.echo("Error: no content provided on stdin.", err=True)
            raise typer.Exit(1)

        facet_path = Path(get_journal()) / "facets" / name
        if not facet_path.exists():
            typer.echo(f"Error: facet '{name}' not found.", err=True)
            raise typer.Exit(1)

        news_dir = facet_path / "news"
        news_dir.mkdir(exist_ok=True)
        (news_dir / f"{day}.md").write_text(markdown, encoding="utf-8")
        typer.echo(f"News for {day} saved to {name}.")
        return

    # Read mode: default to the current day when none was given.
    if day is None:
        from think.utils import get_sol_day

        day = get_sol_day()

    result = get_facet_news(name, cursor=cursor, limit=limit, day=day)
    entries = result.get("days", [])
    if not entries:
        typer.echo("No news found.")
        return
    for entry in entries:
        typer.echo(entry.get("raw_content", ""))
575
576
@app.command()
def agents(
    day: str | None = typer.Argument(
        default=None, help="Day YYYYMMDD (default: SOL_DAY env)."
    ),
    segment: str | None = typer.Option(
        None,
        "--segment",
        "-s",
        help="Segment key (HHMMSS_LEN, default: SOL_SEGMENT env).",
    ),
) -> None:
    """List available agent outputs for a day."""
    day = resolve_sol_day(day)
    segment = resolve_sol_segment(segment)
    day_path = Path(get_journal()) / day

    if not day_path.is_dir():
        typer.echo(f"No data for {day}.")
        return

    # With an explicit segment, list only that segment's outputs.
    if segment:
        seg_agents = day_path / segment / "agents"
        if not seg_agents.is_dir():
            typer.echo(f"Segment {segment} not found for {day}.")
            return
        _list_outputs(seg_agents, f"Segment {segment}")
        return

    # Otherwise: daily agent outputs first...
    daily_dir = day_path / "agents"
    if daily_dir.is_dir():
        _list_outputs(daily_dir, "Daily agents")

    # ...then every segment across all streams with its output names.
    segments = iter_segments(day)
    if segments:
        typer.echo(f"\nSegments: {len(segments)}")
        for stream_name, seg_key, seg_dir in segments:
            outputs = _get_output_names(seg_dir / "agents")
            label = f" {stream_name}/{seg_key}" if stream_name else f" {seg_key}"
            suffix = ", ".join(outputs) if outputs else "(no outputs)"
            typer.echo(f"{label}: {suffix}")
625
626
627def _get_output_names(directory: Path) -> list[str]:
628 """Get sorted list of output file basenames in a directory."""
629 names = []
630 if not directory.is_dir():
631 return names
632
633 for f in sorted(directory.iterdir()):
634 if f.is_file() and f.suffix in (".md", ".json", ".jsonl"):
635 names.append(f.name)
636 elif f.is_dir():
637 for nested in sorted(f.iterdir()):
638 if nested.is_file() and nested.suffix in (".md", ".json", ".jsonl"):
639 names.append(f"{f.name}/{nested.name}")
640 return names
641
642
def _list_outputs(directory: Path, label: str) -> None:
    """Echo each output file under *directory* with its size in bytes."""
    names = _get_output_names(directory)
    if not names:
        typer.echo(f"{label}: (none)")
        return
    typer.echo(f"{label}:")
    for entry in names:
        byte_count = (directory / entry).stat().st_size
        typer.echo(f" {entry} ({byte_count:,} bytes)")
653
654
@app.command()
def read(
    agent: str = typer.Argument(help="Agent name (e.g., flow, meetings, activity)."),
    day: str | None = typer.Option(
        None, "--day", "-d", help="Day YYYYMMDD (default: SOL_DAY env)."
    ),
    segment: str | None = typer.Option(
        None,
        "--segment",
        "-s",
        help="Segment key (HHMMSS_LEN, default: SOL_SEGMENT env).",
    ),
    max_bytes: int = typer.Option(
        16384, "--max", help="Max output bytes (0 = unlimited)."
    ),
) -> None:
    """Read full content of an agent output.

    Looks for ``<agent>.md``/``.json``/``.jsonl`` in the day's (or segment's)
    ``agents`` directory and prints it via ``truncated_echo`` (honoring
    *max_bytes*). Exits with status 1 when the day, directory, or output is
    missing, listing available outputs when there are any.
    """
    day = resolve_sol_day(day)
    segment = resolve_sol_segment(segment)
    journal = get_journal()
    day_path = Path(journal) / day

    if not day_path.is_dir():
        typer.echo(f"No data for {day}.", err=True)
        raise typer.Exit(1)

    # Segment outputs live at <day>/<segment>/agents; daily ones at <day>/agents.
    if segment:
        base_dir = day_path / segment / "agents"
    else:
        base_dir = day_path / "agents"

    if not base_dir.is_dir():
        location = f"segment {segment}" if segment else "agents"
        typer.echo(f"No {location} directory for {day}.", err=True)
        raise typer.Exit(1)

    # Try common extensions in preference order; first hit wins.
    for ext in (".md", ".json", ".jsonl"):
        candidate = base_dir / f"{agent}{ext}"
        if candidate.is_file():
            truncated_echo(candidate.read_text(encoding="utf-8"), max_bytes)
            return

    # Not found: list what is available to aid discovery.
    available = _get_output_names(base_dir)
    if available:
        typer.echo(
            f"Agent '{agent}' not found. Available: {', '.join(available)}", err=True
        )
    else:
        typer.echo(f"Agent '{agent}' not found and no outputs exist.", err=True)
    raise typer.Exit(1)
707
708
709# ============================================================================
710# Import Commands
711# ============================================================================
712
713
714def _derive_status(info: dict) -> str:
715 """Derive import status from info dict fields."""
716 if info.get("error"):
717 return "failed"
718 if info.get("processed"):
719 return "success"
720 if info.get("task_id"):
721 return "running"
722 return "pending"
723
724
725def _get_source_type(journal_root: Path, timestamp: str, info: dict) -> str:
726 """Get source type from manifest.json or infer from mime_type."""
727 manifest_path = journal_root / "imports" / timestamp / "manifest.json"
728 if manifest_path.exists():
729 try:
730 manifest = json.loads(manifest_path.read_text(encoding="utf-8"))
731 if manifest.get("source_type"):
732 return manifest["source_type"]
733 except Exception:
734 pass
735
736 # Infer from mime_type
737 mime = info.get("mime_type", "")
738 if "calendar" in mime or "ics" in mime:
739 return "ics"
740 if "zip" in mime:
741 return "archive"
742 if "audio" in mime:
743 return "audio"
744 if "text" in mime:
745 return "text"
746 return "unknown"
747
748
749def _get_entry_count(journal_root: Path, timestamp: str, info: dict) -> int | None:
750 """Get entry count from manifest.json or total_files_created."""
751 manifest_path = journal_root / "imports" / timestamp / "manifest.json"
752 if manifest_path.exists():
753 try:
754 manifest = json.loads(manifest_path.read_text(encoding="utf-8"))
755 if manifest.get("entry_count") is not None:
756 return manifest["entry_count"]
757 except Exception:
758 pass
759 count = info.get("total_files_created")
760 if count is not None and count > 0:
761 return count
762 return None
763
764
765def _match_import_id(timestamps: list[str], prefix: str) -> str | None:
766 """Match a partial import ID prefix to a full timestamp."""
767 matches = [t for t in timestamps if t.startswith(prefix)]
768 if len(matches) == 1:
769 return matches[0]
770 if len(matches) > 1:
771 # Check for exact match first
772 if prefix in matches:
773 return prefix
774 typer.echo(
775 f"Ambiguous prefix '{prefix}' matches {len(matches)} imports. "
776 "Be more specific.",
777 err=True,
778 )
779 raise typer.Exit(1)
780 return None
781
782
@app.command(name="imports")
def imports_list(
    limit: int = typer.Option(20, "--limit", "-n", help="Max results."),
    source: str | None = typer.Option(
        None, "--source", "-s", help="Filter by source type."
    ),
    json_output: bool = typer.Option(False, "--json", help="Output as JSON."),
) -> None:
    """List recent imports with metadata.

    Scans import timestamps newest-first, optionally filtered by source
    type, stopping once *limit* rows are collected. Emits either JSON or a
    one-line-per-import text listing.
    """
    journal_root = Path(get_journal())
    timestamps = list_import_timestamps(journal_root)

    if not timestamps:
        typer.echo("No imports found.")
        return

    # Newest first (timestamps sort lexicographically by date).
    timestamps.sort(reverse=True)

    # Build a display row for each import, honoring the source filter and limit.
    rows = []
    for ts in timestamps:
        info = build_import_info(journal_root, ts)
        source_type = _get_source_type(journal_root, ts, info)

        # Source filter is applied before counting toward the limit.
        if source and source_type != source:
            continue

        status = _derive_status(info)
        filename = info.get("original_filename", "unknown")
        entry_count = _get_entry_count(journal_root, ts, info)

        rows.append(
            {
                "timestamp": ts,
                "status": status,
                "source_type": source_type,
                "filename": filename,
                "entry_count": entry_count,
                "error": info.get("error"),
            }
        )

        if len(rows) >= limit:
            break

    if not rows:
        typer.echo("No imports found.")
        return

    if json_output:
        typer.echo(json.dumps(rows, indent=2))
        return

    # Text output: timestamp [status] type filename, plus error or entry count.
    for row in rows:
        parts = [f"{row['timestamp']} [{row['status']}]"]
        parts.append(f"{row['source_type']:8s}")
        parts.append(row["filename"])
        if row["status"] == "failed" and row.get("error"):
            parts.append(f"— error: {row['error']}")
        elif row.get("entry_count") is not None:
            parts.append(f"({row['entry_count']} entries)")
        typer.echo(" ".join(parts))
846
847
@app.command(name="import")
def import_detail(
    id: str = typer.Argument(help="Import ID or prefix (e.g. 20260309_143000)."),
) -> None:
    """Show full metadata for a single import.

    Accepts a unique timestamp prefix and prints a JSON document combining
    the derived status/source type with any detail files found.
    """
    journal_root = Path(get_journal())
    timestamps = list_import_timestamps(journal_root)

    if not timestamps:
        typer.echo("No imports found.", err=True)
        raise typer.Exit(1)

    # Match partial prefix; the helper itself exits on ambiguity.
    matched = _match_import_id(timestamps, id)
    if matched is None:
        typer.echo(f"Import '{id}' not found.", err=True)
        raise typer.Exit(1)

    info = build_import_info(journal_root, matched)
    info["status"] = _derive_status(info)
    info["source_type"] = _get_source_type(journal_root, matched, info)

    # Merge manifest and detail data (best effort: detail files may be absent).
    try:
        details = get_import_details(journal_root, matched)
        if details.get("import_json"):
            info["import_metadata"] = details["import_json"]
        if details.get("imported_json"):
            info["imported_results"] = details["imported_json"]
        if details.get("segments_json"):
            info["segments"] = details["segments_json"]
    except FileNotFoundError:
        pass

    # default=str keeps non-JSON-native values (paths, datetimes) printable.
    typer.echo(json.dumps(info, indent=2, default=str))
883
884
885# ============================================================================
886# Retention Commands
887# ============================================================================
888
889
890def _parse_age(value: str) -> int:
891 """Parse age string like '30d' or '30' to number of days."""
892 value = value.strip().lower()
893 if value.endswith("d"):
894 return int(value[:-1])
895 return int(value)
896
897
@retention_app.command()
def purge(
    older_than: str | None = typer.Option(
        None, "--older-than", help="Age threshold (e.g. 30d, 7d)."
    ),
    stream: str | None = typer.Option(
        None, "--stream", help="Only purge from this stream."
    ),
    dry_run: bool = typer.Option(
        False, "--dry-run", help="Show what would be deleted."
    ),
) -> None:
    """Purge raw media from completed segments.

    Delegates to ``think.retention.purge``; with ``--dry-run`` nothing is
    deleted and the report shows what would be freed.
    """
    # Lazy imports keep retention machinery out of CLI startup.
    from think.retention import _human_bytes, load_retention_config
    from think.retention import purge as run_purge

    older_than_days = _parse_age(older_than) if older_than else None
    config = load_retention_config()

    if dry_run:
        typer.echo("DRY RUN — no files will be deleted.\n")

    result = run_purge(
        older_than_days=older_than_days,
        stream_filter=stream,
        dry_run=dry_run,
        config=config,
    )

    # Per-segment breakdown before the aggregate summary line.
    if result.details:
        for detail in result.details:
            typer.echo(
                f" {detail['day']}/{detail['stream']}/{detail['segment']}: "
                f"{len(detail['files'])} files, {_human_bytes(detail['bytes_freed'])}"
            )
        typer.echo("")

    action = "Would delete" if dry_run else "Deleted"
    typer.echo(
        f"{action} {result.files_deleted} files, "
        f"freeing {_human_bytes(result.bytes_freed)}"
    )

    if result.segments_skipped_incomplete:
        typer.echo(
            f"Skipped {result.segments_skipped_incomplete} incomplete segments "
            "(processing not finished)."
        )

    if result.segments_skipped_policy:
        typer.echo(
            f"Skipped {result.segments_skipped_policy} segments "
            "(not yet eligible under retention policy)."
        )
952
953
@retention_app.command()
def config(
    mode: str | None = typer.Option(
        None, "--mode", help="Retention mode: keep, days, or processed."
    ),
    days: int | None = typer.Option(
        None, "--days", help="Days to retain (required when mode is 'days')."
    ),
    stream: str | None = typer.Option(
        None, "--stream", help="Apply to a specific stream instead of global."
    ),
    clear: bool = typer.Option(
        False, "--clear", help="Clear per-stream override (requires --stream)."
    ),
) -> None:
    """Show or update retention configuration.

    With no options, prints the effective configuration as JSON. Otherwise
    validates the requested change, applies it to the journal config (global
    or per-stream), persists ``config/journal.json`` with 0o600 permissions,
    logs the action, and prints the resulting configuration.
    """
    import os

    # Lazy imports; note get_journal is re-imported locally alongside get_config.
    from think.retention import load_retention_config
    from think.utils import get_config, get_journal

    # Read-only path: no options at all means "show current config".
    if mode is None and days is None and not clear:
        cfg = load_retention_config()
        result = {
            "default": {"mode": cfg.default.mode, "days": cfg.default.days},
            "per_stream": {
                name: {"mode": policy.mode, "days": policy.days}
                for name, policy in cfg.per_stream.items()
            },
        }
        typer.echo(json.dumps(result, indent=2))
        return

    # Validate option combinations before touching the config file.
    if clear:
        if not stream:
            typer.echo("--clear requires --stream", err=True)
            raise typer.Exit(1)
        if mode is not None or days is not None:
            typer.echo("--clear cannot be combined with --mode or --days", err=True)
            raise typer.Exit(1)

    if mode is not None and mode not in ("keep", "days", "processed"):
        typer.echo(
            f"Invalid mode: {mode}. Must be keep, days, or processed.", err=True
        )
        raise typer.Exit(1)

    if mode == "days" and days is None:
        typer.echo("--days is required when mode is 'days'.", err=True)
        raise typer.Exit(1)

    if days is not None and days < 1:
        typer.echo("--days must be a positive integer.", err=True)
        raise typer.Exit(1)

    journal_config = get_config()
    retention = journal_config.setdefault("retention", {})

    # Apply the change: clear an override, set a per-stream policy,
    # or set the global policy.
    if clear:
        ps = retention.get("per_stream", {})
        if stream in ps:
            del ps[stream]
        # Drop the per_stream mapping entirely once it is empty.
        if not ps:
            retention.pop("per_stream", None)
        log_call_action(
            facet=None,
            action="retention_config",
            params={"stream": stream, "clear": True},
        )
    elif stream:
        ps = retention.setdefault("per_stream", {})
        entry = ps.setdefault(stream, {})
        if mode is not None:
            entry["raw_media"] = mode
        if days is not None:
            entry["raw_media_days"] = days
        log_call_action(
            facet=None,
            action="retention_config",
            params={"stream": stream, "mode": mode, "days": days},
        )
    else:
        if mode is not None:
            retention["raw_media"] = mode
        if days is not None:
            retention["raw_media_days"] = days
        log_call_action(
            facet=None,
            action="retention_config",
            params={"mode": mode, "days": days},
        )

    # Persist with restrictive permissions (config may contain sensitive data).
    config_dir = Path(get_journal()) / "config"
    config_dir.mkdir(parents=True, exist_ok=True)
    config_path = config_dir / "journal.json"

    with open(config_path, "w", encoding="utf-8") as f:
        json.dump(journal_config, f, indent=2, ensure_ascii=False)
        f.write("\n")
    os.chmod(config_path, 0o600)

    # Echo the effective configuration after the change.
    cfg = load_retention_config()
    result = {
        "default": {"mode": cfg.default.mode, "days": cfg.default.days},
        "per_stream": {
            name: {"mode": policy.mode, "days": policy.days}
            for name, policy in cfg.per_stream.items()
        },
    }
    typer.echo(json.dumps(result, indent=2))
1064
1065
@app.command(name="storage-summary")
def storage_summary(
    json_output: bool = typer.Option(False, "--json", help="Output as JSON."),
    check: bool = typer.Option(
        False, "--check", help="Check storage health thresholds."
    ),
) -> None:
    """Show journal storage summary.

    Prints raw-media and derived-content sizes plus segment counts, either
    as human-readable text or as JSON (``--json``). With ``--check``, also
    evaluates storage health thresholds and appends any warnings to the
    output (a ``warnings`` key in JSON; warning lines or an all-OK note in
    text mode).
    """
    from think.retention import compute_storage_summary

    summary = compute_storage_summary()

    # Warnings are computed only when --check is given; `None` means the
    # check was not requested (distinct from an empty warning list).
    warnings: list | None = None
    if check:
        from think.retention import check_storage_health
        from think.utils import get_journal

        warnings = check_storage_health(summary, get_journal())

    if json_output:
        payload = {
            "raw_media_bytes": summary.raw_media_bytes,
            "derived_bytes": summary.derived_bytes,
            "total_segments": summary.total_segments,
            "segments_with_raw": summary.segments_with_raw,
            "segments_purged": summary.segments_purged,
        }
        if warnings is not None:
            payload["warnings"] = warnings
        typer.echo(json.dumps(payload, indent=2))
        return

    typer.echo(f"Raw media: {summary.raw_media_human}")
    typer.echo(f"AI-processed content: {summary.derived_human}")
    typer.echo(
        f"Segments: {summary.total_segments} total, "
        f"{summary.segments_with_raw} with raw media, "
        f"{summary.segments_purged} purged"
    )
    if warnings is not None:
        if warnings:
            typer.echo("")
            for w in warnings:
                typer.echo(f"⚠ {w['message']}")
        else:
            typer.echo("\nAll storage thresholds OK.")
1137
1138
@app.command("merge")
def journal_merge(
    source: str = typer.Argument(
        help="Path to source journal directory to merge from."
    ),
    dry_run: bool = typer.Option(
        False,
        "--dry-run",
        help="Show what would be merged without making changes.",
    ),
) -> None:
    """Merge segments, entities, facets, and imports from a source journal.

    Validates that *source* looks like a journal root (contains at least one
    ``YYYYMMDD`` day directory), runs the merge (or a dry run), prints a
    per-category summary plus any errors, and — unless ``--dry-run`` —
    kicks off a full index rescan and records the action in the call log.
    Merge artifacts (decision log, staged entities) are written next to the
    target journal under a per-run timestamped directory.
    """
    import re

    from think.merge import MergeSummary, merge_journals
    from think.utils import get_journal

    source_path = Path(source).resolve()

    if not source_path.is_dir():
        typer.echo(f"Error: '{source}' is not a directory.", err=True)
        raise typer.Exit(1)

    # A journal root is identified by the presence of YYYYMMDD day dirs.
    has_day = any(
        entry.is_dir() and re.match(r"^\d{8}$", entry.name)
        for entry in source_path.iterdir()
    )
    if not has_day:
        typer.echo(
            f"Error: '{source}' does not appear to be a journal (no YYYYMMDD directories found).",
            err=True,
        )
        raise typer.Exit(1)

    target_path = Path(get_journal())
    # UTC timestamp keys the artifact directory so repeated runs never collide.
    run_id = datetime.now(timezone.utc).strftime("%Y%m%dT%H%M%SZ")
    artifact_root = target_path.parent / f"{target_path.name}.merge" / run_id
    log_path = artifact_root / "decisions.jsonl"
    staging_path = artifact_root / "staging"

    summary: MergeSummary = merge_journals(
        source_path,
        target_path,
        dry_run=dry_run,
        log_path=log_path,
        staging_path=staging_path,
    )

    action = "Would merge" if dry_run else "Merged"
    typer.echo(f"\n{action}:")
    typer.echo(
        f"  Segments: {summary.segments_copied} copied, {summary.segments_skipped} skipped, {summary.segments_errored} errored"
    )
    typer.echo(
        f"  Entities: {summary.entities_created} created, {summary.entities_merged} merged, {summary.entities_staged} staged, {summary.entities_skipped} skipped"
    )
    typer.echo(
        f"  Facets: {summary.facets_created} created, {summary.facets_merged} merged"
    )
    typer.echo(
        f"  Imports: {summary.imports_copied} copied, {summary.imports_skipped} skipped"
    )

    if summary.errors:
        typer.echo(f"\n{len(summary.errors)} errors:")
        for error in summary.errors:
            typer.echo(f"  - {error}")

    if log_path.exists():
        typer.echo(f"\nDecision log: {log_path}")
    if summary.entities_staged > 0:
        typer.echo(f"Staged entities: {staging_path}")

    if not dry_run:
        # Announce before launching: subprocess.run blocks until the rescan
        # finishes, so printing "started" afterwards would be misleading.
        typer.echo("Index rebuild started.")
        rescan = subprocess.run(
            ["sol", "indexer", "--rescan-full"],
            check=False,
            capture_output=True,
        )
        # Merge already succeeded, so a failed rescan is a warning, not a
        # fatal error — but don't swallow it silently.
        if rescan.returncode != 0:
            typer.echo(
                f"Warning: index rescan exited with code {rescan.returncode}.",
                err=True,
            )

        log_call_action(
            facet=None,
            action="journal_merge",
            params={
                "source": str(source_path),
                "segments_copied": summary.segments_copied,
                "entities_created": summary.entities_created,
                "entities_merged": summary.entities_merged,
                "entities_staged": summary.entities_staged,
                "facets_created": summary.facets_created,
                "facets_merged": summary.facets_merged,
                "imports_copied": summary.imports_copied,
                "errors": len(summary.errors),
            },
        )