# A Python port of the Invisible Internet Project (I2P).
# Repository view: branch "main", 137 lines, 5.1 kB (raw view).
"""Tests for tunnel hop encryption/decryption."""

import os

import pytest

# NOTE(review): the project imports are kept local to each test, exactly as
# in the original file; presumably so a missing module only fails the tests
# that need it -- confirm before hoisting them to module level.


class TestTunnelLayerEncryptDecrypt:
    """Single-layer tunnel encryption: round trips and key sensitivity."""

    def test_single_layer_roundtrip(self):
        """One encrypt/decrypt cycle changes the data, keeps its length, and restores it."""
        from i2p_tunnel.crypto import TunnelLayerEncryptor, TunnelLayerDecryptor

        data = os.urandom(1024)  # typical tunnel data size
        layer_key = os.urandom(32)
        iv_key = os.urandom(32)
        encrypted = TunnelLayerEncryptor.encrypt_layer(data, layer_key, iv_key)
        assert encrypted != data
        assert len(encrypted) == len(data)
        decrypted = TunnelLayerDecryptor.decrypt_layer(encrypted, layer_key, iv_key)
        assert decrypted == data

    def test_different_keys_different_output(self):
        """Two random layer keys with the same IV key give different ciphertexts."""
        from i2p_tunnel.crypto import TunnelLayerEncryptor

        data = os.urandom(1024)
        iv_key = os.urandom(32)
        enc1 = TunnelLayerEncryptor.encrypt_layer(data, os.urandom(32), iv_key)
        enc2 = TunnelLayerEncryptor.encrypt_layer(data, os.urandom(32), iv_key)
        assert enc1 != enc2

    def test_different_iv_keys(self):
        """Two random IV keys with the same layer key give different ciphertexts."""
        from i2p_tunnel.crypto import TunnelLayerEncryptor

        data = os.urandom(1024)
        key = os.urandom(32)
        enc1 = TunnelLayerEncryptor.encrypt_layer(data, key, os.urandom(32))
        enc2 = TunnelLayerEncryptor.encrypt_layer(data, key, os.urandom(32))
        assert enc1 != enc2

    def test_min_block_size(self):
        """A 16-byte payload survives an encrypt/decrypt round trip."""
        from i2p_tunnel.crypto import TunnelLayerEncryptor, TunnelLayerDecryptor

        data = os.urandom(16)  # minimum: one AES block
        key = os.urandom(32)
        iv_key = os.urandom(32)
        encrypted = TunnelLayerEncryptor.encrypt_layer(data, key, iv_key)
        decrypted = TunnelLayerDecryptor.decrypt_layer(encrypted, key, iv_key)
        assert decrypted == data


class TestOutboundTunnelEncryptor:
    """Outbound encryption must be undone by per-hop layer decryption."""

    def test_multi_hop_roundtrip(self):
        """Decrypting with each of three hops' keys in list order restores the data."""
        from i2p_tunnel.crypto import (
            OutboundTunnelEncryptor,
            TunnelLayerDecryptor,
        )

        data = os.urandom(1024)
        # 3 hops: gateway, middle, endpoint
        hop_keys = [(os.urandom(32), os.urandom(32)) for _ in range(3)]
        encrypted = OutboundTunnelEncryptor.encrypt(data, hop_keys)
        assert encrypted != data

        # Each hop peels one layer (forward order)
        result = encrypted
        for layer_key, iv_key in hop_keys:
            result = TunnelLayerDecryptor.decrypt_layer(result, layer_key, iv_key)
        assert result == data

    def test_single_hop(self):
        """With one hop, a single layer decryption restores the data."""
        from i2p_tunnel.crypto import (
            OutboundTunnelEncryptor,
            TunnelLayerDecryptor,
        )

        data = os.urandom(1024)
        hop_keys = [(os.urandom(32), os.urandom(32))]
        encrypted = OutboundTunnelEncryptor.encrypt(data, hop_keys)
        decrypted = TunnelLayerDecryptor.decrypt_layer(
            encrypted, hop_keys[0][0], hop_keys[0][1]
        )
        assert decrypted == data


class TestInboundTunnelDecryptor:
    """Inbound decryption must undo the layers added by each hop."""

    def test_multi_hop_roundtrip(self):
        """Layers added by three hops in list order are all removed by decrypt()."""
        from i2p_tunnel.crypto import (
            InboundTunnelDecryptor,
            TunnelLayerEncryptor,
        )

        data = os.urandom(1024)
        hop_keys = [(os.urandom(32), os.urandom(32)) for _ in range(3)]

        # Simulate each hop encrypting a layer (forward order)
        result = data
        for layer_key, iv_key in hop_keys:
            result = TunnelLayerEncryptor.encrypt_layer(result, layer_key, iv_key)

        # Endpoint decrypts all layers
        decrypted = InboundTunnelDecryptor.decrypt(result, hop_keys)
        assert decrypted == data


class TestBuildRecordEncryptor:
    """Build record encryption: output size and ElGamal round trip."""

    def test_encrypt_record_produces_528_bytes(self):
        """Encrypting a 222-byte record yields exactly 528 bytes."""
        from i2p_tunnel.crypto import BuildRecordEncryptor
        from i2p_crypto.elgamal import ElGamalEngine

        # Generate ElGamal key pair (returns pub, priv); only the public
        # half is needed to check the size of the encrypted record.
        pubkey, _ = ElGamalEngine.generate_keypair()
        record_bytes = os.urandom(222)  # standard build record size
        encrypted = BuildRecordEncryptor.encrypt_record(record_bytes, pubkey)
        assert len(encrypted) == 528

    def test_encrypt_decrypt_roundtrip(self):
        """The first 514 bytes of the output decrypt back to the original record."""
        from i2p_tunnel.crypto import BuildRecordEncryptor
        from i2p_crypto.elgamal import ElGamalEngine

        pubkey, privkey = ElGamalEngine.generate_keypair()
        record_bytes = os.urandom(222)
        encrypted = BuildRecordEncryptor.encrypt_record(record_bytes, pubkey)
        # Strip padding and decrypt
        decrypted = ElGamalEngine.decrypt(encrypted[:514], privkey)
        assert decrypted == record_bytes


class TestBuildReplyDecryptor:
    """Build reply decryption must invert AESEngine.encrypt."""

    def test_decrypt_reply_roundtrip(self):
        """A 496-byte reply encrypted with AESEngine is recovered by decrypt_reply."""
        from i2p_tunnel.crypto import BuildReplyDecryptor
        from i2p_crypto.aes import AESEngine

        reply_data = os.urandom(496)  # standard reply record size
        reply_key = os.urandom(32)
        reply_iv = os.urandom(16)
        encrypted = AESEngine.encrypt(reply_data, reply_key, reply_iv)
        decrypted = BuildReplyDecryptor.decrypt_reply(encrypted, reply_key, reply_iv)
        assert decrypted == reply_data