A Python port of the Invisible Internet Project (I2P)
at main 409 lines 15 kB view raw
"""Tests for I2NP message wire format serialization/deserialization."""

import hashlib
import os
import struct

import pytest

from i2p_data.i2np_codec import (
    MSG_TYPE_DATABASE_STORE,
    MSG_TYPE_DATABASE_LOOKUP,
    MSG_TYPE_DATABASE_SEARCH_REPLY,
    MSG_TYPE_DELIVERY_STATUS,
    MSG_TYPE_GARLIC,
    MSG_TYPE_TUNNEL_DATA,
    MSG_TYPE_TUNNEL_GATEWAY,
    MSG_TYPE_DATA,
    MSG_TYPE_TUNNEL_BUILD,
    MSG_TYPE_TUNNEL_BUILD_REPLY,
    MSG_TYPE_VARIABLE_TUNNEL_BUILD,
    MSG_TYPE_VARIABLE_TUNNEL_BUILD_REPLY,
    encode_i2np_short,
    decode_i2np_short,
    encode_i2np_standard,
    decode_i2np_standard,
    encode_database_store,
    decode_database_store,
    encode_database_lookup,
    decode_database_lookup,
    encode_database_search_reply,
    decode_database_search_reply,
    encode_delivery_status,
    decode_delivery_status,
)


# --- Message type constants ---

class TestMessageTypeConstants:
    """Message type constants must match the I2P specification values.

    Each test pins a single constant to its numeric type code so that an
    accidental renumbering fails with a test name identifying the constant.
    """

    def test_database_store_type(self):
        """DatabaseStore is message type 1."""
        assert MSG_TYPE_DATABASE_STORE == 1

    def test_database_lookup_type(self):
        """DatabaseLookup is message type 2."""
        assert MSG_TYPE_DATABASE_LOOKUP == 2

    def test_database_search_reply_type(self):
        """DatabaseSearchReply is message type 3."""
        assert MSG_TYPE_DATABASE_SEARCH_REPLY == 3

    def test_delivery_status_type(self):
        """DeliveryStatus is message type 10."""
        assert MSG_TYPE_DELIVERY_STATUS == 10

    def test_garlic_type(self):
        """Garlic is message type 11."""
        assert MSG_TYPE_GARLIC == 11

    def test_tunnel_data_type(self):
        """TunnelData is message type 18."""
        assert MSG_TYPE_TUNNEL_DATA == 18

    def test_tunnel_gateway_type(self):
        """TunnelGateway is message type 19."""
        assert MSG_TYPE_TUNNEL_GATEWAY == 19

    def test_data_type(self):
        """Data is message type 20."""
        assert MSG_TYPE_DATA == 20

    def test_tunnel_build_type(self):
        """TunnelBuild is message type 21."""
        assert MSG_TYPE_TUNNEL_BUILD == 21

    def test_tunnel_build_reply_type(self):
        """TunnelBuildReply is message type 22."""
        assert MSG_TYPE_TUNNEL_BUILD_REPLY == 22

    def test_variable_tunnel_build_type(self):
        """VariableTunnelBuild is message type 23."""
        assert MSG_TYPE_VARIABLE_TUNNEL_BUILD == 23

    def test_variable_tunnel_build_reply_type(self):
        """VariableTunnelBuildReply is message type 24."""
        assert MSG_TYPE_VARIABLE_TUNNEL_BUILD_REPLY == 24
# --- Short header (NTCP2) ---

