A Python port of the Invisible Internet Project (I2P)
at main 237 lines 9.4 kB view raw
1"""Tests for STUN NAT probe — TDD: tests before implementation.""" 2 3import os 4import socket 5import struct 6from unittest.mock import MagicMock, patch 7 8import pytest 9 10from i2p_apps.setup.stun_probe import ( 11 ATTR_MAPPED_ADDRESS, 12 ATTR_XOR_MAPPED_ADDRESS, 13 STUN_BINDING_REQUEST, 14 STUN_BINDING_RESPONSE, 15 STUN_MAGIC_COOKIE, 16 NatProbeResult, 17 StunResult, 18 build_binding_request, 19 parse_stun_response, 20 stun_request, 21 detect_nat_type, 22) 23 24 25class TestBuildBindingRequest: 26 def test_returns_20_bytes(self): 27 packet, txn_id = build_binding_request() 28 assert len(packet) == 20 29 30 def test_message_type_is_binding_request(self): 31 packet, _ = build_binding_request() 32 msg_type = struct.unpack("!H", packet[0:2])[0] 33 assert msg_type == STUN_BINDING_REQUEST 34 35 def test_message_length_is_zero(self): 36 packet, _ = build_binding_request() 37 msg_len = struct.unpack("!H", packet[2:4])[0] 38 assert msg_len == 0 39 40 def test_magic_cookie(self): 41 packet, _ = build_binding_request() 42 cookie = struct.unpack("!I", packet[4:8])[0] 43 assert cookie == STUN_MAGIC_COOKIE 44 45 def test_transaction_id_is_12_bytes(self): 46 _, txn_id = build_binding_request() 47 assert len(txn_id) == 12 48 49 def test_transaction_id_is_random(self): 50 _, txn1 = build_binding_request() 51 _, txn2 = build_binding_request() 52 assert txn1 != txn2 53 54 55def _build_stun_response(ext_ip: str, ext_port: int, txn_id: bytes, 56 use_xor: bool = True) -> bytes: 57 """Build a mock STUN Binding Response with XOR-MAPPED-ADDRESS.""" 58 ip_bytes = socket.inet_aton(ext_ip) 59 ip_int = struct.unpack("!I", ip_bytes)[0] 60 61 if use_xor: 62 x_port = ext_port ^ (STUN_MAGIC_COOKIE >> 16) 63 x_addr = ip_int ^ STUN_MAGIC_COOKIE 64 attr_type = ATTR_XOR_MAPPED_ADDRESS 65 else: 66 x_port = ext_port 67 x_addr = ip_int 68 attr_type = ATTR_MAPPED_ADDRESS 69 70 # Attribute: type(2) + length(2) + reserved(1) + family(1) + port(2) + addr(4) = 12 bytes 71 attr = struct.pack("!HH", attr_type, 8) 72 attr += struct.pack("!BBH", 0x00, 0x01, x_port) 73 attr += struct.pack("!I", x_addr) 74 75 # Header: type(2) + length(2) + cookie(4) + txn_id(12) 76 header = struct.pack("!HHI", STUN_BINDING_RESPONSE, len(attr), STUN_MAGIC_COOKIE) 77 header += txn_id 78 79 return header + attr 80 81 82class TestParseStunResponse: 83 def test_parse_xor_mapped_address(self): 84 txn_id = os.urandom(12) 85 response = _build_stun_response("198.51.100.30", 54321, txn_id, use_xor=True) 86 result = parse_stun_response(response, txn_id) 87 assert result is not None 88 ip, port = result 89 assert ip == "198.51.100.30" 90 assert port == 54321 91 92 def test_parse_mapped_address_fallback(self): 93 txn_id = os.urandom(12) 94 response = _build_stun_response("10.0.0.1", 1234, txn_id, use_xor=False) 95 result = parse_stun_response(response, txn_id) 96 assert result is not None 97 ip, port = result 98 assert ip == "10.0.0.1" 99 assert port == 1234 100 101 def test_wrong_txn_id_returns_none(self): 102 txn_id = os.urandom(12) 103 wrong_txn = os.urandom(12) 104 response = _build_stun_response("1.2.3.4", 5678, txn_id) 105 result = parse_stun_response(response, wrong_txn) 106 assert result is None 107 108 def test_short_data_returns_none(self): 109 result = parse_stun_response(b"\x00" * 10, os.urandom(12)) 110 assert result is None 111 112 def test_wrong_message_type_returns_none(self): 113 txn_id = os.urandom(12) 114 # Build a response but change type to something else 115 response = _build_stun_response("1.2.3.4", 5678, txn_id) 116 bad_response = struct.pack("!H", 0x0111) + response[2:] 117 result = parse_stun_response(bad_response, txn_id) 118 assert result is None 119 120 121class TestStunRequest: 122 def test_successful_request(self): 123 txn_id = os.urandom(12) 124 mock_response = _build_stun_response("203.0.113.42", 12345, txn_id) 125 126 mock_sock = MagicMock() 127 mock_sock.recvfrom.return_value = (mock_response, ("stun.example.com", 3478)) 128 129 with patch("i2p_apps.setup.stun_probe.build_binding_request", 130 return_value=(b"\x00" * 20, txn_id)): 131 result = stun_request("stun.example.com", 3478, sock=mock_sock) 132 133 assert result is not None 134 assert result.external_ip == "203.0.113.42" 135 assert result.external_port == 12345 136 137 def test_timeout_returns_none(self): 138 mock_sock = MagicMock() 139 mock_sock.recvfrom.side_effect = socket.timeout() 140 141 result = stun_request("stun.example.com", 3478, timeout=0.1, sock=mock_sock) 142 assert result is None 143 144 145class TestDetectNatType: 146 def _mock_stun_request(self, results: dict): 147 """Create a side_effect function that returns results by server.""" 148 def side_effect(server, port=3478, timeout=3.0, sock=None): 149 key = server 150 if key in results: 151 return results[key] 152 return None 153 return side_effect 154 155 def test_no_nat(self): 156 """Same external as local → open internet.""" 157 with patch("i2p_apps.setup.stun_probe.stun_request") as mock: 158 mock.side_effect = self._mock_stun_request({ 159 "stun.cloudflare.com": StunResult("stun.cloudflare.com", "192.168.1.1", 5000, 10.0), 160 "stun.nextcloud.com": StunResult("stun.nextcloud.com", "192.168.1.1", 5000, 12.0), 161 }) 162 with patch("i2p_apps.setup.stun_probe.socket") as mock_socket: 163 mock_sock = MagicMock() 164 mock_sock.getsockname.return_value = ("192.168.1.1", 5000) 165 mock_socket.socket.return_value = mock_sock 166 mock_socket.AF_INET = socket.AF_INET 167 mock_socket.SOCK_DGRAM = socket.SOCK_DGRAM 168 169 result = detect_nat_type() 170 assert result.nat_present is False 171 assert result.nat_type == "open" 172 173 def test_cone_nat(self): 174 """Same mapping to both servers → cone NAT.""" 175 with patch("i2p_apps.setup.stun_probe.stun_request") as mock: 176 mock.side_effect = self._mock_stun_request({ 177 "stun.cloudflare.com": StunResult("stun.cloudflare.com", "203.0.113.42", 54321, 10.0), 178 "stun.nextcloud.com": StunResult("stun.nextcloud.com", "203.0.113.42", 54321, 12.0), 179 }) 180 with patch("i2p_apps.setup.stun_probe.socket") as mock_socket: 181 mock_sock = MagicMock() 182 mock_sock.getsockname.return_value = ("192.168.1.100", 5000) 183 mock_socket.socket.return_value = mock_sock 184 mock_socket.AF_INET = socket.AF_INET 185 mock_socket.SOCK_DGRAM = socket.SOCK_DGRAM 186 187 result = detect_nat_type() 188 assert result.nat_present is True 189 assert result.nat_type == "cone" 190 191 def test_symmetric_nat(self): 192 """Different mapping per server → symmetric NAT.""" 193 with patch("i2p_apps.setup.stun_probe.stun_request") as mock: 194 mock.side_effect = self._mock_stun_request({ 195 "stun.cloudflare.com": StunResult("stun.cloudflare.com", "203.0.113.42", 54321, 10.0), 196 "stun.nextcloud.com": StunResult("stun.nextcloud.com", "203.0.113.42", 54322, 12.0), 197 }) 198 with patch("i2p_apps.setup.stun_probe.socket") as mock_socket: 199 mock_sock = MagicMock() 200 mock_sock.getsockname.return_value = ("192.168.1.100", 5000) 201 mock_socket.socket.return_value = mock_sock 202 mock_socket.AF_INET = socket.AF_INET 203 mock_socket.SOCK_DGRAM = socket.SOCK_DGRAM 204 205 result = detect_nat_type() 206 assert result.nat_present is True 207 assert result.nat_type == "symmetric" 208 209 def test_single_server_only(self): 210 """Only one server responds → unknown NAT type.""" 211 with patch("i2p_apps.setup.stun_probe.stun_request") as mock: 212 mock.side_effect = self._mock_stun_request({ 213 "stun.cloudflare.com": StunResult("stun.cloudflare.com", "203.0.113.42", 54321, 10.0), 214 }) 215 with patch("i2p_apps.setup.stun_probe.socket") as mock_socket: 216 mock_sock = MagicMock() 217 mock_sock.getsockname.return_value = ("192.168.1.100", 5000) 218 mock_socket.socket.return_value = mock_sock 219 mock_socket.AF_INET = socket.AF_INET 220 mock_socket.SOCK_DGRAM = socket.SOCK_DGRAM 221 222 result = detect_nat_type() 223 assert result.nat_present is True 224 assert result.nat_type == "unknown" 225 226 def test_no_servers_respond(self): 227 """No servers respond → error.""" 228 with patch("i2p_apps.setup.stun_probe.stun_request", return_value=None): 229 with patch("i2p_apps.setup.stun_probe.socket") as mock_socket: 230 mock_sock = MagicMock() 231 mock_sock.getsockname.return_value = ("192.168.1.100", 5000) 232 mock_socket.socket.return_value = mock_sock 233 mock_socket.AF_INET = socket.AF_INET 234 mock_socket.SOCK_DGRAM = socket.SOCK_DGRAM 235 236 result = detect_nat_type() 237 assert result.error is not None