# personal memory agent
1# SPDX-License-Identifier: AGPL-3.0-only
2# Copyright (c) 2026 sol pbc
3
4from __future__ import annotations
5
6import argparse
7import asyncio
8import json
9import logging
10import os
11import signal
12import subprocess
13import sys
14import threading
15import time
16from dataclasses import dataclass, field
17from datetime import datetime
18from pathlib import Path
19
20from desktop_notifier import DesktopNotifier, Urgency
21
22from observe.sync import check_remote_health
23from think import routines, scheduler
24from think.callosum import CallosumConnection, CallosumServer
25from think.runner import DailyLogWriter
26from think.runner import ManagedProcess as RunnerManagedProcess
27from think.utils import (
28 find_available_port,
29 get_journal,
30 get_journal_info,
31 get_rev,
32 now_ms,
33 setup_cli,
34 updated_days,
35)
36
# Default staleness threshold — presumably seconds; consumer is outside this chunk, TODO confirm
DEFAULT_THRESHOLD = 60
# Interval between supervisor monitoring passes — presumably seconds; used outside this chunk
CHECK_INTERVAL = 30
# Max number of updated past days to catch up on at the midnight rollover
MAX_UPDATED_CATCHUP = 4
EXIT_TEMPFAIL = 75  # EX_TEMPFAIL: service prerequisites not ready
TEMPFAIL_DELAY = 15  # seconds to wait before retrying a tempfail exit

# Global shutdown flag (checked by restart/backoff loops)
shutdown_requested = False
# Supervisor identity (set in main() once ref is assigned)
_supervisor_ref: str | None = None
_supervisor_start: float | None = None
48
49
class CallosumLogHandler(logging.Handler):
    """Mirror Python log records onto the callosum ``logs`` tract.

    Mirroring is strictly best-effort: any exception raised while emitting
    is swallowed so that logging can never destabilize the supervisor. A
    reentrancy flag prevents infinite recursion should the emit path itself
    produce a log record.
    """

    def __init__(self, conn: CallosumConnection, ref: str):
        super().__init__()
        self._conn = conn
        self._ref = ref
        self._pid = os.getpid()
        self._emitting = False

    def emit(self, record: logging.LogRecord) -> None:
        # Reentrancy guard: formatting or emitting may trigger logging.
        if self._emitting:
            return
        self._emitting = True
        try:
            formatted = self.format(record)
            self._conn.emit(
                "logs",
                "line",
                ref=self._ref,
                name="supervisor",
                pid=self._pid,
                stream="log",
                line=formatted,
            )
        except Exception:
            # Best-effort mirroring: silently drop the record on any error.
            pass
        finally:
            self._emitting = False
81
82
# Desktop notification system
_notifier: DesktopNotifier | None = None  # lazily created by _get_notifier()
_notification_ids: dict[tuple, str] = {}  # Maps alert_key -> notification_id
86
87
class AlertManager:
    """Alert dispatcher with per-key exponential backoff and clearing."""

    def __init__(self, initial_backoff: int = 60, max_backoff: int = 3600):
        # key -> (timestamp of last alert, current backoff window in seconds)
        self._state: dict[tuple, tuple[float, int]] = {}
        self._initial_backoff = initial_backoff
        self._max_backoff = max_backoff

    async def alert_if_ready(self, key: tuple, message: str) -> bool:
        """Send alert unless still inside the backoff window. Returns True if sent."""
        now = time.time()
        entry = self._state.get(key)

        if entry is None:
            # First alert for this key: send immediately, start the backoff.
            await send_notification(message, alert_key=key)
            self._state[key] = (now, self._initial_backoff)
            return True

        last_time, backoff = entry
        elapsed = now - last_time
        if elapsed < backoff:
            remaining = int(backoff - elapsed)
            logging.info(f"Suppressing alert, next in {remaining}s")
            return False

        await send_notification(message, alert_key=key)
        new_backoff = min(backoff * 2, self._max_backoff)
        self._state[key] = (now, new_backoff)
        logging.info(f"Alert sent, next backoff: {new_backoff}s")
        return True

    async def clear(self, key: tuple) -> None:
        """Drop backoff state for ``key`` and dismiss its notification."""
        if key not in self._state:
            return
        del self._state[key]
        await clear_notification(key)

    def clear_matching(self, predicate) -> None:
        """Drop every alert state for which ``predicate(key, value)`` is true."""
        self._state = {
            k: v for k, v in self._state.items() if not predicate(k, v)
        }
