A Python port of the Invisible Internet Project (I2P)
1"""Tests for the real NTCP2 handshake with AES-CBC obfuscation.
2
3TDD tests -- written before the implementation in
4src/i2p_transport/ntcp2_real_handshake.py.
5"""
6
7import hashlib
8import os
9import struct
10import time
11
12import pytest
13
14from i2p_crypto.x25519 import X25519DH
15from i2p_crypto.siphash import SipHashRatchet
16from i2p_transport.ntcp2_real_handshake import NTCP2RealHandshake
17from i2p_transport.ntcp2_blocks import (
18 decode_blocks,
19 encode_blocks,
20 BLOCK_ROUTERINFO,
21 BLOCK_PADDING,
22 router_info_block,
23 padding_block,
24)
25
26
27# ---------------------------------------------------------------------------
28# helpers
29# ---------------------------------------------------------------------------
30
31def _make_peer_info() -> dict:
32 """Create fake peer info with a router hash and IV ('i' param)."""
33 ri_hash = os.urandom(32)
34 iv = os.urandom(16)
35 return {"ri_hash": ri_hash, "iv": iv}
36
37
38def _make_keypair():
39 return X25519DH.generate_keypair()
40
41
42def _dummy_router_info(size: int = 64) -> bytes:
43 """Return deterministic fake RouterInfo bytes."""
44 return os.urandom(size)
45
46
47# ---------------------------------------------------------------------------
48# Test 1: Construction
49# ---------------------------------------------------------------------------
50
51class TestConstruction:
52 def test_initiator_requires_peer_info(self):
53 s = _make_keypair()
54 peer = _make_peer_info()
55 hs = NTCP2RealHandshake(
56 our_static=s,
57 peer_static_pub=os.urandom(32),
58 peer_ri_hash=peer["ri_hash"],
59 peer_iv=peer["iv"],
60 initiator=True,
61 )
62 assert not hs.is_complete()
63
64 def test_responder_construction(self):
65 s = _make_keypair()
66 peer = _make_peer_info()
67 hs = NTCP2RealHandshake(
68 our_static=s,
69 peer_static_pub=None,
70 peer_ri_hash=peer["ri_hash"],
71 peer_iv=peer["iv"],
72 initiator=False,
73 )
74 assert not hs.is_complete()
75
76
77# ---------------------------------------------------------------------------
78# Test 2: Full handshake roundtrip
79# ---------------------------------------------------------------------------
80
81class TestFullRoundtrip:
82 """Initiator and responder complete a 3-message handshake in memory."""
83
84 def _setup_pair(self, padlen1=0, padlen2=0, ri_bytes=None):
85 alice_s = _make_keypair()
86 bob_s = _make_keypair()
87
88 # Bob's identity info used for AES-CBC obfuscation
89 bob_ri_hash = hashlib.sha256(b"bob-router-info").digest()
90 bob_iv = os.urandom(16)
91
92 if ri_bytes is None:
93 ri_bytes = _dummy_router_info(64)
94
95 initiator = NTCP2RealHandshake(
96 our_static=alice_s,
97 peer_static_pub=bob_s[1],
98 peer_ri_hash=bob_ri_hash,
99 peer_iv=bob_iv,
100 initiator=True,
101 )
102
103 responder = NTCP2RealHandshake(
104 our_static=bob_s,
105 peer_static_pub=None,
106 peer_ri_hash=bob_ri_hash,
107 peer_iv=bob_iv,
108 initiator=False,
109 )
110
111 return initiator, responder, ri_bytes, padlen1, padlen2
112
113 def test_roundtrip_no_padding(self):
114 initiator, responder, ri_bytes, padlen1, padlen2 = self._setup_pair()
115
116 # Msg 1: initiator -> responder
117 msg1 = initiator.create_session_request(padding_len=0, router_info=ri_bytes)
118 responder.process_session_request(msg1)
119
120 # Msg 2: responder -> initiator
121 msg2 = responder.create_session_created(padding_len=0)
122 initiator.process_session_created(msg2)
123
124 # Msg 3: initiator -> responder
125 msg3 = initiator.create_session_confirmed(router_info=ri_bytes)
126 peer_ri = responder.process_session_confirmed(msg3)
127
128 assert initiator.is_complete()
129 assert responder.is_complete()
130 # Responder should recover initiator's static key
131 assert responder.remote_static_key() == initiator._our_static[1]
132
133 def test_roundtrip_with_padding(self):
134 initiator, responder, ri_bytes, _, _ = self._setup_pair()
135
136 msg1 = initiator.create_session_request(padding_len=32, router_info=ri_bytes)
137 responder.process_session_request(msg1)
138
139 msg2 = responder.create_session_created(padding_len=16)
140 initiator.process_session_created(msg2)
141
142 msg3 = initiator.create_session_confirmed(router_info=ri_bytes)
143 peer_ri = responder.process_session_confirmed(msg3)
144
145 assert initiator.is_complete()
146 assert responder.is_complete()
147
148 def test_responder_recovers_router_info_from_msg3(self):
149 initiator, responder, ri_bytes, _, _ = self._setup_pair()
150
151 msg1 = initiator.create_session_request(padding_len=0, router_info=ri_bytes)
152 responder.process_session_request(msg1)
153
154 msg2 = responder.create_session_created(padding_len=0)
155 initiator.process_session_created(msg2)
156
157 msg3 = initiator.create_session_confirmed(router_info=ri_bytes)
158 blocks = responder.process_session_confirmed(msg3)
159
160 # Should contain at least one RouterInfo block
161 ri_blocks = [b for b in blocks if b.block_type == BLOCK_ROUTERINFO]
162 assert len(ri_blocks) >= 1
163 # The RouterInfo data (after 1-byte flag) should match
164 assert ri_blocks[0].data[1:] == ri_bytes
165
166
167# ---------------------------------------------------------------------------
168# Test 3: AES-CBC obfuscation
169# ---------------------------------------------------------------------------
170
171class TestAESObfuscation:
172 """First 32 bytes of msg1 and msg2 must differ from the plain ephemeral key."""
173
174 def test_msg1_first_32_bytes_are_obfuscated(self):
175 alice_s = _make_keypair()
176 bob_s = _make_keypair()
177 bob_ri_hash = hashlib.sha256(b"bob-router-info").digest()
178 bob_iv = os.urandom(16)
179 ri_bytes = _dummy_router_info(64)
180
181 initiator = NTCP2RealHandshake(
182 our_static=alice_s,
183 peer_static_pub=bob_s[1],
184 peer_ri_hash=bob_ri_hash,
185 peer_iv=bob_iv,
186 initiator=True,
187 )
188
189 msg1 = initiator.create_session_request(padding_len=0, router_info=ri_bytes)
190
191 # The first 32 bytes should NOT be the raw ephemeral public key
192 raw_ephemeral = initiator.ephemeral_public_key
193 assert msg1[:32] != raw_ephemeral, (
194 "First 32 bytes of msg1 should be AES-CBC encrypted, not raw key"
195 )
196
197 def test_msg2_first_32_bytes_are_obfuscated(self):
198 alice_s = _make_keypair()
199 bob_s = _make_keypair()
200 bob_ri_hash = hashlib.sha256(b"bob-router-info").digest()
201 bob_iv = os.urandom(16)
202 ri_bytes = _dummy_router_info(64)
203
204 initiator = NTCP2RealHandshake(
205 our_static=alice_s,
206 peer_static_pub=bob_s[1],
207 peer_ri_hash=bob_ri_hash,
208 peer_iv=bob_iv,
209 initiator=True,
210 )
211 responder = NTCP2RealHandshake(
212 our_static=bob_s,
213 peer_static_pub=None,
214 peer_ri_hash=bob_ri_hash,
215 peer_iv=bob_iv,
216 initiator=False,
217 )
218
219 msg1 = initiator.create_session_request(padding_len=0, router_info=ri_bytes)
220 responder.process_session_request(msg1)
221
222 msg2 = responder.create_session_created(padding_len=0)
223
224 raw_ephemeral_y = responder.ephemeral_public_key
225 assert msg2[:32] != raw_ephemeral_y, (
226 "First 32 bytes of msg2 should be AES-CBC encrypted, not raw key"
227 )
228
229
230# ---------------------------------------------------------------------------
231# Test 4: Message sizes
232# ---------------------------------------------------------------------------
233
234class TestMessageSizes:
235 def _do_handshake(self, padlen1, padlen2, ri_size=64):
236 alice_s = _make_keypair()
237 bob_s = _make_keypair()
238 bob_ri_hash = hashlib.sha256(b"bob-router-info").digest()
239 bob_iv = os.urandom(16)
240 ri_bytes = _dummy_router_info(ri_size)
241
242 ini = NTCP2RealHandshake(
243 our_static=alice_s, peer_static_pub=bob_s[1],
244 peer_ri_hash=bob_ri_hash, peer_iv=bob_iv, initiator=True,
245 )
246 resp = NTCP2RealHandshake(
247 our_static=bob_s, peer_static_pub=None,
248 peer_ri_hash=bob_ri_hash, peer_iv=bob_iv, initiator=False,
249 )
250
251 msg1 = ini.create_session_request(padding_len=padlen1, router_info=ri_bytes)
252 resp.process_session_request(msg1)
253
254 msg2 = resp.create_session_created(padding_len=padlen2)
255 ini.process_session_created(msg2)
256
257 msg3 = ini.create_session_confirmed(router_info=ri_bytes)
258 return msg1, msg2, msg3, ri_bytes
259
260 def test_msg1_size_no_padding(self):
261 msg1, _, _, _ = self._do_handshake(0, 0)
262 # 32 (encrypted ephemeral) + 16 (encrypted options) + 16 (poly tag) = 64
263 assert len(msg1) == 64
264
265 def test_msg1_size_with_padding(self):
266 msg1, _, _, _ = self._do_handshake(32, 0)
267 assert len(msg1) == 64 + 32
268
269 def test_msg2_size_no_padding(self):
270 _, msg2, _, _ = self._do_handshake(0, 0)
271 assert len(msg2) == 64
272
273 def test_msg2_size_with_padding(self):
274 _, msg2, _, _ = self._do_handshake(0, 16)
275 assert len(msg2) == 64 + 16
276
277 def test_msg3_size(self):
278 # msg3 = 48 (encrypted static + tag) + part2_len
279 # part2 = encoded blocks + 16 (AEAD tag)
280 ri_size = 64
281 _, _, msg3, ri_bytes = self._do_handshake(0, 0, ri_size=ri_size)
282 # Part 1 is always 48 bytes (32-byte static key + 16-byte tag)
283 part1_len = 48
284 assert len(msg3) > part1_len
285
286
287# ---------------------------------------------------------------------------
288# Test 5: Options encoding/decoding
289# ---------------------------------------------------------------------------
290
291class TestOptionsInHandshake:
292 def test_options_roundtrip_in_msg1(self):
293 """Options encoded in msg1 are correctly parsed by responder."""
294 alice_s = _make_keypair()
295 bob_s = _make_keypair()
296 bob_ri_hash = hashlib.sha256(b"bob-router-info").digest()
297 bob_iv = os.urandom(16)
298 ri_bytes = _dummy_router_info(64)
299
300 ini = NTCP2RealHandshake(
301 our_static=alice_s, peer_static_pub=bob_s[1],
302 peer_ri_hash=bob_ri_hash, peer_iv=bob_iv, initiator=True,
303 )
304 resp = NTCP2RealHandshake(
305 our_static=bob_s, peer_static_pub=None,
306 peer_ri_hash=bob_ri_hash, peer_iv=bob_iv, initiator=False,
307 )
308
309 msg1 = ini.create_session_request(padding_len=32, router_info=ri_bytes)
310 resp.process_session_request(msg1)
311
312 # Responder should have parsed options from msg1
313 assert resp.peer_options is not None
314 assert resp.peer_options["padlen1"] == 32
315 assert resp.peer_options["version"] == 2
316 assert resp.peer_options["network_id"] == 2 # I2P mainnet
317
318
319# ---------------------------------------------------------------------------
320# Test 6: SipHash key derivation after split
321# ---------------------------------------------------------------------------
322
323class TestSplitAndSipHash:
324 def test_split_returns_cipher_states_and_siphash(self):
325 alice_s = _make_keypair()
326 bob_s = _make_keypair()
327 bob_ri_hash = hashlib.sha256(b"bob-router-info").digest()
328 bob_iv = os.urandom(16)
329 ri_bytes = _dummy_router_info(64)
330
331 ini = NTCP2RealHandshake(
332 our_static=alice_s, peer_static_pub=bob_s[1],
333 peer_ri_hash=bob_ri_hash, peer_iv=bob_iv, initiator=True,
334 )
335 resp = NTCP2RealHandshake(
336 our_static=bob_s, peer_static_pub=None,
337 peer_ri_hash=bob_ri_hash, peer_iv=bob_iv, initiator=False,
338 )
339
340 msg1 = ini.create_session_request(padding_len=0, router_info=ri_bytes)
341 resp.process_session_request(msg1)
342 msg2 = resp.create_session_created(padding_len=0)
343 ini.process_session_created(msg2)
344 msg3 = ini.create_session_confirmed(router_info=ri_bytes)
345 resp.process_session_confirmed(msg3)
346
347 ini_result = ini.split()
348 resp_result = resp.split()
349
350 # Each result has send_cipher, recv_cipher, send_siphash, recv_siphash
351 assert ini_result.send_cipher is not None
352 assert ini_result.recv_cipher is not None
353 assert isinstance(ini_result.send_siphash, SipHashRatchet)
354 assert isinstance(ini_result.recv_siphash, SipHashRatchet)
355
356 # Initiator's send should match responder's recv and vice versa
357 # Verify by encrypting and decrypting a test payload
358 plaintext = b"hello NTCP2 transport"
359 ct = ini_result.send_cipher.encrypt_with_ad(b"", plaintext)
360 pt = resp_result.recv_cipher.decrypt_with_ad(b"", ct)
361 assert pt == plaintext
362
363 ct2 = resp_result.send_cipher.encrypt_with_ad(b"", b"reply")
364 pt2 = ini_result.recv_cipher.decrypt_with_ad(b"", ct2)
365 assert pt2 == b"reply"
366
367 def test_siphash_ratchets_agree(self):
368 """Initiator's send siphash produces same sequence as responder's recv."""
369 alice_s = _make_keypair()
370 bob_s = _make_keypair()
371 bob_ri_hash = hashlib.sha256(b"bob-router-info").digest()
372 bob_iv = os.urandom(16)
373 ri_bytes = _dummy_router_info(64)
374
375 ini = NTCP2RealHandshake(
376 our_static=alice_s, peer_static_pub=bob_s[1],
377 peer_ri_hash=bob_ri_hash, peer_iv=bob_iv, initiator=True,
378 )
379 resp = NTCP2RealHandshake(
380 our_static=bob_s, peer_static_pub=None,
381 peer_ri_hash=bob_ri_hash, peer_iv=bob_iv, initiator=False,
382 )
383
384 msg1 = ini.create_session_request(padding_len=0, router_info=ri_bytes)
385 resp.process_session_request(msg1)
386 msg2 = resp.create_session_created(padding_len=0)
387 ini.process_session_created(msg2)
388 msg3 = ini.create_session_confirmed(router_info=ri_bytes)
389 resp.process_session_confirmed(msg3)
390
391 ini_result = ini.split()
392 resp_result = resp.split()
393
394 # Obfuscate/deobfuscate a length value
395 obf = ini_result.send_siphash.obfuscate_length(1234)
396 deobf = resp_result.recv_siphash.deobfuscate_length(obf)
397 assert deobf == 1234
398
399
400# ---------------------------------------------------------------------------
401# Test 7: Block parsing from msg3 part2
402# ---------------------------------------------------------------------------
403
404class TestMsg3BlockParsing:
405 def test_msg3_contains_router_info_block(self):
406 alice_s = _make_keypair()
407 bob_s = _make_keypair()
408 bob_ri_hash = hashlib.sha256(b"bob-router-info").digest()
409 bob_iv = os.urandom(16)
410 ri_bytes = _dummy_router_info(128)
411
412 ini = NTCP2RealHandshake(
413 our_static=alice_s, peer_static_pub=bob_s[1],
414 peer_ri_hash=bob_ri_hash, peer_iv=bob_iv, initiator=True,
415 )
416 resp = NTCP2RealHandshake(
417 our_static=bob_s, peer_static_pub=None,
418 peer_ri_hash=bob_ri_hash, peer_iv=bob_iv, initiator=False,
419 )
420
421 msg1 = ini.create_session_request(padding_len=0, router_info=ri_bytes)
422 resp.process_session_request(msg1)
423 msg2 = resp.create_session_created(padding_len=0)
424 ini.process_session_created(msg2)
425
426 msg3 = ini.create_session_confirmed(router_info=ri_bytes)
427 blocks = resp.process_session_confirmed(msg3)
428
429 # Should have decoded NTCP2 blocks
430 assert len(blocks) >= 1
431 ri_block = [b for b in blocks if b.block_type == BLOCK_ROUTERINFO]
432 assert len(ri_block) == 1
433 # Data starts with 1-byte flag, then the RI bytes
434 assert ri_block[0].data[1:] == ri_bytes
435
436 def test_msg3_with_padding_block(self):
437 alice_s = _make_keypair()
438 bob_s = _make_keypair()
439 bob_ri_hash = hashlib.sha256(b"bob-router-info").digest()
440 bob_iv = os.urandom(16)
441 ri_bytes = _dummy_router_info(64)
442
443 ini = NTCP2RealHandshake(
444 our_static=alice_s, peer_static_pub=bob_s[1],
445 peer_ri_hash=bob_ri_hash, peer_iv=bob_iv, initiator=True,
446 )
447 resp = NTCP2RealHandshake(
448 our_static=bob_s, peer_static_pub=None,
449 peer_ri_hash=bob_ri_hash, peer_iv=bob_iv, initiator=False,
450 )
451
452 msg1 = ini.create_session_request(padding_len=0, router_info=ri_bytes)
453 resp.process_session_request(msg1)
454 msg2 = resp.create_session_created(padding_len=0)
455 ini.process_session_created(msg2)
456
457 # msg3 always includes a padding block (size set by _padlen3 from msg1)
458 # _padlen3 is 0, so padding block has 0-length data
459 msg3 = ini.create_session_confirmed(router_info=ri_bytes)
460 blocks = resp.process_session_confirmed(msg3)
461
462 pad_blocks = [b for b in blocks if b.block_type == BLOCK_PADDING]
463 assert len(pad_blocks) == 1
464 # Also verify options block is present
465 from i2p_transport.ntcp2_blocks import BLOCK_OPTIONS
466 opts_blocks = [b for b in blocks if b.block_type == BLOCK_OPTIONS]
467 assert len(opts_blocks) == 1
468 assert len(opts_blocks[0].data) == 12