# A Python port of the Invisible Internet Project (I2P)
"""Tests for NetDB Kademlia lookup/store operations and enhanced NetDB operations."""
2
import os
import time

import pytest
7
8
def _make_key(prefix_byte: int) -> bytes:
    """Return a deterministic 32-byte key whose first byte is ``prefix_byte``.

    The remaining 31 bytes are zero, so the XOR of two keys built this way
    differs only in the first byte. Tests use this to place peers at known
    distances from a target.

    Raises:
        ValueError: if ``prefix_byte`` is outside ``range(256)``.
    """
    return bytes([prefix_byte]) + bytes(31)
12
13
class TestSearchOperation:
    """Tests for ``SearchOperation``: start, reply handling and completion."""

    def test_finds_data_at_known_peer(self):
        """A data-bearing reply from the only known peer completes the search."""
        from i2p_netdb.operations import SearchOperation

        target = os.urandom(32)
        peer_hash = os.urandom(32)
        op = SearchOperation(target_key=target,
                             known_peers={peer_hash: b"peer_addr"})

        first_round = op.start()
        assert peer_hash in first_round

        found_data = b"the data we wanted"
        next_peers = op.on_reply(peer_hash, found_data=found_data)
        assert op.is_complete()
        assert op.get_result() == found_data
        # Nothing further to query once the data has been found.
        assert next_peers == []

    def test_iterative_discovery(self):
        """A peer without the data can point the search at a closer peer."""
        from i2p_netdb.operations import SearchOperation

        target = _make_key(0x00)
        peer_a = _make_key(0xFF)  # distant peer, the only one known at start
        peer_b = _make_key(0x01)  # closer peer, learned from peer_a's reply

        op = SearchOperation(target_key=target, known_peers={peer_a: b"addr_a"})
        first_round = op.start()
        assert peer_a in first_round

        # peer_a has no data but refers us to peer_b.
        next_peers = op.on_reply(peer_a, found_data=None,
                                 closer_peers={peer_b: b"addr_b"})
        assert peer_b in next_peers
        assert peer_a not in next_peers  # already queried

        # peer_b holds the data.
        op.on_reply(peer_b, found_data=b"target_data")
        assert op.is_complete()
        assert op.get_result() == b"target_data"

    def test_exhausted_search_returns_none(self):
        """When every known peer replies without data the result is ``None``."""
        from i2p_netdb.operations import SearchOperation

        target = os.urandom(32)
        peer_a = os.urandom(32)
        peer_b = os.urandom(32)
        op = SearchOperation(target_key=target,
                             known_peers={peer_a: b"a", peer_b: b"b"})
        op.start()

        for peer in (peer_a, peer_b):
            op.on_reply(peer, found_data=None)

        assert op.is_complete()
        assert op.get_result() is None

    def test_xor_distance_ordering(self):
        """``start()`` with ``k=2`` returns only the two XOR-closest peers."""
        from i2p_netdb.operations import SearchOperation

        target = _make_key(0x00)
        close_peer = _make_key(0x01)  # first byte differs by 1
        mid_peer = _make_key(0x0F)    # first byte differs by 15
        far_peer = _make_key(0xFF)    # first byte differs by 255

        # Inserted out of distance order on purpose.
        known_peers = {
            far_peer: b"far",
            close_peer: b"close",
            mid_peer: b"mid",
        }
        op = SearchOperation(target_key=target, known_peers=known_peers, k=2)
        to_query = op.start()

        assert len(to_query) == 2
        assert close_peer in to_query
        assert mid_peer in to_query
        assert far_peer not in to_query

    def test_already_queried_peers_not_requeried(self):
        """A peer that has already replied is never offered for querying again."""
        from i2p_netdb.operations import SearchOperation

        target = os.urandom(32)
        peer_a = os.urandom(32)
        peer_b = os.urandom(32)
        op = SearchOperation(target_key=target,
                             known_peers={peer_a: b"a", peer_b: b"b"})
        op.start()

        # peer_a replies and names peer_b (already known) as closer.
        next_peers = op.on_reply(peer_a, found_data=None,
                                 closer_peers={peer_b: b"b"})
        # NOTE(review): whether peer_b is re-offered here is deliberately not
        # asserted; only the "no re-query of peer_a" guarantee is checked.
        assert peer_a not in next_peers
        assert peer_a in op.get_queried()

        # peer_b replies and names peer_a again.
        next_peers = op.on_reply(peer_b, found_data=None,
                                 closer_peers={peer_a: b"a"})
        assert peer_a not in next_peers  # already queried
        assert peer_b in op.get_queried()

    def test_is_complete_when_found(self):
        """``is_complete()`` is false until a reply carries data."""
        from i2p_netdb.operations import SearchOperation

        target = os.urandom(32)
        peer = os.urandom(32)
        op = SearchOperation(target_key=target, known_peers={peer: b"p"})

        assert not op.is_complete()
        op.start()
        assert not op.is_complete()
        op.on_reply(peer, found_data=b"data")
        assert op.is_complete()

    def test_is_complete_when_exhausted(self):
        """``is_complete()`` turns true once the last peer replies without data."""
        from i2p_netdb.operations import SearchOperation

        target = os.urandom(32)
        peer = os.urandom(32)
        op = SearchOperation(target_key=target, known_peers={peer: b"p"})

        op.start()
        assert not op.is_complete()
        op.on_reply(peer, found_data=None)
        assert op.is_complete()
