"""NetDB throttle components — rate limiting for lookups and floods. Ported from net.i2p.router.networkdb.kademlia.NegativeLookupCache, FloodThrottler, and LookupThrottler. """ from __future__ import annotations from collections import defaultdict class NegativeLookupCache: """Track failed lookups to avoid redundant retries. After MAX_FAILS consecutive failures for a key, further lookups are skipped. Bad destinations are permanently cached. """ MAX_FAILS = 3 def __init__(self) -> None: self._fail_counts: dict[bytes, int] = defaultdict(int) self._bad_destinations: set[bytes] = set() def lookup_failed(self, key: bytes) -> None: """Record a lookup failure.""" self._fail_counts[key] += 1 def should_skip(self, key: bytes) -> bool: """True if too many failures or known bad destination.""" if key in self._bad_destinations: return True return self._fail_counts.get(key, 0) >= self.MAX_FAILS def add_bad_destination(self, key: bytes) -> None: """Permanently mark a destination as bad.""" self._bad_destinations.add(key) def is_bad_destination(self, key: bytes) -> bool: return key in self._bad_destinations def clear(self) -> None: """Reset fail counters only (bad destinations persist).""" self._fail_counts.clear() @property def cached_count(self) -> int: return len(self._fail_counts) + len(self._bad_destinations) class FloodThrottler: """Rate limit flood store requests per peer. Allows MAX_FLOODS stores per peer per window. Resets on clear(). """ MAX_FLOODS = 3 def __init__(self) -> None: self._counts: dict[bytes, int] = defaultdict(int) def should_throttle(self, peer_hash: bytes) -> bool: """Increment counter and return True if over limit.""" self._counts[peer_hash] += 1 return self._counts[peer_hash] > self.MAX_FLOODS def clear(self) -> None: """Reset all counters.""" self._counts.clear() class LookupThrottler: """Prevent duplicate lookup requests on the same tunnel. Tracks (key, reply_tunnel_id, reply_gateway) tuples and throttles duplicates within a window. """ def __init__(self) -> None: self._seen: set[tuple[bytes, int, bytes]] = set() def should_throttle(self, key: bytes, reply_tunnel_id: int, reply_gateway: bytes) -> bool: """Return True if this exact lookup was already seen.""" entry = (key, reply_tunnel_id, reply_gateway) if entry in self._seen: return True self._seen.add(entry) return False def clear(self) -> None: """Reset all tracked lookups.""" self._seen.clear()