# A Python port of the Invisible Internet Project (I2P)
"""Tests for STUN NAT probe — TDD: tests before implementation."""
2
3import os
4import socket
5import struct
6from unittest.mock import MagicMock, patch
7
8import pytest
9
10from i2p_apps.setup.stun_probe import (
11 ATTR_MAPPED_ADDRESS,
12 ATTR_XOR_MAPPED_ADDRESS,
13 STUN_BINDING_REQUEST,
14 STUN_BINDING_RESPONSE,
15 STUN_MAGIC_COOKIE,
16 NatProbeResult,
17 StunResult,
18 build_binding_request,
19 parse_stun_response,
20 stun_request,
21 detect_nat_type,
22)
23
24
class TestBuildBindingRequest:
    """Wire-format checks for the packet produced by build_binding_request()."""

    def test_returns_20_bytes(self):
        """The request is exactly 20 bytes long."""
        request, _ = build_binding_request()
        assert len(request) == 20

    def test_message_type_is_binding_request(self):
        """Bytes 0-1 carry the Binding Request message type (network order)."""
        request, _ = build_binding_request()
        (message_type,) = struct.unpack("!H", request[:2])
        assert message_type == STUN_BINDING_REQUEST

    def test_message_length_is_zero(self):
        """Bytes 2-3 carry a message length of zero."""
        request, _ = build_binding_request()
        (message_length,) = struct.unpack("!H", request[2:4])
        assert message_length == 0

    def test_magic_cookie(self):
        """Bytes 4-7 carry the STUN magic cookie."""
        request, _ = build_binding_request()
        (magic,) = struct.unpack("!I", request[4:8])
        assert magic == STUN_MAGIC_COOKIE

    def test_transaction_id_is_12_bytes(self):
        """The returned transaction ID is 12 bytes long."""
        _, transaction_id = build_binding_request()
        assert len(transaction_id) == 12

    def test_transaction_id_is_random(self):
        """Two consecutive requests carry different transaction IDs."""
        _, first_id = build_binding_request()
        _, second_id = build_binding_request()
        assert first_id != second_id
53
54
def _build_stun_response(ext_ip: str, ext_port: int, txn_id: bytes,
                         use_xor: bool = True) -> bytes:
    """Build a mock STUN Binding Response carrying one IPv4 address attribute.

    When ``use_xor`` is true (the default) the address is encoded as
    XOR-MAPPED-ADDRESS: the port is XORed with the upper 16 bits of the magic
    cookie and the address with the whole cookie.  Otherwise the port and
    address are stored unmodified in a MAPPED-ADDRESS attribute.

    Returns the header (type, length, cookie, ``txn_id``) followed by the
    attribute.
    """
    (addr,) = struct.unpack("!I", socket.inet_aton(ext_ip))
    port = ext_port

    # Start from the plain encoding and obfuscate only when asked to.
    attr_type = ATTR_MAPPED_ADDRESS
    if use_xor:
        attr_type = ATTR_XOR_MAPPED_ADDRESS
        port ^= STUN_MAGIC_COOKIE >> 16
        addr ^= STUN_MAGIC_COOKIE

    # Attribute value: reserved(1) + family(1) + port(2) + addr(4) = 8 bytes,
    # preceded by type(2) + length(2) for 12 bytes in total.
    value = struct.pack("!BBHI", 0x00, 0x01, port, addr)
    attribute = struct.pack("!HH", attr_type, len(value)) + value

    # Header: type(2) + length(2) + cookie(4), then txn_id(12).
    fixed_header = struct.pack(
        "!HHI", STUN_BINDING_RESPONSE, len(attribute), STUN_MAGIC_COOKIE
    )
    return fixed_header + txn_id + attribute
80
81
class TestParseStunResponse:
    """Checks for parse_stun_response() against hand-built responses."""

    def test_parse_xor_mapped_address(self):
        """An XOR-MAPPED-ADDRESS attribute is decoded to the original ip/port."""
        transaction = os.urandom(12)
        packet = _build_stun_response(
            "198.51.100.30", 54321, transaction, use_xor=True
        )
        parsed = parse_stun_response(packet, transaction)
        assert parsed is not None
        address, port_number = parsed
        assert address == "198.51.100.30"
        assert port_number == 54321

    def test_parse_mapped_address_fallback(self):
        """A plain MAPPED-ADDRESS attribute is accepted as well."""
        transaction = os.urandom(12)
        packet = _build_stun_response("10.0.0.1", 1234, transaction, use_xor=False)
        parsed = parse_stun_response(packet, transaction)
        assert parsed is not None
        address, port_number = parsed
        assert address == "10.0.0.1"
        assert port_number == 1234

    def test_wrong_txn_id_returns_none(self):
        """A response whose transaction ID differs from the expected one is rejected."""
        sent_id = os.urandom(12)
        other_id = os.urandom(12)
        packet = _build_stun_response("1.2.3.4", 5678, sent_id)
        assert parse_stun_response(packet, other_id) is None

    def test_short_data_returns_none(self):
        """Ten bytes of input are rejected."""
        truncated = b"\x00" * 10
        assert parse_stun_response(truncated, os.urandom(12)) is None

    def test_wrong_message_type_returns_none(self):
        """A packet whose message type is not Binding Response is rejected."""
        transaction = os.urandom(12)
        packet = _build_stun_response("1.2.3.4", 5678, transaction)
        # Keep everything after the type field, but overwrite the type itself.
        tampered = struct.pack("!H", 0x0111) + packet[2:]
        assert parse_stun_response(tampered, transaction) is None
