linux observer
at main 175 lines 5.7 kB view raw
# SPDX-License-Identifier: AGPL-3.0-only
# Copyright (c) 2026 sol pbc

"""Crash recovery for orphaned .incomplete segment directories.

Modeled on solstone-macos's IncompleteSegmentRecovery.swift.
Runs on startup before the capture loop begins.

Improvement over tmux baseline: reads .metadata JSON file for accurate
start timestamp instead of relying on brittle filesystem timestamps.
"""

from __future__ import annotations

import json
import logging
import math
import os
import time
from pathlib import Path

logger = logging.getLogger(__name__)

# Segments newer than this are assumed to be actively recording
MINIMUM_AGE_SECONDS = 120  # 2 minutes

METADATA_FILENAME = ".metadata"

# Directory-name suffixes used to mark segment state on disk.
_INCOMPLETE_SUFFIX = ".incomplete"
_FAILED_SUFFIX = ".failed"


def write_segment_metadata(segment_dir: Path, start_timestamp: float) -> None:
    """Write the metadata file inside a segment directory.

    Called when creating a new .incomplete segment so recovery can
    use the actual start timestamp instead of filesystem timestamps.

    Best effort: an ``OSError`` while writing is logged as a warning and
    swallowed, so a failed metadata write never prevents capturing.

    Args:
        segment_dir: Existing segment directory to write into.
        start_timestamp: Segment start time in seconds since the epoch.
    """
    meta_path = segment_dir / METADATA_FILENAME
    try:
        data = {"start_timestamp": start_timestamp}
        with open(meta_path, "w", encoding="utf-8") as f:
            json.dump(data, f)
            f.write("\n")
    except OSError as e:
        logger.warning("Failed to write segment metadata: %s", e)


def _read_segment_metadata(segment_dir: Path) -> dict | None:
    """Read the metadata file from a segment directory.

    Returns the parsed JSON object, or ``None`` when the file is missing,
    unreadable, not valid UTF-8/JSON, or does not contain a JSON object.
    """
    meta_path = segment_dir / METADATA_FILENAME
    try:
        with open(meta_path, encoding="utf-8") as f:
            data = json.load(f)
    except (OSError, ValueError):
        # OSError covers a missing/unreadable file. ValueError covers both
        # json.JSONDecodeError and UnicodeDecodeError, so a file truncated or
        # corrupted by the crash cannot abort recovery.
        return None
    # Callers index the result by key; anything but a JSON object is unusable.
    return data if isinstance(data, dict) else None


def _valid_start_timestamp(metadata: dict | None) -> float | None:
    """Return a finite numeric ``start_timestamp`` from metadata, else None."""
    if not metadata:
        return None
    value = metadata.get("start_timestamp")
    # bool is a subclass of int; reject it explicitly along with non-numbers.
    if isinstance(value, bool) or not isinstance(value, (int, float)):
        return None
    try:
        value = float(value)
    except OverflowError:
        # JSON integers are unbounded; one too large for a float is garbage.
        return None
    # json accepts NaN/Infinity literals, which int() cannot convert.
    return value if math.isfinite(value) else None


def _newest_mtime(base_mtime: float, entries) -> float:
    """Return the newest mtime among ``base_mtime`` and the given paths.

    Entries that cannot be stat'ed (e.g. removed concurrently, dangling
    symlink) are ignored. An ``OSError`` raised while iterating ``entries``
    itself propagates to the caller.
    """
    newest = base_mtime
    for entry in entries:
        try:
            newest = max(newest, entry.stat().st_mtime)
        except OSError:
            continue
    return newest


def _sorted_subdirs(parent: Path) -> list[Path]:
    """Return the sub-directories of ``parent`` in sorted order.

    An unreadable ``parent`` is logged and treated as empty so one bad
    directory does not abort the whole recovery scan.
    """
    try:
        return sorted(p for p in parent.iterdir() if p.is_dir())
    except OSError as e:
        logger.warning("Cannot list %s: %s", parent, e)
        return []


