A Python port of the Invisible Internet Project (I2P)
1"""SSU2 asyncio UDP server.
2
3Ported from:
4- net.i2p.router.transport.udp.UDPTransport
5- net.i2p.router.transport.udp.EstablishmentManager
6- net.i2p.router.transport.udp.PeerTestManager
7- net.i2p.router.transport.udp.IntroductionManager
8
9Provides:
10- SSU2Transport: asyncio DatagramProtocol, implements Transport interface
11- EstablishmentManager: dispatches handshake packets to correct state machine
12- PeerStateMap: tracks established SSU2 connections by peer/address/conn_id
13- PeerTestManager: three-party NAT detection (Alice/Bob/Charlie)
14- RelayManager: introduction/relay for firewalled peers
15"""
16
17from __future__ import annotations
18
19import asyncio
20import enum
21import logging
22import os
23import struct
24import time
25from dataclasses import dataclass, field
26
27from i2p_crypto.x25519 import X25519DH
28from i2p_transport.transport_base import (
29 Transport,
30 TransportBid,
31 TransportStyle,
32 ReachabilityStatus,
33)
34from i2p_transport.ssu2_handshake import (
35 HandshakeKeys,
36 TokenManager,
37 OutboundHandshake,
38 InboundHandshake,
39 LONG_HEADER_SIZE,
40 SHORT_HEADER_SIZE,
41 PKT_TOKEN_REQUEST,
42 PKT_SESSION_REQUEST,
43 PKT_DATA,
44)
45from i2p_transport.ssu2_connection import SSU2Connection
46
47logger = logging.getLogger(__name__)
48
49
50# ---------------------------------------------------------------------------
51# Packet classification
52# ---------------------------------------------------------------------------
53
class PacketClass(enum.Enum):
    """Coarse category assigned to a received UDP datagram."""

    # Datagram carries a handshake-type byte and goes to establishment.
    HANDSHAKE = "handshake"
    # Datagram is treated as traffic for an existing session.
    DATA = "data"
    # Datagram is shorter than the short header and is discarded.
    INVALID = "invalid"
58
59
def classify_packet(packet: bytes) -> PacketClass:
    """Decide how an incoming SSU2 datagram should be routed.

    Rules, applied in order:

    - shorter than ``SHORT_HEADER_SIZE``: ``INVALID``;
    - shorter than ``LONG_HEADER_SIZE``: ``DATA`` (no type byte is inspected);
    - otherwise ``HANDSHAKE`` when the type byte is one of the handshake
      codes, else ``DATA``.
    """
    size = len(packet)
    if size < SHORT_HEADER_SIZE:
        return PacketClass.INVALID
    if size < LONG_HEADER_SIZE:
        return PacketClass.DATA

    # NOTE(review): the literals 1 and 9 are presumably the SSU2
    # SessionCreated and Retry type codes -- confirm against the constants
    # in i2p_transport.ssu2_handshake.
    handshake_types = {PKT_TOKEN_REQUEST, PKT_SESSION_REQUEST, 1, 9}

    # Type byte sits at offset 12: dest_conn_id (8 bytes) + pkt_num (4 bytes).
    type_byte = packet[12]
    if type_byte in handshake_types:
        return PacketClass.HANDSHAKE
    return PacketClass.DATA
