"""Tests for BandwidthLimiter — token bucket rate limiting. TDD: tests written before implementation. """ import asyncio import time import pytest from i2p_transport.bandwidth_limiter import BandwidthLimiter, BandwidthManager class TestBandwidthLimiter: def test_unlimited(self): """rate=0 means unlimited — try_acquire always succeeds.""" limiter = BandwidthLimiter(rate_bytes_per_sec=0) assert limiter.is_unlimited assert limiter.try_acquire(1_000_000) is True def test_try_acquire_available(self): """Succeeds when enough tokens are available.""" limiter = BandwidthLimiter(rate_bytes_per_sec=1000, bucket_size=1000) assert limiter.try_acquire(500) is True assert limiter.available == pytest.approx(500, abs=10) def test_try_acquire_insufficient(self): """Fails when not enough tokens.""" limiter = BandwidthLimiter(rate_bytes_per_sec=1000, bucket_size=100) assert limiter.try_acquire(200) is False def test_refill_over_time(self): """Tokens regenerate based on elapsed time.""" limiter = BandwidthLimiter(rate_bytes_per_sec=10000, bucket_size=10000) # Drain the bucket assert limiter.try_acquire(10000) is True assert limiter.available == pytest.approx(0, abs=10) # Simulate time passing by adjusting _last_refill limiter._last_refill -= 0.5 # 500ms ago -> should add ~5000 tokens limiter._refill() assert limiter.available == pytest.approx(5000, abs=200) def test_total_bytes_tracked(self): """Total consumed bytes are accurately counted.""" limiter = BandwidthLimiter(rate_bytes_per_sec=10000, bucket_size=10000) assert limiter.total_bytes == 0 limiter.try_acquire(100) assert limiter.total_bytes == 100 limiter.try_acquire(250) assert limiter.total_bytes == 350 @pytest.mark.asyncio async def test_acquire_blocks_until_available(self): """acquire() waits until tokens are available.""" limiter = BandwidthLimiter(rate_bytes_per_sec=100000, bucket_size=100) # Drain limiter.try_acquire(100) # acquire(50) should eventually succeed as tokens refill at 100k/sec await asyncio.wait_for(limiter.acquire(50), timeout=2.0) assert limiter.total_bytes == 150 def test_bucket_does_not_exceed_max(self): """Tokens should not exceed bucket_size after refill.""" limiter = BandwidthLimiter(rate_bytes_per_sec=1000, bucket_size=500) # Simulate a long time passing limiter._last_refill -= 100 # 100 seconds = 100000 tokens, but capped limiter._refill() assert limiter.available <= 500 def test_burst_factor(self): """BandwidthManager bucket allows burst above steady rate.""" mgr = BandwidthManager(inbound_rate=1000, outbound_rate=1000, burst_factor=2.0) # Bucket size should be 2x the rate assert mgr.inbound._bucket_size == 2000 assert mgr.outbound._bucket_size == 2000 # Should be able to acquire burst amount assert mgr.inbound.try_acquire(1500) is True class TestBandwidthManager: def test_separate_limits(self): """Inbound and outbound are independent.""" mgr = BandwidthManager(inbound_rate=1000, outbound_rate=2000) # Drain inbound completely — request the full bucket mgr.inbound.try_acquire(1000) # Outbound should still have tokens (independent bucket) assert mgr.outbound.try_acquire(1500) is True # Inbound should be nearly empty — request more than could refill # in a microsecond (rate=1000 bytes/sec, so ~0.001 bytes/microsecond) assert mgr.inbound.try_acquire(999) is False @pytest.mark.asyncio async def test_acquire_inbound_outbound(self): """Manager convenience methods delegate correctly.""" mgr = BandwidthManager(inbound_rate=0, outbound_rate=0) # unlimited await mgr.acquire_inbound(1000) await mgr.acquire_outbound(2000) assert mgr.inbound.total_bytes == 1000 assert mgr.outbound.total_bytes == 2000 def test_unlimited_manager(self): """Zero rates mean unlimited.""" mgr = BandwidthManager(inbound_rate=0, outbound_rate=0) assert mgr.inbound.is_unlimited assert mgr.outbound.is_unlimited