A Python port of the Invisible Internet Project (I2P)
at main 540 lines 19 kB view raw
"""End-to-end integration tests for the I2P Python port.

These tests exercise multiple subsystems together through the RouterContext,
verifying that crypto, tunnel, NetDB, transport, and streaming components
integrate correctly.
"""

import os
import struct
import time

import pytest

from i2p_router.core import RouterContext
from i2p_crypto.session_key_manager import SessionKeyManager
from i2p_crypto.garlic_crypto import GarlicEncryptor, GarlicDecryptor
from i2p_crypto.aes import AESEngine
from i2p_tunnel.crypto import (
    TunnelLayerEncryptor,
    TunnelLayerDecryptor,
    OutboundTunnelEncryptor,
)
from i2p_tunnel.data_handler import TunnelCryptoRegistry, TunnelDataHandler
from i2p_transport.ntcp2_handshake import NTCP2Handshake, NTCP2FrameCodec
from i2p_transport.ntcp2 import NTCP2Frame, FrameType
from i2p_crypto.x25519 import X25519DH
from i2p_netdb.operations import SearchOperation, StoreOperation
from i2p_streaming.stream_io import MessageInputStream, MessageOutputStream, StreamSession
from i2p_data.garlic import (
    GarlicMessage,
    GarlicClove,
    DeliveryInstructions,
    DeliveryType,
)


def _pad16(data: bytes) -> bytes:
    """Pad data to a multiple of 16 bytes with zero bytes.

    Non-empty input whose length is already a multiple of 16 is returned
    unchanged. Empty input is padded to a full 16-byte block of zeros.
    """
    remainder = len(data) % 16
    if data and remainder == 0:
        return data
    return data + b"\x00" * (16 - remainder)


def _build_garlic_plaintext(message_data: bytes) -> bytes:
    """Build a valid GarlicMessage plaintext containing one clove with the given data.

    The clove uses LOCAL delivery instructions, clove_id 1, and an expiration
    ten minutes (600_000 ms) after the current time.

    Returns bytes that are a multiple of 16.
    """
    di = DeliveryInstructions(DeliveryType.LOCAL)
    clove = GarlicClove(
        delivery_instructions=di,
        message_data=message_data,
        clove_id=1,
        expiration=int(time.time() * 1000) + 600_000,
    )
    garlic_msg = GarlicMessage([clove])
    raw = garlic_msg.to_bytes()
    return _pad16(raw)


class TestGarlicEncryptDecryptThroughRouter:
    """Test 1: Garlic encrypt/decrypt through RouterContext."""

    def test_garlic_encrypt_decrypt_through_router(self):
        """Encrypt a one-clove garlic message and feed it to the inbound handler."""
        router = RouterContext()

        # Create a session (generates session key + tags)
        dest_hash = os.urandom(32)
        session_key, tags = router.session_key_mgr.create_session(dest_hash)

        # Build garlic plaintext with a clove containing our test data
        test_payload = b"hello from garlic clove"
        garlic_plaintext = _build_garlic_plaintext(test_payload)

        # Pick a session tag and encrypt
        session_tag = tags[0]
        encrypted_payload = GarlicEncryptor.encrypt(
            garlic_plaintext, session_key, session_tag
        )

        # Feed through router's inbound handler (type 11 = GARLIC)
        result = router.process_inbound(11, encrypted_payload)

        # The garlic handler returns a list of clove message_data payloads
        assert isinstance(result, list)
        assert len(result) == 1
        assert result[0] == test_payload


class TestTunnelDataMultiHop:
    """Test 2: Multi-hop tunnel encryption and decryption."""

    def test_tunnel_data_multi_hop(self):
        """Encrypt for three hops, peel two layers by hand, deliver at the endpoint."""
        # Create 3 sets of hop keys (layer_key, iv_key); use distinct keys
        # for each hop
        hop_keys = [(os.urandom(32), os.urandom(32)) for _ in range(3)]

        # The endpoint is hop 2 (last hop)
        endpoint_layer_key, endpoint_iv_key = hop_keys[2]

        # Create a RouterContext for the endpoint
        router = RouterContext()
        endpoint_tunnel_id = 42
        router.register_tunnel(
            endpoint_tunnel_id,
            endpoint_layer_key,
            endpoint_iv_key,
            is_endpoint=True,
        )

        # Original plaintext (must be multiple of 16)
        original_plaintext = _pad16(b"tunnel payload data for endpoint delivery")

        # Encrypt with OutboundTunnelEncryptor (applies layers in reverse order)
        fully_encrypted = OutboundTunnelEncryptor.encrypt(original_plaintext, hop_keys)

        # Simulate intermediate hops peeling layers
        # Hop 0 (gateway) decrypts its layer
        after_hop0 = TunnelLayerDecryptor.decrypt_layer(
            fully_encrypted, hop_keys[0][0], hop_keys[0][1]
        )
        # Hop 1 (intermediate) decrypts its layer
        after_hop1 = TunnelLayerDecryptor.decrypt_layer(
            after_hop0, hop_keys[1][0], hop_keys[1][1]
        )

        # At the endpoint, call router.process_tunnel_data()
        result = router.process_tunnel_data(endpoint_tunnel_id, after_hop1)

        assert result["action"] == "deliver"
        assert result["data"] == original_plaintext


