"""End-to-end integration tests for the I2P Python port.

These tests exercise multiple subsystems together through the RouterContext,
verifying that crypto, tunnel, NetDB, transport, and streaming components
integrate correctly.
"""

import os
import struct
import time

import pytest

from i2p_router.core import RouterContext
from i2p_crypto.session_key_manager import SessionKeyManager
from i2p_crypto.garlic_crypto import GarlicEncryptor, GarlicDecryptor
from i2p_crypto.aes import AESEngine
from i2p_tunnel.crypto import (
    TunnelLayerEncryptor,
    TunnelLayerDecryptor,
    OutboundTunnelEncryptor,
)
from i2p_tunnel.data_handler import TunnelCryptoRegistry, TunnelDataHandler
from i2p_transport.ntcp2_handshake import NTCP2Handshake, NTCP2FrameCodec
from i2p_transport.ntcp2 import NTCP2Frame, FrameType
from i2p_crypto.x25519 import X25519DH
from i2p_netdb.operations import SearchOperation, StoreOperation
from i2p_streaming.stream_io import MessageInputStream, MessageOutputStream, StreamSession
from i2p_data.garlic import (
    GarlicMessage,
    GarlicClove,
    DeliveryInstructions,
    DeliveryType,
)


def _pad16(data: bytes) -> bytes:
    """Pad data to a multiple of 16 bytes with zero bytes.

    Non-empty input whose length is already a multiple of 16 is returned
    unchanged. Empty input is padded to one full 16-byte block, so the result
    is never empty.
    """
    remainder = len(data) % 16
    if remainder == 0 and len(data) > 0:
        return data
    pad_len = 16 - remainder
    return data + b"\x00" * pad_len


def _build_garlic_plaintext(message_data: bytes) -> bytes:
    """Build a GarlicMessage plaintext containing one clove with the given data.

    The clove uses LOCAL delivery instructions, clove id 1, and an expiration
    600,000 ms after the current wall-clock time.

    Returns bytes that are a multiple of 16.
    """
    di = DeliveryInstructions(DeliveryType.LOCAL)
    clove = GarlicClove(
        delivery_instructions=di,
        message_data=message_data,
        clove_id=1,
        expiration=int(time.time() * 1000) + 600_000,
    )
    garlic_msg = GarlicMessage([clove])
    raw = garlic_msg.to_bytes()
    return _pad16(raw)


class TestGarlicEncryptDecryptThroughRouter:
    """Test 1: Garlic encrypt/decrypt through RouterContext."""

    def test_garlic_encrypt_decrypt_through_router(self):
        """Encrypt a one-clove garlic message and feed it to the inbound handler."""
        router = RouterContext()

        # Create a session (generates session key + tags)
        dest_hash = os.urandom(32)
        session_key, tags = router.session_key_mgr.create_session(dest_hash)

        # Build garlic plaintext with a clove containing our test data
        test_payload = b"hello from garlic clove"
        garlic_plaintext = _build_garlic_plaintext(test_payload)

        # Pick a session tag and encrypt
        session_tag = tags[0]
        encrypted_payload = GarlicEncryptor.encrypt(
            garlic_plaintext, session_key, session_tag
        )

        # Feed through router's inbound handler (type 11 = GARLIC)
        result = router.process_inbound(11, encrypted_payload)

        # The garlic handler returns a list of clove message_data payloads
        assert isinstance(result, list)
        assert len(result) == 1
        assert result[0] == test_payload


