A Python port of the Invisible Internet Project (I2P)
1"""NetDB throttle components — rate limiting for lookups and floods.
2
3Ported from net.i2p.router.networkdb.kademlia.NegativeLookupCache,
4FloodThrottler, and LookupThrottler.
5"""
6
7from __future__ import annotations
8
9from collections import defaultdict
10
11
class NegativeLookupCache:
    """Remember lookups that failed so they are not retried needlessly.

    Two independent records are kept: a per-key failure counter and a set
    of keys flagged as bad destinations. A key is skipped once its counter
    reaches MAX_FAILS or once it has been flagged as a bad destination.
    ``clear()`` wipes the counters only; the bad-destination set is never
    emptied by this class.
    """

    # Number of recorded failures at which should_skip() starts returning True.
    MAX_FAILS = 3

    def __init__(self) -> None:
        self._fail_counts: dict[bytes, int] = defaultdict(int)
        self._bad_destinations: set[bytes] = set()

    def lookup_failed(self, key: bytes) -> None:
        """Add one failure to the tally kept for *key*."""
        tally = self._fail_counts
        tally[key] = tally[key] + 1

    def should_skip(self, key: bytes) -> bool:
        """Return True when *key* is a bad destination or has hit MAX_FAILS."""
        # .get() is used so that merely asking about a key does not create
        # a counter entry for it in the defaultdict.
        failures = self._fail_counts.get(key, 0)
        return key in self._bad_destinations or failures >= self.MAX_FAILS

    def add_bad_destination(self, key: bytes) -> None:
        """Flag *key* as a bad destination; clear() does not undo this."""
        self._bad_destinations.add(key)

    def is_bad_destination(self, key: bytes) -> bool:
        """Return True if *key* has been flagged via add_bad_destination()."""
        flagged = self._bad_destinations
        return key in flagged

    def clear(self) -> None:
        """Drop every failure counter while keeping the bad-destination set."""
        self._fail_counts.clear()

    @property
    def cached_count(self) -> int:
        """Size of the failure-counter map plus size of the bad-destination set."""
        counted_keys = len(self._fail_counts)
        flagged_keys = len(self._bad_destinations)
        return counted_keys + flagged_keys
49
50
class FloodThrottler:
    """Cap the number of flood store requests accepted from each peer.

    Every call to should_throttle() counts against the peer. Once a peer
    has been counted more than MAX_FLOODS times, further calls report that
    it should be throttled. The counting window is whatever interval the
    caller leaves between calls to clear(), which zeroes all counters.
    """

    # Highest per-peer count that is still allowed through.
    MAX_FLOODS = 3

    def __init__(self) -> None:
        self._counts: dict[bytes, int] = defaultdict(int)

    def should_throttle(self, peer_hash: bytes) -> bool:
        """Count this request for *peer_hash*; True once it exceeds MAX_FLOODS."""
        # The counter is bumped unconditionally, including for requests
        # that end up being throttled.
        updated = self._counts[peer_hash] + 1
        self._counts[peer_hash] = updated
        return updated > self.MAX_FLOODS

    def clear(self) -> None:
        """Forget every per-peer counter."""
        self._counts.clear()
70
71
class LookupThrottler:
    """Suppress repeated lookups that target the same reply tunnel.

    Each request is identified by the triple
    (key, reply_tunnel_id, reply_gateway). The first occurrence of a
    triple is recorded and allowed; any later occurrence is throttled
    until clear() empties the record.
    """

    def __init__(self) -> None:
        self._seen: set[tuple[bytes, int, bytes]] = set()

    def should_throttle(self, key: bytes, reply_tunnel_id: int, reply_gateway: bytes) -> bool:
        """Return True for a repeat of a recorded lookup, recording new ones."""
        request = (key, reply_tunnel_id, reply_gateway)
        duplicate = request in self._seen
        if not duplicate:
            # First sighting: remember it so the next identical request
            # is reported as a duplicate.
            self._seen.add(request)
        return duplicate

    def clear(self) -> None:
        """Forget every lookup recorded so far."""
        self._seen.clear()
92 self._seen.clear()