"""SOCKS5 proxy for I2P tunneling. Implements RFC 1928 SOCKS5 protocol with domain name address type (ATYP=3) for .i2p hostname resolution. Ported from net.i2p.i2ptunnel.socks. """ from __future__ import annotations import asyncio import socket import struct from dataclasses import dataclass # SOCKS5 constants SOCKS_VERSION = 5 AUTH_NONE = 0x00 CMD_CONNECT = 0x01 ATYP_IPV4 = 0x01 ATYP_DOMAIN = 0x03 ATYP_IPV6 = 0x04 REPLY_SUCCESS = 0x00 REPLY_GENERAL_FAILURE = 0x01 REPLY_NOT_ALLOWED = 0x02 REPLY_NETWORK_UNREACHABLE = 0x03 REPLY_HOST_UNREACHABLE = 0x04 REPLY_REFUSED = 0x05 @dataclass class SOCKSGreeting: """SOCKS5 client greeting.""" version: int methods: list[int] @dataclass class SOCKSRequest: """SOCKS5 client request.""" version: int command: int address_type: int dest_addr: str dest_port: int def parse_socks_greeting(data: bytes) -> SOCKSGreeting: """Parse a SOCKS5 client greeting. Format: VER (1) | NMETHODS (1) | METHODS (NMETHODS) """ if len(data) < 2: raise ValueError("SOCKS greeting too short") version = data[0] if version != SOCKS_VERSION: raise ValueError(f"Unsupported SOCKS version: {version}") nmethods = data[1] if len(data) < 2 + nmethods: raise ValueError("SOCKS greeting truncated") methods = list(data[2 : 2 + nmethods]) return SOCKSGreeting(version=version, methods=methods) def build_socks_greeting_reply(method: int = AUTH_NONE) -> bytes: """Build a SOCKS5 server greeting reply. Format: VER (1) | METHOD (1) """ return bytes([SOCKS_VERSION, method]) def parse_socks_request(data: bytes) -> SOCKSRequest: """Parse a SOCKS5 CONNECT request. Format: VER (1) | CMD (1) | RSV (1) | ATYP (1) | DST.ADDR (var) | DST.PORT (2) """ if len(data) < 4: raise ValueError("SOCKS request too short") version = data[0] if version != SOCKS_VERSION: raise ValueError(f"Unsupported SOCKS version: {version}") command = data[1] # data[2] is reserved address_type = data[3] if address_type == ATYP_IPV4: if len(data) < 10: raise ValueError("SOCKS IPv4 request truncated") addr = socket.inet_ntoa(data[4:8]) port = struct.unpack("!H", data[8:10])[0] elif address_type == ATYP_DOMAIN: domain_len = data[4] if len(data) < 5 + domain_len + 2: raise ValueError("SOCKS domain request truncated") addr = data[5 : 5 + domain_len].decode("utf-8") port = struct.unpack("!H", data[5 + domain_len : 5 + domain_len + 2])[0] elif address_type == ATYP_IPV6: if len(data) < 22: raise ValueError("SOCKS IPv6 request truncated") addr = socket.inet_ntop(socket.AF_INET6, data[4:20]) port = struct.unpack("!H", data[20:22])[0] else: raise ValueError(f"Unsupported address type: {address_type}") return SOCKSRequest( version=version, command=command, address_type=address_type, dest_addr=addr, dest_port=port, ) def build_socks_reply( reply_code: int, bind_addr: str = "0.0.0.0", bind_port: int = 0 ) -> bytes: """Build a SOCKS5 reply. Format: VER (1) | REP (1) | RSV (1) | ATYP (1) | BND.ADDR (4) | BND.PORT (2) Always uses IPv4 address type for the reply. """ addr_bytes = socket.inet_aton(bind_addr) port_bytes = struct.pack("!H", bind_port) return bytes([SOCKS_VERSION, reply_code, 0x00, ATYP_IPV4]) + addr_bytes + port_bytes class SOCKSProxy: """SOCKS5 proxy for I2P tunneling.""" def __init__(self, listen_host: str = "127.0.0.1", listen_port: int = 4445): self._host = listen_host self._port = listen_port self._running = False self._server: asyncio.Server | None = None async def start(self) -> None: """Start listening for SOCKS5 connections.""" self._server = await asyncio.start_server( self._handle_client, self._host, self._port ) self._running = True async def stop(self) -> None: """Stop the SOCKS5 proxy server.""" if self._server is not None: self._server.close() await self._server.wait_closed() self._running = False async def _handle_client( self, reader: asyncio.StreamReader, writer: asyncio.StreamWriter ) -> None: """Handle an incoming SOCKS5 client.""" try: # Read greeting greeting_data = await reader.read(257) # max: 1 + 1 + 255 methods greeting = parse_socks_greeting(greeting_data) if AUTH_NONE not in greeting.methods: writer.write(build_socks_greeting_reply(0xFF)) # no acceptable methods await writer.drain() writer.close() return writer.write(build_socks_greeting_reply(AUTH_NONE)) await writer.drain() # Read request request_data = await reader.read(262) request = parse_socks_request(request_data) # TODO: route through I2P writer.write(build_socks_reply(REPLY_HOST_UNREACHABLE)) await writer.drain() writer.close() except Exception: try: writer.close() except Exception: pass @property def is_running(self) -> bool: return self._running @property def listen_address(self) -> tuple[str, int]: return (self._host, self._port)