personal memory agent
1#!/usr/bin/env python3
2# SPDX-License-Identifier: AGPL-3.0-only
3# Copyright (c) 2026 sol pbc
4
5"""Unified process spawning and lifecycle management utilities.
6
7All subprocess output is automatically logged to:
8 journal/{YYYYMMDD}/health/{ref}_{process_name}.log
9
Where process_name is the basename of cmd[0] (or cmd[1] when cmd[0] is "sol"), and ref is a unique correlation ID.
11
12Symlinks provide stable access paths:
13 journal/{YYYYMMDD}/health/{process_name}.log (day-level symlink)
14 journal/health/{process_name}.log (journal-level symlink)
15
16Logs automatically roll over at midnight for long-running processes.
17"""
18
19from __future__ import annotations
20
21import logging
22import os
23import subprocess
24import threading
25import time
26from dataclasses import dataclass
27from datetime import datetime
28from pathlib import Path
29
30from think.callosum import CallosumConnection
31from think.utils import get_journal, now_ms
32
# Module-level logger, named after this module via ``__name__``.
logger = logging.getLogger(__name__)
34
35
def _get_journal_path() -> Path:
    """Resolve the journal root as a ``Path``.

    This wrapper only converts the value returned by ``get_journal()``;
    any directory creation is left to that helper.
    """
    journal_root = get_journal()
    return Path(journal_root)
39
40
def _current_day() -> str:
    """Return the current local date as an eight-digit YYYYMMDD string."""
    today = datetime.now()
    return f"{today:%Y%m%d}"
44
45
def _day_health_log_path(day: str, ref: str, name: str) -> Path:
    """Compose the location of one process log inside a day's health folder.

    Args:
        day: Day directory name (YYYYMMDD)
        ref: Correlation ID used as the filename prefix
        name: Process name used as the rest of the filename

    Returns:
        journal/{day}/health/{ref}_{name}.log
    """
    health_dir = _get_journal_path() / day / "health"
    log_filename = f"{ref}_{name}.log"
    return health_dir / log_filename
52
53
def _atomic_symlink(link_path: Path, target: str) -> None:
    """Point ``link_path`` at ``target``, replacing any existing entry.

    A temporary symlink is created beside the destination and then renamed
    over it, so the update happens through a single rename rather than an
    unlink-then-create sequence.

    Args:
        link_path: Path where symlink should be created (parent directories
            are created when missing)
        target: Target path stored in the link (can be relative or absolute)
    """
    parent_dir = link_path.parent
    parent_dir.mkdir(parents=True, exist_ok=True)

    # The pid + thread id suffix keeps concurrent callers from sharing a temp name.
    unique_suffix = f".tmp{os.getpid()}_{threading.get_ident()}"
    staging = link_path.with_suffix(unique_suffix)
    try:
        staging.symlink_to(target)
        staging.replace(link_path)
    finally:
        # After a successful rename the staging name is gone; it only lingers
        # when the rename did not happen. is_symlink() covers a dangling link.
        leftover = staging.exists() or staging.is_symlink()
        if leftover:
            staging.unlink(missing_ok=True)
70
71
def _format_log_line(prefix: str, stream: str, line: str) -> str:
    """Render one line of process output as a timestamped log record.

    Args:
        prefix: Process identifier (e.g., "observer" or "describe:file.webm")
        stream: "stdout" or "stderr"
        line: Output line from process; trailing newlines are stripped

    Returns:
        Formatted line: "2024-11-01T10:30:45 [prefix:stream] line\\n"
    """
    stamp = datetime.now().isoformat(timespec="seconds")
    body = line.rstrip("\n")
    return "{} [{}:{}] {}\n".format(stamp, prefix, stream, body)
