A Python port of the Invisible Internet Project (I2P)
at main 109 lines 4.4 kB view raw
"""Tests for BandwidthLimiter — token bucket rate limiting.

TDD: tests written before implementation.
"""

import asyncio
import time

import pytest

from i2p_transport.bandwidth_limiter import BandwidthLimiter, BandwidthManager


class TestBandwidthLimiter:
    """Behavioural tests for a single token-bucket ``BandwidthLimiter``."""

    def test_unlimited(self) -> None:
        """rate=0 means unlimited — try_acquire always succeeds."""
        limiter = BandwidthLimiter(rate_bytes_per_sec=0)
        assert limiter.is_unlimited
        assert limiter.try_acquire(1_000_000) is True

    def test_try_acquire_available(self) -> None:
        """Succeeds when enough tokens are available."""
        limiter = BandwidthLimiter(rate_bytes_per_sec=1000, bucket_size=1000)
        assert limiter.try_acquire(500) is True
        # Tolerance absorbs whatever refills between the two calls.
        assert limiter.available == pytest.approx(500, abs=10)

    def test_try_acquire_insufficient(self) -> None:
        """Fails when not enough tokens."""
        limiter = BandwidthLimiter(rate_bytes_per_sec=1000, bucket_size=100)
        assert limiter.try_acquire(200) is False

    def test_refill_over_time(self) -> None:
        """Tokens regenerate based on elapsed time."""
        limiter = BandwidthLimiter(rate_bytes_per_sec=10000, bucket_size=10000)
        # Drain the bucket
        assert limiter.try_acquire(10000) is True
        # NOTE(review): at 10000 bytes/sec, abs=10 corresponds to ~1 ms of
        # refill. If `available` refills on access (not visible from this
        # file), this assertion could be timing-sensitive on a loaded
        # machine — confirm.
        assert limiter.available == pytest.approx(0, abs=10)

        # Simulate time passing by adjusting _last_refill
        limiter._last_refill -= 0.5  # 500ms ago -> should add ~5000 tokens
        limiter._refill()
        assert limiter.available == pytest.approx(5000, abs=200)

    def test_total_bytes_tracked(self) -> None:
        """Total consumed bytes are accurately counted."""
        limiter = BandwidthLimiter(rate_bytes_per_sec=10000, bucket_size=10000)
        assert limiter.total_bytes == 0
        # Assert each acquire succeeded so a counter mismatch below is not
        # confused with a refused acquire.
        assert limiter.try_acquire(100) is True
        assert limiter.total_bytes == 100
        assert limiter.try_acquire(250) is True
        assert limiter.total_bytes == 350

    @pytest.mark.asyncio
    async def test_acquire_blocks_until_available(self) -> None:
        """acquire() waits until tokens are available."""
        limiter = BandwidthLimiter(rate_bytes_per_sec=100000, bucket_size=100)
        # Drain; the total_bytes check below counts on this having succeeded.
        assert limiter.try_acquire(100) is True

        # acquire(50) should eventually succeed as tokens refill at 100k/sec.
        # wait_for turns a hung acquire() into a test failure instead of a
        # stuck test run.
        await asyncio.wait_for(limiter.acquire(50), timeout=2.0)
        assert limiter.total_bytes == 150

    def test_bucket_does_not_exceed_max(self) -> None:
        """Tokens should not exceed bucket_size after refill."""
        limiter = BandwidthLimiter(rate_bytes_per_sec=1000, bucket_size=500)
        # Simulate a long time passing
        limiter._last_refill -= 100  # 100 seconds = 100000 tokens, but capped
        limiter._refill()
        assert limiter.available <= 500

    def test_burst_factor(self) -> None:
        """BandwidthManager bucket allows burst above steady rate."""
        # Exercises BandwidthManager; kept in this class so the existing
        # pytest node id does not change.
        mgr = BandwidthManager(inbound_rate=1000, outbound_rate=1000, burst_factor=2.0)
        # Bucket size should be 2x the rate
        assert mgr.inbound._bucket_size == 2000
        assert mgr.outbound._bucket_size == 2000
        # Should be able to acquire burst amount
        assert mgr.inbound.try_acquire(1500) is True


class TestBandwidthManager:
    """Tests for ``BandwidthManager``, which pairs an inbound and an outbound limiter."""

    def test_separate_limits(self) -> None:
        """Inbound and outbound are independent."""
        # NOTE(review): the assertions below depend on the default
        # burst_factor, which is not visible from this file: the outbound
        # bucket must hold at least 1500 tokens and the inbound bucket fewer
        # than 1999 — confirm against the implementation.
        mgr = BandwidthManager(inbound_rate=1000, outbound_rate=2000)
        # Drain inbound completely — request the full bucket
        mgr.inbound.try_acquire(1000)
        # Outbound should still have tokens (independent bucket)
        assert mgr.outbound.try_acquire(1500) is True
        # Inbound should be nearly empty — request more than could refill
        # in a microsecond (rate=1000 bytes/sec, so ~0.001 bytes/microsecond)
        assert mgr.inbound.try_acquire(999) is False

    @pytest.mark.asyncio
    async def test_acquire_inbound_outbound(self) -> None:
        """Manager convenience methods delegate correctly."""
        mgr = BandwidthManager(inbound_rate=0, outbound_rate=0)  # unlimited
        await mgr.acquire_inbound(1000)
        await mgr.acquire_outbound(2000)
        # Each direction must record only its own traffic.
        assert mgr.inbound.total_bytes == 1000
        assert mgr.outbound.total_bytes == 2000

    def test_unlimited_manager(self) -> None:
        """Zero rates mean unlimited."""
        mgr = BandwidthManager(inbound_rate=0, outbound_rate=0)
        assert mgr.inbound.is_unlimited
        assert mgr.outbound.is_unlimited