A Python port of the Invisible Internet Project (I2P)
1"""Bandwidth limiter using token bucket algorithm.
2
3Controls send/receive rates to prevent bandwidth saturation.
4Ported from net.i2p.router.transport.BandwidthLimiter.
5"""
6
7import asyncio
8import time
9
10
11class BandwidthLimiter:
12 """Token bucket bandwidth limiter.
13
14 Allows bursts up to bucket_size, refills at rate_bytes_per_sec.
15 A rate of 0 means unlimited (all acquire calls succeed immediately).
16 """
17
18 def __init__(self, rate_bytes_per_sec: int = 0,
19 bucket_size: int = 0) -> None:
20 """Create a bandwidth limiter.
21
22 Args:
23 rate_bytes_per_sec: Refill rate. 0 means unlimited.
24 bucket_size: Maximum burst size. Defaults to rate_bytes_per_sec.
25 """
26 self._rate = rate_bytes_per_sec
27 self._bucket_size = bucket_size or rate_bytes_per_sec
28 self._tokens: float = float(self._bucket_size)
29 self._last_refill: float = time.monotonic()
30 self._total_bytes: int = 0
31 self._lock = asyncio.Lock()
32
33 # ------------------------------------------------------------------
34 # Public API
35 # ------------------------------------------------------------------
36
37 async def acquire(self, byte_count: int) -> None:
38 """Wait until enough tokens are available, then consume them.
39
40 For unlimited limiters (rate=0), returns immediately.
41 """
42 if self._rate == 0:
43 self._total_bytes += byte_count
44 return
45
46 while True:
47 async with self._lock:
48 self._refill()
49 if self._tokens >= byte_count:
50 self._tokens -= byte_count
51 self._total_bytes += byte_count
52 return
53
54 # Not enough tokens — wait a bit for refill
55 needed = byte_count - self._tokens
56 wait_seconds = needed / self._rate
57 # Clamp to reasonable range
58 wait_seconds = max(0.001, min(wait_seconds, 1.0))
59 await asyncio.sleep(wait_seconds)
60
61 def try_acquire(self, byte_count: int) -> bool:
62 """Non-blocking: consume if available, return False if not.
63
64 For unlimited limiters (rate=0), always returns True.
65 """
66 if self._rate == 0:
67 self._total_bytes += byte_count
68 return True
69
70 self._refill()
71 if self._tokens >= byte_count:
72 self._tokens -= byte_count
73 self._total_bytes += byte_count
74 return True
75 return False
76
77 def _refill(self) -> None:
78 """Add tokens based on elapsed time since last refill."""
79 now = time.monotonic()
80 elapsed = now - self._last_refill
81 if elapsed <= 0:
82 return
83 self._tokens += elapsed * self._rate
84 if self._tokens > self._bucket_size:
85 self._tokens = float(self._bucket_size)
86 self._last_refill = now
87
88 # ------------------------------------------------------------------
89 # Properties
90 # ------------------------------------------------------------------
91
92 @property
93 def available(self) -> float:
94 """Current available tokens (bytes)."""
95 self._refill()
96 return self._tokens
97
98 @property
99 def total_bytes(self) -> int:
100 """Total bytes consumed through this limiter."""
101 return self._total_bytes
102
103 @property
104 def is_unlimited(self) -> bool:
105 """True if rate is 0 (no limiting)."""
106 return self._rate == 0
107
108
class BandwidthManager:
    """Pair of independent limiters, one per traffic direction.

    ``inbound`` and ``outbound`` are separate token buckets; the
    ``acquire_*`` coroutines simply forward to the matching bucket.
    """

    def __init__(self, inbound_rate: int = 0, outbound_rate: int = 0,
                 burst_factor: float = 1.5) -> None:
        """Build the inbound and outbound limiters.

        Args:
            inbound_rate: Inbound rate limit (bytes/sec). 0 = unlimited.
            outbound_rate: Outbound rate limit (bytes/sec). 0 = unlimited.
            burst_factor: Multiplier applied to each rate to obtain that
                direction's bucket size (truncated to an int).
        """
        inbound_burst = int(inbound_rate * burst_factor)
        self.inbound = BandwidthLimiter(inbound_rate, inbound_burst)

        outbound_burst = int(outbound_rate * burst_factor)
        self.outbound = BandwidthLimiter(outbound_rate, outbound_burst)

    async def acquire_inbound(self, byte_count: int) -> None:
        """Wait for and consume ``byte_count`` bytes of inbound budget."""
        limiter = self.inbound
        await limiter.acquire(byte_count)

    async def acquire_outbound(self, byte_count: int) -> None:
        """Wait for and consume ``byte_count`` bytes of outbound budget."""
        limiter = self.outbound
        await limiter.acquire(byte_count)