"""Tests for the real NTCP2 listener and connector with AES-CBC handshake and SipHash-obfuscated transport. TDD tests -- written before the implementation in src/i2p_transport/ntcp2_real_server.py. """ import asyncio import hashlib import os from i2p_crypto.x25519 import X25519DH from i2p_transport.ntcp2_blocks import BLOCK_I2NP, BLOCK_ROUTERINFO from i2p_transport.ntcp2_real_connection import NTCP2RealConnection from i2p_transport.ntcp2_real_server import NTCP2RealListener, NTCP2RealConnector # --------------------------------------------------------------------------- # helpers # --------------------------------------------------------------------------- def _make_keypair(): return X25519DH.generate_keypair() def _dummy_ri(size: int = 64) -> bytes: return os.urandom(size) # --------------------------------------------------------------------------- # Test 1: NTCP2RealListener construction # --------------------------------------------------------------------------- class TestListenerConstruction: def test_basic_construction(self): static = _make_keypair() ri_hash = os.urandom(32) iv = os.urandom(16) listener = NTCP2RealListener( host="127.0.0.1", port=0, our_static_key=static, our_ri_hash=ri_hash, our_iv=iv, on_connection=None, ) assert listener is not None def test_construction_with_callback(self): static = _make_keypair() ri_hash = os.urandom(32) iv = os.urandom(16) async def cb(conn): pass listener = NTCP2RealListener( host="127.0.0.1", port=0, our_static_key=static, our_ri_hash=ri_hash, our_iv=iv, on_connection=cb, ) assert listener is not None # --------------------------------------------------------------------------- # Test 2: NTCP2RealConnector construction # --------------------------------------------------------------------------- class TestConnectorConstruction: def test_basic_construction(self): connector = NTCP2RealConnector() assert connector is not None # --------------------------------------------------------------------------- # Test 3: Localhost integration — full real TCP roundtrip # --------------------------------------------------------------------------- class TestLocalhostIntegration: def test_full_roundtrip(self): """Start listener, connect, handshake, exchange I2NP, verify.""" async def _run(): # Responder (Bob) identity bob_static = _make_keypair() bob_ri_bytes = _dummy_ri(128) bob_ri_hash = hashlib.sha256(bob_ri_bytes).digest() bob_iv = os.urandom(16) # Initiator (Alice) identity alice_static = _make_keypair() alice_ri_bytes = _dummy_ri(128) # Track connections received by listener connections = [] conn_event = asyncio.Event() async def on_conn(conn): connections.append(conn) conn_event.set() listener = NTCP2RealListener( host="127.0.0.1", port=0, our_static_key=bob_static, our_ri_hash=bob_ri_hash, our_iv=bob_iv, on_connection=on_conn, ) server = await listener.start() addr = server.sockets[0].getsockname() port = addr[1] try: # Alice connects to Bob connector = NTCP2RealConnector() alice_conn = await connector.connect( host="127.0.0.1", port=port, our_static_key=alice_static, our_ri_bytes=alice_ri_bytes, peer_static_pub=bob_static[1], peer_ri_hash=bob_ri_hash, peer_iv=bob_iv, ) # Wait for Bob's side to complete handshake await asyncio.wait_for(conn_event.wait(), timeout=5.0) assert len(connections) == 1 bob_conn = connections[0] # Both connections should be alive assert alice_conn.is_alive() assert bob_conn.is_alive() # Alice sends I2NP message to Bob test_msg = b"Hello from Alice via real NTCP2!" await alice_conn.send_i2np(test_msg) # Bob receives it blocks = await asyncio.wait_for(bob_conn.recv_frame(), timeout=5.0) i2np_blocks = [b for b in blocks if b.block_type == BLOCK_I2NP] assert len(i2np_blocks) == 1 assert i2np_blocks[0].data == test_msg # Bob replies reply_msg = b"Hello from Bob via real NTCP2!" await bob_conn.send_i2np(reply_msg) # Alice receives reply blocks2 = await asyncio.wait_for(alice_conn.recv_frame(), timeout=5.0) i2np_blocks2 = [b for b in blocks2 if b.block_type == BLOCK_I2NP] assert len(i2np_blocks2) == 1 assert i2np_blocks2[0].data == reply_msg finally: # Close connections before server to avoid Python 3.12+ wait_closed hang alice_conn._writer.close() if connections: connections[0]._writer.close() listener.close() asyncio.run(_run()) # --------------------------------------------------------------------------- # Test 4: Handshake produces correct transport keys # --------------------------------------------------------------------------- class TestTransportKeys: def test_both_sides_have_working_cipher_and_siphash(self): """After handshake, both sides can encrypt/decrypt with SipHash framing.""" async def _run(): bob_static = _make_keypair() bob_ri_bytes = _dummy_ri(128) bob_ri_hash = hashlib.sha256(bob_ri_bytes).digest() bob_iv = os.urandom(16) alice_static = _make_keypair() alice_ri_bytes = _dummy_ri(128) connections = [] conn_event = asyncio.Event() async def on_conn(conn): connections.append(conn) conn_event.set() listener = NTCP2RealListener( host="127.0.0.1", port=0, our_static_key=bob_static, our_ri_hash=bob_ri_hash, our_iv=bob_iv, on_connection=on_conn, ) server = await listener.start() port = server.sockets[0].getsockname()[1] try: connector = NTCP2RealConnector() alice_conn = await connector.connect( host="127.0.0.1", port=port, our_static_key=alice_static, our_ri_bytes=alice_ri_bytes, peer_static_pub=bob_static[1], peer_ri_hash=bob_ri_hash, peer_iv=bob_iv, ) await asyncio.wait_for(conn_event.wait(), timeout=5.0) bob_conn = connections[0] # Verify both are NTCP2RealConnection instances assert isinstance(alice_conn, NTCP2RealConnection) assert isinstance(bob_conn, NTCP2RealConnection) # Send multiple messages in sequence to verify nonce incrementing for i in range(3): msg = f"message-{i}".encode() await alice_conn.send_i2np(msg) blocks = await asyncio.wait_for(bob_conn.recv_frame(), timeout=5.0) i2np = [b for b in blocks if b.block_type == BLOCK_I2NP] assert i2np[0].data == msg finally: alice_conn._writer.close() if connections: connections[0]._writer.close() listener.close() asyncio.run(_run()) # --------------------------------------------------------------------------- # Test 5: Multiple connections # --------------------------------------------------------------------------- class TestMultipleConnections: def test_listener_accepts_multiple_sequential_connections(self): """Listener can accept multiple connections sequentially.""" async def _run(): bob_static = _make_keypair() bob_ri_bytes = _dummy_ri(128) bob_ri_hash = hashlib.sha256(bob_ri_bytes).digest() bob_iv = os.urandom(16) connections = [] conn_events = [asyncio.Event(), asyncio.Event()] conn_idx = 0 async def on_conn(conn): nonlocal conn_idx connections.append(conn) if conn_idx < len(conn_events): conn_events[conn_idx].set() conn_idx += 1 listener = NTCP2RealListener( host="127.0.0.1", port=0, our_static_key=bob_static, our_ri_hash=bob_ri_hash, our_iv=bob_iv, on_connection=on_conn, ) server = await listener.start() port = server.sockets[0].getsockname()[1] try: connector = NTCP2RealConnector() # First connection alice1_static = _make_keypair() alice1_ri = _dummy_ri(64) conn1 = await connector.connect( host="127.0.0.1", port=port, our_static_key=alice1_static, our_ri_bytes=alice1_ri, peer_static_pub=bob_static[1], peer_ri_hash=bob_ri_hash, peer_iv=bob_iv, ) await asyncio.wait_for(conn_events[0].wait(), timeout=5.0) # Second connection alice2_static = _make_keypair() alice2_ri = _dummy_ri(64) conn2 = await connector.connect( host="127.0.0.1", port=port, our_static_key=alice2_static, our_ri_bytes=alice2_ri, peer_static_pub=bob_static[1], peer_ri_hash=bob_ri_hash, peer_iv=bob_iv, ) await asyncio.wait_for(conn_events[1].wait(), timeout=5.0) assert len(connections) == 2 # Verify both are independent — send on each, receive on each await conn1.send_i2np(b"from-conn1") blocks1 = await asyncio.wait_for( connections[0].recv_frame(), timeout=5.0 ) i2np1 = [b for b in blocks1 if b.block_type == BLOCK_I2NP] assert i2np1[0].data == b"from-conn1" await conn2.send_i2np(b"from-conn2") blocks2 = await asyncio.wait_for( connections[1].recv_frame(), timeout=5.0 ) i2np2 = [b for b in blocks2 if b.block_type == BLOCK_I2NP] assert i2np2[0].data == b"from-conn2" finally: conn1._writer.close() conn2._writer.close() for c in connections: c._writer.close() listener.close() asyncio.run(_run())