A Python port of the Invisible Internet Project (I2P)
"""Tests for the garlic encryption/decryption pipeline.

TDD: these tests were written before the implementation.

Tests cover:
1. Existing-session encrypt/decrypt roundtrip
2. New-session encrypt/decrypt roundtrip with ElGamal
3. Tag reuse is rejected (replay protection)
4. An unknown tag returns None
5. AES-CBC uses the first 16 bytes of the session tag as the IV
6. New-session tag delivery block format: key(32) + count(2) + tags(32 each)
"""
12
13from __future__ import annotations
14
15import os
16import struct
17import time
18
19import pytest
20
21from i2p_crypto.garlic_crypto import GarlicEncryptor, GarlicDecryptor
22from i2p_crypto.session_key_manager import SessionKeyManager
23from i2p_crypto.aes import AESEngine
24from i2p_crypto.elgamal import ElGamalEngine
25
26
27def _pad16(data: bytes) -> bytes:
28 """PKCS7-pad data to a 16-byte boundary."""
29 pad_len = 16 - (len(data) % 16)
30 return data + bytes([pad_len]) * pad_len
31
32
33def _unpad16(data: bytes) -> bytes:
34 """Remove PKCS7 padding."""
35 pad_len = data[-1]
36 return data[:-pad_len]
37
38
class TestExistingSessionRoundtrip:
    """Existing-session path: encrypt under a session tag, decrypt via a registered tag."""

    @staticmethod
    def _manager_with_tag(key, tag):
        """Return a fresh SessionKeyManager holding ``tag`` for ``key``.

        The tag is registered with an expiration 720 000 ms after the
        current wall-clock time.
        """
        manager = SessionKeyManager()
        now_ms = int(time.time() * 1000)
        manager.add_tags(key, [tag], expiration_ms=now_ms + 720_000)
        return manager

    def test_roundtrip_basic(self):
        """A short padded payload survives encrypt followed by decrypt_existing."""
        key, tag = os.urandom(32), os.urandom(32)
        payload = _pad16(b"Hello garlic clove data!")

        wire = GarlicEncryptor.encrypt(payload, key, tag)

        # The receiving side must already have the tag registered.
        receiver = self._manager_with_tag(key, tag)

        recovered = GarlicDecryptor.decrypt_existing(wire, receiver)
        assert recovered is not None
        assert recovered == payload

    def test_roundtrip_larger_payload(self):
        """A 256-byte random payload (plus padding) round-trips unchanged."""
        key, tag = os.urandom(32), os.urandom(32)
        payload = _pad16(os.urandom(256))

        wire = GarlicEncryptor.encrypt(payload, key, tag)
        receiver = self._manager_with_tag(key, tag)

        assert GarlicDecryptor.decrypt_existing(wire, receiver) == payload

    def test_encrypted_format_starts_with_tag(self):
        """Encrypted output must start with the 32-byte session tag."""
        key, tag = os.urandom(32), os.urandom(32)
        payload = _pad16(b"test")

        wire = GarlicEncryptor.encrypt(payload, key, tag)

        assert wire[:32] == tag

    def test_encrypted_length(self):
        """Encrypted output = 32 (tag) + len(cloves_data)."""
        key, tag = os.urandom(32), os.urandom(32)
        # The raw message is 16 bytes before padding is applied.
        payload = _pad16(b"exactly sixteen!")

        wire = GarlicEncryptor.encrypt(payload, key, tag)

        assert len(wire) == 32 + len(payload)
89
90
class TestNewSessionRoundtrip:
    """New-session path: ElGamal-wrapped key and tags, decrypted with the private key."""

    def test_roundtrip_new_session(self):
        """A new-session message decrypts back to the original cloves data."""
        public_key, private_key = ElGamalEngine.generate_keypair()
        key = os.urandom(32)
        delivered_tags = [os.urandom(32) for _ in range(5)]
        payload = _pad16(b"New session garlic message")

        wire = GarlicEncryptor.encrypt_new_session(
            payload, key, delivered_tags, public_key
        )

        receiver = SessionKeyManager()
        recovered = GarlicDecryptor.decrypt_new_session(wire, private_key, receiver)
        assert recovered is not None
        assert recovered == payload

    def test_new_session_registers_tags(self):
        """After decrypting a new session, the delivered tags should be usable."""
        public_key, private_key = ElGamalEngine.generate_keypair()
        key = os.urandom(32)
        delivered_tags = [os.urandom(32) for _ in range(3)]
        payload = _pad16(b"Session setup message")

        wire = GarlicEncryptor.encrypt_new_session(
            payload, key, delivered_tags, public_key
        )

        receiver = SessionKeyManager()
        GarlicDecryptor.decrypt_new_session(wire, private_key, receiver)

        # Every delivered tag must now resolve to the session key.
        for delivered in delivered_tags:
            resolved = receiver.consume_tag(delivered)
            assert resolved == key

    def test_new_session_encrypted_format_length(self):
        """New session output = 514 (ElGamal block) + len(aes_encrypted)."""
        public_key, _priv = ElGamalEngine.generate_keypair()
        key = os.urandom(32)
        delivered_tags = [os.urandom(32) for _ in range(5)]
        # The raw message is 16 bytes before padding is applied.
        payload = _pad16(b"test payload ok!")

        wire = GarlicEncryptor.encrypt_new_session(
            payload, key, delivered_tags, public_key
        )

        assert len(wire) == 514 + len(payload)

    def test_new_session_max_5_tags(self):
        """More than 5 tags should raise ValueError (ElGamal 222-byte limit)."""
        public_key, _priv = ElGamalEngine.generate_keypair()
        key = os.urandom(32)
        too_many_tags = [os.urandom(32) for _ in range(6)]
        payload = _pad16(b"too many tags")

        with pytest.raises(ValueError, match="[Tt]ag"):
            GarlicEncryptor.encrypt_new_session(
                payload, key, too_many_tags, public_key
            )
