linux observer
at main 274 lines 10 kB view raw
# SPDX-License-Identifier: AGPL-3.0-only
# Copyright (c) 2026 sol pbc

"""HTTP upload client for solstone ingest server.

Extracted from solstone's observe/remote_client.py. Accepts Config
as constructor parameter instead of reading config internally.

Refinements over tmux baseline:
- Respects configured sync_max_retries without hard cap
- Error classification: auth (401/403) vs transient (5xx/network)
"""

from __future__ import annotations

import json
import logging
import shutil
import subprocess
import time
from contextlib import ExitStack
from enum import Enum
from pathlib import Path
from typing import Any, NamedTuple

import requests

from .config import Config

logger = logging.getLogger(__name__)

# Request timeouts in seconds, passed as ``timeout=`` to requests.
UPLOAD_TIMEOUT = 300
EVENT_TIMEOUT = 30


class ErrorType(Enum):
    """Classification of upload errors for circuit breaker tuning."""

    AUTH = "auth"  # 401, 403 — open circuit immediately
    CLIENT = "client"  # 400 — non-retryable, don't count for circuit
    TRANSIENT = "transient"  # 5xx, network, timeout — allow more failures


class UploadResult(NamedTuple):
    """Outcome of :meth:`UploadClient.upload_segment`."""

    success: bool
    # True when the server answered 200 with ``{"status": "duplicate"}``.
    duplicate: bool = False
    # Set on classified failures; None on success or unclassified failure.
    error_type: ErrorType | None = None


