"""Tests for bandwidth estimator."""

import time

import pytest


def _new_estimator(**options):
    """Build a ``BandwidthEstimator``, importing it only when a test runs.

    Keyword arguments are forwarded unchanged to the constructor.
    """
    from i2p_streaming.bandwidth import BandwidthEstimator

    return BandwidthEstimator(**options)


class TestBandwidthEstimator:
    """Checks of the send/receive rates reported by ``BandwidthEstimator``."""

    def test_initial_rates_are_zero(self):
        """A fresh estimator reports 0.0 in both directions."""
        estimator = _new_estimator()
        assert estimator.send_bps() == 0.0
        assert estimator.recv_bps() == 0.0

    def test_record_sent_increases_send_rate(self):
        """Recording sent bytes yields a positive send rate."""
        estimator = _new_estimator(window_ms=1000)
        estimator.record_sent(1000)
        assert estimator.send_bps() > 0.0

    def test_record_received_increases_recv_rate(self):
        """Recording received bytes yields a positive receive rate."""
        estimator = _new_estimator(window_ms=1000)
        estimator.record_received(1000)
        assert estimator.recv_bps() > 0.0

    def test_send_and_recv_are_independent(self):
        """Sent bytes raise the send rate and leave the receive rate at 0.0."""
        estimator = _new_estimator()
        estimator.record_sent(5000)
        assert estimator.send_bps() > 0.0
        assert estimator.recv_bps() == 0.0

    def test_multiple_records_accumulate(self):
        """A second record must not lower the reported send rate."""
        estimator = _new_estimator(window_ms=5000)

        estimator.record_sent(100)
        rate_after_first = estimator.send_bps()

        estimator.record_sent(100)
        rate_after_second = estimator.send_bps()

        assert rate_after_second >= rate_after_first

    def test_ema_smoothing(self):
        """EMA should smooth out spikes over time."""
        estimator = _new_estimator(window_ms=1000)

        # Start with one large burst and capture the rate it produces.
        estimator.record_sent(10000)
        rate_at_spike = estimator.send_bps()

        # Let real time pass, then trickle in small records.
        time.sleep(0.05)
        for _ in range(10):
            time.sleep(0.01)
            estimator.record_sent(10)
        rate_after_trickle = estimator.send_bps()

        # With elapsed time and only small records, the rate should have dropped.
        assert rate_after_trickle < rate_at_spike

    def test_custom_window_size(self):
        """A non-default window still yields a positive send rate."""
        estimator = _new_estimator(window_ms=500)
        estimator.record_sent(1000)
        assert estimator.send_bps() > 0.0

    def test_zero_bytes_recorded(self):
        """Recording zero bytes must not raise and leaves the send rate at 0.0."""
        estimator = _new_estimator()
        estimator.record_sent(0)
        assert estimator.send_bps() == 0.0