personal memory agent — day/segment clustering utilities (transcripts, screen percepts, agent outputs)
# SPDX-License-Identifier: AGPL-3.0-only
# Copyright (c) 2026 sol pbc

import os
import re
import sys
from collections import Counter, defaultdict
from datetime import datetime, timedelta
from pathlib import Path
from typing import Any

from observe.screen import format_screen_text

from .streams import read_segment_stream
from .utils import day_from_path, day_path


def _date_str(day_dir: str) -> str:
    """Return the YYYYMMDD basename of *day_dir*, validating its format.

    Raises:
        ValueError: If the directory name is not exactly eight digits.
    """
    base = os.path.basename(os.path.normpath(day_dir))
    if not re.fullmatch(r"\d{8}", base):
        raise ValueError("day_dir must end with YYYYMMDD")
    return base


def _filename_to_agent_key(filename: str) -> str:
    """Convert output filename stem to agent key.

    Reverse of get_output_name(): converts filesystem names back to agent keys.

    Args:
        filename: Filename stem (e.g., "entities" or "_todos_review")

    Returns:
        Agent key (e.g., "entities" or "todos:review")
    """
    if filename.startswith("_"):
        # App agent: "_app_name" -> "app:name"
        parts = filename[1:].split("_", 1)
        if len(parts) == 2:
            return f"{parts[0]}:{parts[1]}"
    return filename


def _agent_matches_filter(
    filename: str, agent_filter: dict[str, bool | str] | None
) -> bool:
    """Check if an agent output file matches the filter.

    Args:
        filename: Filename stem (e.g., "entities" or "_todos_review")
        agent_filter: Dict mapping agent keys to bool/"required", or None for all

    Returns:
        True if the file should be included
    """
    if agent_filter is None:
        # None means include all agents
        return True

    if not agent_filter:
        # Empty dict means no agents
        return False

    agent_key = _filename_to_agent_key(filename)

    # Only agents explicitly enabled (True or "required") are included;
    # anything absent from the filter is excluded.
    if agent_key in agent_filter:
        value = agent_filter[agent_key]
        return value is True or value == "required"

    return False


def _transcript_header(stream: str | None, filename: str) -> str:
    """Generate contextual header for a transcript entry.

    Maps known import streams to descriptive labels instead of a hardcoded
    "Audio Transcript" header.

    Args:
        stream: Stream name from the segment marker, or None.
        filename: Entry name; currently unused, kept for interface stability.
    """
    _IMPORT_LABELS = {
        "import.chatgpt": "ChatGPT Conversation",
        "import.claude": "Claude Conversation",
        "import.gemini": "Gemini Conversation",
        "import.ics": "Calendar Event",
        "import.obsidian": "Note",
        "import.kindle": "Highlights",
    }
    if stream and stream in _IMPORT_LABELS:
        return _IMPORT_LABELS[stream]
    return "Transcript"


