A Python port of the Invisible Internet Project (I2P)
at main 93 lines 3.0 kB view raw
"""Frequency — rolling average event frequency tracker.

Ported from net.i2p.stat.Frequency.
"""

import threading
import time as _time


def _now_ms() -> int:
    """Return the current wall-clock time (``time.time()``) in whole milliseconds."""
    return int(_time.time() * 1000)


class Frequency:
    """Track the average interval between events using a rolling weighted average.

    Timestamps come from ``_now_ms()``, so ``period`` and every interval
    reported by this class are expressed in milliseconds. All mutable state
    is read and written while holding an internal ``threading.Lock``.

    An average interval of ``period + 1`` is the sentinel for "no recent
    events": it makes the events-per-period figures drop below one.
    """

    def __init__(self, period: int) -> None:
        """Create a tracker for the given averaging window.

        Args:
            period: Length of the averaging window in milliseconds.

        Raises:
            ValueError: If ``period`` is zero or negative.
        """
        if period <= 0:
            raise ValueError("period must be positive")
        self._period: int = period
        self._lock = threading.Lock()
        now = _now_ms()
        # Timestamp of the most recent event (creation time until one occurs).
        self._last_event: int = now
        # Creation timestamp, used for the lifetime ("strict") average.
        self._start: int = now
        # Total number of events recorded so far.
        self._count: int = 0
        # Both averages start at the "no recent events" sentinel.
        self._avg_interval: float = float(period + 1)
        self._min_avg_interval: float = float(period + 1)

    @property
    def period(self) -> int:
        """Averaging window length in milliseconds, as passed to the constructor."""
        return self._period

    def event_occurred(self) -> None:
        """Record that an event has occurred and update rolling average."""
        now = _now_ms()
        with self._lock:
            # Clamp to at least 1 ms. Two events in the same millisecond give
            # an interval of 0 (which would leave the average untouched), and
            # a backwards wall-clock step gives a negative interval (which
            # would push the weights outside [0, 1] and corrupt the average).
            interval = max(now - self._last_event, 1)
            self._last_event = now
            self._count += 1

            if interval >= self._period:
                # No recent events — reset to "no events"
                self._avg_interval = float(self._period + 1)
            else:
                # Weighted rolling average: the longer the gap since the
                # previous event, the more the new interval counts and the
                # less the historical average does.
                new_weight = interval / self._period
                old_weight = 1.0 - new_weight
                self._avg_interval = (
                    old_weight * self._avg_interval + new_weight * interval
                )

            # Remember the smallest rolling average ever observed.
            if self._avg_interval < self._min_avg_interval:
                self._min_avg_interval = self._avg_interval

    def recalculate(self) -> None:
        """Recalculate rolling average without recording an event.

        If at least one full period has elapsed since the last event, the
        rolling average is reset to the "no recent events" sentinel;
        otherwise nothing changes.
        """
        now = _now_ms()
        with self._lock:
            interval = now - self._last_event
            if interval >= self._period:
                self._avg_interval = float(self._period + 1)

    def get_average_interval(self) -> float:
        """Return the current rolling average interval between events (ms)."""
        with self._lock:
            return self._avg_interval

    def get_min_average_interval(self) -> float:
        """Return the smallest rolling average interval seen so far (ms)."""
        with self._lock:
            return self._min_avg_interval

    def get_average_events_per_period(self) -> float:
        """Return ``period / average interval``, or 0.0 if the average is not positive."""
        with self._lock:
            if self._avg_interval <= 0:
                return 0.0
            return self._period / self._avg_interval

    def get_max_average_events_per_period(self) -> float:
        """Return ``period / minimum average interval``, or 0.0 if that minimum is not positive."""
        with self._lock:
            if self._min_avg_interval <= 0:
                return 0.0
            return self._period / self._min_avg_interval

    def get_strict_average_interval(self) -> float:
        """Lifetime average: (now - start) / event_count.

        Returns ``period + 1`` when no event has been recorded yet.
        """
        now = _now_ms()
        with self._lock:
            if self._count <= 0:
                return float(self._period + 1)
            return (now - self._start) / self._count

    def get_event_count(self) -> int:
        """Return the total number of events recorded since creation."""
        with self._lock:
            return self._count