A Python port of the Invisible Internet Project (I2P)
1"""Tests for tunnel hop encryption/decryption."""
2
3import os
4
5import pytest
6
7
8class TestTunnelLayerEncryptDecrypt:
9 def test_single_layer_roundtrip(self):
10 from i2p_tunnel.crypto import TunnelLayerEncryptor, TunnelLayerDecryptor
11
12 data = os.urandom(1024) # typical tunnel data size
13 layer_key = os.urandom(32)
14 iv_key = os.urandom(32)
15 encrypted = TunnelLayerEncryptor.encrypt_layer(data, layer_key, iv_key)
16 assert encrypted != data
17 assert len(encrypted) == len(data)
18 decrypted = TunnelLayerDecryptor.decrypt_layer(encrypted, layer_key, iv_key)
19 assert decrypted == data
20
21 def test_different_keys_different_output(self):
22 from i2p_tunnel.crypto import TunnelLayerEncryptor
23
24 data = os.urandom(1024)
25 iv_key = os.urandom(32)
26 enc1 = TunnelLayerEncryptor.encrypt_layer(data, os.urandom(32), iv_key)
27 enc2 = TunnelLayerEncryptor.encrypt_layer(data, os.urandom(32), iv_key)
28 assert enc1 != enc2
29
30 def test_different_iv_keys(self):
31 from i2p_tunnel.crypto import TunnelLayerEncryptor
32
33 data = os.urandom(1024)
34 key = os.urandom(32)
35 enc1 = TunnelLayerEncryptor.encrypt_layer(data, key, os.urandom(32))
36 enc2 = TunnelLayerEncryptor.encrypt_layer(data, key, os.urandom(32))
37 assert enc1 != enc2
38
39 def test_min_block_size(self):
40 from i2p_tunnel.crypto import TunnelLayerEncryptor, TunnelLayerDecryptor
41
42 data = os.urandom(16) # minimum: one AES block
43 key = os.urandom(32)
44 iv_key = os.urandom(32)
45 encrypted = TunnelLayerEncryptor.encrypt_layer(data, key, iv_key)
46 decrypted = TunnelLayerDecryptor.decrypt_layer(encrypted, key, iv_key)
47 assert decrypted == data
48
49
50class TestOutboundTunnelEncryptor:
51 def test_multi_hop_roundtrip(self):
52 from i2p_tunnel.crypto import (
53 OutboundTunnelEncryptor,
54 TunnelLayerDecryptor,
55 )
56
57 data = os.urandom(1024)
58 # 3 hops: gateway, middle, endpoint
59 hop_keys = [(os.urandom(32), os.urandom(32)) for _ in range(3)]
60 encrypted = OutboundTunnelEncryptor.encrypt(data, hop_keys)
61 assert encrypted != data
62
63 # Each hop peels one layer (forward order)
64 result = encrypted
65 for layer_key, iv_key in hop_keys:
66 result = TunnelLayerDecryptor.decrypt_layer(result, layer_key, iv_key)
67 assert result == data
68
69 def test_single_hop(self):
70 from i2p_tunnel.crypto import (
71 OutboundTunnelEncryptor,
72 TunnelLayerDecryptor,
73 )
74
75 data = os.urandom(1024)
76 hop_keys = [(os.urandom(32), os.urandom(32))]
77 encrypted = OutboundTunnelEncryptor.encrypt(data, hop_keys)
78 decrypted = TunnelLayerDecryptor.decrypt_layer(
79 encrypted, hop_keys[0][0], hop_keys[0][1]
80 )
81 assert decrypted == data
82
83
84class TestInboundTunnelDecryptor:
85 def test_multi_hop_roundtrip(self):
86 from i2p_tunnel.crypto import (
87 InboundTunnelDecryptor,
88 TunnelLayerEncryptor,
89 )
90
91 data = os.urandom(1024)
92 hop_keys = [(os.urandom(32), os.urandom(32)) for _ in range(3)]
93
94 # Simulate each hop encrypting a layer (forward order)
95 result = data
96 for layer_key, iv_key in hop_keys:
97 result = TunnelLayerEncryptor.encrypt_layer(result, layer_key, iv_key)
98
99 # Endpoint decrypts all layers
100 decrypted = InboundTunnelDecryptor.decrypt(result, hop_keys)
101 assert decrypted == data
102
103
104class TestBuildRecordEncryptor:
105 def test_encrypt_record_produces_528_bytes(self):
106 from i2p_tunnel.crypto import BuildRecordEncryptor
107 from i2p_crypto.elgamal import ElGamalEngine
108
109 # Generate ElGamal key pair (returns pub, priv)
110 pubkey, privkey = ElGamalEngine.generate_keypair()
111 record_bytes = os.urandom(222) # standard build record size
112 encrypted = BuildRecordEncryptor.encrypt_record(record_bytes, pubkey)
113 assert len(encrypted) == 528
114
115 def test_encrypt_decrypt_roundtrip(self):
116 from i2p_tunnel.crypto import BuildRecordEncryptor
117 from i2p_crypto.elgamal import ElGamalEngine
118
119 pubkey, privkey = ElGamalEngine.generate_keypair()
120 record_bytes = os.urandom(222)
121 encrypted = BuildRecordEncryptor.encrypt_record(record_bytes, pubkey)
122 # Strip padding and decrypt
123 decrypted = ElGamalEngine.decrypt(encrypted[:514], privkey)
124 assert decrypted == record_bytes
125
126
127class TestBuildReplyDecryptor:
128 def test_decrypt_reply_roundtrip(self):
129 from i2p_tunnel.crypto import BuildReplyDecryptor
130 from i2p_crypto.aes import AESEngine
131
132 reply_data = os.urandom(496) # standard reply record size
133 reply_key = os.urandom(32)
134 reply_iv = os.urandom(16)
135 encrypted = AESEngine.encrypt(reply_data, reply_key, reply_iv)
136 decrypted = BuildReplyDecryptor.decrypt_reply(encrypted, reply_key, reply_iv)
137 assert decrypted == reply_data