class TestTunnelBuildAndRegisterLifecycle:
    """Test 3: Register a tunnel and verify lifecycle."""

    def test_tunnel_build_and_register_lifecycle(self):
        """Register one endpoint tunnel, deliver data through it, check status."""
        router = RouterContext()

        tunnel_id = 100
        layer_key = os.urandom(32)
        iv_key = os.urandom(32)

        # Register the tunnel
        router.register_tunnel(tunnel_id, layer_key, iv_key, is_endpoint=True)

        # Verify process_tunnel_data works on that tunnel
        plaintext = _pad16(b"lifecycle test data")
        # Encrypt a single layer (since only one hop registered)
        encrypted = TunnelLayerEncryptor.encrypt_layer(plaintext, layer_key, iv_key)
        result = router.process_tunnel_data(tunnel_id, encrypted)

        assert result["action"] == "deliver"
        assert result["data"] == plaintext

        # Check get_status() shows the tunnel
        status = router.get_status()
        assert status["tunnels_registered"] == 1

    def test_unknown_tunnel_returns_unknown(self):
        """Data for an unregistered tunnel id yields an "unknown" action."""
        router = RouterContext()

        data = _pad16(b"data for unknown tunnel")
        result = router.process_tunnel_data(999, data)

        assert result["action"] == "unknown"
        assert result["tunnel_id"] == 999


class TestNetDBStoreThenLookupViaInbound:
    """Test 4: Store and lookup NetDB entries via inbound message processing."""

    def test_netdb_store_then_lookup_via_inbound(self):
        """Store directly, then look the entry up through process_inbound."""
        router = RouterContext()

        # Store an entry via store_netdb_entry()
        key = os.urandom(32)
        data = b"router info data for testing netdb store and lookup"

        router.store_netdb_entry(key, data)

        # Lookup the entry via process_inbound(2, key_payload) (DATABASE_LOOKUP type)
        # The handler extracts the first 32 bytes as the key
        result = router.process_inbound(2, key)

        assert result == data

    def test_netdb_lookup_missing_returns_none(self):
        """Looking up a key that was never stored returns None."""
        router = RouterContext()

        missing_key = os.urandom(32)
        result = router.process_inbound(2, missing_key)

        assert result is None

    def test_netdb_store_via_inbound(self):
        """Store through process_inbound, then read back with lookup_netdb_entry."""
        router = RouterContext()

        key = os.urandom(32)
        data = b"stored via inbound message"

        # DATABASE_STORE is type 1; payload = key(32) + data
        store_payload = key + data
        router.process_inbound(1, store_payload)

        # Verify it was stored by looking it up
        result = router.lookup_netdb_entry(key)
        assert result == data


