linux observer
1# SPDX-License-Identifier: AGPL-3.0-only
2# Copyright (c) 2026 sol pbc
3
4"""Crash recovery for orphaned .incomplete segment directories.
5
6Modeled on solstone-macos's IncompleteSegmentRecovery.swift.
7Runs on startup before the capture loop begins.
8
9Improvement over tmux baseline: reads .metadata JSON file for accurate
10start timestamp instead of relying on brittle filesystem timestamps.
11"""
12
13from __future__ import annotations
14
15import json
16import logging
17import os
18import time
19from pathlib import Path
20
21logger = logging.getLogger(__name__)
22
23# Segments newer than this are assumed to be actively recording
24MINIMUM_AGE_SECONDS = 120 # 2 minutes
25
26METADATA_FILENAME = ".metadata"
27
28
def write_segment_metadata(segment_dir: Path, start_timestamp: float) -> None:
    """Persist a segment's start time to the metadata file in its directory.

    Meant to be called when a new .incomplete segment is created, so that
    recovery can read the real start timestamp instead of guessing it from
    filesystem timestamps.

    The write is best-effort: an OSError is logged as a warning and
    otherwise ignored.
    """
    payload = {"start_timestamp": start_timestamp}
    target = segment_dir / METADATA_FILENAME
    try:
        with target.open("w", encoding="utf-8") as handle:
            json.dump(payload, handle)
            # Trailing newline keeps the one-line JSON file well-formed as text
            handle.write("\n")
    except OSError as e:
        logger.warning(f"Failed to write segment metadata: {e}")
43
44
def _read_segment_metadata(segment_dir: Path) -> dict | None:
    """Read the metadata file from a segment directory.

    Returns the parsed JSON object, or None when the file is missing,
    unreadable, not valid UTF-8 / JSON, or holds anything other than a
    JSON object. A damaged file never raises, so the caller can fall back
    to filesystem timestamps.
    """
    meta_path = segment_dir / METADATA_FILENAME
    try:
        with open(meta_path, encoding="utf-8") as f:
            data = json.load(f)
    except (OSError, ValueError):
        # OSError: missing or unreadable file (no separate exists() check,
        # which would race with the file disappearing).
        # ValueError: covers json.JSONDecodeError and UnicodeDecodeError,
        # e.g. a file torn by the crash that left the segment incomplete.
        return None

    # The caller tests `"start_timestamp" in metadata`, which raises
    # TypeError on a bare number, so only JSON objects are accepted.
    return data if isinstance(data, dict) else None
55
56
def _child_dirs(parent: Path) -> list[Path]:
    """Return the sub-directories of *parent*, sorted; [] if it cannot be read."""
    try:
        return sorted(p for p in parent.iterdir() if p.is_dir())
    except OSError as e:
        # One unreadable or vanished directory must not abort the whole scan
        logger.warning(f"Cannot scan {parent}: {e}")
        return []


def _last_activity(segment_dir: Path) -> float:
    """Return the newest mtime among *segment_dir* and its direct children.

    A directory's own mtime only changes when entries are added or removed,
    not while an existing file is being written, so the children are
    checked as well.

    Raises OSError if the directory itself cannot be inspected.
    """
    newest = segment_dir.stat().st_mtime
    for entry in segment_dir.iterdir():
        try:
            newest = max(newest, entry.stat().st_mtime)
        except OSError:
            # Entry vanished or is unreadable; the other timestamps still count
            continue
    return newest


def recover_incomplete_segments(captures_dir: Path) -> int:
    """Scan captures dir for orphaned .incomplete directories and finalize them.

    The expected layout is captures_dir/<day>/<stream>/<segment>. For each
    .incomplete segment directory with no activity for at least
    MINIMUM_AGE_SECONDS:
    - Read .metadata for start timestamp if available, else fall back to
      filesystem timestamps
    - Rename to HHMMSS_DDD/ format
    - If recovery fails, rename to HHMMSS.failed/ to prevent infinite retry

    Directories that cannot be read are logged and skipped rather than
    aborting the scan. A missing captures_dir, or one that is not a
    directory, yields 0.

    Returns the number of successfully recovered segments.
    """
    # is_dir() rather than exists(): a stray regular file at this path
    # would otherwise raise NotADirectoryError from iterdir()
    if not captures_dir.is_dir():
        return 0

    recovered = 0
    now = time.time()

    for day_dir in _child_dirs(captures_dir):
        for stream_dir in _child_dirs(day_dir):
            for segment_dir in _child_dirs(stream_dir):
                dir_name = segment_dir.name
                if not dir_name.endswith(".incomplete"):
                    continue

                # Check age; recently touched segments may still be recording
                try:
                    age = now - _last_activity(segment_dir)
                except OSError:
                    continue
                if age < MINIMUM_AGE_SECONDS:
                    logger.debug(f"Skipping recent incomplete: {dir_name}")
                    continue

                logger.info(f"Recovering incomplete segment: {dir_name}")
                if _recover_segment(segment_dir):
                    recovered += 1

    if recovered:
        logger.info(f"Recovered {recovered} incomplete segment(s)")
    return recovered