119
120
class TestStunRequest:
    """Checks for stun_request() using an injected mock socket."""

    def test_successful_request(self):
        """A valid reply on the socket yields the external ip and port."""
        transaction = os.urandom(12)
        reply = _build_stun_response("203.0.113.42", 12345, transaction)

        fake_sock = MagicMock(
            **{"recvfrom.return_value": (reply, ("stun.example.com", 3478))}
        )

        # Pin the transaction ID so the canned reply matches the request.
        pinned_request = (b"\x00" * 20, transaction)
        with patch(
            "i2p_apps.setup.stun_probe.build_binding_request",
            return_value=pinned_request,
        ):
            outcome = stun_request("stun.example.com", 3478, sock=fake_sock)

        assert outcome is not None
        assert outcome.external_ip == "203.0.113.42"
        assert outcome.external_port == 12345

    def test_timeout_returns_none(self):
        """A socket timeout while receiving results in None."""
        fake_sock = MagicMock(**{"recvfrom.side_effect": socket.timeout()})

        outcome = stun_request(
            "stun.example.com", 3478, timeout=0.1, sock=fake_sock
        )
        assert outcome is None
143
144
class TestDetectNatType:
    """Checks for detect_nat_type() with stun_request and the socket module mocked.

    Every test patches the same two names inside ``i2p_apps.setup.stun_probe``;
    the shared setup lives in ``_run_detection`` so each test only states the
    local address, the canned STUN answers and the expected classification.
    """

    # Patch targets, spelled as the module under test looks them up.
    _STUN_REQUEST = "i2p_apps.setup.stun_probe.stun_request"
    _SOCKET_MODULE = "i2p_apps.setup.stun_probe.socket"

    def _mock_stun_request(self, results: dict):
        """Create a side_effect function that returns results by server.

        Servers that are not keys of ``results`` yield None.
        """
        def side_effect(server, port=3478, timeout=3.0, sock=None):
            return results.get(server)
        return side_effect

    @staticmethod
    def _fake_socket_module(local_ip: str, local_port: int = 5000) -> MagicMock:
        """Build a stand-in for the ``socket`` module used by stun_probe.

        ``socket.socket()`` on the stand-in returns a mock whose
        ``getsockname()`` reports ``(local_ip, local_port)``.  ``AF_INET`` and
        ``SOCK_DGRAM`` keep their real values.
        """
        fake_sock = MagicMock()
        fake_sock.getsockname.return_value = (local_ip, local_port)

        fake_module = MagicMock()
        fake_module.socket.return_value = fake_sock
        fake_module.AF_INET = socket.AF_INET
        fake_module.SOCK_DGRAM = socket.SOCK_DGRAM
        return fake_module

    def _run_detection(self, local_ip: str, **stun_request_mock):
        """Return detect_nat_type() run with both dependencies patched.

        Args:
            local_ip: Address the mocked socket reports from ``getsockname()``.
            **stun_request_mock: Configuration forwarded to the mock replacing
                ``stun_request`` (for example ``side_effect=`` or
                ``return_value=``).
        """
        with patch(self._STUN_REQUEST, **stun_request_mock):
            with patch(self._SOCKET_MODULE, self._fake_socket_module(local_ip)):
                return detect_nat_type()

    def test_no_nat(self):
        """Same external as local → open internet."""
        answers = {
            "stun.cloudflare.com": StunResult("stun.cloudflare.com", "192.168.1.1", 5000, 10.0),
            "stun.nextcloud.com": StunResult("stun.nextcloud.com", "192.168.1.1", 5000, 12.0),
        }
        result = self._run_detection(
            "192.168.1.1", side_effect=self._mock_stun_request(answers)
        )
        assert result.nat_present is False
        assert result.nat_type == "open"

    def test_cone_nat(self):
        """Same mapping to both servers → cone NAT."""
        answers = {
            "stun.cloudflare.com": StunResult("stun.cloudflare.com", "203.0.113.42", 54321, 10.0),
            "stun.nextcloud.com": StunResult("stun.nextcloud.com", "203.0.113.42", 54321, 12.0),
        }
        result = self._run_detection(
            "192.168.1.100", side_effect=self._mock_stun_request(answers)
        )
        assert result.nat_present is True
        assert result.nat_type == "cone"

    def test_symmetric_nat(self):
        """Different mapping per server → symmetric NAT."""
        answers = {
            "stun.cloudflare.com": StunResult("stun.cloudflare.com", "203.0.113.42", 54321, 10.0),
            "stun.nextcloud.com": StunResult("stun.nextcloud.com", "203.0.113.42", 54322, 12.0),
        }
        result = self._run_detection(
            "192.168.1.100", side_effect=self._mock_stun_request(answers)
        )
        assert result.nat_present is True
        assert result.nat_type == "symmetric"

    def test_single_server_only(self):
        """Only one server responds → unknown NAT type."""
        answers = {
            "stun.cloudflare.com": StunResult("stun.cloudflare.com", "203.0.113.42", 54321, 10.0),
        }
        result = self._run_detection(
            "192.168.1.100", side_effect=self._mock_stun_request(answers)
        )
        assert result.nat_present is True
        assert result.nat_type == "unknown"

    def test_no_servers_respond(self):
        """No servers respond → error."""
        result = self._run_detection("192.168.1.100", return_value=None)
        assert result.error is not None