A Python port of the Invisible Internet Project (I2P)
at main 258 lines 9.2 kB view raw
1"""Tests for garlic encryption/decryption pipeline. 2 3TDD: tests written before the implementation. 4 5Tests cover: 61. Existing session encrypt/decrypt roundtrip 72. New session encrypt/decrypt roundtrip with ElGamal 83. Tag reuse rejected (replay protection) 94. Unknown tag returns None 105. AES-CBC uses first 16 bytes of tag as IV 11""" 12 13from __future__ import annotations 14 15import os 16import struct 17import time 18 19import pytest 20 21from i2p_crypto.garlic_crypto import GarlicEncryptor, GarlicDecryptor 22from i2p_crypto.session_key_manager import SessionKeyManager 23from i2p_crypto.aes import AESEngine 24from i2p_crypto.elgamal import ElGamalEngine 25 26 27def _pad16(data: bytes) -> bytes: 28 """PKCS7-pad data to a 16-byte boundary.""" 29 pad_len = 16 - (len(data) % 16) 30 return data + bytes([pad_len]) * pad_len 31 32 33def _unpad16(data: bytes) -> bytes: 34 """Remove PKCS7 padding.""" 35 pad_len = data[-1] 36 return data[:-pad_len] 37 38 39class TestExistingSessionRoundtrip: 40 """Encrypt with a session tag, decrypt with the same session key.""" 41 42 def test_roundtrip_basic(self): 43 session_key = os.urandom(32) 44 session_tag = os.urandom(32) 45 cloves_data = _pad16(b"Hello garlic clove data!") 46 47 encrypted = GarlicEncryptor.encrypt(cloves_data, session_key, session_tag) 48 49 # Set up decryptor side: register the tag 50 mgr = SessionKeyManager() 51 now = int(time.time() * 1000) 52 mgr.add_tags(session_key, [session_tag], expiration_ms=now + 720_000) 53 54 decrypted = GarlicDecryptor.decrypt_existing(encrypted, mgr) 55 assert decrypted is not None 56 assert decrypted == cloves_data 57 58 def test_roundtrip_larger_payload(self): 59 session_key = os.urandom(32) 60 session_tag = os.urandom(32) 61 cloves_data = _pad16(os.urandom(256)) 62 63 encrypted = GarlicEncryptor.encrypt(cloves_data, session_key, session_tag) 64 65 mgr = SessionKeyManager() 66 now = int(time.time() * 1000) 67 mgr.add_tags(session_key, [session_tag], expiration_ms=now + 720_000) 68 69 decrypted = GarlicDecryptor.decrypt_existing(encrypted, mgr) 70 assert decrypted == cloves_data 71 72 def test_encrypted_format_starts_with_tag(self): 73 """Encrypted output must start with the 32-byte session tag.""" 74 session_key = os.urandom(32) 75 session_tag = os.urandom(32) 76 cloves_data = _pad16(b"test") 77 78 encrypted = GarlicEncryptor.encrypt(cloves_data, session_key, session_tag) 79 assert encrypted[:32] == session_tag 80 81 def test_encrypted_length(self): 82 """Encrypted output = 32 (tag) + len(cloves_data).""" 83 session_key = os.urandom(32) 84 session_tag = os.urandom(32) 85 cloves_data = _pad16(b"exactly sixteen!") # 16 bytes 86 87 encrypted = GarlicEncryptor.encrypt(cloves_data, session_key, session_tag) 88 assert len(encrypted) == 32 + len(cloves_data) 89 90 91class TestNewSessionRoundtrip: 92 """Encrypt with ElGamal key wrapping, decrypt with private key.""" 93 94 def test_roundtrip_new_session(self): 95 pub, priv = ElGamalEngine.generate_keypair() 96 session_key = os.urandom(32) 97 tags = [os.urandom(32) for _ in range(5)] 98 cloves_data = _pad16(b"New session garlic message") 99 100 encrypted = GarlicEncryptor.encrypt_new_session( 101 cloves_data, session_key, tags, pub 102 ) 103 104 mgr = SessionKeyManager() 105 decrypted = GarlicDecryptor.decrypt_new_session(encrypted, priv, mgr) 106 assert decrypted is not None 107 assert decrypted == cloves_data 108 109 def test_new_session_registers_tags(self): 110 """After decrypting a new session, the delivered tags should be usable.""" 111 pub, priv = ElGamalEngine.generate_keypair() 112 session_key = os.urandom(32) 113 tags = [os.urandom(32) for _ in range(3)] 114 cloves_data = _pad16(b"Session setup message") 115 116 encrypted = GarlicEncryptor.encrypt_new_session( 117 cloves_data, session_key, tags, pub 118 ) 119 120 mgr = SessionKeyManager() 121 GarlicDecryptor.decrypt_new_session(encrypted, priv, mgr) 122 123 # Tags should now be registered -- can use them for existing session 124 for tag in tags: 125 # Each tag should resolve to the session key 126 key = mgr.consume_tag(tag) 127 assert key == session_key 128 129 def test_new_session_encrypted_format_length(self): 130 """New session output = 514 (ElGamal block) + len(aes_encrypted).""" 131 pub, priv = ElGamalEngine.generate_keypair() 132 session_key = os.urandom(32) 133 tags = [os.urandom(32) for _ in range(5)] 134 cloves_data = _pad16(b"test payload ok!") # 16 bytes 135 136 encrypted = GarlicEncryptor.encrypt_new_session( 137 cloves_data, session_key, tags, pub 138 ) 139 assert len(encrypted) == 514 + len(cloves_data) 140 141 def test_new_session_max_5_tags(self): 142 """More than 5 tags should raise ValueError (ElGamal 222-byte limit).""" 143 pub, priv = ElGamalEngine.generate_keypair() 144 session_key = os.urandom(32) 145 tags = [os.urandom(32) for _ in range(6)] 146 cloves_data = _pad16(b"too many tags") 147 148 with pytest.raises(ValueError, match="[Tt]ag"): 149 GarlicEncryptor.encrypt_new_session( 150 cloves_data, session_key, tags, pub 151 ) 152 153 154class TestReplayProtection: 155 """Tag reuse must be rejected.""" 156 157 def test_tag_reuse_rejected(self): 158 session_key = os.urandom(32) 159 session_tag = os.urandom(32) 160 cloves_data = _pad16(b"replay test data") 161 162 encrypted = GarlicEncryptor.encrypt(cloves_data, session_key, session_tag) 163 164 mgr = SessionKeyManager() 165 now = int(time.time() * 1000) 166 mgr.add_tags(session_key, [session_tag], expiration_ms=now + 720_000) 167 168 # First decryption should succeed 169 result1 = GarlicDecryptor.decrypt_existing(encrypted, mgr) 170 assert result1 == cloves_data 171 172 # Second decryption with same tag should fail (tag consumed) 173 result2 = GarlicDecryptor.decrypt_existing(encrypted, mgr) 174 assert result2 is None 175 176 177class TestUnknownTag: 178 """Unknown tags must return None.""" 179 180 def test_unknown_tag_returns_none(self): 181 session_key = os.urandom(32) 182 fake_tag = os.urandom(32) 183 cloves_data = _pad16(b"unknown tag test") 184 185 # Encrypt with the fake tag 186 encrypted = GarlicEncryptor.encrypt(cloves_data, session_key, fake_tag) 187 188 # Decryptor has no tags registered 189 mgr = SessionKeyManager() 190 result = GarlicDecryptor.decrypt_existing(encrypted, mgr) 191 assert result is None 192 193 194class TestIVDerivation: 195 """AES-CBC must use the first 16 bytes of the session tag as IV.""" 196 197 def test_iv_is_first_16_bytes_of_tag(self): 198 session_key = os.urandom(32) 199 session_tag = os.urandom(32) 200 cloves_data = _pad16(b"IV verification!") 201 202 encrypted = GarlicEncryptor.encrypt(cloves_data, session_key, session_tag) 203 204 # The AES ciphertext starts after the 32-byte tag prefix 205 aes_ciphertext = encrypted[32:] 206 207 # Manually decrypt with AES using tag[:16] as IV 208 expected_iv = session_tag[:16] 209 manual_decrypted = AESEngine.decrypt(aes_ciphertext, session_key, expected_iv) 210 assert manual_decrypted == cloves_data 211 212 def test_wrong_iv_produces_garbage(self): 213 """Using wrong IV should not produce the original plaintext.""" 214 session_key = os.urandom(32) 215 session_tag = os.urandom(32) 216 cloves_data = _pad16(b"wrong IV test!!!") 217 218 encrypted = GarlicEncryptor.encrypt(cloves_data, session_key, session_tag) 219 aes_ciphertext = encrypted[32:] 220 221 # Use wrong IV (last 16 bytes instead of first 16) 222 wrong_iv = session_tag[16:] 223 wrong_decrypted = AESEngine.decrypt(aes_ciphertext, session_key, wrong_iv) 224 assert wrong_decrypted != cloves_data 225 226 227class TestNewSessionTagDeliveryParsing: 228 """Verify the tag delivery block format: key(32) + count(2) + tags(32 each).""" 229 230 def test_tag_delivery_block_format(self): 231 """Encrypt a new session and verify the ElGamal block contains 232 the session key and tags in the expected format.""" 233 pub, priv = ElGamalEngine.generate_keypair() 234 session_key = os.urandom(32) 235 tags = [os.urandom(32) for _ in range(4)] 236 cloves_data = _pad16(b"block format ok!") 237 238 encrypted = GarlicEncryptor.encrypt_new_session( 239 cloves_data, session_key, tags, pub 240 ) 241 242 # Decrypt just the ElGamal block to verify format 243 elgamal_block = encrypted[:514] 244 decrypted_block = ElGamalEngine.decrypt(elgamal_block, priv) 245 assert decrypted_block is not None 246 247 # Parse: session_key(32) + tag_count(2, big-endian) + tags(32 each) 248 recovered_key = decrypted_block[:32] 249 tag_count = struct.unpack(">H", decrypted_block[32:34])[0] 250 recovered_tags = [] 251 offset = 34 252 for _ in range(tag_count): 253 recovered_tags.append(decrypted_block[offset : offset + 32]) 254 offset += 32 255 256 assert recovered_key == session_key 257 assert tag_count == 4 258 assert recovered_tags == tags