# SPDX-License-Identifier: AGPL-3.0-only
# Copyright (c) 2026 sol pbc

from __future__ import annotations

import argparse
import asyncio
import json
import logging
import os
import signal
import subprocess
import sys
import threading
import time
from dataclasses import dataclass, field
from datetime import datetime
from pathlib import Path

from desktop_notifier import DesktopNotifier, Urgency

from observe.sync import check_remote_health
from think import routines, scheduler
from think.callosum import CallosumConnection, CallosumServer
from think.runner import DailyLogWriter
from think.runner import ManagedProcess as RunnerManagedProcess
from think.utils import (
    find_available_port,
    get_journal,
    get_journal_info,
    get_rev,
    now_ms,
    setup_cli,
    updated_days,
)

DEFAULT_THRESHOLD = 60  # seconds before a heartbeat is considered stale
CHECK_INTERVAL = 30  # default polling interval (seconds)
MAX_UPDATED_CATCHUP = 4  # max number of backlogged days to dream at midnight
EXIT_TEMPFAIL = 75  # EX_TEMPFAIL: service prerequisites not ready
TEMPFAIL_DELAY = 15  # seconds to wait before retrying a tempfail exit

# Global shutdown flag
shutdown_requested = False

# Supervisor identity (set in main() once ref is assigned)
_supervisor_ref: str | None = None
_supervisor_start: float | None = None


class CallosumLogHandler(logging.Handler):
    """Logging handler that emits log records as callosum ``logs`` tract events.

    Silently drops events on any error — callosum mirroring is best-effort.
    """

    def __init__(self, conn: CallosumConnection, ref: str):
        super().__init__()
        self._conn = conn
        self._ref = ref
        self._pid = os.getpid()
        # Re-entrancy guard: emitting to callosum may itself log, which would
        # recurse back into this handler.
        # NOTE(review): this flag is a plain instance attribute, not
        # thread-local — concurrent emits from multiple threads could drop
        # records; confirm whether that best-effort behavior is intended.
        self._emitting = False

    def emit(self, record: logging.LogRecord) -> None:
        # Skip if we are already inside an emit (avoids infinite recursion).
        if self._emitting:
            return
        self._emitting = True
        try:
            self._conn.emit(
                "logs",
                "line",
                ref=self._ref,
                name="supervisor",
                pid=self._pid,
                stream="log",
                line=self.format(record),
            )
        except Exception:
            # Best-effort mirroring: never let log forwarding break logging.
            pass
        finally:
            self._emitting = False


# Desktop notification system
_notifier: DesktopNotifier | None = None
_notification_ids: dict[tuple, str] = {}  # Maps alert_key -> notification_id


class AlertManager:
    """Manages alerts with exponential backoff and notification clearing."""

    def __init__(self, initial_backoff: int = 60, max_backoff: int = 3600):
        self._state: dict[tuple, tuple[float, int]] = {}  # {key: (last_time, backoff)}
        self._initial_backoff = initial_backoff
        self._max_backoff = max_backoff

    async def alert_if_ready(self, key: tuple, message: str) -> bool:
        """Send alert with exponential backoff. Returns True if sent."""
        now = time.time()
        if key in self._state:
            last_time, backoff = self._state[key]
            if now - last_time >= backoff:
                # Backoff window elapsed: re-send and double the window.
                await send_notification(message, alert_key=key)
                new_backoff = min(backoff * 2, self._max_backoff)
                self._state[key] = (now, new_backoff)
                logging.info(f"Alert sent, next backoff: {new_backoff}s")
                return True
            else:
                remaining = int(backoff - (now - last_time))
                logging.info(f"Suppressing alert, next in {remaining}s")
                return False
        else:
            # First occurrence of this alert key: send immediately.
            await send_notification(message, alert_key=key)
            self._state[key] = (now, self._initial_backoff)
            return True

    async def clear(self, key: tuple) -> None:
        """Clear alert state and notification."""
        if key in self._state:
            del self._state[key]
        await clear_notification(key)

    def clear_matching(self, predicate) -> None:
        """Clear alert states matching predicate."""
        self._state = {k: v for k, v in self._state.items() if not predicate(k, v)}


