linux observer
at main 493 lines 17 kB view raw
# SPDX-License-Identifier: AGPL-3.0-only
# Copyright (c) 2026 sol pbc

"""
Portal-based multi-monitor screencast recording.

Uses xdg-desktop-portal ScreenCast API with PipeWire + GStreamer to record
each monitor as a separate file. This replaces the old GNOME Shell D-Bus approach.

Extracted from solstone's observe/linux/screencast.py.

Changes from monorepo version:
- Replaces `from think.utils import get_journal` with config-based restore token path
- Replaces `from observe.gnome.activity import get_monitor_geometries` with local activity module

Runtime deps:
  - xdg-desktop-portal with org.freedesktop.portal.ScreenCast
  - Portal backend: xdg-desktop-portal-gnome (or -kde, -wlr, etc.)
  - PipeWire running
  - GStreamer with PipeWire plugin: gst-launch-1.0 pipewiresrc
"""

import asyncio
import logging
import os
import signal
import subprocess
import uuid
from dataclasses import dataclass
from pathlib import Path

from dbus_next import Variant, introspection
from dbus_next.aio import MessageBus
from dbus_next.constants import BusType

# Workaround for dbus-next issue #122: portal has properties with hyphens
# (e.g., "power-saver-enabled") which violate strict D-Bus naming validation.
introspection.assert_member_name_valid = lambda name: None

logger = logging.getLogger(__name__)

# Portal D-Bus constants
PORTAL_BUS = "org.freedesktop.portal.Desktop"
PORTAL_PATH = "/org/freedesktop/portal/desktop"
SC_IFACE = "org.freedesktop.portal.ScreenCast"
REQ_IFACE = "org.freedesktop.portal.Request"
SESSION_IFACE = "org.freedesktop.portal.Session"


@dataclass
class StreamInfo:
    """Information about a single monitor's recording stream."""

    node_id: int
    position: str
    connector: str
    x: int
    y: int
    width: int
    height: int
    file_path: str  # Final path in segment directory

    @property
    def filename(self) -> str:
        """Return just the filename for event payloads."""
        return os.path.basename(self.file_path)


def _load_restore_token(token_path: Path) -> str | None:
    """
    Load the portal restore token from disk.

    Returns:
        The stripped token text, or None when the file is missing, unreadable,
        empty, or not valid UTF-8.
    """
    try:
        data = token_path.read_text(encoding="utf-8").strip()
    except (OSError, UnicodeDecodeError):
        # A missing or corrupt token file is not fatal: without a token the
        # caller simply omits "restore_token" from the SelectSources options.
        return None
    return data or None


def _save_restore_token(token: str, token_path: Path) -> None:
    """
    Save the portal restore token to disk (best effort).

    Parent directories are created as needed. I/O failures are logged as
    warnings and otherwise ignored.
    """
    try:
        token_path.parent.mkdir(parents=True, exist_ok=True)
        token_path.write_text(token.strip() + "\n", encoding="utf-8")
        logger.debug(f"Saved restore token to {token_path}")
    except OSError as e:
        logger.warning(f"Failed to save restore token: {e}")


def _make_request_handle(bus: MessageBus, token: str) -> str:
    """
    Compute the expected Request object path for a handle_token.

    The path is built from the bus unique name (leading ":" removed, "."
    replaced by "_") followed by the token.
    """
    sender = bus.unique_name.lstrip(":").replace(".", "_")
    return f"/org/freedesktop/portal/desktop/request/{sender}/{token}"


def _prepare_request_handler(bus: MessageBus, handle: str) -> asyncio.Future:
    """
    Set up a signal handler for Request::Response before calling a portal method.

    Must be called from within a running event loop. The returned future
    resolves to ``(response_code, results)`` when a ``Response`` signal for
    ``handle`` arrives; the handler then removes itself from the bus.
    """
    loop = asyncio.get_running_loop()
    fut: asyncio.Future = loop.create_future()

    def _message_handler(msg):
        if (
            msg.message_type.name == "SIGNAL"
            and msg.path == handle
            and msg.interface == REQ_IFACE
            and msg.member == "Response"
        ):
            response = msg.body[0]
            results = msg.body[1] if len(msg.body) > 1 else {}
            if not fut.done():
                fut.set_result((int(response), results))
            bus.remove_message_handler(_message_handler)

    bus.add_message_handler(_message_handler)
    return fut


def _variant_or_value(val):
    """Return ``val.value`` if ``val`` is a Variant, otherwise ``val`` unchanged."""
    if isinstance(val, Variant):
        return val.value
    return val


