linux observer

Initial solstone-linux package — standalone Linux desktop observer

Extracts the Linux desktop observer (screen + audio capture) from the
solstone monorepo into a standalone package. Follows the same pattern
as solstone-tmux (phase 5a) with key improvements:

- Local cache + async sync instead of inline upload at segment boundary
- Circuit breaker tuned by error type (auth=immediate, transient=5-10)
- Respects configured sync_max_retries (no hard min(config,3) cap)
- Recovery .metadata file with start timestamp for accurate duration
- Synced-days pruning at 90 days
- Session env recovery for CLI launch, PassEnvironment for systemd
- Screencast restore token at ~/.local/share/solstone-linux/config/

48 tests passing covering config, streams, monitor positions, recovery,
sync collection, error classification, circuit breaker thresholds, and
session readiness checks.

Co-Authored-By: Claude Opus 4.6 (1M context) <noreply@anthropic.com>

Jer Miller e56c8e13

+3534
+17
LICENSE
··· 1 + GNU AFFERO GENERAL PUBLIC LICENSE 2 + Version 3, 19 November 2007 3 + 4 + Copyright (c) 2026 sol pbc 5 + 6 + This program is free software: you can redistribute it and/or modify 7 + it under the terms of the GNU Affero General Public License as published 8 + by the Free Software Foundation, either version 3 of the License, or 9 + (at your option) any later version. 10 + 11 + This program is distributed in the hope that it will be useful, 12 + but WITHOUT ANY WARRANTY; without even the implied warranty of 13 + MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the 14 + GNU Affero General Public License for more details. 15 + 16 + You should have received a copy of the GNU Affero General Public License 17 + along with this program. If not, see <https://www.gnu.org/licenses/>.
+49
README.md
··· 1 + # solstone-linux 2 + 3 + Standalone Linux desktop observer for [solstone](https://solpbc.org). Captures screen and audio from a GNOME Wayland session, stores segments locally, and syncs to a solstone server. 4 + 5 + ## System Dependencies 6 + 7 + **Fedora:** 8 + ```bash 9 + dnf install python3-gobject gtk4 gstreamer1-plugins-base pipewire-gstreamer alsa-lib-devel pulseaudio-utils pipewire-pulseaudio 10 + ``` 11 + 12 + **Debian/Ubuntu:** 13 + ```bash 14 + apt install python3-gi gir1.2-gdk-4.0 gir1.2-gtk-4.0 gstreamer1.0-pipewire libasound2-dev pulseaudio-utils pipewire-pulse 15 + ``` 16 + 17 + ## Install 18 + 19 + ```bash 20 + pipx install --system-site-packages solstone-linux 21 + ``` 22 + 23 + `--system-site-packages` is required for PyGObject/GStreamer access. 24 + 25 + ## Setup 26 + 27 + ```bash 28 + solstone-linux setup 29 + ``` 30 + 31 + ## Run 32 + 33 + ```bash 34 + # Foreground 35 + solstone-linux run 36 + 37 + # As a systemd user service 38 + solstone-linux install-service 39 + ``` 40 + 41 + ## Status 42 + 43 + ```bash 44 + solstone-linux status 45 + ``` 46 + 47 + ## License 48 + 49 + AGPL-3.0-only — Copyright (c) 2026 sol pbc
+16
contrib/solstone-linux.service
··· 1 + [Unit] 2 + Description=Solstone Linux Desktop Observer 3 + After=graphical-session.target 4 + BindsTo=graphical-session.target 5 + 6 + [Service] 7 + Type=simple 8 + ExecStart=/usr/bin/solstone-linux run 9 + PassEnvironment=DISPLAY WAYLAND_DISPLAY DBUS_SESSION_BUS_ADDRESS XDG_RUNTIME_DIR 10 + Restart=on-failure 11 + RestartSec=10 12 + StartLimitIntervalSec=300 13 + StartLimitBurst=5 14 + 15 + [Install] 16 + WantedBy=graphical-session.target
+22
pyproject.toml
··· 1 + [project] 2 + name = "solstone-linux" 3 + version = "0.1.0" 4 + description = "Standalone Linux desktop observer for solstone" 5 + readme = "README.md" 6 + license = "AGPL-3.0-only" 7 + requires-python = ">=3.10" 8 + dependencies = [ 9 + "requests", 10 + "numpy", 11 + "soundfile", 12 + "soundcard", 13 + "dbus-next", 14 + "PyGObject", 15 + ] 16 + 17 + [project.scripts] 18 + solstone-linux = "solstone_linux.cli:main" 19 + 20 + [build-system] 21 + requires = ["hatchling"] 22 + build-backend = "hatchling.build"
+6
src/solstone_linux/__init__.py
··· 1 + # SPDX-License-Identifier: AGPL-3.0-only 2 + # Copyright (c) 2026 sol pbc 3 + 4 + """Standalone Linux desktop observer for solstone.""" 5 + 6 + __version__ = "0.1.0"
src/solstone_linux/__pycache__/__init__.cpython-313.pyc

This is a binary file and will not be displayed.

src/solstone_linux/__pycache__/config.cpython-313.pyc

This is a binary file and will not be displayed.

src/solstone_linux/__pycache__/monitor_positions.cpython-313.pyc

This is a binary file and will not be displayed.

src/solstone_linux/__pycache__/recovery.cpython-313.pyc

This is a binary file and will not be displayed.

src/solstone_linux/__pycache__/session_env.cpython-313.pyc

This is a binary file and will not be displayed.

src/solstone_linux/__pycache__/streams.cpython-313.pyc

This is a binary file and will not be displayed.

src/solstone_linux/__pycache__/sync.cpython-313.pyc

This is a binary file and will not be displayed.

src/solstone_linux/__pycache__/upload.cpython-313.pyc

This is a binary file and will not be displayed.

+129
src/solstone_linux/activity.py
# SPDX-License-Identifier: AGPL-3.0-only
# Copyright (c) 2026 sol pbc

"""GNOME-specific activity detection using Mutter and GTK DBus APIs.

Extracted from solstone's observe/gnome/activity.py.

Changes from monorepo version:
- Replaces `from observe.utils import assign_monitor_positions` with local module
"""

import os

import gi
from dbus_next.aio import MessageBus

gi.require_version("Gdk", "4.0")  # noqa: E402
gi.require_version("Gtk", "4.0")  # noqa: E402
from gi.repository import Gdk, Gtk  # noqa: E402

# DBus service constants
IDLE_MONITOR_BUS = "org.gnome.Mutter.IdleMonitor"
IDLE_MONITOR_PATH = "/org/gnome/Mutter/IdleMonitor/Core"
IDLE_MONITOR_IFACE = "org.gnome.Mutter.IdleMonitor"

SCREENSAVER_BUS = "org.gnome.ScreenSaver"
SCREENSAVER_PATH = "/org/gnome/ScreenSaver"
SCREENSAVER_IFACE = "org.gnome.ScreenSaver"

DISPLAY_CONFIG_BUS = "org.gnome.Mutter.DisplayConfig"
DISPLAY_CONFIG_PATH = "/org/gnome/Mutter/DisplayConfig"
DISPLAY_CONFIG_IFACE = "org.gnome.Mutter.DisplayConfig"


async def get_idle_time_ms(bus: MessageBus) -> int:
    """Return the current session idle time in milliseconds.

    Args:
        bus: Connected DBus session bus.

    Returns:
        Idle time in milliseconds, as reported by Mutter's IdleMonitor.
    """
    node = await bus.introspect(IDLE_MONITOR_BUS, IDLE_MONITOR_PATH)
    proxy = bus.get_proxy_object(IDLE_MONITOR_BUS, IDLE_MONITOR_PATH, node)
    monitor = proxy.get_interface(IDLE_MONITOR_IFACE)
    return await monitor.call_get_idletime()


async def is_screen_locked(bus: MessageBus) -> bool:
    """Return True when GNOME ScreenSaver reports the screen as locked.

    Args:
        bus: Connected DBus session bus.

    Returns:
        True if locked; False otherwise. Any DBus failure (e.g. no
        screensaver service on the bus) is treated as "not locked".
    """
    try:
        node = await bus.introspect(SCREENSAVER_BUS, SCREENSAVER_PATH)
        proxy = bus.get_proxy_object(SCREENSAVER_BUS, SCREENSAVER_PATH, node)
        saver = proxy.get_interface(SCREENSAVER_IFACE)
        return bool(await saver.call_get_active())
    except Exception:
        return False


async def is_power_save_active(bus: MessageBus) -> bool:
    """Return True when display power save mode is active (screen blanked).

    Reads Mutter DisplayConfig's PowerSaveMode property; any non-zero mode
    counts as active.

    Args:
        bus: Connected DBus session bus.

    Returns:
        True if power save is active; False otherwise (including on any
        DBus failure).
    """
    try:
        node = await bus.introspect(DISPLAY_CONFIG_BUS, DISPLAY_CONFIG_PATH)
        proxy = bus.get_proxy_object(DISPLAY_CONFIG_BUS, DISPLAY_CONFIG_PATH, node)
        props = proxy.get_interface("org.freedesktop.DBus.Properties")
        variant = await props.call_get(DISPLAY_CONFIG_IFACE, "PowerSaveMode")
        return int(variant.value) != 0
    except Exception:
        return False


def get_monitor_geometries() -> list[dict]:
    """Return structured monitor information with position labels.

    Returns:
        List of dicts of the form
        [{"id": "connector-id", "box": [x1, y1, x2, y2], "position": "center|left|right|..."}]
        where box holds [left, top, right, bottom] coordinates.

    Raises:
        RuntimeError: if no display can be obtained.
    """
    from .monitor_positions import assign_monitor_positions

    # GTK must be initialized before GDK display access.
    Gtk.init()

    # Prefer the default display; fall back to opening one named by the
    # environment (Wayland first, then X11).
    display = Gdk.Display.get_default()
    if display is None:
        env_display = os.environ.get("WAYLAND_DISPLAY") or os.environ.get("DISPLAY")
        if env_display is not None:
            display = Gdk.Display.open(env_display)
    if display is None:
        raise RuntimeError("No display available")

    # Collect one geometry record per monitor; synthesize an id when the
    # connector name is unavailable.
    infos: list[dict] = []
    for mon in display.get_monitors():
        rect = mon.get_geometry()
        connector = mon.get_connector() or f"monitor-{len(infos)}"
        infos.append(
            {
                "id": connector,
                "box": [rect.x, rect.y, rect.x + rect.width, rect.y + rect.height],
            }
        )

    # Assign position labels using the shared algorithm.
    return assign_monitor_positions(infos)
+79
src/solstone_linux/audio_detect.py
# SPDX-License-Identifier: AGPL-3.0-only
# Copyright (c) 2026 sol pbc

"""Audio device detection via ultrasonic tone.

Direct copy from solstone's observe/detect.py — no solstone imports.
Plays an ultrasonic tone and records from all mics to identify
microphone vs loopback devices.
"""

import logging
import threading

import numpy as np
import soundcard as sc

logger = logging.getLogger(__name__)


def input_detect(duration=0.4, sample_rate=44100):
    """Identify microphone and loopback devices by tone playback.

    Plays an 18 kHz tone on the default speaker while recording from every
    input device simultaneously; devices whose capture contains the tone
    are then classified by name.

    Args:
        duration: Tone/recording length in seconds.
        sample_rate: Sample rate in Hz for both playback and capture.

    Returns:
        (mic, loopback) device pair; either element may be None when the
        corresponding device could not be detected.
    """
    n_frames = int(sample_rate * duration)
    t = np.linspace(0, duration, n_frames, endpoint=False)
    tone = 0.5 * np.sin(2 * np.pi * 18000 * t)  # ultrasonic

    try:
        devices = sc.all_microphones(include_loopback=True)
    except Exception:
        logger.warning("Failed to enumerate audio devices")
        return None, None
    if not devices:
        logger.warning("No audio devices found")
        return None, None

    captures = {}
    # One party per recorder plus the playback thread, so everything
    # starts in lockstep.
    barrier = threading.Barrier(len(devices) + 1)

    def record_mic(mic, captures):
        barrier.wait()
        try:
            captures[mic.name] = mic.record(
                samplerate=sample_rate, numframes=n_frames
            )
        except Exception:
            captures[mic.name] = None

    def play_tone():
        barrier.wait()
        try:
            sc.default_speaker().play(tone, samplerate=sample_rate)
        except Exception:
            logger.warning("No default speaker available for tone detection")

    workers = [
        threading.Thread(target=record_mic, args=(dev, captures)) for dev in devices
    ]
    workers.append(threading.Thread(target=play_tone))
    for w in workers:
        w.start()
    for w in workers:
        w.join()

    # Classify with a simple amplitude threshold; first match per category wins.
    threshold = 0.001
    mic_detected = None
    loopback_detected = None
    for dev in devices:
        audio = captures.get(dev.name)
        if audio is None or np.max(np.abs(audio)) <= threshold:
            continue
        label = str(dev).lower()
        if "microphone" in label and mic_detected is None:
            mic_detected = dev
        if "loopback" in label and loopback_detected is None:
            loopback_detected = dev
    return mic_detected, loopback_detected
+47
src/solstone_linux/audio_mute.py
# SPDX-License-Identifier: AGPL-3.0-only
# Copyright (c) 2026 sol pbc

"""Linux audio mute detection using PulseAudio/PipeWire.

Direct copy from solstone's observe/linux/audio.py — no solstone imports.
"""

import asyncio
import logging

logger = logging.getLogger(__name__)


async def is_sink_muted() -> bool:
    """Return True when the default audio sink is muted.

    Runs `pactl get-sink-mute @DEFAULT_SINK@` and inspects its output.
    Every failure mode — missing pactl, non-zero exit, unexpected error —
    is reported as "not muted" so capture keeps running.

    Returns:
        True if muted, False otherwise (including on error).
    """
    try:
        proc = await asyncio.create_subprocess_exec(
            "pactl",
            "get-sink-mute",
            "@DEFAULT_SINK@",
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE,
        )
        stdout, stderr = await proc.communicate()

        if proc.returncode != 0:
            stderr_text = stderr.decode().strip() if stderr else ""
            logger.warning(f"pactl failed (rc={proc.returncode}): {stderr_text}")
            return False

        return "Mute: yes" in stdout.decode().strip()

    except FileNotFoundError:
        logger.warning("pactl not found, assuming unmuted")
        return False
    except Exception as e:
        logger.warning(f"Error checking sink mute status: {e}")
        return False