126
127
class TaskQueue:
    """Manages on-demand task execution with per-command serialization.

    Tasks are serialized by command name - only one task per command runs at a time.
    Additional requests for the same command are queued (deduped by exact cmd match).
    Multiple callers requesting the same work have their refs coalesced so all get
    notified when the task completes.

    The lock only protects state mutations, never held during I/O operations.
    Worker threads mutate ``_active`` without the lock (single dict operations
    are atomic under the GIL), so readers must snapshot before iterating and
    use race-safe lookups.
    """

    def __init__(self, on_queue_change=None):
        """Initialize task queue.

        Args:
            on_queue_change: Optional callback(cmd_name, running_ref, queue_entries)
                called after queue state changes. Called outside lock.
        """
        self._running: dict[str, str] = {}  # command_name -> ref of running task
        self._queues: dict[str, list] = {}  # command_name -> list of {refs, cmd} dicts
        self._active: dict[str, RunnerManagedProcess] = {}  # ref -> process
        self._lock = threading.Lock()
        self._on_queue_change = on_queue_change

    @staticmethod
    def get_command_name(cmd: list[str]) -> str:
        """Extract command name from cmd array for queue serialization.

        For 'sol X' commands, returns X. Otherwise returns cmd[0] basename.
        """
        if cmd and cmd[0] == "sol" and len(cmd) > 1:
            return cmd[1]
        return Path(cmd[0]).name if cmd else "unknown"

    def _notify_queue_change(self, cmd_name: str) -> None:
        """Notify listener of queue state change (called outside lock)."""
        if not self._on_queue_change:
            return

        # Snapshot state under the lock; invoke the callback outside it.
        with self._lock:
            queue = list(self._queues.get(cmd_name, []))
            running_ref = self._running.get(cmd_name)

        self._on_queue_change(cmd_name, running_ref, queue)

    def submit(
        self,
        cmd: list[str],
        ref: str | None = None,
        day: str | None = None,
    ) -> str | None:
        """Submit a task for execution.

        If no task of this command type is running, starts immediately.
        Otherwise queues (deduped by exact cmd match, refs coalesced).

        Args:
            cmd: Command to execute
            ref: Optional caller-provided ref for tracking
            day: Optional day override (YYYYMMDD) for log placement

        Returns:
            ref if task was started/queued, None if already tracked (no change)
        """
        ref = ref or str(now_ms())
        cmd_name = self.get_command_name(cmd)

        should_notify = False
        should_start = False

        with self._lock:
            if cmd_name in self._running:
                # Command already running - queue or coalesce
                queue = self._queues.setdefault(cmd_name, [])
                existing = next((q for q in queue if q["cmd"] == cmd), None)
                if existing:
                    if ref not in existing["refs"]:
                        existing["refs"].append(ref)
                        logging.info(
                            f"Added ref {ref} to queued task {cmd_name} "
                            f"(refs: {len(existing['refs'])})"
                        )
                        should_notify = True
                    else:
                        logging.debug(f"Ref already tracked for queued task: {ref}")
                        return None
                else:
                    queue.append({"refs": [ref], "cmd": cmd, "day": day})
                    logging.info(
                        f"Queued task {cmd_name}: {' '.join(cmd)} ref={ref} "
                        f"(queue: {len(queue)})"
                    )
                    should_notify = True
            else:
                # Not running - mark as running and start
                self._running[cmd_name] = ref
                should_start = True

        # Notify outside lock
        if should_notify:
            self._notify_queue_change(cmd_name)
            return ref

        # Start task outside lock
        if should_start:
            threading.Thread(
                target=self._run_task,
                args=([ref], cmd, cmd_name, day),
                daemon=True,
            ).start()
            return ref

        return None

    def _run_task(
        self,
        refs: list[str],
        cmd: list[str],
        cmd_name: str,
        day: str | None = None,
    ) -> None:
        """Execute a task and handle completion.

        Args:
            refs: List of refs to notify on completion
            cmd: Command to execute
            cmd_name: Command name for queue management
            day: Optional day override (YYYYMMDD) for log placement
        """
        callosum = CallosumConnection()
        managed = None
        primary_ref = refs[0]
        service = cmd_name

        try:
            callosum.start()
            logging.info(f"Starting task {primary_ref}: {' '.join(cmd)}")

            managed = RunnerManagedProcess.spawn(
                cmd, ref=primary_ref, callosum=callosum, day=day
            )
            self._active[primary_ref] = managed

            callosum.emit(
                "supervisor",
                "started",
                service=service,
                pid=managed.pid,
                ref=primary_ref,
            )

            exit_code = managed.wait()

            # Every coalesced caller gets its own stopped event.
            for ref in refs:
                callosum.emit(
                    "supervisor",
                    "stopped",
                    service=service,
                    pid=managed.pid,
                    ref=ref,
                    exit_code=exit_code,
                )

            if exit_code == 0:
                logging.info(f"Task {cmd_name} ({primary_ref}) finished successfully")
            else:
                logging.warning(
                    f"Task {cmd_name} ({primary_ref}) failed with exit code {exit_code}"
                )

        except Exception as e:
            logging.exception(
                f"Task {cmd_name} ({primary_ref}) encountered exception: {e}"
            )
            for ref in refs:
                callosum.emit(
                    "supervisor",
                    "stopped",
                    service=service,
                    pid=managed.pid if managed else 0,
                    ref=ref,
                    exit_code=-1,
                )
        finally:
            if managed:
                managed.cleanup()
                self._active.pop(primary_ref, None)
            callosum.stop()
            # Always hand the queue slot to the next waiter.
            self._process_next(cmd_name)

    def _process_next(self, cmd_name: str) -> None:
        """Process next queued task after completion."""
        next_cmd = None
        refs = None
        day = None

        with self._lock:
            queue = self._queues.get(cmd_name, [])
            if queue:
                entry = queue.pop(0)
                refs = entry["refs"]
                next_cmd = entry["cmd"]
                day = entry.get("day")
                self._running[cmd_name] = refs[0]
                logging.info(
                    f"Dequeued task {cmd_name}: {' '.join(next_cmd)} refs={refs} "
                    f"(remaining: {len(queue)})"
                )
            else:
                self._running.pop(cmd_name, None)

        # Notify and spawn outside lock
        self._notify_queue_change(cmd_name)
        if next_cmd:
            threading.Thread(
                target=self._run_task,
                args=(refs, next_cmd, cmd_name, day),
                daemon=True,
            ).start()

    def cancel(self, ref: str) -> bool:
        """Cancel a running task.

        Returns:
            True if task was found and terminated, False otherwise
        """
        # Race-safe lookup: the worker thread may pop this entry between a
        # membership test and a subscript, so fetch once with .get().
        managed = self._active.get(ref)
        if managed is None:
            logging.warning(f"Cannot cancel task {ref}: not found")
            return False

        if not managed.is_running():
            logging.debug(f"Task {ref} already finished")
            return False

        logging.info(f"Cancelling task {ref}...")
        managed.terminate()
        return True

    def get_status(self, ref: str) -> dict:
        """Get status of a task."""
        # Race-safe lookup (see cancel()).
        managed = self._active.get(ref)
        if managed is None:
            return {"status": "not_found"}

        return {
            "status": "running" if managed.is_running() else "finished",
            "pid": managed.pid,
            "returncode": managed.returncode,
            "log_path": str(managed.log_writer.path),
            "cmd": managed.cmd,
        }

    def collect_task_status(self) -> list[dict]:
        """Collect status of all running tasks for supervisor status."""
        now = time.time()
        tasks = []
        # Snapshot: worker threads add/remove entries concurrently, and
        # iterating a live dict whose size changes raises RuntimeError.
        for ref, managed in list(self._active.items()):
            if managed.is_running():
                duration = int(now - managed.start_time)
                cmd_name = TaskQueue.get_command_name(managed.cmd)
                tasks.append(
                    {
                        "ref": ref,
                        "name": cmd_name,
                        "duration_seconds": duration,
                    }
                )
        return tasks

    def collect_queue_counts(self) -> dict[str, int]:
        """Snapshot per-command queue depths for status reporting."""
        with self._lock:
            return {
                cmd_name: len(queue)
                for cmd_name, queue in self._queues.items()
                if queue
            }
406
407
# Global task queue instance (initialized in main())
_task_queue: TaskQueue | None = None

# Global supervisor callosum connection for event emissions
_supervisor_callosum: CallosumConnection | None = None

# Global reference to managed processes for restart control
_managed_procs: list[ManagedProcess] = []

# Global reference to in-process Callosum server
_callosum_server: CallosumServer | None = None
_callosum_thread: threading.Thread | None = None

