A Python port of the Invisible Internet Project (I2P)
1"""Tests for I2NP message wire format serialization/deserialization."""
2
3import hashlib
4import os
5import struct
6
7import pytest
8
9from i2p_data.i2np_codec import (
10 MSG_TYPE_DATABASE_STORE,
11 MSG_TYPE_DATABASE_LOOKUP,
12 MSG_TYPE_DATABASE_SEARCH_REPLY,
13 MSG_TYPE_DELIVERY_STATUS,
14 MSG_TYPE_GARLIC,
15 MSG_TYPE_TUNNEL_DATA,
16 MSG_TYPE_TUNNEL_GATEWAY,
17 MSG_TYPE_DATA,
18 MSG_TYPE_TUNNEL_BUILD,
19 MSG_TYPE_TUNNEL_BUILD_REPLY,
20 MSG_TYPE_VARIABLE_TUNNEL_BUILD,
21 MSG_TYPE_VARIABLE_TUNNEL_BUILD_REPLY,
22 encode_i2np_short,
23 decode_i2np_short,
24 encode_i2np_standard,
25 decode_i2np_standard,
26 encode_database_store,
27 decode_database_store,
28 encode_database_lookup,
29 decode_database_lookup,
30 encode_database_search_reply,
31 decode_database_search_reply,
32 encode_delivery_status,
33 decode_delivery_status,
34)
35
36
37# --- Message type constants ---
38
39class TestMessageTypeConstants:
40 """Message type constants must match the I2P specification values."""
41
42 def test_database_store_type(self):
43 assert MSG_TYPE_DATABASE_STORE == 1
44
45 def test_database_lookup_type(self):
46 assert MSG_TYPE_DATABASE_LOOKUP == 2
47
48 def test_database_search_reply_type(self):
49 assert MSG_TYPE_DATABASE_SEARCH_REPLY == 3
50
51 def test_delivery_status_type(self):
52 assert MSG_TYPE_DELIVERY_STATUS == 10
53
54 def test_garlic_type(self):
55 assert MSG_TYPE_GARLIC == 11
56
57 def test_tunnel_data_type(self):
58 assert MSG_TYPE_TUNNEL_DATA == 18
59
60 def test_tunnel_gateway_type(self):
61 assert MSG_TYPE_TUNNEL_GATEWAY == 19
62
63 def test_data_type(self):
64 assert MSG_TYPE_DATA == 20
65
66 def test_tunnel_build_type(self):
67 assert MSG_TYPE_TUNNEL_BUILD == 21
68
69 def test_tunnel_build_reply_type(self):
70 assert MSG_TYPE_TUNNEL_BUILD_REPLY == 22
71
72 def test_variable_tunnel_build_type(self):
73 assert MSG_TYPE_VARIABLE_TUNNEL_BUILD == 23
74
75 def test_variable_tunnel_build_reply_type(self):
76 assert MSG_TYPE_VARIABLE_TUNNEL_BUILD_REPLY == 24
77
78
79# --- Short header (NTCP2) ---
80
81class TestShortHeader:
82 """Short header: type(1) + msg_id(4) + expiration_seconds(4) + size(2) + payload = 11 byte header."""
83
84 def test_encode_produces_correct_length(self):
85 payload = b"\xaa\xbb\xcc"
86 result = encode_i2np_short(1, 0x12345678, 1000, payload)
87 assert len(result) == 11 + len(payload)
88
89 def test_encode_header_fields(self):
90 payload = b"\x01\x02\x03\x04"
91 result = encode_i2np_short(10, 0xDEADBEEF, 999999, payload)
92 # Parse manually
93 assert result[0] == 10 # type
94 assert struct.unpack("!I", result[1:5])[0] == 0xDEADBEEF # msg_id
95 assert struct.unpack("!i", result[5:9])[0] == 999999 # expiration seconds
96 assert struct.unpack("!H", result[9:11])[0] == 4 # size
97 assert result[11:] == payload
98
99 def test_roundtrip(self):
100 payload = os.urandom(128)
101 msg_type = 18
102 msg_id = 0xCAFEBABE
103 expiration = 1700000000
104 encoded = encode_i2np_short(msg_type, msg_id, expiration, payload)
105 dec_type, dec_id, dec_exp, dec_payload = decode_i2np_short(encoded)
106 assert dec_type == msg_type
107 assert dec_id == msg_id
108 assert dec_exp == expiration
109 assert dec_payload == payload
110
111 def test_empty_payload(self):
112 encoded = encode_i2np_short(20, 1, 0, b"")
113 dec_type, dec_id, dec_exp, dec_payload = decode_i2np_short(encoded)
114 assert dec_type == 20
115 assert dec_payload == b""
116
117 def test_decode_ignores_trailing_data(self):
118 """Decoder should only read size bytes of payload, ignoring trailing bytes."""
119 payload = b"\xff" * 10
120 encoded = encode_i2np_short(1, 0, 0, payload) + b"\x00" * 50
121 _, _, _, dec_payload = decode_i2np_short(encoded)
122 assert dec_payload == payload
123
124
125# --- Standard header (tunnel messages) ---
126
127class TestStandardHeader:
128 """Standard header: type(1) + msg_id(4) + expiration_ms(8) + size(2) + checksum(1) + payload = 16 byte header."""
129
130 def test_encode_produces_correct_length(self):
131 payload = b"\xaa" * 20
132 result = encode_i2np_standard(1, 0, 0, payload)
133 assert len(result) == 16 + len(payload)
134
135 def test_checksum_is_first_byte_of_sha256(self):
136 payload = b"Hello I2P"
137 result = encode_i2np_standard(1, 0, 0, payload)
138 expected_checksum = hashlib.sha256(payload).digest()[0]
139 # Checksum is at offset 15 (after type(1) + msg_id(4) + exp(8) + size(2))
140 assert result[15] == expected_checksum
141
142 def test_encode_header_fields(self):
143 payload = b"\x01\x02"
144 result = encode_i2np_standard(3, 0xAAAAAAAA, 1700000000000, payload)
145 assert result[0] == 3 # type
146 assert struct.unpack("!I", result[1:5])[0] == 0xAAAAAAAA # msg_id
147 assert struct.unpack("!Q", result[5:13])[0] == 1700000000000 # expiration_ms
148 assert struct.unpack("!H", result[13:15])[0] == 2 # size
149
150 def test_roundtrip(self):
151 payload = os.urandom(256)
152 msg_type = 1
153 msg_id = 0x11223344
154 expiration_ms = 1700000000999
155 encoded = encode_i2np_standard(msg_type, msg_id, expiration_ms, payload)
156 dec_type, dec_id, dec_exp, dec_payload = decode_i2np_standard(encoded)
157 assert dec_type == msg_type
158 assert dec_id == msg_id
159 assert dec_exp == expiration_ms
160 assert dec_payload == payload
161
162 def test_bad_checksum_raises(self):
163 payload = b"test payload"
164 encoded = bytearray(encode_i2np_standard(1, 0, 0, payload))
165 encoded[15] ^= 0xFF # corrupt checksum
166 with pytest.raises(ValueError, match="[Cc]hecksum"):
167 decode_i2np_standard(bytes(encoded))
168
169 def test_empty_payload(self):
170 encoded = encode_i2np_standard(20, 0, 0, b"")
171 dec_type, _, _, dec_payload = decode_i2np_standard(encoded)
172 assert dec_type == 20
173 assert dec_payload == b""
174
175
176# --- DatabaseStore (type 1) ---
177
178class TestDatabaseStore:
179 """DatabaseStore: key(32) + type_byte(1) + reply_token(4) + [reply_gateway(32) + reply_tunnel(4)] + data."""
180
181 def test_encode_with_reply_token_zero(self):
182 key = os.urandom(32)
183 data = os.urandom(100)
184 payload = encode_database_store(key, store_type=0, data=data, reply_token=0)
185 # key(32) + type(1) + token(4) + data(100) = 137
186 assert len(payload) == 32 + 1 + 4 + len(data)
187
188 def test_encode_with_nonzero_reply_token(self):
189 key = os.urandom(32)
190 reply_gw = os.urandom(32)
191 data = os.urandom(50)
192 payload = encode_database_store(
193 key, store_type=1, data=data,
194 reply_token=12345, reply_gateway=reply_gw, reply_tunnel_id=99,
195 )
196 # key(32) + type(1) + token(4) + gateway(32) + tunnel(4) + data(50) = 123
197 assert len(payload) == 32 + 1 + 4 + 32 + 4 + len(data)
198
199 def test_roundtrip_no_reply(self):
200 key = os.urandom(32)
201 data = os.urandom(200)
202 payload = encode_database_store(key, store_type=0, data=data, reply_token=0)
203 result = decode_database_store(payload)
204 assert result["key"] == key
205 assert result["store_type"] == 0
206 assert result["reply_token"] == 0
207 assert result["data"] == data
208 assert "reply_gateway" not in result or result.get("reply_gateway") is None
209 assert "reply_tunnel_id" not in result or result.get("reply_tunnel_id") is None
210
211 def test_roundtrip_with_reply(self):
212 key = os.urandom(32)
213 reply_gw = os.urandom(32)
214 data = os.urandom(64)
215 payload = encode_database_store(
216 key, store_type=1, data=data,
217 reply_token=0xBEEF, reply_gateway=reply_gw, reply_tunnel_id=42,
218 )
219 result = decode_database_store(payload)
220 assert result["key"] == key
221 assert result["store_type"] == 1
222 assert result["reply_token"] == 0xBEEF
223 assert result["reply_gateway"] == reply_gw
224 assert result["reply_tunnel_id"] == 42
225 assert result["data"] == data
226
227 def test_store_type_router_info(self):
228 key = os.urandom(32)
229 ri_data = os.urandom(300)
230 payload = encode_database_store(key, store_type=0, data=ri_data)
231 result = decode_database_store(payload)
232 assert result["store_type"] == 0
233
234 def test_store_type_lease_set(self):
235 key = os.urandom(32)
236 ls_data = os.urandom(150)
237 payload = encode_database_store(key, store_type=1, data=ls_data)
238 result = decode_database_store(payload)
239 assert result["store_type"] == 1
240
241 def test_full_message_roundtrip_with_short_header(self):
242 """Encode DatabaseStore payload inside a short-header I2NP message."""
243 key = os.urandom(32)
244 data = os.urandom(80)
245 payload = encode_database_store(key, store_type=0, data=data)
246 msg = encode_i2np_short(MSG_TYPE_DATABASE_STORE, 0xABCD, 5000, payload)
247 dec_type, dec_id, dec_exp, dec_payload = decode_i2np_short(msg)
248 assert dec_type == MSG_TYPE_DATABASE_STORE
249 result = decode_database_store(dec_payload)
250 assert result["key"] == key
251 assert result["data"] == data
252
253
254# --- DatabaseLookup (type 2) ---
255
256class TestDatabaseLookup:
257 """DatabaseLookup: key(32) + from_hash(32) + flags(1) + [reply_tunnel_id(4)] + size(1) + exclude_list."""
258
259 def test_encode_no_tunnel_no_excludes(self):
260 key = os.urandom(32)
261 from_hash = os.urandom(32)
262 payload = encode_database_lookup(key, from_hash, flags=0)
263 # key(32) + from(32) + flags(1) + size(1) = 66
264 assert len(payload) == 66
265
266 def test_encode_with_tunnel_flag(self):
267 key = os.urandom(32)
268 from_hash = os.urandom(32)
269 payload = encode_database_lookup(key, from_hash, flags=0x01, reply_tunnel_id=1234)
270 # key(32) + from(32) + flags(1) + tunnel(4) + size(1) = 70
271 assert len(payload) == 70
272
273 def test_encode_with_excludes(self):
274 key = os.urandom(32)
275 from_hash = os.urandom(32)
276 excludes = [os.urandom(32) for _ in range(3)]
277 payload = encode_database_lookup(key, from_hash, flags=0, exclude_list=excludes)
278 # key(32) + from(32) + flags(1) + size(1) + 3*32 = 162
279 assert len(payload) == 162
280
281 def test_roundtrip_no_tunnel_no_excludes(self):
282 key = os.urandom(32)
283 from_hash = os.urandom(32)
284 payload = encode_database_lookup(key, from_hash, flags=0)
285 result = decode_database_lookup(payload)
286 assert result["key"] == key
287 assert result["from_hash"] == from_hash
288 assert result["flags"] == 0
289 assert result["reply_tunnel_id"] == 0
290 assert result["exclude_list"] == []
291
292 def test_roundtrip_with_tunnel(self):
293 key = os.urandom(32)
294 from_hash = os.urandom(32)
295 payload = encode_database_lookup(key, from_hash, flags=0x01, reply_tunnel_id=9999)
296 result = decode_database_lookup(payload)
297 assert result["flags"] == 0x01
298 assert result["reply_tunnel_id"] == 9999
299
300 def test_roundtrip_with_excludes(self):
301 key = os.urandom(32)
302 from_hash = os.urandom(32)
303 excludes = [os.urandom(32) for _ in range(5)]
304 payload = encode_database_lookup(key, from_hash, flags=0, exclude_list=excludes)
305 result = decode_database_lookup(payload)
306 assert result["exclude_list"] == excludes
307
308 def test_roundtrip_tunnel_and_excludes(self):
309 key = os.urandom(32)
310 from_hash = os.urandom(32)
311 excludes = [os.urandom(32), os.urandom(32)]
312 payload = encode_database_lookup(
313 key, from_hash, flags=0x01, reply_tunnel_id=777, exclude_list=excludes,
314 )
315 result = decode_database_lookup(payload)
316 assert result["key"] == key
317 assert result["from_hash"] == from_hash
318 assert result["flags"] == 0x01
319 assert result["reply_tunnel_id"] == 777
320 assert result["exclude_list"] == excludes
321
322
323# --- DatabaseSearchReply (type 3) ---
324
325class TestDatabaseSearchReply:
326 """DatabaseSearchReply: key(32) + num_peers(1) + [peer_hashes(32 each)] + from_hash(32)."""
327
328 def test_encode_no_peers(self):
329 key = os.urandom(32)
330 from_hash = os.urandom(32)
331 payload = encode_database_search_reply(key, [], from_hash)
332 # key(32) + num(1) + from(32) = 65
333 assert len(payload) == 65
334
335 def test_encode_with_peers(self):
336 key = os.urandom(32)
337 from_hash = os.urandom(32)
338 peers = [os.urandom(32) for _ in range(3)]
339 payload = encode_database_search_reply(key, peers, from_hash)
340 # key(32) + num(1) + 3*32 + from(32) = 161
341 assert len(payload) == 161
342
343 def test_roundtrip_no_peers(self):
344 key = os.urandom(32)
345 from_hash = os.urandom(32)
346 payload = encode_database_search_reply(key, [], from_hash)
347 result = decode_database_search_reply(payload)
348 assert result["key"] == key
349 assert result["peer_hashes"] == []
350 assert result["from_hash"] == from_hash
351
352 def test_roundtrip_with_peers(self):
353 key = os.urandom(32)
354 from_hash = os.urandom(32)
355 peers = [os.urandom(32) for _ in range(4)]
356 payload = encode_database_search_reply(key, peers, from_hash)
357 result = decode_database_search_reply(payload)
358 assert result["key"] == key
359 assert result["peer_hashes"] == peers
360 assert result["from_hash"] == from_hash
361
362 def test_roundtrip_single_peer(self):
363 key = os.urandom(32)
364 from_hash = os.urandom(32)
365 peers = [os.urandom(32)]
366 payload = encode_database_search_reply(key, peers, from_hash)
367 result = decode_database_search_reply(payload)
368 assert result["peer_hashes"] == peers
369
370
371# --- DeliveryStatus (type 10) ---
372
373class TestDeliveryStatus:
374 """DeliveryStatus: msg_id(4) + timestamp(8)."""
375
376 def test_encode_length(self):
377 payload = encode_delivery_status(0xDEAD, 1700000000000)
378 assert len(payload) == 12
379
380 def test_encode_fields(self):
381 payload = encode_delivery_status(0x12345678, 9999999999999)
382 msg_id = struct.unpack("!I", payload[:4])[0]
383 timestamp = struct.unpack("!Q", payload[4:12])[0]
384 assert msg_id == 0x12345678
385 assert timestamp == 9999999999999
386
387 def test_roundtrip(self):
388 msg_id = 0xFEEDFACE
389 timestamp = 1700000000123
390 payload = encode_delivery_status(msg_id, timestamp)
391 result = decode_delivery_status(payload)
392 assert result["msg_id"] == msg_id
393 assert result["timestamp"] == timestamp
394
395 def test_zero_values(self):
396 payload = encode_delivery_status(0, 0)
397 result = decode_delivery_status(payload)
398 assert result["msg_id"] == 0
399 assert result["timestamp"] == 0
400
401 def test_full_message_roundtrip_with_standard_header(self):
402 """Encode DeliveryStatus inside a standard-header I2NP message."""
403 payload = encode_delivery_status(42, 1700000000000)
404 msg = encode_i2np_standard(MSG_TYPE_DELIVERY_STATUS, 42, 1700000000000, payload)
405 dec_type, dec_id, dec_exp, dec_payload = decode_i2np_standard(msg)
406 assert dec_type == MSG_TYPE_DELIVERY_STATUS
407 result = decode_delivery_status(dec_payload)
408 assert result["msg_id"] == 42
409 assert result["timestamp"] == 1700000000000