class TaskQueue:
    """Manages on-demand task execution with per-command serialization.
    Tasks are serialized by command name - only one task per command runs at a
    time. Additional requests for the same command are queued (deduped by
    exact cmd match). Multiple callers requesting the same work have their
    refs coalesced so all get notified when the task completes.

    The lock only protects state mutations, never held during I/O operations.
    """

    def __init__(self, on_queue_change: callable = None):
        """Initialize task queue.

        Args:
            on_queue_change: Optional callback(cmd_name, running_ref, queue_entries)
                called after queue state changes. Called outside lock.
        """
        self._running: dict[str, str] = {}  # command_name -> ref of running task
        self._queues: dict[str, list] = {}  # command_name -> list of {refs, cmd} dicts
        self._active: dict[str, RunnerManagedProcess] = {}  # ref -> process
        self._lock = threading.Lock()
        self._on_queue_change = on_queue_change

    @staticmethod
    def get_command_name(cmd: list[str]) -> str:
        """Extract command name from cmd array for queue serialization.

        For 'sol X' commands, returns X. Otherwise returns cmd[0] basename.
        """
        if cmd and cmd[0] == "sol" and len(cmd) > 1:
            return cmd[1]
        return Path(cmd[0]).name if cmd else "unknown"

    def _notify_queue_change(self, cmd_name: str) -> None:
        """Notify listener of queue state change (called outside lock)."""
        if not self._on_queue_change:
            return
        # Snapshot state under the lock; invoke the callback outside it.
        with self._lock:
            queue = list(self._queues.get(cmd_name, []))
            running_ref = self._running.get(cmd_name)
        self._on_queue_change(cmd_name, running_ref, queue)

    def submit(
        self,
        cmd: list[str],
        ref: str | None = None,
        day: str | None = None,
    ) -> str | None:
        """Submit a task for execution.

        If no task of this command type is running, starts immediately.
        Otherwise queues (deduped by exact cmd match, refs coalesced).

        Args:
            cmd: Command to execute
            ref: Optional caller-provided ref for tracking
            day: Optional day override (YYYYMMDD) for log placement

        Returns:
            ref if task was started/queued, None if already tracked (no change)
        """
        ref = ref or str(now_ms())
        cmd_name = self.get_command_name(cmd)
        should_notify = False
        should_start = False
        with self._lock:
            if cmd_name in self._running:
                # Command already running - queue or coalesce
                queue = self._queues.setdefault(cmd_name, [])
                existing = next((q for q in queue if q["cmd"] == cmd), None)
                if existing:
                    if ref not in existing["refs"]:
                        # Same cmd already queued: coalesce this caller's ref.
                        existing["refs"].append(ref)
                        logging.info(
                            f"Added ref {ref} to queued task {cmd_name} "
                            f"(refs: {len(existing['refs'])})"
                        )
                        should_notify = True
                    else:
                        logging.debug(f"Ref already tracked for queued task: {ref}")
                        return None
                else:
                    queue.append({"refs": [ref], "cmd": cmd, "day": day})
                    logging.info(
                        f"Queued task {cmd_name}: {' '.join(cmd)} ref={ref} "
                        f"(queue: {len(queue)})"
                    )
                    should_notify = True
            else:
                # Not running - mark as running and start
                self._running[cmd_name] = ref
                should_start = True

        # Notify outside lock
        if should_notify:
            self._notify_queue_change(cmd_name)
            return ref

        # Start task outside lock
        if should_start:
            threading.Thread(
                target=self._run_task,
                args=([ref], cmd, cmd_name, day),
                daemon=True,
            ).start()
            return ref

        return None

    def _run_task(
        self,
        refs: list[str],
        cmd: list[str],
        cmd_name: str,
        day: str | None = None,
    ) -> None:
        """Execute a task and handle completion.

        Args:
            refs: List of refs to notify on completion
            cmd: Command to execute
            cmd_name: Command name for queue management
            day: Optional day override (YYYYMMDD) for log placement
        """
        # Each task thread uses its own callosum connection so emissions from
        # concurrent tasks do not interleave on a shared connection.
        callosum = CallosumConnection()
        managed = None
        primary_ref = refs[0]
        service = cmd_name
        try:
            callosum.start()
            logging.info(f"Starting task {primary_ref}: {' '.join(cmd)}")
            managed = RunnerManagedProcess.spawn(
                cmd, ref=primary_ref, callosum=callosum, day=day
            )
            self._active[primary_ref] = managed
            callosum.emit(
                "supervisor",
                "started",
                service=service,
                pid=managed.pid,
                ref=primary_ref,
            )
            exit_code = managed.wait()
            # Every coalesced caller gets its own stopped event.
            for ref in refs:
                callosum.emit(
                    "supervisor",
                    "stopped",
                    service=service,
                    pid=managed.pid,
                    ref=ref,
                    exit_code=exit_code,
                )
            if exit_code == 0:
                logging.info(f"Task {cmd_name} ({primary_ref}) finished successfully")
            else:
                logging.warning(
                    f"Task {cmd_name} ({primary_ref}) failed with exit code {exit_code}"
                )
        except Exception as e:
            logging.exception(
                f"Task {cmd_name} ({primary_ref}) encountered exception: {e}"
            )
            for ref in refs:
                callosum.emit(
                    "supervisor",
                    "stopped",
                    service=service,
                    pid=managed.pid if managed else 0,
                    ref=ref,
                    exit_code=-1,
                )
        finally:
            if managed:
                managed.cleanup()
            self._active.pop(primary_ref, None)
            callosum.stop()
            # Advance the per-command queue regardless of success/failure.
            self._process_next(cmd_name)

    def _process_next(self, cmd_name: str) -> None:
        """Process next queued task after completion."""
        next_cmd = None
        refs = None
        day = None
        with self._lock:
            queue = self._queues.get(cmd_name, [])
            if queue:
                entry = queue.pop(0)
                refs = entry["refs"]
                next_cmd = entry["cmd"]
                day = entry.get("day")
                self._running[cmd_name] = refs[0]
                logging.info(
                    f"Dequeued task {cmd_name}: {' '.join(next_cmd)} refs={refs} "
                    f"(remaining: {len(queue)})"
                )
            else:
                self._running.pop(cmd_name, None)

        # Notify and spawn outside lock
        self._notify_queue_change(cmd_name)
        if next_cmd:
            threading.Thread(
                target=self._run_task,
                args=(refs, next_cmd, cmd_name, day),
                daemon=True,
            ).start()

    def cancel(self, ref: str) -> bool:
        """Cancel a running task.

        Returns:
            True if task was found and terminated, False otherwise
        """
        if ref not in self._active:
            logging.warning(f"Cannot cancel task {ref}: not found")
            return False
        managed = self._active[ref]
        if not managed.is_running():
            logging.debug(f"Task {ref} already finished")
            return False
        logging.info(f"Cancelling task {ref}...")
        managed.terminate()
        return True

    def get_status(self, ref: str) -> dict:
        """Get status of a task."""
        if ref not in self._active:
            return {"status": "not_found"}
        managed = self._active[ref]
        return {
            "status": "running" if managed.is_running() else "finished",
            "pid": managed.pid,
            "returncode": managed.returncode,
            "log_path": str(managed.log_writer.path),
            "cmd": managed.cmd,
        }

    def collect_task_status(self) -> list[dict]:
        """Collect status of all running tasks for supervisor status."""
        # NOTE(review): iterates self._active without holding self._lock while
        # worker threads add/pop entries — confirm whether a snapshot
        # (list(self._active.items())) is needed here.
        now = time.time()
        tasks = []
        for ref, managed in self._active.items():
            if managed.is_running():
                duration = int(now - managed.start_time)
                cmd_name = TaskQueue.get_command_name(managed.cmd)
                tasks.append(
                    {
                        "ref": ref,
                        "name": cmd_name,
                        "duration_seconds": duration,
                    }
                )
        return tasks

    def collect_queue_counts(self) -> dict[str, int]:
        """Snapshot per-command queue depths for status reporting."""
        with self._lock:
            return {
                cmd_name: len(queue)
                for cmd_name, queue in self._queues.items()
                if queue
            }


