personal memory agent
at main 488 lines 16 kB view raw
#!/usr/bin/env python3
# SPDX-License-Identifier: AGPL-3.0-only
# Copyright (c) 2026 sol pbc

"""Unified process spawning and lifecycle management utilities.

All subprocess output is automatically logged to:
    journal/{YYYYMMDD}/health/{ref}_{process_name}.log

Where process_name is derived from cmd[0] basename (or from the subcommand
when the command is invoked via ``sol``), and ref is a unique correlation ID.

Symlinks provide stable access paths:
    journal/{YYYYMMDD}/health/{process_name}.log (day-level symlink)
    journal/health/{process_name}.log (journal-level symlink)

Logs automatically roll over at midnight for long-running processes.
"""

from __future__ import annotations

import logging
import os
import subprocess
import threading
import time
from dataclasses import dataclass
from datetime import datetime
from pathlib import Path

from think.callosum import CallosumConnection
from think.utils import get_journal, now_ms

logger = logging.getLogger(__name__)


def _get_journal_path() -> Path:
    """Return the journal root as a Path.

    NOTE(review): get_journal() is expected to create the directory if
    needed - confirm against think.utils.
    """
    return Path(get_journal())


def _current_day() -> str:
    """Get current local day in YYYYMMDD format."""
    return datetime.now().strftime("%Y%m%d")


def _day_health_log_path(day: str, ref: str, name: str) -> Path:
    """Build path to day health log.

    Returns: journal/{day}/health/{ref}_{name}.log
    """
    return _get_journal_path() / day / "health" / f"{ref}_{name}.log"


def _atomic_symlink(link_path: Path, target: str) -> None:
    """Create or update symlink atomically.

    The link is first created under a temporary name that is unique per
    process and thread, then renamed over ``link_path`` so readers never
    observe a missing link.

    Args:
        link_path: Path where symlink should be created
        target: Target path (can be relative or absolute)
    """
    link_path.parent.mkdir(parents=True, exist_ok=True)
    tmp_link = link_path.with_suffix(f".tmp{os.getpid()}_{threading.get_ident()}")
    # A crashed earlier run that happened to use the same pid/thread id may
    # have left the temp name behind; symlink_to() would then raise
    # FileExistsError. The name is private to this thread, so removing it
    # is safe.
    tmp_link.unlink(missing_ok=True)
    try:
        tmp_link.symlink_to(target)
        tmp_link.replace(link_path)
    finally:
        # No-op after a successful replace(); removes the temp link if
        # replace() failed.
        tmp_link.unlink(missing_ok=True)


def _format_log_line(prefix: str, stream: str, line: str) -> str:
    """Format log line with ISO timestamp and labels.

    Args:
        prefix: Process identifier (e.g., "observer" or "describe:file.webm")
        stream: "stdout" or "stderr"
        line: Output line from process

    Returns:
        Formatted line: "2024-11-01T10:30:45 [prefix:stream] line\\n"
    """
    timestamp = datetime.now().isoformat(timespec="seconds")
    clean_line = line.rstrip("\n")
    return f"{timestamp} [{prefix}:{stream}] {clean_line}\n"


