# personal memory agent
1# SPDX-License-Identifier: AGPL-3.0-only
2# Copyright (c) 2026 sol pbc
3
4"""Speaker subsystem status aggregation."""
5
6from __future__ import annotations
7
8import json
9import logging
10from pathlib import Path
11from typing import Any
12
13from think.awareness import get_current
14from think.utils import day_dirs, get_journal
15
16logger = logging.getLogger(__name__)
17
18SECTIONS = ("embeddings", "owner", "speakers", "clusters", "imports", "attribution")
19
20
def get_speakers_status(section: str | None = None) -> Any:
    """Aggregate speaker subsystem status.

    Args:
        section: Optional section name to return. If None, returns all sections.

    Returns:
        Dict with all sections, or a single section's value if section is specified.
    """
    builders = {
        "embeddings": _embeddings_section,
        "owner": _owner_section,
        "speakers": _speakers_section,
        "clusters": _clusters_section,
        "imports": _imports_section,
        "attribution": _attribution_section,
    }

    # No section requested: build every section.
    if not section:
        return {name: build() for name, build in builders.items()}

    build = builders.get(section)
    if build is not None:
        return build()
    return {
        "error": f"Unknown section '{section}'. Valid: {', '.join(SECTIONS)}"
    }
48
49
def _embeddings_section() -> dict[str, Any]:
    """Summarize segment embeddings: totals, per-stream counts, and day span."""
    from apps.speakers.routes import _scan_segment_embeddings

    total = 0
    per_stream: dict[str, int] = {}
    active_days: set[str] = set()

    for day in day_dirs():
        found = _scan_segment_embeddings(day)
        if found:
            active_days.add(day)
            for item in found:
                total += 1
                name = item["stream"]
                per_stream[name] = per_stream.get(name, 0) + 1

    # sorted() of an empty set is already [], so no special-casing needed.
    ordered = sorted(active_days)
    return {
        "segments": total,
        "streams": per_stream,
        "days": len(ordered),
        "date_range": [ordered[0], ordered[-1]] if ordered else None,
    }
73
74
def _owner_section() -> dict[str, Any]:
    """Report owner voiceprint detection state plus whether a centroid is saved."""
    from apps.speakers.owner import load_owner_centroid

    vp = get_current().get("voiceprint", {})
    state = vp.get("status", "none")
    out: dict[str, Any] = {"status": state}

    # Status-specific detail keys copied straight from the voiceprint record.
    detail_keys: dict[str, tuple[str, ...]] = {
        "candidate": (
            "cluster_size",
            "detected_at",
            "streams_represented",
            "recommendation",
        ),
        "no_cluster": ("segments_checked", "attempted_at"),
    }
    for key in detail_keys.get(state, ()):
        out[key] = vp.get(key)

    out["centroid_saved"] = load_owner_centroid() is not None
    return out
93
94
def _speakers_section() -> list[dict[str, Any]]:
    """List journal entities that have voiceprint embeddings on disk."""
    from apps.speakers.routes import _load_entity_voiceprints_file
    from think.entities.journal import load_journal_entity, scan_journal_entities

    out: list[dict[str, Any]] = []
    for eid in scan_journal_entities():
        loaded = _load_entity_voiceprints_file(eid)
        if loaded is None:
            continue

        vectors, meta_rows = loaded
        stream_names: set[str] = set()
        # Segments are identified by (day, segment_key) pairs.
        seen_segments: set[tuple[str, str]] = set()
        for row in meta_rows:
            if "stream" in row:
                stream_names.add(row["stream"])
            seen_segments.add((row.get("day", ""), row.get("segment_key", "")))

        record = load_journal_entity(eid) or {}
        out.append(
            {
                "entity_id": eid,
                "name": record.get("name", eid),
                "embedding_count": len(vectors),
                "segment_count": len(seen_segments),
                "streams": sorted(stream_names),
            }
        )

    return out
125
126
def _clusters_section() -> dict[str, Any] | None:
    """Read the cached speaker discovery clusters, if any.

    Returns:
        Dict with the cache timestamp, cluster count, and cluster list, or
        None when the cache file is missing or unreadable.
    """
    cache_path = Path(get_journal()) / "awareness" / "discovery_clusters.json"
    try:
        # EAFP: reading directly avoids the exists()/read race of the
        # previous check-then-read implementation.
        data = json.loads(cache_path.read_text())
    except FileNotFoundError:
        # No cache yet — matches the old silent exists() short-circuit.
        return None
    except Exception:
        logger.warning("Failed to read discovery cache", exc_info=True)
        return None

    clusters = data.get("clusters", [])
    return {
        "cached_at": data.get("version"),
        "count": len(clusters),
        "clusters": clusters,
    }
142
143
def _imports_section() -> dict[str, Any]:
    """Count imported meeting and screen transcript files per segment dir."""
    counts = {"meetings.md": 0, "screen.md": 0}

    for day_abs in day_dirs().values():
        root = Path(day_abs)
        if not root.is_dir():
            continue
        for stream_dir in sorted(root.iterdir()):
            if not stream_dir.is_dir():
                continue
            for seg_dir in sorted(stream_dir.iterdir()):
                if not seg_dir.is_dir():
                    continue
                for fname in counts:
                    if (seg_dir / fname).exists():
                        counts[fname] += 1

    return {
        "meetings_files": counts["meetings.md"],
        "screen_files": counts["screen.md"],
    }
164
165
def _attribution_section() -> dict[str, Any]:
    """Tally speaker_labels.json files and their per-label confidence/method."""
    file_count = 0
    label_count = 0
    confidence_tally: dict[str, int] = {}
    method_tally: dict[str, int] = {}

    for day_abs in day_dirs().values():
        root = Path(day_abs)
        if not root.is_dir():
            continue
        for stream_dir in sorted(root.iterdir()):
            if not stream_dir.is_dir():
                continue
            for seg_dir in sorted(stream_dir.iterdir()):
                if not seg_dir.is_dir():
                    continue
                labels_path = seg_dir / "talents" / "speaker_labels.json"
                if not labels_path.exists():
                    continue
                try:
                    payload = json.loads(labels_path.read_text())
                except Exception:
                    # Best-effort scan: corrupt/unreadable files are skipped.
                    continue
                file_count += 1
                for entry in payload.get("labels", []):
                    label_count += 1
                    conf = entry.get("confidence", "unknown")
                    meth = entry.get("method", "unknown")
                    confidence_tally[conf] = confidence_tally.get(conf, 0) + 1
                    method_tally[meth] = method_tally.get(meth, 0) + 1

    return {
        "files": file_count,
        "labels": label_count,
        "by_confidence": confidence_tally,
        "by_method": method_tally,
    }