# Global task queue instance (initialized in main())
_task_queue: TaskQueue | None = None

# Global supervisor callosum connection for event emissions
_supervisor_callosum: CallosumConnection | None = None

# Global reference to managed processes for restart control
_managed_procs: list[ManagedProcess] = []

# Global reference to in-process Callosum server
_callosum_server: CallosumServer | None = None
_callosum_thread: threading.Thread | None = None

# Restart request tracking for SIGKILL enforcement
_restart_requests: dict[str, tuple[float, subprocess.Popen]] = {}

# Track whether running in remote mode (upload-only, no local processing)
_is_remote_mode: bool = False

# State for daily
# processing (tracks day boundary for midnight dream trigger)
_daily_state = {
    "last_day": None,  # Track which day we last processed
}

# Timeout before flushing stale segments (seconds)
FLUSH_TIMEOUT = 3600

# State for segment flush (close out dangling agent state after inactivity)
_flush_state: dict = {
    "last_segment_ts": 0.0,  # Wall-clock time of last observe.observed event
    "day": None,  # Day of last observed segment
    "segment": None,  # Last observed segment key
    "flushed": False,  # Whether flush has already run for current segment
}


def _get_journal_path() -> Path:
    """Return the journal root directory as a Path."""
    return Path(get_journal())


class RestartPolicy:
    """Track restart attempts and compute backoff delays."""

    # Backoff schedule in seconds; the last entry repeats for further attempts.
    _SCHEDULE = (0, 1, 5)

    def __init__(self) -> None:
        self.attempts = 0  # consecutive restart attempts since last reset
        self.last_start = 0.0  # wall-clock time of most recent start

    def record_start(self) -> None:
        """Record the wall-clock time of a (re)start."""
        self.last_start = time.time()

    def reset_attempts(self) -> None:
        """Reset the attempt counter (e.g., after a period of stable uptime)."""
        self.attempts = 0

    def next_delay(self) -> int:
        """Return the delay before the next restart and advance the counter."""
        delay = self._SCHEDULE[min(self.attempts, len(self._SCHEDULE) - 1)]
        self.attempts += 1
        return delay


# One policy per service name, created lazily.
_RESTART_POLICIES: dict[str, RestartPolicy] = {}


def _get_restart_policy(name: str) -> RestartPolicy:
    """Get (or lazily create) the restart policy for a named service."""
    return _RESTART_POLICIES.setdefault(name, RestartPolicy())


@dataclass
class ManagedProcess:
    """Wrapper around RunnerManagedProcess for restart policy tracking."""

    process: subprocess.Popen  # the underlying child process
    name: str  # service name used for restart policy / status
    log_writer: DailyLogWriter  # writer receiving the child's output
    cmd: list[str]  # command used to (re)launch the process
    restart: bool = False  # whether the supervisor auto-restarts on exit
    shutdown_timeout: int = 15  # seconds to wait for graceful exit
    threads: list[threading.Thread] = field(default_factory=list)  # output pumps
    ref: str = ""  # correlation ref for callosum events

    def cleanup(self) -> None:
        """Join output-pump threads and close the log writer."""
        for thread in self.threads:
            thread.join(timeout=1)
        self.log_writer.close()


def _launch_process(
    name: str,
    cmd: list[str],
    *,
    restart: bool = False,
    ref: str | None = None,
) -> ManagedProcess:
    # NOTE: All child processes should include -v for verbose logging by default.
    # This ensures their output is captured in logs for debugging.
    """Launch process with automatic output logging and restart policy tracking."""
    policy: RestartPolicy | None = None
    if restart:
        policy = _get_restart_policy(name)

    # Generate ref if not provided
    ref = ref if ref else str(now_ms())

    # Use unified runner to spawn process (share supervisor's callosum)
    try:
        managed = RunnerManagedProcess.spawn(
            cmd, ref=ref, callosum=_supervisor_callosum
        )
    except RuntimeError as exc:
        logging.error(str(exc))
        raise

    if policy:
        policy.record_start()

    # Emit started event
    if _supervisor_callosum:
        _supervisor_callosum.emit(
            "supervisor",
            "started",
            service=name,
            pid=managed.process.pid,
            ref=managed.ref,
        )

    # Wrap in ManagedProcess for restart tracking
    return ManagedProcess(
        process=managed.process,
        name=name,
        log_writer=managed.log_writer,
        cmd=list(cmd),
        restart=restart,
        threads=managed._threads,
        ref=managed.ref,
    )


def _get_notifier() -> DesktopNotifier:
    """Get or create the global desktop notifier instance."""
    global _notifier
    if _notifier is None:
        _notifier = DesktopNotifier(app_name="solstone Supervisor")
    return _notifier


async def send_notification(message: str, alert_key: tuple | None = None) -> None:
    """Send a desktop notification with ``message``.

    Args:
        message: The notification message to display
        alert_key: Optional key to track this notification for later clearing
    """
    try:
        notifier = _get_notifier()
        notification_id = await notifier.send(
            title="solstone Supervisor",
            message=message,
            urgency=Urgency.Critical,
        )
        # Store notification ID if we have an alert key
        if alert_key and notification_id:
            _notification_ids[alert_key] = notification_id
            logging.debug(f"Stored notification {notification_id} for key {alert_key}")
    except Exception as exc:  # pragma: no cover - system issues
        logging.error("Failed to send notification: %s", exc)


async def clear_notification(alert_key: tuple) -> None:
    """Clear a notification by its alert key.
    Args:
        alert_key: The key used when the notification was sent
    """
    if alert_key not in _notification_ids:
        return
    try:
        notifier = _get_notifier()
        notification_id = _notification_ids[alert_key]
        await notifier.clear(notification_id)
        del _notification_ids[alert_key]
        logging.debug(f"Cleared notification for key {alert_key}")
    except Exception as exc:  # pragma: no cover - system issues
        logging.error("Failed to clear notification: %s", exc)


