# A Python port of the Invisible Internet Project (I2P)
1"""Tests for tunnel hop encryption/decryption.
2
3Covers: TunnelLayerEncryptor, TunnelLayerDecryptor,
4 OutboundTunnelEncryptor, InboundTunnelDecryptor,
5 BuildRecordEncryptor, BuildReplyDecryptor.
6"""
7
8import os
9import hashlib
10
11import pytest
12
13from i2p_tunnel.crypto import (
14 TunnelLayerEncryptor,
15 TunnelLayerDecryptor,
16 OutboundTunnelEncryptor,
17 InboundTunnelDecryptor,
18 BuildRecordEncryptor,
19 BuildReplyDecryptor,
20)
21from i2p_crypto.aes import AESEngine
22from i2p_crypto.elgamal import ElGamalEngine
23from i2p_tunnel.builder import BuildRecord
24
25
26# ---------------------------------------------------------------------------
27# Helpers
28# ---------------------------------------------------------------------------
29
30def _random_key() -> bytes:
31 """32-byte random AES key."""
32 return os.urandom(32)
33
34
35def _random_iv_key() -> bytes:
36 """32-byte random IV key (first 16 bytes used as IV)."""
37 return os.urandom(32)
38
39
# Standard I2P tunnel data size, in bytes; shared by the roundtrip tests.
TUNNEL_DATA_SIZE: int = 1024
41
42
43# ---------------------------------------------------------------------------
44# TunnelLayerEncryptor / TunnelLayerDecryptor — single layer
45# ---------------------------------------------------------------------------
46
class TestSingleLayerRoundtrip:
    """A single encrypt_layer call followed by decrypt_layer returns the input."""

    def test_roundtrip_1024_bytes(self):
        """Round-trip one TUNNEL_DATA_SIZE message through a single layer."""
        original = os.urandom(TUNNEL_DATA_SIZE)
        layer_key = _random_key()
        iv_key = _random_iv_key()

        sealed = TunnelLayerEncryptor.encrypt_layer(original, layer_key, iv_key)
        # The layer must alter the bytes while keeping the length unchanged.
        assert sealed != original
        assert len(sealed) == TUNNEL_DATA_SIZE

        opened = TunnelLayerDecryptor.decrypt_layer(sealed, layer_key, iv_key)
        assert opened == original

    def test_roundtrip_multiple_of_16(self):
        """Sizes other than 1024 that are multiples of 16 also round-trip."""
        block_aligned_sizes = (16, 32, 256, 512)
        for length in block_aligned_sizes:
            original = os.urandom(length)
            layer_key = _random_key()
            iv_key = _random_iv_key()

            sealed = TunnelLayerEncryptor.encrypt_layer(
                original, layer_key, iv_key
            )
            assert len(sealed) == length

            opened = TunnelLayerDecryptor.decrypt_layer(
                sealed, layer_key, iv_key
            )
            assert opened == original

    def test_different_keys_different_ciphertext(self):
        """The same plaintext and IV key under two layer keys must differ."""
        original = os.urandom(TUNNEL_DATA_SIZE)
        shared_iv_key = _random_iv_key()
        layer_keys = (_random_key(), _random_key())

        # Only the layer key varies between the two encryptions.
        first, second = (
            TunnelLayerEncryptor.encrypt_layer(original, key, shared_iv_key)
            for key in layer_keys
        )
        assert first != second
84
85
86# ---------------------------------------------------------------------------
87# OutboundTunnelEncryptor — multi-hop
88# ---------------------------------------------------------------------------
89
class TestOutboundTunnelEncryptor:
    """Output of OutboundTunnelEncryptor.encrypt is undone one hop at a time."""

    def test_three_hop_peel(self):
        """Encrypt for three hops, then let every hop remove its own layer."""
        original = os.urandom(TUNNEL_DATA_SIZE)

        # Index 0 is the gateway, 1 the middle hop, 2 the endpoint.
        hops = [(_random_key(), _random_iv_key()) for _ in range(3)]

        wrapped = OutboundTunnelEncryptor.encrypt(original, hops)
        assert wrapped != original

        # Walk the hops in forward order; each (layer_key, iv_key) pair
        # strips exactly one layer.
        current = wrapped
        for hop in hops:
            current = TunnelLayerDecryptor.decrypt_layer(current, *hop)

        assert current == original

    def test_single_hop(self):
        """With one hop, a single decrypt_layer call recovers the plaintext."""
        original = os.urandom(TUNNEL_DATA_SIZE)
        layer_key, iv_key = _random_key(), _random_iv_key()
        hops = [(layer_key, iv_key)]

        wrapped = OutboundTunnelEncryptor.encrypt(original, hops)
        unwrapped = TunnelLayerDecryptor.decrypt_layer(wrapped, layer_key, iv_key)
        assert unwrapped == original
