"""Tests for garlic encryption/decryption pipeline. TDD: tests written before the implementation. Tests cover: 1. Existing session encrypt/decrypt roundtrip 2. New session encrypt/decrypt roundtrip with ElGamal 3. Tag reuse rejected (replay protection) 4. Unknown tag returns None 5. AES-CBC uses first 16 bytes of tag as IV """ from __future__ import annotations import os import struct import time import pytest from i2p_crypto.garlic_crypto import GarlicEncryptor, GarlicDecryptor from i2p_crypto.session_key_manager import SessionKeyManager from i2p_crypto.aes import AESEngine from i2p_crypto.elgamal import ElGamalEngine def _pad16(data: bytes) -> bytes: """PKCS7-pad data to a 16-byte boundary.""" pad_len = 16 - (len(data) % 16) return data + bytes([pad_len]) * pad_len def _unpad16(data: bytes) -> bytes: """Remove PKCS7 padding.""" pad_len = data[-1] return data[:-pad_len] class TestExistingSessionRoundtrip: """Encrypt with a session tag, decrypt with the same session key.""" def test_roundtrip_basic(self): session_key = os.urandom(32) session_tag = os.urandom(32) cloves_data = _pad16(b"Hello garlic clove data!") encrypted = GarlicEncryptor.encrypt(cloves_data, session_key, session_tag) # Set up decryptor side: register the tag mgr = SessionKeyManager() now = int(time.time() * 1000) mgr.add_tags(session_key, [session_tag], expiration_ms=now + 720_000) decrypted = GarlicDecryptor.decrypt_existing(encrypted, mgr) assert decrypted is not None assert decrypted == cloves_data def test_roundtrip_larger_payload(self): session_key = os.urandom(32) session_tag = os.urandom(32) cloves_data = _pad16(os.urandom(256)) encrypted = GarlicEncryptor.encrypt(cloves_data, session_key, session_tag) mgr = SessionKeyManager() now = int(time.time() * 1000) mgr.add_tags(session_key, [session_tag], expiration_ms=now + 720_000) decrypted = GarlicDecryptor.decrypt_existing(encrypted, mgr) assert decrypted == cloves_data def test_encrypted_format_starts_with_tag(self): """Encrypted output must start with the 32-byte session tag.""" session_key = os.urandom(32) session_tag = os.urandom(32) cloves_data = _pad16(b"test") encrypted = GarlicEncryptor.encrypt(cloves_data, session_key, session_tag) assert encrypted[:32] == session_tag def test_encrypted_length(self): """Encrypted output = 32 (tag) + len(cloves_data).""" session_key = os.urandom(32) session_tag = os.urandom(32) cloves_data = _pad16(b"exactly sixteen!") # 16 bytes encrypted = GarlicEncryptor.encrypt(cloves_data, session_key, session_tag) assert len(encrypted) == 32 + len(cloves_data) class TestNewSessionRoundtrip: """Encrypt with ElGamal key wrapping, decrypt with private key.""" def test_roundtrip_new_session(self): pub, priv = ElGamalEngine.generate_keypair() session_key = os.urandom(32) tags = [os.urandom(32) for _ in range(5)] cloves_data = _pad16(b"New session garlic message") encrypted = GarlicEncryptor.encrypt_new_session( cloves_data, session_key, tags, pub ) mgr = SessionKeyManager() decrypted = GarlicDecryptor.decrypt_new_session(encrypted, priv, mgr) assert decrypted is not None assert decrypted == cloves_data def test_new_session_registers_tags(self): """After decrypting a new session, the delivered tags should be usable.""" pub, priv = ElGamalEngine.generate_keypair() session_key = os.urandom(32) tags = [os.urandom(32) for _ in range(3)] cloves_data = _pad16(b"Session setup message") encrypted = GarlicEncryptor.encrypt_new_session( cloves_data, session_key, tags, pub ) mgr = SessionKeyManager() GarlicDecryptor.decrypt_new_session(encrypted, priv, mgr) # Tags should now be registered -- can use them for existing session for tag in tags: # Each tag should resolve to the session key key = mgr.consume_tag(tag) assert key == session_key def test_new_session_encrypted_format_length(self): """New session output = 514 (ElGamal block) + len(aes_encrypted).""" pub, priv = ElGamalEngine.generate_keypair() session_key = os.urandom(32) tags = [os.urandom(32) for _ in range(5)] cloves_data = _pad16(b"test payload ok!") # 16 bytes encrypted = GarlicEncryptor.encrypt_new_session( cloves_data, session_key, tags, pub ) assert len(encrypted) == 514 + len(cloves_data) def test_new_session_max_5_tags(self): """More than 5 tags should raise ValueError (ElGamal 222-byte limit).""" pub, priv = ElGamalEngine.generate_keypair() session_key = os.urandom(32) tags = [os.urandom(32) for _ in range(6)] cloves_data = _pad16(b"too many tags") with pytest.raises(ValueError, match="[Tt]ag"): GarlicEncryptor.encrypt_new_session( cloves_data, session_key, tags, pub ) class TestReplayProtection: """Tag reuse must be rejected.""" def test_tag_reuse_rejected(self): session_key = os.urandom(32) session_tag = os.urandom(32) cloves_data = _pad16(b"replay test data") encrypted = GarlicEncryptor.encrypt(cloves_data, session_key, session_tag) mgr = SessionKeyManager() now = int(time.time() * 1000) mgr.add_tags(session_key, [session_tag], expiration_ms=now + 720_000) # First decryption should succeed result1 = GarlicDecryptor.decrypt_existing(encrypted, mgr) assert result1 == cloves_data # Second decryption with same tag should fail (tag consumed) result2 = GarlicDecryptor.decrypt_existing(encrypted, mgr) assert result2 is None class TestUnknownTag: """Unknown tags must return None.""" def test_unknown_tag_returns_none(self): session_key = os.urandom(32) fake_tag = os.urandom(32) cloves_data = _pad16(b"unknown tag test") # Encrypt with the fake tag encrypted = GarlicEncryptor.encrypt(cloves_data, session_key, fake_tag) # Decryptor has no tags registered mgr = SessionKeyManager() result = GarlicDecryptor.decrypt_existing(encrypted, mgr) assert result is None class TestIVDerivation: """AES-CBC must use the first 16 bytes of the session tag as IV.""" def test_iv_is_first_16_bytes_of_tag(self): session_key = os.urandom(32) session_tag = os.urandom(32) cloves_data = _pad16(b"IV verification!") encrypted = GarlicEncryptor.encrypt(cloves_data, session_key, session_tag) # The AES ciphertext starts after the 32-byte tag prefix aes_ciphertext = encrypted[32:] # Manually decrypt with AES using tag[:16] as IV expected_iv = session_tag[:16] manual_decrypted = AESEngine.decrypt(aes_ciphertext, session_key, expected_iv) assert manual_decrypted == cloves_data def test_wrong_iv_produces_garbage(self): """Using wrong IV should not produce the original plaintext.""" session_key = os.urandom(32) session_tag = os.urandom(32) cloves_data = _pad16(b"wrong IV test!!!") encrypted = GarlicEncryptor.encrypt(cloves_data, session_key, session_tag) aes_ciphertext = encrypted[32:] # Use wrong IV (last 16 bytes instead of first 16) wrong_iv = session_tag[16:] wrong_decrypted = AESEngine.decrypt(aes_ciphertext, session_key, wrong_iv) assert wrong_decrypted != cloves_data class TestNewSessionTagDeliveryParsing: """Verify the tag delivery block format: key(32) + count(2) + tags(32 each).""" def test_tag_delivery_block_format(self): """Encrypt a new session and verify the ElGamal block contains the session key and tags in the expected format.""" pub, priv = ElGamalEngine.generate_keypair() session_key = os.urandom(32) tags = [os.urandom(32) for _ in range(4)] cloves_data = _pad16(b"block format ok!") encrypted = GarlicEncryptor.encrypt_new_session( cloves_data, session_key, tags, pub ) # Decrypt just the ElGamal block to verify format elgamal_block = encrypted[:514] decrypted_block = ElGamalEngine.decrypt(elgamal_block, priv) assert decrypted_block is not None # Parse: session_key(32) + tag_count(2, big-endian) + tags(32 each) recovered_key = decrypted_block[:32] tag_count = struct.unpack(">H", decrypted_block[32:34])[0] recovered_tags = [] offset = 34 for _ in range(tag_count): recovered_tags.append(decrypted_block[offset : offset + 32]) offset += 32 assert recovered_key == session_key assert tag_count == 4 assert recovered_tags == tags