# personal memory agent
1# SPDX-License-Identifier: AGPL-3.0-only
2# Copyright (c) 2026 sol pbc
3
4"""Tests for think.retention — media retention service."""
5
6import hashlib
7import json
8import os
9import shutil
10from datetime import datetime
11
12from think.retention import (
13 RetentionConfig,
14 RetentionPolicy,
15 StorageSummary,
16 _human_bytes,
17 check_storage_health,
18 get_raw_media_files,
19 is_raw_media,
20 is_segment_complete,
21 load_retention_config,
22 purge,
23)
24
25# ---------------------------------------------------------------------------
26# is_raw_media
27# ---------------------------------------------------------------------------
28
29
class TestIsRawMedia:
    """Classification of raw-media files by is_raw_media."""

    def test_audio_extensions(self, tmp_path):
        audio_exts = [".flac", ".opus", ".ogg", ".m4a", ".mp3", ".wav"]
        for extension in audio_exts:
            candidate = tmp_path / f"audio{extension}"
            candidate.touch()
            assert is_raw_media(candidate), f"{extension} should be raw media"

    def test_video_extensions(self, tmp_path):
        video_exts = [".webm", ".mov", ".mp4"]
        for extension in video_exts:
            candidate = tmp_path / f"screen{extension}"
            candidate.touch()
            assert is_raw_media(candidate), f"{extension} should be raw media"

    def test_monitor_diff_png(self, tmp_path):
        # Monitor diff screenshots are raw media despite the .png extension.
        for name in ("monitor_1_diff.png", "monitor_2_diff.png"):
            diff_png = tmp_path / name
            diff_png.touch()
            assert is_raw_media(diff_png)

    def test_not_raw_media(self, tmp_path):
        derived_names = [
            "audio.jsonl",
            "screen.jsonl",
            "stream.json",
            "speaker_labels.json",
            "audio.npz",
            "summary.md",
            "regular.png",
        ]
        for name in derived_names:
            candidate = tmp_path / name
            candidate.touch()
            assert not is_raw_media(candidate), f"{name} should NOT be raw media"
65
66
67# ---------------------------------------------------------------------------
68# get_raw_media_files
69# ---------------------------------------------------------------------------
70
71
class TestGetRawMediaFiles:
    """Directory scanning behavior of get_raw_media_files."""

    def test_returns_only_raw(self, tmp_path):
        # Two raw media files plus two derived artifacts.
        (tmp_path / "audio.flac").write_bytes(b"x" * 100)
        (tmp_path / "screen.webm").write_bytes(b"x" * 200)
        (tmp_path / "audio.jsonl").write_text("transcript")
        (tmp_path / "stream.json").write_text("{}")

        found = get_raw_media_files(tmp_path)
        assert {entry.name for entry in found} == {"audio.flac", "screen.webm"}

    def test_empty_dir(self, tmp_path):
        # An existing but empty directory yields an empty list.
        assert get_raw_media_files(tmp_path) == []

    def test_nonexistent_dir(self, tmp_path):
        # A missing directory is treated the same as an empty one.
        assert get_raw_media_files(tmp_path / "nope") == []
88
89
90# ---------------------------------------------------------------------------
91# is_segment_complete
92# ---------------------------------------------------------------------------
93
94
def _make_segment(
    tmp_path,
    *,
    audio=False,
    video=False,
    video_name="screen.webm",
    embeddings=False,
    audio_extract=True,
    screen_extract=True,
    speaker_labels=True,
    active_agents=False,
):
    """Build a segment directory populated according to the flags.

    Always creates the segment dir, an agents/ subdir, and stream.json.
    Extract/label files are written only when their source media flag is
    also set, mirroring how the pipeline produces derived artifacts.
    Returns the segment path.
    """
    segment = tmp_path / "segment"
    segment.mkdir(exist_ok=True)
    agents = segment / "agents"
    agents.mkdir(exist_ok=True)

    if audio:
        (segment / "audio.flac").write_bytes(b"audio")
        if audio_extract:
            (segment / "audio.jsonl").write_text('{"raw":"audio.flac"}\n')
    if video:
        (segment / video_name).write_bytes(b"video")
        if screen_extract:
            (segment / "screen.jsonl").write_text('{"raw":"screen.webm"}\n')
    if embeddings:
        (segment / "audio.npz").write_bytes(b"npz")
        if speaker_labels:
            (agents / "speaker_labels.json").write_text("{}")
    if active_agents:
        # An in-flight agent marker blocks retention.
        (agents / "1234_active.jsonl").write_text("{}")

    (segment / "stream.json").write_text('{"stream":"default"}')
    return segment