def _match_streams_to_monitors(streams: list[dict], monitors: list[dict]) -> list[dict]:
    """
    Match portal stream geometries to GDK monitor info.

    Portal streams have position (x, y) and size (width, height).
    GDK monitors have connector IDs and box coordinates.

    Each stream dict is expected to carry "idx" and "props"; each monitor dict
    is expected to carry "id", "box" (x1, y1, x2, y2) and optionally
    "position". The stream dicts are updated in place.

    Returns streams augmented with connector and position labels.
    """
    matched = []

    for stream in streams:
        props = stream.get("props", {})

        # Extract stream geometry from portal properties
        stream_pos = _variant_or_value(props.get("position", (0, 0)))
        stream_size = _variant_or_value(props.get("size", (0, 0)))

        if isinstance(stream_pos, (tuple, list)) and len(stream_pos) >= 2:
            sx, sy = int(stream_pos[0]), int(stream_pos[1])
        else:
            sx, sy = 0, 0

        if isinstance(stream_size, (tuple, list)) and len(stream_size) >= 2:
            sw, sh = int(stream_size[0]), int(stream_size[1])
        else:
            sw, sh = 0, 0

        # Find matching monitor by geometry
        best_match = None
        best_overlap = 0

        for monitor in monitors:
            mx1, my1, mx2, my2 = monitor["box"]
            mw, mh = mx2 - mx1, my2 - my1

            # Check if geometries match (within tolerance for scaling)
            if abs(sx - mx1) < 10 and abs(sy - my1) < 10:
                overlap = min(sw, mw) * min(sh, mh)
                if overlap > best_overlap:
                    best_overlap = overlap
                    best_match = monitor

        if best_match:
            box = best_match["box"]
            stream["connector"] = best_match["id"]
            stream["position_label"] = best_match.get("position", "unknown")
            stream["x"] = box[0]
            stream["y"] = box[1]
            stream["width"] = box[2] - box[0]
            stream["height"] = box[3] - box[1]
        else:
            # Fallback: use stream index as identifier
            stream["connector"] = f"monitor-{stream['idx']}"
            stream["position_label"] = "unknown"
            stream["x"] = sx
            stream["y"] = sy
            stream["width"] = sw
            stream["height"] = sh

        matched.append(stream)

    return matched


def _build_branch_args(
    pw_fd: int, node_id: int, framerate: int, file_path: str
) -> list[str]:
    """
    Build the gst-launch-1.0 arguments for one stream's recording branch.

    The arguments are returned as a list rather than a whitespace-split
    string so that ``location=<file_path>`` stays a single argv element even
    when the path contains spaces.
    """
    return [
        "pipewiresrc", f"fd={pw_fd}", f"path={node_id}", "!",
        "videorate", "!",
        f"video/x-raw,framerate={framerate}/1", "!",
        "videoconvert", "!",
        "vp8enc", "end-usage=cq", "cq-level=4", "max-quantizer=15",
        "keyframe-max-dist=30", "static-threshold=100", "!",
        "webmmux", "!",
        "filesink", f"location={file_path}",
    ]