+186
src/solstone_linux/audio_recorder.py
··· 1 + # SPDX-License-Identifier: AGPL-3.0-only 2 + # Copyright (c) 2026 sol pbc 3 + 4 + """Audio recording for Linux desktop observer. 5 + 6 + Extracted from solstone's observe/hear.py — AudioRecorder class only. 7 + load_transcript() and format_audio() remain in solstone core (used by 15+ files). 8 + 9 + Changes from monorepo version: 10 + - Replaces `from observe.detect import input_detect` with local audio_detect 11 + - Replaces conditional `think.callosum` import with local logging 12 + - Defines SAMPLE_RATE locally (was from observe.utils) 13 + """ 14 + 15 + from __future__ import annotations 16 + 17 + import gc 18 + import io 19 + import logging 20 + import os 21 + import signal 22 + import threading 23 + import time 24 + from queue import Queue 25 + 26 + import numpy as np 27 + import soundfile as sf 28 + 29 + logger = logging.getLogger(__name__) 30 + 31 + # Standard sample rate for audio processing 32 + SAMPLE_RATE = 16000 33 + BLOCK_SIZE = 1024 34 + 35 + 36 + class AudioRecorder: 37 + """Records stereo audio from microphone and system audio.""" 38 + 39 + def __init__(self): 40 + # Queue holds stereo chunks (mic=left, sys=right) 41 + self.audio_queue = Queue() 42 + self._running = True 43 + self.recording_thread = None 44 + 45 + def detect(self): 46 + """Detect microphone and system audio devices.""" 47 + from .audio_detect import input_detect 48 + 49 + mic, loopback = input_detect() 50 + if mic is None or loopback is None: 51 + logger.error(f"Detection failed: mic {mic} sys {loopback}") 52 + return False 53 + logger.info(f"Detected microphone: {mic.name}") 54 + logger.info(f"Detected system audio: {loopback.name}") 55 + self.mic_device = mic 56 + self.sys_device = loopback 57 + return True 58 + 59 + def record_both(self): 60 + """Record from both mic and system audio in a loop.""" 61 + while self._running: 62 + try: 63 + with ( 64 + self.mic_device.recorder( 65 + samplerate=SAMPLE_RATE, channels=[-1], blocksize=BLOCK_SIZE 66 + ) as mic_rec, 67 + 
self.sys_device.recorder( 68 + samplerate=SAMPLE_RATE, channels=[-1], blocksize=BLOCK_SIZE 69 + ) as sys_rec, 70 + ): 71 + block_count = 0 72 + while self._running and block_count < 1000: 73 + try: 74 + mic_chunk = mic_rec.record(numframes=BLOCK_SIZE) 75 + sys_chunk = sys_rec.record(numframes=BLOCK_SIZE) 76 + 77 + # Basic validation 78 + if mic_chunk is None or mic_chunk.size == 0: 79 + logger.warning("Empty microphone buffer") 80 + continue 81 + if sys_chunk is None or sys_chunk.size == 0: 82 + logger.warning("Empty system buffer") 83 + continue 84 + 85 + try: 86 + stereo_chunk = np.column_stack((mic_chunk, sys_chunk)) 87 + self.audio_queue.put(stereo_chunk) 88 + block_count += 1 89 + except (TypeError, ValueError, AttributeError) as e: 90 + error_msg = f"Fatal audio format error: {e}" 91 + logger.error( 92 + f"{error_msg} - triggering clean shutdown\n" 93 + f" mic_chunk type={type(mic_chunk)}, " 94 + f"shape={getattr(mic_chunk, 'shape', 'N/A')}, " 95 + f"dtype={getattr(mic_chunk, 'dtype', 'N/A')}\n" 96 + f" sys_chunk type={type(sys_chunk)}, " 97 + f"shape={getattr(sys_chunk, 'shape', 'N/A')}, " 98 + f"dtype={getattr(sys_chunk, 'dtype', 'N/A')}" 99 + ) 100 + # Stop recording thread and trigger shutdown 101 + self._running = False 102 + os.kill(os.getpid(), signal.SIGTERM) 103 + return 104 + except Exception as e: 105 + logger.error(f"Error recording audio: {e}") 106 + if not self._running: 107 + break 108 + time.sleep(0.5) 109 + del mic_rec, sys_rec 110 + gc.collect() 111 + except Exception as e: 112 + logger.error(f"Error setting up recorders: {e}") 113 + if self._running: 114 + time.sleep(1) 115 + 116 + def get_buffers(self) -> np.ndarray: 117 + """Return concatenated stereo audio data from the queue.""" 118 + stereo_buffer = np.array([], dtype=np.float32).reshape(0, 2) 119 + 120 + while not self.audio_queue.empty(): 121 + stereo_chunk = self.audio_queue.get() 122 + 123 + if stereo_chunk is None or stereo_chunk.size == 0: 124 + logger.warning("Queue contained 
empty chunk") 125 + continue 126 + 127 + # Clean the data 128 + stereo_chunk = np.nan_to_num( 129 + stereo_chunk, nan=0.0, posinf=1e10, neginf=-1e10 130 + ) 131 + stereo_buffer = np.vstack((stereo_buffer, stereo_chunk)) 132 + 133 + if stereo_buffer.size == 0: 134 + logger.warning("No valid audio data retrieved from queue") 135 + 136 + return stereo_buffer 137 + 138 + def create_flac_bytes(self, stereo_data: np.ndarray) -> bytes: 139 + """Create FLAC bytes from stereo audio data.""" 140 + if stereo_data is None or stereo_data.size == 0: 141 + logger.warning("Audio data is empty. Returning empty bytes.") 142 + return b"" 143 + 144 + audio_data = (np.clip(stereo_data, -1.0, 1.0) * 32767).astype(np.int16) 145 + 146 + buf = io.BytesIO() 147 + try: 148 + sf.write(buf, audio_data, SAMPLE_RATE, format="FLAC") 149 + except Exception as e: 150 + logger.error( 151 + f"Error creating FLAC: {e}. Audio data shape: {audio_data.shape}, dtype: {audio_data.dtype}" 152 + ) 153 + return b"" 154 + 155 + return buf.getvalue() 156 + 157 + def create_mono_flac_bytes(self, mono_data: np.ndarray) -> bytes: 158 + """Create FLAC bytes from mono audio data.""" 159 + if mono_data is None or mono_data.size == 0: 160 + logger.warning("Mono audio data is empty. Returning empty bytes.") 161 + return b"" 162 + 163 + audio_data = (np.clip(mono_data, -1.0, 1.0) * 32767).astype(np.int16) 164 + 165 + buf = io.BytesIO() 166 + try: 167 + sf.write(buf, audio_data, SAMPLE_RATE, format="FLAC") 168 + except Exception as e: 169 + logger.error( 170 + f"Error creating mono FLAC: {e}. 
Audio shape: {audio_data.shape}" 171 + ) 172 + return b"" 173 + 174 + return buf.getvalue() 175 + 176 + def start_recording(self): 177 + """Start the recording thread.""" 178 + self._running = True 179 + self.recording_thread = threading.Thread(target=self.record_both, daemon=True) 180 + self.recording_thread.start() 181 + 182 + def stop_recording(self): 183 + """Stop the recording thread.""" 184 + self._running = False 185 + if self.recording_thread: 186 + self.recording_thread.join(timeout=2.0)
+285
src/solstone_linux/cli.py
··· 1 + # SPDX-License-Identifier: AGPL-3.0-only 2 + # Copyright (c) 2026 sol pbc 3 + 4 + """CLI entry point for solstone-linux. 5 + 6 + Subcommands: 7 + run Start capture loop + sync service (default) 8 + setup Interactive configuration 9 + install-service Write systemd user unit, enable, start 10 + status Show capture and sync state 11 + """ 12 + 13 + from __future__ import annotations 14 + 15 + import argparse 16 + import asyncio 17 + import json 18 + import logging 19 + import shutil 20 + import socket 21 + import subprocess 22 + import sys 23 + from pathlib import Path 24 + 25 + from .config import Config, load_config, save_config 26 + from .streams import stream_name 27 + 28 + 29 + def _setup_logging(verbose: bool = False) -> None: 30 + level = logging.DEBUG if verbose else logging.INFO 31 + logging.basicConfig( 32 + level=level, 33 + format="%(asctime)s [%(levelname)s] %(name)s: %(message)s", 34 + datefmt="%H:%M:%S", 35 + ) 36 + 37 + 38 + def cmd_run(args: argparse.Namespace) -> int: 39 + """Start the capture loop + sync service.""" 40 + from .observer import async_run 41 + from .recovery import recover_incomplete_segments 42 + 43 + config = load_config() 44 + config.ensure_dirs() 45 + 46 + if not config.stream: 47 + try: 48 + config.stream = stream_name(host=socket.gethostname()) 49 + except ValueError as e: 50 + print(f"Error: {e}", file=sys.stderr) 51 + return 1 52 + 53 + if args.interval: 54 + config.segment_interval = args.interval 55 + 56 + # Crash recovery before starting 57 + recovered = recover_incomplete_segments(config.captures_dir) 58 + if recovered: 59 + print(f"Recovered {recovered} incomplete segment(s)") 60 + 61 + try: 62 + return asyncio.run(async_run(config)) 63 + except KeyboardInterrupt: 64 + return 0 65 + 66 + 67 + def cmd_setup(args: argparse.Namespace) -> int: 68 + """Interactive setup — configure server URL and register.""" 69 + from .upload import UploadClient 70 + 71 + config = load_config() 72 + 73 + # Prompt for server URL 74 + 
default_url = config.server_url or "" 75 + url = input(f"Solstone server URL [{default_url}]: ").strip() 76 + if url: 77 + config.server_url = url 78 + elif not config.server_url: 79 + print("Error: server URL is required", file=sys.stderr) 80 + return 1 81 + 82 + # Derive stream name 83 + if not config.stream: 84 + try: 85 + config.stream = stream_name(host=socket.gethostname()) 86 + except ValueError as e: 87 + print(f"Error deriving stream name: {e}", file=sys.stderr) 88 + return 1 89 + print(f"Stream: {config.stream}") 90 + 91 + # Save config before registration (so URL is persisted) 92 + config.ensure_dirs() 93 + save_config(config) 94 + 95 + # Auto-register 96 + if not config.key: 97 + print("Registering with server...") 98 + client = UploadClient(config) 99 + if client.ensure_registered(config): 100 + config = load_config() 101 + print(f"Registered (key: {config.key[:8]}...)") 102 + else: 103 + print("Warning: registration failed. Run setup again when server is available.") 104 + else: 105 + print(f"Already registered (key: {config.key[:8]}...)") 106 + 107 + print(f"\nConfig saved to {config.config_path}") 108 + print(f"Captures will go to {config.captures_dir}") 109 + print(f"\nRun 'solstone-linux run' to start, or 'solstone-linux install-service' for systemd.") 110 + return 0 111 + 112 + 113 + def cmd_install_service(args: argparse.Namespace) -> int: 114 + """Write systemd user unit file, enable, and start the service.""" 115 + binary = shutil.which("solstone-linux") 116 + if not binary: 117 + print("Error: solstone-linux not found on PATH", file=sys.stderr) 118 + print("Install with: pipx install --system-site-packages solstone-linux", file=sys.stderr) 119 + return 1 120 + 121 + unit_dir = Path.home() / ".config" / "systemd" / "user" 122 + unit_dir.mkdir(parents=True, exist_ok=True) 123 + unit_path = unit_dir / "solstone-linux.service" 124 + 125 + unit_content = f"""\ 126 + [Unit] 127 + Description=Solstone Linux Desktop Observer 128 + 
After=graphical-session.target 129 + BindsTo=graphical-session.target 130 + 131 + [Service] 132 + Type=simple 133 + ExecStart={binary} run 134 + PassEnvironment=DISPLAY WAYLAND_DISPLAY DBUS_SESSION_BUS_ADDRESS XDG_RUNTIME_DIR 135 + Restart=on-failure 136 + RestartSec=10 137 + StartLimitIntervalSec=300 138 + StartLimitBurst=5 139 + 140 + [Install] 141 + WantedBy=graphical-session.target 142 + """ 143 + 144 + unit_path.write_text(unit_content) 145 + print(f"Wrote {unit_path}") 146 + 147 + # Reload, enable, start 148 + try: 149 + subprocess.run(["systemctl", "--user", "daemon-reload"], check=True) 150 + subprocess.run( 151 + ["systemctl", "--user", "enable", "--now", "solstone-linux.service"], 152 + check=True, 153 + ) 154 + print("Service enabled and started.") 155 + subprocess.run( 156 + ["systemctl", "--user", "status", "solstone-linux.service"], 157 + check=False, 158 + ) 159 + except FileNotFoundError: 160 + print("Warning: systemctl not found. Enable the service manually.") 161 + except subprocess.CalledProcessError as e: 162 + print(f"Warning: systemctl command failed: {e}") 163 + 164 + return 0 165 + 166 + 167 + def cmd_status(args: argparse.Namespace) -> int: 168 + """Show capture and sync state.""" 169 + config = load_config() 170 + 171 + print(f"Config: {config.config_path}") 172 + print(f"Server: {config.server_url or '(not configured)'}") 173 + print(f"Key: {config.key[:8] + '...' 
if config.key else '(not registered)'}") 174 + print(f"Stream: {config.stream or '(not set)'}") 175 + print() 176 + 177 + # Cache size 178 + captures_dir = config.captures_dir 179 + if captures_dir.exists(): 180 + total_size = 0 181 + segment_count = 0 182 + day_count = 0 183 + incomplete_count = 0 184 + 185 + for day_dir in sorted(captures_dir.iterdir()): 186 + if not day_dir.is_dir(): 187 + continue 188 + day_count += 1 189 + for stream_dir in day_dir.iterdir(): 190 + if not stream_dir.is_dir(): 191 + continue 192 + for seg_dir in stream_dir.iterdir(): 193 + if not seg_dir.is_dir(): 194 + continue 195 + if seg_dir.name.endswith(".incomplete"): 196 + incomplete_count += 1 197 + continue 198 + if seg_dir.name.endswith(".failed"): 199 + continue 200 + segment_count += 1 201 + for f in seg_dir.iterdir(): 202 + if f.is_file(): 203 + total_size += f.stat().st_size 204 + 205 + size_mb = total_size / (1024 * 1024) 206 + print(f"Cache: {captures_dir}") 207 + print(f" {segment_count} segments across {day_count} day(s), {size_mb:.1f} MB") 208 + if incomplete_count: 209 + print(f" {incomplete_count} incomplete segment(s)") 210 + else: 211 + print(f"Cache: {captures_dir} (not created yet)") 212 + 213 + # Synced days 214 + synced_path = config.state_dir / "synced_days.json" 215 + if synced_path.exists(): 216 + try: 217 + with open(synced_path) as f: 218 + synced = json.load(f) 219 + print(f"Synced: {len(synced)} day(s) fully synced") 220 + except (json.JSONDecodeError, OSError): 221 + pass 222 + 223 + # Systemd status 224 + try: 225 + result = subprocess.run( 226 + ["systemctl", "--user", "is-active", "solstone-linux.service"], 227 + capture_output=True, 228 + text=True, 229 + ) 230 + state = result.stdout.strip() 231 + print(f"\nService: {state}") 232 + except FileNotFoundError: 233 + pass 234 + 235 + return 0 236 + 237 + 238 + def main() -> None: 239 + """CLI entry point.""" 240 + parser = argparse.ArgumentParser( 241 + prog="solstone-linux", 242 + description="Standalone 
Linux desktop observer for solstone", 243 + ) 244 + parser.add_argument( 245 + "-v", "--verbose", action="store_true", help="Enable debug logging" 246 + ) 247 + subparsers = parser.add_subparsers(dest="command") 248 + 249 + # run 250 + run_parser = subparsers.add_parser("run", help="Start capture + sync") 251 + run_parser.add_argument( 252 + "--interval", 253 + type=int, 254 + default=None, 255 + help="Segment duration in seconds (default: 300)", 256 + ) 257 + 258 + # setup 259 + subparsers.add_parser("setup", help="Interactive configuration") 260 + 261 + # install-service 262 + subparsers.add_parser("install-service", help="Install systemd user service") 263 + 264 + # status 265 + subparsers.add_parser("status", help="Show capture and sync state") 266 + 267 + args = parser.parse_args() 268 + _setup_logging(args.verbose) 269 + 270 + # Default to run if no subcommand 271 + command = args.command or "run" 272 + 273 + commands = { 274 + "run": cmd_run, 275 + "setup": cmd_setup, 276 + "install-service": cmd_install_service, 277 + "status": cmd_status, 278 + } 279 + 280 + handler = commands.get(command) 281 + if handler: 282 + sys.exit(handler(args)) 283 + else: 284 + parser.print_help() 285 + sys.exit(1)
+119
src/solstone_linux/config.py
# SPDX-License-Identifier: AGPL-3.0-only
# Copyright (c) 2026 sol pbc

"""Configuration loading and persistence for solstone-linux.