130
131
class TestIsSegmentComplete:
    """Completion rules enforced by is_segment_complete."""

    def test_complete_audio_video(self, tmp_path):
        segment = _make_segment(tmp_path, audio=True, video=True, embeddings=True)
        assert is_segment_complete(segment)

    def test_complete_audio_only(self, tmp_path):
        assert is_segment_complete(_make_segment(tmp_path, audio=True))

    def test_complete_video_only(self, tmp_path):
        assert is_segment_complete(_make_segment(tmp_path, video=True))

    def test_incomplete_missing_audio_extract(self, tmp_path):
        segment = _make_segment(tmp_path, audio=True, audio_extract=False)
        assert not is_segment_complete(segment)

    def test_incomplete_missing_screen_extract(self, tmp_path):
        segment = _make_segment(tmp_path, video=True, screen_extract=False)
        assert not is_segment_complete(segment)

    def test_incomplete_missing_screen_extract_for_mp4(self, tmp_path):
        segment = _make_segment(
            tmp_path, video=True, video_name="screen.mp4", screen_extract=False
        )
        assert not is_segment_complete(segment)

    def test_complete_mp4_with_screen_extract(self, tmp_path):
        segment = _make_segment(tmp_path, video=True, video_name="screen.mp4")
        assert is_segment_complete(segment)

    def test_incomplete_missing_speaker_labels(self, tmp_path):
        segment = _make_segment(
            tmp_path, audio=True, embeddings=True, speaker_labels=False
        )
        assert not is_segment_complete(segment)

    def test_complete_with_stub_speaker_labels(self, tmp_path):
        """A stub speaker_labels.json (skipped=True, labels=[]) unblocks retention."""
        segment = _make_segment(
            tmp_path, audio=True, embeddings=True, speaker_labels=False
        )
        payload = {"labels": [], "skipped": True, "reason": "no_owner_centroid"}
        (segment / "agents" / "speaker_labels.json").write_text(json.dumps(payload))
        assert is_segment_complete(segment)

    def test_incomplete_active_agents(self, tmp_path):
        segment = _make_segment(tmp_path, audio=True, active_agents=True)
        assert not is_segment_complete(segment)

    def test_no_raw_media_is_complete(self, tmp_path):
        """A segment holding only derived content counts as complete."""
        segment = tmp_path / "segment"
        segment.mkdir()
        (segment / "audio.jsonl").write_text("transcript")
        (segment / "stream.json").write_text("{}")
        assert is_segment_complete(segment)

    def test_no_agents_dir_is_ok(self, tmp_path):
        """A missing agents/ directory means no active agents, so check 1 passes."""
        segment = tmp_path / "segment"
        segment.mkdir()
        (segment / "stream.json").write_text("{}")
        assert is_segment_complete(segment)
194
195
196# ---------------------------------------------------------------------------
197# RetentionPolicy
198# ---------------------------------------------------------------------------
199
200
class TestRetentionPolicy:
    """Eligibility semantics of each retention mode."""

    def test_keep_never_eligible(self):
        policy = RetentionPolicy(mode="keep")
        for age_days in (0, 365):
            assert not policy.is_eligible(age_days)

    def test_processed_always_eligible(self):
        policy = RetentionPolicy(mode="processed")
        for age_days in (0, 1):
            assert policy.is_eligible(age_days)

    def test_days_threshold(self):
        # Eligibility flips at exactly the configured day count (>=).
        policy = RetentionPolicy(mode="days", days=30)
        assert not policy.is_eligible(29)
        assert policy.is_eligible(30)
        assert policy.is_eligible(31)

    def test_days_no_value(self):
        # "days" mode without a day count never becomes eligible.
        assert not RetentionPolicy(mode="days", days=None).is_eligible(100)