75
76
77# ---------------------------------------------------------------------------
78# PeerStateMap — tracks established connections
79# ---------------------------------------------------------------------------
80
class PeerStateMap:
    """Index of established SSU2 connections.

    Every registered connection can be looked up three ways: by the peer's
    hash, by the remote UDP address it was registered under, and by the
    connection's ``src_conn_id``. The three indexes are kept consistent:
    re-registering a peer evicts its previous entries, and removing a peer
    never deletes an index entry that now belongs to a different connection.
    """

    def __init__(self) -> None:
        self._by_peer: dict[bytes, SSU2Connection] = {}
        self._by_address: dict[tuple[str, int], SSU2Connection] = {}
        self._by_conn_id: dict[int, SSU2Connection] = {}
        # Address each peer was registered under, so remove() cleans up the
        # key that add() actually wrote.
        self._address_of: dict[bytes, tuple[str, int]] = {}

    def add(self, peer_hash: bytes, conn: SSU2Connection,
            address: tuple[str, int]) -> None:
        """Register *conn* for *peer_hash*, reachable at *address*.

        If the peer already has a registered connection, its old address and
        connection-id entries are dropped first so they cannot linger as
        stale lookups.
        """
        self.remove(peer_hash)
        self._by_peer[peer_hash] = conn
        self._by_address[address] = conn
        self._by_conn_id[conn.src_conn_id] = conn
        self._address_of[peer_hash] = address

    def get_by_peer(self, peer_hash: bytes) -> SSU2Connection | None:
        """Return the connection registered for *peer_hash*, or ``None``."""
        return self._by_peer.get(peer_hash)

    def get_by_address(self, address: tuple[str, int]) -> SSU2Connection | None:
        """Return the connection registered at *address*, or ``None``."""
        return self._by_address.get(address)

    def get_by_conn_id(self, conn_id: int) -> SSU2Connection | None:
        """Return the connection whose ``src_conn_id`` is *conn_id*, or ``None``."""
        return self._by_conn_id.get(conn_id)

    def remove(self, peer_hash: bytes) -> None:
        """Forget the connection registered for *peer_hash* (no-op if absent)."""
        conn = self._by_peer.pop(peer_hash, None)
        if conn is None:
            return
        address = self._address_of.pop(peer_hash, None)
        # Only delete secondary entries that still point at this connection;
        # another peer may have taken over the same address or conn id.
        if address is not None and self._by_address.get(address) is conn:
            del self._by_address[address]
        conn_id = conn.src_conn_id
        if self._by_conn_id.get(conn_id) is conn:
            del self._by_conn_id[conn_id]

    @property
    def active_count(self) -> int:
        """Number of peers that currently have a registered connection."""
        return len(self._by_peer)

    def all_peers(self) -> list[bytes]:
        """Return a snapshot list of all registered peer hashes."""
        return list(self._by_peer)
116
117
118# ---------------------------------------------------------------------------
119# EstablishmentManager — dispatches handshake packets
120# ---------------------------------------------------------------------------
121
class EstablishmentManager:
    """Factory and registry for in-progress SSU2 handshakes.

    Builds ``InboundHandshake`` (responder side) and ``OutboundHandshake``
    (initiator side) objects from the local key material, and keeps a table
    of pending handshakes keyed by connection ID.
    """

    def __init__(self, local_static_key: bytes, local_intro_key: bytes,
                 token_manager: TokenManager) -> None:
        # connection ID -> handshake still in progress
        self._pending: dict[int, InboundHandshake | OutboundHandshake] = {}
        self._token_manager = token_manager
        self._local_intro_key = local_intro_key
        self._local_static = local_static_key

    def create_inbound_handshake(self) -> InboundHandshake:
        """Build a responder-side handshake using the local keys and token manager."""
        responder = InboundHandshake(
            local_static_key=self._local_static,
            local_intro_key=self._local_intro_key,
            token_manager=self._token_manager,
        )
        return responder

    def create_outbound_handshake(self, remote_static_key: bytes,
                                  remote_intro_key: bytes,
                                  token: int | None = None) -> OutboundHandshake:
        """Build an initiator-side handshake towards the given remote keys.

        *token* is handed to ``OutboundHandshake`` unchanged and may be
        ``None``.
        """
        initiator = OutboundHandshake(
            local_static_key=self._local_static,
            remote_static_key=remote_static_key,
            remote_intro_key=remote_intro_key,
            token=token,
        )
        return initiator

    def add_pending(self, conn_id: int,
                    hs: InboundHandshake | OutboundHandshake) -> None:
        """Record *hs* as pending under *conn_id*, replacing any previous entry."""
        self._pending[conn_id] = hs

    def get_pending(self, conn_id: int) -> InboundHandshake | OutboundHandshake | None:
        """Return the pending handshake for *conn_id*, or ``None`` if there is none."""
        try:
            return self._pending[conn_id]
        except KeyError:
            return None

    def remove_pending(self, conn_id: int) -> None:
        """Drop the pending handshake for *conn_id*; unknown IDs are ignored."""
        if conn_id in self._pending:
            del self._pending[conn_id]

    @property
    def pending_count(self) -> int:
        """Number of handshakes currently pending."""
        return len(self._pending)
