personal memory agent
1# SPDX-License-Identifier: AGPL-3.0-only
2# Copyright (c) 2026 sol pbc
3
4"""Tests for observer app utilities."""
5
6from __future__ import annotations
7
8import json
9
10import pytest
11
12from apps.observer.utils import (
13 append_history_record,
14 find_observer_by_name,
15 find_segment_by_sha256,
16 get_hist_dir,
17 get_observers_dir,
18 increment_stat,
19 list_observers,
20 load_history,
21 load_observer,
22 save_observer,
23)
24
25
@pytest.fixture
def storage_env(tmp_path, monkeypatch):
    """Build an isolated journal tree under ``tmp_path`` for storage tests.

    The journal location is published both through the
    ``_SOLSTONE_JOURNAL_OVERRIDE`` environment variable and through
    ``convey.state.journal_root``. The returned object exposes the journal
    root as ``journal`` and the observers directory as ``observers_dir``.
    """
    from convey import state

    class Env:
        def __init__(self, journal, observers_dir):
            self.journal = journal
            self.observers_dir = observers_dir

    journal_root = tmp_path / "journal"
    observers_root = journal_root / "apps" / "observer" / "observers"

    # Lay out the directories on disk.
    journal_root.mkdir()
    observers_root.mkdir(parents=True)

    # Point both the environment and the convey state at the temp journal.
    journal_str = str(journal_root)
    monkeypatch.setenv("_SOLSTONE_JOURNAL_OVERRIDE", journal_str)
    monkeypatch.setattr(state, "journal_root", journal_str)

    return Env(journal_root, observers_root)
46
47
class TestObserverStorage:
    """Exercise saving, loading, listing and name lookup of observers."""

    def test_get_observers_dir_creates_directory(self, storage_env):
        """get_observers_dir returns an existing directory at the expected path."""
        observers_path = get_observers_dir()

        assert observers_path.exists()
        assert observers_path == storage_env.observers_dir

    def test_save_and_load_observer(self, storage_env):
        """An observer written by save_observer can be read back by its key."""
        key = "testkey123456789"
        record = {
            "key": key,
            "name": "test-observer",
            "stats": {"segments_received": 0},
        }

        assert save_observer(record) is True

        fetched = load_observer(key)
        assert fetched is not None
        assert fetched["name"] == "test-observer"

    def test_load_observer_wrong_key(self, storage_env):
        """load_observer yields None when only the leading characters match."""
        save_observer(
            {
                "key": "testkey123456789",
                "name": "test-observer",
                "stats": {},
            }
        )

        # Shares the first eight characters with the saved key, differs after.
        assert load_observer("testkey1xxxxxxxx") is None

    def test_load_observer_not_found(self, storage_env):
        """load_observer yields None when nothing was saved."""
        assert load_observer("nonexistent12345") is None

    def test_list_observers_empty(self, storage_env):
        """list_observers yields an empty list when nothing was saved."""
        assert list_observers() == []

    def test_list_observers_returns_all(self, storage_env):
        """list_observers yields every saved observer, newest first."""
        for idx in range(3):
            entry = {
                "key": f"obs{idx:05d}123456789",
                "name": f"observer-{idx}",
                "created_at": 1000 + idx,
                "stats": {},
            }
            save_observer(entry)

        listed = list_observers()
        assert len(listed) == 3

        # Expected order follows created_at, highest value first.
        names = [item["name"] for item in listed]
        assert names == ["observer-2", "observer-1", "observer-0"]

    def test_find_observer_by_name(self, storage_env):
        """find_observer_by_name returns the observer saved under that name."""
        record = {
            "key": "findme123456789",
            "name": "find-me",
            "stats": {},
        }
        save_observer(record)

        found = find_observer_by_name("find-me")
        assert found is not None
        assert found["key"] == "findme123456789"

    def test_find_observer_by_name_not_found(self, storage_env):
        """find_observer_by_name yields None for a name nobody saved."""
        assert find_observer_by_name("unknown") is None
131
132
class TestHistoryStorage:
    """Exercise the per-observer, per-day JSONL sync history."""

    def test_get_hist_dir_creates_directory(self, storage_env):
        """get_hist_dir returns an existing ``<observers>/<key>/hist`` path."""
        expected = storage_env.observers_dir / "testkey1" / "hist"

        hist_dir = get_hist_dir("testkey1")

        assert hist_dir.exists()
        assert hist_dir == expected

    def test_get_hist_dir_no_create(self, storage_env):
        """With ensure_exists=False the returned path is left uncreated."""
        hist_dir = get_hist_dir("nonexistent", ensure_exists=False)
        assert not hist_dir.exists()

    def test_append_history_record(self, storage_env):
        """Two appended records end up as two JSON lines, in call order."""
        key, day = "testkey1", "20250103"
        upload = {"type": "upload", "segment": "120000_300"}
        observed = {"type": "observed", "segment": "120000_300"}

        append_history_record(key, day, upload)
        append_history_record(key, day, observed)

        jsonl_path = storage_env.observers_dir / "testkey1" / "hist" / "20250103.jsonl"
        assert jsonl_path.exists()

        with open(jsonl_path) as handle:
            raw_lines = list(handle)

        assert len(raw_lines) == 2
        record_types = [json.loads(raw)["type"] for raw in raw_lines]
        assert record_types == ["upload", "observed"]

    def test_load_history_empty(self, storage_env):
        """load_history yields an empty list when nothing was appended."""
        assert load_history("testkey1", "20250103") == []

    def test_load_history(self, storage_env):
        """load_history yields every appended record in append order."""
        key, day = "testkey1", "20250103"
        for segment in ("a", "b"):
            append_history_record(key, day, {"segment": segment})

        records = load_history(key, day)

        assert len(records) == 2
        assert [rec["segment"] for rec in records] == ["a", "b"]