class DailyLogWriter:
    """Thread-safe log writer that automatically rolls over at midnight.

    When ``day`` is provided, the writer is pinned to that day directory
    and midnight rollover is disabled (batch processing of historical days).

    Writes to: journal/{YYYYMMDD}/health/{ref}_{name}.log

    Creates and maintains symlinks:
    - journal/{YYYYMMDD}/health/{name}.log -> {ref}_{name}.log (day-level)
    - journal/health/{name}.log -> {YYYYMMDD}/health/{ref}_{name}.log (journal-level)

    When the day changes, automatically closes old file, opens new file, and updates symlinks.
    """

    def __init__(self, ref: str, name: str, day: str | None = None):
        """Open the log file for ``day`` (or today) and point symlinks at it.

        Args:
            ref: Correlation ID used as the log filename prefix
            name: Process name used in the log filename and symlink names
            day: Optional YYYYMMDD day; when given, rollover is disabled
        """
        self._ref = ref
        self._name = name
        self._pinned = day is not None
        self._lock = threading.Lock()
        self._current_day = day or _current_day()
        self._fh = self._open_log()
        try:
            self._update_symlinks()
        except Exception:
            # Do not leak the open file handle when the writer cannot be
            # fully constructed.
            self._fh.close()
            raise

    def _open_log(self):
        """Open log file for current day (append mode, UTF-8)."""
        log_path = _day_health_log_path(self._current_day, self._ref, self._name)
        log_path.parent.mkdir(parents=True, exist_ok=True)
        return log_path.open("a", encoding="utf-8")

    def _update_symlinks(self) -> None:
        """Update day-level and journal-level symlinks to point to current log."""
        journal = _get_journal_path()
        day_health = journal / self._current_day / "health"
        log_filename = f"{self._ref}_{self._name}.log"

        # Day-level symlink: {YYYYMMDD}/health/{name}.log -> {ref}_{name}.log
        day_symlink = day_health / f"{self._name}.log"
        _atomic_symlink(day_symlink, log_filename)

        # Journal-level symlink: health/{name}.log -> ../{YYYYMMDD}/health/{ref}_{name}.log
        # Relative from journal/health/ to journal/{YYYYMMDD}/health/
        journal_symlink = journal / "health" / f"{self._name}.log"
        relative_target = f"../{self._current_day}/health/{log_filename}"
        _atomic_symlink(journal_symlink, relative_target)

    def write(self, message: str) -> None:
        """Write message to log, handling day rollover.

        The message is flushed immediately so the log can be tailed live.
        """
        with self._lock:
            if not self._pinned:
                # Check for day change
                day_now = _current_day()
                if day_now != self._current_day:
                    # Close old log
                    if not self._fh.closed:
                        self._fh.close()
                    # Open new log for new day
                    self._current_day = day_now
                    self._fh = self._open_log()
                    # Update symlinks to point to new day's file
                    self._update_symlinks()

            # Write and flush
            self._fh.write(message)
            self._fh.flush()

    def close(self) -> None:
        """Close log file (safe to call more than once)."""
        with self._lock:
            if not self._fh.closed:
                self._fh.close()

    @property
    def path(self) -> Path:
        """Get current log file path."""
        return _day_health_log_path(self._current_day, self._ref, self._name)


