# A Python port of the Invisible Internet Project (I2P).
# Viewer snapshot of this file: main branch, 233 lines, 6.6 kB.
"""Tests for KBucketSet — Kademlia routing table operations."""

import os

import pytest

from i2p_kademlia.kbucket import KBucket, KBucketTrimmer
from i2p_kademlia.kbucket_set import KBucketSet


def _random_key(length=32):
    """Return ``length`` random bytes to use as a routing key."""
    return os.urandom(length)


def _key_at_distance(us, bit_position):
    """Create a key that differs from 'us' at the specified bit position.

    Bit 0 is the least-significant bit of the last byte, so flipping bit
    ``n`` of an all-zero key yields a key whose integer value is ``2 ** n``.
    A ``bit_position`` outside the key is ignored and an unchanged copy of
    ``us`` is returned.
    """
    result = bytearray(us)
    byte_idx = len(us) - 1 - (bit_position // 8)
    bit_idx = bit_position % 8
    if 0 <= byte_idx < len(us):
        result[byte_idx] ^= (1 << bit_idx)
    return bytes(result)


class TestKBucketSetCreation:
    """A freshly constructed KBucketSet is empty and has a single bucket."""

    def test_create_default(self):
        us = _random_key()
        ks = KBucketSet(us)
        assert ks.size() == 0
        buckets = ks.get_buckets()
        assert len(buckets) == 1  # starts with one bucket

    def test_custom_k(self):
        us = _random_key()
        ks = KBucketSet(us, max_per_bucket=10)
        assert ks.size() == 0

    def test_initial_bucket_range(self):
        us = _random_key()
        ks = KBucketSet(us)
        buckets = ks.get_buckets()
        assert buckets[0].range_begin == 0
        assert buckets[0].range_end == 255  # 32 bytes * 8 bits - 1


class TestKBucketSetAdd:
    """Adding peers: acceptance, self-rejection, duplicates and splitting."""

    def test_add_single(self):
        us = _random_key()
        ks = KBucketSet(us)
        peer = _random_key()
        assert ks.add(peer) is True
        assert ks.size() == 1

    def test_add_self_rejected(self):
        us = _random_key()
        ks = KBucketSet(us)
        assert ks.add(us) is False
        assert ks.size() == 0

    def test_add_duplicate(self):
        us = _random_key()
        ks = KBucketSet(us)
        peer = _random_key()
        ks.add(peer)
        ks.add(peer)  # duplicate
        assert ks.size() == 1

    def test_add_many(self):
        us = _random_key()
        ks = KBucketSet(us, max_per_bucket=5)
        peers = [_random_key() for _ in range(30)]
        for p in peers:
            ks.add(p)
        # Some may be rejected by trimmer, but most should be added
        assert ks.size() > 0

    def test_add_triggers_split(self):
        us = _random_key()
        ks = KBucketSet(us, max_per_bucket=3)
        # Add enough peers to trigger splits
        for _ in range(20):
            ks.add(_random_key())
        buckets = ks.get_buckets()
        assert len(buckets) > 1


class TestKBucketSetRemove:
    """Removing peers reports whether anything was actually removed."""

    def test_remove_existing(self):
        us = _random_key()
        ks = KBucketSet(us)
        peer = _random_key()
        ks.add(peer)
        assert ks.remove(peer) is True
        assert ks.size() == 0

    def test_remove_nonexistent(self):
        us = _random_key()
        ks = KBucketSet(us)
        assert ks.remove(_random_key()) is False

    def test_remove_self(self):
        us = _random_key()
        ks = KBucketSet(us)
        assert ks.remove(us) is False


class TestKBucketSetClear:
    """clear() empties the routing table."""

    def test_clear(self):
        us = _random_key()
        ks = KBucketSet(us)
        for _ in range(10):
            ks.add(_random_key())
        assert ks.size() > 0
        ks.clear()
        assert ks.size() == 0


class TestKBucketSetGetAll:
    """get_all() returns the stored keys as a set, minus any ignored ones."""

    def test_get_all_empty(self):
        us = _random_key()
        ks = KBucketSet(us)
        assert ks.get_all() == set()

    def test_get_all_with_entries(self):
        us = _random_key()
        ks = KBucketSet(us)
        peers = set()
        for _ in range(5):
            p = _random_key()
            ks.add(p)
            peers.add(p)
        assert ks.get_all() == peers

    def test_get_all_with_ignore(self):
        us = _random_key()
        ks = KBucketSet(us)
        p1 = _random_key()
        p2 = _random_key()
        ks.add(p1)
        ks.add(p2)
        result = ks.get_all(to_ignore={p1})
        assert p1 not in result
        assert p2 in result


class TestKBucketSetGetClosest:
    """get_closest() ordering, size limit and ignore-list handling."""

    def test_get_closest_empty(self):
        us = _random_key()
        ks = KBucketSet(us)
        assert ks.get_closest(_random_key(), 5) == []

    def test_get_closest_returns_sorted(self):
        """Peers come back ordered by distance to the target, closest first."""
        us = b"\x00" * 32
        ks = KBucketSet(us, max_per_bucket=20)

        # Create peers at known distances
        p_close = _key_at_distance(us, 1)  # distance = 2
        p_mid = _key_at_distance(us, 4)  # distance = 16
        p_far = _key_at_distance(us, 8)  # distance = 256

        ks.add(p_close)
        ks.add(p_mid)
        ks.add(p_far)

        target = us  # looking for closest to us
        closest = ks.get_closest(target, 3)
        assert len(closest) == 3
        # Check the whole ordering, not just the head, so a partially
        # sorted result cannot slip through.
        assert closest == [p_close, p_mid, p_far]

    def test_get_closest_respects_max(self):
        us = _random_key()
        ks = KBucketSet(us)
        for _ in range(10):
            ks.add(_random_key())
        result = ks.get_closest(_random_key(), 3)
        assert len(result) <= 3

    def test_get_closest_with_ignore(self):
        us = _random_key()
        ks = KBucketSet(us)
        p1 = _random_key()
        p2 = _random_key()
        ks.add(p1)
        ks.add(p2)
        result = ks.get_closest(_random_key(), 10, to_ignore=[p1])
        assert p1 not in result

    def test_get_closest_to_us(self):
        us = _random_key()
        ks = KBucketSet(us)
        for _ in range(5):
            ks.add(_random_key())
        result = ks.get_closest_to_us(3)
        assert len(result) <= 3


class TestKBucketSetExploreKeys:
    """get_explore_keys() yields keys for stale or sparse buckets."""

    def test_explore_keys_all_stale(self):
        us = _random_key()
        ks = KBucketSet(us)
        # All buckets are stale (max_age_ms=0)
        keys = ks.get_explore_keys(max_age_ms=0)
        assert len(keys) >= 1

    def test_explore_keys_fresh(self):
        us = _random_key()
        ks = KBucketSet(us)
        # Very long max age — nothing should be stale but buckets are sparse
        keys = ks.get_explore_keys(max_age_ms=999999999)
        # Still returns keys because buckets are sparse (< 75% full)
        assert len(keys) >= 1

    def test_explore_key_valid_length(self):
        us = _random_key(32)
        ks = KBucketSet(us)
        keys = ks.get_explore_keys(max_age_ms=0)
        for k in keys:
            assert len(k) == 32


class TestKBucketSetSplit:
    """Splitting keeps the bucket ranges contiguous."""

    def test_split_creates_two(self):
        us = b"\x00" * 32
        ks = KBucketSet(us, max_per_bucket=2)
        # Add enough to force a split
        for _ in range(10):
            ks.add(_random_key())
        buckets = ks.get_buckets()
        assert len(buckets) >= 2
        # Ranges should be contiguous: each bucket starts right after the
        # previous one ends.
        ordered = list(buckets)
        for lower, upper in zip(ordered, ordered[1:]):
            assert lower.range_end + 1 == upper.range_begin