"""SOCKS4a/5 client task — SOCKS proxy with SAM integration. Uses existing SOCKS5 parsing from socks_proxy.py. Adds SOCKS4a support and SAM-based I2P routing. Ported from net.i2p.i2ptunnel.socks. """ from __future__ import annotations import logging import socket import struct from dataclasses import dataclass from i2p_apps.i2ptunnel.config import TunnelDefinition from i2p_apps.i2ptunnel.forwarder import bridge from i2p_apps.i2ptunnel.socks_proxy import ( SOCKS_VERSION, AUTH_NONE, CMD_CONNECT, ATYP_DOMAIN, REPLY_SUCCESS, REPLY_GENERAL_FAILURE, REPLY_HOST_UNREACHABLE, parse_socks_greeting, build_socks_greeting_reply, parse_socks_request, build_socks_reply, ) from i2p_apps.i2ptunnel.tasks import ClientTunnelTask logger = logging.getLogger(__name__) # SOCKS4 constants SOCKS4_VERSION = 4 SOCKS4_CMD_CONNECT = 1 SOCKS4_REPLY_GRANTED = 0x5A SOCKS4_REPLY_FAILED = 0x5B @dataclass class SOCKS4Request: """Parsed SOCKS4/4a request.""" version: int command: int dest_port: int dest_addr: str def parse_socks4a_request(data: bytes) -> SOCKS4Request: """Parse a SOCKS4/4a CONNECT request. Format: VER(1) CMD(1) DSTPORT(2) DSTIP(4) USERID(var,null-term) [DOMAIN(var,null-term)] If DSTIP is 0.0.0.x (x != 0), it's SOCKS4a with domain after userid. """ if len(data) < 8: raise ValueError("SOCKS4 request too short") version = data[0] if version != SOCKS4_VERSION: raise ValueError(f"Not SOCKS4: version={version}") command = data[1] port = struct.unpack("!H", data[2:4])[0] ip_bytes = data[4:8] # Find userid (null-terminated starting at byte 8) userid_end = data.index(0x00, 8) # Check if SOCKS4a (IP = 0.0.0.x where x != 0) if ip_bytes[0] == 0 and ip_bytes[1] == 0 and ip_bytes[2] == 0 and ip_bytes[3] != 0: # SOCKS4a: domain name follows userid null domain_start = userid_end + 1 domain_end = data.index(0x00, domain_start) domain = data[domain_start:domain_end].decode("utf-8") return SOCKS4Request(version=version, command=command, dest_port=port, dest_addr=domain) else: # Regular SOCKS4: use the IP address addr = socket.inet_ntoa(ip_bytes) return SOCKS4Request(version=version, command=command, dest_port=port, dest_addr=addr) class SOCKSClientTask(ClientTunnelTask): """SOCKS4a/5 proxy with SAM integration. Handles both SOCKS5 (RFC 1928) and SOCKS4a protocols. Routes .i2p destinations through SAM, non-.i2p to outproxy. """ CMD_UDP_ASSOCIATE = 3 def __init__(self, config: TunnelDefinition, session) -> None: super().__init__(config, session) self._proxy_list = list(config.proxy_list) @property def _supports_udp(self) -> bool: return False @staticmethod def _is_i2p(host: str) -> bool: return host.endswith(".i2p") async def _resolve(self, hostname: str) -> str | None: if hostname.endswith(".b32.i2p"): return hostname if hostname.endswith(".i2p"): return await self._session.lookup(hostname) return hostname async def handle_client(self, reader, writer) -> None: try: # Peek at first byte to determine SOCKS version first_byte = await reader.read(1) if not first_byte: return version = first_byte[0] if version == SOCKS_VERSION: await self._handle_socks5(first_byte, reader, writer) elif version == SOCKS4_VERSION: await self._handle_socks4(first_byte, reader, writer) else: writer.close() except Exception: logger.exception("Error in SOCKS proxy handler") async def _handle_socks5(self, first_byte, reader, writer) -> None: """Handle SOCKS5 connection.""" # Read rest of greeting rest = await reader.read(256) greeting_data = first_byte + rest greeting = parse_socks_greeting(greeting_data) if AUTH_NONE not in greeting.methods: writer.write(build_socks_greeting_reply(0xFF)) await writer.drain() return writer.write(build_socks_greeting_reply(AUTH_NONE)) await writer.drain() # Read request request_data = await reader.read(262) request = parse_socks_request(request_data) # Check for UDP ASSOCIATE if request.command == self.CMD_UDP_ASSOCIATE: writer.write(build_socks_reply(0x07)) # Command not supported await writer.drain() return if request.command != CMD_CONNECT: writer.write(build_socks_reply(REPLY_GENERAL_FAILURE)) await writer.drain() return # Route await self._route_socks5(request.dest_addr, request.dest_port, reader, writer) async def _route_socks5(self, dest_addr, dest_port, reader, writer) -> None: if self._is_i2p(dest_addr): resolved = await self._resolve(dest_addr) if resolved is None: writer.write(build_socks_reply(REPLY_HOST_UNREACHABLE)) await writer.drain() return try: remote_reader, remote_writer = await self._session.connect(resolved) except Exception: writer.write(build_socks_reply(REPLY_HOST_UNREACHABLE)) await writer.drain() return writer.write(build_socks_reply(REPLY_SUCCESS)) await writer.drain() await bridge(reader, writer, remote_reader, remote_writer) else: # Non-I2P: forward to outproxy or reject writer.write(build_socks_reply(REPLY_HOST_UNREACHABLE)) await writer.drain() async def _handle_socks4(self, first_byte, reader, writer) -> None: """Handle SOCKS4/4a connection.""" # Read enough for the request (variable length due to userid + domain) rest = await reader.read(512) data = first_byte + rest request = parse_socks4a_request(data) if request.command != SOCKS4_CMD_CONNECT: writer.write(bytes([0x00, SOCKS4_REPLY_FAILED]) + b"\x00" * 6) await writer.drain() return if self._is_i2p(request.dest_addr): resolved = await self._resolve(request.dest_addr) if resolved is None: writer.write(bytes([0x00, SOCKS4_REPLY_FAILED]) + b"\x00" * 6) await writer.drain() return try: remote_reader, remote_writer = await self._session.connect(resolved) except Exception: writer.write(bytes([0x00, SOCKS4_REPLY_FAILED]) + b"\x00" * 6) await writer.drain() return # Send SOCKS4 success reply writer.write(bytes([0x00, SOCKS4_REPLY_GRANTED]) + b"\x00" * 6) await writer.drain() await bridge(reader, writer, remote_reader, remote_writer) else: writer.write(bytes([0x00, SOCKS4_REPLY_FAILED]) + b"\x00" * 6) await writer.drain()