Config lives at ~/.local/share/solstone-linux/config/config.json.
Captures go to ~/.local/share/solstone-linux/captures/.
Screencast restore token at ~/.local/share/solstone-linux/config/restore_token.
"""

from __future__ import annotations

import json
import logging
import os
import stat
from dataclasses import dataclass, field
from pathlib import Path

logger = logging.getLogger(__name__)

DEFAULT_BASE_DIR = Path.home() / ".local" / "share" / "solstone-linux"
DEFAULT_SEGMENT_INTERVAL = 300  # seconds per capture segment
DEFAULT_SYNC_RETRY_DELAYS = [5, 30, 120, 300]  # escalating backoff, seconds
DEFAULT_SYNC_MAX_RETRIES = 10


@dataclass
class Config:
    """Configuration for the Linux desktop observer.

    Fields map 1:1 onto the keys stored in config.json:

    - server_url: base URL of the solstone server ("" = unconfigured)
    - key: API key used to authenticate with the server
    - stream: stream name that segments are recorded under
    - segment_interval: segment length in seconds
    - sync_retry_delays: backoff schedule (seconds) between sync retries
    - sync_max_retries: maximum upload attempts before giving up
    - base_dir: root directory for config, captures, and state
    """

    server_url: str = ""
    key: str = ""
    stream: str = ""
    segment_interval: int = DEFAULT_SEGMENT_INTERVAL
    sync_retry_delays: list[int] = field(default_factory=lambda: list(DEFAULT_SYNC_RETRY_DELAYS))
    sync_max_retries: int = DEFAULT_SYNC_MAX_RETRIES
    base_dir: Path = DEFAULT_BASE_DIR

    @property
    def captures_dir(self) -> Path:
        """Directory holding YYYYMMDD/stream/segment capture output."""
        return self.base_dir / "captures"

    @property
    def config_dir(self) -> Path:
        """Directory holding config.json and the restore token."""
        return self.base_dir / "config"

    @property
    def state_dir(self) -> Path:
        """Directory for runtime state (sync bookkeeping, etc.)."""
        return self.base_dir / "state"

    @property
    def config_path(self) -> Path:
        """Full path of the persisted JSON config file."""
        return self.config_dir / "config.json"

    @property
    def restore_token_path(self) -> Path:
        """Full path of the screencast portal restore token."""
        return self.config_dir / "restore_token"

    def ensure_dirs(self) -> None:
        """Create all required directories (idempotent)."""
        self.captures_dir.mkdir(parents=True, exist_ok=True)
        self.config_dir.mkdir(parents=True, exist_ok=True)
        self.state_dir.mkdir(parents=True, exist_ok=True)


def load_config(base_dir: Path | None = None) -> Config:
    """Load config from disk, returning defaults if not found or unreadable.

    Parameters
    ----------
    base_dir : Path | None
        Override for the base directory (used by tests and the CLI);
        defaults to ~/.local/share/solstone-linux.

    Returns
    -------
    Config
        Populated from config.json when present and valid; otherwise a
        Config with defaults (never raises on a bad file — degrades to
        defaults with a warning so the observer can still start).
    """
    config = Config()
    if base_dir:
        config.base_dir = base_dir

    config_path = config.config_path
    if not config_path.exists():
        return config

    try:
        with open(config_path, encoding="utf-8") as f:
            data = json.load(f)
    except (json.JSONDecodeError, OSError) as e:
        logger.warning(f"Failed to load config from {config_path}: {e}")
        return config

    # A valid-JSON but non-object file (e.g. a list) would otherwise crash
    # on .get(); treat it the same as a corrupt file.
    if not isinstance(data, dict):
        logger.warning(f"Config at {config_path} is not a JSON object; using defaults")
        return config

    config.server_url = data.get("server_url", "")
    config.key = data.get("key", "")
    config.stream = data.get("stream", "")
    config.segment_interval = data.get("segment_interval", DEFAULT_SEGMENT_INTERVAL)
    if "sync_retry_delays" in data:
        config.sync_retry_delays = data["sync_retry_delays"]
    if "sync_max_retries" in data:
        config.sync_max_retries = data["sync_max_retries"]

    return config


def save_config(config: Config) -> None:
    """Atomically save config to disk with user-only (0600) permissions.

    Writes to a pid-suffixed temp file in the same directory, restricts
    permissions (the file holds the API key), then renames it over the real
    path so readers never observe a partial file. On failure the temp file
    is removed so repeated failed saves don't litter the config dir.

    Raises
    ------
    OSError
        If the config directory or file cannot be written.
    """
    config.ensure_dirs()

    data = {
        "server_url": config.server_url,
        "key": config.key,
        "stream": config.stream,
        "segment_interval": config.segment_interval,
        "sync_retry_delays": config.sync_retry_delays,
        "sync_max_retries": config.sync_max_retries,
    }

    config_path = config.config_path
    tmp_path = config_path.with_suffix(f".{os.getpid()}.tmp")

    try:
        with open(tmp_path, "w", encoding="utf-8") as f:
            json.dump(data, f, indent=2)
            f.write("\n")

        # Set user-only read/write before the file becomes visible at its
        # final path — it contains the API key.
        os.chmod(tmp_path, stat.S_IRUSR | stat.S_IWUSR)
        # os.replace is atomic on POSIX and overwrites an existing config.
        os.replace(str(tmp_path), str(config_path))
    except OSError:
        # Don't leave a stale temp file behind on failure.
        try:
            tmp_path.unlink()
        except OSError:
            pass
        raise
    logger.info(f"Config saved to {config_path}")
+110
src/solstone_linux/monitor_positions.py
# SPDX-License-Identifier: AGPL-3.0-only
# Copyright (c) 2026 sol pbc

"""Monitor position assignment based on geometry.

Extracted from solstone's observe/utils.py — the assign_monitor_positions()
function only. Also remains in solstone core (used by server-side naming).
"""

from __future__ import annotations


def assign_monitor_positions(monitors: list[dict]) -> list[dict]:
    """
    Label each monitor with its position relative to the others.

    Every monitor is compared pairwise against its peers by center point.
    Above/below relationships are only counted when the two monitors'
    horizontal extents genuinely intersect, so vertically offset monitors
    that merely sit diagonally don't produce phantom top/bottom labels.

    Parameters
    ----------
    monitors : list[dict]
        Monitor dicts, each with:
        - id: Monitor identifier (e.g., "DP-3", "HDMI-1")
        - box: [x1, y1, x2, y2] coordinates

    Returns
    -------
    list[dict]
        The same list, mutated in place: each dict gains a "position" key:
        - "center": no monitors on both sides
        - "left"/"right": horizontal position
        - "top"/"bottom": vertical position (only with horizontal overlap)
        - "left-top", "right-bottom", etc.: corner positions
    """
    if not monitors:
        return []

    if len(monitors) == 1:
        monitors[0]["position"] = "center"
        return monitors

    # Centers within this distance are treated as aligned
    eps = 1

    for mon in monitors:
        left, top, right, bottom = mon["box"]
        cx = (left + right) / 2
        cy = (top + bottom) / 2

        neighbor_left = neighbor_right = False
        neighbor_above = neighbor_below = False

        for peer in monitors:
            if peer is mon:
                continue

            p_left, p_top, p_right, p_bottom = peer["box"]
            pcx = (p_left + p_right) / 2
            pcy = (p_top + p_bottom) / 2

            # Horizontal relationship is always considered
            if pcx < cx - eps:
                neighbor_left = True
            elif pcx > cx + eps:
                neighbor_right = True

            # Above/below only counts when the horizontal ranges truly
            # intersect (edges that merely touch do not overlap)
            if left < p_right and right > p_left:
                if pcy < cy - eps:
                    neighbor_above = True
                elif pcy > cy + eps:
                    neighbor_below = True

        # Horizontal label: flanked on both sides (or neither) -> center
        if neighbor_left == neighbor_right:
            horiz = "center"
        elif neighbor_left:
            horiz = "right"
        else:
            horiz = "left"

        # Vertical label: only meaningful when an overlapping peer sits
        # above and/or below
        if neighbor_above and neighbor_below:
            vert = "middle"
        elif neighbor_above:
            vert = "bottom"
        elif neighbor_below:
            vert = "top"
        else:
            vert = None

        # Merge the two axes into a single label
        if vert is None:
            mon["position"] = horiz
        elif horiz == "center":
            mon["position"] = vert
        else:
            mon["position"] = f"{horiz}-{vert}"

    return monitors
+593
src/solstone_linux/observer.py
# SPDX-License-Identifier: AGPL-3.0-only
# Copyright (c) 2026 sol pbc

"""
Standalone Linux desktop observer — screen + audio capture.

Continuously captures audio and manages screencast recording based on activity.
Creates 5-minute segments in a local cache directory. The sync service handles
all uploads — the observer only writes locally.

Key architectural change from monorepo version:
- Capture writes completed segments to local cache only
- No ObserverClient usage in boundary handling — no network calls in capture loop
- Sync service picks up completed segments and uploads asynchronously

State machine:
    SCREENCAST: Screen is active, recording video
    IDLE: Screen is inactive
