linux observer

Add cache_retention_days config for automatic cleanup of synced segments

Default 7 days. Values: positive int = days to keep, 0 = delete
immediately after confirmed sync, -1 = keep forever. Cleanup runs
at the end of each sync pass with triple-gated safety: synced_days
membership, age check, and per-segment server confirmation.

+346
+14
INSTALL.md
··· 66 66 } 67 67 ``` 68 68 69 + **optional: cache retention.** by default, synced segments are deleted after 7 days. to change this, add `cache_retention_days` to config.json: 70 + - positive number: keep synced segments for that many days (default: `7`) 71 + - `0`: delete immediately after confirmed sync 72 + - `-1`: keep forever (never auto-delete) 73 + 74 + ```json 75 + { 76 + "server_url": "http://localhost:5015", 77 + "key": "THE_API_KEY_FROM_STEP_3", 78 + "stream": "HOSTNAME", 79 + "cache_retention_days": 7 80 + } 81 + ``` 82 + 69 83 5. install and start the systemd user service: 70 84 ``` 71 85 solstone-linux install-service
+9
src/solstone_linux/cli.py
··· 263 263 else: 264 264 print(f"Cache: {captures_dir} (not created yet)") 265 265 266 + # Retention policy 267 + retention = config.cache_retention_days 268 + if retention < 0: 269 + print("Retain: forever") 270 + elif retention == 0: 271 + print("Retain: delete after sync") 272 + else: 273 + print(f"Retain: {retention} day(s)") 274 + 266 275 # Synced days 267 276 synced_path = config.state_dir / "synced_days.json" 268 277 if synced_path.exists():
+6
src/solstone_linux/config.py
··· 37 37 default_factory=lambda: list(DEFAULT_SYNC_RETRY_DELAYS) 38 38 ) 39 39 sync_max_retries: int = DEFAULT_SYNC_MAX_RETRIES 40 + cache_retention_days: int = 7 40 41 base_dir: Path = DEFAULT_BASE_DIR 41 42 42 43 @property ··· 91 92 config.sync_retry_delays = data["sync_retry_delays"] 92 93 if "sync_max_retries" in data: 93 94 config.sync_max_retries = data["sync_max_retries"] 95 + try: 96 + config.cache_retention_days = int(data.get("cache_retention_days", 7)) 97 + except (TypeError, ValueError): 98 + config.cache_retention_days = 7 94 99 95 100 return config 96 101 ··· 106 111 "segment_interval": config.segment_interval, 107 112 "sync_retry_delays": config.sync_retry_delays, 108 113 "sync_max_retries": config.sync_max_retries, 114 + "cache_retention_days": config.cache_retention_days, 109 115 } 110 116 111 117 config_path = config.config_path
+108
src/solstone_linux/sync.py
··· 21 21 import json 22 22 import logging 23 23 import os 24 + import shutil 24 25 import time 25 26 from datetime import datetime, timedelta 26 27 from pathlib import Path ··· 109 110 ) 110 111 self._save_synced_days() 111 112 113 + async def _cleanup_synced_segments(self) -> None: 114 + """Delete synced segments older than cache_retention_days. 115 + 116 + Triple-gated safety: 117 + 1. Day must be in _synced_days (fully synced locally) 118 + 2. Segment must be older than retention threshold (unless retention=0) 119 + 3. Segment must be confirmed present on server (fresh query) 120 + """ 121 + retention = self._config.cache_retention_days 122 + if retention < 0: 123 + return 124 + 125 + captures_dir = self._config.captures_dir 126 + if not captures_dir.exists(): 127 + return 128 + 129 + today = datetime.now().strftime("%Y%m%d") 130 + if retention > 0: 131 + cutoff = (datetime.now() - timedelta(days=retention)).strftime("%Y%m%d") 132 + else: 133 + cutoff = today # 0 means delete immediately — all days qualify 134 + 135 + deleted_total = 0 136 + 137 + for day_dir in sorted(captures_dir.iterdir()): 138 + if not day_dir.is_dir(): 139 + continue 140 + 141 + day = day_dir.name 142 + 143 + if not self._running: 144 + break 145 + 146 + # Gate 1: day must be in synced_days 147 + if day not in self._synced_days: 148 + continue 149 + 150 + # Gate 2: day must be old enough (unless retention=0) 151 + if retention > 0 and day >= cutoff: 152 + continue 153 + 154 + # Don't clean today's segments 155 + if day == today: 156 + continue 157 + 158 + # Gate 3: fresh server confirmation 159 + server_segments = await asyncio.to_thread( 160 + self._client.get_server_segments, day 161 + ) 162 + if server_segments is None: 163 + logger.warning("Cleanup: skipping day %s — server unreachable", day) 164 + continue 165 + 166 + server_keys: set[str] = set() 167 + for seg in server_segments: 168 + server_keys.add(seg.get("key", "")) 169 + if "original_key" in seg: 170 + server_keys.add(seg["original_key"]) 171 + 172 + deleted_day = 0 173 + 174 + for stream_dir in day_dir.iterdir(): 175 + if not stream_dir.is_dir(): 176 + continue 177 + 178 + for seg_dir in sorted(stream_dir.iterdir()): 179 + if not seg_dir.is_dir(): 180 + continue 181 + 182 + name = seg_dir.name 183 + # Never touch incomplete or failed 184 + if name.endswith(".incomplete") or name.endswith(".failed"): 185 + continue 186 + 187 + if name not in server_keys: 188 + logger.warning( 189 + "Cleanup: keeping %s/%s — not confirmed on server", 190 + day, 191 + name, 192 + ) 193 + continue 194 + 195 + shutil.rmtree(seg_dir) 196 + logger.info("Cleanup: deleted %s/%s", day, name) 197 + deleted_day += 1 198 + 199 + # Remove empty stream dir 200 + if stream_dir.is_dir() and not any(stream_dir.iterdir()): 201 + stream_dir.rmdir() 202 + 203 + # Remove empty day dir 204 + if day_dir.is_dir() and not any(day_dir.iterdir()): 205 + day_dir.rmdir() 206 + 207 + if deleted_day: 208 + deleted_total += deleted_day 209 + 210 + if deleted_total: 211 + logger.info("Cleanup: deleted %d segment(s) total", deleted_total) 212 + 112 213 def _circuit_threshold(self) -> int: 113 214 """Get circuit breaker threshold based on last error type.""" 114 215 if self._last_error_type == ErrorType.AUTH: ··· 292 393 if day != today and not any_needed_upload: 293 394 self._synced_days.add(day) 294 395 self._save_synced_days() 396 + 397 + # Cleanup old synced segments 398 + if not self._circuit_open and self._running: 399 + try: 400 + await self._cleanup_synced_segments() 401 + except Exception as e: 402 + logger.error(f"Cleanup error: {e}", exc_info=True) 295 403 296 404 def _collect_segments(self, captures_dir: Path) -> dict[str, list[Path]]: 297 405 """Collect completed segments grouped by day."""
+17
tests/test_config.py
··· 67 67 loaded = load_config(tmp_path) 68 68 assert loaded.sync_retry_delays == [10, 60, 300] 69 69 assert loaded.sync_max_retries == 5 70 + 71 + def test_cache_retention_days_roundtrip(self, tmp_path: Path): 72 + config = Config(base_dir=tmp_path) 73 + config.cache_retention_days = 14 74 + save_config(config) 75 + 76 + loaded = load_config(tmp_path) 77 + assert loaded.cache_retention_days == 14 78 + 79 + def test_cache_retention_days_default(self, tmp_path: Path): 80 + """Existing configs without cache_retention_days default to 7.""" 81 + config_dir = tmp_path / "config" 82 + config_dir.mkdir(parents=True) 83 + (config_dir / "config.json").write_text('{"server_url": "http://test"}') 84 + 85 + loaded = load_config(tmp_path) 86 + assert loaded.cache_retention_days == 7
+192
tests/test_sync.py
··· 384 384 config.sync_max_retries = 1 385 385 client = UploadClient(config) 386 386 assert client._max_retries == 1 387 + 388 + 389 + class TestCleanupSyncedSegments: 390 + """Test cache retention cleanup of synced segments.""" 391 + 392 + def _make_sync(self, tmp_path: Path, retention: int = 7) -> SyncService: 393 + config = Config(base_dir=tmp_path) 394 + config.cache_retention_days = retention 395 + config.ensure_dirs() 396 + client = UploadClient(config) 397 + return SyncService(config, client) 398 + 399 + def _create_segment( 400 + self, captures_dir: Path, day: str, stream: str, name: str 401 + ) -> Path: 402 + seg_dir = captures_dir / day / stream / name 403 + seg_dir.mkdir(parents=True, exist_ok=True) 404 + (seg_dir / "screen.webm").write_bytes(b"\x00" * 100) 405 + return seg_dir 406 + 407 + @pytest.mark.asyncio 408 + async def test_deletes_old_synced_confirmed(self, tmp_path: Path): 409 + """Segments in synced_days + confirmed on server + old enough -> deleted.""" 410 + sync = self._make_sync(tmp_path, retention=7) 411 + captures = sync._config.captures_dir 412 + 413 + self._create_segment(captures, "20260101", "archon", "120000_300") 414 + sync._synced_days.add("20260101") 415 + 416 + server_response = [{"key": "120000_300"}] 417 + with patch( 418 + "asyncio.to_thread", new_callable=AsyncMock, return_value=server_response 419 + ): 420 + await sync._cleanup_synced_segments() 421 + 422 + assert not (captures / "20260101" / "archon" / "120000_300").exists() 423 + 424 + @pytest.mark.asyncio 425 + async def test_keeps_unconfirmed_on_server(self, tmp_path: Path): 426 + """Segments in synced_days + NOT on server -> not deleted.""" 427 + sync = self._make_sync(tmp_path, retention=7) 428 + captures = sync._config.captures_dir 429 + 430 + self._create_segment(captures, "20260101", "archon", "120000_300") 431 + sync._synced_days.add("20260101") 432 + 433 + server_response = [{"key": "999999_300"}] 434 + with patch( 435 + "asyncio.to_thread", new_callable=AsyncMock, return_value=server_response 436 + ): 437 + await sync._cleanup_synced_segments() 438 + 439 + assert (captures / "20260101" / "archon" / "120000_300").exists() 440 + 441 + @pytest.mark.asyncio 442 + async def test_keeps_segments_not_in_synced_days(self, tmp_path: Path): 443 + """Segments NOT in synced_days -> not deleted.""" 444 + sync = self._make_sync(tmp_path, retention=7) 445 + captures = sync._config.captures_dir 446 + 447 + self._create_segment(captures, "20260101", "archon", "120000_300") 448 + 449 + with patch("asyncio.to_thread", new_callable=AsyncMock) as mock_thread: 450 + await sync._cleanup_synced_segments() 451 + 452 + assert (captures / "20260101" / "archon" / "120000_300").exists() 453 + mock_thread.assert_not_called() 454 + 455 + @pytest.mark.asyncio 456 + async def test_keeps_when_server_unreachable(self, tmp_path: Path): 457 + """Server unreachable (returns None) -> nothing deleted.""" 458 + sync = self._make_sync(tmp_path, retention=7) 459 + captures = sync._config.captures_dir 460 + 461 + self._create_segment(captures, "20260101", "archon", "120000_300") 462 + sync._synced_days.add("20260101") 463 + 464 + with patch("asyncio.to_thread", new_callable=AsyncMock, return_value=None): 465 + await sync._cleanup_synced_segments() 466 + 467 + assert (captures / "20260101" / "archon" / "120000_300").exists() 468 + 469 + @pytest.mark.asyncio 470 + async def test_never_touches_incomplete_or_failed(self, tmp_path: Path): 471 + """.incomplete and .failed segments are never deleted.""" 472 + sync = self._make_sync(tmp_path, retention=7) 473 + captures = sync._config.captures_dir 474 + 475 + self._create_segment(captures, "20260101", "archon", "120000.incomplete") 476 + self._create_segment(captures, "20260101", "archon", "130000.failed") 477 + self._create_segment(captures, "20260101", "archon", "140000_300") 478 + sync._synced_days.add("20260101") 479 + 480 + server_response = [{"key": "140000_300"}] 481 + with patch( 482 + "asyncio.to_thread", new_callable=AsyncMock, return_value=server_response 483 + ): 484 + await sync._cleanup_synced_segments() 485 + 486 + assert (captures / "20260101" / "archon" / "120000.incomplete").exists() 487 + assert (captures / "20260101" / "archon" / "130000.failed").exists() 488 + assert not (captures / "20260101" / "archon" / "140000_300").exists() 489 + 490 + @pytest.mark.asyncio 491 + async def test_retention_negative_one_keeps_forever(self, tmp_path: Path): 492 + """cache_retention_days = -1 -> nothing deleted.""" 493 + sync = self._make_sync(tmp_path, retention=-1) 494 + captures = sync._config.captures_dir 495 + 496 + self._create_segment(captures, "20260101", "archon", "120000_300") 497 + sync._synced_days.add("20260101") 498 + 499 + with patch("asyncio.to_thread", new_callable=AsyncMock) as mock_thread: 500 + await sync._cleanup_synced_segments() 501 + 502 + assert (captures / "20260101" / "archon" / "120000_300").exists() 503 + mock_thread.assert_not_called() 504 + 505 + @pytest.mark.asyncio 506 + async def test_retention_zero_deletes_immediately(self, tmp_path: Path): 507 + """cache_retention_days = 0 -> deletes immediately (no age check).""" 508 + sync = self._make_sync(tmp_path, retention=0) 509 + captures = sync._config.captures_dir 510 + 511 + from datetime import datetime, timedelta 512 + 513 + yesterday = (datetime.now() - timedelta(days=1)).strftime("%Y%m%d") 514 + 515 + self._create_segment(captures, yesterday, "archon", "120000_300") 516 + sync._synced_days.add(yesterday) 517 + 518 + server_response = [{"key": "120000_300"}] 519 + with patch( 520 + "asyncio.to_thread", new_callable=AsyncMock, return_value=server_response 521 + ): 522 + await sync._cleanup_synced_segments() 523 + 524 + assert not (captures / yesterday / "archon" / "120000_300").exists() 525 + 526 + @pytest.mark.asyncio 527 + async def test_never_cleans_today(self, tmp_path: Path): 528 + """Today's segments are never cleaned, even with retention=0.""" 529 + sync = self._make_sync(tmp_path, retention=0) 530 + captures = sync._config.captures_dir 531 + 532 + from datetime import datetime 533 + 534 + today = datetime.now().strftime("%Y%m%d") 535 + 536 + self._create_segment(captures, today, "archon", "120000_300") 537 + sync._synced_days.add(today) 538 + 539 + with patch("asyncio.to_thread", new_callable=AsyncMock) as mock_thread: 540 + await sync._cleanup_synced_segments() 541 + 542 + assert (captures / today / "archon" / "120000_300").exists() 543 + mock_thread.assert_not_called() 544 + 545 + @pytest.mark.asyncio 546 + async def test_cleans_empty_dirs(self, tmp_path: Path): 547 + """Empty stream and day dirs are removed after segment deletion.""" 548 + sync = self._make_sync(tmp_path, retention=7) 549 + captures = sync._config.captures_dir 550 + 551 + self._create_segment(captures, "20260101", "archon", "120000_300") 552 + sync._synced_days.add("20260101") 553 + 554 + server_response = [{"key": "120000_300"}] 555 + with patch( 556 + "asyncio.to_thread", new_callable=AsyncMock, return_value=server_response 557 + ): 558 + await sync._cleanup_synced_segments() 559 + 560 + assert not (captures / "20260101" / "archon").exists() 561 + assert not (captures / "20260101").exists() 562 + 563 + @pytest.mark.asyncio 564 + async def test_original_key_lookup(self, tmp_path: Path): 565 + """Server segment with original_key should match local segment.""" 566 + sync = self._make_sync(tmp_path, retention=7) 567 + captures = sync._config.captures_dir 568 + 569 + self._create_segment(captures, "20260101", "archon", "120000_300") 570 + sync._synced_days.add("20260101") 571 + 572 + server_response = [{"key": "renamed_key", "original_key": "120000_300"}] 573 + with patch( 574 + "asyncio.to_thread", new_callable=AsyncMock, return_value=server_response 575 + ): 576 + await sync._cleanup_synced_segments() 577 + 578 + assert not (captures / "20260101" / "archon" / "120000_300").exists()