class TestNTCP2HandshakeThenFrameExchange:
    """Test 5: Full NTCP2 Noise_XK handshake followed by frame exchange."""

    def test_ntcp2_handshake_then_frame_exchange(self):
        """Run the three-message handshake, then exchange one frame each way."""
        # Generate static keypairs for initiator and responder
        init_priv, init_pub = X25519DH.generate_keypair()
        resp_priv, resp_pub = X25519DH.generate_keypair()

        # Create handshake objects
        # Initiator knows responder's static public key
        initiator = NTCP2Handshake(
            our_static=(init_priv, init_pub),
            peer_static_pub=resp_pub,
            initiator=True,
        )
        # Responder does not know initiator's static key ahead of time
        responder = NTCP2Handshake(
            our_static=(resp_priv, resp_pub),
            peer_static_pub=None,
            initiator=False,
        )

        # Message 1: initiator -> responder
        msg1 = initiator.create_message_1(b"")
        # Message 2: responder processes msg1, produces msg2
        msg2 = responder.process_message_1(msg1)
        # Message 3: initiator processes msg2, produces msg3
        msg3 = initiator.process_message_2(msg2)
        # Responder processes msg3
        responder.process_message_3(msg3)

        # Both sides should be complete
        assert initiator.is_complete()
        assert responder.is_complete()

        # Responder should have learned initiator's static key
        assert responder.remote_static_key() == init_pub

        # Get cipher states
        init_send, init_recv = initiator.split()
        resp_send, resp_recv = responder.split()

        # Create a frame codec
        codec = NTCP2FrameCodec()

        # Test: initiator sends a frame, responder decrypts it
        test_payload = b"hello from initiator to responder"
        frame = NTCP2Frame(FrameType.I2NP, test_payload)

        encrypted = codec.encrypt_frame(init_send, frame)
        decrypted_frame = codec.decrypt_frame(resp_recv, encrypted)

        assert decrypted_frame.frame_type == FrameType.I2NP
        assert decrypted_frame.payload == test_payload

        # Test: responder sends a frame, initiator decrypts it
        reply_payload = b"reply from responder to initiator"
        reply_frame = NTCP2Frame(FrameType.I2NP, reply_payload)

        encrypted_reply = codec.encrypt_frame(resp_send, reply_frame)
        decrypted_reply = codec.decrypt_frame(init_recv, encrypted_reply)

        assert decrypted_reply.frame_type == FrameType.I2NP
        assert decrypted_reply.payload == reply_payload

    def test_ntcp2_multiple_frames_maintain_nonce(self):
        """Verify that multiple frames can be sent/received with nonce advancement."""
        init_priv, init_pub = X25519DH.generate_keypair()
        resp_priv, resp_pub = X25519DH.generate_keypair()

        initiator = NTCP2Handshake(
            our_static=(init_priv, init_pub),
            peer_static_pub=resp_pub,
            initiator=True,
        )
        responder = NTCP2Handshake(
            our_static=(resp_priv, resp_pub),
            initiator=False,
        )

        msg1 = initiator.create_message_1()
        msg2 = responder.process_message_1(msg1)
        msg3 = initiator.process_message_2(msg2)
        responder.process_message_3(msg3)

        init_send, init_recv = initiator.split()
        resp_send, resp_recv = responder.split()
        codec = NTCP2FrameCodec()

        # Send multiple frames in sequence
        for i in range(5):
            payload = f"message number {i}".encode()
            frame = NTCP2Frame(FrameType.I2NP, payload)
            encrypted = codec.encrypt_frame(init_send, frame)
            decrypted = codec.decrypt_frame(resp_recv, encrypted)
            assert decrypted.payload == payload


class TestStreamingSessionDataFlow:
    """Test 6: Streaming session data flow."""

    def test_streaming_session_data_flow(self):
        """Connect a session, send data, and reassemble it in an input stream."""
        # Create a StreamSession and connect. The returned values are not
        # needed here; the two-name unpack still requires connect() to
        # return a pair.
        session = StreamSession(local_id=1000)
        _seq, _syn_data = session.connect()
        assert session.state == "SYN_SENT"

        # Simulate receiving SYN-ACK
        remote_id = 2000
        session.receive_syn_ack(remote_id)
        assert session.state == "ESTABLISHED"

        # Send data
        test_data = b"streaming data payload for integration test"
        packets = session.send(test_data)
        assert len(packets) > 0

        # Feed the sent packets into a MessageInputStream (simulating receiver)
        input_stream = MessageInputStream()
        for seq_num, chunk in packets:
            input_stream.receive_packet(seq_num, chunk)

        # Read the data back
        readable = input_stream.readable_bytes()
        assert readable == len(test_data)

        received = input_stream.read(readable)
        assert received == test_data

    def test_streaming_session_bidirectional(self):
        """Test bidirectional data flow between two sessions."""
        # Session A (initiator)
        session_a = StreamSession(local_id=100)
        _, syn_data = session_a.connect()

        # Session B (responder)
        session_b = StreamSession(local_id=200)
        remote_id_a = struct.unpack("!I", syn_data)[0]
        _, syn_ack_data = session_b.accept(remote_id_a)

        # Complete handshake
        # NOTE(review): remote_id_b is parsed from the SYN-ACK but never used;
        # the call below passes session_b.local_id directly. Confirm whether
        # remote_id_b was meant to be passed or asserted against local_id.
        remote_id_b = struct.unpack("!II", syn_ack_data)[1]
        session_a.receive_syn_ack(session_b.local_id)

        assert session_a.state == "ESTABLISHED"
        assert session_b.state == "ESTABLISHED"

        # A sends to B
        data_a = b"hello from A"
        packets_a = session_a.send(data_a)
        for seq_num, chunk in packets_a:
            session_b.receive(seq_num, chunk)
        assert session_b.read(len(data_a)) == data_a

        # B sends to A
        data_b = b"hello from B"
        packets_b = session_b.send(data_b)
        for seq_num, chunk in packets_b:
            session_a.receive(seq_num, chunk)
        assert session_a.read(len(data_b)) == data_b

    def test_streaming_large_data_chunked(self):
        """Test that large data is properly chunked and reassembled."""
        session = StreamSession(local_id=3000)
        session.connect()
        session.receive_syn_ack(4000)

        # Send data larger than max_packet_size (default 1024)
        large_data = os.urandom(3000)
        packets = session.send(large_data)

        # Should be chunked into multiple packets
        assert len(packets) > 1

        # Reassemble on receive side
        input_stream = MessageInputStream()
        for seq_num, chunk in packets:
            input_stream.receive_packet(seq_num, chunk)

        received = input_stream.read(input_stream.readable_bytes())
        assert received == large_data