class TestTunnelDataMultiHop:
    """Test 2: Multi-hop tunnel encryption and decryption."""

    def test_tunnel_data_multi_hop(self):
        """Peel two layers by hand, then let the router handle the endpoint hop."""
        # Create 3 sets of hop keys (layer_key, iv_key), one pair per hop:
        # index 0 is the gateway, index 2 is the endpoint.
        hop_keys = [(os.urandom(32), os.urandom(32)) for _ in range(3)]

        # The endpoint is hop 2 (last hop)
        endpoint_layer_key, endpoint_iv_key = hop_keys[2]

        # Create a RouterContext for the endpoint
        router = RouterContext()
        endpoint_tunnel_id = 42
        router.register_tunnel(
            endpoint_tunnel_id,
            endpoint_layer_key,
            endpoint_iv_key,
            is_endpoint=True,
        )

        # Original plaintext (must be multiple of 16)
        original_plaintext = _pad16(b"tunnel payload data for endpoint delivery")

        # Encrypt with OutboundTunnelEncryptor (applies layers in reverse order)
        fully_encrypted = OutboundTunnelEncryptor.encrypt(original_plaintext, hop_keys)

        # Simulate intermediate hops peeling layers
        # Hop 0 (gateway) decrypts its layer
        after_hop0 = TunnelLayerDecryptor.decrypt_layer(
            fully_encrypted, hop_keys[0][0], hop_keys[0][1]
        )
        # Hop 1 (intermediate) decrypts its layer
        after_hop1 = TunnelLayerDecryptor.decrypt_layer(
            after_hop0, hop_keys[1][0], hop_keys[1][1]
        )

        # At the endpoint, call router.process_tunnel_data()
        result = router.process_tunnel_data(endpoint_tunnel_id, after_hop1)
        assert result["action"] == "deliver"
        assert result["data"] == original_plaintext


class TestTunnelBuildAndRegisterLifecycle:
    """Test 3: Register a tunnel and verify lifecycle."""

    def test_tunnel_build_and_register_lifecycle(self):
        """Register one endpoint tunnel, deliver through it, and check status."""
        router = RouterContext()
        tunnel_id = 100
        layer_key = os.urandom(32)
        iv_key = os.urandom(32)

        # Register the tunnel
        router.register_tunnel(tunnel_id, layer_key, iv_key, is_endpoint=True)

        # Verify process_tunnel_data works on that tunnel
        plaintext = _pad16(b"lifecycle test data")
        # Encrypt a single layer (since only one hop registered)
        encrypted = TunnelLayerEncryptor.encrypt_layer(plaintext, layer_key, iv_key)
        result = router.process_tunnel_data(tunnel_id, encrypted)
        assert result["action"] == "deliver"
        assert result["data"] == plaintext

        # Check get_status() shows the tunnel
        status = router.get_status()
        assert status["tunnels_registered"] == 1

    def test_unknown_tunnel_returns_unknown(self):
        """Data for an unregistered tunnel id is reported as "unknown"."""
        router = RouterContext()
        data = _pad16(b"data for unknown tunnel")
        result = router.process_tunnel_data(999, data)
        assert result["action"] == "unknown"
        assert result["tunnel_id"] == 999


class TestNetDBStoreThenLookupViaInbound:
    """Test 4: Store and lookup NetDB entries via inbound message processing."""

    def test_netdb_store_then_lookup_via_inbound(self):
        """Store directly, then look the entry up through process_inbound."""
        router = RouterContext()

        # Store an entry via store_netdb_entry()
        key = os.urandom(32)
        data = b"router info data for testing netdb store and lookup"
        router.store_netdb_entry(key, data)

        # Lookup the entry via process_inbound(2, key_payload) (DATABASE_LOOKUP type)
        # The handler extracts the first 32 bytes as the key
        result = router.process_inbound(2, key)
        assert result == data

    def test_netdb_lookup_missing_returns_none(self):
        """Looking up a key that was never stored yields None."""
        router = RouterContext()
        missing_key = os.urandom(32)
        result = router.process_inbound(2, missing_key)
        assert result is None

    def test_netdb_store_via_inbound(self):
        """Store through process_inbound, then read back with lookup_netdb_entry."""
        router = RouterContext()
        key = os.urandom(32)
        data = b"stored via inbound message"

        # DATABASE_STORE is type 1; payload = key(32) + data
        store_payload = key + data
        router.process_inbound(1, store_payload)

        # Verify it was stored by looking it up
        result = router.lookup_netdb_entry(key)
        assert result == data


