# A Python port of the Invisible Internet Project (I2P)
"""KBucket — interface and implementation for Kademlia routing table buckets.

Ported from net.i2p.kademlia.KBucket and KBucketImpl.
"""
5
6import threading
7import time as _time
8from typing import Callable, Generic, Set, TypeVar
9
10
def _now_ms() -> int:
    """Return the current wall-clock time in whole milliseconds.

    The value is ``time.time()`` (seconds since the epoch, as a float)
    scaled by 1000 and truncated to an ``int``.
    """
    return int(_time.time() * 1000)
13
14
# Type variable bounded by ``bytes`` (matches ``bytes`` and its subclasses).
# NOTE(review): presumably meant for bucket key types; nothing visible in
# this module references it yet.
T = TypeVar("T", bound=bytes)
16
17
class KBucket:
    """A Kademlia bucket holding entries at a specific XOR distance range.

    range_begin/range_end define the bit positions of the highest differing
    bit from "us" for entries in this bucket.

    Entries are kept as a set of ``bytes`` keys. ``add``, ``remove``,
    ``get_entries`` and ``clear`` run while holding an internal re-entrant
    lock; the simple property reads do not take it.
    """

    def __init__(
        self,
        begin: int,
        end: int,
        max_size: int,
        trimmer: "KBucketTrimmer | None" = None,
    ) -> None:
        """Create an empty bucket.

        Args:
            begin: Start of the bucket's range (exposed as ``range_begin``).
            end: End of the bucket's range (exposed as ``range_end``).
            max_size: Entry count at which a bucket with ``begin == end``
                stops accepting new keys unless the trimmer admits them.
            trimmer: Optional strategy consulted when such a bucket is full.
        """
        self._begin = begin
        self._end = end
        self._max = max_size
        self._trimmer = trimmer
        self._entries: Set[bytes] = set()
        self._last_changed: int = _now_ms()
        # Re-entrant so a trimmer invoked from add() can call back into the
        # bucket (e.g. remove()) on the same thread without deadlocking.
        self._lock = threading.RLock()

    @property
    def range_begin(self) -> int:
        """The ``begin`` value the bucket was constructed with."""
        return self._begin

    @property
    def range_end(self) -> int:
        """The ``end`` value the bucket was constructed with."""
        return self._end

    @property
    def key_count(self) -> int:
        """Number of entries currently stored."""
        return len(self._entries)

    @property
    def last_changed(self) -> int:
        """Millisecond timestamp set at construction and refreshed by
        ``add`` and ``set_last_changed``."""
        return self._last_changed

    def set_last_changed(self) -> None:
        """Refresh ``last_changed`` to the current time."""
        self._last_changed = _now_ms()

    def add(self, key: bytes) -> bool:
        """Add an entry. Returns True if added (new or already present).

        Re-adding a stored key only refreshes ``last_changed``. When the
        bucket cannot split (``range_begin == range_end``) and already holds
        ``max_size`` or more entries, the trimmer decides whether the key is
        admitted; without a trimmer the key is rejected and False is
        returned. The trimmer is called while the bucket's lock is held.
        """
        with self._lock:
            if key not in self._entries:
                if self._begin == self._end and len(self._entries) >= self._max:
                    # Bucket can't split, so the trimmer (if any) decides.
                    trimmer = self._trimmer
                    if trimmer is None or not trimmer.trim(self, key):
                        return False
                self._entries.add(key)
            # Reached for both new and already-present keys.
            self._last_changed = _now_ms()
            return True

    def remove(self, key: bytes) -> bool:
        """Remove an entry. Returns True if it existed.

        ``last_changed`` is left untouched.
        """
        with self._lock:
            try:
                self._entries.remove(key)
            except KeyError:
                return False
            return True

    def get_entries(self) -> Set[bytes]:
        """Return a copy of all entries."""
        with self._lock:
            return set(self._entries)

    def clear(self) -> None:
        """Remove every entry; ``last_changed`` is left untouched."""
        with self._lock:
            self._entries.clear()
93
# Alias for compatibility: this port folds the Java KBucket interface and
# its KBucketImpl implementation (see the module docstring) into one class.
KBucketImpl = KBucket
96
97
class KBucketTrimmer:
    """Interface for bucket eviction strategy.

    Subclasses override ``trim``; the base implementation always raises.
    """

    def trim(self, bucket: "KBucket", to_add: bytes) -> bool:
        """Called when bucket is full. Return True to add, False to reject.

        Args:
            bucket: The full bucket the key is being added to.
            to_add: The key that is about to be added.

        Raises:
            NotImplementedError: Always, in this base class.
        """
        raise NotImplementedError