personal memory agent
at main 414 lines 16 kB view raw
1# SPDX-License-Identifier: AGPL-3.0-only 2# Copyright (c) 2026 sol pbc 3 4"""Self-contained fixtures for speakers app tests.""" 5 6from __future__ import annotations 7 8import json 9from pathlib import Path 10 11import numpy as np 12import pytest 13 14from think.entities import entity_slug 15 16# Default stream name for test fixtures 17STREAM = "test" 18 19 20@pytest.fixture(autouse=True) 21def _skip_supervisor_check(monkeypatch): 22 """Allow app CLI tests to run without a live solstone supervisor.""" 23 monkeypatch.setenv("SOL_SKIP_SUPERVISOR_CHECK", "1") 24 25 26@pytest.fixture 27def speakers_env(tmp_path, monkeypatch): 28 """Create a temporary journal environment for speaker tests. 29 30 Provides helpers to create: 31 - Day directories with sentence embeddings 32 - Journal-level entities with voiceprints 33 34 Usage: 35 def test_example(speakers_env): 36 env = speakers_env() 37 env.create_segment("20240101", "143022_300", ["mic_audio"]) 38 env.create_entity("Alice Test") 39 # Now _SOLSTONE_JOURNAL_OVERRIDE is set and data exists 40 """ 41 42 class SpeakersEnv: 43 def __init__(self, journal_path: Path): 44 self.journal = journal_path 45 monkeypatch.setenv("_SOLSTONE_JOURNAL_OVERRIDE", str(journal_path)) 46 47 def create_segment( 48 self, 49 day: str, 50 segment_key: str, 51 sources: list[str], 52 num_sentences: int = 5, 53 *, 54 stream: str | None = None, 55 embeddings: np.ndarray | None = None, 56 ) -> Path: 57 """Create a segment with sentence embeddings. 58 59 Creates both JSONL transcripts and NPZ embedding files. 60 61 Args: 62 day: Day string (YYYYMMDD) 63 segment_key: Segment key (HHMMSS_LEN) 64 sources: List of audio sources (e.g., ["mic_audio", "sys_audio"]) 65 num_sentences: Number of sentences to create 66 """ 67 segment_dir = self.journal / day / (stream or STREAM) / segment_key 68 segment_dir.mkdir(parents=True, exist_ok=True) 69 70 sentence_count = ( 71 embeddings.shape[0] if embeddings is not None else num_sentences 72 ) 73 74 for source in sources: 75 # Create JSONL transcript 76 jsonl_path = segment_dir / f"{source}.jsonl" 77 lines = [json.dumps({"raw": f"{source}.flac", "model": "medium.en"})] 78 79 # Parse segment_key to get base time (e.g., "143022_300" -> 14:30:22) 80 # This matches real transcriber output which uses absolute timestamps 81 time_part = segment_key.split("_")[0] 82 base_h = int(time_part[0:2]) 83 base_m = int(time_part[2:4]) 84 base_s = int(time_part[4:6]) 85 base_seconds = base_h * 3600 + base_m * 60 + base_s 86 87 for i in range(sentence_count): 88 offset = i * 5 # 5 seconds per sentence 89 abs_seconds = base_seconds + offset 90 h = (abs_seconds // 3600) % 24 91 m = (abs_seconds % 3600) // 60 92 s = abs_seconds % 60 93 lines.append( 94 json.dumps( 95 { 96 "start": f"{h:02d}:{m:02d}:{s:02d}", 97 "text": f"This is sentence {i + 1}.", 98 } 99 ) 100 ) 101 jsonl_path.write_text("\n".join(lines) + "\n") 102 103 # Create NPZ embeddings 104 npz_path = segment_dir / f"{source}.npz" 105 if embeddings is None: 106 source_embeddings = np.random.randn(sentence_count, 256).astype( 107 np.float32 108 ) 109 norms = np.linalg.norm(source_embeddings, axis=1, keepdims=True) 110 source_embeddings = source_embeddings / norms 111 else: 112 source_embeddings = embeddings.astype(np.float32) 113 statement_ids = np.arange(1, sentence_count + 1, dtype=np.int32) 114 np.savez_compressed( 115 npz_path, 116 embeddings=source_embeddings, 117 statement_ids=statement_ids, 118 ) 119 120 # Create dummy audio file 121 audio_path = segment_dir / f"{source}.flac" 122 audio_path.write_bytes(b"") # Empty placeholder 123 124 return segment_dir 125 126 def create_embedding(self, vector: list[float] | None = None) -> np.ndarray: 127 """Create a normalized 256-dim embedding.""" 128 if vector is None: 129 emb = np.random.randn(256).astype(np.float32) 130 else: 131 emb = np.array(vector + [0.0] * (256 - len(vector)), dtype=np.float32) 132 return emb / np.linalg.norm(emb) 133 134 def create_entity( 135 self, 136 name: str, 137 voiceprints: list[tuple[str, str, str, int]] | None = None, 138 is_principal: bool = False, 139 ) -> Path: 140 """Create a journal-level entity with optional voiceprint files. 141 142 Args: 143 name: Entity name 144 voiceprints: Optional list of (day, segment_key, source, sentence_id) 145 tuples for voiceprints 146 is_principal: If True, mark this entity as the principal (self) 147 """ 148 # Create journal-level entity 149 entity_id = entity_slug(name) 150 journal_entity_dir = self.journal / "entities" / entity_id 151 journal_entity_dir.mkdir(parents=True, exist_ok=True) 152 journal_entity = { 153 "id": entity_id, 154 "name": name, 155 "type": "Person", 156 "created_at": 1700000000000, 157 } 158 if is_principal: 159 journal_entity["is_principal"] = True 160 with open(journal_entity_dir / "entity.json", "w", encoding="utf-8") as f: 161 json.dump(journal_entity, f) 162 163 # Create voiceprints.npz at journal level if specified 164 if voiceprints: 165 all_embeddings = [] 166 all_metadata = [] 167 for day, segment_key, source, sentence_id in voiceprints: 168 emb = self.create_embedding() 169 all_embeddings.append(emb) 170 metadata = { 171 "day": day, 172 "segment_key": segment_key, 173 "source": source, 174 "sentence_id": sentence_id, 175 "added_at": 1700000000000, 176 } 177 all_metadata.append(json.dumps(metadata)) 178 179 np.savez_compressed( 180 journal_entity_dir / "voiceprints.npz", 181 embeddings=np.array(all_embeddings, dtype=np.float32), 182 metadata=np.array(all_metadata, dtype=str), 183 ) 184 185 return journal_entity_dir 186 187 def create_speakers_json( 188 self, day: str, segment_key: str, speakers: list[str] 189 ) -> Path: 190 """Create a speakers.json file in a segment directory. 191 192 Args: 193 day: Day string (YYYYMMDD) 194 segment_key: Segment key (HHMMSS_LEN) 195 speakers: List of speaker names 196 """ 197 agents_dir = self.journal / day / STREAM / segment_key / "talents" 198 agents_dir.mkdir(parents=True, exist_ok=True) 199 200 speakers_path = agents_dir / "speakers.json" 201 with open(speakers_path, "w", encoding="utf-8") as f: 202 json.dump(speakers, f) 203 204 return speakers_path 205 206 def create_speaker_labels( 207 self, 208 day: str, 209 segment_key: str, 210 labels: list[dict], 211 metadata: dict | None = None, 212 ) -> Path: 213 """Create a speaker_labels.json file in a segment directory. 214 215 Args: 216 day: Day string (YYYYMMDD) 217 segment_key: Segment key (HHMMSS_LEN) 218 labels: List of label dicts with sentence_id, speaker, confidence, 219 method 220 metadata: Optional extra metadata (owner_centroid_version, 221 voiceprint_versions) 222 """ 223 agents_dir = self.journal / day / STREAM / segment_key / "talents" 224 agents_dir.mkdir(parents=True, exist_ok=True) 225 226 data = {"labels": labels} 227 if metadata: 228 data.update(metadata) 229 else: 230 data["owner_centroid_version"] = None 231 data["voiceprint_versions"] = {} 232 233 labels_path = agents_dir / "speaker_labels.json" 234 with open(labels_path, "w", encoding="utf-8") as f: 235 json.dump(data, f) 236 237 return labels_path 238 239 def create_speaker_corrections( 240 self, 241 day: str, 242 segment_key: str, 243 corrections: list[dict], 244 *, 245 stream: str | None = None, 246 ) -> Path: 247 """Create a speaker_corrections.json file in a segment directory. 248 249 Args: 250 day: Day string (YYYYMMDD) 251 segment_key: Segment key (HHMMSS_LEN) 252 corrections: List of correction dicts with sentence_id, 253 original_speaker, corrected_speaker, timestamp 254 stream: Optional stream name (defaults to STREAM) 255 """ 256 agents_dir = ( 257 self.journal / day / (stream or STREAM) / segment_key / "talents" 258 ) 259 agents_dir.mkdir(parents=True, exist_ok=True) 260 261 data = {"corrections": corrections} 262 corrections_path = agents_dir / "speaker_corrections.json" 263 with open(corrections_path, "w", encoding="utf-8") as f: 264 json.dump(data, f) 265 266 return corrections_path 267 268 def create_facet_relationship( 269 self, 270 facet: str, 271 entity_id: str, 272 *, 273 description: str = "", 274 attached_at: int = 1700000000000, 275 updated_at: int | None = None, 276 last_seen: str | None = None, 277 observations: list[str] | None = None, 278 ) -> Path: 279 """Create a facet relationship for an entity. 280 281 Args: 282 facet: Facet name (e.g., "work", "personal") 283 entity_id: Entity ID (slug) 284 description: Relationship description 285 attached_at: When the relationship was created 286 updated_at: Last update timestamp 287 last_seen: Last seen day string (YYYYMMDD) 288 observations: Optional list of observation strings 289 """ 290 rel_dir = self.journal / "facets" / facet / "entities" / entity_id 291 rel_dir.mkdir(parents=True, exist_ok=True) 292 293 relationship: dict = { 294 "entity_id": entity_id, 295 "attached_at": attached_at, 296 } 297 if description: 298 relationship["description"] = description 299 if updated_at is not None: 300 relationship["updated_at"] = updated_at 301 if last_seen is not None: 302 relationship["last_seen"] = last_seen 303 304 with open(rel_dir / "entity.json", "w", encoding="utf-8") as f: 305 json.dump(relationship, f, indent=2) 306 307 if observations: 308 with open(rel_dir / "observations.jsonl", "w", encoding="utf-8") as f: 309 for obs in observations: 310 f.write( 311 json.dumps({"content": obs, "observed_at": 1700000000000}) 312 + "\n" 313 ) 314 315 return rel_dir 316 317 def create_import_segment( 318 self, 319 day: str, 320 segment_key: str, 321 speakers: list[tuple[str, str]], 322 *, 323 stream: str = "import.granola", 324 embeddings: np.ndarray | None = None, 325 ) -> Path: 326 """Create an import segment with conversation_transcript and embeddings. 327 328 Creates both a conversation_transcript.jsonl (with speaker labels) and 329 imported_audio.{jsonl,npz,flac} (with aligned embeddings) in the 330 same segment directory. 331 332 Args: 333 day: Day string (YYYYMMDD) 334 segment_key: Segment key (HHMMSS_LEN) 335 speakers: List of (speaker_name, text) tuples for each sentence 336 stream: Import stream name (default: import.granola) 337 embeddings: Optional pre-built embeddings array (num_sentences x 256) 338 """ 339 segment_dir = self.journal / day / stream / segment_key 340 segment_dir.mkdir(parents=True, exist_ok=True) 341 342 num_sentences = len(speakers) 343 344 time_part = segment_key.split("_")[0] 345 base_h = int(time_part[0:2]) 346 base_m = int(time_part[2:4]) 347 base_s = int(time_part[4:6]) 348 base_seconds = base_h * 3600 + base_m * 60 + base_s 349 350 ct_lines = [ 351 json.dumps({"imported": {"id": "test-import"}, "topics": "test"}) 352 ] 353 for i, (speaker, text) in enumerate(speakers): 354 offset = i * 5 355 abs_seconds = base_seconds + offset 356 h = (abs_seconds // 3600) % 24 357 m = (abs_seconds % 3600) // 60 358 s = abs_seconds % 60 359 ct_lines.append( 360 json.dumps( 361 { 362 "start": f"{h:02d}:{m:02d}:{s:02d}", 363 "speaker": speaker, 364 "text": text, 365 "source": "import", 366 } 367 ) 368 ) 369 ct_path = segment_dir / "conversation_transcript.jsonl" 370 ct_path.write_text("\n".join(ct_lines) + "\n") 371 372 audio_lines = [ 373 json.dumps({"raw": "imported_audio.flac", "model": "medium.en"}) 374 ] 375 for i, (_speaker, text) in enumerate(speakers): 376 offset = i * 5 377 abs_seconds = base_seconds + offset 378 h = (abs_seconds // 3600) % 24 379 m = (abs_seconds % 3600) // 60 380 s = abs_seconds % 60 381 audio_lines.append( 382 json.dumps( 383 { 384 "start": f"{h:02d}:{m:02d}:{s:02d}", 385 "text": text, 386 } 387 ) 388 ) 389 audio_jsonl_path = segment_dir / "imported_audio.jsonl" 390 audio_jsonl_path.write_text("\n".join(audio_lines) + "\n") 391 392 if embeddings is None: 393 source_embeddings = np.random.randn(num_sentences, 256).astype( 394 np.float32 395 ) 396 norms = np.linalg.norm(source_embeddings, axis=1, keepdims=True) 397 source_embeddings = source_embeddings / norms 398 else: 399 source_embeddings = embeddings.astype(np.float32) 400 statement_ids = np.arange(1, num_sentences + 1, dtype=np.int32) 401 np.savez_compressed( 402 segment_dir / "imported_audio.npz", 403 embeddings=source_embeddings, 404 statement_ids=statement_ids, 405 ) 406 407 (segment_dir / "imported_audio.flac").write_bytes(b"") 408 409 return segment_dir 410 411 def _create(): 412 return SpeakersEnv(tmp_path) 413 414 return _create