A Python port of the Invisible Internet Project (I2P)
1"""Tests for HTTPClientTask — full HTTP proxy with SAM integration."""
2
3import asyncio
4from unittest.mock import AsyncMock, MagicMock, patch
5
6import pytest
7
8from i2p_apps.i2ptunnel.http_client import HTTPClientTask, _STRIPPED_OUTBOUND, _I2P_USER_AGENT
9
10
11def _make_config(**overrides):
12 config = MagicMock()
13 config.proxy_list = overrides.get("proxy_list", [])
14 config.name = "test-http"
15 config.interface = "127.0.0.1"
16 config.listen_port = 0
17 config.type = MagicMock()
18 return config
19
20
def _make_task(**overrides):
    """Return an ``HTTPClientTask`` built from a mock config and an ``AsyncMock`` session.

    Keyword arguments are forwarded unchanged to ``_make_config``.
    """
    return HTTPClientTask(_make_config(**overrides), AsyncMock())
25
26
27def _mock_writer():
28 writer = MagicMock()
29 writer.write = MagicMock()
30 writer.drain = AsyncMock()
31 writer.close = MagicMock()
32 writer.wait_closed = AsyncMock()
33 return writer
34
35
class TestHeaderFiltering:
    """Checks on ``HTTPClientTask._filter_outbound_headers``."""

    def test_strips_referer(self):
        """Referer is removed while Host is kept."""
        request_headers = {"Referer": "http://evil.com", "Host": "example.i2p"}
        result = _make_task()._filter_outbound_headers(request_headers, is_i2p=True)
        assert "Referer" not in result
        assert "Host" in result

    def test_strips_via(self):
        """Via is removed while Accept is kept."""
        request_headers = {"Via": "1.1 proxy", "Accept": "*/*"}
        result = _make_task()._filter_outbound_headers(request_headers, is_i2p=True)
        assert "Via" not in result
        assert "Accept" in result

    def test_strips_x_forwarded_for(self):
        """X-Forwarded-For is removed."""
        request_headers = {"X-Forwarded-For": "1.2.3.4"}
        result = _make_task()._filter_outbound_headers(request_headers, is_i2p=True)
        assert "X-Forwarded-For" not in result

    def test_replaces_user_agent_for_i2p(self):
        """For I2P requests the User-Agent becomes ``_I2P_USER_AGENT``."""
        request_headers = {"User-Agent": "Mozilla/5.0"}
        result = _make_task()._filter_outbound_headers(request_headers, is_i2p=True)
        assert result["User-Agent"] == _I2P_USER_AGENT

    def test_keeps_user_agent_for_clearnet(self):
        """For non-I2P requests the User-Agent is passed through."""
        request_headers = {"User-Agent": "Mozilla/5.0"}
        result = _make_task()._filter_outbound_headers(request_headers, is_i2p=False)
        assert result["User-Agent"] == "Mozilla/5.0"

    def test_empty_headers(self):
        """An empty header mapping filters to an empty mapping."""
        result = _make_task()._filter_outbound_headers({}, is_i2p=True)
        assert result == {}
72
73
class TestAddressHelper:
    """Checks on address-helper extraction and the helper cache."""

    def test_extract_helper(self):
        """A URL with ``i2paddresshelper`` yields both host and helper value."""
        url = "http://example.i2p/?i2paddresshelper=AAAA"
        hostname, helper_value = _make_task()._extract_address_helper(url)
        assert hostname == "example.i2p"
        assert helper_value == "AAAA"

    def test_no_helper(self):
        """A URL without a helper yields the host and ``None``."""
        url = "http://example.i2p/path"
        hostname, helper_value = _make_task()._extract_address_helper(url)
        assert hostname == "example.i2p"
        assert helper_value is None

    def test_cache_and_retrieve(self):
        """A cached helper is returned for the same host."""
        proxy = _make_task()
        proxy._cache_address_helper("example.i2p", "dest-base64")
        cached = proxy._get_cached_helper("example.i2p")
        assert cached == "dest-base64"

    def test_cache_miss(self):
        """An unknown host returns ``None`` from the cache."""
        cached = _make_task()._get_cached_helper("unknown.i2p")
        assert cached is None
