# SPDX-License-Identifier: AGPL-3.0-only
# Copyright (c) 2026 sol pbc

"""Background sync service for uploading captured segments.

Modeled on solstone-macos's SyncService.swift. Runs as an asyncio
background task in the same event loop as capture. Walks cache days
newest-to-oldest, queries the server for existing segments, and uploads
the missing ones.

Refinements over the tmux baseline:
- Respects configured sync_max_retries (no hard min(config, 3) cap)
- Circuit breaker tuned by error type: auth=immediate, transient=5 consecutive
- Synced-days pruning at 90 days to prevent unbounded cache growth
"""

from __future__ import annotations

import asyncio
import json
import logging
import os
import time
from datetime import datetime, timedelta
from pathlib import Path
from typing import Any

from .config import Config
from .upload import ErrorType, UploadClient

logger = logging.getLogger(__name__)

# Circuit breaker thresholds by error type
CIRCUIT_THRESHOLD_AUTH = 1  # Auth failures open immediately
CIRCUIT_THRESHOLD_TRANSIENT = 5  # Transient failures need 5 consecutive

# Synced days older than this are pruned from the cache
SYNCED_DAYS_MAX_AGE = 90


class SyncService:
    """Background sync service that uploads completed segments to the server."""

    def __init__(self, config: Config, client: UploadClient):
        self._config = config
        self._client = client
        self._synced_days: set[str] = set()
        self._consecutive_failures = 0
        self._last_error_type: ErrorType | None = None
        self._circuit_open = False
        self._last_full_sync: float = 0
        self._running = True
        self._trigger = asyncio.Event()

        # Load synced days cache
        self._load_synced_days()

    def _synced_days_path(self) -> Path:
        return self._config.state_dir / "synced_days.json"

    def _load_synced_days(self) -> None:
        path = self._synced_days_path()
        if not path.exists():
            return
        try:
            with open(path, encoding="utf-8") as f:
                data = json.load(f)
            self._synced_days = set(data) if isinstance(data, list) else set()
        except (json.JSONDecodeError, OSError):
            self._synced_days = set()

    def _save_synced_days(self) -> None:
        self._config.state_dir.mkdir(parents=True, exist_ok=True)
        path = self._synced_days_path()
        tmp = path.with_suffix(f".{os.getpid()}.tmp")
        try:
            with open(tmp, "w", encoding="utf-8") as f:
                json.dump(sorted(self._synced_days), f)
                f.write("\n")
            # Write-then-rename keeps the cache file intact if we crash mid-write
            os.rename(str(tmp), str(path))
        except OSError as e:
            logger.warning(f"Failed to save synced days: {e}")

    def _prune_synced_days(self) -> None:
        """Remove synced-days entries older than 90 days."""
        if not self._synced_days:
            return
        cutoff = (datetime.now() - timedelta(days=SYNCED_DAYS_MAX_AGE)).strftime("%Y%m%d")
        before = len(self._synced_days)
        self._synced_days = {d for d in self._synced_days if d >= cutoff}
        pruned = before - len(self._synced_days)
        if pruned:
            logger.info(
                f"Pruned {pruned} synced-days entries older than {SYNCED_DAYS_MAX_AGE} days"
            )
            self._save_synced_days()

    def _circuit_threshold(self) -> int:
        """Get circuit breaker threshold based on last error type."""
        if self._last_error_type == ErrorType.AUTH:
            return CIRCUIT_THRESHOLD_AUTH
        return CIRCUIT_THRESHOLD_TRANSIENT

    def trigger(self) -> None:
        """Trigger a sync pass (called by observer on segment completion)."""
        self._trigger.set()

    def stop(self) -> None:
        """Stop the sync service."""
        self._running = False
        self._trigger.set()

    async def run(self) -> None:
        """Main sync loop — waits for triggers, then syncs."""
        # Prune on startup
        self._prune_synced_days()

        while self._running:
            try:
                # Wait for trigger or periodic check (60s timeout)
                try:
                    await asyncio.wait_for(self._trigger.wait(), timeout=60)
                except asyncio.TimeoutError:
                    pass

                self._trigger.clear()

                if not self._running:
                    break

                # Once open, the breaker stays open for the life of the process
                if self._circuit_open:
                    logger.warning("Circuit breaker open — skipping sync")
                    continue

                # Force full sync daily
                now = time.time()
                force_full = (now - self._last_full_sync) > 86400

                await self._sync(force_full=force_full)

                if force_full:
                    self._last_full_sync = now

            except Exception as e:
                logger.error(f"Sync error: {e}", exc_info=True)
                await asyncio.sleep(5)

    async def _sync(self, force_full: bool = False) -> None:
        """Walk days newest-to-oldest and upload missing segments."""
        captures_dir = self._config.captures_dir
        if not captures_dir.exists():
            return

        today = datetime.now().strftime("%Y%m%d")

        # Collect segments by day
        segments_by_day = self._collect_segments(captures_dir)
        if not segments_by_day:
            return

        for day in sorted(segments_by_day.keys(), reverse=True):
            if not self._running:
                break

            if self._circuit_open:
                break

            # Skip past days already fully synced (unless forcing)
            if day != today and day in self._synced_days and not force_full:
                continue

            local_segments = segments_by_day[day]

            # Query server for existing segments
            server_segments = await asyncio.to_thread(
                self._client.get_server_segments, day
            )
            if server_segments is None:
                logger.warning(f"Failed to query server for day {day}")
                continue

            # Build lookup
            server_keys: set[str] = set()
            for seg in server_segments:
                server_keys.add(seg.get("key", ""))
                if "original_key" in seg:
                    server_keys.add(seg["original_key"])

            any_needed_upload = False

            for segment_dir in local_segments:
                if not self._running or self._circuit_open:
                    break

                segment_key = segment_dir.name
                if segment_key in server_keys:
                    continue

                any_needed_upload = True
                success = await self._upload_segment(day, segment_dir)

                if not success:
                    self._consecutive_failures += 1
                    threshold = self._circuit_threshold()
                    if self._consecutive_failures >= threshold:
                        self._circuit_open = True
                        logger.error(
                            f"Circuit breaker OPEN: {self._consecutive_failures} consecutive "
                            f"{self._last_error_type.value if self._last_error_type else 'unknown'} "
                            f"failures (threshold: {threshold})"
                        )
                        break
                else:
                    self._consecutive_failures = 0
                    self._last_error_type = None

            # Mark past days as synced if nothing needed upload
            if day != today and not any_needed_upload:
                self._synced_days.add(day)
                self._save_synced_days()

    def _collect_segments(self, captures_dir: Path) -> dict[str, list[Path]]:
        """Collect completed segments grouped by day."""
        result: dict[str, list[Path]] = {}

        for day_dir in sorted(captures_dir.iterdir(), reverse=True):
            if not day_dir.is_dir():
                continue

            day = day_dir.name

            for stream_dir in day_dir.iterdir():
                if not stream_dir.is_dir():
                    continue

                segments = []
                for seg_dir in sorted(stream_dir.iterdir(), reverse=True):
                    if not seg_dir.is_dir():
                        continue
                    name = seg_dir.name
                    # Skip incomplete and failed
                    if name.endswith(".incomplete") or name.endswith(".failed"):
                        continue
                    segments.append(seg_dir)

                if segments:
                    result.setdefault(day, []).extend(segments)

        return result

    async def _upload_segment(self, day: str, segment_dir: Path) -> bool:
        """Upload a single segment (per-request retries live in the upload client)."""
        segment_key = segment_dir.name
        files = [f for f in segment_dir.iterdir() if f.is_file()]
        if not files:
            return True  # Nothing to upload

        meta: dict[str, Any] = {"stream": self._config.stream}

        result = await asyncio.to_thread(
            self._client.upload_segment, day, segment_key, files, meta
        )

        if result.success:
            logger.info(f"Uploaded: {day}/{segment_key} ({len(files)} files)")
            return True

        # Track error type for circuit breaker
        self._last_error_type = result.error_type

        # Non-retryable errors
        if self._client.is_revoked:
            logger.error("Client revoked — disabling sync")
            self._circuit_open = True
            return False

        logger.error(f"Upload failed: {day}/{segment_key}")
        return False
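

# ---------------------------------------------------------------------------
# Usage sketch (illustrative only). A minimal example of how a capture loop
# might host this service, assuming Config() and UploadClient(config) can be
# constructed this way; their real signatures live in .config and .upload
# and may differ:
#
#     async def main() -> None:
#         config = Config()
#         client = UploadClient(config)
#         sync = SyncService(config, client)
#         sync_task = asyncio.create_task(sync.run())
#
#         # ... capture loop; after each completed segment:
#         sync.trigger()
#
#         # On shutdown, stop() wakes run() so it can exit cleanly:
#         sync.stop()
#         await sync_task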