"""Tests for SSU2 asyncio UDP server. Tests cover: - UDP server lifecycle (start/stop) - Packet dispatch (handshake vs data) - Peer state management - Peer test protocol (Alice/Bob/Charlie roles) - Introduction/relay protocol for firewalled peers - Integration with TransportManager """ import asyncio import os import struct import time import pytest from i2p_crypto.x25519 import X25519DH from i2p_transport.ssu2_handshake import ( HandshakeKeys, TokenManager, OutboundHandshake, InboundHandshake, LONG_HEADER_SIZE, SHORT_HEADER_SIZE, PKT_TOKEN_REQUEST, PKT_SESSION_REQUEST, PKT_DATA, PROTOCOL_VERSION, NETWORK_ID, _build_long_header, ) from i2p_transport.ssu2_connection import SSU2Connection from i2p_transport.ssu2_payload import PaddingBlock, build_payload from i2p_transport.transport_base import TransportStyle, ReachabilityStatus # --------------------------------------------------------------------------- # Helpers # --------------------------------------------------------------------------- def _make_keys() -> HandshakeKeys: """Create dummy handshake keys for testing.""" return HandshakeKeys( send_cipher_key=os.urandom(32), recv_cipher_key=os.urandom(32), send_header_key=os.urandom(32), recv_header_key=os.urandom(32), ) def _make_keypair() -> tuple[bytes, bytes]: """Generate an X25519 keypair.""" return X25519DH.generate_keypair() # --------------------------------------------------------------------------- # SSU2Transport tests # --------------------------------------------------------------------------- class TestSSU2Transport: """Test the SSU2Transport UDP server.""" @pytest.fixture def static_keypair(self): return _make_keypair() @pytest.fixture def intro_key(self): return os.urandom(32) def test_transport_style(self, static_keypair, intro_key): from i2p_transport.ssu2_server import SSU2Transport t = SSU2Transport( host="127.0.0.1", port=0, static_key=static_keypair[0], intro_key=intro_key, ) assert t.style == TransportStyle.SSU2 def test_initial_state(self, static_keypair, intro_key): from i2p_transport.ssu2_server import SSU2Transport t = SSU2Transport( host="127.0.0.1", port=0, static_key=static_keypair[0], intro_key=intro_key, ) assert not t.is_running assert t.reachability == ReachabilityStatus.UNKNOWN assert t.current_address is None def test_start_stop(self, static_keypair, intro_key): async def _run(): from i2p_transport.ssu2_server import SSU2Transport t = SSU2Transport( host="127.0.0.1", port=0, static_key=static_keypair[0], intro_key=intro_key, ) await t.start() assert t.is_running addr = t.current_address assert addr is not None assert addr["host"] == "127.0.0.1" assert addr["port"] > 0 assert addr["style"] == "SSU2" await t.stop() assert not t.is_running asyncio.run(_run()) def test_bid_no_connection(self, static_keypair, intro_key): """Bid returns WILL_NOT_SEND when no connection to peer.""" async def _run(): from i2p_transport.ssu2_server import SSU2Transport t = SSU2Transport( host="127.0.0.1", port=0, static_key=static_keypair[0], intro_key=intro_key, ) await t.start() bid = await t.bid(os.urandom(32)) from i2p_transport.transport_base import TransportBid assert bid.latency_ms == TransportBid.WILL_NOT_SEND await t.stop() asyncio.run(_run()) def test_send_without_connection_fails(self, static_keypair, intro_key): """Send returns False when no connection exists.""" async def _run(): from i2p_transport.ssu2_server import SSU2Transport t = SSU2Transport( host="127.0.0.1", port=0, static_key=static_keypair[0], intro_key=intro_key, ) await t.start() ok = await t.send(os.urandom(32), b"hello") assert ok is False await t.stop() asyncio.run(_run()) # --------------------------------------------------------------------------- # EstablishmentManager tests # --------------------------------------------------------------------------- class TestEstablishmentManager: """Test the SSU2 handshake dispatch manager.""" def test_create_inbound_handshake(self): from i2p_transport.ssu2_server import EstablishmentManager priv, pub = _make_keypair() intro_key = os.urandom(32) tm = TokenManager() em = EstablishmentManager( local_static_key=priv, local_intro_key=intro_key, token_manager=tm, ) hs = em.create_inbound_handshake() assert hs is not None def test_create_outbound_handshake(self): from i2p_transport.ssu2_server import EstablishmentManager priv, pub = _make_keypair() intro_key = os.urandom(32) tm = TokenManager() em = EstablishmentManager( local_static_key=priv, local_intro_key=intro_key, token_manager=tm, ) remote_priv, remote_pub = _make_keypair() remote_intro = os.urandom(32) hs = em.create_outbound_handshake(remote_pub, remote_intro) assert hs is not None def test_track_pending(self): from i2p_transport.ssu2_server import EstablishmentManager priv, pub = _make_keypair() intro_key = os.urandom(32) tm = TokenManager() em = EstablishmentManager( local_static_key=priv, local_intro_key=intro_key, token_manager=tm, ) hs = em.create_inbound_handshake() conn_id = hs._src_conn_id em.add_pending(conn_id, hs) assert em.get_pending(conn_id) is hs em.remove_pending(conn_id) assert em.get_pending(conn_id) is None # --------------------------------------------------------------------------- # PeerStateMap tests # --------------------------------------------------------------------------- class TestPeerStateMap: """Test the peer connection state tracking.""" def test_add_and_get(self): from i2p_transport.ssu2_server import PeerStateMap psm = PeerStateMap() keys = _make_keys() conn = SSU2Connection( keys=keys, src_conn_id=1, dest_conn_id=2, remote_address=("127.0.0.1", 5000), is_initiator=True, ) peer_hash = os.urandom(32) psm.add(peer_hash, conn, ("127.0.0.1", 5000)) assert psm.get_by_peer(peer_hash) is conn assert psm.get_by_address(("127.0.0.1", 5000)) is conn def test_get_by_conn_id(self): from i2p_transport.ssu2_server import PeerStateMap psm = PeerStateMap() keys = _make_keys() conn = SSU2Connection( keys=keys, src_conn_id=42, dest_conn_id=99, remote_address=("127.0.0.1", 5000), is_initiator=True, ) peer_hash = os.urandom(32) psm.add(peer_hash, conn, ("127.0.0.1", 5000)) # Look up by our src_conn_id assert psm.get_by_conn_id(42) is conn def test_remove(self): from i2p_transport.ssu2_server import PeerStateMap psm = PeerStateMap() keys = _make_keys() conn = SSU2Connection( keys=keys, src_conn_id=1, dest_conn_id=2, remote_address=("10.0.0.1", 3000), is_initiator=False, ) peer_hash = os.urandom(32) psm.add(peer_hash, conn, ("10.0.0.1", 3000)) psm.remove(peer_hash) assert psm.get_by_peer(peer_hash) is None assert psm.get_by_address(("10.0.0.1", 3000)) is None def test_active_count(self): from i2p_transport.ssu2_server import PeerStateMap psm = PeerStateMap() assert psm.active_count == 0 keys = _make_keys() for i in range(3): conn = SSU2Connection( keys=keys, src_conn_id=i, dest_conn_id=i + 100, remote_address=("127.0.0.1", 5000 + i), is_initiator=True, ) psm.add(os.urandom(32), conn, ("127.0.0.1", 5000 + i)) assert psm.active_count == 3 # --------------------------------------------------------------------------- # Peer test protocol tests # --------------------------------------------------------------------------- class TestPeerTestProtocol: """Test the three-party NAT detection protocol (Alice/Bob/Charlie).""" def test_create_peer_test_request(self): from i2p_transport.ssu2_server import PeerTestManager, PeerTestRole ptm = PeerTestManager() nonce, msg = ptm.create_test_request() assert nonce > 0 assert len(msg) > 0 assert ptm.get_pending_test(nonce) is not None def test_peer_test_role_alice(self): from i2p_transport.ssu2_server import PeerTestManager, PeerTestRole ptm = PeerTestManager() nonce, msg = ptm.create_test_request() pending = ptm.get_pending_test(nonce) assert pending["role"] == PeerTestRole.ALICE def test_process_test_response(self): from i2p_transport.ssu2_server import PeerTestManager, PeerTestRole ptm = PeerTestManager() nonce, msg = ptm.create_test_request() # Simulate Charlie's response: code 0 = success, meaning we're reachable result = ptm.process_test_response(nonce, result_code=0, ip=b"\x7f\x00\x00\x01", port=12345) assert result is not None assert result["reachable"] is True def test_process_unknown_nonce(self): from i2p_transport.ssu2_server import PeerTestManager ptm = PeerTestManager() result = ptm.process_test_response(99999, result_code=0, ip=b"\x7f\x00\x00\x01", port=12345) assert result is None def test_bob_role_relay(self): """Bob relays peer test from Alice to Charlie.""" from i2p_transport.ssu2_server import PeerTestManager, PeerTestRole ptm = PeerTestManager() # Simulate receiving a peer test from Alice, acting as Bob relay_msg = ptm.create_relay_to_charlie( nonce=12345, alice_ip=b"\x7f\x00\x00\x01", alice_port=5000, ) assert relay_msg is not None assert len(relay_msg) > 0 # --------------------------------------------------------------------------- # Introduction/relay protocol tests # --------------------------------------------------------------------------- class TestRelayProtocol: """Test relay/introduction protocol for firewalled peers.""" def test_create_relay_request(self): from i2p_transport.ssu2_server import RelayManager rm = RelayManager() nonce, msg = rm.create_relay_request( relay_tag=42, target_hash=os.urandom(32), ) assert nonce > 0 assert len(msg) > 0 def test_process_relay_intro(self): """Introducer receives relay intro and creates response.""" from i2p_transport.ssu2_server import RelayManager rm = RelayManager() # Simulate receiving a relay intro — the introducer relays to the target response = rm.process_relay_intro( nonce=42, requester_ip=b"\x0a\x00\x00\x01", requester_port=8000, target_hash=os.urandom(32), ) assert response is not None def test_relay_tag_management(self): from i2p_transport.ssu2_server import RelayManager rm = RelayManager() peer_hash = os.urandom(32) tag = rm.assign_relay_tag(peer_hash) assert tag > 0 assert rm.get_peer_for_tag(tag) == peer_hash rm.remove_relay_tag(tag) assert rm.get_peer_for_tag(tag) is None def test_relay_response_success(self): from i2p_transport.ssu2_server import RelayManager rm = RelayManager() nonce, _ = rm.create_relay_request( relay_tag=42, target_hash=os.urandom(32), ) result = rm.process_relay_response( nonce=nonce, result_code=0, target_ip=b"\x7f\x00\x00\x01", target_port=9999, ) assert result is not None assert result["success"] is True # --------------------------------------------------------------------------- # UDP protocol handler tests # --------------------------------------------------------------------------- class TestSSU2Protocol: """Test the asyncio DatagramProtocol handler.""" def test_classify_packet_long_header(self): from i2p_transport.ssu2_server import classify_packet, PacketClass # Build a long header (32 bytes + some payload) header = _build_long_header( dest_conn_id=1, pkt_num=0, pkt_type=PKT_TOKEN_REQUEST, version=PROTOCOL_VERSION, net_id=NETWORK_ID, src_conn_id=2, token=0, ) packet = header + os.urandom(32) cls = classify_packet(packet) assert cls == PacketClass.HANDSHAKE def test_classify_packet_short(self): from i2p_transport.ssu2_server import classify_packet, PacketClass # A short (< 32 byte) packet with DATA type header = struct.pack("!QIB", 1, 0, PKT_DATA) + b"\x00\x00\x00" packet = header + os.urandom(64) cls = classify_packet(packet) assert cls == PacketClass.DATA def test_classify_too_small(self): from i2p_transport.ssu2_server import classify_packet, PacketClass cls = classify_packet(b"\x00" * 4) assert cls == PacketClass.INVALID