97
98
class TestOutproxy:
    """Checks on outproxy detection and selection."""

    def test_has_outproxy(self):
        """A non-empty proxy list reports an outproxy."""
        proxy = _make_task(proxy_list=["outproxy.i2p"])
        assert proxy._has_outproxy() is True

    def test_no_outproxy(self):
        """The default (empty) proxy list reports no outproxy."""
        proxy = _make_task()
        assert proxy._has_outproxy() is False

    def test_pick_outproxy(self):
        """The pick is always one of the configured outproxies."""
        candidates = ["a.i2p", "b.i2p"]
        chosen = _make_task(proxy_list=candidates)._pick_outproxy()
        assert chosen in ("a.i2p", "b.i2p")

    def test_pick_avoids_failed(self):
        """An outproxy marked as failed is never picked while another remains."""
        proxy = _make_task(proxy_list=["a.i2p", "b.i2p"])
        proxy._failed_outproxies.add("a.i2p")
        # Only b.i2p is still eligible, so every one of the ten picks must be it.
        assert all(proxy._pick_outproxy() == "b.i2p" for _ in range(10))

    def test_pick_resets_when_all_failed(self):
        """When every outproxy has failed, one is still picked and the failed set is emptied."""
        proxy = _make_task(proxy_list=["a.i2p"])
        proxy._failed_outproxies.add("a.i2p")
        chosen = proxy._pick_outproxy()
        assert chosen == "a.i2p"
        assert len(proxy._failed_outproxies) == 0

    def test_pick_empty_list(self):
        """With no outproxies configured the pick is the empty string."""
        chosen = _make_task()._pick_outproxy()
        assert chosen == ""
130
131
class TestClassification:
    """Checks on host classification helpers."""

    def test_is_i2p(self):
        """Both ``.i2p`` and ``.b32.i2p`` hosts count as I2P requests."""
        for hostname in ("example.i2p", "abc.b32.i2p"):
            assert HTTPClientTask._is_i2p_request(hostname) is True

    def test_not_i2p(self):
        """A clearnet host is not an I2P request."""
        verdict = HTTPClientTask._is_i2p_request("example.com")
        assert verdict is False

    def test_is_localhost(self):
        """Loopback names/addresses and ``0.0.0.0`` are treated as localhost."""
        for hostname in ("127.0.0.1", "localhost", "::1", "0.0.0.0"):
            assert HTTPClientTask._is_localhost(hostname) is True

    def test_not_localhost(self):
        """A clearnet host is not localhost."""
        verdict = HTTPClientTask._is_localhost("example.com")
        assert verdict is False
148
149
class TestErrorPage:
    """Checks on the bytes produced by ``_error_page``."""

    def test_dnf(self):
        """The "dnf" page carries the status, its title and an HTML content type."""
        rendered = _make_task()._error_page("dnf", 503).decode()
        for fragment in ("503", "Destination Not Found", "Content-Type: text/html"):
            assert fragment in rendered

    def test_timeout(self):
        """The "timeout" page carries the 504 status."""
        rendered = _make_task()._error_page("timeout", 504)
        assert b"504" in rendered

    def test_unknown_type(self):
        """An unrecognised error type still yields a page containing "Error"."""
        rendered = _make_task()._error_page("???", 500)
        assert b"Error" in rendered
168
169
class TestReadHeaders:
    """Checks on ``HTTPClientTask._read_headers`` using a mocked reader."""

    @pytest.mark.asyncio
    async def test_reads_headers(self):
        """Header lines up to the blank line are parsed into a mapping."""
        wire_lines = [
            b"Host: example.i2p\r\n",
            b"Accept: text/html\r\n",
            b"\r\n",
        ]
        stream = AsyncMock()
        stream.readline = AsyncMock(side_effect=wire_lines)
        parsed = await HTTPClientTask._read_headers(stream)
        assert parsed["Host"] == "example.i2p"
        assert parsed["Accept"] == "text/html"

    @pytest.mark.asyncio
    async def test_empty_headers(self):
        """An immediate blank line produces no headers."""
        stream = AsyncMock()
        stream.readline = AsyncMock(side_effect=[b"\r\n"])
        parsed = await HTTPClientTask._read_headers(stream)
        assert parsed == {}

    @pytest.mark.asyncio
    async def test_eof_headers(self):
        """An immediate empty read produces no headers."""
        stream = AsyncMock()
        stream.readline = AsyncMock(side_effect=[b""])
        parsed = await HTTPClientTask._read_headers(stream)
        assert parsed == {}
