linux observer
# SPDX-License-Identifier: AGPL-3.0-only
# Copyright (c) 2026 sol pbc

"""HTTP upload client for the solstone ingest server.

Extracted from solstone's observe/remote_client.py. Accepts a Config
as a constructor parameter instead of reading config internally.

Refinements over the tmux baseline:
- Respects the configured sync_max_retries without a hard cap
- Error classification: auth (401/403) vs client (400) vs transient (5xx/network)
"""
13
14from __future__ import annotations
15
16import json
17import logging
18import shutil
19import subprocess
20import time
21from enum import Enum
22from pathlib import Path
23from typing import Any, NamedTuple
24
25import requests
26
27from .config import Config
28
logger = logging.getLogger(__name__)

# Per-request timeouts, in seconds (handed to requests as ``timeout=``).
UPLOAD_TIMEOUT = 300  # multipart segment uploads; EVENT_TIMEOUT covers the rest
EVENT_TIMEOUT = 30  # registration, segment listing and event relay requests
33
34
class ErrorType(Enum):
    """Classification of upload errors for circuit breaker tuning.

    Members:
        AUTH: HTTP 401/403 — open the circuit immediately.
        CLIENT: HTTP 400 — non-retryable; not counted toward the circuit.
        TRANSIENT: 5xx, network errors and timeouts — tolerate more failures.
    """

    AUTH = "auth"
    CLIENT = "client"
    TRANSIENT = "transient"
40
41
class UploadResult(NamedTuple):
    """Outcome of one ``UploadClient.upload_segment`` call."""

    # True when the server accepted the segment (HTTP 200).
    success: bool
    # True when the server answered with ``"status": "duplicate"``.
    duplicate: bool = False
    # Failure classification; None on success or when no class applies
    # (e.g. client not configured, or no files could be attached).
    error_type: ErrorType | None = None