152
153
class TestReplayProtection:
    """A session tag may be used for exactly one decryption."""

    def test_tag_reuse_rejected(self):
        """Decrypting the same message twice succeeds once, then yields None."""
        key, tag = os.urandom(32), os.urandom(32)
        payload = _pad16(b"replay test data")

        wire = GarlicEncryptor.encrypt(payload, key, tag)

        receiver = SessionKeyManager()
        now_ms = int(time.time() * 1000)
        receiver.add_tags(key, [tag], expiration_ms=now_ms + 720_000)

        # First use of the tag: decryption succeeds.
        first_attempt = GarlicDecryptor.decrypt_existing(wire, receiver)
        assert first_attempt == payload

        # Second use of the same tag: it was consumed, so decryption fails.
        second_attempt = GarlicDecryptor.decrypt_existing(wire, receiver)
        assert second_attempt is None
175
176
class TestUnknownTag:
    """A message whose tag was never registered must decrypt to None."""

    def test_unknown_tag_returns_none(self):
        """decrypt_existing returns None when the manager holds no tags."""
        key = os.urandom(32)
        unregistered_tag = os.urandom(32)
        payload = _pad16(b"unknown tag test")

        # The sender uses a tag the receiver has never been given.
        wire = GarlicEncryptor.encrypt(payload, key, unregistered_tag)

        # A freshly constructed manager has no tags registered.
        empty_manager = SessionKeyManager()
        assert GarlicDecryptor.decrypt_existing(wire, empty_manager) is None
192
193
class TestIVDerivation:
    """AES-CBC must use the first 16 bytes of the session tag as IV."""

    def test_iv_is_first_16_bytes_of_tag(self):
        """Decrypting the AES part by hand with tag[:16] as IV recovers the payload."""
        key, tag = os.urandom(32), os.urandom(32)
        payload = _pad16(b"IV verification!")

        wire = GarlicEncryptor.encrypt(payload, key, tag)

        # Everything after the 32-byte tag prefix is the AES ciphertext.
        body = wire[32:]

        # Decrypt directly with the AES engine, bypassing GarlicDecryptor.
        by_hand = AESEngine.decrypt(body, key, tag[:16])
        assert by_hand == payload

    def test_wrong_iv_produces_garbage(self):
        """Using wrong IV should not produce the original plaintext."""
        key, tag = os.urandom(32), os.urandom(32)
        payload = _pad16(b"wrong IV test!!!")

        wire = GarlicEncryptor.encrypt(payload, key, tag)
        body = wire[32:]

        # Deliberately pick the last 16 bytes of the tag instead of the first 16.
        mismatched_iv = tag[16:]
        by_hand = AESEngine.decrypt(body, key, mismatched_iv)
        assert by_hand != payload
225
226
class TestNewSessionTagDeliveryParsing:
    """Verify the tag delivery block format: key(32) + count(2) + tags(32 each)."""

    def test_tag_delivery_block_format(self):
        """Decrypt only the ElGamal block of a new-session message and check
        that it carries the session key, the tag count and the tags."""
        public_key, private_key = ElGamalEngine.generate_keypair()
        key = os.urandom(32)
        delivered_tags = [os.urandom(32) for _ in range(4)]
        payload = _pad16(b"block format ok!")

        wire = GarlicEncryptor.encrypt_new_session(
            payload, key, delivered_tags, public_key
        )

        # The first 514 bytes are the ElGamal block; decrypt it on its own.
        plain = ElGamalEngine.decrypt(wire[:514], private_key)
        assert plain is not None

        # Layout: session_key(32) + tag_count(2, big-endian) + tags(32 each)
        parsed_key = plain[:32]
        (tag_count,) = struct.unpack(">H", plain[32:34])
        first_tag_at = 34
        parsed_tags = [
            plain[start : start + 32]
            for start in range(first_tag_at, first_tag_at + 32 * tag_count, 32)
        ]

        assert parsed_key == key
        assert tag_count == 4
        assert parsed_tags == delivered_tags