A Python port of the Invisible Internet Project (I2P)
at main 331 lines 12 kB view raw
1"""Tests for the real NTCP2 listener and connector with AES-CBC handshake 2and SipHash-obfuscated transport. 3 4TDD tests -- written before the implementation in 5src/i2p_transport/ntcp2_real_server.py. 6""" 7 8import asyncio 9import hashlib 10import os 11 12from i2p_crypto.x25519 import X25519DH 13from i2p_transport.ntcp2_blocks import BLOCK_I2NP, BLOCK_ROUTERINFO 14from i2p_transport.ntcp2_real_connection import NTCP2RealConnection 15from i2p_transport.ntcp2_real_server import NTCP2RealListener, NTCP2RealConnector 16 17 18# --------------------------------------------------------------------------- 19# helpers 20# --------------------------------------------------------------------------- 21 22def _make_keypair(): 23 return X25519DH.generate_keypair() 24 25 26def _dummy_ri(size: int = 64) -> bytes: 27 return os.urandom(size) 28 29 30# --------------------------------------------------------------------------- 31# Test 1: NTCP2RealListener construction 32# --------------------------------------------------------------------------- 33 34class TestListenerConstruction: 35 def test_basic_construction(self): 36 static = _make_keypair() 37 ri_hash = os.urandom(32) 38 iv = os.urandom(16) 39 40 listener = NTCP2RealListener( 41 host="127.0.0.1", 42 port=0, 43 our_static_key=static, 44 our_ri_hash=ri_hash, 45 our_iv=iv, 46 on_connection=None, 47 ) 48 assert listener is not None 49 50 def test_construction_with_callback(self): 51 static = _make_keypair() 52 ri_hash = os.urandom(32) 53 iv = os.urandom(16) 54 55 async def cb(conn): 56 pass 57 58 listener = NTCP2RealListener( 59 host="127.0.0.1", 60 port=0, 61 our_static_key=static, 62 our_ri_hash=ri_hash, 63 our_iv=iv, 64 on_connection=cb, 65 ) 66 assert listener is not None 67 68 69# --------------------------------------------------------------------------- 70# Test 2: NTCP2RealConnector construction 71# --------------------------------------------------------------------------- 72 73class TestConnectorConstruction: 74 def test_basic_construction(self): 75 connector = NTCP2RealConnector() 76 assert connector is not None 77 78 79# --------------------------------------------------------------------------- 80# Test 3: Localhost integration — full real TCP roundtrip 81# --------------------------------------------------------------------------- 82 83class TestLocalhostIntegration: 84 def test_full_roundtrip(self): 85 """Start listener, connect, handshake, exchange I2NP, verify.""" 86 87 async def _run(): 88 # Responder (Bob) identity 89 bob_static = _make_keypair() 90 bob_ri_bytes = _dummy_ri(128) 91 bob_ri_hash = hashlib.sha256(bob_ri_bytes).digest() 92 bob_iv = os.urandom(16) 93 94 # Initiator (Alice) identity 95 alice_static = _make_keypair() 96 alice_ri_bytes = _dummy_ri(128) 97 98 # Track connections received by listener 99 connections = [] 100 conn_event = asyncio.Event() 101 102 async def on_conn(conn): 103 connections.append(conn) 104 conn_event.set() 105 106 listener = NTCP2RealListener( 107 host="127.0.0.1", 108 port=0, 109 our_static_key=bob_static, 110 our_ri_hash=bob_ri_hash, 111 our_iv=bob_iv, 112 on_connection=on_conn, 113 ) 114 server = await listener.start() 115 addr = server.sockets[0].getsockname() 116 port = addr[1] 117 118 try: 119 # Alice connects to Bob 120 connector = NTCP2RealConnector() 121 alice_conn = await connector.connect( 122 host="127.0.0.1", 123 port=port, 124 our_static_key=alice_static, 125 our_ri_bytes=alice_ri_bytes, 126 peer_static_pub=bob_static[1], 127 peer_ri_hash=bob_ri_hash, 128 peer_iv=bob_iv, 129 ) 130 131 # Wait for Bob's side to complete handshake 132 await asyncio.wait_for(conn_event.wait(), timeout=5.0) 133 assert len(connections) == 1 134 bob_conn = connections[0] 135 136 # Both connections should be alive 137 assert alice_conn.is_alive() 138 assert bob_conn.is_alive() 139 140 # Alice sends I2NP message to Bob 141 test_msg = b"Hello from Alice via real NTCP2!" 142 await alice_conn.send_i2np(test_msg) 143 144 # Bob receives it 145 blocks = await asyncio.wait_for(bob_conn.recv_frame(), timeout=5.0) 146 i2np_blocks = [b for b in blocks if b.block_type == BLOCK_I2NP] 147 assert len(i2np_blocks) == 1 148 assert i2np_blocks[0].data == test_msg 149 150 # Bob replies 151 reply_msg = b"Hello from Bob via real NTCP2!" 152 await bob_conn.send_i2np(reply_msg) 153 154 # Alice receives reply 155 blocks2 = await asyncio.wait_for(alice_conn.recv_frame(), timeout=5.0) 156 i2np_blocks2 = [b for b in blocks2 if b.block_type == BLOCK_I2NP] 157 assert len(i2np_blocks2) == 1 158 assert i2np_blocks2[0].data == reply_msg 159 160 finally: 161 # Close connections before server to avoid Python 3.12+ wait_closed hang 162 alice_conn._writer.close() 163 if connections: 164 connections[0]._writer.close() 165 listener.close() 166 167 asyncio.run(_run()) 168 169 170# --------------------------------------------------------------------------- 171# Test 4: Handshake produces correct transport keys 172# --------------------------------------------------------------------------- 173 174class TestTransportKeys: 175 def test_both_sides_have_working_cipher_and_siphash(self): 176 """After handshake, both sides can encrypt/decrypt with SipHash framing.""" 177 178 async def _run(): 179 bob_static = _make_keypair() 180 bob_ri_bytes = _dummy_ri(128) 181 bob_ri_hash = hashlib.sha256(bob_ri_bytes).digest() 182 bob_iv = os.urandom(16) 183 184 alice_static = _make_keypair() 185 alice_ri_bytes = _dummy_ri(128) 186 187 connections = [] 188 conn_event = asyncio.Event() 189 190 async def on_conn(conn): 191 connections.append(conn) 192 conn_event.set() 193 194 listener = NTCP2RealListener( 195 host="127.0.0.1", 196 port=0, 197 our_static_key=bob_static, 198 our_ri_hash=bob_ri_hash, 199 our_iv=bob_iv, 200 on_connection=on_conn, 201 ) 202 server = await listener.start() 203 port = server.sockets[0].getsockname()[1] 204 205 try: 206 connector = NTCP2RealConnector() 207 alice_conn = await connector.connect( 208 host="127.0.0.1", 209 port=port, 210 our_static_key=alice_static, 211 our_ri_bytes=alice_ri_bytes, 212 peer_static_pub=bob_static[1], 213 peer_ri_hash=bob_ri_hash, 214 peer_iv=bob_iv, 215 ) 216 await asyncio.wait_for(conn_event.wait(), timeout=5.0) 217 bob_conn = connections[0] 218 219 # Verify both are NTCP2RealConnection instances 220 assert isinstance(alice_conn, NTCP2RealConnection) 221 assert isinstance(bob_conn, NTCP2RealConnection) 222 223 # Send multiple messages in sequence to verify nonce incrementing 224 for i in range(3): 225 msg = f"message-{i}".encode() 226 await alice_conn.send_i2np(msg) 227 blocks = await asyncio.wait_for(bob_conn.recv_frame(), timeout=5.0) 228 i2np = [b for b in blocks if b.block_type == BLOCK_I2NP] 229 assert i2np[0].data == msg 230 231 finally: 232 alice_conn._writer.close() 233 if connections: 234 connections[0]._writer.close() 235 listener.close() 236 237 asyncio.run(_run()) 238 239 240# --------------------------------------------------------------------------- 241# Test 5: Multiple connections 242# --------------------------------------------------------------------------- 243 244class TestMultipleConnections: 245 def test_listener_accepts_multiple_sequential_connections(self): 246 """Listener can accept multiple connections sequentially.""" 247 248 async def _run(): 249 bob_static = _make_keypair() 250 bob_ri_bytes = _dummy_ri(128) 251 bob_ri_hash = hashlib.sha256(bob_ri_bytes).digest() 252 bob_iv = os.urandom(16) 253 254 connections = [] 255 conn_events = [asyncio.Event(), asyncio.Event()] 256 conn_idx = 0 257 258 async def on_conn(conn): 259 nonlocal conn_idx 260 connections.append(conn) 261 if conn_idx < len(conn_events): 262 conn_events[conn_idx].set() 263 conn_idx += 1 264 265 listener = NTCP2RealListener( 266 host="127.0.0.1", 267 port=0, 268 our_static_key=bob_static, 269 our_ri_hash=bob_ri_hash, 270 our_iv=bob_iv, 271 on_connection=on_conn, 272 ) 273 server = await listener.start() 274 port = server.sockets[0].getsockname()[1] 275 276 try: 277 connector = NTCP2RealConnector() 278 279 # First connection 280 alice1_static = _make_keypair() 281 alice1_ri = _dummy_ri(64) 282 conn1 = await connector.connect( 283 host="127.0.0.1", 284 port=port, 285 our_static_key=alice1_static, 286 our_ri_bytes=alice1_ri, 287 peer_static_pub=bob_static[1], 288 peer_ri_hash=bob_ri_hash, 289 peer_iv=bob_iv, 290 ) 291 await asyncio.wait_for(conn_events[0].wait(), timeout=5.0) 292 293 # Second connection 294 alice2_static = _make_keypair() 295 alice2_ri = _dummy_ri(64) 296 conn2 = await connector.connect( 297 host="127.0.0.1", 298 port=port, 299 our_static_key=alice2_static, 300 our_ri_bytes=alice2_ri, 301 peer_static_pub=bob_static[1], 302 peer_ri_hash=bob_ri_hash, 303 peer_iv=bob_iv, 304 ) 305 await asyncio.wait_for(conn_events[1].wait(), timeout=5.0) 306 307 assert len(connections) == 2 308 309 # Verify both are independent — send on each, receive on each 310 await conn1.send_i2np(b"from-conn1") 311 blocks1 = await asyncio.wait_for( 312 connections[0].recv_frame(), timeout=5.0 313 ) 314 i2np1 = [b for b in blocks1 if b.block_type == BLOCK_I2NP] 315 assert i2np1[0].data == b"from-conn1" 316 317 await conn2.send_i2np(b"from-conn2") 318 blocks2 = await asyncio.wait_for( 319 connections[1].recv_frame(), timeout=5.0 320 ) 321 i2np2 = [b for b in blocks2 if b.block_type == BLOCK_I2NP] 322 assert i2np2[0].data == b"from-conn2" 323 324 finally: 325 conn1._writer.close() 326 conn2._writer.close() 327 for c in connections: 328 c._writer.close() 329 listener.close() 330 331 asyncio.run(_run())