166
167
168# ---------------------------------------------------------------------------
169# Peer test protocol — three-party NAT detection
170# ---------------------------------------------------------------------------
171
class PeerTestRole(enum.Enum):
    """Role a router plays in a three-party peer test."""

    ALICE = "alice"      # Initiator: wants to know if reachable
    BOB = "bob"          # Relay: forwards test from Alice to Charlie
    CHARLIE = "charlie"  # Tester: sends probe to Alice


class PeerTestManager:
    """Manages SSU2 peer tests for NAT detection.

    Protocol:
    1. Alice asks Bob to test her reachability
    2. Bob relays request to Charlie
    3. Charlie sends probe directly to Alice
    4. Alice reports result back through Bob
    """

    def __init__(self) -> None:
        # nonce -> {"role", "created_at", "nonce"} for tests started locally.
        self._pending_tests: dict[int, dict] = {}

    def _new_nonce(self) -> int:
        """Return a random non-zero 32-bit nonce not used by a pending test.

        Redrawing (instead of OR-ing in the low bit) keeps the full nonce
        space available and prevents a collision from silently replacing
        an in-flight test.
        """
        while True:
            nonce = int.from_bytes(os.urandom(4), "big")
            if nonce and nonce not in self._pending_tests:
                return nonce

    def create_test_request(self) -> tuple[int, bytes]:
        """Create a peer test request (Alice role) and record it as pending.

        Returns:
            ``(nonce, serialized_message)``. The message is the 4-byte
            big-endian nonce followed by one role byte, which is the first
            character of the role name (``b"a"`` for Alice).
        """
        nonce = self._new_nonce()
        role_byte = PeerTestRole.ALICE.value.encode()[0]
        msg = struct.pack("!IB", nonce, role_byte)
        self._pending_tests[nonce] = {
            "role": PeerTestRole.ALICE,
            "created_at": time.monotonic(),
            "nonce": nonce,
        }
        return nonce, msg

    def get_pending_test(self, nonce: int) -> dict | None:
        """Return the pending-test record for *nonce*, or ``None``."""
        return self._pending_tests.get(nonce)

    def process_test_response(self, nonce: int, result_code: int,
                              ip: bytes, port: int) -> dict | None:
        """Consume the response to a pending peer test.

        The pending entry for *nonce* is removed, so a second response with
        the same nonce is treated as unknown.

        Args:
            nonce: Nonce of the test being answered.
            result_code: 0 = reachable, non-zero = unreachable or error.
            ip: Address bytes reported in the response.
            port: Port reported in the response.

        Returns:
            A result dict with keys ``nonce``, ``reachable``,
            ``result_code``, ``ip`` and ``port``, or ``None`` if the nonce
            is not pending.
        """
        pending = self._pending_tests.pop(nonce, None)
        if pending is None:
            return None
        return {
            "nonce": nonce,
            "reachable": result_code == 0,
            "result_code": result_code,
            "ip": ip,
            "port": port,
        }

    def create_relay_to_charlie(self, nonce: int, alice_ip: bytes,
                                alice_port: int) -> bytes:
        """Create the message Bob relays to Charlie for Alice's test.

        Layout (all integers big-endian): 4-byte nonce, 2-byte length of
        *alice_ip*, the raw *alice_ip* bytes, 2-byte *alice_port*.

        Raises:
            struct.error: If a value does not fit its fixed-width field.
        """
        return b"".join((
            struct.pack("!I", nonce),
            struct.pack("!H", len(alice_ip)),
            alice_ip,
            struct.pack("!H", alice_port),
        ))

    def cleanup_stale(self, max_age_seconds: float = 30.0) -> int:
        """Remove pending tests at least *max_age_seconds* old.

        Returns:
            The number of entries removed.
        """
        now = time.monotonic()
        stale = [n for n, t in self._pending_tests.items()
                 if (now - t["created_at"]) >= max_age_seconds]
        for n in stale:
            del self._pending_tests[n]
        return len(stale)
