A Python port of the Invisible Internet Project (I2P)
1"""Tests for GarlicMessageHandler — garlic-to-router wiring."""
2
3import os
4import struct
5from unittest.mock import MagicMock, call
6
7import pytest
8
9from i2p_crypto.session_key_manager import SessionKeyManager
10from i2p_crypto.garlic_crypto import GarlicEncryptor, GarlicDecryptor
11from i2p_data.garlic import (
12 GarlicMessage,
13 GarlicClove,
14 DeliveryInstructions,
15 DeliveryType,
16)
17from i2p_data.garlic_handler import GarlicMessageHandler
18from i2p_data.message_router import InboundMessageHandler
19
20
21def _build_garlic_plaintext(clove_payloads: list[bytes]) -> bytes:
22 """Build a serialized GarlicMessage from raw clove payloads.
23
24 Each payload becomes a LOCAL-delivery clove with sequential IDs.
25 The result is padded to a 16-byte boundary for AES-CBC.
26 """
27 cloves = []
28 for i, payload in enumerate(clove_payloads):
29 di = DeliveryInstructions(DeliveryType.LOCAL)
30 cloves.append(GarlicClove(di, payload, clove_id=i, expiration=9999999999))
31 raw = GarlicMessage(cloves).to_bytes()
32 # Pad to 16-byte boundary (AES-CBC requirement, no PKCS)
33 pad_len = (16 - len(raw) % 16) % 16
34 return raw + b"\x00" * pad_len
35
36
37class TestHandleExistingSession:
38 """Tag consumed, payload decrypted, cloves returned."""
39
40 def test_single_clove_roundtrip(self):
41 skm = SessionKeyManager()
42 dest = os.urandom(32)
43 session_key, tags = skm.create_session(dest)
44
45 clove_data = b"hello-garlic-clove-data!"
46 plaintext = _build_garlic_plaintext([clove_data])
47
48 # Encrypt with the first tag
49 encrypted = GarlicEncryptor.encrypt(plaintext, session_key, tags[0])
50
51 handler = GarlicMessageHandler(skm, GarlicDecryptor)
52 result = handler.handle(encrypted)
53
54 assert len(result) == 1
55 assert result[0] == clove_data
56
57 def test_multiple_cloves(self):
58 skm = SessionKeyManager()
59 dest = os.urandom(32)
60 session_key, tags = skm.create_session(dest)
61
62 payloads = [b"clove-A", b"clove-B", b"clove-C"]
63 plaintext = _build_garlic_plaintext(payloads)
64 encrypted = GarlicEncryptor.encrypt(plaintext, session_key, tags[0])
65
66 handler = GarlicMessageHandler(skm, GarlicDecryptor)
67 result = handler.handle(encrypted)
68
69 assert len(result) == 3
70 assert result == payloads
71
72 def test_tag_consumed_after_handle(self):
73 """After handle(), the tag must be consumed and not reusable."""
74 skm = SessionKeyManager()
75 dest = os.urandom(32)
76 session_key, tags = skm.create_session(dest)
77
78 plaintext = _build_garlic_plaintext([b"once"])
79 encrypted = GarlicEncryptor.encrypt(plaintext, session_key, tags[0])
80
81 handler = GarlicMessageHandler(skm, GarlicDecryptor)
82 result = handler.handle(encrypted)
83 assert len(result) == 1
84
85 # Same encrypted blob again -- tag already consumed
86 result2 = handler.handle(encrypted)
87 assert result2 == []
88
89
90class TestHandleUnknownTag:
91 """Unknown tag returns empty list."""
92
93 def test_unknown_tag_returns_empty(self):
94 skm = SessionKeyManager()
95 # Fabricate a message with a random tag not in the manager
96 fake_tag = os.urandom(32)
97 fake_ciphertext = os.urandom(48) # at least 16 bytes of AES data
98 payload = fake_tag + fake_ciphertext
99
100 handler = GarlicMessageHandler(skm, GarlicDecryptor)
101 result = handler.handle(payload)
102 assert result == []
103
104 def test_short_payload_returns_empty(self):
105 skm = SessionKeyManager()
106 handler = GarlicMessageHandler(skm, GarlicDecryptor)
107 # Too short for even a tag + 1 AES block
108 result = handler.handle(b"\x00" * 10)
109 assert result == []
110
111
112class TestHandleWithRerouting:
113 """Cloves passed to inbound_handler when set."""
114
115 def test_cloves_rerouted_to_inbound_handler(self):
116 skm = SessionKeyManager()
117 dest = os.urandom(32)
118 session_key, tags = skm.create_session(dest)
119
120 payloads = [b"routed-A", b"routed-B"]
121 plaintext = _build_garlic_plaintext(payloads)
122 encrypted = GarlicEncryptor.encrypt(plaintext, session_key, tags[0])
123
124 mock_inbound = MagicMock(spec=InboundMessageHandler)
125 handler = GarlicMessageHandler(skm, GarlicDecryptor, inbound_handler=mock_inbound)
126 result = handler.handle(encrypted)
127
128 # Cloves should still be returned
129 assert len(result) == 2
130 assert result == payloads
131
132 # And each clove should have been dispatched to the inbound handler
133 # The handler receives the garlic message type (11) and each clove's message_data
134 assert mock_inbound.handle.call_count == 2
135 mock_inbound.handle.assert_any_call(InboundMessageHandler.GARLIC, b"routed-A")
136 mock_inbound.handle.assert_any_call(InboundMessageHandler.GARLIC, b"routed-B")
137
138 def test_no_rerouting_when_handler_is_none(self):
139 skm = SessionKeyManager()
140 dest = os.urandom(32)
141 session_key, tags = skm.create_session(dest)
142
143 plaintext = _build_garlic_plaintext([b"no-reroute"])
144 encrypted = GarlicEncryptor.encrypt(plaintext, session_key, tags[0])
145
146 handler = GarlicMessageHandler(skm, GarlicDecryptor, inbound_handler=None)
147 result = handler.handle(encrypted)
148 assert len(result) == 1
149 # No exception — just no rerouting
150
151
152class TestHandleNewSession:
153 """New session decryption with ElGamal."""
154
155 def test_new_session_decrypt_and_extract(self):
156 """Use a mock ElGamal to test the new-session path without real key generation."""
157 skm = SessionKeyManager()
158 session_key = os.urandom(32)
159 tags = [os.urandom(32) for _ in range(3)]
160
161 clove_data = b"new-session-clove"
162 plaintext = _build_garlic_plaintext([clove_data])
163
164 # Build a mock decryptor whose decrypt_new_session returns plaintext
165 mock_decryptor = MagicMock()
166 mock_decryptor.decrypt_new_session = MagicMock(return_value=plaintext)
167
168 private_key = os.urandom(256)
169
170 handler = GarlicMessageHandler(skm, mock_decryptor)
171 # The payload doesn't matter because the mock bypasses real crypto
172 fake_payload = os.urandom(514 + 48)
173 result = handler.handle_new_session(fake_payload, private_key)
174
175 assert len(result) == 1
176 assert result[0] == clove_data
177
178 # Verify decrypt_new_session was called with correct args
179 mock_decryptor.decrypt_new_session.assert_called_once_with(
180 fake_payload, private_key, skm
181 )
182
183 def test_new_session_decrypt_failure_returns_empty(self):
184 skm = SessionKeyManager()
185
186 mock_decryptor = MagicMock()
187 mock_decryptor.decrypt_new_session = MagicMock(return_value=None)
188
189 handler = GarlicMessageHandler(skm, mock_decryptor)
190 result = handler.handle_new_session(os.urandom(600), os.urandom(256))
191 assert result == []
192
193 def test_new_session_with_rerouting(self):
194 skm = SessionKeyManager()
195
196 payloads = [b"ns-clove-1", b"ns-clove-2"]
197 plaintext = _build_garlic_plaintext(payloads)
198
199 mock_decryptor = MagicMock()
200 mock_decryptor.decrypt_new_session = MagicMock(return_value=plaintext)
201
202 mock_inbound = MagicMock(spec=InboundMessageHandler)
203 handler = GarlicMessageHandler(skm, mock_decryptor, inbound_handler=mock_inbound)
204
205 result = handler.handle_new_session(os.urandom(600), os.urandom(256))
206 assert result == payloads
207 assert mock_inbound.handle.call_count == 2