personal memory agent
at main 205 lines 6.6 kB view raw
# SPDX-License-Identifier: AGPL-3.0-only
# Copyright (c) 2026 sol pbc

"""Transcript segmentation utilities using LLM analysis."""

from __future__ import annotations

import json
import logging
from pathlib import Path
from typing import List, Optional


def _load_json_prompt() -> str:
    """Load the system prompt for transcript-to-JSON conversion."""
    # Imported lazily — consistent with the lazy ``think.models`` imports
    # below — so the pure helpers in this module stay importable without
    # the prompt-loading dependency.
    from .prompts import load_prompt

    return load_prompt("detect_transcript_json", base_dir=Path(__file__).parent).text


def _load_segment_prompt() -> str:
    """Load the system prompt for segment detection."""
    from .prompts import load_prompt

    return load_prompt("detect_transcript_segment", base_dir=Path(__file__).parent).text


def number_lines(text: str) -> tuple[str, List[str]]:
    """Return text with prefixed line numbers and the original lines.

    Args:
        text: Arbitrary multi-line text.

    Returns:
        A ``(numbered, lines)`` tuple: ``numbered`` is the text with each
        line prefixed ``"N: "`` (1-based), ``lines`` is ``text.splitlines()``.
    """
    lines = text.splitlines()
    numbered = "\n".join(f"{idx + 1}: {line}" for idx, line in enumerate(lines))
    return numbered, lines


def parse_segment_boundaries(json_text: str, num_lines: int) -> List[dict]:
    """Validate and return segment boundaries from ``json_text``.

    Args:
        json_text: JSON array of {"start_at": "HH:MM:SS", "line": N} objects
        num_lines: Total number of lines in the transcript

    Returns:
        List of boundary dicts with "start_at" and "line" keys

    Raises:
        ValueError: If the JSON is malformed, not a non-empty list, or any
            boundary is missing fields, out of range, or not strictly
            increasing by line number.
    """
    try:
        data = json.loads(json_text)
    except json.JSONDecodeError as exc:
        logging.error("Failed to parse JSON response: %s", exc)
        raise ValueError("invalid JSON") from exc

    if not isinstance(data, list) or not data:
        logging.error("JSON response is not a non-empty list")
        raise ValueError("expected non-empty list")

    boundaries: List[dict] = []
    last_line = 0
    for item in data:
        if not isinstance(item, dict):
            logging.error("Invalid boundary type: %s", type(item))
            raise ValueError("boundaries must be objects")

        if "start_at" not in item or "line" not in item:
            logging.error("Missing required fields in boundary: %s", item)
            raise ValueError("boundary must have 'start_at' and 'line' fields")

        line = item["line"]
        start_at = item["start_at"]

        # Lines must be ints, strictly increasing, and within the transcript.
        # ``line > last_line`` with ``last_line`` starting at 0 also enforces
        # ``line >= 1``, so no separate lower-bound check is needed.
        if not isinstance(line, int) or line <= last_line or line > num_lines:
            logging.error(
                "Invalid line number: %s (last: %s, max: %s)",
                line,
                last_line,
                num_lines,
            )
            raise ValueError("invalid line number")

        if not isinstance(start_at, str):
            logging.error("Invalid start_at type: %s", type(start_at))
            raise ValueError("start_at must be a string")

        boundaries.append({"start_at": start_at, "line": line})
        last_line = line

    logging.info("Successfully parsed %d segment boundaries", len(boundaries))
    return boundaries


def segments_from_boundaries(
    lines: List[str], boundaries: List[dict]
) -> List[tuple[str, str]]:
    """Return transcript segments split at boundaries.

    Args:
        lines: Original transcript lines
        boundaries: List of {"start_at": "HH:MM:SS", "line": N} dicts with
            1-based, strictly increasing line numbers

    Returns:
        List of (start_at, text) tuples for each segment. Each segment runs
        from its boundary line up to (but not including) the next boundary;
        the final segment extends to the end of the transcript.
    """
    segments: List[tuple[str, str]] = []

    for idx, boundary in enumerate(boundaries):
        start_at = boundary["start_at"]
        start_line = boundary["line"]

        # Determine end line (next boundary or end of file)
        if idx + 1 < len(boundaries):
            end_line = boundaries[idx + 1]["line"]
            segment_lines = lines[start_line - 1 : end_line - 1]  # noqa: E203
        else:
            segment_lines = lines[start_line - 1 :]  # noqa: E203

        text = "\n".join(segment_lines).strip()
        segments.append((start_at, text))

    logging.info("Created %d transcript segments", len(segments))
    return segments


def detect_transcript_segment(text: str, start_time: str) -> List[tuple[str, str]]:
    """Return transcript segments with absolute timestamps using LLM analysis.

    Args:
        text: The transcript text to segment
        start_time: Absolute start time in HH:MM:SS format

    Returns:
        List of (start_at, text) tuples where start_at is absolute HH:MM:SS.
        Returns empty list on LLM or parsing failure.
    """
    numbered, lines = number_lines(text)
    # Prepend START_TIME for the prompt
    contents = f"START_TIME: {start_time}\n{numbered}"
    logging.info("Starting transcript segmentation (start: %s)...", start_time)

    # Imported lazily to avoid loading the model stack at module import time.
    from think.models import generate

    try:
        response_text = generate(
            contents=contents,
            context="observe.detect.segment",
            temperature=0.3,
            max_output_tokens=4096,
            thinking_budget=8192,
            system_instruction=_load_segment_prompt(),
            json_output=True,
        )

        logging.info("Received segmentation response: %s", response_text)
        boundaries = parse_segment_boundaries(response_text, len(lines))
        return segments_from_boundaries(lines, boundaries)
    except ValueError as e:
        # json.JSONDecodeError is a ValueError subclass, so this single
        # clause covers both parse and validation failures.
        logging.error("Transcript segmentation failed: %s", e)
        return []


def detect_transcript_json(text: str, segment_start: str) -> Optional[list]:
    """Return transcript ``text`` converted to JSON using LLM analysis.

    Args:
        text: The transcript segment text
        segment_start: Absolute start time of this segment in HH:MM:SS format

    Returns:
        List of transcript entries with absolute timestamps, or ``None`` if
        the LLM response is not valid JSON.
    """
    logging.info(
        "Starting transcript JSON conversion (segment_start: %s)...", segment_start
    )

    # Prepend SEGMENT_START for the prompt
    contents = f"SEGMENT_START: {segment_start}\n{text}"

    # Imported lazily to avoid loading the model stack at module import time.
    from think.models import generate

    response_text = generate(
        contents=contents,
        context="observe.detect.json",
        temperature=0.3,
        max_output_tokens=8192,
        thinking_budget=8192,
        system_instruction=_load_json_prompt(),
        json_output=True,
    )

    logging.info("Received JSON conversion response: %s", response_text[:100])
    try:
        result = json.loads(response_text)
    except json.JSONDecodeError:
        logging.error("Failed to parse JSON response from LLM")
        return None
    logging.info("Successfully converted transcript to JSON")
    return result


__all__ = [
    "detect_transcript_segment",
    "detect_transcript_json",
    "number_lines",
    "parse_segment_boundaries",
    "segments_from_boundaries",
]