245
246
247# ---------------------------------------------------------------------------
248# Introduction/relay protocol
249# ---------------------------------------------------------------------------
250
class RelayManager:
    """Manages relay tags and introduction for firewalled peers.

    Firewalled peers cannot receive direct connections. They register
    a relay tag with an introducer. When someone wants to connect,
    they send a RelayRequest to the introducer, who forwards it to
    the target via the existing session.
    """

    def __init__(self) -> None:
        self._relay_tags: dict[int, bytes] = {}  # tag -> peer_hash
        self._next_tag = 1
        self._pending_relays: dict[int, dict] = {}  # nonce -> request info

    def _new_nonce(self) -> int:
        """Return a random non-zero 32-bit nonce not used by a pending relay.

        Redrawing (instead of OR-ing in the low bit) keeps the full nonce
        space available and prevents a collision from silently replacing
        an in-flight request.
        """
        while True:
            nonce = int.from_bytes(os.urandom(4), "big")
            if nonce and nonce not in self._pending_relays:
                return nonce

    def assign_relay_tag(self, peer_hash: bytes) -> int:
        """Assign the next sequential relay tag to *peer_hash* and return it."""
        tag = self._next_tag
        self._next_tag += 1
        self._relay_tags[tag] = peer_hash
        return tag

    def get_peer_for_tag(self, tag: int) -> bytes | None:
        """Return the peer hash registered for *tag*, or ``None``."""
        return self._relay_tags.get(tag)

    def remove_relay_tag(self, tag: int) -> None:
        """Forget *tag*; unknown tags are ignored."""
        self._relay_tags.pop(tag, None)

    def create_relay_request(self, relay_tag: int,
                             target_hash: bytes) -> tuple[int, bytes]:
        """Create a relay request to send to an introducer.

        The request is recorded as pending until a matching response is
        processed or it is expired by ``cleanup_stale``.

        Returns:
            ``(nonce, serialized_message)``. The message is the 4-byte
            big-endian nonce, the 4-byte big-endian relay tag, then the raw
            *target_hash* bytes.

        Raises:
            struct.error: If *relay_tag* does not fit in 32 bits.
        """
        nonce = self._new_nonce()
        msg = struct.pack("!II", nonce, relay_tag) + target_hash
        self._pending_relays[nonce] = {
            "relay_tag": relay_tag,
            "target_hash": target_hash,
            "created_at": time.monotonic(),
        }
        return nonce, msg

    def process_relay_intro(self, nonce: int, requester_ip: bytes,
                            requester_port: int,
                            target_hash: bytes) -> dict:
        """Process a relay intro as the introducer.

        No state is changed; the arguments are packaged into a dict together
        with ``"action": "forward_to_target"`` for the caller to act on.
        """
        return {
            "nonce": nonce,
            "requester_ip": requester_ip,
            "requester_port": requester_port,
            "target_hash": target_hash,
            "action": "forward_to_target",
        }

    def process_relay_response(self, nonce: int, result_code: int,
                               target_ip: bytes, target_port: int) -> dict | None:
        """Consume the response to a pending relay request.

        Args:
            nonce: Nonce of the request being answered.
            result_code: 0 = success (target accepted the introduction).
            target_ip: Address bytes reported for the target.
            target_port: Port reported for the target.

        Returns:
            A result dict with keys ``nonce``, ``success``, ``result_code``,
            ``target_ip`` and ``target_port``, or ``None`` if the nonce is
            not pending.
        """
        pending = self._pending_relays.pop(nonce, None)
        if pending is None:
            return None
        return {
            "nonce": nonce,
            "success": result_code == 0,
            "result_code": result_code,
            "target_ip": target_ip,
            "target_port": target_port,
        }

    def cleanup_stale(self, max_age_seconds: float = 30.0) -> int:
        """Remove pending relay requests at least *max_age_seconds* old.

        Without this, a request whose response never arrives would stay in
        the pending table indefinitely.

        Returns:
            The number of entries removed.
        """
        now = time.monotonic()
        stale = [n for n, info in self._pending_relays.items()
                 if (now - info["created_at"]) >= max_age_seconds]
        for n in stale:
            del self._pending_relays[n]
        return len(stale)
