# A Python port of the Invisible Internet Project (I2P).
1"""Garlic-to-router wiring — decrypts garlic messages and extracts cloves.
2
3Bridges the crypto layer (GarlicDecryptor, SessionKeyManager) with the
4data layer (GarlicMessage parsing) and optionally re-routes extracted
5cloves through an InboundMessageHandler.
6
7Ported from net.i2p.router.message.GarlicMessageReceiver.
8"""
9
10from __future__ import annotations
11
12from i2p_data.garlic import GarlicMessage
13from i2p_data.message_router import InboundMessageHandler
14
15
class GarlicMessageHandler:
    """Turn encrypted garlic messages into their clove payloads.

    Decryption is delegated to the supplied decryptor.  The plaintext it
    returns is parsed with ``GarlicMessage.from_bytes`` and the
    ``message_data`` of every clove is collected.

    Parameters
    ----------
    session_key_mgr:
        A ``SessionKeyManager`` instance for tag-to-key lookup; it is
        forwarded to the decryptor on every call.
    garlic_decryptor:
        A ``GarlicDecryptor`` class or instance providing the
        ``decrypt_existing`` and ``decrypt_new_session`` methods.
    inbound_handler:
        Optional ``InboundMessageHandler``.  When supplied, each extracted
        clove payload is additionally passed to its ``handle`` method.
    """

    def __init__(
        self,
        session_key_mgr,
        garlic_decryptor,
        inbound_handler: InboundMessageHandler | None = None,
    ) -> None:
        # Collaborators are only stored here; nothing is invoked until a
        # message is handled.
        self._inbound_handler = inbound_handler
        self._decryptor = garlic_decryptor
        self._session_key_mgr = session_key_mgr

    def handle(self, payload: bytes) -> list[bytes]:
        """Process a garlic message that belongs to an existing session.

        The payload is given to the decryptor's ``decrypt_existing``
        together with the session key manager.  Per that method's
        contract, the leading 32 bytes are treated as a session tag and,
        if the tag is recognized, the remainder is decrypted with the
        associated session key.  The plaintext is then parsed as a
        ``GarlicMessage``.

        Parameters
        ----------
        payload:
            Raw encrypted garlic message bytes.

        Returns
        -------
        list[bytes]
            The ``message_data`` of each clove, or an empty list when the
            decryptor returns ``None`` (unknown tag or failed decryption).
        """
        decrypted = self._decryptor.decrypt_existing(payload, self._session_key_mgr)
        return [] if decrypted is None else self._extract_cloves(decrypted)

    def handle_new_session(
        self, payload: bytes, our_private_key: bytes
    ) -> list[bytes]:
        """Process a new-session garlic message wrapped with ElGamal.

        Parameters
        ----------
        payload:
            Raw encrypted message: ``elgamal_block(514) || aes_ciphertext``.
        our_private_key:
            256-byte ElGamal private key.

        Returns
        -------
        list[bytes]
            The ``message_data`` of each clove, or an empty list when the
            decryptor returns ``None`` (decryption failed).
        """
        decrypted = self._decryptor.decrypt_new_session(
            payload, our_private_key, self._session_key_mgr
        )
        return [] if decrypted is None else self._extract_cloves(decrypted)

    def _extract_cloves(self, plaintext: bytes) -> list[bytes]:
        """Parse decrypted bytes as a GarlicMessage and collect clove data.

        All payloads are gathered before any re-routing takes place.  When
        an ``inbound_handler`` was configured, each payload is then passed
        to it tagged as a GARLIC message type.  Exceptions raised while
        parsing or re-routing are not caught here.
        """
        parsed = GarlicMessage.from_bytes(plaintext)
        extracted = [clove.message_data for clove in parsed.cloves]

        # Without a handler there is nothing to forward to.
        if self._inbound_handler is None:
            return extracted

        for clove_data in extracted:
            self._inbound_handler.handle(InboundMessageHandler.GARLIC, clove_data)
        return extracted