A Python port of the Invisible Internet Project (I2P)
at main 468 lines 17 kB view raw
1"""Tests for the real NTCP2 handshake with AES-CBC obfuscation. 2 3TDD tests -- written before the implementation in 4src/i2p_transport/ntcp2_real_handshake.py. 5""" 6 7import hashlib 8import os 9import struct 10import time 11 12import pytest 13 14from i2p_crypto.x25519 import X25519DH 15from i2p_crypto.siphash import SipHashRatchet 16from i2p_transport.ntcp2_real_handshake import NTCP2RealHandshake 17from i2p_transport.ntcp2_blocks import ( 18 decode_blocks, 19 encode_blocks, 20 BLOCK_ROUTERINFO, 21 BLOCK_PADDING, 22 router_info_block, 23 padding_block, 24) 25 26 27# --------------------------------------------------------------------------- 28# helpers 29# --------------------------------------------------------------------------- 30 31def _make_peer_info() -> dict: 32 """Create fake peer info with a router hash and IV ('i' param).""" 33 ri_hash = os.urandom(32) 34 iv = os.urandom(16) 35 return {"ri_hash": ri_hash, "iv": iv} 36 37 38def _make_keypair(): 39 return X25519DH.generate_keypair() 40 41 42def _dummy_router_info(size: int = 64) -> bytes: 43 """Return deterministic fake RouterInfo bytes.""" 44 return os.urandom(size) 45 46 47# --------------------------------------------------------------------------- 48# Test 1: Construction 49# --------------------------------------------------------------------------- 50 51class TestConstruction: 52 def test_initiator_requires_peer_info(self): 53 s = _make_keypair() 54 peer = _make_peer_info() 55 hs = NTCP2RealHandshake( 56 our_static=s, 57 peer_static_pub=os.urandom(32), 58 peer_ri_hash=peer["ri_hash"], 59 peer_iv=peer["iv"], 60 initiator=True, 61 ) 62 assert not hs.is_complete() 63 64 def test_responder_construction(self): 65 s = _make_keypair() 66 peer = _make_peer_info() 67 hs = NTCP2RealHandshake( 68 our_static=s, 69 peer_static_pub=None, 70 peer_ri_hash=peer["ri_hash"], 71 peer_iv=peer["iv"], 72 initiator=False, 73 ) 74 assert not hs.is_complete() 75 76 77# --------------------------------------------------------------------------- 78# Test 2: Full handshake roundtrip 79# --------------------------------------------------------------------------- 80 81class TestFullRoundtrip: 82 """Initiator and responder complete a 3-message handshake in memory.""" 83 84 def _setup_pair(self, padlen1=0, padlen2=0, ri_bytes=None): 85 alice_s = _make_keypair() 86 bob_s = _make_keypair() 87 88 # Bob's identity info used for AES-CBC obfuscation 89 bob_ri_hash = hashlib.sha256(b"bob-router-info").digest() 90 bob_iv = os.urandom(16) 91 92 if ri_bytes is None: 93 ri_bytes = _dummy_router_info(64) 94 95 initiator = NTCP2RealHandshake( 96 our_static=alice_s, 97 peer_static_pub=bob_s[1], 98 peer_ri_hash=bob_ri_hash, 99 peer_iv=bob_iv, 100 initiator=True, 101 ) 102 103 responder = NTCP2RealHandshake( 104 our_static=bob_s, 105 peer_static_pub=None, 106 peer_ri_hash=bob_ri_hash, 107 peer_iv=bob_iv, 108 initiator=False, 109 ) 110 111 return initiator, responder, ri_bytes, padlen1, padlen2 112 113 def test_roundtrip_no_padding(self): 114 initiator, responder, ri_bytes, padlen1, padlen2 = self._setup_pair() 115 116 # Msg 1: initiator -> responder 117 msg1 = initiator.create_session_request(padding_len=0, router_info=ri_bytes) 118 responder.process_session_request(msg1) 119 120 # Msg 2: responder -> initiator 121 msg2 = responder.create_session_created(padding_len=0) 122 initiator.process_session_created(msg2) 123 124 # Msg 3: initiator -> responder 125 msg3 = initiator.create_session_confirmed(router_info=ri_bytes) 126 peer_ri = responder.process_session_confirmed(msg3) 127 128 assert initiator.is_complete() 129 assert responder.is_complete() 130 # Responder should recover initiator's static key 131 assert responder.remote_static_key() == initiator._our_static[1] 132 133 def test_roundtrip_with_padding(self): 134 initiator, responder, ri_bytes, _, _ = self._setup_pair() 135 136 msg1 = initiator.create_session_request(padding_len=32, router_info=ri_bytes) 137 responder.process_session_request(msg1) 138 139 msg2 = responder.create_session_created(padding_len=16) 140 initiator.process_session_created(msg2) 141 142 msg3 = initiator.create_session_confirmed(router_info=ri_bytes) 143 peer_ri = responder.process_session_confirmed(msg3) 144 145 assert initiator.is_complete() 146 assert responder.is_complete() 147 148 def test_responder_recovers_router_info_from_msg3(self): 149 initiator, responder, ri_bytes, _, _ = self._setup_pair() 150 151 msg1 = initiator.create_session_request(padding_len=0, router_info=ri_bytes) 152 responder.process_session_request(msg1) 153 154 msg2 = responder.create_session_created(padding_len=0) 155 initiator.process_session_created(msg2) 156 157 msg3 = initiator.create_session_confirmed(router_info=ri_bytes) 158 blocks = responder.process_session_confirmed(msg3) 159 160 # Should contain at least one RouterInfo block 161 ri_blocks = [b for b in blocks if b.block_type == BLOCK_ROUTERINFO] 162 assert len(ri_blocks) >= 1 163 # The RouterInfo data (after 1-byte flag) should match 164 assert ri_blocks[0].data[1:] == ri_bytes 165 166 167# --------------------------------------------------------------------------- 168# Test 3: AES-CBC obfuscation 169# --------------------------------------------------------------------------- 170 171class TestAESObfuscation: 172 """First 32 bytes of msg1 and msg2 must differ from the plain ephemeral key.""" 173 174 def test_msg1_first_32_bytes_are_obfuscated(self): 175 alice_s = _make_keypair() 176 bob_s = _make_keypair() 177 bob_ri_hash = hashlib.sha256(b"bob-router-info").digest() 178 bob_iv = os.urandom(16) 179 ri_bytes = _dummy_router_info(64) 180 181 initiator = NTCP2RealHandshake( 182 our_static=alice_s, 183 peer_static_pub=bob_s[1], 184 peer_ri_hash=bob_ri_hash, 185 peer_iv=bob_iv, 186 initiator=True, 187 ) 188 189 msg1 = initiator.create_session_request(padding_len=0, router_info=ri_bytes) 190 191 # The first 32 bytes should NOT be the raw ephemeral public key 192 raw_ephemeral = initiator.ephemeral_public_key 193 assert msg1[:32] != raw_ephemeral, ( 194 "First 32 bytes of msg1 should be AES-CBC encrypted, not raw key" 195 ) 196 197 def test_msg2_first_32_bytes_are_obfuscated(self): 198 alice_s = _make_keypair() 199 bob_s = _make_keypair() 200 bob_ri_hash = hashlib.sha256(b"bob-router-info").digest() 201 bob_iv = os.urandom(16) 202 ri_bytes = _dummy_router_info(64) 203 204 initiator = NTCP2RealHandshake( 205 our_static=alice_s, 206 peer_static_pub=bob_s[1], 207 peer_ri_hash=bob_ri_hash, 208 peer_iv=bob_iv, 209 initiator=True, 210 ) 211 responder = NTCP2RealHandshake( 212 our_static=bob_s, 213 peer_static_pub=None, 214 peer_ri_hash=bob_ri_hash, 215 peer_iv=bob_iv, 216 initiator=False, 217 ) 218 219 msg1 = initiator.create_session_request(padding_len=0, router_info=ri_bytes) 220 responder.process_session_request(msg1) 221 222 msg2 = responder.create_session_created(padding_len=0) 223 224 raw_ephemeral_y = responder.ephemeral_public_key 225 assert msg2[:32] != raw_ephemeral_y, ( 226 "First 32 bytes of msg2 should be AES-CBC encrypted, not raw key" 227 ) 228 229 230# --------------------------------------------------------------------------- 231# Test 4: Message sizes 232# --------------------------------------------------------------------------- 233 234class TestMessageSizes: 235 def _do_handshake(self, padlen1, padlen2, ri_size=64): 236 alice_s = _make_keypair() 237 bob_s = _make_keypair() 238 bob_ri_hash = hashlib.sha256(b"bob-router-info").digest() 239 bob_iv = os.urandom(16) 240 ri_bytes = _dummy_router_info(ri_size) 241 242 ini = NTCP2RealHandshake( 243 our_static=alice_s, peer_static_pub=bob_s[1], 244 peer_ri_hash=bob_ri_hash, peer_iv=bob_iv, initiator=True, 245 ) 246 resp = NTCP2RealHandshake( 247 our_static=bob_s, peer_static_pub=None, 248 peer_ri_hash=bob_ri_hash, peer_iv=bob_iv, initiator=False, 249 ) 250 251 msg1 = ini.create_session_request(padding_len=padlen1, router_info=ri_bytes) 252 resp.process_session_request(msg1) 253 254 msg2 = resp.create_session_created(padding_len=padlen2) 255 ini.process_session_created(msg2) 256 257 msg3 = ini.create_session_confirmed(router_info=ri_bytes) 258 return msg1, msg2, msg3, ri_bytes 259 260 def test_msg1_size_no_padding(self): 261 msg1, _, _, _ = self._do_handshake(0, 0) 262 # 32 (encrypted ephemeral) + 16 (encrypted options) + 16 (poly tag) = 64 263 assert len(msg1) == 64 264 265 def test_msg1_size_with_padding(self): 266 msg1, _, _, _ = self._do_handshake(32, 0) 267 assert len(msg1) == 64 + 32 268 269 def test_msg2_size_no_padding(self): 270 _, msg2, _, _ = self._do_handshake(0, 0) 271 assert len(msg2) == 64 272 273 def test_msg2_size_with_padding(self): 274 _, msg2, _, _ = self._do_handshake(0, 16) 275 assert len(msg2) == 64 + 16 276 277 def test_msg3_size(self): 278 # msg3 = 48 (encrypted static + tag) + part2_len 279 # part2 = encoded blocks + 16 (AEAD tag) 280 ri_size = 64 281 _, _, msg3, ri_bytes = self._do_handshake(0, 0, ri_size=ri_size) 282 # Part 1 is always 48 bytes (32-byte static key + 16-byte tag) 283 part1_len = 48 284 assert len(msg3) > part1_len 285 286 287# --------------------------------------------------------------------------- 288# Test 5: Options encoding/decoding 289# --------------------------------------------------------------------------- 290 291class TestOptionsInHandshake: 292 def test_options_roundtrip_in_msg1(self): 293 """Options encoded in msg1 are correctly parsed by responder.""" 294 alice_s = _make_keypair() 295 bob_s = _make_keypair() 296 bob_ri_hash = hashlib.sha256(b"bob-router-info").digest() 297 bob_iv = os.urandom(16) 298 ri_bytes = _dummy_router_info(64) 299 300 ini = NTCP2RealHandshake( 301 our_static=alice_s, peer_static_pub=bob_s[1], 302 peer_ri_hash=bob_ri_hash, peer_iv=bob_iv, initiator=True, 303 ) 304 resp = NTCP2RealHandshake( 305 our_static=bob_s, peer_static_pub=None, 306 peer_ri_hash=bob_ri_hash, peer_iv=bob_iv, initiator=False, 307 ) 308 309 msg1 = ini.create_session_request(padding_len=32, router_info=ri_bytes) 310 resp.process_session_request(msg1) 311 312 # Responder should have parsed options from msg1 313 assert resp.peer_options is not None 314 assert resp.peer_options["padlen1"] == 32 315 assert resp.peer_options["version"] == 2 316 assert resp.peer_options["network_id"] == 2 # I2P mainnet 317 318 319# --------------------------------------------------------------------------- 320# Test 6: SipHash key derivation after split 321# --------------------------------------------------------------------------- 322 323class TestSplitAndSipHash: 324 def test_split_returns_cipher_states_and_siphash(self): 325 alice_s = _make_keypair() 326 bob_s = _make_keypair() 327 bob_ri_hash = hashlib.sha256(b"bob-router-info").digest() 328 bob_iv = os.urandom(16) 329 ri_bytes = _dummy_router_info(64) 330 331 ini = NTCP2RealHandshake( 332 our_static=alice_s, peer_static_pub=bob_s[1], 333 peer_ri_hash=bob_ri_hash, peer_iv=bob_iv, initiator=True, 334 ) 335 resp = NTCP2RealHandshake( 336 our_static=bob_s, peer_static_pub=None, 337 peer_ri_hash=bob_ri_hash, peer_iv=bob_iv, initiator=False, 338 ) 339 340 msg1 = ini.create_session_request(padding_len=0, router_info=ri_bytes) 341 resp.process_session_request(msg1) 342 msg2 = resp.create_session_created(padding_len=0) 343 ini.process_session_created(msg2) 344 msg3 = ini.create_session_confirmed(router_info=ri_bytes) 345 resp.process_session_confirmed(msg3) 346 347 ini_result = ini.split() 348 resp_result = resp.split() 349 350 # Each result has send_cipher, recv_cipher, send_siphash, recv_siphash 351 assert ini_result.send_cipher is not None 352 assert ini_result.recv_cipher is not None 353 assert isinstance(ini_result.send_siphash, SipHashRatchet) 354 assert isinstance(ini_result.recv_siphash, SipHashRatchet) 355 356 # Initiator's send should match responder's recv and vice versa 357 # Verify by encrypting and decrypting a test payload 358 plaintext = b"hello NTCP2 transport" 359 ct = ini_result.send_cipher.encrypt_with_ad(b"", plaintext) 360 pt = resp_result.recv_cipher.decrypt_with_ad(b"", ct) 361 assert pt == plaintext 362 363 ct2 = resp_result.send_cipher.encrypt_with_ad(b"", b"reply") 364 pt2 = ini_result.recv_cipher.decrypt_with_ad(b"", ct2) 365 assert pt2 == b"reply" 366 367 def test_siphash_ratchets_agree(self): 368 """Initiator's send siphash produces same sequence as responder's recv.""" 369 alice_s = _make_keypair() 370 bob_s = _make_keypair() 371 bob_ri_hash = hashlib.sha256(b"bob-router-info").digest() 372 bob_iv = os.urandom(16) 373 ri_bytes = _dummy_router_info(64) 374 375 ini = NTCP2RealHandshake( 376 our_static=alice_s, peer_static_pub=bob_s[1], 377 peer_ri_hash=bob_ri_hash, peer_iv=bob_iv, initiator=True, 378 ) 379 resp = NTCP2RealHandshake( 380 our_static=bob_s, peer_static_pub=None, 381 peer_ri_hash=bob_ri_hash, peer_iv=bob_iv, initiator=False, 382 ) 383 384 msg1 = ini.create_session_request(padding_len=0, router_info=ri_bytes) 385 resp.process_session_request(msg1) 386 msg2 = resp.create_session_created(padding_len=0) 387 ini.process_session_created(msg2) 388 msg3 = ini.create_session_confirmed(router_info=ri_bytes) 389 resp.process_session_confirmed(msg3) 390 391 ini_result = ini.split() 392 resp_result = resp.split() 393 394 # Obfuscate/deobfuscate a length value 395 obf = ini_result.send_siphash.obfuscate_length(1234) 396 deobf = resp_result.recv_siphash.deobfuscate_length(obf) 397 assert deobf == 1234 398 399 400# --------------------------------------------------------------------------- 401# Test 7: Block parsing from msg3 part2 402# --------------------------------------------------------------------------- 403 404class TestMsg3BlockParsing: 405 def test_msg3_contains_router_info_block(self): 406 alice_s = _make_keypair() 407 bob_s = _make_keypair() 408 bob_ri_hash = hashlib.sha256(b"bob-router-info").digest() 409 bob_iv = os.urandom(16) 410 ri_bytes = _dummy_router_info(128) 411 412 ini = NTCP2RealHandshake( 413 our_static=alice_s, peer_static_pub=bob_s[1], 414 peer_ri_hash=bob_ri_hash, peer_iv=bob_iv, initiator=True, 415 ) 416 resp = NTCP2RealHandshake( 417 our_static=bob_s, peer_static_pub=None, 418 peer_ri_hash=bob_ri_hash, peer_iv=bob_iv, initiator=False, 419 ) 420 421 msg1 = ini.create_session_request(padding_len=0, router_info=ri_bytes) 422 resp.process_session_request(msg1) 423 msg2 = resp.create_session_created(padding_len=0) 424 ini.process_session_created(msg2) 425 426 msg3 = ini.create_session_confirmed(router_info=ri_bytes) 427 blocks = resp.process_session_confirmed(msg3) 428 429 # Should have decoded NTCP2 blocks 430 assert len(blocks) >= 1 431 ri_block = [b for b in blocks if b.block_type == BLOCK_ROUTERINFO] 432 assert len(ri_block) == 1 433 # Data starts with 1-byte flag, then the RI bytes 434 assert ri_block[0].data[1:] == ri_bytes 435 436 def test_msg3_with_padding_block(self): 437 alice_s = _make_keypair() 438 bob_s = _make_keypair() 439 bob_ri_hash = hashlib.sha256(b"bob-router-info").digest() 440 bob_iv = os.urandom(16) 441 ri_bytes = _dummy_router_info(64) 442 443 ini = NTCP2RealHandshake( 444 our_static=alice_s, peer_static_pub=bob_s[1], 445 peer_ri_hash=bob_ri_hash, peer_iv=bob_iv, initiator=True, 446 ) 447 resp = NTCP2RealHandshake( 448 our_static=bob_s, peer_static_pub=None, 449 peer_ri_hash=bob_ri_hash, peer_iv=bob_iv, initiator=False, 450 ) 451 452 msg1 = ini.create_session_request(padding_len=0, router_info=ri_bytes) 453 resp.process_session_request(msg1) 454 msg2 = resp.create_session_created(padding_len=0) 455 ini.process_session_created(msg2) 456 457 # msg3 always includes a padding block (size set by _padlen3 from msg1) 458 # _padlen3 is 0, so padding block has 0-length data 459 msg3 = ini.create_session_confirmed(router_info=ri_bytes) 460 blocks = resp.process_session_confirmed(msg3) 461 462 pad_blocks = [b for b in blocks if b.block_type == BLOCK_PADDING] 463 assert len(pad_blocks) == 1 464 # Also verify options block is present 465 from i2p_transport.ntcp2_blocks import BLOCK_OPTIONS 466 opts_blocks = [b for b in blocks if b.block_type == BLOCK_OPTIONS] 467 assert len(opts_blocks) == 1 468 assert len(opts_blocks[0].data) == 12