A Python port of the Invisible Internet Project (I2P)
at main 496 lines 17 kB view raw
1"""Tests for SOCKS4a/5 client task — parsing, routing, edge cases.""" 2 3import struct 4 5import pytest 6 7from i2p_apps.i2ptunnel.socks_task import ( 8 SOCKS4_CMD_CONNECT, 9 SOCKS4_REPLY_FAILED, 10 SOCKS4_REPLY_GRANTED, 11 SOCKS4_VERSION, 12 SOCKS4Request, 13 SOCKSClientTask, 14 parse_socks4a_request, 15) 16from i2p_apps.i2ptunnel.socks_proxy import ( 17 ATYP_DOMAIN, 18 ATYP_IPV4, 19 ATYP_IPV6, 20 AUTH_NONE, 21 CMD_CONNECT, 22 REPLY_GENERAL_FAILURE, 23 REPLY_HOST_UNREACHABLE, 24 REPLY_SUCCESS, 25 SOCKS_VERSION, 26 SOCKSGreeting, 27 SOCKSProxy, 28 SOCKSRequest, 29 build_socks_greeting_reply, 30 build_socks_reply, 31 parse_socks_greeting, 32 parse_socks_request, 33) 34 35 36class TestSOCKS4aParsing: 37 def _build_socks4a(self, command, port, domain, userid=b""): 38 """Build a SOCKS4a request with domain name.""" 39 ip = b"\x00\x00\x00\x01" # 0.0.0.1 = SOCKS4a indicator 40 data = bytes([SOCKS4_VERSION, command]) 41 data += struct.pack("!H", port) 42 data += ip 43 data += userid + b"\x00" 44 data += domain.encode() + b"\x00" 45 return data 46 47 def _build_socks4(self, command, port, ip_str, userid=b""): 48 """Build a regular SOCKS4 request with IP.""" 49 import socket 50 data = bytes([SOCKS4_VERSION, command]) 51 data += struct.pack("!H", port) 52 data += socket.inet_aton(ip_str) 53 data += userid + b"\x00" 54 return data 55 56 def test_parse_socks4a_domain(self): 57 data = self._build_socks4a(SOCKS4_CMD_CONNECT, 80, "example.i2p") 58 req = parse_socks4a_request(data) 59 assert req.version == SOCKS4_VERSION 60 assert req.command == SOCKS4_CMD_CONNECT 61 assert req.dest_port == 80 62 assert req.dest_addr == "example.i2p" 63 64 def test_parse_socks4a_b32(self): 65 data = self._build_socks4a(SOCKS4_CMD_CONNECT, 443, "abcd1234.b32.i2p") 66 req = parse_socks4a_request(data) 67 assert req.dest_addr == "abcd1234.b32.i2p" 68 69 def test_parse_socks4_regular_ip(self): 70 data = self._build_socks4(SOCKS4_CMD_CONNECT, 8080, "192.168.1.1") 71 req = parse_socks4a_request(data) 72 assert req.dest_addr == "192.168.1.1" 73 assert req.dest_port == 8080 74 75 def test_parse_socks4a_with_userid(self): 76 data = self._build_socks4a(SOCKS4_CMD_CONNECT, 80, "test.i2p", userid=b"user1") 77 req = parse_socks4a_request(data) 78 assert req.dest_addr == "test.i2p" 79 80 def test_parse_socks4a_too_short(self): 81 with pytest.raises(ValueError, match="too short"): 82 parse_socks4a_request(b"\x04\x01\x00") 83 84 def test_parse_socks4a_wrong_version(self): 85 data = bytes([0x05, SOCKS4_CMD_CONNECT]) + b"\x00\x50" + b"\x00\x00\x00\x01" + b"\x00" + b"x.i2p\x00" 86 with pytest.raises(ValueError, match="Not SOCKS4"): 87 parse_socks4a_request(data) 88 89 def test_socks4_request_dataclass(self): 90 r = SOCKS4Request(version=4, command=1, dest_port=80, dest_addr="test.i2p") 91 assert r.version == 4 92 assert r.command == 1 93 assert r.dest_port == 80 94 assert r.dest_addr == "test.i2p" 95 96 97class TestSOCKS5Parsing: 98 def test_parse_greeting_no_auth(self): 99 data = bytes([SOCKS_VERSION, 1, AUTH_NONE]) 100 g = parse_socks_greeting(data) 101 assert g.version == 5 102 assert AUTH_NONE in g.methods 103 104 def test_parse_greeting_multiple_methods(self): 105 data = bytes([SOCKS_VERSION, 3, 0x00, 0x01, 0x02]) 106 g = parse_socks_greeting(data) 107 assert len(g.methods) == 3 108 109 def test_parse_greeting_too_short(self): 110 with pytest.raises(ValueError): 111 parse_socks_greeting(b"\x05") 112 113 def test_parse_greeting_wrong_version(self): 114 with pytest.raises(ValueError, match="Unsupported"): 115 parse_socks_greeting(b"\x04\x01\x00") 116 117 def test_parse_greeting_truncated(self): 118 with pytest.raises(ValueError, match="truncated"): 119 parse_socks_greeting(bytes([SOCKS_VERSION, 3, 0x00])) # claims 3 methods, only 1 120 121 def test_build_greeting_reply(self): 122 reply = build_socks_greeting_reply(AUTH_NONE) 123 assert reply == bytes([SOCKS_VERSION, AUTH_NONE]) 124 125 def test_build_greeting_reply_no_acceptable(self): 126 reply = build_socks_greeting_reply(0xFF) 127 assert reply == bytes([SOCKS_VERSION, 0xFF]) 128 129 def test_parse_request_domain(self): 130 domain = b"example.i2p" 131 data = bytes([SOCKS_VERSION, CMD_CONNECT, 0x00, ATYP_DOMAIN, len(domain)]) 132 data += domain + struct.pack("!H", 80) 133 req = parse_socks_request(data) 134 assert req.dest_addr == "example.i2p" 135 assert req.dest_port == 80 136 137 def test_parse_request_ipv4(self): 138 import socket 139 data = bytes([SOCKS_VERSION, CMD_CONNECT, 0x00, ATYP_IPV4]) 140 data += socket.inet_aton("10.0.0.1") + struct.pack("!H", 443) 141 req = parse_socks_request(data) 142 assert req.dest_addr == "10.0.0.1" 143 assert req.dest_port == 443 144 145 def test_parse_request_ipv6(self): 146 import socket 147 ipv6 = socket.inet_pton(socket.AF_INET6, "::1") 148 data = bytes([SOCKS_VERSION, CMD_CONNECT, 0x00, ATYP_IPV6]) 149 data += ipv6 + struct.pack("!H", 8080) 150 req = parse_socks_request(data) 151 assert req.dest_addr == "::1" 152 assert req.dest_port == 8080 153 154 def test_parse_request_too_short(self): 155 with pytest.raises(ValueError): 156 parse_socks_request(b"\x05\x01") 157 158 def test_parse_request_unsupported_atyp(self): 159 with pytest.raises(ValueError, match="Unsupported address type"): 160 parse_socks_request(bytes([SOCKS_VERSION, CMD_CONNECT, 0x00, 0x07, 0x00])) 161 162 def test_build_reply_success(self): 163 reply = build_socks_reply(REPLY_SUCCESS) 164 assert reply[0] == SOCKS_VERSION 165 assert reply[1] == REPLY_SUCCESS 166 assert len(reply) == 10 167 168 def test_build_reply_failure(self): 169 reply = build_socks_reply(REPLY_HOST_UNREACHABLE) 170 assert reply[1] == REPLY_HOST_UNREACHABLE 171 172 173class TestSOCKSClientTaskHelpers: 174 def test_is_i2p_true(self): 175 assert SOCKSClientTask._is_i2p("example.i2p") is True 176 177 def test_is_i2p_b32(self): 178 assert SOCKSClientTask._is_i2p("abcd.b32.i2p") is True 179 180 def test_is_i2p_false(self): 181 assert SOCKSClientTask._is_i2p("example.com") is False 182 183 def test_is_i2p_empty(self): 184 assert SOCKSClientTask._is_i2p("") is False 185 186 187class TestSOCKSProxy: 188 def test_initial_state(self): 189 proxy = SOCKSProxy() 190 assert proxy.is_running is False 191 assert proxy.listen_address == ("127.0.0.1", 4445) 192 193 def test_custom_address(self): 194 proxy = SOCKSProxy("0.0.0.0", 9050) 195 assert proxy.listen_address == ("0.0.0.0", 9050) 196 197 @pytest.mark.asyncio 198 async def test_start_stop(self): 199 proxy = SOCKSProxy("127.0.0.1", 0) 200 await proxy.start() 201 assert proxy.is_running is True 202 await proxy.stop() 203 assert proxy.is_running is False 204 205 @pytest.mark.asyncio 206 async def test_stop_without_start(self): 207 proxy = SOCKSProxy() 208 await proxy.stop() # should not crash 209 assert proxy.is_running is False 210 211 212# === Async handle_client tests === 213 214import asyncio 215from unittest.mock import AsyncMock, MagicMock 216 217 218def _make_mock_writer(): 219 writer = MagicMock() 220 writer.write = MagicMock() 221 writer.drain = AsyncMock() 222 writer.close = MagicMock() 223 writer.wait_closed = AsyncMock() 224 transport = MagicMock() 225 transport.is_closing = MagicMock(return_value=False) 226 writer.transport = transport 227 return writer 228 229 230def _make_socks_task(): 231 config = MagicMock() 232 config.proxy_list = [] 233 session = AsyncMock() 234 return SOCKSClientTask(config, session) 235 236 237class TestSOCKSClientHandleClient: 238 @pytest.mark.asyncio 239 async def test_empty_read_closes(self): 240 task = _make_socks_task() 241 reader = AsyncMock() 242 reader.read = AsyncMock(return_value=b"") 243 writer = _make_mock_writer() 244 await task.handle_client(reader, writer) 245 246 @pytest.mark.asyncio 247 async def test_unknown_version_closes(self): 248 task = _make_socks_task() 249 reader = AsyncMock() 250 reader.read = AsyncMock(return_value=b"\x03") 251 writer = _make_mock_writer() 252 await task.handle_client(reader, writer) 253 writer.close.assert_called() 254 255 @pytest.mark.asyncio 256 async def test_socks5_no_acceptable_auth(self): 257 task = _make_socks_task() 258 # SOCKS5 greeting with only auth method 0x02 (username/password), no 0x00 259 greeting = bytes([SOCKS_VERSION, 1, 0x02]) 260 reader = AsyncMock() 261 reader.read = AsyncMock(side_effect=[bytes([SOCKS_VERSION]), greeting[1:]]) 262 writer = _make_mock_writer() 263 await task.handle_client(reader, writer) 264 # Should reply with 0xFF (no acceptable method) 265 writer.write.assert_called() 266 written = writer.write.call_args_list[0][0][0] 267 assert written == bytes([SOCKS_VERSION, 0xFF]) 268 269 @pytest.mark.asyncio 270 async def test_socks5_udp_associate_rejected(self): 271 task = _make_socks_task() 272 greeting_rest = bytes([1, AUTH_NONE]) # 1 method, no auth 273 domain = b"example.i2p" 274 request = bytes([SOCKS_VERSION, 3, 0x00, ATYP_DOMAIN, len(domain)]) # CMD=3 UDP ASSOCIATE 275 request += domain + struct.pack("!H", 80) 276 277 call_count = [0] 278 async def read_side_effect(n): 279 call_count[0] += 1 280 if call_count[0] == 1: 281 return bytes([SOCKS_VERSION]) # first byte 282 elif call_count[0] == 2: 283 return greeting_rest 284 elif call_count[0] == 3: 285 return request 286 return b"" 287 288 reader = AsyncMock() 289 reader.read = AsyncMock(side_effect=read_side_effect) 290 writer = _make_mock_writer() 291 await task.handle_client(reader, writer) 292 # Should reply with 0x07 (command not supported) 293 calls = writer.write.call_args_list 294 assert any(b"\x07" in c[0][0] for c in calls if len(c[0][0]) > 1) 295 296 @pytest.mark.asyncio 297 async def test_socks5_non_connect_rejected(self): 298 task = _make_socks_task() 299 greeting_rest = bytes([1, AUTH_NONE]) 300 domain = b"example.i2p" 301 request = bytes([SOCKS_VERSION, 0x02, 0x00, ATYP_DOMAIN, len(domain)]) # CMD=2 BIND 302 request += domain + struct.pack("!H", 80) 303 304 call_count = [0] 305 async def read_side_effect(n): 306 call_count[0] += 1 307 if call_count[0] == 1: 308 return bytes([SOCKS_VERSION]) 309 elif call_count[0] == 2: 310 return greeting_rest 311 elif call_count[0] == 3: 312 return request 313 return b"" 314 315 reader = AsyncMock() 316 reader.read = AsyncMock(side_effect=read_side_effect) 317 writer = _make_mock_writer() 318 await task.handle_client(reader, writer) 319 320 @pytest.mark.asyncio 321 async def test_socks5_non_i2p_rejected(self): 322 task = _make_socks_task() 323 greeting_rest = bytes([1, AUTH_NONE]) 324 domain = b"example.com" 325 request = bytes([SOCKS_VERSION, CMD_CONNECT, 0x00, ATYP_DOMAIN, len(domain)]) 326 request += domain + struct.pack("!H", 80) 327 328 call_count = [0] 329 async def read_side_effect(n): 330 call_count[0] += 1 331 if call_count[0] == 1: 332 return bytes([SOCKS_VERSION]) 333 elif call_count[0] == 2: 334 return greeting_rest 335 elif call_count[0] == 3: 336 return request 337 return b"" 338 339 reader = AsyncMock() 340 reader.read = AsyncMock(side_effect=read_side_effect) 341 writer = _make_mock_writer() 342 await task.handle_client(reader, writer) 343 # Should reply HOST_UNREACHABLE for non-i2p 344 calls = writer.write.call_args_list 345 replies = [c[0][0] for c in calls if len(c[0][0]) >= 2] 346 assert any(r[1] == REPLY_HOST_UNREACHABLE for r in replies if r[0] == SOCKS_VERSION) 347 348 @pytest.mark.asyncio 349 async def test_socks5_i2p_resolve_fail(self): 350 task = _make_socks_task() 351 task._session.lookup = AsyncMock(return_value=None) 352 greeting_rest = bytes([1, AUTH_NONE]) 353 domain = b"unknown.i2p" 354 request = bytes([SOCKS_VERSION, CMD_CONNECT, 0x00, ATYP_DOMAIN, len(domain)]) 355 request += domain + struct.pack("!H", 80) 356 357 call_count = [0] 358 async def read_side_effect(n): 359 call_count[0] += 1 360 if call_count[0] == 1: 361 return bytes([SOCKS_VERSION]) 362 elif call_count[0] == 2: 363 return greeting_rest 364 elif call_count[0] == 3: 365 return request 366 return b"" 367 368 reader = AsyncMock() 369 reader.read = AsyncMock(side_effect=read_side_effect) 370 writer = _make_mock_writer() 371 await task.handle_client(reader, writer) 372 373 @pytest.mark.asyncio 374 async def test_socks4_non_connect_rejected(self): 375 task = _make_socks_task() 376 # SOCKS4 with command 2 (BIND) instead of 1 (CONNECT) 377 data = bytes([SOCKS4_VERSION, 2]) # command=2 378 data += struct.pack("!H", 80) 379 data += b"\x00\x00\x00\x01" # SOCKS4a indicator 380 data += b"\x00" # empty userid 381 data += b"test.i2p\x00" 382 383 call_count = [0] 384 async def read_side_effect(n): 385 call_count[0] += 1 386 if call_count[0] == 1: 387 return bytes([SOCKS4_VERSION]) 388 elif call_count[0] == 2: 389 return data[1:] 390 return b"" 391 392 reader = AsyncMock() 393 reader.read = AsyncMock(side_effect=read_side_effect) 394 writer = _make_mock_writer() 395 await task.handle_client(reader, writer) 396 # Should reply with SOCKS4 failure 397 calls = writer.write.call_args_list 398 assert any(SOCKS4_REPLY_FAILED in c[0][0] for c in calls) 399 400 @pytest.mark.asyncio 401 async def test_socks4_non_i2p_rejected(self): 402 task = _make_socks_task() 403 data = bytes([SOCKS4_VERSION, SOCKS4_CMD_CONNECT]) 404 data += struct.pack("!H", 80) 405 data += b"\x00\x00\x00\x01" # SOCKS4a indicator 406 data += b"\x00" 407 data += b"example.com\x00" 408 409 call_count = [0] 410 async def read_side_effect(n): 411 call_count[0] += 1 412 if call_count[0] == 1: 413 return bytes([SOCKS4_VERSION]) 414 elif call_count[0] == 2: 415 return data[1:] 416 return b"" 417 418 reader = AsyncMock() 419 reader.read = AsyncMock(side_effect=read_side_effect) 420 writer = _make_mock_writer() 421 await task.handle_client(reader, writer) 422 423 @pytest.mark.asyncio 424 async def test_socks4_i2p_resolve_fail(self): 425 task = _make_socks_task() 426 task._session.lookup = AsyncMock(return_value=None) 427 data = bytes([SOCKS4_VERSION, SOCKS4_CMD_CONNECT]) 428 data += struct.pack("!H", 80) 429 data += b"\x00\x00\x00\x01" 430 data += b"\x00" 431 data += b"unknown.i2p\x00" 432 433 call_count = [0] 434 async def read_side_effect(n): 435 call_count[0] += 1 436 if call_count[0] == 1: 437 return bytes([SOCKS4_VERSION]) 438 elif call_count[0] == 2: 439 return data[1:] 440 return b"" 441 442 reader = AsyncMock() 443 reader.read = AsyncMock(side_effect=read_side_effect) 444 writer = _make_mock_writer() 445 await task.handle_client(reader, writer) 446 447 @pytest.mark.asyncio 448 async def test_socks4_connect_exception(self): 449 task = _make_socks_task() 450 task._session.lookup = AsyncMock(return_value=None) 451 task._session.connect = AsyncMock(side_effect=ConnectionRefusedError("test")) 452 # .b32.i2p bypasses lookup 453 data = bytes([SOCKS4_VERSION, SOCKS4_CMD_CONNECT]) 454 data += struct.pack("!H", 80) 455 data += b"\x00\x00\x00\x01" 456 data += b"\x00" 457 data += b"abcd.b32.i2p\x00" 458 459 call_count = [0] 460 async def read_side_effect(n): 461 call_count[0] += 1 462 if call_count[0] == 1: 463 return bytes([SOCKS4_VERSION]) 464 elif call_count[0] == 2: 465 return data[1:] 466 return b"" 467 468 reader = AsyncMock() 469 reader.read = AsyncMock(side_effect=read_side_effect) 470 writer = _make_mock_writer() 471 await task.handle_client(reader, writer) 472 473 @pytest.mark.asyncio 474 async def test_socks5_connect_exception(self): 475 task = _make_socks_task() 476 task._session.connect = AsyncMock(side_effect=ConnectionRefusedError("test")) 477 greeting_rest = bytes([1, AUTH_NONE]) 478 domain = b"abcd.b32.i2p" 479 request = bytes([SOCKS_VERSION, CMD_CONNECT, 0x00, ATYP_DOMAIN, len(domain)]) 480 request += domain + struct.pack("!H", 80) 481 482 call_count = [0] 483 async def read_side_effect(n): 484 call_count[0] += 1 485 if call_count[0] == 1: 486 return bytes([SOCKS_VERSION]) 487 elif call_count[0] == 2: 488 return greeting_rest 489 elif call_count[0] == 3: 490 return request 491 return b"" 492 493 reader = AsyncMock() 494 reader.read = AsyncMock(side_effect=read_side_effect) 495 writer = _make_mock_writer() 496 await task.handle_client(reader, writer)