# Restart request tracking for SIGKILL enforcement:
# service name -> (time SIGINT was sent, process handle)
_restart_requests: dict[str, tuple[float, subprocess.Popen]] = {}

# Track whether running in remote mode (upload-only, no local processing)
_is_remote_mode: bool = False

# State for daily processing (tracks day boundary for midnight dream trigger)
_daily_state = {
    "last_day": None,  # Track which day we last processed
}

# Timeout before flushing stale segments (seconds)
FLUSH_TIMEOUT = 3600

# State for segment flush (close out dangling agent state after inactivity).
# NOTE: _handle_segment_observed also stores a "stream" key at runtime.
_flush_state: dict = {
    "last_segment_ts": 0.0,  # Wall-clock time of last observe.observed event
    "day": None,  # Day of last observed segment
    "segment": None,  # Last observed segment key
    "flushed": False,  # Whether flush has already run for current segment
}
442
443
def _get_journal_path() -> Path:
    """Return the journal root directory as a ``Path``."""
    journal_root = get_journal()
    return Path(journal_root)
446
447
class RestartPolicy:
    """Track consecutive restart attempts and derive the backoff delay."""

    # Delay (seconds) per attempt; the last entry repeats for later attempts.
    _SCHEDULE = (0, 1, 5)

    def __init__(self) -> None:
        self.attempts = 0
        self.last_start = 0.0

    def record_start(self) -> None:
        """Remember when the service was (re)started."""
        self.last_start = time.time()

    def reset_attempts(self) -> None:
        """Forget prior failures."""
        self.attempts = 0

    def next_delay(self) -> int:
        """Return the delay for the current attempt and advance the counter."""
        idx = min(self.attempts, len(self._SCHEDULE) - 1)
        self.attempts += 1
        return self._SCHEDULE[idx]
467
468
# Per-service restart policies, created lazily on first lookup.
_RESTART_POLICIES: dict[str, RestartPolicy] = {}


def _get_restart_policy(name: str) -> RestartPolicy:
    """Fetch (or lazily create) the restart policy for ``name``."""
    policy = _RESTART_POLICIES.get(name)
    if policy is None:
        policy = RestartPolicy()
        _RESTART_POLICIES[name] = policy
    return policy
474
475
@dataclass
class ManagedProcess:
    """Supervised child process plus the bookkeeping needed to restart it.

    Thin wrapper around what RunnerManagedProcess produced, carrying the
    restart policy flags the supervisor loop consults.
    """

    process: subprocess.Popen  # underlying OS process handle
    name: str  # service name, e.g. "sense" or "convey"
    log_writer: DailyLogWriter  # daily-rotated output log
    cmd: list[str]  # command line used to (re)launch the service
    restart: bool = False  # whether the supervisor auto-restarts on exit
    shutdown_timeout: int = 15  # seconds allowed for graceful shutdown
    threads: list[threading.Thread] = field(default_factory=list)  # output pumps
    ref: str = ""  # correlation id for callosum events

    def cleanup(self) -> None:
        """Briefly join output-pump threads, then close the log writer."""
        for pump in self.threads:
            pump.join(timeout=1)
        self.log_writer.close()
493
494
def _launch_process(
    name: str,
    cmd: list[str],
    *,
    restart: bool = False,
    ref: str | None = None,
) -> ManagedProcess:
    """Launch ``cmd`` with automatic output logging and restart tracking.

    NOTE: All child processes should include -v for verbose logging by
    default, so their output is captured in logs for debugging.
    """
    policy = _get_restart_policy(name) if restart else None

    # Generate a correlation ref when the caller did not supply one.
    if not ref:
        ref = str(now_ms())

    # Spawn via the unified runner, sharing the supervisor's callosum.
    try:
        managed = RunnerManagedProcess.spawn(
            cmd, ref=ref, callosum=_supervisor_callosum
        )
    except RuntimeError as exc:
        logging.error(str(exc))
        raise

    if policy is not None:
        policy.record_start()

    # Announce the new service process on the bus.
    if _supervisor_callosum:
        _supervisor_callosum.emit(
            "supervisor",
            "started",
            service=name,
            pid=managed.process.pid,
            ref=managed.ref,
        )

    # Wrap in the supervisor's ManagedProcess for restart tracking.
    return ManagedProcess(
        process=managed.process,
        name=name,
        log_writer=managed.log_writer,
        cmd=list(cmd),
        restart=restart,
        threads=managed._threads,
        ref=managed.ref,
    )
544
545
def _get_notifier() -> DesktopNotifier:
    """Return the process-wide desktop notifier, creating it on first use."""
    global _notifier
    notifier = _notifier
    if notifier is None:
        notifier = DesktopNotifier(app_name="solstone Supervisor")
        _notifier = notifier
    return notifier
552
553
async def send_notification(message: str, alert_key: tuple | None = None) -> None:
    """Display a desktop notification carrying ``message``.

    Args:
        message: The notification message to display
        alert_key: Optional key to track this notification for later clearing
    """
    try:
        notification_id = await _get_notifier().send(
            title="solstone Supervisor",
            message=message,
            urgency=Urgency.Critical,
        )

        # Remember the id so clear_notification() can dismiss it later.
        if alert_key and notification_id:
            _notification_ids[alert_key] = notification_id
            logging.debug(f"Stored notification {notification_id} for key {alert_key}")

    except Exception as exc:  # pragma: no cover - system issues
        logging.error("Failed to send notification: %s", exc)
576
577
async def clear_notification(alert_key: tuple) -> None:
    """Dismiss a previously sent notification identified by ``alert_key``.

    Args:
        alert_key: The key used when the notification was sent
    """
    notification_id = _notification_ids.get(alert_key)
    if notification_id is None:
        return

    try:
        await _get_notifier().clear(notification_id)
        del _notification_ids[alert_key]
        logging.debug(f"Cleared notification for key {alert_key}")

    except Exception as exc:  # pragma: no cover - system issues
        logging.error("Failed to clear notification: %s", exc)
596
597
def _emit_queue_event(cmd_name: str, running_ref: str, queue: list) -> None:
    """Broadcast a supervisor.queue event describing one command's queue.

    This is the callback handed to TaskQueue for queue-change notifications;
    TaskQueue invokes it outside its lock.
    """
    conn = _supervisor_callosum
    if not conn:
        return

    conn.emit(
        "supervisor",
        "queue",
        command=cmd_name,
        running=running_ref,
        queued=len(queue),
        queue=queue,
    )
614
615
def _handle_task_request(message: dict) -> None:
    """Dispatch a supervisor.request message from Callosum into the task queue."""
    if message.get("tract") != "supervisor" or message.get("event") != "request":
        return

    cmd = message.get("cmd")
    if not cmd:
        logging.error(f"Invalid task request: missing cmd: {message}")
        return

    if _task_queue:
        _task_queue.submit(cmd, message.get("ref"), day=message.get("day"))