class Screencaster:
    """Portal-based multi-monitor screencast manager."""

    def __init__(self, restore_token_path: Path):
        self.bus: MessageBus | None = None
        self.session_handle: str | None = None
        self.pw_fd: int | None = None
        self.gst_process: subprocess.Popen | None = None
        self.streams: list[StreamInfo] = []
        self._started = False
        self._restore_token_path = restore_token_path

    async def connect(self) -> bool:
        """
        Establish D-Bus connection and verify portal availability.

        Returns:
            True if portal is available, False otherwise.
        """
        if self.bus is not None:
            return True

        try:
            self.bus = await MessageBus(
                bus_type=BusType.SESSION,
                negotiate_unix_fd=True,
            ).connect()

            # Verify portal interface exists
            root_intro = await self.bus.introspect(PORTAL_BUS, PORTAL_PATH)
            root_obj = self.bus.get_proxy_object(PORTAL_BUS, PORTAL_PATH, root_intro)
            root_obj.get_interface(SC_IFACE)
            return True

        except Exception as e:
            logger.error(f"Portal not available: {e}")
            # The bus may have connected before the portal check failed;
            # disconnect it instead of just dropping the reference.
            if self.bus is not None:
                try:
                    self.bus.disconnect()
                except Exception as disconnect_err:
                    logger.debug(f"Error disconnecting bus: {disconnect_err}")
            self.bus = None
            return False

    async def _portal_request(
        self, method, *args, options: dict | None = None
    ) -> tuple[int, dict]:
        """
        Call a portal method and wait for its Request::Response signal.

        A fresh handle_token is generated and the response handler is
        registered before ``method`` is invoked, so the signal cannot be
        missed. ``method`` is awaited with ``*args`` followed by the options
        dict (handle_token plus ``options``).

        Returns:
            ``(response_code, results)`` from the Response signal.
        """
        token = "h_" + uuid.uuid4().hex
        handle = _make_request_handle(self.bus, token)
        response_fut = _prepare_request_handler(self.bus, handle)

        opts = {"handle_token": Variant("s", token)}
        if options:
            opts.update(options)

        await method(*args, opts)
        return await response_fut

    def _close_pw_fd(self) -> None:
        """Close the PipeWire remote fd if one is held; close errors are ignored."""
        if self.pw_fd is None:
            return
        try:
            os.close(self.pw_fd)
        except OSError:
            pass
        self.pw_fd = None

    async def start(
        self,
        output_dir: str,
        framerate: int = 1,
        draw_cursor: bool = True,
    ) -> list[StreamInfo]:
        """
        Start screencast recording for all monitors.

        Files are written directly to output_dir with final names (position_connector_screen.webm).
        The output_dir is typically a segment directory that will be renamed on completion.

        Args:
            output_dir: Directory for output files (e.g., YYYYMMDD/stream/HHMMSS.incomplete/)
            framerate: Frames per second (default: 1)
            draw_cursor: Whether to draw mouse cursor (default: True)

        Returns:
            List of StreamInfo for each monitor being recorded.

        Raises:
            RuntimeError: If recording fails to start.
        """
        if not await self.connect():
            raise RuntimeError("Portal not available")

        # Get monitor info from GDK for connector IDs
        from .activity import get_monitor_geometries

        try:
            monitors = get_monitor_geometries()
        except Exception as e:
            logger.warning(f"Failed to get monitor geometries: {e}")
            monitors = []

        # Get portal interface
        root_intro = await self.bus.introspect(PORTAL_BUS, PORTAL_PATH)
        root_obj = self.bus.get_proxy_object(PORTAL_BUS, PORTAL_PATH, root_intro)
        screencast = root_obj.get_interface(SC_IFACE)

        # 1) CreateSession
        resp, results = await self._portal_request(
            screencast.call_create_session,
            options={"session_handle_token": Variant("s", "s_" + uuid.uuid4().hex)},
        )
        if resp != 0:
            raise RuntimeError(f"CreateSession failed with code {resp}")

        # Validate before str(): str(None) would be the truthy string "None"
        # and a missing handle would go unnoticed.
        session_handle = _variant_or_value(results.get("session_handle"))
        if not session_handle:
            raise RuntimeError("CreateSession returned no session_handle")
        self.session_handle = str(session_handle)

        logger.debug(f"Portal session: {self.session_handle}")

        # 2) SelectSources
        restore_token = _load_restore_token(self._restore_token_path)
        if restore_token:
            logger.debug("Using saved restore token")

        cursor_mode = 1 if draw_cursor else 0

        select_opts = {
            "types": Variant("u", 1),  # 1 = MONITOR
            "multiple": Variant("b", True),
            "cursor_mode": Variant("u", cursor_mode),
            "persist_mode": Variant("u", 2),  # Persist until revoked
        }
        if restore_token:
            select_opts["restore_token"] = Variant("s", restore_token)

        resp, _ = await self._portal_request(
            screencast.call_select_sources,
            self.session_handle,
            options=select_opts,
        )
        if resp != 0:
            await self._close_session()
            raise RuntimeError(f"SelectSources failed with code {resp}")

        # 3) Start
        resp, results = await self._portal_request(
            screencast.call_start, self.session_handle, ""
        )
        if resp != 0:
            await self._close_session()
            raise RuntimeError(f"Start failed with code {resp}")

        portal_streams = _variant_or_value(results.get("streams")) or []
        if not portal_streams:
            await self._close_session()
            raise RuntimeError("Start returned no streams")

        # Save new restore token if provided
        new_token = _variant_or_value(results.get("restore_token"))
        if isinstance(new_token, str) and new_token.strip():
            _save_restore_token(new_token, self._restore_token_path)

        # Parse streams
        stream_info = []
        for idx, stream in enumerate(portal_streams):
            try:
                node_id = int(stream[0])
                props = stream[1] if len(stream) > 1 else {}
                stream_info.append({"idx": idx, "node_id": node_id, "props": props})
            except Exception as e:
                logger.warning(f"Could not parse stream {idx}: {e}")

        if not stream_info:
            await self._close_session()
            raise RuntimeError("No valid streams found")

        # Match streams to monitors
        stream_info = _match_streams_to_monitors(stream_info, monitors)

        logger.info(f"Portal returned {len(stream_info)} stream(s)")

        # 4) OpenPipeWireRemote
        fd_obj = await screencast.call_open_pipe_wire_remote(self.session_handle, {})
        if hasattr(fd_obj, "take"):
            self.pw_fd = fd_obj.take()
        else:
            self.pw_fd = int(fd_obj)

        # 5) Build GStreamer pipeline
        self.streams = []
        cmd = ["gst-launch-1.0", "-e"]

        for info in stream_info:
            node_id = info["node_id"]
            position = info["position_label"]
            connector = info["connector"]

            # Final file path: position_connector_screen.webm
            file_path = os.path.join(output_dir, f"{position}_{connector}_screen.webm")

            self.streams.append(
                StreamInfo(
                    node_id=node_id,
                    position=position,
                    connector=connector,
                    x=info["x"],
                    y=info["y"],
                    width=info["width"],
                    height=info["height"],
                    file_path=file_path,
                )
            )

            # GStreamer branch for this stream
            cmd.extend(_build_branch_args(self.pw_fd, node_id, framerate, file_path))

            logger.info(f"  Stream {node_id}: {position} ({connector}) -> {file_path}")

        # NOTE(review): stderr is a pipe that is only read on immediate
        # failure; a very chatty GStreamer could fill it and block. Confirm
        # whether it should be drained or redirected.
        try:
            self.gst_process = subprocess.Popen(
                cmd,
                pass_fds=(self.pw_fd,),
                stdout=subprocess.DEVNULL,
                stderr=subprocess.PIPE,
            )
        except FileNotFoundError as e:
            self._close_pw_fd()
            await self._close_session()
            raise RuntimeError("gst-launch-1.0 not found") from e
        except Exception as e:
            self._close_pw_fd()
            await self._close_session()
            raise RuntimeError(f"Failed to start GStreamer: {e}") from e

        # Brief delay to check for immediate failure
        await asyncio.sleep(0.2)
        if self.gst_process.poll() is not None:
            stderr = ""
            if self.gst_process.stderr:
                stderr = self.gst_process.stderr.read().decode()
                self.gst_process.stderr.close()
            # Nothing is recording: release the fd and session opened above.
            self.gst_process = None
            self._close_pw_fd()
            await self._close_session()
            raise RuntimeError(f"GStreamer exited immediately: {stderr[:200]}")

        self._started = True
        return self.streams

    async def stop(self) -> list[StreamInfo]:
        """
        Stop screencast recording gracefully.

        Sends SIGINT to GStreamer, waits up to 5 seconds, then kills it if it
        has not exited. Afterwards the PipeWire fd and the portal session are
        closed and the recording state is reset.

        Returns:
            List of StreamInfo with file_path for the recorded files.
        """
        streams = self.streams.copy()

        proc = self.gst_process
        if proc is not None:
            # Stop GStreamer with SIGINT for clean EOS
            if proc.poll() is None:
                try:
                    proc.send_signal(signal.SIGINT)
                    try:
                        await asyncio.wait_for(
                            asyncio.to_thread(proc.wait),
                            timeout=5.0,
                        )
                    except asyncio.TimeoutError:
                        logger.warning("GStreamer did not exit cleanly, killing")
                        proc.kill()
                        proc.wait()
                except Exception as e:
                    logger.warning(f"Error stopping GStreamer: {e}")

            # Release our end of the child's stderr pipe.
            if proc.stderr is not None:
                try:
                    proc.stderr.close()
                except OSError:
                    pass

        self.gst_process = None

        # Close PipeWire fd
        self._close_pw_fd()

        # Close portal session
        await self._close_session()

        self.streams = []
        self._started = False

        return streams

    async def _close_session(self):
        """
        Close the portal session (best effort).

        Failures are logged at debug level; the stored session handle is
        cleared either way.
        """
        if self.session_handle and self.bus:
            try:
                session_intro = await self.bus.introspect(
                    PORTAL_BUS, self.session_handle
                )
                session_obj = self.bus.get_proxy_object(
                    PORTAL_BUS, self.session_handle, session_intro
                )
                session_iface = session_obj.get_interface(SESSION_IFACE)
                await session_iface.call_close()
            except Exception as e:
                logger.debug(f"Failed to close portal session: {e}")
        self.session_handle = None

    def is_healthy(self) -> bool:
        """Check if recording is still running."""
        if not self._started:
            return False
        if self.gst_process is None:
            return False
        return self.gst_process.poll() is None