"""

import asyncio
import datetime
import logging
import os
import platform
import signal
import socket
import time
from pathlib import Path

import numpy as np
from dbus_next.aio import MessageBus
from dbus_next.constants import BusType

from .activity import get_idle_time_ms, is_power_save_active, is_screen_locked
from .audio_mute import is_sink_muted
from .audio_recorder import AudioRecorder
from .config import Config
from .recovery import write_segment_metadata
from .screencast import Screencaster, StreamInfo
from .streams import stream_name  # NOTE(review): appears unused in this module — confirm before removing
from .sync import SyncService
from .upload import UploadClient

logger = logging.getLogger(__name__)

# Host identification
HOST = socket.gethostname()
PLATFORM = platform.system().lower()

# Constants
IDLE_THRESHOLD_MS = 5 * 60 * 1000  # 5 minutes
RMS_THRESHOLD = 0.01  # per-chunk RMS above this counts as an audio "hit"
MIN_HITS_FOR_SAVE = 3  # hits required before a segment's audio is kept
CHUNK_DURATION = 5  # seconds per loop iteration / audio chunk

# Capture modes
MODE_IDLE = "idle"
MODE_SCREENCAST = "screencast"

# Audio detection retry
DETECT_RETRIES = 3
DETECT_RETRY_DELAY = 5  # seconds


def _get_timestamp_parts(timestamp: float | None = None) -> tuple[str, str]:
    """Get date and time parts from timestamp."""
    # Returns ("YYYYMMDD", "HHMMSS") in local time; defaults to now.
    if timestamp is None:
        timestamp = time.time()
    dt = datetime.datetime.fromtimestamp(timestamp)
    return dt.strftime("%Y%m%d"), dt.strftime("%H%M%S")


class Observer:
    """Unified audio and screencast observer with local cache + sync."""

    def __init__(self, config: Config):
        self.config = config
        self.interval = config.segment_interval
        self.audio_recorder = AudioRecorder()
        self.screencaster = Screencaster(config.restore_token_path)
        self.bus: MessageBus | None = None
        self.running = True
        self.stream = config.stream

        self._client: UploadClient | None = None
        self._sync: SyncService | None = None

        # State tracking
        self.start_at = time.time()
        self.start_at_mono = time.monotonic()
        self.threshold_hits = 0
        # Stereo buffer: column 0 = mic, column 1 = system audio
        self.accumulated_audio_buffer = np.array([], dtype=np.float32).reshape(0, 2)

        # Mode tracking
        self.current_mode = MODE_IDLE

        # Segment directory (HHMMSS.incomplete/)
        self.segment_dir: Path | None = None

        # Multi-file screencast tracking
        self.current_streams: list[StreamInfo] = []

        # Activity status cache (updated each loop)
        self.cached_is_active = False
        self.cached_idle_time_ms = 0
        self.cached_screen_locked = False
        self.cached_is_muted = False
        self.cached_power_save = False

        # Mute state at segment start (determines save format)
        self.segment_is_muted = False

    async def setup(self) -> bool:
        """Initialize audio devices, DBus connection, and sync service.

        Returns False when a hard prerequisite (audio devices, portal)
        is unavailable; the caller treats that as a fatal setup failure.
        """
        # Detect audio devices with retry (devices may still be initializing)
        detected = False
        for attempt in range(DETECT_RETRIES):
            if self.audio_recorder.detect():
                detected = True
                break
            if attempt < DETECT_RETRIES - 1:
                logger.info(
                    "Audio detection attempt %d/%d failed, retrying in %ds",
                    attempt + 1,
                    DETECT_RETRIES,
                    DETECT_RETRY_DELAY,
                )
                await asyncio.sleep(DETECT_RETRY_DELAY)
        if not detected:
            logger.error("Failed to detect audio devices")
            return False

        self.audio_recorder.start_recording()
        logger.info("Audio recording started")

        # Connect to DBus for idle/lock detection
        self.bus = await MessageBus(bus_type=BusType.SESSION).connect()
        logger.info("DBus connection established")

        # Verify portal is available (exit if not)
        if not await self.screencaster.connect():
            logger.error("Screencast portal not available")
            return False
        logger.info("Screencast portal connected")

        # Initialize upload client and sync service
        self._client = UploadClient(self.config)
        if self.config.server_url:
            self._client.ensure_registered(self.config)
        self._sync = SyncService(self.config, self._client)
        logger.info("Sync service initialized")

        return True

    async def check_activity_status(self) -> str:
        """Check system activity status and determine capture mode.

        Returns MODE_SCREENCAST when the screen is considered active
        (not idle past threshold, not locked, not in power-save),
        MODE_IDLE otherwise. Also refreshes the cached_* fields used by
        emit_status().
        """
        idle_time = await get_idle_time_ms(self.bus)
        screen_locked = await is_screen_locked(self.bus)
        power_save = await is_power_save_active(self.bus)
        sink_muted = await is_sink_muted()

        # Cache values for status events
        self.cached_idle_time_ms = idle_time
        self.cached_screen_locked = screen_locked
        self.cached_is_muted = sink_muted
        self.cached_power_save = power_save

        # Determine screen activity
        screen_idle = (idle_time > IDLE_THRESHOLD_MS) or screen_locked or power_save
        screen_active = not screen_idle

        # Determine mode
        if screen_active:
            mode = MODE_SCREENCAST
        else:
            mode = MODE_IDLE

        # Cache legacy is_active for audio threshold logic
        has_audio_activity = self.threshold_hits >= MIN_HITS_FOR_SAVE
        self.cached_is_active = screen_active or has_audio_activity

        return mode

    def compute_rms(self, audio_buffer: np.ndarray) -> float:
        """Compute per-channel RMS and return maximum (stereo: mic=left, sys=right)."""
        if audio_buffer.size == 0:
            return 0.0
        rms_left = float(np.sqrt(np.mean(audio_buffer[:, 0] ** 2)))
        rms_right = float(np.sqrt(np.mean(audio_buffer[:, 1] ** 2)))
        return max(rms_left, rms_right)

    def _save_audio_segment(self, segment_dir: Path, is_muted: bool) -> list[str]:
        """Save accumulated audio buffer to segment directory.

        When the sink is muted the two channels are written as separate
        mono FLACs (mic/sys); otherwise one combined stereo FLAC.
        Returns the list of filenames written (empty if no audio buffered).
        """
        if self.accumulated_audio_buffer.size == 0:
            logger.warning("No audio buffer to save")
            return []

        if is_muted:
            # Split mode: save mic and sys as separate mono files
            mic_data = self.accumulated_audio_buffer[:, 0]
            sys_data = self.accumulated_audio_buffer[:, 1]

            mic_bytes = self.audio_recorder.create_mono_flac_bytes(mic_data)
            sys_bytes = self.audio_recorder.create_mono_flac_bytes(sys_data)

            (segment_dir / "mic_audio.flac").write_bytes(mic_bytes)
            (segment_dir / "sys_audio.flac").write_bytes(sys_bytes)

            logger.info(f"Saved split audio (muted): {segment_dir}")
            return ["mic_audio.flac", "sys_audio.flac"]
        else:
            # Normal mode: save combined stereo file
            flac_bytes = self.audio_recorder.create_flac_bytes(
                self.accumulated_audio_buffer
            )
            (segment_dir / "audio.flac").write_bytes(flac_bytes)

            logger.info(f"Saved audio to {segment_dir}/audio.flac")
            return ["audio.flac"]

    def _start_segment(self) -> Path:
        """Start a new segment with .incomplete directory."""
        self.start_at = time.time()
        self.start_at_mono = time.monotonic()

        date_part, time_part = _get_timestamp_parts(self.start_at)
        captures_dir = self.config.captures_dir

        # Create YYYYMMDD/stream/HHMMSS.incomplete/
        segment_dir = captures_dir / date_part / self.stream / f"{time_part}.incomplete"
        segment_dir.mkdir(parents=True, exist_ok=True)
        self.segment_dir = segment_dir

        # Write metadata for recovery
        write_segment_metadata(segment_dir, self.start_at)

        return segment_dir

    def _finalize_segment(self) -> str | None:
        """Rename .incomplete to HHMMSS_DDD/ and return segment key.

        DDD is the wall-clock duration in whole seconds. Returns None
        when there is no segment, it is empty, or the rename fails.
        """
        if not self.segment_dir or not self.segment_dir.exists():
            return None

        # Remove .metadata before finalizing
        meta_path = self.segment_dir / ".metadata"
        if meta_path.exists():
            try:
                meta_path.unlink()
            except OSError:
                pass

        # Check if there are any actual files
        contents = [f for f in self.segment_dir.iterdir() if f.is_file()]
        if not contents:
            # Empty segment, remove it
            try:
                os.rmdir(str(self.segment_dir))
            except OSError:
                pass
            return None

        _, time_part = _get_timestamp_parts(self.start_at)
        duration = int(time.time() - self.start_at)
        segment_key = f"{time_part}_{duration}"
        final_dir = self.segment_dir.parent / segment_key

        try:
            os.rename(str(self.segment_dir), str(final_dir))
            logger.info(f"Segment finalized: {segment_key}")
            return segment_key
        except OSError as e:
            logger.error(f"Failed to finalize segment: {e}")
            return None

    async def handle_boundary(self, new_mode: str):
        """Handle window boundary rollover.

        Closes the current segment, writes audio, finalizes to local cache,
        and triggers sync. No network calls in the capture loop.
        """
        # Stop screencast first (closes file handles)
        if self.current_mode == MODE_SCREENCAST:
            logger.info("Stopping previous screencast")
            await self.screencaster.stop()
            self.current_streams = []

        # Save audio if we have enough threshold hits
        did_save_audio = self.threshold_hits >= MIN_HITS_FOR_SAVE
        if did_save_audio and self.segment_dir:
            audio_files = self._save_audio_segment(
                self.segment_dir, self.segment_is_muted
            )
            if audio_files:
                logger.info(
                    f"Saved {len(audio_files)} audio file(s) ({self.threshold_hits} hits)"
                )
        else:
            logger.debug(
                f"Skipping audio save (only {self.threshold_hits}/{MIN_HITS_FOR_SAVE} hits)"
            )

        # Reset audio state
        self.accumulated_audio_buffer = np.array([], dtype=np.float32).reshape(0, 2)
        self.threshold_hits = 0

        # Finalize segment (rename .incomplete -> HHMMSS_DDD/)
        segment_key = self._finalize_segment()
        self.segment_dir = None

        # Trigger sync to upload the completed segment
        if segment_key and self._sync:
            self._sync.trigger()

        # Update segment mute state for new segment
        self.segment_is_muted = self.cached_is_muted

        # Update mode
        old_mode = self.current_mode
        self.current_mode = new_mode

        # Start new capture based on mode
        if new_mode == MODE_SCREENCAST and not self.cached_screen_locked:
            await self.initialize_screencast()
        else:
            self._start_segment()

        logger.info(f"Mode transition: {old_mode} -> {new_mode}")

    async def initialize_screencast(self) -> bool:
        """Start a new screencast recording.

        Creates a segment directory and starts GStreamer recording to it.

        Raises RuntimeError when the screencast cannot be started or
        yields no streams; callers treat that as fatal for this cycle.
        """
        segment_dir = self._start_segment()

        try:
            streams = await self.screencaster.start(
                str(segment_dir), framerate=1, draw_cursor=True
            )
        except RuntimeError as e:
            logger.error(f"Failed to start screencast: {e}")
            raise

        if not streams:
            logger.error("No streams returned from screencast start")
            raise RuntimeError("No streams available")

        self.current_streams = streams

        logger.info(f"Started screencast with {len(streams)} stream(s)")
        for stream in streams:
            logger.info(f"  {stream.position} ({stream.connector}): {stream.file_path}")

        return True

    def emit_status(self):
        """Emit observe.status event with current state (fire-and-forget)."""
        if not self._client:
            return

        elapsed = int(time.monotonic() - self.start_at_mono)

        # Screencast info
        if self.current_mode == MODE_SCREENCAST and self.current_streams:
            streams_info = [
                {
                    "position": stream.position,
                    "connector": stream.connector,
                    "file": stream.file_path,
                }
                for stream in self.current_streams
            ]
            screencast_info = {
                "recording": True,
                "streams": streams_info,
                "window_elapsed_seconds": elapsed,
            }
        else:
            screencast_info = {"recording": False}

        # Audio info
        audio_info = {
            "threshold_hits": self.threshold_hits,
            "will_save": self.threshold_hits >= MIN_HITS_FOR_SAVE,
        }

        # Activity info
        activity_info = {
            "active": self.cached_is_active,
            "idle_time_ms": self.cached_idle_time_ms,
            "screen_locked": self.cached_screen_locked,
            "sink_muted": self.cached_is_muted,
            "power_save": self.cached_power_save,
        }

        self._client.relay_event(
            "observe",
            "status",
            mode=self.current_mode,
            screencast=screencast_info,
            audio=audio_info,
            activity=activity_info,
            host=HOST,
            platform=PLATFORM,
            stream=self.stream,
        )

    async def main_loop(self):
        """Run the main observer loop with background sync."""
        logger.info(f"Starting observer loop (interval={self.interval}s)")

        # Start sync service as background task
        sync_task = None
        if self._sync:
            sync_task = asyncio.create_task(self._sync.run())

        # Determine initial mode
        new_mode = await self.check_activity_status()
        self.segment_is_muted = self.cached_is_muted
        self.current_mode = new_mode

        # Start initial capture based on mode
        if new_mode == MODE_SCREENCAST and not self.cached_screen_locked:
            try:
                await self.initialize_screencast()
            except RuntimeError:
                # NOTE(review): this early-return path stops the sync task but
                # never calls shutdown(), so the audio recorder started in
                # setup() keeps running — confirm whether that is intended.
                self.running = False
                if sync_task:
                    if self._sync:
                        self._sync.stop()
                    sync_task.cancel()
                    try:
                        await sync_task
                    except asyncio.CancelledError:
                        pass
                return
        else:
            self._start_segment()

        logger.info(f"Initial mode: {self.current_mode}")

        try:
            while self.running:
                await asyncio.sleep(CHUNK_DURATION)

                # Check activity status and determine new mode
                new_mode = await self.check_activity_status()

                # Check for GStreamer failure mid-recording
                if (
                    self.current_mode == MODE_SCREENCAST
                    and not self.screencaster.is_healthy()
                ):
                    logger.warning("Screencast recording failed, stopping gracefully")
                    await self.screencaster.stop()
                    self.current_streams = []
                    self.current_mode = MODE_IDLE

                # Detect mode change
                mode_changed = new_mode != self.current_mode
                if mode_changed:
                    logger.info(f"Mode changing: {self.current_mode} -> {new_mode}")

                # Only trigger segment boundary on screencast transitions
                screencast_transition = mode_changed and (
                    self.current_mode == MODE_SCREENCAST or new_mode == MODE_SCREENCAST
                )

                # Detect mute state transition
                mute_transition = self.cached_is_muted != self.segment_is_muted
                if mute_transition:
                    logger.info(
                        f"Mute state changed: "
                        f"{'muted' if self.segment_is_muted else 'unmuted'} -> "
                        f"{'muted' if self.cached_is_muted else 'unmuted'}"
                    )

                # Capture audio buffer for this chunk
                audio_chunk = self.audio_recorder.get_buffers()

                if audio_chunk.size > 0:
                    self.accumulated_audio_buffer = np.vstack(
                        (self.accumulated_audio_buffer, audio_chunk)
                    )
                    rms = self.compute_rms(audio_chunk)
                    if rms > RMS_THRESHOLD:
                        self.threshold_hits += 1
                        logger.debug(
                            f"RMS {rms:.4f} > threshold (hit {self.threshold_hits})"
                        )
                    else:
                        logger.debug(f"RMS {rms:.4f} below threshold")
                else:
                    logger.debug("No audio data in chunk")

                # Check for window boundary (monotonic to avoid DST/clock jumps)
                elapsed = time.monotonic() - self.start_at_mono
                is_boundary = (
                    (elapsed >= self.interval) or screencast_transition or mute_transition
                )

                if is_boundary:
                    logger.info(
                        f"Boundary: elapsed={elapsed:.1f}s screencast_change={screencast_transition} "
                        f"mute_change={mute_transition} "
                        f"hits={self.threshold_hits}/{MIN_HITS_FOR_SAVE}"
                    )
                    await self.handle_boundary(new_mode)

                # Emit status event
                self.emit_status()
        finally:
            # Cleanup on exit
            logger.info("Observer loop stopped, cleaning up...")
            await self.shutdown()
            if sync_task:
                if self._sync:
                    self._sync.stop()
                sync_task.cancel()
                try:
                    await sync_task
                except asyncio.CancelledError:
                    pass

    async def shutdown(self):
        """Clean shutdown of observer."""
        # Stop screencast first (closes file handles)
        if self.current_mode == MODE_SCREENCAST:
            logger.info("Stopping screencast for shutdown")
            await self.screencaster.stop()
            await asyncio.sleep(0.5)

        # Save final audio if threshold met
        if self.threshold_hits >= MIN_HITS_FOR_SAVE and self.segment_dir:
            audio_files = self._save_audio_segment(
                self.segment_dir, self.segment_is_muted
            )
            if audio_files:
                logger.info(f"Saved final audio: {len(audio_files)} file(s)")

        # Finalize segment locally
        segment_key = self._finalize_segment()
        self.segment_dir = None

        if segment_key:
            logger.info(f"Finalized segment locally: {segment_key} (shutdown)")

        # Stop audio recorder
        self.audio_recorder.stop_recording()
        logger.info("Audio recording stopped")

        if self._client:
            self._client.stop()
            self._client = None
            logger.info("Client stopped")


async def async_run(config: Config) -> int:
    """Async entry point for the observer.

    Returns a process exit code: 0 on clean exit, 1 on setup or runtime
    failure, 75 (EXIT_TEMPFAIL) when the session is not yet ready so a
    supervisor can retry later.
    """
    from .session_env import check_session_ready

    # Pre-flight: check session prerequisites
    not_ready = check_session_ready()
    if not_ready:
        logger.warning("Session not ready: %s", not_ready)
        return 75  # EXIT_TEMPFAIL

    observer = Observer(config)

    loop = asyncio.get_running_loop()

    def signal_handler():
        logger.info("Received shutdown signal")
        observer.running = False

    for sig in (signal.SIGINT, signal.SIGTERM):
        loop.add_signal_handler(sig, signal_handler)

    if not await observer.setup():
        logger.error("Observer setup failed")
        return 1

    try:
        await observer.main_loop()
    except RuntimeError as e:
        logger.error(f"Observer runtime error: {e}")
        return 1
    except Exception as e:
        logger.error(f"Observer error: {e}", exc_info=True)
        return 1

    return 0
+178
src/solstone_linux/recovery.py
# SPDX-License-Identifier: AGPL-3.0-only
# Copyright (c) 2026 sol pbc

"""Crash recovery for orphaned .incomplete segment directories.

Modeled on solstone-macos's IncompleteSegmentRecovery.swift.
Runs on startup before the capture loop begins.