630
631
def _restart_service(service: str) -> bool:
    """Ask a managed service to restart itself by sending SIGINT.

    Returns True if the service was found and running, False if not found
    or already exited.
    """
    target = next((p for p in _managed_procs if p.name == service), None)
    if target is None:
        logging.warning(f"Cannot restart {service}: not found in managed processes")
        return False

    if target.process.poll() is not None:
        logging.debug(
            f"Ignoring restart for {service}: already exited, awaiting auto-restart"
        )
        return False

    logging.info(f"Restart requested for {service}, sending SIGINT...")

    if _supervisor_callosum:
        _supervisor_callosum.emit(
            "supervisor",
            "restarting",
            service=service,
            pid=target.process.pid,
            ref=target.ref,
        )

    try:
        target.process.send_signal(signal.SIGINT)
        # Remember the request so SIGKILL can be enforced on a stuck exit.
        _restart_requests[service] = (time.time(), target.process)
    except Exception as e:
        logging.error(f"Failed to send SIGINT to {service}: {e}")
    return True
666
667
def _handle_supervisor_request(message: dict) -> None:
    """React to supervisor restart control messages from the bus."""
    is_restart = (
        message.get("tract") == "supervisor" and message.get("event") == "restart"
    )
    if not is_restart:
        return

    service = message.get("service")
    if not service:
        logging.error("Invalid restart request: missing service")
    elif service == "supervisor":
        logging.debug("Ignoring restart request for supervisor itself")
    else:
        _restart_service(service)
682
683
def get_task_status(ref: str) -> dict:
    """Get status of a task.

    Args:
        ref: Task correlation ID

    Returns:
        Dict with status info, or {"status": "not_found"} if task doesn't exist
    """
    queue = _task_queue
    if not queue:
        return {"status": "not_found"}
    return queue.get_status(ref)
696
697
def collect_status(procs: list[ManagedProcess]) -> dict:
    """Assemble a snapshot of supervisor state for status broadcasts."""
    now = time.time()

    # Running services (child still alive when poll() is None)
    services = []
    running_names = set()
    for proc in procs:
        if proc.process.poll() is not None:
            continue
        policy = _get_restart_policy(proc.name)
        uptime = int(now - policy.last_start) if policy.last_start else 0
        services.append(
            {
                "name": proc.name,
                "ref": proc.ref,
                "pid": proc.process.pid,
                "uptime_seconds": uptime,
            }
        )
        running_names.add(proc.name)

    # The supervisor itself is listed first, once its identity is known.
    if _supervisor_ref and _supervisor_start:
        supervisor_entry = {
            "name": "supervisor",
            "ref": _supervisor_ref,
            "pid": os.getpid(),
            "uptime_seconds": int(now - _supervisor_start),
        }
        services.insert(0, supervisor_entry)

    # Services that died and are waiting out their restart backoff
    crashed = [
        {"name": name, "restart_attempts": policy.attempts}
        for name, policy in _RESTART_POLICIES.items()
        if name not in running_names and policy.attempts > 0
    ]

    # On-demand task state
    tasks = _task_queue.collect_task_status() if _task_queue else []
    queues = _task_queue.collect_queue_counts() if _task_queue else {}

    return {
        "services": services,
        "crashed": crashed,
        "tasks": tasks,
        "queues": queues,
        "stale_heartbeats": [],
        "schedules": scheduler.collect_status(),
        "callosum_clients": _callosum_server.client_count() if _callosum_server else 0,
    }
760
761
def start_sense() -> ManagedProcess:
    """Start the `sol sense` service with output logging (auto-restarting)."""
    cmd = ["sol", "sense", "-v"]
    return _launch_process("sense", cmd, restart=True)
765
766
def start_sync(remote_url: str) -> ManagedProcess:
    """Start the `sol sync` service with output logging (auto-restarting).

    Args:
        remote_url: Remote ingest URL for sync service
    """
    cmd = ["sol", "sync", "-v", "--remote", remote_url]
    proc = _launch_process("sync", cmd, restart=True)
    # Sync shutdown can block while draining pending segments; give it
    # extra time so the supervisor does not cut it off early.
    proc.shutdown_timeout = 90
    return proc
782
783
def start_callosum_in_process() -> CallosumServer:
    """Run the Callosum message bus server inside this process.

    Launches the (blocking) server on a background thread and polls until
    its socket file appears.

    Returns:
        CallosumServer instance

    Raises:
        RuntimeError: if the socket does not appear within 500ms.
    """
    global _callosum_server, _callosum_thread

    server = CallosumServer()
    _callosum_server = server

    # Remove any stale socket first; otherwise the readiness poll below
    # could pass on the old file before the server thread replaces it.
    socket_path = server.socket_path
    socket_path.parent.mkdir(parents=True, exist_ok=True)
    if socket_path.exists():
        socket_path.unlink()

    # server.start() blocks, so it gets its own (non-daemon) thread.
    worker = threading.Thread(target=server.start, daemon=False, name="callosum-server")
    worker.start()
    _callosum_thread = worker

    # Poll for readiness: 50 checks * 10ms = up to 500ms.
    for _ in range(50):
        if socket_path.exists():
            logging.info(f"Callosum server started on {socket_path}")
            return server
        time.sleep(0.01)

    raise RuntimeError("Callosum server failed to create socket within 500ms")
817
818
def stop_callosum_in_process() -> None:
    """Shut down the in-process Callosum server and join its thread."""
    global _callosum_server, _callosum_thread

    if _callosum_server:
        logging.info("Stopping Callosum server...")
        _callosum_server.stop()

    worker = _callosum_thread
    if worker:
        worker.join(timeout=5)
        if worker.is_alive():
            logging.warning("Callosum server thread did not stop cleanly")

    _callosum_server = None
    _callosum_thread = None
834
835
def start_cortex_server() -> ManagedProcess:
    """Start the Cortex WebSocket API server (auto-restarting)."""
    return _launch_process("cortex", ["sol", "cortex", "-v"], restart=True)
840
841
def start_convey_server(
    verbose: bool, debug: bool = False, port: int = 0
) -> tuple[ManagedProcess, int]:
    """Start the Convey web app, resolving port 0 to a free port first.

    Returns:
        Tuple of (ManagedProcess, resolved_port) where resolved_port is the
        actual port being used (auto-selected if port was 0).
    """
    resolved_port = port or find_available_port()

    cmd = ["sol", "convey", "--port", str(resolved_port)]
    # Debug wins over verbose; at most one logging flag is passed.
    if debug:
        cmd.append("-d")
    elif verbose:
        cmd.append("-v")

    proc = _launch_process("convey", cmd, restart=True)
    return proc, resolved_port
