"""Tests for the real NTCP2 handshake with AES-CBC obfuscation. TDD tests -- written before the implementation in src/i2p_transport/ntcp2_real_handshake.py. """ import hashlib import os import struct import time import pytest from i2p_crypto.x25519 import X25519DH from i2p_crypto.siphash import SipHashRatchet from i2p_transport.ntcp2_real_handshake import NTCP2RealHandshake from i2p_transport.ntcp2_blocks import ( decode_blocks, encode_blocks, BLOCK_ROUTERINFO, BLOCK_PADDING, router_info_block, padding_block, ) # --------------------------------------------------------------------------- # helpers # --------------------------------------------------------------------------- def _make_peer_info() -> dict: """Create fake peer info with a router hash and IV ('i' param).""" ri_hash = os.urandom(32) iv = os.urandom(16) return {"ri_hash": ri_hash, "iv": iv} def _make_keypair(): return X25519DH.generate_keypair() def _dummy_router_info(size: int = 64) -> bytes: """Return deterministic fake RouterInfo bytes.""" return os.urandom(size) # --------------------------------------------------------------------------- # Test 1: Construction # --------------------------------------------------------------------------- class TestConstruction: def test_initiator_requires_peer_info(self): s = _make_keypair() peer = _make_peer_info() hs = NTCP2RealHandshake( our_static=s, peer_static_pub=os.urandom(32), peer_ri_hash=peer["ri_hash"], peer_iv=peer["iv"], initiator=True, ) assert not hs.is_complete() def test_responder_construction(self): s = _make_keypair() peer = _make_peer_info() hs = NTCP2RealHandshake( our_static=s, peer_static_pub=None, peer_ri_hash=peer["ri_hash"], peer_iv=peer["iv"], initiator=False, ) assert not hs.is_complete() # --------------------------------------------------------------------------- # Test 2: Full handshake roundtrip # --------------------------------------------------------------------------- class TestFullRoundtrip: """Initiator and responder complete a 3-message handshake in memory.""" def _setup_pair(self, padlen1=0, padlen2=0, ri_bytes=None): alice_s = _make_keypair() bob_s = _make_keypair() # Bob's identity info used for AES-CBC obfuscation bob_ri_hash = hashlib.sha256(b"bob-router-info").digest() bob_iv = os.urandom(16) if ri_bytes is None: ri_bytes = _dummy_router_info(64) initiator = NTCP2RealHandshake( our_static=alice_s, peer_static_pub=bob_s[1], peer_ri_hash=bob_ri_hash, peer_iv=bob_iv, initiator=True, ) responder = NTCP2RealHandshake( our_static=bob_s, peer_static_pub=None, peer_ri_hash=bob_ri_hash, peer_iv=bob_iv, initiator=False, ) return initiator, responder, ri_bytes, padlen1, padlen2 def test_roundtrip_no_padding(self): initiator, responder, ri_bytes, padlen1, padlen2 = self._setup_pair() # Msg 1: initiator -> responder msg1 = initiator.create_session_request(padding_len=0, router_info=ri_bytes) responder.process_session_request(msg1) # Msg 2: responder -> initiator msg2 = responder.create_session_created(padding_len=0) initiator.process_session_created(msg2) # Msg 3: initiator -> responder msg3 = initiator.create_session_confirmed(router_info=ri_bytes) peer_ri = responder.process_session_confirmed(msg3) assert initiator.is_complete() assert responder.is_complete() # Responder should recover initiator's static key assert responder.remote_static_key() == initiator._our_static[1] def test_roundtrip_with_padding(self): initiator, responder, ri_bytes, _, _ = self._setup_pair() msg1 = initiator.create_session_request(padding_len=32, router_info=ri_bytes) responder.process_session_request(msg1) msg2 = responder.create_session_created(padding_len=16) initiator.process_session_created(msg2) msg3 = initiator.create_session_confirmed(router_info=ri_bytes) peer_ri = responder.process_session_confirmed(msg3) assert initiator.is_complete() assert responder.is_complete() def test_responder_recovers_router_info_from_msg3(self): initiator, responder, ri_bytes, _, _ = self._setup_pair() msg1 = initiator.create_session_request(padding_len=0, router_info=ri_bytes) responder.process_session_request(msg1) msg2 = responder.create_session_created(padding_len=0) initiator.process_session_created(msg2) msg3 = initiator.create_session_confirmed(router_info=ri_bytes) blocks = responder.process_session_confirmed(msg3) # Should contain at least one RouterInfo block ri_blocks = [b for b in blocks if b.block_type == BLOCK_ROUTERINFO] assert len(ri_blocks) >= 1 # The RouterInfo data (after 1-byte flag) should match assert ri_blocks[0].data[1:] == ri_bytes # --------------------------------------------------------------------------- # Test 3: AES-CBC obfuscation # --------------------------------------------------------------------------- class TestAESObfuscation: """First 32 bytes of msg1 and msg2 must differ from the plain ephemeral key.""" def test_msg1_first_32_bytes_are_obfuscated(self): alice_s = _make_keypair() bob_s = _make_keypair() bob_ri_hash = hashlib.sha256(b"bob-router-info").digest() bob_iv = os.urandom(16) ri_bytes = _dummy_router_info(64) initiator = NTCP2RealHandshake( our_static=alice_s, peer_static_pub=bob_s[1], peer_ri_hash=bob_ri_hash, peer_iv=bob_iv, initiator=True, ) msg1 = initiator.create_session_request(padding_len=0, router_info=ri_bytes) # The first 32 bytes should NOT be the raw ephemeral public key raw_ephemeral = initiator.ephemeral_public_key assert msg1[:32] != raw_ephemeral, ( "First 32 bytes of msg1 should be AES-CBC encrypted, not raw key" ) def test_msg2_first_32_bytes_are_obfuscated(self): alice_s = _make_keypair() bob_s = _make_keypair() bob_ri_hash = hashlib.sha256(b"bob-router-info").digest() bob_iv = os.urandom(16) ri_bytes = _dummy_router_info(64) initiator = NTCP2RealHandshake( our_static=alice_s, peer_static_pub=bob_s[1], peer_ri_hash=bob_ri_hash, peer_iv=bob_iv, initiator=True, ) responder = NTCP2RealHandshake( our_static=bob_s, peer_static_pub=None, peer_ri_hash=bob_ri_hash, peer_iv=bob_iv, initiator=False, ) msg1 = initiator.create_session_request(padding_len=0, router_info=ri_bytes) responder.process_session_request(msg1) msg2 = responder.create_session_created(padding_len=0) raw_ephemeral_y = responder.ephemeral_public_key assert msg2[:32] != raw_ephemeral_y, ( "First 32 bytes of msg2 should be AES-CBC encrypted, not raw key" ) # --------------------------------------------------------------------------- # Test 4: Message sizes # --------------------------------------------------------------------------- class TestMessageSizes: def _do_handshake(self, padlen1, padlen2, ri_size=64): alice_s = _make_keypair() bob_s = _make_keypair() bob_ri_hash = hashlib.sha256(b"bob-router-info").digest() bob_iv = os.urandom(16) ri_bytes = _dummy_router_info(ri_size) ini = NTCP2RealHandshake( our_static=alice_s, peer_static_pub=bob_s[1], peer_ri_hash=bob_ri_hash, peer_iv=bob_iv, initiator=True, ) resp = NTCP2RealHandshake( our_static=bob_s, peer_static_pub=None, peer_ri_hash=bob_ri_hash, peer_iv=bob_iv, initiator=False, ) msg1 = ini.create_session_request(padding_len=padlen1, router_info=ri_bytes) resp.process_session_request(msg1) msg2 = resp.create_session_created(padding_len=padlen2) ini.process_session_created(msg2) msg3 = ini.create_session_confirmed(router_info=ri_bytes) return msg1, msg2, msg3, ri_bytes def test_msg1_size_no_padding(self): msg1, _, _, _ = self._do_handshake(0, 0) # 32 (encrypted ephemeral) + 16 (encrypted options) + 16 (poly tag) = 64 assert len(msg1) == 64 def test_msg1_size_with_padding(self): msg1, _, _, _ = self._do_handshake(32, 0) assert len(msg1) == 64 + 32 def test_msg2_size_no_padding(self): _, msg2, _, _ = self._do_handshake(0, 0) assert len(msg2) == 64 def test_msg2_size_with_padding(self): _, msg2, _, _ = self._do_handshake(0, 16) assert len(msg2) == 64 + 16 def test_msg3_size(self): # msg3 = 48 (encrypted static + tag) + part2_len # part2 = encoded blocks + 16 (AEAD tag) ri_size = 64 _, _, msg3, ri_bytes = self._do_handshake(0, 0, ri_size=ri_size) # Part 1 is always 48 bytes (32-byte static key + 16-byte tag) part1_len = 48 assert len(msg3) > part1_len # --------------------------------------------------------------------------- # Test 5: Options encoding/decoding # --------------------------------------------------------------------------- class TestOptionsInHandshake: def test_options_roundtrip_in_msg1(self): """Options encoded in msg1 are correctly parsed by responder.""" alice_s = _make_keypair() bob_s = _make_keypair() bob_ri_hash = hashlib.sha256(b"bob-router-info").digest() bob_iv = os.urandom(16) ri_bytes = _dummy_router_info(64) ini = NTCP2RealHandshake( our_static=alice_s, peer_static_pub=bob_s[1], peer_ri_hash=bob_ri_hash, peer_iv=bob_iv, initiator=True, ) resp = NTCP2RealHandshake( our_static=bob_s, peer_static_pub=None, peer_ri_hash=bob_ri_hash, peer_iv=bob_iv, initiator=False, ) msg1 = ini.create_session_request(padding_len=32, router_info=ri_bytes) resp.process_session_request(msg1) # Responder should have parsed options from msg1 assert resp.peer_options is not None assert resp.peer_options["padlen1"] == 32 assert resp.peer_options["version"] == 2 assert resp.peer_options["network_id"] == 2 # I2P mainnet # --------------------------------------------------------------------------- # Test 6: SipHash key derivation after split # --------------------------------------------------------------------------- class TestSplitAndSipHash: def test_split_returns_cipher_states_and_siphash(self): alice_s = _make_keypair() bob_s = _make_keypair() bob_ri_hash = hashlib.sha256(b"bob-router-info").digest() bob_iv = os.urandom(16) ri_bytes = _dummy_router_info(64) ini = NTCP2RealHandshake( our_static=alice_s, peer_static_pub=bob_s[1], peer_ri_hash=bob_ri_hash, peer_iv=bob_iv, initiator=True, ) resp = NTCP2RealHandshake( our_static=bob_s, peer_static_pub=None, peer_ri_hash=bob_ri_hash, peer_iv=bob_iv, initiator=False, ) msg1 = ini.create_session_request(padding_len=0, router_info=ri_bytes) resp.process_session_request(msg1) msg2 = resp.create_session_created(padding_len=0) ini.process_session_created(msg2) msg3 = ini.create_session_confirmed(router_info=ri_bytes) resp.process_session_confirmed(msg3) ini_result = ini.split() resp_result = resp.split() # Each result has send_cipher, recv_cipher, send_siphash, recv_siphash assert ini_result.send_cipher is not None assert ini_result.recv_cipher is not None assert isinstance(ini_result.send_siphash, SipHashRatchet) assert isinstance(ini_result.recv_siphash, SipHashRatchet) # Initiator's send should match responder's recv and vice versa # Verify by encrypting and decrypting a test payload plaintext = b"hello NTCP2 transport" ct = ini_result.send_cipher.encrypt_with_ad(b"", plaintext) pt = resp_result.recv_cipher.decrypt_with_ad(b"", ct) assert pt == plaintext ct2 = resp_result.send_cipher.encrypt_with_ad(b"", b"reply") pt2 = ini_result.recv_cipher.decrypt_with_ad(b"", ct2) assert pt2 == b"reply" def test_siphash_ratchets_agree(self): """Initiator's send siphash produces same sequence as responder's recv.""" alice_s = _make_keypair() bob_s = _make_keypair() bob_ri_hash = hashlib.sha256(b"bob-router-info").digest() bob_iv = os.urandom(16) ri_bytes = _dummy_router_info(64) ini = NTCP2RealHandshake( our_static=alice_s, peer_static_pub=bob_s[1], peer_ri_hash=bob_ri_hash, peer_iv=bob_iv, initiator=True, ) resp = NTCP2RealHandshake( our_static=bob_s, peer_static_pub=None, peer_ri_hash=bob_ri_hash, peer_iv=bob_iv, initiator=False, ) msg1 = ini.create_session_request(padding_len=0, router_info=ri_bytes) resp.process_session_request(msg1) msg2 = resp.create_session_created(padding_len=0) ini.process_session_created(msg2) msg3 = ini.create_session_confirmed(router_info=ri_bytes) resp.process_session_confirmed(msg3) ini_result = ini.split() resp_result = resp.split() # Obfuscate/deobfuscate a length value obf = ini_result.send_siphash.obfuscate_length(1234) deobf = resp_result.recv_siphash.deobfuscate_length(obf) assert deobf == 1234 # --------------------------------------------------------------------------- # Test 7: Block parsing from msg3 part2 # --------------------------------------------------------------------------- class TestMsg3BlockParsing: def test_msg3_contains_router_info_block(self): alice_s = _make_keypair() bob_s = _make_keypair() bob_ri_hash = hashlib.sha256(b"bob-router-info").digest() bob_iv = os.urandom(16) ri_bytes = _dummy_router_info(128) ini = NTCP2RealHandshake( our_static=alice_s, peer_static_pub=bob_s[1], peer_ri_hash=bob_ri_hash, peer_iv=bob_iv, initiator=True, ) resp = NTCP2RealHandshake( our_static=bob_s, peer_static_pub=None, peer_ri_hash=bob_ri_hash, peer_iv=bob_iv, initiator=False, ) msg1 = ini.create_session_request(padding_len=0, router_info=ri_bytes) resp.process_session_request(msg1) msg2 = resp.create_session_created(padding_len=0) ini.process_session_created(msg2) msg3 = ini.create_session_confirmed(router_info=ri_bytes) blocks = resp.process_session_confirmed(msg3) # Should have decoded NTCP2 blocks assert len(blocks) >= 1 ri_block = [b for b in blocks if b.block_type == BLOCK_ROUTERINFO] assert len(ri_block) == 1 # Data starts with 1-byte flag, then the RI bytes assert ri_block[0].data[1:] == ri_bytes def test_msg3_with_padding_block(self): alice_s = _make_keypair() bob_s = _make_keypair() bob_ri_hash = hashlib.sha256(b"bob-router-info").digest() bob_iv = os.urandom(16) ri_bytes = _dummy_router_info(64) ini = NTCP2RealHandshake( our_static=alice_s, peer_static_pub=bob_s[1], peer_ri_hash=bob_ri_hash, peer_iv=bob_iv, initiator=True, ) resp = NTCP2RealHandshake( our_static=bob_s, peer_static_pub=None, peer_ri_hash=bob_ri_hash, peer_iv=bob_iv, initiator=False, ) msg1 = ini.create_session_request(padding_len=0, router_info=ri_bytes) resp.process_session_request(msg1) msg2 = resp.create_session_created(padding_len=0) ini.process_session_created(msg2) # msg3 always includes a padding block (size set by _padlen3 from msg1) # _padlen3 is 0, so padding block has 0-length data msg3 = ini.create_session_confirmed(router_info=ri_bytes) blocks = resp.process_session_confirmed(msg3) pad_blocks = [b for b in blocks if b.block_type == BLOCK_PADDING] assert len(pad_blocks) == 1 # Also verify options block is present from i2p_transport.ntcp2_blocks import BLOCK_OPTIONS opts_blocks = [b for b in blocks if b.block_type == BLOCK_OPTIONS] assert len(opts_blocks) == 1 assert len(opts_blocks[0].data) == 12