"""Tests for tunnel hop encryption/decryption. Covers: TunnelLayerEncryptor, TunnelLayerDecryptor, OutboundTunnelEncryptor, InboundTunnelDecryptor, BuildRecordEncryptor, BuildReplyDecryptor. """ import os import hashlib import pytest from i2p_tunnel.crypto import ( TunnelLayerEncryptor, TunnelLayerDecryptor, OutboundTunnelEncryptor, InboundTunnelDecryptor, BuildRecordEncryptor, BuildReplyDecryptor, ) from i2p_crypto.aes import AESEngine from i2p_crypto.elgamal import ElGamalEngine from i2p_tunnel.builder import BuildRecord # --------------------------------------------------------------------------- # Helpers # --------------------------------------------------------------------------- def _random_key() -> bytes: """32-byte random AES key.""" return os.urandom(32) def _random_iv_key() -> bytes: """32-byte random IV key (first 16 bytes used as IV).""" return os.urandom(32) TUNNEL_DATA_SIZE = 1024 # Standard I2P tunnel data size # --------------------------------------------------------------------------- # TunnelLayerEncryptor / TunnelLayerDecryptor — single layer # --------------------------------------------------------------------------- class TestSingleLayerRoundtrip: """encrypt_layer -> decrypt_layer must recover the original data.""" def test_roundtrip_1024_bytes(self): plaintext = os.urandom(TUNNEL_DATA_SIZE) layer_key = _random_key() iv_key = _random_iv_key() ciphertext = TunnelLayerEncryptor.encrypt_layer(plaintext, layer_key, iv_key) assert ciphertext != plaintext assert len(ciphertext) == TUNNEL_DATA_SIZE recovered = TunnelLayerDecryptor.decrypt_layer(ciphertext, layer_key, iv_key) assert recovered == plaintext def test_roundtrip_multiple_of_16(self): """Any multiple-of-16 data should work, not just 1024.""" for size in (16, 32, 256, 512): plaintext = os.urandom(size) layer_key = _random_key() iv_key = _random_iv_key() ct = TunnelLayerEncryptor.encrypt_layer(plaintext, layer_key, iv_key) assert len(ct) == size pt = TunnelLayerDecryptor.decrypt_layer(ct, layer_key, iv_key) assert pt == plaintext def test_different_keys_different_ciphertext(self): """Different layer keys must produce different ciphertext.""" plaintext = os.urandom(TUNNEL_DATA_SIZE) iv_key = _random_iv_key() key_a = _random_key() key_b = _random_key() ct_a = TunnelLayerEncryptor.encrypt_layer(plaintext, key_a, iv_key) ct_b = TunnelLayerEncryptor.encrypt_layer(plaintext, key_b, iv_key) assert ct_a != ct_b # --------------------------------------------------------------------------- # OutboundTunnelEncryptor — multi-hop # --------------------------------------------------------------------------- class TestOutboundTunnelEncryptor: """Outbound encryption applies layers in reverse; each hop peels one.""" def test_three_hop_peel(self): """Simulate 3-hop outbound tunnel: encrypt, then each hop decrypts.""" plaintext = os.urandom(TUNNEL_DATA_SIZE) # 3 hops: gateway (0), middle (1), endpoint (2) hop_keys = [(_random_key(), _random_iv_key()) for _ in range(3)] encrypted = OutboundTunnelEncryptor.encrypt(plaintext, hop_keys) assert encrypted != plaintext # Each hop decrypts one layer in forward order data = encrypted for layer_key, iv_key in hop_keys: data = TunnelLayerDecryptor.decrypt_layer(data, layer_key, iv_key) assert data == plaintext def test_single_hop(self): plaintext = os.urandom(TUNNEL_DATA_SIZE) hop_keys = [(_random_key(), _random_iv_key())] encrypted = OutboundTunnelEncryptor.encrypt(plaintext, hop_keys) decrypted = TunnelLayerDecryptor.decrypt_layer( encrypted, hop_keys[0][0], hop_keys[0][1] ) assert decrypted == plaintext # --------------------------------------------------------------------------- # InboundTunnelDecryptor — multi-hop # --------------------------------------------------------------------------- class TestInboundTunnelDecryptor: """Inbound decryption peels layers in reverse order to match hop encryption.""" def test_three_hop_roundtrip(self): """Each hop encrypts in forward order; endpoint decrypts in reverse. In an inbound tunnel, each hop encrypts a layer as the message passes through (hop 0 first, then hop 1, then hop 2). The endpoint decrypts in reverse order (hop 2 first) to recover the original plaintext. """ plaintext = os.urandom(TUNNEL_DATA_SIZE) hop_keys = [(_random_key(), _random_iv_key()) for _ in range(3)] # Simulate each hop encrypting in forward order data = plaintext for layer_key, iv_key in hop_keys: data = TunnelLayerEncryptor.encrypt_layer(data, layer_key, iv_key) # Tunnel endpoint decrypts all layers in forward order recovered = InboundTunnelDecryptor.decrypt(data, hop_keys) assert recovered == plaintext def test_single_hop_inbound(self): """Single hop: one encrypt + one decrypt is a simple roundtrip.""" plaintext = os.urandom(TUNNEL_DATA_SIZE) hop_keys = [(_random_key(), _random_iv_key())] encrypted = TunnelLayerEncryptor.encrypt_layer( plaintext, hop_keys[0][0], hop_keys[0][1] ) recovered = InboundTunnelDecryptor.decrypt(encrypted, hop_keys) assert recovered == plaintext # --------------------------------------------------------------------------- # BuildRecordEncryptor — ElGamal # --------------------------------------------------------------------------- class TestBuildRecordEncryptor: """Build record encrypt/decrypt roundtrip using ElGamal.""" def test_roundtrip(self): pub_key, priv_key = ElGamalEngine.generate_keypair() # Create a 222-byte build record record = BuildRecord( receive_tunnel_id=12345, our_ident=os.urandom(32), next_tunnel_id=67890, next_ident=os.urandom(32), layer_key=os.urandom(32), iv_key=os.urandom(32), reply_key=os.urandom(32), reply_iv=os.urandom(16), is_gateway=True, is_endpoint=False, ) record_bytes = record.to_bytes() assert len(record_bytes) == BuildRecord.SIZE # 222 encrypted = BuildRecordEncryptor.encrypt_record(record_bytes, pub_key) assert len(encrypted) == 528 # Decrypt: strip padding and ElGamal-decrypt # The first 514 bytes are the ElGamal ciphertext decrypted = ElGamalEngine.decrypt(encrypted[:514], priv_key) assert decrypted is not None assert decrypted == record_bytes def test_output_size(self): pub_key, _ = ElGamalEngine.generate_keypair() record_bytes = os.urandom(222) encrypted = BuildRecordEncryptor.encrypt_record(record_bytes, pub_key) assert len(encrypted) == 528 # --------------------------------------------------------------------------- # BuildReplyDecryptor # --------------------------------------------------------------------------- class TestBuildReplyDecryptor: """Build reply AES encrypt→decrypt roundtrip.""" def test_roundtrip(self): reply_key = _random_key() reply_iv = os.urandom(16) # Reply record is typically 496 bytes, but must be multiple of 16 reply_data = os.urandom(496) # Encrypt (simulating what the hop does) encrypted = AESEngine.encrypt(reply_data, reply_key, reply_iv[:16]) decrypted = BuildReplyDecryptor.decrypt_reply(encrypted, reply_key, reply_iv) assert decrypted == reply_data def test_different_keys_different_output(self): reply_iv = os.urandom(16) reply_data = os.urandom(496) key_a = _random_key() key_b = _random_key() enc_a = AESEngine.encrypt(reply_data, key_a, reply_iv[:16]) enc_b = AESEngine.encrypt(reply_data, key_b, reply_iv[:16]) dec_a = BuildReplyDecryptor.decrypt_reply(enc_a, key_a, reply_iv) dec_b = BuildReplyDecryptor.decrypt_reply(enc_b, key_b, reply_iv) assert dec_a == dec_b == reply_data assert enc_a != enc_b # --------------------------------------------------------------------------- # Data alignment # --------------------------------------------------------------------------- class TestDataAlignment: """Standard tunnel data is 1024 bytes — verify it works end-to-end.""" def test_1024_byte_outbound_inbound(self): plaintext = os.urandom(1024) hop_keys = [(_random_key(), _random_iv_key()) for _ in range(3)] # Outbound: encrypt encrypted = OutboundTunnelEncryptor.encrypt(plaintext, hop_keys) assert len(encrypted) == 1024 # Simulate each hop decrypting data = encrypted for lk, ik in hop_keys: data = TunnelLayerDecryptor.decrypt_layer(data, lk, ik) assert data == plaintext