A Python port of the Invisible Internet Project (I2P)
at main 101 lines 3.4 kB view raw
"""Garlic-to-router wiring: decrypt garlic messages and pull out their cloves.

Connects the crypto layer (GarlicDecryptor, SessionKeyManager) to the data
layer (GarlicMessage parsing) and, when configured, forwards every extracted
clove to an InboundMessageHandler.

Ported from net.i2p.router.message.GarlicMessageReceiver.
"""

from __future__ import annotations

from i2p_data.garlic import GarlicMessage
from i2p_data.message_router import InboundMessageHandler


class GarlicMessageHandler:
    """Turn encrypted garlic messages into a list of clove payloads.

    Decryption is delegated to the supplied decryptor; the resulting
    plaintext is parsed with ``GarlicMessage.from_bytes`` and the
    ``message_data`` of each clove is collected.

    Parameters
    ----------
    session_key_mgr:
        A ``SessionKeyManager`` instance; it is handed to the decryptor on
        every call (used there for tag-to-key lookup).
    garlic_decryptor:
        A ``GarlicDecryptor`` class or instance exposing
        ``decrypt_existing`` and ``decrypt_new_session``.
    inbound_handler:
        Optional ``InboundMessageHandler``. When given, each extracted
        clove payload is also passed to its ``handle`` method.
    """

    def __init__(
        self,
        session_key_mgr,
        garlic_decryptor,
        inbound_handler: InboundMessageHandler | None = None,
    ) -> None:
        self._inbound_handler = inbound_handler
        self._decryptor = garlic_decryptor
        self._session_key_mgr = session_key_mgr

    def handle(self, payload: bytes) -> list[bytes]:
        """Decrypt an existing-session garlic message and return its cloves.

        The payload is passed to the decryptor's ``decrypt_existing``
        together with the session key manager. Per the decryptor's
        contract, the leading 32 bytes are treated as a session tag and a
        recognized tag selects the session key used for decryption.

        Parameters
        ----------
        payload:
            Raw encrypted garlic message bytes.

        Returns
        -------
        list[bytes]
            The ``message_data`` of every clove, or an empty list when the
            decryptor returns ``None`` (unknown tag or failed decryption).
        """
        decrypted = self._decryptor.decrypt_existing(payload, self._session_key_mgr)
        return [] if decrypted is None else self._extract_cloves(decrypted)

    def handle_new_session(
        self, payload: bytes, our_private_key: bytes
    ) -> list[bytes]:
        """Decrypt a new-session (ElGamal key-wrapped) garlic message.

        Parameters
        ----------
        payload:
            Raw encrypted message: ``elgamal_block(514) || aes_ciphertext``.
        our_private_key:
            256-byte ElGamal private key.

        Returns
        -------
        list[bytes]
            The ``message_data`` of every clove, or an empty list when the
            decryptor returns ``None`` (decryption failed).
        """
        decrypted = self._decryptor.decrypt_new_session(
            payload, our_private_key, self._session_key_mgr
        )
        return [] if decrypted is None else self._extract_cloves(decrypted)

    def _extract_cloves(self, plaintext: bytes) -> list[bytes]:
        """Parse *plaintext* as a ``GarlicMessage`` and collect clove data.

        All clove payloads are gathered first. If an inbound handler was
        supplied at construction time, each payload is then passed to it,
        in order, tagged with ``InboundMessageHandler.GARLIC``.
        """
        parsed = GarlicMessage.from_bytes(plaintext)
        extracted = [item.message_data for item in parsed.cloves]

        # Nothing to forward to: the caller only wants the payload list.
        if self._inbound_handler is None:
            return extracted

        for clove_data in extracted:
            self._inbound_handler.handle(InboundMessageHandler.GARLIC, clove_data)
        return extracted