860
861
def check_runner_exits(procs: list[ManagedProcess]) -> list[ManagedProcess]:
    """Return the subset of ``procs`` whose child process has exited."""
    # poll() yields the exit code once the child is gone, None while alive.
    return [managed for managed in procs if managed.process.poll() is not None]
870
871
async def handle_runner_exits(
    procs: list[ManagedProcess],
    alert_mgr: AlertManager,
) -> None:
    """Check for and handle exited processes with restart policy.

    Mutates ``procs`` in place: exited entries are removed and, for
    restartable services, replaced with a freshly launched process after the
    policy's backoff delay. EXIT_TEMPFAIL exits mean "session not ready":
    no alert is raised and a fixed TEMPFAIL_DELAY retry delay is used
    instead of the escalating backoff schedule.
    """
    exited = check_runner_exits(procs)
    if not exited:
        return

    exited_names = [managed.name for managed in exited]
    # One alert key covers the whole batch so repeated exits of the same
    # set of services share a single backoff series.
    exit_key = ("runner_exit", tuple(sorted(exited_names)))

    # Check if all exits are tempfail (session not ready)
    all_tempfail = all(m.process.returncode == EXIT_TEMPFAIL for m in exited)

    if all_tempfail:
        logging.info("Runner waiting for session: %s", ", ".join(sorted(exited_names)))
    else:
        msg = f"Runner process exited: {', '.join(sorted(exited_names))}"
        logging.error(msg)
        await alert_mgr.alert_if_ready(exit_key, msg)

    for managed in exited:
        # Clear any pending restart request for this service
        _restart_requests.pop(managed.name, None)

        returncode = managed.process.returncode
        is_tempfail = returncode == EXIT_TEMPFAIL
        logging.info("%s exited with code %s", managed.name, returncode)

        # Emit stopped event
        if _supervisor_callosum:
            _supervisor_callosum.emit(
                "supervisor",
                "stopped",
                service=managed.name,
                pid=managed.process.pid,
                ref=managed.ref,
                exit_code=returncode,
            )

        # Remove from procs list
        try:
            procs.remove(managed)
        except ValueError:
            pass

        managed.cleanup()

        # Handle restart if needed
        if managed.restart and not shutdown_requested:
            # Tempfail: use fixed longer delay, don't burn through backoff
            if is_tempfail:
                delay = TEMPFAIL_DELAY
            else:
                policy = _get_restart_policy(managed.name)
                uptime = time.time() - policy.last_start if policy.last_start else 0
                # A minute of healthy uptime resets the backoff schedule.
                if uptime >= 60:
                    policy.reset_attempts()
                delay = policy.next_delay()
            if delay:
                logging.info("Waiting %ss before restarting %s", delay, managed.name)
                # Sleep in 1s slices so a shutdown request can interrupt the wait.
                for _ in range(delay):
                    if shutdown_requested:
                        break
                    await asyncio.sleep(1)
            if shutdown_requested:
                continue
            logging.info("Restarting %s...", managed.name)
            try:
                new_proc = _launch_process(
                    managed.name,
                    managed.cmd,
                    restart=True,
                )
            except Exception as exc:
                logging.exception("Failed to restart %s: %s", managed.name, exc)
                continue

            procs.append(new_proc)
            logging.info("Restarted %s after exit code %s", managed.name, returncode)
            # Clear the notification now that process has restarted
            await alert_mgr.clear(exit_key)
        else:
            logging.info("Not restarting %s", managed.name)
957
958
def handle_daily_tasks() -> None:
    """Check for day change and submit daily dream for updated days (non-blocking).

    Triggers once when the day rolls over at midnight. Queries ``updated_days()``
    for journal days that have new stream data but haven't completed a daily
    dream yet, then submits up to ``MAX_UPDATED_CATCHUP`` dreams in chronological
    order (oldest first, yesterday last) via the TaskQueue.

    Dream auto-detects updated state and enables ``--refresh`` internally, so we
    don't pass it here.

    Skipped in remote mode (no local data to process).
    """
    # Remote mode: no local processing, data is on the server
    if _is_remote_mode:
        return

    today = datetime.now().date()

    # Only trigger when day actually changes (at midnight)
    if today != _daily_state["last_day"]:
        # The day that just ended is what we process
        prev_day = _daily_state["last_day"]

        # Guard against None (e.g., module reloaded without going through main())
        if prev_day is None:
            logging.warning("Daily state not initialized, skipping daily processing")
            _daily_state["last_day"] = today
            return

        prev_day_str = prev_day.strftime("%Y%m%d")

        # Update state for new day
        _daily_state["last_day"] = today

        # Flush any dangling segment state from the previous day before daily dream
        if not _flush_state["flushed"] and _flush_state["day"] == prev_day_str:
            _check_segment_flush(force=True)

        # Exclude today: it is still accumulating data and is dreamed later.
        today_str = today.strftime("%Y%m%d")
        all_updated = updated_days(exclude={today_str})

        if not all_updated:
            logging.info("Day changed to %s, no updated days to process", today)
            return

        # Take the newest MAX_UPDATED_CATCHUP days (already sorted ascending)
        days_to_process = all_updated[-MAX_UPDATED_CATCHUP:]
        skipped = len(all_updated) - len(days_to_process)

        if skipped:
            logging.warning(
                "Skipping %d older updated days (max catchup %d): %s",
                skipped,
                MAX_UPDATED_CATCHUP,
                all_updated[:skipped],
            )

        logging.info(
            "Day changed to %s, queuing daily dream for %d updated day(s): %s",
            today,
            len(days_to_process),
            days_to_process,
        )

        # Submit oldest-first so yesterday is processed last
        for day_str in days_to_process:
            cmd = ["sol", "dream", "-v", "--day", day_str]
            if _task_queue:
                _task_queue.submit(cmd, day=day_str)
                logging.debug("Submitted daily dream for %s", day_str)
            else:
                logging.warning(
                    "No task queue available for daily processing: %s", day_str
                )