180
181
class TestIncrementStat:
    """Exercise increment_stat against saved and missing observers."""

    def test_increment_stat_new_counter(self, storage_env):
        """A counter absent from stats reads 1 after one increment."""
        full_key = "testkey123456789"
        save_observer({"key": full_key, "name": "test", "stats": {}})

        increment_stat("testkey1", "segments_observed")

        stats = load_observer(full_key)["stats"]
        assert stats["segments_observed"] == 1

    def test_increment_stat_existing_counter(self, storage_env):
        """A counter already at 5 reads 6 after one increment."""
        full_key = "testkey123456789"
        save_observer(
            {"key": full_key, "name": "test", "stats": {"segments_observed": 5}}
        )

        increment_stat("testkey1", "segments_observed")

        stats = load_observer(full_key)["stats"]
        assert stats["segments_observed"] == 6

    def test_increment_stat_missing_observer(self, storage_env):
        """Incrementing for an observer that was never saved must not raise."""
        # No assertion: the test passes as long as the call returns normally.
        increment_stat("nonexistent", "segments_observed")
219
220
class TestFindSegmentBySha256:
    """Exercise find_segment_by_sha256 against recorded upload history."""

    def test_no_history_returns_no_match(self, storage_env):
        """With no history the result is (None, empty set)."""
        wanted = {"sha256_abc"}

        found_segment, found_hashes = find_segment_by_sha256(
            "testkey1", "20250103", wanted
        )

        assert found_segment is None
        assert found_hashes == set()

    def test_full_match_returns_segment(self, storage_env):
        """When every requested hash is in one segment, that segment is returned."""
        # History holds one upload with two files.
        upload = {
            "segment": "120000_300",
            "files": [
                {"sha256": "sha256_aaa", "written": "audio.flac"},
                {"sha256": "sha256_bbb", "written": "screen.mp4"},
            ],
        }
        append_history_record("testkey1", "20250103", upload)

        wanted = {"sha256_aaa", "sha256_bbb"}
        found_segment, found_hashes = find_segment_by_sha256(
            "testkey1", "20250103", wanted
        )

        assert found_segment == "120000_300"
        assert found_hashes == {"sha256_aaa", "sha256_bbb"}

    def test_partial_match_returns_matched_set(self, storage_env):
        """When only some hashes are known, no segment is returned but they are listed."""
        upload = {
            "segment": "120000_300",
            "files": [
                {"sha256": "sha256_aaa", "written": "audio.flac"},
            ],
        }
        append_history_record("testkey1", "20250103", upload)

        # One hash is already recorded, the other is new.
        wanted = {"sha256_aaa", "sha256_new"}
        found_segment, found_hashes = find_segment_by_sha256(
            "testkey1", "20250103", wanted
        )

        assert found_segment is None
        assert found_hashes == {"sha256_aaa"}

    def test_no_match_returns_empty_set(self, storage_env):
        """When no requested hash is known, the result is (None, empty set)."""
        upload = {
            "segment": "120000_300",
            "files": [
                {"sha256": "sha256_aaa", "written": "audio.flac"},
            ],
        }
        append_history_record("testkey1", "20250103", upload)

        wanted = {"sha256_xxx", "sha256_yyy"}
        found_segment, found_hashes = find_segment_by_sha256(
            "testkey1", "20250103", wanted
        )

        assert found_segment is None
        assert found_hashes == set()

    def test_skips_observed_records(self, storage_env):
        """A following 'observed' record does not disturb the upload match."""
        upload = {
            "segment": "120000_300",
            "files": [
                {"sha256": "sha256_aaa", "written": "audio.flac"},
            ],
        }
        # Carries a "type" field and no file list.
        observed = {
            "type": "observed",
            "segment": "120000_300",
        }
        append_history_record("testkey1", "20250103", upload)
        append_history_record("testkey1", "20250103", observed)

        wanted = {"sha256_aaa"}
        found_segment, found_hashes = find_segment_by_sha256(
            "testkey1", "20250103", wanted
        )

        assert found_segment == "120000_300"
        assert found_hashes == {"sha256_aaa"}

    def test_subset_match_returns_segment(self, storage_env):
        """Requesting a subset of a segment's files still returns that segment."""
        # The recorded segment has three files.
        upload = {
            "segment": "120000_300",
            "files": [
                {"sha256": "sha256_aaa", "written": "audio.flac"},
                {"sha256": "sha256_bbb", "written": "screen.mp4"},
                {"sha256": "sha256_ccc", "written": "audio.jsonl"},
            ],
        }
        append_history_record("testkey1", "20250103", upload)

        # Only two of the three recorded hashes are requested.
        wanted = {"sha256_aaa", "sha256_bbb"}
        found_segment, found_hashes = find_segment_by_sha256(
            "testkey1", "20250103", wanted
        )

        assert found_segment == "120000_300"
        assert found_hashes == {"sha256_aaa", "sha256_bbb"}
342 assert matched == {"sha256_aaa", "sha256_bbb"}