personal memory agent
1# SPDX-License-Identifier: AGPL-3.0-only
2# Copyright (c) 2026 sol pbc
3
4"""Self-contained fixtures for speakers app tests."""
5
6from __future__ import annotations
7
8import json
9from pathlib import Path
10
11import numpy as np
12import pytest
13
14from think.entities import entity_slug
15
# Default stream name used by fixture helpers when no explicit ``stream``
# argument is given.
STREAM = "test"
18
19
@pytest.fixture(autouse=True)
def _skip_supervisor_check(monkeypatch):
    """Allow app CLI tests to run without a live solstone supervisor.

    autouse: applied to every test that can see this fixture. The env var
    is restored automatically by ``monkeypatch`` after each test.
    """
    monkeypatch.setenv("SOL_SKIP_SUPERVISOR_CHECK", "1")
24
25
@pytest.fixture
def speakers_env(tmp_path, monkeypatch):
    """Create a temporary journal environment for speaker tests.

    Provides helpers to create:
    - Day directories with sentence embeddings
    - Journal-level entities with voiceprints

    Usage:
        def test_example(speakers_env):
            env = speakers_env()
            env.create_segment("20240101", "143022_300", ["mic_audio"])
            env.create_entity("Alice Test")
            # Now _SOLSTONE_JOURNAL_OVERRIDE is set and data exists
    """

    class SpeakersEnv:
        def __init__(self, journal_path: Path):
            self.journal = journal_path
            # Point the app at the temporary journal for the test's duration;
            # monkeypatch restores the variable afterwards.
            monkeypatch.setenv("_SOLSTONE_JOURNAL_OVERRIDE", str(journal_path))

        # ---- internal helpers (shared by the create_* methods) ----

        @staticmethod
        def _base_seconds(segment_key: str) -> int:
            """Parse a segment key (HHMMSS_LEN) into seconds since midnight.

            Matches real transcriber output, which uses absolute timestamps.
            """
            time_part = segment_key.split("_")[0]
            return (
                int(time_part[0:2]) * 3600
                + int(time_part[2:4]) * 60
                + int(time_part[4:6])
            )

        @staticmethod
        def _hms(abs_seconds: int) -> str:
            """Format absolute seconds as HH:MM:SS, wrapping past midnight."""
            h = (abs_seconds // 3600) % 24
            m = (abs_seconds % 3600) // 60
            s = abs_seconds % 60
            return f"{h:02d}:{m:02d}:{s:02d}"

        @staticmethod
        def _random_unit_embeddings(count: int) -> np.ndarray:
            """Return ``count`` random L2-normalized 256-dim float32 vectors."""
            emb = np.random.randn(count, 256).astype(np.float32)
            return emb / np.linalg.norm(emb, axis=1, keepdims=True)

        def _agents_dir(
            self, day: str, segment_key: str, stream: str | None
        ) -> Path:
            """Return (creating if needed) the talents dir for a segment."""
            agents_dir = (
                self.journal / day / (stream or STREAM) / segment_key / "talents"
            )
            agents_dir.mkdir(parents=True, exist_ok=True)
            return agents_dir

        # ---- fixture-facing helpers ----

        def create_segment(
            self,
            day: str,
            segment_key: str,
            sources: list[str],
            num_sentences: int = 5,
            *,
            stream: str | None = None,
            embeddings: np.ndarray | None = None,
        ) -> Path:
            """Create a segment with sentence embeddings.

            Creates a JSONL transcript, an NPZ embedding file, and an empty
            placeholder FLAC for each source.

            Args:
                day: Day string (YYYYMMDD)
                segment_key: Segment key (HHMMSS_LEN)
                sources: List of audio sources (e.g., ["mic_audio", "sys_audio"])
                num_sentences: Number of sentences to create (ignored when
                    ``embeddings`` is given; its row count wins)
                stream: Optional stream name (defaults to STREAM)
                embeddings: Optional pre-built embeddings array, shared by
                    every source

            Returns:
                Path to the created segment directory.
            """
            segment_dir = self.journal / day / (stream or STREAM) / segment_key
            segment_dir.mkdir(parents=True, exist_ok=True)

            sentence_count = (
                embeddings.shape[0] if embeddings is not None else num_sentences
            )
            base_seconds = self._base_seconds(segment_key)

            for source in sources:
                # JSONL transcript: one header line, then one line per
                # sentence at 5-second intervals.
                lines = [
                    json.dumps({"raw": f"{source}.flac", "model": "medium.en"})
                ]
                for i in range(sentence_count):
                    lines.append(
                        json.dumps(
                            {
                                "start": self._hms(base_seconds + i * 5),
                                "text": f"This is sentence {i + 1}.",
                            }
                        )
                    )
                (segment_dir / f"{source}.jsonl").write_text(
                    "\n".join(lines) + "\n"
                )

                # NPZ embeddings aligned with 1-based statement ids.
                if embeddings is None:
                    source_embeddings = self._random_unit_embeddings(
                        sentence_count
                    )
                else:
                    source_embeddings = embeddings.astype(np.float32)
                np.savez_compressed(
                    segment_dir / f"{source}.npz",
                    embeddings=source_embeddings,
                    statement_ids=np.arange(
                        1, sentence_count + 1, dtype=np.int32
                    ),
                )

                # Empty placeholder audio file.
                (segment_dir / f"{source}.flac").write_bytes(b"")

            return segment_dir

        def create_embedding(
            self, vector: list[float] | None = None
        ) -> np.ndarray:
            """Create a normalized 256-dim embedding.

            Args:
                vector: Optional leading values, zero-padded to 256 dims.
                    Should contain at most 256 entries.
            """
            if vector is None:
                emb = np.random.randn(256).astype(np.float32)
            else:
                emb = np.array(
                    vector + [0.0] * (256 - len(vector)), dtype=np.float32
                )
            return emb / np.linalg.norm(emb)

        def create_entity(
            self,
            name: str,
            voiceprints: list[tuple[str, str, str, int]] | None = None,
            is_principal: bool = False,
        ) -> Path:
            """Create a journal-level entity with optional voiceprint files.

            Args:
                name: Entity name
                voiceprints: Optional list of (day, segment_key, source,
                    sentence_id) tuples for voiceprints
                is_principal: If True, mark this entity as the principal (self)

            Returns:
                Path to the created entity directory.
            """
            entity_id = entity_slug(name)
            journal_entity_dir = self.journal / "entities" / entity_id
            journal_entity_dir.mkdir(parents=True, exist_ok=True)
            journal_entity = {
                "id": entity_id,
                "name": name,
                "type": "Person",
                "created_at": 1700000000000,
            }
            if is_principal:
                journal_entity["is_principal"] = True
            with open(
                journal_entity_dir / "entity.json", "w", encoding="utf-8"
            ) as f:
                json.dump(journal_entity, f)

            # Voiceprints: random unit embeddings plus JSON-encoded
            # provenance metadata, stored side by side in one NPZ.
            if voiceprints:
                all_embeddings = []
                all_metadata = []
                for day, segment_key, source, sentence_id in voiceprints:
                    all_embeddings.append(self.create_embedding())
                    all_metadata.append(
                        json.dumps(
                            {
                                "day": day,
                                "segment_key": segment_key,
                                "source": source,
                                "sentence_id": sentence_id,
                                "added_at": 1700000000000,
                            }
                        )
                    )
                np.savez_compressed(
                    journal_entity_dir / "voiceprints.npz",
                    embeddings=np.array(all_embeddings, dtype=np.float32),
                    metadata=np.array(all_metadata, dtype=str),
                )

            return journal_entity_dir

        def create_speakers_json(
            self,
            day: str,
            segment_key: str,
            speakers: list[str],
            *,
            stream: str | None = None,
        ) -> Path:
            """Create a speakers.json file in a segment directory.

            Args:
                day: Day string (YYYYMMDD)
                segment_key: Segment key (HHMMSS_LEN)
                speakers: List of speaker names
                stream: Optional stream name (defaults to STREAM)

            Returns:
                Path to the written speakers.json file.
            """
            speakers_path = (
                self._agents_dir(day, segment_key, stream) / "speakers.json"
            )
            with open(speakers_path, "w", encoding="utf-8") as f:
                json.dump(speakers, f)
            return speakers_path

        def create_speaker_labels(
            self,
            day: str,
            segment_key: str,
            labels: list[dict],
            metadata: dict | None = None,
            *,
            stream: str | None = None,
        ) -> Path:
            """Create a speaker_labels.json file in a segment directory.

            Args:
                day: Day string (YYYYMMDD)
                segment_key: Segment key (HHMMSS_LEN)
                labels: List of label dicts with sentence_id, speaker,
                    confidence, method
                metadata: Optional extra metadata (owner_centroid_version,
                    voiceprint_versions)
                stream: Optional stream name (defaults to STREAM)

            Returns:
                Path to the written speaker_labels.json file.
            """
            data = {"labels": labels}
            if metadata:
                data.update(metadata)
            else:
                # Default version fields when no metadata is supplied.
                data["owner_centroid_version"] = None
                data["voiceprint_versions"] = {}

            labels_path = (
                self._agents_dir(day, segment_key, stream)
                / "speaker_labels.json"
            )
            with open(labels_path, "w", encoding="utf-8") as f:
                json.dump(data, f)
            return labels_path

        def create_speaker_corrections(
            self,
            day: str,
            segment_key: str,
            corrections: list[dict],
            *,
            stream: str | None = None,
        ) -> Path:
            """Create a speaker_corrections.json file in a segment directory.

            Args:
                day: Day string (YYYYMMDD)
                segment_key: Segment key (HHMMSS_LEN)
                corrections: List of correction dicts with sentence_id,
                    original_speaker, corrected_speaker, timestamp
                stream: Optional stream name (defaults to STREAM)

            Returns:
                Path to the written speaker_corrections.json file.
            """
            corrections_path = (
                self._agents_dir(day, segment_key, stream)
                / "speaker_corrections.json"
            )
            with open(corrections_path, "w", encoding="utf-8") as f:
                json.dump({"corrections": corrections}, f)
            return corrections_path

        def create_facet_relationship(
            self,
            facet: str,
            entity_id: str,
            *,
            description: str = "",
            attached_at: int = 1700000000000,
            updated_at: int | None = None,
            last_seen: str | None = None,
            observations: list[str] | None = None,
        ) -> Path:
            """Create a facet relationship for an entity.

            Args:
                facet: Facet name (e.g., "work", "personal")
                entity_id: Entity ID (slug)
                description: Relationship description
                attached_at: When the relationship was created
                updated_at: Last update timestamp
                last_seen: Last seen day string (YYYYMMDD)
                observations: Optional list of observation strings

            Returns:
                Path to the created relationship directory.
            """
            rel_dir = self.journal / "facets" / facet / "entities" / entity_id
            rel_dir.mkdir(parents=True, exist_ok=True)

            relationship: dict = {
                "entity_id": entity_id,
                "attached_at": attached_at,
            }
            # Optional fields are only written when provided, so the JSON on
            # disk mirrors what real writers produce.
            if description:
                relationship["description"] = description
            if updated_at is not None:
                relationship["updated_at"] = updated_at
            if last_seen is not None:
                relationship["last_seen"] = last_seen

            with open(rel_dir / "entity.json", "w", encoding="utf-8") as f:
                json.dump(relationship, f, indent=2)

            if observations:
                with open(
                    rel_dir / "observations.jsonl", "w", encoding="utf-8"
                ) as f:
                    for obs in observations:
                        f.write(
                            json.dumps(
                                {"content": obs, "observed_at": 1700000000000}
                            )
                            + "\n"
                        )

            return rel_dir

        def create_import_segment(
            self,
            day: str,
            segment_key: str,
            speakers: list[tuple[str, str]],
            *,
            stream: str = "import.granola",
            embeddings: np.ndarray | None = None,
        ) -> Path:
            """Create an import segment with conversation_transcript and embeddings.

            Creates both a conversation_transcript.jsonl (with speaker labels)
            and imported_audio.{jsonl,npz,flac} (with aligned embeddings) in
            the same segment directory.

            Args:
                day: Day string (YYYYMMDD)
                segment_key: Segment key (HHMMSS_LEN)
                speakers: List of (speaker_name, text) tuples for each sentence
                stream: Import stream name (default: import.granola)
                embeddings: Optional pre-built embeddings array
                    (num_sentences x 256)

            Returns:
                Path to the created segment directory.
            """
            segment_dir = self.journal / day / stream / segment_key
            segment_dir.mkdir(parents=True, exist_ok=True)

            num_sentences = len(speakers)
            base_seconds = self._base_seconds(segment_key)

            # Speaker-labelled conversation transcript.
            ct_lines = [
                json.dumps({"imported": {"id": "test-import"}, "topics": "test"})
            ]
            for i, (speaker, text) in enumerate(speakers):
                ct_lines.append(
                    json.dumps(
                        {
                            "start": self._hms(base_seconds + i * 5),
                            "speaker": speaker,
                            "text": text,
                            "source": "import",
                        }
                    )
                )
            (segment_dir / "conversation_transcript.jsonl").write_text(
                "\n".join(ct_lines) + "\n"
            )

            # Unlabelled audio transcript, aligned line-for-line (same
            # timestamps) with the conversation transcript above.
            audio_lines = [
                json.dumps({"raw": "imported_audio.flac", "model": "medium.en"})
            ]
            for i, (_speaker, text) in enumerate(speakers):
                audio_lines.append(
                    json.dumps(
                        {
                            "start": self._hms(base_seconds + i * 5),
                            "text": text,
                        }
                    )
                )
            (segment_dir / "imported_audio.jsonl").write_text(
                "\n".join(audio_lines) + "\n"
            )

            if embeddings is None:
                source_embeddings = self._random_unit_embeddings(num_sentences)
            else:
                source_embeddings = embeddings.astype(np.float32)
            np.savez_compressed(
                segment_dir / "imported_audio.npz",
                embeddings=source_embeddings,
                statement_ids=np.arange(1, num_sentences + 1, dtype=np.int32),
            )

            # Empty placeholder audio file.
            (segment_dir / "imported_audio.flac").write_bytes(b"")

            return segment_dir

    def _create():
        return SpeakersEnv(tmp_path)

    return _create