# A Python port of the Invisible Internet Project (I2P)
# at main · 361 lines · 13 kB · view raw
"""Tests for NetDB Kademlia lookup/store operations and enhanced NetDB operations."""

import os
import time

import pytest


def _make_key(prefix_byte: int) -> bytes:
    """Create a deterministic 32-byte key with a known prefix for XOR distance testing.

    Args:
        prefix_byte: Value (0-255) of the first byte; the remaining 31 bytes are zero.

    Returns:
        A 32-byte key whose first byte is ``prefix_byte``.
    """
    return bytes([prefix_byte]) + b'\x00' * 31


def _make_op_manager():
    """Build a ``NetDBOperationManager`` backed by a fresh floodfill manager and cache.

    The project imports are done locally, like in every test of this module, so a
    missing ``i2p_netdb`` package fails the individual test instead of collection.

    Returns:
        An ``(op_mgr, cache)`` tuple; ``cache`` is the same ``RouterInfoCache``
        instance that was handed to the manager.
    """
    from i2p_netdb.floodfill import FloodfillManager
    from i2p_netdb.operations import RouterInfoCache, NetDBOperationManager

    ff_mgr = FloodfillManager()
    cache = RouterInfoCache()
    return NetDBOperationManager(ff_mgr, cache), cache


class TestSearchOperation:
    """Tests for ``SearchOperation``: start, replies, completion and peer ordering."""

    def test_finds_data_at_known_peer(self) -> None:
        """A reply carrying data completes the search and yields no further peers."""
        from i2p_netdb.operations import SearchOperation
        target = os.urandom(32)
        peer_hash = os.urandom(32)
        known_peers = {peer_hash: b"peer_addr"}
        op = SearchOperation(target_key=target, known_peers=known_peers)
        to_query = op.start()
        assert peer_hash in to_query

        found_data = b"the data we wanted"
        next_peers = op.on_reply(peer_hash, found_data=found_data)
        assert op.is_complete()
        assert op.get_result() == found_data
        assert next_peers == []

    def test_iterative_discovery(self) -> None:
        """A peer without the data can refer the search to a closer peer that has it."""
        from i2p_netdb.operations import SearchOperation
        target = _make_key(0x00)
        # Start with a distant peer
        peer_a = _make_key(0xFF)
        # peer_a tells us about peer_b, which is closer
        peer_b = _make_key(0x01)
        # peer_b has the data
        known_peers = {peer_a: b"addr_a"}
        op = SearchOperation(target_key=target, known_peers=known_peers)
        to_query = op.start()
        assert peer_a in to_query

        # peer_a doesn't have data, but knows peer_b
        next_peers = op.on_reply(peer_a, found_data=None,
                                 closer_peers={peer_b: b"addr_b"})
        assert peer_b in next_peers
        assert peer_a not in next_peers  # already queried

        # peer_b has the data
        op.on_reply(peer_b, found_data=b"target_data")
        assert op.is_complete()
        assert op.get_result() == b"target_data"

    def test_exhausted_search_returns_none(self) -> None:
        """When every known peer replies without data the result is ``None``."""
        from i2p_netdb.operations import SearchOperation
        target = os.urandom(32)
        peer_a = os.urandom(32)
        peer_b = os.urandom(32)
        op = SearchOperation(target_key=target,
                             known_peers={peer_a: b"a", peer_b: b"b"})
        op.start()
        op.on_reply(peer_a, found_data=None)
        op.on_reply(peer_b, found_data=None)
        assert op.is_complete()
        assert op.get_result() is None

    def test_xor_distance_ordering(self) -> None:
        """With ``k=2`` only the two peers closest to the target are returned."""
        from i2p_netdb.operations import SearchOperation
        target = _make_key(0x00)
        # Create peers at known distances
        close_peer = _make_key(0x01)  # distance 1
        mid_peer = _make_key(0x0F)    # distance 15
        far_peer = _make_key(0xFF)    # distance 255
        known_peers = {
            far_peer: b"far",
            close_peer: b"close",
            mid_peer: b"mid",
        }
        op = SearchOperation(target_key=target, known_peers=known_peers, k=2)
        to_query = op.start()
        # Should return the k=2 closest peers
        assert len(to_query) == 2
        assert close_peer in to_query
        assert mid_peer in to_query
        assert far_peer not in to_query

    def test_already_queried_peers_not_requeried(self) -> None:
        """A peer that has already replied is never handed back for querying."""
        from i2p_netdb.operations import SearchOperation
        target = os.urandom(32)
        peer_a = os.urandom(32)
        peer_b = os.urandom(32)
        op = SearchOperation(target_key=target,
                             known_peers={peer_a: b"a", peer_b: b"b"})
        op.start()
        # Query peer_a, it returns peer_b (already known) as closer
        next_peers = op.on_reply(peer_a, found_data=None,
                                 closer_peers={peer_b: b"b"})
        # peer_b should appear since not yet queried
        # But peer_a should not reappear
        assert peer_a not in next_peers
        assert peer_a in op.get_queried()

        # Now query peer_b, it returns peer_a again
        next_peers = op.on_reply(peer_b, found_data=None,
                                 closer_peers={peer_a: b"a"})
        assert peer_a not in next_peers  # already queried
        assert peer_b in op.get_queried()

    def test_is_complete_when_found(self) -> None:
        """The search is incomplete before and after ``start`` and complete once data arrives."""
        from i2p_netdb.operations import SearchOperation
        target = os.urandom(32)
        peer = os.urandom(32)
        op = SearchOperation(target_key=target, known_peers={peer: b"p"})
        assert not op.is_complete()
        op.start()
        assert not op.is_complete()
        op.on_reply(peer, found_data=b"data")
        assert op.is_complete()

    def test_is_complete_when_exhausted(self) -> None:
        """The search completes once its only peer has replied without data."""
        from i2p_netdb.operations import SearchOperation
        target = os.urandom(32)
        peer = os.urandom(32)
        op = SearchOperation(target_key=target, known_peers={peer: b"p"})
        op.start()
        assert not op.is_complete()
        op.on_reply(peer, found_data=None)
        assert op.is_complete()