@dataclass
class ManagedProcess:
    """Subprocess wrapper with automatic output logging and lifecycle management.

    All output is automatically logged to:
        journal/{YYYYMMDD}/health/{ref}_{name}.log

    Where name is derived from cmd[0] basename (or the subcommand when
    invoked via ``sol``), and ref is a unique correlation ID.

    Symlinks are automatically created and maintained:
        journal/{YYYYMMDD}/health/{name}.log -> {ref}_{name}.log (day-level)
        journal/health/{name}.log -> {YYYYMMDD}/health/{ref}_{name}.log (journal-level)

    Logs roll over automatically at midnight for long-running processes.

    Process lifecycle events are broadcast via Callosum logs tract.
    """

    # Underlying child process.
    process: subprocess.Popen
    # Short process name used in log filenames and events.
    name: str
    # Destination for the child's stdout/stderr lines.
    log_writer: DailyLogWriter
    # Copy of the command line that was spawned.
    cmd: list[str]
    # Daemon threads streaming stdout and stderr into the log.
    _threads: list[threading.Thread]
    # Correlation ID shared by the log file and all emitted events.
    ref: str
    # time.time() captured just before the process was spawned.
    _start_time: float
    # Connection used to emit exec/line/exit events.
    _callosum: CallosumConnection | None
    # True when spawn() created the connection and cleanup() must stop it.
    _owns_callosum: bool = True

    @property
    def start_time(self) -> float:
        """Epoch timestamp when this process was spawned."""
        return self._start_time

    @classmethod
    def spawn(
        cls,
        cmd: list[str],
        *,
        env: dict | None = None,
        ref: str | None = None,
        callosum: CallosumConnection | None = None,
        day: str | None = None,
    ) -> "ManagedProcess":
        """Spawn process with automatic output logging to daily health directory.

        Args:
            cmd: Command and arguments
            env: Optional environment variables (inherits parent env if not provided)
            ref: Optional correlation ID (auto-generated if not provided)
            callosum: Optional shared CallosumConnection (creates new one if not provided)
            day: Optional day override (YYYYMMDD). When provided, logs are placed
                in that day's health directory instead of today's.

        Returns:
            ManagedProcess instance

        Raises:
            RuntimeError: If process fails to spawn

        Example:
            managed = ManagedProcess.spawn(["observer", "-v"])
            # Logs to: {JOURNAL}/{YYYYMMDD}/health/{ref}_observer.log
            # Symlinks: {YYYYMMDD}/health/observer.log (day-level)
            #           health/observer.log (journal-level)

            # With explicit correlation ID:
            managed = ManagedProcess.spawn(
                ["sol", "indexer", "--rescan"],
                ref="1730476800000",
            )
            # Logs to: {JOURNAL}/{YYYYMMDD}/health/1730476800000_indexer.log
        """
        # Derive name from command - use subcommand if invoked via sol
        if cmd[0] == "sol" and len(cmd) > 1:
            name = cmd[1]
        else:
            name = Path(cmd[0]).name

        # Generate correlation ID (use provided ref, else timestamp)
        ref = ref or str(now_ms())
        start_time = time.time()

        # Use provided callosum or create new one
        owns_callosum = callosum is None
        if owns_callosum:
            callosum = CallosumConnection()
            callosum.start()

        try:
            log_writer = DailyLogWriter(ref, name, day=day)
        except Exception:
            # The connection was started above; stop it so a failure to
            # open the log does not leak it. The original error propagates.
            if owns_callosum and callosum:
                callosum.stop()
            raise

        logger.info(f"Starting {name}: {' '.join(cmd)}")

        try:
            proc = subprocess.Popen(
                cmd,
                stdout=subprocess.PIPE,
                stderr=subprocess.PIPE,
                text=True,
                bufsize=1,  # line-buffered so output is streamed per line
                env=env,
            )
        except Exception as exc:
            log_writer.close()
            if owns_callosum and callosum:
                callosum.stop()
            raise RuntimeError(f"Failed to spawn {name}: {exc}") from exc

        logger.info(f"Started {name} with PID {proc.pid}")

        # Emit exec event
        if callosum:
            callosum.emit(
                "logs",
                "exec",
                ref=ref,
                name=name,
                pid=proc.pid,
                cmd=list(cmd),
                log_path=str(log_writer.path),
            )

        # Start output streaming threads
        def stream_output(pipe, stream_label: str):
            """Copy each line from ``pipe`` into the log and emit a line event."""
            if pipe is None:
                return
            with pipe:
                for line in pipe:
                    formatted = _format_log_line(name, stream_label, line)
                    log_writer.write(formatted)

                    # Emit line event
                    if callosum:
                        callosum.emit(
                            "logs",
                            "line",
                            ref=ref,
                            name=name,
                            pid=proc.pid,
                            stream=stream_label,
                            line=line.rstrip("\n"),
                        )

        threads = [
            threading.Thread(
                target=stream_output,
                args=(proc.stdout, "stdout"),
                daemon=True,
            ),
            threading.Thread(
                target=stream_output,
                args=(proc.stderr, "stderr"),
                daemon=True,
            ),
        ]
        for thread in threads:
            thread.start()

        return cls(
            process=proc,
            name=name,
            log_writer=log_writer,
            cmd=list(cmd),
            _threads=threads,
            ref=ref,
            _start_time=start_time,
            _callosum=callosum,
            _owns_callosum=owns_callosum,
        )

    def wait(self, timeout: float | None = None) -> int:
        """Wait for process completion, return exit code.

        Args:
            timeout: Optional timeout in seconds

        Returns:
            Exit code

        Raises:
            subprocess.TimeoutExpired: If timeout exceeded
        """
        return self.process.wait(timeout=timeout)

    def poll(self) -> int | None:
        """Check if process has terminated.

        Returns:
            Exit code if terminated, None if still running
        """
        return self.process.poll()

    def is_running(self) -> bool:
        """Check if process is still running."""
        return self.process.poll() is None

    def terminate(self, timeout: float = 15) -> int:
        """Gracefully terminate process with automatic escalation.

        This method handles the full termination sequence in ONE CALL:
        1. Send SIGTERM (graceful shutdown request)
        2. Wait up to `timeout` seconds for process to exit
        3. If still alive, send SIGKILL (force kill)
        4. Wait for final cleanup (max 1 second)
        5. Return exit code

        Args:
            timeout: Seconds to wait after SIGTERM before SIGKILL (default: 15)

        Returns:
            Exit code (may be negative for signals, e.g., -15 for SIGTERM)

        Raises:
            subprocess.TimeoutExpired: If the process has still not exited
                one second after SIGKILL was sent.

        Example:
            exit_code = managed.terminate(timeout=10)  # One call, blocks until dead
        """
        logger.debug(f"Terminating {self.name} (PID {self.pid})...")
        try:
            self.process.terminate()
            exit_code = self.process.wait(timeout=timeout)
            logger.debug(f"{self.name} terminated gracefully with code {exit_code}")
            return exit_code
        except subprocess.TimeoutExpired:
            logger.warning(
                f"{self.name} did not terminate after {timeout}s, force killing..."
            )
            self.process.kill()
            exit_code = self.process.wait(timeout=1)
            logger.debug(f"{self.name} killed with code {exit_code}")
            return exit_code

    def cleanup(self) -> None:
        """Wait for output threads to finish and close log file.

        Call this after process exits to clean up resources. Also emits the
        "exit" event and stops the Callosum connection when this instance
        owns it.
        """
        # Bounded join: a reader thread blocked on a still-open pipe must
        # not hang cleanup indefinitely.
        for thread in self._threads:
            thread.join(timeout=1)
        self.log_writer.close()

        # Emit exit event
        if self._callosum:
            duration_ms = int((time.time() - self._start_time) * 1000)
            self._callosum.emit(
                "logs",
                "exit",
                ref=self.ref,
                name=self.name,
                pid=self.pid,
                exit_code=self.returncode,
                duration_ms=duration_ms,
                cmd=self.cmd,
                log_path=str(self.log_writer.path),
            )
            # Only stop callosum if we created it (not shared)
            if self._owns_callosum:
                self._callosum.stop()

    @property
    def pid(self) -> int:
        """Process ID."""
        return self.process.pid

    @property
    def returncode(self) -> int | None:
        """Return code if process has exited, None otherwise."""
        return self.process.returncode