class TestNTCP2HandshakeThenFrameExchange:
    """Test 5: Full NTCP2 Noise_XK handshake followed by frame exchange."""

    def test_ntcp2_handshake_then_frame_exchange(self):
        """Run the three-message handshake, then exchange one frame each way."""
        # Generate static keypairs for initiator and responder
        init_priv, init_pub = X25519DH.generate_keypair()
        resp_priv, resp_pub = X25519DH.generate_keypair()

        # Create handshake objects
        # Initiator knows responder's static public key
        initiator = NTCP2Handshake(
            our_static=(init_priv, init_pub),
            peer_static_pub=resp_pub,
            initiator=True,
        )
        # Responder does not know initiator's static key ahead of time
        responder = NTCP2Handshake(
            our_static=(resp_priv, resp_pub),
            peer_static_pub=None,
            initiator=False,
        )

        # Message 1: initiator -> responder
        msg1 = initiator.create_message_1(b"")

        # Message 2: responder processes msg1, produces msg2
        msg2 = responder.process_message_1(msg1)

        # Message 3: initiator processes msg2, produces msg3
        msg3 = initiator.process_message_2(msg2)

        # Responder processes msg3
        responder.process_message_3(msg3)

        # Both sides should be complete
        assert initiator.is_complete()
        assert responder.is_complete()

        # Responder should have learned initiator's static key
        assert responder.remote_static_key() == init_pub

        # Get cipher states
        init_send, init_recv = initiator.split()
        resp_send, resp_recv = responder.split()

        # Create a frame codec
        codec = NTCP2FrameCodec()

        # Test: initiator sends a frame, responder decrypts it
        test_payload = b"hello from initiator to responder"
        frame = NTCP2Frame(FrameType.I2NP, test_payload)
        encrypted = codec.encrypt_frame(init_send, frame)
        decrypted_frame = codec.decrypt_frame(resp_recv, encrypted)
        assert decrypted_frame.frame_type == FrameType.I2NP
        assert decrypted_frame.payload == test_payload

        # Test: responder sends a frame, initiator decrypts it
        reply_payload = b"reply from responder to initiator"
        reply_frame = NTCP2Frame(FrameType.I2NP, reply_payload)
        encrypted_reply = codec.encrypt_frame(resp_send, reply_frame)
        decrypted_reply = codec.decrypt_frame(init_recv, encrypted_reply)
        assert decrypted_reply.frame_type == FrameType.I2NP
        assert decrypted_reply.payload == reply_payload

    def test_ntcp2_multiple_frames_maintain_nonce(self):
        """Verify that multiple frames can be sent/received with nonce advancement."""
        init_priv, init_pub = X25519DH.generate_keypair()
        resp_priv, resp_pub = X25519DH.generate_keypair()

        initiator = NTCP2Handshake(
            our_static=(init_priv, init_pub),
            peer_static_pub=resp_pub,
            initiator=True,
        )
        # Unlike the test above, this relies on the constructor and
        # create_message_1() defaults rather than passing None / b"" explicitly.
        responder = NTCP2Handshake(
            our_static=(resp_priv, resp_pub),
            initiator=False,
        )

        msg1 = initiator.create_message_1()
        msg2 = responder.process_message_1(msg1)
        msg3 = initiator.process_message_2(msg2)
        responder.process_message_3(msg3)

        init_send, init_recv = initiator.split()
        resp_send, resp_recv = responder.split()
        codec = NTCP2FrameCodec()

        # Send multiple frames in sequence
        for i in range(5):
            payload = f"message number {i}".encode()
            frame = NTCP2Frame(FrameType.I2NP, payload)
            encrypted = codec.encrypt_frame(init_send, frame)
            decrypted = codec.decrypt_frame(resp_recv, encrypted)
            assert decrypted.payload == payload


