"""Tests for I2NP message wire format serialization/deserialization.""" import hashlib import os import struct import pytest from i2p_data.i2np_codec import ( MSG_TYPE_DATABASE_STORE, MSG_TYPE_DATABASE_LOOKUP, MSG_TYPE_DATABASE_SEARCH_REPLY, MSG_TYPE_DELIVERY_STATUS, MSG_TYPE_GARLIC, MSG_TYPE_TUNNEL_DATA, MSG_TYPE_TUNNEL_GATEWAY, MSG_TYPE_DATA, MSG_TYPE_TUNNEL_BUILD, MSG_TYPE_TUNNEL_BUILD_REPLY, MSG_TYPE_VARIABLE_TUNNEL_BUILD, MSG_TYPE_VARIABLE_TUNNEL_BUILD_REPLY, encode_i2np_short, decode_i2np_short, encode_i2np_standard, decode_i2np_standard, encode_database_store, decode_database_store, encode_database_lookup, decode_database_lookup, encode_database_search_reply, decode_database_search_reply, encode_delivery_status, decode_delivery_status, ) # --- Message type constants --- class TestMessageTypeConstants: """Message type constants must match the I2P specification values.""" def test_database_store_type(self): assert MSG_TYPE_DATABASE_STORE == 1 def test_database_lookup_type(self): assert MSG_TYPE_DATABASE_LOOKUP == 2 def test_database_search_reply_type(self): assert MSG_TYPE_DATABASE_SEARCH_REPLY == 3 def test_delivery_status_type(self): assert MSG_TYPE_DELIVERY_STATUS == 10 def test_garlic_type(self): assert MSG_TYPE_GARLIC == 11 def test_tunnel_data_type(self): assert MSG_TYPE_TUNNEL_DATA == 18 def test_tunnel_gateway_type(self): assert MSG_TYPE_TUNNEL_GATEWAY == 19 def test_data_type(self): assert MSG_TYPE_DATA == 20 def test_tunnel_build_type(self): assert MSG_TYPE_TUNNEL_BUILD == 21 def test_tunnel_build_reply_type(self): assert MSG_TYPE_TUNNEL_BUILD_REPLY == 22 def test_variable_tunnel_build_type(self): assert MSG_TYPE_VARIABLE_TUNNEL_BUILD == 23 def test_variable_tunnel_build_reply_type(self): assert MSG_TYPE_VARIABLE_TUNNEL_BUILD_REPLY == 24 # --- Short header (NTCP2) --- class TestShortHeader: """Short header: type(1) + msg_id(4) + expiration_seconds(4) + size(2) + payload = 11 byte header.""" def test_encode_produces_correct_length(self): payload = b"\xaa\xbb\xcc" result = encode_i2np_short(1, 0x12345678, 1000, payload) assert len(result) == 11 + len(payload) def test_encode_header_fields(self): payload = b"\x01\x02\x03\x04" result = encode_i2np_short(10, 0xDEADBEEF, 999999, payload) # Parse manually assert result[0] == 10 # type assert struct.unpack("!I", result[1:5])[0] == 0xDEADBEEF # msg_id assert struct.unpack("!i", result[5:9])[0] == 999999 # expiration seconds assert struct.unpack("!H", result[9:11])[0] == 4 # size assert result[11:] == payload def test_roundtrip(self): payload = os.urandom(128) msg_type = 18 msg_id = 0xCAFEBABE expiration = 1700000000 encoded = encode_i2np_short(msg_type, msg_id, expiration, payload) dec_type, dec_id, dec_exp, dec_payload = decode_i2np_short(encoded) assert dec_type == msg_type assert dec_id == msg_id assert dec_exp == expiration assert dec_payload == payload def test_empty_payload(self): encoded = encode_i2np_short(20, 1, 0, b"") dec_type, dec_id, dec_exp, dec_payload = decode_i2np_short(encoded) assert dec_type == 20 assert dec_payload == b"" def test_decode_ignores_trailing_data(self): """Decoder should only read size bytes of payload, ignoring trailing bytes.""" payload = b"\xff" * 10 encoded = encode_i2np_short(1, 0, 0, payload) + b"\x00" * 50 _, _, _, dec_payload = decode_i2np_short(encoded) assert dec_payload == payload # --- Standard header (tunnel messages) --- class TestStandardHeader: """Standard header: type(1) + msg_id(4) + expiration_ms(8) + size(2) + checksum(1) + payload = 16 byte header.""" def test_encode_produces_correct_length(self): payload = b"\xaa" * 20 result = encode_i2np_standard(1, 0, 0, payload) assert len(result) == 16 + len(payload) def test_checksum_is_first_byte_of_sha256(self): payload = b"Hello I2P" result = encode_i2np_standard(1, 0, 0, payload) expected_checksum = hashlib.sha256(payload).digest()[0] # Checksum is at offset 15 (after type(1) + msg_id(4) + exp(8) + size(2)) assert result[15] == expected_checksum def test_encode_header_fields(self): payload = b"\x01\x02" result = encode_i2np_standard(3, 0xAAAAAAAA, 1700000000000, payload) assert result[0] == 3 # type assert struct.unpack("!I", result[1:5])[0] == 0xAAAAAAAA # msg_id assert struct.unpack("!Q", result[5:13])[0] == 1700000000000 # expiration_ms assert struct.unpack("!H", result[13:15])[0] == 2 # size def test_roundtrip(self): payload = os.urandom(256) msg_type = 1 msg_id = 0x11223344 expiration_ms = 1700000000999 encoded = encode_i2np_standard(msg_type, msg_id, expiration_ms, payload) dec_type, dec_id, dec_exp, dec_payload = decode_i2np_standard(encoded) assert dec_type == msg_type assert dec_id == msg_id assert dec_exp == expiration_ms assert dec_payload == payload def test_bad_checksum_raises(self): payload = b"test payload" encoded = bytearray(encode_i2np_standard(1, 0, 0, payload)) encoded[15] ^= 0xFF # corrupt checksum with pytest.raises(ValueError, match="[Cc]hecksum"): decode_i2np_standard(bytes(encoded)) def test_empty_payload(self): encoded = encode_i2np_standard(20, 0, 0, b"") dec_type, _, _, dec_payload = decode_i2np_standard(encoded) assert dec_type == 20 assert dec_payload == b"" # --- DatabaseStore (type 1) --- class TestDatabaseStore: """DatabaseStore: key(32) + type_byte(1) + reply_token(4) + [reply_gateway(32) + reply_tunnel(4)] + data.""" def test_encode_with_reply_token_zero(self): key = os.urandom(32) data = os.urandom(100) payload = encode_database_store(key, store_type=0, data=data, reply_token=0) # key(32) + type(1) + token(4) + data(100) = 137 assert len(payload) == 32 + 1 + 4 + len(data) def test_encode_with_nonzero_reply_token(self): key = os.urandom(32) reply_gw = os.urandom(32) data = os.urandom(50) payload = encode_database_store( key, store_type=1, data=data, reply_token=12345, reply_gateway=reply_gw, reply_tunnel_id=99, ) # key(32) + type(1) + token(4) + gateway(32) + tunnel(4) + data(50) = 123 assert len(payload) == 32 + 1 + 4 + 32 + 4 + len(data) def test_roundtrip_no_reply(self): key = os.urandom(32) data = os.urandom(200) payload = encode_database_store(key, store_type=0, data=data, reply_token=0) result = decode_database_store(payload) assert result["key"] == key assert result["store_type"] == 0 assert result["reply_token"] == 0 assert result["data"] == data assert "reply_gateway" not in result or result.get("reply_gateway") is None assert "reply_tunnel_id" not in result or result.get("reply_tunnel_id") is None def test_roundtrip_with_reply(self): key = os.urandom(32) reply_gw = os.urandom(32) data = os.urandom(64) payload = encode_database_store( key, store_type=1, data=data, reply_token=0xBEEF, reply_gateway=reply_gw, reply_tunnel_id=42, ) result = decode_database_store(payload) assert result["key"] == key assert result["store_type"] == 1 assert result["reply_token"] == 0xBEEF assert result["reply_gateway"] == reply_gw assert result["reply_tunnel_id"] == 42 assert result["data"] == data def test_store_type_router_info(self): key = os.urandom(32) ri_data = os.urandom(300) payload = encode_database_store(key, store_type=0, data=ri_data) result = decode_database_store(payload) assert result["store_type"] == 0 def test_store_type_lease_set(self): key = os.urandom(32) ls_data = os.urandom(150) payload = encode_database_store(key, store_type=1, data=ls_data) result = decode_database_store(payload) assert result["store_type"] == 1 def test_full_message_roundtrip_with_short_header(self): """Encode DatabaseStore payload inside a short-header I2NP message.""" key = os.urandom(32) data = os.urandom(80) payload = encode_database_store(key, store_type=0, data=data) msg = encode_i2np_short(MSG_TYPE_DATABASE_STORE, 0xABCD, 5000, payload) dec_type, dec_id, dec_exp, dec_payload = decode_i2np_short(msg) assert dec_type == MSG_TYPE_DATABASE_STORE result = decode_database_store(dec_payload) assert result["key"] == key assert result["data"] == data # --- DatabaseLookup (type 2) --- class TestDatabaseLookup: """DatabaseLookup: key(32) + from_hash(32) + flags(1) + [reply_tunnel_id(4)] + size(1) + exclude_list.""" def test_encode_no_tunnel_no_excludes(self): key = os.urandom(32) from_hash = os.urandom(32) payload = encode_database_lookup(key, from_hash, flags=0) # key(32) + from(32) + flags(1) + size(1) = 66 assert len(payload) == 66 def test_encode_with_tunnel_flag(self): key = os.urandom(32) from_hash = os.urandom(32) payload = encode_database_lookup(key, from_hash, flags=0x01, reply_tunnel_id=1234) # key(32) + from(32) + flags(1) + tunnel(4) + size(1) = 70 assert len(payload) == 70 def test_encode_with_excludes(self): key = os.urandom(32) from_hash = os.urandom(32) excludes = [os.urandom(32) for _ in range(3)] payload = encode_database_lookup(key, from_hash, flags=0, exclude_list=excludes) # key(32) + from(32) + flags(1) + size(1) + 3*32 = 162 assert len(payload) == 162 def test_roundtrip_no_tunnel_no_excludes(self): key = os.urandom(32) from_hash = os.urandom(32) payload = encode_database_lookup(key, from_hash, flags=0) result = decode_database_lookup(payload) assert result["key"] == key assert result["from_hash"] == from_hash assert result["flags"] == 0 assert result["reply_tunnel_id"] == 0 assert result["exclude_list"] == [] def test_roundtrip_with_tunnel(self): key = os.urandom(32) from_hash = os.urandom(32) payload = encode_database_lookup(key, from_hash, flags=0x01, reply_tunnel_id=9999) result = decode_database_lookup(payload) assert result["flags"] == 0x01 assert result["reply_tunnel_id"] == 9999 def test_roundtrip_with_excludes(self): key = os.urandom(32) from_hash = os.urandom(32) excludes = [os.urandom(32) for _ in range(5)] payload = encode_database_lookup(key, from_hash, flags=0, exclude_list=excludes) result = decode_database_lookup(payload) assert result["exclude_list"] == excludes def test_roundtrip_tunnel_and_excludes(self): key = os.urandom(32) from_hash = os.urandom(32) excludes = [os.urandom(32), os.urandom(32)] payload = encode_database_lookup( key, from_hash, flags=0x01, reply_tunnel_id=777, exclude_list=excludes, ) result = decode_database_lookup(payload) assert result["key"] == key assert result["from_hash"] == from_hash assert result["flags"] == 0x01 assert result["reply_tunnel_id"] == 777 assert result["exclude_list"] == excludes # --- DatabaseSearchReply (type 3) --- class TestDatabaseSearchReply: """DatabaseSearchReply: key(32) + num_peers(1) + [peer_hashes(32 each)] + from_hash(32).""" def test_encode_no_peers(self): key = os.urandom(32) from_hash = os.urandom(32) payload = encode_database_search_reply(key, [], from_hash) # key(32) + num(1) + from(32) = 65 assert len(payload) == 65 def test_encode_with_peers(self): key = os.urandom(32) from_hash = os.urandom(32) peers = [os.urandom(32) for _ in range(3)] payload = encode_database_search_reply(key, peers, from_hash) # key(32) + num(1) + 3*32 + from(32) = 161 assert len(payload) == 161 def test_roundtrip_no_peers(self): key = os.urandom(32) from_hash = os.urandom(32) payload = encode_database_search_reply(key, [], from_hash) result = decode_database_search_reply(payload) assert result["key"] == key assert result["peer_hashes"] == [] assert result["from_hash"] == from_hash def test_roundtrip_with_peers(self): key = os.urandom(32) from_hash = os.urandom(32) peers = [os.urandom(32) for _ in range(4)] payload = encode_database_search_reply(key, peers, from_hash) result = decode_database_search_reply(payload) assert result["key"] == key assert result["peer_hashes"] == peers assert result["from_hash"] == from_hash def test_roundtrip_single_peer(self): key = os.urandom(32) from_hash = os.urandom(32) peers = [os.urandom(32)] payload = encode_database_search_reply(key, peers, from_hash) result = decode_database_search_reply(payload) assert result["peer_hashes"] == peers # --- DeliveryStatus (type 10) --- class TestDeliveryStatus: """DeliveryStatus: msg_id(4) + timestamp(8).""" def test_encode_length(self): payload = encode_delivery_status(0xDEAD, 1700000000000) assert len(payload) == 12 def test_encode_fields(self): payload = encode_delivery_status(0x12345678, 9999999999999) msg_id = struct.unpack("!I", payload[:4])[0] timestamp = struct.unpack("!Q", payload[4:12])[0] assert msg_id == 0x12345678 assert timestamp == 9999999999999 def test_roundtrip(self): msg_id = 0xFEEDFACE timestamp = 1700000000123 payload = encode_delivery_status(msg_id, timestamp) result = decode_delivery_status(payload) assert result["msg_id"] == msg_id assert result["timestamp"] == timestamp def test_zero_values(self): payload = encode_delivery_status(0, 0) result = decode_delivery_status(payload) assert result["msg_id"] == 0 assert result["timestamp"] == 0 def test_full_message_roundtrip_with_standard_header(self): """Encode DeliveryStatus inside a standard-header I2NP message.""" payload = encode_delivery_status(42, 1700000000000) msg = encode_i2np_standard(MSG_TYPE_DELIVERY_STATUS, 42, 1700000000000, payload) dec_type, dec_id, dec_exp, dec_payload = decode_i2np_standard(msg) assert dec_type == MSG_TYPE_DELIVERY_STATUS result = decode_delivery_status(dec_payload) assert result["msg_id"] == 42 assert result["timestamp"] == 1700000000000