A Python port of the Invisible Internet Project (I2P)
1"""RouterWatchdog — periodic health monitoring.
2
3Ported from net.i2p.router.tasks.RouterWatchdog.
4
5Checks job queue liveness and transport health at regular intervals.
6After MAX_CONSECUTIVE_ERRORS failures, logs a diagnostic dump.
7"""
8
9from __future__ import annotations
10
11import asyncio
12import logging
13import time
14
15logger = logging.getLogger(__name__)
16
17
class RouterWatchdog:
    """Monitors router health and detects hung states.

    Every ``CHECK_INTERVAL_SECONDS`` the watchdog inspects the job queue and
    the transport manager reachable through the router's ``_context``
    attribute.  Each failed cycle increments a counter; once the counter
    reaches ``MAX_CONSECUTIVE_ERRORS`` a diagnostic dump is logged on every
    further failed cycle.  A successful cycle resets the counter.
    """

    # Seconds between health checks; also the maximum tolerated job-queue
    # idle time in _verify_job_queue_liveness().
    CHECK_INTERVAL_SECONDS = 60
    # Number of consecutive failed cycles after which status dumps start.
    MAX_CONSECUTIVE_ERRORS = 20

    def __init__(self, router) -> None:
        """Create a watchdog for *router*.

        Args:
            router: Object to monitor.  It is only ever accessed through
                ``getattr`` with defaults (``_context``, ``state``,
                ``uptime_seconds``), so missing attributes are tolerated.
        """
        self._router = router
        self._consecutive_errors = 0
        self._running = False
        # time.monotonic() timestamp of the last completed check (0 = never).
        self._last_check: float = 0

    @property
    def consecutive_errors(self) -> int:
        """Number of failed check cycles since the last successful one."""
        return self._consecutive_errors

    @property
    def is_running(self) -> bool:
        """True while the run() loop is active."""
        return self._running

    async def run(self) -> None:
        """Main watchdog loop. Runs until stopped.

        The loop sleeps for ``CHECK_INTERVAL_SECONDS``, runs both liveness
        checks, updates the consecutive-error counter and logs a status dump
        once the counter reaches ``MAX_CONSECUTIVE_ERRORS``.

        ``is_running`` is reset to False however the loop ends, including
        task cancellation and an exception escaping a check.
        """
        self._running = True
        logger.info("RouterWatchdog started")

        try:
            while self._running:
                await asyncio.sleep(self.CHECK_INTERVAL_SECONDS)
                # stop() may have been called while we were sleeping.
                if not self._running:
                    break

                # Run both checks every cycle (no short-circuiting between
                # them), exactly one call each.
                queue_ok = self._verify_job_queue_liveness()
                transport_ok = self._verify_transport_liveness()

                if queue_ok and transport_ok:
                    self._consecutive_errors = 0
                else:
                    self._consecutive_errors += 1
                    if self._consecutive_errors >= self.MAX_CONSECUTIVE_ERRORS:
                        logger.error(
                            "RouterWatchdog: %d consecutive failures, dumping status",
                            self._consecutive_errors,
                        )
                        logger.error(self.dump_status())

                self._last_check = time.monotonic()
        finally:
            # Without this, a cancelled or crashed watchdog would keep
            # reporting is_running == True forever.
            self._running = False

    def stop(self) -> None:
        """Stop the watchdog loop.

        Only clears the running flag; the loop notices it the next time it
        wakes from its sleep.
        """
        self._running = False

    def _verify_job_queue_liveness(self) -> bool:
        """Check that the job queue has processed something recently.

        Returns:
            False if the router has no ``_context`` or if the queue's
            ``last_job_run_time`` is at least ``CHECK_INTERVAL_SECONDS``
            behind ``time.monotonic()``; True otherwise (including when no
            job queue exists or it has never run a job).
        """
        ctx = getattr(self._router, '_context', None)
        if ctx is None:
            return False
        job_queue = getattr(ctx, 'job_queue', None)
        if job_queue is None:
            return True  # no job queue means no check possible
        # NOTE(review): the comparison below assumes last_job_run_time is a
        # time.monotonic() timestamp -- confirm against the job queue.
        last_run = getattr(job_queue, 'last_job_run_time', 0)
        if last_run == 0:
            return True  # never run, may be freshly started
        return (time.monotonic() - last_run) < self.CHECK_INTERVAL_SECONDS

    def _verify_transport_liveness(self) -> bool:
        """Check that the transport manager reports itself as running.

        Returns:
            False if the router has no ``_context``; True if there is no
            transport manager to check; otherwise the truth value of the
            manager's ``is_running`` attribute (False when it is missing).
        """
        ctx = getattr(self._router, '_context', None)
        if ctx is None:
            return False
        transport_mgr = getattr(ctx, 'transport_manager', None)
        if transport_mgr is None:
            return True  # no transport manager
        # Coerce so the declared bool return type holds for any attribute value.
        return bool(getattr(transport_mgr, 'is_running', False))

    def dump_status(self) -> str:
        """Generate diagnostic status string.

        Returns:
            A newline-joined report containing the error counter, the router
            state and uptime, and -- when the corresponding objects exist --
            the job queue's pending count and the transport manager's running
            flag.
        """
        lines = [f"=== RouterWatchdog Status (errors={self._consecutive_errors}) ==="]
        lines.append(f"Router state: {getattr(self._router, 'state', 'unknown')}")
        lines.append(f"Uptime: {getattr(self._router, 'uptime_seconds', 0):.0f}s")

        # Compare against None rather than relying on truthiness: a component
        # that defines __len__/__bool__ (e.g. an empty queue) is falsy but
        # still present and must show up in the dump.
        ctx = getattr(self._router, '_context', None)
        if ctx is not None:
            jq = getattr(ctx, 'job_queue', None)
            if jq is not None:
                lines.append(f"Job queue pending: {getattr(jq, 'pending_count', '?')}")
            tm = getattr(ctx, 'transport_manager', None)
            if tm is not None:
                lines.append(f"Transports running: {getattr(tm, 'is_running', '?')}")

        return "\n".join(lines)