def _emit_queue_event(cmd_name: str, running_ref: str, queue: list) -> None:
    """Emit supervisor.queue event with current queue state for a command.

    This is the callback passed to TaskQueue for queue change notifications.
    """
    if not _supervisor_callosum:
        return
    _supervisor_callosum.emit(
        "supervisor",
        "queue",
        command=cmd_name,
        running=running_ref,
        queued=len(queue),
        queue=queue,
    )


def _handle_task_request(message: dict) -> None:
    """Handle incoming task request from Callosum."""
    if message.get("tract") != "supervisor" or message.get("event") != "request":
        return
    cmd = message.get("cmd")
    if not cmd:
        logging.error(f"Invalid task request: missing cmd: {message}")
        return
    ref = message.get("ref")
    day = message.get("day")
    if _task_queue:
        _task_queue.submit(cmd, ref, day=day)


def _restart_service(service: str) -> bool:
    """Send SIGINT to a managed service to trigger graceful restart.

    Returns True if the service was found and running, False if not found
    or already exited.
    """
    for proc in _managed_procs:
        if proc.name == service:
            if proc.process.poll() is not None:
                logging.debug(
                    f"Ignoring restart for {service}: already exited, awaiting auto-restart"
                )
                return False
            logging.info(f"Restart requested for {service}, sending SIGINT...")
            if _supervisor_callosum:
                _supervisor_callosum.emit(
                    "supervisor",
                    "restarting",
                    service=service,
                    pid=proc.process.pid,
                    ref=proc.ref,
                )
            try:
                proc.process.send_signal(signal.SIGINT)
                # Track the request so the supervise loop can escalate to
                # SIGKILL if the process does not exit in time.
                _restart_requests[service] = (time.time(), proc.process)
            except Exception as e:
                logging.error(f"Failed to send SIGINT to {service}: {e}")
            return True
    logging.warning(f"Cannot restart {service}: not found in managed processes")
    return False


def _handle_supervisor_request(message: dict) -> None:
    """Handle incoming supervisor control messages."""
    if message.get("tract") != "supervisor" or message.get("event") != "restart":
        return
    service = message.get("service")
    if not service:
        logging.error("Invalid restart request: missing service")
        return
    if service == "supervisor":
        logging.debug("Ignoring restart request for supervisor itself")
        return
    _restart_service(service)


def get_task_status(ref: str) -> dict:
    """Get status of a task.

    Args:
        ref: Task correlation ID

    Returns:
        Dict with status info, or {"status": "not_found"} if task doesn't exist
    """
    if _task_queue:
        return _task_queue.get_status(ref)
    return {"status": "not_found"}


def collect_status(procs: list[ManagedProcess]) -> dict:
    """Collect current supervisor status for broadcasting."""
    now = time.time()

    # Running services
    services = []
    running_names = set()
    for proc in procs:
        if proc.process.poll() is None:  # Still running
            policy = _get_restart_policy(proc.name)
            uptime = int(now - policy.last_start) if policy.last_start else 0
            services.append(
                {
                    "name": proc.name,
                    "ref": proc.ref,
                    "pid": proc.process.pid,
                    "uptime_seconds": uptime,
                }
            )
            running_names.add(proc.name)

    # Prepend supervisor itself
    if _supervisor_ref and _supervisor_start:
        services.insert(
            0,
            {
                "name": "supervisor",
                "ref": _supervisor_ref,
                "pid": os.getpid(),
                "uptime_seconds": int(now - _supervisor_start),
            },
        )

    # Crashed services (in restart backoff)
    crashed = []
    for name, policy in _RESTART_POLICIES.items():
        if name not in running_names and policy.attempts > 0:
            crashed.append(
                {
                    "name": name,
                    "restart_attempts": policy.attempts,
                }
            )

    # Running tasks
    tasks = _task_queue.collect_task_status() if _task_queue else []
    queues = _task_queue.collect_queue_counts() if _task_queue else {}

    # Scheduled tasks
    schedules = scheduler.collect_status()

    # Connected callosum clients
    callosum_clients = _callosum_server.client_count() if _callosum_server else 0

    return {
        "services": services,
        "crashed": crashed,
        "tasks": tasks,
        "queues": queues,
        "stale_heartbeats": [],
        "schedules": schedules,
        "callosum_clients": callosum_clients,
    }


def start_sense() -> ManagedProcess:
    """Launch sol sense with output logging."""
    return _launch_process("sense", ["sol", "sense", "-v"], restart=True)


def start_sync(remote_url: str) -> ManagedProcess:
    """Launch sol sync with output logging.
    Args:
        remote_url: Remote ingest URL for sync service
    """
    managed = _launch_process(
        "sync",
        ["sol", "sync", "-v", "--remote", remote_url],
        restart=True,
    )
    # Sync shutdown can block while draining pending segments.
    # Give it extra time so the supervisor does not cut it off early.
    managed.shutdown_timeout = 90
    return managed


def start_callosum_in_process() -> CallosumServer:
    """Start Callosum message bus server in-process.

    Runs the server in a background thread and waits for socket to be ready.

    Returns:
        CallosumServer instance
    """
    global _callosum_server, _callosum_thread
    server = CallosumServer()
    _callosum_server = server

    # Pre-delete stale socket to avoid race condition where the ready check
    # passes due to an old socket file before the server thread deletes it
    socket_path = server.socket_path
    socket_path.parent.mkdir(parents=True, exist_ok=True)
    if socket_path.exists():
        socket_path.unlink()

    # Start server in background thread (server.start() is blocking)
    thread = threading.Thread(target=server.start, daemon=False, name="callosum-server")
    thread.start()
    _callosum_thread = thread

    # Wait for socket to be ready (with timeout)
    for _ in range(50):  # Wait up to 500ms
        if socket_path.exists():
            logging.info(f"Callosum server started on {socket_path}")
            return server
        time.sleep(0.01)
    raise RuntimeError("Callosum server failed to create socket within 500ms")


