A Python port of the Invisible Internet Project (I2P)
1"""Frequency — rolling average event frequency tracker.
2
3Ported from net.i2p.stat.Frequency.
4"""
5
6import threading
7import time as _time
8
9
10def _now_ms() -> int:
11 return int(_time.time() * 1000)
12
13
14class Frequency:
15 """Track the average interval between events using a rolling weighted average."""
16
17 def __init__(self, period: int) -> None:
18 if period <= 0:
19 raise ValueError("period must be positive")
20 self._period = period
21 self._lock = threading.Lock()
22 now = _now_ms()
23 self._last_event: int = now
24 self._start: int = now
25 self._count: int = 0
26 self._avg_interval: float = float(period + 1)
27 self._min_avg_interval: float = float(period + 1)
28
29 @property
30 def period(self) -> int:
31 return self._period
32
33 def event_occurred(self) -> None:
34 """Record that an event has occurred and update rolling average."""
35 now = _now_ms()
36 with self._lock:
37 interval = now - self._last_event
38 self._last_event = now
39 self._count += 1
40
41 if interval >= self._period:
42 # No recent events — reset to "no events"
43 self._avg_interval = float(self._period + 1)
44 else:
45 # Weighted rolling average
46 old_weight = 1.0 - (interval / self._period)
47 new_weight = interval / self._period
48 self._avg_interval = (
49 old_weight * self._avg_interval + new_weight * interval
50 )
51
52 if self._avg_interval < self._min_avg_interval:
53 self._min_avg_interval = self._avg_interval
54
55 def recalculate(self) -> None:
56 """Recalculate rolling average without recording an event."""
57 now = _now_ms()
58 with self._lock:
59 interval = now - self._last_event
60 if interval >= self._period:
61 self._avg_interval = float(self._period + 1)
62
63 def get_average_interval(self) -> float:
64 with self._lock:
65 return self._avg_interval
66
67 def get_min_average_interval(self) -> float:
68 with self._lock:
69 return self._min_avg_interval
70
71 def get_average_events_per_period(self) -> float:
72 with self._lock:
73 if self._avg_interval <= 0:
74 return 0.0
75 return self._period / self._avg_interval
76
77 def get_max_average_events_per_period(self) -> float:
78 with self._lock:
79 if self._min_avg_interval <= 0:
80 return 0.0
81 return self._period / self._min_avg_interval
82
83 def get_strict_average_interval(self) -> float:
84 """Lifetime average: (now - start) / event_count."""
85 now = _now_ms()
86 with self._lock:
87 if self._count <= 0:
88 return float(self._period + 1)
89 return (now - self._start) / self._count
90
91 def get_event_count(self) -> int:
92 with self._lock:
93 return self._count