linux observer — tests for crash recovery, segment sync, and upload error handling
1# SPDX-License-Identifier: AGPL-3.0-only
2# Copyright (c) 2026 sol pbc
3
4import json
5import os
6import time
7from pathlib import Path
8from unittest.mock import MagicMock, patch
9
10from solstone_linux.config import Config
11from solstone_linux.recovery import recover_incomplete_segments
12from solstone_linux.upload import ErrorType, UploadClient, UploadResult
13
14
class TestRecovery:
    """Test crash recovery for incomplete segments."""

    def _make_incomplete(
        self, captures_dir: Path, day: str, stream: str, time_prefix: str, age: int = 300
    ) -> Path:
        """Create an incomplete segment directory with a dummy file.

        Args:
            captures_dir: Root of the captures tree.
            day: Day directory name (YYYYMMDD).
            stream: Stream subdirectory name.
            time_prefix: Segment time prefix (HHMMSS).
            age: How many seconds in the past to backdate the directory.

        Returns:
            Path to the created ``*.incomplete`` directory.
        """
        seg_dir = captures_dir / day / stream / f"{time_prefix}.incomplete"
        seg_dir.mkdir(parents=True)
        (seg_dir / "center_DP-3_screen.webm").write_bytes(b"\x00" * 100)

        # Backdate mtime/atime so recovery treats the segment as stale.
        old_time = time.time() - age
        os.utime(seg_dir, (old_time, old_time))
        return seg_dir

    def test_recovers_old_incomplete(self, tmp_path: Path):
        """A stale incomplete segment is renamed to a finished-segment name."""
        captures_dir = tmp_path / "captures"
        self._make_incomplete(captures_dir, "20260403", "archon", "140000", age=300)

        recovered = recover_incomplete_segments(captures_dir)
        assert recovered == 1

        stream_dir = captures_dir / "20260403" / "archon"
        dirs = [d.name for d in stream_dir.iterdir() if d.is_dir()]
        assert len(dirs) == 1
        # Recovered name keeps the time prefix and appends a duration suffix.
        assert dirs[0].startswith("140000_")
        assert not dirs[0].endswith(".incomplete")

    def test_recovers_with_metadata(self, tmp_path: Path):
        """Recovery uses .metadata start_timestamp for accurate duration."""
        captures_dir = tmp_path / "captures"
        seg_dir = captures_dir / "20260403" / "archon" / "140000.incomplete"
        seg_dir.mkdir(parents=True)
        (seg_dir / "center_DP-3_screen.webm").write_bytes(b"\x00" * 100)

        # Write metadata with known start timestamp (60 seconds ago)
        start_ts = time.time() - 60
        meta = {"start_timestamp": start_ts}
        (seg_dir / ".metadata").write_text(json.dumps(meta))

        # Age the directory so it is eligible for recovery.
        old_time = time.time() - 300
        os.utime(seg_dir, (old_time, old_time))

        recovered = recover_incomplete_segments(captures_dir)
        assert recovered == 1

        stream_dir = captures_dir / "20260403" / "archon"
        dirs = [d.name for d in stream_dir.iterdir() if d.is_dir()]
        assert len(dirs) == 1
        # Duration should be based on metadata start timestamp, not mtime-ctime
        duration = int(dirs[0].split("_")[1])
        assert 55 <= duration <= 65  # ~60 seconds

    def test_skips_recent_incomplete(self, tmp_path: Path):
        """A freshly-created incomplete segment (still being written) is left alone."""
        captures_dir = tmp_path / "captures"
        seg_dir = captures_dir / "20260403" / "archon" / "140000.incomplete"
        seg_dir.mkdir(parents=True)
        (seg_dir / "test.webm").write_bytes(b"\x00")

        recovered = recover_incomplete_segments(captures_dir)
        assert recovered == 0
        assert seg_dir.exists()

    def test_marks_empty_as_failed(self, tmp_path: Path):
        """A stale incomplete segment with no capture files is marked .failed."""
        captures_dir = tmp_path / "captures"
        seg_dir = captures_dir / "20260403" / "archon" / "140000.incomplete"
        seg_dir.mkdir(parents=True)
        # No files inside — should fail

        old_time = time.time() - 300
        os.utime(seg_dir, (old_time, old_time))

        recovered = recover_incomplete_segments(captures_dir)
        assert recovered == 0

        failed_dir = captures_dir / "20260403" / "archon" / "140000.failed"
        assert failed_dir.exists()

    def test_metadata_removed_on_recovery(self, tmp_path: Path):
        """The .metadata file should be removed during recovery."""
        captures_dir = tmp_path / "captures"
        seg_dir = captures_dir / "20260403" / "archon" / "140000.incomplete"
        seg_dir.mkdir(parents=True)
        (seg_dir / "screen.webm").write_bytes(b"\x00")
        (seg_dir / ".metadata").write_text('{"start_timestamp": 1000}')

        old_time = time.time() - 300
        os.utime(seg_dir, (old_time, old_time))

        # Guard against a vacuous pass: the original version only asserted
        # inside a loop over recovered dirs, so a failed recovery (no matching
        # dir) asserted nothing. Require that recovery actually happened.
        recovered = recover_incomplete_segments(captures_dir)
        assert recovered == 1

        stream_dir = captures_dir / "20260403" / "archon"
        recovered_dirs = [
            d
            for d in stream_dir.iterdir()
            if d.is_dir() and not d.name.endswith((".incomplete", ".failed"))
        ]
        assert len(recovered_dirs) == 1
        # .metadata is internal bookkeeping and must not survive recovery.
        assert not (recovered_dirs[0] / ".metadata").exists()

    def test_no_captures_dir(self, tmp_path: Path):
        """A missing captures root recovers nothing and does not raise."""
        assert recover_incomplete_segments(tmp_path / "nonexistent") == 0
