# personal memory agent
# NOTE(review): the line above and this header are repository-view residue
# ("at main 656 lines 24 kB view raw") — kept as comments so the file parses.
# SPDX-License-Identifier: AGPL-3.0-only
# Copyright (c) 2026 sol pbc

"""Tests for think.retention — media retention service."""

import hashlib
import json
import os
import shutil
from datetime import datetime

from think.retention import (
    RetentionConfig,
    RetentionPolicy,
    StorageSummary,
    _human_bytes,
    check_storage_health,
    get_raw_media_files,
    is_raw_media,
    is_segment_complete,
    load_retention_config,
    purge,
)

# ---------------------------------------------------------------------------
# is_raw_media
# ---------------------------------------------------------------------------


class TestIsRawMedia:
    """Classification of files as raw (purgeable) media by name/extension."""

    def test_audio_extensions(self, tmp_path):
        for ext in (".flac", ".opus", ".ogg", ".m4a", ".mp3", ".wav"):
            p = tmp_path / f"audio{ext}"
            p.touch()
            assert is_raw_media(p), f"{ext} should be raw media"

    def test_video_extensions(self, tmp_path):
        for ext in (".webm", ".mov", ".mp4"):
            p = tmp_path / f"screen{ext}"
            p.touch()
            assert is_raw_media(p), f"{ext} should be raw media"

    def test_monitor_diff_png(self, tmp_path):
        # monitor_N_diff.png screenshots count as raw media even though
        # ordinary .png files (see test_not_raw_media) do not.
        p = tmp_path / "monitor_1_diff.png"
        p.touch()
        assert is_raw_media(p)

        p2 = tmp_path / "monitor_2_diff.png"
        p2.touch()
        assert is_raw_media(p2)

    def test_not_raw_media(self, tmp_path):
        # Derived/transcript artifacts must never be classified as raw media.
        for name in (
            "audio.jsonl",
            "screen.jsonl",
            "stream.json",
            "speaker_labels.json",
            "audio.npz",
            "summary.md",
            "regular.png",
        ):
            p = tmp_path / name
            p.touch()
            assert not is_raw_media(p), f"{name} should NOT be raw media"


# ---------------------------------------------------------------------------
# get_raw_media_files
# ---------------------------------------------------------------------------


class TestGetRawMediaFiles:
    """Listing raw media files within a segment directory."""

    def test_returns_only_raw(self, tmp_path):
        (tmp_path / "audio.flac").write_bytes(b"x" * 100)
        (tmp_path / "screen.webm").write_bytes(b"x" * 200)
        (tmp_path / "audio.jsonl").write_text("transcript")
        (tmp_path / "stream.json").write_text("{}")

        raw = get_raw_media_files(tmp_path)
        names = {f.name for f in raw}
        assert names == {"audio.flac", "screen.webm"}

    def test_empty_dir(self, tmp_path):
        assert get_raw_media_files(tmp_path) == []

    def test_nonexistent_dir(self, tmp_path):
        # A missing directory yields an empty list rather than raising.
        assert get_raw_media_files(tmp_path / "nope") == []


# ---------------------------------------------------------------------------
# is_segment_complete
# ---------------------------------------------------------------------------


def _make_segment(
    tmp_path,
    *,
    audio=False,
    video=False,
    video_name="screen.webm",
    embeddings=False,
    audio_extract=True,
    screen_extract=True,
    speaker_labels=True,
    active_agents=False,
):
    """Create a segment directory with specified contents.

    Flags toggle which raw media, derived extracts, and agent artifacts are
    written; a stream.json marker is always created.  Returns the segment path.
    """
    seg = tmp_path / "segment"
    seg.mkdir(exist_ok=True)
    agents_dir = seg / "agents"
    agents_dir.mkdir(exist_ok=True)

    if audio:
        (seg / "audio.flac").write_bytes(b"audio")
    if video:
        (seg / video_name).write_bytes(b"video")
    if embeddings:
        (seg / "audio.npz").write_bytes(b"npz")
    if audio and audio_extract:
        (seg / "audio.jsonl").write_text('{"raw":"audio.flac"}\n')
    if video and screen_extract:
        (seg / "screen.jsonl").write_text('{"raw":"screen.webm"}\n')
    if embeddings and speaker_labels:
        (agents_dir / "speaker_labels.json").write_text("{}")
    if active_agents:
        # An *_active.jsonl file marks an agent still working on the segment.
        (agents_dir / "1234_active.jsonl").write_text("{}")

    (seg / "stream.json").write_text('{"stream":"default"}')
    return seg


