"""Tests for tunnel hop encryption/decryption, build records and build replies."""

import os

import pytest


class TestTunnelLayerEncryptDecrypt:
    """Checks on a single layer via TunnelLayerEncryptor / TunnelLayerDecryptor."""

    def test_single_layer_roundtrip(self):
        """One encrypt_layer followed by decrypt_layer restores the payload."""
        from i2p_tunnel.crypto import (
            TunnelLayerDecryptor,
            TunnelLayerEncryptor,
        )

        payload = os.urandom(1024)  # typical tunnel data size
        layer_key = os.urandom(32)
        iv_key = os.urandom(32)

        ciphertext = TunnelLayerEncryptor.encrypt_layer(payload, layer_key, iv_key)
        # The layer must change the bytes without changing their count.
        assert ciphertext != payload
        assert len(ciphertext) == len(payload)

        recovered = TunnelLayerDecryptor.decrypt_layer(ciphertext, layer_key, iv_key)
        assert recovered == payload

    def test_different_keys_different_output(self):
        """Two random layer keys with a shared IV key give different output."""
        from i2p_tunnel.crypto import TunnelLayerEncryptor

        payload = os.urandom(1024)
        shared_iv_key = os.urandom(32)

        first_layer_key = os.urandom(32)
        first = TunnelLayerEncryptor.encrypt_layer(
            payload, first_layer_key, shared_iv_key
        )
        second_layer_key = os.urandom(32)
        second = TunnelLayerEncryptor.encrypt_layer(
            payload, second_layer_key, shared_iv_key
        )

        assert first != second

    def test_different_iv_keys(self):
        """Two random IV keys with a shared layer key give different output."""
        from i2p_tunnel.crypto import TunnelLayerEncryptor

        payload = os.urandom(1024)
        shared_layer_key = os.urandom(32)

        first_iv_key = os.urandom(32)
        first = TunnelLayerEncryptor.encrypt_layer(
            payload, shared_layer_key, first_iv_key
        )
        second_iv_key = os.urandom(32)
        second = TunnelLayerEncryptor.encrypt_layer(
            payload, shared_layer_key, second_iv_key
        )

        assert first != second

    def test_min_block_size(self):
        """A 16-byte payload survives an encrypt/decrypt round trip."""
        from i2p_tunnel.crypto import (
            TunnelLayerDecryptor,
            TunnelLayerEncryptor,
        )

        payload = os.urandom(16)  # minimum: one AES block
        layer_key = os.urandom(32)
        iv_key = os.urandom(32)

        ciphertext = TunnelLayerEncryptor.encrypt_layer(payload, layer_key, iv_key)
        recovered = TunnelLayerDecryptor.decrypt_layer(ciphertext, layer_key, iv_key)

        assert recovered == payload


class TestOutboundTunnelEncryptor:
    """OutboundTunnelEncryptor.encrypt checked against per-hop layer decryption."""

    def test_multi_hop_roundtrip(self):
        """Decrypting one layer per hop, in hop order, yields the original data."""
        from i2p_tunnel.crypto import (
            OutboundTunnelEncryptor,
            TunnelLayerDecryptor,
        )

        payload = os.urandom(1024)
        # 3 hops: gateway, middle, endpoint -- each a (layer_key, iv_key) pair
        hops = [(os.urandom(32), os.urandom(32)) for _ in range(3)]

        ciphertext = OutboundTunnelEncryptor.encrypt(payload, hops)
        assert ciphertext != payload

        # Each hop peels one layer (forward order)
        peeled = ciphertext
        for hop in hops:
            peeled = TunnelLayerDecryptor.decrypt_layer(peeled, *hop)

        assert peeled == payload

    def test_single_hop(self):
        """With one hop, a single decrypt_layer call recovers the data."""
        from i2p_tunnel.crypto import (
            OutboundTunnelEncryptor,
            TunnelLayerDecryptor,
        )

        payload = os.urandom(1024)
        hops = [(os.urandom(32), os.urandom(32))]

        ciphertext = OutboundTunnelEncryptor.encrypt(payload, hops)

        layer_key, iv_key = hops[0]
        recovered = TunnelLayerDecryptor.decrypt_layer(ciphertext, layer_key, iv_key)

        assert recovered == payload


class TestInboundTunnelDecryptor:
    """InboundTunnelDecryptor.decrypt checked against per-hop layer encryption."""

    def test_multi_hop_roundtrip(self):
        """Layers added hop by hop are all removed by one decrypt call."""
        from i2p_tunnel.crypto import (
            InboundTunnelDecryptor,
            TunnelLayerEncryptor,
        )

        payload = os.urandom(1024)
        hops = [(os.urandom(32), os.urandom(32)) for _ in range(3)]

        # Simulate each hop encrypting a layer (forward order)
        layered = payload
        for hop in hops:
            layered = TunnelLayerEncryptor.encrypt_layer(layered, *hop)

        # Endpoint decrypts all layers
        recovered = InboundTunnelDecryptor.decrypt(layered, hops)

        assert recovered == payload


class TestBuildRecordEncryptor:
    """BuildRecordEncryptor.encrypt_record exercised with a fresh ElGamal key pair."""

    def test_encrypt_record_produces_528_bytes(self):
        """A 222-byte record encrypts to exactly 528 bytes."""
        from i2p_tunnel.crypto import BuildRecordEncryptor
        from i2p_crypto.elgamal import ElGamalEngine

        # Generate ElGamal key pair (returns pub, priv); only the public half
        # is needed for this size check.
        public_key, _private_key = ElGamalEngine.generate_keypair()

        record = os.urandom(222)  # standard build record size
        ciphertext = BuildRecordEncryptor.encrypt_record(record, public_key)

        assert len(ciphertext) == 528

    def test_encrypt_decrypt_roundtrip(self):
        """ElGamal-decrypting the first 514 bytes returns the original record."""
        from i2p_tunnel.crypto import BuildRecordEncryptor
        from i2p_crypto.elgamal import ElGamalEngine

        public_key, private_key = ElGamalEngine.generate_keypair()

        record = os.urandom(222)
        ciphertext = BuildRecordEncryptor.encrypt_record(record, public_key)

        # Strip padding and decrypt: only the leading 514 bytes go to ElGamal.
        elgamal_part = ciphertext[:514]
        recovered = ElGamalEngine.decrypt(elgamal_part, private_key)

        assert recovered == record


class TestBuildReplyDecryptor:
    """BuildReplyDecryptor.decrypt_reply checked against AESEngine.encrypt."""

    def test_decrypt_reply_roundtrip(self):
        """A reply encrypted by AESEngine is recovered by decrypt_reply."""
        from i2p_tunnel.crypto import BuildReplyDecryptor
        from i2p_crypto.aes import AESEngine

        reply = os.urandom(496)  # standard reply record size
        reply_key = os.urandom(32)
        reply_iv = os.urandom(16)

        ciphertext = AESEngine.encrypt(reply, reply_key, reply_iv)
        recovered = BuildReplyDecryptor.decrypt_reply(ciphertext, reply_key, reply_iv)

        assert recovered == reply