def stop_callosum_in_process() -> None:
    """Stop the in-process Callosum server."""
    global _callosum_server, _callosum_thread
    if _callosum_server:
        logging.info("Stopping Callosum server...")
        _callosum_server.stop()
        if _callosum_thread:
            _callosum_thread.join(timeout=5)
            if _callosum_thread.is_alive():
                logging.warning("Callosum server thread did not stop cleanly")
        _callosum_server = None
        _callosum_thread = None


def start_cortex_server() -> ManagedProcess:
    """Launch the Cortex WebSocket API server."""
    cmd = ["sol", "cortex", "-v"]
    return _launch_process("cortex", cmd, restart=True)


def start_convey_server(
    verbose: bool, debug: bool = False, port: int = 0
) -> tuple[ManagedProcess, int]:
    """Launch the Convey web application with optional verbose and debug logging.

    Returns:
        Tuple of (ManagedProcess, resolved_port) where resolved_port is the
        actual port being used (auto-selected if port was 0).
    """
    # Resolve port 0 to an available port before launching
    resolved_port = port if port != 0 else find_available_port()
    cmd = ["sol", "convey", "--port", str(resolved_port)]
    if debug:
        cmd.append("-d")
    elif verbose:
        cmd.append("-v")
    return _launch_process("convey", cmd, restart=True), resolved_port


def check_runner_exits(procs: list[ManagedProcess]) -> list[ManagedProcess]:
    """Return managed processes that have exited."""
    exited: list[ManagedProcess] = []
    for managed in procs:
        if managed.process.poll() is not None:
            exited.append(managed)
    return exited


async def handle_runner_exits(
    procs: list[ManagedProcess],
    alert_mgr: AlertManager,
) -> None:
    """Check for and handle exited processes with restart policy."""
    exited = check_runner_exits(procs)
    if not exited:
        return
    exited_names = [managed.name for managed in exited]
    exit_key = ("runner_exit", tuple(sorted(exited_names)))

    # Check if all exits are tempfail (session not ready)
    all_tempfail = all(m.process.returncode == EXIT_TEMPFAIL for m in exited)
    if all_tempfail:
        logging.info("Runner waiting for session: %s", ", ".join(sorted(exited_names)))
    else:
        msg = f"Runner process exited: {', '.join(sorted(exited_names))}"
        logging.error(msg)
        await alert_mgr.alert_if_ready(exit_key, msg)

    for managed in exited:
        # Clear any pending restart request for this service
        _restart_requests.pop(managed.name, None)
        returncode = managed.process.returncode
        is_tempfail = returncode == EXIT_TEMPFAIL
        logging.info("%s exited with code %s", managed.name, returncode)

        # Emit stopped event
        if _supervisor_callosum:
            _supervisor_callosum.emit(
                "supervisor",
                "stopped",
                service=managed.name,
                pid=managed.process.pid,
                ref=managed.ref,
                exit_code=returncode,
            )

        # Remove from procs list
        try:
            procs.remove(managed)
        except ValueError:
            pass
        managed.cleanup()

        # Handle restart if needed
        if managed.restart and not shutdown_requested:
            # Tempfail: use fixed longer delay, don't burn through backoff
            if is_tempfail:
                delay = TEMPFAIL_DELAY
            else:
                policy = _get_restart_policy(managed.name)
                uptime = time.time() - policy.last_start if policy.last_start else 0
                # A minute of stable uptime resets the backoff schedule.
                if uptime >= 60:
                    policy.reset_attempts()
                delay = policy.next_delay()
            if delay:
                logging.info("Waiting %ss before restarting %s", delay, managed.name)
                # Sleep in 1s slices so shutdown can interrupt the wait.
                for _ in range(delay):
                    if shutdown_requested:
                        break
                    await asyncio.sleep(1)
            if shutdown_requested:
                continue
            logging.info("Restarting %s...", managed.name)
            try:
                new_proc = _launch_process(
                    managed.name,
                    managed.cmd,
                    restart=True,
                )
            except Exception as exc:
                logging.exception("Failed to restart %s: %s", managed.name, exc)
                continue
            procs.append(new_proc)
            logging.info("Restarted %s after exit code %s", managed.name, returncode)
            # Clear the notification now that process has restarted
            await alert_mgr.clear(exit_key)
        else:
            logging.info("Not restarting %s", managed.name)


def handle_daily_tasks() -> None:
    """Check for day change and submit daily dream for updated days (non-blocking).

    Triggers once when the day rolls over at midnight. Queries
    ``updated_days()`` for journal days that have new stream data but haven't
    completed a daily dream yet, then submits up to ``MAX_UPDATED_CATCHUP``
    dreams in chronological order (oldest first, yesterday last) via the
    TaskQueue.

    Dream auto-detects updated state and enables ``--refresh`` internally,
    so we don't pass it here.

    Skipped in remote mode (no local data to process).
    """
    # Remote mode: no local processing, data is on the server
    if _is_remote_mode:
        return
    today = datetime.now().date()

    # Only trigger when day actually changes (at midnight)
    if today != _daily_state["last_day"]:
        # The day that just ended is what we process
        prev_day = _daily_state["last_day"]
        # Guard against None (e.g., module reloaded without going through main())
        if prev_day is None:
            logging.warning("Daily state not initialized, skipping daily processing")
            _daily_state["last_day"] = today
            return
        prev_day_str = prev_day.strftime("%Y%m%d")

        # Update state for new day
        _daily_state["last_day"] = today

        # Flush any dangling segment state from the previous day before daily dream
        if not _flush_state["flushed"] and _flush_state["day"] == prev_day_str:
            _check_segment_flush(force=True)

        today_str = today.strftime("%Y%m%d")
        all_updated = updated_days(exclude={today_str})
        if not all_updated:
            logging.info("Day changed to %s, no updated days to process", today)
            return

        # Take the newest MAX_UPDATED_CATCHUP days (already sorted ascending)
        days_to_process = all_updated[-MAX_UPDATED_CATCHUP:]
        skipped = len(all_updated) - len(days_to_process)
        if skipped:
            logging.warning(
                "Skipping %d older updated days (max catchup %d): %s",
                skipped,
                MAX_UPDATED_CATCHUP,
                all_updated[:skipped],
            )
        logging.info(
            "Day changed to %s, queuing daily dream for %d updated day(s): %s",
            today,
            len(days_to_process),
            days_to_process,
        )
        # Submit oldest-first so yesterday is processed last
        for day_str in days_to_process:
            cmd = ["sol", "dream", "-v", "--day", day_str]
            if _task_queue:
                _task_queue.submit(cmd, day=day_str)
                logging.debug("Submitted daily dream for %s", day_str)
            else:
                logging.warning(
                    "No task queue available for daily processing: %s", day_str
                )


