# A Python port of the Invisible Internet Project (I2P)
# (code-viewer header: branch "main", 140 lines, 5.2 kB)
"""Tests for NetDBHandler — wiring layer between DataStore and operations."""

import os

import pytest


@pytest.fixture
def handler():
    """Provide a freshly constructed ``NetDBHandler`` to each test.

    The import happens at fixture setup rather than at module import time,
    so collecting this module does not require ``i2p_netdb`` to be
    importable; a missing package surfaces when a test requests the fixture.
    """
    from i2p_netdb.netdb_handler import NetDBHandler

    return NetDBHandler()


def _rand_key() -> bytes:
    """Return 32 random bytes to use as a key or peer identifier."""
    return os.urandom(32)


def _make_key(prefix_byte: int) -> bytes:
    """Return a deterministic 32-byte key: ``prefix_byte`` then 31 zero bytes."""
    return bytes([prefix_byte]) + b'\x00' * 31


class TestHandleStoreAndLookup:
    """Direct store / lookup calls on the handler."""

    def test_store_and_lookup_roundtrip(self, handler) -> None:
        """A stored value is returned unchanged by a lookup of the same key."""
        key = _rand_key()
        data = b"router info payload"
        assert handler.handle_store(key, data) is True
        assert handler.handle_lookup(key) == data

    def test_lookup_missing_key_returns_none(self, handler) -> None:
        """Looking up a key that was never stored yields ``None``."""
        assert handler.handle_lookup(_rand_key()) is None


class TestSearchLifecycle:
    """Search operations: start, replies, completion and results."""

    def test_start_search_creates_operation(self, handler) -> None:
        """A new search gets a 16-char string id and starts out incomplete."""
        target = _rand_key()
        peer = _rand_key()
        search_id = handler.start_search(target, known_peers={peer: b"addr"})
        assert isinstance(search_id, str)
        assert len(search_id) == 16  # 8 bytes -> 16 hex chars
        assert not handler.is_search_complete(search_id)
        assert handler.get_search_result(search_id) is None

    def test_on_search_reply_with_found_data_stores_it(self, handler) -> None:
        """A reply carrying data completes the search and stores the data."""
        target = _rand_key()
        peer = _rand_key()
        search_id = handler.start_search(target, known_peers={peer: b"addr"})

        found_data = b"the target data"
        new_queries = handler.on_search_reply(
            search_id, peer, found_data=found_data
        )
        assert new_queries == []
        assert handler.is_search_complete(search_id)
        assert handler.get_search_result(search_id) == found_data
        # Data should also be in the datastore now
        assert handler.handle_lookup(target) == found_data

    def test_on_search_reply_with_closer_peers_returns_new_queries(
        self, handler
    ) -> None:
        """A reply without data but with new peers yields follow-up queries."""
        # Fixed keys make the relationship reproducible: peer_b's first byte
        # (0x01) is much nearer to the target's (0x00) than peer_a's (0xFF).
        # NOTE(review): presumably that makes peer_b "closer" under the
        # handler's distance metric, which is not visible in this file.
        target = _make_key(0x00)
        peer_a = _make_key(0xFF)
        peer_b = _make_key(0x01)
        search_id = handler.start_search(
            target, known_peers={peer_a: b"addr_a"}
        )

        new_queries = handler.on_search_reply(
            search_id,
            peer_a,
            found_data=None,
            closer_peers={peer_b: b"addr_b"},
        )
        assert peer_b in new_queries

    def test_is_search_complete_after_finding_data(self, handler) -> None:
        """Completion flips from false to true once a reply carries data."""
        target = _rand_key()
        peer = _rand_key()
        search_id = handler.start_search(target, known_peers={peer: b"addr"})
        assert not handler.is_search_complete(search_id)

        handler.on_search_reply(search_id, peer, found_data=b"data")
        assert handler.is_search_complete(search_id)

    def test_multiple_concurrent_searches(self, handler) -> None:
        """Two searches on one handler keep separate ids, state and results."""
        target_1 = _rand_key()
        peer_1 = _rand_key()
        sid_1 = handler.start_search(target_1, known_peers={peer_1: b"a1"})

        target_2 = _rand_key()
        peer_2 = _rand_key()
        sid_2 = handler.start_search(target_2, known_peers={peer_2: b"a2"})

        assert sid_1 != sid_2

        # Complete search 1
        handler.on_search_reply(sid_1, peer_1, found_data=b"result_1")
        assert handler.is_search_complete(sid_1)
        assert not handler.is_search_complete(sid_2)
        assert handler.get_search_result(sid_1) == b"result_1"
        assert handler.get_search_result(sid_2) is None

        # Complete search 2
        handler.on_search_reply(sid_2, peer_2, found_data=b"result_2")
        assert handler.is_search_complete(sid_2)
        assert handler.get_search_result(sid_2) == b"result_2"


class TestStoreOpLifecycle:
    """Store operations: completion is driven by peer acknowledgements."""

    def test_start_store_op_and_on_store_ack(self, handler) -> None:
        """With redundancy=3 the store completes on the third ack, not before."""
        key = _rand_key()
        data = b"store me"
        peers = [_rand_key() for _ in range(5)]
        store_id = handler.start_store_op(
            key, data, target_peers=peers, redundancy=3
        )
        assert isinstance(store_id, str)
        assert len(store_id) == 16
        assert not handler.is_store_complete(store_id)

        handler.on_store_ack(store_id, peers[0])
        handler.on_store_ack(store_id, peers[1])
        assert not handler.is_store_complete(store_id)

        handler.on_store_ack(store_id, peers[2])
        assert handler.is_store_complete(store_id)

    def test_is_store_complete_after_enough_acks(self, handler) -> None:
        """With redundancy=2, acks from any two target peers complete the store."""
        key = _rand_key()
        peers = [_rand_key() for _ in range(4)]
        store_id = handler.start_store_op(
            key, b"data", target_peers=peers, redundancy=2
        )

        handler.on_store_ack(store_id, peers[0])
        assert not handler.is_store_complete(store_id)

        # The second ack comes from the last peer rather than the next one,
        # so completion here does not depend on acks arriving in list order.
        handler.on_store_ack(store_id, peers[3])
        assert handler.is_store_complete(store_id)