196
197
class TestReadBody:
    """Checks on ``HTTPClientTask._read_body`` using a mocked reader."""

    @pytest.mark.asyncio
    async def test_with_content_length(self):
        """A numeric Content-Length triggers exactly one read of that size."""
        stream = AsyncMock()
        stream.read = AsyncMock(return_value=b"hello")
        request_headers = {"Content-Length": "5"}
        payload = await HTTPClientTask._read_body(stream, request_headers)
        assert payload == b"hello"
        stream.read.assert_called_once_with(5)

    @pytest.mark.asyncio
    async def test_no_content_length(self):
        """Without Content-Length the body is empty."""
        stream = AsyncMock()
        payload = await HTTPClientTask._read_body(stream, {})
        assert payload == b""

    @pytest.mark.asyncio
    async def test_invalid_content_length(self):
        """A non-numeric Content-Length yields an empty body."""
        stream = AsyncMock()
        request_headers = {"Content-Length": "abc"}
        payload = await HTTPClientTask._read_body(stream, request_headers)
        assert payload == b""
218
219
class TestResolve:
    """Checks on ``HTTPClientTask._resolve``."""

    @pytest.mark.asyncio
    async def test_b32_passthrough(self):
        """A ``.b32.i2p`` host is returned unchanged."""
        resolved = await _make_task()._resolve("abc.b32.i2p")
        assert resolved == "abc.b32.i2p"

    @pytest.mark.asyncio
    async def test_cached_helper(self):
        """A cached address helper is returned for its host."""
        proxy = _make_task()
        proxy._cache_address_helper("example.i2p", "cached-dest")
        resolved = await proxy._resolve("example.i2p")
        assert resolved == "cached-dest"

    @pytest.mark.asyncio
    async def test_lookup(self):
        """With nothing cached, the value from the session lookup is returned."""
        proxy = _make_task()
        proxy._session.lookup = AsyncMock(return_value="looked-up-dest")
        resolved = await proxy._resolve("example.i2p")
        assert resolved == "looked-up-dest"
240
241
class TestRebuildRequest:
    """Checks on ``HTTPClientTask._rebuild_request``."""

    def test_basic(self):
        """The rebuilt request starts with the request line and includes the Host header."""
        request_headers = {"Host": "example.i2p"}
        raw = HTTPClientTask._rebuild_request("GET", "/path", request_headers, b"")
        request_text = raw.decode()
        assert request_text.startswith("GET /path HTTP/1.1\r\n")
        assert "Host: example.i2p" in request_text

    def test_with_body(self):
        """A supplied body is placed at the end of the rebuilt request."""
        raw = HTTPClientTask._rebuild_request("POST", "/", {}, b"data")
        assert raw.endswith(b"data")
