"""Tests for SOCKS4a/5 client task — parsing, routing, edge cases.""" import struct import pytest from i2p_apps.i2ptunnel.socks_task import ( SOCKS4_CMD_CONNECT, SOCKS4_REPLY_FAILED, SOCKS4_REPLY_GRANTED, SOCKS4_VERSION, SOCKS4Request, SOCKSClientTask, parse_socks4a_request, ) from i2p_apps.i2ptunnel.socks_proxy import ( ATYP_DOMAIN, ATYP_IPV4, ATYP_IPV6, AUTH_NONE, CMD_CONNECT, REPLY_GENERAL_FAILURE, REPLY_HOST_UNREACHABLE, REPLY_SUCCESS, SOCKS_VERSION, SOCKSGreeting, SOCKSProxy, SOCKSRequest, build_socks_greeting_reply, build_socks_reply, parse_socks_greeting, parse_socks_request, ) class TestSOCKS4aParsing: def _build_socks4a(self, command, port, domain, userid=b""): """Build a SOCKS4a request with domain name.""" ip = b"\x00\x00\x00\x01" # 0.0.0.1 = SOCKS4a indicator data = bytes([SOCKS4_VERSION, command]) data += struct.pack("!H", port) data += ip data += userid + b"\x00" data += domain.encode() + b"\x00" return data def _build_socks4(self, command, port, ip_str, userid=b""): """Build a regular SOCKS4 request with IP.""" import socket data = bytes([SOCKS4_VERSION, command]) data += struct.pack("!H", port) data += socket.inet_aton(ip_str) data += userid + b"\x00" return data def test_parse_socks4a_domain(self): data = self._build_socks4a(SOCKS4_CMD_CONNECT, 80, "example.i2p") req = parse_socks4a_request(data) assert req.version == SOCKS4_VERSION assert req.command == SOCKS4_CMD_CONNECT assert req.dest_port == 80 assert req.dest_addr == "example.i2p" def test_parse_socks4a_b32(self): data = self._build_socks4a(SOCKS4_CMD_CONNECT, 443, "abcd1234.b32.i2p") req = parse_socks4a_request(data) assert req.dest_addr == "abcd1234.b32.i2p" def test_parse_socks4_regular_ip(self): data = self._build_socks4(SOCKS4_CMD_CONNECT, 8080, "192.168.1.1") req = parse_socks4a_request(data) assert req.dest_addr == "192.168.1.1" assert req.dest_port == 8080 def test_parse_socks4a_with_userid(self): data = self._build_socks4a(SOCKS4_CMD_CONNECT, 80, "test.i2p", userid=b"user1") req = parse_socks4a_request(data) assert req.dest_addr == "test.i2p" def test_parse_socks4a_too_short(self): with pytest.raises(ValueError, match="too short"): parse_socks4a_request(b"\x04\x01\x00") def test_parse_socks4a_wrong_version(self): data = bytes([0x05, SOCKS4_CMD_CONNECT]) + b"\x00\x50" + b"\x00\x00\x00\x01" + b"\x00" + b"x.i2p\x00" with pytest.raises(ValueError, match="Not SOCKS4"): parse_socks4a_request(data) def test_socks4_request_dataclass(self): r = SOCKS4Request(version=4, command=1, dest_port=80, dest_addr="test.i2p") assert r.version == 4 assert r.command == 1 assert r.dest_port == 80 assert r.dest_addr == "test.i2p" class TestSOCKS5Parsing: def test_parse_greeting_no_auth(self): data = bytes([SOCKS_VERSION, 1, AUTH_NONE]) g = parse_socks_greeting(data) assert g.version == 5 assert AUTH_NONE in g.methods def test_parse_greeting_multiple_methods(self): data = bytes([SOCKS_VERSION, 3, 0x00, 0x01, 0x02]) g = parse_socks_greeting(data) assert len(g.methods) == 3 def test_parse_greeting_too_short(self): with pytest.raises(ValueError): parse_socks_greeting(b"\x05") def test_parse_greeting_wrong_version(self): with pytest.raises(ValueError, match="Unsupported"): parse_socks_greeting(b"\x04\x01\x00") def test_parse_greeting_truncated(self): with pytest.raises(ValueError, match="truncated"): parse_socks_greeting(bytes([SOCKS_VERSION, 3, 0x00])) # claims 3 methods, only 1 def test_build_greeting_reply(self): reply = build_socks_greeting_reply(AUTH_NONE) assert reply == bytes([SOCKS_VERSION, AUTH_NONE]) def test_build_greeting_reply_no_acceptable(self): reply = build_socks_greeting_reply(0xFF) assert reply == bytes([SOCKS_VERSION, 0xFF]) def test_parse_request_domain(self): domain = b"example.i2p" data = bytes([SOCKS_VERSION, CMD_CONNECT, 0x00, ATYP_DOMAIN, len(domain)]) data += domain + struct.pack("!H", 80) req = parse_socks_request(data) assert req.dest_addr == "example.i2p" assert req.dest_port == 80 def test_parse_request_ipv4(self): import socket data = bytes([SOCKS_VERSION, CMD_CONNECT, 0x00, ATYP_IPV4]) data += socket.inet_aton("10.0.0.1") + struct.pack("!H", 443) req = parse_socks_request(data) assert req.dest_addr == "10.0.0.1" assert req.dest_port == 443 def test_parse_request_ipv6(self): import socket ipv6 = socket.inet_pton(socket.AF_INET6, "::1") data = bytes([SOCKS_VERSION, CMD_CONNECT, 0x00, ATYP_IPV6]) data += ipv6 + struct.pack("!H", 8080) req = parse_socks_request(data) assert req.dest_addr == "::1" assert req.dest_port == 8080 def test_parse_request_too_short(self): with pytest.raises(ValueError): parse_socks_request(b"\x05\x01") def test_parse_request_unsupported_atyp(self): with pytest.raises(ValueError, match="Unsupported address type"): parse_socks_request(bytes([SOCKS_VERSION, CMD_CONNECT, 0x00, 0x07, 0x00])) def test_build_reply_success(self): reply = build_socks_reply(REPLY_SUCCESS) assert reply[0] == SOCKS_VERSION assert reply[1] == REPLY_SUCCESS assert len(reply) == 10 def test_build_reply_failure(self): reply = build_socks_reply(REPLY_HOST_UNREACHABLE) assert reply[1] == REPLY_HOST_UNREACHABLE class TestSOCKSClientTaskHelpers: def test_is_i2p_true(self): assert SOCKSClientTask._is_i2p("example.i2p") is True def test_is_i2p_b32(self): assert SOCKSClientTask._is_i2p("abcd.b32.i2p") is True def test_is_i2p_false(self): assert SOCKSClientTask._is_i2p("example.com") is False def test_is_i2p_empty(self): assert SOCKSClientTask._is_i2p("") is False class TestSOCKSProxy: def test_initial_state(self): proxy = SOCKSProxy() assert proxy.is_running is False assert proxy.listen_address == ("127.0.0.1", 4445) def test_custom_address(self): proxy = SOCKSProxy("0.0.0.0", 9050) assert proxy.listen_address == ("0.0.0.0", 9050) @pytest.mark.asyncio async def test_start_stop(self): proxy = SOCKSProxy("127.0.0.1", 0) await proxy.start() assert proxy.is_running is True await proxy.stop() assert proxy.is_running is False @pytest.mark.asyncio async def test_stop_without_start(self): proxy = SOCKSProxy() await proxy.stop() # should not crash assert proxy.is_running is False # === Async handle_client tests === import asyncio from unittest.mock import AsyncMock, MagicMock def _make_mock_writer(): writer = MagicMock() writer.write = MagicMock() writer.drain = AsyncMock() writer.close = MagicMock() writer.wait_closed = AsyncMock() transport = MagicMock() transport.is_closing = MagicMock(return_value=False) writer.transport = transport return writer def _make_socks_task(): config = MagicMock() config.proxy_list = [] session = AsyncMock() return SOCKSClientTask(config, session) class TestSOCKSClientHandleClient: @pytest.mark.asyncio async def test_empty_read_closes(self): task = _make_socks_task() reader = AsyncMock() reader.read = AsyncMock(return_value=b"") writer = _make_mock_writer() await task.handle_client(reader, writer) @pytest.mark.asyncio async def test_unknown_version_closes(self): task = _make_socks_task() reader = AsyncMock() reader.read = AsyncMock(return_value=b"\x03") writer = _make_mock_writer() await task.handle_client(reader, writer) writer.close.assert_called() @pytest.mark.asyncio async def test_socks5_no_acceptable_auth(self): task = _make_socks_task() # SOCKS5 greeting with only auth method 0x02 (username/password), no 0x00 greeting = bytes([SOCKS_VERSION, 1, 0x02]) reader = AsyncMock() reader.read = AsyncMock(side_effect=[bytes([SOCKS_VERSION]), greeting[1:]]) writer = _make_mock_writer() await task.handle_client(reader, writer) # Should reply with 0xFF (no acceptable method) writer.write.assert_called() written = writer.write.call_args_list[0][0][0] assert written == bytes([SOCKS_VERSION, 0xFF]) @pytest.mark.asyncio async def test_socks5_udp_associate_rejected(self): task = _make_socks_task() greeting_rest = bytes([1, AUTH_NONE]) # 1 method, no auth domain = b"example.i2p" request = bytes([SOCKS_VERSION, 3, 0x00, ATYP_DOMAIN, len(domain)]) # CMD=3 UDP ASSOCIATE request += domain + struct.pack("!H", 80) call_count = [0] async def read_side_effect(n): call_count[0] += 1 if call_count[0] == 1: return bytes([SOCKS_VERSION]) # first byte elif call_count[0] == 2: return greeting_rest elif call_count[0] == 3: return request return b"" reader = AsyncMock() reader.read = AsyncMock(side_effect=read_side_effect) writer = _make_mock_writer() await task.handle_client(reader, writer) # Should reply with 0x07 (command not supported) calls = writer.write.call_args_list assert any(b"\x07" in c[0][0] for c in calls if len(c[0][0]) > 1) @pytest.mark.asyncio async def test_socks5_non_connect_rejected(self): task = _make_socks_task() greeting_rest = bytes([1, AUTH_NONE]) domain = b"example.i2p" request = bytes([SOCKS_VERSION, 0x02, 0x00, ATYP_DOMAIN, len(domain)]) # CMD=2 BIND request += domain + struct.pack("!H", 80) call_count = [0] async def read_side_effect(n): call_count[0] += 1 if call_count[0] == 1: return bytes([SOCKS_VERSION]) elif call_count[0] == 2: return greeting_rest elif call_count[0] == 3: return request return b"" reader = AsyncMock() reader.read = AsyncMock(side_effect=read_side_effect) writer = _make_mock_writer() await task.handle_client(reader, writer) @pytest.mark.asyncio async def test_socks5_non_i2p_rejected(self): task = _make_socks_task() greeting_rest = bytes([1, AUTH_NONE]) domain = b"example.com" request = bytes([SOCKS_VERSION, CMD_CONNECT, 0x00, ATYP_DOMAIN, len(domain)]) request += domain + struct.pack("!H", 80) call_count = [0] async def read_side_effect(n): call_count[0] += 1 if call_count[0] == 1: return bytes([SOCKS_VERSION]) elif call_count[0] == 2: return greeting_rest elif call_count[0] == 3: return request return b"" reader = AsyncMock() reader.read = AsyncMock(side_effect=read_side_effect) writer = _make_mock_writer() await task.handle_client(reader, writer) # Should reply HOST_UNREACHABLE for non-i2p calls = writer.write.call_args_list replies = [c[0][0] for c in calls if len(c[0][0]) >= 2] assert any(r[1] == REPLY_HOST_UNREACHABLE for r in replies if r[0] == SOCKS_VERSION) @pytest.mark.asyncio async def test_socks5_i2p_resolve_fail(self): task = _make_socks_task() task._session.lookup = AsyncMock(return_value=None) greeting_rest = bytes([1, AUTH_NONE]) domain = b"unknown.i2p" request = bytes([SOCKS_VERSION, CMD_CONNECT, 0x00, ATYP_DOMAIN, len(domain)]) request += domain + struct.pack("!H", 80) call_count = [0] async def read_side_effect(n): call_count[0] += 1 if call_count[0] == 1: return bytes([SOCKS_VERSION]) elif call_count[0] == 2: return greeting_rest elif call_count[0] == 3: return request return b"" reader = AsyncMock() reader.read = AsyncMock(side_effect=read_side_effect) writer = _make_mock_writer() await task.handle_client(reader, writer) @pytest.mark.asyncio async def test_socks4_non_connect_rejected(self): task = _make_socks_task() # SOCKS4 with command 2 (BIND) instead of 1 (CONNECT) data = bytes([SOCKS4_VERSION, 2]) # command=2 data += struct.pack("!H", 80) data += b"\x00\x00\x00\x01" # SOCKS4a indicator data += b"\x00" # empty userid data += b"test.i2p\x00" call_count = [0] async def read_side_effect(n): call_count[0] += 1 if call_count[0] == 1: return bytes([SOCKS4_VERSION]) elif call_count[0] == 2: return data[1:] return b"" reader = AsyncMock() reader.read = AsyncMock(side_effect=read_side_effect) writer = _make_mock_writer() await task.handle_client(reader, writer) # Should reply with SOCKS4 failure calls = writer.write.call_args_list assert any(SOCKS4_REPLY_FAILED in c[0][0] for c in calls) @pytest.mark.asyncio async def test_socks4_non_i2p_rejected(self): task = _make_socks_task() data = bytes([SOCKS4_VERSION, SOCKS4_CMD_CONNECT]) data += struct.pack("!H", 80) data += b"\x00\x00\x00\x01" # SOCKS4a indicator data += b"\x00" data += b"example.com\x00" call_count = [0] async def read_side_effect(n): call_count[0] += 1 if call_count[0] == 1: return bytes([SOCKS4_VERSION]) elif call_count[0] == 2: return data[1:] return b"" reader = AsyncMock() reader.read = AsyncMock(side_effect=read_side_effect) writer = _make_mock_writer() await task.handle_client(reader, writer) @pytest.mark.asyncio async def test_socks4_i2p_resolve_fail(self): task = _make_socks_task() task._session.lookup = AsyncMock(return_value=None) data = bytes([SOCKS4_VERSION, SOCKS4_CMD_CONNECT]) data += struct.pack("!H", 80) data += b"\x00\x00\x00\x01" data += b"\x00" data += b"unknown.i2p\x00" call_count = [0] async def read_side_effect(n): call_count[0] += 1 if call_count[0] == 1: return bytes([SOCKS4_VERSION]) elif call_count[0] == 2: return data[1:] return b"" reader = AsyncMock() reader.read = AsyncMock(side_effect=read_side_effect) writer = _make_mock_writer() await task.handle_client(reader, writer) @pytest.mark.asyncio async def test_socks4_connect_exception(self): task = _make_socks_task() task._session.lookup = AsyncMock(return_value=None) task._session.connect = AsyncMock(side_effect=ConnectionRefusedError("test")) # .b32.i2p bypasses lookup data = bytes([SOCKS4_VERSION, SOCKS4_CMD_CONNECT]) data += struct.pack("!H", 80) data += b"\x00\x00\x00\x01" data += b"\x00" data += b"abcd.b32.i2p\x00" call_count = [0] async def read_side_effect(n): call_count[0] += 1 if call_count[0] == 1: return bytes([SOCKS4_VERSION]) elif call_count[0] == 2: return data[1:] return b"" reader = AsyncMock() reader.read = AsyncMock(side_effect=read_side_effect) writer = _make_mock_writer() await task.handle_client(reader, writer) @pytest.mark.asyncio async def test_socks5_connect_exception(self): task = _make_socks_task() task._session.connect = AsyncMock(side_effect=ConnectionRefusedError("test")) greeting_rest = bytes([1, AUTH_NONE]) domain = b"abcd.b32.i2p" request = bytes([SOCKS_VERSION, CMD_CONNECT, 0x00, ATYP_DOMAIN, len(domain)]) request += domain + struct.pack("!H", 80) call_count = [0] async def read_side_effect(n): call_count[0] += 1 if call_count[0] == 1: return bytes([SOCKS_VERSION]) elif call_count[0] == 2: return greeting_rest elif call_count[0] == 3: return request return b"" reader = AsyncMock() reader.read = AsyncMock(side_effect=read_side_effect) writer = _make_mock_writer() await task.handle_client(reader, writer)