A Python port of the Invisible Internet Project (I2P)
1"""Tests for NetDBHandler — wiring layer between DataStore and operations."""
2
3import os
4
5import pytest
6
7
8def _rand_key() -> bytes:
9 return os.urandom(32)
10
11
12def _make_key(prefix_byte: int) -> bytes:
13 return bytes([prefix_byte]) + b'\x00' * 31
14
15
16class TestHandleStoreAndLookup:
17 def test_store_and_lookup_roundtrip(self):
18 from i2p_netdb.netdb_handler import NetDBHandler
19 handler = NetDBHandler()
20 key = _rand_key()
21 data = b"router info payload"
22 assert handler.handle_store(key, data) is True
23 assert handler.handle_lookup(key) == data
24
25 def test_lookup_missing_key_returns_none(self):
26 from i2p_netdb.netdb_handler import NetDBHandler
27 handler = NetDBHandler()
28 assert handler.handle_lookup(_rand_key()) is None
29
30
31class TestSearchLifecycle:
32 def test_start_search_creates_operation(self):
33 from i2p_netdb.netdb_handler import NetDBHandler
34 handler = NetDBHandler()
35 target = _rand_key()
36 peer = _rand_key()
37 search_id = handler.start_search(target, known_peers={peer: b"addr"})
38 assert isinstance(search_id, str)
39 assert len(search_id) == 16 # 8 bytes -> 16 hex chars
40 assert not handler.is_search_complete(search_id)
41 assert handler.get_search_result(search_id) is None
42
43 def test_on_search_reply_with_found_data_stores_it(self):
44 from i2p_netdb.netdb_handler import NetDBHandler
45 handler = NetDBHandler()
46 target = _rand_key()
47 peer = _rand_key()
48 search_id = handler.start_search(target, known_peers={peer: b"addr"})
49
50 found_data = b"the target data"
51 new_queries = handler.on_search_reply(search_id, peer, found_data=found_data)
52 assert new_queries == []
53 assert handler.is_search_complete(search_id)
54 assert handler.get_search_result(search_id) == found_data
55 # Data should also be in the datastore now
56 assert handler.handle_lookup(target) == found_data
57
58 def test_on_search_reply_with_closer_peers_returns_new_queries(self):
59 from i2p_netdb.netdb_handler import NetDBHandler
60 handler = NetDBHandler()
61 target = _make_key(0x00)
62 peer_a = _make_key(0xFF)
63 peer_b = _make_key(0x01)
64 search_id = handler.start_search(target, known_peers={peer_a: b"addr_a"})
65
66 new_queries = handler.on_search_reply(
67 search_id, peer_a, found_data=None,
68 closer_peers={peer_b: b"addr_b"}
69 )
70 assert peer_b in new_queries
71
72 def test_is_search_complete_after_finding_data(self):
73 from i2p_netdb.netdb_handler import NetDBHandler
74 handler = NetDBHandler()
75 target = _rand_key()
76 peer = _rand_key()
77 search_id = handler.start_search(target, known_peers={peer: b"addr"})
78 assert not handler.is_search_complete(search_id)
79
80 handler.on_search_reply(search_id, peer, found_data=b"data")
81 assert handler.is_search_complete(search_id)
82
83 def test_multiple_concurrent_searches(self):
84 from i2p_netdb.netdb_handler import NetDBHandler
85 handler = NetDBHandler()
86
87 target_1 = _rand_key()
88 peer_1 = _rand_key()
89 sid_1 = handler.start_search(target_1, known_peers={peer_1: b"a1"})
90
91 target_2 = _rand_key()
92 peer_2 = _rand_key()
93 sid_2 = handler.start_search(target_2, known_peers={peer_2: b"a2"})
94
95 assert sid_1 != sid_2
96
97 # Complete search 1
98 handler.on_search_reply(sid_1, peer_1, found_data=b"result_1")
99 assert handler.is_search_complete(sid_1)
100 assert not handler.is_search_complete(sid_2)
101 assert handler.get_search_result(sid_1) == b"result_1"
102 assert handler.get_search_result(sid_2) is None
103
104 # Complete search 2
105 handler.on_search_reply(sid_2, peer_2, found_data=b"result_2")
106 assert handler.is_search_complete(sid_2)
107 assert handler.get_search_result(sid_2) == b"result_2"
108
109
110class TestStoreOpLifecycle:
111 def test_start_store_op_and_on_store_ack(self):
112 from i2p_netdb.netdb_handler import NetDBHandler
113 handler = NetDBHandler()
114 key = _rand_key()
115 data = b"store me"
116 peers = [_rand_key() for _ in range(5)]
117 store_id = handler.start_store_op(key, data, target_peers=peers, redundancy=3)
118 assert isinstance(store_id, str)
119 assert len(store_id) == 16
120 assert not handler.is_store_complete(store_id)
121
122 handler.on_store_ack(store_id, peers[0])
123 handler.on_store_ack(store_id, peers[1])
124 assert not handler.is_store_complete(store_id)
125
126 handler.on_store_ack(store_id, peers[2])
127 assert handler.is_store_complete(store_id)
128
129 def test_is_store_complete_after_enough_acks(self):
130 from i2p_netdb.netdb_handler import NetDBHandler
131 handler = NetDBHandler()
132 key = _rand_key()
133 peers = [_rand_key() for _ in range(4)]
134 store_id = handler.start_store_op(key, b"data", target_peers=peers, redundancy=2)
135
136 handler.on_store_ack(store_id, peers[0])
137 assert not handler.is_store_complete(store_id)
138
139 handler.on_store_ack(store_id, peers[3])
140 assert handler.is_store_complete(store_id)