252
253
class TestHandleClient:
    """Drive ``HTTPClientTask.handle_client`` with mocked client and remote streams."""

    @staticmethod
    def _client_reader(*lines):
        """Return a mock client reader whose ``readline`` yields *lines* in order."""
        reader = AsyncMock()
        reader.readline = AsyncMock(side_effect=list(lines))
        return reader

    @staticmethod
    def _remote_streams():
        """Return a ``(reader, writer)`` pair for the mocked ``session.connect``.

        ``wait_closed`` is an ``AsyncMock`` in every test so that awaiting it
        never raises ``TypeError``, which a plain ``MagicMock`` result would.
        """
        remote_writer = MagicMock()
        remote_writer.close = MagicMock()
        remote_writer.wait_closed = AsyncMock()
        return AsyncMock(), remote_writer

    @staticmethod
    def _last_written(writer):
        """Return the payload of the most recent ``writer.write`` call.

        Asserts first that ``write`` was called, so a handler that wrote
        nothing fails with a clear message instead of a ``TypeError`` from
        subscripting ``None``.
        """
        writer.write.assert_called()
        return writer.write.call_args[0][0]

    @pytest.mark.asyncio
    async def test_empty_read(self):
        """An empty first read results in nothing being written."""
        task = _make_task()
        reader = AsyncMock()
        reader.readline = AsyncMock(return_value=b"")
        writer = _mock_writer()
        await task.handle_client(reader, writer)
        writer.write.assert_not_called()

    @pytest.mark.asyncio
    async def test_bad_request(self):
        """A malformed request line produces a 400 response."""
        task = _make_task()
        reader = self._client_reader(b"INVALID\r\n")
        writer = _mock_writer()
        await task.handle_client(reader, writer)
        assert b"400" in self._last_written(writer)

    @pytest.mark.asyncio
    async def test_localhost_rejected(self):
        """A request targeting 127.0.0.1 produces a 403 response."""
        task = _make_task()
        reader = self._client_reader(
            b"GET http://127.0.0.1/ HTTP/1.1\r\n",
            b"\r\n",
        )
        writer = _mock_writer()
        await task.handle_client(reader, writer)
        assert b"403" in self._last_written(writer)

    @pytest.mark.asyncio
    async def test_no_outproxy(self):
        """A clearnet request with no outproxy configured produces a 503."""
        task = _make_task()
        reader = self._client_reader(
            b"GET http://example.com/ HTTP/1.1\r\n",
            b"\r\n",
        )
        writer = _mock_writer()
        await task.handle_client(reader, writer)
        assert b"503" in self._last_written(writer)

    @pytest.mark.asyncio
    async def test_i2p_dnf(self):
        """An I2P host whose lookup returns ``None`` produces a 503."""
        task = _make_task()
        task._session.lookup = AsyncMock(return_value=None)
        reader = self._client_reader(
            b"GET http://unknown.i2p/ HTTP/1.1\r\n",
            b"\r\n",
        )
        writer = _mock_writer()
        await task.handle_client(reader, writer)
        assert b"503" in self._last_written(writer)

    @pytest.mark.asyncio
    async def test_i2p_connect_fail(self):
        """A ``ConnectionError`` from the session connect produces a 504."""
        task = _make_task()
        task._session.connect = AsyncMock(side_effect=ConnectionError("fail"))
        reader = self._client_reader(
            b"GET http://abc.b32.i2p/path HTTP/1.1\r\n",
            b"\r\n",
        )
        writer = _mock_writer()
        await task.handle_client(reader, writer)
        assert b"504" in self._last_written(writer)

    @pytest.mark.asyncio
    async def test_i2p_success(self):
        """A successful I2P GET hands off to ``bridge_with_initial_data`` once."""
        task = _make_task()
        task._session.connect = AsyncMock(return_value=self._remote_streams())
        reader = self._client_reader(
            b"GET http://abc.b32.i2p/path HTTP/1.1\r\n",
            b"Host: abc.b32.i2p\r\n",
            b"\r\n",
        )
        reader.read = AsyncMock(return_value=b"")
        writer = _mock_writer()

        with patch(
            "i2p_apps.i2ptunnel.http_client.bridge_with_initial_data",
            new_callable=AsyncMock,
        ) as mock_bridge:
            await task.handle_client(reader, writer)
            mock_bridge.assert_called_once()

    @pytest.mark.asyncio
    async def test_connect_ssl_non_i2p(self):
        """CONNECT to a clearnet host produces a 403."""
        task = _make_task()
        reader = self._client_reader(
            b"CONNECT example.com:443 HTTP/1.1\r\n",
            b"\r\n",  # consumed by handle_client's _read_headers
            b"\r\n",  # consumed by _handle_connect's _read_headers
        )
        writer = _mock_writer()
        await task.handle_client(reader, writer)
        assert b"403" in self._last_written(writer)

    @pytest.mark.asyncio
    async def test_connect_i2p_success(self):
        """CONNECT to an I2P host replies 200 and hands off to ``bridge`` once."""
        task = _make_task()
        task._session.connect = AsyncMock(return_value=self._remote_streams())
        reader = self._client_reader(
            b"CONNECT abc.b32.i2p:443 HTTP/1.1\r\n",
            b"\r\n",
            b"\r\n",
        )
        writer = _mock_writer()

        with patch(
            "i2p_apps.i2ptunnel.http_client.bridge",
            new_callable=AsyncMock,
        ) as mock_bridge:
            await task.handle_client(reader, writer)
            assert b"200" in self._last_written(writer)
            mock_bridge.assert_called_once()

    @pytest.mark.asyncio
    async def test_connect_i2p_dnf(self):
        """CONNECT to an I2P host whose lookup returns ``None`` produces a 503."""
        task = _make_task()
        task._session.lookup = AsyncMock(return_value=None)
        reader = self._client_reader(
            b"CONNECT unknown.i2p:443 HTTP/1.1\r\n",
            b"\r\n",
            b"\r\n",
        )
        writer = _mock_writer()
        await task.handle_client(reader, writer)
        assert b"503" in self._last_written(writer)

    @pytest.mark.asyncio
    async def test_connect_i2p_timeout(self):
        """CONNECT where the session connect raises ``ConnectionError`` produces a 504."""
        task = _make_task()
        task._session.connect = AsyncMock(side_effect=ConnectionError("timeout"))
        reader = self._client_reader(
            b"CONNECT abc.b32.i2p:443 HTTP/1.1\r\n",
            b"\r\n",
            b"\r\n",
        )
        writer = _mock_writer()
        await task.handle_client(reader, writer)
        assert b"504" in self._last_written(writer)

    @pytest.mark.asyncio
    async def test_outproxy_success(self):
        """A clearnet GET through an outproxy hands off to ``bridge_with_initial_data`` once."""
        task = _make_task(proxy_list=["outproxy.i2p"])
        task._session.lookup = AsyncMock(return_value="outproxy-dest")
        task._session.connect = AsyncMock(return_value=self._remote_streams())
        reader = self._client_reader(
            b"GET http://example.com/path HTTP/1.1\r\n",
            b"\r\n",
        )
        reader.read = AsyncMock(return_value=b"")
        writer = _mock_writer()

        with patch(
            "i2p_apps.i2ptunnel.http_client.bridge_with_initial_data",
            new_callable=AsyncMock,
        ) as mock_bridge:
            await task.handle_client(reader, writer)
            mock_bridge.assert_called_once()

    @pytest.mark.asyncio
    async def test_outproxy_dnf(self):
        """An outproxy whose lookup returns ``None`` produces a 503."""
        task = _make_task(proxy_list=["outproxy.i2p"])
        task._session.lookup = AsyncMock(return_value=None)
        reader = self._client_reader(
            b"GET http://example.com/ HTTP/1.1\r\n",
            b"\r\n",
        )
        reader.read = AsyncMock(return_value=b"")
        writer = _mock_writer()
        await task.handle_client(reader, writer)
        assert b"503" in self._last_written(writer)

    @pytest.mark.asyncio
    async def test_outproxy_connect_fail(self):
        """A ``ConnectionError`` while connecting to the outproxy produces a 504."""
        task = _make_task(proxy_list=["outproxy.i2p"])
        task._session.lookup = AsyncMock(return_value="outproxy-dest")
        task._session.connect = AsyncMock(side_effect=ConnectionError("fail"))
        reader = self._client_reader(
            b"GET http://example.com/ HTTP/1.1\r\n",
            b"\r\n",
        )
        reader.read = AsyncMock(return_value=b"")
        writer = _mock_writer()
        await task.handle_client(reader, writer)
        assert b"504" in self._last_written(writer)

    @pytest.mark.asyncio
    async def test_exception_in_handler(self):
        """An unexpected error while reading must not propagate out of the handler."""
        task = _make_task()
        reader = AsyncMock()
        reader.readline = AsyncMock(side_effect=RuntimeError("boom"))
        writer = _mock_writer()
        # Should not raise
        await task.handle_client(reader, writer)