def _process_segment(
    segment_path: Path,
    date_str: str,
    transcripts: bool,
    percepts: bool,
    agents: bool | dict[str, bool | str],
) -> list[dict[str, Any]]:
    """Process a single segment directory and return entries.

    Args:
        segment_path: Path to segment directory
        date_str: Date in YYYYMMDD format
        transcripts: Whether to load transcript content (JSONL and markdown)
        percepts: Whether to load raw screen data from *screen.jsonl files
        agents: Whether to load agent output summaries from *.md files.
            Can be bool (all/none) or dict for selective filtering
            (e.g., {"entities": True, "meetings": "required"}).

    Returns:
        List of entry dicts with timestamp, segment_key, prefix, content, name, etc.
    """
    from think.utils import segment_parse

    entries: list[dict[str, Any]] = []

    start_time, end_time = segment_parse(segment_path.name)
    if not start_time or not end_time:
        return entries

    # Read stream identity
    marker = read_segment_stream(segment_path)
    stream = marker.get("stream") if marker else None

    # Compute segment times
    segment_key = segment_path.name
    day_date = datetime.strptime(date_str, "%Y%m%d").date()
    segment_start = datetime.combine(day_date, start_time)
    segment_end = datetime.combine(day_date, end_time)

    # Process transcript content (JSONL: legacy *audio.jsonl + new *_transcript.jsonl)
    if transcripts:
        # Hoisted out of the file loop: the import is loop-invariant.
        from observe.hear import load_transcript

        jsonl_files = set()
        for pattern in ("*audio.jsonl", "*_transcript.jsonl"):
            jsonl_files.update(f for f in segment_path.glob(pattern) if f.is_file())
        for jsonl_file in sorted(jsonl_files):
            metadata, transcript_entries, formatted_text = load_transcript(
                str(jsonl_file)
            )
            if transcript_entries is None:
                print(
                    f"Warning: Could not load transcript {jsonl_file.name}: {metadata.get('error')}",
                    file=sys.stderr,
                )
                continue

            entries.append(
                {
                    "timestamp": segment_start,
                    "segment_key": segment_key,
                    "segment_start": segment_start,
                    "segment_end": segment_end,
                    "prefix": "transcript",
                    "content": formatted_text,
                    "name": f"{segment_path.name}/{jsonl_file.name}",
                    "stream": stream,
                }
            )

        # Process markdown transcript files (*_transcript.md + legacy imported.md)
        md_files = set()
        for pattern in ("*_transcript.md", "imported.md"):
            md_files.update(f for f in segment_path.glob(pattern) if f.is_file())
        for md_file in sorted(md_files):
            try:
                content = md_file.read_text()
                if content.strip():
                    entries.append(
                        {
                            "timestamp": segment_start,
                            "segment_key": segment_key,
                            "segment_start": segment_start,
                            "segment_end": segment_end,
                            "prefix": "transcript",
                            "content": content,
                            "name": f"{segment_path.name}/{md_file.name}",
                            "stream": stream,
                        }
                    )
            except Exception as e:  # pragma: no cover - warning only
                print(
                    f"Warning: Could not read transcript {md_file.name}: {e}",
                    file=sys.stderr,
                )

    # Process raw screen data from screen.jsonl and *_screen.jsonl
    if percepts:
        screen_files = list(segment_path.glob("*screen.jsonl"))
        for screen_jsonl in screen_files:
            try:
                content = format_screen_text(screen_jsonl)
                if content:
                    entries.append(
                        {
                            "timestamp": segment_start,
                            "segment_key": segment_key,
                            "segment_start": segment_start,
                            "segment_end": segment_end,
                            "prefix": "percept",
                            "content": content,
                            "name": f"{segment_path.name}/{screen_jsonl.name}",
                            "stream": stream,
                        }
                    )
            except Exception as e:  # pragma: no cover - warning only
                print(
                    f"Warning: Could not read JSONL file {screen_jsonl.name}: {e}",
                    file=sys.stderr,
                )

    # Process agent output summaries from agents/**/*.md files (with optional filtering)
    if agents:
        # Convert bool to filter: True -> None (all), False handled by outer if
        agent_filter = (
            None if agents is True else agents if isinstance(agents, dict) else None
        )
        agents_dir = segment_path / "agents"
        if agents_dir.is_dir():
            for md_file in sorted(agents_dir.rglob("*.md")):
                if not md_file.is_file():
                    continue

                # Check if this agent matches the filter
                if not _agent_matches_filter(md_file.stem, agent_filter):
                    continue

                try:
                    content = md_file.read_text()
                    if content.strip():
                        rel_md_path = md_file.relative_to(agents_dir).as_posix()
                        entries.append(
                            {
                                "timestamp": segment_start,
                                "segment_key": segment_key,
                                "segment_start": segment_start,
                                "segment_end": segment_end,
                                "prefix": "agent_output",
                                "output_name": md_file.stem,
                                "content": content,
                                "name": f"{segment_path.name}/agents/{rel_md_path}",
                                "stream": stream,
                            }
                        )
                except Exception as e:  # pragma: no cover - warning only
                    print(
                        f"Warning: Could not read file {md_file.name}: {e}",
                        file=sys.stderr,
                    )

    return entries


