# personal memory agent
1# SPDX-License-Identifier: AGPL-3.0-only
2# Copyright (c) 2026 sol pbc
3
4"""Tests for observe/transfer.py - day archive export and import."""
5
6import json
7import tarfile
8from pathlib import Path
9from unittest.mock import patch
10
11import pytest
12
13
class TestSegmentDeconfliction:
    """Tests for segment deconfliction via find_available_segment."""

    def test_find_available_segment_returns_original_if_free(self, tmp_path):
        """Test find_available_segment returns original if available."""
        from observe.utils import find_available_segment

        # Empty day directory: the requested key should come back untouched.
        assert find_available_segment(tmp_path, "120000_300") == "120000_300"

    def test_find_available_segment_finds_alternative(self, tmp_path):
        """Test find_available_segment finds alternative when original taken."""
        from observe.utils import find_available_segment

        # Occupy the requested slot so a different one must be chosen.
        (tmp_path / "120000_300").mkdir()

        alternative = find_available_segment(tmp_path, "120000_300")
        assert alternative is not None
        assert alternative != "120000_300"
        # Result should still look like an "HHMMSS_duration" key.
        assert "_" in alternative

    def test_find_available_segment_returns_none_when_exhausted(self, tmp_path):
        """Test find_available_segment returns None when all slots taken."""
        from observe.utils import find_available_segment

        # Saturate the neighborhood of the target: every start time within
        # +/-50 seconds of noon, crossed with every duration within +/-50s
        # of 300, gets a pre-existing segment directory.
        noon_seconds = 12 * 3600
        for start_offset in range(-50, 51):
            seconds = noon_seconds + start_offset
            if not 0 <= seconds < 86400:
                continue
            hours, remainder = divmod(seconds, 3600)
            minutes, secs = divmod(remainder, 60)
            for duration_offset in range(-50, 51):
                duration = 300 + duration_offset
                if duration > 0:
                    name = f"{hours:02d}{minutes:02d}{secs:02d}_{duration}"
                    (tmp_path / name).mkdir(exist_ok=True)

        # With so many slots filled, should eventually fail
        result = find_available_segment(tmp_path, "120000_300", max_attempts=10)
        # May or may not find one depending on random walk, but shouldn't crash
        assert result is None or "_" in result
58
59
class TestComputeSha256:
    """Tests for SHA256 computation utilities."""

    # Known SHA256 digest of b"hello world", shared by both tests below.
    _HELLO_WORLD_SHA256 = (
        "b94d27b9934d3e08a52e52d7da7dabfac484efe37a5380ee9088f7ace2efcde9"
    )

    def test_compute_file_sha256(self, tmp_path):
        """Test compute_file_sha256 returns correct hash."""
        from observe.utils import compute_file_sha256

        sample = tmp_path / "test.txt"
        sample.write_bytes(b"hello world")

        assert compute_file_sha256(sample) == self._HELLO_WORLD_SHA256

    def test_compute_bytes_sha256(self):
        """Test compute_bytes_sha256 returns correct hash."""
        from observe.utils import compute_bytes_sha256

        assert compute_bytes_sha256(b"hello world") == self._HELLO_WORLD_SHA256
81
82
class TestTransferExport:
    """Tests for archive creation (export)."""

    @staticmethod
    def _use_journal(monkeypatch, journal_path):
        """Point the journal override at *journal_path* and reset its cache."""
        monkeypatch.setenv("_SOLSTONE_JOURNAL_OVERRIDE", str(journal_path))

        import think.utils

        think.utils._journal_path_cache = None

    def test_create_archive_basic(self, tmp_path, monkeypatch):
        """Test create_archive creates valid archive."""
        from observe.transfer import create_archive

        # Mock journal laid out as day/stream/segment.
        journal = tmp_path / "journal"
        segment = journal / "20250101" / "default" / "120000_300"
        segment.mkdir(parents=True)
        (segment / "audio.flac").write_bytes(b"fake audio data")
        (segment / "audio.jsonl").write_text('{"raw": "audio.flac"}\n')

        self._use_journal(monkeypatch, journal)

        destination = tmp_path / "test.tgz"
        assert create_archive("20250101", destination) == destination
        assert destination.exists()

        # Verify archive members and manifest contents.
        with tarfile.open(destination, "r:gz") as tar:
            members = tar.getnames()
            for expected in (
                "manifest.json",
                "default/120000_300/audio.flac",
                "default/120000_300/audio.jsonl",
            ):
                assert expected in members

            manifest = json.load(tar.extractfile("manifest.json"))
            assert manifest["version"] == 1
            assert manifest["day"] == "20250101"
            assert "default/120000_300" in manifest["segments"]

    def test_create_archive_no_segments_error(self, tmp_path, monkeypatch):
        """Test create_archive raises error for empty day."""
        from observe.transfer import create_archive

        # Day directory exists but holds no segments.
        journal = tmp_path / "journal"
        (journal / "20250101").mkdir(parents=True)
        self._use_journal(monkeypatch, journal)

        with pytest.raises(ValueError, match="No segments found"):
            create_archive("20250101")

    def test_create_archive_no_day_error(self, tmp_path, monkeypatch):
        """Test create_archive raises error for missing day."""
        from observe.transfer import create_archive

        # Journal exists but the requested day does not.
        journal = tmp_path / "journal"
        journal.mkdir(parents=True)
        self._use_journal(monkeypatch, journal)

        with pytest.raises(ValueError, match="does not exist"):
            create_archive("20250101")
