A Python port of the Invisible Internet Project (I2P)
1"""Banlist — temporary and permanent peer banning.
2
3Ported from net.i2p.router.Banlist.
4
5Tracks peers that should be avoided for routing, either globally
6or for specific transports. Supports time-based expiration.
7"""
8
9from __future__ import annotations
10
11import time
12from dataclasses import dataclass
13
14
@dataclass
class BanlistEntry:
    """A single ban record for one peer.

    Attributes:
        peer_hash: Hash of the banned peer.
        expiration_ms: Expiry time in milliseconds since the Unix epoch,
            or 0 for a ban that never expires.
        transport: Transport the ban is limited to; None covers all
            transports.
        reason: Why the peer was banned.
        cause: Optional extra detail; defaults to None.
    """

    peer_hash: bytes
    expiration_ms: int  # 0 = permanent
    transport: str | None  # None = all transports
    reason: str
    cause: str | None = None

    @property
    def is_permanent(self) -> bool:
        """True when the ban never expires (``expiration_ms == 0``)."""
        return self.expiration_ms == 0

    def is_active(self, now_ms: int) -> bool:
        """Return True if the ban is still in force at ``now_ms``.

        Args:
            now_ms: Reference time in milliseconds since the Unix epoch.
        """
        return self.is_permanent or now_ms < self.expiration_ms


class Banlist:
    """Peer ban tracking with expiration and transport-specific bans.

    Bans are stored per ``(peer_hash, transport)`` pair, where a transport
    of None denotes a global ban. Re-banning a peer never weakens a ban
    that is already in force: an active entry that is permanent, or that
    expires later than the new one, is kept unchanged.
    """

    DEFAULT_DURATION_MS = 3_600_000  # 1 hour

    def __init__(self) -> None:
        # Key: (peer_hash, transport_or_None) -> BanlistEntry
        self._entries: dict[tuple[bytes, str | None], BanlistEntry] = {}

    @staticmethod
    def _now_ms() -> int:
        """Return the current wall-clock time in milliseconds."""
        return int(time.time() * 1000)

    def _store(self, entry: BanlistEntry, now_ms: int) -> None:
        """Record ``entry`` unless a stronger ban already holds its key.

        An existing entry wins only while it is still active and either
        permanent or expiring strictly later than ``entry``; otherwise
        ``entry`` replaces it. This stops a short ban from shortening or
        lifting a longer (or permanent) one.
        """
        key = (entry.peer_hash, entry.transport)
        existing = self._entries.get(key)
        if (
            existing is not None
            and not entry.is_permanent
            and existing.is_active(now_ms)
            and (
                existing.is_permanent
                or existing.expiration_ms > entry.expiration_ms
            )
        ):
            return
        self._entries[key] = entry

    def _check(self, key: tuple[bytes, str | None], now_ms: int) -> bool:
        """Return True if ``key`` has an active ban; purge it if expired."""
        entry = self._entries.get(key)
        if entry is None:
            return False
        if entry.is_active(now_ms):
            return True
        # Lazily drop the stale entry so the table does not grow unbounded.
        del self._entries[key]
        return False

    def banlist_router(
        self,
        peer_hash: bytes,
        reason: str,
        duration_ms: int = DEFAULT_DURATION_MS,
    ) -> None:
        """Ban a peer globally (all transports).

        Args:
            peer_hash: Hash of the peer to ban.
            reason: Why the peer is being banned.
            duration_ms: Ban length in milliseconds from now.

        An active global ban that is permanent or expires later than the
        requested one is left untouched (including its reason).
        """
        now = self._now_ms()
        self._store(
            BanlistEntry(
                peer_hash=peer_hash,
                expiration_ms=now + duration_ms,
                transport=None,
                reason=reason,
            ),
            now,
        )

    def banlist_router_forever(self, peer_hash: bytes, reason: str) -> None:
        """Permanently ban a peer on all transports.

        Always replaces any existing global entry for the peer, since no
        ban can outlast a permanent one.
        """
        self._entries[(peer_hash, None)] = BanlistEntry(
            peer_hash=peer_hash,
            expiration_ms=0,
            transport=None,
            reason=reason,
        )

    def banlist_router_transport(
        self,
        peer_hash: bytes,
        transport: str,
        reason: str,
        duration_ms: int = DEFAULT_DURATION_MS,
    ) -> None:
        """Ban a peer for a specific transport only.

        Args:
            peer_hash: Hash of the peer to ban.
            transport: Name of the transport the ban applies to.
            reason: Why the peer is being banned.
            duration_ms: Ban length in milliseconds from now.

        An active ban for the same peer and transport that expires later
        than the requested one is left untouched.
        """
        now = self._now_ms()
        self._store(
            BanlistEntry(
                peer_hash=peer_hash,
                expiration_ms=now + duration_ms,
                transport=transport,
                reason=reason,
            ),
            now,
        )

    def unbanlist_router(self, peer_hash: bytes) -> None:
        """Remove all bans (global and per-transport) for a peer."""
        # Collect first: a dict must not be mutated while it is iterated.
        doomed = [key for key in self._entries if key[0] == peer_hash]
        for key in doomed:
            del self._entries[key]

    def _is_active(
        self, entry: BanlistEntry, now_ms: int | None = None
    ) -> bool:
        """Check if a ban entry is still active (not expired).

        Args:
            entry: The entry to test.
            now_ms: Reference time in milliseconds; the current time is
                used when omitted. Bulk callers pass one snapshot so every
                entry is judged against the same instant.
        """
        if now_ms is None:
            now_ms = self._now_ms()
        return entry.is_active(now_ms)

    def is_banlisted(self, peer_hash: bytes) -> bool:
        """Check if peer is globally banned (transport=None).

        An expired global entry is removed as a side effect.
        """
        return self._check((peer_hash, None), self._now_ms())

    def is_banlisted_transport(self, peer_hash: bytes, transport: str) -> bool:
        """Check if peer is banned for a specific transport.

        Returns True if globally banned OR transport-specifically banned.
        Expired entries encountered during the check are removed.
        """
        # A global ban covers every transport, so it is checked first.
        if self.is_banlisted(peer_hash):
            return True
        return self._check((peer_hash, transport), self._now_ms())

    def get_entry(self, peer_hash: bytes) -> BanlistEntry | None:
        """Get the active global ban entry for a peer, or None.

        Unlike ``is_banlisted`` this does not remove an expired entry.
        """
        entry = self._entries.get((peer_hash, None))
        if entry is not None and self._is_active(entry):
            return entry
        return None

    def get_all_entries(self) -> list[BanlistEntry]:
        """Get all active ban entries (global and per-transport)."""
        now = self._now_ms()
        return [e for e in self._entries.values() if e.is_active(now)]

    def cleanup_expired(self) -> int:
        """Remove expired entries. Returns count removed."""
        now = self._now_ms()
        expired = [
            key for key, e in self._entries.items() if not e.is_active(now)
        ]
        for key in expired:
            del self._entries[key]
        return len(expired)

    @property
    def count(self) -> int:
        """Number of active ban entries."""
        now = self._now_ms()
        return sum(1 for e in self._entries.values() if e.is_active(now))