221
222
class TestRetentionConfig:
    """Per-stream policy resolution on RetentionConfig."""

    def test_default_policy(self):
        # Out of the box: raw media kept for 7 days.
        resolved = RetentionConfig().policy_for_stream("default")
        assert resolved.mode == "days"
        assert resolved.days == 7

    def test_per_stream_override(self):
        cfg = RetentionConfig(
            default=RetentionPolicy(mode="keep"),
            per_stream={"archon.plaud": RetentionPolicy(mode="days", days=7)},
        )
        overridden = cfg.policy_for_stream("archon.plaud")
        assert overridden.mode == "days"
        assert overridden.days == 7
        # Streams without an override fall back to the default policy.
        assert cfg.policy_for_stream("default").mode == "keep"
239
240
241# ---------------------------------------------------------------------------
242# load_retention_config
243# ---------------------------------------------------------------------------
244
245
class TestLoadRetentionConfig:
    """Parsing of the retention section from global config."""

    def test_default_config(self, monkeypatch):
        # An empty global config yields the built-in defaults.
        monkeypatch.setattr("think.utils.get_config", dict)
        cfg = load_retention_config()
        assert cfg.default.mode == "days"
        assert cfg.default.days == 7
        assert cfg.per_stream == {}

    def test_custom_config(self, monkeypatch):
        raw_config = {
            "retention": {
                "raw_media": "days",
                "raw_media_days": 30,
                "per_stream": {
                    "default": {"raw_media": "processed"},
                },
            }
        }
        monkeypatch.setattr("think.utils.get_config", lambda: raw_config)
        cfg = load_retention_config()
        assert cfg.default.mode == "days"
        assert cfg.default.days == 30
        assert cfg.per_stream["default"].mode == "processed"
271
272
273# ---------------------------------------------------------------------------
274# purge
275# ---------------------------------------------------------------------------
276
277
class TestPurge:
    """End-to-end purge behavior against a synthetic journal tree."""

    def _setup_journal(self, tmp_path, monkeypatch):
        """Create a journal structure with test segments."""
        journal = tmp_path / "journal"

        def build(day, stream, window, media_name, size, extract, stream_json, agents):
            # One segment dir: raw media, optional extract, stream.json, agents/.
            seg = journal / "chronicle" / day / stream / window
            seg.mkdir(parents=True)
            (seg / media_name).write_bytes(b"x" * size)
            if extract is not None:
                (seg / "audio.jsonl").write_text(extract)
            (seg / "stream.json").write_text(stream_json)
            if agents:
                (seg / "agents").mkdir()

        # Day 1: 60 days old — two complete segments
        build(
            "20260115", "default", "100000_300", "audio.flac", 1000,
            '{"raw":"audio.flac"}\n', '{"stream":"default"}', True,
        )
        build(
            "20260115", "plaud", "103000_300", "audio.m4a", 500,
            '{"raw":"audio.m4a"}\n', '{"stream":"plaud"}', True,
        )
        # Day 2: recent — one complete segment (must stay within 30d window)
        build(
            "20260401", "default", "120000_300", "audio.flac", 800,
            '{"raw":"audio.flac"}\n', '{"stream":"default"}', True,
        )
        # Day 3: incomplete segment (no audio.jsonl)
        build(
            "20260101", "default", "140000_300", "audio.flac", 600,
            None, '{"stream":"default"}', False,
        )

        monkeypatch.setenv("_SOLSTONE_JOURNAL_OVERRIDE", str(journal))
        # Clear cached journal path so the override takes effect.
        import think.utils

        think.utils._journal_path_cache = None

        return journal

    def test_dry_run(self, tmp_path, monkeypatch):
        journal = self._setup_journal(tmp_path, monkeypatch)

        outcome = purge(older_than_days=30, dry_run=True)

        # Reported but nothing removed from disk.
        assert outcome.files_deleted == 2  # day1 default + plaud
        assert outcome.bytes_freed == 1500
        day1 = journal / "chronicle" / "20260115"
        assert (day1 / "default" / "100000_300" / "audio.flac").exists()
        assert (day1 / "plaud" / "103000_300" / "audio.m4a").exists()
        # Dry runs leave no retention log behind.
        assert not (journal / "health" / "retention.log").exists()

    def test_actual_purge(self, tmp_path, monkeypatch):
        journal = self._setup_journal(tmp_path, monkeypatch)

        outcome = purge(older_than_days=30, dry_run=False)

        assert outcome.files_deleted == 2
        day1 = journal / "chronicle" / "20260115"
        # Raw media removed from both eligible segments.
        assert not (day1 / "default" / "100000_300" / "audio.flac").exists()
        assert not (day1 / "plaud" / "103000_300" / "audio.m4a").exists()
        # Derived content preserved.
        assert (day1 / "default" / "100000_300" / "audio.jsonl").exists()
        # Retention log written.
        assert (journal / "health" / "retention.log").exists()

    def test_skips_incomplete(self, tmp_path, monkeypatch):
        self._setup_journal(tmp_path, monkeypatch)

        outcome = purge(older_than_days=0, dry_run=True)

        # The day-3 segment lacks audio.jsonl and must be skipped.
        assert outcome.segments_skipped_incomplete == 1

    def test_stream_filter(self, tmp_path, monkeypatch):
        self._setup_journal(tmp_path, monkeypatch)

        outcome = purge(older_than_days=30, stream_filter="plaud", dry_run=True)

        assert outcome.files_deleted == 1
        assert outcome.details[0]["stream"] == "plaud"

    def test_policy_based_purge(self, tmp_path, monkeypatch):
        self._setup_journal(tmp_path, monkeypatch)

        config = RetentionConfig(
            default=RetentionPolicy(mode="keep"),
            per_stream={"plaud": RetentionPolicy(mode="days", days=7)},
        )

        outcome = purge(dry_run=True, config=config)

        # Only the plaud segment (60 days old) is eligible under this config.
        assert outcome.files_deleted == 1
        assert outcome.details[0]["stream"] == "plaud"
