A Python port of the Invisible Internet Project (I2P)
1"""Tests for BandwidthLimiter — token bucket rate limiting.
2
3TDD: tests written before implementation.
4"""
5
6import asyncio
7import time
8
9import pytest
10
11from i2p_transport.bandwidth_limiter import BandwidthLimiter, BandwidthManager
12
13
14class TestBandwidthLimiter:
15
16 def test_unlimited(self):
17 """rate=0 means unlimited — try_acquire always succeeds."""
18 limiter = BandwidthLimiter(rate_bytes_per_sec=0)
19 assert limiter.is_unlimited
20 assert limiter.try_acquire(1_000_000) is True
21
22 def test_try_acquire_available(self):
23 """Succeeds when enough tokens are available."""
24 limiter = BandwidthLimiter(rate_bytes_per_sec=1000, bucket_size=1000)
25 assert limiter.try_acquire(500) is True
26 assert limiter.available == pytest.approx(500, abs=10)
27
28 def test_try_acquire_insufficient(self):
29 """Fails when not enough tokens."""
30 limiter = BandwidthLimiter(rate_bytes_per_sec=1000, bucket_size=100)
31 assert limiter.try_acquire(200) is False
32
33 def test_refill_over_time(self):
34 """Tokens regenerate based on elapsed time."""
35 limiter = BandwidthLimiter(rate_bytes_per_sec=10000, bucket_size=10000)
36 # Drain the bucket
37 assert limiter.try_acquire(10000) is True
38 assert limiter.available == pytest.approx(0, abs=10)
39
40 # Simulate time passing by adjusting _last_refill
41 limiter._last_refill -= 0.5 # 500ms ago -> should add ~5000 tokens
42 limiter._refill()
43 assert limiter.available == pytest.approx(5000, abs=200)
44
45 def test_total_bytes_tracked(self):
46 """Total consumed bytes are accurately counted."""
47 limiter = BandwidthLimiter(rate_bytes_per_sec=10000, bucket_size=10000)
48 assert limiter.total_bytes == 0
49 limiter.try_acquire(100)
50 assert limiter.total_bytes == 100
51 limiter.try_acquire(250)
52 assert limiter.total_bytes == 350
53
54 @pytest.mark.asyncio
55 async def test_acquire_blocks_until_available(self):
56 """acquire() waits until tokens are available."""
57 limiter = BandwidthLimiter(rate_bytes_per_sec=100000, bucket_size=100)
58 # Drain
59 limiter.try_acquire(100)
60
61 # acquire(50) should eventually succeed as tokens refill at 100k/sec
62 await asyncio.wait_for(limiter.acquire(50), timeout=2.0)
63 assert limiter.total_bytes == 150
64
65 def test_bucket_does_not_exceed_max(self):
66 """Tokens should not exceed bucket_size after refill."""
67 limiter = BandwidthLimiter(rate_bytes_per_sec=1000, bucket_size=500)
68 # Simulate a long time passing
69 limiter._last_refill -= 100 # 100 seconds = 100000 tokens, but capped
70 limiter._refill()
71 assert limiter.available <= 500
72
73 def test_burst_factor(self):
74 """BandwidthManager bucket allows burst above steady rate."""
75 mgr = BandwidthManager(inbound_rate=1000, outbound_rate=1000, burst_factor=2.0)
76 # Bucket size should be 2x the rate
77 assert mgr.inbound._bucket_size == 2000
78 assert mgr.outbound._bucket_size == 2000
79 # Should be able to acquire burst amount
80 assert mgr.inbound.try_acquire(1500) is True
81
82
83class TestBandwidthManager:
84
85 def test_separate_limits(self):
86 """Inbound and outbound are independent."""
87 mgr = BandwidthManager(inbound_rate=1000, outbound_rate=2000)
88 # Drain inbound completely — request the full bucket
89 mgr.inbound.try_acquire(1000)
90 # Outbound should still have tokens (independent bucket)
91 assert mgr.outbound.try_acquire(1500) is True
92 # Inbound should be nearly empty — request more than could refill
93 # in a microsecond (rate=1000 bytes/sec, so ~0.001 bytes/microsecond)
94 assert mgr.inbound.try_acquire(999) is False
95
96 @pytest.mark.asyncio
97 async def test_acquire_inbound_outbound(self):
98 """Manager convenience methods delegate correctly."""
99 mgr = BandwidthManager(inbound_rate=0, outbound_rate=0) # unlimited
100 await mgr.acquire_inbound(1000)
101 await mgr.acquire_outbound(2000)
102 assert mgr.inbound.total_bytes == 1000
103 assert mgr.outbound.total_bytes == 2000
104
105 def test_unlimited_manager(self):
106 """Zero rates mean unlimited."""
107 mgr = BandwidthManager(inbound_rate=0, outbound_rate=0)
108 assert mgr.inbound.is_unlimited
109 assert mgr.outbound.is_unlimited