A Python port of the Invisible Internet Project (I2P)
at main 224 lines 6.7 kB view raw
"""Layer 2: STUN NAT probe — detect NAT type via RFC 8489 Binding Requests.

Sends STUN Binding Requests to multiple public servers from the same socket
to determine NAT presence and type (open, cone, symmetric).
Looks like normal WebRTC/VoIP traffic to network observers.
"""

from __future__ import annotations

import logging
import os
import socket
import struct
import time
from dataclasses import dataclass, field

log = logging.getLogger(__name__)

# STUN constants (RFC 8489)
STUN_BINDING_REQUEST = 0x0001
STUN_BINDING_RESPONSE = 0x0101
STUN_MAGIC_COOKIE = 0x2112A442

ATTR_MAPPED_ADDRESS = 0x0001
ATTR_XOR_MAPPED_ADDRESS = 0x0020

# Public STUN servers — chosen for global accessibility including behind GFW
DEFAULT_SERVERS = [
    "stun.cloudflare.com",
    "stun.nextcloud.com",
    "stun.apple.com",
    "stun.l.google.com",
]

# Fixed STUN header: type (2) + length (2) + magic cookie (4) + transaction ID (12).
_STUN_HEADER_LEN = 20

# Address family codes used inside (XOR-)MAPPED-ADDRESS attributes.
_FAMILY_IPV4 = 0x01
_FAMILY_IPV6 = 0x02

# Receive buffer size for a single STUN datagram.
_RECV_BUFSIZE = 2048


@dataclass
class StunResult:
    """Result from a single STUN Binding Request."""

    server: str  # server name exactly as passed to stun_request()
    external_ip: str  # mapped address reported by the server
    external_port: int  # mapped port reported by the server
    rtt_ms: float  # time from send to matching response, in milliseconds


@dataclass
class NatProbeResult:
    """Aggregated NAT detection result."""

    nat_present: bool = False
    nat_type: str = "unknown"  # open, cone, symmetric, unknown
    external_ip: str | None = None
    external_port: int | None = None
    servers_reached: int = 0
    error: str | None = None


def build_binding_request() -> tuple[bytes, bytes]:
    """Build a STUN Binding Request packet.

    Returns (packet, transaction_id) where packet is 20 bytes:
    - 2 bytes: message type (0x0001)
    - 2 bytes: message length (0x0000, no attributes)
    - 4 bytes: magic cookie (0x2112A442)
    - 12 bytes: random transaction ID
    """
    txn_id = os.urandom(12)
    header = struct.pack("!HHI", STUN_BINDING_REQUEST, 0, STUN_MAGIC_COOKIE)
    return header + txn_id, txn_id


def _decode_mapped_address(
    value: bytes, txn_id: bytes, *, xored: bool
) -> tuple[str, int] | None:
    """Decode the value of a MAPPED-ADDRESS / XOR-MAPPED-ADDRESS attribute.

    ``value`` is the attribute payload (without the 4-byte attribute header).
    When ``xored`` is true the port is XORed with the top 16 bits of the magic
    cookie and the address with the cookie (IPv4) or cookie + transaction ID
    (IPv6).  Returns (ip, port), or None for a short payload or an unknown
    address family.
    """
    if len(value) < 8:
        return None

    _reserved, family, port = struct.unpack_from("!BBH", value, 0)
    if family == _FAMILY_IPV4:
        address_family = socket.AF_INET
        raw = value[4:8]
    elif family == _FAMILY_IPV6:
        if len(value) < 20:
            return None
        address_family = socket.AF_INET6
        raw = value[4:20]
    else:
        return None

    if xored:
        port ^= STUN_MAGIC_COOKIE >> 16
        key = struct.pack("!I", STUN_MAGIC_COOKIE) + txn_id
        raw = bytes(a ^ b for a, b in zip(raw, key))

    try:
        return socket.inet_ntop(address_family, raw), port
    except (OSError, ValueError):
        return None


def parse_stun_response(data: bytes, txn_id: bytes) -> tuple[str, int] | None:
    """Parse a STUN Binding Response, extract mapped address.

    Returns (ip, port) or None on any parse failure.
    Tries XOR-MAPPED-ADDRESS first, falls back to MAPPED-ADDRESS.

    A response is rejected (None) when it is shorter than the STUN header, is
    not a Binding Response, carries the wrong magic cookie, or echoes a
    different transaction ID.  Only the bytes covered by the header's message
    length are scanned for attributes.
    """
    if len(data) < _STUN_HEADER_LEN:
        return None

    msg_type, msg_len, cookie = struct.unpack_from("!HHI", data, 0)
    if msg_type != STUN_BINDING_RESPONSE:
        return None
    if cookie != STUN_MAGIC_COOKIE:
        return None
    if data[8:_STUN_HEADER_LEN] != txn_id:
        return None

    # Never read past the declared message body, nor past the actual datagram.
    end = min(len(data), _STUN_HEADER_LEN + msg_len)

    offset = _STUN_HEADER_LEN
    xor_result = None
    mapped_result = None

    while offset + 4 <= end:
        attr_type, attr_len = struct.unpack_from("!HH", data, offset)
        value_start = offset + 4
        value_end = value_start + attr_len
        if value_end > end:
            # Truncated attribute: stop, keep whatever was decoded so far.
            break
        value = data[value_start:value_end]

        if attr_type == ATTR_XOR_MAPPED_ADDRESS:
            xor_result = _decode_mapped_address(value, txn_id, xored=True) or xor_result
        elif attr_type == ATTR_MAPPED_ADDRESS:
            mapped_result = (
                _decode_mapped_address(value, txn_id, xored=False) or mapped_result
            )

        # Advance to next attribute (padded to 4-byte boundary)
        offset = value_start + ((attr_len + 3) & ~3)

    return xor_result or mapped_result