class TestIsSegmentComplete:
    """A segment is complete when every raw file has its derived extract and
    no agent is still active on it."""

    def test_complete_audio_video(self, tmp_path):
        seg = _make_segment(tmp_path, audio=True, video=True, embeddings=True)
        assert is_segment_complete(seg)

    def test_complete_audio_only(self, tmp_path):
        seg = _make_segment(tmp_path, audio=True)
        assert is_segment_complete(seg)

    def test_complete_video_only(self, tmp_path):
        seg = _make_segment(tmp_path, video=True)
        assert is_segment_complete(seg)

    def test_incomplete_missing_audio_extract(self, tmp_path):
        seg = _make_segment(tmp_path, audio=True, audio_extract=False)
        assert not is_segment_complete(seg)

    def test_incomplete_missing_screen_extract(self, tmp_path):
        seg = _make_segment(tmp_path, video=True, screen_extract=False)
        assert not is_segment_complete(seg)

    def test_incomplete_missing_screen_extract_for_mp4(self, tmp_path):
        seg = _make_segment(
            tmp_path, video=True, video_name="screen.mp4", screen_extract=False
        )
        assert not is_segment_complete(seg)

    def test_complete_mp4_with_screen_extract(self, tmp_path):
        seg = _make_segment(tmp_path, video=True, video_name="screen.mp4")
        assert is_segment_complete(seg)

    def test_incomplete_missing_speaker_labels(self, tmp_path):
        # Embeddings present without speaker labels means diarization is pending.
        seg = _make_segment(tmp_path, audio=True, embeddings=True, speaker_labels=False)
        assert not is_segment_complete(seg)

    def test_complete_with_stub_speaker_labels(self, tmp_path):
        """Stub speaker_labels.json (skipped=True, labels=[]) unblocks retention."""
        seg = _make_segment(tmp_path, audio=True, embeddings=True, speaker_labels=False)
        stub = seg / "agents" / "speaker_labels.json"
        stub.write_text(
            json.dumps({"labels": [], "skipped": True, "reason": "no_owner_centroid"})
        )
        assert is_segment_complete(seg)

    def test_incomplete_active_agents(self, tmp_path):
        seg = _make_segment(tmp_path, audio=True, active_agents=True)
        assert not is_segment_complete(seg)

    def test_no_raw_media_is_complete(self, tmp_path):
        """Segment with only derived content is considered complete."""
        seg = tmp_path / "segment"
        seg.mkdir()
        (seg / "audio.jsonl").write_text("transcript")
        (seg / "stream.json").write_text("{}")
        assert is_segment_complete(seg)

    def test_no_agents_dir_is_ok(self, tmp_path):
        """No agents/ directory = no active agents = passes check 1."""
        seg = tmp_path / "segment"
        seg.mkdir()
        (seg / "stream.json").write_text("{}")
        assert is_segment_complete(seg)


# ---------------------------------------------------------------------------
# RetentionPolicy
# ---------------------------------------------------------------------------


class TestRetentionPolicy:
    """Eligibility rules for the three policy modes: keep / processed / days."""

    def test_keep_never_eligible(self):
        p = RetentionPolicy(mode="keep")
        assert not p.is_eligible(0)
        assert not p.is_eligible(365)

    def test_processed_always_eligible(self):
        p = RetentionPolicy(mode="processed")
        assert p.is_eligible(0)
        assert p.is_eligible(1)

    def test_days_threshold(self):
        # Threshold is inclusive: eligible at exactly `days` old.
        p = RetentionPolicy(mode="days", days=30)
        assert not p.is_eligible(29)
        assert p.is_eligible(30)
        assert p.is_eligible(31)

    def test_days_no_value(self):
        # mode="days" with days=None never becomes eligible.
        p = RetentionPolicy(mode="days", days=None)
        assert not p.is_eligible(100)


