# personal memory agent
# SPDX-License-Identifier: AGPL-3.0-only
# Copyright (c) 2026 sol pbc

"""Tests for observe/transfer.py - day archive export and import."""

import io
import json
import tarfile
from pathlib import Path
from unittest.mock import patch

import pytest


class TestSegmentDeconfliction:
    """Tests for segment deconfliction via find_available_segment."""

    def test_find_available_segment_returns_original_if_free(self, tmp_path):
        """Test find_available_segment returns original if available."""
        from observe.utils import find_available_segment

        # No existing segments
        result = find_available_segment(tmp_path, "120000_300")
        assert result == "120000_300"

    def test_find_available_segment_finds_alternative(self, tmp_path):
        """Test find_available_segment finds alternative when original taken."""
        from observe.utils import find_available_segment

        # Create existing segment
        (tmp_path / "120000_300").mkdir()

        result = find_available_segment(tmp_path, "120000_300")
        assert result is not None
        assert result != "120000_300"
        # Should be a valid segment key format
        assert "_" in result

    def test_find_available_segment_returns_none_when_exhausted(self, tmp_path):
        """Test find_available_segment returns None when all slots taken."""
        from observe.utils import find_available_segment

        # Create many segments around the target (HHMMSS_duration keys near
        # 12:00:00 / 300s) so the deconfliction walk has nowhere to land.
        for delta in range(-50, 51):
            for dur_delta in range(-50, 51):
                total_seconds = 12 * 3600 + delta
                if 0 <= total_seconds < 86400:
                    h = total_seconds // 3600
                    m = (total_seconds % 3600) // 60
                    s = total_seconds % 60
                    dur = 300 + dur_delta
                    if dur > 0:
                        (tmp_path / f"{h:02d}{m:02d}{s:02d}_{dur}").mkdir(exist_ok=True)

        # With so many slots filled, should eventually fail
        result = find_available_segment(tmp_path, "120000_300", max_attempts=10)
        # May or may not find one depending on random walk, but shouldn't crash
        assert result is None or "_" in result


class TestComputeSha256:
    """Tests for SHA256 computation utilities."""

    def test_compute_file_sha256(self, tmp_path):
        """Test compute_file_sha256 returns correct hash."""
        from observe.utils import compute_file_sha256

        test_file = tmp_path / "test.txt"
        test_file.write_bytes(b"hello world")

        # Known SHA256 of "hello world"
        expected = "b94d27b9934d3e08a52e52d7da7dabfac484efe37a5380ee9088f7ace2efcde9"
        assert compute_file_sha256(test_file) == expected

    def test_compute_bytes_sha256(self):
        """Test compute_bytes_sha256 returns correct hash."""
        from observe.utils import compute_bytes_sha256

        # Known SHA256 of "hello world"
        expected = "b94d27b9934d3e08a52e52d7da7dabfac484efe37a5380ee9088f7ace2efcde9"
        assert compute_bytes_sha256(b"hello world") == expected


class TestTransferExport:
    """Tests for archive creation (export)."""

    def test_create_archive_basic(self, tmp_path, monkeypatch):
        """Test create_archive creates valid archive."""
        from observe.transfer import create_archive

        # Set up mock journal with day/stream/segment structure
        journal_path = tmp_path / "journal"
        day_dir = journal_path / "20250101"
        segment_dir = day_dir / "default" / "120000_300"
        segment_dir.mkdir(parents=True)

        # Add test files to segment
        (segment_dir / "audio.flac").write_bytes(b"fake audio data")
        (segment_dir / "audio.jsonl").write_text('{"raw": "audio.flac"}\n')

        monkeypatch.setenv("_SOLSTONE_JOURNAL_OVERRIDE", str(journal_path))

        # Clear cache so the override env var is re-read
        import think.utils

        think.utils._journal_path_cache = None

        output_path = tmp_path / "test.tgz"
        result = create_archive("20250101", output_path)

        assert result == output_path
        assert output_path.exists()

        # Verify archive contents
        with tarfile.open(output_path, "r:gz") as tar:
            names = tar.getnames()
            assert "manifest.json" in names
            assert "default/120000_300/audio.flac" in names
            assert "default/120000_300/audio.jsonl" in names

            # Verify manifest
            manifest_file = tar.extractfile("manifest.json")
            manifest = json.load(manifest_file)
            assert manifest["version"] == 1
            assert manifest["day"] == "20250101"
            assert "default/120000_300" in manifest["segments"]

    def test_create_archive_no_segments_error(self, tmp_path, monkeypatch):
        """Test create_archive raises error for empty day."""
        from observe.transfer import create_archive

        journal_path = tmp_path / "journal"
        day_dir = journal_path / "20250101"
        day_dir.mkdir(parents=True)

        monkeypatch.setenv("_SOLSTONE_JOURNAL_OVERRIDE", str(journal_path))

        import think.utils

        think.utils._journal_path_cache = None

        with pytest.raises(ValueError, match="No segments found"):
            create_archive("20250101")

    def test_create_archive_no_day_error(self, tmp_path, monkeypatch):
        """Test create_archive raises error for missing day."""
        from observe.transfer import create_archive

        journal_path = tmp_path / "journal"
        journal_path.mkdir(parents=True)

        monkeypatch.setenv("_SOLSTONE_JOURNAL_OVERRIDE", str(journal_path))

        import think.utils

        think.utils._journal_path_cache = None

        with pytest.raises(ValueError, match="does not exist"):
            create_archive("20250101")