1034
1035
def _handle_segment_observed(message: dict) -> None:
    """Process observe.observed events (live observation or imports).

    Queues ``sol dream`` in segment mode via the task queue (covers both
    generators and segment agents) and refreshes the flush-tracking state
    so stale-segment flushing knows about the newest segment.
    """
    if message.get("tract") != "observe" or message.get("event") != "observed":
        return

    segment = message.get("segment")  # e.g., "163045_300"
    if not segment:
        logging.warning("observed event missing segment field")
        return

    # Day comes from the event payload; live observation falls back to today.
    day = message.get("day") or datetime.now().strftime("%Y%m%d")
    stream = message.get("stream")

    # A fresh segment restarts the inactivity clock for the flush check.
    _flush_state["last_segment_ts"] = time.time()
    _flush_state["day"] = day
    _flush_state["segment"] = segment
    _flush_state["stream"] = stream
    _flush_state["flushed"] = False

    logging.info(f"Segment observed: {day}/{segment}, submitting processing...")

    cmd = ["sol", "dream", "-v", "--day", day, "--segment", segment]
    if stream:
        cmd.extend(["--stream", stream])

    # The task queue serializes this with other dream invocations.
    if _task_queue:
        _task_queue.submit(cmd, day=day)
    else:
        logging.warning(
            "No task queue available for segment processing: %s/%s", day, segment
        )
1074
1075
def _check_segment_flush(force: bool = False) -> None:
    """Flush the last observed segment once it has gone stale.

    When no new segment has arrived within FLUSH_TIMEOUT seconds, queues
    ``sol dream --flush`` on the last segment so flush-enabled agents can
    close out dangling state (e.g., end active activities).

    Args:
        force: Skip timeout check (used at day boundary to flush
            before daily dream regardless of elapsed time).

    Skipped in remote mode (no local processing).
    """
    if _is_remote_mode:
        return

    last_ts = _flush_state["last_segment_ts"]
    if not last_ts or _flush_state["flushed"]:
        return

    stale = force or (time.time() - last_ts >= FLUSH_TIMEOUT)
    if not stale:
        return

    day = _flush_state["day"]
    segment = _flush_state["segment"]
    if not (day and segment):
        return

    # Mark before queueing so repeated checks never double-submit.
    _flush_state["flushed"] = True

    cmd = ["sol", "dream", "-v", "--day", day, "--segment", segment, "--flush"]
    stream = _flush_state.get("stream")
    if stream:
        cmd.extend(["--stream", stream])

    if _task_queue:
        _task_queue.submit(cmd, day=day)
        logging.info(f"Queued segment flush: {day}/{segment}")
    else:
        logging.warning(
            "No task queue available for segment flush: %s/%s", day, segment
        )
1117
1118
def _handle_segment_event_log(message: dict) -> None:
    """Log observe, dream, and activity events with day+segment to segment/events.jsonl.

    Any observe, dream, or activity tract message with both day and segment fields
    gets logged to journal/day/segment/events.jsonl if that directory exists.
    """
    if message.get("tract") not in {"observe", "dream", "activity"}:
        return

    day = message.get("day")
    segment = message.get("segment")
    if not (day and segment):
        return

    stream = message.get("stream")

    try:
        day_dir = _get_journal_path() / day
        segment_dir = (day_dir / stream / segment) if stream else (day_dir / segment)

        # Skip quietly when the segment directory hasn't been created yet.
        if not segment_dir.is_dir():
            return

        # Append the raw message as one JSON line (best-effort).
        line = json.dumps(message, ensure_ascii=False)
        with open(segment_dir / "events.jsonl", "a", encoding="utf-8") as f:
            f.write(line + "\n")
    except Exception as e:
        # Event mirroring is non-critical; never let it disturb dispatch.
        logging.debug(f"Failed to log segment event: {e}")
1156
1157
def _handle_activity_recorded(message: dict) -> None:
    """Queue a per-activity dream task when an activity is recorded.

    Listens for activity.recorded events and submits a queued dream task
    for per-activity agent processing (serialized via TaskQueue).
    """
    if message.get("tract") != "activity" or message.get("event") != "recorded":
        return

    record_id = message.get("id")
    facet = message.get("facet")
    day = message.get("day")

    # All three fields are required to build the dream command.
    if not (record_id and facet and day):
        logging.warning("activity.recorded event missing required fields")
        return

    if not _task_queue:
        logging.warning("No task queue available for activity dream: %s", record_id)
        return

    cmd = ["sol", "dream", "--activity", record_id, "--facet", facet, "--day", day]
    _task_queue.submit(cmd, day=day)
    logging.info(f"Queued activity dream: {record_id} for #{facet}")
1182
1183
def _handle_dream_daily_complete(message: dict) -> None:
    """Submit a heartbeat task after daily dream processing completes.

    Listens for dream.daily_complete events. Skips if a heartbeat process
    is already running (PID file guard).
    """
    if message.get("tract") != "dream" or message.get("event") != "daily_complete":
        return

    # Check if heartbeat is already running via PID file
    pid_file = Path(get_journal()) / "health" / "heartbeat.pid"
    if pid_file.exists():
        try:
            existing_pid = int(pid_file.read_text().strip())
            # Signal 0 probes process existence without delivering a signal.
            os.kill(existing_pid, 0)
            logging.info("Heartbeat already running (pid=%d), skipping", existing_pid)
            return
        except ProcessLookupError:
            pass  # Stale PID file, proceed
        except PermissionError:
            # os.kill raised EPERM: process exists but belongs to another
            # user — treat it as running.
            logging.info(
                "Heartbeat running under different user (pid file exists), skipping"
            )
            return
        except (ValueError, OSError):
            # Corrupt PID file, or the file vanished/became unreadable
            # between exists() and read_text() — proceed as if stale.
            # (Must follow the more specific OSError subclasses above.)
            pass

    cmd = ["sol", "heartbeat"]
    if _task_queue:
        _task_queue.submit(cmd)
        logging.info("Queued heartbeat after daily dream completion")
    else:
        logging.warning("No task queue available for heartbeat submission")
1217
1218
def _handle_callosum_message(message: dict) -> None:
    """Dispatch incoming Callosum messages to appropriate handlers."""
    # Every handler filters on tract/event itself, so fan out to all.
    handlers = (
        _handle_task_request,
        _handle_supervisor_request,
        _handle_segment_observed,
        _handle_activity_recorded,
        _handle_dream_daily_complete,
        _handle_segment_event_log,
    )
    for handler in handlers:
        handler(message)