def _handle_segment_observed(message: dict) -> None:
    """Handle segment completion events (from live observation or imports).

    Submits sol dream in segment mode via task queue, which handles both
    generators and segment agents.

    Also updates flush state to track segment recency.
    """
    if message.get("tract") != "observe" or message.get("event") != "observed":
        return
    segment = message.get("segment")  # e.g., "163045_300"
    if not segment:
        logging.warning("observed event missing segment field")
        return

    # Use day from event payload, fallback to today (for live observation)
    day = message.get("day") or datetime.now().strftime("%Y%m%d")
    stream = message.get("stream")

    # Update flush state — new segment resets the flush timer
    _flush_state["last_segment_ts"] = time.time()
    _flush_state["day"] = day
    _flush_state["segment"] = segment
    _flush_state["stream"] = stream
    _flush_state["flushed"] = False

    logging.info(f"Segment observed: {day}/{segment}, submitting processing...")

    # Submit via task queue — serializes with other dream invocations
    cmd = ["sol", "dream", "-v", "--day", day, "--segment", segment]
    if stream:
        cmd.extend(["--stream", stream])
    if _task_queue:
        _task_queue.submit(cmd, day=day)
    else:
        logging.warning(
            "No task queue available for segment processing: %s/%s", day, segment
        )


def _check_segment_flush(force: bool = False) -> None:
    """Check if the last observed segment needs flushing.

    If no new segments have arrived within FLUSH_TIMEOUT seconds, runs
    ``sol dream --flush`` on the last segment to let flush-enabled agents
    close out dangling state (e.g., end active activities).

    Args:
        force: Skip timeout check (used at day boundary to flush before
            daily dream regardless of elapsed time).

    Skipped in remote mode (no local processing).
    """
    if _is_remote_mode:
        return
    last_ts = _flush_state["last_segment_ts"]
    if not last_ts or _flush_state["flushed"]:
        return
    if not force and time.time() - last_ts < FLUSH_TIMEOUT:
        return
    day = _flush_state["day"]
    segment = _flush_state["segment"]
    if not day or not segment:
        return
    # Mark flushed before submitting so repeated checks don't double-queue.
    _flush_state["flushed"] = True
    stream = _flush_state.get("stream")
    cmd = ["sol", "dream", "-v", "--day", day, "--segment", segment, "--flush"]
    if stream:
        cmd.extend(["--stream", stream])
    if _task_queue:
        _task_queue.submit(cmd, day=day)
        logging.info(f"Queued segment flush: {day}/{segment}")
    else:
        logging.warning(
            "No task queue available for segment flush: %s/%s", day, segment
        )


def _handle_segment_event_log(message: dict) -> None:
    """Log observe, dream, and activity events with day+segment to segment/events.jsonl.

    Any observe, dream, or activity tract message with both day and segment
    fields gets logged to journal/day/segment/events.jsonl if that directory
    exists.
    """
    if message.get("tract") not in {"observe", "dream", "activity"}:
        return
    day = message.get("day")
    segment = message.get("segment")
    if not day or not segment:
        return
    stream = message.get("stream")
    try:
        journal_path = _get_journal_path()
        if stream:
            segment_dir = journal_path / day / stream / segment
        else:
            segment_dir = journal_path / day / segment
        # Only log if segment directory exists
        if not segment_dir.is_dir():
            return
        events_file = segment_dir / "events.jsonl"
        # Append event as JSON line
        with open(events_file, "a", encoding="utf-8") as f:
            f.write(json.dumps(message, ensure_ascii=False) + "\n")
    except Exception as e:
        logging.debug(f"Failed to log segment event: {e}")


def _handle_activity_recorded(message: dict) -> None:
    """Queue a per-activity dream task when an activity is recorded.

    Listens for activity.recorded events and submits a queued dream task
    for per-activity agent processing (serialized via TaskQueue).
    """
    if message.get("tract") != "activity" or message.get("event") != "recorded":
        return
    record_id = message.get("id")
    facet = message.get("facet")
    day = message.get("day")
    if not record_id or not facet or not day:
        logging.warning("activity.recorded event missing required fields")
        return
    cmd = ["sol", "dream", "--activity", record_id, "--facet", facet, "--day", day]
    if _task_queue:
        _task_queue.submit(cmd, day=day)
        logging.info(f"Queued activity dream: {record_id} for #{facet}")
    else:
        logging.warning("No task queue available for activity dream: %s", record_id)


def _handle_dream_daily_complete(message: dict) -> None:
    """Submit a heartbeat task after daily dream processing completes.

    Listens for dream.daily_complete events. Skips if a heartbeat process
    is already running (PID file guard).
    """
    if message.get("tract") != "dream" or message.get("event") != "daily_complete":
        return

    # Check if heartbeat is already running via PID file
    pid_file = Path(get_journal()) / "health" / "heartbeat.pid"
    if pid_file.exists():
        try:
            existing_pid = int(pid_file.read_text().strip())
            # Signal 0 probes for process existence without sending anything.
            os.kill(existing_pid, 0)
            logging.info("Heartbeat already running (pid=%d), skipping", existing_pid)
            return
        except ProcessLookupError:
            pass  # Stale PID file, proceed
        except PermissionError:
            logging.info(
                "Heartbeat running under different user (pid file exists), skipping"
            )
            return
        except ValueError:
            pass  # Corrupt PID file, proceed

    cmd = ["sol", "heartbeat"]
    if _task_queue:
        _task_queue.submit(cmd)
        logging.info("Queued heartbeat after daily dream completion")
    else:
        logging.warning("No task queue available for heartbeat submission")


