"""RouterThrottle — admission control for tunnel requests and messages. Ported from net.i2p.router.RouterThrottleImpl. Checks bandwidth, tunnel count, and job lag to decide whether to accept tunnel build requests and network messages. """ from __future__ import annotations import enum from typing import Protocol, runtime_checkable class TunnelRequestStatus(enum.Enum): """Result of a tunnel build request evaluation.""" ACCEPT = "accept" REJECT_BANDWIDTH = "reject_bandwidth" REJECT_OVERLOADED = "reject_overloaded" REJECT_TRANSIENT = "reject_transient" REJECT_PROBABILISTIC = "reject_probabilistic" @runtime_checkable class RouterThrottle(Protocol): """Protocol for router throttle implementations.""" def accept_tunnel_request(self) -> TunnelRequestStatus: ... def accept_network_message(self) -> bool: ... def get_status(self) -> str: ... class RouterThrottleImpl: """Concrete throttle implementation using bandwidth, tunnel count, and job lag.""" MAX_PARTICIPATING_TUNNELS = 10_000 JOB_LAG_LIMIT_MS = 1500 MESSAGE_DELAY_LIMIT_MS = 3000 MIN_THROTTLE_TUNNELS = 100 def __init__(self, context=None) -> None: self._context = context self._participating_count: int = 0 self._bandwidth_load: float = 0.0 self._job_lag_ms: float = 0.0 self._message_delay_ms: float = 0.0 def set_participating_count(self, count: int) -> None: self._participating_count = count def set_bandwidth_load(self, load: float) -> None: """Set bandwidth load as 0.0-1.0 fraction.""" self._bandwidth_load = max(0.0, min(1.0, load)) def set_job_lag_ms(self, lag_ms: float) -> None: self._job_lag_ms = lag_ms def set_message_delay_ms(self, delay_ms: float) -> None: self._message_delay_ms = delay_ms def accept_tunnel_request(self) -> TunnelRequestStatus: """Evaluate whether to accept a tunnel build request.""" # Check tunnel count result = self._check_tunnel_count() if result is not None: return result # Check bandwidth result = self._check_bandwidth() if result is not None: return result # Check job lag result = self._check_job_lag() if result is not None: return result return TunnelRequestStatus.ACCEPT def accept_network_message(self) -> bool: """Return True if we should accept an inbound network message.""" if self._message_delay_ms > self.MESSAGE_DELAY_LIMIT_MS: return False if self._bandwidth_load > 0.95: return False return True def get_status(self) -> str: """Human-readable status string.""" return ( f"tunnels={self._participating_count}/{self.MAX_PARTICIPATING_TUNNELS} " f"bw={self._bandwidth_load:.1%} " f"lag={self._job_lag_ms:.0f}ms " f"delay={self._message_delay_ms:.0f}ms" ) def _check_tunnel_count(self) -> TunnelRequestStatus | None: if self._participating_count >= self.MAX_PARTICIPATING_TUNNELS: return TunnelRequestStatus.REJECT_OVERLOADED return None def _check_bandwidth(self) -> TunnelRequestStatus | None: if self._bandwidth_load > 0.9: return TunnelRequestStatus.REJECT_BANDWIDTH return None def _check_job_lag(self) -> TunnelRequestStatus | None: if self._job_lag_ms > self.JOB_LAG_LIMIT_MS: return TunnelRequestStatus.REJECT_TRANSIENT return None