A Python port of the Invisible Internet Project (I2P)
1"""Layer 2: STUN NAT probe — detect NAT type via RFC 8489 Binding Requests.
2
3Sends STUN Binding Requests to multiple public servers from the same socket
4to determine NAT presence and type (open, cone, symmetric).
5Looks like normal WebRTC/VoIP traffic to network observers.
6"""
7
8from __future__ import annotations
9
10import logging
11import os
12import socket
13import struct
14import time
15from dataclasses import dataclass, field
16
17log = logging.getLogger(__name__)
18
19# STUN constants (RFC 8489)
20STUN_BINDING_REQUEST = 0x0001
21STUN_BINDING_RESPONSE = 0x0101
22STUN_MAGIC_COOKIE = 0x2112A442
23
24ATTR_MAPPED_ADDRESS = 0x0001
25ATTR_XOR_MAPPED_ADDRESS = 0x0020
26
27# Public STUN servers — chosen for global accessibility including behind GFW
28DEFAULT_SERVERS = [
29 "stun.cloudflare.com",
30 "stun.nextcloud.com",
31 "stun.apple.com",
32 "stun.l.google.com",
33]
34
35
36@dataclass
37class StunResult:
38 """Result from a single STUN Binding Request."""
39
40 server: str
41 external_ip: str
42 external_port: int
43 rtt_ms: float
44
45
46@dataclass
47class NatProbeResult:
48 """Aggregated NAT detection result."""
49
50 nat_present: bool = False
51 nat_type: str = "unknown" # open, cone, symmetric, unknown
52 external_ip: str | None = None
53 external_port: int | None = None
54 servers_reached: int = 0
55 error: str | None = None
56
57
58def build_binding_request() -> tuple[bytes, bytes]:
59 """Build a STUN Binding Request packet.
60
61 Returns (packet, transaction_id) where packet is 20 bytes:
62 - 2 bytes: message type (0x0001)
63 - 2 bytes: message length (0x0000, no attributes)
64 - 4 bytes: magic cookie (0x2112A442)
65 - 12 bytes: random transaction ID
66 """
67 txn_id = os.urandom(12)
68 packet = struct.pack("!HHI", STUN_BINDING_REQUEST, 0, STUN_MAGIC_COOKIE)
69 packet += txn_id
70 return packet, txn_id
71
72
73def parse_stun_response(data: bytes, txn_id: bytes) -> tuple[str, int] | None:
74 """Parse a STUN Binding Response, extract mapped address.
75
76 Returns (ip, port) or None on any parse failure.
77 Tries XOR-MAPPED-ADDRESS first, falls back to MAPPED-ADDRESS.
78 """
79 if len(data) < 20:
80 return None
81
82 msg_type, msg_len, cookie = struct.unpack("!HHI", data[0:8])
83 resp_txn_id = data[8:20]
84
85 if msg_type != STUN_BINDING_RESPONSE:
86 return None
87 if resp_txn_id != txn_id:
88 return None
89
90 # Parse attributes
91 offset = 20
92 xor_result = None
93 mapped_result = None
94
95 while offset + 4 <= len(data):
96 attr_type, attr_len = struct.unpack("!HH", data[offset : offset + 4])
97 attr_data = data[offset + 4 : offset + 4 + attr_len]
98
99 if len(attr_data) < attr_len:
100 break
101
102 if attr_type == ATTR_XOR_MAPPED_ADDRESS and attr_len >= 8:
103 _, family, x_port = struct.unpack("!BBH", attr_data[0:4])
104 x_addr = struct.unpack("!I", attr_data[4:8])[0]
105 port = x_port ^ (STUN_MAGIC_COOKIE >> 16)
106 addr = x_addr ^ STUN_MAGIC_COOKIE
107 ip = socket.inet_ntoa(struct.pack("!I", addr))
108 xor_result = (ip, port)
109
110 elif attr_type == ATTR_MAPPED_ADDRESS and attr_len >= 8:
111 _, family, port = struct.unpack("!BBH", attr_data[0:4])
112 addr = struct.unpack("!I", attr_data[4:8])[0]
113 ip = socket.inet_ntoa(struct.pack("!I", addr))
114 mapped_result = (ip, port)
115
116 # Advance to next attribute (padded to 4-byte boundary)
117 offset += 4 + ((attr_len + 3) & ~3)
118
119 return xor_result or mapped_result
120
121
122def stun_request(
123 server: str,
124 port: int = 3478,
125 timeout: float = 3.0,
126 sock: socket.socket | None = None,
127) -> StunResult | None:
128 """Send a STUN Binding Request and parse the response.
129
130 If sock is provided, uses it (for NAT type detection with shared socket).
131 Returns StunResult or None on timeout/failure.
132 """
133 packet, txn_id = build_binding_request()
134 own_sock = sock is None
135
136 try:
137 if own_sock:
138 sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
139 sock.settimeout(timeout)
140
141 assert sock is not None
142 start = time.monotonic()
143 sock.sendto(packet, (server, port))
144 data, _ = sock.recvfrom(1024)
145 rtt = (time.monotonic() - start) * 1000
146
147 result = parse_stun_response(data, txn_id)
148 if result is None:
149 return None
150
151 ip, ext_port = result
152 return StunResult(server=server, external_ip=ip, external_port=ext_port, rtt_ms=rtt)
153
154 except (socket.timeout, OSError) as e:
155 log.debug("STUN request to %s:%d failed: %s", server, port, e)
156 return None
157 finally:
158 if own_sock and sock is not None:
159 sock.close()
160
161
162def detect_nat_type(
163 servers: list[str] | None = None,
164 port: int = 3478,
165 timeout: float = 3.0,
166) -> NatProbeResult:
167 """Detect NAT type by querying multiple STUN servers from one socket.
168
169 Logic:
170 - 0 responses → error
171 - 1 response → nat_present=True, nat_type="unknown"
172 - 2+ responses, external == local → nat_present=False, nat_type="open"
173 - 2+ responses, same external mapping → cone NAT
174 - 2+ responses, different mappings → symmetric NAT
175 """
176 if servers is None:
177 servers = DEFAULT_SERVERS[:2]
178
179 sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
180 sock.settimeout(timeout)
181
182 try:
183 # Bind to any port so we can compare local vs external
184 sock.bind(("0.0.0.0", 0))
185 local_ip, local_port = sock.getsockname()
186
187 results: list[StunResult] = []
188 for server in servers:
189 r = stun_request(server, port, timeout, sock=sock)
190 if r is not None:
191 results.append(r)
192
193 if not results:
194 return NatProbeResult(error="No STUN servers responded")
195
196 first = results[0]
197 probe = NatProbeResult(
198 external_ip=first.external_ip,
199 external_port=first.external_port,
200 servers_reached=len(results),
201 )
202
203 if len(results) < 2:
204 probe.nat_present = True
205 probe.nat_type = "unknown"
206 return probe
207
208 # Compare local vs external address
209 if first.external_ip == local_ip and first.external_port == local_port:
210 probe.nat_present = False
211 probe.nat_type = "open"
212 return probe
213
214 # NAT is present — check if mapping is consistent across servers
215 probe.nat_present = True
216 all_same = all(
217 r.external_ip == first.external_ip and r.external_port == first.external_port
218 for r in results
219 )
220 probe.nat_type = "cone" if all_same else "symmetric"
221 return probe
222
223 finally:
224 sock.close()