class TestTransferImport:
    """Tests for archive import."""

    def _create_test_archive(self, tmp_path, segments: dict) -> Path:
        """Build a day archive at ``tmp_path/test.tgz``.

        ``segments`` maps segment key -> {filename: content bytes}. Each file
        is added both to the tarball (as ``<segment>/<filename>``) and to the
        manifest with its sha256/size, matching the format create_archive
        produces.
        """
        from observe.utils import compute_bytes_sha256

        archive_path = tmp_path / "test.tgz"

        manifest = {
            "version": 1,
            "day": "20250101",
            "created_at": 1704067200000,
            "host": "test-host",
            "segments": {},
        }

        with tarfile.open(archive_path, "w:gz") as tar:
            for segment, files in segments.items():
                manifest["segments"][segment] = {"files": []}
                for filename, content in files.items():
                    # Record file metadata in the manifest
                    manifest["segments"][segment]["files"].append(
                        {
                            "name": filename,
                            "sha256": compute_bytes_sha256(content),
                            "size": len(content),
                        }
                    )

                    # Add file to archive under its manifest name
                    info = tarfile.TarInfo(name=f"{segment}/{filename}")
                    info.size = len(content)
                    tar.addfile(info, io.BytesIO(content))

            # Add manifest
            manifest_json = json.dumps(manifest).encode()
            info = tarfile.TarInfo(name="manifest.json")
            info.size = len(manifest_json)
            tar.addfile(info, io.BytesIO(manifest_json))

        return archive_path

    def test_validate_archive_all_new(self, tmp_path, monkeypatch):
        """Test validate_archive with no existing segments."""
        from observe.transfer import validate_archive

        # Create archive
        archive_path = self._create_test_archive(
            tmp_path,
            {
                "120000_300": {"audio.flac": b"audio data"},
                "130000_300": {"audio.flac": b"more audio"},
            },
        )

        # Set up empty journal
        journal_path = tmp_path / "journal"
        journal_path.mkdir()

        monkeypatch.setenv("_SOLSTONE_JOURNAL_OVERRIDE", str(journal_path))

        import think.utils

        think.utils._journal_path_cache = None

        result = validate_archive(archive_path)

        assert result["skip"] == []
        assert len(result["import_as"]) == 2
        assert result["import_as"]["120000_300"] == "120000_300"
        assert result["import_as"]["130000_300"] == "130000_300"

    def test_validate_archive_skip_matching(self, tmp_path, monkeypatch):
        """Test validate_archive skips segments with matching hashes."""
        from observe.transfer import validate_archive

        # Create archive
        content = b"audio data"
        archive_path = self._create_test_archive(
            tmp_path,
            {"120000_300": {"audio.flac": content}},
        )

        # Set up journal with matching segment
        journal_path = tmp_path / "journal"
        segment_dir = journal_path / "20250101" / "120000_300"
        segment_dir.mkdir(parents=True)
        (segment_dir / "audio.flac").write_bytes(content)

        monkeypatch.setenv("_SOLSTONE_JOURNAL_OVERRIDE", str(journal_path))

        import think.utils

        think.utils._journal_path_cache = None

        result = validate_archive(archive_path)

        assert "120000_300" in result["skip"]
        assert "120000_300" not in result["import_as"]

    def test_validate_archive_deconflict_different(self, tmp_path, monkeypatch):
        """Test validate_archive deconflicts segments with different content."""
        from observe.transfer import validate_archive

        # Create archive
        archive_path = self._create_test_archive(
            tmp_path,
            {"120000_300": {"audio.flac": b"new audio data"}},
        )

        # Set up journal with different content in same segment
        journal_path = tmp_path / "journal"
        segment_dir = journal_path / "20250101" / "120000_300"
        segment_dir.mkdir(parents=True)
        (segment_dir / "audio.flac").write_bytes(b"existing different data")

        monkeypatch.setenv("_SOLSTONE_JOURNAL_OVERRIDE", str(journal_path))

        import think.utils

        think.utils._journal_path_cache = None

        result = validate_archive(archive_path)

        assert "120000_300" in result["deconflicted"]
        assert result["import_as"]["120000_300"] != "120000_300"

    def test_import_archive_basic(self, tmp_path, monkeypatch):
        """Test import_archive extracts segments correctly."""
        from observe.transfer import import_archive

        # Create archive
        audio_content = b"fake audio data"
        jsonl_content = b'{"raw": "audio.flac"}\n'

        archive_path = self._create_test_archive(
            tmp_path,
            {
                "120000_300": {
                    "audio.flac": audio_content,
                    "audio.jsonl": jsonl_content,
                }
            },
        )

        # Set up empty journal
        journal_path = tmp_path / "journal"
        journal_path.mkdir()

        monkeypatch.setenv("_SOLSTONE_JOURNAL_OVERRIDE", str(journal_path))

        import think.utils

        think.utils._journal_path_cache = None

        # Mock subprocess to avoid running real indexer
        with patch("subprocess.run"):
            result = import_archive(archive_path)

        assert result["status"] == "imported"
        assert "120000_300" in result["imported"]

        # Verify files were extracted
        segment_dir = journal_path / "20250101" / "120000_300"
        assert segment_dir.exists()
        assert (segment_dir / "audio.flac").read_bytes() == audio_content
        assert (segment_dir / "audio.jsonl").read_bytes() == jsonl_content

    def test_import_archive_dry_run(self, tmp_path, monkeypatch):
        """Test import_archive dry run doesn't modify filesystem."""
        from observe.transfer import import_archive

        archive_path = self._create_test_archive(
            tmp_path,
            {"120000_300": {"audio.flac": b"audio data"}},
        )

        journal_path = tmp_path / "journal"
        journal_path.mkdir()

        monkeypatch.setenv("_SOLSTONE_JOURNAL_OVERRIDE", str(journal_path))

        import think.utils

        think.utils._journal_path_cache = None

        result = import_archive(archive_path, dry_run=True)

        assert result["status"] == "dry_run"
        # Directory should not be created
        assert not (journal_path / "20250101").exists()

    def test_import_archive_nothing_to_import(self, tmp_path, monkeypatch):
        """Test import_archive when all segments already synced."""
        from observe.transfer import import_archive

        content = b"audio data"
        archive_path = self._create_test_archive(
            tmp_path,
            {"120000_300": {"audio.flac": content}},
        )

        # Set up journal with matching content
        journal_path = tmp_path / "journal"
        segment_dir = journal_path / "20250101" / "120000_300"
        segment_dir.mkdir(parents=True)
        (segment_dir / "audio.flac").write_bytes(content)

        monkeypatch.setenv("_SOLSTONE_JOURNAL_OVERRIDE", str(journal_path))

        import think.utils

        think.utils._journal_path_cache = None

        result = import_archive(archive_path)

        assert result["status"] == "nothing_to_import"