class TestRetentionConfig:
    """Default policy plus per-stream overrides."""

    def test_default_policy(self):
        cfg = RetentionConfig()
        assert cfg.policy_for_stream("default").mode == "days"
        assert cfg.policy_for_stream("default").days == 7

    def test_per_stream_override(self):
        cfg = RetentionConfig(
            default=RetentionPolicy(mode="keep"),
            per_stream={
                "archon.plaud": RetentionPolicy(mode="days", days=7),
            },
        )
        assert cfg.policy_for_stream("archon.plaud").mode == "days"
        assert cfg.policy_for_stream("archon.plaud").days == 7
        assert cfg.policy_for_stream("default").mode == "keep"


# ---------------------------------------------------------------------------
# load_retention_config
# ---------------------------------------------------------------------------


class TestLoadRetentionConfig:
    """Parsing the retention section of the app config into RetentionConfig."""

    def test_default_config(self, monkeypatch):
        monkeypatch.setattr("think.utils.get_config", lambda: {})
        cfg = load_retention_config()
        assert cfg.default.mode == "days"
        assert cfg.default.days == 7
        assert cfg.per_stream == {}

    def test_custom_config(self, monkeypatch):
        monkeypatch.setattr(
            "think.utils.get_config",
            lambda: {
                "retention": {
                    "raw_media": "days",
                    "raw_media_days": 30,
                    "per_stream": {
                        "default": {"raw_media": "processed"},
                    },
                }
            },
        )
        cfg = load_retention_config()
        assert cfg.default.mode == "days"
        assert cfg.default.days == 30
        assert cfg.per_stream["default"].mode == "processed"


# ---------------------------------------------------------------------------
# purge
# ---------------------------------------------------------------------------


class TestPurge:
    """End-to-end purge behavior over a synthetic journal tree."""

    def _setup_journal(self, tmp_path, monkeypatch):
        """Create a journal structure with test segments.

        Layout: two complete 60-day-old segments (default + plaud streams),
        one recent complete segment, and one old incomplete segment
        (raw audio present but no audio.jsonl extract).
        """
        journal = tmp_path / "journal"

        # Day 1: 60 days old — two complete segments
        day1 = journal / "chronicle" / "20260115" / "default" / "100000_300"
        day1.mkdir(parents=True)
        (day1 / "audio.flac").write_bytes(b"x" * 1000)
        (day1 / "audio.jsonl").write_text('{"raw":"audio.flac"}\n')
        (day1 / "stream.json").write_text('{"stream":"default"}')
        (day1 / "agents").mkdir()

        day1b = journal / "chronicle" / "20260115" / "plaud" / "103000_300"
        day1b.mkdir(parents=True)
        (day1b / "audio.m4a").write_bytes(b"x" * 500)
        (day1b / "audio.jsonl").write_text('{"raw":"audio.m4a"}\n')
        (day1b / "stream.json").write_text('{"stream":"plaud"}')
        (day1b / "agents").mkdir()

        # Day 2: recent — one complete segment (must stay within 30d window)
        day2 = journal / "chronicle" / "20260401" / "default" / "120000_300"
        day2.mkdir(parents=True)
        (day2 / "audio.flac").write_bytes(b"x" * 800)
        (day2 / "audio.jsonl").write_text('{"raw":"audio.flac"}\n')
        (day2 / "stream.json").write_text('{"stream":"default"}')
        (day2 / "agents").mkdir()

        # Day 3: incomplete segment (no audio.jsonl)
        day3 = journal / "chronicle" / "20260101" / "default" / "140000_300"
        day3.mkdir(parents=True)
        (day3 / "audio.flac").write_bytes(b"x" * 600)
        (day3 / "stream.json").write_text('{"stream":"default"}')

        monkeypatch.setenv("_SOLSTONE_JOURNAL_OVERRIDE", str(journal))
        # Clear cached journal path
        import think.utils

        think.utils._journal_path_cache = None

        return journal

    def test_dry_run(self, tmp_path, monkeypatch):
        journal = self._setup_journal(tmp_path, monkeypatch)

        result = purge(older_than_days=30, dry_run=True)

        # Should report but not delete
        assert result.files_deleted == 2  # day1 default + plaud
        assert result.bytes_freed == 1500
        assert (
            journal / "chronicle" / "20260115" / "default" / "100000_300" / "audio.flac"
        ).exists()
        assert (
            journal / "chronicle" / "20260115" / "plaud" / "103000_300" / "audio.m4a"
        ).exists()
        # No retention log for dry run
        assert not (journal / "health" / "retention.log").exists()

    def test_actual_purge(self, tmp_path, monkeypatch):
        journal = self._setup_journal(tmp_path, monkeypatch)

        result = purge(older_than_days=30, dry_run=False)

        assert result.files_deleted == 2
        # Files should be gone
        assert not (
            journal / "chronicle" / "20260115" / "default" / "100000_300" / "audio.flac"
        ).exists()
        assert not (
            journal / "chronicle" / "20260115" / "plaud" / "103000_300" / "audio.m4a"
        ).exists()
        # Derived content preserved
        assert (
            journal
            / "chronicle"
            / "20260115"
            / "default"
            / "100000_300"
            / "audio.jsonl"
        ).exists()
        # Retention log written
        assert (journal / "health" / "retention.log").exists()

    def test_skips_incomplete(self, tmp_path, monkeypatch):
        self._setup_journal(tmp_path, monkeypatch)

        result = purge(older_than_days=0, dry_run=True)

        # Day3 segment should be skipped (incomplete)
        assert result.segments_skipped_incomplete == 1

    def test_stream_filter(self, tmp_path, monkeypatch):
        self._setup_journal(tmp_path, monkeypatch)

        result = purge(older_than_days=30, stream_filter="plaud", dry_run=True)

        assert result.files_deleted == 1
        assert result.details[0]["stream"] == "plaud"

    def test_policy_based_purge(self, tmp_path, monkeypatch):
        self._setup_journal(tmp_path, monkeypatch)

        config = RetentionConfig(
            default=RetentionPolicy(mode="keep"),
            per_stream={
                "plaud": RetentionPolicy(mode="days", days=7),
            },
        )

        result = purge(dry_run=True, config=config)

        # Only plaud segment (60 days old) should be eligible
        assert result.files_deleted == 1
        assert result.details[0]["stream"] == "plaud"


