A Python port of the Invisible Internet Project (I2P)
1"""Tests for SSU2 asyncio UDP server.
2
3Tests cover:
4- UDP server lifecycle (start/stop)
5- Packet dispatch (handshake vs data)
6- Peer state management
7- Peer test protocol (Alice/Bob/Charlie roles)
8- Introduction/relay protocol for firewalled peers
9- Integration with TransportManager
10"""
11
12import asyncio
13import os
14import struct
15import time
16
17import pytest
18
19from i2p_crypto.x25519 import X25519DH
20from i2p_transport.ssu2_handshake import (
21 HandshakeKeys,
22 TokenManager,
23 OutboundHandshake,
24 InboundHandshake,
25 LONG_HEADER_SIZE,
26 SHORT_HEADER_SIZE,
27 PKT_TOKEN_REQUEST,
28 PKT_SESSION_REQUEST,
29 PKT_DATA,
30 PROTOCOL_VERSION,
31 NETWORK_ID,
32 _build_long_header,
33)
34from i2p_transport.ssu2_connection import SSU2Connection
35from i2p_transport.ssu2_payload import PaddingBlock, build_payload
36from i2p_transport.transport_base import TransportStyle, ReachabilityStatus
37
38
39# ---------------------------------------------------------------------------
40# Helpers
41# ---------------------------------------------------------------------------
42
def _make_keys() -> HandshakeKeys:
    """Build a ``HandshakeKeys`` populated with four random 32-byte keys."""
    # Draw the keys in the same order the fields are passed below.
    send_cipher = os.urandom(32)
    recv_cipher = os.urandom(32)
    send_header = os.urandom(32)
    recv_header = os.urandom(32)
    return HandshakeKeys(
        send_cipher_key=send_cipher,
        recv_cipher_key=recv_cipher,
        send_header_key=send_header,
        recv_header_key=recv_header,
    )
51
52
def _make_keypair() -> tuple[bytes, bytes]:
    """Return a fresh keypair from ``X25519DH.generate_keypair()``.

    Callers in this file unpack the result as ``(private, public)``.
    """
    keypair = X25519DH.generate_keypair()
    return keypair
56
57
58# ---------------------------------------------------------------------------
59# SSU2Transport tests
60# ---------------------------------------------------------------------------
61
class TestSSU2Transport:
    """Test the SSU2Transport UDP server."""

    @pytest.fixture
    def static_keypair(self):
        """Provide a fresh X25519 keypair; element 0 is passed as the static key."""
        return _make_keypair()

    @pytest.fixture
    def intro_key(self):
        """Provide a random 32-byte introduction key."""
        return os.urandom(32)

    @staticmethod
    def _new_transport(static_keypair, intro_key):
        """Build an unstarted SSU2Transport bound to 127.0.0.1 with port 0.

        The import is local, so a failure to import ``ssu2_server`` surfaces in
        the test that calls this helper instead of at module import time.
        """
        from i2p_transport.ssu2_server import SSU2Transport
        return SSU2Transport(
            host="127.0.0.1", port=0,
            static_key=static_keypair[0],
            intro_key=intro_key,
        )

    def test_transport_style(self, static_keypair, intro_key):
        """A transport reports the SSU2 transport style."""
        t = self._new_transport(static_keypair, intro_key)
        assert t.style == TransportStyle.SSU2

    def test_initial_state(self, static_keypair, intro_key):
        """Before start(): not running, reachability unknown, no address."""
        t = self._new_transport(static_keypair, intro_key)
        assert not t.is_running
        assert t.reachability == ReachabilityStatus.UNKNOWN
        assert t.current_address is None

    def test_start_stop(self, static_keypair, intro_key):
        """start() publishes an address and stop() clears the running flag."""
        async def _run():
            # Construct inside the coroutine so the transport is created
            # while the event loop started by asyncio.run() is running.
            t = self._new_transport(static_keypair, intro_key)
            await t.start()
            try:
                assert t.is_running
                addr = t.current_address
                assert addr is not None
                assert addr["host"] == "127.0.0.1"
                assert addr["port"] > 0
                assert addr["style"] == "SSU2"
            finally:
                # Always stop, so a failed assertion does not leave the
                # started transport running.
                await t.stop()
            assert not t.is_running
        asyncio.run(_run())

    def test_bid_no_connection(self, static_keypair, intro_key):
        """Bid returns WILL_NOT_SEND when no connection to peer."""
        async def _run():
            from i2p_transport.transport_base import TransportBid
            t = self._new_transport(static_keypair, intro_key)
            await t.start()
            try:
                bid = await t.bid(os.urandom(32))
                assert bid.latency_ms == TransportBid.WILL_NOT_SEND
            finally:
                await t.stop()
        asyncio.run(_run())

    def test_send_without_connection_fails(self, static_keypair, intro_key):
        """Send returns False when no connection exists."""
        async def _run():
            t = self._new_transport(static_keypair, intro_key)
            await t.start()
            try:
                ok = await t.send(os.urandom(32), b"hello")
                assert ok is False
            finally:
                await t.stop()
        asyncio.run(_run())