class TestStoreOperation:
    """Tests for ``StoreOperation``: dispatch tuples, ack tracking and redundancy."""

    def test_start_returns_correct_tuples(self) -> None:
        """``start`` yields one ``(peer, key, data)`` tuple per target peer."""
        from i2p_netdb.operations import StoreOperation
        key = os.urandom(32)
        data = b"store this data"
        peers = [os.urandom(32) for _ in range(3)]
        op = StoreOperation(key=key, data=data, target_peers=peers)
        tuples = op.start()
        assert len(tuples) == 3
        for peer_hash, t_key, t_data in tuples:
            assert peer_hash in peers
            assert t_key == key
            assert t_data == data

    def test_on_ack_tracks_confirmations(self) -> None:
        """An ack moves exactly that peer into the confirmed set."""
        from i2p_netdb.operations import StoreOperation
        key = os.urandom(32)
        peers = [os.urandom(32) for _ in range(3)]
        op = StoreOperation(key=key, data=b"data", target_peers=peers)
        op.start()
        assert len(op.get_confirmed_peers()) == 0
        op.on_ack(peers[0])
        assert peers[0] in op.get_confirmed_peers()
        assert len(op.get_confirmed_peers()) == 1

    def test_is_complete_after_enough_acks(self) -> None:
        """The store completes on the ack that reaches ``redundancy``."""
        from i2p_netdb.operations import StoreOperation
        key = os.urandom(32)
        peers = [os.urandom(32) for _ in range(5)]
        op = StoreOperation(key=key, data=b"data", target_peers=peers,
                            redundancy=3)
        op.start()
        op.on_ack(peers[0])
        op.on_ack(peers[1])
        assert not op.is_complete()
        op.on_ack(peers[2])
        assert op.is_complete()

    def test_not_complete_before_enough_acks(self) -> None:
        """Fewer acks than ``redundancy`` leave the store incomplete."""
        from i2p_netdb.operations import StoreOperation
        key = os.urandom(32)
        peers = [os.urandom(32) for _ in range(5)]
        op = StoreOperation(key=key, data=b"data", target_peers=peers,
                            redundancy=3)
        op.start()
        op.on_ack(peers[0])
        assert not op.is_complete()
        op.on_ack(peers[1])
        assert not op.is_complete()

    def test_confirmed_and_pending_peers(self) -> None:
        """Confirmed and pending peers together account for every target peer."""
        from i2p_netdb.operations import StoreOperation
        key = os.urandom(32)
        peers = [os.urandom(32) for _ in range(4)]
        op = StoreOperation(key=key, data=b"data", target_peers=peers,
                            redundancy=2)
        op.start()
        op.on_ack(peers[0])
        op.on_ack(peers[2])
        confirmed = op.get_confirmed_peers()
        pending = op.get_pending_peers()
        assert peers[0] in confirmed
        assert peers[2] in confirmed
        assert peers[1] in pending
        assert peers[3] in pending
        assert len(confirmed) + len(pending) == len(peers)


# ---------------------------------------------------------------------------
# Enhanced NetDB operations: SearchJob, StoreJob, RouterInfoCache, NetDBOperationManager
# ---------------------------------------------------------------------------


class TestSearchJob:
    """Tests for search jobs created through ``NetDBOperationManager.start_lookup``."""

    def test_start_lookup_creates_search_job(self) -> None:
        """A new lookup job records its target, has an id, and starts with no replies."""
        op_mgr, _cache = _make_op_manager()

        target = os.urandom(32)
        job = op_mgr.start_lookup(target)
        assert job.target_hash == target
        assert job.search_id  # non-empty
        assert not job.complete
        assert job.started_at > 0
        assert len(job.replies) == 0

    def test_process_search_reply(self) -> None:
        """``on_search_reply`` appends the reply payload to the matching job."""
        op_mgr, _cache = _make_op_manager()

        target = os.urandom(32)
        job = op_mgr.start_lookup(target)
        from_peer = os.urandom(32)
        reply_data = {"router_info": b"some_data", "closer_peers": []}
        op_mgr.on_search_reply(job.search_id, from_peer, reply_data)
        assert len(job.replies) == 1
        assert job.replies[0] == reply_data