def stun_request(
    server: str,
    port: int = 3478,
    timeout: float = 3.0,
    sock: socket.socket | None = None,
) -> StunResult | None:
    """Send a STUN Binding Request and parse the response.

    If sock is provided, uses it (for NAT type detection with shared socket);
    its previous timeout is restored before returning and it is not closed.
    Otherwise a temporary IPv4 UDP socket is created and closed.

    Datagrams that do not echo this request's transaction ID (for example a
    late reply to an earlier request on a shared socket) are discarded and the
    wait continues until ``timeout`` seconds have elapsed in total.

    Returns StunResult or None on timeout/failure.
    """
    packet, txn_id = build_binding_request()
    own_sock = sock is None
    previous_timeout = sock.gettimeout() if sock is not None else None

    try:
        if sock is None:
            sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)

        start = time.monotonic()
        deadline = start + timeout
        sock.sendto(packet, (server, port))

        while True:
            remaining = deadline - time.monotonic()
            if remaining <= 0:
                raise socket.timeout("timed out")
            # Shrink the timeout each round so stray packets cannot extend the wait.
            sock.settimeout(remaining)
            data, _ = sock.recvfrom(_RECV_BUFSIZE)
            rtt = (time.monotonic() - start) * 1000

            mapped = parse_stun_response(data, txn_id)
            if mapped is not None:
                ip, ext_port = mapped
                return StunResult(
                    server=server, external_ip=ip, external_port=ext_port, rtt_ms=rtt
                )

            if data[8:_STUN_HEADER_LEN] == txn_id:
                # This was the answer to our request but it carries no usable
                # address (error response, malformed, ...): fail fast.
                log.debug("STUN response from %s:%d had no mapped address", server, port)
                return None
            # Otherwise the datagram belongs to some other transaction: keep waiting.

    except OSError as e:
        # socket.timeout and socket.gaierror are both OSError subclasses.
        log.debug("STUN request to %s:%d failed: %s", server, port, e)
        return None
    finally:
        if sock is not None:
            if own_sock:
                sock.close()
            else:
                try:
                    sock.settimeout(previous_timeout)
                except OSError:
                    # The caller's socket was closed underneath us; nothing to restore.
                    pass


def _local_ip_toward(host: str, port: int) -> str | None:
    """Return the local source IPv4 address the OS would use to reach host:port.

    Uses a throwaway connected UDP socket; connect() on UDP only selects a
    route and sends no packets.  Returns None if the lookup fails.
    """
    try:
        with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as probe:
            probe.connect((host, port))
            return probe.getsockname()[0]
    except OSError:
        return None


def _classify_nat(
    results: list[StunResult], local_ip: str | None, local_port: int
) -> NatProbeResult:
    """Turn the collected STUN results into a NatProbeResult (no I/O)."""
    if not results:
        return NatProbeResult(error="No STUN servers responded")

    first = results[0]
    probe = NatProbeResult(
        external_ip=first.external_ip,
        external_port=first.external_port,
        servers_reached=len(results),
    )

    if len(results) < 2:
        # A single observation cannot distinguish NAT types.
        probe.nat_present = True
        probe.nat_type = "unknown"
        return probe

    # Compare local vs external address
    if (first.external_ip, first.external_port) == (local_ip, local_port):
        probe.nat_present = False
        probe.nat_type = "open"
        return probe

    # NAT is present — check if mapping is consistent across servers
    probe.nat_present = True
    mappings = {(r.external_ip, r.external_port) for r in results}
    probe.nat_type = "cone" if len(mappings) == 1 else "symmetric"
    return probe


def detect_nat_type(
    servers: list[str] | None = None,
    port: int = 3478,
    timeout: float = 3.0,
) -> NatProbeResult:
    """Detect NAT type by querying multiple STUN servers from one socket.

    ``servers`` defaults to the first two entries of DEFAULT_SERVERS; ``port``
    and ``timeout`` (seconds, per server) are passed to stun_request().

    Logic:
    - 0 responses → error
    - 1 response → nat_present=True, nat_type="unknown"
    - 2+ responses, external == local → nat_present=False, nat_type="open"
    - 2+ responses, same external mapping → cone NAT
    - 2+ responses, different mappings → symmetric NAT

    The local IP used for the "external == local" comparison is the source
    address the OS selects toward the first responding server; the probe
    socket itself is bound to the wildcard address, whose getsockname() IP
    ("0.0.0.0") can never equal an external address.

    Failure to create or bind the UDP socket is reported through the
    ``error`` field of the returned result.
    """
    if servers is None:
        servers = DEFAULT_SERVERS[:2]

    try:
        sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
    except OSError as e:
        return NatProbeResult(error=f"Could not create UDP socket: {e}")

    with sock:
        try:
            # Bind to any port so we can compare local vs external
            sock.bind(("0.0.0.0", 0))
            local_port = sock.getsockname()[1]
        except OSError as e:
            return NatProbeResult(error=f"Could not bind UDP socket: {e}")

        results: list[StunResult] = []
        for server in servers:
            r = stun_request(server, port, timeout, sock=sock)
            if r is not None:
                results.append(r)

        # The local IP only matters once two or more servers have answered.
        local_ip = _local_ip_toward(results[0].server, port) if len(results) >= 2 else None
        return _classify_nat(results, local_ip, local_port)