class UploadClient:
    """HTTP client for uploading observer segments to the ingest server."""

    def __init__(self, config: Config):
        """Capture connection settings and retry policy from *config*."""
        self._url = config.server_url.rstrip("/") if config.server_url else ""
        self._key = config.key
        self._stream = config.stream
        self._revoked = False
        self._session = requests.Session()
        self._retry_backoff = config.sync_retry_delays or [5, 30, 120, 300]
        # Respect configured retry cap — no hard min(config, 3)
        self._max_retries = config.sync_max_retries

    @property
    def is_revoked(self) -> bool:
        """True once the server has answered a request with 403."""
        return self._revoked

    def _persist_key(self, config: Config, key: str) -> None:
        """Save auto-registered key back to config."""
        from .config import save_config

        config.key = key
        save_config(config)

    def ensure_registered(self, config: Config) -> bool:
        """Ensure the client has a valid key, auto-registering if needed.

        Tries sol CLI first (no server needed), falls back to HTTP.
        Returns True if a key is available.
        """
        if self._key:
            return True

        # Try sol CLI registration first
        name = self._stream or "solstone-linux"
        sol = shutil.which("sol")
        if sol:
            try:
                result = subprocess.run(
                    [sol, "observer", "--json", "create", name],
                    capture_output=True,
                    text=True,
                    timeout=10,
                )
                if result.returncode == 0:
                    data = json.loads(result.stdout)
                    self._key = data["key"]
                    self._persist_key(config, self._key)
                    logger.info(
                        "CLI-registered as '%s' (key: %s...)", name, self._key[:8]
                    )
                    return True
            except (
                subprocess.TimeoutExpired,
                json.JSONDecodeError,
                KeyError,
                TypeError,  # JSON output that is not an object
                OSError,
            ) as e:
                # NOTE(review): an OSError raised while persisting also lands
                # here, after self._key was assigned, and falls through to
                # HTTP registration — confirm that is intended.
                logger.debug("CLI registration failed: %s", e)

        if not self._url:
            return False

        url = f"{self._url}/app/observer/api/create"

        retries = min(3, len(self._retry_backoff))
        for attempt in range(retries):
            delay = self._retry_backoff[min(attempt, len(self._retry_backoff) - 1)]
            try:
                resp = self._session.post(
                    url, json={"name": name}, timeout=EVENT_TIMEOUT
                )
            except requests.RequestException as e:
                logger.warning("Registration attempt %d failed: %s", attempt + 1, e)
            else:
                if resp.status_code == 200:
                    try:
                        key = resp.json()["key"]
                    except (ValueError, KeyError, TypeError) as e:
                        # A 200 without a usable "key" is handled like any
                        # other failed attempt instead of crashing the caller.
                        logger.warning(
                            "Registration attempt %d failed: %s", attempt + 1, e
                        )
                    else:
                        self._key = key
                        self._persist_key(config, self._key)
                        logger.info(
                            "Auto-registered as '%s' (key: %s...)",
                            name,
                            self._key[:8],
                        )
                        return True
                elif resp.status_code == 403:
                    self._revoked = True
                    logger.error("Registration rejected (403)")
                    return False
                else:
                    logger.warning(
                        "Registration attempt %d failed: %s",
                        attempt + 1,
                        resp.status_code,
                    )
            if attempt < retries - 1:
                time.sleep(delay)

        logger.error("Registration failed after %d attempts", retries)
        return False

    @staticmethod
    def classify_error(
        status_code: int | None, is_network_error: bool = False
    ) -> ErrorType:
        """Classify an error for circuit breaker and retry decisions."""
        if is_network_error or status_code is None:
            return ErrorType.TRANSIENT
        if status_code in (401, 403):
            return ErrorType.AUTH
        if status_code == 400:
            return ErrorType.CLIENT
        # 5xx and anything else
        return ErrorType.TRANSIENT

    def _try_upload(
        self,
        url: str,
        day: str,
        segment: str,
        files: list[Path],
        meta: dict[str, Any] | None,
        attempt: int,
    ) -> tuple[UploadResult, bool]:
        """Perform one upload attempt; return ``(result, should_retry)``."""
        # ExitStack closes every opened handle on all exit paths.
        with ExitStack() as stack:
            files_data = []
            for path in files:
                try:
                    # Open directly rather than exists()-then-open so a file
                    # that vanishes in between is skipped, not a crash.
                    fh = stack.enter_context(open(path, "rb"))
                except (FileNotFoundError, NotADirectoryError):
                    logger.warning("File not found, skipping: %s", path)
                    continue
                files_data.append(
                    ("files", (path.name, fh, "application/octet-stream"))
                )

            if not files_data:
                return UploadResult(False), False

            data: dict[str, Any] = {"day": day, "segment": segment}
            if meta:
                data["meta"] = json.dumps(meta)

            try:
                response = self._session.post(
                    url, data=data, files=files_data, timeout=UPLOAD_TIMEOUT
                )
            except requests.RequestException as e:
                logger.warning("Upload attempt %d failed: %s", attempt + 1, e)
                return UploadResult(False, error_type=ErrorType.TRANSIENT), True

            if response.status_code == 200:
                try:
                    resp_data = response.json()
                except ValueError as e:
                    # A 200 whose body is not JSON is not trusted as an
                    # acknowledgement; retry it like a transient failure.
                    logger.warning("Upload attempt %d failed: %s", attempt + 1, e)
                    return UploadResult(False, error_type=ErrorType.TRANSIENT), True
                is_duplicate = (
                    isinstance(resp_data, dict)
                    and resp_data.get("status") == "duplicate"
                )
                return UploadResult(True, duplicate=is_duplicate), False

            error_type = self.classify_error(response.status_code)

            if error_type in (ErrorType.AUTH, ErrorType.CLIENT):
                # Only a 403 marks the client revoked; a 401 just fails.
                if response.status_code == 403:
                    self._revoked = True
                logger.error(
                    "Upload rejected (%s): %s", response.status_code, response.text
                )
                return UploadResult(False, error_type=error_type), False

            logger.warning(
                "Upload attempt %d failed: %s %s",
                attempt + 1,
                response.status_code,
                response.text,
            )
            return UploadResult(False, error_type=error_type), True

    def upload_segment(
        self,
        day: str,
        segment: str,
        files: list[Path],
        meta: dict[str, Any] | None = None,
    ) -> UploadResult:
        """Upload a segment's files to the ingest server.

        Makes up to the configured number of attempts, sleeping between
        them according to the retry backoff list. Auth (401/403) and
        client (400) rejections are returned immediately without retry.
        Returns a failed result without contacting the server when the
        client is revoked, has no key or URL, or none of *files* exist.
        """
        if self._revoked or not self._key or not self._url:
            return UploadResult(
                False, error_type=ErrorType.AUTH if self._revoked else None
            )

        url = f"{self._url}/app/observer/ingest/{self._key}"

        # Pre-seeded so a retry count of zero yields a plain failure
        # instead of referencing an unassigned local.
        result = UploadResult(False)
        for attempt in range(self._max_retries):
            result, should_retry = self._try_upload(
                url, day, segment, files, meta, attempt
            )
            if not should_retry:
                return result

            if attempt < self._max_retries - 1:
                delay = self._retry_backoff[
                    min(attempt, len(self._retry_backoff) - 1)
                ]
                time.sleep(delay)

        logger.error(
            "Upload failed after %s attempts: %s/%s", self._max_retries, day, segment
        )
        return result

    def get_server_segments(self, day: str) -> list[dict] | None:
        """Query server for segments on a given day.

        Returns list of segment dicts, or None on failure.
        """
        if self._revoked or not self._key or not self._url:
            return None

        url = f"{self._url}/app/observer/ingest/{self._key}/segments/{day}"
        params: dict[str, str] = {}
        if self._stream:
            params["stream"] = self._stream

        try:
            resp = self._session.get(url, params=params, timeout=EVENT_TIMEOUT)
            if resp.status_code == 200:
                return resp.json()
        except (requests.RequestException, ValueError) as e:
            # ValueError covers a 200 response whose body is not valid JSON.
            logger.debug("Segments query failed: %s", e)
            return None

        if resp.status_code in (401, 403):
            if resp.status_code == 403:
                self._revoked = True
            logger.error("Segments query rejected (%s)", resp.status_code)
            return None
        logger.warning("Segments query failed: %s", resp.status_code)
        return None

    def relay_event(self, tract: str, event: str, **fields: Any) -> bool:
        """Fire-and-forget event relay.

        Returns True only when the server answers 200. A 403 marks the
        client as revoked. Network errors are swallowed and reported as
        False.
        """
        if self._revoked or not self._key or not self._url:
            return False

        url = f"{self._url}/app/observer/ingest/{self._key}/event"
        payload = {"tract": tract, "event": event, **fields}
        try:
            resp = self._session.post(url, json=payload, timeout=EVENT_TIMEOUT)
        except requests.RequestException:
            return False

        if resp.status_code == 200:
            return True
        if resp.status_code == 403:
            self._revoked = True
        return False

    def stop(self) -> None:
        """Close the underlying HTTP session."""
        self._session.close()