A Python port of the Invisible Internet Project (I2P)
at main 507 lines 18 kB view raw
1"""SSU2 asyncio UDP server. 2 3Ported from: 4- net.i2p.router.transport.udp.UDPTransport 5- net.i2p.router.transport.udp.EstablishmentManager 6- net.i2p.router.transport.udp.PeerTestManager 7- net.i2p.router.transport.udp.IntroductionManager 8 9Provides: 10- SSU2Transport: asyncio DatagramProtocol, implements Transport interface 11- EstablishmentManager: dispatches handshake packets to correct state machine 12- PeerStateMap: tracks established SSU2 connections by peer/address/conn_id 13- PeerTestManager: three-party NAT detection (Alice/Bob/Charlie) 14- RelayManager: introduction/relay for firewalled peers 15""" 16 17from __future__ import annotations 18 19import asyncio 20import enum 21import logging 22import os 23import struct 24import time 25from dataclasses import dataclass, field 26 27from i2p_crypto.x25519 import X25519DH 28from i2p_transport.transport_base import ( 29 Transport, 30 TransportBid, 31 TransportStyle, 32 ReachabilityStatus, 33) 34from i2p_transport.ssu2_handshake import ( 35 HandshakeKeys, 36 TokenManager, 37 OutboundHandshake, 38 InboundHandshake, 39 LONG_HEADER_SIZE, 40 SHORT_HEADER_SIZE, 41 PKT_TOKEN_REQUEST, 42 PKT_SESSION_REQUEST, 43 PKT_DATA, 44) 45from i2p_transport.ssu2_connection import SSU2Connection 46 47logger = logging.getLogger(__name__) 48 49 50# --------------------------------------------------------------------------- 51# Packet classification 52# --------------------------------------------------------------------------- 53 54class PacketClass(enum.Enum): 55 HANDSHAKE = "handshake" 56 DATA = "data" 57 INVALID = "invalid" 58 59 60def classify_packet(packet: bytes) -> PacketClass: 61 """Classify an incoming SSU2 packet. 62 63 - Packets shorter than SHORT_HEADER_SIZE are invalid. 64 - Packets >= LONG_HEADER_SIZE with handshake-type bytes are HANDSHAKE. 65 - Otherwise DATA. 66 """ 67 if len(packet) < SHORT_HEADER_SIZE: 68 return PacketClass.INVALID 69 if len(packet) >= LONG_HEADER_SIZE: 70 # Check type byte at offset 12 (after dest_conn_id(8) + pkt_num(4)) 71 pkt_type = packet[12] 72 if pkt_type in (PKT_TOKEN_REQUEST, PKT_SESSION_REQUEST, 1, 9): 73 return PacketClass.HANDSHAKE 74 return PacketClass.DATA 75 76 77# --------------------------------------------------------------------------- 78# PeerStateMap — tracks established connections 79# --------------------------------------------------------------------------- 80 81class PeerStateMap: 82 """Tracks established SSU2 connections by peer hash, address, and conn ID.""" 83 84 def __init__(self) -> None: 85 self._by_peer: dict[bytes, SSU2Connection] = {} 86 self._by_address: dict[tuple[str, int], SSU2Connection] = {} 87 self._by_conn_id: dict[int, SSU2Connection] = {} 88 89 def add(self, peer_hash: bytes, conn: SSU2Connection, 90 address: tuple[str, int]) -> None: 91 self._by_peer[peer_hash] = conn 92 self._by_address[address] = conn 93 self._by_conn_id[conn.src_conn_id] = conn 94 95 def get_by_peer(self, peer_hash: bytes) -> SSU2Connection | None: 96 return self._by_peer.get(peer_hash) 97 98 def get_by_address(self, address: tuple[str, int]) -> SSU2Connection | None: 99 return self._by_address.get(address) 100 101 def get_by_conn_id(self, conn_id: int) -> SSU2Connection | None: 102 return self._by_conn_id.get(conn_id) 103 104 def remove(self, peer_hash: bytes) -> None: 105 conn = self._by_peer.pop(peer_hash, None) 106 if conn is not None: 107 self._by_address.pop(conn.remote_address, None) 108 self._by_conn_id.pop(conn.src_conn_id, None) 109 110 @property 111 def active_count(self) -> int: 112 return len(self._by_peer) 113 114 def all_peers(self) -> list[bytes]: 115 return list(self._by_peer.keys()) 116 117 118# --------------------------------------------------------------------------- 119# EstablishmentManager — dispatches handshake packets 120# --------------------------------------------------------------------------- 121 122class EstablishmentManager: 123 """Manages pending SSU2 handshakes. 124 125 Creates InboundHandshake (responder) or OutboundHandshake (initiator) 126 instances and tracks them by connection ID until they complete. 127 """ 128 129 def __init__(self, local_static_key: bytes, local_intro_key: bytes, 130 token_manager: TokenManager) -> None: 131 self._local_static = local_static_key 132 self._local_intro_key = local_intro_key 133 self._token_manager = token_manager 134 self._pending: dict[int, InboundHandshake | OutboundHandshake] = {} 135 136 def create_inbound_handshake(self) -> InboundHandshake: 137 return InboundHandshake( 138 local_static_key=self._local_static, 139 local_intro_key=self._local_intro_key, 140 token_manager=self._token_manager, 141 ) 142 143 def create_outbound_handshake(self, remote_static_key: bytes, 144 remote_intro_key: bytes, 145 token: int | None = None) -> OutboundHandshake: 146 return OutboundHandshake( 147 local_static_key=self._local_static, 148 remote_static_key=remote_static_key, 149 remote_intro_key=remote_intro_key, 150 token=token, 151 ) 152 153 def add_pending(self, conn_id: int, 154 hs: InboundHandshake | OutboundHandshake) -> None: 155 self._pending[conn_id] = hs 156 157 def get_pending(self, conn_id: int) -> InboundHandshake | OutboundHandshake | None: 158 return self._pending.get(conn_id) 159 160 def remove_pending(self, conn_id: int) -> None: 161 self._pending.pop(conn_id, None) 162 163 @property 164 def pending_count(self) -> int: 165 return len(self._pending) 166 167 168# --------------------------------------------------------------------------- 169# Peer test protocol — three-party NAT detection 170# --------------------------------------------------------------------------- 171 172class PeerTestRole(enum.Enum): 173 ALICE = "alice" # Initiator: wants to know if reachable 174 BOB = "bob" # Relay: forwards test from Alice to Charlie 175 CHARLIE = "charlie" # Tester: sends probe to Alice 176 177 178class PeerTestManager: 179 """Manages SSU2 peer tests for NAT detection. 180 181 Protocol: 182 1. Alice asks Bob to test her reachability 183 2. Bob relays request to Charlie 184 3. Charlie sends probe directly to Alice 185 4. Alice reports result back through Bob 186 """ 187 188 def __init__(self) -> None: 189 self._pending_tests: dict[int, dict] = {} 190 191 def create_test_request(self) -> tuple[int, bytes]: 192 """Create a peer test request (Alice role). 193 194 Returns (nonce, serialized_message). 195 """ 196 nonce = int.from_bytes(os.urandom(4), "big") | 1 # Ensure non-zero 197 msg = struct.pack("!IB", nonce, PeerTestRole.ALICE.value.encode()[0]) 198 self._pending_tests[nonce] = { 199 "role": PeerTestRole.ALICE, 200 "created_at": time.monotonic(), 201 "nonce": nonce, 202 } 203 return nonce, msg 204 205 def get_pending_test(self, nonce: int) -> dict | None: 206 return self._pending_tests.get(nonce) 207 208 def process_test_response(self, nonce: int, result_code: int, 209 ip: bytes, port: int) -> dict | None: 210 """Process a peer test response. 211 212 result_code 0 = reachable, non-zero = unreachable or error. 213 Returns result dict or None if nonce unknown. 214 """ 215 pending = self._pending_tests.pop(nonce, None) 216 if pending is None: 217 return None 218 return { 219 "nonce": nonce, 220 "reachable": result_code == 0, 221 "result_code": result_code, 222 "ip": ip, 223 "port": port, 224 } 225 226 def create_relay_to_charlie(self, nonce: int, alice_ip: bytes, 227 alice_port: int) -> bytes: 228 """Create a relay message from Bob to Charlie. 229 230 Bob received a test request from Alice and relays it to Charlie. 231 """ 232 msg = struct.pack("!I", nonce) 233 msg += struct.pack("!H", len(alice_ip)) + alice_ip 234 msg += struct.pack("!H", alice_port) 235 return msg 236 237 def cleanup_stale(self, max_age_seconds: float = 30.0) -> int: 238 """Remove stale pending tests.""" 239 now = time.monotonic() 240 stale = [n for n, t in self._pending_tests.items() 241 if (now - t["created_at"]) >= max_age_seconds] 242 for n in stale: 243 del self._pending_tests[n] 244 return len(stale) 245 246 247# --------------------------------------------------------------------------- 248# Introduction/relay protocol 249# --------------------------------------------------------------------------- 250 251class RelayManager: 252 """Manages relay tags and introduction for firewalled peers. 253 254 Firewalled peers cannot receive direct connections. They register 255 a relay tag with an introducer. When someone wants to connect, 256 they send a RelayRequest to the introducer, who forwards it to 257 the target via the existing session. 258 """ 259 260 def __init__(self) -> None: 261 self._relay_tags: dict[int, bytes] = {} # tag -> peer_hash 262 self._next_tag = 1 263 self._pending_relays: dict[int, dict] = {} # nonce -> request info 264 265 def assign_relay_tag(self, peer_hash: bytes) -> int: 266 """Assign a relay tag to a peer.""" 267 tag = self._next_tag 268 self._next_tag += 1 269 self._relay_tags[tag] = peer_hash 270 return tag 271 272 def get_peer_for_tag(self, tag: int) -> bytes | None: 273 return self._relay_tags.get(tag) 274 275 def remove_relay_tag(self, tag: int) -> None: 276 self._relay_tags.pop(tag, None) 277 278 def create_relay_request(self, relay_tag: int, 279 target_hash: bytes) -> tuple[int, bytes]: 280 """Create a relay request to send to an introducer. 281 282 Returns (nonce, serialized_message). 283 """ 284 nonce = int.from_bytes(os.urandom(4), "big") | 1 285 msg = struct.pack("!II", nonce, relay_tag) + target_hash 286 self._pending_relays[nonce] = { 287 "relay_tag": relay_tag, 288 "target_hash": target_hash, 289 "created_at": time.monotonic(), 290 } 291 return nonce, msg 292 293 def process_relay_intro(self, nonce: int, requester_ip: bytes, 294 requester_port: int, 295 target_hash: bytes) -> dict: 296 """Process a relay intro as the introducer. 297 298 The introducer forwards the connection request to the target 299 and returns info for the response. 300 """ 301 return { 302 "nonce": nonce, 303 "requester_ip": requester_ip, 304 "requester_port": requester_port, 305 "target_hash": target_hash, 306 "action": "forward_to_target", 307 } 308 309 def process_relay_response(self, nonce: int, result_code: int, 310 target_ip: bytes, target_port: int) -> dict | None: 311 """Process a relay response. 312 313 result_code 0 = success (target accepted the introduction). 314 """ 315 pending = self._pending_relays.pop(nonce, None) 316 if pending is None: 317 return None 318 return { 319 "nonce": nonce, 320 "success": result_code == 0, 321 "result_code": result_code, 322 "target_ip": target_ip, 323 "target_port": target_port, 324 } 325 326 327# --------------------------------------------------------------------------- 328# SSU2 DatagramProtocol 329# --------------------------------------------------------------------------- 330 331class SSU2DatagramProtocol(asyncio.DatagramProtocol): 332 """asyncio DatagramProtocol for SSU2 UDP packets. 333 334 Receives datagrams, classifies them (handshake vs data), 335 and dispatches to the appropriate handler. 336 """ 337 338 def __init__(self, transport_ref: SSU2Transport) -> None: 339 self._transport_ref = transport_ref 340 self._udp_transport: asyncio.DatagramTransport | None = None 341 342 def connection_made(self, transport: asyncio.BaseTransport) -> None: 343 self._udp_transport = transport # type: ignore[assignment] 344 345 def datagram_received(self, data: bytes, addr: tuple[str, int]) -> None: 346 pkt_class = classify_packet(data) 347 if pkt_class == PacketClass.INVALID: 348 logger.debug("Dropping invalid packet from %s (too small)", addr) 349 return 350 if pkt_class == PacketClass.HANDSHAKE: 351 self._transport_ref._handle_handshake_packet(data, addr) 352 else: 353 self._transport_ref._handle_data_packet(data, addr) 354 355 def error_received(self, exc: Exception) -> None: 356 logger.warning("UDP error: %s", exc) 357 358 def connection_lost(self, exc: Exception | None) -> None: 359 logger.info("UDP transport closed") 360 361 def send_to(self, data: bytes, addr: tuple[str, int]) -> None: 362 if self._udp_transport is not None: 363 self._udp_transport.sendto(data, addr) 364 365 366# --------------------------------------------------------------------------- 367# SSU2Transport — main transport implementation 368# --------------------------------------------------------------------------- 369 370class SSU2Transport(Transport): 371 """SSU2 UDP transport implementing the Transport interface. 372 373 Manages: 374 - UDP socket via asyncio DatagramProtocol 375 - Peer state map for established connections 376 - EstablishmentManager for handshake dispatch 377 - PeerTestManager for NAT detection 378 - RelayManager for firewalled peer introductions 379 """ 380 381 def __init__(self, host: str, port: int, 382 static_key: bytes, intro_key: bytes) -> None: 383 self._host = host 384 self._port = port 385 self._static_key = static_key 386 self._static_pub = X25519DH.public_from_private(static_key) 387 self._intro_key = intro_key 388 389 self._token_manager = TokenManager() 390 self._establishment = EstablishmentManager( 391 local_static_key=static_key, 392 local_intro_key=intro_key, 393 token_manager=self._token_manager, 394 ) 395 self._peers = PeerStateMap() 396 self._peer_test = PeerTestManager() 397 self._relay = RelayManager() 398 399 self._protocol: SSU2DatagramProtocol | None = None 400 self._udp_transport: asyncio.DatagramTransport | None = None 401 self._running = False 402 self._actual_port: int = 0 403 self._reachability = ReachabilityStatus.UNKNOWN 404 405 # ------------------------------------------------------------------ 406 # Transport interface 407 # ------------------------------------------------------------------ 408 409 @property 410 def style(self) -> TransportStyle: 411 return TransportStyle.SSU2 412 413 async def start(self) -> None: 414 loop = asyncio.get_running_loop() 415 transport, protocol = await loop.create_datagram_endpoint( 416 lambda: SSU2DatagramProtocol(self), 417 local_addr=(self._host, self._port), 418 ) 419 self._udp_transport = transport 420 self._protocol = protocol 421 422 # Resolve actual port (if 0 was passed) 423 sock = transport.get_extra_info("socket") 424 if sock is not None: 425 self._actual_port = sock.getsockname()[1] 426 else: 427 self._actual_port = self._port 428 429 self._running = True 430 logger.info("SSU2 transport started on %s:%d", self._host, self._actual_port) 431 432 async def stop(self) -> None: 433 if self._udp_transport is not None: 434 self._udp_transport.close() 435 self._udp_transport = None 436 self._protocol = None 437 self._running = False 438 logger.info("SSU2 transport stopped") 439 440 @property 441 def is_running(self) -> bool: 442 return self._running 443 444 async def bid(self, peer_hash: bytes) -> TransportBid: 445 conn = self._peers.get_by_peer(peer_hash) 446 if conn is not None and conn.is_established: 447 return TransportBid(latency_ms=50, transport=self, preference=1) 448 return TransportBid( 449 latency_ms=TransportBid.WILL_NOT_SEND, 450 transport=self, preference=100, 451 ) 452 453 async def send(self, peer_hash: bytes, data: bytes) -> bool: 454 conn = self._peers.get_by_peer(peer_hash) 455 if conn is None or not conn.is_established: 456 return False 457 from i2p_transport.ssu2_payload import I2NPBlock 458 blocks = [I2NPBlock(i2np_data=data)] 459 packet = conn.encrypt_data_packet(blocks) 460 if self._protocol is not None: 461 self._protocol.send_to(packet, conn.remote_address) 462 return True 463 return False 464 465 @property 466 def reachability(self) -> ReachabilityStatus: 467 return self._reachability 468 469 @property 470 def current_address(self) -> dict | None: 471 if not self._running: 472 return None 473 return { 474 "style": "SSU2", 475 "host": self._host, 476 "port": self._actual_port, 477 "intro_key": self._intro_key.hex(), 478 } 479 480 # ------------------------------------------------------------------ 481 # Packet handlers 482 # ------------------------------------------------------------------ 483 484 def _handle_handshake_packet(self, data: bytes, addr: tuple[str, int]) -> None: 485 """Dispatch a handshake packet to the EstablishmentManager.""" 486 logger.debug("Handshake packet from %s (%d bytes)", addr, len(data)) 487 # Parse dest_conn_id to find pending handshake 488 dest_conn_id = struct.unpack("!Q", data[:8])[0] 489 hs = self._establishment.get_pending(dest_conn_id) 490 if hs is None: 491 # New inbound handshake 492 hs = self._establishment.create_inbound_handshake() 493 self._establishment.add_pending(hs._src_conn_id, hs) 494 # Further processing would continue the Noise_XK state machine 495 496 def _handle_data_packet(self, data: bytes, addr: tuple[str, int]) -> None: 497 """Dispatch a data packet to the appropriate SSU2Connection.""" 498 conn = self._peers.get_by_address(addr) 499 if conn is None: 500 logger.debug("Data packet from unknown address %s, dropping", addr) 501 return 502 try: 503 pkt_num, blocks = conn.decrypt_data_packet(data) 504 logger.debug("Received data packet #%d with %d blocks from %s", 505 pkt_num, len(blocks), addr) 506 except Exception: 507 logger.debug("Failed to decrypt data packet from %s", addr, exc_info=True)