Improvement over tmux baseline: reads .metadata JSON file for accurate
start timestamp instead of relying on brittle filesystem timestamps.
"""

from __future__ import annotations

import json
import logging
import os
import time
from pathlib import Path

logger = logging.getLogger(__name__)

# Segments newer than this are assumed to be actively recording
MINIMUM_AGE_SECONDS = 120  # 2 minutes

METADATA_FILENAME = ".metadata"


def write_segment_metadata(segment_dir: Path, start_timestamp: float) -> None:
    """Write metadata file inside a segment directory.

    Called when creating a new .incomplete segment so recovery can
    use the actual start timestamp instead of filesystem timestamps.

    Parameters
    ----------
    segment_dir : Path
        The .incomplete segment directory to annotate.
    start_timestamp : float
        Unix timestamp of when the segment started recording.
    """
    meta_path = segment_dir / METADATA_FILENAME
    try:
        data = {"start_timestamp": start_timestamp}
        with open(meta_path, "w", encoding="utf-8") as f:
            json.dump(data, f)
            f.write("\n")
    except OSError as e:
        logger.warning(f"Failed to write segment metadata: {e}")


def _read_segment_metadata(segment_dir: Path) -> dict | None:
    """Read metadata file from a segment directory.

    Returns the parsed dict, or None when the file is missing, unreadable,
    corrupt, or (defensively) not a JSON object — callers then fall back
    to filesystem timestamps.
    """
    meta_path = segment_dir / METADATA_FILENAME
    if not meta_path.exists():
        return None
    try:
        with open(meta_path, encoding="utf-8") as f:
            data = json.load(f)
    except (json.JSONDecodeError, OSError):
        return None
    # Guard against valid-JSON-but-wrong-shape files (e.g. a bare list),
    # which would otherwise break .get() lookups downstream.
    return data if isinstance(data, dict) else None


def recover_incomplete_segments(captures_dir: Path) -> int:
    """Scan captures dir for orphaned .incomplete directories and finalize them.

    For each .incomplete directory older than 2 minutes:
    - Read .metadata for start timestamp if available, else fall back to
      filesystem timestamps (mtime - ctime)
    - Rename to HHMMSS_DDD/ format
    - If recovery fails, rename to HHMMSS.failed/ to prevent infinite retry

    Returns the number of successfully recovered segments.
    """
    if not captures_dir.exists():
        return 0

    recovered = 0
    now = time.time()

    # Layout is captures/YYYYMMDD/stream/HHMMSS.incomplete/
    for day_dir in sorted(captures_dir.iterdir()):
        if not day_dir.is_dir():
            continue

        for stream_dir in sorted(day_dir.iterdir()):
            if not stream_dir.is_dir():
                continue

            for segment_dir in sorted(stream_dir.iterdir()):
                if not segment_dir.is_dir():
                    continue

                dir_name = segment_dir.name
                if not dir_name.endswith(".incomplete"):
                    continue

                # Check age — a fresh .incomplete may belong to a live recorder
                try:
                    dir_stat = segment_dir.stat()
                    age = now - dir_stat.st_mtime
                    if age < MINIMUM_AGE_SECONDS:
                        logger.debug(f"Skipping recent incomplete: {dir_name}")
                        continue
                except OSError:
                    continue

                logger.info(f"Recovering incomplete segment: {dir_name}")
                if _recover_segment(segment_dir):
                    recovered += 1

    if recovered:
        logger.info(f"Recovered {recovered} incomplete segment(s)")
    return recovered


def _recover_segment(segment_dir: Path) -> bool:
    """Recover a single incomplete segment directory.

    Duration comes from the .metadata start timestamp when present and
    numeric; otherwise it falls back to filesystem timestamps. A corrupt
    metadata value no longer aborts the whole recovery scan.

    Returns True on success, False when the segment was marked .failed.
    """
    dir_name = segment_dir.name
    time_prefix = dir_name.removesuffix(".incomplete")

    # Try .metadata first for accurate duration. A corrupt or hand-edited
    # file may hold a non-numeric start_timestamp; treat that the same as
    # missing metadata instead of crashing on the subtraction.
    duration = None
    metadata = _read_segment_metadata(segment_dir)
    if metadata is not None:
        start_ts = metadata.get("start_timestamp")
        if isinstance(start_ts, (int, float)) and not isinstance(start_ts, bool):
            duration = max(1, int(time.time() - start_ts))
        else:
            logger.warning(f"Invalid start_timestamp in metadata: {dir_name}")
    if duration is None:
        # Fall back to filesystem timestamps. On Linux st_ctime is the
        # inode-change time (not creation), so this is approximate; max()
        # keeps the duration positive.
        try:
            st = segment_dir.stat()
            duration = max(1, int(st.st_mtime - st.st_ctime))
        except OSError:
            return _mark_failed(segment_dir)

    # Check there are actual files inside (ignore .metadata)
    try:
        contents = [
            f for f in segment_dir.iterdir()
            if f.name != METADATA_FILENAME
        ]
        if not contents:
            logger.warning(f"Empty incomplete segment: {dir_name}")
            return _mark_failed(segment_dir)
    except OSError:
        return _mark_failed(segment_dir)

    # Build final segment key with duration
    segment_key = f"{time_prefix}_{duration}"
    final_dir = segment_dir.parent / segment_key

    # Remove .metadata before finalizing (not a capture artifact)
    meta_path = segment_dir / METADATA_FILENAME
    if meta_path.exists():
        try:
            meta_path.unlink()
        except OSError:
            pass

    try:
        os.rename(str(segment_dir), str(final_dir))
        logger.info(f"Recovered: {dir_name} -> {segment_key}")
        return True
    except OSError as e:
        logger.warning(f"Failed to rename {dir_name}: {e}")
        return _mark_failed(segment_dir)


def _mark_failed(segment_dir: Path) -> bool:
    """Rename from .incomplete to .failed to prevent infinite retry.

    Always returns False so callers can propagate "not recovered".
    """
    dir_name = segment_dir.name
    if not dir_name.endswith(".incomplete"):
        return False

    failed_name = dir_name.removesuffix(".incomplete") + ".failed"
    failed_dir = segment_dir.parent / failed_name

    try:
        os.rename(str(segment_dir), str(failed_dir))
        logger.warning(f"Marked as failed: {dir_name} -> {failed_name}")
    except OSError as e:
        logger.error(f"Failed to mark as failed: {e}")

    return False
+493
src/solstone_linux/screencast.py
··· 1 + # SPDX-License-Identifier: AGPL-3.0-only 2 + # Copyright (c) 2026 sol pbc 3 + 4 + """ 5 + Portal-based multi-monitor screencast recording. 6 + 7 + Uses xdg-desktop-portal ScreenCast API with PipeWire + GStreamer to record 8 + each monitor as a separate file. This replaces the old GNOME Shell D-Bus approach. 9 + 10 + Extracted from solstone's observe/linux/screencast.py. 11 + 12 + Changes from monorepo version: 13 + - Replaces `from think.utils import get_journal` with config-based restore token path 14 + - Replaces `from observe.gnome.activity import get_monitor_geometries` with local activity module 15 + 16 + Runtime deps: 17 + - xdg-desktop-portal with org.freedesktop.portal.ScreenCast 18 + - Portal backend: xdg-desktop-portal-gnome (or -kde, -wlr, etc.) 19 + - PipeWire running 20 + - GStreamer with PipeWire plugin: gst-launch-1.0 pipewiresrc 21 + """ 22 + 23 + import asyncio 24 + import logging 25 + import os 26 + import signal 27 + import subprocess 28 + import uuid 29 + from dataclasses import dataclass 30 + from pathlib import Path 31 + 32 + from dbus_next import Variant, introspection 33 + from dbus_next.aio import MessageBus 34 + from dbus_next.constants import BusType 35 + 36 + # Workaround for dbus-next issue #122: portal has properties with hyphens 37 + # (e.g., "power-saver-enabled") which violate strict D-Bus naming validation. 
introspection.assert_member_name_valid = lambda name: None

logger = logging.getLogger(__name__)

# Portal D-Bus constants
PORTAL_BUS = "org.freedesktop.portal.Desktop"
PORTAL_PATH = "/org/freedesktop/portal/desktop"
SC_IFACE = "org.freedesktop.portal.ScreenCast"
REQ_IFACE = "org.freedesktop.portal.Request"
SESSION_IFACE = "org.freedesktop.portal.Session"


@dataclass
class StreamInfo:
    """Information about a single monitor's recording stream."""

    # PipeWire node id for the portal stream
    node_id: int
    # Human-readable position label (e.g. "left", "center")
    position: str
    # Monitor connector id (e.g. "DP-3", "HDMI-1")
    connector: str
    x: int
    y: int
    width: int
    height: int
    file_path: str  # Final path in segment directory

    @property
    def filename(self) -> str:
        """Return just the filename for event payloads."""
        return os.path.basename(self.file_path)


def _load_restore_token(token_path: Path) -> str | None:
    """Load restore token from disk.

    Returns None when the file is missing, unreadable, or empty.
    """
    try:
        data = token_path.read_text(encoding="utf-8").strip()
        return data or None
    # (FileNotFoundError is an OSError subclass; listed for clarity)
    except (FileNotFoundError, OSError):
        return None


def _save_restore_token(token: str, token_path: Path) -> None:
    """Save restore token to disk (best-effort; logs on failure)."""
    try:
        token_path.parent.mkdir(parents=True, exist_ok=True)
        token_path.write_text(token.strip() + "\n", encoding="utf-8")
        logger.debug(f"Saved restore token to {token_path}")
    except OSError as e:
        logger.warning(f"Failed to save restore token: {e}")


def _make_request_handle(bus: MessageBus, token: str) -> str:
    """Compute expected Request object path for a handle_token.

    Mirrors the portal convention: the sender's unique bus name with the
    leading ':' stripped and '.' replaced by '_', joined with the token.
    """
    sender = bus.unique_name.lstrip(":").replace(".", "_")
    return f"/org/freedesktop/portal/desktop/request/{sender}/{token}"


def _prepare_request_handler(bus: MessageBus, handle: str) -> asyncio.Future:
    """Set up signal handler for Request::Response before calling portal method.

    Registering before the portal call avoids missing a Response that
    arrives immediately. The future resolves to (response_code, results).

    NOTE(review): the handler is only removed once a matching Response
    arrives — if the portal never replies, it stays registered; confirm
    callers apply a timeout.
    """
    loop = asyncio.get_running_loop()
    fut: asyncio.Future = loop.create_future()

    def _message_handler(msg):
        if (
            msg.message_type.name == "SIGNAL"
            and msg.path == handle
            and msg.interface == REQ_IFACE
            and msg.member == "Response"
        ):
            response = msg.body[0]
            results = msg.body[1] if len(msg.body) > 1 else {}
            if not fut.done():
                fut.set_result((int(response), results))
            bus.remove_message_handler(_message_handler)

    bus.add_message_handler(_message_handler)
    return fut


def _variant_or_value(val):
    """Extract value from Variant if needed."""
    if isinstance(val, Variant):
        return val.value
    return val


