A Python port of the Invisible Internet Project (I2P)
at main 138 lines 4.6 kB view raw
1"""Bandwidth limiter using token bucket algorithm. 2 3Controls send/receive rates to prevent bandwidth saturation. 4Ported from net.i2p.router.transport.BandwidthLimiter. 5""" 6 7import asyncio 8import time 9 10 11class BandwidthLimiter: 12 """Token bucket bandwidth limiter. 13 14 Allows bursts up to bucket_size, refills at rate_bytes_per_sec. 15 A rate of 0 means unlimited (all acquire calls succeed immediately). 16 """ 17 18 def __init__(self, rate_bytes_per_sec: int = 0, 19 bucket_size: int = 0) -> None: 20 """Create a bandwidth limiter. 21 22 Args: 23 rate_bytes_per_sec: Refill rate. 0 means unlimited. 24 bucket_size: Maximum burst size. Defaults to rate_bytes_per_sec. 25 """ 26 self._rate = rate_bytes_per_sec 27 self._bucket_size = bucket_size or rate_bytes_per_sec 28 self._tokens: float = float(self._bucket_size) 29 self._last_refill: float = time.monotonic() 30 self._total_bytes: int = 0 31 self._lock = asyncio.Lock() 32 33 # ------------------------------------------------------------------ 34 # Public API 35 # ------------------------------------------------------------------ 36 37 async def acquire(self, byte_count: int) -> None: 38 """Wait until enough tokens are available, then consume them. 39 40 For unlimited limiters (rate=0), returns immediately. 41 """ 42 if self._rate == 0: 43 self._total_bytes += byte_count 44 return 45 46 while True: 47 async with self._lock: 48 self._refill() 49 if self._tokens >= byte_count: 50 self._tokens -= byte_count 51 self._total_bytes += byte_count 52 return 53 54 # Not enough tokens — wait a bit for refill 55 needed = byte_count - self._tokens 56 wait_seconds = needed / self._rate 57 # Clamp to reasonable range 58 wait_seconds = max(0.001, min(wait_seconds, 1.0)) 59 await asyncio.sleep(wait_seconds) 60 61 def try_acquire(self, byte_count: int) -> bool: 62 """Non-blocking: consume if available, return False if not. 63 64 For unlimited limiters (rate=0), always returns True. 65 """ 66 if self._rate == 0: 67 self._total_bytes += byte_count 68 return True 69 70 self._refill() 71 if self._tokens >= byte_count: 72 self._tokens -= byte_count 73 self._total_bytes += byte_count 74 return True 75 return False 76 77 def _refill(self) -> None: 78 """Add tokens based on elapsed time since last refill.""" 79 now = time.monotonic() 80 elapsed = now - self._last_refill 81 if elapsed <= 0: 82 return 83 self._tokens += elapsed * self._rate 84 if self._tokens > self._bucket_size: 85 self._tokens = float(self._bucket_size) 86 self._last_refill = now 87 88 # ------------------------------------------------------------------ 89 # Properties 90 # ------------------------------------------------------------------ 91 92 @property 93 def available(self) -> float: 94 """Current available tokens (bytes).""" 95 self._refill() 96 return self._tokens 97 98 @property 99 def total_bytes(self) -> int: 100 """Total bytes consumed through this limiter.""" 101 return self._total_bytes 102 103 @property 104 def is_unlimited(self) -> bool: 105 """True if rate is 0 (no limiting).""" 106 return self._rate == 0 107 108 109class BandwidthManager: 110 """Manages separate inbound and outbound bandwidth limits. 111 112 Provides convenience methods for acquiring bandwidth in either 113 direction, each backed by an independent token bucket. 114 """ 115 116 def __init__(self, inbound_rate: int = 0, outbound_rate: int = 0, 117 burst_factor: float = 1.5) -> None: 118 """Create a bandwidth manager. 119 120 Args: 121 inbound_rate: Inbound rate limit (bytes/sec). 0 = unlimited. 122 outbound_rate: Outbound rate limit (bytes/sec). 0 = unlimited. 123 burst_factor: Bucket size as a multiple of the rate. 124 """ 125 self.inbound = BandwidthLimiter( 126 inbound_rate, int(inbound_rate * burst_factor), 127 ) 128 self.outbound = BandwidthLimiter( 129 outbound_rate, int(outbound_rate * burst_factor), 130 ) 131 132 async def acquire_inbound(self, byte_count: int) -> None: 133 """Acquire inbound bandwidth.""" 134 await self.inbound.acquire(byte_count) 135 136 async def acquire_outbound(self, byte_count: int) -> None: 137 """Acquire outbound bandwidth.""" 138 await self.outbound.acquire(byte_count)