"""StatManager — central statistics repository. Ported from net.i2p.stat.StatManager. """ import threading from typing import Dict, List, Optional, Set from i2p_stat.rate_stat import RateStat from i2p_stat.frequency_stat import FrequencyStat class StatManager: """Central registry for all RateStats and FrequencyStats. By default, only "required" stats are created. Set stat_full=True to allow all stats (used in router context for full monitoring). """ _FREQ_COALESCE_RATE = 9 def __init__(self, stat_full: bool = False) -> None: self._stat_full = stat_full self._lock = threading.Lock() self._rate_stats: Dict[str, RateStat] = {} self._frequency_stats: Dict[str, FrequencyStat] = {} self._coalesce_counter = 0 def shutdown(self) -> None: with self._lock: self._rate_stats.clear() self._frequency_stats.clear() # --- Rate Stats --- def create_rate_stat( self, name: str, description: str, group: str, periods: List[int], ) -> None: """Create a rate stat if stat_full is True.""" if not self._stat_full: return self.create_required_rate_stat(name, description, group, periods) def create_required_rate_stat( self, name: str, description: str, group: str, periods: List[int], ) -> None: """Always create the rate stat.""" if name not in self._rate_stats: self._rate_stats[name] = RateStat(name, description, group, periods) def remove_rate_stat(self, name: str) -> None: self._rate_stats.pop(name, None) def add_rate_data( self, name: str, data: int, event_duration: int = 0 ) -> None: stat = self._rate_stats.get(name) if stat is not None: stat.add_data(data, event_duration) def get_rate(self, name: str) -> Optional[RateStat]: return self._rate_stats.get(name) def get_rate_names(self) -> Set[str]: return set(self._rate_stats.keys()) def is_rate(self, name: str) -> bool: return name in self._rate_stats # --- Frequency Stats --- def create_frequency_stat( self, name: str, description: str, group: str, periods: List[int], ) -> None: if not self._stat_full: return self.create_required_frequency_stat(name, description, group, periods) def create_required_frequency_stat( self, name: str, description: str, group: str, periods: List[int], ) -> None: if name not in self._frequency_stats: self._frequency_stats[name] = FrequencyStat( name, description, group, periods ) def update_frequency(self, name: str) -> None: stat = self._frequency_stats.get(name) if stat is not None: stat.event_occurred() def get_frequency(self, name: str) -> Optional[FrequencyStat]: return self._frequency_stats.get(name) def get_frequency_names(self) -> Set[str]: return set(self._frequency_stats.keys()) def is_frequency(self, name: str) -> bool: return name in self._frequency_stats # --- Coalesce --- def coalesce_stats(self) -> None: """Coalesce all rate stats, and frequency stats every Nth call.""" with self._lock: self._coalesce_counter += 1 do_freq = self._coalesce_counter % self._FREQ_COALESCE_RATE == 0 for rs in list(self._rate_stats.values()): rs.coalesce_stats() if do_freq: for fs in list(self._frequency_stats.values()): fs.coalesce_stats() # --- Grouping --- def get_stats_by_group(self) -> Dict[str, Set[str]]: result: Dict[str, Set[str]] = {} for name, rs in self._rate_stats.items(): result.setdefault(rs.group_name, set()).add(name) for name, fs in self._frequency_stats.items(): result.setdefault(fs.group_name, set()).add(name) return result