def _load_entries(
    day_dir: str,
    transcripts: bool,
    percepts: bool,
    agents: bool | dict[str, bool | str],
) -> list[dict[str, Any]]:
    """Load all transcript entries from a day directory, sorted by timestamp."""
    # Merged the two separate think.utils imports into one statement.
    from think.utils import iter_segments, segment_parse

    date_str = _date_str(day_dir)
    entries: list[dict[str, Any]] = []
    day_path_obj = Path(day_dir)

    for _stream, _seg_key, seg_path in iter_segments(day_path_obj):
        start_time, _ = segment_parse(seg_path.name)
        if not start_time:
            continue
        entries.extend(
            _process_segment(seg_path, date_str, transcripts, percepts, agents)
        )

    entries.sort(key=lambda e: e["timestamp"])
    return entries


def _group_entries(
    entries: list[dict[str, Any]],
) -> dict[str, list[dict[str, Any]]]:
    """Group entries by segment key.

    Returns dict mapping segment_key to list of entries for that segment.
    """
    grouped: dict[str, list[dict[str, Any]]] = defaultdict(list)
    for e in entries:
        grouped[e["segment_key"]].append(e)
    return grouped


def _count_by_source(entries: list[dict[str, Any]]) -> dict[str, int]:
    """Count entries by source type (prefix).

    Maps the internal prefix names to source config names:
    - "transcript" -> "transcripts"
    - "percept" -> "percepts"
    - "agent_output" -> "agents"

    Returns:
        Dict with counts for each source type, e.g., {"transcripts": 2, "percepts": 1, "agents": 0}
    """
    # Map internal prefix to source config name
    prefix_to_source = {
        "transcript": "transcripts",
        "percept": "percepts",
        "agent_output": "agents",
    }

    counts = Counter(prefix_to_source.get(e["prefix"], e["prefix"]) for e in entries)

    # Ensure all standard sources are present (even if 0)
    return {
        "transcripts": counts.get("transcripts", 0),
        "percepts": counts.get("percepts", 0),
        "agents": counts.get("agents", 0),
    }


def _groups_to_markdown(groups: dict[str, list[dict[str, Any]]]) -> str:
    """Render grouped entries as markdown with segment-based headers."""
    lines: list[str] = []

    # Sort by segment start time (entries within each group have same segment_start)
    def sort_key(segment_key: str) -> datetime:
        entries = groups[segment_key]
        return entries[0]["segment_start"] if entries else datetime.min

    for segment_key in sorted(groups.keys(), key=sort_key):
        segment_entries = groups[segment_key]
        if not segment_entries:
            continue

        # Use segment times from first entry (all entries in group share same segment)
        segment_start = segment_entries[0]["segment_start"]
        segment_end = segment_entries[0]["segment_end"]
        lines.append(
            f"## {segment_start.strftime('%Y-%m-%d %H:%M:%S')} - {segment_end.strftime('%H:%M:%S')}"
        )
        lines.append("")

        for entry in segment_entries:
            if entry["prefix"] == "transcript":
                header = _transcript_header(entry.get("stream"), entry.get("name", ""))
                lines.append(f"### {header}")
                lines.append(entry["content"].strip())
                lines.append("")
            elif entry["prefix"] == "percept":
                lines.append("### Screen Activity")
                lines.append(entry["content"].strip())
                lines.append("")
            elif entry["prefix"] == "agent_output":
                output_name = entry.get("output_name", "output")
                lines.append(f"### {output_name} summary")
                lines.append(entry["content"].strip())
                lines.append("")

    return "\n".join(lines)


def _slots_to_ranges(slots: list[datetime]) -> list[tuple[str, str]]:
    """Collapse 15-minute slots into start/end pairs.

    Args:
        slots: Sorted list of datetimes marking 15-minute interval starts.

    Returns:
        List of (start, end) time strings in ``HH:MM`` format representing
        contiguous 15-minute ranges.
    """

    ranges: list[tuple[str, str]] = []
    if not slots:
        return ranges

    start = slots[0]
    prev = slots[0]
    for current in slots[1:]:
        if current - prev == timedelta(minutes=15):
            prev = current
            continue
        # Gap found: close out the current contiguous run.
        ranges.append(
            (start.strftime("%H:%M"), (prev + timedelta(minutes=15)).strftime("%H:%M"))
        )
        start = prev = current

    ranges.append(
        (start.strftime("%H:%M"), (prev + timedelta(minutes=15)).strftime("%H:%M"))
    )
    return ranges


