# personal memory agent
1# SPDX-License-Identifier: AGPL-3.0-only
2# Copyright (c) 2026 sol pbc
3
4"""Shared utilities for the observer app.
5
6Provides common helpers for observer metadata management and sync history
7that are used by both routes.py and events.py.
8"""
9
10from __future__ import annotations
11
import hmac
import json
import logging
import os
from pathlib import Path

from apps.utils import get_app_storage_path
18
19logger = logging.getLogger(__name__)
20
21
def get_observers_dir() -> Path:
    """Return the storage directory that holds observer metadata files."""
    observers_dir = get_app_storage_path("observer", "observers", ensure_exists=True)
    return observers_dir
25
26
def get_hist_dir(key_prefix: str, ensure_exists: bool = True) -> Path:
    """Resolve the sync-history directory for one observer.

    Args:
        key_prefix: First 8 chars of observer key
        ensure_exists: Create directory if it doesn't exist (default: True)

    Returns:
        Path to apps/observer/observers/<key_prefix>/hist/
    """
    segments = ("observer", "observers", key_prefix, "hist")
    return get_app_storage_path(*segments, ensure_exists=ensure_exists)
40
41
def load_observer(key: str) -> dict | None:
    """Load observer metadata by key.

    Args:
        key: Full observer authentication key

    Returns:
        Observer metadata dict if found and key matches, None otherwise
    """
    observers_dir = get_observers_dir()
    observer_path = observers_dir / f"{key[:8]}.json"
    if not observer_path.exists():
        return None
    try:
        with open(observer_path) as f:
            data = json.load(f)
    except (json.JSONDecodeError, OSError):
        return None
    # This is an authentication check: compare the full key in constant
    # time so a plain `!=` early-exit can't leak key bytes via timing.
    stored = data.get("key")
    if not isinstance(stored, str):
        return None
    if not hmac.compare_digest(stored.encode("utf-8"), key.encode("utf-8")):
        return None
    return data
64
65
def save_observer(data: dict) -> bool:
    """Save observer metadata.

    Args:
        data: Observer metadata dict (must contain 'key' field)

    Returns:
        True if saved successfully, False otherwise
    """
    key = data.get("key")
    if not key:
        return False
    observers_dir = get_observers_dir()
    observer_path = observers_dir / f"{key[:8]}.json"
    try:
        # Create the file with 0o600 from the start so the auth key is
        # never readable by other users, even briefly; the previous
        # write-then-chmod pattern left a window with default permissions.
        fd = os.open(observer_path, os.O_WRONLY | os.O_CREAT | os.O_TRUNC, 0o600)
        with os.fdopen(fd, "w") as f:
            json.dump(data, f, indent=2)
        # os.open's mode only applies on creation; enforce it for files
        # that already existed with looser permissions.
        os.chmod(observer_path, 0o600)
        return True
    except OSError:
        return False
87
88
def list_observers() -> list[dict]:
    """List all registered observers.

    Returns:
        List of observer metadata dicts, sorted by created_at descending
    """
    records: list[dict] = []
    for path in get_observers_dir().glob("*.json"):
        try:
            with open(path) as fh:
                records.append(json.load(fh))
        except (json.JSONDecodeError, OSError):
            # Skip unreadable or corrupt metadata files rather than failing.
            continue
    return sorted(records, key=lambda obs: obs.get("created_at", 0), reverse=True)
106
107
def find_observer_by_name(name: str) -> dict | None:
    """Find observer metadata by name.

    Args:
        name: Observer name to search for

    Returns:
        Observer metadata dict if found, None otherwise
    """
    matches = (obs for obs in list_observers() if obs.get("name") == name)
    return next(matches, None)
121
122
def append_history_record(key_prefix: str, day: str, record: dict) -> None:
    """Append a record to the sync history file.

    Args:
        key_prefix: First 8 chars of observer key
        day: Day string (YYYYMMDD)
        record: Record to append (will be JSON-serialized)
    """
    hist_path = get_hist_dir(key_prefix) / f"{day}.jsonl"
    line = json.dumps(record, ensure_ascii=False)
    with open(hist_path, "a", encoding="utf-8") as fh:
        fh.write(line + "\n")
135
136
def load_history(key_prefix: str, day: str) -> list[dict]:
    """Load sync history for an observer on a given day.

    Args:
        key_prefix: First 8 chars of observer key
        day: Day string (YYYYMMDD)

    Returns:
        List of history records, empty if file doesn't exist
    """
    hist_path = get_hist_dir(key_prefix, ensure_exists=False) / f"{day}.jsonl"
    if not hist_path.exists():
        return []

    records: list[dict] = []
    try:
        with open(hist_path, encoding="utf-8") as fh:
            for raw in fh:
                raw = raw.strip()
                if not raw:
                    continue
                records.append(json.loads(raw))
    except (json.JSONDecodeError, OSError) as e:
        # Best-effort: keep whatever parsed before the failure.
        logger.warning(f"Failed to load sync history {hist_path}: {e}")
    return records
162
163
def increment_stat(key_prefix: str, stat_name: str) -> None:
    """Increment a stat counter for an observer.

    Args:
        key_prefix: First 8 chars of observer key
        stat_name: Name of the stat to increment (e.g., 'segments_observed')
    """
    observers_dir = get_observers_dir()
    observer_path = observers_dir / f"{key_prefix}.json"
    if not observer_path.exists():
        return

    try:
        with open(observer_path) as f:
            data = json.load(f)

        # Tolerate metadata written before a 'stats' dict existed: the old
        # data["stats"] access raised KeyError and silently dropped the
        # increment; setdefault creates the dict on first use instead.
        stats = data.setdefault("stats", {})
        stats[stat_name] = stats.get(stat_name, 0) + 1

        with open(observer_path, "w") as f:
            json.dump(data, f, indent=2)
        os.chmod(observer_path, 0o600)
    except (json.JSONDecodeError, OSError, KeyError) as e:
        logger.warning(f"Failed to update {stat_name} for {key_prefix}: {e}")
187
188
def find_segment_by_sha256(
    key_prefix: str, day: str, file_sha256s: set[str]
) -> tuple[str | None, set[str]]:
    """Find existing segment with matching file SHA256 signatures.

    Searches history records for the given day to find a segment where
    all provided SHA256 hashes match existing files.

    Args:
        key_prefix: First 8 chars of observer key
        day: Day string (YYYYMMDD)
        file_sha256s: Set of SHA256 hashes to match

    Returns:
        Tuple of (segment_key, matched_sha256s):
            - If full match: (segment_key, all sha256s)
            - If partial match: (None, set of matching sha256s)
            - If no match: (None, empty set)
    """
    records = load_history(key_prefix, day)
    if not records:
        return None, set()

    # Gather the sha256s known per upload segment, plus a flat set of every
    # sha256 seen today for the partial-match check at the end.
    per_segment: dict[str, set[str]] = {}
    known_sha256s: set[str] = set()

    for rec in records:
        # Records carrying a "type" (e.g. "observed") are not uploads.
        if rec.get("type"):
            continue
        segment = rec.get("segment", "")
        if not segment:
            continue
        bucket = per_segment.setdefault(segment, set())
        for file_rec in rec.get("files", []):
            digest = file_rec.get("sha256", "")
            if digest:
                bucket.add(digest)
                known_sha256s.add(digest)

    # Full match: every incoming sha256 already lives in a single segment.
    if file_sha256s:
        for segment, existing in per_segment.items():
            if file_sha256s <= existing:
                return segment, file_sha256s

    # Partial match: the subset of incoming sha256s seen anywhere today.
    return None, file_sha256s & known_sha256s