"""RouterWatchdog — periodic health monitoring. Ported from net.i2p.router.tasks.RouterWatchdog. Checks job queue liveness and transport health at regular intervals. After MAX_CONSECUTIVE_ERRORS failures, logs a diagnostic dump. """ from __future__ import annotations import asyncio import logging import time logger = logging.getLogger(__name__) class RouterWatchdog: """Monitors router health and detects hung states.""" CHECK_INTERVAL_SECONDS = 60 MAX_CONSECUTIVE_ERRORS = 20 def __init__(self, router) -> None: self._router = router self._consecutive_errors = 0 self._running = False self._last_check: float = 0 @property def consecutive_errors(self) -> int: return self._consecutive_errors @property def is_running(self) -> bool: return self._running async def run(self) -> None: """Main watchdog loop. Runs until stopped.""" self._running = True logger.info("RouterWatchdog started") while self._running: await asyncio.sleep(self.CHECK_INTERVAL_SECONDS) if not self._running: break ok = True if not self._verify_job_queue_liveness(): ok = False if not self._verify_transport_liveness(): ok = False if ok: self._consecutive_errors = 0 else: self._consecutive_errors += 1 if self._consecutive_errors >= self.MAX_CONSECUTIVE_ERRORS: logger.error( "RouterWatchdog: %d consecutive failures, dumping status", self._consecutive_errors, ) logger.error(self.dump_status()) self._last_check = time.monotonic() def stop(self) -> None: """Stop the watchdog loop.""" self._running = False def _verify_job_queue_liveness(self) -> bool: """Check that the job queue has processed something recently.""" ctx = getattr(self._router, '_context', None) if ctx is None: return False job_queue = getattr(ctx, 'job_queue', None) if job_queue is None: return True # no job queue means no check possible last_run = getattr(job_queue, 'last_job_run_time', 0) if last_run == 0: return True # never run, may be freshly started return (time.monotonic() - last_run) < self.CHECK_INTERVAL_SECONDS def _verify_transport_liveness(self) -> bool: """Check that at least one transport is running.""" ctx = getattr(self._router, '_context', None) if ctx is None: return False transport_mgr = getattr(ctx, 'transport_manager', None) if transport_mgr is None: return True # no transport manager return getattr(transport_mgr, 'is_running', False) def dump_status(self) -> str: """Generate diagnostic status string.""" lines = [f"=== RouterWatchdog Status (errors={self._consecutive_errors}) ==="] lines.append(f"Router state: {getattr(self._router, 'state', 'unknown')}") lines.append(f"Uptime: {getattr(self._router, 'uptime_seconds', 0):.0f}s") ctx = getattr(self._router, '_context', None) if ctx: jq = getattr(ctx, 'job_queue', None) if jq: lines.append(f"Job queue pending: {getattr(jq, 'pending_count', '?')}") tm = getattr(ctx, 'transport_manager', None) if tm: lines.append(f"Transports running: {getattr(tm, 'is_running', '?')}") return "\n".join(lines)