# personal memory agent
# (viewed on branch main: 1605 lines, 54 kB)
1# SPDX-License-Identifier: AGPL-3.0-only 2# Copyright (c) 2026 sol pbc 3 4from __future__ import annotations 5 6import argparse 7import asyncio 8import json 9import logging 10import os 11import signal 12import subprocess 13import sys 14import threading 15import time 16from dataclasses import dataclass, field 17from datetime import datetime 18from pathlib import Path 19 20from desktop_notifier import DesktopNotifier, Urgency 21 22from observe.sync import check_remote_health 23from think import routines, scheduler 24from think.callosum import CallosumConnection, CallosumServer 25from think.runner import DailyLogWriter 26from think.runner import ManagedProcess as RunnerManagedProcess 27from think.utils import ( 28 find_available_port, 29 get_journal, 30 get_journal_info, 31 get_rev, 32 now_ms, 33 setup_cli, 34 updated_days, 35) 36 37DEFAULT_THRESHOLD = 60 38CHECK_INTERVAL = 30 39MAX_UPDATED_CATCHUP = 4 40EXIT_TEMPFAIL = 75 # EX_TEMPFAIL: service prerequisites not ready 41TEMPFAIL_DELAY = 15 # seconds to wait before retrying a tempfail exit 42 43# Global shutdown flag 44shutdown_requested = False 45# Supervisor identity (set in main() once ref is assigned) 46_supervisor_ref: str | None = None 47_supervisor_start: float | None = None 48 49 50class CallosumLogHandler(logging.Handler): 51 """Logging handler that emits log records as callosum ``logs`` tract events. 52 53 Silently drops events on any error — callosum mirroring is best-effort. 
54 """ 55 56 def __init__(self, conn: CallosumConnection, ref: str): 57 super().__init__() 58 self._conn = conn 59 self._ref = ref 60 self._pid = os.getpid() 61 self._emitting = False 62 63 def emit(self, record: logging.LogRecord) -> None: 64 if self._emitting: 65 return 66 self._emitting = True 67 try: 68 self._conn.emit( 69 "logs", 70 "line", 71 ref=self._ref, 72 name="supervisor", 73 pid=self._pid, 74 stream="log", 75 line=self.format(record), 76 ) 77 except Exception: 78 pass 79 finally: 80 self._emitting = False 81 82 83# Desktop notification system 84_notifier: DesktopNotifier | None = None 85_notification_ids: dict[tuple, str] = {} # Maps alert_key -> notification_id 86 87 88class AlertManager: 89 """Manages alerts with exponential backoff and notification clearing.""" 90 91 def __init__(self, initial_backoff: int = 60, max_backoff: int = 3600): 92 self._state: dict[tuple, tuple[float, int]] = {} # {key: (last_time, backoff)} 93 self._initial_backoff = initial_backoff 94 self._max_backoff = max_backoff 95 96 async def alert_if_ready(self, key: tuple, message: str) -> bool: 97 """Send alert with exponential backoff. 
Returns True if sent.""" 98 now = time.time() 99 100 if key in self._state: 101 last_time, backoff = self._state[key] 102 if now - last_time >= backoff: 103 await send_notification(message, alert_key=key) 104 new_backoff = min(backoff * 2, self._max_backoff) 105 self._state[key] = (now, new_backoff) 106 logging.info(f"Alert sent, next backoff: {new_backoff}s") 107 return True 108 else: 109 remaining = int(backoff - (now - last_time)) 110 logging.info(f"Suppressing alert, next in {remaining}s") 111 return False 112 else: 113 await send_notification(message, alert_key=key) 114 self._state[key] = (now, self._initial_backoff) 115 return True 116 117 async def clear(self, key: tuple) -> None: 118 """Clear alert state and notification.""" 119 if key in self._state: 120 del self._state[key] 121 await clear_notification(key) 122 123 def clear_matching(self, predicate) -> None: 124 """Clear alert states matching predicate.""" 125 self._state = {k: v for k, v in self._state.items() if not predicate(k, v)} 126 127 128class TaskQueue: 129 """Manages on-demand task execution with per-command serialization. 130 131 Tasks are serialized by command name - only one task per command runs at a time. 132 Additional requests for the same command are queued (deduped by exact cmd match). 133 Multiple callers requesting the same work have their refs coalesced so all get 134 notified when the task completes. 135 136 The lock only protects state mutations, never held during I/O operations. 137 """ 138 139 def __init__(self, on_queue_change: callable = None): 140 """Initialize task queue. 141 142 Args: 143 on_queue_change: Optional callback(cmd_name, running_ref, queue_entries) 144 called after queue state changes. Called outside lock. 
145 """ 146 self._running: dict[str, str] = {} # command_name -> ref of running task 147 self._queues: dict[str, list] = {} # command_name -> list of {refs, cmd} dicts 148 self._active: dict[str, RunnerManagedProcess] = {} # ref -> process 149 self._lock = threading.Lock() 150 self._on_queue_change = on_queue_change 151 152 @staticmethod 153 def get_command_name(cmd: list[str]) -> str: 154 """Extract command name from cmd array for queue serialization. 155 156 For 'sol X' commands, returns X. Otherwise returns cmd[0] basename. 157 """ 158 if cmd and cmd[0] == "sol" and len(cmd) > 1: 159 return cmd[1] 160 return Path(cmd[0]).name if cmd else "unknown" 161 162 def _notify_queue_change(self, cmd_name: str) -> None: 163 """Notify listener of queue state change (called outside lock).""" 164 if not self._on_queue_change: 165 return 166 167 with self._lock: 168 queue = list(self._queues.get(cmd_name, [])) 169 running_ref = self._running.get(cmd_name) 170 171 self._on_queue_change(cmd_name, running_ref, queue) 172 173 def submit( 174 self, 175 cmd: list[str], 176 ref: str | None = None, 177 day: str | None = None, 178 ) -> str | None: 179 """Submit a task for execution. 180 181 If no task of this command type is running, starts immediately. 182 Otherwise queues (deduped by exact cmd match, refs coalesced). 
183 184 Args: 185 cmd: Command to execute 186 ref: Optional caller-provided ref for tracking 187 day: Optional day override (YYYYMMDD) for log placement 188 189 Returns: 190 ref if task was started/queued, None if already tracked (no change) 191 """ 192 ref = ref or str(now_ms()) 193 cmd_name = self.get_command_name(cmd) 194 195 should_notify = False 196 should_start = False 197 198 with self._lock: 199 if cmd_name in self._running: 200 # Command already running - queue or coalesce 201 queue = self._queues.setdefault(cmd_name, []) 202 existing = next((q for q in queue if q["cmd"] == cmd), None) 203 if existing: 204 if ref not in existing["refs"]: 205 existing["refs"].append(ref) 206 logging.info( 207 f"Added ref {ref} to queued task {cmd_name} " 208 f"(refs: {len(existing['refs'])})" 209 ) 210 should_notify = True 211 else: 212 logging.debug(f"Ref already tracked for queued task: {ref}") 213 return None 214 else: 215 queue.append({"refs": [ref], "cmd": cmd, "day": day}) 216 logging.info( 217 f"Queued task {cmd_name}: {' '.join(cmd)} ref={ref} " 218 f"(queue: {len(queue)})" 219 ) 220 should_notify = True 221 else: 222 # Not running - mark as running and start 223 self._running[cmd_name] = ref 224 should_start = True 225 226 # Notify outside lock 227 if should_notify: 228 self._notify_queue_change(cmd_name) 229 return ref 230 231 # Start task outside lock 232 if should_start: 233 threading.Thread( 234 target=self._run_task, 235 args=([ref], cmd, cmd_name, day), 236 daemon=True, 237 ).start() 238 return ref 239 240 return None 241 242 def _run_task( 243 self, 244 refs: list[str], 245 cmd: list[str], 246 cmd_name: str, 247 day: str | None = None, 248 ) -> None: 249 """Execute a task and handle completion. 
250 251 Args: 252 refs: List of refs to notify on completion 253 cmd: Command to execute 254 cmd_name: Command name for queue management 255 day: Optional day override (YYYYMMDD) for log placement 256 """ 257 callosum = CallosumConnection() 258 managed = None 259 primary_ref = refs[0] 260 service = cmd_name 261 262 try: 263 callosum.start() 264 logging.info(f"Starting task {primary_ref}: {' '.join(cmd)}") 265 266 managed = RunnerManagedProcess.spawn( 267 cmd, ref=primary_ref, callosum=callosum, day=day 268 ) 269 self._active[primary_ref] = managed 270 271 callosum.emit( 272 "supervisor", 273 "started", 274 service=service, 275 pid=managed.pid, 276 ref=primary_ref, 277 ) 278 279 exit_code = managed.wait() 280 281 for ref in refs: 282 callosum.emit( 283 "supervisor", 284 "stopped", 285 service=service, 286 pid=managed.pid, 287 ref=ref, 288 exit_code=exit_code, 289 ) 290 291 if exit_code == 0: 292 logging.info(f"Task {cmd_name} ({primary_ref}) finished successfully") 293 else: 294 logging.warning( 295 f"Task {cmd_name} ({primary_ref}) failed with exit code {exit_code}" 296 ) 297 298 except Exception as e: 299 logging.exception( 300 f"Task {cmd_name} ({primary_ref}) encountered exception: {e}" 301 ) 302 for ref in refs: 303 callosum.emit( 304 "supervisor", 305 "stopped", 306 service=service, 307 pid=managed.pid if managed else 0, 308 ref=ref, 309 exit_code=-1, 310 ) 311 finally: 312 if managed: 313 managed.cleanup() 314 self._active.pop(primary_ref, None) 315 callosum.stop() 316 self._process_next(cmd_name) 317 318 def _process_next(self, cmd_name: str) -> None: 319 """Process next queued task after completion.""" 320 next_cmd = None 321 refs = None 322 day = None 323 324 with self._lock: 325 queue = self._queues.get(cmd_name, []) 326 if queue: 327 entry = queue.pop(0) 328 refs = entry["refs"] 329 next_cmd = entry["cmd"] 330 day = entry.get("day") 331 self._running[cmd_name] = refs[0] 332 logging.info( 333 f"Dequeued task {cmd_name}: {' '.join(next_cmd)} refs={refs} " 
334 f"(remaining: {len(queue)})" 335 ) 336 else: 337 self._running.pop(cmd_name, None) 338 339 # Notify and spawn outside lock 340 self._notify_queue_change(cmd_name) 341 if next_cmd: 342 threading.Thread( 343 target=self._run_task, 344 args=(refs, next_cmd, cmd_name, day), 345 daemon=True, 346 ).start() 347 348 def cancel(self, ref: str) -> bool: 349 """Cancel a running task. 350 351 Returns: 352 True if task was found and terminated, False otherwise 353 """ 354 if ref not in self._active: 355 logging.warning(f"Cannot cancel task {ref}: not found") 356 return False 357 358 managed = self._active[ref] 359 if not managed.is_running(): 360 logging.debug(f"Task {ref} already finished") 361 return False 362 363 logging.info(f"Cancelling task {ref}...") 364 managed.terminate() 365 return True 366 367 def get_status(self, ref: str) -> dict: 368 """Get status of a task.""" 369 if ref not in self._active: 370 return {"status": "not_found"} 371 372 managed = self._active[ref] 373 return { 374 "status": "running" if managed.is_running() else "finished", 375 "pid": managed.pid, 376 "returncode": managed.returncode, 377 "log_path": str(managed.log_writer.path), 378 "cmd": managed.cmd, 379 } 380 381 def collect_task_status(self) -> list[dict]: 382 """Collect status of all running tasks for supervisor status.""" 383 now = time.time() 384 tasks = [] 385 for ref, managed in self._active.items(): 386 if managed.is_running(): 387 duration = int(now - managed.start_time) 388 cmd_name = TaskQueue.get_command_name(managed.cmd) 389 tasks.append( 390 { 391 "ref": ref, 392 "name": cmd_name, 393 "duration_seconds": duration, 394 } 395 ) 396 return tasks 397 398 def collect_queue_counts(self) -> dict[str, int]: 399 """Snapshot per-command queue depths for status reporting.""" 400 with self._lock: 401 return { 402 cmd_name: len(queue) 403 for cmd_name, queue in self._queues.items() 404 if queue 405 } 406 407 408# Global task queue instance (initialized in main()) 409_task_queue: TaskQueue | 
None = None 410 411# Global supervisor callosum connection for event emissions 412_supervisor_callosum: CallosumConnection | None = None 413 414# Global reference to managed processes for restart control 415_managed_procs: list[ManagedProcess] = [] 416 417# Global reference to in-process Callosum server 418_callosum_server: CallosumServer | None = None 419_callosum_thread: threading.Thread | None = None 420 421# Restart request tracking for SIGKILL enforcement 422_restart_requests: dict[str, tuple[float, subprocess.Popen]] = {} 423 424# Track whether running in remote mode (upload-only, no local processing) 425_is_remote_mode: bool = False 426 427# State for daily processing (tracks day boundary for midnight dream trigger) 428_daily_state = { 429 "last_day": None, # Track which day we last processed 430} 431 432# Timeout before flushing stale segments (seconds) 433FLUSH_TIMEOUT = 3600 434 435# State for segment flush (close out dangling agent state after inactivity) 436_flush_state: dict = { 437 "last_segment_ts": 0.0, # Wall-clock time of last observe.observed event 438 "day": None, # Day of last observed segment 439 "segment": None, # Last observed segment key 440 "flushed": False, # Whether flush has already run for current segment 441} 442 443 444def _get_journal_path() -> Path: 445 return Path(get_journal()) 446 447 448class RestartPolicy: 449 """Track restart attempts and compute backoff delays.""" 450 451 _SCHEDULE = (0, 1, 5) 452 453 def __init__(self) -> None: 454 self.attempts = 0 455 self.last_start = 0.0 456 457 def record_start(self) -> None: 458 self.last_start = time.time() 459 460 def reset_attempts(self) -> None: 461 self.attempts = 0 462 463 def next_delay(self) -> int: 464 delay = self._SCHEDULE[min(self.attempts, len(self._SCHEDULE) - 1)] 465 self.attempts += 1 466 return delay 467 468 469_RESTART_POLICIES: dict[str, RestartPolicy] = {} 470 471 472def _get_restart_policy(name: str) -> RestartPolicy: 473 return _RESTART_POLICIES.setdefault(name, 
RestartPolicy()) 474 475 476@dataclass 477class ManagedProcess: 478 """Wrapper around RunnerManagedProcess for restart policy tracking.""" 479 480 process: subprocess.Popen 481 name: str 482 log_writer: DailyLogWriter 483 cmd: list[str] 484 restart: bool = False 485 shutdown_timeout: int = 15 486 threads: list[threading.Thread] = field(default_factory=list) 487 ref: str = "" 488 489 def cleanup(self) -> None: 490 for thread in self.threads: 491 thread.join(timeout=1) 492 self.log_writer.close() 493 494 495def _launch_process( 496 name: str, 497 cmd: list[str], 498 *, 499 restart: bool = False, 500 ref: str | None = None, 501) -> ManagedProcess: 502 # NOTE: All child processes should include -v for verbose logging by default. 503 # This ensures their output is captured in logs for debugging. 504 """Launch process with automatic output logging and restart policy tracking.""" 505 policy: RestartPolicy | None = None 506 if restart: 507 policy = _get_restart_policy(name) 508 509 # Generate ref if not provided 510 ref = ref if ref else str(now_ms()) 511 512 # Use unified runner to spawn process (share supervisor's callosum) 513 try: 514 managed = RunnerManagedProcess.spawn( 515 cmd, ref=ref, callosum=_supervisor_callosum 516 ) 517 except RuntimeError as exc: 518 logging.error(str(exc)) 519 raise 520 521 if policy: 522 policy.record_start() 523 524 # Emit started event 525 if _supervisor_callosum: 526 _supervisor_callosum.emit( 527 "supervisor", 528 "started", 529 service=name, 530 pid=managed.process.pid, 531 ref=managed.ref, 532 ) 533 534 # Wrap in ManagedProcess for restart tracking 535 return ManagedProcess( 536 process=managed.process, 537 name=name, 538 log_writer=managed.log_writer, 539 cmd=list(cmd), 540 restart=restart, 541 threads=managed._threads, 542 ref=managed.ref, 543 ) 544 545 546def _get_notifier() -> DesktopNotifier: 547 """Get or create the global desktop notifier instance.""" 548 global _notifier 549 if _notifier is None: 550 _notifier = 
DesktopNotifier(app_name="solstone Supervisor") 551 return _notifier 552 553 554async def send_notification(message: str, alert_key: tuple | None = None) -> None: 555 """Send a desktop notification with ``message``. 556 557 Args: 558 message: The notification message to display 559 alert_key: Optional key to track this notification for later clearing 560 """ 561 try: 562 notifier = _get_notifier() 563 notification_id = await notifier.send( 564 title="solstone Supervisor", 565 message=message, 566 urgency=Urgency.Critical, 567 ) 568 569 # Store notification ID if we have an alert key 570 if alert_key and notification_id: 571 _notification_ids[alert_key] = notification_id 572 logging.debug(f"Stored notification {notification_id} for key {alert_key}") 573 574 except Exception as exc: # pragma: no cover - system issues 575 logging.error("Failed to send notification: %s", exc) 576 577 578async def clear_notification(alert_key: tuple) -> None: 579 """Clear a notification by its alert key. 580 581 Args: 582 alert_key: The key used when the notification was sent 583 """ 584 if alert_key not in _notification_ids: 585 return 586 587 try: 588 notifier = _get_notifier() 589 notification_id = _notification_ids[alert_key] 590 await notifier.clear(notification_id) 591 del _notification_ids[alert_key] 592 logging.debug(f"Cleared notification for key {alert_key}") 593 594 except Exception as exc: # pragma: no cover - system issues 595 logging.error("Failed to clear notification: %s", exc) 596 597 598def _emit_queue_event(cmd_name: str, running_ref: str, queue: list) -> None: 599 """Emit supervisor.queue event with current queue state for a command. 600 601 This is the callback passed to TaskQueue for queue change notifications. 
602 """ 603 if not _supervisor_callosum: 604 return 605 606 _supervisor_callosum.emit( 607 "supervisor", 608 "queue", 609 command=cmd_name, 610 running=running_ref, 611 queued=len(queue), 612 queue=queue, 613 ) 614 615 616def _handle_task_request(message: dict) -> None: 617 """Handle incoming task request from Callosum.""" 618 if message.get("tract") != "supervisor" or message.get("event") != "request": 619 return 620 621 cmd = message.get("cmd") 622 if not cmd: 623 logging.error(f"Invalid task request: missing cmd: {message}") 624 return 625 626 ref = message.get("ref") 627 day = message.get("day") 628 if _task_queue: 629 _task_queue.submit(cmd, ref, day=day) 630 631 632def _restart_service(service: str) -> bool: 633 """Send SIGINT to a managed service to trigger graceful restart. 634 635 Returns True if the service was found and running, False if not found 636 or already exited. 637 """ 638 for proc in _managed_procs: 639 if proc.name == service: 640 if proc.process.poll() is not None: 641 logging.debug( 642 f"Ignoring restart for {service}: already exited, awaiting auto-restart" 643 ) 644 return False 645 646 logging.info(f"Restart requested for {service}, sending SIGINT...") 647 648 if _supervisor_callosum: 649 _supervisor_callosum.emit( 650 "supervisor", 651 "restarting", 652 service=service, 653 pid=proc.process.pid, 654 ref=proc.ref, 655 ) 656 657 try: 658 proc.process.send_signal(signal.SIGINT) 659 _restart_requests[service] = (time.time(), proc.process) 660 except Exception as e: 661 logging.error(f"Failed to send SIGINT to {service}: {e}") 662 return True 663 664 logging.warning(f"Cannot restart {service}: not found in managed processes") 665 return False 666 667 668def _handle_supervisor_request(message: dict) -> None: 669 """Handle incoming supervisor control messages.""" 670 if message.get("tract") != "supervisor" or message.get("event") != "restart": 671 return 672 673 service = message.get("service") 674 if not service: 675 logging.error("Invalid 
restart request: missing service") 676 return 677 if service == "supervisor": 678 logging.debug("Ignoring restart request for supervisor itself") 679 return 680 681 _restart_service(service) 682 683 684def get_task_status(ref: str) -> dict: 685 """Get status of a task. 686 687 Args: 688 ref: Task correlation ID 689 690 Returns: 691 Dict with status info, or {"status": "not_found"} if task doesn't exist 692 """ 693 if _task_queue: 694 return _task_queue.get_status(ref) 695 return {"status": "not_found"} 696 697 698def collect_status(procs: list[ManagedProcess]) -> dict: 699 """Collect current supervisor status for broadcasting.""" 700 now = time.time() 701 702 # Running services 703 services = [] 704 running_names = set() 705 for proc in procs: 706 if proc.process.poll() is None: # Still running 707 policy = _get_restart_policy(proc.name) 708 uptime = int(now - policy.last_start) if policy.last_start else 0 709 services.append( 710 { 711 "name": proc.name, 712 "ref": proc.ref, 713 "pid": proc.process.pid, 714 "uptime_seconds": uptime, 715 } 716 ) 717 running_names.add(proc.name) 718 719 # Prepend supervisor itself 720 if _supervisor_ref and _supervisor_start: 721 services.insert( 722 0, 723 { 724 "name": "supervisor", 725 "ref": _supervisor_ref, 726 "pid": os.getpid(), 727 "uptime_seconds": int(now - _supervisor_start), 728 }, 729 ) 730 731 # Crashed services (in restart backoff) 732 crashed = [] 733 for name, policy in _RESTART_POLICIES.items(): 734 if name not in running_names and policy.attempts > 0: 735 crashed.append( 736 { 737 "name": name, 738 "restart_attempts": policy.attempts, 739 } 740 ) 741 742 # Running tasks 743 tasks = _task_queue.collect_task_status() if _task_queue else [] 744 queues = _task_queue.collect_queue_counts() if _task_queue else {} 745 746 # Scheduled tasks 747 schedules = scheduler.collect_status() 748 # Connected callosum clients 749 callosum_clients = _callosum_server.client_count() if _callosum_server else 0 750 751 return { 752 
"services": services, 753 "crashed": crashed, 754 "tasks": tasks, 755 "queues": queues, 756 "stale_heartbeats": [], 757 "schedules": schedules, 758 "callosum_clients": callosum_clients, 759 } 760 761 762def start_sense() -> ManagedProcess: 763 """Launch sol sense with output logging.""" 764 return _launch_process("sense", ["sol", "sense", "-v"], restart=True) 765 766 767def start_sync(remote_url: str) -> ManagedProcess: 768 """Launch sol sync with output logging. 769 770 Args: 771 remote_url: Remote ingest URL for sync service 772 """ 773 managed = _launch_process( 774 "sync", 775 ["sol", "sync", "-v", "--remote", remote_url], 776 restart=True, 777 ) 778 # Sync shutdown can block while draining pending segments. 779 # Give it extra time so the supervisor does not cut it off early. 780 managed.shutdown_timeout = 90 781 return managed 782 783 784def start_callosum_in_process() -> CallosumServer: 785 """Start Callosum message bus server in-process. 786 787 Runs the server in a background thread and waits for socket to be ready. 
788 789 Returns: 790 CallosumServer instance 791 """ 792 global _callosum_server, _callosum_thread 793 794 server = CallosumServer() 795 _callosum_server = server 796 797 # Pre-delete stale socket to avoid race condition where the ready check 798 # passes due to an old socket file before the server thread deletes it 799 socket_path = server.socket_path 800 socket_path.parent.mkdir(parents=True, exist_ok=True) 801 if socket_path.exists(): 802 socket_path.unlink() 803 804 # Start server in background thread (server.start() is blocking) 805 thread = threading.Thread(target=server.start, daemon=False, name="callosum-server") 806 thread.start() 807 _callosum_thread = thread 808 809 # Wait for socket to be ready (with timeout) 810 for _ in range(50): # Wait up to 500ms 811 if socket_path.exists(): 812 logging.info(f"Callosum server started on {socket_path}") 813 return server 814 time.sleep(0.01) 815 816 raise RuntimeError("Callosum server failed to create socket within 500ms") 817 818 819def stop_callosum_in_process() -> None: 820 """Stop the in-process Callosum server.""" 821 global _callosum_server, _callosum_thread 822 823 if _callosum_server: 824 logging.info("Stopping Callosum server...") 825 _callosum_server.stop() 826 827 if _callosum_thread: 828 _callosum_thread.join(timeout=5) 829 if _callosum_thread.is_alive(): 830 logging.warning("Callosum server thread did not stop cleanly") 831 832 _callosum_server = None 833 _callosum_thread = None 834 835 836def start_cortex_server() -> ManagedProcess: 837 """Launch the Cortex WebSocket API server.""" 838 cmd = ["sol", "cortex", "-v"] 839 return _launch_process("cortex", cmd, restart=True) 840 841 842def start_convey_server( 843 verbose: bool, debug: bool = False, port: int = 0 844) -> tuple[ManagedProcess, int]: 845 """Launch the Convey web application with optional verbose and debug logging. 
846 847 Returns: 848 Tuple of (ManagedProcess, resolved_port) where resolved_port is the 849 actual port being used (auto-selected if port was 0). 850 """ 851 # Resolve port 0 to an available port before launching 852 resolved_port = port if port != 0 else find_available_port() 853 854 cmd = ["sol", "convey", "--port", str(resolved_port)] 855 if debug: 856 cmd.append("-d") 857 elif verbose: 858 cmd.append("-v") 859 return _launch_process("convey", cmd, restart=True), resolved_port 860 861 862def check_runner_exits(procs: list[ManagedProcess]) -> list[ManagedProcess]: 863 """Return managed processes that have exited.""" 864 865 exited: list[ManagedProcess] = [] 866 for managed in procs: 867 if managed.process.poll() is not None: 868 exited.append(managed) 869 return exited 870 871 872async def handle_runner_exits( 873 procs: list[ManagedProcess], 874 alert_mgr: AlertManager, 875) -> None: 876 """Check for and handle exited processes with restart policy.""" 877 exited = check_runner_exits(procs) 878 if not exited: 879 return 880 881 exited_names = [managed.name for managed in exited] 882 exit_key = ("runner_exit", tuple(sorted(exited_names))) 883 884 # Check if all exits are tempfail (session not ready) 885 all_tempfail = all(m.process.returncode == EXIT_TEMPFAIL for m in exited) 886 887 if all_tempfail: 888 logging.info("Runner waiting for session: %s", ", ".join(sorted(exited_names))) 889 else: 890 msg = f"Runner process exited: {', '.join(sorted(exited_names))}" 891 logging.error(msg) 892 await alert_mgr.alert_if_ready(exit_key, msg) 893 894 for managed in exited: 895 # Clear any pending restart request for this service 896 _restart_requests.pop(managed.name, None) 897 898 returncode = managed.process.returncode 899 is_tempfail = returncode == EXIT_TEMPFAIL 900 logging.info("%s exited with code %s", managed.name, returncode) 901 902 # Emit stopped event 903 if _supervisor_callosum: 904 _supervisor_callosum.emit( 905 "supervisor", 906 "stopped", 907 
service=managed.name, 908 pid=managed.process.pid, 909 ref=managed.ref, 910 exit_code=returncode, 911 ) 912 913 # Remove from procs list 914 try: 915 procs.remove(managed) 916 except ValueError: 917 pass 918 919 managed.cleanup() 920 921 # Handle restart if needed 922 if managed.restart and not shutdown_requested: 923 # Tempfail: use fixed longer delay, don't burn through backoff 924 if is_tempfail: 925 delay = TEMPFAIL_DELAY 926 else: 927 policy = _get_restart_policy(managed.name) 928 uptime = time.time() - policy.last_start if policy.last_start else 0 929 if uptime >= 60: 930 policy.reset_attempts() 931 delay = policy.next_delay() 932 if delay: 933 logging.info("Waiting %ss before restarting %s", delay, managed.name) 934 for _ in range(delay): 935 if shutdown_requested: 936 break 937 await asyncio.sleep(1) 938 if shutdown_requested: 939 continue 940 logging.info("Restarting %s...", managed.name) 941 try: 942 new_proc = _launch_process( 943 managed.name, 944 managed.cmd, 945 restart=True, 946 ) 947 except Exception as exc: 948 logging.exception("Failed to restart %s: %s", managed.name, exc) 949 continue 950 951 procs.append(new_proc) 952 logging.info("Restarted %s after exit code %s", managed.name, returncode) 953 # Clear the notification now that process has restarted 954 await alert_mgr.clear(exit_key) 955 else: 956 logging.info("Not restarting %s", managed.name) 957 958 959def handle_daily_tasks() -> None: 960 """Check for day change and submit daily dream for updated days (non-blocking). 961 962 Triggers once when the day rolls over at midnight. Queries ``updated_days()`` 963 for journal days that have new stream data but haven't completed a daily 964 dream yet, then submits up to ``MAX_UPDATED_CATCHUP`` dreams in chronological 965 order (oldest first, yesterday last) via the TaskQueue. 966 967 Dream auto-detects updated state and enables ``--refresh`` internally, so we 968 don't pass it here. 969 970 Skipped in remote mode (no local data to process). 
971 """ 972 # Remote mode: no local processing, data is on the server 973 if _is_remote_mode: 974 return 975 976 today = datetime.now().date() 977 978 # Only trigger when day actually changes (at midnight) 979 if today != _daily_state["last_day"]: 980 # The day that just ended is what we process 981 prev_day = _daily_state["last_day"] 982 983 # Guard against None (e.g., module reloaded without going through main()) 984 if prev_day is None: 985 logging.warning("Daily state not initialized, skipping daily processing") 986 _daily_state["last_day"] = today 987 return 988 989 prev_day_str = prev_day.strftime("%Y%m%d") 990 991 # Update state for new day 992 _daily_state["last_day"] = today 993 994 # Flush any dangling segment state from the previous day before daily dream 995 if not _flush_state["flushed"] and _flush_state["day"] == prev_day_str: 996 _check_segment_flush(force=True) 997 998 today_str = today.strftime("%Y%m%d") 999 all_updated = updated_days(exclude={today_str}) 1000 1001 if not all_updated: 1002 logging.info("Day changed to %s, no updated days to process", today) 1003 return 1004 1005 # Take the newest MAX_UPDATED_CATCHUP days (already sorted ascending) 1006 days_to_process = all_updated[-MAX_UPDATED_CATCHUP:] 1007 skipped = len(all_updated) - len(days_to_process) 1008 1009 if skipped: 1010 logging.warning( 1011 "Skipping %d older updated days (max catchup %d): %s", 1012 skipped, 1013 MAX_UPDATED_CATCHUP, 1014 all_updated[:skipped], 1015 ) 1016 1017 logging.info( 1018 "Day changed to %s, queuing daily dream for %d updated day(s): %s", 1019 today, 1020 len(days_to_process), 1021 days_to_process, 1022 ) 1023 1024 # Submit oldest-first so yesterday is processed last 1025 for day_str in days_to_process: 1026 cmd = ["sol", "dream", "-v", "--day", day_str] 1027 if _task_queue: 1028 _task_queue.submit(cmd, day=day_str) 1029 logging.debug("Submitted daily dream for %s", day_str) 1030 else: 1031 logging.warning( 1032 "No task queue available for daily processing: 
%s", day_str 1033 ) 1034 1035 1036def _handle_segment_observed(message: dict) -> None: 1037 """Handle segment completion events (from live observation or imports). 1038 1039 Submits sol dream in segment mode via task queue, which handles both 1040 generators and segment agents. Also updates flush state to track 1041 segment recency. 1042 """ 1043 if message.get("tract") != "observe" or message.get("event") != "observed": 1044 return 1045 1046 segment = message.get("segment") # e.g., "163045_300" 1047 if not segment: 1048 logging.warning("observed event missing segment field") 1049 return 1050 1051 # Use day from event payload, fallback to today (for live observation) 1052 day = message.get("day") or datetime.now().strftime("%Y%m%d") 1053 stream = message.get("stream") 1054 1055 # Update flush state — new segment resets the flush timer 1056 _flush_state["last_segment_ts"] = time.time() 1057 _flush_state["day"] = day 1058 _flush_state["segment"] = segment 1059 _flush_state["stream"] = stream 1060 _flush_state["flushed"] = False 1061 1062 logging.info(f"Segment observed: {day}/{segment}, submitting processing...") 1063 1064 # Submit via task queue — serializes with other dream invocations 1065 cmd = ["sol", "dream", "-v", "--day", day, "--segment", segment] 1066 if stream: 1067 cmd.extend(["--stream", stream]) 1068 if _task_queue: 1069 _task_queue.submit(cmd, day=day) 1070 else: 1071 logging.warning( 1072 "No task queue available for segment processing: %s/%s", day, segment 1073 ) 1074 1075 1076def _check_segment_flush(force: bool = False) -> None: 1077 """Check if the last observed segment needs flushing. 1078 1079 If no new segments have arrived within FLUSH_TIMEOUT seconds, runs 1080 ``sol dream --flush`` on the last segment to let flush-enabled agents 1081 close out dangling state (e.g., end active activities). 1082 1083 Args: 1084 force: Skip timeout check (used at day boundary to flush 1085 before daily dream regardless of elapsed time). 
1086 1087 Skipped in remote mode (no local processing). 1088 """ 1089 if _is_remote_mode: 1090 return 1091 1092 last_ts = _flush_state["last_segment_ts"] 1093 if not last_ts or _flush_state["flushed"]: 1094 return 1095 1096 if not force and time.time() - last_ts < FLUSH_TIMEOUT: 1097 return 1098 1099 day = _flush_state["day"] 1100 segment = _flush_state["segment"] 1101 if not day or not segment: 1102 return 1103 1104 _flush_state["flushed"] = True 1105 1106 stream = _flush_state.get("stream") 1107 cmd = ["sol", "dream", "-v", "--day", day, "--segment", segment, "--flush"] 1108 if stream: 1109 cmd.extend(["--stream", stream]) 1110 if _task_queue: 1111 _task_queue.submit(cmd, day=day) 1112 logging.info(f"Queued segment flush: {day}/{segment}") 1113 else: 1114 logging.warning( 1115 "No task queue available for segment flush: %s/%s", day, segment 1116 ) 1117 1118 1119def _handle_segment_event_log(message: dict) -> None: 1120 """Log observe, dream, and activity events with day+segment to segment/events.jsonl. 1121 1122 Any observe, dream, or activity tract message with both day and segment fields 1123 gets logged to journal/day/segment/events.jsonl if that directory exists. 
1124 """ 1125 if message.get("tract") not in {"observe", "dream", "activity"}: 1126 return 1127 1128 day = message.get("day") 1129 segment = message.get("segment") 1130 1131 if not day or not segment: 1132 return 1133 1134 stream = message.get("stream") 1135 1136 try: 1137 journal_path = _get_journal_path() 1138 1139 if stream: 1140 segment_dir = journal_path / day / stream / segment 1141 else: 1142 segment_dir = journal_path / day / segment 1143 1144 # Only log if segment directory exists 1145 if not segment_dir.is_dir(): 1146 return 1147 1148 events_file = segment_dir / "events.jsonl" 1149 1150 # Append event as JSON line 1151 with open(events_file, "a", encoding="utf-8") as f: 1152 f.write(json.dumps(message, ensure_ascii=False) + "\n") 1153 1154 except Exception as e: 1155 logging.debug(f"Failed to log segment event: {e}") 1156 1157 1158def _handle_activity_recorded(message: dict) -> None: 1159 """Queue a per-activity dream task when an activity is recorded. 1160 1161 Listens for activity.recorded events and submits a queued dream task 1162 for per-activity agent processing (serialized via TaskQueue). 1163 """ 1164 if message.get("tract") != "activity" or message.get("event") != "recorded": 1165 return 1166 1167 record_id = message.get("id") 1168 facet = message.get("facet") 1169 day = message.get("day") 1170 1171 if not record_id or not facet or not day: 1172 logging.warning("activity.recorded event missing required fields") 1173 return 1174 1175 cmd = ["sol", "dream", "--activity", record_id, "--facet", facet, "--day", day] 1176 1177 if _task_queue: 1178 _task_queue.submit(cmd, day=day) 1179 logging.info(f"Queued activity dream: {record_id} for #{facet}") 1180 else: 1181 logging.warning("No task queue available for activity dream: %s", record_id) 1182 1183 1184def _handle_dream_daily_complete(message: dict) -> None: 1185 """Submit a heartbeat task after daily dream processing completes. 1186 1187 Listens for dream.daily_complete events. 
Skips if a heartbeat process 1188 is already running (PID file guard). 1189 """ 1190 if message.get("tract") != "dream" or message.get("event") != "daily_complete": 1191 return 1192 1193 # Check if heartbeat is already running via PID file 1194 pid_file = Path(get_journal()) / "health" / "heartbeat.pid" 1195 if pid_file.exists(): 1196 try: 1197 existing_pid = int(pid_file.read_text().strip()) 1198 os.kill(existing_pid, 0) 1199 logging.info("Heartbeat already running (pid=%d), skipping", existing_pid) 1200 return 1201 except ProcessLookupError: 1202 pass # Stale PID file, proceed 1203 except PermissionError: 1204 logging.info( 1205 "Heartbeat running under different user (pid file exists), skipping" 1206 ) 1207 return 1208 except ValueError: 1209 pass # Corrupt PID file, proceed 1210 1211 cmd = ["sol", "heartbeat"] 1212 if _task_queue: 1213 _task_queue.submit(cmd) 1214 logging.info("Queued heartbeat after daily dream completion") 1215 else: 1216 logging.warning("No task queue available for heartbeat submission") 1217 1218 1219def _handle_callosum_message(message: dict) -> None: 1220 """Dispatch incoming Callosum messages to appropriate handlers.""" 1221 _handle_task_request(message) 1222 _handle_supervisor_request(message) 1223 _handle_segment_observed(message) 1224 _handle_activity_recorded(message) 1225 _handle_dream_daily_complete(message) 1226 _handle_segment_event_log(message) 1227 1228 1229async def supervise( 1230 *, 1231 daily: bool = True, 1232 schedule: bool = True, 1233 procs: list[ManagedProcess] | None = None, 1234) -> None: 1235 """Main supervision loop. Runs at 1-second intervals for responsiveness. 1236 1237 Monitors runner health, emits status, triggers daily processing, 1238 and checks scheduled agents. 
1239 """ 1240 alert_mgr = AlertManager() 1241 last_status_emit = 0.0 1242 1243 try: 1244 while ( 1245 not shutdown_requested 1246 ): # pragma: no cover - loop checked via unit tests by patching 1247 # Check for restart timeouts (enforce SIGKILL after 15s) 1248 for service, (start_time, proc) in list(_restart_requests.items()): 1249 if proc.poll() is not None: # Already exited 1250 _restart_requests.pop(service, None) 1251 elif time.time() - start_time > 15: 1252 logging.warning( 1253 f"{service} did not exit within 15s after SIGINT, sending SIGKILL" 1254 ) 1255 try: 1256 proc.kill() 1257 except Exception as e: 1258 logging.error(f"Failed to kill {service}: {e}") 1259 # Don't delete here - let handle_runner_exits clean up 1260 1261 # Check for runner exits first (immediate alert) 1262 if procs: 1263 await handle_runner_exits(procs, alert_mgr) 1264 1265 # Emit status every 5 seconds 1266 now = time.time() 1267 if now - last_status_emit >= 5: 1268 if _supervisor_callosum and procs: 1269 try: 1270 status = collect_status(procs) 1271 _supervisor_callosum.emit("supervisor", "status", **status) 1272 except Exception as e: 1273 logging.debug(f"Status emission failed: {e}") 1274 last_status_emit = now 1275 1276 # Check for segment flush (non-blocking, submits via task queue) 1277 _check_segment_flush() 1278 1279 # Check for daily processing (non-blocking, submits via task queue) 1280 if daily: 1281 handle_daily_tasks() 1282 1283 # Check periodic task schedules (non-blocking, submits via callosum) 1284 if schedule: 1285 scheduler.check() 1286 routines.check() 1287 1288 # Sleep 1 second before next iteration (responsive to shutdown) 1289 await asyncio.sleep(1) 1290 finally: 1291 pass # Callosum cleanup happens in main() 1292 1293 1294def parse_args() -> argparse.ArgumentParser: 1295 parser = argparse.ArgumentParser(description="Monitor journaling health") 1296 parser.add_argument( 1297 "port", 1298 nargs="?", 1299 type=int, 1300 default=0, 1301 help="Convey port (0 = 
auto-select available port)", 1302 ) 1303 parser.add_argument( 1304 "--threshold", 1305 type=int, 1306 default=DEFAULT_THRESHOLD, 1307 help="Seconds before heartbeat considered stale", 1308 ) 1309 parser.add_argument( 1310 "--interval", type=int, default=CHECK_INTERVAL, help="Polling interval seconds" 1311 ) 1312 parser.add_argument( 1313 "--no-daily", 1314 action="store_true", 1315 help="Disable daily processing run at midnight", 1316 ) 1317 parser.add_argument( 1318 "--no-cortex", 1319 action="store_true", 1320 help="Do not start the Cortex server (run it manually for debugging)", 1321 ) 1322 parser.add_argument( 1323 "--no-convey", 1324 action="store_true", 1325 help="Do not start the Convey web application", 1326 ) 1327 parser.add_argument( 1328 "--no-schedule", 1329 action="store_true", 1330 help="Disable periodic task scheduler", 1331 ) 1332 parser.add_argument( 1333 "--remote", 1334 type=str, 1335 help="Remote mode: sync to server URL instead of local processing", 1336 ) 1337 return parser 1338 1339 1340def handle_shutdown(signum, frame): 1341 """Handle shutdown signals gracefully.""" 1342 global shutdown_requested 1343 if not shutdown_requested: # Only log once 1344 shutdown_requested = True 1345 logging.info("Shutdown requested, cleaning up...") 1346 raise KeyboardInterrupt 1347 1348 1349def main() -> None: 1350 parser = parse_args() 1351 1352 # Capture journal info BEFORE setup_cli() loads .env and pollutes os.environ 1353 journal_info = get_journal_info() 1354 1355 args = setup_cli(parser) 1356 1357 journal_path = _get_journal_path() 1358 1359 log_level = logging.DEBUG if args.debug else logging.INFO 1360 log_path = journal_path / "health" / "supervisor.log" 1361 log_path.parent.mkdir(parents=True, exist_ok=True) 1362 logging.getLogger().handlers = [] 1363 logging.basicConfig( 1364 level=log_level, 1365 handlers=[logging.FileHandler(log_path, encoding="utf-8")], 1366 format="%(asctime)s [supervisor:log] %(levelname)s %(message)s", 1367 
datefmt="%Y-%m-%dT%H:%M:%S", 1368 ) 1369 1370 if args.verbose or args.debug: 1371 console_handler = logging.StreamHandler() 1372 console_handler.setLevel(log_level) 1373 console_handler.setFormatter( 1374 logging.Formatter("%(asctime)s %(levelname)s %(message)s") 1375 ) 1376 logging.getLogger().addHandler(console_handler) 1377 1378 # Singleton guard: only one supervisor per journal 1379 health_dir = journal_path / "health" 1380 lock_path = health_dir / "supervisor.lock" 1381 pid_path = health_dir / "supervisor.pid" 1382 import fcntl 1383 1384 lock_fd = open(lock_path, "w") 1385 try: 1386 fcntl.flock(lock_fd, fcntl.LOCK_EX | fcntl.LOCK_NB) 1387 except OSError: 1388 lock_fd.close() 1389 pid_str = "" 1390 try: 1391 pid_str = pid_path.read_text().strip() 1392 except OSError: 1393 pass 1394 pid_msg = f" (PID {pid_str})" if pid_str else "" 1395 sock_path = health_dir / "callosum.sock" 1396 if sock_path.exists(): 1397 try: 1398 from think.health_cli import health_check 1399 1400 print(f"Supervisor already running{pid_msg}\n") 1401 health_check() 1402 except Exception: 1403 print(f"Supervisor already running{pid_msg}") 1404 else: 1405 print(f"Supervisor already running{pid_msg}") 1406 sys.exit(1) 1407 pid_path.write_text(str(os.getpid())) 1408 logging.info("Singleton lock acquired (PID %d)", os.getpid()) 1409 1410 # Set up signal handlers 1411 signal.signal(signal.SIGINT, handle_shutdown) 1412 signal.signal(signal.SIGTERM, handle_shutdown) 1413 1414 # Show journal path and source on startup 1415 path, source = journal_info 1416 print(f"Journal: {path} (from {source})") 1417 logging.info("Supervisor starting...") 1418 1419 global _managed_procs, _supervisor_callosum, _is_remote_mode 1420 global _task_queue 1421 procs: list[ManagedProcess] = [] 1422 convey_port = None 1423 1424 # Remote mode: run sync instead of local processing 1425 _is_remote_mode = bool(args.remote) 1426 1427 # Start Callosum in-process first - it's the message bus that other services depend on 1428 try: 
1429 start_callosum_in_process() 1430 except RuntimeError as e: 1431 logging.error(f"Failed to start Callosum server: {e}") 1432 parser.error(f"Failed to start Callosum server: {e}") 1433 return 1434 1435 # Connect supervisor's Callosum client to capture startup events from other services 1436 try: 1437 _supervisor_callosum = CallosumConnection(defaults={"rev": get_rev()}) 1438 _supervisor_callosum.start(callback=_handle_callosum_message) 1439 logging.info("Supervisor connected to Callosum") 1440 except Exception as e: 1441 logging.warning(f"Failed to start Callosum connection: {e}") 1442 1443 # Mirror supervisor log output to callosum logs tract (best-effort) 1444 supervisor_ref = str(now_ms()) 1445 global _supervisor_ref, _supervisor_start 1446 _supervisor_ref = supervisor_ref 1447 _supervisor_start = time.time() 1448 if _supervisor_callosum: 1449 try: 1450 handler = CallosumLogHandler(_supervisor_callosum, supervisor_ref) 1451 handler.setFormatter( 1452 logging.Formatter("%(asctime)s %(levelname)s %(message)s") 1453 ) 1454 logging.getLogger().addHandler(handler) 1455 except Exception: 1456 pass 1457 1458 # Initialize task queue with callosum event callback 1459 _task_queue = TaskQueue(on_queue_change=_emit_queue_event) 1460 1461 # Now start other services (their startup events will be captured) 1462 if _is_remote_mode: 1463 # Remote mode: verify remote server is reachable before starting sync 1464 logging.info("Remote mode: checking server connectivity...") 1465 success, message = check_remote_health(args.remote) 1466 if not success: 1467 logging.error(f"Remote health check failed: {message}") 1468 stop_callosum_in_process() 1469 parser.error(f"Remote server not available: {message}") 1470 logging.info(f"Remote server verified: {message}") 1471 procs.append(start_sync(args.remote)) 1472 else: 1473 # Local mode: convey first, then sense for file processing 1474 if not args.no_convey: 1475 proc, convey_port = start_convey_server( 1476 verbose=args.verbose, 
debug=args.debug, port=args.port 1477 ) 1478 procs.append(proc) 1479 # Sense handles file processing 1480 procs.append(start_sense()) 1481 # Cortex for agent execution 1482 if not args.no_cortex: 1483 procs.append(start_cortex_server()) 1484 1485 # Make procs accessible to restart handler 1486 _managed_procs = procs 1487 1488 # Initialize daily state to today - dream only triggers at midnight when day changes 1489 _daily_state["last_day"] = datetime.now().date() 1490 1491 # Initialize periodic task scheduler 1492 schedule_enabled = not args.no_schedule and not _is_remote_mode 1493 if schedule_enabled and _supervisor_callosum: 1494 scheduler.init(_supervisor_callosum) 1495 scheduler.register_defaults() 1496 routines.init(_supervisor_callosum) 1497 1498 # Show Convey URL if running 1499 if convey_port: 1500 print(f"Convey: http://localhost:{convey_port}/") 1501 1502 logging.info(f"Started {len(procs)} processes, entering supervision loop") 1503 daily_enabled = not args.no_daily and not _is_remote_mode 1504 if daily_enabled: 1505 logging.info("Daily processing scheduled for midnight") 1506 1507 # Startup catchup: submit dreams for days with pending stream data 1508 if daily_enabled: 1509 all_updated = updated_days() 1510 if all_updated: 1511 days_to_process = all_updated[-MAX_UPDATED_CATCHUP:] 1512 skipped = len(all_updated) - len(days_to_process) 1513 1514 if skipped: 1515 logging.warning( 1516 "Startup catchup: skipping %d older updated days (max %d): %s", 1517 skipped, 1518 MAX_UPDATED_CATCHUP, 1519 all_updated[:skipped], 1520 ) 1521 1522 logging.info( 1523 "Startup catchup: submitted %d day(s) with pending stream data: %s", 1524 len(days_to_process), 1525 days_to_process, 1526 ) 1527 1528 for day_str in days_to_process: 1529 cmd = ["sol", "dream", "-v", "--day", day_str] 1530 if _task_queue: 1531 _task_queue.submit(cmd, day=day_str) 1532 logging.debug("Startup catchup: submitted dream for %s", day_str) 1533 else: 1534 logging.warning( 1535 "No task queue available 
for startup catchup: %s", day_str 1536 ) 1537 1538 try: 1539 asyncio.run( 1540 supervise( 1541 daily=daily_enabled, 1542 schedule=schedule_enabled, 1543 procs=procs if procs else None, 1544 ) 1545 ) 1546 except KeyboardInterrupt: 1547 logging.info("Caught KeyboardInterrupt, shutting down...") 1548 finally: 1549 logging.info("Stopping all processes...") 1550 print("\nShutting down gracefully (this may take a moment)...", flush=True) 1551 1552 def _stop_process(managed: ManagedProcess) -> None: 1553 name = managed.name 1554 proc = managed.process 1555 logging.info(f"Stopping {name}...") 1556 print(f" Stopping {name}...", end="", flush=True) 1557 try: 1558 proc.terminate() 1559 except Exception: 1560 pass 1561 try: 1562 timeout = getattr(managed, "shutdown_timeout", 15) 1563 proc.wait(timeout=timeout) 1564 print(" done", flush=True) 1565 except subprocess.TimeoutExpired: 1566 logging.warning(f"{name} did not terminate gracefully, killing...") 1567 print(" timeout, forcing kill...", flush=True) 1568 try: 1569 proc.kill() 1570 proc.wait(timeout=1) 1571 except Exception: 1572 pass 1573 managed.cleanup() 1574 1575 # Stop services in reverse order 1576 for managed in reversed(procs): 1577 _stop_process(managed) 1578 1579 # Save scheduler state before disconnecting 1580 if schedule_enabled and scheduler._state: 1581 try: 1582 scheduler.save_state() 1583 except Exception as exc: 1584 logging.warning("Failed to save scheduler state on shutdown: %s", exc) 1585 1586 if schedule_enabled: 1587 try: 1588 routines.save_state() 1589 except Exception as exc: 1590 logging.warning("Failed to save routines state on shutdown: %s", exc) 1591 1592 # Disconnect supervisor's Callosum connection 1593 if _supervisor_callosum: 1594 _supervisor_callosum.stop() 1595 logging.info("Supervisor disconnected from Callosum") 1596 1597 # Stop in-process Callosum server last 1598 stop_callosum_in_process() 1599 1600 logging.info("Supervisor shutdown complete.") 1601 print("Shutdown complete.", 
flush=True) 1602 1603 1604if __name__ == "__main__": 1605 main()