159
160
class TestTransferImport:
    """Tests for archive import."""

    def _create_test_archive(self, tmp_path, segments: dict) -> Path:
        """Build a version-1 .tgz archive under *tmp_path*.

        *segments* maps segment key -> {filename: content bytes}.  Each file
        is stored as archive member "<segment>/<filename>", and a
        manifest.json with per-file sha256/size entries is appended last,
        mirroring the layout create_archive produces.
        """
        import io

        from observe.utils import compute_bytes_sha256

        archive_path = tmp_path / "test.tgz"

        manifest = {
            "version": 1,
            "day": "20250101",
            "created_at": 1704067200000,
            "host": "test-host",
            "segments": {},
        }

        with tarfile.open(archive_path, "w:gz") as tar:
            for segment, files in segments.items():
                manifest["segments"][segment] = {"files": []}
                for filename, content in files.items():
                    # Record the file in the manifest.
                    manifest["segments"][segment]["files"].append(
                        {
                            "name": filename,
                            "sha256": compute_bytes_sha256(content),
                            "size": len(content),
                        }
                    )

                    # Add file to archive under its real per-file name
                    # (was a literal placeholder, which made every file in a
                    # segment collide on one bogus member name).
                    info = tarfile.TarInfo(name=f"{segment}/{filename}")
                    info.size = len(content)
                    tar.addfile(info, io.BytesIO(content))

            # Add manifest as the final member.
            manifest_json = json.dumps(manifest).encode()
            info = tarfile.TarInfo(name="manifest.json")
            info.size = len(manifest_json)
            tar.addfile(info, io.BytesIO(manifest_json))

        return archive_path

    def test_validate_archive_all_new(self, tmp_path, monkeypatch):
        """Test validate_archive with no existing segments."""
        from observe.transfer import validate_archive

        # Create archive with two fresh segments.
        archive_path = self._create_test_archive(
            tmp_path,
            {
                "120000_300": {"audio.flac": b"audio data"},
                "130000_300": {"audio.flac": b"more audio"},
            },
        )

        # Set up empty journal.
        journal_path = tmp_path / "journal"
        journal_path.mkdir()

        monkeypatch.setenv("_SOLSTONE_JOURNAL_OVERRIDE", str(journal_path))

        import think.utils

        think.utils._journal_path_cache = None

        result = validate_archive(archive_path)

        # Nothing to skip; both segments import under their own keys.
        assert result["skip"] == []
        assert len(result["import_as"]) == 2
        assert result["import_as"]["120000_300"] == "120000_300"
        assert result["import_as"]["130000_300"] == "130000_300"

    def test_validate_archive_skip_matching(self, tmp_path, monkeypatch):
        """Test validate_archive skips segments with matching hashes."""
        from observe.transfer import validate_archive

        # Archive and journal hold byte-identical content for the segment.
        content = b"audio data"
        archive_path = self._create_test_archive(
            tmp_path,
            {"120000_300": {"audio.flac": content}},
        )

        journal_path = tmp_path / "journal"
        segment_dir = journal_path / "20250101" / "120000_300"
        segment_dir.mkdir(parents=True)
        (segment_dir / "audio.flac").write_bytes(content)

        monkeypatch.setenv("_SOLSTONE_JOURNAL_OVERRIDE", str(journal_path))

        import think.utils

        think.utils._journal_path_cache = None

        result = validate_archive(archive_path)

        # Identical content means the segment is skipped, not re-imported.
        assert "120000_300" in result["skip"]
        assert "120000_300" not in result["import_as"]

    def test_validate_archive_deconflict_different(self, tmp_path, monkeypatch):
        """Test validate_archive deconflicts segments with different content."""
        from observe.transfer import validate_archive

        archive_path = self._create_test_archive(
            tmp_path,
            {"120000_300": {"audio.flac": b"new audio data"}},
        )

        # Journal already has a segment with the same key but different bytes.
        journal_path = tmp_path / "journal"
        segment_dir = journal_path / "20250101" / "120000_300"
        segment_dir.mkdir(parents=True)
        (segment_dir / "audio.flac").write_bytes(b"existing different data")

        monkeypatch.setenv("_SOLSTONE_JOURNAL_OVERRIDE", str(journal_path))

        import think.utils

        think.utils._journal_path_cache = None

        result = validate_archive(archive_path)

        # Conflicting content must be imported under an alternate key.
        assert "120000_300" in result["deconflicted"]
        assert result["import_as"]["120000_300"] != "120000_300"

    def test_import_archive_basic(self, tmp_path, monkeypatch):
        """Test import_archive extracts segments correctly."""
        from observe.transfer import import_archive

        audio_content = b"fake audio data"
        jsonl_content = b'{"raw": "audio.flac"}\n'

        archive_path = self._create_test_archive(
            tmp_path,
            {
                "120000_300": {
                    "audio.flac": audio_content,
                    "audio.jsonl": jsonl_content,
                }
            },
        )

        # Set up empty journal.
        journal_path = tmp_path / "journal"
        journal_path.mkdir()

        monkeypatch.setenv("_SOLSTONE_JOURNAL_OVERRIDE", str(journal_path))

        import think.utils

        think.utils._journal_path_cache = None

        # Mock subprocess to avoid running the real indexer after import.
        with patch("subprocess.run"):
            result = import_archive(archive_path)

        assert result["status"] == "imported"
        assert "120000_300" in result["imported"]

        # Files must land in the journal byte-for-byte.
        segment_dir = journal_path / "20250101" / "120000_300"
        assert segment_dir.exists()
        assert (segment_dir / "audio.flac").read_bytes() == audio_content
        assert (segment_dir / "audio.jsonl").read_bytes() == jsonl_content

    def test_import_archive_dry_run(self, tmp_path, monkeypatch):
        """Test import_archive dry run doesn't modify filesystem."""
        from observe.transfer import import_archive

        archive_path = self._create_test_archive(
            tmp_path,
            {"120000_300": {"audio.flac": b"audio data"}},
        )

        journal_path = tmp_path / "journal"
        journal_path.mkdir()

        monkeypatch.setenv("_SOLSTONE_JOURNAL_OVERRIDE", str(journal_path))

        import think.utils

        think.utils._journal_path_cache = None

        result = import_archive(archive_path, dry_run=True)

        assert result["status"] == "dry_run"
        # Dry run must leave the journal untouched.
        assert not (journal_path / "20250101").exists()

    def test_import_archive_nothing_to_import(self, tmp_path, monkeypatch):
        """Test import_archive when all segments already synced."""
        from observe.transfer import import_archive

        content = b"audio data"
        archive_path = self._create_test_archive(
            tmp_path,
            {"120000_300": {"audio.flac": content}},
        )

        # Journal already holds identical content for every archive segment.
        journal_path = tmp_path / "journal"
        segment_dir = journal_path / "20250101" / "120000_300"
        segment_dir.mkdir(parents=True)
        (segment_dir / "audio.flac").write_bytes(content)

        monkeypatch.setenv("_SOLSTONE_JOURNAL_OVERRIDE", str(journal_path))

        import think.utils

        think.utils._journal_path_cache = None

        result = import_archive(archive_path)

        assert result["status"] == "nothing_to_import"
