# linux observer
# SPDX-License-Identifier: AGPL-3.0-only
# Copyright (c) 2026 sol pbc

"""Background sync service for uploading captured segments.

Modeled on solstone-macos's SyncService.swift. Runs as an asyncio
background task in the same event loop as capture. Walks cache days
newest-to-oldest, queries server for existing segments, uploads missing ones.

Refinements over tmux baseline:
- Respects configured sync_max_retries (no hard min(config, 3) cap)
- Circuit breaker tuned by error type: auth=immediate, transient=5-10
- Synced-days pruning at 90 days to prevent unbounded cache growth
"""
15
16from __future__ import annotations
17
18import asyncio
19import json
20import logging
21import os
22import time
23from datetime import datetime, timedelta
24from pathlib import Path
25from typing import Any
26
27from .config import Config
28from .upload import ErrorType, UploadClient
29
30logger = logging.getLogger(__name__)
31
# Circuit breaker thresholds (consecutive failures) by error type
CIRCUIT_THRESHOLD_AUTH = 1 # Auth failures open the breaker immediately
CIRCUIT_THRESHOLD_TRANSIENT = 5 # Transient failures need 5 consecutive to open

# Synced-days entries (YYYYMMDD) older than this many days are pruned from the cache
SYNCED_DAYS_MAX_AGE = 90
38
39
class SyncService:
    """Background sync service that uploads completed segments to the server.

    Runs as an asyncio task alongside capture. A sync pass walks cached
    capture days newest-to-oldest, asks the server which segments it already
    has, and uploads the missing ones. A circuit breaker (threshold tuned by
    the last error type) halts uploads after consecutive failures, and
    fully-synced past days are remembered in a small JSON cache so later
    passes can skip them.
    """

    def __init__(self, config: Config, client: UploadClient):
        """Initialize state and load the persisted synced-days cache.

        Args:
            config: Provides ``state_dir``, ``captures_dir``, and ``stream``.
            client: Blocking upload client; its calls are wrapped in
                ``asyncio.to_thread`` so they don't stall the event loop.
        """
        self._config = config
        self._client = client
        self._synced_days: set[str] = set()  # YYYYMMDD days known fully synced
        self._consecutive_failures = 0  # upload failures since last success
        self._last_error_type: ErrorType | None = None
        self._circuit_open = False  # True => uploads suspended
        self._last_full_sync: float = 0  # epoch seconds of last forced full pass
        self._running = True
        self._trigger = asyncio.Event()  # set to request an immediate pass

        # Load synced days cache persisted by previous runs
        self._load_synced_days()

    def _synced_days_path(self) -> Path:
        """Return the path of the JSON file persisting the synced-days set."""
        return self._config.state_dir / "synced_days.json"

    def _load_synced_days(self) -> None:
        """Load the synced-days set from disk; any error yields an empty set."""
        path = self._synced_days_path()
        if not path.exists():
            return
        try:
            with open(path, encoding="utf-8") as f:
                data = json.load(f)
            # Only a JSON list is a valid cache; anything else is discarded.
            self._synced_days = set(data) if isinstance(data, list) else set()
        except (json.JSONDecodeError, OSError):
            self._synced_days = set()

    def _save_synced_days(self) -> None:
        """Atomically persist the synced-days set as sorted JSON.

        Writes to a pid-suffixed temp file and renames it over the target so
        a crash mid-write never leaves a truncated cache file. Failures are
        logged (best-effort), and the temp file is cleaned up so failed saves
        don't accumulate stale ``*.tmp`` files in state_dir.
        """
        self._config.state_dir.mkdir(parents=True, exist_ok=True)
        path = self._synced_days_path()
        tmp = path.with_suffix(f".{os.getpid()}.tmp")
        try:
            with open(tmp, "w", encoding="utf-8") as f:
                json.dump(sorted(self._synced_days), f)
                f.write("\n")
            # os.replace is an atomic overwrite on POSIX and portable elsewhere.
            os.replace(str(tmp), str(path))
        except OSError as e:
            logger.warning("Failed to save synced days: %s", e)
            # Don't leave a partially-written temp file behind.
            try:
                tmp.unlink(missing_ok=True)
            except OSError:
                pass

    def _prune_synced_days(self) -> None:
        """Remove synced-days entries older than SYNCED_DAYS_MAX_AGE days."""
        if not self._synced_days:
            return
        cutoff = (datetime.now() - timedelta(days=SYNCED_DAYS_MAX_AGE)).strftime("%Y%m%d")
        before = len(self._synced_days)
        # YYYYMMDD strings compare correctly as plain lexicographic strings.
        self._synced_days = {d for d in self._synced_days if d >= cutoff}
        pruned = before - len(self._synced_days)
        if pruned:
            logger.info(
                "Pruned %d synced-days entries older than %d days",
                pruned,
                SYNCED_DAYS_MAX_AGE,
            )
            self._save_synced_days()

    def _circuit_threshold(self) -> int:
        """Return the circuit-breaker threshold for the last observed error type."""
        if self._last_error_type == ErrorType.AUTH:
            return CIRCUIT_THRESHOLD_AUTH
        return CIRCUIT_THRESHOLD_TRANSIENT

    def trigger(self) -> None:
        """Trigger a sync pass (called by observer on segment completion)."""
        self._trigger.set()

    def stop(self) -> None:
        """Stop the sync service; wakes the loop so it can exit promptly."""
        self._running = False
        self._trigger.set()

    async def run(self) -> None:
        """Main sync loop — waits for triggers (or a 60s tick), then syncs."""
        # Prune stale synced-days entries once at startup
        self._prune_synced_days()

        while self._running:
            try:
                # Wake on explicit trigger, or every 60s as a periodic check
                try:
                    await asyncio.wait_for(self._trigger.wait(), timeout=60)
                except asyncio.TimeoutError:
                    pass

                self._trigger.clear()

                if not self._running:
                    break

                if self._circuit_open:
                    logger.warning("Circuit breaker open — skipping sync")
                    continue

                # Force a full pass (ignore the synced-days cache) once a day
                now = time.time()
                force_full = (now - self._last_full_sync) > 86400

                await self._sync(force_full=force_full)

                # Record completion only if this pass was the forced full one
                if force_full:
                    self._last_full_sync = now

            except Exception as e:
                # Keep the daemon alive on unexpected errors; brief back-off
                logger.error("Sync error: %s", e, exc_info=True)
                await asyncio.sleep(5)

    async def _sync(self, force_full: bool = False) -> None:
        """Walk days newest-to-oldest and upload segments the server lacks.

        Args:
            force_full: When True, re-check days already marked synced.
        """
        captures_dir = self._config.captures_dir
        if not captures_dir.exists():
            return

        today = datetime.now().strftime("%Y%m%d")

        # Collect completed local segments grouped by day
        segments_by_day = self._collect_segments(captures_dir)
        if not segments_by_day:
            return

        for day in sorted(segments_by_day.keys(), reverse=True):
            if not self._running:
                break

            if self._circuit_open:
                break

            # Skip past days already fully synced (unless forcing a full pass);
            # today is always re-checked since new segments keep arriving.
            if day != today and day in self._synced_days and not force_full:
                continue

            local_segments = segments_by_day[day]

            # Query server for the segments it already has for this day
            server_segments = await asyncio.to_thread(
                self._client.get_server_segments, day
            )
            if server_segments is None:
                logger.warning("Failed to query server for day %s", day)
                continue

            # Build a lookup of keys the server knows (current and original)
            server_keys: set[str] = set()
            for seg in server_segments:
                server_keys.add(seg.get("key", ""))
                if "original_key" in seg:
                    server_keys.add(seg["original_key"])

            any_needed_upload = False

            for segment_dir in local_segments:
                if not self._running or self._circuit_open:
                    break

                segment_key = segment_dir.name
                if segment_key in server_keys:
                    continue

                any_needed_upload = True
                success = await self._upload_segment(day, segment_dir)

                if not success:
                    self._consecutive_failures += 1
                    threshold = self._circuit_threshold()
                    if self._consecutive_failures >= threshold:
                        self._circuit_open = True
                        logger.error(
                            f"Circuit breaker OPEN: {self._consecutive_failures} consecutive "
                            f"{self._last_error_type.value if self._last_error_type else 'unknown'} "
                            f"failures (threshold: {threshold})"
                        )
                        break
                else:
                    # Any success closes the failure streak
                    self._consecutive_failures = 0
                    self._last_error_type = None

            # Mark past days as synced when everything was already on the server
            if day != today and not any_needed_upload:
                self._synced_days.add(day)
                self._save_synced_days()

    def _collect_segments(self, captures_dir: Path) -> dict[str, list[Path]]:
        """Collect completed segment directories grouped by day.

        Layout assumed: captures_dir/<day>/<stream>/<segment>. Directories
        whose names end in ``.incomplete`` or ``.failed`` are skipped.

        Returns:
            Mapping of day (YYYYMMDD) to segment directories, newest first.
        """
        result: dict[str, list[Path]] = {}

        for day_dir in sorted(captures_dir.iterdir(), reverse=True):
            if not day_dir.is_dir():
                continue

            day = day_dir.name

            for stream_dir in day_dir.iterdir():
                if not stream_dir.is_dir():
                    continue

                segments = []
                for seg_dir in sorted(stream_dir.iterdir(), reverse=True):
                    if not seg_dir.is_dir():
                        continue
                    # Skip incomplete and failed segments
                    if seg_dir.name.endswith((".incomplete", ".failed")):
                        continue
                    segments.append(seg_dir)

                if segments:
                    result.setdefault(day, []).extend(segments)

        return result

    async def _upload_segment(self, day: str, segment_dir: Path) -> bool:
        """Upload a single segment's files via the upload client.

        Retries, if any, are the client's responsibility — none happen here.

        Returns:
            True on success (or nothing to upload), False on failure. On
            failure the error type is recorded for the circuit breaker, and
            a revoked client opens the breaker outright.
        """
        segment_key = segment_dir.name
        files = [f for f in segment_dir.iterdir() if f.is_file()]
        if not files:
            return True  # Nothing to upload

        meta: dict[str, Any] = {"stream": self._config.stream}

        result = await asyncio.to_thread(
            self._client.upload_segment, day, segment_key, files, meta
        )

        if result.success:
            logger.info("Uploaded: %s/%s (%d files)", day, segment_key, len(files))
            return True

        # Track error type for circuit breaker threshold selection
        self._last_error_type = result.error_type

        # Non-retryable: server has revoked this client — stop syncing entirely
        if self._client.is_revoked:
            logger.error("Client revoked — disabling sync")
            self._circuit_open = True
            return False

        logger.error("Upload failed: %s/%s", day, segment_key)
        return False