325
326
327# ---------------------------------------------------------------------------
328# SSU2 DatagramProtocol
329# ---------------------------------------------------------------------------
330
class SSU2DatagramProtocol(asyncio.DatagramProtocol):
    """Datagram protocol that feeds received SSU2 packets to an SSU2Transport.

    Each datagram is classified with ``classify_packet``; invalid packets
    are dropped, and the rest are passed to the owning transport's
    handshake or data handler.
    """

    def __init__(self, transport_ref: SSU2Transport) -> None:
        # Set by connection_made(); until then send_to() is a no-op.
        self._udp_transport: asyncio.DatagramTransport | None = None
        self._transport_ref = transport_ref

    def connection_made(self, transport: asyncio.BaseTransport) -> None:
        """Remember the UDP transport handed over by asyncio."""
        self._udp_transport = transport  # type: ignore[assignment]

    def datagram_received(self, data: bytes, addr: tuple[str, int]) -> None:
        """Classify one datagram and hand it to the matching transport handler."""
        kind = classify_packet(data)
        if kind == PacketClass.INVALID:
            logger.debug("Dropping invalid packet from %s (too small)", addr)
            return

        owner = self._transport_ref
        if kind == PacketClass.HANDSHAKE:
            handler = owner._handle_handshake_packet
        else:
            handler = owner._handle_data_packet
        handler(data, addr)

    def error_received(self, exc: Exception) -> None:
        """Log an error reported for the socket."""
        logger.warning("UDP error: %s", exc)

    def connection_lost(self, exc: Exception | None) -> None:
        """Log that the UDP transport has been closed."""
        logger.info("UDP transport closed")

    def send_to(self, data: bytes, addr: tuple[str, int]) -> None:
        """Send *data* to *addr*; silently does nothing before connection_made()."""
        udp = self._udp_transport
        if udp is None:
            return
        udp.sendto(data, addr)
