A Python port of the Invisible Internet Project (I2P)
1"""Tests for SOCKS4a/5 client task — parsing, routing, edge cases."""
2
3import struct
4
5import pytest
6
7from i2p_apps.i2ptunnel.socks_task import (
8 SOCKS4_CMD_CONNECT,
9 SOCKS4_REPLY_FAILED,
10 SOCKS4_REPLY_GRANTED,
11 SOCKS4_VERSION,
12 SOCKS4Request,
13 SOCKSClientTask,
14 parse_socks4a_request,
15)
16from i2p_apps.i2ptunnel.socks_proxy import (
17 ATYP_DOMAIN,
18 ATYP_IPV4,
19 ATYP_IPV6,
20 AUTH_NONE,
21 CMD_CONNECT,
22 REPLY_GENERAL_FAILURE,
23 REPLY_HOST_UNREACHABLE,
24 REPLY_SUCCESS,
25 SOCKS_VERSION,
26 SOCKSGreeting,
27 SOCKSProxy,
28 SOCKSRequest,
29 build_socks_greeting_reply,
30 build_socks_reply,
31 parse_socks_greeting,
32 parse_socks_request,
33)
34
35
36class TestSOCKS4aParsing:
37 def _build_socks4a(self, command, port, domain, userid=b""):
38 """Build a SOCKS4a request with domain name."""
39 ip = b"\x00\x00\x00\x01" # 0.0.0.1 = SOCKS4a indicator
40 data = bytes([SOCKS4_VERSION, command])
41 data += struct.pack("!H", port)
42 data += ip
43 data += userid + b"\x00"
44 data += domain.encode() + b"\x00"
45 return data
46
47 def _build_socks4(self, command, port, ip_str, userid=b""):
48 """Build a regular SOCKS4 request with IP."""
49 import socket
50 data = bytes([SOCKS4_VERSION, command])
51 data += struct.pack("!H", port)
52 data += socket.inet_aton(ip_str)
53 data += userid + b"\x00"
54 return data
55
56 def test_parse_socks4a_domain(self):
57 data = self._build_socks4a(SOCKS4_CMD_CONNECT, 80, "example.i2p")
58 req = parse_socks4a_request(data)
59 assert req.version == SOCKS4_VERSION
60 assert req.command == SOCKS4_CMD_CONNECT
61 assert req.dest_port == 80
62 assert req.dest_addr == "example.i2p"
63
64 def test_parse_socks4a_b32(self):
65 data = self._build_socks4a(SOCKS4_CMD_CONNECT, 443, "abcd1234.b32.i2p")
66 req = parse_socks4a_request(data)
67 assert req.dest_addr == "abcd1234.b32.i2p"
68
69 def test_parse_socks4_regular_ip(self):
70 data = self._build_socks4(SOCKS4_CMD_CONNECT, 8080, "192.168.1.1")
71 req = parse_socks4a_request(data)
72 assert req.dest_addr == "192.168.1.1"
73 assert req.dest_port == 8080
74
75 def test_parse_socks4a_with_userid(self):
76 data = self._build_socks4a(SOCKS4_CMD_CONNECT, 80, "test.i2p", userid=b"user1")
77 req = parse_socks4a_request(data)
78 assert req.dest_addr == "test.i2p"
79
80 def test_parse_socks4a_too_short(self):
81 with pytest.raises(ValueError, match="too short"):
82 parse_socks4a_request(b"\x04\x01\x00")
83
84 def test_parse_socks4a_wrong_version(self):
85 data = bytes([0x05, SOCKS4_CMD_CONNECT]) + b"\x00\x50" + b"\x00\x00\x00\x01" + b"\x00" + b"x.i2p\x00"
86 with pytest.raises(ValueError, match="Not SOCKS4"):
87 parse_socks4a_request(data)
88
89 def test_socks4_request_dataclass(self):
90 r = SOCKS4Request(version=4, command=1, dest_port=80, dest_addr="test.i2p")
91 assert r.version == 4
92 assert r.command == 1
93 assert r.dest_port == 80
94 assert r.dest_addr == "test.i2p"
95
96
97class TestSOCKS5Parsing:
98 def test_parse_greeting_no_auth(self):
99 data = bytes([SOCKS_VERSION, 1, AUTH_NONE])
100 g = parse_socks_greeting(data)
101 assert g.version == 5
102 assert AUTH_NONE in g.methods
103
104 def test_parse_greeting_multiple_methods(self):
105 data = bytes([SOCKS_VERSION, 3, 0x00, 0x01, 0x02])
106 g = parse_socks_greeting(data)
107 assert len(g.methods) == 3
108
109 def test_parse_greeting_too_short(self):
110 with pytest.raises(ValueError):
111 parse_socks_greeting(b"\x05")
112
113 def test_parse_greeting_wrong_version(self):
114 with pytest.raises(ValueError, match="Unsupported"):
115 parse_socks_greeting(b"\x04\x01\x00")
116
117 def test_parse_greeting_truncated(self):
118 with pytest.raises(ValueError, match="truncated"):
119 parse_socks_greeting(bytes([SOCKS_VERSION, 3, 0x00])) # claims 3 methods, only 1
120
121 def test_build_greeting_reply(self):
122 reply = build_socks_greeting_reply(AUTH_NONE)
123 assert reply == bytes([SOCKS_VERSION, AUTH_NONE])
124
125 def test_build_greeting_reply_no_acceptable(self):
126 reply = build_socks_greeting_reply(0xFF)
127 assert reply == bytes([SOCKS_VERSION, 0xFF])
128
129 def test_parse_request_domain(self):
130 domain = b"example.i2p"
131 data = bytes([SOCKS_VERSION, CMD_CONNECT, 0x00, ATYP_DOMAIN, len(domain)])
132 data += domain + struct.pack("!H", 80)
133 req = parse_socks_request(data)
134 assert req.dest_addr == "example.i2p"
135 assert req.dest_port == 80
136
137 def test_parse_request_ipv4(self):
138 import socket
139 data = bytes([SOCKS_VERSION, CMD_CONNECT, 0x00, ATYP_IPV4])
140 data += socket.inet_aton("10.0.0.1") + struct.pack("!H", 443)
141 req = parse_socks_request(data)
142 assert req.dest_addr == "10.0.0.1"
143 assert req.dest_port == 443
144
145 def test_parse_request_ipv6(self):
146 import socket
147 ipv6 = socket.inet_pton(socket.AF_INET6, "::1")
148 data = bytes([SOCKS_VERSION, CMD_CONNECT, 0x00, ATYP_IPV6])
149 data += ipv6 + struct.pack("!H", 8080)
150 req = parse_socks_request(data)
151 assert req.dest_addr == "::1"
152 assert req.dest_port == 8080
153
154 def test_parse_request_too_short(self):
155 with pytest.raises(ValueError):
156 parse_socks_request(b"\x05\x01")
157
158 def test_parse_request_unsupported_atyp(self):
159 with pytest.raises(ValueError, match="Unsupported address type"):
160 parse_socks_request(bytes([SOCKS_VERSION, CMD_CONNECT, 0x00, 0x07, 0x00]))
161
162 def test_build_reply_success(self):
163 reply = build_socks_reply(REPLY_SUCCESS)
164 assert reply[0] == SOCKS_VERSION
165 assert reply[1] == REPLY_SUCCESS
166 assert len(reply) == 10
167
168 def test_build_reply_failure(self):
169 reply = build_socks_reply(REPLY_HOST_UNREACHABLE)
170 assert reply[1] == REPLY_HOST_UNREACHABLE
171
172
173class TestSOCKSClientTaskHelpers:
174 def test_is_i2p_true(self):
175 assert SOCKSClientTask._is_i2p("example.i2p") is True
176
177 def test_is_i2p_b32(self):
178 assert SOCKSClientTask._is_i2p("abcd.b32.i2p") is True
179
180 def test_is_i2p_false(self):
181 assert SOCKSClientTask._is_i2p("example.com") is False
182
183 def test_is_i2p_empty(self):
184 assert SOCKSClientTask._is_i2p("") is False
185
186
187class TestSOCKSProxy:
188 def test_initial_state(self):
189 proxy = SOCKSProxy()
190 assert proxy.is_running is False
191 assert proxy.listen_address == ("127.0.0.1", 4445)
192
193 def test_custom_address(self):
194 proxy = SOCKSProxy("0.0.0.0", 9050)
195 assert proxy.listen_address == ("0.0.0.0", 9050)
196
197 @pytest.mark.asyncio
198 async def test_start_stop(self):
199 proxy = SOCKSProxy("127.0.0.1", 0)
200 await proxy.start()
201 assert proxy.is_running is True
202 await proxy.stop()
203 assert proxy.is_running is False
204
205 @pytest.mark.asyncio
206 async def test_stop_without_start(self):
207 proxy = SOCKSProxy()
208 await proxy.stop() # should not crash
209 assert proxy.is_running is False
210
211
212# === Async handle_client tests ===
213
214import asyncio
215from unittest.mock import AsyncMock, MagicMock
216
217
218def _make_mock_writer():
219 writer = MagicMock()
220 writer.write = MagicMock()
221 writer.drain = AsyncMock()
222 writer.close = MagicMock()
223 writer.wait_closed = AsyncMock()
224 transport = MagicMock()
225 transport.is_closing = MagicMock(return_value=False)
226 writer.transport = transport
227 return writer
228
229
230def _make_socks_task():
231 config = MagicMock()
232 config.proxy_list = []
233 session = AsyncMock()
234 return SOCKSClientTask(config, session)
235
236
237class TestSOCKSClientHandleClient:
238 @pytest.mark.asyncio
239 async def test_empty_read_closes(self):
240 task = _make_socks_task()
241 reader = AsyncMock()
242 reader.read = AsyncMock(return_value=b"")
243 writer = _make_mock_writer()
244 await task.handle_client(reader, writer)
245
246 @pytest.mark.asyncio
247 async def test_unknown_version_closes(self):
248 task = _make_socks_task()
249 reader = AsyncMock()
250 reader.read = AsyncMock(return_value=b"\x03")
251 writer = _make_mock_writer()
252 await task.handle_client(reader, writer)
253 writer.close.assert_called()
254
255 @pytest.mark.asyncio
256 async def test_socks5_no_acceptable_auth(self):
257 task = _make_socks_task()
258 # SOCKS5 greeting with only auth method 0x02 (username/password), no 0x00
259 greeting = bytes([SOCKS_VERSION, 1, 0x02])
260 reader = AsyncMock()
261 reader.read = AsyncMock(side_effect=[bytes([SOCKS_VERSION]), greeting[1:]])
262 writer = _make_mock_writer()
263 await task.handle_client(reader, writer)
264 # Should reply with 0xFF (no acceptable method)
265 writer.write.assert_called()
266 written = writer.write.call_args_list[0][0][0]
267 assert written == bytes([SOCKS_VERSION, 0xFF])
268
269 @pytest.mark.asyncio
270 async def test_socks5_udp_associate_rejected(self):
271 task = _make_socks_task()
272 greeting_rest = bytes([1, AUTH_NONE]) # 1 method, no auth
273 domain = b"example.i2p"
274 request = bytes([SOCKS_VERSION, 3, 0x00, ATYP_DOMAIN, len(domain)]) # CMD=3 UDP ASSOCIATE
275 request += domain + struct.pack("!H", 80)
276
277 call_count = [0]
278 async def read_side_effect(n):
279 call_count[0] += 1
280 if call_count[0] == 1:
281 return bytes([SOCKS_VERSION]) # first byte
282 elif call_count[0] == 2:
283 return greeting_rest
284 elif call_count[0] == 3:
285 return request
286 return b""
287
288 reader = AsyncMock()
289 reader.read = AsyncMock(side_effect=read_side_effect)
290 writer = _make_mock_writer()
291 await task.handle_client(reader, writer)
292 # Should reply with 0x07 (command not supported)
293 calls = writer.write.call_args_list
294 assert any(b"\x07" in c[0][0] for c in calls if len(c[0][0]) > 1)
295
296 @pytest.mark.asyncio
297 async def test_socks5_non_connect_rejected(self):
298 task = _make_socks_task()
299 greeting_rest = bytes([1, AUTH_NONE])
300 domain = b"example.i2p"
301 request = bytes([SOCKS_VERSION, 0x02, 0x00, ATYP_DOMAIN, len(domain)]) # CMD=2 BIND
302 request += domain + struct.pack("!H", 80)
303
304 call_count = [0]
305 async def read_side_effect(n):
306 call_count[0] += 1
307 if call_count[0] == 1:
308 return bytes([SOCKS_VERSION])
309 elif call_count[0] == 2:
310 return greeting_rest
311 elif call_count[0] == 3:
312 return request
313 return b""
314
315 reader = AsyncMock()
316 reader.read = AsyncMock(side_effect=read_side_effect)
317 writer = _make_mock_writer()
318 await task.handle_client(reader, writer)
319
320 @pytest.mark.asyncio
321 async def test_socks5_non_i2p_rejected(self):
322 task = _make_socks_task()
323 greeting_rest = bytes([1, AUTH_NONE])
324 domain = b"example.com"
325 request = bytes([SOCKS_VERSION, CMD_CONNECT, 0x00, ATYP_DOMAIN, len(domain)])
326 request += domain + struct.pack("!H", 80)
327
328 call_count = [0]
329 async def read_side_effect(n):
330 call_count[0] += 1
331 if call_count[0] == 1:
332 return bytes([SOCKS_VERSION])
333 elif call_count[0] == 2:
334 return greeting_rest
335 elif call_count[0] == 3:
336 return request
337 return b""
338
339 reader = AsyncMock()
340 reader.read = AsyncMock(side_effect=read_side_effect)
341 writer = _make_mock_writer()
342 await task.handle_client(reader, writer)
343 # Should reply HOST_UNREACHABLE for non-i2p
344 calls = writer.write.call_args_list
345 replies = [c[0][0] for c in calls if len(c[0][0]) >= 2]
346 assert any(r[1] == REPLY_HOST_UNREACHABLE for r in replies if r[0] == SOCKS_VERSION)
347
348 @pytest.mark.asyncio
349 async def test_socks5_i2p_resolve_fail(self):
350 task = _make_socks_task()
351 task._session.lookup = AsyncMock(return_value=None)
352 greeting_rest = bytes([1, AUTH_NONE])
353 domain = b"unknown.i2p"
354 request = bytes([SOCKS_VERSION, CMD_CONNECT, 0x00, ATYP_DOMAIN, len(domain)])
355 request += domain + struct.pack("!H", 80)
356
357 call_count = [0]
358 async def read_side_effect(n):
359 call_count[0] += 1
360 if call_count[0] == 1:
361 return bytes([SOCKS_VERSION])
362 elif call_count[0] == 2:
363 return greeting_rest
364 elif call_count[0] == 3:
365 return request
366 return b""
367
368 reader = AsyncMock()
369 reader.read = AsyncMock(side_effect=read_side_effect)
370 writer = _make_mock_writer()
371 await task.handle_client(reader, writer)
372
373 @pytest.mark.asyncio
374 async def test_socks4_non_connect_rejected(self):
375 task = _make_socks_task()
376 # SOCKS4 with command 2 (BIND) instead of 1 (CONNECT)
377 data = bytes([SOCKS4_VERSION, 2]) # command=2
378 data += struct.pack("!H", 80)
379 data += b"\x00\x00\x00\x01" # SOCKS4a indicator
380 data += b"\x00" # empty userid
381 data += b"test.i2p\x00"
382
383 call_count = [0]
384 async def read_side_effect(n):
385 call_count[0] += 1
386 if call_count[0] == 1:
387 return bytes([SOCKS4_VERSION])
388 elif call_count[0] == 2:
389 return data[1:]
390 return b""
391
392 reader = AsyncMock()
393 reader.read = AsyncMock(side_effect=read_side_effect)
394 writer = _make_mock_writer()
395 await task.handle_client(reader, writer)
396 # Should reply with SOCKS4 failure
397 calls = writer.write.call_args_list
398 assert any(SOCKS4_REPLY_FAILED in c[0][0] for c in calls)
399
400 @pytest.mark.asyncio
401 async def test_socks4_non_i2p_rejected(self):
402 task = _make_socks_task()
403 data = bytes([SOCKS4_VERSION, SOCKS4_CMD_CONNECT])
404 data += struct.pack("!H", 80)
405 data += b"\x00\x00\x00\x01" # SOCKS4a indicator
406 data += b"\x00"
407 data += b"example.com\x00"
408
409 call_count = [0]
410 async def read_side_effect(n):
411 call_count[0] += 1
412 if call_count[0] == 1:
413 return bytes([SOCKS4_VERSION])
414 elif call_count[0] == 2:
415 return data[1:]
416 return b""
417
418 reader = AsyncMock()
419 reader.read = AsyncMock(side_effect=read_side_effect)
420 writer = _make_mock_writer()
421 await task.handle_client(reader, writer)
422
423 @pytest.mark.asyncio
424 async def test_socks4_i2p_resolve_fail(self):
425 task = _make_socks_task()
426 task._session.lookup = AsyncMock(return_value=None)
427 data = bytes([SOCKS4_VERSION, SOCKS4_CMD_CONNECT])
428 data += struct.pack("!H", 80)
429 data += b"\x00\x00\x00\x01"
430 data += b"\x00"
431 data += b"unknown.i2p\x00"
432
433 call_count = [0]
434 async def read_side_effect(n):
435 call_count[0] += 1
436 if call_count[0] == 1:
437 return bytes([SOCKS4_VERSION])
438 elif call_count[0] == 2:
439 return data[1:]
440 return b""
441
442 reader = AsyncMock()
443 reader.read = AsyncMock(side_effect=read_side_effect)
444 writer = _make_mock_writer()
445 await task.handle_client(reader, writer)
446
447 @pytest.mark.asyncio
448 async def test_socks4_connect_exception(self):
449 task = _make_socks_task()
450 task._session.lookup = AsyncMock(return_value=None)
451 task._session.connect = AsyncMock(side_effect=ConnectionRefusedError("test"))
452 # .b32.i2p bypasses lookup
453 data = bytes([SOCKS4_VERSION, SOCKS4_CMD_CONNECT])
454 data += struct.pack("!H", 80)
455 data += b"\x00\x00\x00\x01"
456 data += b"\x00"
457 data += b"abcd.b32.i2p\x00"
458
459 call_count = [0]
460 async def read_side_effect(n):
461 call_count[0] += 1
462 if call_count[0] == 1:
463 return bytes([SOCKS4_VERSION])
464 elif call_count[0] == 2:
465 return data[1:]
466 return b""
467
468 reader = AsyncMock()
469 reader.read = AsyncMock(side_effect=read_side_effect)
470 writer = _make_mock_writer()
471 await task.handle_client(reader, writer)
472
473 @pytest.mark.asyncio
474 async def test_socks5_connect_exception(self):
475 task = _make_socks_task()
476 task._session.connect = AsyncMock(side_effect=ConnectionRefusedError("test"))
477 greeting_rest = bytes([1, AUTH_NONE])
478 domain = b"abcd.b32.i2p"
479 request = bytes([SOCKS_VERSION, CMD_CONNECT, 0x00, ATYP_DOMAIN, len(domain)])
480 request += domain + struct.pack("!H", 80)
481
482 call_count = [0]
483 async def read_side_effect(n):
484 call_count[0] += 1
485 if call_count[0] == 1:
486 return bytes([SOCKS_VERSION])
487 elif call_count[0] == 2:
488 return greeting_rest
489 elif call_count[0] == 3:
490 return request
491 return b""
492
493 reader = AsyncMock()
494 reader.read = AsyncMock(side_effect=read_side_effect)
495 writer = _make_mock_writer()
496 await task.handle_client(reader, writer)