class TestPurgeProvenance:
    """Provenance metadata (hashes, processed_at) recorded by purge."""

    def _setup_journal(self, tmp_path, monkeypatch):
        # Reuse TestPurge's fixture builder for the same journal layout.
        return TestPurge()._setup_journal(tmp_path, monkeypatch)

    def test_hash_field_in_dry_run(self, tmp_path, monkeypatch):
        self._setup_journal(tmp_path, monkeypatch)

        result = purge(older_than_days=30, dry_run=True)
        expected_hash = hashlib.sha256(b"x" * 1000).hexdigest()

        # Every reported file carries a lowercase hex SHA-256.
        for detail in result.details:
            for file_info in detail["files"]:
                file_hash = file_info["hash"]
                assert len(file_hash) == 64
                assert all(c in "0123456789abcdef" for c in file_hash)

        default_detail = next(
            detail
            for detail in result.details
            if detail["stream"] == "default" and detail["segment"] == "100000_300"
        )
        assert default_detail["files"][0]["hash"] == expected_hash

    def test_hash_field_in_actual_purge(self, tmp_path, monkeypatch):
        self._setup_journal(tmp_path, monkeypatch)

        result = purge(older_than_days=30, dry_run=False)
        expected_hash = hashlib.sha256(b"x" * 1000).hexdigest()

        for detail in result.details:
            for file_info in detail["files"]:
                file_hash = file_info["hash"]
                assert len(file_hash) == 64
                assert all(c in "0123456789abcdef" for c in file_hash)

        default_detail = next(
            detail
            for detail in result.details
            if detail["stream"] == "default" and detail["segment"] == "100000_300"
        )
        assert default_detail["files"][0]["hash"] == expected_hash

    def test_processed_at_field(self, tmp_path, monkeypatch):
        self._setup_journal(tmp_path, monkeypatch)

        result = purge(older_than_days=30, dry_run=True)

        for detail in result.details:
            assert "processed_at" in detail
            assert isinstance(detail["processed_at"], str)
            # Must parse as an ISO-8601 timestamp.
            datetime.fromisoformat(detail["processed_at"])

    def test_processed_at_reflects_latest_mtime(self, tmp_path, monkeypatch):
        journal = self._setup_journal(tmp_path, monkeypatch)
        segment = journal / "chronicle" / "20260115" / "default" / "100000_300"
        audio_jsonl = segment / "audio.jsonl"
        alternate_audio_jsonl = segment / "meeting_audio.jsonl"
        speaker_labels = segment / "agents" / "speaker_labels.json"

        alternate_audio_jsonl.write_text('{"raw":"audio.flac"}\n')
        speaker_labels.write_text("{}")

        older_ts = datetime(2026, 1, 15, 10, 0, 0).timestamp()
        middle_ts = datetime(2026, 1, 15, 11, 0, 0).timestamp()
        latest_ts = datetime(2026, 1, 15, 12, 0, 0).timestamp()

        # Stagger mtimes across derived artifacts; the newest must win.
        os.utime(audio_jsonl, (older_ts, older_ts))
        os.utime(speaker_labels, (middle_ts, middle_ts))
        os.utime(alternate_audio_jsonl, (latest_ts, latest_ts))

        result = purge(older_than_days=30, dry_run=True)

        default_detail = next(
            detail
            for detail in result.details
            if detail["stream"] == "default" and detail["segment"] == "100000_300"
        )
        assert (
            default_detail["processed_at"]
            == datetime.fromtimestamp(latest_ts).isoformat()
        )


