"""Tests for NetDBHandler — wiring layer between DataStore and operations."""

import os

import pytest

# Length, in bytes, of every key generated by the helpers below.
_KEY_SIZE = 32


def _rand_key() -> bytes:
    """Return a fresh random key of ``_KEY_SIZE`` bytes from ``os.urandom``."""
    return os.urandom(_KEY_SIZE)


def _make_key(prefix_byte: int) -> bytes:
    """Return a deterministic key: ``prefix_byte`` followed by zero padding.

    The result is always ``_KEY_SIZE`` bytes long; only the first byte varies.
    """
    return bytes([prefix_byte]).ljust(_KEY_SIZE, b'\x00')


def _new_handler():
    """Import ``NetDBHandler`` lazily and return a new default instance.

    The import lives inside the function so it is executed when a test runs,
    not when this module is imported.
    """
    from i2p_netdb.netdb_handler import NetDBHandler

    return NetDBHandler()


class TestHandleStoreAndLookup:
    """Direct ``handle_store`` / ``handle_lookup`` calls on the handler."""

    def test_store_and_lookup_roundtrip(self):
        """A stored payload is returned unchanged by a lookup of its key."""
        netdb = _new_handler()
        entry_key = _rand_key()
        payload = b"router info payload"

        stored = netdb.handle_store(entry_key, payload)

        assert stored is True
        assert netdb.handle_lookup(entry_key) == payload

    def test_lookup_missing_key_returns_none(self):
        """Looking up a key that was never stored yields ``None``."""
        netdb = _new_handler()
        unknown_key = _rand_key()

        assert netdb.handle_lookup(unknown_key) is None


class TestSearchLifecycle:
    """Starting searches, feeding them replies, and reading their state."""

    def test_start_search_creates_operation(self):
        """A new search gets a 16-character string id and starts incomplete."""
        netdb = _new_handler()
        wanted = _rand_key()
        seed_peer = _rand_key()

        search_id = netdb.start_search(wanted, known_peers={seed_peer: b"addr"})

        assert isinstance(search_id, str)
        assert len(search_id) == 16  # 8 bytes -> 16 hex chars
        assert not netdb.is_search_complete(search_id)
        assert netdb.get_search_result(search_id) is None

    def test_on_search_reply_with_found_data_stores_it(self):
        """A reply carrying data completes the search and stores the data."""
        netdb = _new_handler()
        wanted = _rand_key()
        seed_peer = _rand_key()
        search_id = netdb.start_search(wanted, known_peers={seed_peer: b"addr"})
        payload = b"the target data"

        follow_ups = netdb.on_search_reply(
            search_id, seed_peer, found_data=payload
        )

        assert follow_ups == []
        assert netdb.is_search_complete(search_id)
        assert netdb.get_search_result(search_id) == payload
        # Data should also be in the datastore now
        assert netdb.handle_lookup(wanted) == payload

    def test_on_search_reply_with_closer_peers_returns_new_queries(self):
        """A peer suggested in a data-less reply appears in the new queries."""
        netdb = _new_handler()
        wanted = _make_key(0x00)
        first_peer = _make_key(0xFF)
        suggested_peer = _make_key(0x01)
        search_id = netdb.start_search(
            wanted, known_peers={first_peer: b"addr_a"}
        )

        follow_ups = netdb.on_search_reply(
            search_id,
            first_peer,
            found_data=None,
            closer_peers={suggested_peer: b"addr_b"},
        )

        assert suggested_peer in follow_ups

    def test_is_search_complete_after_finding_data(self):
        """Completion flips from false to true once a reply delivers data."""
        netdb = _new_handler()
        wanted = _rand_key()
        seed_peer = _rand_key()
        search_id = netdb.start_search(wanted, known_peers={seed_peer: b"addr"})

        assert not netdb.is_search_complete(search_id)

        netdb.on_search_reply(search_id, seed_peer, found_data=b"data")

        assert netdb.is_search_complete(search_id)

    def test_multiple_concurrent_searches(self):
        """Two searches on one handler keep separate ids, state and results."""
        netdb = _new_handler()

        first_target = _rand_key()
        first_peer = _rand_key()
        first_id = netdb.start_search(
            first_target, known_peers={first_peer: b"a1"}
        )

        second_target = _rand_key()
        second_peer = _rand_key()
        second_id = netdb.start_search(
            second_target, known_peers={second_peer: b"a2"}
        )

        assert first_id != second_id

        # Complete search 1
        netdb.on_search_reply(first_id, first_peer, found_data=b"result_1")
        assert netdb.is_search_complete(first_id)
        assert not netdb.is_search_complete(second_id)
        assert netdb.get_search_result(first_id) == b"result_1"
        assert netdb.get_search_result(second_id) is None

        # Complete search 2
        netdb.on_search_reply(second_id, second_peer, found_data=b"result_2")
        assert netdb.is_search_complete(second_id)
        assert netdb.get_search_result(second_id) == b"result_2"


class TestStoreOpLifecycle:
    """Starting store operations and acknowledging them peer by peer."""

    def test_start_store_op_and_on_store_ack(self):
        """With redundancy=3 the store completes only on the third ack."""
        netdb = _new_handler()
        entry_key = _rand_key()
        payload = b"store me"
        candidates = [_rand_key() for _ in range(5)]

        store_id = netdb.start_store_op(
            entry_key, payload, target_peers=candidates, redundancy=3
        )

        assert isinstance(store_id, str)
        assert len(store_id) == 16
        assert not netdb.is_store_complete(store_id)

        # Two acks are still one short of the requested redundancy.
        for acking_peer in candidates[:2]:
            netdb.on_store_ack(store_id, acking_peer)
        assert not netdb.is_store_complete(store_id)

        netdb.on_store_ack(store_id, candidates[2])
        assert netdb.is_store_complete(store_id)

    def test_is_store_complete_after_enough_acks(self):
        """With redundancy=2, acks from the first and last peer complete it."""
        netdb = _new_handler()
        entry_key = _rand_key()
        candidates = [_rand_key() for _ in range(4)]

        store_id = netdb.start_store_op(
            entry_key, b"data", target_peers=candidates, redundancy=2
        )

        netdb.on_store_ack(store_id, candidates[0])
        assert not netdb.is_store_complete(store_id)

        netdb.on_store_ack(store_id, candidates[3])
        assert netdb.is_store_complete(store_id)