"""Garlic-to-router wiring: decrypt garlic messages and extract their cloves.

Bridges the crypto layer (GarlicDecryptor, SessionKeyManager) with the data
layer (GarlicMessage parsing) and optionally re-routes extracted cloves
through an InboundMessageHandler.

Ported from net.i2p.router.message.GarlicMessageReceiver.
"""

from __future__ import annotations

from i2p_data.garlic import GarlicMessage
from i2p_data.message_router import InboundMessageHandler


class GarlicMessageHandler:
    """Turn encrypted garlic messages into a list of clove payloads.

    Decryption is delegated to the supplied decryptor; the resulting
    plaintext is parsed as a ``GarlicMessage`` and each clove's
    ``message_data`` is collected.

    Parameters
    ----------
    session_key_mgr:
        A ``SessionKeyManager`` instance; it is handed to the decryptor on
        every call (used there for tag-to-key lookup).
    garlic_decryptor:
        A ``GarlicDecryptor`` class or instance exposing
        ``decrypt_existing`` and ``decrypt_new_session``.
    inbound_handler:
        Optional ``InboundMessageHandler``. When given, every extracted
        clove payload is additionally passed to its ``handle`` method.
    """

    def __init__(
        self,
        session_key_mgr,
        garlic_decryptor,
        inbound_handler: InboundMessageHandler | None = None,
    ) -> None:
        self._inbound_handler = inbound_handler
        self._decryptor = garlic_decryptor
        self._session_key_mgr = session_key_mgr

    def handle(self, payload: bytes) -> list[bytes]:
        """Decrypt an existing-session garlic message and return its cloves.

        The work is done by ``decrypt_existing`` on the configured
        decryptor, which is expected to treat the leading 32 bytes of
        *payload* as a session tag and look up the matching session key.

        Parameters
        ----------
        payload:
            Raw encrypted garlic message bytes.

        Returns
        -------
        list[bytes]
            The ``message_data`` of every clove in the decrypted message,
            or an empty list when the decryptor returns ``None`` (unknown
            tag or failed decryption).
        """
        decrypted = self._decryptor.decrypt_existing(
            payload, self._session_key_mgr
        )
        return [] if decrypted is None else self._extract_cloves(decrypted)

    def handle_new_session(
        self, payload: bytes, our_private_key: bytes
    ) -> list[bytes]:
        """Decrypt a new-session (ElGamal-wrapped) garlic message.

        Parameters
        ----------
        payload:
            Raw encrypted message laid out as
            ``elgamal_block(514) || aes_ciphertext``.
        our_private_key:
            256-byte ElGamal private key.

        Returns
        -------
        list[bytes]
            The ``message_data`` of every clove in the decrypted message,
            or an empty list when the decryptor returns ``None``.
        """
        decrypted = self._decryptor.decrypt_new_session(
            payload, our_private_key, self._session_key_mgr
        )
        return [] if decrypted is None else self._extract_cloves(decrypted)

    def _extract_cloves(self, plaintext: bytes) -> list[bytes]:
        """Parse *plaintext* as a ``GarlicMessage`` and collect clove data.

        All clove payloads are gathered first. Only afterwards, and only
        when an ``inbound_handler`` was configured, is each payload passed
        to it tagged as ``InboundMessageHandler.GARLIC``.
        """
        parsed = GarlicMessage.from_bytes(plaintext)
        extracted = [clove.message_data for clove in parsed.cloves]

        # Nothing to re-route to: hand back the payloads as-is.
        if self._inbound_handler is None:
            return extracted

        for clove_data in extracted:
            self._inbound_handler.handle(
                InboundMessageHandler.GARLIC, clove_data
            )
        return extracted