A Python port of the Invisible Internet Project (I2P)
at main 109 lines 3.7 kB view raw
"""RouterWatchdog — periodic health monitoring.

Ported from net.i2p.router.tasks.RouterWatchdog.

Checks job queue liveness and transport health at regular intervals.
After MAX_CONSECUTIVE_ERRORS failures, logs a diagnostic dump.
"""

from __future__ import annotations

import asyncio
import logging
import time

logger = logging.getLogger(__name__)


class RouterWatchdog:
    """Monitors router health and detects hung states.

    The watchdog reads everything it needs from the router through
    ``getattr`` with defaults, so a partially initialised router (missing
    job queue or transport manager) does not make the watchdog itself fail.
    """

    # Seconds to sleep between health checks; also used as the maximum age
    # of the job queue's last run before the queue is considered stalled.
    CHECK_INTERVAL_SECONDS = 60
    # Number of consecutive failed checks at which a status dump is logged.
    MAX_CONSECUTIVE_ERRORS = 20

    def __init__(self, router) -> None:
        """Create a watchdog for *router*.

        Args:
            router: The object to monitor. Its ``_context``, ``state`` and
                ``uptime_seconds`` attributes are read if present.
        """
        self._router = router
        self._consecutive_errors = 0
        self._running = False
        # time.monotonic() value of the most recently completed check;
        # 0 until the first check finishes.
        self._last_check: float = 0

    @property
    def consecutive_errors(self) -> int:
        """Number of failed checks in a row since the last healthy one."""
        return self._consecutive_errors

    @property
    def is_running(self) -> bool:
        """Whether the ``run`` loop is currently active."""
        return self._running

    async def run(self) -> None:
        """Main watchdog loop. Runs until stopped.

        Every ``CHECK_INTERVAL_SECONDS`` both liveness checks are evaluated.
        A fully healthy pass resets the error counter; any failure
        increments it, and once it reaches ``MAX_CONSECUTIVE_ERRORS`` a
        diagnostic dump is logged on every failing pass.
        """
        self._running = True
        logger.info("RouterWatchdog started")

        try:
            while self._running:
                await asyncio.sleep(self.CHECK_INTERVAL_SECONDS)
                # stop() may have been called while we were sleeping.
                if not self._running:
                    break

                # Evaluate both checks unconditionally (no short-circuit)
                # so each one runs on every pass.
                job_queue_ok = self._verify_job_queue_liveness()
                transport_ok = self._verify_transport_liveness()

                if job_queue_ok and transport_ok:
                    self._consecutive_errors = 0
                else:
                    self._consecutive_errors += 1
                    if self._consecutive_errors >= self.MAX_CONSECUTIVE_ERRORS:
                        logger.error(
                            "RouterWatchdog: %d consecutive failures, dumping status",
                            self._consecutive_errors,
                        )
                        # Pass the dump as an argument, never as the format
                        # string, so its content is logged verbatim.
                        logger.error("%s", self.dump_status())

                self._last_check = time.monotonic()
        finally:
            # Also reached on task cancellation or an unexpected exception:
            # is_running must not keep reporting True for a dead loop.
            self._running = False

    def stop(self) -> None:
        """Stop the watchdog loop.

        Only clears the running flag; the loop notices it after its current
        sleep finishes.
        """
        self._running = False

    def _verify_job_queue_liveness(self) -> bool:
        """Check that the job queue has processed something recently.

        Returns:
            ``False`` when the router has no context or the last job ran at
            least ``CHECK_INTERVAL_SECONDS`` ago; ``True`` otherwise,
            including when there is no job queue or it has never run.
        """
        ctx = getattr(self._router, '_context', None)
        if ctx is None:
            return False
        job_queue = getattr(ctx, 'job_queue', None)
        if job_queue is None:
            return True  # no job queue means no check possible
        last_run = getattr(job_queue, 'last_job_run_time', 0)
        if last_run == 0:
            return True  # never run, may be freshly started
        # NOTE(review): assumes last_job_run_time is a time.monotonic()
        # timestamp — confirm against the job queue implementation.
        return (time.monotonic() - last_run) < self.CHECK_INTERVAL_SECONDS

    def _verify_transport_liveness(self) -> bool:
        """Check that at least one transport is running.

        Returns:
            ``False`` when the router has no context or the transport
            manager's ``is_running`` is falsy or missing; ``True`` when it
            is truthy or when there is no transport manager at all.
        """
        ctx = getattr(self._router, '_context', None)
        if ctx is None:
            return False
        transport_mgr = getattr(ctx, 'transport_manager', None)
        if transport_mgr is None:
            return True  # no transport manager
        # Coerce so the declared bool return type actually holds.
        return bool(getattr(transport_mgr, 'is_running', False))

    def dump_status(self) -> str:
        """Generate diagnostic status string.

        Returns:
            A newline-joined report with the error count, router state and
            uptime, plus job queue and transport lines for whichever of
            those components exist on the router context.
        """
        lines = [f"=== RouterWatchdog Status (errors={self._consecutive_errors}) ==="]
        lines.append(f"Router state: {getattr(self._router, 'state', 'unknown')}")

        uptime = getattr(self._router, 'uptime_seconds', 0)
        try:
            uptime_text = f"{uptime:.0f}s"
        except (TypeError, ValueError):
            # A diagnostic dump must not raise because of one odd value.
            uptime_text = "unknown"
        lines.append(f"Uptime: {uptime_text}")

        ctx = getattr(self._router, '_context', None)
        # Compare against None rather than relying on truthiness: a
        # component that defines __len__/__bool__ (e.g. an empty queue) is
        # falsy but still present and must be reported.
        if ctx is not None:
            jq = getattr(ctx, 'job_queue', None)
            if jq is not None:
                lines.append(f"Job queue pending: {getattr(jq, 'pending_count', '?')}")
            tm = getattr(ctx, 'transport_manager', None)
            if tm is not None:
                lines.append(f"Transports running: {getattr(tm, 'is_running', '?')}")

        return "\n".join(lines)