def _handle_callosum_message(message: dict) -> None:
    """Dispatch incoming Callosum messages to appropriate handlers."""
    _handle_task_request(message)
    _handle_supervisor_request(message)
    _handle_segment_observed(message)
    _handle_activity_recorded(message)
    _handle_dream_daily_complete(message)
    _handle_segment_event_log(message)


async def supervise(
    *,
    daily: bool = True,
    schedule: bool = True,
    procs: list[ManagedProcess] | None = None,
) -> None:
    """Main supervision loop.

    Runs at 1-second intervals for responsiveness. Monitors runner health,
    emits status, triggers daily processing, and checks scheduled agents.
    """
    alert_mgr = AlertManager()
    last_status_emit = 0.0
    try:
        while (
            not shutdown_requested
        ):  # pragma: no cover - loop checked via unit tests by patching
            # Check for restart timeouts (enforce SIGKILL after 15s)
            for service, (start_time, proc) in list(_restart_requests.items()):
                if proc.poll() is not None:
                    # Already exited
                    _restart_requests.pop(service, None)
                elif time.time() - start_time > 15:
                    logging.warning(
                        f"{service} did not exit within 15s after SIGINT, sending SIGKILL"
                    )
                    try:
                        proc.kill()
                    except Exception as e:
                        logging.error(f"Failed to kill {service}: {e}")
                    # Don't delete here - let handle_runner_exits clean up

            # Check for runner exits first (immediate alert)
            if procs:
                await handle_runner_exits(procs, alert_mgr)

            # Emit status every 5 seconds
            now = time.time()
            if now - last_status_emit >= 5:
                if _supervisor_callosum and procs:
                    try:
                        status = collect_status(procs)
                        _supervisor_callosum.emit("supervisor", "status", **status)
                    except Exception as e:
                        logging.debug(f"Status emission failed: {e}")
                last_status_emit = now

            # Check for segment flush (non-blocking, submits via task queue)
            _check_segment_flush()

            # Check for daily processing (non-blocking, submits via task queue)
            if daily:
                handle_daily_tasks()

            # Check periodic task schedules (non-blocking, submits via callosum)
            if schedule:
                scheduler.check()
                routines.check()

            # Sleep 1 second before next iteration (responsive to shutdown)
            await asyncio.sleep(1)
    finally:
        pass  # Callosum cleanup happens in main()


def parse_args() -> argparse.ArgumentParser:
    """Build the supervisor's CLI argument parser."""
    parser = argparse.ArgumentParser(description="Monitor journaling health")
    parser.add_argument(
        "port",
        nargs="?",
        type=int,
        default=0,
        help="Convey port (0 = auto-select available port)",
    )
    parser.add_argument(
        "--threshold",
        type=int,
        default=DEFAULT_THRESHOLD,
        help="Seconds before heartbeat considered stale",
    )
    parser.add_argument(
        "--interval", type=int, default=CHECK_INTERVAL, help="Polling interval seconds"
    )
    parser.add_argument(
        "--no-daily",
        action="store_true",
        help="Disable daily processing run at midnight",
    )
    parser.add_argument(
        "--no-cortex",
        action="store_true",
        help="Do not start the Cortex server (run it manually for debugging)",
    )
    parser.add_argument(
        "--no-convey",
        action="store_true",
        help="Do not start the Convey web application",
    )
    parser.add_argument(
        "--no-schedule",
        action="store_true",
        help="Disable periodic task scheduler",
    )
    parser.add_argument(
        "--remote",
        type=str,
        help="Remote mode: sync to server URL instead of local processing",
    )
    return parser


def handle_shutdown(signum, frame):
    """Handle shutdown signals gracefully."""
    global shutdown_requested
    if not shutdown_requested:  # Only log once
        shutdown_requested = True
        logging.info("Shutdown requested, cleaning up...")
    # Raising KeyboardInterrupt unwinds the asyncio loop in main().
    raise KeyboardInterrupt


def main() -> None:
    """Supervisor entry point: logging, singleton lock, services, main loop."""
    parser = parse_args()

    # Capture journal info BEFORE setup_cli() loads .env and pollutes os.environ
    journal_info = get_journal_info()
    args = setup_cli(parser)
    journal_path = _get_journal_path()

    log_level = logging.DEBUG if args.debug else logging.INFO
    log_path = journal_path / "health" / "supervisor.log"
    log_path.parent.mkdir(parents=True, exist_ok=True)
    logging.getLogger().handlers = []
    logging.basicConfig(
        level=log_level,
        handlers=[logging.FileHandler(log_path, encoding="utf-8")],
        format="%(asctime)s [supervisor:log] %(levelname)s %(message)s",
        datefmt="%Y-%m-%dT%H:%M:%S",
    )
    if args.verbose or args.debug:
        console_handler = logging.StreamHandler()
        console_handler.setLevel(log_level)
        console_handler.setFormatter(
            logging.Formatter("%(asctime)s %(levelname)s %(message)s")
        )
        logging.getLogger().addHandler(console_handler)

    # Singleton guard: only one supervisor per journal
    health_dir = journal_path / "health"
    lock_path = health_dir / "supervisor.lock"
    pid_path = health_dir / "supervisor.pid"

    import fcntl

    lock_fd = open(lock_path, "w")
    try:
        fcntl.flock(lock_fd, fcntl.LOCK_EX | fcntl.LOCK_NB)
    except OSError:
        # Another supervisor holds the lock: report and exit.
        lock_fd.close()
        pid_str = ""
        try:
            pid_str = pid_path.read_text().strip()
        except OSError:
            pass
        pid_msg = f" (PID {pid_str})" if pid_str else ""
        sock_path = health_dir / "callosum.sock"
        if sock_path.exists():
            try:
                from think.health_cli import health_check

                print(f"Supervisor already running{pid_msg}\n")
                health_check()
            except Exception:
                print(f"Supervisor already running{pid_msg}")
        else:
            print(f"Supervisor already running{pid_msg}")
        sys.exit(1)
    pid_path.write_text(str(os.getpid()))
    logging.info("Singleton lock acquired (PID %d)", os.getpid())

    # Set up signal handlers
    signal.signal(signal.SIGINT, handle_shutdown)
    signal.signal(signal.SIGTERM, handle_shutdown)

    # Show journal path and source on startup
    path, source = journal_info
    print(f"Journal: {path} (from {source})")
    logging.info("Supervisor starting...")

    global _managed_procs, _supervisor_callosum, _is_remote_mode
    global _task_queue
    procs: list[ManagedProcess] = []
    convey_port = None

    # Remote mode: run sync instead of local processing
    _is_remote_mode = bool(args.remote)

    # Start Callosum in-process first - it's the message bus that other services depend on
    try:
        start_callosum_in_process()
    except RuntimeError as e:
        logging.error(f"Failed to start Callosum server: {e}")
        parser.error(f"Failed to start Callosum server: {e}")
        return

    # Connect supervisor's Callosum client to capture startup events from other services
    try:
        _supervisor_callosum = CallosumConnection(defaults={"rev": get_rev()})
        _supervisor_callosum.start(callback=_handle_callosum_message)
        logging.info("Supervisor connected to Callosum")
    except Exception as e:
        logging.warning(f"Failed to start Callosum connection: {e}")

    # Mirror supervisor log output to callosum logs tract (best-effort)
    supervisor_ref = str(now_ms())
    global _supervisor_ref, _supervisor_start
    _supervisor_ref = supervisor_ref
    _supervisor_start = 