class TestFullMessageFlowNetDBStoreAndSearch:
    """Test 7: Full message flow with NetDB store and search operations."""

    def test_full_message_flow_netdb_store_and_search(self):
        """Store entries, start a search, and complete it with a found reply."""
        router = RouterContext()

        # Store several entries in the NetDB
        entries = {}
        for i in range(5):
            key = os.urandom(32)
            data = f"router info entry {i}".encode()
            router.store_netdb_entry(key, data)
            entries[key] = data

        # Pick a target to search for
        target_key = list(entries)[2]
        target_data = entries[target_key]

        # Create known peers (some with keys that are "close" to target)
        peer1 = os.urandom(32)
        peer2 = os.urandom(32)
        peer3 = os.urandom(32)
        known_peers = {
            peer1: b"peer1_info",
            peer2: b"peer2_info",
            peer3: b"peer3_info",
        }

        # Start a search via the netdb_handler
        search_id = router.netdb_handler.start_search(
            target_key, known_peers=known_peers
        )

        # Feed a reply from peer1: didn't find it, but knows a closer peer.
        # The return value is not inspected by this test.
        closer_peer = os.urandom(32)
        router.netdb_handler.on_search_reply(
            search_id,
            peer_hash=peer1,
            found_data=None,
            closer_peers={closer_peer: b"closer_peer_info"},
        )

        assert not router.netdb_handler.is_search_complete(search_id)

        # Feed a reply from peer2: found the target data
        router.netdb_handler.on_search_reply(
            search_id,
            peer_hash=peer2,
            found_data=target_data,
        )

        assert router.netdb_handler.is_search_complete(search_id)
        assert router.netdb_handler.get_search_result(search_id) == target_data

    def test_search_exhausts_peers_without_finding(self):
        """Test search completing by exhausting all peers."""
        router = RouterContext()

        target_key = os.urandom(32)
        peer1 = os.urandom(32)
        peer2 = os.urandom(32)
        known_peers = {peer1: b"p1", peer2: b"p2"}

        search_id = router.netdb_handler.start_search(
            target_key, known_peers=known_peers
        )

        # Both peers reply without finding the target and without new peers
        router.netdb_handler.on_search_reply(search_id, peer1, None)
        router.netdb_handler.on_search_reply(search_id, peer2, None)

        assert router.netdb_handler.is_search_complete(search_id)
        assert router.netdb_handler.get_search_result(search_id) is None


class TestRouterContextStatus:
    """Test 8: RouterContext status reflects all state correctly."""

    def test_router_context_status(self):
        """Status counters start at zero and track registered tunnels and entries."""
        router = RouterContext()

        # Initial status
        status = router.get_status()
        assert status["tunnels_registered"] == 0
        assert status["netdb_entries"] == 0
        assert status["inbound_count"] == 0
        assert status["outbound_count"] == 0
        assert len(status["router_hash"]) == 64  # 32 bytes hex-encoded

        # Register tunnels
        for tid in [10, 20, 30]:
            router.register_tunnel(tid, os.urandom(32), os.urandom(32))

        status = router.get_status()
        assert status["tunnels_registered"] == 3

        # Store NetDB entries
        for _ in range(5):
            router.store_netdb_entry(os.urandom(32), b"test entry data")

        status = router.get_status()
        assert status["netdb_entries"] == 5

        # Status should still reflect all registered tunnels
        assert status["tunnels_registered"] == 3

    def test_router_context_custom_hash(self):
        """Verify that a custom router_hash is preserved."""
        custom_hash = os.urandom(32)
        router = RouterContext(router_hash=custom_hash)

        status = router.get_status()
        assert status["router_hash"] == custom_hash.hex()

    def test_router_context_all_subsystems_wired(self):
        """Verify that all subsystems are accessible and properly wired."""
        router = RouterContext()

        # Session key manager works
        dest = os.urandom(32)
        key, tags = router.session_key_mgr.create_session(dest)
        assert len(key) == 32
        assert len(tags) == 20

        # Garlic encryptor/decryptor are accessible
        assert router.garlic_encryptor is not None
        assert router.garlic_decryptor is not None

        # Tunnel subsystems work
        router.register_tunnel(1, os.urandom(32), os.urandom(32))
        assert router.get_status()["tunnels_registered"] == 1

        # NetDB subsystem works
        k = os.urandom(32)
        router.store_netdb_entry(k, b"data")
        assert router.lookup_netdb_entry(k) == b"data"

        # Outbound routing works
        result = router.route_outbound(0, b"local payload")
        assert result["type"] == "local"
        assert result["payload"] == b"local payload"