86
87
class DailyLogWriter:
    """Thread-safe log writer that automatically rolls over at midnight.

    When ``day`` is provided, the writer is pinned to that day directory
    and midnight rollover is disabled (batch processing of historical days).
    An empty ``day`` string is treated the same as ``None``.

    Writes to: journal/{YYYYMMDD}/health/{ref}_{name}.log

    Creates and maintains symlinks:
    - journal/{YYYYMMDD}/health/{name}.log -> {ref}_{name}.log (day-level)
    - journal/health/{name}.log -> ../{YYYYMMDD}/health/{ref}_{name}.log (journal-level)

    When the day changes, the next ``write`` opens the new day's file, closes
    the old one, and updates the symlinks. Writes issued after ``close`` are
    dropped instead of raising.
    """

    def __init__(self, ref: str, name: str, day: str | None = None):
        """Open the log file for ``day`` (or today) and publish the symlinks.

        Args:
            ref: Correlation ID used as the filename prefix
            name: Process name used in the filename and symlink names
            day: Optional YYYYMMDD day to pin the writer to

        Raises:
            OSError: If the log file or the symlinks cannot be created
        """
        self._ref = ref
        self._name = name
        # Pin only for a real day value so "pinned" and the chosen directory
        # always agree (an empty string falls back to today, unpinned).
        self._pinned = bool(day)
        self._lock = threading.Lock()
        self._current_day = day if day else _current_day()
        self._fh = self._open_log()
        try:
            self._update_symlinks()
        except Exception:
            # Do not leak the handle when the constructor fails half-way.
            self._fh.close()
            raise

    def _open_log(self):
        """Open (append mode, UTF-8) the log file for ``self._current_day``."""
        log_path = _day_health_log_path(self._current_day, self._ref, self._name)
        log_path.parent.mkdir(parents=True, exist_ok=True)
        return log_path.open("a", encoding="utf-8")

    def _update_symlinks(self) -> None:
        """Update day-level and journal-level symlinks to point to current log."""
        journal = _get_journal_path()
        day_health = journal / self._current_day / "health"
        log_filename = f"{self._ref}_{self._name}.log"

        # Day-level symlink: {YYYYMMDD}/health/{name}.log -> {ref}_{name}.log
        day_symlink = day_health / f"{self._name}.log"
        _atomic_symlink(day_symlink, log_filename)

        # Journal-level symlink: health/{name}.log -> ../{YYYYMMDD}/health/{ref}_{name}.log
        # Relative from journal/health/ to journal/{YYYYMMDD}/health/
        journal_symlink = journal / "health" / f"{self._name}.log"
        relative_target = f"../{self._current_day}/health/{log_filename}"
        _atomic_symlink(journal_symlink, relative_target)

    def _roll_over(self, new_day: str) -> None:
        """Switch to ``new_day``'s log file. The caller must hold ``self._lock``.

        The new file is opened before the old one is closed, so a failed open
        leaves the writer on the previous day's still-open file and the
        rollover is retried on the next write.
        """
        previous_day, previous_fh = self._current_day, self._fh
        self._current_day = new_day  # _open_log() reads this attribute
        try:
            self._fh = self._open_log()
        except Exception:
            self._current_day = previous_day
            raise
        previous_fh.close()
        # Update symlinks to point to the new day's file
        self._update_symlinks()

    def write(self, message: str) -> None:
        """Write message to log, handling day rollover.

        The message is flushed immediately. After ``close`` the call is a
        no-op: reader threads may outlive the owner's cleanup, and a late
        line must not raise ``ValueError`` inside them.
        """
        with self._lock:
            if self._fh.closed:
                return

            if not self._pinned:
                day_now = _current_day()
                if day_now != self._current_day:
                    self._roll_over(day_now)

            # Write and flush
            self._fh.write(message)
            self._fh.flush()

    def close(self) -> None:
        """Close log file (safe to call more than once)."""
        with self._lock:
            if not self._fh.closed:
                self._fh.close()

    @property
    def path(self) -> Path:
        """Get current log file path."""
        return _day_health_log_path(self._current_day, self._ref, self._name)
