A Python port of the Invisible Internet Project (I2P)
at main 247 lines 8.0 kB view raw
"""Rate — rolling rate statistics over configurable time periods.

Ported from net.i2p.stat.Rate.
"""

import threading
import time as _time
from typing import Optional, TYPE_CHECKING

if TYPE_CHECKING:
    from i2p_stat.rate_stat import RateStat
    from i2p_stat.rate_averages import RateAverages


def _now_ms() -> int:
    """Return the current wall-clock time in whole milliseconds."""
    return int(_time.time() * 1000)


class RateSummaryListener:
    """Callback for Rate coalesce events.

    The base implementation ignores every notification; subclasses
    override :meth:`add` to consume the per-period summary.
    """

    def add(
        self,
        total_value: float,
        event_count: int,
        total_event_time: float,
        period: int,
    ) -> None:
        """Receive the summary of one coalesced period (no-op here)."""
        pass


class Rate:
    """Track event count and total value over a configurable period.

    Data is accumulated in a "current" period. When coalesce() is called,
    current is shifted to "last" and current is reset. The "extreme"
    tracks the highest-value period ever seen.

    All mutable counters are guarded by a single internal lock.
    """

    _SLACK = 2000  # ms tolerance for coalesce timing

    def __init__(self, period: int) -> None:
        """Create a rate tracker.

        Args:
            period: Length of one period in milliseconds; must be positive.

        Raises:
            ValueError: If ``period`` is zero or negative.
        """
        if period <= 0:
            raise ValueError("period must be positive")
        self._period = period
        self._lock = threading.Lock()

        # Current incomplete period
        self._current_total_value: float = 0.0
        self._current_event_count: int = 0
        self._current_total_event_time: float = 0.0

        # Last complete period
        self._last_total_value: float = 0.0
        self._last_event_count: int = 0
        self._last_total_event_time: float = 0.0

        # Extreme (highest) period
        self._extreme_total_value: float = 0.0
        self._extreme_event_count: int = 0
        self._extreme_total_event_time: float = 0.0

        # Lifetime totals
        self._lifetime_total_value: float = 0.0
        self._lifetime_event_count: int = 0
        self._lifetime_total_event_time: float = 0.0

        now = _now_ms()
        self._creation_date: int = now
        self._last_coalesce_date: int = now

        self._summary_listener: Optional[RateSummaryListener] = None
        self._stat: Optional["RateStat"] = None

    @property
    def period(self) -> int:
        """Period length in milliseconds, as given to the constructor."""
        return self._period

    @property
    def creation_date(self) -> int:
        """Millisecond timestamp taken when this instance was created."""
        return self._creation_date

    @property
    def last_coalesce_date(self) -> int:
        """Millisecond timestamp of the most recent successful coalesce."""
        with self._lock:
            return self._last_coalesce_date

    def add_data(self, value: int, event_duration: int = 0) -> None:
        """Record an event with the given value and optional duration.

        The event is added to both the current-period and lifetime totals.
        """
        with self._lock:
            self._current_total_value += value
            self._current_event_count += 1
            self._current_total_event_time += event_duration
            self._lifetime_total_value += value
            self._lifetime_event_count += 1
            self._lifetime_total_event_time += event_duration

    def coalesce(self) -> None:
        """Shift current period to last, reset current.

        Does nothing when less than ``period - _SLACK`` milliseconds have
        elapsed since the previous coalesce, or when no time has elapsed at
        all. Otherwise the current totals are scaled down by
        ``elapsed / period`` so that "last" always describes one nominal
        period, the extreme period is updated if the new value is at least
        as high, and the summary listener (if any) is notified after the
        lock has been released.
        """
        now = _now_ms()

        with self._lock:
            measured = now - self._last_coalesce_date
            # ``measured <= 0`` can pass the slack test when the period is
            # no longer than _SLACK (or the clock stepped backwards); it
            # would make the scale factor zero or negative, so skip it.
            if measured <= 0 or measured < self._period - self._SLACK:
                return

            period_factor = measured / self._period

            current_value = self._current_total_value
            current_count = self._current_event_count

            # Correct values by period factor
            last_value = current_value / period_factor
            if period_factor == 1.0:
                last_count = current_count
            else:
                # Truncation after adding ~0.5 rounds to the nearest
                # integer; round() here would round a second time and
                # inflate small counts (0.33 -> 1).
                last_count = int(current_count / period_factor + 0.499999)
            last_event_time = self._current_total_event_time / period_factor

            self._last_total_value = last_value
            self._last_event_count = last_count
            self._last_total_event_time = last_event_time

            # Update extreme if this period is highest (ties take the
            # most recent period).
            if last_value >= self._extreme_total_value:
                self._extreme_total_value = last_value
                self._extreme_event_count = last_count
                self._extreme_total_event_time = last_event_time

            # Prepare listener data. The value is rescaled by the rounded
            # count so value / count keeps the true per-event average.
            listener = self._summary_listener
            if last_count > 0:
                corrected_value = current_value * last_count / current_count
            else:
                corrected_value = 0.0

            # Reset current
            self._current_total_value = 0.0
            self._current_event_count = 0
            self._current_total_event_time = 0.0
            self._last_coalesce_date = now

        # Notify outside the lock, using the values snapshotted above so a
        # concurrent coalesce cannot change what this call reports.
        if listener is not None:
            listener.add(
                corrected_value,
                last_count,
                last_event_time,
                self._period,
            )

    # --- Getters ---

    def get_current_total_value(self) -> float:
        """Sum of event values in the current (incomplete) period."""
        with self._lock:
            return self._current_total_value

    def get_current_event_count(self) -> int:
        """Number of events in the current (incomplete) period."""
        with self._lock:
            return self._current_event_count

    def get_last_total_value(self) -> float:
        """Period-scaled sum of event values in the last complete period."""
        with self._lock:
            return self._last_total_value

    def get_last_event_count(self) -> int:
        """Period-scaled number of events in the last complete period."""
        with self._lock:
            return self._last_event_count

    def get_last_total_event_time(self) -> float:
        """Period-scaled total event duration in the last complete period."""
        with self._lock:
            return self._last_total_event_time

    def get_extreme_total_value(self) -> float:
        """Total value of the highest-value period seen so far."""
        with self._lock:
            return self._extreme_total_value

    def get_extreme_event_count(self) -> int:
        """Event count of the highest-value period seen so far."""
        with self._lock:
            return self._extreme_event_count

    def get_lifetime_total_value(self) -> float:
        """Sum of all event values recorded since creation."""
        with self._lock:
            return self._lifetime_total_value

    def get_lifetime_event_count(self) -> int:
        """Number of events recorded since creation."""
        with self._lock:
            return self._lifetime_event_count

    # --- Averages ---

    def get_average_value(self) -> float:
        """Average value per event in the last complete period."""
        with self._lock:
            if self._last_event_count <= 0:
                return 0.0
            return self._last_total_value / self._last_event_count

    def get_extreme_average_value(self) -> float:
        """Average value per event in the extreme period (0.0 if empty)."""
        with self._lock:
            if self._extreme_event_count <= 0:
                return 0.0
            return self._extreme_total_value / self._extreme_event_count

    def get_lifetime_average_value(self) -> float:
        """Average value per event over the whole lifetime (0.0 if empty)."""
        with self._lock:
            if self._lifetime_event_count <= 0:
                return 0.0
            return self._lifetime_total_value / self._lifetime_event_count

    def get_avg_or_lifetime_avg(self) -> float:
        """Last period average, falling back to lifetime if no recent data."""
        with self._lock:
            if self._last_event_count > 0:
                return self._last_total_value / self._last_event_count
            if self._lifetime_event_count > 0:
                return self._lifetime_total_value / self._lifetime_event_count
            return 0.0

    # --- Saturation ---

    def get_last_event_saturation(self) -> float:
        """Fraction of last period spent processing events."""
        with self._lock:
            if self._period <= 0:
                return 0.0
            return self._last_total_event_time / self._period

    # --- Lifetime ---

    def get_lifetime_periods(self) -> int:
        """Number of complete periods since creation."""
        elapsed = _now_ms() - self._creation_date
        if self._period <= 0:
            return 0
        return elapsed // self._period

    # --- Listener ---

    def set_summary_listener(self, listener: Optional[RateSummaryListener]) -> None:
        """Install (or clear, with ``None``) the coalesce listener."""
        self._summary_listener = listener

    def set_rate_stat(self, stat: "RateStat") -> None:
        """Associate this rate with its owning RateStat."""
        self._stat = stat

    def get_rate_stat(self) -> Optional["RateStat"]:
        """Return the owning RateStat, or ``None`` if none was set."""
        return self._stat