107
108
def _recover_segment(segment_dir: Path) -> bool:
    """Recover a single incomplete segment directory.

    Renames ``<prefix>.incomplete`` to ``<prefix>_<duration>``, where
    duration is in whole seconds and at least 1.

    The duration runs from the segment start to the last write into the
    segment (newest mtime of the directory and its direct children). It
    does not run to "now": recovery may happen long after the crash, and
    that downtime is not recorded material.

    The start comes from .metadata when it holds a numeric
    "start_timestamp". Otherwise it is the directory's birth time where
    the platform reports one, else the oldest mtime seen. st_ctime is not
    used because on Linux it is the inode-change time, normally not
    earlier than st_mtime, so ``mtime - ctime`` cannot measure a duration.

    An empty segment, or a filesystem error, marks the directory as
    .failed instead.

    Returns True on success.
    """
    dir_name = segment_dir.name
    time_prefix = dir_name.removesuffix(".incomplete")

    try:
        dir_stat = segment_dir.stat()
        # Check there are actual files inside (ignore .metadata)
        contents = [f for f in segment_dir.iterdir() if f.name != METADATA_FILENAME]
    except OSError:
        return _mark_failed(segment_dir)

    if not contents:
        logger.warning(f"Empty incomplete segment: {dir_name}")
        return _mark_failed(segment_dir)

    # Timestamps of the directory and every capture artifact in it
    mtimes = [dir_stat.st_mtime]
    for entry in contents:
        try:
            mtimes.append(entry.stat().st_mtime)
        except OSError:
            # Entry vanished or is unreadable; the other timestamps still count
            continue
    end_ts = max(mtimes)

    # Try .metadata first for an accurate start; reject non-numeric values
    # (bool is excluded explicitly because it is a subclass of int)
    metadata = _read_segment_metadata(segment_dir)
    start_ts = metadata.get("start_timestamp") if isinstance(metadata, dict) else None
    if isinstance(start_ts, bool) or not isinstance(start_ts, (int, float)):
        # Fall back to filesystem timestamps
        start_ts = getattr(dir_stat, "st_birthtime", None)
        if start_ts is None:
            start_ts = min(mtimes)

    try:
        duration = max(1, int(end_ts - start_ts))
    except (ValueError, OverflowError):
        # NaN / Infinity are accepted by the JSON parser but not by int()
        duration = 1

    # Build final segment key with duration
    segment_key = f"{time_prefix}_{duration}"
    final_dir = segment_dir.parent / segment_key

    # Remove .metadata before finalizing (not a capture artifact)
    meta_path = segment_dir / METADATA_FILENAME
    try:
        meta_path.unlink()
    except OSError:
        # Best effort: a missing or undeletable .metadata must not block recovery
        pass

    try:
        os.rename(str(segment_dir), str(final_dir))
    except OSError as e:
        logger.warning(f"Failed to rename {dir_name}: {e}")
        return _mark_failed(segment_dir)

    logger.info(f"Recovered: {dir_name} -> {segment_key}")
    return True
158
159
def _mark_failed(segment_dir: Path) -> bool:
    """Flag a segment as unrecoverable by renaming .incomplete to .failed.

    Recovery only picks up directories whose name ends in .incomplete, so
    the rename stops the segment from being retried on every startup.
    A directory without that suffix is left untouched. A failed rename is
    logged as an error.

    Always returns False, so callers can write ``return _mark_failed(...)``.
    """
    suffix = ".incomplete"
    original_name = segment_dir.name

    if original_name.endswith(suffix):
        failed_name = original_name.removesuffix(suffix) + ".failed"
        target = segment_dir.parent / failed_name
        try:
            os.rename(str(segment_dir), str(target))
        except OSError as e:
            logger.error(f"Failed to mark as failed: {e}")
        else:
            logger.warning(f"Marked as failed: {original_name} -> {failed_name}")

    return False
175 return False