383
384
class TestManifestValidation:
    """Tests for manifest reading and validation."""

    @staticmethod
    def _single_member_archive(path, member_name, payload):
        """Write a gzipped tar at *path* holding one member with *payload* bytes."""
        import io

        with tarfile.open(path, "w:gz") as tar:
            info = tarfile.TarInfo(name=member_name)
            info.size = len(payload)
            tar.addfile(info, io.BytesIO(payload))

    def test_read_manifest_missing(self, tmp_path):
        """Test error when manifest is missing from archive."""
        from observe.transfer import _read_manifest

        # Archive contains a file, but no manifest.json member.
        archive = tmp_path / "test.tgz"
        self._single_member_archive(archive, "some_file.txt", b"test")

        with pytest.raises(ValueError, match="manifest.json not found"):
            _read_manifest(archive)

    def test_read_manifest_wrong_version(self, tmp_path):
        """Test error when manifest has wrong version."""
        from observe.transfer import _read_manifest

        archive = tmp_path / "test.tgz"
        body = json.dumps({"version": 999, "day": "20250101", "segments": {}})
        self._single_member_archive(archive, "manifest.json", body.encode())

        with pytest.raises(ValueError, match="Unsupported manifest version"):
            _read_manifest(archive)

    def test_read_manifest_missing_fields(self, tmp_path):
        """Test error when manifest has missing required fields."""
        from observe.transfer import _read_manifest

        archive = tmp_path / "test.tgz"
        body = json.dumps({"version": 1})  # Missing day and segments
        self._single_member_archive(archive, "manifest.json", body.encode())

        with pytest.raises(ValueError, match="missing required fields"):
            _read_manifest(archive)