"""Layer 2: STUN NAT probe — detect NAT type via RFC 8489 Binding Requests. Sends STUN Binding Requests to multiple public servers from the same socket to determine NAT presence and type (open, cone, symmetric). Looks like normal WebRTC/VoIP traffic to network observers. """ from __future__ import annotations import logging import os import socket import struct import time from dataclasses import dataclass, field log = logging.getLogger(__name__) # STUN constants (RFC 8489) STUN_BINDING_REQUEST = 0x0001 STUN_BINDING_RESPONSE = 0x0101 STUN_MAGIC_COOKIE = 0x2112A442 ATTR_MAPPED_ADDRESS = 0x0001 ATTR_XOR_MAPPED_ADDRESS = 0x0020 # Public STUN servers — chosen for global accessibility including behind GFW DEFAULT_SERVERS = [ "stun.cloudflare.com", "stun.nextcloud.com", "stun.apple.com", "stun.l.google.com", ] @dataclass class StunResult: """Result from a single STUN Binding Request.""" server: str external_ip: str external_port: int rtt_ms: float @dataclass class NatProbeResult: """Aggregated NAT detection result.""" nat_present: bool = False nat_type: str = "unknown" # open, cone, symmetric, unknown external_ip: str | None = None external_port: int | None = None servers_reached: int = 0 error: str | None = None def build_binding_request() -> tuple[bytes, bytes]: """Build a STUN Binding Request packet. Returns (packet, transaction_id) where packet is 20 bytes: - 2 bytes: message type (0x0001) - 2 bytes: message length (0x0000, no attributes) - 4 bytes: magic cookie (0x2112A442) - 12 bytes: random transaction ID """ txn_id = os.urandom(12) packet = struct.pack("!HHI", STUN_BINDING_REQUEST, 0, STUN_MAGIC_COOKIE) packet += txn_id return packet, txn_id def parse_stun_response(data: bytes, txn_id: bytes) -> tuple[str, int] | None: """Parse a STUN Binding Response, extract mapped address. Returns (ip, port) or None on any parse failure. Tries XOR-MAPPED-ADDRESS first, falls back to MAPPED-ADDRESS. """ if len(data) < 20: return None msg_type, msg_len, cookie = struct.unpack("!HHI", data[0:8]) resp_txn_id = data[8:20] if msg_type != STUN_BINDING_RESPONSE: return None if resp_txn_id != txn_id: return None # Parse attributes offset = 20 xor_result = None mapped_result = None while offset + 4 <= len(data): attr_type, attr_len = struct.unpack("!HH", data[offset : offset + 4]) attr_data = data[offset + 4 : offset + 4 + attr_len] if len(attr_data) < attr_len: break if attr_type == ATTR_XOR_MAPPED_ADDRESS and attr_len >= 8: _, family, x_port = struct.unpack("!BBH", attr_data[0:4]) x_addr = struct.unpack("!I", attr_data[4:8])[0] port = x_port ^ (STUN_MAGIC_COOKIE >> 16) addr = x_addr ^ STUN_MAGIC_COOKIE ip = socket.inet_ntoa(struct.pack("!I", addr)) xor_result = (ip, port) elif attr_type == ATTR_MAPPED_ADDRESS and attr_len >= 8: _, family, port = struct.unpack("!BBH", attr_data[0:4]) addr = struct.unpack("!I", attr_data[4:8])[0] ip = socket.inet_ntoa(struct.pack("!I", addr)) mapped_result = (ip, port) # Advance to next attribute (padded to 4-byte boundary) offset += 4 + ((attr_len + 3) & ~3) return xor_result or mapped_result def stun_request( server: str, port: int = 3478, timeout: float = 3.0, sock: socket.socket | None = None, ) -> StunResult | None: """Send a STUN Binding Request and parse the response. If sock is provided, uses it (for NAT type detection with shared socket). Returns StunResult or None on timeout/failure. """ packet, txn_id = build_binding_request() own_sock = sock is None try: if own_sock: sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) sock.settimeout(timeout) assert sock is not None start = time.monotonic() sock.sendto(packet, (server, port)) data, _ = sock.recvfrom(1024) rtt = (time.monotonic() - start) * 1000 result = parse_stun_response(data, txn_id) if result is None: return None ip, ext_port = result return StunResult(server=server, external_ip=ip, external_port=ext_port, rtt_ms=rtt) except (socket.timeout, OSError) as e: log.debug("STUN request to %s:%d failed: %s", server, port, e) return None finally: if own_sock and sock is not None: sock.close() def detect_nat_type( servers: list[str] | None = None, port: int = 3478, timeout: float = 3.0, ) -> NatProbeResult: """Detect NAT type by querying multiple STUN servers from one socket. Logic: - 0 responses → error - 1 response → nat_present=True, nat_type="unknown" - 2+ responses, external == local → nat_present=False, nat_type="open" - 2+ responses, same external mapping → cone NAT - 2+ responses, different mappings → symmetric NAT """ if servers is None: servers = DEFAULT_SERVERS[:2] sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) sock.settimeout(timeout) try: # Bind to any port so we can compare local vs external sock.bind(("0.0.0.0", 0)) local_ip, local_port = sock.getsockname() results: list[StunResult] = [] for server in servers: r = stun_request(server, port, timeout, sock=sock) if r is not None: results.append(r) if not results: return NatProbeResult(error="No STUN servers responded") first = results[0] probe = NatProbeResult( external_ip=first.external_ip, external_port=first.external_port, servers_reached=len(results), ) if len(results) < 2: probe.nat_present = True probe.nat_type = "unknown" return probe # Compare local vs external address if first.external_ip == local_ip and first.external_port == local_port: probe.nat_present = False probe.nat_type = "open" return probe # NAT is present — check if mapping is consistent across servers probe.nat_present = True all_same = all( r.external_ip == first.external_ip and r.external_port == first.external_port for r in results ) probe.nat_type = "cone" if all_same else "symmetric" return probe finally: sock.close()