A Python port of the Invisible Internet Project (I2P)
at main 396 lines 14 kB view raw
1"""Tests for SSU2 asyncio UDP server. 2 3Tests cover: 4- UDP server lifecycle (start/stop) 5- Packet dispatch (handshake vs data) 6- Peer state management 7- Peer test protocol (Alice/Bob/Charlie roles) 8- Introduction/relay protocol for firewalled peers 9- Integration with TransportManager 10""" 11 12import asyncio 13import os 14import struct 15import time 16 17import pytest 18 19from i2p_crypto.x25519 import X25519DH 20from i2p_transport.ssu2_handshake import ( 21 HandshakeKeys, 22 TokenManager, 23 OutboundHandshake, 24 InboundHandshake, 25 LONG_HEADER_SIZE, 26 SHORT_HEADER_SIZE, 27 PKT_TOKEN_REQUEST, 28 PKT_SESSION_REQUEST, 29 PKT_DATA, 30 PROTOCOL_VERSION, 31 NETWORK_ID, 32 _build_long_header, 33) 34from i2p_transport.ssu2_connection import SSU2Connection 35from i2p_transport.ssu2_payload import PaddingBlock, build_payload 36from i2p_transport.transport_base import TransportStyle, ReachabilityStatus 37 38 39# --------------------------------------------------------------------------- 40# Helpers 41# --------------------------------------------------------------------------- 42 43def _make_keys() -> HandshakeKeys: 44 """Create dummy handshake keys for testing.""" 45 return HandshakeKeys( 46 send_cipher_key=os.urandom(32), 47 recv_cipher_key=os.urandom(32), 48 send_header_key=os.urandom(32), 49 recv_header_key=os.urandom(32), 50 ) 51 52 53def _make_keypair() -> tuple[bytes, bytes]: 54 """Generate an X25519 keypair.""" 55 return X25519DH.generate_keypair() 56 57 58# --------------------------------------------------------------------------- 59# SSU2Transport tests 60# --------------------------------------------------------------------------- 61 62class TestSSU2Transport: 63 """Test the SSU2Transport UDP server.""" 64 65 @pytest.fixture 66 def static_keypair(self): 67 return _make_keypair() 68 69 @pytest.fixture 70 def intro_key(self): 71 return os.urandom(32) 72 73 def test_transport_style(self, static_keypair, intro_key): 74 from i2p_transport.ssu2_server import SSU2Transport 75 t = SSU2Transport( 76 host="127.0.0.1", port=0, 77 static_key=static_keypair[0], 78 intro_key=intro_key, 79 ) 80 assert t.style == TransportStyle.SSU2 81 82 def test_initial_state(self, static_keypair, intro_key): 83 from i2p_transport.ssu2_server import SSU2Transport 84 t = SSU2Transport( 85 host="127.0.0.1", port=0, 86 static_key=static_keypair[0], 87 intro_key=intro_key, 88 ) 89 assert not t.is_running 90 assert t.reachability == ReachabilityStatus.UNKNOWN 91 assert t.current_address is None 92 93 def test_start_stop(self, static_keypair, intro_key): 94 async def _run(): 95 from i2p_transport.ssu2_server import SSU2Transport 96 t = SSU2Transport( 97 host="127.0.0.1", port=0, 98 static_key=static_keypair[0], 99 intro_key=intro_key, 100 ) 101 await t.start() 102 assert t.is_running 103 addr = t.current_address 104 assert addr is not None 105 assert addr["host"] == "127.0.0.1" 106 assert addr["port"] > 0 107 assert addr["style"] == "SSU2" 108 await t.stop() 109 assert not t.is_running 110 asyncio.run(_run()) 111 112 def test_bid_no_connection(self, static_keypair, intro_key): 113 """Bid returns WILL_NOT_SEND when no connection to peer.""" 114 async def _run(): 115 from i2p_transport.ssu2_server import SSU2Transport 116 t = SSU2Transport( 117 host="127.0.0.1", port=0, 118 static_key=static_keypair[0], 119 intro_key=intro_key, 120 ) 121 await t.start() 122 bid = await t.bid(os.urandom(32)) 123 from i2p_transport.transport_base import TransportBid 124 assert bid.latency_ms == TransportBid.WILL_NOT_SEND 125 await t.stop() 126 asyncio.run(_run()) 127 128 def test_send_without_connection_fails(self, static_keypair, intro_key): 129 """Send returns False when no connection exists.""" 130 async def _run(): 131 from i2p_transport.ssu2_server import SSU2Transport 132 t = SSU2Transport( 133 host="127.0.0.1", port=0, 134 static_key=static_keypair[0], 135 intro_key=intro_key, 136 ) 137 await t.start() 138 ok = await t.send(os.urandom(32), b"hello") 139 assert ok is False 140 await t.stop() 141 asyncio.run(_run()) 142 143 144# --------------------------------------------------------------------------- 145# EstablishmentManager tests 146# --------------------------------------------------------------------------- 147 148class TestEstablishmentManager: 149 """Test the SSU2 handshake dispatch manager.""" 150 151 def test_create_inbound_handshake(self): 152 from i2p_transport.ssu2_server import EstablishmentManager 153 priv, pub = _make_keypair() 154 intro_key = os.urandom(32) 155 tm = TokenManager() 156 em = EstablishmentManager( 157 local_static_key=priv, 158 local_intro_key=intro_key, 159 token_manager=tm, 160 ) 161 hs = em.create_inbound_handshake() 162 assert hs is not None 163 164 def test_create_outbound_handshake(self): 165 from i2p_transport.ssu2_server import EstablishmentManager 166 priv, pub = _make_keypair() 167 intro_key = os.urandom(32) 168 tm = TokenManager() 169 em = EstablishmentManager( 170 local_static_key=priv, 171 local_intro_key=intro_key, 172 token_manager=tm, 173 ) 174 remote_priv, remote_pub = _make_keypair() 175 remote_intro = os.urandom(32) 176 hs = em.create_outbound_handshake(remote_pub, remote_intro) 177 assert hs is not None 178 179 def test_track_pending(self): 180 from i2p_transport.ssu2_server import EstablishmentManager 181 priv, pub = _make_keypair() 182 intro_key = os.urandom(32) 183 tm = TokenManager() 184 em = EstablishmentManager( 185 local_static_key=priv, 186 local_intro_key=intro_key, 187 token_manager=tm, 188 ) 189 hs = em.create_inbound_handshake() 190 conn_id = hs._src_conn_id 191 em.add_pending(conn_id, hs) 192 assert em.get_pending(conn_id) is hs 193 em.remove_pending(conn_id) 194 assert em.get_pending(conn_id) is None 195 196 197# --------------------------------------------------------------------------- 198# PeerStateMap tests 199# --------------------------------------------------------------------------- 200 201class TestPeerStateMap: 202 """Test the peer connection state tracking.""" 203 204 def test_add_and_get(self): 205 from i2p_transport.ssu2_server import PeerStateMap 206 psm = PeerStateMap() 207 keys = _make_keys() 208 conn = SSU2Connection( 209 keys=keys, src_conn_id=1, dest_conn_id=2, 210 remote_address=("127.0.0.1", 5000), is_initiator=True, 211 ) 212 peer_hash = os.urandom(32) 213 psm.add(peer_hash, conn, ("127.0.0.1", 5000)) 214 assert psm.get_by_peer(peer_hash) is conn 215 assert psm.get_by_address(("127.0.0.1", 5000)) is conn 216 217 def test_get_by_conn_id(self): 218 from i2p_transport.ssu2_server import PeerStateMap 219 psm = PeerStateMap() 220 keys = _make_keys() 221 conn = SSU2Connection( 222 keys=keys, src_conn_id=42, dest_conn_id=99, 223 remote_address=("127.0.0.1", 5000), is_initiator=True, 224 ) 225 peer_hash = os.urandom(32) 226 psm.add(peer_hash, conn, ("127.0.0.1", 5000)) 227 # Look up by our src_conn_id 228 assert psm.get_by_conn_id(42) is conn 229 230 def test_remove(self): 231 from i2p_transport.ssu2_server import PeerStateMap 232 psm = PeerStateMap() 233 keys = _make_keys() 234 conn = SSU2Connection( 235 keys=keys, src_conn_id=1, dest_conn_id=2, 236 remote_address=("10.0.0.1", 3000), is_initiator=False, 237 ) 238 peer_hash = os.urandom(32) 239 psm.add(peer_hash, conn, ("10.0.0.1", 3000)) 240 psm.remove(peer_hash) 241 assert psm.get_by_peer(peer_hash) is None 242 assert psm.get_by_address(("10.0.0.1", 3000)) is None 243 244 def test_active_count(self): 245 from i2p_transport.ssu2_server import PeerStateMap 246 psm = PeerStateMap() 247 assert psm.active_count == 0 248 keys = _make_keys() 249 for i in range(3): 250 conn = SSU2Connection( 251 keys=keys, src_conn_id=i, dest_conn_id=i + 100, 252 remote_address=("127.0.0.1", 5000 + i), is_initiator=True, 253 ) 254 psm.add(os.urandom(32), conn, ("127.0.0.1", 5000 + i)) 255 assert psm.active_count == 3 256 257 258# --------------------------------------------------------------------------- 259# Peer test protocol tests 260# --------------------------------------------------------------------------- 261 262class TestPeerTestProtocol: 263 """Test the three-party NAT detection protocol (Alice/Bob/Charlie).""" 264 265 def test_create_peer_test_request(self): 266 from i2p_transport.ssu2_server import PeerTestManager, PeerTestRole 267 ptm = PeerTestManager() 268 nonce, msg = ptm.create_test_request() 269 assert nonce > 0 270 assert len(msg) > 0 271 assert ptm.get_pending_test(nonce) is not None 272 273 def test_peer_test_role_alice(self): 274 from i2p_transport.ssu2_server import PeerTestManager, PeerTestRole 275 ptm = PeerTestManager() 276 nonce, msg = ptm.create_test_request() 277 pending = ptm.get_pending_test(nonce) 278 assert pending["role"] == PeerTestRole.ALICE 279 280 def test_process_test_response(self): 281 from i2p_transport.ssu2_server import PeerTestManager, PeerTestRole 282 ptm = PeerTestManager() 283 nonce, msg = ptm.create_test_request() 284 # Simulate Charlie's response: code 0 = success, meaning we're reachable 285 result = ptm.process_test_response(nonce, result_code=0, ip=b"\x7f\x00\x00\x01", port=12345) 286 assert result is not None 287 assert result["reachable"] is True 288 289 def test_process_unknown_nonce(self): 290 from i2p_transport.ssu2_server import PeerTestManager 291 ptm = PeerTestManager() 292 result = ptm.process_test_response(99999, result_code=0, ip=b"\x7f\x00\x00\x01", port=12345) 293 assert result is None 294 295 def test_bob_role_relay(self): 296 """Bob relays peer test from Alice to Charlie.""" 297 from i2p_transport.ssu2_server import PeerTestManager, PeerTestRole 298 ptm = PeerTestManager() 299 # Simulate receiving a peer test from Alice, acting as Bob 300 relay_msg = ptm.create_relay_to_charlie( 301 nonce=12345, 302 alice_ip=b"\x7f\x00\x00\x01", 303 alice_port=5000, 304 ) 305 assert relay_msg is not None 306 assert len(relay_msg) > 0 307 308 309# --------------------------------------------------------------------------- 310# Introduction/relay protocol tests 311# --------------------------------------------------------------------------- 312 313class TestRelayProtocol: 314 """Test relay/introduction protocol for firewalled peers.""" 315 316 def test_create_relay_request(self): 317 from i2p_transport.ssu2_server import RelayManager 318 rm = RelayManager() 319 nonce, msg = rm.create_relay_request( 320 relay_tag=42, 321 target_hash=os.urandom(32), 322 ) 323 assert nonce > 0 324 assert len(msg) > 0 325 326 def test_process_relay_intro(self): 327 """Introducer receives relay intro and creates response.""" 328 from i2p_transport.ssu2_server import RelayManager 329 rm = RelayManager() 330 # Simulate receiving a relay intro — the introducer relays to the target 331 response = rm.process_relay_intro( 332 nonce=42, 333 requester_ip=b"\x0a\x00\x00\x01", 334 requester_port=8000, 335 target_hash=os.urandom(32), 336 ) 337 assert response is not None 338 339 def test_relay_tag_management(self): 340 from i2p_transport.ssu2_server import RelayManager 341 rm = RelayManager() 342 peer_hash = os.urandom(32) 343 tag = rm.assign_relay_tag(peer_hash) 344 assert tag > 0 345 assert rm.get_peer_for_tag(tag) == peer_hash 346 rm.remove_relay_tag(tag) 347 assert rm.get_peer_for_tag(tag) is None 348 349 def test_relay_response_success(self): 350 from i2p_transport.ssu2_server import RelayManager 351 rm = RelayManager() 352 nonce, _ = rm.create_relay_request( 353 relay_tag=42, 354 target_hash=os.urandom(32), 355 ) 356 result = rm.process_relay_response( 357 nonce=nonce, 358 result_code=0, 359 target_ip=b"\x7f\x00\x00\x01", 360 target_port=9999, 361 ) 362 assert result is not None 363 assert result["success"] is True 364 365 366# --------------------------------------------------------------------------- 367# UDP protocol handler tests 368# --------------------------------------------------------------------------- 369 370class TestSSU2Protocol: 371 """Test the asyncio DatagramProtocol handler.""" 372 373 def test_classify_packet_long_header(self): 374 from i2p_transport.ssu2_server import classify_packet, PacketClass 375 # Build a long header (32 bytes + some payload) 376 header = _build_long_header( 377 dest_conn_id=1, pkt_num=0, pkt_type=PKT_TOKEN_REQUEST, 378 version=PROTOCOL_VERSION, net_id=NETWORK_ID, 379 src_conn_id=2, token=0, 380 ) 381 packet = header + os.urandom(32) 382 cls = classify_packet(packet) 383 assert cls == PacketClass.HANDSHAKE 384 385 def test_classify_packet_short(self): 386 from i2p_transport.ssu2_server import classify_packet, PacketClass 387 # A short (< 32 byte) packet with DATA type 388 header = struct.pack("!QIB", 1, 0, PKT_DATA) + b"\x00\x00\x00" 389 packet = header + os.urandom(64) 390 cls = classify_packet(packet) 391 assert cls == PacketClass.DATA 392 393 def test_classify_too_small(self): 394 from i2p_transport.ssu2_server import classify_packet, PacketClass 395 cls = classify_packet(b"\x00" * 4) 396 assert cls == PacketClass.INVALID