linux observer
1# SPDX-License-Identifier: AGPL-3.0-only
2# Copyright (c) 2026 sol pbc
3
4"""
5Standalone Linux desktop observer — screen + audio capture.
6
7Continuously captures audio and manages screencast recording based on activity.
8Creates 5-minute segments in a local cache directory. The sync service handles
9all uploads — the observer only writes locally.
10
11Key architectural change from monorepo version:
12- Capture writes completed segments to local cache only
13- No ObserverClient usage in boundary handling — no network calls in capture loop
14- Sync service picks up completed segments and uploads asynchronously
15
16State machine:
17 SCREENCAST: Screen is active, recording video
18 IDLE: Screen is inactive
19"""
20
21import asyncio
22import datetime
23import logging
24import os
25import platform
26import signal
27import socket
28import time
29from pathlib import Path
30
31import numpy as np
32from dbus_next.aio import MessageBus
33from dbus_next.constants import BusType
34
35from .activity import get_idle_time_ms, is_power_save_active, is_screen_locked
36from .audio_mute import is_sink_muted
37from .audio_recorder import AudioRecorder
38from .config import Config
39from .recovery import write_segment_metadata
40from .screencast import Screencaster, StreamInfo
41from .streams import stream_name
42from .sync import SyncService
43from .upload import UploadClient
44
logger = logging.getLogger(__name__)

# Host identification (resolved once at import; sent with every status event)
HOST = socket.gethostname()
PLATFORM = platform.system().lower()

# Constants
# Idle time above this marks the screen as idle (see check_activity_status).
IDLE_THRESHOLD_MS = 5 * 60 * 1000  # 5 minutes
# A chunk counts as a "hit" when its loudest channel RMS exceeds this value.
RMS_THRESHOLD = 0.01
# Number of hits a segment needs before its audio is written to disk.
MIN_HITS_FOR_SAVE = 3
# Sleep between main-loop iterations; one audio chunk is collected per iteration.
CHUNK_DURATION = 5  # seconds

# Capture modes
MODE_IDLE = "idle"  # screen inactive
MODE_SCREENCAST = "screencast"  # screen active, video is recorded