class TestManifestValidation:
    """Tests for manifest reading and validation."""

    def test_read_manifest_missing(self, tmp_path):
        """Test error when manifest is missing from archive."""
        from observe.transfer import _read_manifest

        # Create archive without manifest
        archive_path = tmp_path / "test.tgz"
        with tarfile.open(archive_path, "w:gz") as tar:
            info = tarfile.TarInfo(name="some_file.txt")
            info.size = 4
            tar.addfile(info, io.BytesIO(b"test"))

        with pytest.raises(ValueError, match="manifest.json not found"):
            _read_manifest(archive_path)

    def test_read_manifest_wrong_version(self, tmp_path):
        """Test error when manifest has wrong version."""
        from observe.transfer import _read_manifest

        archive_path = tmp_path / "test.tgz"
        with tarfile.open(archive_path, "w:gz") as tar:
            # Size the tar member by the encoded bytes, not the str length
            manifest = json.dumps(
                {"version": 999, "day": "20250101", "segments": {}}
            ).encode()
            info = tarfile.TarInfo(name="manifest.json")
            info.size = len(manifest)
            tar.addfile(info, io.BytesIO(manifest))

        with pytest.raises(ValueError, match="Unsupported manifest version"):
            _read_manifest(archive_path)

    def test_read_manifest_missing_fields(self, tmp_path):
        """Test error when manifest has missing required fields."""
        from observe.transfer import _read_manifest

        archive_path = tmp_path / "test.tgz"
        with tarfile.open(archive_path, "w:gz") as tar:
            manifest = json.dumps({"version": 1}).encode()  # Missing day and segments
            info = tarfile.TarInfo(name="manifest.json")
            info.size = len(manifest)
            tar.addfile(info, io.BytesIO(manifest))

        with pytest.raises(ValueError, match="missing required fields"):
            _read_manifest(archive_path)