class TestStreamingSessionDataFlow:
    """Test 6: Streaming session data flow."""

    def test_streaming_session_data_flow(self):
        """Connect, send data, and reassemble it through a MessageInputStream."""
        # Create a StreamSession and connect
        session = StreamSession(local_id=1000)
        # The returned values are not needed here; unpacking still checks that
        # connect() returns a pair.
        _seq, _syn_data = session.connect()
        assert session.state == "SYN_SENT"

        # Simulate receiving SYN-ACK
        remote_id = 2000
        session.receive_syn_ack(remote_id)
        assert session.state == "ESTABLISHED"

        # Send data
        test_data = b"streaming data payload for integration test"
        packets = session.send(test_data)
        assert len(packets) > 0

        # Feed the sent packets into a MessageInputStream (simulating receiver)
        input_stream = MessageInputStream()
        for seq_num, chunk in packets:
            input_stream.receive_packet(seq_num, chunk)

        # Read the data back
        readable = input_stream.readable_bytes()
        assert readable == len(test_data)
        received = input_stream.read(readable)
        assert received == test_data

    def test_streaming_session_bidirectional(self):
        """Test bidirectional data flow between two sessions."""
        # Session A (initiator)
        session_a = StreamSession(local_id=100)
        _, syn_data = session_a.connect()

        # Session B (responder)
        session_b = StreamSession(local_id=200)
        remote_id_a = struct.unpack("!I", syn_data)[0]
        _, syn_ack_data = session_b.accept(remote_id_a)

        # Complete handshake
        # NOTE(review): remote_id_b is parsed but never used; the handshake is
        # completed with session_b.local_id instead. The unpack is kept because
        # it still fails if the SYN-ACK payload is not exactly two 32-bit
        # fields. Confirm which field is meant to carry B's stream id.
        remote_id_b = struct.unpack("!II", syn_ack_data)[1]
        session_a.receive_syn_ack(session_b.local_id)

        assert session_a.state == "ESTABLISHED"
        assert session_b.state == "ESTABLISHED"

        # A sends to B
        data_a = b"hello from A"
        packets_a = session_a.send(data_a)
        for seq_num, chunk in packets_a:
            session_b.receive(seq_num, chunk)
        assert session_b.read(len(data_a)) == data_a

        # B sends to A
        data_b = b"hello from B"
        packets_b = session_b.send(data_b)
        for seq_num, chunk in packets_b:
            session_a.receive(seq_num, chunk)
        assert session_a.read(len(data_b)) == data_b

    def test_streaming_large_data_chunked(self):
        """Test that large data is properly chunked and reassembled."""
        session = StreamSession(local_id=3000)
        session.connect()
        session.receive_syn_ack(4000)

        # Send data larger than max_packet_size (default 1024)
        large_data = os.urandom(3000)
        packets = session.send(large_data)

        # Should be chunked into multiple packets
        assert len(packets) > 1

        # Reassemble on receive side
        input_stream = MessageInputStream()
        for seq_num, chunk in packets:
            input_stream.receive_packet(seq_num, chunk)

        received = input_stream.read(input_stream.readable_bytes())
        assert received == large_data