46
47
class UploadClient:
    """HTTP client for uploading observer segments to the ingest server.

    All requests go through one shared ``requests.Session``; call
    :meth:`stop` to close it. A 403 answer to any request marks the client
    as revoked, after which upload, segment-query and relay calls return
    immediately without touching the network.
    """

    def __init__(self, config: Config):
        """Capture connection settings from *config* and open the HTTP session.

        ``sync_retry_delays`` falls back to ``[5, 30, 120, 300]`` when unset
        or empty. ``sync_max_retries`` is honoured without an upper cap, but
        at least one upload attempt is always made.
        """
        self._url = config.server_url.rstrip("/") if config.server_url else ""
        self._key = config.key
        self._stream = config.stream
        self._revoked = False
        self._session = requests.Session()
        self._retry_backoff = config.sync_retry_delays or [5, 30, 120, 300]
        # Respect configured retry cap — no hard min(config, 3) — but never
        # drop below a single attempt (0/None would skip the upload loop).
        self._max_retries = self._attempt_count(config.sync_max_retries)

    @property
    def is_revoked(self) -> bool:
        """True once the server has answered 403 to a request from this client."""
        return self._revoked

    # ------------------------------------------------------------------
    # Small internal helpers
    # ------------------------------------------------------------------

    @staticmethod
    def _attempt_count(configured: int | None) -> int:
        """Return the number of upload attempts for a configured retry count.

        ``range(0)`` would skip the upload loop entirely and report failure
        without ever contacting the server, so 0, negative values and None
        all become a single attempt.
        """
        return max(1, configured or 1)

    def _retry_delay(self, attempt: int) -> float:
        """Return the sleep before retrying after 0-based *attempt*.

        Attempts beyond the configured list reuse its last entry.
        """
        return self._retry_backoff[min(attempt, len(self._retry_backoff) - 1)]

    def _redact(self, text: object) -> str:
        """Return ``str(text)`` with the full ingest key masked.

        Ingest URLs embed the key in their path and requests exceptions
        usually echo the URL, so mask it the same way the registration log
        lines do (first 8 characters followed by ``...``).
        """
        message = str(text)
        if self._key:
            message = message.replace(self._key, f"{self._key[:8]}...")
        return message

    @staticmethod
    def _extract_key(payload: Any) -> str | None:
        """Return the non-empty string ``key`` of a registration payload, else None."""
        key = payload.get("key") if isinstance(payload, dict) else None
        return key if isinstance(key, str) and key else None

    def _persist_key(self, config: Config, key: str) -> None:
        """Save auto-registered key back to config."""
        from .config import save_config

        config.key = key
        save_config(config)

    # ------------------------------------------------------------------
    # Registration
    # ------------------------------------------------------------------

    def ensure_registered(self, config: Config) -> bool:
        """Ensure the client has a valid key, auto-registering if needed.

        Tries the ``sol`` CLI first (no server needed), then falls back to the
        server's HTTP registration endpoint. A newly obtained key is kept on
        the client and written back to *config* via ``save_config``.

        Returns:
            True if a key is available afterwards, False otherwise.
        """
        if self._key:
            return True

        name = self._stream or "solstone-linux"
        if self._register_via_cli(config, name):
            return True
        if not self._url:
            return False
        return self._register_via_http(config, name)

    def _register_via_cli(self, config: Config, name: str) -> bool:
        """Try ``sol observer --json create <name>``; True if it yielded a key."""
        sol = shutil.which("sol")
        if not sol:
            return False

        try:
            result = subprocess.run(
                [sol, "observer", "--json", "create", name],
                capture_output=True, text=True, timeout=10,
            )
        except (subprocess.TimeoutExpired, OSError) as e:
            logger.debug(f"CLI registration failed: {e}")
            return False

        if result.returncode != 0:
            logger.debug(
                f"CLI registration failed: exit code {result.returncode}: "
                f"{result.stderr.strip()}"
            )
            return False

        try:
            key = self._extract_key(json.loads(result.stdout))
        except json.JSONDecodeError as e:
            logger.debug(f"CLI registration failed: {e}")
            return False
        if key is None:
            logger.debug("CLI registration failed: no key in CLI output")
            return False

        # Persisting happens outside the try blocks above: failing to save the
        # key is not a CLI failure and must not trigger a second registration.
        self._key = key
        self._persist_key(config, key)
        logger.info(f"CLI-registered as '{name}' (key: {key[:8]}...)")
        return True

    def _register_via_http(self, config: Config, name: str) -> bool:
        """Register through ``/app/observer/api/create``; True on success.

        Makes up to ``min(3, len(retry delays))`` attempts. A 403 marks the
        client revoked and stops immediately.
        """
        url = f"{self._url}/app/observer/api/create"
        retries = min(3, len(self._retry_backoff))

        for attempt in range(retries):
            try:
                resp = self._session.post(
                    url, json={"name": name}, timeout=EVENT_TIMEOUT
                )
            except requests.RequestException as e:
                logger.warning(f"Registration attempt {attempt + 1} failed: {e}")
            else:
                if resp.status_code == 403:
                    self._revoked = True
                    logger.error("Registration rejected (403)")
                    return False
                if resp.status_code == 200:
                    try:
                        key = self._extract_key(resp.json())
                    except ValueError:  # requests' JSONDecodeError is a ValueError
                        key = None
                    if key is not None:
                        self._key = key
                        self._persist_key(config, key)
                        logger.info(
                            f"Auto-registered as '{name}' (key: {key[:8]}...)"
                        )
                        return True
                    logger.warning(
                        f"Registration attempt {attempt + 1} failed: "
                        "response contained no key"
                    )
                else:
                    logger.warning(
                        f"Registration attempt {attempt + 1} failed: {resp.status_code}"
                    )

            if attempt < retries - 1:
                time.sleep(self._retry_delay(attempt))

        logger.error(f"Registration failed after {retries} attempts")
        return False

    # ------------------------------------------------------------------
    # Uploads and queries
    # ------------------------------------------------------------------

    @staticmethod
    def classify_error(status_code: int | None, is_network_error: bool = False) -> ErrorType:
        """Classify an error for circuit breaker and retry decisions."""
        if is_network_error:
            return ErrorType.TRANSIENT
        if status_code is None:
            return ErrorType.TRANSIENT
        if status_code in (401, 403):
            return ErrorType.AUTH
        if status_code == 400:
            return ErrorType.CLIENT
        # 5xx and anything else
        return ErrorType.TRANSIENT

    @staticmethod
    def _open_upload_files(files: list[Path], stack: Any) -> list[tuple]:
        """Open each readable path in *files* on *stack* as a multipart part.

        Paths that cannot be opened are logged and skipped. The handles are
        closed when *stack* (a ``contextlib.ExitStack``) exits.
        """
        parts = []
        for path in files:
            try:
                fh = stack.enter_context(open(path, "rb"))
            except FileNotFoundError:
                logger.warning(f"File not found, skipping: {path}")
                continue
            except OSError as e:
                logger.warning(f"Cannot open file, skipping: {path}: {e}")
                continue
            parts.append(("files", (path.name, fh, "application/octet-stream")))
        return parts

    @staticmethod
    def _is_duplicate(response: requests.Response, day: str, segment: str) -> bool:
        """Return True if a 200 ingest response reports ``status == "duplicate"``.

        A 200 means the server accepted the upload, so an unparseable body is
        logged and treated as a plain success instead of being retried
        (a retry would re-send data the server already stored).
        """
        try:
            body = response.json()
        except ValueError:  # requests' JSONDecodeError is a ValueError
            logger.warning(f"Upload of {day}/{segment} succeeded but response was not JSON")
            return False
        return isinstance(body, dict) and body.get("status") == "duplicate"

    def upload_segment(
        self,
        day: str,
        segment: str,
        files: list[Path],
        meta: dict[str, Any] | None = None,
    ) -> UploadResult:
        """Upload a segment's files to the ingest server, retrying transient errors.

        Args:
            day: Sent as the ``day`` form field.
            segment: Sent as the ``segment`` form field.
            files: Files to attach; paths that cannot be opened are skipped.
            meta: Optional metadata, JSON-encoded into the ``meta`` form field.

        Returns:
            ``UploadResult(True, duplicate=...)`` on HTTP 200. Otherwise
            ``success`` is False and ``error_type`` is AUTH (401/403, or the
            client is already revoked), CLIENT (400), TRANSIENT (retries
            exhausted), or None (client not configured / no readable files).
        """
        from contextlib import ExitStack

        if self._revoked or not self._key or not self._url:
            return UploadResult(False, error_type=ErrorType.AUTH if self._revoked else None)

        url = f"{self._url}/app/observer/ingest/{self._key}"
        error_type = None

        for attempt in range(self._max_retries):
            response = None
            # Handles are consumed by the POST, so every attempt opens fresh
            # ones; ExitStack closes them whether the request succeeds or not.
            with ExitStack() as stack:
                files_data = self._open_upload_files(files, stack)
                if not files_data:
                    return UploadResult(False)

                data: dict[str, Any] = {"day": day, "segment": segment}
                if meta:
                    data["meta"] = json.dumps(meta)

                try:
                    response = self._session.post(
                        url, data=data, files=files_data, timeout=UPLOAD_TIMEOUT
                    )
                except requests.RequestException as e:
                    logger.warning(
                        f"Upload attempt {attempt + 1} failed: {self._redact(e)}"
                    )

            # `is None`, not truthiness: a requests Response is falsy for 4xx/5xx.
            if response is None:
                error_type = ErrorType.TRANSIENT
            elif response.status_code == 200:
                return UploadResult(
                    True, duplicate=self._is_duplicate(response, day, segment)
                )
            else:
                error_type = self.classify_error(response.status_code)
                if error_type in (ErrorType.AUTH, ErrorType.CLIENT):
                    # Non-retryable; only 403 permanently revokes the client.
                    if response.status_code == 403:
                        self._revoked = True
                    logger.error(
                        f"Upload rejected ({response.status_code}): "
                        f"{self._redact(response.text)}"
                    )
                    return UploadResult(False, error_type=error_type)
                logger.warning(
                    f"Upload attempt {attempt + 1} failed: "
                    f"{response.status_code} {self._redact(response.text)}"
                )

            if attempt < self._max_retries - 1:
                time.sleep(self._retry_delay(attempt))

        logger.error(f"Upload failed after {self._max_retries} attempts: {day}/{segment}")
        return UploadResult(False, error_type=error_type)

    def get_server_segments(self, day: str) -> list[dict] | None:
        """Query the server for the segments it holds for *day*.

        Results are filtered to this client's stream when one is configured.

        Returns:
            The decoded JSON list on HTTP 200, or None when the client is
            unconfigured/revoked, the request fails, or the body is not a
            JSON list.
        """
        if self._revoked or not self._key or not self._url:
            return None

        url = f"{self._url}/app/observer/ingest/{self._key}/segments/{day}"
        params = {"stream": self._stream} if self._stream else {}

        try:
            resp = self._session.get(url, params=params, timeout=EVENT_TIMEOUT)
        except requests.RequestException as e:
            logger.debug(f"Segments query failed: {self._redact(e)}")
            return None

        if resp.status_code in (401, 403):
            if resp.status_code == 403:
                self._revoked = True
            logger.error(f"Segments query rejected ({resp.status_code})")
            return None
        if resp.status_code != 200:
            logger.warning(f"Segments query failed: {resp.status_code}")
            return None

        try:
            segments = resp.json()
        except ValueError as e:  # requests' JSONDecodeError is a ValueError
            logger.warning(f"Segments query returned invalid JSON: {e}")
            return None
        if not isinstance(segments, list):
            logger.warning(
                "Segments query returned unexpected payload type: "
                f"{type(segments).__name__}"
            )
            return None
        return segments

    def relay_event(self, tract: str, event: str, **fields: Any) -> bool:
        """Fire-and-forget event relay.

        POSTs ``{"tract": tract, "event": event, **fields}`` as JSON to the
        ingest event endpoint. Request errors are swallowed; returns True
        only on HTTP 200. A 403 marks the client revoked.
        """
        if self._revoked or not self._key or not self._url:
            return False

        url = f"{self._url}/app/observer/ingest/{self._key}/event"
        payload = {"tract": tract, "event": event, **fields}
        try:
            resp = self._session.post(url, json=payload, timeout=EVENT_TIMEOUT)
        except requests.RequestException:
            return False

        if resp.status_code == 403:
            self._revoked = True
        return resp.status_code == 200

    def stop(self) -> None:
        """Close the underlying HTTP session."""
        self._session.close()