129
130
class TestStoreOperation:
    """Tests for ``StoreOperation``: fan-out tuples, acks and completion."""

    def test_start_returns_correct_tuples(self):
        """``start()`` yields one ``(peer, key, data)`` tuple per target peer."""
        from i2p_netdb.operations import StoreOperation

        key = os.urandom(32)
        data = b"store this data"
        peers = [os.urandom(32) for _ in range(3)]
        op = StoreOperation(key=key, data=data, target_peers=peers)

        tuples = op.start()
        assert len(tuples) == 3
        for peer_hash, t_key, t_data in tuples:
            assert peer_hash in peers
            assert t_key == key
            assert t_data == data

    def test_on_ack_tracks_confirmations(self):
        """An ack moves exactly that peer into the confirmed set."""
        from i2p_netdb.operations import StoreOperation

        key = os.urandom(32)
        peers = [os.urandom(32) for _ in range(3)]
        op = StoreOperation(key=key, data=b"data", target_peers=peers)
        op.start()

        assert len(op.get_confirmed_peers()) == 0
        op.on_ack(peers[0])
        assert peers[0] in op.get_confirmed_peers()
        assert len(op.get_confirmed_peers()) == 1

    def test_is_complete_after_enough_acks(self):
        """With ``redundancy=3`` the third ack completes the operation."""
        from i2p_netdb.operations import StoreOperation

        key = os.urandom(32)
        peers = [os.urandom(32) for _ in range(5)]
        op = StoreOperation(key=key, data=b"data", target_peers=peers,
                            redundancy=3)
        op.start()

        op.on_ack(peers[0])
        op.on_ack(peers[1])
        assert not op.is_complete()
        op.on_ack(peers[2])
        assert op.is_complete()

    def test_not_complete_before_enough_acks(self):
        """With ``redundancy=3`` one or two acks are not enough."""
        from i2p_netdb.operations import StoreOperation

        key = os.urandom(32)
        peers = [os.urandom(32) for _ in range(5)]
        op = StoreOperation(key=key, data=b"data", target_peers=peers,
                            redundancy=3)
        op.start()

        # Check after each of the first two acks.
        for acked_peer in peers[:2]:
            op.on_ack(acked_peer)
            assert not op.is_complete()

    def test_confirmed_and_pending_peers(self):
        """Confirmed and pending peers together account for every target."""
        from i2p_netdb.operations import StoreOperation

        key = os.urandom(32)
        peers = [os.urandom(32) for _ in range(4)]
        op = StoreOperation(key=key, data=b"data", target_peers=peers,
                            redundancy=2)
        op.start()
        op.on_ack(peers[0])
        op.on_ack(peers[2])

        confirmed = op.get_confirmed_peers()
        pending = op.get_pending_peers()
        assert peers[0] in confirmed
        assert peers[2] in confirmed
        assert peers[1] in pending
        assert peers[3] in pending
        assert len(confirmed) + len(pending) == len(peers)
197
198
# ---------------------------------------------------------------------------
# Enhanced NetDB operations: SearchJob, StoreJob, RouterInfoCache, NetDBOperationManager
# ---------------------------------------------------------------------------
202
203
class TestSearchJob:
    """Tests for the search job created by ``NetDBOperationManager.start_lookup``."""

    @staticmethod
    def _new_manager():
        """Return a ``NetDBOperationManager`` over a fresh floodfill manager and cache."""
        from i2p_netdb.floodfill import FloodfillManager
        from i2p_netdb.operations import NetDBOperationManager, RouterInfoCache

        return NetDBOperationManager(FloodfillManager(), RouterInfoCache())

    def test_start_lookup_creates_search_job(self):
        """A new lookup job carries the target, an id, a start time and no replies."""
        op_mgr = self._new_manager()

        target = os.urandom(32)
        job = op_mgr.start_lookup(target)

        assert job.target_hash == target
        assert job.search_id  # non-empty
        assert not job.complete
        assert job.started_at > 0
        assert len(job.replies) == 0

    def test_process_search_reply(self):
        """A reply routed by search id is appended to the job's replies."""
        op_mgr = self._new_manager()

        target = os.urandom(32)
        job = op_mgr.start_lookup(target)
        from_peer = os.urandom(32)
        reply_data = {"router_info": b"some_data", "closer_peers": []}

        op_mgr.on_search_reply(job.search_id, from_peer, reply_data)

        assert len(job.replies) == 1
        assert job.replies[0] == reply_data
