"""Rate — rolling rate statistics over configurable time periods. Ported from net.i2p.stat.Rate. """ import threading import time as _time from typing import Optional, TYPE_CHECKING if TYPE_CHECKING: from i2p_stat.rate_stat import RateStat from i2p_stat.rate_averages import RateAverages def _now_ms() -> int: return int(_time.time() * 1000) class RateSummaryListener: """Callback for Rate coalesce events.""" def add( self, total_value: float, event_count: int, total_event_time: float, period: int, ) -> None: pass class Rate: """Track event count and total value over a configurable period. Data is accumulated in a "current" period. When coalesce() is called, current is shifted to "last" and current is reset. The "extreme" tracks the highest-value period ever seen. """ _SLACK = 2000 # ms tolerance for coalesce timing def __init__(self, period: int) -> None: if period <= 0: raise ValueError("period must be positive") self._period = period self._lock = threading.Lock() # Current incomplete period self._current_total_value: float = 0.0 self._current_event_count: int = 0 self._current_total_event_time: float = 0.0 # Last complete period self._last_total_value: float = 0.0 self._last_event_count: int = 0 self._last_total_event_time: float = 0.0 # Extreme (highest) period self._extreme_total_value: float = 0.0 self._extreme_event_count: int = 0 self._extreme_total_event_time: float = 0.0 # Lifetime totals self._lifetime_total_value: float = 0.0 self._lifetime_event_count: int = 0 self._lifetime_total_event_time: float = 0.0 now = _now_ms() self._creation_date: int = now self._last_coalesce_date: int = now self._summary_listener: Optional[RateSummaryListener] = None self._stat: Optional["RateStat"] = None @property def period(self) -> int: return self._period @property def creation_date(self) -> int: return self._creation_date @property def last_coalesce_date(self) -> int: with self._lock: return self._last_coalesce_date def add_data(self, value: int, event_duration: int = 0) -> None: """Record an event with the given value and optional duration.""" with self._lock: self._current_total_value += value self._current_event_count += 1 self._current_total_event_time += event_duration self._lifetime_total_value += value self._lifetime_event_count += 1 self._lifetime_total_event_time += event_duration def coalesce(self) -> None: """Shift current period to last, reset current.""" now = _now_ms() listener = None corrected_value = 0.0 with self._lock: measured = now - self._last_coalesce_date if measured < self._period - self._SLACK: return period_factor = measured / self._period if self._period > 0 else 1.0 # Correct values by period factor self._last_total_value = self._current_total_value / period_factor self._last_event_count = round( self._current_event_count / period_factor + 0.499999 ) if period_factor != 1.0 else self._current_event_count self._last_total_event_time = ( self._current_total_event_time / period_factor ) # Update extreme if this period is highest if self._last_total_value >= self._extreme_total_value: self._extreme_total_value = self._last_total_value self._extreme_event_count = self._last_event_count self._extreme_total_event_time = self._last_total_event_time # Prepare listener data if self._summary_listener is not None: listener = self._summary_listener if self._last_event_count > 0: corrected_value = ( self._current_total_value * self._last_event_count / self._current_event_count ) else: corrected_value = 0.0 # Reset current self._current_total_value = 0.0 self._current_event_count = 0 self._current_total_event_time = 0.0 self._last_coalesce_date = now if listener is not None: listener.add( corrected_value, self._last_event_count, self._last_total_event_time, self._period, ) # --- Getters --- def get_current_total_value(self) -> float: with self._lock: return self._current_total_value def get_current_event_count(self) -> int: with self._lock: return self._current_event_count def get_last_total_value(self) -> float: with self._lock: return self._last_total_value def get_last_event_count(self) -> int: with self._lock: return self._last_event_count def get_last_total_event_time(self) -> float: with self._lock: return self._last_total_event_time def get_extreme_total_value(self) -> float: with self._lock: return self._extreme_total_value def get_extreme_event_count(self) -> int: with self._lock: return self._extreme_event_count def get_lifetime_total_value(self) -> float: with self._lock: return self._lifetime_total_value def get_lifetime_event_count(self) -> int: with self._lock: return self._lifetime_event_count # --- Averages --- def get_average_value(self) -> float: """Average value per event in the last complete period.""" with self._lock: if self._last_event_count <= 0: return 0.0 return self._last_total_value / self._last_event_count def get_extreme_average_value(self) -> float: with self._lock: if self._extreme_event_count <= 0: return 0.0 return self._extreme_total_value / self._extreme_event_count def get_lifetime_average_value(self) -> float: with self._lock: if self._lifetime_event_count <= 0: return 0.0 return self._lifetime_total_value / self._lifetime_event_count def get_avg_or_lifetime_avg(self) -> float: """Last period average, falling back to lifetime if no recent data.""" with self._lock: if self._last_event_count > 0: return self._last_total_value / self._last_event_count if self._lifetime_event_count > 0: return self._lifetime_total_value / self._lifetime_event_count return 0.0 # --- Saturation --- def get_last_event_saturation(self) -> float: """Fraction of last period spent processing events.""" with self._lock: if self._period <= 0: return 0.0 return self._last_total_event_time / self._period # --- Lifetime --- def get_lifetime_periods(self) -> int: """Number of complete periods since creation.""" elapsed = _now_ms() - self._creation_date if self._period <= 0: return 0 return elapsed // self._period # --- Listener --- def set_summary_listener(self, listener: Optional[RateSummaryListener]) -> None: self._summary_listener = listener def set_rate_stat(self, stat: "RateStat") -> None: self._stat = stat def get_rate_stat(self) -> Optional["RateStat"]: return self._stat