"""Tests for NetDB Kademlia lookup/store operations and enhanced NetDB operations.""" import os import time import pytest def _make_key(prefix_byte: int) -> bytes: """Create a deterministic 32-byte key with a known prefix for XOR distance testing.""" return bytes([prefix_byte]) + b'\x00' * 31 class TestSearchOperation: def test_finds_data_at_known_peer(self): from i2p_netdb.operations import SearchOperation target = os.urandom(32) peer_hash = os.urandom(32) known_peers = {peer_hash: b"peer_addr"} op = SearchOperation(target_key=target, known_peers=known_peers) to_query = op.start() assert peer_hash in to_query found_data = b"the data we wanted" next_peers = op.on_reply(peer_hash, found_data=found_data) assert op.is_complete() assert op.get_result() == found_data assert next_peers == [] def test_iterative_discovery(self): from i2p_netdb.operations import SearchOperation target = _make_key(0x00) # Start with a distant peer peer_a = _make_key(0xFF) # peer_a tells us about peer_b, which is closer peer_b = _make_key(0x01) # peer_b has the data known_peers = {peer_a: b"addr_a"} op = SearchOperation(target_key=target, known_peers=known_peers) to_query = op.start() assert peer_a in to_query # peer_a doesn't have data, but knows peer_b next_peers = op.on_reply(peer_a, found_data=None, closer_peers={peer_b: b"addr_b"}) assert peer_b in next_peers assert peer_a not in next_peers # already queried # peer_b has the data op.on_reply(peer_b, found_data=b"target_data") assert op.is_complete() assert op.get_result() == b"target_data" def test_exhausted_search_returns_none(self): from i2p_netdb.operations import SearchOperation target = os.urandom(32) peer_a = os.urandom(32) peer_b = os.urandom(32) op = SearchOperation(target_key=target, known_peers={peer_a: b"a", peer_b: b"b"}) op.start() op.on_reply(peer_a, found_data=None) op.on_reply(peer_b, found_data=None) assert op.is_complete() assert op.get_result() is None def test_xor_distance_ordering(self): from i2p_netdb.operations import SearchOperation target = _make_key(0x00) # Create peers at known distances close_peer = _make_key(0x01) # distance 1 mid_peer = _make_key(0x0F) # distance 15 far_peer = _make_key(0xFF) # distance 255 known_peers = { far_peer: b"far", close_peer: b"close", mid_peer: b"mid", } op = SearchOperation(target_key=target, known_peers=known_peers, k=2) to_query = op.start() # Should return the k=2 closest peers assert len(to_query) == 2 assert close_peer in to_query assert mid_peer in to_query assert far_peer not in to_query def test_already_queried_peers_not_requeried(self): from i2p_netdb.operations import SearchOperation target = os.urandom(32) peer_a = os.urandom(32) peer_b = os.urandom(32) op = SearchOperation(target_key=target, known_peers={peer_a: b"a", peer_b: b"b"}) op.start() # Query peer_a, it returns peer_b (already known) as closer next_peers = op.on_reply(peer_a, found_data=None, closer_peers={peer_b: b"b"}) # peer_b should appear since not yet queried # But peer_a should not reappear assert peer_a not in next_peers assert peer_a in op.get_queried() # Now query peer_b, it returns peer_a again next_peers = op.on_reply(peer_b, found_data=None, closer_peers={peer_a: b"a"}) assert peer_a not in next_peers # already queried assert peer_b in op.get_queried() def test_is_complete_when_found(self): from i2p_netdb.operations import SearchOperation target = os.urandom(32) peer = os.urandom(32) op = SearchOperation(target_key=target, known_peers={peer: b"p"}) assert not op.is_complete() op.start() assert not op.is_complete() op.on_reply(peer, found_data=b"data") assert op.is_complete() def test_is_complete_when_exhausted(self): from i2p_netdb.operations import SearchOperation target = os.urandom(32) peer = os.urandom(32) op = SearchOperation(target_key=target, known_peers={peer: b"p"}) op.start() assert not op.is_complete() op.on_reply(peer, found_data=None) assert op.is_complete() class TestStoreOperation: def test_start_returns_correct_tuples(self): from i2p_netdb.operations import StoreOperation key = os.urandom(32) data = b"store this data" peers = [os.urandom(32) for _ in range(3)] op = StoreOperation(key=key, data=data, target_peers=peers) tuples = op.start() assert len(tuples) == 3 for peer_hash, t_key, t_data in tuples: assert peer_hash in peers assert t_key == key assert t_data == data def test_on_ack_tracks_confirmations(self): from i2p_netdb.operations import StoreOperation key = os.urandom(32) peers = [os.urandom(32) for _ in range(3)] op = StoreOperation(key=key, data=b"data", target_peers=peers) op.start() assert len(op.get_confirmed_peers()) == 0 op.on_ack(peers[0]) assert peers[0] in op.get_confirmed_peers() assert len(op.get_confirmed_peers()) == 1 def test_is_complete_after_enough_acks(self): from i2p_netdb.operations import StoreOperation key = os.urandom(32) peers = [os.urandom(32) for _ in range(5)] op = StoreOperation(key=key, data=b"data", target_peers=peers, redundancy=3) op.start() op.on_ack(peers[0]) op.on_ack(peers[1]) assert not op.is_complete() op.on_ack(peers[2]) assert op.is_complete() def test_not_complete_before_enough_acks(self): from i2p_netdb.operations import StoreOperation key = os.urandom(32) peers = [os.urandom(32) for _ in range(5)] op = StoreOperation(key=key, data=b"data", target_peers=peers, redundancy=3) op.start() op.on_ack(peers[0]) assert not op.is_complete() op.on_ack(peers[1]) assert not op.is_complete() def test_confirmed_and_pending_peers(self): from i2p_netdb.operations import StoreOperation key = os.urandom(32) peers = [os.urandom(32) for _ in range(4)] op = StoreOperation(key=key, data=b"data", target_peers=peers, redundancy=2) op.start() op.on_ack(peers[0]) op.on_ack(peers[2]) confirmed = op.get_confirmed_peers() pending = op.get_pending_peers() assert peers[0] in confirmed assert peers[2] in confirmed assert peers[1] in pending assert peers[3] in pending assert len(confirmed) + len(pending) == len(peers) # --------------------------------------------------------------------------- # Enhanced NetDB operations: SearchJob, StoreJob, RouterInfoCache, NetDBOperationManager # --------------------------------------------------------------------------- class TestSearchJob: def test_start_lookup_creates_search_job(self): from i2p_netdb.floodfill import FloodfillManager from i2p_netdb.operations import RouterInfoCache, NetDBOperationManager ff_mgr = FloodfillManager() cache = RouterInfoCache() op_mgr = NetDBOperationManager(ff_mgr, cache) target = os.urandom(32) job = op_mgr.start_lookup(target) assert job.target_hash == target assert job.search_id # non-empty assert not job.complete assert job.started_at > 0 assert len(job.replies) == 0 def test_process_search_reply(self): from i2p_netdb.floodfill import FloodfillManager from i2p_netdb.operations import RouterInfoCache, NetDBOperationManager ff_mgr = FloodfillManager() cache = RouterInfoCache() op_mgr = NetDBOperationManager(ff_mgr, cache) target = os.urandom(32) job = op_mgr.start_lookup(target) from_peer = os.urandom(32) reply_data = {"router_info": b"some_data", "closer_peers": []} op_mgr.on_search_reply(job.search_id, from_peer, reply_data) assert len(job.replies) == 1 assert job.replies[0] == reply_data class TestSearchJobTimeout: def test_lookup_timeout_cleanup(self): from i2p_netdb.floodfill import FloodfillManager from i2p_netdb.operations import RouterInfoCache, NetDBOperationManager, SearchJob ff_mgr = FloodfillManager() cache = RouterInfoCache() op_mgr = NetDBOperationManager(ff_mgr, cache) target = os.urandom(32) job = op_mgr.start_lookup(target) # Artificially age the job job.started_at = time.monotonic() - 60.0 cleaned = op_mgr.cleanup_stale(max_age_seconds=30.0) assert cleaned >= 1 # The search should no longer be tracked assert job.search_id not in op_mgr._searches class TestRouterInfoCache: def test_get_put(self): from i2p_netdb.operations import RouterInfoCache cache = RouterInfoCache(ttl_seconds=3600.0) peer_hash = os.urandom(32) info = {"caps": "f", "version": "0.9.62"} cache.put(peer_hash, info) result = cache.get(peer_hash) assert result == info def test_get_missing_returns_none(self): from i2p_netdb.operations import RouterInfoCache cache = RouterInfoCache() assert cache.get(os.urandom(32)) is None def test_expiration(self): from i2p_netdb.operations import RouterInfoCache cache = RouterInfoCache(ttl_seconds=0.0) # immediate expiration peer_hash = os.urandom(32) cache.put(peer_hash, {"test": True}) # With TTL=0, entry should be expired immediately (or nearly) assert cache.is_expired(peer_hash) def test_cleanup_expired(self): from i2p_netdb.operations import RouterInfoCache cache = RouterInfoCache(ttl_seconds=0.0) for _ in range(5): cache.put(os.urandom(32), {"x": 1}) removed = cache.cleanup_expired() assert removed == 5 def test_non_expired_not_cleaned(self): from i2p_netdb.operations import RouterInfoCache cache = RouterInfoCache(ttl_seconds=3600.0) peer_hash = os.urandom(32) cache.put(peer_hash, {"alive": True}) assert not cache.is_expired(peer_hash) removed = cache.cleanup_expired() assert removed == 0 assert cache.get(peer_hash) is not None class TestStoreJobEnhanced: def test_store_operation_creates_store_job(self): from i2p_netdb.floodfill import FloodfillManager from i2p_netdb.operations import RouterInfoCache, NetDBOperationManager ff_mgr = FloodfillManager() cache = RouterInfoCache() op_mgr = NetDBOperationManager(ff_mgr, cache) data_hash = os.urandom(32) data = b"router_info_bytes" job = op_mgr.start_store(data_hash, data) assert job.data_hash == data_hash assert job.data == data assert job.store_id # non-empty assert not job.acked assert job.started_at > 0 def test_store_ack(self): from i2p_netdb.floodfill import FloodfillManager from i2p_netdb.operations import RouterInfoCache, NetDBOperationManager ff_mgr = FloodfillManager() cache = RouterInfoCache() op_mgr = NetDBOperationManager(ff_mgr, cache) data_hash = os.urandom(32) job = op_mgr.start_store(data_hash, b"data") from_peer = os.urandom(32) op_mgr.on_store_ack(job.store_id, from_peer) assert job.acked class TestGetRouterInfoFromCache: def test_get_router_info_cache_hit(self): from i2p_netdb.floodfill import FloodfillManager from i2p_netdb.operations import RouterInfoCache, NetDBOperationManager ff_mgr = FloodfillManager() cache = RouterInfoCache() op_mgr = NetDBOperationManager(ff_mgr, cache) peer_hash = os.urandom(32) info = {"caps": "fR", "version": "0.9.62"} cache.put(peer_hash, info) result = op_mgr.get_router_info(peer_hash) assert result == info def test_get_router_info_cache_miss(self): from i2p_netdb.floodfill import FloodfillManager from i2p_netdb.operations import RouterInfoCache, NetDBOperationManager ff_mgr = FloodfillManager() cache = RouterInfoCache() op_mgr = NetDBOperationManager(ff_mgr, cache) result = op_mgr.get_router_info(os.urandom(32)) assert result is None