142
143
144# ---------------------------------------------------------------------------
145# EstablishmentManager tests
146# ---------------------------------------------------------------------------
147
class TestEstablishmentManager:
    """Exercise EstablishmentManager, the SSU2 handshake dispatch manager."""

    @staticmethod
    def _new_manager():
        """Build an EstablishmentManager from freshly generated key material."""
        from i2p_transport.ssu2_server import EstablishmentManager
        private_key, _ = _make_keypair()
        introduction_key = os.urandom(32)
        tokens = TokenManager()
        return EstablishmentManager(
            local_static_key=private_key,
            local_intro_key=introduction_key,
            token_manager=tokens,
        )

    def test_create_inbound_handshake(self):
        """create_inbound_handshake() yields a handshake object."""
        manager = self._new_manager()
        handshake = manager.create_inbound_handshake()
        assert handshake is not None

    def test_create_outbound_handshake(self):
        """create_outbound_handshake() accepts a remote public key and intro key."""
        manager = self._new_manager()
        _, remote_public = _make_keypair()
        remote_intro_key = os.urandom(32)
        handshake = manager.create_outbound_handshake(remote_public, remote_intro_key)
        assert handshake is not None

    def test_track_pending(self):
        """A pending handshake can be added, fetched and removed by conn id."""
        manager = self._new_manager()
        handshake = manager.create_inbound_handshake()
        # The handshake's own source connection id is used as the lookup key.
        connection_id = handshake._src_conn_id
        manager.add_pending(connection_id, handshake)
        assert manager.get_pending(connection_id) is handshake
        manager.remove_pending(connection_id)
        assert manager.get_pending(connection_id) is None
195
196
197# ---------------------------------------------------------------------------
198# PeerStateMap tests
199# ---------------------------------------------------------------------------
200
class TestPeerStateMap:
    """Exercise PeerStateMap lookups, removal and counting of connections."""

    def test_add_and_get(self):
        """An added connection is found by peer hash and by remote address."""
        from i2p_transport.ssu2_server import PeerStateMap
        address = ("127.0.0.1", 5000)
        peers = PeerStateMap()
        connection = SSU2Connection(
            keys=_make_keys(),
            src_conn_id=1,
            dest_conn_id=2,
            remote_address=address,
            is_initiator=True,
        )
        router_hash = os.urandom(32)
        peers.add(router_hash, connection, address)
        assert peers.get_by_peer(router_hash) is connection
        assert peers.get_by_address(address) is connection

    def test_get_by_conn_id(self):
        """An added connection is found through its src_conn_id."""
        from i2p_transport.ssu2_server import PeerStateMap
        address = ("127.0.0.1", 5000)
        peers = PeerStateMap()
        connection = SSU2Connection(
            keys=_make_keys(),
            src_conn_id=42,
            dest_conn_id=99,
            remote_address=address,
            is_initiator=True,
        )
        router_hash = os.urandom(32)
        peers.add(router_hash, connection, address)
        # 42 is the connection's src_conn_id, i.e. the id on our side.
        assert peers.get_by_conn_id(42) is connection

    def test_remove(self):
        """Removing by peer hash clears both the peer and the address lookup."""
        from i2p_transport.ssu2_server import PeerStateMap
        address = ("10.0.0.1", 3000)
        peers = PeerStateMap()
        connection = SSU2Connection(
            keys=_make_keys(),
            src_conn_id=1,
            dest_conn_id=2,
            remote_address=address,
            is_initiator=False,
        )
        router_hash = os.urandom(32)
        peers.add(router_hash, connection, address)
        peers.remove(router_hash)
        assert peers.get_by_peer(router_hash) is None
        assert peers.get_by_address(address) is None

    def test_active_count(self):
        """active_count starts at zero and reflects three added connections."""
        from i2p_transport.ssu2_server import PeerStateMap
        peers = PeerStateMap()
        assert peers.active_count == 0
        # One key set is shared by all three connections.
        shared_keys = _make_keys()
        for offset in range(3):
            address = ("127.0.0.1", 5000 + offset)
            connection = SSU2Connection(
                keys=shared_keys,
                src_conn_id=offset,
                dest_conn_id=offset + 100,
                remote_address=address,
                is_initiator=True,
            )
            peers.add(os.urandom(32), connection, address)
        assert peers.active_count == 3
