"""KBucket — interface and implementation for Kademlia routing table buckets. Ported from net.i2p.kademlia.KBucket and KBucketImpl. """ import threading import time as _time from typing import Callable, Generic, Set, TypeVar def _now_ms() -> int: return int(_time.time() * 1000) T = TypeVar("T", bound=bytes) class KBucket: """A Kademlia bucket holding entries at a specific XOR distance range. range_begin/range_end define the bit positions of the highest differing bit from "us" for entries in this bucket. """ def __init__( self, begin: int, end: int, max_size: int, trimmer: "KBucketTrimmer | None" = None, ) -> None: self._begin = begin self._end = end self._max = max_size self._trimmer = trimmer self._entries: Set[bytes] = set() self._last_changed: int = _now_ms() self._lock = threading.RLock() @property def range_begin(self) -> int: return self._begin @property def range_end(self) -> int: return self._end @property def key_count(self) -> int: return len(self._entries) @property def last_changed(self) -> int: return self._last_changed def set_last_changed(self) -> None: self._last_changed = _now_ms() def add(self, key: bytes) -> bool: """Add an entry. Returns True if added (new or already present).""" with self._lock: if key in self._entries: self._last_changed = _now_ms() return True if self._begin == self._end and len(self._entries) >= self._max: # Bucket can't split, try trimming if self._trimmer is not None: if not self._trimmer.trim(self, key): return False else: return False self._entries.add(key) self._last_changed = _now_ms() return True def remove(self, key: bytes) -> bool: """Remove an entry. Returns True if it existed.""" with self._lock: if key in self._entries: self._entries.discard(key) return True return False def get_entries(self) -> Set[bytes]: """Return a copy of all entries.""" with self._lock: return set(self._entries) def clear(self) -> None: with self._lock: self._entries.clear() # Alias for compatibility KBucketImpl = KBucket class KBucketTrimmer: """Interface for bucket eviction strategy.""" def trim(self, bucket: KBucket, to_add: bytes) -> bool: """Called when bucket is full. Return True to add, False to reject.""" raise NotImplementedError