personal memory agent
1# SPDX-License-Identifier: AGPL-3.0-only
2# Copyright (c) 2026 sol pbc
3
4"""Transcript segmentation utilities using LLM analysis."""
5
6from __future__ import annotations
7
8import json
9import logging
10from pathlib import Path
11from typing import List, Optional
12
13from .prompts import load_prompt
14
15
def _load_json_prompt() -> str:
    """Return the system prompt text used for transcript-to-JSON conversion."""
    prompt = load_prompt("detect_transcript_json", base_dir=Path(__file__).parent)
    return prompt.text
19
20
def _load_segment_prompt() -> str:
    """Return the system prompt text used for segment-boundary detection."""
    prompt = load_prompt("detect_transcript_segment", base_dir=Path(__file__).parent)
    return prompt.text
24
25
def number_lines(text: str) -> tuple[str, List[str]]:
    """Prefix each line of *text* with its 1-based line number.

    Args:
        text: Raw transcript text.

    Returns:
        Tuple of (numbered text joined with newlines, list of original lines).
    """
    original = text.splitlines()
    labelled: List[str] = []
    for number, content in enumerate(original, start=1):
        labelled.append(f"{number}: {content}")
    return "\n".join(labelled), original
31
32
def parse_segment_boundaries(json_text: str, num_lines: int) -> List[dict]:
    """Validate and return segment boundaries from ``json_text``.

    Args:
        json_text: JSON array of {"start_at": "HH:MM:SS", "line": N} objects
        num_lines: Total number of lines in the transcript

    Returns:
        List of boundary dicts with "start_at" and "line" keys, in strictly
        increasing line order.

    Raises:
        ValueError: If the JSON is malformed, the top level is not a
            non-empty list, a boundary is missing a field, a line number is
            non-increasing or out of range, or ``start_at`` is not a string.
    """
    try:
        data = json.loads(json_text)
    except json.JSONDecodeError as exc:  # pragma: no cover - network errors
        logging.error("Failed to parse JSON response")
        raise ValueError("invalid JSON") from exc

    if not isinstance(data, list) or not data:
        logging.error("JSON response is not a non-empty list")
        raise ValueError("expected non-empty list")

    boundaries: List[dict] = []
    # Starting at 0 makes "line <= last_line" reject anything below 1 on the
    # first iteration, so no separate lower-bound check is needed.
    last_line = 0
    for item in data:
        if not isinstance(item, dict):
            logging.error("Invalid boundary type: %s", type(item))
            raise ValueError("boundaries must be objects")

        if "start_at" not in item or "line" not in item:
            logging.error("Missing required fields in boundary: %s", item)
            raise ValueError("boundary must have 'start_at' and 'line' fields")

        line = item["line"]
        start_at = item["start_at"]

        # Strictly increasing and within the transcript's line range.
        if not isinstance(line, int) or line <= last_line or line > num_lines:
            logging.error(
                "Invalid line number: %s (last: %s, max: %s)",
                line,
                last_line,
                num_lines,
            )
            raise ValueError("invalid line number")

        if not isinstance(start_at, str):
            logging.error("Invalid start_at type: %s", type(start_at))
            raise ValueError("start_at must be a string")

        boundaries.append({"start_at": start_at, "line": line})
        last_line = line

    logging.info("Successfully parsed %d segment boundaries", len(boundaries))
    return boundaries
87
88
def segments_from_boundaries(
    lines: List[str], boundaries: List[dict]
) -> List[tuple[str, str]]:
    """Split transcript ``lines`` into segments at the given boundaries.

    Args:
        lines: Original transcript lines
        boundaries: List of {"start_at": "HH:MM:SS", "line": N} dicts

    Returns:
        List of (start_at, text) tuples for each segment
    """
    # Pair every boundary's 1-based start line with the next boundary's start
    # line; the final segment extends through the end of the transcript.
    starts = [b["line"] for b in boundaries]
    ends = starts[1:] + [len(lines) + 1]

    segments: List[tuple[str, str]] = []
    for boundary, start, end in zip(boundaries, starts, ends):
        body = "\n".join(lines[start - 1 : end - 1]).strip()
        segments.append((boundary["start_at"], body))

    logging.info(f"Created {len(segments)} transcript segments")
    return segments
119
120
def detect_transcript_segment(text: str, start_time: str) -> List[tuple[str, str]]:
    """Return transcript segments with absolute timestamps using LLM analysis.

    Args:
        text: The transcript text to segment
        start_time: Absolute start time in HH:MM:SS format

    Returns:
        List of (start_at, text) tuples where start_at is absolute HH:MM:SS.
        Returns empty list on LLM or parsing failure.
    """
    numbered, lines = number_lines(text)
    # The segmentation prompt expects the absolute start time on line one,
    # followed by the line-numbered transcript.
    contents = f"START_TIME: {start_time}\n{numbered}"
    logging.info(f"Starting transcript segmentation (start: {start_time})...")

    from think.models import generate

    try:
        response_text = generate(
            contents=contents,
            context="observe.detect.segment",
            temperature=0.3,
            max_output_tokens=4096,
            thinking_budget=8192,
            system_instruction=_load_segment_prompt(),
            json_output=True,
        )
        logging.info(f"Received segmentation response: {response_text}")

        # Convert validated boundaries directly into (start_at, text) tuples.
        return segments_from_boundaries(
            lines, parse_segment_boundaries(response_text, len(lines))
        )
    except (ValueError, json.JSONDecodeError) as e:
        logging.error(f"Transcript segmentation failed: {e}")
        return []
158
159
def detect_transcript_json(text: str, segment_start: str) -> Optional[list]:
    """Return transcript ``text`` converted to JSON using LLM analysis.

    Args:
        text: The transcript segment text
        segment_start: Absolute start time of this segment in HH:MM:SS format

    Returns:
        List of transcript entries with absolute timestamps, or ``None`` when
        the LLM response is not valid JSON.
    """
    logging.info(
        f"Starting transcript JSON conversion (segment_start: {segment_start})..."
    )

    from think.models import generate

    # The conversion prompt expects the segment's absolute start time on the
    # first line, followed by the raw segment text.
    response_text = generate(
        contents=f"SEGMENT_START: {segment_start}\n{text}",
        context="observe.detect.json",
        temperature=0.3,
        max_output_tokens=8192,
        thinking_budget=8192,
        system_instruction=_load_json_prompt(),
        json_output=True,
    )

    logging.info(f"Received JSON conversion response: {response_text[:100]}")
    try:
        result = json.loads(response_text)
    except json.JSONDecodeError:
        logging.error("Failed to parse JSON response from LLM")
        return None
    logging.info("Successfully converted transcript to JSON")
    return result
197
198
# Public API of this module; underscore-prefixed prompt loaders stay private.
__all__ = [
    "detect_transcript_segment",
    "detect_transcript_json",
    "number_lines",
    "parse_segment_boundaries",
    "segments_from_boundaries",
]