def _detect_content_types(seg_path: Path) -> list[str]:
    """Detect content types present in a segment directory ("audio"/"screen")."""
    types = []
    if (
        (seg_path / "audio.jsonl").exists()
        or any(seg_path.glob("*_audio.jsonl"))
        or any(seg_path.glob("*_transcript.jsonl"))
        or any(seg_path.glob("*_transcript.md"))
        or (seg_path / "imported.md").exists()
    ):
        types.append("audio")
    if (seg_path / "screen.jsonl").exists() or any(seg_path.glob("*_screen.jsonl")):
        types.append("screen")
    return types


def scan_day(
    day: str,
) -> tuple[list[tuple[str, str]], list[tuple[str, str]], list[dict[str, Any]]]:
    """Single-pass scan returning both range aggregation and segment list.

    Combines the work of ``cluster_scan()`` and ``cluster_segments()``
    into one ``iter_segments()`` traversal.

    Args:
        day: Day folder in ``YYYYMMDD`` format.

    Returns:
        Tuple of (audio_ranges, screen_ranges, segments) where ranges are
        ``(start, end)`` pairs in ``HH:MM`` format and segments is a list
        of dicts with ``key``, ``start``, ``end``, ``types``, and ``stream``.
    """
    from think.utils import iter_segments, segment_parse

    day_dir = day_path(day, create=False)
    if not day_dir.is_dir():
        return [], [], []

    date_str = _date_str(str(day_dir))
    day_date = datetime.strptime(date_str, "%Y%m%d").date()
    transcript_slots: set[datetime] = set()
    percept_slots: set[datetime] = set()
    segments: list[dict[str, Any]] = []

    for stream_name, _, seg_path in iter_segments(day_dir):
        start_time, end_time = segment_parse(seg_path.name)

        types = _detect_content_types(seg_path) if start_time else []

        if start_time and types:
            # Round the segment start down to its 15-minute slot.
            dt = datetime.combine(day_date, start_time)
            slot = dt.replace(
                minute=dt.minute - (dt.minute % 15), second=0, microsecond=0
            )
            if "audio" in types:
                transcript_slots.add(slot)
            if "screen" in types:
                percept_slots.add(slot)

        if start_time and end_time and types:
            segments.append(
                {
                    "key": seg_path.name,
                    "start": start_time.strftime("%H:%M"),
                    "end": end_time.strftime("%H:%M"),
                    "types": types,
                    "stream": stream_name,
                }
            )

    audio_ranges = _slots_to_ranges(sorted(transcript_slots))
    screen_ranges = _slots_to_ranges(sorted(percept_slots))
    segments.sort(key=lambda s: s["start"])
    return audio_ranges, screen_ranges, segments


def cluster_scan(day: str) -> tuple[list[tuple[str, str]], list[tuple[str, str]]]:
    """Return 15-minute ranges with transcript and screen content for ``day``.

    Args:
        day: Day folder in ``YYYYMMDD`` format.

    Returns:
        Two lists containing ``(start, end)`` pairs (``HH:MM``) for transcript and
        screen content respectively.
    """

    audio_ranges, screen_ranges, _ = scan_day(day)
    return audio_ranges, screen_ranges


def cluster_segments(day: str) -> list[dict[str, Any]]:
    """Return individual recording segments for a day with their content types.

    Unlike ``cluster_scan()`` which collapses segments into 15-minute ranges,
    this returns actual segment directories with their precise times.

    Args:
        day: Day folder in ``YYYYMMDD`` format.

    Returns:
        List of dicts with segment info:
        - key: segment directory name (HHMMSS_LEN format)
        - start: start time as HH:MM
        - end: end time as HH:MM
        - types: list of content types present ("audio", "screen", or both)
    """
    _, _, segments = scan_day(day)
    return segments