364
365
366# ---------------------------------------------------------------------------
367# SSU2Transport — main transport implementation
368# ---------------------------------------------------------------------------
369
class SSU2Transport(Transport):
    """SSU2 UDP transport implementing the Transport interface.

    Manages:
    - UDP socket via asyncio DatagramProtocol
    - Peer state map for established connections
    - EstablishmentManager for handshake dispatch
    - PeerTestManager for NAT detection
    - RelayManager for firewalled peer introductions
    """

    def __init__(self, host: str, port: int,
                 static_key: bytes, intro_key: bytes) -> None:
        """Set up transport state; no socket is opened until ``start()``.

        Args:
            host: Local address to bind.
            port: Local UDP port to bind (the bound port is read back from
                the socket after ``start()``).
            static_key: Local X25519 static private key.
            intro_key: Local introduction key.
        """
        self._host = host
        self._port = port
        self._static_key = static_key
        self._static_pub = X25519DH.public_from_private(static_key)
        self._intro_key = intro_key

        self._token_manager = TokenManager()
        self._establishment = EstablishmentManager(
            local_static_key=static_key,
            local_intro_key=intro_key,
            token_manager=self._token_manager,
        )
        self._peers = PeerStateMap()
        self._peer_test = PeerTestManager()
        self._relay = RelayManager()

        self._protocol: SSU2DatagramProtocol | None = None
        self._udp_transport: asyncio.DatagramTransport | None = None
        self._running = False
        self._actual_port: int = 0
        self._reachability = ReachabilityStatus.UNKNOWN

    # ------------------------------------------------------------------
    # Transport interface
    # ------------------------------------------------------------------

    @property
    def style(self) -> TransportStyle:
        """Transport style identifier (always ``TransportStyle.SSU2``)."""
        return TransportStyle.SSU2

    async def start(self) -> None:
        """Bind the UDP endpoint and mark the transport as running.

        Calling ``start()`` on a transport that is already running is a
        no-op, so the existing socket is never orphaned by a second bind.
        """
        if self._running:
            logger.debug("SSU2 transport already running on %s:%d",
                         self._host, self._actual_port)
            return

        loop = asyncio.get_running_loop()
        transport, protocol = await loop.create_datagram_endpoint(
            lambda: SSU2DatagramProtocol(self),
            local_addr=(self._host, self._port),
        )
        self._udp_transport = transport
        self._protocol = protocol

        # Resolve actual port (if 0 was passed)
        sock = transport.get_extra_info("socket")
        if sock is not None:
            self._actual_port = sock.getsockname()[1]
        else:
            self._actual_port = self._port

        self._running = True
        logger.info("SSU2 transport started on %s:%d", self._host, self._actual_port)

    async def stop(self) -> None:
        """Close the UDP endpoint (if open) and mark the transport as stopped."""
        if self._udp_transport is not None:
            self._udp_transport.close()
            self._udp_transport = None
        self._protocol = None
        self._running = False
        logger.info("SSU2 transport stopped")

    @property
    def is_running(self) -> bool:
        """``True`` between a successful ``start()`` and the next ``stop()``."""
        return self._running

    async def bid(self, peer_hash: bytes) -> TransportBid:
        """Bid for delivering a message to *peer_hash*.

        Returns a bid with a fixed 50 ms latency and preference 1 when an
        established connection to the peer exists; otherwise a
        ``WILL_NOT_SEND`` bid with preference 100.
        """
        conn = self._peers.get_by_peer(peer_hash)
        if conn is not None and conn.is_established:
            return TransportBid(latency_ms=50, transport=self, preference=1)
        return TransportBid(
            latency_ms=TransportBid.WILL_NOT_SEND,
            transport=self, preference=100,
        )

    async def send(self, peer_hash: bytes, data: bytes) -> bool:
        """Send *data* as one I2NP block over the peer's established connection.

        Returns:
            ``True`` if a packet was handed to the UDP protocol; ``False``
            if there is no established connection to the peer or the
            transport has no open socket.
        """
        conn = self._peers.get_by_peer(peer_hash)
        if conn is None or not conn.is_established:
            return False

        # Check the socket before encrypting, so a stopped transport does
        # not consume connection state for a packet that is never sent.
        protocol = self._protocol
        if protocol is None:
            return False

        from i2p_transport.ssu2_payload import I2NPBlock
        blocks = [I2NPBlock(i2np_data=data)]
        packet = conn.encrypt_data_packet(blocks)
        protocol.send_to(packet, conn.remote_address)
        return True

    @property
    def reachability(self) -> ReachabilityStatus:
        """Current reachability status (``UNKNOWN`` until updated)."""
        return self._reachability

    @property
    def current_address(self) -> dict | None:
        """Describe the bound address, or ``None`` while not running.

        The dict has keys ``style``, ``host``, ``port`` (the actual bound
        port) and ``intro_key`` (hex-encoded).
        """
        if not self._running:
            return None
        return {
            "style": "SSU2",
            "host": self._host,
            "port": self._actual_port,
            "intro_key": self._intro_key.hex(),
        }

    # ------------------------------------------------------------------
    # Packet handlers
    # ------------------------------------------------------------------

    def _handle_handshake_packet(self, data: bytes, addr: tuple[str, int]) -> None:
        """Dispatch a handshake packet to the EstablishmentManager.

        The first 8 bytes are read as the big-endian destination connection
        ID. If no pending handshake is registered under that ID, a new
        inbound handshake is created and registered under its own source
        connection ID.
        """
        logger.debug("Handshake packet from %s (%d bytes)", addr, len(data))
        # Parse dest_conn_id to find pending handshake
        dest_conn_id = struct.unpack_from("!Q", data)[0]
        hs = self._establishment.get_pending(dest_conn_id)
        if hs is None:
            # New inbound handshake
            # NOTE(review): every packet with an unknown connection ID adds a
            # pending entry and nothing here removes it, so unauthenticated
            # traffic can grow the table without bound -- needs a limit or
            # expiry once the state machine is wired up.
            hs = self._establishment.create_inbound_handshake()
            self._establishment.add_pending(hs._src_conn_id, hs)
        # Further processing would continue the Noise_XK state machine

    def _handle_data_packet(self, data: bytes, addr: tuple[str, int]) -> None:
        """Dispatch a data packet to the appropriate SSU2Connection.

        Packets from addresses with no registered connection are dropped.
        Decryption failures are logged at debug level and otherwise ignored,
        since the input comes straight off the network.
        """
        conn = self._peers.get_by_address(addr)
        if conn is None:
            logger.debug("Data packet from unknown address %s, dropping", addr)
            return
        try:
            pkt_num, blocks = conn.decrypt_data_packet(data)
        except Exception:
            logger.debug("Failed to decrypt data packet from %s", addr, exc_info=True)
        else:
            logger.debug("Received data packet #%d with %d blocks from %s",
                         pkt_num, len(blocks), addr)