# Audio detection retry
DETECT_RETRIES = 3  # detection attempts before setup gives up
DETECT_RETRY_DELAY = 5  # seconds
64
65
66def _get_timestamp_parts(timestamp: float | None = None) -> tuple[str, str]:
67 """Get date and time parts from timestamp."""
68 if timestamp is None:
69 timestamp = time.time()
70 dt = datetime.datetime.fromtimestamp(timestamp)
71 return dt.strftime("%Y%m%d"), dt.strftime("%H%M%S")
72
73
74class Observer:
75 """Unified audio and screencast observer with local cache + sync."""
76
77 def __init__(self, config: Config):
78 self.config = config
79 self.interval = config.segment_interval
80 self.audio_recorder = AudioRecorder()
81 self.screencaster = Screencaster(config.restore_token_path)
82 self.bus: MessageBus | None = None
83 self.running = True
84 self.stream = config.stream
85
86 self._client: UploadClient | None = None
87 self._sync: SyncService | None = None
88
89 # State tracking
90 self.start_at = time.time()
91 self.start_at_mono = time.monotonic()
92 self.threshold_hits = 0
93 self.accumulated_audio_buffer = np.array([], dtype=np.float32).reshape(0, 2)
94
95 # Mode tracking
96 self.current_mode = MODE_IDLE
97
98 # Segment directory (HHMMSS.incomplete/)
99 self.segment_dir: Path | None = None
100
101 # Multi-file screencast tracking
102 self.current_streams: list[StreamInfo] = []
103
104 # Activity status cache (updated each loop)
105 self.cached_is_active = False
106 self.cached_idle_time_ms = 0
107 self.cached_screen_locked = False
108 self.cached_is_muted = False
109 self.cached_power_save = False
110
111 # Mute state at segment start (determines save format)
112 self.segment_is_muted = False
113
114 async def setup(self) -> bool:
115 """Initialize audio devices, DBus connection, and sync service."""
116 # Detect audio devices with retry (devices may still be initializing)
117 detected = False
118 for attempt in range(DETECT_RETRIES):
119 if self.audio_recorder.detect():
120 detected = True
121 break
122 if attempt < DETECT_RETRIES - 1:
123 logger.info(
124 "Audio detection attempt %d/%d failed, retrying in %ds",
125 attempt + 1,
126 DETECT_RETRIES,
127 DETECT_RETRY_DELAY,
128 )
129 await asyncio.sleep(DETECT_RETRY_DELAY)
130 if not detected:
131 logger.error("Failed to detect audio devices")
132 return False
133
134 self.audio_recorder.start_recording()
135 logger.info("Audio recording started")
136
137 # Connect to DBus for idle/lock detection
138 self.bus = await MessageBus(bus_type=BusType.SESSION).connect()
139 logger.info("DBus connection established")
140
141 # Verify portal is available (exit if not)
142 if not await self.screencaster.connect():
143 logger.error("Screencast portal not available")
144 return False
145 logger.info("Screencast portal connected")
146
147 # Initialize upload client and sync service
148 self._client = UploadClient(self.config)
149 if self.config.server_url:
150 self._client.ensure_registered(self.config)
151 self._sync = SyncService(self.config, self._client)
152 logger.info("Sync service initialized")
153
154 return True
155
156 async def check_activity_status(self) -> str:
157 """Check system activity status and determine capture mode."""
158 idle_time = await get_idle_time_ms(self.bus)
159 screen_locked = await is_screen_locked(self.bus)
160 power_save = await is_power_save_active(self.bus)
161 sink_muted = await is_sink_muted()
162
163 # Cache values for status events
164 self.cached_idle_time_ms = idle_time
165 self.cached_screen_locked = screen_locked
166 self.cached_is_muted = sink_muted
167 self.cached_power_save = power_save
168
169 # Determine screen activity
170 screen_idle = (idle_time > IDLE_THRESHOLD_MS) or screen_locked or power_save
171 screen_active = not screen_idle
172
173 # Determine mode
174 if screen_active:
175 mode = MODE_SCREENCAST
176 else:
177 mode = MODE_IDLE
178
179 # Cache legacy is_active for audio threshold logic
180 has_audio_activity = self.threshold_hits >= MIN_HITS_FOR_SAVE
181 self.cached_is_active = screen_active or has_audio_activity
182
183 return mode
184
185 def compute_rms(self, audio_buffer: np.ndarray) -> float:
186 """Compute per-channel RMS and return maximum (stereo: mic=left, sys=right)."""
187 if audio_buffer.size == 0:
188 return 0.0
189 rms_left = float(np.sqrt(np.mean(audio_buffer[:, 0] ** 2)))
190 rms_right = float(np.sqrt(np.mean(audio_buffer[:, 1] ** 2)))
191 return max(rms_left, rms_right)
192
193 def _save_audio_segment(self, segment_dir: Path, is_muted: bool) -> list[str]:
194 """Save accumulated audio buffer to segment directory."""
195 if self.accumulated_audio_buffer.size == 0:
196 logger.warning("No audio buffer to save")
197 return []
198
199 if is_muted:
200 # Split mode: save mic and sys as separate mono files
201 mic_data = self.accumulated_audio_buffer[:, 0]
202 sys_data = self.accumulated_audio_buffer[:, 1]
203
204 mic_bytes = self.audio_recorder.create_mono_flac_bytes(mic_data)
205 sys_bytes = self.audio_recorder.create_mono_flac_bytes(sys_data)
206
207 (segment_dir / "mic_audio.flac").write_bytes(mic_bytes)
208 (segment_dir / "sys_audio.flac").write_bytes(sys_bytes)
209
210 logger.info(f"Saved split audio (muted): {segment_dir}")
211 return ["mic_audio.flac", "sys_audio.flac"]
212 else:
213 # Normal mode: save combined stereo file
214 flac_bytes = self.audio_recorder.create_flac_bytes(
215 self.accumulated_audio_buffer
216 )
217 (segment_dir / "audio.flac").write_bytes(flac_bytes)
218
219 logger.info(f"Saved audio to {segment_dir}/audio.flac")
220 return ["audio.flac"]
221
222 def _start_segment(self) -> Path:
223 """Start a new segment with .incomplete directory."""
224 self.start_at = time.time()
225 self.start_at_mono = time.monotonic()
226
227 date_part, time_part = _get_timestamp_parts(self.start_at)
228 captures_dir = self.config.captures_dir
229
230 # Create YYYYMMDD/stream/HHMMSS.incomplete/
231 segment_dir = captures_dir / date_part / self.stream / f"{time_part}.incomplete"
232 segment_dir.mkdir(parents=True, exist_ok=True)
233 self.segment_dir = segment_dir
234
235 # Write metadata for recovery
236 write_segment_metadata(segment_dir, self.start_at)
237
238 return segment_dir
239
240 def _finalize_segment(self) -> str | None:
241 """Rename .incomplete to HHMMSS_DDD/ and return segment key."""
242 if not self.segment_dir or not self.segment_dir.exists():
243 return None
244
245 # Remove .metadata before finalizing
246 meta_path = self.segment_dir / ".metadata"
247 if meta_path.exists():
248 try:
249 meta_path.unlink()
250 except OSError:
251 pass
252
253 # Check if there are any actual files
254 contents = [f for f in self.segment_dir.iterdir() if f.is_file()]
255 if not contents:
256 # Empty segment, remove it
257 try:
258 os.rmdir(str(self.segment_dir))
259 except OSError:
260 pass
261 return None
262
263 _, time_part = _get_timestamp_parts(self.start_at)
264 duration = int(time.time() - self.start_at)
265 segment_key = f"{time_part}_{duration}"
266 final_dir = self.segment_dir.parent / segment_key
267
268 try:
269 os.rename(str(self.segment_dir), str(final_dir))
270 logger.info(f"Segment finalized: {segment_key}")
271 return segment_key
272 except OSError as e:
273 logger.error(f"Failed to finalize segment: {e}")
274 return None
275
276 async def handle_boundary(self, new_mode: str):
277 """Handle window boundary rollover.
278
279 Closes the current segment, writes audio, finalizes to local cache,
280 and triggers sync. No network calls in the capture loop.
281 """
282 # Stop screencast first (closes file handles)
283 if self.current_mode == MODE_SCREENCAST:
284 logger.info("Stopping previous screencast")
285 await self.screencaster.stop()
286 self.current_streams = []
287
288 # Save audio if we have enough threshold hits
289 did_save_audio = self.threshold_hits >= MIN_HITS_FOR_SAVE
290 if did_save_audio and self.segment_dir:
291 audio_files = self._save_audio_segment(
292 self.segment_dir, self.segment_is_muted
293 )
294 if audio_files:
295 logger.info(
296 f"Saved {len(audio_files)} audio file(s) ({self.threshold_hits} hits)"
297 )
298 else:
299 logger.debug(
300 f"Skipping audio save (only {self.threshold_hits}/{MIN_HITS_FOR_SAVE} hits)"
301 )
302
303 # Reset audio state
304 self.accumulated_audio_buffer = np.array([], dtype=np.float32).reshape(0, 2)
305 self.threshold_hits = 0
306
307 # Finalize segment (rename .incomplete -> HHMMSS_DDD/)
308 segment_key = self._finalize_segment()
309 self.segment_dir = None
310
311 # Trigger sync to upload the completed segment
312 if segment_key and self._sync:
313 self._sync.trigger()
314
315 # Update segment mute state for new segment
316 self.segment_is_muted = self.cached_is_muted
317
318 # Update mode
319 old_mode = self.current_mode
320 self.current_mode = new_mode
321
322 # Start new capture based on mode
323 if new_mode == MODE_SCREENCAST and not self.cached_screen_locked:
324 await self.initialize_screencast()
325 else:
326 self._start_segment()
327
328 logger.info(f"Mode transition: {old_mode} -> {new_mode}")
329
330 async def initialize_screencast(self) -> bool:
331 """Start a new screencast recording.
332
333 Creates a segment directory and starts GStreamer recording to it.
334 """
335 segment_dir = self._start_segment()
336
337 try:
338 streams = await self.screencaster.start(
339 str(segment_dir), framerate=1, draw_cursor=True
340 )
341 except RuntimeError as e:
342 logger.error(f"Failed to start screencast: {e}")
343 raise
344
345 if not streams:
346 logger.error("No streams returned from screencast start")
347 raise RuntimeError("No streams available")
348
349 self.current_streams = streams
350
351 logger.info(f"Started screencast with {len(streams)} stream(s)")
352 for stream in streams:
353 logger.info(f" {stream.position} ({stream.connector}): {stream.file_path}")
354
355 return True
356
357 def emit_status(self):
358 """Emit observe.status event with current state (fire-and-forget)."""
359 if not self._client:
360 return
361
362 elapsed = int(time.monotonic() - self.start_at_mono)
363
364 # Screencast info
365 if self.current_mode == MODE_SCREENCAST and self.current_streams:
366 streams_info = [
367 {
368 "position": stream.position,
369 "connector": stream.connector,
370 "file": stream.file_path,
371 }
372 for stream in self.current_streams
373 ]
374 screencast_info = {
375 "recording": True,
376 "streams": streams_info,
377 "window_elapsed_seconds": elapsed,
378 }
379 else:
380 screencast_info = {"recording": False}
381
382 # Audio info
383 audio_info = {
384 "threshold_hits": self.threshold_hits,
385 "will_save": self.threshold_hits >= MIN_HITS_FOR_SAVE,
386 }
387
388 # Activity info
389 activity_info = {
390 "active": self.cached_is_active,
391 "idle_time_ms": self.cached_idle_time_ms,
392 "screen_locked": self.cached_screen_locked,
393 "sink_muted": self.cached_is_muted,
394 "power_save": self.cached_power_save,
395 }
396
397 self._client.relay_event(
398 "observe",
399 "status",
400 mode=self.current_mode,
401 screencast=screencast_info,
402 audio=audio_info,
403 activity=activity_info,
404 host=HOST,
405 platform=PLATFORM,
406 stream=self.stream,
407 )
408
409 async def main_loop(self):
410 """Run the main observer loop with background sync."""
411 logger.info(f"Starting observer loop (interval={self.interval}s)")
412
413 # Start sync service as background task
414 sync_task = None
415 if self._sync:
416 sync_task = asyncio.create_task(self._sync.run())
417
418 # Determine initial mode
419 new_mode = await self.check_activity_status()
420 self.segment_is_muted = self.cached_is_muted
421 self.current_mode = new_mode
422
423 # Start initial capture based on mode
424 if new_mode == MODE_SCREENCAST and not self.cached_screen_locked:
425 try:
426 await self.initialize_screencast()
427 except RuntimeError:
428 self.running = False
429 if sync_task:
430 if self._sync:
431 self._sync.stop()
432 sync_task.cancel()
433 try:
434 await sync_task
435 except asyncio.CancelledError:
436 pass
437 return
438 else:
439 self._start_segment()
440
441 logger.info(f"Initial mode: {self.current_mode}")
442
443 try:
444 while self.running:
445 await asyncio.sleep(CHUNK_DURATION)
446
447 # Check activity status and determine new mode
448 new_mode = await self.check_activity_status()
449
450 # Check for GStreamer failure mid-recording
451 if (
452 self.current_mode == MODE_SCREENCAST
453 and not self.screencaster.is_healthy()
454 ):
455 logger.warning("Screencast recording failed, stopping gracefully")
456 await self.screencaster.stop()
457 self.current_streams = []
458 self.current_mode = MODE_IDLE
459
460 # Detect mode change
461 mode_changed = new_mode != self.current_mode
462 if mode_changed:
463 logger.info(f"Mode changing: {self.current_mode} -> {new_mode}")
464
465 # Only trigger segment boundary on screencast transitions
466 screencast_transition = mode_changed and (
467 self.current_mode == MODE_SCREENCAST or new_mode == MODE_SCREENCAST
468 )
469
470 # Detect mute state transition
471 mute_transition = self.cached_is_muted != self.segment_is_muted
472 if mute_transition:
473 logger.info(
474 f"Mute state changed: "
475 f"{'muted' if self.segment_is_muted else 'unmuted'} -> "
476 f"{'muted' if self.cached_is_muted else 'unmuted'}"
477 )
478
479 # Capture audio buffer for this chunk
480 audio_chunk = self.audio_recorder.get_buffers()
481
482 if audio_chunk.size > 0:
483 self.accumulated_audio_buffer = np.vstack(
484 (self.accumulated_audio_buffer, audio_chunk)
485 )
486 rms = self.compute_rms(audio_chunk)
487 if rms > RMS_THRESHOLD:
488 self.threshold_hits += 1
489 logger.debug(
490 f"RMS {rms:.4f} > threshold (hit {self.threshold_hits})"
491 )
492 else:
493 logger.debug(f"RMS {rms:.4f} below threshold")
494 else:
495 logger.debug("No audio data in chunk")
496
497 # Check for window boundary (monotonic to avoid DST/clock jumps)
498 elapsed = time.monotonic() - self.start_at_mono
499 is_boundary = (
500 (elapsed >= self.interval) or screencast_transition or mute_transition
501 )
502
503 if is_boundary:
504 logger.info(
505 f"Boundary: elapsed={elapsed:.1f}s screencast_change={screencast_transition} "
506 f"mute_change={mute_transition} "
507 f"hits={self.threshold_hits}/{MIN_HITS_FOR_SAVE}"
508 )
509 await self.handle_boundary(new_mode)
510
511 # Emit status event
512 self.emit_status()
513 finally:
514 # Cleanup on exit
515 logger.info("Observer loop stopped, cleaning up...")
516 await self.shutdown()
517 if sync_task:
518 if self._sync:
519 self._sync.stop()
520 sync_task.cancel()
521 try:
522 await sync_task
523 except asyncio.CancelledError:
524 pass
525
526 async def shutdown(self):
527 """Clean shutdown of observer."""
528 # Stop screencast first (closes file handles)
529 if self.current_mode == MODE_SCREENCAST:
530 logger.info("Stopping screencast for shutdown")
531 await self.screencaster.stop()
532 await asyncio.sleep(0.5)
533
534 # Save final audio if threshold met
535 if self.threshold_hits >= MIN_HITS_FOR_SAVE and self.segment_dir:
536 audio_files = self._save_audio_segment(
537 self.segment_dir, self.segment_is_muted
538 )
539 if audio_files:
540 logger.info(f"Saved final audio: {len(audio_files)} file(s)")
541
542 # Finalize segment locally
543 segment_key = self._finalize_segment()
544 self.segment_dir = None
545
546 if segment_key:
547 logger.info(f"Finalized segment locally: {segment_key} (shutdown)")
548
549 # Stop audio recorder
550 self.audio_recorder.stop_recording()
551 logger.info("Audio recording stopped")
552
553 if self._client:
554 self._client.stop()
555 self._client = None
556 logger.info("Client stopped")
557
558
async def async_run(config: Config) -> int:
    """Run the observer to completion and return a process exit code.

    Returns 75 (EXIT_TEMPFAIL) when the session prerequisites are not met,
    1 when setup fails or the main loop raises, and 0 otherwise.
    """
    from .session_env import check_session_ready

    # Pre-flight: check session prerequisites
    blocker = check_session_ready()
    if blocker:
        logger.warning("Session not ready: %s", blocker)
        return 75  # EXIT_TEMPFAIL

    observer = Observer(config)

    def request_stop():
        # Let the main loop finish its current iteration and clean up.
        logger.info("Received shutdown signal")
        observer.running = False

    event_loop = asyncio.get_running_loop()
    event_loop.add_signal_handler(signal.SIGINT, request_stop)
    event_loop.add_signal_handler(signal.SIGTERM, request_stop)

    setup_ok = await observer.setup()
    if not setup_ok:
        logger.error("Observer setup failed")
        return 1

    exit_code = 0
    try:
        await observer.main_loop()
    except RuntimeError as e:
        logger.error(f"Observer runtime error: {e}")
        exit_code = 1
    except Exception as e:
        logger.error(f"Observer error: {e}", exc_info=True)
        exit_code = 1

    return exit_code