# linux observer
# SPDX-License-Identifier: AGPL-3.0-only
# Copyright (c) 2026 sol pbc

"""
Portal-based multi-monitor screencast recording.

Uses the xdg-desktop-portal ScreenCast API with PipeWire + GStreamer to record
each monitor as a separate file. This replaces the old GNOME Shell D-Bus approach.

Extracted from solstone's observe/linux/screencast.py.

Changes from monorepo version:
- Replaces `from think.utils import get_journal` with a config-based restore token path
- Replaces `from observe.gnome.activity import get_monitor_geometries` with the local activity module

Runtime deps:
  - xdg-desktop-portal with org.freedesktop.portal.ScreenCast
  - Portal backend: xdg-desktop-portal-gnome (or -kde, -wlr, etc.)
  - PipeWire running
  - GStreamer with PipeWire plugin: gst-launch-1.0 pipewiresrc
"""
22
import asyncio
import logging
import os
import signal
import subprocess
import threading
import uuid
from collections import deque
from dataclasses import dataclass
from pathlib import Path

from dbus_next import Variant, introspection
from dbus_next.aio import MessageBus
from dbus_next.constants import BusType
35
# Workaround for dbus-next issue #122: portal has properties with hyphens
# (e.g., "power-saver-enabled") which violate strict D-Bus naming validation.
# This replaces the validator attribute on the dbus_next.introspection module
# with a no-op, so it affects every user of that module in this process.
introspection.assert_member_name_valid = lambda name: None

logger = logging.getLogger(__name__)

# Portal D-Bus constants
PORTAL_BUS = "org.freedesktop.portal.Desktop"  # bus name the portal is reached at
PORTAL_PATH = "/org/freedesktop/portal/desktop"  # object exposing the portal interfaces
SC_IFACE = "org.freedesktop.portal.ScreenCast"
REQ_IFACE = "org.freedesktop.portal.Request"  # carries the async "Response" signal
SESSION_IFACE = "org.freedesktop.portal.Session"  # used to Close() the session
48
49
@dataclass
class StreamInfo:
    """Describe one monitor being recorded and the file it is written to."""

    # PipeWire node id of the portal stream (given to pipewiresrc as path=)
    node_id: int
    # Monitor position label ("unknown" when no monitor could be matched)
    position: str
    # Monitor connector id, or a "monitor-<idx>" fallback
    connector: str
    # Monitor geometry: top-left corner followed by size
    x: int
    y: int
    width: int
    height: int
    # Final path in segment directory
    file_path: str

    @property
    def filename(self) -> str:
        """Base name of ``file_path``, as used in event payloads."""
        _directory, name = os.path.split(self.file_path)
        return name
67
68
69def _load_restore_token(token_path: Path) -> str | None:
70 """Load restore token from disk."""
71 try:
72 data = token_path.read_text(encoding="utf-8").strip()
73 return data or None
74 except (FileNotFoundError, OSError):
75 return None
76
77
def _save_restore_token(token: str, token_path: Path) -> None:
    """Persist *token* to *token_path*, creating parent directories if needed.

    The token is stripped and written with a single trailing newline. Any
    OSError is logged as a warning instead of being raised, so a failed save
    never interrupts the caller.
    """
    try:
        directory = token_path.parent
        directory.mkdir(parents=True, exist_ok=True)
        token_path.write_text(f"{token.strip()}\n", encoding="utf-8")
        logger.debug(f"Saved restore token to {token_path}")
    except OSError as err:
        logger.warning(f"Failed to save restore token: {err}")
86
87
def _make_request_handle(bus: MessageBus, token: str) -> str:
    """Return the Request object path the portal will use for a handle_token.

    The path embeds the caller's unique bus name with its leading ':'
    characters removed and every '.' turned into '_', followed by *token*.
    """
    sender_id = "_".join(bus.unique_name.lstrip(":").split("."))
    return f"/org/freedesktop/portal/desktop/request/{sender_id}/{token}"