164
165
@dataclass
class ManagedProcess:
    """Subprocess wrapper with automatic output logging and lifecycle management.

    All output is automatically logged to:
        journal/{YYYYMMDD}/health/{ref}_{name}.log

    Where name is the basename of cmd[0] (or cmd[1] when cmd[0] is "sol"),
    and ref is a unique correlation ID.

    Symlinks are automatically created and maintained:
        journal/{YYYYMMDD}/health/{name}.log -> {ref}_{name}.log (day-level)
        journal/health/{name}.log -> ../{YYYYMMDD}/health/{ref}_{name}.log (journal-level)

    Logs roll over automatically at midnight for long-running processes.

    Process lifecycle events are broadcast via Callosum logs tract.
    """

    # Underlying child process handle.
    process: subprocess.Popen
    # Short process name used in log filenames and events.
    name: str
    # Destination for the child's stdout/stderr lines.
    log_writer: DailyLogWriter
    # Copy of the command line that was executed.
    cmd: list[str]
    # Daemon threads draining stdout and stderr.
    _threads: list[threading.Thread]
    # Correlation ID shared by the log filename and all emitted events.
    ref: str
    # time.time() value captured just before the process was spawned.
    _start_time: float
    # Connection used to emit exec/line/exit events.
    _callosum: CallosumConnection | None
    # True when spawn() created the connection and cleanup() must stop it.
    _owns_callosum: bool = True

    @property
    def start_time(self) -> float:
        """Epoch timestamp when this process was spawned."""
        return self._start_time

    @classmethod
    def spawn(
        cls,
        cmd: list[str],
        *,
        env: dict | None = None,
        ref: str | None = None,
        callosum: CallosumConnection | None = None,
        day: str | None = None,
    ) -> "ManagedProcess":
        """Spawn process with automatic output logging to daily health directory.

        Args:
            cmd: Command and arguments
            env: Optional environment variables (inherits parent env if not provided)
            ref: Optional correlation ID (auto-generated if not provided)
            callosum: Optional shared CallosumConnection (creates new one if not provided)
            day: Optional day override (YYYYMMDD). When provided, logs are placed
                in that day's health directory instead of today's.

        Returns:
            ManagedProcess instance

        Raises:
            RuntimeError: If process fails to spawn. Errors raised while
                creating the log writer propagate unchanged.

        Example:
            managed = ManagedProcess.spawn(["observer", "-v"])
            # Logs to: {JOURNAL}/{YYYYMMDD}/health/{ref}_observer.log
            # Symlinks: {YYYYMMDD}/health/observer.log (day-level)
            #           health/observer.log (journal-level)

            # With explicit correlation ID:
            managed = ManagedProcess.spawn(
                ["sol", "indexer", "--rescan"],
                ref="1730476800000",
            )
            # Logs to: {JOURNAL}/{YYYYMMDD}/health/1730476800000_indexer.log
        """
        # Derive name from command - use subcommand if invoked via sol
        if cmd[0] == "sol" and len(cmd) > 1:
            name = cmd[1]
        else:
            name = Path(cmd[0]).name

        # Generate correlation ID (use provided ref, else timestamp)
        ref = ref if ref else str(now_ms())
        start_time = time.time()

        # Use provided callosum or create new one
        owns_callosum = callosum is None
        if owns_callosum:
            callosum = CallosumConnection()
            callosum.start()

        try:
            log_writer = DailyLogWriter(ref, name, day=day)
        except Exception:
            # The connection was already started above; stop it so a failed
            # log setup does not leak it. The original error is re-raised.
            if owns_callosum and callosum:
                callosum.stop()
            raise

        logger.info(f"Starting {name}: {' '.join(cmd)}")

        try:
            proc = subprocess.Popen(
                cmd,
                stdout=subprocess.PIPE,
                stderr=subprocess.PIPE,
                text=True,
                # Undecodable bytes become U+FFFD instead of raising
                # UnicodeDecodeError in a reader thread, which would stop
                # draining the pipe and could block the child.
                errors="replace",
                bufsize=1,
                env=env,
            )
        except Exception as exc:
            log_writer.close()
            if owns_callosum and callosum:
                callosum.stop()
            raise RuntimeError(f"Failed to spawn {name}: {exc}") from exc

        logger.info(f"Started {name} with PID {proc.pid}")

        # Emit exec event
        if callosum:
            callosum.emit(
                "logs",
                "exec",
                ref=ref,
                name=name,
                pid=proc.pid,
                cmd=list(cmd),
                log_path=str(log_writer.path),
            )

        # Start output streaming threads
        def stream_output(pipe, stream_label: str):
            """Copy every line from ``pipe`` to the log and to Callosum."""
            if pipe is None:
                return
            with pipe:
                for line in pipe:
                    formatted = _format_log_line(name, stream_label, line)
                    log_writer.write(formatted)

                    # Emit line event
                    if callosum:
                        callosum.emit(
                            "logs",
                            "line",
                            ref=ref,
                            name=name,
                            pid=proc.pid,
                            stream=stream_label,
                            line=line.rstrip("\n"),
                        )

        threads = [
            threading.Thread(
                target=stream_output,
                args=(proc.stdout, "stdout"),
                daemon=True,
            ),
            threading.Thread(
                target=stream_output,
                args=(proc.stderr, "stderr"),
                daemon=True,
            ),
        ]
        for thread in threads:
            thread.start()

        return cls(
            process=proc,
            name=name,
            log_writer=log_writer,
            cmd=list(cmd),
            _threads=threads,
            ref=ref,
            _start_time=start_time,
            _callosum=callosum,
            _owns_callosum=owns_callosum,
        )

    def wait(self, timeout: float | None = None) -> int:
        """Wait for process completion, return exit code.

        Args:
            timeout: Optional timeout in seconds

        Returns:
            Exit code

        Raises:
            subprocess.TimeoutExpired: If timeout exceeded
        """
        return self.process.wait(timeout=timeout)

    def poll(self) -> int | None:
        """Check if process has terminated.

        Returns:
            Exit code if terminated, None if still running
        """
        return self.process.poll()

    def is_running(self) -> bool:
        """Check if process is still running."""
        return self.process.poll() is None

    def terminate(self, timeout: float = 15) -> int:
        """Gracefully terminate process with automatic escalation.

        This method handles the full termination sequence in ONE CALL:
        1. Send SIGTERM (graceful shutdown request)
        2. Wait up to `timeout` seconds for process to exit
        3. If still alive, send SIGKILL (force kill)
        4. Wait for final cleanup (max 1 second)
        5. Return exit code

        Args:
            timeout: Seconds to wait after SIGTERM before SIGKILL (default: 15)

        Returns:
            Exit code (may be negative for signals, e.g., -15 for SIGTERM)

        Raises:
            subprocess.TimeoutExpired: If the process has still not exited
                one second after the force kill

        Example:
            exit_code = managed.terminate(timeout=10)  # One call, blocks until dead
        """
        logger.debug(f"Terminating {self.name} (PID {self.pid})...")
        try:
            self.process.terminate()
            exit_code = self.process.wait(timeout=timeout)
            logger.debug(f"{self.name} terminated gracefully with code {exit_code}")
            return exit_code
        except subprocess.TimeoutExpired:
            logger.warning(
                f"{self.name} did not terminate after {timeout}s, force killing..."
            )
            self.process.kill()
            exit_code = self.process.wait(timeout=1)
            logger.debug(f"{self.name} killed with code {exit_code}")
            return exit_code

    def cleanup(self) -> None:
        """Wait for output threads to finish and close log file.

        Call this after process exits to clean up resources. Each reader
        thread is joined for at most one second, then the log is closed, the
        exit event is emitted, and an owned Callosum connection is stopped.
        """
        for thread in self._threads:
            thread.join(timeout=1)
        self.log_writer.close()

        # Emit exit event
        if self._callosum:
            duration_ms = int((time.time() - self._start_time) * 1000)
            self._callosum.emit(
                "logs",
                "exit",
                ref=self.ref,
                name=self.name,
                pid=self.pid,
                exit_code=self.returncode,
                duration_ms=duration_ms,
                cmd=self.cmd,
                log_path=str(self.log_writer.path),
            )
            # Only stop callosum if we created it (not shared)
            if self._owns_callosum:
                self._callosum.stop()

    @property
    def pid(self) -> int:
        """Process ID."""
        return self.process.pid

    @property
    def returncode(self) -> int | None:
        """Return code if process has exited, None otherwise."""
        return self.process.returncode