def _find_segment_dir(day: str, segment: str, stream: str | None) -> Path | None:
    """Locate a segment directory, optionally searching across streams.

    Args:
        day: Day in YYYYMMDD format
        segment: Segment key in HHMMSS_LEN format
        stream: Stream name. If None, searches all streams under the day.

    Returns:
        Path to the segment directory, or None if not found.
    """
    from think.utils import segment_path as _segment_path

    if stream:
        path = _segment_path(day, segment, stream)
        return path if path.is_dir() else None

    # Search all streams for this segment
    from think.utils import iter_segments

    for _s, _k, seg_path in iter_segments(day):
        if seg_path.name == segment:
            return seg_path
    return None


def cluster(
    day: str,
    sources: dict[str, bool | str | dict],
) -> tuple[str, dict[str, int]]:
    """Return Markdown summary for one day's JSON files and counts by source.

    Args:
        day: Day in YYYYMMDD format
        sources: Dict with keys "transcripts", "percepts", "agents".
            Values can be bool, "required" string, or dict (for agents).
            The "agents" source can be a dict for selective filtering,
            e.g., {"entities": True, "meetings": "required"}.

    Returns:
        Tuple of (markdown, source_counts) where source_counts is a dict
        with keys "transcripts", "percepts", "agents" mapping to entry counts.
    """
    # Fixed: key was "screen", inconsistent with _count_by_source() and the
    # documented "percepts" key, so callers hit KeyError on the empty path.
    empty_counts = {"transcripts": 0, "percepts": 0, "agents": 0}

    day_dir = str(day_path(day))
    # day_path now ensures dir exists, but check anyway for safety
    if not os.path.isdir(day_dir):
        return f"Day folder not found: {day_dir}", empty_counts

    entries = _load_entries(
        day_dir,
        transcripts=sources.get("transcripts", False),
        percepts=sources.get("percepts", False),
        agents=sources.get("agents", False),
    )
    if not entries:
        return (
            f"No transcript or screen files found for date {day} in {day_dir}.",
            empty_counts,
        )

    groups = _group_entries(entries)
    markdown = _groups_to_markdown(groups)
    return markdown, _count_by_source(entries)


def cluster_period(
    day: str,
    segment: str,
    sources: dict[str, bool | str | dict],
    stream: str | None = None,
) -> tuple[str, dict[str, int]]:
    """Return Markdown summary for one segment's JSON files and counts by source.

    Args:
        day: Day in YYYYMMDD format
        segment: Segment key in HHMMSS_LEN format (e.g., "163045_300")
        sources: Dict with keys "transcripts", "percepts", "agents".
            Values can be bool, "required" string, or dict (for agents).
        stream: Stream name. If None, searches all streams for the segment.

    Returns:
        Tuple of (markdown, source_counts) where source_counts is a dict
        with keys "transcripts", "percepts", "agents" mapping to entry counts.
    """
    # Fixed: key was "screen"; must match _count_by_source()'s "percepts".
    empty_counts = {"transcripts": 0, "percepts": 0, "agents": 0}

    segment_dir = _find_segment_dir(day, segment, stream)

    if segment_dir is None or not segment_dir.is_dir():
        return f"Segment folder not found: {day}/{segment}", empty_counts

    entries = _load_entries_from_segment(
        str(segment_dir),
        transcripts=sources.get("transcripts", False),
        percepts=sources.get("percepts", False),
        agents=sources.get("agents", False),
    )
    if not entries:
        return (
            f"No transcript or screen files found for segment {segment}",
            empty_counts,
        )

    groups = _group_entries(entries)
    markdown = _groups_to_markdown(groups)
    return markdown, _count_by_source(entries)


def _load_entries_from_segment(
    segment_dir: str,
    transcripts: bool,
    percepts: bool,
    agents: bool | dict[str, bool | str],
) -> list[dict[str, Any]]:
    """Load entries from a single segment directory.

    Args:
        segment_dir: Path to segment directory (e.g., /path/to/20251109/163045_300)
        transcripts: Whether to load transcript content (JSONL and markdown)
        percepts: Whether to load raw screen data from *screen.jsonl files
        agents: Whether to load agent output summaries from *.md files

    Returns:
        List of entry dicts with timestamp, prefix, content, etc.

    Raises:
        ValueError: If the day cannot be derived from the segment path.
    """
    segment_path_obj = Path(segment_dir)
    day_str = day_from_path(segment_path_obj)
    if day_str is None:
        raise ValueError(f"Cannot determine day from segment path: {segment_dir}")
    date_str = day_str
    entries = _process_segment(
        segment_path_obj, date_str, transcripts, percepts, agents
    )
    entries.sort(key=lambda e: e["timestamp"])
    return entries