393
394
class TestPurgeProvenance:
    """Provenance metadata (hashes, timestamps) recorded by purge."""

    def _setup_journal(self, tmp_path, monkeypatch):
        # Reuse TestPurge's fixture builder via an unbound call. Instantiating
        # a pytest-collected test class (TestPurge()) is fragile: it breaks if
        # the class ever gains an __init__ and creates a throwaway instance.
        return TestPurge._setup_journal(self, tmp_path, monkeypatch)

    @staticmethod
    def _assert_sha256_hashes(result):
        """Every recorded file hash is a 64-char lowercase hex digest."""
        for detail in result.details:
            for file_info in detail["files"]:
                file_hash = file_info["hash"]
                assert len(file_hash) == 64
                assert all(c in "0123456789abcdef" for c in file_hash)

    @staticmethod
    def _default_detail(result):
        """Return the detail entry for the default/100000_300 segment."""
        return next(
            detail
            for detail in result.details
            if detail["stream"] == "default" and detail["segment"] == "100000_300"
        )

    def test_hash_field_in_dry_run(self, tmp_path, monkeypatch):
        self._setup_journal(tmp_path, monkeypatch)

        result = purge(older_than_days=30, dry_run=True)
        # The default segment's audio.flac is 1000 "x" bytes.
        expected_hash = hashlib.sha256(b"x" * 1000).hexdigest()

        self._assert_sha256_hashes(result)
        assert self._default_detail(result)["files"][0]["hash"] == expected_hash

    def test_hash_field_in_actual_purge(self, tmp_path, monkeypatch):
        self._setup_journal(tmp_path, monkeypatch)

        result = purge(older_than_days=30, dry_run=False)
        expected_hash = hashlib.sha256(b"x" * 1000).hexdigest()

        self._assert_sha256_hashes(result)
        assert self._default_detail(result)["files"][0]["hash"] == expected_hash

    def test_processed_at_field(self, tmp_path, monkeypatch):
        """Every detail entry carries a parseable ISO-format processed_at."""
        self._setup_journal(tmp_path, monkeypatch)

        result = purge(older_than_days=30, dry_run=True)

        for detail in result.details:
            assert "processed_at" in detail
            assert isinstance(detail["processed_at"], str)
            # Raises ValueError if not a valid ISO-8601 timestamp.
            datetime.fromisoformat(detail["processed_at"])

    def test_processed_at_reflects_latest_mtime(self, tmp_path, monkeypatch):
        """processed_at tracks the newest mtime among derived artifacts."""
        journal = self._setup_journal(tmp_path, monkeypatch)
        segment = journal / "chronicle" / "20260115" / "default" / "100000_300"
        audio_jsonl = segment / "audio.jsonl"
        alternate_audio_jsonl = segment / "meeting_audio.jsonl"
        speaker_labels = segment / "agents" / "speaker_labels.json"

        alternate_audio_jsonl.write_text('{"raw":"audio.flac"}\n')
        speaker_labels.write_text("{}")

        older_ts = datetime(2026, 1, 15, 10, 0, 0).timestamp()
        middle_ts = datetime(2026, 1, 15, 11, 0, 0).timestamp()
        latest_ts = datetime(2026, 1, 15, 12, 0, 0).timestamp()

        os.utime(audio_jsonl, (older_ts, older_ts))
        os.utime(speaker_labels, (middle_ts, middle_ts))
        os.utime(alternate_audio_jsonl, (latest_ts, latest_ts))

        result = purge(older_than_days=30, dry_run=True)

        assert (
            self._default_detail(result)["processed_at"]
            == datetime.fromtimestamp(latest_ts).isoformat()
        )