class TestSearchJobTimeout:
    """Tests for stale-search cleanup in ``NetDBOperationManager``."""

    def test_lookup_timeout_cleanup(self) -> None:
        """A job older than ``max_age_seconds`` is dropped by ``cleanup_stale``."""
        # SearchJob is not used below; the import is kept from the original so the
        # test still fails if the name is missing from i2p_netdb.operations.
        from i2p_netdb.operations import SearchJob  # noqa: F401

        op_mgr, _cache = _make_op_manager()

        target = os.urandom(32)
        job = op_mgr.start_lookup(target)
        # Artificially age the job
        job.started_at = time.monotonic() - 60.0
        cleaned = op_mgr.cleanup_stale(max_age_seconds=30.0)
        assert cleaned >= 1
        # The search should no longer be tracked
        assert job.search_id not in op_mgr._searches


class TestRouterInfoCache:
    """Tests for ``RouterInfoCache``: get/put, expiration and cleanup."""

    def test_get_put(self) -> None:
        """A stored entry is returned unchanged by ``get``."""
        from i2p_netdb.operations import RouterInfoCache

        cache = RouterInfoCache(ttl_seconds=3600.0)
        peer_hash = os.urandom(32)
        info = {"caps": "f", "version": "0.9.62"}
        cache.put(peer_hash, info)
        result = cache.get(peer_hash)
        assert result == info

    def test_get_missing_returns_none(self) -> None:
        """``get`` on an unknown hash returns ``None``."""
        from i2p_netdb.operations import RouterInfoCache

        cache = RouterInfoCache()
        assert cache.get(os.urandom(32)) is None

    def test_expiration(self) -> None:
        """With a zero TTL a freshly stored entry is reported as expired."""
        from i2p_netdb.operations import RouterInfoCache

        cache = RouterInfoCache(ttl_seconds=0.0)  # immediate expiration
        peer_hash = os.urandom(32)
        cache.put(peer_hash, {"test": True})
        # With TTL=0, entry should be expired immediately (or nearly)
        assert cache.is_expired(peer_hash)

    def test_cleanup_expired(self) -> None:
        """``cleanup_expired`` reports the number of expired entries it removed."""
        from i2p_netdb.operations import RouterInfoCache

        cache = RouterInfoCache(ttl_seconds=0.0)
        for _ in range(5):
            cache.put(os.urandom(32), {"x": 1})
        removed = cache.cleanup_expired()
        assert removed == 5

    def test_non_expired_not_cleaned(self) -> None:
        """Entries within their TTL survive ``cleanup_expired``."""
        from i2p_netdb.operations import RouterInfoCache

        cache = RouterInfoCache(ttl_seconds=3600.0)
        peer_hash = os.urandom(32)
        cache.put(peer_hash, {"alive": True})
        assert not cache.is_expired(peer_hash)
        removed = cache.cleanup_expired()
        assert removed == 0
        assert cache.get(peer_hash) is not None


class TestStoreJobEnhanced:
    """Tests for store jobs created through ``NetDBOperationManager.start_store``."""

    def test_store_operation_creates_store_job(self) -> None:
        """A new store job records its hash and data, has an id, and is not yet acked."""
        op_mgr, _cache = _make_op_manager()

        data_hash = os.urandom(32)
        data = b"router_info_bytes"
        job = op_mgr.start_store(data_hash, data)
        assert job.data_hash == data_hash
        assert job.data == data
        assert job.store_id  # non-empty
        assert not job.acked
        assert job.started_at > 0

    def test_store_ack(self) -> None:
        """``on_store_ack`` marks the matching job as acked."""
        op_mgr, _cache = _make_op_manager()

        data_hash = os.urandom(32)
        job = op_mgr.start_store(data_hash, b"data")
        from_peer = os.urandom(32)
        op_mgr.on_store_ack(job.store_id, from_peer)
        assert job.acked


class TestGetRouterInfoFromCache:
    """Tests for ``NetDBOperationManager.get_router_info`` cache lookups."""

    def test_get_router_info_cache_hit(self) -> None:
        """An entry put into the shared cache is returned by the manager."""
        op_mgr, cache = _make_op_manager()

        peer_hash = os.urandom(32)
        info = {"caps": "fR", "version": "0.9.62"}
        cache.put(peer_hash, info)
        result = op_mgr.get_router_info(peer_hash)
        assert result == info

    def test_get_router_info_cache_miss(self) -> None:
        """An unknown hash yields ``None`` from the manager."""
        op_mgr, _cache = _make_op_manager()

        result = op_mgr.get_router_info(os.urandom(32))
        assert result is None