def _match_streams_to_monitors(streams: list[dict], monitors: list[dict]) -> list[dict]:
    """
    Match portal stream geometries to GDK monitor info.

    Portal streams have position (x, y) and size (width, height).
    GDK monitors have connector IDs and box coordinates.

    Returns streams augmented with connector and position labels.
131 + """ 132 + matched = [] 133 + 134 + for stream in streams: 135 + props = stream.get("props", {}) 136 + 137 + # Extract stream geometry from portal properties 138 + stream_pos = _variant_or_value(props.get("position", (0, 0))) 139 + stream_size = _variant_or_value(props.get("size", (0, 0))) 140 + 141 + if isinstance(stream_pos, (tuple, list)) and len(stream_pos) >= 2: 142 + sx, sy = int(stream_pos[0]), int(stream_pos[1]) 143 + else: 144 + sx, sy = 0, 0 145 + 146 + if isinstance(stream_size, (tuple, list)) and len(stream_size) >= 2: 147 + sw, sh = int(stream_size[0]), int(stream_size[1]) 148 + else: 149 + sw, sh = 0, 0 150 + 151 + # Find matching monitor by geometry 152 + best_match = None 153 + best_overlap = 0 154 + 155 + for monitor in monitors: 156 + mx1, my1, mx2, my2 = monitor["box"] 157 + mw, mh = mx2 - mx1, my2 - my1 158 + 159 + # Check if geometries match (within tolerance for scaling) 160 + if abs(sx - mx1) < 10 and abs(sy - my1) < 10: 161 + overlap = min(sw, mw) * min(sh, mh) 162 + if overlap > best_overlap: 163 + best_overlap = overlap 164 + best_match = monitor 165 + 166 + if best_match: 167 + stream["connector"] = best_match["id"] 168 + stream["position_label"] = best_match.get("position", "unknown") 169 + stream["x"] = best_match["box"][0] 170 + stream["y"] = best_match["box"][1] 171 + stream["width"] = best_match["box"][2] - best_match["box"][0] 172 + stream["height"] = best_match["box"][3] - best_match["box"][1] 173 + else: 174 + # Fallback: use stream index as identifier 175 + stream["connector"] = f"monitor-{stream['idx']}" 176 + stream["position_label"] = "unknown" 177 + stream["x"] = sx 178 + stream["y"] = sy 179 + stream["width"] = sw 180 + stream["height"] = sh 181 + 182 + matched.append(stream) 183 + 184 + return matched 185 + 186 + 187 + class Screencaster: 188 + """Portal-based multi-monitor screencast manager.""" 189 + 190 + def __init__(self, restore_token_path: Path): 191 + self.bus: MessageBus | None = None 192 + 
self.session_handle: str | None = None 193 + self.pw_fd: int | None = None 194 + self.gst_process: subprocess.Popen | None = None 195 + self.streams: list[StreamInfo] = [] 196 + self._started = False 197 + self._restore_token_path = restore_token_path 198 + 199 + async def connect(self) -> bool: 200 + """ 201 + Establish D-Bus connection and verify portal availability. 202 + 203 + Returns: 204 + True if portal is available, False otherwise. 205 + """ 206 + if self.bus is not None: 207 + return True 208 + 209 + try: 210 + self.bus = await MessageBus( 211 + bus_type=BusType.SESSION, 212 + negotiate_unix_fd=True, 213 + ).connect() 214 + 215 + # Verify portal interface exists 216 + root_intro = await self.bus.introspect(PORTAL_BUS, PORTAL_PATH) 217 + root_obj = self.bus.get_proxy_object(PORTAL_BUS, PORTAL_PATH, root_intro) 218 + root_obj.get_interface(SC_IFACE) 219 + return True 220 + 221 + except Exception as e: 222 + logger.error(f"Portal not available: {e}") 223 + self.bus = None 224 + return False 225 + 226 + async def start( 227 + self, 228 + output_dir: str, 229 + framerate: int = 1, 230 + draw_cursor: bool = True, 231 + ) -> list[StreamInfo]: 232 + """ 233 + Start screencast recording for all monitors. 234 + 235 + Files are written directly to output_dir with final names (position_connector_screen.webm). 236 + The output_dir is typically a segment directory that will be renamed on completion. 237 + 238 + Args: 239 + output_dir: Directory for output files (e.g., YYYYMMDD/stream/HHMMSS.incomplete/) 240 + framerate: Frames per second (default: 1) 241 + draw_cursor: Whether to draw mouse cursor (default: True) 242 + 243 + Returns: 244 + List of StreamInfo for each monitor being recorded. 245 + 246 + Raises: 247 + RuntimeError: If recording fails to start. 
248 + """ 249 + if not await self.connect(): 250 + raise RuntimeError("Portal not available") 251 + 252 + # Get monitor info from GDK for connector IDs 253 + from .activity import get_monitor_geometries 254 + try: 255 + monitors = get_monitor_geometries() 256 + except Exception as e: 257 + logger.warning(f"Failed to get monitor geometries: {e}") 258 + monitors = [] 259 + 260 + # Get portal interface 261 + root_intro = await self.bus.introspect(PORTAL_BUS, PORTAL_PATH) 262 + root_obj = self.bus.get_proxy_object(PORTAL_BUS, PORTAL_PATH, root_intro) 263 + screencast = root_obj.get_interface(SC_IFACE) 264 + 265 + # 1) CreateSession 266 + create_token = "h_" + uuid.uuid4().hex 267 + create_handle = _make_request_handle(self.bus, create_token) 268 + create_fut = _prepare_request_handler(self.bus, create_handle) 269 + 270 + create_opts = { 271 + "handle_token": Variant("s", create_token), 272 + "session_handle_token": Variant("s", "s_" + uuid.uuid4().hex), 273 + } 274 + 275 + await screencast.call_create_session(create_opts) 276 + resp, results = await create_fut 277 + if resp != 0: 278 + raise RuntimeError(f"CreateSession failed with code {resp}") 279 + 280 + self.session_handle = str(_variant_or_value(results.get("session_handle"))) 281 + if not self.session_handle: 282 + raise RuntimeError("CreateSession returned no session_handle") 283 + 284 + logger.debug(f"Portal session: {self.session_handle}") 285 + 286 + # 2) SelectSources 287 + restore_token = _load_restore_token(self._restore_token_path) 288 + if restore_token: 289 + logger.debug("Using saved restore token") 290 + 291 + cursor_mode = 1 if draw_cursor else 0 292 + 293 + select_token = "h_" + uuid.uuid4().hex 294 + select_handle = _make_request_handle(self.bus, select_token) 295 + select_fut = _prepare_request_handler(self.bus, select_handle) 296 + 297 + select_opts = { 298 + "handle_token": Variant("s", select_token), 299 + "types": Variant("u", 1), # 1 = MONITOR 300 + "multiple": Variant("b", True), 301 + 
"cursor_mode": Variant("u", cursor_mode), 302 + "persist_mode": Variant("u", 2), # Persist until revoked 303 + } 304 + if restore_token: 305 + select_opts["restore_token"] = Variant("s", restore_token) 306 + 307 + await screencast.call_select_sources(self.session_handle, select_opts) 308 + resp, _ = await select_fut 309 + if resp != 0: 310 + await self._close_session() 311 + raise RuntimeError(f"SelectSources failed with code {resp}") 312 + 313 + # 3) Start 314 + start_token = "h_" + uuid.uuid4().hex 315 + start_handle = _make_request_handle(self.bus, start_token) 316 + start_fut = _prepare_request_handler(self.bus, start_handle) 317 + 318 + start_opts = {"handle_token": Variant("s", start_token)} 319 + await screencast.call_start(self.session_handle, "", start_opts) 320 + resp, results = await start_fut 321 + if resp != 0: 322 + await self._close_session() 323 + raise RuntimeError(f"Start failed with code {resp}") 324 + 325 + portal_streams = _variant_or_value(results.get("streams")) or [] 326 + if not portal_streams: 327 + await self._close_session() 328 + raise RuntimeError("Start returned no streams") 329 + 330 + # Save new restore token if provided 331 + new_token = _variant_or_value(results.get("restore_token")) 332 + if isinstance(new_token, str) and new_token.strip(): 333 + _save_restore_token(new_token, self._restore_token_path) 334 + 335 + # Parse streams 336 + stream_info = [] 337 + for idx, stream in enumerate(portal_streams): 338 + try: 339 + node_id = int(stream[0]) 340 + props = stream[1] if len(stream) > 1 else {} 341 + stream_info.append({"idx": idx, "node_id": node_id, "props": props}) 342 + except Exception as e: 343 + logger.warning(f"Could not parse stream {idx}: {e}") 344 + 345 + if not stream_info: 346 + await self._close_session() 347 + raise RuntimeError("No valid streams found") 348 + 349 + # Match streams to monitors 350 + stream_info = _match_streams_to_monitors(stream_info, monitors) 351 + 352 + logger.info(f"Portal returned 
{len(stream_info)} stream(s)") 353 + 354 + # 4) OpenPipeWireRemote 355 + fd_obj = await screencast.call_open_pipe_wire_remote(self.session_handle, {}) 356 + if hasattr(fd_obj, "take"): 357 + self.pw_fd = fd_obj.take() 358 + else: 359 + self.pw_fd = int(fd_obj) 360 + 361 + # 5) Build GStreamer pipeline 362 + self.streams = [] 363 + pipeline_parts = [] 364 + 365 + for info in stream_info: 366 + node_id = info["node_id"] 367 + position = info["position_label"] 368 + connector = info["connector"] 369 + 370 + # Final file path: position_connector_screen.webm 371 + file_path = os.path.join(output_dir, f"{position}_{connector}_screen.webm") 372 + 373 + stream_obj = StreamInfo( 374 + node_id=node_id, 375 + position=position, 376 + connector=connector, 377 + x=info["x"], 378 + y=info["y"], 379 + width=info["width"], 380 + height=info["height"], 381 + file_path=file_path, 382 + ) 383 + self.streams.append(stream_obj) 384 + 385 + # GStreamer branch for this stream 386 + branch = ( 387 + f"pipewiresrc fd={self.pw_fd} path={node_id} ! " 388 + f"videorate ! video/x-raw,framerate={framerate}/1 ! " 389 + f"videoconvert ! vp8enc end-usage=cq cq-level=4 max-quantizer=15 " 390 + f"keyframe-max-dist=30 static-threshold=100 ! webmmux ! 
" 391 + f"filesink location={file_path}" 392 + ) 393 + pipeline_parts.append(branch) 394 + 395 + logger.info(f" Stream {node_id}: {position} ({connector}) -> {file_path}") 396 + 397 + pipeline_str = " ".join(pipeline_parts) 398 + cmd = ["gst-launch-1.0", "-e"] + pipeline_str.split() 399 + 400 + try: 401 + self.gst_process = subprocess.Popen( 402 + cmd, 403 + pass_fds=(self.pw_fd,), 404 + stdout=subprocess.DEVNULL, 405 + stderr=subprocess.PIPE, 406 + ) 407 + except FileNotFoundError: 408 + await self._close_session() 409 + raise RuntimeError("gst-launch-1.0 not found") 410 + except Exception as e: 411 + await self._close_session() 412 + raise RuntimeError(f"Failed to start GStreamer: {e}") 413 + 414 + # Brief delay to check for immediate failure 415 + await asyncio.sleep(0.2) 416 + if self.gst_process.poll() is not None: 417 + stderr = ( 418 + self.gst_process.stderr.read().decode() 419 + if self.gst_process.stderr 420 + else "" 421 + ) 422 + await self._close_session() 423 + raise RuntimeError(f"GStreamer exited immediately: {stderr[:200]}") 424 + 425 + self._started = True 426 + return self.streams 427 + 428 + async def stop(self) -> list[StreamInfo]: 429 + """ 430 + Stop screencast recording gracefully. 431 + 432 + Returns: 433 + List of StreamInfo with file_path for the recorded files. 
434 + """ 435 + streams = self.streams.copy() 436 + 437 + # Stop GStreamer with SIGINT for clean EOS 438 + if self.gst_process and self.gst_process.poll() is None: 439 + try: 440 + self.gst_process.send_signal(signal.SIGINT) 441 + try: 442 + await asyncio.wait_for( 443 + asyncio.to_thread(self.gst_process.wait), 444 + timeout=5.0, 445 + ) 446 + except asyncio.TimeoutError: 447 + logger.warning("GStreamer did not exit cleanly, killing") 448 + self.gst_process.kill() 449 + self.gst_process.wait() 450 + except Exception as e: 451 + logger.warning(f"Error stopping GStreamer: {e}") 452 + 453 + self.gst_process = None 454 + 455 + # Close PipeWire fd 456 + if self.pw_fd is not None: 457 + try: 458 + os.close(self.pw_fd) 459 + except OSError: 460 + pass 461 + self.pw_fd = None 462 + 463 + # Close portal session 464 + await self._close_session() 465 + 466 + self.streams = [] 467 + self._started = False 468 + 469 + return streams 470 + 471 + async def _close_session(self): 472 + """Close the portal session.""" 473 + if self.session_handle and self.bus: 474 + try: 475 + session_intro = await self.bus.introspect( 476 + PORTAL_BUS, self.session_handle 477 + ) 478 + session_obj = self.bus.get_proxy_object( 479 + PORTAL_BUS, self.session_handle, session_intro 480 + ) 481 + session_iface = session_obj.get_interface(SESSION_IFACE) 482 + await session_iface.call_close() 483 + except Exception: 484 + pass 485 + self.session_handle = None 486 + 487 + def is_healthy(self) -> bool: 488 + """Check if recording is still running.""" 489 + if not self._started: 490 + return False 491 + if self.gst_process is None: 492 + return False 493 + return self.gst_process.poll() is None
+92
src/solstone_linux/session_env.py
··· 1 + # SPDX-License-Identifier: AGPL-3.0-only 2 + # Copyright (c) 2026 sol pbc 3 + 4 + """Desktop session environment checks and recovery. 5 + 6 + Extracted from solstone's observe/linux/observer.py (lines 598-666). 7 + 8 + _recover_session_env() is kept as fallback for manual CLI launch. 9 + For systemd service launch, PassEnvironment= in the unit file is 10 + the primary mechanism. 11 + """ 12 + 13 + import logging 14 + import os 15 + import shutil 16 + import subprocess 17 + 18 + logger = logging.getLogger(__name__) 19 + 20 + # Exit codes 21 + EXIT_TEMPFAIL = 75 # EX_TEMPFAIL: session not ready, retry later 22 + 23 + 24 + def _recover_session_env() -> None: 25 + """Try to recover desktop session env vars from the systemd user manager. 26 + 27 + On GNOME Wayland, gnome-shell pushes DISPLAY, WAYLAND_DISPLAY, and 28 + DBUS_SESSION_BUS_ADDRESS into the systemd user environment on startup. 29 + When the observer is launched from a non-desktop shell, these vars may be missing 30 + from the inherited environment — but systemctl --user show-environment 31 + has them. 
32 + """ 33 + needed = {"DISPLAY", "WAYLAND_DISPLAY", "DBUS_SESSION_BUS_ADDRESS"} 34 + missing = {v for v in needed if not os.environ.get(v)} 35 + if not missing: 36 + return 37 + 38 + # Ensure XDG_RUNTIME_DIR is set (required for systemctl --user to connect) 39 + if not os.environ.get("XDG_RUNTIME_DIR"): 40 + os.environ["XDG_RUNTIME_DIR"] = f"/run/user/{os.getuid()}" 41 + 42 + try: 43 + result = subprocess.run( 44 + ["systemctl", "--user", "show-environment"], 45 + capture_output=True, 46 + text=True, 47 + timeout=5, 48 + ) 49 + if result.returncode != 0: 50 + return 51 + except (FileNotFoundError, subprocess.TimeoutExpired): 52 + return 53 + 54 + recovered = [] 55 + for line in result.stdout.splitlines(): 56 + key, _, value = line.partition("=") 57 + if key in missing and value: 58 + os.environ[key] = value 59 + recovered.append(f"{key}={value}") 60 + 61 + if recovered: 62 + logger.info("Recovered session env from systemd: %s", ", ".join(recovered)) 63 + 64 + 65 + def check_session_ready() -> str | None: 66 + """Check if the desktop session is ready for observation. 67 + 68 + Returns None if ready, or a description of what's missing. 69 + """ 70 + # Try to recover missing session vars from systemd user manager 71 + _recover_session_env() 72 + 73 + # Display server 74 + if not os.environ.get("DISPLAY") and not os.environ.get("WAYLAND_DISPLAY"): 75 + return "no display server (DISPLAY/WAYLAND_DISPLAY not set)" 76 + 77 + # DBus session bus 78 + if not os.environ.get("DBUS_SESSION_BUS_ADDRESS"): 79 + return "no DBus session bus (DBUS_SESSION_BUS_ADDRESS not set)" 80 + 81 + # PulseAudio / PipeWire audio 82 + pactl = shutil.which("pactl") 83 + if pactl: 84 + try: 85 + subprocess.run( 86 + [pactl, "info"], 87 + capture_output=True, 88 + timeout=5, 89 + ).check_returncode() 90 + except (subprocess.CalledProcessError, subprocess.TimeoutExpired): 91 + return "audio server not responding (pactl info failed)" 92 + return None
+87
src/solstone_linux/streams.py
# SPDX-License-Identifier: AGPL-3.0-only
# Copyright (c) 2026 sol pbc

"""Stream identity for observer segments.

Extracted from solstone's think/streams.py — only the pure naming functions
needed by standalone observers.

Naming convention (separator is '.'):
    Local Linux: {hostname}     e.g. "archon"
    Remote:      {remote_name}  e.g. "desktop"
"""

from __future__ import annotations

import re

_STREAM_NAME_RE = re.compile(r"^[a-z0-9][a-z0-9._-]*$")


def _strip_hostname(name: str) -> str:
    """Reduce a hostname to a single dot-free label.

    Dots in stream names are reserved for qualifiers (e.g. '.tmux'), so
    'ja1r.local' becomes 'ja1r' and 'my.host.example.com' becomes 'my'.
    A purely numeric address such as '192.168.1.1' is hyphenated to
    '192-168-1-1' instead of being truncated to its first octet.
    """
    trimmed = name.strip()
    if not trimmed:
        return trimmed
    labels = trimmed.split(".")
    meaningful = [label for label in labels if label]
    if all(label.isdigit() for label in meaningful):
        # Dotted-numeric (IP-like) name: keep every octet, join with hyphens.
        return "-".join(meaningful)
    return labels[0]


def stream_name(
    *,
    host: str | None = None,
    remote: str | None = None,
    qualifier: str | None = None,
) -> str:
    """Derive the canonical stream name for a capture source.

    One of *host* (local hostname, e.g. "archon") or *remote* (remote
    observer name, e.g. "desktop") must be provided; *host* wins when both
    are given. An optional *qualifier* sub-stream label is appended after a
    '.' separator.

    Raises
    ------
    ValueError
        If no source is provided, or the resulting name is invalid.
    """
    source = host or remote
    if not source:
        raise ValueError("stream_name requires host or remote")

    def _clean(part: str) -> str:
        # Lowercase, trim, and collapse whitespace/slashes into hyphens.
        return re.sub(r"[\s/\\]+", "-", part.lower().strip())

    name = _clean(_strip_hostname(source))
    if qualifier:
        name = f"{name}.{_clean(qualifier)}"

    if not name or ".." in name or not _STREAM_NAME_RE.match(name):
        raise ValueError(f"Invalid stream name: {name!r}")

    return name
+275
src/solstone_linux/sync.py
# SPDX-License-Identifier: AGPL-3.0-only
# Copyright (c) 2026 sol pbc

"""Background sync service for uploading captured segments.

Modeled on solstone-macos's SyncService.swift. Runs as an asyncio
background task in the same event loop as capture. Walks cache days
newest-to-oldest, queries server for existing segments, uploads missing ones.

Refinements over tmux baseline:
- Respects configured sync_max_retries (no hard min(config,3) cap)
- Circuit breaker tuned by error type: auth=immediate, transient=5-10
- Synced-days pruning at 90 days to prevent unbounded cache growth
"""

from __future__ import annotations

import asyncio
import json
import logging
import os
import time
from datetime import datetime, timedelta
from pathlib import Path
from typing import Any

from .config import Config
from .upload import ErrorType, UploadClient

logger = logging.getLogger(__name__)

# Circuit breaker thresholds by error type
CIRCUIT_THRESHOLD_AUTH = 1  # Auth failures open immediately
CIRCUIT_THRESHOLD_TRANSIENT = 5  # Transient failures need 5 consecutive

# Synced days older than this are pruned from the cache
SYNCED_DAYS_MAX_AGE = 90