class TestFullMessageFlowNetDBStoreAndSearch:
    """Test 7: Full message flow with NetDB store and search operations."""

    def test_full_message_flow_netdb_store_and_search(self):
        """Drive a search through one miss and one hit, then check the result."""
        router = RouterContext()

        # Store several entries in the NetDB
        entries = {}
        for i in range(5):
            key = os.urandom(32)
            data = f"router info entry {i}".encode()
            router.store_netdb_entry(key, data)
            entries[key] = data

        # Pick a target to search for
        target_key = list(entries)[2]
        target_data = entries[target_key]

        # Create known peers (some with keys that are "close" to target)
        peer1 = os.urandom(32)
        peer2 = os.urandom(32)
        peer3 = os.urandom(32)
        known_peers = {
            peer1: b"peer1_info",
            peer2: b"peer2_info",
            peer3: b"peer3_info",
        }

        # Start a search via the netdb_handler
        search_id = router.netdb_handler.start_search(
            target_key, known_peers=known_peers
        )

        # Feed a reply from peer1: didn't find it, but knows a closer peer.
        # The return value is not inspected by this test.
        closer_peer = os.urandom(32)
        router.netdb_handler.on_search_reply(
            search_id,
            peer_hash=peer1,
            found_data=None,
            closer_peers={closer_peer: b"closer_peer_info"},
        )
        assert not router.netdb_handler.is_search_complete(search_id)

        # Feed a reply from peer2: found the target data
        router.netdb_handler.on_search_reply(
            search_id,
            peer_hash=peer2,
            found_data=target_data,
        )
        assert router.netdb_handler.is_search_complete(search_id)
        assert router.netdb_handler.get_search_result(search_id) == target_data

    def test_search_exhausts_peers_without_finding(self):
        """Test search completing by exhausting all peers."""
        router = RouterContext()
        target_key = os.urandom(32)

        peer1 = os.urandom(32)
        peer2 = os.urandom(32)
        known_peers = {peer1: b"p1", peer2: b"p2"}

        search_id = router.netdb_handler.start_search(
            target_key, known_peers=known_peers
        )

        # Both peers reply without finding the target and without new peers
        router.netdb_handler.on_search_reply(search_id, peer1, None)
        router.netdb_handler.on_search_reply(search_id, peer2, None)

        assert router.netdb_handler.is_search_complete(search_id)
        assert router.netdb_handler.get_search_result(search_id) is None


class TestRouterContextStatus:
    """Test 8: RouterContext status reflects all state correctly."""

    def test_router_context_status(self):
        """Status counters start at zero and track tunnel / NetDB registrations."""
        router = RouterContext()

        # Initial status
        status = router.get_status()
        assert status["tunnels_registered"] == 0
        assert status["netdb_entries"] == 0
        assert status["inbound_count"] == 0
        assert status["outbound_count"] == 0
        assert len(status["router_hash"]) == 64  # 32 bytes hex-encoded

        # Register tunnels
        for tid in [10, 20, 30]:
            router.register_tunnel(tid, os.urandom(32), os.urandom(32))

        status = router.get_status()
        assert status["tunnels_registered"] == 3

        # Store NetDB entries
        for _ in range(5):
            router.store_netdb_entry(os.urandom(32), b"test entry data")

        status = router.get_status()
        assert status["netdb_entries"] == 5

        # Status should still reflect all registered tunnels
        assert status["tunnels_registered"] == 3

    def test_router_context_custom_hash(self):
        """Verify that a custom router_hash is preserved."""
        custom_hash = os.urandom(32)
        router = RouterContext(router_hash=custom_hash)
        status = router.get_status()
        assert status["router_hash"] == custom_hash.hex()

    def test_router_context_all_subsystems_wired(self):
        """Verify that all subsystems are accessible and properly wired."""
        router = RouterContext()

        # Session key manager works
        dest = os.urandom(32)
        key, tags = router.session_key_mgr.create_session(dest)
        assert len(key) == 32
        assert len(tags) == 20

        # Garlic encryptor/decryptor are accessible
        assert router.garlic_encryptor is not None
        assert router.garlic_decryptor is not None

        # Tunnel subsystems work
        router.register_tunnel(1, os.urandom(32), os.urandom(32))
        assert router.get_status()["tunnels_registered"] == 1

        # NetDB subsystem works
        k = os.urandom(32)
        router.store_netdb_entry(k, b"data")
        assert router.lookup_netdb_entry(k) == b"data"

        # Outbound routing works
        result = router.route_outbound(0, b"local payload")
        assert result["type"] == "local"
        assert result["payload"] == b"local payload"