A Python port of the Invisible Internet Project (I2P)
1"""SOCKS5 proxy for I2P tunneling.
2
3Implements RFC 1928 SOCKS5 protocol with domain name address type (ATYP=3)
4for .i2p hostname resolution.
5
6Ported from net.i2p.i2ptunnel.socks.
7"""
8
9from __future__ import annotations
10
11import asyncio
12import socket
13import struct
14from dataclasses import dataclass
15
16# SOCKS5 constants
17SOCKS_VERSION = 5
18AUTH_NONE = 0x00
19CMD_CONNECT = 0x01
20ATYP_IPV4 = 0x01
21ATYP_DOMAIN = 0x03
22ATYP_IPV6 = 0x04
23REPLY_SUCCESS = 0x00
24REPLY_GENERAL_FAILURE = 0x01
25REPLY_NOT_ALLOWED = 0x02
26REPLY_NETWORK_UNREACHABLE = 0x03
27REPLY_HOST_UNREACHABLE = 0x04
28REPLY_REFUSED = 0x05
29
30
31@dataclass
32class SOCKSGreeting:
33 """SOCKS5 client greeting."""
34
35 version: int
36 methods: list[int]
37
38
39@dataclass
40class SOCKSRequest:
41 """SOCKS5 client request."""
42
43 version: int
44 command: int
45 address_type: int
46 dest_addr: str
47 dest_port: int
48
49
50def parse_socks_greeting(data: bytes) -> SOCKSGreeting:
51 """Parse a SOCKS5 client greeting.
52
53 Format: VER (1) | NMETHODS (1) | METHODS (NMETHODS)
54 """
55 if len(data) < 2:
56 raise ValueError("SOCKS greeting too short")
57 version = data[0]
58 if version != SOCKS_VERSION:
59 raise ValueError(f"Unsupported SOCKS version: {version}")
60 nmethods = data[1]
61 if len(data) < 2 + nmethods:
62 raise ValueError("SOCKS greeting truncated")
63 methods = list(data[2 : 2 + nmethods])
64 return SOCKSGreeting(version=version, methods=methods)
65
66
67def build_socks_greeting_reply(method: int = AUTH_NONE) -> bytes:
68 """Build a SOCKS5 server greeting reply.
69
70 Format: VER (1) | METHOD (1)
71 """
72 return bytes([SOCKS_VERSION, method])
73
74
75def parse_socks_request(data: bytes) -> SOCKSRequest:
76 """Parse a SOCKS5 CONNECT request.
77
78 Format: VER (1) | CMD (1) | RSV (1) | ATYP (1) | DST.ADDR (var) | DST.PORT (2)
79 """
80 if len(data) < 4:
81 raise ValueError("SOCKS request too short")
82 version = data[0]
83 if version != SOCKS_VERSION:
84 raise ValueError(f"Unsupported SOCKS version: {version}")
85 command = data[1]
86 # data[2] is reserved
87 address_type = data[3]
88
89 if address_type == ATYP_IPV4:
90 if len(data) < 10:
91 raise ValueError("SOCKS IPv4 request truncated")
92 addr = socket.inet_ntoa(data[4:8])
93 port = struct.unpack("!H", data[8:10])[0]
94 elif address_type == ATYP_DOMAIN:
95 domain_len = data[4]
96 if len(data) < 5 + domain_len + 2:
97 raise ValueError("SOCKS domain request truncated")
98 addr = data[5 : 5 + domain_len].decode("utf-8")
99 port = struct.unpack("!H", data[5 + domain_len : 5 + domain_len + 2])[0]
100 elif address_type == ATYP_IPV6:
101 if len(data) < 22:
102 raise ValueError("SOCKS IPv6 request truncated")
103 addr = socket.inet_ntop(socket.AF_INET6, data[4:20])
104 port = struct.unpack("!H", data[20:22])[0]
105 else:
106 raise ValueError(f"Unsupported address type: {address_type}")
107
108 return SOCKSRequest(
109 version=version,
110 command=command,
111 address_type=address_type,
112 dest_addr=addr,
113 dest_port=port,
114 )
115
116
117def build_socks_reply(
118 reply_code: int, bind_addr: str = "0.0.0.0", bind_port: int = 0
119) -> bytes:
120 """Build a SOCKS5 reply.
121
122 Format: VER (1) | REP (1) | RSV (1) | ATYP (1) | BND.ADDR (4) | BND.PORT (2)
123 Always uses IPv4 address type for the reply.
124 """
125 addr_bytes = socket.inet_aton(bind_addr)
126 port_bytes = struct.pack("!H", bind_port)
127 return bytes([SOCKS_VERSION, reply_code, 0x00, ATYP_IPV4]) + addr_bytes + port_bytes
128
129
130class SOCKSProxy:
131 """SOCKS5 proxy for I2P tunneling."""
132
133 def __init__(self, listen_host: str = "127.0.0.1", listen_port: int = 4445):
134 self._host = listen_host
135 self._port = listen_port
136 self._running = False
137 self._server: asyncio.Server | None = None
138
139 async def start(self) -> None:
140 """Start listening for SOCKS5 connections."""
141 self._server = await asyncio.start_server(
142 self._handle_client, self._host, self._port
143 )
144 self._running = True
145
146 async def stop(self) -> None:
147 """Stop the SOCKS5 proxy server."""
148 if self._server is not None:
149 self._server.close()
150 await self._server.wait_closed()
151 self._running = False
152
153 async def _handle_client(
154 self, reader: asyncio.StreamReader, writer: asyncio.StreamWriter
155 ) -> None:
156 """Handle an incoming SOCKS5 client."""
157 try:
158 # Read greeting
159 greeting_data = await reader.read(257) # max: 1 + 1 + 255 methods
160 greeting = parse_socks_greeting(greeting_data)
161
162 if AUTH_NONE not in greeting.methods:
163 writer.write(build_socks_greeting_reply(0xFF)) # no acceptable methods
164 await writer.drain()
165 writer.close()
166 return
167
168 writer.write(build_socks_greeting_reply(AUTH_NONE))
169 await writer.drain()
170
171 # Read request
172 request_data = await reader.read(262)
173 request = parse_socks_request(request_data)
174
175 # TODO: route through I2P
176 writer.write(build_socks_reply(REPLY_HOST_UNREACHABLE))
177 await writer.drain()
178 writer.close()
179 except Exception:
180 try:
181 writer.close()
182 except Exception:
183 pass
184
185 @property
186 def is_running(self) -> bool:
187 return self._running
188
189 @property
190 def listen_address(self) -> tuple[str, int]:
191 return (self._host, self._port)