linux observer
at main 239 lines 8.9 kB view raw
1# SPDX-License-Identifier: AGPL-3.0-only 2# Copyright (c) 2026 sol pbc 3 4import json 5import os 6import time 7from pathlib import Path 8from unittest.mock import MagicMock, patch 9 10from solstone_linux.config import Config 11from solstone_linux.recovery import recover_incomplete_segments 12from solstone_linux.upload import ErrorType, UploadClient, UploadResult 13 14 15class TestRecovery: 16 """Test crash recovery for incomplete segments.""" 17 18 def _make_incomplete( 19 self, captures_dir: Path, day: str, stream: str, time_prefix: str, age: int = 300 20 ) -> Path: 21 """Create an incomplete segment directory with a dummy file.""" 22 seg_dir = captures_dir / day / stream / f"{time_prefix}.incomplete" 23 seg_dir.mkdir(parents=True) 24 (seg_dir / "center_DP-3_screen.webm").write_bytes(b"\x00" * 100) 25 26 # Set timestamps to simulate age 27 old_time = time.time() - age 28 os.utime(seg_dir, (old_time, old_time)) 29 return seg_dir 30 31 def test_recovers_old_incomplete(self, tmp_path: Path): 32 captures_dir = tmp_path / "captures" 33 self._make_incomplete(captures_dir, "20260403", "archon", "140000", age=300) 34 35 recovered = recover_incomplete_segments(captures_dir) 36 assert recovered == 1 37 38 stream_dir = captures_dir / "20260403" / "archon" 39 dirs = [d.name for d in stream_dir.iterdir() if d.is_dir()] 40 assert len(dirs) == 1 41 assert dirs[0].startswith("140000_") 42 assert not dirs[0].endswith(".incomplete") 43 44 def test_recovers_with_metadata(self, tmp_path: Path): 45 """Recovery uses .metadata start_timestamp for accurate duration.""" 46 captures_dir = tmp_path / "captures" 47 seg_dir = captures_dir / "20260403" / "archon" / "140000.incomplete" 48 seg_dir.mkdir(parents=True) 49 (seg_dir / "center_DP-3_screen.webm").write_bytes(b"\x00" * 100) 50 51 # Write metadata with known start timestamp (60 seconds ago) 52 start_ts = time.time() - 60 53 meta = {"start_timestamp": start_ts} 54 (seg_dir / ".metadata").write_text(json.dumps(meta)) 55 56 # Age the directory 57 old_time = time.time() - 300 58 os.utime(seg_dir, (old_time, old_time)) 59 60 recovered = recover_incomplete_segments(captures_dir) 61 assert recovered == 1 62 63 stream_dir = captures_dir / "20260403" / "archon" 64 dirs = [d.name for d in stream_dir.iterdir() if d.is_dir()] 65 assert len(dirs) == 1 66 # Duration should be based on metadata start timestamp, not mtime-ctime 67 duration = int(dirs[0].split("_")[1]) 68 assert 55 <= duration <= 65 # ~60 seconds 69 70 def test_skips_recent_incomplete(self, tmp_path: Path): 71 captures_dir = tmp_path / "captures" 72 seg_dir = captures_dir / "20260403" / "archon" / "140000.incomplete" 73 seg_dir.mkdir(parents=True) 74 (seg_dir / "test.webm").write_bytes(b"\x00") 75 76 recovered = recover_incomplete_segments(captures_dir) 77 assert recovered == 0 78 assert seg_dir.exists() 79 80 def test_marks_empty_as_failed(self, tmp_path: Path): 81 captures_dir = tmp_path / "captures" 82 seg_dir = captures_dir / "20260403" / "archon" / "140000.incomplete" 83 seg_dir.mkdir(parents=True) 84 # No files inside — should fail 85 86 old_time = time.time() - 300 87 os.utime(seg_dir, (old_time, old_time)) 88 89 recovered = recover_incomplete_segments(captures_dir) 90 assert recovered == 0 91 92 failed_dir = captures_dir / "20260403" / "archon" / "140000.failed" 93 assert failed_dir.exists() 94 95 def test_metadata_removed_on_recovery(self, tmp_path: Path): 96 """The .metadata file should be removed during recovery.""" 97 captures_dir = tmp_path / "captures" 98 seg_dir = captures_dir / "20260403" / "archon" / "140000.incomplete" 99 seg_dir.mkdir(parents=True) 100 (seg_dir / "screen.webm").write_bytes(b"\x00") 101 (seg_dir / ".metadata").write_text('{"start_timestamp": 1000}') 102 103 old_time = time.time() - 300 104 os.utime(seg_dir, (old_time, old_time)) 105 106 recover_incomplete_segments(captures_dir) 107 108 stream_dir = captures_dir / "20260403" / "archon" 109 for d in stream_dir.iterdir(): 110 if d.is_dir() and not d.name.endswith((".incomplete", ".failed")): 111 # .metadata should not be in the recovered dir 112 assert not (d / ".metadata").exists() 113 114 def test_no_captures_dir(self, tmp_path: Path): 115 assert recover_incomplete_segments(tmp_path / "nonexistent") == 0 116 117 118class TestSyncServiceCollect: 119 """Test segment collection logic.""" 120 121 def test_skips_incomplete_and_failed(self, tmp_path: Path): 122 from solstone_linux.sync import SyncService 123 124 config = Config(base_dir=tmp_path) 125 config.ensure_dirs() 126 127 captures = config.captures_dir 128 stream_dir = captures / "20260403" / "archon" 129 stream_dir.mkdir(parents=True) 130 131 (stream_dir / "140000_300").mkdir() 132 (stream_dir / "140000_300" / "screen.webm").write_bytes(b"\x00") 133 (stream_dir / "145000.incomplete").mkdir() 134 (stream_dir / "143000.failed").mkdir() 135 (stream_dir / "150000_300").mkdir() 136 (stream_dir / "150000_300" / "audio.flac").write_bytes(b"\x00") 137 138 client = UploadClient(config) 139 sync = SyncService(config, client) 140 141 segments = sync._collect_segments(captures) 142 assert "20260403" in segments 143 names = [s.name for s in segments["20260403"]] 144 assert "140000_300" in names 145 assert "150000_300" in names 146 assert "145000.incomplete" not in names 147 assert "143000.failed" not in names 148 149 150class TestSyncedDaysPruning: 151 """Test that synced-days cache is pruned to 90 days.""" 152 153 def test_prunes_old_entries(self, tmp_path: Path): 154 from solstone_linux.sync import SyncService 155 156 config = Config(base_dir=tmp_path) 157 config.ensure_dirs() 158 159 client = UploadClient(config) 160 sync = SyncService(config, client) 161 162 # Add entries spanning 100 days 163 from datetime import datetime, timedelta 164 today = datetime.now() 165 for i in range(100): 166 day = (today - timedelta(days=i)).strftime("%Y%m%d") 167 sync._synced_days.add(day) 168 169 sync._prune_synced_days() 170 171 # Should have ~90 entries (not 100) 172 assert len(sync._synced_days) <= 91 # Allow 1 day tolerance 173 174 175class TestErrorClassification: 176 """Test HTTP error classification for circuit breaker tuning.""" 177 178 def test_auth_errors(self): 179 assert UploadClient.classify_error(401) == ErrorType.AUTH 180 assert UploadClient.classify_error(403) == ErrorType.AUTH 181 182 def test_client_errors(self): 183 assert UploadClient.classify_error(400) == ErrorType.CLIENT 184 185 def test_transient_errors(self): 186 assert UploadClient.classify_error(500) == ErrorType.TRANSIENT 187 assert UploadClient.classify_error(502) == ErrorType.TRANSIENT 188 assert UploadClient.classify_error(503) == ErrorType.TRANSIENT 189 190 def test_network_errors(self): 191 assert UploadClient.classify_error(None, is_network_error=True) == ErrorType.TRANSIENT 192 193 def test_unknown_status(self): 194 assert UploadClient.classify_error(418) == ErrorType.TRANSIENT 195 196 197class TestCircuitBreakerThresholds: 198 """Test circuit breaker state transitions with error-type tuning.""" 199 200 def test_auth_opens_immediately(self, tmp_path: Path): 201 from solstone_linux.sync import SyncService, CIRCUIT_THRESHOLD_AUTH 202 203 config = Config(base_dir=tmp_path) 204 config.ensure_dirs() 205 client = UploadClient(config) 206 sync = SyncService(config, client) 207 208 sync._last_error_type = ErrorType.AUTH 209 assert sync._circuit_threshold() == CIRCUIT_THRESHOLD_AUTH 210 assert CIRCUIT_THRESHOLD_AUTH == 1 211 212 def test_transient_allows_more_failures(self, tmp_path: Path): 213 from solstone_linux.sync import SyncService, CIRCUIT_THRESHOLD_TRANSIENT 214 215 config = Config(base_dir=tmp_path) 216 config.ensure_dirs() 217 client = UploadClient(config) 218 sync = SyncService(config, client) 219 220 sync._last_error_type = ErrorType.TRANSIENT 221 assert sync._circuit_threshold() == CIRCUIT_THRESHOLD_TRANSIENT 222 assert CIRCUIT_THRESHOLD_TRANSIENT >= 5 223 224 225class TestRetryCapRespected: 226 """Test that upload respects configured retry cap (no hard min(config,3)).""" 227 228 def test_respects_configured_max_retries(self): 229 """Upload client should use the configured max_retries, not cap at 3.""" 230 config = Config() 231 config.sync_max_retries = 10 232 client = UploadClient(config) 233 assert client._max_retries == 10 234 235 def test_low_max_retries_respected(self): 236 config = Config() 237 config.sync_max_retries = 1 238 client = UploadClient(config) 239 assert client._max_retries == 1