A Python port of the Invisible Internet Project (I2P)
1"""Bandwidth estimator using exponential moving average.
2
3Ported from net.i2p.client.streaming.impl.BandwidthEstimator.
4"""
5
6import time
7
8
9class BandwidthEstimator:
10 """Track send/receive bandwidth with exponential moving average.
11
12 The estimator uses a configurable time window and EMA smoothing
13 to provide stable bandwidth rate estimates.
14 """
15
16 # EMA smoothing factor: higher = more weight to recent samples
17 _ALPHA = 0.3
18
19 def __init__(self, window_ms: int = 1000):
20 self._window_ms = window_ms
21 self._send_rate: float = 0.0
22 self._recv_rate: float = 0.0
23 self._last_send_time: float | None = None
24 self._last_recv_time: float | None = None
25
26 def record_sent(self, bytes_count: int) -> None:
27 """Record bytes sent and update send rate estimate."""
28 now = time.monotonic()
29 if bytes_count == 0:
30 return
31 if self._last_send_time is not None:
32 elapsed = now - self._last_send_time
33 if elapsed > 1e-6:
34 # Cap elapsed to at least 1ms to avoid unrealistic rates
35 elapsed = max(elapsed, self._window_ms / 1_000_000)
36 instant_rate = bytes_count / elapsed
37 self._send_rate = (
38 self._ALPHA * instant_rate
39 + (1 - self._ALPHA) * self._send_rate
40 )
41 else:
42 # Simultaneous call: just add to rate estimate
43 self._send_rate += bytes_count * (1000.0 / self._window_ms)
44 else:
45 # First sample: estimate based on window
46 self._send_rate = bytes_count * (1000.0 / self._window_ms)
47 self._last_send_time = now
48
49 def record_received(self, bytes_count: int) -> None:
50 """Record bytes received and update receive rate estimate."""
51 now = time.monotonic()
52 if bytes_count == 0:
53 return
54 if self._last_recv_time is not None:
55 elapsed = now - self._last_recv_time
56 if elapsed > 1e-6:
57 elapsed = max(elapsed, self._window_ms / 1_000_000)
58 instant_rate = bytes_count / elapsed
59 self._recv_rate = (
60 self._ALPHA * instant_rate
61 + (1 - self._ALPHA) * self._recv_rate
62 )
63 else:
64 self._recv_rate += bytes_count * (1000.0 / self._window_ms)
65 else:
66 self._recv_rate = bytes_count * (1000.0 / self._window_ms)
67 self._last_recv_time = now
68
69 def send_bps(self) -> float:
70 """Current estimated send rate in bytes per second."""
71 return self._send_rate
72
73 def recv_bps(self) -> float:
74 """Current estimated receive rate in bytes per second."""
75 return self._recv_rate