A Python port of the Invisible Internet Project (I2P)
1"""Tests for KBucketSet — Kademlia routing table operations."""
2
3import os
4
5import pytest
6
7from i2p_kademlia.kbucket import KBucket, KBucketTrimmer
8from i2p_kademlia.kbucket_set import KBucketSet
9
10
11def _random_key(length=32):
12 return os.urandom(length)
13
14
15def _key_at_distance(us, bit_position):
16 """Create a key that differs from 'us' at the specified bit position."""
17 result = bytearray(us)
18 byte_idx = len(us) - 1 - (bit_position // 8)
19 bit_idx = bit_position % 8
20 if 0 <= byte_idx < len(us):
21 result[byte_idx] ^= (1 << bit_idx)
22 return bytes(result)
23
24
class TestKBucketSetCreation:
    """Construction of a KBucketSet and the state it starts in."""

    def test_create_default(self):
        """A freshly built set holds no keys and exposes one bucket."""
        table = KBucketSet(_random_key())
        assert table.size() == 0
        # Nothing has been added yet, so only the initial bucket exists.
        assert len(table.get_buckets()) == 1

    def test_custom_k(self):
        """Supplying ``max_per_bucket`` still produces an empty set."""
        table = KBucketSet(_random_key(), max_per_bucket=10)
        assert table.size() == 0

    def test_initial_bucket_range(self):
        """The initial bucket's range runs from 0 to 255."""
        table = KBucketSet(_random_key())
        first_bucket = table.get_buckets()[0]
        assert first_bucket.range_begin == 0
        # 32-byte key * 8 bits per byte, minus one for the last index.
        assert first_bucket.range_end == 255
44
45
class TestKBucketSetAdd:
    """Behaviour of ``KBucketSet.add``."""

    def test_add_single(self):
        """Adding one new key reports True and bumps the size to 1."""
        table = KBucketSet(_random_key())
        newcomer = _random_key()
        added = table.add(newcomer)
        assert added is True
        assert table.size() == 1

    def test_add_self_rejected(self):
        """The local key itself is refused and leaves the set empty."""
        local_key = _random_key()
        table = KBucketSet(local_key)
        added = table.add(local_key)
        assert added is False
        assert table.size() == 0

    def test_add_duplicate(self):
        """Adding the same key twice stores it only once."""
        table = KBucketSet(_random_key())
        newcomer = _random_key()
        for _ in range(2):  # second pass is the duplicate insert
            table.add(newcomer)
        assert table.size() == 1

    def test_add_many(self):
        """Inserting 30 random keys leaves at least one entry behind."""
        table = KBucketSet(_random_key(), max_per_bucket=5)
        candidates = [_random_key() for _ in range(30)]
        for candidate in candidates:
            table.add(candidate)
        # The trimmer may turn some keys away, so only non-emptiness is checked.
        assert table.size() > 0

    def test_add_triggers_split(self):
        """Overfilling a small bucket results in more than one bucket."""
        table = KBucketSet(_random_key(), max_per_bucket=3)
        # 20 keys against a capacity of 3 is enough to force splitting.
        for candidate in (_random_key() for _ in range(20)):
            table.add(candidate)
        assert len(table.get_buckets()) > 1
85
86
class TestKBucketSetRemove:
    """Behaviour of ``KBucketSet.remove``."""

    def test_remove_existing(self):
        """Removing a stored key reports True and empties the set."""
        table = KBucketSet(_random_key())
        stored = _random_key()
        table.add(stored)
        removed = table.remove(stored)
        assert removed is True
        assert table.size() == 0

    def test_remove_nonexistent(self):
        """Removing a key that was never added reports False."""
        table = KBucketSet(_random_key())
        stranger = _random_key()
        assert table.remove(stranger) is False

    def test_remove_self(self):
        """Removing the local key reports False."""
        local_key = _random_key()
        table = KBucketSet(local_key)
        assert table.remove(local_key) is False
105
106
class TestKBucketSetClear:
    """Behaviour of ``KBucketSet.clear``."""

    def test_clear(self):
        """A populated set reports size 0 after ``clear()``."""
        table = KBucketSet(_random_key())
        for candidate in (_random_key() for _ in range(10)):
            table.add(candidate)
        # Precondition: something was actually stored before clearing.
        assert table.size() > 0
        table.clear()
        assert table.size() == 0
116
117
class TestKBucketSetGetAll:
    """Behaviour of ``KBucketSet.get_all``."""

    def test_get_all_empty(self):
        """An empty set yields an empty ``set``."""
        table = KBucketSet(_random_key())
        assert table.get_all() == set()

    def test_get_all_with_entries(self):
        """Every key that was added comes back, and nothing else."""
        table = KBucketSet(_random_key())
        inserted = [_random_key() for _ in range(5)]
        for key in inserted:
            table.add(key)
        assert table.get_all() == set(inserted)

    def test_get_all_with_ignore(self):
        """Keys listed in ``to_ignore`` are left out; the rest are kept."""
        table = KBucketSet(_random_key())
        skipped, kept = _random_key(), _random_key()
        table.add(skipped)
        table.add(kept)
        remaining = table.get_all(to_ignore={skipped})
        assert skipped not in remaining
        assert kept in remaining
144
145
class TestKBucketSetGetClosest:
    """Behaviour of ``KBucketSet.get_closest`` and ``get_closest_to_us``."""

    def test_get_closest_empty(self):
        """An empty set has no closest peers."""
        us = _random_key()
        ks = KBucketSet(us)
        assert ks.get_closest(_random_key(), 5) == []

    def test_get_closest_returns_sorted(self):
        """Results come back ordered nearest-first relative to the target."""
        us = b"\x00" * 32
        ks = KBucketSet(us, max_per_bucket=20)

        # Peers differing from the all-zero key in exactly one bit, so their
        # distance from ``us`` is a known power of two.
        p_close = _key_at_distance(us, 1)  # distance = 2
        p_mid = _key_at_distance(us, 4)  # distance = 16
        p_far = _key_at_distance(us, 8)  # distance = 256

        # Insert out of distance order so that an implementation which just
        # echoes insertion order cannot satisfy the assertions below.
        ks.add(p_far)
        ks.add(p_close)
        ks.add(p_mid)

        target = us  # looking for closest to us
        closest = ks.get_closest(target, 3)
        assert len(closest) == 3
        # The whole result must be sorted, not only its first element.
        assert list(closest) == [p_close, p_mid, p_far]

    def test_get_closest_respects_max(self):
        """No more than the requested number of peers is returned."""
        us = _random_key()
        ks = KBucketSet(us)
        for _ in range(10):
            ks.add(_random_key())
        result = ks.get_closest(_random_key(), 3)
        assert len(result) <= 3

    def test_get_closest_with_ignore(self):
        """Ignored peers are excluded while the others are still returned."""
        us = _random_key()
        ks = KBucketSet(us)
        p1 = _random_key()
        p2 = _random_key()
        ks.add(p1)
        ks.add(p2)
        result = ks.get_closest(_random_key(), 10, to_ignore=[p1])
        assert p1 not in result
        # Without this check an always-empty result would pass the test.
        assert p2 in result

    def test_get_closest_to_us(self):
        """``get_closest_to_us`` honours its size limit."""
        us = _random_key()
        ks = KBucketSet(us)
        for _ in range(5):
            ks.add(_random_key())
        result = ks.get_closest_to_us(3)
        assert len(result) <= 3
196
197
class TestKBucketSetExploreKeys:
    """Behaviour of ``KBucketSet.get_explore_keys``."""

    def test_explore_keys_all_stale(self):
        """With ``max_age_ms=0`` at least one explore key is produced."""
        table = KBucketSet(_random_key())
        # A zero max age means every bucket counts as stale.
        explore_keys = table.get_explore_keys(max_age_ms=0)
        assert len(explore_keys) >= 1

    def test_explore_keys_fresh(self):
        """A huge max age still yields at least one explore key."""
        table = KBucketSet(_random_key())
        # Nothing should be stale at this age, but sparse buckets
        # (< 75% full) still produce explore keys.
        explore_keys = table.get_explore_keys(max_age_ms=999999999)
        assert len(explore_keys) >= 1

    def test_explore_key_valid_length(self):
        """Each explore key is 32 bytes long, like the local key."""
        table = KBucketSet(_random_key(32))
        explore_keys = table.get_explore_keys(max_age_ms=0)
        assert all(len(key) == 32 for key in explore_keys)
220
221
class TestKBucketSetSplit:
    """Bucket splitting once a bucket overflows."""

    def test_split_creates_two(self):
        """Overfilling yields >= 2 buckets whose ranges are contiguous."""
        table = KBucketSet(b"\x00" * 32, max_per_bucket=2)
        # Ten random keys against a capacity of two forces a split.
        for candidate in (_random_key() for _ in range(10)):
            table.add(candidate)
        buckets = table.get_buckets()
        assert len(buckets) >= 2
        # Each bucket must start right after its predecessor ends.
        for lower, upper in zip(buckets, buckets[1:]):
            assert lower.range_end + 1 == upper.range_begin