time.time() if _supervisor_callosum: try: handler = CallosumLogHandler(_supervisor_callosum, supervisor_ref) handler.setFormatter( logging.Formatter("%(asctime)s %(levelname)s %(message)s") ) logging.getLogger().addHandler(handler) except Exception: pass # Initialize task queue with callosum event callback _task_queue = TaskQueue(on_queue_change=_emit_queue_event) # Now start other services (their startup events will be captured) if _is_remote_mode: # Remote mode: verify remote server is reachable before starting sync logging.info("Remote mode: checking server connectivity...") success, message = check_remote_health(args.remote) if not success: logging.error(f"Remote health check failed: {message}") stop_callosum_in_process() parser.error(f"Remote server not available: {message}") logging.info(f"Remote server verified: {message}") procs.append(start_sync(args.remote)) else: # Local mode: convey first, then sense for file processing if not args.no_convey: proc, convey_port = start_convey_server( verbose=args.verbose, debug=args.debug, port=args.port ) procs.append(proc) # Sense handles file processing procs.append(start_sense()) # Cortex for agent execution if not args.no_cortex: procs.append(start_cortex_server()) # Make procs accessible to restart handler _managed_procs = procs # Initialize daily state to today - dream only triggers at midnight when day changes _daily_state["last_day"] = datetime.now().date() # Initialize periodic task scheduler schedule_enabled = not args.no_schedule and not _is_remote_mode if schedule_enabled and _supervisor_callosum: scheduler.init(_supervisor_callosum) scheduler.register_defaults() routines.init(_supervisor_callosum) # Show Convey URL if running if convey_port: print(f"Convey: http://localhost:{convey_port}/") logging.info(f"Started {len(procs)} processes, entering supervision loop") daily_enabled = not args.no_daily and not _is_remote_mode if daily_enabled: logging.info("Daily processing scheduled for midnight") # Startup 
catchup: submit dreams for days with pending stream data if daily_enabled: all_updated = updated_days() if all_updated: days_to_process = all_updated[-MAX_UPDATED_CATCHUP:] skipped = len(all_updated) - len(days_to_process) if skipped: logging.warning( "Startup catchup: skipping %d older updated days (max %d): %s", skipped, MAX_UPDATED_CATCHUP, all_updated[:skipped], ) logging.info( "Startup catchup: submitted %d day(s) with pending stream data: %s", len(days_to_process), days_to_process, ) for day_str in days_to_process: cmd = ["sol", "dream", "-v", "--day", day_str] if _task_queue: _task_queue.submit(cmd, day=day_str) logging.debug("Startup catchup: submitted dream for %s", day_str) else: logging.warning( "No task queue available for startup catchup: %s", day_str ) try: asyncio.run( supervise( daily=daily_enabled, schedule=schedule_enabled, procs=procs if procs else None, ) ) except KeyboardInterrupt: logging.info("Caught KeyboardInterrupt, shutting down...") finally: logging.info("Stopping all processes...") print("\nShutting down gracefully (this may take a moment)...", flush=True) def _stop_process(managed: ManagedProcess) -> None: name = managed.name proc = managed.process logging.info(f"Stopping {name}...") print(f" Stopping {name}...", end="", flush=True) try: proc.terminate() except Exception: pass try: timeout = getattr(managed, "shutdown_timeout", 15) proc.wait(timeout=timeout) print(" done", flush=True) except subprocess.TimeoutExpired: logging.warning(f"{name} did not terminate gracefully, killing...") print(" timeout, forcing kill...", flush=True) try: proc.kill() proc.wait(timeout=1) except Exception: pass managed.cleanup() # Stop services in reverse order for managed in reversed(procs): _stop_process(managed) # Save scheduler state before disconnecting if schedule_enabled and scheduler._state: try: scheduler.save_state() except Exception as exc: logging.warning("Failed to save scheduler state on shutdown: %s", exc) if schedule_enabled: try: 
routines.save_state() except Exception as exc: logging.warning("Failed to save routines state on shutdown: %s", exc) # Disconnect supervisor's Callosum connection if _supervisor_callosum: _supervisor_callosum.stop() logging.info("Supervisor disconnected from Callosum") # Stop in-process Callosum server last stop_callosum_in_process() logging.info("Supervisor shutdown complete.") print("Shutdown complete.", flush=True) if __name__ == "__main__": main()