476
477
478# ---------------------------------------------------------------------------
479# _human_bytes
480# ---------------------------------------------------------------------------
481
482
class TestHumanBytes:
    """Human-readable formatting produced by _human_bytes."""

    def test_bytes(self):
        for value, expected in ((0, "0 B"), (512, "512 B")):
            assert _human_bytes(value) == expected

    def test_kilobytes(self):
        assert _human_bytes(2**10) == "1.0 KB"

    def test_megabytes(self):
        assert _human_bytes(2**20) == "1.0 MB"

    def test_gigabytes(self):
        assert _human_bytes(2**30) == "1.0 GB"

    def test_large(self):
        # Multi-GB values stay in the GB unit.
        assert "GB" in _human_bytes(12_400_000_000)
500
501
class TestCheckStorageHealth:
    """Tests for check_storage_health threshold evaluation.

    NOTE(review): several methods previously requested the ``monkeypatch``
    fixture without using it; those unused fixture requests are removed.
    Only test_no_warnings_when_healthy patches shutil.disk_usage.
    """

    def _make_summary(self, raw_media_bytes=0, derived_bytes=0):
        """Build a StorageSummary with fixed segment counts."""
        return StorageSummary(
            raw_media_bytes=raw_media_bytes,
            derived_bytes=derived_bytes,
            total_segments=10,
            segments_with_raw=5,
            segments_purged=3,
        )

    def test_no_warnings_when_healthy(self, tmp_path, monkeypatch):
        """No warnings when disk is below threshold and raw media GB is null."""
        # Reuse the real disk_usage result type so attribute access still works.
        usage_type = type(shutil.disk_usage(tmp_path))
        monkeypatch.setattr(
            "shutil.disk_usage",
            lambda path: usage_type(1000, 500, 500),  # 50% used
        )
        config = {
            "retention": {
                "storage_warning_disk_percent": 80,
                "storage_warning_raw_media_gb": None,
            }
        }
        summary = self._make_summary()
        warnings = check_storage_health(summary, tmp_path, config=config)
        assert warnings == []

    def test_disk_percent_exceeded(self, tmp_path):
        """Warning when disk usage exceeds threshold."""
        # Real disk usage of tmp_path is virtually always >= 1%.
        config = {
            "retention": {
                "storage_warning_disk_percent": 1,
            }
        }
        summary = self._make_summary()
        warnings = check_storage_health(summary, tmp_path, config=config)
        assert len(warnings) == 1
        assert warnings[0]["type"] == "disk_percent"
        assert warnings[0]["level"] == "warning"
        assert warnings[0]["current"] >= 1
        assert warnings[0]["threshold"] == 1
        assert "retention settings" in warnings[0]["message"]
        assert "Clean Up Now" in warnings[0]["message"]

    def test_disk_percent_not_exceeded(self, tmp_path):
        """No warning when disk is well below threshold."""
        config = {
            "retention": {
                "storage_warning_disk_percent": 100,
            }
        }
        summary = self._make_summary()
        warnings = check_storage_health(summary, tmp_path, config=config)
        assert warnings == []

    def test_raw_media_gb_exceeded(self, tmp_path):
        """Warning when raw media exceeds GB threshold."""
        raw_bytes = int(5.5 * 1024**3)
        config = {
            "retention": {
                "storage_warning_disk_percent": None,
                "storage_warning_raw_media_gb": 5.0,
            }
        }
        summary = self._make_summary(raw_media_bytes=raw_bytes)
        warnings = check_storage_health(summary, tmp_path, config=config)
        assert len(warnings) == 1
        assert warnings[0]["type"] == "raw_media_gb"
        assert warnings[0]["level"] == "warning"
        assert warnings[0]["current"] >= 5.0
        assert warnings[0]["threshold"] == 5.0
        assert "retention settings" in warnings[0]["message"]

    def test_raw_media_gb_not_exceeded(self, tmp_path):
        """No warning when raw media is below threshold."""
        raw_bytes = int(2.0 * 1024**3)
        config = {
            "retention": {
                "storage_warning_disk_percent": None,
                "storage_warning_raw_media_gb": 5.0,
            }
        }
        summary = self._make_summary(raw_media_bytes=raw_bytes)
        warnings = check_storage_health(summary, tmp_path, config=config)
        assert warnings == []

    def test_both_thresholds_exceeded(self, tmp_path):
        """Both warnings when both thresholds exceeded."""
        raw_bytes = int(10 * 1024**3)
        config = {
            "retention": {
                "storage_warning_disk_percent": 1,
                "storage_warning_raw_media_gb": 5.0,
            }
        }
        summary = self._make_summary(raw_media_bytes=raw_bytes)
        warnings = check_storage_health(summary, tmp_path, config=config)
        assert len(warnings) == 2
        types = {w["type"] for w in warnings}
        assert types == {"disk_percent", "raw_media_gb"}

    def test_null_thresholds_disables_checks(self, tmp_path):
        """Both thresholds null means no warnings ever."""
        raw_bytes = int(100 * 1024**3)
        config = {
            "retention": {
                "storage_warning_disk_percent": None,
                "storage_warning_raw_media_gb": None,
            }
        }
        summary = self._make_summary(raw_media_bytes=raw_bytes)
        warnings = check_storage_health(summary, tmp_path, config=config)
        assert warnings == []

    def test_exact_threshold_triggers(self, tmp_path):
        """Warning triggers at exactly the threshold (>=, not >)."""
        raw_bytes = int(5.0 * 1024**3)
        config = {
            "retention": {
                "storage_warning_disk_percent": None,
                "storage_warning_raw_media_gb": 5.0,
            }
        }
        summary = self._make_summary(raw_media_bytes=raw_bytes)
        warnings = check_storage_health(summary, tmp_path, config=config)
        assert len(warnings) == 1
        assert warnings[0]["type"] == "raw_media_gb"

    def test_missing_retention_section_uses_defaults(self, tmp_path):
        """Missing retention section falls back to defaults (80% disk, null raw media)."""
        config = {}
        summary = self._make_summary()
        warnings = check_storage_health(summary, tmp_path, config=config)
        # Disk percent may or may not warn depending on the real disk, but
        # the raw-media check must stay disabled by default.
        for w in warnings:
            assert w["type"] != "raw_media_gb"

    def test_warning_dict_structure(self, tmp_path):
        """Each warning has all required keys."""
        config = {
            "retention": {
                "storage_warning_disk_percent": 1,
                "storage_warning_raw_media_gb": 0.001,
            }
        }
        raw_bytes = int(1 * 1024**3)
        summary = self._make_summary(raw_media_bytes=raw_bytes)
        warnings = check_storage_health(summary, tmp_path, config=config)
        for w in warnings:
            assert "level" in w
            assert "type" in w
            assert "message" in w
            assert "current" in w
            assert "threshold" in w