431
432
def run_task(
    cmd: list[str],
    *,
    timeout: float | None = None,
    env: dict | None = None,
    ref: str | None = None,
    callosum: CallosumConnection | None = None,
    day: str | None = None,
) -> tuple[bool, int, Path]:
    """Run a command to completion with automatic logging (blocking).

    The process is spawned through ``ManagedProcess.spawn``, awaited, and its
    resources are always released via ``cleanup()``. If the wait times out,
    the process is terminated and the exit code from that termination is
    reported. Output goes to journal/{YYYYMMDD}/health/{ref}_{name}.log,
    with name chosen by ``ManagedProcess.spawn``.

    Args:
        cmd: Command and arguments
        timeout: Optional timeout in seconds
        env: Optional environment variables
        ref: Optional correlation ID (auto-generated if not provided)
        callosum: Optional shared CallosumConnection (creates new one if not provided)
        day: Optional day override (YYYYMMDD). When provided, logs are placed
            in that day's health directory instead of today's.

    Returns:
        (success, exit_code, log_path) tuple where success = (exit_code == 0)
        and log_path is the log file path captured right after spawning.

    Example:
        success, code, log = run_task(
            ["sol", "generate", "20241101", "-f", "flow"],
            timeout=300,
        )
        # Logs to: {JOURNAL}/{YYYYMMDD}/health/{ref}_generate.log

        # With explicit correlation ID:
        success, code, log = run_task(
            ["sol", "indexer", "--rescan"],
            ref="1730476800000",
        )
        # Logs to: {JOURNAL}/{YYYYMMDD}/health/1730476800000_indexer.log
    """
    task = ManagedProcess.spawn(cmd, env=env, ref=ref, callosum=callosum, day=day)
    # Read the path once, before waiting, so the returned value is the file
    # the process started logging to.
    initial_log = task.log_writer.path

    try:
        try:
            status = task.wait(timeout=timeout)
        except subprocess.TimeoutExpired:
            logger.error(f"{task.name} timed out after {timeout}s, terminating...")
            status = task.terminate()
    finally:
        task.cleanup()

    succeeded = status == 0
    if not succeeded:
        logger.warning(f"{task.name} exited with code {status}")

    return succeeded, status, initial_log
488 return (exit_code == 0, exit_code, log_path)