# SPDX-License-Identifier: AGPL-3.0-only
# Copyright (c) 2026 sol pbc

"""Speaker subsystem status aggregation."""

from __future__ import annotations

import json
import logging
from pathlib import Path
from typing import Any

from think.awareness import get_current
from think.utils import day_dirs, get_journal

logger = logging.getLogger(__name__)

SECTIONS = ("embeddings", "owner", "speakers", "clusters", "imports", "attribution")


def get_speakers_status(section: str | None = None) -> Any:
    """Aggregate speaker subsystem status.

    Args:
        section: Optional section name to return. If None, returns all sections.

    Returns:
        Dict with all sections, or a single section's value if section is specified.
    """
    builders = {
        "embeddings": _embeddings_section,
        "owner": _owner_section,
        "speakers": _speakers_section,
        "clusters": _clusters_section,
        "imports": _imports_section,
        "attribution": _attribution_section,
    }

    if section:
        builder = builders.get(section)
        if builder is None:
            return {
                "error": f"Unknown section '{section}'. Valid: {', '.join(SECTIONS)}"
            }
        return builder()

    return {name: builder() for name, builder in builders.items()}


def _embeddings_section() -> dict[str, Any]:
    """Count segment embeddings across all days, grouped by stream."""
    from apps.speakers.routes import _scan_segment_embeddings

    segments = 0
    streams: dict[str, int] = {}
    days_seen: set[str] = set()

    for day in day_dirs().keys():
        day_segments = _scan_segment_embeddings(day)
        if day_segments:
            days_seen.add(day)
            for seg in day_segments:
                segments += 1
                stream = seg["stream"]
                streams[stream] = streams.get(stream, 0) + 1

    sorted_days = sorted(days_seen) if days_seen else []
    return {
        "segments": segments,
        "streams": streams,
        "days": len(sorted_days),
        "date_range": [sorted_days[0], sorted_days[-1]] if sorted_days else None,
    }


def _owner_section() -> dict[str, Any]:
    """Report owner voiceprint status from the current awareness state."""
    from apps.speakers.owner import load_owner_centroid

    voiceprint = get_current().get("voiceprint", {})
    status = voiceprint.get("status", "none")
    result: dict[str, Any] = {"status": status}

    if status == "candidate":
        result["cluster_size"] = voiceprint.get("cluster_size")
        result["detected_at"] = voiceprint.get("detected_at")
        result["streams_represented"] = voiceprint.get("streams_represented")
        result["recommendation"] = voiceprint.get("recommendation")
    elif status == "no_cluster":
        result["segments_checked"] = voiceprint.get("segments_checked")
        result["attempted_at"] = voiceprint.get("attempted_at")

    result["centroid_saved"] = load_owner_centroid() is not None
    return result


def _speakers_section() -> list[dict[str, Any]]:
    """Summarize stored voiceprints for each journal entity."""
    from apps.speakers.routes import _load_entity_voiceprints_file
    from think.entities.journal import load_journal_entity, scan_journal_entities

    speakers = []
    for entity_id in scan_journal_entities():
        result = _load_entity_voiceprints_file(entity_id)
        if result is None:
            continue

        embeddings, metadata_list = result
        streams: set[str] = set()
        segments: set[tuple[str, str]] = set()
        for metadata in metadata_list:
            if "stream" in metadata:
                streams.add(metadata["stream"])
            segments.add((metadata.get("day", ""), metadata.get("segment_key", "")))

        entity = load_journal_entity(entity_id) or {}
        speakers.append(
            {
                "entity_id": entity_id,
                "name": entity.get("name", entity_id),
                "embedding_count": len(embeddings),
                "segment_count": len(segments),
                "streams": sorted(streams),
            }
        )

    return speakers
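

# Illustrative shape of one entry returned by _speakers_section (the keys
# mirror the dict built above; the values are made up for example's sake):
#
#     {
#         "entity_id": "person_ada",
#         "name": "Ada",
#         "embedding_count": 12,
#         "segment_count": 9,
#         "streams": ["meeting", "mic"],
#     }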
"segment_count": len(segments), 120 "streams": sorted(streams), 121 } 122 ) 123 124 return speakers 125 126 127def _clusters_section() -> dict[str, Any] | None: 128 cache_path = Path(get_journal()) / "awareness" / "discovery_clusters.json" 129 if not cache_path.exists(): 130 return None 131 try: 132 data = json.loads(cache_path.read_text()) 133 clusters = data.get("clusters", []) 134 return { 135 "cached_at": data.get("version"), 136 "count": len(clusters), 137 "clusters": clusters, 138 } 139 except Exception: 140 logger.warning("Failed to read discovery cache", exc_info=True) 141 return None 142 143 144def _imports_section() -> dict[str, Any]: 145 meetings = 0 146 screens = 0 147 148 for _day, day_abs in day_dirs().items(): 149 day_dir = Path(day_abs) 150 if not day_dir.is_dir(): 151 continue 152 for stream_dir in sorted(day_dir.iterdir()): 153 if not stream_dir.is_dir(): 154 continue 155 for seg_dir in sorted(stream_dir.iterdir()): 156 if not seg_dir.is_dir(): 157 continue 158 if (seg_dir / "meetings.md").exists(): 159 meetings += 1 160 if (seg_dir / "screen.md").exists(): 161 screens += 1 162 163 return {"meetings_files": meetings, "screen_files": screens} 164 165 166def _attribution_section() -> dict[str, Any]: 167 total_files = 0 168 total_labels = 0 169 by_confidence: dict[str, int] = {} 170 by_method: dict[str, int] = {} 171 172 for _day, day_abs in day_dirs().items(): 173 day_dir = Path(day_abs) 174 if not day_dir.is_dir(): 175 continue 176 for stream_dir in sorted(day_dir.iterdir()): 177 if not stream_dir.is_dir(): 178 continue 179 for seg_dir in sorted(stream_dir.iterdir()): 180 if not seg_dir.is_dir(): 181 continue 182 labels_file = seg_dir / "talents" / "speaker_labels.json" 183 if not labels_file.exists(): 184 continue 185 try: 186 data = json.loads(labels_file.read_text()) 187 except Exception: 188 continue 189 total_files += 1 190 for label in data.get("labels", []): 191 total_labels += 1 192 confidence = label.get("confidence", "unknown") 193 method = label.get("method", "unknown") 194 by_confidence[confidence] = by_confidence.get(confidence, 0) + 1 195 by_method[method] = by_method.get(method, 0) + 1 196 197 return { 198 "files": total_files, 199 "labels": total_labels, 200 "by_confidence": by_confidence, 201 "by_method": by_method, 202 }