1227
1228
async def supervise(
    *,
    daily: bool = True,
    schedule: bool = True,
    procs: list[ManagedProcess] | None = None,
) -> None:
    """Main supervision loop. Runs at 1-second intervals for responsiveness.

    Monitors runner health, emits status, triggers daily processing,
    and checks scheduled agents.

    Args:
        daily: When True, run handle_daily_tasks() each tick so midnight
            day-rollover work can be submitted.
        schedule: When True, poll the periodic scheduler and routines
            each tick.
        procs: Managed child processes to monitor; None disables exit
            handling and status emission.
    """
    alert_mgr = AlertManager()
    # Timestamp of the last supervisor.status emission (throttled to 5s).
    last_status_emit = 0.0

    try:
        while (
            not shutdown_requested
        ):  # pragma: no cover - loop checked via unit tests by patching
            # Check for restart timeouts (enforce SIGKILL after 15s).
            # NOTE(review): _restart_requests appears to map service name ->
            # (SIGINT timestamp, process), populated by the restart handler
            # elsewhere in this file — confirm against that handler.
            for service, (start_time, proc) in list(_restart_requests.items()):
                if proc.poll() is not None:  # Already exited
                    _restart_requests.pop(service, None)
                elif time.time() - start_time > 15:
                    logging.warning(
                        f"{service} did not exit within 15s after SIGINT, sending SIGKILL"
                    )
                    try:
                        proc.kill()
                    except Exception as e:
                        logging.error(f"Failed to kill {service}: {e}")
                    # Don't delete here - let handle_runner_exits clean up

            # Check for runner exits first (immediate alert)
            if procs:
                await handle_runner_exits(procs, alert_mgr)

            # Emit status every 5 seconds
            now = time.time()
            if now - last_status_emit >= 5:
                if _supervisor_callosum and procs:
                    try:
                        status = collect_status(procs)
                        _supervisor_callosum.emit("supervisor", "status", **status)
                    except Exception as e:
                        # Status is advisory; never let emission kill the loop.
                        logging.debug(f"Status emission failed: {e}")
                last_status_emit = now

            # Check for segment flush (non-blocking, submits via task queue)
            _check_segment_flush()

            # Check for daily processing (non-blocking, submits via task queue)
            if daily:
                handle_daily_tasks()

            # Check periodic task schedules (non-blocking, submits via callosum)
            if schedule:
                scheduler.check()
                routines.check()

            # Sleep 1 second before next iteration (responsive to shutdown)
            await asyncio.sleep(1)
    finally:
        pass  # Callosum cleanup happens in main()
1292
1293
def parse_args() -> argparse.ArgumentParser:
    """Build and return the supervisor's command-line argument parser.

    Note: despite the name, this only constructs the parser; actual
    parsing happens later (main() passes it through setup_cli()).
    """
    p = argparse.ArgumentParser(description="Monitor journaling health")
    p.add_argument(
        "port",
        nargs="?",
        type=int,
        default=0,
        help="Convey port (0 = auto-select available port)",
    )
    p.add_argument(
        "--threshold",
        type=int,
        default=DEFAULT_THRESHOLD,
        help="Seconds before heartbeat considered stale",
    )
    p.add_argument(
        "--interval",
        type=int,
        default=CHECK_INTERVAL,
        help="Polling interval seconds",
    )
    # Boolean feature toggles share the same store_true shape.
    for flag, help_text in (
        ("--no-daily", "Disable daily processing run at midnight"),
        (
            "--no-cortex",
            "Do not start the Cortex server (run it manually for debugging)",
        ),
        ("--no-convey", "Do not start the Convey web application"),
        ("--no-schedule", "Disable periodic task scheduler"),
    ):
        p.add_argument(flag, action="store_true", help=help_text)
    p.add_argument(
        "--remote",
        type=str,
        help="Remote mode: sync to server URL instead of local processing",
    )
    return p
1338
1339
def handle_shutdown(signum, frame):
    """Handle shutdown signals gracefully.

    Flips the module-wide shutdown flag (logging only on the first
    signal) and raises KeyboardInterrupt to unwind the event loop.
    """
    global shutdown_requested
    first_request = not shutdown_requested
    if first_request:  # Only log once
        shutdown_requested = True
        logging.info("Shutdown requested, cleaning up...")
    raise KeyboardInterrupt
