# linux observer
# (source view: branch main, 593 lines, 21 kB)
# SPDX-License-Identifier: AGPL-3.0-only
# Copyright (c) 2026 sol pbc

"""
Standalone Linux desktop observer — screen + audio capture.

Continuously captures audio and manages screencast recording based on activity.
Creates segments of ``config.segment_interval`` seconds (nominally 5 minutes) in
a local cache directory; a segment is also cut early when the capture mode or
the sink mute state changes. The sync service handles all uploads — the
observer only writes locally.

Key architectural change from monorepo version:
- Capture writes completed segments to local cache only
- No ObserverClient usage in boundary handling — no network calls in capture loop
- Sync service picks up completed segments and uploads asynchronously

State machine:
  SCREENCAST: Screen is active, recording video
  IDLE: Screen is inactive (idle, locked, or in power save); audio only
"""

import asyncio
import datetime
import logging
import os
import platform
import signal
import socket
import time
from pathlib import Path

import numpy as np
from dbus_next.aio import MessageBus
from dbus_next.constants import BusType

from .activity import get_idle_time_ms, is_power_save_active, is_screen_locked
from .audio_mute import is_sink_muted
from .audio_recorder import AudioRecorder
from .config import Config
from .recovery import write_segment_metadata
from .screencast import Screencaster, StreamInfo
from .streams import stream_name
from .sync import SyncService
from .upload import UploadClient

logger = logging.getLogger(__name__)

# Host identification
HOST = socket.gethostname()
PLATFORM = platform.system().lower()

# Constants
IDLE_THRESHOLD_MS = 5 * 60 * 1000  # 5 minutes without input => screen idle
RMS_THRESHOLD = 0.01  # chunk RMS above this counts as one "hit"
MIN_HITS_FOR_SAVE = 3  # hits needed in a segment before its audio is saved
CHUNK_DURATION = 5  # seconds between loop ticks / audio chunk reads

# Capture modes
MODE_IDLE = "idle"
MODE_SCREENCAST = "screencast"

# Audio detection retry
DETECT_RETRIES = 3
DETECT_RETRY_DELAY = 5  # seconds


def _get_timestamp_parts(timestamp: float | None = None) -> tuple[str, str]:
    """Return ``(YYYYMMDD, HHMMSS)`` for *timestamp* in local time.

    Args:
        timestamp: Seconds since the epoch; defaults to the current time.
    """
    if timestamp is None:
        timestamp = time.time()
    dt = datetime.datetime.fromtimestamp(timestamp)
    return dt.strftime("%Y%m%d"), dt.strftime("%H%M%S")