236
237
class TestSearchJobTimeout:
    """Tests for stale-lookup cleanup in ``NetDBOperationManager``."""

    def test_lookup_timeout_cleanup(self):
        """A lookup older than ``max_age_seconds`` is dropped by ``cleanup_stale``."""
        from i2p_netdb.floodfill import FloodfillManager
        from i2p_netdb.operations import NetDBOperationManager, RouterInfoCache

        op_mgr = NetDBOperationManager(FloodfillManager(), RouterInfoCache())

        target = os.urandom(32)
        job = op_mgr.start_lookup(target)

        # Backdate the job to 60 s ago so it exceeds the 30 s limit below.
        # NOTE(review): assumes started_at is on the time.monotonic() clock — confirm.
        job.started_at = time.monotonic() - 60.0

        cleaned = op_mgr.cleanup_stale(max_age_seconds=30.0)
        assert cleaned >= 1
        # The search should no longer be tracked (checks the private registry).
        assert job.search_id not in op_mgr._searches
255
256
class TestRouterInfoCache:
    """Tests for ``RouterInfoCache``: put/get, expiry and cleanup."""

    def test_get_put(self):
        """A stored entry is returned unchanged by ``get``."""
        from i2p_netdb.operations import RouterInfoCache

        cache = RouterInfoCache(ttl_seconds=3600.0)
        peer_hash = os.urandom(32)
        info = {"caps": "f", "version": "0.9.62"}

        cache.put(peer_hash, info)
        assert cache.get(peer_hash) == info

    def test_get_missing_returns_none(self):
        """Looking up a hash that was never stored returns ``None``."""
        from i2p_netdb.operations import RouterInfoCache

        cache = RouterInfoCache()
        assert cache.get(os.urandom(32)) is None

    def test_expiration(self):
        """With a zero TTL an entry is reported expired right after ``put``."""
        from i2p_netdb.operations import RouterInfoCache

        cache = RouterInfoCache(ttl_seconds=0.0)  # immediate expiration
        peer_hash = os.urandom(32)
        cache.put(peer_hash, {"test": True})

        # With TTL=0, the entry should be expired immediately (or nearly).
        assert cache.is_expired(peer_hash)

    def test_cleanup_expired(self):
        """``cleanup_expired`` reports every zero-TTL entry as removed."""
        from i2p_netdb.operations import RouterInfoCache

        cache = RouterInfoCache(ttl_seconds=0.0)
        entry_count = 5
        for _ in range(entry_count):
            cache.put(os.urandom(32), {"x": 1})

        assert cache.cleanup_expired() == entry_count

    def test_non_expired_not_cleaned(self):
        """An entry within its TTL survives ``cleanup_expired``."""
        from i2p_netdb.operations import RouterInfoCache

        cache = RouterInfoCache(ttl_seconds=3600.0)
        peer_hash = os.urandom(32)
        cache.put(peer_hash, {"alive": True})

        assert not cache.is_expired(peer_hash)
        removed = cache.cleanup_expired()
        assert removed == 0
        assert cache.get(peer_hash) is not None
302
303
class TestStoreJobEnhanced:
    """Tests for the store job created by ``NetDBOperationManager.start_store``."""

    @staticmethod
    def _new_manager():
        """Return a ``NetDBOperationManager`` over a fresh floodfill manager and cache."""
        from i2p_netdb.floodfill import FloodfillManager
        from i2p_netdb.operations import NetDBOperationManager, RouterInfoCache

        return NetDBOperationManager(FloodfillManager(), RouterInfoCache())

    def test_store_operation_creates_store_job(self):
        """A new store job carries the hash, the payload, an id and is un-acked."""
        op_mgr = self._new_manager()

        data_hash = os.urandom(32)
        data = b"router_info_bytes"
        job = op_mgr.start_store(data_hash, data)

        assert job.data_hash == data_hash
        assert job.data == data
        assert job.store_id  # non-empty
        assert not job.acked
        assert job.started_at > 0

    def test_store_ack(self):
        """An ack routed by store id marks the job as acked."""
        op_mgr = self._new_manager()

        data_hash = os.urandom(32)
        job = op_mgr.start_store(data_hash, b"data")
        from_peer = os.urandom(32)

        op_mgr.on_store_ack(job.store_id, from_peer)
        assert job.acked
335
336
class TestGetRouterInfoFromCache:
    """Tests for ``NetDBOperationManager.get_router_info`` against its cache."""

    @staticmethod
    def _new_manager_and_cache():
        """Return ``(op_mgr, cache)``, where ``cache`` is the one given to ``op_mgr``."""
        from i2p_netdb.floodfill import FloodfillManager
        from i2p_netdb.operations import NetDBOperationManager, RouterInfoCache

        ff_mgr = FloodfillManager()
        cache = RouterInfoCache()
        return NetDBOperationManager(ff_mgr, cache), cache

    def test_get_router_info_cache_hit(self):
        """An entry put into the cache is returned by the manager."""
        op_mgr, cache = self._new_manager_and_cache()

        peer_hash = os.urandom(32)
        info = {"caps": "fR", "version": "0.9.62"}
        cache.put(peer_hash, info)

        assert op_mgr.get_router_info(peer_hash) == info

    def test_get_router_info_cache_miss(self):
        """A hash absent from the cache yields ``None``."""
        op_mgr, _cache = self._new_manager_and_cache()

        assert op_mgr.get_router_info(os.urandom(32)) is None