"""Tests for KBucketSet — Kademlia routing table operations.""" import os import pytest from i2p_kademlia.kbucket import KBucket, KBucketTrimmer from i2p_kademlia.kbucket_set import KBucketSet def _random_key(length=32): return os.urandom(length) def _key_at_distance(us, bit_position): """Create a key that differs from 'us' at the specified bit position.""" result = bytearray(us) byte_idx = len(us) - 1 - (bit_position // 8) bit_idx = bit_position % 8 if 0 <= byte_idx < len(us): result[byte_idx] ^= (1 << bit_idx) return bytes(result) class TestKBucketSetCreation: def test_create_default(self): us = _random_key() ks = KBucketSet(us) assert ks.size() == 0 buckets = ks.get_buckets() assert len(buckets) == 1 # starts with one bucket def test_custom_k(self): us = _random_key() ks = KBucketSet(us, max_per_bucket=10) assert ks.size() == 0 def test_initial_bucket_range(self): us = _random_key() ks = KBucketSet(us) buckets = ks.get_buckets() assert buckets[0].range_begin == 0 assert buckets[0].range_end == 255 # 32 bytes * 8 bits - 1 class TestKBucketSetAdd: def test_add_single(self): us = _random_key() ks = KBucketSet(us) peer = _random_key() assert ks.add(peer) is True assert ks.size() == 1 def test_add_self_rejected(self): us = _random_key() ks = KBucketSet(us) assert ks.add(us) is False assert ks.size() == 0 def test_add_duplicate(self): us = _random_key() ks = KBucketSet(us) peer = _random_key() ks.add(peer) ks.add(peer) # duplicate assert ks.size() == 1 def test_add_many(self): us = _random_key() ks = KBucketSet(us, max_per_bucket=5) peers = [_random_key() for _ in range(30)] for p in peers: ks.add(p) # Some may be rejected by trimmer, but most should be added assert ks.size() > 0 def test_add_triggers_split(self): us = _random_key() ks = KBucketSet(us, max_per_bucket=3) # Add enough peers to trigger splits for _ in range(20): ks.add(_random_key()) buckets = ks.get_buckets() assert len(buckets) > 1 class TestKBucketSetRemove: def test_remove_existing(self): us = _random_key() ks = KBucketSet(us) peer = _random_key() ks.add(peer) assert ks.remove(peer) is True assert ks.size() == 0 def test_remove_nonexistent(self): us = _random_key() ks = KBucketSet(us) assert ks.remove(_random_key()) is False def test_remove_self(self): us = _random_key() ks = KBucketSet(us) assert ks.remove(us) is False class TestKBucketSetClear: def test_clear(self): us = _random_key() ks = KBucketSet(us) for _ in range(10): ks.add(_random_key()) assert ks.size() > 0 ks.clear() assert ks.size() == 0 class TestKBucketSetGetAll: def test_get_all_empty(self): us = _random_key() ks = KBucketSet(us) assert ks.get_all() == set() def test_get_all_with_entries(self): us = _random_key() ks = KBucketSet(us) peers = set() for _ in range(5): p = _random_key() ks.add(p) peers.add(p) assert ks.get_all() == peers def test_get_all_with_ignore(self): us = _random_key() ks = KBucketSet(us) p1 = _random_key() p2 = _random_key() ks.add(p1) ks.add(p2) result = ks.get_all(to_ignore={p1}) assert p1 not in result assert p2 in result class TestKBucketSetGetClosest: def test_get_closest_empty(self): us = _random_key() ks = KBucketSet(us) assert ks.get_closest(_random_key(), 5) == [] def test_get_closest_returns_sorted(self): us = b"\x00" * 32 ks = KBucketSet(us, max_per_bucket=20) # Create peers at known distances p_close = _key_at_distance(us, 1) # distance = 2 p_mid = _key_at_distance(us, 4) # distance = 16 p_far = _key_at_distance(us, 8) # distance = 256 ks.add(p_close) ks.add(p_mid) ks.add(p_far) target = us # looking for closest to us closest = ks.get_closest(target, 3) assert len(closest) == 3 # Closest should come first assert closest[0] == p_close def test_get_closest_respects_max(self): us = _random_key() ks = KBucketSet(us) for _ in range(10): ks.add(_random_key()) result = ks.get_closest(_random_key(), 3) assert len(result) <= 3 def test_get_closest_with_ignore(self): us = _random_key() ks = KBucketSet(us) p1 = _random_key() p2 = _random_key() ks.add(p1) ks.add(p2) result = ks.get_closest(_random_key(), 10, to_ignore=[p1]) assert p1 not in result def test_get_closest_to_us(self): us = _random_key() ks = KBucketSet(us) for _ in range(5): ks.add(_random_key()) result = ks.get_closest_to_us(3) assert len(result) <= 3 class TestKBucketSetExploreKeys: def test_explore_keys_all_stale(self): us = _random_key() ks = KBucketSet(us) # All buckets are stale (max_age_ms=0) keys = ks.get_explore_keys(max_age_ms=0) assert len(keys) >= 1 def test_explore_keys_fresh(self): us = _random_key() ks = KBucketSet(us) # Very long max age — nothing should be stale but buckets are sparse keys = ks.get_explore_keys(max_age_ms=999999999) # Still returns keys because buckets are sparse (< 75% full) assert len(keys) >= 1 def test_explore_key_valid_length(self): us = _random_key(32) ks = KBucketSet(us) keys = ks.get_explore_keys(max_age_ms=0) for k in keys: assert len(k) == 32 class TestKBucketSetSplit: def test_split_creates_two(self): us = b"\x00" * 32 ks = KBucketSet(us, max_per_bucket=2) # Add enough to force a split for _ in range(10): ks.add(_random_key()) buckets = ks.get_buckets() assert len(buckets) >= 2 # Ranges should be contiguous for i in range(len(buckets) - 1): assert buckets[i].range_end + 1 == buckets[i + 1].range_begin