def run_task(
    cmd: list[str],
    *,
    timeout: float | None = None,
    env: dict | None = None,
    ref: str | None = None,
    callosum: CallosumConnection | None = None,
    day: str | None = None,
) -> tuple[bool, int, Path]:
    """Run a task to completion with automatic logging (blocking).

    Spawns process, waits for completion, cleans up resources.
    Output is automatically logged to: journal/{YYYYMMDD}/health/{ref}_{name}.log
    where name is derived from cmd[0] basename (or the subcommand when
    invoked via ``sol``).

    Args:
        cmd: Command and arguments
        timeout: Optional timeout in seconds
        env: Optional environment variables
        ref: Optional correlation ID (auto-generated if not provided)
        callosum: Optional shared CallosumConnection (creates new one if not provided)
        day: Optional day override (YYYYMMDD). When provided, logs are placed
            in that day's health directory instead of today's.

    Returns:
        (success, exit_code, log_path) tuple where success = (exit_code == 0)
        and log_path points to the process output log file.

    Example:
        success, code, log = run_task(
            ["sol", "generate", "20241101", "-f", "flow"],
            timeout=300,
        )
        # Logs to: {JOURNAL}/{YYYYMMDD}/health/{ref}_generate.log

        # With explicit correlation ID:
        success, code, log = run_task(
            ["sol", "indexer", "--rescan"],
            ref="1730476800000",
        )
        # Logs to: {JOURNAL}/{YYYYMMDD}/health/1730476800000_indexer.log
    """
    managed = ManagedProcess.spawn(cmd, env=env, ref=ref, callosum=callosum, day=day)
    # Captured at spawn time: this is the file the process started logging to,
    # even if the writer later rolls over to a new day.
    log_path = managed.log_writer.path
    try:
        exit_code = managed.wait(timeout=timeout)
    except subprocess.TimeoutExpired:
        logger.error(f"{managed.name} timed out after {timeout}s, terminating...")
        exit_code = managed.terminate()
    finally:
        managed.cleanup()

    if exit_code != 0:
        logger.warning(f"{managed.name} exited with code {exit_code}")

    return (exit_code == 0, exit_code, log_path)