A Python port of the Invisible Internet Project (I2P)
at main 207 lines 7.6 kB view raw
1"""Tests for GarlicMessageHandler — garlic-to-router wiring.""" 2 3import os 4import struct 5from unittest.mock import MagicMock, call 6 7import pytest 8 9from i2p_crypto.session_key_manager import SessionKeyManager 10from i2p_crypto.garlic_crypto import GarlicEncryptor, GarlicDecryptor 11from i2p_data.garlic import ( 12 GarlicMessage, 13 GarlicClove, 14 DeliveryInstructions, 15 DeliveryType, 16) 17from i2p_data.garlic_handler import GarlicMessageHandler 18from i2p_data.message_router import InboundMessageHandler 19 20 21def _build_garlic_plaintext(clove_payloads: list[bytes]) -> bytes: 22 """Build a serialized GarlicMessage from raw clove payloads. 23 24 Each payload becomes a LOCAL-delivery clove with sequential IDs. 25 The result is padded to a 16-byte boundary for AES-CBC. 26 """ 27 cloves = [] 28 for i, payload in enumerate(clove_payloads): 29 di = DeliveryInstructions(DeliveryType.LOCAL) 30 cloves.append(GarlicClove(di, payload, clove_id=i, expiration=9999999999)) 31 raw = GarlicMessage(cloves).to_bytes() 32 # Pad to 16-byte boundary (AES-CBC requirement, no PKCS) 33 pad_len = (16 - len(raw) % 16) % 16 34 return raw + b"\x00" * pad_len 35 36 37class TestHandleExistingSession: 38 """Tag consumed, payload decrypted, cloves returned.""" 39 40 def test_single_clove_roundtrip(self): 41 skm = SessionKeyManager() 42 dest = os.urandom(32) 43 session_key, tags = skm.create_session(dest) 44 45 clove_data = b"hello-garlic-clove-data!" 46 plaintext = _build_garlic_plaintext([clove_data]) 47 48 # Encrypt with the first tag 49 encrypted = GarlicEncryptor.encrypt(plaintext, session_key, tags[0]) 50 51 handler = GarlicMessageHandler(skm, GarlicDecryptor) 52 result = handler.handle(encrypted) 53 54 assert len(result) == 1 55 assert result[0] == clove_data 56 57 def test_multiple_cloves(self): 58 skm = SessionKeyManager() 59 dest = os.urandom(32) 60 session_key, tags = skm.create_session(dest) 61 62 payloads = [b"clove-A", b"clove-B", b"clove-C"] 63 plaintext = _build_garlic_plaintext(payloads) 64 encrypted = GarlicEncryptor.encrypt(plaintext, session_key, tags[0]) 65 66 handler = GarlicMessageHandler(skm, GarlicDecryptor) 67 result = handler.handle(encrypted) 68 69 assert len(result) == 3 70 assert result == payloads 71 72 def test_tag_consumed_after_handle(self): 73 """After handle(), the tag must be consumed and not reusable.""" 74 skm = SessionKeyManager() 75 dest = os.urandom(32) 76 session_key, tags = skm.create_session(dest) 77 78 plaintext = _build_garlic_plaintext([b"once"]) 79 encrypted = GarlicEncryptor.encrypt(plaintext, session_key, tags[0]) 80 81 handler = GarlicMessageHandler(skm, GarlicDecryptor) 82 result = handler.handle(encrypted) 83 assert len(result) == 1 84 85 # Same encrypted blob again -- tag already consumed 86 result2 = handler.handle(encrypted) 87 assert result2 == [] 88 89 90class TestHandleUnknownTag: 91 """Unknown tag returns empty list.""" 92 93 def test_unknown_tag_returns_empty(self): 94 skm = SessionKeyManager() 95 # Fabricate a message with a random tag not in the manager 96 fake_tag = os.urandom(32) 97 fake_ciphertext = os.urandom(48) # at least 16 bytes of AES data 98 payload = fake_tag + fake_ciphertext 99 100 handler = GarlicMessageHandler(skm, GarlicDecryptor) 101 result = handler.handle(payload) 102 assert result == [] 103 104 def test_short_payload_returns_empty(self): 105 skm = SessionKeyManager() 106 handler = GarlicMessageHandler(skm, GarlicDecryptor) 107 # Too short for even a tag + 1 AES block 108 result = handler.handle(b"\x00" * 10) 109 assert result == [] 110 111 112class TestHandleWithRerouting: 113 """Cloves passed to inbound_handler when set.""" 114 115 def test_cloves_rerouted_to_inbound_handler(self): 116 skm = SessionKeyManager() 117 dest = os.urandom(32) 118 session_key, tags = skm.create_session(dest) 119 120 payloads = [b"routed-A", b"routed-B"] 121 plaintext = _build_garlic_plaintext(payloads) 122 encrypted = GarlicEncryptor.encrypt(plaintext, session_key, tags[0]) 123 124 mock_inbound = MagicMock(spec=InboundMessageHandler) 125 handler = GarlicMessageHandler(skm, GarlicDecryptor, inbound_handler=mock_inbound) 126 result = handler.handle(encrypted) 127 128 # Cloves should still be returned 129 assert len(result) == 2 130 assert result == payloads 131 132 # And each clove should have been dispatched to the inbound handler 133 # The handler receives the garlic message type (11) and each clove's message_data 134 assert mock_inbound.handle.call_count == 2 135 mock_inbound.handle.assert_any_call(InboundMessageHandler.GARLIC, b"routed-A") 136 mock_inbound.handle.assert_any_call(InboundMessageHandler.GARLIC, b"routed-B") 137 138 def test_no_rerouting_when_handler_is_none(self): 139 skm = SessionKeyManager() 140 dest = os.urandom(32) 141 session_key, tags = skm.create_session(dest) 142 143 plaintext = _build_garlic_plaintext([b"no-reroute"]) 144 encrypted = GarlicEncryptor.encrypt(plaintext, session_key, tags[0]) 145 146 handler = GarlicMessageHandler(skm, GarlicDecryptor, inbound_handler=None) 147 result = handler.handle(encrypted) 148 assert len(result) == 1 149 # No exception — just no rerouting 150 151 152class TestHandleNewSession: 153 """New session decryption with ElGamal.""" 154 155 def test_new_session_decrypt_and_extract(self): 156 """Use a mock ElGamal to test the new-session path without real key generation.""" 157 skm = SessionKeyManager() 158 session_key = os.urandom(32) 159 tags = [os.urandom(32) for _ in range(3)] 160 161 clove_data = b"new-session-clove" 162 plaintext = _build_garlic_plaintext([clove_data]) 163 164 # Build a mock decryptor whose decrypt_new_session returns plaintext 165 mock_decryptor = MagicMock() 166 mock_decryptor.decrypt_new_session = MagicMock(return_value=plaintext) 167 168 private_key = os.urandom(256) 169 170 handler = GarlicMessageHandler(skm, mock_decryptor) 171 # The payload doesn't matter because the mock bypasses real crypto 172 fake_payload = os.urandom(514 + 48) 173 result = handler.handle_new_session(fake_payload, private_key) 174 175 assert len(result) == 1 176 assert result[0] == clove_data 177 178 # Verify decrypt_new_session was called with correct args 179 mock_decryptor.decrypt_new_session.assert_called_once_with( 180 fake_payload, private_key, skm 181 ) 182 183 def test_new_session_decrypt_failure_returns_empty(self): 184 skm = SessionKeyManager() 185 186 mock_decryptor = MagicMock() 187 mock_decryptor.decrypt_new_session = MagicMock(return_value=None) 188 189 handler = GarlicMessageHandler(skm, mock_decryptor) 190 result = handler.handle_new_session(os.urandom(600), os.urandom(256)) 191 assert result == [] 192 193 def test_new_session_with_rerouting(self): 194 skm = SessionKeyManager() 195 196 payloads = [b"ns-clove-1", b"ns-clove-2"] 197 plaintext = _build_garlic_plaintext(payloads) 198 199 mock_decryptor = MagicMock() 200 mock_decryptor.decrypt_new_session = MagicMock(return_value=plaintext) 201 202 mock_inbound = MagicMock(spec=InboundMessageHandler) 203 handler = GarlicMessageHandler(skm, mock_decryptor, inbound_handler=mock_inbound) 204 205 result = handler.handle_new_session(os.urandom(600), os.urandom(256)) 206 assert result == payloads 207 assert mock_inbound.handle.call_count == 2