class SyncService:
    """Background sync service that uploads completed segments to the server.

    NOTE: once the circuit breaker opens, nothing in this class closes it
    again — sync stays disabled for the remainder of the process lifetime.
    """

    def __init__(self, config: Config, client: UploadClient):
        self._config = config
        self._client = client
        # Days (YYYYMMDD strings) known to be fully present on the server.
        self._synced_days: set[str] = set()
        self._consecutive_failures = 0
        self._last_error_type: ErrorType | None = None
        self._circuit_open = False
        # 0 forces a full sync on the first pass (now - 0 > 86400).
        self._last_full_sync: float = 0
        self._running = True
        # Set by trigger()/stop(); run() waits on it with a 60s timeout.
        self._trigger = asyncio.Event()

        # Load synced days cache
        self._load_synced_days()

    def _synced_days_path(self) -> Path:
        # Persistent JSON list of fully-synced day strings.
        return self._config.state_dir / "synced_days.json"

    def _load_synced_days(self) -> None:
        # Missing or corrupt cache degrades to "nothing synced yet".
        path = self._synced_days_path()
        if not path.exists():
            return
        try:
            with open(path, encoding="utf-8") as f:
                data = json.load(f)
                self._synced_days = set(data) if isinstance(data, list) else set()
        except (json.JSONDecodeError, OSError):
            self._synced_days = set()

    def _save_synced_days(self) -> None:
        # Write-to-temp + rename so a crash cannot leave a half-written file;
        # the PID in the temp name avoids collisions between processes.
        self._config.state_dir.mkdir(parents=True, exist_ok=True)
        path = self._synced_days_path()
        tmp = path.with_suffix(f".{os.getpid()}.tmp")
        try:
            with open(tmp, "w", encoding="utf-8") as f:
                json.dump(sorted(self._synced_days), f)
                f.write("\n")
            os.rename(str(tmp), str(path))
        except OSError as e:
            logger.warning(f"Failed to save synced days: {e}")

    def _prune_synced_days(self) -> None:
        """Remove synced-days entries older than 90 days."""
        if not self._synced_days:
            return
        # YYYYMMDD strings sort lexicographically, so >= is a date compare.
        cutoff = (datetime.now() - timedelta(days=SYNCED_DAYS_MAX_AGE)).strftime("%Y%m%d")
        before = len(self._synced_days)
        self._synced_days = {d for d in self._synced_days if d >= cutoff}
        pruned = before - len(self._synced_days)
        if pruned:
            logger.info(f"Pruned {pruned} synced-days entries older than {SYNCED_DAYS_MAX_AGE} days")
            self._save_synced_days()

    def _circuit_threshold(self) -> int:
        """Get circuit breaker threshold based on last error type."""
        if self._last_error_type == ErrorType.AUTH:
            return CIRCUIT_THRESHOLD_AUTH
        return CIRCUIT_THRESHOLD_TRANSIENT

    def trigger(self) -> None:
        """Trigger a sync pass (called by observer on segment completion)."""
        self._trigger.set()

    def stop(self) -> None:
        """Stop the sync service."""
        self._running = False
        self._trigger.set()

    async def run(self) -> None:
        """Main sync loop — waits for triggers, then syncs."""
        # Prune on startup
        self._prune_synced_days()

        while self._running:
            try:
                # Wait for trigger or periodic check (60s timeout)
                try:
                    await asyncio.wait_for(self._trigger.wait(), timeout=60)
                except asyncio.TimeoutError:
                    pass

                # Clear before syncing: a trigger arriving mid-sync re-sets
                # the event and causes another pass on the next iteration.
                self._trigger.clear()

                if not self._running:
                    break

                if self._circuit_open:
                    logger.warning("Circuit breaker open — skipping sync")
                    continue

                # Force full sync daily
                now = time.time()
                force_full = (now - self._last_full_sync) > 86400

                await self._sync(force_full=force_full)

                if force_full:
                    self._last_full_sync = now

            except Exception as e:
                # Keep the loop alive on unexpected errors; brief backoff.
                logger.error(f"Sync error: {e}", exc_info=True)
                await asyncio.sleep(5)

    async def _sync(self, force_full: bool = False) -> None:
        """Walk days newest-to-oldest and upload missing segments."""
        captures_dir = self._config.captures_dir
        if not captures_dir.exists():
            return

        today = datetime.now().strftime("%Y%m%d")

        # Collect segments by day
        segments_by_day = self._collect_segments(captures_dir)
        if not segments_by_day:
            return

        for day in sorted(segments_by_day.keys(), reverse=True):
            if not self._running:
                break

            if self._circuit_open:
                break

            # Skip past days already fully synced (unless forcing)
            if day != today and day in self._synced_days and not force_full:
                continue

            local_segments = segments_by_day[day]

            # Query server for existing segments (blocking HTTP in a thread
            # so the capture event loop is not stalled)
            server_segments = await asyncio.to_thread(
                self._client.get_server_segments, day
            )
            if server_segments is None:
                logger.warning(f"Failed to query server for day {day}")
                continue

            # Build lookup of keys the server already has (including any
            # original_key aliases for renamed segments)
            server_keys: set[str] = set()
            for seg in server_segments:
                server_keys.add(seg.get("key", ""))
                if "original_key" in seg:
                    server_keys.add(seg["original_key"])

            any_needed_upload = False

            for segment_dir in local_segments:
                if not self._running or self._circuit_open:
                    break

                segment_key = segment_dir.name
                if segment_key in server_keys:
                    continue

                any_needed_upload = True
                success = await self._upload_segment(day, segment_dir)

                if not success:
                    self._consecutive_failures += 1
                    # Threshold depends on the last error type recorded by
                    # _upload_segment: auth opens immediately, transient at 5.
                    threshold = self._circuit_threshold()
                    if self._consecutive_failures >= threshold:
                        self._circuit_open = True
                        logger.error(
                            f"Circuit breaker OPEN: {self._consecutive_failures} consecutive "
                            f"{self._last_error_type.value if self._last_error_type else 'unknown'} "
                            f"failures (threshold: {threshold})"
                        )
                        break
                else:
                    # Any success closes the failure streak.
                    self._consecutive_failures = 0
                    self._last_error_type = None

            # Mark past days as synced if nothing needed upload
            # (today is never marked: more segments may still arrive)
            if day != today and not any_needed_upload:
                self._synced_days.add(day)
                self._save_synced_days()

    def _collect_segments(self, captures_dir: Path) -> dict[str, list[Path]]:
        """Collect completed segments grouped by day.

        Expected layout: captures_dir/YYYYMMDD/<stream>/<segment>/...
        Segments still being written (.incomplete) or abandoned (.failed)
        are excluded.
        """
        result: dict[str, list[Path]] = {}

        for day_dir in sorted(captures_dir.iterdir(), reverse=True):
            if not day_dir.is_dir():
                continue

            day = day_dir.name

            for stream_dir in day_dir.iterdir():
                if not stream_dir.is_dir():
                    continue

                segments = []
                for seg_dir in sorted(stream_dir.iterdir(), reverse=True):
                    if not seg_dir.is_dir():
                        continue
                    name = seg_dir.name
                    # Skip incomplete and failed
                    if name.endswith(".incomplete") or name.endswith(".failed"):
                        continue
                    segments.append(seg_dir)

                if segments:
                    result.setdefault(day, []).extend(segments)

        return result

    async def _upload_segment(self, day: str, segment_dir: Path) -> bool:
        """Upload a single segment with retry logic.

        Returns True on success (or empty segment); on failure records the
        error type for the circuit breaker. Retries happen inside
        UploadClient.upload_segment, not here.
        """
        segment_key = segment_dir.name
        files = [f for f in segment_dir.iterdir() if f.is_file()]
        if not files:
            return True  # Nothing to upload

        meta: dict[str, Any] = {"stream": self._config.stream}

        # Blocking HTTP upload runs in a worker thread.
        result = await asyncio.to_thread(
            self._client.upload_segment, day, segment_key, files, meta
        )

        if result.success:
            logger.info(f"Uploaded: {day}/{segment_key} ({len(files)} files)")
            return True

        # Track error type for circuit breaker
        self._last_error_type = result.error_type

        # Non-retryable errors
        if self._client.is_revoked:
            logger.error("Client revoked — disabling sync")
            self._circuit_open = True
            return False

        logger.error(f"Upload failed: {day}/{segment_key}")
        return False
+253
src/solstone_linux/upload.py
# SPDX-License-Identifier: AGPL-3.0-only
# Copyright (c) 2026 sol pbc

"""HTTP upload client for solstone ingest server.

Extracted from solstone's observe/remote_client.py. Accepts Config
as constructor parameter instead of reading config internally.

Refinements over tmux baseline:
- Respects configured sync_max_retries without hard cap
- Error classification: auth (401/403) vs transient (5xx/network)
"""

from __future__ import annotations

import json
import logging
import time
from enum import Enum
from pathlib import Path
from typing import Any, NamedTuple

import requests

from .config import Config

logger = logging.getLogger(__name__)

UPLOAD_TIMEOUT = 300  # seconds — segment uploads can carry large media files
EVENT_TIMEOUT = 30  # seconds — small JSON requests


class ErrorType(Enum):
    """Classification of upload errors for circuit breaker tuning."""

    AUTH = "auth"  # 401, 403 — open circuit immediately
    CLIENT = "client"  # 400 — non-retryable, don't count for circuit
    TRANSIENT = "transient"  # 5xx, network, timeout — allow more failures


class UploadResult(NamedTuple):
    """Outcome of a single upload_segment call."""

    success: bool
    duplicate: bool = False  # server reported the segment as already present
    error_type: ErrorType | None = None


class UploadClient:
    """HTTP client for uploading observer segments to the ingest server."""

    def __init__(self, config: Config):
        self._url = config.server_url.rstrip("/") if config.server_url else ""
        self._key = config.key
        self._stream = config.stream
        self._revoked = False
        self._session = requests.Session()
        self._retry_backoff = config.sync_retry_delays or [5, 30, 120, 300]
        # Respect configured retry cap — no hard min(config, 3)
        self._max_retries = config.sync_max_retries

    @property
    def is_revoked(self) -> bool:
        """True once the server has rejected this key with 403."""
        return self._revoked

    def _persist_key(self, config: Config, key: str) -> None:
        """Save auto-registered key back to config."""
        from .config import save_config

        config.key = key
        save_config(config)

    def ensure_registered(self, config: Config) -> bool:
        """Ensure the client has a valid key, auto-registering if needed.

        Returns True if a key is available. A 403 marks the client revoked
        and aborts; other failures retry up to 3 times with backoff.
        """
        if self._key:
            return True
        if not self._url:
            return False

        url = f"{self._url}/app/remote/api/create"
        name = self._stream or "solstone-linux"

        retries = min(3, len(self._retry_backoff))
        for attempt in range(retries):
            delay = self._retry_backoff[min(attempt, len(self._retry_backoff) - 1)]
            try:
                resp = self._session.post(
                    url, json={"name": name}, timeout=EVENT_TIMEOUT
                )
                if resp.status_code == 200:
                    # BUGFIX: a 200 body without "key" previously raised an
                    # uncaught KeyError out of this method; treat it as a
                    # failed attempt instead.
                    key = resp.json().get("key")
                    if key:
                        self._key = key
                        self._persist_key(config, self._key)
                        logger.info(
                            f"Auto-registered as '{name}' (key: {self._key[:8]}...)"
                        )
                        return True
                    logger.warning(
                        f"Registration attempt {attempt + 1} returned no key"
                    )
                elif resp.status_code == 403:
                    self._revoked = True
                    logger.error("Registration rejected (403)")
                    return False
                else:
                    logger.warning(
                        f"Registration attempt {attempt + 1} failed: {resp.status_code}"
                    )
            except requests.RequestException as e:
                logger.warning(f"Registration attempt {attempt + 1} failed: {e}")
            if attempt < retries - 1:
                time.sleep(delay)

        logger.error(f"Registration failed after {retries} attempts")
        return False

    @staticmethod
    def classify_error(status_code: int | None, is_network_error: bool = False) -> ErrorType:
        """Classify an error for circuit breaker and retry decisions."""
        if is_network_error:
            return ErrorType.TRANSIENT
        if status_code is None:
            return ErrorType.TRANSIENT
        if status_code in (401, 403):
            return ErrorType.AUTH
        if status_code == 400:
            return ErrorType.CLIENT
        # 5xx and anything else
        return ErrorType.TRANSIENT

    def upload_segment(
        self,
        day: str,
        segment: str,
        files: list[Path],
        meta: dict[str, Any] | None = None,
    ) -> UploadResult:
        """Upload a segment's files to the ingest server.

        Retries transient failures up to self._max_retries with the
        configured backoff; AUTH/CLIENT errors return immediately.
        """
        if self._revoked or not self._key or not self._url:
            return UploadResult(False, error_type=ErrorType.AUTH if self._revoked else None)

        url = f"{self._url}/app/remote/ingest/{self._key}"

        for attempt in range(self._max_retries):
            file_handles = []
            files_data = []
            error_type = None
            try:
                for path in files:
                    # Files may vanish between listing and upload (segment
                    # pruning); skip rather than fail the whole batch.
                    if not path.exists():
                        logger.warning(f"File not found, skipping: {path}")
                        continue
                    fh = open(path, "rb")
                    file_handles.append(fh)
                    files_data.append(
                        ("files", (path.name, fh, "application/octet-stream"))
                    )

                if not files_data:
                    return UploadResult(False)

                data: dict[str, Any] = {"day": day, "segment": segment}
                if meta:
                    data["meta"] = json.dumps(meta)

                response = self._session.post(
                    url, data=data, files=files_data, timeout=UPLOAD_TIMEOUT
                )

                if response.status_code == 200:
                    resp_data = response.json()
                    is_duplicate = resp_data.get("status") == "duplicate"
                    return UploadResult(True, duplicate=is_duplicate)

                error_type = self.classify_error(response.status_code)

                if error_type == ErrorType.AUTH:
                    if response.status_code == 403:
                        self._revoked = True
                    logger.error(
                        f"Upload rejected ({response.status_code}): {response.text}"
                    )
                    return UploadResult(False, error_type=error_type)

                if error_type == ErrorType.CLIENT:
                    logger.error(
                        f"Upload rejected ({response.status_code}): {response.text}"
                    )
                    return UploadResult(False, error_type=error_type)

                logger.warning(
                    f"Upload attempt {attempt + 1} failed: "
                    f"{response.status_code} {response.text}"
                )
            except requests.RequestException as e:
                error_type = ErrorType.TRANSIENT
                logger.warning(f"Upload attempt {attempt + 1} failed: {e}")
            finally:
                # Always release the open file handles, even on early return.
                for fh in file_handles:
                    try:
                        fh.close()
                    except Exception:
                        pass

            if attempt < self._max_retries - 1:
                delay = self._retry_backoff[min(attempt, len(self._retry_backoff) - 1)]
                time.sleep(delay)

        logger.error(f"Upload failed after {self._max_retries} attempts: {day}/{segment}")
        return UploadResult(False, error_type=error_type)

    def get_server_segments(self, day: str) -> list[dict] | None:
        """Query server for segments on a given day.

        Returns list of segment dicts, or None on failure.
        """
        if self._revoked or not self._key or not self._url:
            return None

        url = f"{self._url}/app/remote/ingest/{self._key}/segments/{day}"
        params = {}
        if self._stream:
            params["stream"] = self._stream

        try:
            resp = self._session.get(url, params=params, timeout=EVENT_TIMEOUT)
            if resp.status_code == 200:
                return resp.json()
            if resp.status_code in (401, 403):
                if resp.status_code == 403:
                    self._revoked = True
                logger.error(f"Segments query rejected ({resp.status_code})")
                return None
            logger.warning(f"Segments query failed: {resp.status_code}")
            return None
        except requests.RequestException as e:
            logger.debug(f"Segments query failed: {e}")
            return None

    def relay_event(self, tract: str, event: str, **fields: Any) -> bool:
        """Fire-and-forget event relay. Returns True only on HTTP 200."""
        if self._revoked or not self._key or not self._url:
            return False

        url = f"{self._url}/app/remote/ingest/{self._key}/event"
        payload = {"tract": tract, "event": event, **fields}
        try:
            resp = self._session.post(url, json=payload, timeout=EVENT_TIMEOUT)
        except requests.RequestException:
            return False
        if resp.status_code == 200:
            return True
        if resp.status_code == 403:
            self._revoked = True
        # BUGFIX: previously fell off the end and returned None for other
        # statuses despite the declared bool return type.
        return False

    def stop(self) -> None:
        """Close the underlying HTTP session."""
        self._session.close()
tests/__init__.py

This is a binary file and will not be displayed.

tests/__pycache__/__init__.cpython-313.pyc

This is a binary file and will not be displayed.

tests/__pycache__/test_config.cpython-313-pytest-9.0.2.pyc

This is a binary file and will not be displayed.

tests/__pycache__/test_monitor_positions.cpython-313-pytest-9.0.2.pyc

This is a binary file and will not be displayed.

tests/__pycache__/test_observer.cpython-313-pytest-9.0.2.pyc

This is a binary file and will not be displayed.

tests/__pycache__/test_session_env.cpython-313-pytest-9.0.2.pyc

This is a binary file and will not be displayed.

tests/__pycache__/test_streams.cpython-313-pytest-9.0.2.pyc

This is a binary file and will not be displayed.

tests/__pycache__/test_sync.cpython-313-pytest-9.0.2.pyc

This is a binary file and will not be displayed.

+69
tests/test_config.py
# SPDX-License-Identifier: AGPL-3.0-only
# Copyright (c) 2026 sol pbc

"""Tests for config defaults, save/load round-trips, and file permissions."""

from pathlib import Path

from solstone_linux.config import Config, load_config, save_config


class TestConfig:
    def test_defaults(self):
        cfg = Config()
        assert cfg.server_url == ""
        assert cfg.key == ""
        assert cfg.segment_interval == 300

    def test_captures_dir(self):
        cfg = Config()
        assert cfg.captures_dir == cfg.base_dir / "captures"

    def test_restore_token_path(self):
        cfg = Config()
        assert cfg.restore_token_path == cfg.base_dir / "config" / "restore_token"

    def test_round_trip(self, tmp_path: Path):
        cfg = Config(base_dir=tmp_path)
        cfg.server_url = "https://example.com"
        cfg.key = "test-key-123"
        cfg.stream = "archon"
        cfg.segment_interval = 600
        save_config(cfg)

        reread = load_config(tmp_path)
        assert reread.server_url == "https://example.com"
        assert reread.key == "test-key-123"
        assert reread.stream == "archon"
        assert reread.segment_interval == 600

    def test_load_missing(self, tmp_path: Path):
        # No config file on disk yields a default config, not an error.
        cfg = load_config(tmp_path)
        assert cfg.server_url == ""
        assert cfg.key == ""

    def test_load_corrupt(self, tmp_path: Path):
        cfg_dir = tmp_path / "config"
        cfg_dir.mkdir(parents=True)
        (cfg_dir / "config.json").write_text("not json!")

        # Unparseable JSON falls back to defaults rather than raising.
        assert load_config(tmp_path).server_url == ""

    def test_permissions(self, tmp_path: Path):
        cfg = Config(base_dir=tmp_path)
        cfg.server_url = "https://example.com"
        cfg.key = "secret"
        save_config(cfg)

        # The file holds the ingest key, so it must be owner-only (0600).
        assert (cfg.config_path.stat().st_mode & 0o777) == 0o600

    def test_sync_config_roundtrip(self, tmp_path: Path):
        cfg = Config(base_dir=tmp_path)
        cfg.sync_retry_delays = [10, 60, 300]
        cfg.sync_max_retries = 5
        save_config(cfg)

        reread = load_config(tmp_path)
        assert reread.sync_retry_delays == [10, 60, 300]
        assert reread.sync_max_retries == 5