def recover_incomplete_segments(captures_dir: Path) -> int:
    """Scan captures dir for orphaned .incomplete directories and finalize them.

    The layout walked is ``captures_dir/<day>/<stream>/<segment>``.

    For each .incomplete directory not modified in the last 2 minutes
    (judged by the newest mtime of the directory and its direct children):
    - Read .metadata for the start timestamp if available, else fall back to
      filesystem timestamps (mtime - ctime)
    - Rename to ``<prefix>_<duration>/`` where ``<prefix>`` is the directory
      name without ``.incomplete`` and ``<duration>`` is in whole seconds
    - If recovery fails, rename to ``<prefix>.failed/`` to prevent infinite
      retry

    Returns the number of successfully recovered segments.
    """
    if not captures_dir.exists():
        return 0

    recovered = 0
    now = time.time()

    for day_dir in _sorted_subdirs(captures_dir):
        for stream_dir in _sorted_subdirs(day_dir):
            for segment_dir in _sorted_subdirs(stream_dir):
                dir_name = segment_dir.name
                if not dir_name.endswith(_INCOMPLETE_SUFFIX):
                    continue

                # Check age. A directory's own mtime only changes when
                # entries are added or removed, so also look at the files
                # inside to notice a segment that is still being written.
                try:
                    newest = _newest_mtime(
                        segment_dir.stat().st_mtime, segment_dir.iterdir()
                    )
                except OSError:
                    continue
                if now - newest < MINIMUM_AGE_SECONDS:
                    logger.debug("Skipping recent incomplete: %s", dir_name)
                    continue

                logger.info("Recovering incomplete segment: %s", dir_name)
                if _recover_segment(segment_dir):
                    recovered += 1

    if recovered:
        logger.info("Recovered %d incomplete segment(s)", recovered)
    return recovered


def _recover_segment(segment_dir: Path) -> bool:
    """Recover a single incomplete segment directory.

    Computes the segment duration, renames the directory to its final
    ``<prefix>_<duration>`` name and removes the .metadata file. Any
    failure marks the segment as failed instead.

    Returns True on success.
    """
    dir_name = segment_dir.name
    time_prefix = dir_name.removesuffix(_INCOMPLETE_SUFFIX)

    try:
        dir_stat = segment_dir.stat()
        entries = list(segment_dir.iterdir())
    except OSError:
        return _mark_failed(segment_dir)

    # Check there are actual files inside (ignore .metadata)
    if all(entry.name == METADATA_FILENAME for entry in entries):
        logger.warning("Empty incomplete segment: %s", dir_name)
        return _mark_failed(segment_dir)

    # Try .metadata first for accurate duration
    start_ts = _valid_start_timestamp(_read_segment_metadata(segment_dir))
    if start_ts is not None:
        # The segment ended at its last write, not at recovery time: recovery
        # may run long after the crash, so "now" would inflate the duration.
        end_ts = _newest_mtime(dir_stat.st_mtime, entries)
        duration = max(1, int(end_ts - start_ts))
    else:
        # Fall back to filesystem timestamps. NOTE: on Unix st_ctime is the
        # last inode change, not creation time, so this is only a rough
        # estimate and frequently clamps to the 1-second minimum.
        duration = max(1, int(dir_stat.st_mtime - dir_stat.st_ctime))

    # Build final segment key with duration
    segment_key = f"{time_prefix}_{duration}"
    final_dir = segment_dir.parent / segment_key

    try:
        os.rename(str(segment_dir), str(final_dir))
    except OSError as e:
        logger.warning("Failed to rename %s: %s", dir_name, e)
        return _mark_failed(segment_dir)

    # Remove .metadata once finalized (not a capture artifact). Doing this
    # after the rename keeps the start timestamp available for inspection
    # if the rename fails and the segment ends up marked as failed.
    try:
        (final_dir / METADATA_FILENAME).unlink()
    except FileNotFoundError:
        pass
    except OSError as e:
        logger.debug("Could not remove metadata from %s: %s", segment_key, e)

    logger.info("Recovered: %s -> %s", dir_name, segment_key)
    return True


def _mark_failed(segment_dir: Path) -> bool:
    """Rename from .incomplete to .failed to prevent infinite retry.

    Always returns False so callers can ``return _mark_failed(...)`` to
    report that recovery did not succeed. A failed rename is logged and
    leaves the directory in place.
    """
    dir_name = segment_dir.name
    if not dir_name.endswith(_INCOMPLETE_SUFFIX):
        return False

    failed_name = dir_name.removesuffix(_INCOMPLETE_SUFFIX) + _FAILED_SUFFIX
    failed_dir = segment_dir.parent / failed_name

    try:
        os.rename(str(segment_dir), str(failed_dir))
        logger.warning("Marked as failed: %s -> %s", dir_name, failed_name)
    except OSError as e:
        logger.error("Failed to mark as failed: %s", e)

    return False