class TestShortHeader:
    """Short header tests.

    Layout: type(1) + msg_id(4) + expiration_seconds(4) + size(2) + payload
    = 11 byte header.
    """

    def test_encode_produces_correct_length(self):
        """Encoded length is the 11 byte header plus the payload length."""
        payload = b"\xaa\xbb\xcc"
        result = encode_i2np_short(1, 0x12345678, 1000, payload)
        assert len(result) == 11 + len(payload)

    def test_encode_header_fields(self):
        """Every header field sits at its expected offset, big-endian."""
        payload = b"\x01\x02\x03\x04"
        result = encode_i2np_short(10, 0xDEADBEEF, 999999, payload)
        # Parse manually
        assert result[0] == 10  # type
        assert struct.unpack("!I", result[1:5])[0] == 0xDEADBEEF  # msg_id
        # Unpack unsigned, like the other header fields: an expiration in
        # seconds is never negative, and a signed read would misreport any
        # value above 2**31 - 1.
        assert struct.unpack("!I", result[5:9])[0] == 999999  # expiration seconds
        assert struct.unpack("!H", result[9:11])[0] == 4  # size
        assert result[11:] == payload

    def test_roundtrip(self):
        """decode(encode(x)) returns the original type, id, expiration, payload."""
        payload = os.urandom(128)
        msg_type = 18
        msg_id = 0xCAFEBABE
        expiration = 1700000000
        encoded = encode_i2np_short(msg_type, msg_id, expiration, payload)
        dec_type, dec_id, dec_exp, dec_payload = decode_i2np_short(encoded)
        assert dec_type == msg_type
        assert dec_id == msg_id
        assert dec_exp == expiration
        assert dec_payload == payload

    def test_empty_payload(self):
        """A zero-length payload round-trips as empty bytes."""
        encoded = encode_i2np_short(20, 1, 0, b"")
        dec_type, _, _, dec_payload = decode_i2np_short(encoded)
        assert dec_type == 20
        assert dec_payload == b""

    def test_decode_ignores_trailing_data(self):
        """Decoder should only read size bytes of payload, ignoring trailing bytes."""
        payload = b"\xff" * 10
        encoded = encode_i2np_short(1, 0, 0, payload) + b"\x00" * 50
        _, _, _, dec_payload = decode_i2np_short(encoded)
        assert dec_payload == payload


# --- Standard header (tunnel messages) ---

class TestStandardHeader:
    """Standard header tests.

    Layout: type(1) + msg_id(4) + expiration_ms(8) + size(2) + checksum(1)
    + payload = 16 byte header.
    """

    def test_encode_produces_correct_length(self):
        """Encoded length is the 16 byte header plus the payload length."""
        payload = b"\xaa" * 20
        result = encode_i2np_standard(1, 0, 0, payload)
        assert len(result) == 16 + len(payload)

    def test_checksum_is_first_byte_of_sha256(self):
        """The checksum byte equals the first byte of SHA-256(payload)."""
        payload = b"Hello I2P"
        result = encode_i2np_standard(1, 0, 0, payload)
        expected_checksum = hashlib.sha256(payload).digest()[0]
        # Checksum is at offset 15 (after type(1) + msg_id(4) + exp(8) + size(2))
        assert result[15] == expected_checksum

    def test_encode_header_fields(self):
        """Every header field sits at its expected offset, big-endian."""
        payload = b"\x01\x02"
        result = encode_i2np_standard(3, 0xAAAAAAAA, 1700000000000, payload)
        assert result[0] == 3  # type
        assert struct.unpack("!I", result[1:5])[0] == 0xAAAAAAAA  # msg_id
        assert struct.unpack("!Q", result[5:13])[0] == 1700000000000  # expiration_ms
        assert struct.unpack("!H", result[13:15])[0] == 2  # size
        # The payload follows the 16 byte header verbatim (mirrors the
        # equivalent short-header check).
        assert result[16:] == payload

    def test_roundtrip(self):
        """decode(encode(x)) returns the original type, id, expiration, payload."""
        payload = os.urandom(256)
        msg_type = 1
        msg_id = 0x11223344
        expiration_ms = 1700000000999
        encoded = encode_i2np_standard(msg_type, msg_id, expiration_ms, payload)
        dec_type, dec_id, dec_exp, dec_payload = decode_i2np_standard(encoded)
        assert dec_type == msg_type
        assert dec_id == msg_id
        assert dec_exp == expiration_ms
        assert dec_payload == payload

    def test_bad_checksum_raises(self):
        """A corrupted checksum byte makes the decoder raise ValueError."""
        payload = b"test payload"
        encoded = bytearray(encode_i2np_standard(1, 0, 0, payload))
        encoded[15] ^= 0xFF  # corrupt checksum (flipping all bits always changes it)
        with pytest.raises(ValueError, match="[Cc]hecksum"):
            decode_i2np_standard(bytes(encoded))

    def test_empty_payload(self):
        """A zero-length payload round-trips as empty bytes."""
        encoded = encode_i2np_standard(20, 0, 0, b"")
        dec_type, _, _, dec_payload = decode_i2np_standard(encoded)
        assert dec_type == 20
        assert dec_payload == b""