92
93
def _prepare_request_handler(bus: MessageBus, handle: str) -> asyncio.Future:
    """Listen for Request::Response on *handle* before the portal method is called.

    The handler is installed first so that a Response emitted right after the
    method call returns cannot be missed.

    Args:
        bus: Connected session bus.
        handle: Expected Request object path (see ``_make_request_handle``).

    Returns:
        A future that resolves to ``(response_code, results)``. ``results`` is
        the second signal argument, or ``{}`` if the signal carried only one.

    The bus message handler is removed as soon as the future is done for any
    reason (result, cancellation or exception). An abandoned request therefore
    does not leave a listener on the bus.
    """
    loop = asyncio.get_running_loop()
    fut: asyncio.Future = loop.create_future()

    def _message_handler(msg):
        if fut.done():
            return
        if (
            msg.message_type.name == "SIGNAL"
            and msg.path == handle
            and msg.interface == REQ_IFACE
            and msg.member == "Response"
        ):
            response = msg.body[0]
            results = msg.body[1] if len(msg.body) > 1 else {}
            fut.set_result((int(response), results))

    def _remove_handler(_done: asyncio.Future) -> None:
        bus.remove_message_handler(_message_handler)

    bus.add_message_handler(_message_handler)
    # asyncio runs done-callbacks via loop.call_soon. The handler is thus
    # removed after the current message dispatch instead of from inside it,
    # and is also removed when the future is cancelled.
    fut.add_done_callback(_remove_handler)
    return fut
114
115
def _variant_or_value(val):
    """Unwrap a dbus-next ``Variant`` to its payload; pass anything else through."""
    return val.value if isinstance(val, Variant) else val
121
122
def _stream_int_pair(value) -> tuple[int, int]:
    """Return a portal (a, b) property as two ints, or (0, 0) if missing/malformed."""
    value = _variant_or_value(value)
    if isinstance(value, (tuple, list)) and len(value) >= 2:
        return int(value[0]), int(value[1])
    return 0, 0


def _match_streams_to_monitors(streams: list[dict], monitors: list[dict]) -> list[dict]:
    """
    Match portal stream geometries to GDK monitor info.

    Portal streams have position (x, y) and size (width, height).
    GDK monitors have connector IDs and box coordinates.

    Each monitor is assigned to at most one stream. Otherwise, several streams
    near the same origin could all get that monitor's connector and label.
    This happens, for example, when the portal omits ``position`` and it
    defaults to (0, 0). The caller builds output file names from connector and
    label, so several recordings would then target the same file. Streams left
    without a monitor fall back to a unique ``monitor-<idx>`` connector.

    Args:
        streams: Dicts with an ``idx`` key and optionally ``props`` (the
            portal's per-stream properties). They are augmented in place.
        monitors: Dicts with ``id`` (connector), ``box`` as (x1, y1, x2, y2)
            and optionally ``position``.

    Returns:
        The stream dicts in input order. Each gains ``connector``,
        ``position_label``, ``x``, ``y``, ``width`` and ``height``.
    """
    matched = []
    claimed: set[int] = set()  # indices into `monitors` already assigned

    for stream in streams:
        props = stream.get("props", {})

        # Extract stream geometry from portal properties
        sx, sy = _stream_int_pair(props.get("position", (0, 0)))
        sw, sh = _stream_int_pair(props.get("size", (0, 0)))

        # Find the best unclaimed monitor by geometry
        best_idx = None
        best_overlap = 0

        for m_idx, monitor in enumerate(monitors):
            if m_idx in claimed:
                continue
            mx1, my1, mx2, my2 = monitor["box"]
            mw, mh = mx2 - mx1, my2 - my1

            # Check if geometries match (within tolerance for scaling)
            if abs(sx - mx1) < 10 and abs(sy - my1) < 10:
                overlap = min(sw, mw) * min(sh, mh)
                if overlap > best_overlap:
                    best_overlap = overlap
                    best_idx = m_idx

        if best_idx is not None:
            claimed.add(best_idx)
            best = monitors[best_idx]
            x1, y1, x2, y2 = best["box"]
            stream["connector"] = best["id"]
            stream["position_label"] = best.get("position", "unknown")
            stream["x"] = x1
            stream["y"] = y1
            stream["width"] = x2 - x1
            stream["height"] = y2 - y1
        else:
            # Fallback: use stream index as identifier
            stream["connector"] = f"monitor-{stream['idx']}"
            stream["position_label"] = "unknown"
            stream["x"] = sx
            stream["y"] = sy
            stream["width"] = sw
            stream["height"] = sh

        matched.append(stream)

    return matched
