A Python port of the Invisible Internet Project (I2P)
at main 134 lines 4.4 kB view raw
"""Banlist — temporary and permanent peer banning.

Ported from net.i2p.router.Banlist.

Tracks peers that should be avoided for routing, either globally
or for specific transports. Supports time-based expiration.
"""

from __future__ import annotations

import time
from dataclasses import dataclass


@dataclass
class BanlistEntry:
    """A single ban entry.

    Attributes are plain data; the ``Banlist`` class below decides when an
    entry counts as expired.
    """

    peer_hash: bytes
    expiration_ms: int  # wall-clock milliseconds since the epoch; 0 = permanent
    transport: str | None  # None = all transports
    reason: str
    cause: str | None = None


class Banlist:
    """Peer ban tracking with expiration and transport-specific bans.

    Entries are stored in a dict keyed by ``(peer_hash, transport)``, where a
    ``transport`` of ``None`` denotes a global ban.  Expired entries are
    dropped lazily by the lookup methods and in bulk by ``cleanup_expired``.
    """

    DEFAULT_DURATION_MS = 3_600_000  # 1 hour

    def __init__(self) -> None:
        # Key: (peer_hash, transport_or_None) -> BanlistEntry
        self._entries: dict[tuple[bytes, str | None], BanlistEntry] = {}

    @staticmethod
    def _now_ms() -> int:
        """Return the current wall-clock time in whole milliseconds."""
        return int(time.time() * 1000)

    def _store(self, entry: BanlistEntry, now_ms: int) -> None:
        """Record *entry* unless a stronger active ban already holds its key.

        An existing entry is kept untouched when it is still active and is
        either permanent or expires later than *entry*.  Without this guard a
        routine temporary ban would silently downgrade a permanent ban, or
        shorten a longer one that is still running.
        """
        key = (entry.peer_hash, entry.transport)
        existing = self._entries.get(key)
        if existing is not None and self._is_active(existing, now_ms):
            if existing.expiration_ms == 0:
                return  # permanent bans are never replaced by timed ones
            if existing.expiration_ms > entry.expiration_ms:
                return  # the ban already in place lasts longer
        self._entries[key] = entry

    def banlist_router(
        self,
        peer_hash: bytes,
        reason: str,
        duration_ms: int = DEFAULT_DURATION_MS,
    ) -> None:
        """Ban a peer globally (all transports) for *duration_ms* milliseconds.

        If the peer already has an active global ban that is permanent or
        expires later than the new one, that ban is left unchanged.
        """
        now = self._now_ms()
        self._store(
            BanlistEntry(
                peer_hash=peer_hash,
                expiration_ms=now + duration_ms,
                transport=None,
                reason=reason,
            ),
            now,
        )

    def banlist_router_forever(self, peer_hash: bytes, reason: str) -> None:
        """Permanently ban a peer, replacing any existing global entry."""
        self._entries[(peer_hash, None)] = BanlistEntry(
            peer_hash=peer_hash,
            expiration_ms=0,  # 0 marks the entry as permanent
            transport=None,
            reason=reason,
        )

    def banlist_router_transport(
        self,
        peer_hash: bytes,
        transport: str,
        reason: str,
        duration_ms: int = DEFAULT_DURATION_MS,
    ) -> None:
        """Ban a peer for a specific transport only.

        If the peer already has an active ban for the same transport that
        expires later than the new one, that ban is left unchanged.
        """
        now = self._now_ms()
        self._store(
            BanlistEntry(
                peer_hash=peer_hash,
                expiration_ms=now + duration_ms,
                transport=transport,
                reason=reason,
            ),
            now,
        )

    def unbanlist_router(self, peer_hash: bytes) -> None:
        """Remove all bans for a peer (global and per-transport)."""
        # Collect the keys first: the dict must not change size while iterated.
        keys_to_remove = [k for k in self._entries if k[0] == peer_hash]
        for k in keys_to_remove:
            del self._entries[k]

    def _is_active(self, entry: BanlistEntry, now_ms: int | None = None) -> bool:
        """Check if a ban entry is still active (not expired).

        *now_ms* lets a caller that inspects many entries share one clock
        reading; when omitted the current time is read.
        """
        if entry.expiration_ms == 0:
            return True  # permanent
        if now_ms is None:
            now_ms = self._now_ms()
        return now_ms < entry.expiration_ms

    def is_banlisted(self, peer_hash: bytes) -> bool:
        """Check if peer is globally banned (transport=None).

        An expired global entry is deleted as a side effect.
        """
        key = (peer_hash, None)
        entry = self._entries.get(key)
        if entry is None:
            return False
        if not self._is_active(entry):
            del self._entries[key]
            return False
        return True

    def is_banlisted_transport(self, peer_hash: bytes, transport: str) -> bool:
        """Check if peer is banned for a specific transport.

        Returns True if globally banned OR transport-specifically banned.
        Expired entries encountered during the check are deleted.
        """
        # Check global ban first
        if self.is_banlisted(peer_hash):
            return True
        # Check transport-specific
        key = (peer_hash, transport)
        entry = self._entries.get(key)
        if entry is None:
            return False
        if not self._is_active(entry):
            del self._entries[key]
            return False
        return True

    def get_entry(self, peer_hash: bytes) -> BanlistEntry | None:
        """Get the active global ban entry for a peer, or None.

        Unlike ``is_banlisted``, an expired entry is not removed here.
        """
        entry = self._entries.get((peer_hash, None))
        if entry is not None and self._is_active(entry):
            return entry
        return None

    def get_all_entries(self) -> list[BanlistEntry]:
        """Get all active ban entries (global and per-transport)."""
        now = self._now_ms()  # one snapshot so every entry sees the same time
        return [e for e in self._entries.values() if self._is_active(e, now)]

    def cleanup_expired(self) -> int:
        """Remove expired entries. Returns count removed."""
        now = self._now_ms()
        expired = [
            k for k, e in self._entries.items() if not self._is_active(e, now)
        ]
        for k in expired:
            del self._entries[k]
        return len(expired)

    @property
    def count(self) -> int:
        """Number of active ban entries."""
        now = self._now_ms()
        return sum(1 for e in self._entries.values() if self._is_active(e, now))