"""Bandwidth estimator using exponential moving average. Ported from net.i2p.client.streaming.impl.BandwidthEstimator. """ import time class BandwidthEstimator: """Track send/receive bandwidth with exponential moving average. The estimator uses a configurable time window and EMA smoothing to provide stable bandwidth rate estimates. """ # EMA smoothing factor: higher = more weight to recent samples _ALPHA = 0.3 def __init__(self, window_ms: int = 1000): self._window_ms = window_ms self._send_rate: float = 0.0 self._recv_rate: float = 0.0 self._last_send_time: float | None = None self._last_recv_time: float | None = None def record_sent(self, bytes_count: int) -> None: """Record bytes sent and update send rate estimate.""" now = time.monotonic() if bytes_count == 0: return if self._last_send_time is not None: elapsed = now - self._last_send_time if elapsed > 1e-6: # Cap elapsed to at least 1ms to avoid unrealistic rates elapsed = max(elapsed, self._window_ms / 1_000_000) instant_rate = bytes_count / elapsed self._send_rate = ( self._ALPHA * instant_rate + (1 - self._ALPHA) * self._send_rate ) else: # Simultaneous call: just add to rate estimate self._send_rate += bytes_count * (1000.0 / self._window_ms) else: # First sample: estimate based on window self._send_rate = bytes_count * (1000.0 / self._window_ms) self._last_send_time = now def record_received(self, bytes_count: int) -> None: """Record bytes received and update receive rate estimate.""" now = time.monotonic() if bytes_count == 0: return if self._last_recv_time is not None: elapsed = now - self._last_recv_time if elapsed > 1e-6: elapsed = max(elapsed, self._window_ms / 1_000_000) instant_rate = bytes_count / elapsed self._recv_rate = ( self._ALPHA * instant_rate + (1 - self._ALPHA) * self._recv_rate ) else: self._recv_rate += bytes_count * (1000.0 / self._window_ms) else: self._recv_rate = bytes_count * (1000.0 / self._window_ms) self._last_recv_time = now def send_bps(self) -> float: """Current estimated send rate in bytes per second.""" return self._send_rate def recv_bps(self) -> float: """Current estimated receive rate in bytes per second.""" return self._recv_rate