116
117
class TestSyncServiceCollect:
    """Test segment collection logic."""

    def test_skips_incomplete_and_failed(self, tmp_path: Path):
        """Finished segments are collected; marker directories are skipped."""
        from solstone_linux.sync import SyncService

        config = Config(base_dir=tmp_path)
        config.ensure_dirs()

        captures = config.captures_dir
        stream_dir = captures / "20260403" / "archon"
        stream_dir.mkdir(parents=True)

        # Two finished segments with content, plus two markers to ignore.
        for seg_name, file_name in (
            ("140000_300", "screen.webm"),
            ("150000_300", "audio.flac"),
        ):
            seg = stream_dir / seg_name
            seg.mkdir()
            (seg / file_name).write_bytes(b"\x00")
        (stream_dir / "145000.incomplete").mkdir()
        (stream_dir / "143000.failed").mkdir()

        sync = SyncService(config, UploadClient(config))

        segments = sync._collect_segments(captures)
        assert "20260403" in segments
        names = {s.name for s in segments["20260403"]}
        assert "140000_300" in names
        assert "150000_300" in names
        assert "145000.incomplete" not in names
        assert "143000.failed" not in names
148
149
class TestSyncedDaysPruning:
    """Test that synced-days cache is pruned to 90 days."""

    def test_prunes_old_entries(self, tmp_path: Path):
        """Entries older than the 90-day window are dropped; recent ones kept."""
        from solstone_linux.sync import SyncService

        config = Config(base_dir=tmp_path)
        config.ensure_dirs()

        client = UploadClient(config)
        sync = SyncService(config, client)

        # Add entries spanning 100 days
        from datetime import datetime, timedelta

        today = datetime.now()
        for i in range(100):
            day = (today - timedelta(days=i)).strftime("%Y%m%d")
            sync._synced_days.add(day)

        sync._prune_synced_days()

        # Both bounds matter: the previous upper-bound-only assertion would
        # also pass if pruning wiped the cache entirely. Expect ~90 entries
        # with ±1 day of boundary tolerance.
        assert 89 <= len(sync._synced_days) <= 91
173
174
class TestErrorClassification:
    """Test HTTP error classification for circuit breaker tuning."""

    def test_auth_errors(self):
        # Both credential-related statuses map to the AUTH bucket.
        for status in (401, 403):
            assert UploadClient.classify_error(status) == ErrorType.AUTH

    def test_client_errors(self):
        assert UploadClient.classify_error(400) == ErrorType.CLIENT

    def test_transient_errors(self):
        # Standard 5xx server failures are retryable.
        for status in (500, 502, 503):
            assert UploadClient.classify_error(status) == ErrorType.TRANSIENT

    def test_network_errors(self):
        # No HTTP status at all (connection-level failure) is transient.
        result = UploadClient.classify_error(None, is_network_error=True)
        assert result == ErrorType.TRANSIENT

    def test_unknown_status(self):
        # Unrecognized status codes fall back to the retryable bucket.
        assert UploadClient.classify_error(418) == ErrorType.TRANSIENT
195
196
class TestCircuitBreakerThresholds:
    """Test circuit breaker state transitions with error-type tuning."""

    def test_auth_opens_immediately(self, tmp_path: Path):
        """Auth failures trip the breaker on the very first failure."""
        from solstone_linux.sync import CIRCUIT_THRESHOLD_AUTH, SyncService

        config = Config(base_dir=tmp_path)
        config.ensure_dirs()
        sync = SyncService(config, UploadClient(config))

        sync._last_error_type = ErrorType.AUTH
        assert CIRCUIT_THRESHOLD_AUTH == 1
        assert sync._circuit_threshold() == CIRCUIT_THRESHOLD_AUTH

    def test_transient_allows_more_failures(self, tmp_path: Path):
        """Transient errors get a much higher failure budget than auth errors."""
        from solstone_linux.sync import CIRCUIT_THRESHOLD_TRANSIENT, SyncService

        config = Config(base_dir=tmp_path)
        config.ensure_dirs()
        sync = SyncService(config, UploadClient(config))

        sync._last_error_type = ErrorType.TRANSIENT
        assert CIRCUIT_THRESHOLD_TRANSIENT >= 5
        assert sync._circuit_threshold() == CIRCUIT_THRESHOLD_TRANSIENT
223
224
class TestRetryCapRespected:
    """Test that upload respects configured retry cap (no hard min(config,3))."""

    def test_respects_configured_max_retries(self):
        """Upload client should use the configured max_retries, not cap at 3."""
        config = Config()
        config.sync_max_retries = 10
        assert UploadClient(config)._max_retries == 10

    def test_low_max_retries_respected(self):
        """A retry cap below the old hard floor is honored as-is."""
        config = Config()
        config.sync_max_retries = 1
        assert UploadClient(config)._max_retries == 1