# --- DatabaseStore (type 1) ---

class TestDatabaseStore:
    """DatabaseStore payload tests.

    Layout: key(32) + type_byte(1) + reply_token(4)
    + [reply_gateway(32) + reply_tunnel(4)] + data.
    """

    def test_encode_with_reply_token_zero(self):
        """With reply_token=0 the gateway/tunnel fields are omitted."""
        key = os.urandom(32)
        data = os.urandom(100)
        payload = encode_database_store(key, store_type=0, data=data, reply_token=0)
        # key(32) + type(1) + token(4) + data(100) = 137
        assert len(payload) == 32 + 1 + 4 + len(data)

    def test_encode_with_nonzero_reply_token(self):
        """A non-zero reply_token adds the gateway(32) and tunnel(4) fields."""
        key = os.urandom(32)
        reply_gw = os.urandom(32)
        data = os.urandom(50)
        payload = encode_database_store(
            key, store_type=1, data=data,
            reply_token=12345, reply_gateway=reply_gw, reply_tunnel_id=99,
        )
        # key(32) + type(1) + token(4) + gateway(32) + tunnel(4) + data(50) = 123
        assert len(payload) == 32 + 1 + 4 + 32 + 4 + len(data)

    def test_roundtrip_no_reply(self):
        """Round-trip without reply info; reply fields are absent or None."""
        key = os.urandom(32)
        data = os.urandom(200)
        payload = encode_database_store(key, store_type=0, data=data, reply_token=0)
        result = decode_database_store(payload)
        assert result["key"] == key
        assert result["store_type"] == 0
        assert result["reply_token"] == 0
        assert result["data"] == data
        # .get() yields None both when the key is missing and when it is
        # present with a None value, so one check covers both accepted shapes.
        assert result.get("reply_gateway") is None
        assert result.get("reply_tunnel_id") is None

    def test_roundtrip_with_reply(self):
        """Round-trip with reply token, gateway and tunnel id populated."""
        key = os.urandom(32)
        reply_gw = os.urandom(32)
        data = os.urandom(64)
        payload = encode_database_store(
            key, store_type=1, data=data,
            reply_token=0xBEEF, reply_gateway=reply_gw, reply_tunnel_id=42,
        )
        result = decode_database_store(payload)
        assert result["key"] == key
        assert result["store_type"] == 1
        assert result["reply_token"] == 0xBEEF
        assert result["reply_gateway"] == reply_gw
        assert result["reply_tunnel_id"] == 42
        assert result["data"] == data

    def test_store_type_router_info(self):
        """store_type 0 survives a round-trip."""
        key = os.urandom(32)
        ri_data = os.urandom(300)
        payload = encode_database_store(key, store_type=0, data=ri_data)
        result = decode_database_store(payload)
        assert result["store_type"] == 0

    def test_store_type_lease_set(self):
        """store_type 1 survives a round-trip."""
        key = os.urandom(32)
        ls_data = os.urandom(150)
        payload = encode_database_store(key, store_type=1, data=ls_data)
        result = decode_database_store(payload)
        assert result["store_type"] == 1

    def test_full_message_roundtrip_with_short_header(self):
        """Encode DatabaseStore payload inside a short-header I2NP message."""
        key = os.urandom(32)
        data = os.urandom(80)
        payload = encode_database_store(key, store_type=0, data=data)
        msg = encode_i2np_short(MSG_TYPE_DATABASE_STORE, 0xABCD, 5000, payload)
        dec_type, dec_id, dec_exp, dec_payload = decode_i2np_short(msg)
        assert dec_type == MSG_TYPE_DATABASE_STORE
        # Check the remaining header fields too, so a full-message round-trip
        # actually covers the whole header rather than only the type byte.
        assert dec_id == 0xABCD
        assert dec_exp == 5000
        result = decode_database_store(dec_payload)
        assert result["key"] == key
        assert result["data"] == data