119
120
121# ---------------------------------------------------------------------------
122# InboundTunnelDecryptor — multi-hop
123# ---------------------------------------------------------------------------
124
class TestInboundTunnelDecryptor:
    """Inbound decryption peels layers in reverse order to match hop encryption."""

    def test_three_hop_roundtrip(self):
        """Each hop encrypts in forward order; endpoint decrypts in reverse.

        In an inbound tunnel, each hop encrypts a layer as the message
        passes through (hop 0 first, then hop 1, then hop 2). The
        endpoint decrypts in reverse order (hop 2 first) to recover
        the original plaintext.
        """
        plaintext = os.urandom(TUNNEL_DATA_SIZE)

        hop_keys = [(_random_key(), _random_iv_key()) for _ in range(3)]

        # Simulate each hop adding its layer in forward order (hop 0 first).
        data = plaintext
        for layer_key, iv_key in hop_keys:
            data = TunnelLayerEncryptor.encrypt_layer(data, layer_key, iv_key)

        # hop_keys is handed over in forward hop order. The decryptor is
        # expected to peel the layers in reverse (last hop's layer first);
        # NOTE(review): confirm against InboundTunnelDecryptor.decrypt.
        recovered = InboundTunnelDecryptor.decrypt(data, hop_keys)
        assert recovered == plaintext

    def test_single_hop_inbound(self):
        """Single hop: one encrypt + one decrypt is a simple roundtrip."""
        plaintext = os.urandom(TUNNEL_DATA_SIZE)
        hop_keys = [(_random_key(), _random_iv_key())]

        # Apply the only hop's layer by hand, exactly as that hop would.
        layer_key, iv_key = hop_keys[0]
        encrypted = TunnelLayerEncryptor.encrypt_layer(
            plaintext, layer_key, iv_key
        )
        recovered = InboundTunnelDecryptor.decrypt(encrypted, hop_keys)
        assert recovered == plaintext
159
160
161# ---------------------------------------------------------------------------
162# BuildRecordEncryptor — ElGamal
163# ---------------------------------------------------------------------------
164
class TestBuildRecordEncryptor:
    """ElGamal-based build record encryption: roundtrip and output length."""

    def test_roundtrip(self):
        """Encrypt a serialised BuildRecord and recover it with the private key."""
        pub_key, priv_key = ElGamalEngine.generate_keypair()

        # Fill every field with fixed IDs or random bytes.
        fields = dict(
            receive_tunnel_id=12345,
            our_ident=os.urandom(32),
            next_tunnel_id=67890,
            next_ident=os.urandom(32),
            layer_key=os.urandom(32),
            iv_key=os.urandom(32),
            reply_key=os.urandom(32),
            reply_iv=os.urandom(16),
            is_gateway=True,
            is_endpoint=False,
        )
        serialized = BuildRecord(**fields).to_bytes()
        # The original note gives BuildRecord.SIZE as 222 bytes.
        assert len(serialized) == BuildRecord.SIZE

        encrypted_len = 528
        sealed = BuildRecordEncryptor.encrypt_record(serialized, pub_key)
        assert len(sealed) == encrypted_len

        # Per the original note, the leading 514 bytes hold the ElGamal
        # ciphertext and the remainder is padding, so only that prefix is
        # handed to the ElGamal engine.
        elgamal_len = 514
        opened = ElGamalEngine.decrypt(sealed[:elgamal_len], priv_key)
        assert opened is not None
        assert opened == serialized

    def test_output_size(self):
        """Encrypting 222 arbitrary bytes yields a 528-byte record."""
        pub_key, _unused_priv = ElGamalEngine.generate_keypair()
        payload = os.urandom(222)
        sealed = BuildRecordEncryptor.encrypt_record(payload, pub_key)
        assert len(sealed) == 528
201
202
203# ---------------------------------------------------------------------------
204# BuildReplyDecryptor
205# ---------------------------------------------------------------------------
206
class TestBuildReplyDecryptor:
    """decrypt_reply must undo an AESEngine.encrypt done with the same key/IV."""

    def test_roundtrip(self):
        """AES-encrypt a reply by hand, then recover it via decrypt_reply."""
        reply_key = _random_key()
        reply_iv = os.urandom(16)
        # 496 bytes is the typical reply size noted by the original author;
        # it is a multiple of 16.
        payload = os.urandom(496)

        # Stand in for the hop: encrypt with the first 16 bytes of the IV.
        iv_prefix = reply_iv[:16]
        sealed = AESEngine.encrypt(payload, reply_key, iv_prefix)

        opened = BuildReplyDecryptor.decrypt_reply(sealed, reply_key, reply_iv)
        assert opened == payload

    def test_different_keys_different_output(self):
        """Two keys give different ciphertexts that each decrypt correctly."""
        reply_iv = os.urandom(16)
        payload = os.urandom(496)

        keys = (_random_key(), _random_key())

        # Encrypt under both keys first, then decrypt each with its own key.
        sealed = [AESEngine.encrypt(payload, key, reply_iv[:16]) for key in keys]
        opened = [
            BuildReplyDecryptor.decrypt_reply(blob, key, reply_iv)
            for blob, key in zip(sealed, keys)
        ]

        assert opened[0] == opened[1]
        assert opened[1] == payload
        assert sealed[0] != sealed[1]
237
238
239# ---------------------------------------------------------------------------
240# Data alignment
241# ---------------------------------------------------------------------------
242
class TestDataAlignment:
    """A 1024-byte message survives three-hop outbound encryption and peeling."""

    def test_1024_byte_outbound_inbound(self):
        """Encrypt 1024 bytes for three hops, peel every layer, compare."""
        message_size = 1024
        original = os.urandom(message_size)
        hops = [(_random_key(), _random_iv_key()) for _ in range(3)]

        # Outbound side: the layered ciphertext keeps the message length.
        wrapped = OutboundTunnelEncryptor.encrypt(original, hops)
        assert len(wrapped) == message_size

        # Each hop, in forward order, removes one layer with its key pair.
        current = wrapped
        for hop in hops:
            current = TunnelLayerDecryptor.decrypt_layer(current, *hop)
        assert current == original