256
257
258# ---------------------------------------------------------------------------
259# Peer test protocol tests
260# ---------------------------------------------------------------------------
261
class TestPeerTestProtocol:
    """Test the three-party NAT detection protocol (Alice/Bob/Charlie)."""

    def test_create_peer_test_request(self):
        """A new request has a positive nonce, a non-empty message, and is tracked."""
        from i2p_transport.ssu2_server import PeerTestManager
        ptm = PeerTestManager()
        nonce, msg = ptm.create_test_request()
        assert nonce > 0
        assert len(msg) > 0
        assert ptm.get_pending_test(nonce) is not None

    def test_peer_test_role_alice(self):
        """The pending entry for a locally created request has the ALICE role."""
        from i2p_transport.ssu2_server import PeerTestManager, PeerTestRole
        ptm = PeerTestManager()
        nonce, _ = ptm.create_test_request()
        pending = ptm.get_pending_test(nonce)
        # Guard first so a missing entry fails as an assertion rather than
        # as a TypeError from subscripting None.
        assert pending is not None
        assert pending["role"] == PeerTestRole.ALICE

    def test_process_test_response(self):
        """A response with result code 0 for a known nonce reports reachable."""
        from i2p_transport.ssu2_server import PeerTestManager
        ptm = PeerTestManager()
        nonce, _ = ptm.create_test_request()
        # Simulate Charlie's response: code 0 = success, meaning we're reachable
        result = ptm.process_test_response(
            nonce, result_code=0, ip=b"\x7f\x00\x00\x01", port=12345,
        )
        assert result is not None
        assert result["reachable"] is True

    def test_process_unknown_nonce(self):
        """A response carrying a nonce that was never issued yields None."""
        from i2p_transport.ssu2_server import PeerTestManager
        ptm = PeerTestManager()
        result = ptm.process_test_response(
            99999, result_code=0, ip=b"\x7f\x00\x00\x01", port=12345,
        )
        assert result is None

    def test_bob_role_relay(self):
        """Bob relays peer test from Alice to Charlie."""
        from i2p_transport.ssu2_server import PeerTestManager
        ptm = PeerTestManager()
        # Simulate receiving a peer test from Alice, acting as Bob
        relay_msg = ptm.create_relay_to_charlie(
            nonce=12345,
            alice_ip=b"\x7f\x00\x00\x01",
            alice_port=5000,
        )
        assert relay_msg is not None
        assert len(relay_msg) > 0
307
308
309# ---------------------------------------------------------------------------
310# Introduction/relay protocol tests
311# ---------------------------------------------------------------------------
312
class TestRelayProtocol:
    """Exercise RelayManager, the relay/introduction helper for firewalled peers."""

    def test_create_relay_request(self):
        """A relay request yields a positive nonce and a non-empty message."""
        from i2p_transport.ssu2_server import RelayManager
        manager = RelayManager()
        target = os.urandom(32)
        request_nonce, message = manager.create_relay_request(
            relay_tag=42, target_hash=target,
        )
        assert request_nonce > 0
        assert len(message) > 0

    def test_process_relay_intro(self):
        """Introducer receives relay intro and creates response."""
        from i2p_transport.ssu2_server import RelayManager
        manager = RelayManager()
        # Simulate receiving a relay intro — the introducer relays to the target
        target = os.urandom(32)
        reply = manager.process_relay_intro(
            nonce=42,
            requester_ip=b"\x0a\x00\x00\x01",
            requester_port=8000,
            target_hash=target,
        )
        assert reply is not None

    def test_relay_tag_management(self):
        """An assigned tag maps back to its peer until the tag is removed."""
        from i2p_transport.ssu2_server import RelayManager
        manager = RelayManager()
        router_hash = os.urandom(32)
        relay_tag = manager.assign_relay_tag(router_hash)
        assert relay_tag > 0
        assert manager.get_peer_for_tag(relay_tag) == router_hash
        manager.remove_relay_tag(relay_tag)
        assert manager.get_peer_for_tag(relay_tag) is None

    def test_relay_response_success(self):
        """A result-code-0 response to an issued nonce reports success."""
        from i2p_transport.ssu2_server import RelayManager
        manager = RelayManager()
        target = os.urandom(32)
        # Only the nonce of the issued request is needed for the response.
        request_nonce, _ = manager.create_relay_request(
            relay_tag=42, target_hash=target,
        )
        outcome = manager.process_relay_response(
            nonce=request_nonce,
            result_code=0,
            target_ip=b"\x7f\x00\x00\x01",
            target_port=9999,
        )
        assert outcome is not None
        assert outcome["success"] is True
364
365
366# ---------------------------------------------------------------------------
367# UDP protocol handler tests
368# ---------------------------------------------------------------------------
369
370class TestSSU2Protocol:
371 """Test the asyncio DatagramProtocol handler."""
372
373 def test_classify_packet_long_header(self):
374 from i2p_transport.ssu2_server import classify_packet, PacketClass
375 # Build a long header (32 bytes + some payload)
376 header = _build_long_header(
377 dest_conn_id=1, pkt_num=0, pkt_type=PKT_TOKEN_REQUEST,
378 version=PROTOCOL_VERSION, net_id=NETWORK_ID,
379 src_conn_id=2, token=0,
380 )
381 packet = header + os.urandom(32)
382 cls = classify_packet(packet)
383 assert cls == PacketClass.HANDSHAKE
384
385 def test_classify_packet_short(self):
386 from i2p_transport.ssu2_server import classify_packet, PacketClass
387 # A short (< 32 byte) packet with DATA type
388 header = struct.pack("!QIB", 1, 0, PKT_DATA) + b"\x00\x00\x00"
389 packet = header + os.urandom(64)
390 cls = classify_packet(packet)
391 assert cls == PacketClass.DATA
392
393 def test_classify_too_small(self):
394 from i2p_transport.ssu2_server import classify_packet, PacketClass
395 cls = classify_packet(b"\x00" * 4)
396 assert cls == PacketClass.INVALID