+58
tests/test_monitor_positions.py
# SPDX-License-Identifier: AGPL-3.0-only
# Copyright (c) 2026 sol pbc

"""Tests for spatial position labels assigned to monitor layouts."""

from solstone_linux.monitor_positions import assign_monitor_positions


def _labels(monitors):
    """Run assignment and index the resulting position labels by monitor id."""
    return {m["id"]: m["position"] for m in assign_monitor_positions(monitors)}


class TestAssignMonitorPositions:
    def test_single_monitor(self):
        out = assign_monitor_positions([{"id": "DP-1", "box": [0, 0, 1920, 1080]}])
        assert out[0]["position"] == "center"

    def test_two_horizontal(self):
        labels = _labels([
            {"id": "DP-1", "box": [0, 0, 1920, 1080]},
            {"id": "DP-2", "box": [1920, 0, 3840, 1080]},
        ])
        assert labels["DP-1"] == "left"
        assert labels["DP-2"] == "right"

    def test_three_horizontal(self):
        labels = _labels([
            {"id": "DP-1", "box": [0, 0, 1920, 1080]},
            {"id": "DP-2", "box": [1920, 0, 3840, 1080]},
            {"id": "DP-3", "box": [3840, 0, 5760, 1080]},
        ])
        assert labels["DP-1"] == "left"
        assert labels["DP-2"] == "center"
        assert labels["DP-3"] == "right"

    def test_stacked_vertical(self):
        labels = _labels([
            {"id": "DP-1", "box": [0, 0, 1920, 1080]},
            {"id": "DP-2", "box": [0, 1080, 1920, 2160]},
        ])
        assert labels["DP-1"] == "top"
        assert labels["DP-2"] == "bottom"

    def test_empty(self):
        assert assign_monitor_positions([]) == []

    def test_offset_monitors_no_phantom_vertical(self):
        # Side-by-side monitors with a vertical offset (no horizontal
        # overlap) must still be labeled left/right, never top/bottom.
        labels = _labels([
            {"id": "DP-1", "box": [0, 0, 1920, 1080]},
            {"id": "DP-2", "box": [1920, 200, 3840, 1280]},
        ])
        assert labels["DP-1"] == "left"
        assert labels["DP-2"] == "right"
+39
tests/test_observer.py
··· 1 + # SPDX-License-Identifier: AGPL-3.0-only 2 + # Copyright (c) 2026 sol pbc 3 + 4 + """Tests for the observer module — segment lifecycle and local cache.""" 5 + 6 + from pathlib import Path 7 + 8 + from solstone_linux.config import Config 9 + from solstone_linux.recovery import write_segment_metadata 10 + 11 + 12 + class TestSegmentMetadata: 13 + """Test .metadata file creation for recovery.""" 14 + 15 + def test_writes_metadata(self, tmp_path: Path): 16 + import json 17 + 18 + seg_dir = tmp_path / "test.incomplete" 19 + seg_dir.mkdir() 20 + write_segment_metadata(seg_dir, 1712160000.0) 21 + 22 + meta_path = seg_dir / ".metadata" 23 + assert meta_path.exists() 24 + 25 + data = json.loads(meta_path.read_text()) 26 + assert data["start_timestamp"] == 1712160000.0 27 + 28 + 29 + class TestSegmentDirStructure: 30 + """Test that config directories follow the expected structure.""" 31 + 32 + def test_captures_dir_path(self, tmp_path: Path): 33 + config = Config(base_dir=tmp_path) 34 + assert str(config.captures_dir).endswith("captures") 35 + 36 + def test_restore_token_path(self, tmp_path: Path): 37 + config = Config(base_dir=tmp_path) 38 + assert str(config.restore_token_path).endswith("restore_token") 39 + assert "config" in str(config.restore_token_path)
+47
tests/test_session_env.py
··· 1 + # SPDX-License-Identifier: AGPL-3.0-only 2 + # Copyright (c) 2026 sol pbc 3 + 4 + """Tests for session environment checks.""" 5 + 6 + import os 7 + from unittest.mock import patch 8 + 9 + from solstone_linux.session_env import check_session_ready 10 + 11 + 12 + class TestCheckSessionReady: 13 + """Test desktop session readiness checks.""" 14 + 15 + def test_no_display_server(self): 16 + env = { 17 + k: v for k, v in os.environ.items() 18 + if k not in ("DISPLAY", "WAYLAND_DISPLAY") 19 + } 20 + with patch.dict(os.environ, env, clear=True): 21 + with patch("solstone_linux.session_env._recover_session_env"): 22 + result = check_session_ready() 23 + assert result is not None 24 + assert "display server" in result 25 + 26 + def test_no_dbus(self): 27 + env = { 28 + k: v for k, v in os.environ.items() 29 + if k != "DBUS_SESSION_BUS_ADDRESS" 30 + } 31 + env["DISPLAY"] = ":0" 32 + with patch.dict(os.environ, env, clear=True): 33 + with patch("solstone_linux.session_env._recover_session_env"): 34 + result = check_session_ready() 35 + assert result is not None 36 + assert "DBus" in result 37 + 38 + def test_ready_with_display_and_dbus(self): 39 + env = dict(os.environ) 40 + env["DISPLAY"] = ":0" 41 + env["DBUS_SESSION_BUS_ADDRESS"] = "unix:path=/run/user/1000/bus" 42 + with patch.dict(os.environ, env, clear=True): 43 + with patch("solstone_linux.session_env._recover_session_env"): 44 + with patch("solstone_linux.session_env.shutil") as mock_shutil: 45 + mock_shutil.which.return_value = None # No pactl 46 + result = check_session_ready() 47 + assert result is None # Ready
+46
tests/test_streams.py
··· 1 + # SPDX-License-Identifier: AGPL-3.0-only 2 + # Copyright (c) 2026 sol pbc 3 + 4 + import pytest 5 + 6 + from solstone_linux.streams import _strip_hostname, stream_name 7 + 8 + 9 + class TestStripHostname: 10 + def test_simple(self): 11 + assert _strip_hostname("archon") == "archon" 12 + 13 + def test_with_domain(self): 14 + assert _strip_hostname("ja1r.local") == "ja1r" 15 + 16 + def test_ip_address(self): 17 + assert _strip_hostname("192.168.1.1") == "192-168-1-1" 18 + 19 + def test_fqdn(self): 20 + assert _strip_hostname("my.host.example.com") == "my" 21 + 22 + def test_empty(self): 23 + assert _strip_hostname("") == "" 24 + 25 + 26 + class TestStreamName: 27 + def test_host_only(self): 28 + assert stream_name(host="archon") == "archon" 29 + 30 + def test_host_with_qualifier(self): 31 + assert stream_name(host="archon", qualifier="tmux") == "archon.tmux" 32 + 33 + def test_host_no_qualifier(self): 34 + # Linux observer uses host without qualifier 35 + assert stream_name(host="archon") == "archon" 36 + 37 + def test_remote(self): 38 + assert stream_name(remote="desktop") == "desktop" 39 + 40 + def test_rejects_empty(self): 41 + with pytest.raises(ValueError): 42 + stream_name() 43 + 44 + def test_rejects_invalid_chars(self): 45 + with pytest.raises(ValueError): 46 + stream_name(host="!invalid")
+239
tests/test_sync.py
# SPDX-License-Identifier: AGPL-3.0-only
# Copyright (c) 2026 sol pbc

"""Tests for crash recovery, sync collection, error classification,
circuit breaker thresholds, and retry-cap handling."""

import json
import os
import time
from pathlib import Path
from unittest.mock import MagicMock, patch

from solstone_linux.config import Config
from solstone_linux.recovery import recover_incomplete_segments
from solstone_linux.upload import ErrorType, UploadClient, UploadResult


class TestRecovery:
    """Test crash recovery for incomplete segments."""

    def _make_incomplete(
        self, captures_dir: Path, day: str, stream: str, time_prefix: str, age: int = 300
    ) -> Path:
        """Create an incomplete segment directory with a dummy file."""
        seg = captures_dir / day / stream / f"{time_prefix}.incomplete"
        seg.mkdir(parents=True)
        (seg / "center_DP-3_screen.webm").write_bytes(b"\x00" * 100)

        # Back-date the directory to simulate a stale (crashed) segment.
        stamp = time.time() - age
        os.utime(seg, (stamp, stamp))
        return seg

    def test_recovers_old_incomplete(self, tmp_path: Path):
        captures = tmp_path / "captures"
        self._make_incomplete(captures, "20260403", "archon", "140000", age=300)

        assert recover_incomplete_segments(captures) == 1

        names = [
            d.name
            for d in (captures / "20260403" / "archon").iterdir()
            if d.is_dir()
        ]
        assert len(names) == 1
        assert names[0].startswith("140000_")
        assert not names[0].endswith(".incomplete")

    def test_recovers_with_metadata(self, tmp_path: Path):
        """Recovery uses .metadata start_timestamp for accurate duration."""
        captures = tmp_path / "captures"
        seg = captures / "20260403" / "archon" / "140000.incomplete"
        seg.mkdir(parents=True)
        (seg / "center_DP-3_screen.webm").write_bytes(b"\x00" * 100)

        # Sidecar with a known start timestamp: 60 seconds in the past.
        (seg / ".metadata").write_text(
            json.dumps({"start_timestamp": time.time() - 60})
        )

        stamp = time.time() - 300
        os.utime(seg, (stamp, stamp))

        assert recover_incomplete_segments(captures) == 1

        names = [
            d.name
            for d in (captures / "20260403" / "archon").iterdir()
            if d.is_dir()
        ]
        assert len(names) == 1
        # Duration must come from the metadata timestamp, not mtime-ctime.
        assert 55 <= int(names[0].split("_")[1]) <= 65  # ~60 seconds

    def test_skips_recent_incomplete(self, tmp_path: Path):
        captures = tmp_path / "captures"
        seg = captures / "20260403" / "archon" / "140000.incomplete"
        seg.mkdir(parents=True)
        (seg / "test.webm").write_bytes(b"\x00")

        # Fresh segment: left alone, in case a writer is still active.
        assert recover_incomplete_segments(captures) == 0
        assert seg.exists()

    def test_marks_empty_as_failed(self, tmp_path: Path):
        captures = tmp_path / "captures"
        seg = captures / "20260403" / "archon" / "140000.incomplete"
        seg.mkdir(parents=True)  # intentionally contains no files

        stamp = time.time() - 300
        os.utime(seg, (stamp, stamp))

        assert recover_incomplete_segments(captures) == 0
        assert (captures / "20260403" / "archon" / "140000.failed").exists()

    def test_metadata_removed_on_recovery(self, tmp_path: Path):
        """The .metadata file should be removed during recovery."""
        captures = tmp_path / "captures"
        seg = captures / "20260403" / "archon" / "140000.incomplete"
        seg.mkdir(parents=True)
        (seg / "screen.webm").write_bytes(b"\x00")
        (seg / ".metadata").write_text('{"start_timestamp": 1000}')

        stamp = time.time() - 300
        os.utime(seg, (stamp, stamp))

        recover_incomplete_segments(captures)

        for entry in (captures / "20260403" / "archon").iterdir():
            if entry.is_dir() and not entry.name.endswith((".incomplete", ".failed")):
                # The sidecar must not survive into the recovered directory.
                assert not (entry / ".metadata").exists()

    def test_no_captures_dir(self, tmp_path: Path):
        assert recover_incomplete_segments(tmp_path / "nonexistent") == 0


class TestSyncServiceCollect:
    """Test segment collection logic."""

    def test_skips_incomplete_and_failed(self, tmp_path: Path):
        from solstone_linux.sync import SyncService

        config = Config(base_dir=tmp_path)
        config.ensure_dirs()

        stream_dir = config.captures_dir / "20260403" / "archon"
        stream_dir.mkdir(parents=True)

        # Two completed segments plus one incomplete and one failed.
        for seg_name, payload in (
            ("140000_300", "screen.webm"),
            ("150000_300", "audio.flac"),
        ):
            (stream_dir / seg_name).mkdir()
            (stream_dir / seg_name / payload).write_bytes(b"\x00")
        (stream_dir / "145000.incomplete").mkdir()
        (stream_dir / "143000.failed").mkdir()

        sync = SyncService(config, UploadClient(config))
        segments = sync._collect_segments(config.captures_dir)

        assert "20260403" in segments
        names = {s.name for s in segments["20260403"]}
        assert "140000_300" in names
        assert "150000_300" in names
        assert "145000.incomplete" not in names
        assert "143000.failed" not in names


class TestSyncedDaysPruning:
    """Test that synced-days cache is pruned to 90 days."""

    def test_prunes_old_entries(self, tmp_path: Path):
        from datetime import datetime, timedelta

        from solstone_linux.sync import SyncService

        config = Config(base_dir=tmp_path)
        config.ensure_dirs()
        sync = SyncService(config, UploadClient(config))

        # Seed 100 consecutive day entries ending today.
        today = datetime.now()
        sync._synced_days.update(
            (today - timedelta(days=i)).strftime("%Y%m%d") for i in range(100)
        )

        sync._prune_synced_days()

        # Pruned down to the 90-day window (1 day of tolerance).
        assert len(sync._synced_days) <= 91


class TestErrorClassification:
    """Test HTTP error classification for circuit breaker tuning."""

    def test_auth_errors(self):
        for status in (401, 403):
            assert UploadClient.classify_error(status) == ErrorType.AUTH

    def test_client_errors(self):
        assert UploadClient.classify_error(400) == ErrorType.CLIENT

    def test_transient_errors(self):
        for status in (500, 502, 503):
            assert UploadClient.classify_error(status) == ErrorType.TRANSIENT

    def test_network_errors(self):
        verdict = UploadClient.classify_error(None, is_network_error=True)
        assert verdict == ErrorType.TRANSIENT

    def test_unknown_status(self):
        assert UploadClient.classify_error(418) == ErrorType.TRANSIENT


class TestCircuitBreakerThresholds:
    """Test circuit breaker state transitions with error-type tuning."""

    def _make_sync(self, base_dir: Path):
        """Build a SyncService over a fresh config rooted at base_dir."""
        from solstone_linux.sync import SyncService

        config = Config(base_dir=base_dir)
        config.ensure_dirs()
        return SyncService(config, UploadClient(config))

    def test_auth_opens_immediately(self, tmp_path: Path):
        from solstone_linux.sync import CIRCUIT_THRESHOLD_AUTH

        sync = self._make_sync(tmp_path)
        sync._last_error_type = ErrorType.AUTH
        assert sync._circuit_threshold() == CIRCUIT_THRESHOLD_AUTH
        assert CIRCUIT_THRESHOLD_AUTH == 1

    def test_transient_allows_more_failures(self, tmp_path: Path):
        from solstone_linux.sync import CIRCUIT_THRESHOLD_TRANSIENT

        sync = self._make_sync(tmp_path)
        sync._last_error_type = ErrorType.TRANSIENT
        assert sync._circuit_threshold() == CIRCUIT_THRESHOLD_TRANSIENT
        assert CIRCUIT_THRESHOLD_TRANSIENT >= 5


class TestRetryCapRespected:
    """Test that upload respects configured retry cap (no hard min(config,3))."""

    def test_respects_configured_max_retries(self):
        """Upload client should use the configured max_retries, not cap at 3."""
        config = Config()
        config.sync_max_retries = 10
        assert UploadClient(config)._max_retries == 10

    def test_low_max_retries_respected(self):
        config = Config()
        config.sync_max_retries = 1
        assert UploadClient(config)._max_retries == 1