"""Tests for GarlicMessageHandler — garlic-to-router wiring.""" import os import struct from unittest.mock import MagicMock, call import pytest from i2p_crypto.session_key_manager import SessionKeyManager from i2p_crypto.garlic_crypto import GarlicEncryptor, GarlicDecryptor from i2p_data.garlic import ( GarlicMessage, GarlicClove, DeliveryInstructions, DeliveryType, ) from i2p_data.garlic_handler import GarlicMessageHandler from i2p_data.message_router import InboundMessageHandler def _build_garlic_plaintext(clove_payloads: list[bytes]) -> bytes: """Build a serialized GarlicMessage from raw clove payloads. Each payload becomes a LOCAL-delivery clove with sequential IDs. The result is padded to a 16-byte boundary for AES-CBC. """ cloves = [] for i, payload in enumerate(clove_payloads): di = DeliveryInstructions(DeliveryType.LOCAL) cloves.append(GarlicClove(di, payload, clove_id=i, expiration=9999999999)) raw = GarlicMessage(cloves).to_bytes() # Pad to 16-byte boundary (AES-CBC requirement, no PKCS) pad_len = (16 - len(raw) % 16) % 16 return raw + b"\x00" * pad_len class TestHandleExistingSession: """Tag consumed, payload decrypted, cloves returned.""" def test_single_clove_roundtrip(self): skm = SessionKeyManager() dest = os.urandom(32) session_key, tags = skm.create_session(dest) clove_data = b"hello-garlic-clove-data!" plaintext = _build_garlic_plaintext([clove_data]) # Encrypt with the first tag encrypted = GarlicEncryptor.encrypt(plaintext, session_key, tags[0]) handler = GarlicMessageHandler(skm, GarlicDecryptor) result = handler.handle(encrypted) assert len(result) == 1 assert result[0] == clove_data def test_multiple_cloves(self): skm = SessionKeyManager() dest = os.urandom(32) session_key, tags = skm.create_session(dest) payloads = [b"clove-A", b"clove-B", b"clove-C"] plaintext = _build_garlic_plaintext(payloads) encrypted = GarlicEncryptor.encrypt(plaintext, session_key, tags[0]) handler = GarlicMessageHandler(skm, GarlicDecryptor) result = handler.handle(encrypted) assert len(result) == 3 assert result == payloads def test_tag_consumed_after_handle(self): """After handle(), the tag must be consumed and not reusable.""" skm = SessionKeyManager() dest = os.urandom(32) session_key, tags = skm.create_session(dest) plaintext = _build_garlic_plaintext([b"once"]) encrypted = GarlicEncryptor.encrypt(plaintext, session_key, tags[0]) handler = GarlicMessageHandler(skm, GarlicDecryptor) result = handler.handle(encrypted) assert len(result) == 1 # Same encrypted blob again -- tag already consumed result2 = handler.handle(encrypted) assert result2 == [] class TestHandleUnknownTag: """Unknown tag returns empty list.""" def test_unknown_tag_returns_empty(self): skm = SessionKeyManager() # Fabricate a message with a random tag not in the manager fake_tag = os.urandom(32) fake_ciphertext = os.urandom(48) # at least 16 bytes of AES data payload = fake_tag + fake_ciphertext handler = GarlicMessageHandler(skm, GarlicDecryptor) result = handler.handle(payload) assert result == [] def test_short_payload_returns_empty(self): skm = SessionKeyManager() handler = GarlicMessageHandler(skm, GarlicDecryptor) # Too short for even a tag + 1 AES block result = handler.handle(b"\x00" * 10) assert result == [] class TestHandleWithRerouting: """Cloves passed to inbound_handler when set.""" def test_cloves_rerouted_to_inbound_handler(self): skm = SessionKeyManager() dest = os.urandom(32) session_key, tags = skm.create_session(dest) payloads = [b"routed-A", b"routed-B"] plaintext = _build_garlic_plaintext(payloads) encrypted = GarlicEncryptor.encrypt(plaintext, session_key, tags[0]) mock_inbound = MagicMock(spec=InboundMessageHandler) handler = GarlicMessageHandler(skm, GarlicDecryptor, inbound_handler=mock_inbound) result = handler.handle(encrypted) # Cloves should still be returned assert len(result) == 2 assert result == payloads # And each clove should have been dispatched to the inbound handler # The handler receives the garlic message type (11) and each clove's message_data assert mock_inbound.handle.call_count == 2 mock_inbound.handle.assert_any_call(InboundMessageHandler.GARLIC, b"routed-A") mock_inbound.handle.assert_any_call(InboundMessageHandler.GARLIC, b"routed-B") def test_no_rerouting_when_handler_is_none(self): skm = SessionKeyManager() dest = os.urandom(32) session_key, tags = skm.create_session(dest) plaintext = _build_garlic_plaintext([b"no-reroute"]) encrypted = GarlicEncryptor.encrypt(plaintext, session_key, tags[0]) handler = GarlicMessageHandler(skm, GarlicDecryptor, inbound_handler=None) result = handler.handle(encrypted) assert len(result) == 1 # No exception — just no rerouting class TestHandleNewSession: """New session decryption with ElGamal.""" def test_new_session_decrypt_and_extract(self): """Use a mock ElGamal to test the new-session path without real key generation.""" skm = SessionKeyManager() session_key = os.urandom(32) tags = [os.urandom(32) for _ in range(3)] clove_data = b"new-session-clove" plaintext = _build_garlic_plaintext([clove_data]) # Build a mock decryptor whose decrypt_new_session returns plaintext mock_decryptor = MagicMock() mock_decryptor.decrypt_new_session = MagicMock(return_value=plaintext) private_key = os.urandom(256) handler = GarlicMessageHandler(skm, mock_decryptor) # The payload doesn't matter because the mock bypasses real crypto fake_payload = os.urandom(514 + 48) result = handler.handle_new_session(fake_payload, private_key) assert len(result) == 1 assert result[0] == clove_data # Verify decrypt_new_session was called with correct args mock_decryptor.decrypt_new_session.assert_called_once_with( fake_payload, private_key, skm ) def test_new_session_decrypt_failure_returns_empty(self): skm = SessionKeyManager() mock_decryptor = MagicMock() mock_decryptor.decrypt_new_session = MagicMock(return_value=None) handler = GarlicMessageHandler(skm, mock_decryptor) result = handler.handle_new_session(os.urandom(600), os.urandom(256)) assert result == [] def test_new_session_with_rerouting(self): skm = SessionKeyManager() payloads = [b"ns-clove-1", b"ns-clove-2"] plaintext = _build_garlic_plaintext(payloads) mock_decryptor = MagicMock() mock_decryptor.decrypt_new_session = MagicMock(return_value=plaintext) mock_inbound = MagicMock(spec=InboundMessageHandler) handler = GarlicMessageHandler(skm, mock_decryptor, inbound_handler=mock_inbound) result = handler.handle_new_session(os.urandom(600), os.urandom(256)) assert result == payloads assert mock_inbound.handle.call_count == 2