1347
1348
def main() -> None:
    """Supervisor entry point.

    Sets up logging, acquires a per-journal singleton lock, starts the
    in-process Callosum bus and managed services (Convey/Sense/Cortex in
    local mode, sync in remote mode), runs the supervision loop, and
    tears everything down in reverse order on shutdown.
    """
    parser = parse_args()

    # Capture journal info BEFORE setup_cli() loads .env and pollutes os.environ
    journal_info = get_journal_info()

    args = setup_cli(parser)

    journal_path = _get_journal_path()

    # File logging under journal/health; a console handler is added below
    # only when -v/--debug is set.
    log_level = logging.DEBUG if args.debug else logging.INFO
    log_path = journal_path / "health" / "supervisor.log"
    log_path.parent.mkdir(parents=True, exist_ok=True)
    logging.getLogger().handlers = []
    logging.basicConfig(
        level=log_level,
        handlers=[logging.FileHandler(log_path, encoding="utf-8")],
        format="%(asctime)s [supervisor:log] %(levelname)s %(message)s",
        datefmt="%Y-%m-%dT%H:%M:%S",
    )

    if args.verbose or args.debug:
        console_handler = logging.StreamHandler()
        console_handler.setLevel(log_level)
        console_handler.setFormatter(
            logging.Formatter("%(asctime)s %(levelname)s %(message)s")
        )
        logging.getLogger().addHandler(console_handler)

    # Singleton guard: only one supervisor per journal
    health_dir = journal_path / "health"
    lock_path = health_dir / "supervisor.lock"
    pid_path = health_dir / "supervisor.pid"
    import fcntl

    # The lock fd is deliberately left open on success so the flock is
    # held for the lifetime of this process.
    lock_fd = open(lock_path, "w")
    try:
        fcntl.flock(lock_fd, fcntl.LOCK_EX | fcntl.LOCK_NB)
    except OSError:
        # Another supervisor holds the lock: report its PID (best-effort)
        # and, if the bus socket exists, a health summary — then exit.
        lock_fd.close()
        pid_str = ""
        try:
            pid_str = pid_path.read_text().strip()
        except OSError:
            pass
        pid_msg = f" (PID {pid_str})" if pid_str else ""
        sock_path = health_dir / "callosum.sock"
        if sock_path.exists():
            try:
                from think.health_cli import health_check

                print(f"Supervisor already running{pid_msg}\n")
                health_check()
            except Exception:
                print(f"Supervisor already running{pid_msg}")
        else:
            print(f"Supervisor already running{pid_msg}")
        sys.exit(1)
    pid_path.write_text(str(os.getpid()))
    logging.info("Singleton lock acquired (PID %d)", os.getpid())

    # Set up signal handlers
    signal.signal(signal.SIGINT, handle_shutdown)
    signal.signal(signal.SIGTERM, handle_shutdown)

    # Show journal path and source on startup
    path, source = journal_info
    print(f"Journal: {path} (from {source})")
    logging.info("Supervisor starting...")

    global _managed_procs, _supervisor_callosum, _is_remote_mode
    global _task_queue
    procs: list[ManagedProcess] = []
    convey_port = None

    # Remote mode: run sync instead of local processing
    _is_remote_mode = bool(args.remote)

    # Start Callosum in-process first - it's the message bus that other services depend on
    try:
        start_callosum_in_process()
    except RuntimeError as e:
        logging.error(f"Failed to start Callosum server: {e}")
        # parser.error() exits; the return below is a defensive no-op.
        parser.error(f"Failed to start Callosum server: {e}")
        return

    # Connect supervisor's Callosum client to capture startup events from other services
    try:
        _supervisor_callosum = CallosumConnection(defaults={"rev": get_rev()})
        _supervisor_callosum.start(callback=_handle_callosum_message)
        logging.info("Supervisor connected to Callosum")
    except Exception as e:
        logging.warning(f"Failed to start Callosum connection: {e}")

    # Mirror supervisor log output to callosum logs tract (best-effort)
    supervisor_ref = str(now_ms())
    global _supervisor_ref, _supervisor_start
    _supervisor_ref = supervisor_ref
    _supervisor_start = time.time()
    if _supervisor_callosum:
        try:
            handler = CallosumLogHandler(_supervisor_callosum, supervisor_ref)
            handler.setFormatter(
                logging.Formatter("%(asctime)s %(levelname)s %(message)s")
            )
            logging.getLogger().addHandler(handler)
        except Exception:
            pass

    # Initialize task queue with callosum event callback
    _task_queue = TaskQueue(on_queue_change=_emit_queue_event)

    # Now start other services (their startup events will be captured)
    if _is_remote_mode:
        # Remote mode: verify remote server is reachable before starting sync
        logging.info("Remote mode: checking server connectivity...")
        success, message = check_remote_health(args.remote)
        if not success:
            logging.error(f"Remote health check failed: {message}")
            stop_callosum_in_process()
            parser.error(f"Remote server not available: {message}")
        logging.info(f"Remote server verified: {message}")
        procs.append(start_sync(args.remote))
    else:
        # Local mode: convey first, then sense for file processing
        if not args.no_convey:
            proc, convey_port = start_convey_server(
                verbose=args.verbose, debug=args.debug, port=args.port
            )
            procs.append(proc)
        # Sense handles file processing
        procs.append(start_sense())
        # Cortex for agent execution
        if not args.no_cortex:
            procs.append(start_cortex_server())

    # Make procs accessible to restart handler
    _managed_procs = procs

    # Initialize daily state to today - dream only triggers at midnight when day changes
    _daily_state["last_day"] = datetime.now().date()

    # Initialize periodic task scheduler
    schedule_enabled = not args.no_schedule and not _is_remote_mode
    if schedule_enabled and _supervisor_callosum:
        scheduler.init(_supervisor_callosum)
        scheduler.register_defaults()
        routines.init(_supervisor_callosum)

    # Show Convey URL if running
    if convey_port:
        print(f"Convey: http://localhost:{convey_port}/")

    logging.info(f"Started {len(procs)} processes, entering supervision loop")
    daily_enabled = not args.no_daily and not _is_remote_mode
    if daily_enabled:
        logging.info("Daily processing scheduled for midnight")

    # Startup catchup: submit dreams for days with pending stream data
    if daily_enabled:
        all_updated = updated_days()
        if all_updated:
            # Cap catchup to the most recent MAX_UPDATED_CATCHUP days.
            days_to_process = all_updated[-MAX_UPDATED_CATCHUP:]
            skipped = len(all_updated) - len(days_to_process)

            if skipped:
                logging.warning(
                    "Startup catchup: skipping %d older updated days (max %d): %s",
                    skipped,
                    MAX_UPDATED_CATCHUP,
                    all_updated[:skipped],
                )

            logging.info(
                "Startup catchup: submitted %d day(s) with pending stream data: %s",
                len(days_to_process),
                days_to_process,
            )

            for day_str in days_to_process:
                cmd = ["sol", "dream", "-v", "--day", day_str]
                if _task_queue:
                    _task_queue.submit(cmd, day=day_str)
                    logging.debug("Startup catchup: submitted dream for %s", day_str)
                else:
                    logging.warning(
                        "No task queue available for startup catchup: %s", day_str
                    )

    try:
        asyncio.run(
            supervise(
                daily=daily_enabled,
                schedule=schedule_enabled,
                procs=procs if procs else None,
            )
        )
    except KeyboardInterrupt:
        # Raised by handle_shutdown() on SIGINT/SIGTERM.
        logging.info("Caught KeyboardInterrupt, shutting down...")
    finally:
        logging.info("Stopping all processes...")
        print("\nShutting down gracefully (this may take a moment)...", flush=True)

        def _stop_process(managed: ManagedProcess) -> None:
            # Graceful stop: SIGTERM, wait up to shutdown_timeout, then SIGKILL.
            name = managed.name
            proc = managed.process
            logging.info(f"Stopping {name}...")
            print(f"  Stopping {name}...", end="", flush=True)
            try:
                proc.terminate()
            except Exception:
                pass
            try:
                timeout = getattr(managed, "shutdown_timeout", 15)
                proc.wait(timeout=timeout)
                print(" done", flush=True)
            except subprocess.TimeoutExpired:
                logging.warning(f"{name} did not terminate gracefully, killing...")
                print(" timeout, forcing kill...", flush=True)
                try:
                    proc.kill()
                    proc.wait(timeout=1)
                except Exception:
                    pass
            managed.cleanup()

        # Stop services in reverse order
        for managed in reversed(procs):
            _stop_process(managed)

        # Save scheduler state before disconnecting
        if schedule_enabled and scheduler._state:
            try:
                scheduler.save_state()
            except Exception as exc:
                logging.warning("Failed to save scheduler state on shutdown: %s", exc)

        if schedule_enabled:
            try:
                routines.save_state()
            except Exception as exc:
                logging.warning("Failed to save routines state on shutdown: %s", exc)

        # Disconnect supervisor's Callosum connection
        if _supervisor_callosum:
            _supervisor_callosum.stop()
            logging.info("Supervisor disconnected from Callosum")

        # Stop in-process Callosum server last
        stop_callosum_in_process()

        logging.info("Supervisor shutdown complete.")
        print("Shutdown complete.", flush=True)
1602
1603
# Script entry point: run the supervisor when invoked directly.
if __name__ == "__main__":
    main()