# ---------------------------------------------------------------------------
# _human_bytes
# ---------------------------------------------------------------------------


class TestHumanBytes:
    """Human-readable byte formatting."""

    def test_bytes(self):
        assert _human_bytes(0) == "0 B"
        assert _human_bytes(512) == "512 B"

    def test_kilobytes(self):
        assert _human_bytes(1024) == "1.0 KB"

    def test_megabytes(self):
        assert _human_bytes(1024 * 1024) == "1.0 MB"

    def test_gigabytes(self):
        assert _human_bytes(1024**3) == "1.0 GB"

    def test_large(self):
        result = _human_bytes(12_400_000_000)
        assert "GB" in result


class TestCheckStorageHealth:
    """Tests for check_storage_health threshold evaluation."""

    def _make_summary(self, raw_media_bytes=0, derived_bytes=0):
        # Minimal StorageSummary; segment counts are arbitrary fixed values.
        return StorageSummary(
            raw_media_bytes=raw_media_bytes,
            derived_bytes=derived_bytes,
            total_segments=10,
            segments_with_raw=5,
            segments_purged=3,
        )

    def test_no_warnings_when_healthy(self, tmp_path, monkeypatch):
        """No warnings when disk is below threshold and raw media GB is null."""
        # Patch disk_usage with the same named-tuple type it normally returns.
        usage_type = type(shutil.disk_usage(tmp_path))
        monkeypatch.setattr(
            "shutil.disk_usage",
            lambda path: usage_type(1000, 500, 500),  # 50% used
        )
        config = {
            "retention": {
                "storage_warning_disk_percent": 80,
                "storage_warning_raw_media_gb": None,
            }
        }
        summary = self._make_summary()
        warnings = check_storage_health(summary, tmp_path, config=config)
        assert warnings == []

    def test_disk_percent_exceeded(self, tmp_path, monkeypatch):
        """Warning when disk usage exceeds threshold."""
        # NOTE(review): relies on the real disk being >= 1% full — confirm
        # this holds on CI runners.
        config = {
            "retention": {
                "storage_warning_disk_percent": 1,
            }
        }
        summary = self._make_summary()
        warnings = check_storage_health(summary, tmp_path, config=config)
        assert len(warnings) == 1
        assert warnings[0]["type"] == "disk_percent"
        assert warnings[0]["level"] == "warning"
        assert warnings[0]["current"] >= 1
        assert warnings[0]["threshold"] == 1
        assert "retention settings" in warnings[0]["message"]
        assert "Clean Up Now" in warnings[0]["message"]

    def test_disk_percent_not_exceeded(self, tmp_path, monkeypatch):
        """No warning when disk is well below threshold."""
        config = {
            "retention": {
                "storage_warning_disk_percent": 100,
            }
        }
        summary = self._make_summary()
        warnings = check_storage_health(summary, tmp_path, config=config)
        assert warnings == []

    def test_raw_media_gb_exceeded(self, tmp_path, monkeypatch):
        """Warning when raw media exceeds GB threshold."""
        raw_bytes = int(5.5 * 1024**3)
        config = {
            "retention": {
                "storage_warning_disk_percent": None,
                "storage_warning_raw_media_gb": 5.0,
            }
        }
        summary = self._make_summary(raw_media_bytes=raw_bytes)
        warnings = check_storage_health(summary, tmp_path, config=config)
        assert len(warnings) == 1
        assert warnings[0]["type"] == "raw_media_gb"
        assert warnings[0]["level"] == "warning"
        assert warnings[0]["current"] >= 5.0
        assert warnings[0]["threshold"] == 5.0
        assert "retention settings" in warnings[0]["message"]

    def test_raw_media_gb_not_exceeded(self, tmp_path, monkeypatch):
        """No warning when raw media is below threshold."""
        raw_bytes = int(2.0 * 1024**3)
        config = {
            "retention": {
                "storage_warning_disk_percent": None,
                "storage_warning_raw_media_gb": 5.0,
            }
        }
        summary = self._make_summary(raw_media_bytes=raw_bytes)
        warnings = check_storage_health(summary, tmp_path, config=config)
        assert warnings == []

    def test_both_thresholds_exceeded(self, tmp_path, monkeypatch):
        """Both warnings when both thresholds exceeded."""
        raw_bytes = int(10 * 1024**3)
        config = {
            "retention": {
                "storage_warning_disk_percent": 1,
                "storage_warning_raw_media_gb": 5.0,
            }
        }
        summary = self._make_summary(raw_media_bytes=raw_bytes)
        warnings = check_storage_health(summary, tmp_path, config=config)
        assert len(warnings) == 2
        types = {w["type"] for w in warnings}
        assert types == {"disk_percent", "raw_media_gb"}

    def test_null_thresholds_disables_checks(self, tmp_path, monkeypatch):
        """Both thresholds null means no warnings ever."""
        raw_bytes = int(100 * 1024**3)
        config = {
            "retention": {
                "storage_warning_disk_percent": None,
                "storage_warning_raw_media_gb": None,
            }
        }
        summary = self._make_summary(raw_media_bytes=raw_bytes)
        warnings = check_storage_health(summary, tmp_path, config=config)
        assert warnings == []

    def test_exact_threshold_triggers(self, tmp_path, monkeypatch):
        """Warning triggers at exactly the threshold (>=, not >)."""
        raw_bytes = int(5.0 * 1024**3)
        config = {
            "retention": {
                "storage_warning_disk_percent": None,
                "storage_warning_raw_media_gb": 5.0,
            }
        }
        summary = self._make_summary(raw_media_bytes=raw_bytes)
        warnings = check_storage_health(summary, tmp_path, config=config)
        assert len(warnings) == 1
        assert warnings[0]["type"] == "raw_media_gb"

    def test_missing_retention_section_uses_defaults(self, tmp_path, monkeypatch):
        """Missing retention section falls back to defaults (80% disk, null raw media)."""
        config = {}
        summary = self._make_summary()
        warnings = check_storage_health(summary, tmp_path, config=config)
        # Raw-media check is disabled by default; disk check may or may not
        # fire depending on the machine, so only assert the former.
        for w in warnings:
            assert w["type"] != "raw_media_gb"

    def test_warning_dict_structure(self, tmp_path, monkeypatch):
        """Each warning has all required keys."""
        config = {
            "retention": {
                "storage_warning_disk_percent": 1,
                "storage_warning_raw_media_gb": 0.001,
            }
        }
        raw_bytes = int(1 * 1024**3)
        summary = self._make_summary(raw_media_bytes=raw_bytes)
        warnings = check_storage_health(summary, tmp_path, config=config)
        for w in warnings:
            assert "level" in w
            assert "type" in w
            assert "message" in w
            assert "current" in w
            assert "threshold" in w