"""Tests for HTTPClientTask — full HTTP proxy with SAM integration.""" import asyncio from unittest.mock import AsyncMock, MagicMock, patch import pytest from i2p_apps.i2ptunnel.http_client import HTTPClientTask, _STRIPPED_OUTBOUND, _I2P_USER_AGENT def _make_config(**overrides): config = MagicMock() config.proxy_list = overrides.get("proxy_list", []) config.name = "test-http" config.interface = "127.0.0.1" config.listen_port = 0 config.type = MagicMock() return config def _make_task(**overrides): config = _make_config(**overrides) session = AsyncMock() return HTTPClientTask(config, session) def _mock_writer(): writer = MagicMock() writer.write = MagicMock() writer.drain = AsyncMock() writer.close = MagicMock() writer.wait_closed = AsyncMock() return writer class TestHeaderFiltering: def test_strips_referer(self): task = _make_task() h = {"Referer": "http://evil.com", "Host": "example.i2p"} filtered = task._filter_outbound_headers(h, is_i2p=True) assert "Referer" not in filtered assert "Host" in filtered def test_strips_via(self): task = _make_task() h = {"Via": "1.1 proxy", "Accept": "*/*"} filtered = task._filter_outbound_headers(h, is_i2p=True) assert "Via" not in filtered assert "Accept" in filtered def test_strips_x_forwarded_for(self): task = _make_task() h = {"X-Forwarded-For": "1.2.3.4"} filtered = task._filter_outbound_headers(h, is_i2p=True) assert "X-Forwarded-For" not in filtered def test_replaces_user_agent_for_i2p(self): task = _make_task() h = {"User-Agent": "Mozilla/5.0"} filtered = task._filter_outbound_headers(h, is_i2p=True) assert filtered["User-Agent"] == _I2P_USER_AGENT def test_keeps_user_agent_for_clearnet(self): task = _make_task() h = {"User-Agent": "Mozilla/5.0"} filtered = task._filter_outbound_headers(h, is_i2p=False) assert filtered["User-Agent"] == "Mozilla/5.0" def test_empty_headers(self): task = _make_task() assert task._filter_outbound_headers({}, is_i2p=True) == {} class TestAddressHelper: def test_extract_helper(self): task = _make_task() host, helper = task._extract_address_helper( "http://example.i2p/?i2paddresshelper=AAAA" ) assert host == "example.i2p" assert helper == "AAAA" def test_no_helper(self): task = _make_task() host, helper = task._extract_address_helper("http://example.i2p/path") assert host == "example.i2p" assert helper is None def test_cache_and_retrieve(self): task = _make_task() task._cache_address_helper("example.i2p", "dest-base64") assert task._get_cached_helper("example.i2p") == "dest-base64" def test_cache_miss(self): task = _make_task() assert task._get_cached_helper("unknown.i2p") is None class TestOutproxy: def test_has_outproxy(self): task = _make_task(proxy_list=["outproxy.i2p"]) assert task._has_outproxy() is True def test_no_outproxy(self): task = _make_task() assert task._has_outproxy() is False def test_pick_outproxy(self): task = _make_task(proxy_list=["a.i2p", "b.i2p"]) result = task._pick_outproxy() assert result in ("a.i2p", "b.i2p") def test_pick_avoids_failed(self): task = _make_task(proxy_list=["a.i2p", "b.i2p"]) task._failed_outproxies.add("a.i2p") # With only b.i2p available, should always pick it for _ in range(10): assert task._pick_outproxy() == "b.i2p" def test_pick_resets_when_all_failed(self): task = _make_task(proxy_list=["a.i2p"]) task._failed_outproxies.add("a.i2p") result = task._pick_outproxy() assert result == "a.i2p" assert len(task._failed_outproxies) == 0 def test_pick_empty_list(self): task = _make_task() assert task._pick_outproxy() == "" class TestClassification: def test_is_i2p(self): assert HTTPClientTask._is_i2p_request("example.i2p") is True assert HTTPClientTask._is_i2p_request("abc.b32.i2p") is True def test_not_i2p(self): assert HTTPClientTask._is_i2p_request("example.com") is False def test_is_localhost(self): assert HTTPClientTask._is_localhost("127.0.0.1") is True assert HTTPClientTask._is_localhost("localhost") is True assert HTTPClientTask._is_localhost("::1") is True assert HTTPClientTask._is_localhost("0.0.0.0") is True def test_not_localhost(self): assert HTTPClientTask._is_localhost("example.com") is False class TestErrorPage: def test_dnf(self): task = _make_task() page = task._error_page("dnf", 503) text = page.decode() assert "503" in text assert "Destination Not Found" in text assert "Content-Type: text/html" in text def test_timeout(self): task = _make_task() page = task._error_page("timeout", 504) assert b"504" in page def test_unknown_type(self): task = _make_task() page = task._error_page("???", 500) assert b"Error" in page class TestReadHeaders: @pytest.mark.asyncio async def test_reads_headers(self): reader = AsyncMock() reader.readline = AsyncMock(side_effect=[ b"Host: example.i2p\r\n", b"Accept: text/html\r\n", b"\r\n", ]) headers = await HTTPClientTask._read_headers(reader) assert headers["Host"] == "example.i2p" assert headers["Accept"] == "text/html" @pytest.mark.asyncio async def test_empty_headers(self): reader = AsyncMock() reader.readline = AsyncMock(side_effect=[b"\r\n"]) headers = await HTTPClientTask._read_headers(reader) assert headers == {} @pytest.mark.asyncio async def test_eof_headers(self): reader = AsyncMock() reader.readline = AsyncMock(side_effect=[b""]) headers = await HTTPClientTask._read_headers(reader) assert headers == {} class TestReadBody: @pytest.mark.asyncio async def test_with_content_length(self): reader = AsyncMock() reader.read = AsyncMock(return_value=b"hello") body = await HTTPClientTask._read_body(reader, {"Content-Length": "5"}) assert body == b"hello" reader.read.assert_called_once_with(5) @pytest.mark.asyncio async def test_no_content_length(self): reader = AsyncMock() body = await HTTPClientTask._read_body(reader, {}) assert body == b"" @pytest.mark.asyncio async def test_invalid_content_length(self): reader = AsyncMock() body = await HTTPClientTask._read_body(reader, {"Content-Length": "abc"}) assert body == b"" class TestResolve: @pytest.mark.asyncio async def test_b32_passthrough(self): task = _make_task() result = await task._resolve("abc.b32.i2p") assert result == "abc.b32.i2p" @pytest.mark.asyncio async def test_cached_helper(self): task = _make_task() task._cache_address_helper("example.i2p", "cached-dest") result = await task._resolve("example.i2p") assert result == "cached-dest" @pytest.mark.asyncio async def test_lookup(self): task = _make_task() task._session.lookup = AsyncMock(return_value="looked-up-dest") result = await task._resolve("example.i2p") assert result == "looked-up-dest" class TestRebuildRequest: def test_basic(self): result = HTTPClientTask._rebuild_request("GET", "/path", {"Host": "example.i2p"}, b"") text = result.decode() assert text.startswith("GET /path HTTP/1.1\r\n") assert "Host: example.i2p" in text def test_with_body(self): result = HTTPClientTask._rebuild_request("POST", "/", {}, b"data") assert result.endswith(b"data") class TestHandleClient: @pytest.mark.asyncio async def test_empty_read(self): task = _make_task() reader = AsyncMock() reader.readline = AsyncMock(return_value=b"") writer = _mock_writer() await task.handle_client(reader, writer) writer.write.assert_not_called() @pytest.mark.asyncio async def test_bad_request(self): task = _make_task() reader = AsyncMock() reader.readline = AsyncMock(side_effect=[ b"INVALID\r\n", ]) writer = _mock_writer() await task.handle_client(reader, writer) written = writer.write.call_args[0][0] assert b"400" in written @pytest.mark.asyncio async def test_localhost_rejected(self): task = _make_task() reader = AsyncMock() reader.readline = AsyncMock(side_effect=[ b"GET http://127.0.0.1/ HTTP/1.1\r\n", b"\r\n", ]) writer = _mock_writer() await task.handle_client(reader, writer) written = writer.write.call_args[0][0] assert b"403" in written @pytest.mark.asyncio async def test_no_outproxy(self): task = _make_task() reader = AsyncMock() reader.readline = AsyncMock(side_effect=[ b"GET http://example.com/ HTTP/1.1\r\n", b"\r\n", ]) writer = _mock_writer() await task.handle_client(reader, writer) written = writer.write.call_args[0][0] assert b"503" in written @pytest.mark.asyncio async def test_i2p_dnf(self): task = _make_task() task._session.lookup = AsyncMock(return_value=None) reader = AsyncMock() reader.readline = AsyncMock(side_effect=[ b"GET http://unknown.i2p/ HTTP/1.1\r\n", b"\r\n", ]) writer = _mock_writer() await task.handle_client(reader, writer) written = writer.write.call_args[0][0] assert b"503" in written @pytest.mark.asyncio async def test_i2p_connect_fail(self): task = _make_task() task._session.connect = AsyncMock(side_effect=ConnectionError("fail")) reader = AsyncMock() reader.readline = AsyncMock(side_effect=[ b"GET http://abc.b32.i2p/path HTTP/1.1\r\n", b"\r\n", ]) writer = _mock_writer() await task.handle_client(reader, writer) written = writer.write.call_args[0][0] assert b"504" in written @pytest.mark.asyncio async def test_i2p_success(self): task = _make_task() remote_reader = AsyncMock() remote_writer = MagicMock() remote_writer.close = MagicMock() remote_writer.wait_closed = AsyncMock() task._session.connect = AsyncMock(return_value=(remote_reader, remote_writer)) reader = AsyncMock() reader.readline = AsyncMock(side_effect=[ b"GET http://abc.b32.i2p/path HTTP/1.1\r\n", b"Host: abc.b32.i2p\r\n", b"\r\n", ]) reader.read = AsyncMock(return_value=b"") writer = _mock_writer() with patch("i2p_apps.i2ptunnel.http_client.bridge_with_initial_data", new_callable=AsyncMock) as mock_bridge: await task.handle_client(reader, writer) mock_bridge.assert_called_once() @pytest.mark.asyncio async def test_connect_ssl_non_i2p(self): task = _make_task() reader = AsyncMock() reader.readline = AsyncMock(side_effect=[ b"CONNECT example.com:443 HTTP/1.1\r\n", b"\r\n", # consumed by handle_client's _read_headers b"\r\n", # consumed by _handle_connect's _read_headers ]) writer = _mock_writer() await task.handle_client(reader, writer) written = writer.write.call_args[0][0] assert b"403" in written @pytest.mark.asyncio async def test_connect_i2p_success(self): task = _make_task() remote_reader = AsyncMock() remote_writer = MagicMock() remote_writer.close = MagicMock() task._session.connect = AsyncMock(return_value=(remote_reader, remote_writer)) reader = AsyncMock() reader.readline = AsyncMock(side_effect=[ b"CONNECT abc.b32.i2p:443 HTTP/1.1\r\n", b"\r\n", b"\r\n", ]) writer = _mock_writer() with patch("i2p_apps.i2ptunnel.http_client.bridge", new_callable=AsyncMock) as mock_bridge: await task.handle_client(reader, writer) written = writer.write.call_args[0][0] assert b"200" in written mock_bridge.assert_called_once() @pytest.mark.asyncio async def test_connect_i2p_dnf(self): task = _make_task() task._session.lookup = AsyncMock(return_value=None) reader = AsyncMock() reader.readline = AsyncMock(side_effect=[ b"CONNECT unknown.i2p:443 HTTP/1.1\r\n", b"\r\n", b"\r\n", ]) writer = _mock_writer() await task.handle_client(reader, writer) written = writer.write.call_args[0][0] assert b"503" in written @pytest.mark.asyncio async def test_connect_i2p_timeout(self): task = _make_task() task._session.connect = AsyncMock(side_effect=ConnectionError("timeout")) reader = AsyncMock() reader.readline = AsyncMock(side_effect=[ b"CONNECT abc.b32.i2p:443 HTTP/1.1\r\n", b"\r\n", b"\r\n", ]) writer = _mock_writer() await task.handle_client(reader, writer) written = writer.write.call_args[0][0] assert b"504" in written @pytest.mark.asyncio async def test_outproxy_success(self): task = _make_task(proxy_list=["outproxy.i2p"]) task._session.lookup = AsyncMock(return_value="outproxy-dest") remote_reader = AsyncMock() remote_writer = MagicMock() remote_writer.close = MagicMock() task._session.connect = AsyncMock(return_value=(remote_reader, remote_writer)) reader = AsyncMock() reader.readline = AsyncMock(side_effect=[ b"GET http://example.com/path HTTP/1.1\r\n", b"\r\n", ]) reader.read = AsyncMock(return_value=b"") writer = _mock_writer() with patch("i2p_apps.i2ptunnel.http_client.bridge_with_initial_data", new_callable=AsyncMock) as mock_bridge: await task.handle_client(reader, writer) mock_bridge.assert_called_once() @pytest.mark.asyncio async def test_outproxy_dnf(self): task = _make_task(proxy_list=["outproxy.i2p"]) task._session.lookup = AsyncMock(return_value=None) reader = AsyncMock() reader.readline = AsyncMock(side_effect=[ b"GET http://example.com/ HTTP/1.1\r\n", b"\r\n", ]) reader.read = AsyncMock(return_value=b"") writer = _mock_writer() await task.handle_client(reader, writer) written = writer.write.call_args[0][0] assert b"503" in written @pytest.mark.asyncio async def test_outproxy_connect_fail(self): task = _make_task(proxy_list=["outproxy.i2p"]) task._session.lookup = AsyncMock(return_value="outproxy-dest") task._session.connect = AsyncMock(side_effect=ConnectionError("fail")) reader = AsyncMock() reader.readline = AsyncMock(side_effect=[ b"GET http://example.com/ HTTP/1.1\r\n", b"\r\n", ]) reader.read = AsyncMock(return_value=b"") writer = _mock_writer() await task.handle_client(reader, writer) written = writer.write.call_args[0][0] assert b"504" in written @pytest.mark.asyncio async def test_exception_in_handler(self): task = _make_task() reader = AsyncMock() reader.readline = AsyncMock(side_effect=RuntimeError("boom")) writer = _mock_writer() # Should not raise await task.handle_client(reader, writer)