def cluster_span(
    day: str,
    span: list[str],
    sources: dict[str, bool | str | dict],
    stream: str | None = None,
) -> tuple[str, dict[str, int]]:
    """Return Markdown summary for a span of segments and counts by source.

    A span is a list of sequential segment keys (e.g., from an import that created
    multiple 5-minute segments from one audio file).

    Validates all segments exist before processing; raises ValueError if any are missing.

    Args:
        day: Day in YYYYMMDD format
        span: List of segment keys in HHMMSS_LEN format (e.g., ["163045_300", "170000_600"])
        sources: Dict with keys "transcripts", "percepts", "agents".
            Values can be bool, "required" string, or dict (for agents).
        stream: Stream name. If None, searches all streams for each segment.

    Returns:
        Tuple of (markdown, source_counts) where source_counts is a dict
        with keys "transcripts", "percepts", "agents" mapping to entry counts.

    Raises:
        ValueError: If any segment directories are missing
    """
    # Fixed: key was "screen"; must match _count_by_source()'s "percepts".
    empty_counts = {"transcripts": 0, "percepts": 0, "agents": 0}

    # Validate all segments in span exist upfront (fail fast)
    missing = []
    found_dirs: list[Path] = []
    for seg_key in span:
        seg_dir = _find_segment_dir(day, seg_key, stream)
        if seg_dir is None:
            missing.append(seg_key)
        else:
            found_dirs.append(seg_dir)

    if missing:
        raise ValueError(f"Segment directories not found: {', '.join(missing)}")

    # Load entries from all segments in span
    entries: list[dict[str, Any]] = []
    for seg_dir in found_dirs:
        segment_entries = _load_entries_from_segment(
            str(seg_dir),
            transcripts=sources.get("transcripts", False),
            percepts=sources.get("percepts", False),
            agents=sources.get("agents", False),
        )
        entries.extend(segment_entries)

    if not entries:
        return (
            f"No transcript or screen files found in span: {', '.join(span)}",
            empty_counts,
        )

    # Sort all entries by timestamp, group, and render
    entries.sort(key=lambda e: e["timestamp"])
    groups = _group_entries(entries)
    markdown = _groups_to_markdown(groups)
    return markdown, _count_by_source(entries)

713def _segments_overlap( 714 seg_start: datetime, seg_end: datetime, range_start: datetime, range_end: datetime 715) -> bool: 716 """Check if a segment overlaps with a time range. 717 718 Returns True if any part of the segment falls within the range. 719 """ 720 return seg_start < range_end and seg_end > range_start 721 722 723def cluster_range( 724 day: str, 725 start: str, 726 end: str, 727 sources: dict[str, bool | str | dict], 728) -> str: 729 """Return markdown for ``day`` limited to ``start``-``end`` (HHMMSS). 730 731 Includes any segment that overlaps with the requested time range, 732 even if only partially. 733 734 Args: 735 day: Day in YYYYMMDD format 736 start: Start time in HHMMSS format 737 end: End time in HHMMSS format 738 sources: Dict with keys "transcripts", "percepts", "agents". 739 Values can be bool, "required" string, or dict (for agents). 740 """ 741 day_dir = str(day_path(day)) 742 date_str = _date_str(day_dir) 743 start_dt = datetime.strptime(date_str + start, "%Y%m%d%H%M%S") 744 end_dt = datetime.strptime(date_str + end, "%Y%m%d%H%M%S") 745 746 entries = _load_entries( 747 day_dir, 748 transcripts=sources.get("transcripts", False), 749 percepts=sources.get("percepts", False), 750 agents=sources.get("agents", False), 751 ) 752 # Include segments that overlap with the requested range 753 entries = [ 754 e 755 for e in entries 756 if _segments_overlap(e["segment_start"], e["segment_end"], start_dt, end_dt) 757 ] 758 groups = _group_entries(entries) 759 return _groups_to_markdown(groups)