185
186
class Screencaster:
    """Portal-based multi-monitor screencast manager.

    ``start()`` negotiates a ScreenCast session with xdg-desktop-portal and
    launches one ``gst-launch-1.0`` process that records every selected
    monitor to its own WebM file. ``stop()`` ends the recording and releases
    the GStreamer process, the PipeWire fd and the portal session.
    ``is_healthy()`` reports whether that GStreamer process is still running.
    """

    # ScreenCast ``cursor_mode`` values (bits of the AvailableCursorModes
    # property): 1 = hidden, 2 = embedded in the video frames.
    _CURSOR_HIDDEN = 1
    _CURSOR_EMBEDDED = 2

    # Number of trailing GStreamer stderr lines kept for error reporting.
    _STDERR_TAIL_LINES = 50

    def __init__(self, restore_token_path: Path):
        """
        Args:
            restore_token_path: File holding the portal restore token. It is
                read before SelectSources and rewritten when Start returns a
                new token.
        """
        self.bus: MessageBus | None = None
        self.session_handle: str | None = None
        self.pw_fd: int | None = None
        self.gst_process: subprocess.Popen | None = None
        self.streams: list[StreamInfo] = []
        self._started = False
        self._restore_token_path = restore_token_path
        # Reader thread and bounded tail for gst-launch's stderr pipe.
        self._stderr_thread: threading.Thread | None = None
        self._stderr_tail: deque[str] = deque(maxlen=self._STDERR_TAIL_LINES)

    async def connect(self) -> bool:
        """
        Establish D-Bus connection and verify portal availability.

        An existing connection is reused. If the portal check fails, the newly
        opened connection is disconnected rather than leaked.

        Returns:
            True if portal is available, False otherwise.
        """
        if self.bus is not None:
            return True

        bus: MessageBus | None = None
        try:
            bus = await MessageBus(
                bus_type=BusType.SESSION,
                negotiate_unix_fd=True,  # OpenPipeWireRemote returns an fd
            ).connect()

            # Verify portal interface exists
            root_intro = await bus.introspect(PORTAL_BUS, PORTAL_PATH)
            root_obj = bus.get_proxy_object(PORTAL_BUS, PORTAL_PATH, root_intro)
            root_obj.get_interface(SC_IFACE)
        except Exception as e:
            logger.error(f"Portal not available: {e}")
            if bus is not None:
                try:
                    bus.disconnect()
                except Exception:
                    pass  # best effort; we are already reporting failure
            self.bus = None
            return False

        self.bus = bus
        return True

    async def start(
        self,
        output_dir: str,
        framerate: int = 1,
        draw_cursor: bool = True,
    ) -> list[StreamInfo]:
        """
        Start screencast recording for all monitors.

        Files are written directly to output_dir with final names (position_connector_screen.webm).
        The output_dir is typically a segment directory that will be renamed on completion.

        Args:
            output_dir: Directory for output files (e.g., YYYYMMDD/stream/HHMMSS.incomplete/)
            framerate: Frames per second (default: 1)
            draw_cursor: Whether to draw mouse cursor (default: True). Maps to
                the portal's embedded (2) or hidden (1) cursor mode.

        Returns:
            List of StreamInfo for each monitor being recorded.

        Raises:
            RuntimeError: If recording fails to start. Errors raised by the
                D-Bus calls themselves propagate unchanged. On any failure,
                the GStreamer process, PipeWire fd and portal session acquired
                so far are released.
        """
        if not await self.connect():
            raise RuntimeError("Portal not available")

        # Get monitor info from GDK for connector IDs
        from .activity import get_monitor_geometries

        try:
            monitors = get_monitor_geometries()
        except Exception as e:
            logger.warning(f"Failed to get monitor geometries: {e}")
            monitors = []

        try:
            return await self._start_recording(
                output_dir, framerate, draw_cursor, monitors
            )
        except BaseException:
            # Includes cancellation: never leave a session, fd or process behind.
            await self._abort_start()
            raise

    async def _start_recording(
        self,
        output_dir: str,
        framerate: int,
        draw_cursor: bool,
        monitors: list[dict],
    ) -> list[StreamInfo]:
        """Run the portal handshake and launch GStreamer (caller cleans up on error)."""
        # Get portal interface
        root_intro = await self.bus.introspect(PORTAL_BUS, PORTAL_PATH)
        root_obj = self.bus.get_proxy_object(PORTAL_BUS, PORTAL_PATH, root_intro)
        screencast = root_obj.get_interface(SC_IFACE)

        stream_info = await self._negotiate_streams(screencast, draw_cursor)

        # Match streams to monitors
        stream_info = _match_streams_to_monitors(stream_info, monitors)
        logger.info(f"Portal returned {len(stream_info)} stream(s)")

        # 4) OpenPipeWireRemote
        fd_obj = await screencast.call_open_pipe_wire_remote(self.session_handle, {})
        self.pw_fd = fd_obj.take() if hasattr(fd_obj, "take") else int(fd_obj)

        # 5) Build and launch the GStreamer pipeline
        self.streams = self._build_streams(stream_info, output_dir)
        self._launch_gstreamer(framerate)

        # Brief delay to check for immediate failure
        await asyncio.sleep(0.2)
        if self.gst_process.poll() is not None:
            # The process is gone: let the reader reach EOF, then report.
            await self._finish_stderr_drain()
            stderr = "\n".join(list(self._stderr_tail))
            raise RuntimeError(f"GStreamer exited immediately: {stderr[:200]}")

        self._started = True
        return self.streams

    async def _portal_request(self, method, *args, options: dict | None = None):
        """Call a Request-returning portal method and await its Response.

        A fresh ``handle_token`` is generated and placed first in the options.
        *options* is merged after it. The Response listener is installed
        before the call.

        Returns:
            ``(response_code, results)`` from the Request::Response signal.
        """
        token = "h_" + uuid.uuid4().hex
        handle = _make_request_handle(self.bus, token)
        response_fut = _prepare_request_handler(self.bus, handle)

        opts = {"handle_token": Variant("s", token)}
        if options:
            opts.update(options)

        try:
            await method(*args, opts)
        except BaseException:
            # The call itself failed, so no Response will arrive.
            response_fut.cancel()
            raise
        return await response_fut

    async def _cursor_mode_option(self, screencast, draw_cursor: bool) -> dict:
        """Return the SelectSources ``cursor_mode`` option for *draw_cursor*.

        Uses embedded (2) to draw the cursor and hidden (1) otherwise. The
        option is omitted, leaving the portal default, when AvailableCursorModes
        does not include the requested mode. If the property cannot be read,
        the mode is sent anyway.
        """
        wanted = self._CURSOR_EMBEDDED if draw_cursor else self._CURSOR_HIDDEN
        try:
            available = int(await screencast.get_available_cursor_modes())
        except Exception as e:
            logger.debug(f"Could not read AvailableCursorModes: {e}")
            available = wanted
        if not (available & wanted):
            logger.warning(
                f"Portal does not support cursor mode {wanted} "
                f"(available: {available}); using portal default"
            )
            return {}
        return {"cursor_mode": Variant("u", wanted)}

    async def _negotiate_streams(self, screencast, draw_cursor: bool) -> list[dict]:
        """Run CreateSession -> SelectSources -> Start and parse the streams.

        Sets ``self.session_handle`` and saves a new restore token when the
        portal returns one.

        Returns:
            ``{"idx", "node_id", "props"}`` dicts, one per parseable stream.
        """
        # 1) CreateSession
        resp, results = await self._portal_request(
            screencast.call_create_session,
            options={"session_handle_token": Variant("s", "s_" + uuid.uuid4().hex)},
        )
        if resp != 0:
            raise RuntimeError(f"CreateSession failed with code {resp}")

        # Check before str(): str(None) would be the truthy string "None".
        session_handle = _variant_or_value(results.get("session_handle"))
        if not session_handle:
            raise RuntimeError("CreateSession returned no session_handle")
        self.session_handle = str(session_handle)
        logger.debug(f"Portal session: {self.session_handle}")

        # 2) SelectSources
        restore_token = _load_restore_token(self._restore_token_path)
        if restore_token:
            logger.debug("Using saved restore token")

        select_opts = {
            "types": Variant("u", 1),  # 1 = MONITOR
            "multiple": Variant("b", True),
        }
        select_opts.update(await self._cursor_mode_option(screencast, draw_cursor))
        select_opts["persist_mode"] = Variant("u", 2)  # Persist until revoked
        if restore_token:
            select_opts["restore_token"] = Variant("s", restore_token)

        resp, _ = await self._portal_request(
            screencast.call_select_sources, self.session_handle, options=select_opts
        )
        if resp != 0:
            raise RuntimeError(f"SelectSources failed with code {resp}")

        # 3) Start ("" = no parent window)
        resp, results = await self._portal_request(
            screencast.call_start, self.session_handle, ""
        )
        if resp != 0:
            raise RuntimeError(f"Start failed with code {resp}")

        portal_streams = _variant_or_value(results.get("streams")) or []
        if not portal_streams:
            raise RuntimeError("Start returned no streams")

        # Save new restore token if provided
        new_token = _variant_or_value(results.get("restore_token"))
        if isinstance(new_token, str) and new_token.strip():
            _save_restore_token(new_token, self._restore_token_path)

        # Parse streams: each entry is (node_id, props)
        stream_info = []
        for idx, stream in enumerate(portal_streams):
            try:
                node_id = int(stream[0])
                props = stream[1] if len(stream) > 1 else {}
            except Exception as e:
                logger.warning(f"Could not parse stream {idx}: {e}")
                continue
            stream_info.append({"idx": idx, "node_id": node_id, "props": props})

        if not stream_info:
            raise RuntimeError("No valid streams found")
        return stream_info

    def _build_streams(self, stream_info: list[dict], output_dir: str) -> list[StreamInfo]:
        """Create a StreamInfo (with its final output path) for each matched stream."""
        streams = []
        for info in stream_info:
            node_id = info["node_id"]
            position = info["position_label"]
            connector = info["connector"]

            # Final file path: position_connector_screen.webm
            file_path = os.path.join(output_dir, f"{position}_{connector}_screen.webm")

            streams.append(
                StreamInfo(
                    node_id=node_id,
                    position=position,
                    connector=connector,
                    x=info["x"],
                    y=info["y"],
                    width=info["width"],
                    height=info["height"],
                    file_path=file_path,
                )
            )
            logger.info(f" Stream {node_id}: {position} ({connector}) -> {file_path}")
        return streams

    def _build_gst_command(self, framerate: int) -> list[str]:
        """Build the gst-launch argv: one independent branch per stream.

        Each pipeline token is its own argv element. gst-launch-1.0 escapes
        whitespace inside each argument, so ``location=`` paths containing
        spaces are kept intact. The previous string ``.split()`` cut them apart.
        """
        cmd = ["gst-launch-1.0", "-e"]  # -e: send EOS on SIGINT
        for stream in self.streams:
            cmd += [
                "pipewiresrc", f"fd={self.pw_fd}", f"path={stream.node_id}", "!",
                "videorate", "!", f"video/x-raw,framerate={framerate}/1", "!",
                "videoconvert", "!",
                "vp8enc", "end-usage=cq", "cq-level=4", "max-quantizer=15",
                "keyframe-max-dist=30", "static-threshold=100", "!",
                "webmmux", "!",
                "filesink", f"location={stream.file_path}",
            ]
        return cmd

    def _launch_gstreamer(self, framerate: int) -> None:
        """Spawn gst-launch with the PipeWire fd inherited and stderr drained."""
        cmd = self._build_gst_command(framerate)
        try:
            self.gst_process = subprocess.Popen(
                cmd,
                pass_fds=(self.pw_fd,),
                stdout=subprocess.DEVNULL,
                stderr=subprocess.PIPE,
            )
        except FileNotFoundError as e:
            raise RuntimeError("gst-launch-1.0 not found") from e
        except Exception as e:
            raise RuntimeError(f"Failed to start GStreamer: {e}") from e
        self._start_stderr_drain(self.gst_process)

    def _start_stderr_drain(self, proc: subprocess.Popen) -> None:
        """Read GStreamer's stderr continuously in a daemon thread.

        Without a reader, a long recording that keeps writing to stderr could
        fill the pipe buffer and block gst-launch. Lines are logged at DEBUG,
        and the last ``_STDERR_TAIL_LINES`` are kept for error messages. The
        thread closes the pipe when it reaches EOF.
        """
        tail = self._stderr_tail = deque(maxlen=self._STDERR_TAIL_LINES)
        stream = proc.stderr
        if stream is None:
            self._stderr_thread = None
            return

        def _pump() -> None:
            try:
                for raw in iter(stream.readline, b""):
                    line = raw.decode("utf-8", errors="replace").rstrip()
                    if line:
                        tail.append(line)
                        logger.debug(f"gstreamer: {line}")
            except (OSError, ValueError):
                pass  # pipe went away; nothing more to read
            finally:
                stream.close()

        thread = threading.Thread(target=_pump, name="gst-stderr", daemon=True)
        thread.start()
        self._stderr_thread = thread

    async def _finish_stderr_drain(self, timeout: float = 1.0) -> None:
        """Wait briefly (off the event loop) for the stderr reader to finish."""
        thread, self._stderr_thread = self._stderr_thread, None
        if thread is not None:
            await asyncio.to_thread(thread.join, timeout)

    async def _stop_gstreamer(self, *, graceful: bool) -> None:
        """Stop gst-launch: SIGINT then kill after 5 s if *graceful*, else kill now."""
        proc = self.gst_process
        if proc is None:
            return

        if proc.poll() is None:
            try:
                if graceful:
                    # Stop GStreamer with SIGINT for clean EOS
                    proc.send_signal(signal.SIGINT)
                    try:
                        await asyncio.wait_for(
                            asyncio.to_thread(proc.wait),
                            timeout=5.0,
                        )
                    except asyncio.TimeoutError:
                        logger.warning("GStreamer did not exit cleanly, killing")
                        proc.kill()
                        proc.wait()
                else:
                    proc.kill()
                    proc.wait()
            except Exception as e:
                logger.warning(f"Error stopping GStreamer: {e}")

        await self._finish_stderr_drain()
        self.gst_process = None

    def _close_pw_fd(self) -> None:
        """Close the PipeWire remote fd, if one is open."""
        if self.pw_fd is not None:
            try:
                os.close(self.pw_fd)
            except OSError:
                pass
            self.pw_fd = None

    async def _abort_start(self) -> None:
        """Release whatever a failed start() acquired: process, fd and session."""
        await self._stop_gstreamer(graceful=False)
        self._close_pw_fd()
        await self._close_session()
        self.streams = []
        self._started = False

    async def stop(self) -> list[StreamInfo]:
        """
        Stop screencast recording gracefully.

        Returns:
            List of StreamInfo with file_path for the recorded files.
        """
        streams = self.streams.copy()

        await self._stop_gstreamer(graceful=True)

        # Close PipeWire fd
        self._close_pw_fd()

        # Close portal session
        await self._close_session()

        self.streams = []
        self._started = False

        return streams

    async def _close_session(self):
        """Close the portal session (best effort) and forget its handle."""
        if self.session_handle and self.bus:
            try:
                session_intro = await self.bus.introspect(
                    PORTAL_BUS, self.session_handle
                )
                session_obj = self.bus.get_proxy_object(
                    PORTAL_BUS, self.session_handle, session_intro
                )
                session_iface = session_obj.get_interface(SESSION_IFACE)
                await session_iface.call_close()
            except Exception as e:
                # Best effort: the session may already be gone.
                logger.debug(
                    f"Failed to close portal session {self.session_handle}: {e}"
                )
        self.session_handle = None

    def is_healthy(self) -> bool:
        """Check if recording is still running."""
        if not self._started:
            return False
        if self.gst_process is None:
            return False
        return self.gst_process.poll() is None