A Python port of the Invisible Internet Project (I2P)
1"""Rate — rolling rate statistics over configurable time periods.
2
3Ported from net.i2p.stat.Rate.
4"""
5
6import threading
7import time as _time
8from typing import Optional, TYPE_CHECKING
9
10if TYPE_CHECKING:
11 from i2p_stat.rate_stat import RateStat
12 from i2p_stat.rate_averages import RateAverages
13
14
15def _now_ms() -> int:
16 return int(_time.time() * 1000)
17
18
19class RateSummaryListener:
20 """Callback for Rate coalesce events."""
21
22 def add(
23 self,
24 total_value: float,
25 event_count: int,
26 total_event_time: float,
27 period: int,
28 ) -> None:
29 pass
30
31
32class Rate:
33 """Track event count and total value over a configurable period.
34
35 Data is accumulated in a "current" period. When coalesce() is called,
36 current is shifted to "last" and current is reset. The "extreme"
37 tracks the highest-value period ever seen.
38 """
39
40 _SLACK = 2000 # ms tolerance for coalesce timing
41
42 def __init__(self, period: int) -> None:
43 if period <= 0:
44 raise ValueError("period must be positive")
45 self._period = period
46 self._lock = threading.Lock()
47
48 # Current incomplete period
49 self._current_total_value: float = 0.0
50 self._current_event_count: int = 0
51 self._current_total_event_time: float = 0.0
52
53 # Last complete period
54 self._last_total_value: float = 0.0
55 self._last_event_count: int = 0
56 self._last_total_event_time: float = 0.0
57
58 # Extreme (highest) period
59 self._extreme_total_value: float = 0.0
60 self._extreme_event_count: int = 0
61 self._extreme_total_event_time: float = 0.0
62
63 # Lifetime totals
64 self._lifetime_total_value: float = 0.0
65 self._lifetime_event_count: int = 0
66 self._lifetime_total_event_time: float = 0.0
67
68 now = _now_ms()
69 self._creation_date: int = now
70 self._last_coalesce_date: int = now
71
72 self._summary_listener: Optional[RateSummaryListener] = None
73 self._stat: Optional["RateStat"] = None
74
75 @property
76 def period(self) -> int:
77 return self._period
78
79 @property
80 def creation_date(self) -> int:
81 return self._creation_date
82
83 @property
84 def last_coalesce_date(self) -> int:
85 with self._lock:
86 return self._last_coalesce_date
87
88 def add_data(self, value: int, event_duration: int = 0) -> None:
89 """Record an event with the given value and optional duration."""
90 with self._lock:
91 self._current_total_value += value
92 self._current_event_count += 1
93 self._current_total_event_time += event_duration
94 self._lifetime_total_value += value
95 self._lifetime_event_count += 1
96 self._lifetime_total_event_time += event_duration
97
98 def coalesce(self) -> None:
99 """Shift current period to last, reset current."""
100 now = _now_ms()
101 listener = None
102 corrected_value = 0.0
103
104 with self._lock:
105 measured = now - self._last_coalesce_date
106 if measured < self._period - self._SLACK:
107 return
108
109 period_factor = measured / self._period if self._period > 0 else 1.0
110
111 # Correct values by period factor
112 self._last_total_value = self._current_total_value / period_factor
113 self._last_event_count = round(
114 self._current_event_count / period_factor + 0.499999
115 ) if period_factor != 1.0 else self._current_event_count
116 self._last_total_event_time = (
117 self._current_total_event_time / period_factor
118 )
119
120 # Update extreme if this period is highest
121 if self._last_total_value >= self._extreme_total_value:
122 self._extreme_total_value = self._last_total_value
123 self._extreme_event_count = self._last_event_count
124 self._extreme_total_event_time = self._last_total_event_time
125
126 # Prepare listener data
127 if self._summary_listener is not None:
128 listener = self._summary_listener
129 if self._last_event_count > 0:
130 corrected_value = (
131 self._current_total_value
132 * self._last_event_count
133 / self._current_event_count
134 )
135 else:
136 corrected_value = 0.0
137
138 # Reset current
139 self._current_total_value = 0.0
140 self._current_event_count = 0
141 self._current_total_event_time = 0.0
142 self._last_coalesce_date = now
143
144 if listener is not None:
145 listener.add(
146 corrected_value,
147 self._last_event_count,
148 self._last_total_event_time,
149 self._period,
150 )
151
152 # --- Getters ---
153
154 def get_current_total_value(self) -> float:
155 with self._lock:
156 return self._current_total_value
157
158 def get_current_event_count(self) -> int:
159 with self._lock:
160 return self._current_event_count
161
162 def get_last_total_value(self) -> float:
163 with self._lock:
164 return self._last_total_value
165
166 def get_last_event_count(self) -> int:
167 with self._lock:
168 return self._last_event_count
169
170 def get_last_total_event_time(self) -> float:
171 with self._lock:
172 return self._last_total_event_time
173
174 def get_extreme_total_value(self) -> float:
175 with self._lock:
176 return self._extreme_total_value
177
178 def get_extreme_event_count(self) -> int:
179 with self._lock:
180 return self._extreme_event_count
181
182 def get_lifetime_total_value(self) -> float:
183 with self._lock:
184 return self._lifetime_total_value
185
186 def get_lifetime_event_count(self) -> int:
187 with self._lock:
188 return self._lifetime_event_count
189
190 # --- Averages ---
191
192 def get_average_value(self) -> float:
193 """Average value per event in the last complete period."""
194 with self._lock:
195 if self._last_event_count <= 0:
196 return 0.0
197 return self._last_total_value / self._last_event_count
198
199 def get_extreme_average_value(self) -> float:
200 with self._lock:
201 if self._extreme_event_count <= 0:
202 return 0.0
203 return self._extreme_total_value / self._extreme_event_count
204
205 def get_lifetime_average_value(self) -> float:
206 with self._lock:
207 if self._lifetime_event_count <= 0:
208 return 0.0
209 return self._lifetime_total_value / self._lifetime_event_count
210
211 def get_avg_or_lifetime_avg(self) -> float:
212 """Last period average, falling back to lifetime if no recent data."""
213 with self._lock:
214 if self._last_event_count > 0:
215 return self._last_total_value / self._last_event_count
216 if self._lifetime_event_count > 0:
217 return self._lifetime_total_value / self._lifetime_event_count
218 return 0.0
219
220 # --- Saturation ---
221
222 def get_last_event_saturation(self) -> float:
223 """Fraction of last period spent processing events."""
224 with self._lock:
225 if self._period <= 0:
226 return 0.0
227 return self._last_total_event_time / self._period
228
229 # --- Lifetime ---
230
231 def get_lifetime_periods(self) -> int:
232 """Number of complete periods since creation."""
233 elapsed = _now_ms() - self._creation_date
234 if self._period <= 0:
235 return 0
236 return elapsed // self._period
237
238 # --- Listener ---
239
240 def set_summary_listener(self, listener: Optional[RateSummaryListener]) -> None:
241 self._summary_listener = listener
242
243 def set_rate_stat(self, stat: "RateStat") -> None:
244 self._stat = stat
245
246 def get_rate_stat(self) -> Optional["RateStat"]:
247 return self._stat