class Observer:
    """Unified audio and screencast observer with local cache + sync."""

    def __init__(self, config: Config):
        self.config = config
        self.interval = config.segment_interval
        self.audio_recorder = AudioRecorder()
        self.screencaster = Screencaster(config.restore_token_path)
        self.bus: MessageBus | None = None
        self.running = True
        self.stream = config.stream

        self._client: UploadClient | None = None
        self._sync: SyncService | None = None

        # State tracking: wall-clock start (used for naming) and monotonic
        # start (used for elapsed-time checks, immune to clock jumps).
        self.start_at = time.time()
        self.start_at_mono = time.monotonic()
        self.threshold_hits = 0
        # Two columns per frame: column 0 = mic, column 1 = system audio.
        self.accumulated_audio_buffer = np.array([], dtype=np.float32).reshape(0, 2)

        # Mode tracking
        self.current_mode = MODE_IDLE

        # Segment directory (HHMMSS.incomplete/)
        self.segment_dir: Path | None = None

        # Multi-file screencast tracking
        self.current_streams: list[StreamInfo] = []

        # Activity status cache (updated each loop)
        self.cached_is_active = False
        self.cached_idle_time_ms = 0
        self.cached_screen_locked = False
        self.cached_is_muted = False
        self.cached_power_save = False

        # Mute state at segment start (determines save format)
        self.segment_is_muted = False

    async def setup(self) -> bool:
        """Initialize audio devices, DBus connection, and sync service.

        Returns:
            True when everything is ready; False if audio devices could not be
            detected or the screencast portal is unavailable.
        """
        # Detect audio devices with retry (devices may still be initializing)
        detected = False
        for attempt in range(DETECT_RETRIES):
            if self.audio_recorder.detect():
                detected = True
                break
            if attempt < DETECT_RETRIES - 1:
                logger.info(
                    "Audio detection attempt %d/%d failed, retrying in %ds",
                    attempt + 1,
                    DETECT_RETRIES,
                    DETECT_RETRY_DELAY,
                )
                await asyncio.sleep(DETECT_RETRY_DELAY)
        if not detected:
            logger.error("Failed to detect audio devices")
            return False

        self.audio_recorder.start_recording()
        logger.info("Audio recording started")

        # Connect to DBus for idle/lock detection
        self.bus = await MessageBus(bus_type=BusType.SESSION).connect()
        logger.info("DBus connection established")

        # Verify portal is available (exit if not)
        if not await self.screencaster.connect():
            logger.error("Screencast portal not available")
            # Don't leave the recorder running when setup is abandoned.
            self.audio_recorder.stop_recording()
            return False
        logger.info("Screencast portal connected")

        # Initialize upload client and sync service
        self._client = UploadClient(self.config)
        if self.config.server_url:
            self._client.ensure_registered(self.config)
        self._sync = SyncService(self.config, self._client)
        logger.info("Sync service initialized")

        return True

    async def check_activity_status(self) -> str:
        """Refresh the cached activity status and return the desired capture mode.

        The screen counts as idle when input has been idle longer than
        ``IDLE_THRESHOLD_MS``, the screen is locked, or power save is active.

        Returns:
            ``MODE_SCREENCAST`` when the screen is active, else ``MODE_IDLE``.
        """
        idle_time = await get_idle_time_ms(self.bus)
        screen_locked = await is_screen_locked(self.bus)
        power_save = await is_power_save_active(self.bus)
        sink_muted = await is_sink_muted()

        # Cache values for status events
        self.cached_idle_time_ms = idle_time
        self.cached_screen_locked = screen_locked
        self.cached_is_muted = sink_muted
        self.cached_power_save = power_save

        screen_idle = (idle_time > IDLE_THRESHOLD_MS) or screen_locked or power_save
        screen_active = not screen_idle

        # Cache legacy is_active for audio threshold logic
        has_audio_activity = self.threshold_hits >= MIN_HITS_FOR_SAVE
        self.cached_is_active = screen_active or has_audio_activity

        return MODE_SCREENCAST if screen_active else MODE_IDLE

    def compute_rms(self, audio_buffer: np.ndarray) -> float:
        """Compute per-channel RMS and return maximum (stereo: mic=left, sys=right).

        Returns 0.0 for an empty buffer.
        """
        if audio_buffer.size == 0:
            return 0.0
        rms_left = float(np.sqrt(np.mean(audio_buffer[:, 0] ** 2)))
        rms_right = float(np.sqrt(np.mean(audio_buffer[:, 1] ** 2)))
        return max(rms_left, rms_right)

    def _save_audio_segment(self, segment_dir: Path, is_muted: bool) -> list[str]:
        """Write the accumulated audio buffer into *segment_dir* as FLAC.

        Args:
            segment_dir: Directory of the segment being closed.
            is_muted: Mute state at segment start. When True, mic and system
                channels are written as separate mono files; otherwise one
                stereo file is written.

        Returns:
            Names of the files written (empty if there was no audio).
        """
        if self.accumulated_audio_buffer.size == 0:
            logger.warning("No audio buffer to save")
            return []

        if is_muted:
            # Split mode: save mic and sys as separate mono files
            mic_data = self.accumulated_audio_buffer[:, 0]
            sys_data = self.accumulated_audio_buffer[:, 1]

            mic_bytes = self.audio_recorder.create_mono_flac_bytes(mic_data)
            sys_bytes = self.audio_recorder.create_mono_flac_bytes(sys_data)

            (segment_dir / "mic_audio.flac").write_bytes(mic_bytes)
            (segment_dir / "sys_audio.flac").write_bytes(sys_bytes)

            logger.info("Saved split audio (muted): %s", segment_dir)
            return ["mic_audio.flac", "sys_audio.flac"]

        # Normal mode: save combined stereo file
        flac_bytes = self.audio_recorder.create_flac_bytes(
            self.accumulated_audio_buffer
        )
        (segment_dir / "audio.flac").write_bytes(flac_bytes)

        logger.info("Saved audio to %s/audio.flac", segment_dir)
        return ["audio.flac"]

    def _start_segment(self) -> Path:
        """Start a new segment in ``captures_dir/YYYYMMDD/<stream>/HHMMSS.incomplete/``.

        Resets the segment start times and writes recovery metadata into the
        new directory.

        Returns:
            The newly created ``.incomplete`` directory.
        """
        self.start_at = time.time()
        self.start_at_mono = time.monotonic()

        date_part, time_part = _get_timestamp_parts(self.start_at)
        captures_dir = self.config.captures_dir

        segment_dir = captures_dir / date_part / self.stream / f"{time_part}.incomplete"
        segment_dir.mkdir(parents=True, exist_ok=True)
        self.segment_dir = segment_dir

        # Write metadata for recovery
        write_segment_metadata(segment_dir, self.start_at)

        return segment_dir

    def _finalize_segment(self) -> str | None:
        """Rename the ``.incomplete`` directory to ``HHMMSS_DDD/``.

        ``DDD`` is the segment duration in whole seconds. A segment containing
        no files (after removing ``.metadata``) is deleted instead.

        Returns:
            The segment key (final directory name), or None if there was no
            segment, it was empty, or the rename failed.
        """
        segment_dir = self.segment_dir
        if not segment_dir or not segment_dir.exists():
            return None

        # Remove the recovery metadata before finalizing.
        meta_path = segment_dir / ".metadata"
        try:
            meta_path.unlink(missing_ok=True)
        except OSError as e:
            logger.warning("Failed to remove %s: %s", meta_path, e)

        has_files = any(entry.is_file() for entry in segment_dir.iterdir())
        if not has_files:
            # Empty segment, remove it
            try:
                os.rmdir(segment_dir)
            except OSError as e:
                logger.warning("Failed to remove empty segment %s: %s", segment_dir, e)
            return None

        _, time_part = _get_timestamp_parts(self.start_at)
        # Clamp so a backwards wall-clock step never produces a negative key.
        duration = max(0, int(time.time() - self.start_at))
        segment_key = f"{time_part}_{duration}"
        final_dir = segment_dir.parent / segment_key

        try:
            os.rename(segment_dir, final_dir)
        except OSError as e:
            logger.error("Failed to finalize segment: %s", e)
            return None
        logger.info("Segment finalized: %s", segment_key)
        return segment_key

    async def handle_boundary(self, new_mode: str):
        """Handle window boundary rollover.

        Closes the current segment, writes audio, finalizes to local cache,
        and triggers sync. No network calls in the capture loop. Then opens a
        new segment in *new_mode*.

        Raises:
            RuntimeError: If a new screencast is required but cannot start.
        """
        # Stop screencast first (closes file handles)
        if self.current_mode == MODE_SCREENCAST:
            logger.info("Stopping previous screencast")
            await self.screencaster.stop()
            self.current_streams = []

        # Save audio if we have enough threshold hits
        if self.threshold_hits >= MIN_HITS_FOR_SAVE:
            if self.segment_dir:
                audio_files = self._save_audio_segment(
                    self.segment_dir, self.segment_is_muted
                )
                if audio_files:
                    logger.info(
                        "Saved %d audio file(s) (%d hits)",
                        len(audio_files),
                        self.threshold_hits,
                    )
        else:
            logger.debug(
                "Skipping audio save (only %d/%d hits)",
                self.threshold_hits,
                MIN_HITS_FOR_SAVE,
            )

        # Reset audio state
        self.accumulated_audio_buffer = np.array([], dtype=np.float32).reshape(0, 2)
        self.threshold_hits = 0

        # Finalize segment (rename .incomplete -> HHMMSS_DDD/)
        segment_key = self._finalize_segment()
        self.segment_dir = None

        # Trigger sync to upload the completed segment
        if segment_key and self._sync:
            self._sync.trigger()

        # The new segment's save format follows the current mute state
        self.segment_is_muted = self.cached_is_muted

        old_mode = self.current_mode
        self.current_mode = new_mode

        # Start new capture based on mode
        if new_mode == MODE_SCREENCAST and not self.cached_screen_locked:
            await self.initialize_screencast()
        else:
            self._start_segment()

        logger.info("Mode transition: %s -> %s", old_mode, new_mode)

    async def initialize_screencast(self) -> bool:
        """Start a new screencast recording.

        Creates a segment directory and starts GStreamer recording to it.

        Returns:
            True on success.

        Raises:
            RuntimeError: If the screencaster fails to start or returns no
                streams.
        """
        segment_dir = self._start_segment()

        try:
            streams = await self.screencaster.start(
                str(segment_dir), framerate=1, draw_cursor=True
            )
        except RuntimeError as e:
            logger.error("Failed to start screencast: %s", e)
            raise

        if not streams:
            logger.error("No streams returned from screencast start")
            raise RuntimeError("No streams available")

        self.current_streams = streams

        logger.info("Started screencast with %d stream(s)", len(streams))
        for stream in streams:
            logger.info(
                "  %s (%s): %s", stream.position, stream.connector, stream.file_path
            )

        return True

    def emit_status(self):
        """Emit observe.status event with current state (fire-and-forget).

        No-op until the upload client exists.
        """
        if not self._client:
            return

        elapsed = int(time.monotonic() - self.start_at_mono)

        if self.current_mode == MODE_SCREENCAST and self.current_streams:
            screencast_info = {
                "recording": True,
                "streams": [
                    {
                        "position": stream.position,
                        "connector": stream.connector,
                        "file": stream.file_path,
                    }
                    for stream in self.current_streams
                ],
                "window_elapsed_seconds": elapsed,
            }
        else:
            screencast_info = {"recording": False}

        audio_info = {
            "threshold_hits": self.threshold_hits,
            "will_save": self.threshold_hits >= MIN_HITS_FOR_SAVE,
        }

        activity_info = {
            "active": self.cached_is_active,
            "idle_time_ms": self.cached_idle_time_ms,
            "screen_locked": self.cached_screen_locked,
            "sink_muted": self.cached_is_muted,
            "power_save": self.cached_power_save,
        }

        self._client.relay_event(
            "observe",
            "status",
            mode=self.current_mode,
            screencast=screencast_info,
            audio=audio_info,
            activity=activity_info,
            host=HOST,
            platform=PLATFORM,
            stream=self.stream,
        )

    async def _start_initial_capture(self) -> bool:
        """Choose the initial mode and open the first segment.

        Returns:
            False if the initial screencast could not be started, else True.
        """
        new_mode = await self.check_activity_status()
        self.segment_is_muted = self.cached_is_muted
        self.current_mode = new_mode

        if new_mode == MODE_SCREENCAST and not self.cached_screen_locked:
            try:
                await self.initialize_screencast()
            except RuntimeError:
                logger.error("Initial screencast failed, stopping observer")
                return False
        else:
            self._start_segment()

        logger.info("Initial mode: %s", self.current_mode)
        return True

    def _accumulate_audio_chunk(self) -> None:
        """Append the latest recorder chunk to the segment buffer and score it.

        A chunk whose RMS exceeds ``RMS_THRESHOLD`` increments
        ``threshold_hits``.
        """
        audio_chunk = self.audio_recorder.get_buffers()
        if audio_chunk.size == 0:
            logger.debug("No audio data in chunk")
            return

        self.accumulated_audio_buffer = np.vstack(
            (self.accumulated_audio_buffer, audio_chunk)
        )
        rms = self.compute_rms(audio_chunk)
        if rms > RMS_THRESHOLD:
            self.threshold_hits += 1
            logger.debug("RMS %.4f > threshold (hit %d)", rms, self.threshold_hits)
        else:
            logger.debug("RMS %.4f below threshold", rms)

    async def _stop_sync_task(self, sync_task: asyncio.Task | None) -> None:
        """Ask the sync service to stop, cancel its task, and wait for it."""
        if sync_task is None:
            return
        if self._sync:
            self._sync.stop()
        sync_task.cancel()
        try:
            await sync_task
        except asyncio.CancelledError:
            pass

    async def main_loop(self):
        """Run the main observer loop with background sync.

        Every ``CHUNK_DURATION`` seconds: refresh activity status, collect an
        audio chunk, and roll over to a new segment when the interval elapses
        or the screencast/mute state changes. Cleanup always runs on exit,
        including when the initial screencast cannot be started.
        """
        logger.info("Starting observer loop (interval=%ss)", self.interval)

        # Start sync service as background task
        sync_task = asyncio.create_task(self._sync.run()) if self._sync else None

        try:
            if not await self._start_initial_capture():
                # NOTE(review): this path returns normally (exit code 0 from
                # async_run), whereas a mid-loop restart failure raises and
                # exits 1 — confirm the difference is intended.
                self.running = False
                return

            while self.running:
                await asyncio.sleep(CHUNK_DURATION)

                new_mode = await self.check_activity_status()

                # Check for GStreamer failure mid-recording
                if (
                    self.current_mode == MODE_SCREENCAST
                    and not self.screencaster.is_healthy()
                ):
                    logger.warning("Screencast recording failed, stopping gracefully")
                    await self.screencaster.stop()
                    self.current_streams = []
                    self.current_mode = MODE_IDLE

                mode_changed = new_mode != self.current_mode
                if mode_changed:
                    logger.info("Mode changing: %s -> %s", self.current_mode, new_mode)

                # Only trigger segment boundary on screencast transitions
                screencast_transition = mode_changed and MODE_SCREENCAST in (
                    self.current_mode,
                    new_mode,
                )

                mute_transition = self.cached_is_muted != self.segment_is_muted
                if mute_transition:
                    logger.info(
                        "Mute state changed: %s -> %s",
                        "muted" if self.segment_is_muted else "unmuted",
                        "muted" if self.cached_is_muted else "unmuted",
                    )

                self._accumulate_audio_chunk()

                # Check for window boundary (monotonic to avoid DST/clock jumps)
                elapsed = time.monotonic() - self.start_at_mono
                if elapsed >= self.interval or screencast_transition or mute_transition:
                    logger.info(
                        "Boundary: elapsed=%.1fs screencast_change=%s "
                        "mute_change=%s hits=%d/%d",
                        elapsed,
                        screencast_transition,
                        mute_transition,
                        self.threshold_hits,
                        MIN_HITS_FOR_SAVE,
                    )
                    await self.handle_boundary(new_mode)

                self.emit_status()
        finally:
            logger.info("Observer loop stopped, cleaning up...")
            # Stop sync before shutdown() stops the upload client it uses.
            try:
                await self._stop_sync_task(sync_task)
            finally:
                await self.shutdown()

    async def shutdown(self):
        """Clean shutdown of observer.

        Stops the screencast, saves qualifying audio, finalizes the current
        segment locally (no sync is triggered here), then stops the audio
        recorder and upload client.
        """
        # Stop screencast first (closes file handles)
        if self.current_mode == MODE_SCREENCAST:
            logger.info("Stopping screencast for shutdown")
            await self.screencaster.stop()
            self.current_streams = []
            await asyncio.sleep(0.5)

        # Save final audio if threshold met
        if self.threshold_hits >= MIN_HITS_FOR_SAVE and self.segment_dir:
            audio_files = self._save_audio_segment(
                self.segment_dir, self.segment_is_muted
            )
            if audio_files:
                logger.info("Saved final audio: %d file(s)", len(audio_files))

        segment_key = self._finalize_segment()
        self.segment_dir = None

        if segment_key:
            logger.info("Finalized segment locally: %s (shutdown)", segment_key)

        self.audio_recorder.stop_recording()
        logger.info("Audio recording stopped")

        if self._client:
            self._client.stop()
            self._client = None
            logger.info("Client stopped")


async def async_run(config: Config) -> int:
    """Async entry point for the observer.

    Returns:
        Process exit code: 0 on normal shutdown, 75 (EX_TEMPFAIL) when the
        desktop session is not ready, 1 on setup or runtime failure.
    """
    from .session_env import check_session_ready

    # Pre-flight: check session prerequisites
    not_ready = check_session_ready()
    if not_ready:
        logger.warning("Session not ready: %s", not_ready)
        return 75  # EXIT_TEMPFAIL

    observer = Observer(config)

    loop = asyncio.get_running_loop()

    def signal_handler():
        logger.info("Received shutdown signal")
        observer.running = False

    for sig in (signal.SIGINT, signal.SIGTERM):
        loop.add_signal_handler(sig, signal_handler)

    try:
        # setup() can raise (e.g. DBus connection errors); report those with
        # the same exit code as other failures instead of escaping.
        if not await observer.setup():
            logger.error("Observer setup failed")
            return 1
        await observer.main_loop()
    except RuntimeError as e:
        logger.error("Observer runtime error: %s", e)
        return 1
    except Exception as e:
        logger.error("Observer error: %s", e, exc_info=True)
        return 1

    return 0