# --- DatabaseLookup (type 2) ---

class TestDatabaseLookup:
    """DatabaseLookup payload tests.

    Layout: key(32) + from_hash(32) + flags(1) + [reply_tunnel_id(4)]
    + size(1) + exclude_list.
    """

    def test_encode_no_tunnel_no_excludes(self):
        """Minimal lookup: no tunnel id and an empty exclude list."""
        key = os.urandom(32)
        from_hash = os.urandom(32)
        payload = encode_database_lookup(key, from_hash, flags=0)
        # key(32) + from(32) + flags(1) + size(1) = 66
        assert len(payload) == 66

    def test_encode_with_tunnel_flag(self):
        """flags=0x01 adds the 4 byte reply tunnel id."""
        key = os.urandom(32)
        from_hash = os.urandom(32)
        payload = encode_database_lookup(key, from_hash, flags=0x01, reply_tunnel_id=1234)
        # key(32) + from(32) + flags(1) + tunnel(4) + size(1) = 70
        assert len(payload) == 70

    def test_encode_with_excludes(self):
        """Each excluded hash adds 32 bytes after the size byte."""
        key = os.urandom(32)
        from_hash = os.urandom(32)
        excludes = [os.urandom(32) for _ in range(3)]
        payload = encode_database_lookup(key, from_hash, flags=0, exclude_list=excludes)
        # key(32) + from(32) + flags(1) + size(1) + 3*32 = 162
        assert len(payload) == 162

    def test_roundtrip_no_tunnel_no_excludes(self):
        """Round-trip of the minimal lookup; tunnel id decodes as 0."""
        key = os.urandom(32)
        from_hash = os.urandom(32)
        payload = encode_database_lookup(key, from_hash, flags=0)
        result = decode_database_lookup(payload)
        assert result["key"] == key
        assert result["from_hash"] == from_hash
        assert result["flags"] == 0
        assert result["reply_tunnel_id"] == 0
        assert result["exclude_list"] == []

    def test_roundtrip_with_tunnel(self):
        """Round-trip preserves the flags byte and the reply tunnel id."""
        key = os.urandom(32)
        from_hash = os.urandom(32)
        payload = encode_database_lookup(key, from_hash, flags=0x01, reply_tunnel_id=9999)
        result = decode_database_lookup(payload)
        assert result["flags"] == 0x01
        assert result["reply_tunnel_id"] == 9999

    def test_roundtrip_with_excludes(self):
        """Round-trip preserves the exclude list contents and order."""
        key = os.urandom(32)
        from_hash = os.urandom(32)
        excludes = [os.urandom(32) for _ in range(5)]
        payload = encode_database_lookup(key, from_hash, flags=0, exclude_list=excludes)
        result = decode_database_lookup(payload)
        assert result["exclude_list"] == excludes

    def test_roundtrip_tunnel_and_excludes(self):
        """Round-trip with both a reply tunnel id and an exclude list."""
        key = os.urandom(32)
        from_hash = os.urandom(32)
        excludes = [os.urandom(32), os.urandom(32)]
        payload = encode_database_lookup(
            key, from_hash, flags=0x01, reply_tunnel_id=777, exclude_list=excludes,
        )
        result = decode_database_lookup(payload)
        assert result["key"] == key
        assert result["from_hash"] == from_hash
        assert result["flags"] == 0x01
        assert result["reply_tunnel_id"] == 777
        assert result["exclude_list"] == excludes


# --- DatabaseSearchReply (type 3) ---

