# A Python port of the Invisible Internet Project (I2P)
# at main 103 lines 2.8 kB view raw
"""KBucket — interface and implementation for Kademlia routing table buckets.

Ported from net.i2p.kademlia.KBucket and KBucketImpl.
"""

import threading
import time as _time
from typing import Callable, Generic, Set, TypeVar


def _now_ms() -> int:
    """Return the current wall-clock time in whole milliseconds."""
    return int(_time.time() * 1000)


T = TypeVar("T", bound=bytes)


class KBucket:
    """A Kademlia bucket holding entries at a specific XOR distance range.

    range_begin/range_end define the bit positions of the highest differing
    bit from "us" for entries in this bucket.

    Entries are stored as an unordered set of ``bytes`` keys. Mutations and
    snapshots are serialized with a re-entrant lock, so a trimmer invoked
    from :meth:`add` may call back into the same bucket from the same thread.
    """

    def __init__(
        self,
        begin: int,
        end: int,
        max_size: int,
        trimmer: "KBucketTrimmer | None" = None,
    ) -> None:
        """Create an empty bucket.

        Args:
            begin: Start of the bucket's range (exposed as ``range_begin``).
            end: End of the bucket's range (exposed as ``range_end``).
            max_size: Entry limit, enforced only when ``begin == end``.
            trimmer: Optional eviction strategy consulted when a full,
                unsplittable bucket receives a new key.
        """
        self._begin = begin
        self._end = end
        self._max = max_size
        self._trimmer = trimmer
        self._entries: Set[bytes] = set()
        self._last_changed: int = _now_ms()
        self._lock = threading.RLock()

    def __repr__(self) -> str:
        return (
            f"{type(self).__name__}(begin={self._begin!r}, end={self._end!r}, "
            f"max_size={self._max!r}, keys={len(self._entries)})"
        )

    @property
    def range_begin(self) -> int:
        """Start of this bucket's range, as passed to the constructor."""
        return self._begin

    @property
    def range_end(self) -> int:
        """End of this bucket's range, as passed to the constructor."""
        return self._end

    @property
    def key_count(self) -> int:
        """Number of entries currently stored."""
        return len(self._entries)

    @property
    def last_changed(self) -> int:
        """Millisecond timestamp last recorded by the constructor, ``add``
        or ``set_last_changed``."""
        return self._last_changed

    def set_last_changed(self) -> None:
        """Stamp the bucket with the current time in milliseconds."""
        self._last_changed = _now_ms()

    def add(self, key: bytes) -> bool:
        """Add an entry.

        Returns:
            True if ``key`` was added or was already present (the timestamp
            is refreshed in both cases). False if the bucket is full, cannot
            split (``begin == end``), and either has no trimmer or the
            trimmer refused the key.
        """
        with self._lock:
            if key not in self._entries:
                # A bucket covering a single position can't split, so the
                # size limit only applies in that case.
                full_and_unsplittable = (
                    self._begin == self._end
                    and len(self._entries) >= self._max
                )
                # The trimmer decides; on True the key is added whether or
                # not the trimmer actually evicted anything.
                if full_and_unsplittable and (
                    self._trimmer is None
                    or not self._trimmer.trim(self, key)
                ):
                    return False
                self._entries.add(key)
            self._last_changed = _now_ms()
            return True

    def remove(self, key: bytes) -> bool:
        """Remove an entry. Returns True if it existed.

        Removal does not update ``last_changed``.
        """
        with self._lock:
            try:
                self._entries.remove(key)
            except KeyError:
                return False
            return True

    def get_entries(self) -> Set[bytes]:
        """Return a copy of all entries; mutating it does not affect the bucket."""
        with self._lock:
            return set(self._entries)

    def clear(self) -> None:
        """Remove every entry. ``last_changed`` is left untouched."""
        with self._lock:
            self._entries.clear()


# Alias for compatibility
KBucketImpl = KBucket


class KBucketTrimmer:
    """Interface for bucket eviction strategy."""

    def trim(self, bucket: KBucket, to_add: bytes) -> bool:
        """Called when bucket is full. Return True to add, False to reject.

        Invoked by ``KBucket.add`` while the bucket's re-entrant lock is
        held. Subclasses must override this; the base implementation raises
        ``NotImplementedError``.
        """
        raise NotImplementedError