"""Bandwidth limiter using token bucket algorithm. Controls send/receive rates to prevent bandwidth saturation. Ported from net.i2p.router.transport.BandwidthLimiter. """ import asyncio import time class BandwidthLimiter: """Token bucket bandwidth limiter. Allows bursts up to bucket_size, refills at rate_bytes_per_sec. A rate of 0 means unlimited (all acquire calls succeed immediately). """ def __init__(self, rate_bytes_per_sec: int = 0, bucket_size: int = 0) -> None: """Create a bandwidth limiter. Args: rate_bytes_per_sec: Refill rate. 0 means unlimited. bucket_size: Maximum burst size. Defaults to rate_bytes_per_sec. """ self._rate = rate_bytes_per_sec self._bucket_size = bucket_size or rate_bytes_per_sec self._tokens: float = float(self._bucket_size) self._last_refill: float = time.monotonic() self._total_bytes: int = 0 self._lock = asyncio.Lock() # ------------------------------------------------------------------ # Public API # ------------------------------------------------------------------ async def acquire(self, byte_count: int) -> None: """Wait until enough tokens are available, then consume them. For unlimited limiters (rate=0), returns immediately. """ if self._rate == 0: self._total_bytes += byte_count return while True: async with self._lock: self._refill() if self._tokens >= byte_count: self._tokens -= byte_count self._total_bytes += byte_count return # Not enough tokens — wait a bit for refill needed = byte_count - self._tokens wait_seconds = needed / self._rate # Clamp to reasonable range wait_seconds = max(0.001, min(wait_seconds, 1.0)) await asyncio.sleep(wait_seconds) def try_acquire(self, byte_count: int) -> bool: """Non-blocking: consume if available, return False if not. For unlimited limiters (rate=0), always returns True. """ if self._rate == 0: self._total_bytes += byte_count return True self._refill() if self._tokens >= byte_count: self._tokens -= byte_count self._total_bytes += byte_count return True return False def _refill(self) -> None: """Add tokens based on elapsed time since last refill.""" now = time.monotonic() elapsed = now - self._last_refill if elapsed <= 0: return self._tokens += elapsed * self._rate if self._tokens > self._bucket_size: self._tokens = float(self._bucket_size) self._last_refill = now # ------------------------------------------------------------------ # Properties # ------------------------------------------------------------------ @property def available(self) -> float: """Current available tokens (bytes).""" self._refill() return self._tokens @property def total_bytes(self) -> int: """Total bytes consumed through this limiter.""" return self._total_bytes @property def is_unlimited(self) -> bool: """True if rate is 0 (no limiting).""" return self._rate == 0 class BandwidthManager: """Manages separate inbound and outbound bandwidth limits. Provides convenience methods for acquiring bandwidth in either direction, each backed by an independent token bucket. """ def __init__(self, inbound_rate: int = 0, outbound_rate: int = 0, burst_factor: float = 1.5) -> None: """Create a bandwidth manager. Args: inbound_rate: Inbound rate limit (bytes/sec). 0 = unlimited. outbound_rate: Outbound rate limit (bytes/sec). 0 = unlimited. burst_factor: Bucket size as a multiple of the rate. """ self.inbound = BandwidthLimiter( inbound_rate, int(inbound_rate * burst_factor), ) self.outbound = BandwidthLimiter( outbound_rate, int(outbound_rate * burst_factor), ) async def acquire_inbound(self, byte_count: int) -> None: """Acquire inbound bandwidth.""" await self.inbound.acquire(byte_count) async def acquire_outbound(self, byte_count: int) -> None: """Acquire outbound bandwidth.""" await self.outbound.acquire(byte_count)