class TestDatabaseSearchReply:
    """DatabaseSearchReply payload tests.

    Layout: key(32) + num_peers(1) + [peer_hashes(32 each)] + from_hash(32).
    """

    def test_encode_no_peers(self):
        """With no peers the payload is key + count byte + from_hash."""
        key = os.urandom(32)
        from_hash = os.urandom(32)
        payload = encode_database_search_reply(key, [], from_hash)
        # key(32) + num(1) + from(32) = 65
        assert len(payload) == 65

    def test_encode_with_peers(self):
        """Each peer hash adds 32 bytes between the count and from_hash."""
        key = os.urandom(32)
        from_hash = os.urandom(32)
        peers = [os.urandom(32) for _ in range(3)]
        payload = encode_database_search_reply(key, peers, from_hash)
        # key(32) + num(1) + 3*32 + from(32) = 161
        assert len(payload) == 161

    def test_roundtrip_no_peers(self):
        """Round-trip with an empty peer list."""
        key = os.urandom(32)
        from_hash = os.urandom(32)
        payload = encode_database_search_reply(key, [], from_hash)
        result = decode_database_search_reply(payload)
        assert result["key"] == key
        assert result["peer_hashes"] == []
        assert result["from_hash"] == from_hash

    def test_roundtrip_with_peers(self):
        """Round-trip preserves key, peer hashes (in order) and from_hash."""
        key = os.urandom(32)
        from_hash = os.urandom(32)
        peers = [os.urandom(32) for _ in range(4)]
        payload = encode_database_search_reply(key, peers, from_hash)
        result = decode_database_search_reply(payload)
        assert result["key"] == key
        assert result["peer_hashes"] == peers
        assert result["from_hash"] == from_hash

    def test_roundtrip_single_peer(self):
        """Round-trip with exactly one peer hash."""
        key = os.urandom(32)
        from_hash = os.urandom(32)
        peers = [os.urandom(32)]
        payload = encode_database_search_reply(key, peers, from_hash)
        result = decode_database_search_reply(payload)
        assert result["peer_hashes"] == peers


# --- DeliveryStatus (type 10) ---

class TestDeliveryStatus:
    """DeliveryStatus: msg_id(4) + timestamp(8)."""

    def test_encode_length(self):
        """The payload is always 12 bytes."""
        payload = encode_delivery_status(0xDEAD, 1700000000000)
        assert len(payload) == 12

    def test_encode_fields(self):
        """msg_id and timestamp are big-endian unsigned at offsets 0 and 4."""
        payload = encode_delivery_status(0x12345678, 9999999999999)
        msg_id = struct.unpack("!I", payload[:4])[0]
        timestamp = struct.unpack("!Q", payload[4:12])[0]
        assert msg_id == 0x12345678
        assert timestamp == 9999999999999

    def test_roundtrip(self):
        """decode(encode(x)) returns the original msg_id and timestamp."""
        msg_id = 0xFEEDFACE
        timestamp = 1700000000123
        payload = encode_delivery_status(msg_id, timestamp)
        result = decode_delivery_status(payload)
        assert result["msg_id"] == msg_id
        assert result["timestamp"] == timestamp

    def test_zero_values(self):
        """All-zero fields round-trip unchanged."""
        payload = encode_delivery_status(0, 0)
        result = decode_delivery_status(payload)
        assert result["msg_id"] == 0
        assert result["timestamp"] == 0

    def test_full_message_roundtrip_with_standard_header(self):
        """Encode DeliveryStatus inside a standard-header I2NP message."""
        payload = encode_delivery_status(42, 1700000000000)
        msg = encode_i2np_standard(MSG_TYPE_DELIVERY_STATUS, 42, 1700000000000, payload)
        dec_type, dec_id, dec_exp, dec_payload = decode_i2np_standard(msg)
        assert dec_type == MSG_TYPE_DELIVERY_STATUS
        # Check the remaining header fields too, so a full-message round-trip
        # actually covers the whole header rather than only the type byte.
        assert dec_id == 42
        assert dec_exp == 1700000000000
        result = decode_delivery_status(dec_payload)
        assert result["msg_id"] == 42
        assert result["timestamp"] == 1700000000000