A Python port of the Invisible Internet Project (I2P)
at main 222 lines 7.3 kB view raw
"""Tests for CONNECT-only proxy tunnel."""

import asyncio
from unittest.mock import AsyncMock, MagicMock

import pytest

from i2p_apps.i2ptunnel.connect_client import ConnectClientTask
from i2p_apps.i2ptunnel.http_proxy import parse_http_request, extract_i2p_destination


def _make_task(session=None):
    """Build a ``ConnectClientTask`` wired to mock collaborators.

    Args:
        session: Mock to hand to the task as its session. When omitted, a
            fresh ``AsyncMock`` is used.

    Returns:
        A ``ConnectClientTask`` whose config is a ``MagicMock`` with an empty
        ``proxy_list``.
    """
    config = MagicMock()
    config.proxy_list = []
    if session is None:
        session = AsyncMock()
    return ConnectClientTask(config, session)


def _mock_writer():
    """Return a stream-writer double with sync ``write``/``close`` and awaitable ``drain``."""
    writer = MagicMock()
    writer.write = MagicMock()
    writer.drain = AsyncMock()
    writer.close = MagicMock()
    return writer


class TestParseHTTPRequest:
    """Test the HTTP request parser used by connect_client."""

    def test_connect_with_port(self):
        """CONNECT with an explicit port yields that host and port."""
        req = parse_http_request("CONNECT example.i2p:443 HTTP/1.1")
        assert req.is_connect is True
        assert req.host == "example.i2p"
        assert req.port == 443

    def test_connect_without_port(self):
        """CONNECT without a port falls back to 443."""
        req = parse_http_request("CONNECT example.i2p HTTP/1.1")
        assert req.is_connect is True
        assert req.host == "example.i2p"
        assert req.port == 443  # default

    def test_get_absolute_url(self):
        """GET with an absolute URL is split into host, port 80 and path."""
        req = parse_http_request("GET http://example.i2p/path HTTP/1.1")
        assert req.is_connect is False
        assert req.host == "example.i2p"
        assert req.port == 80
        assert req.path == "/path"

    def test_get_absolute_with_port(self):
        """An explicit port in an absolute URL is honoured."""
        req = parse_http_request("GET http://example.i2p:8080/test HTTP/1.1")
        assert req.host == "example.i2p"
        assert req.port == 8080
        assert req.path == "/test"

    def test_malformed_request(self):
        """A request line that cannot be parsed raises ``ValueError``."""
        with pytest.raises(ValueError, match="Malformed"):
            parse_http_request("INVALID")

    def test_method_case_insensitive(self):
        """A lower-case ``connect`` method is still recognised as CONNECT."""
        req = parse_http_request("connect example.i2p:443 HTTP/1.1")
        assert req.is_connect is True


class TestExtractI2PDestination:
    """Test recognition of I2P hostnames by ``extract_i2p_destination``."""

    def test_i2p_host(self):
        """A ``.i2p`` hostname is returned unchanged."""
        assert extract_i2p_destination("example.i2p") == "example.i2p"

    def test_b32_host(self):
        """A ``.b32.i2p`` hostname is returned unchanged."""
        assert extract_i2p_destination("abcd.b32.i2p") == "abcd.b32.i2p"

    def test_non_i2p(self):
        """A non-I2P hostname yields ``None``."""
        assert extract_i2p_destination("example.com") is None

    def test_empty(self):
        """An empty hostname yields ``None``."""
        assert extract_i2p_destination("") is None


class TestConnectClientProperties:
    """Test static properties of ``ConnectClientTask``."""

    def test_is_connect_only(self):
        """The task reports itself as CONNECT-only."""
        task = _make_task(MagicMock())
        assert task._is_connect_only is True

    def test_strip_all_headers(self):
        """``_strip_all_headers`` discards every header it is given."""
        headers = {"Host": "example.i2p", "User-Agent": "curl/7"}
        assert ConnectClientTask._strip_all_headers(headers) == {}


class TestConsumeHeaders:
    """Test ``ConnectClientTask._consume_headers``."""

    @pytest.mark.asyncio
    async def test_reads_until_empty_line(self):
        """Header lines up to the blank line are collected by name."""
        task = _make_task(MagicMock())

        reader = AsyncMock()
        reader.readline = AsyncMock(
            side_effect=[
                b"Host: example.i2p\r\n",
                b"User-Agent: test\r\n",
                b"\r\n",
            ]
        )
        headers = await task._consume_headers(reader)
        assert "Host" in headers
        assert "User-Agent" in headers

    @pytest.mark.asyncio
    async def test_empty_headers(self):
        """An immediate blank line produces an empty header mapping."""
        task = _make_task(MagicMock())

        reader = AsyncMock()
        reader.readline = AsyncMock(side_effect=[b"\r\n"])
        headers = await task._consume_headers(reader)
        assert headers == {}


class TestResolve:
    """Test ``ConnectClientTask._resolve``."""

    @pytest.mark.asyncio
    async def test_b32_passthrough(self):
        """A ``.b32.i2p`` address is returned as-is."""
        task = _make_task(MagicMock())
        result = await task._resolve("abcdef.b32.i2p")
        assert result == "abcdef.b32.i2p"

    @pytest.mark.asyncio
    async def test_i2p_lookup(self):
        """A plain ``.i2p`` hostname is resolved through ``session.lookup``."""
        session = AsyncMock()
        session.lookup = AsyncMock(return_value="resolved-dest")
        task = _make_task(session)
        result = await task._resolve("example.i2p")
        assert result == "resolved-dest"
        session.lookup.assert_called_once_with("example.i2p")

    @pytest.mark.asyncio
    async def test_non_i2p_passthrough(self):
        """A non-I2P hostname is returned as-is."""
        task = _make_task(MagicMock())
        result = await task._resolve("example.com")
        assert result == "example.com"


class TestHandleClient:
    """Test the responses ``ConnectClientTask.handle_client`` writes to the client."""

    @pytest.mark.asyncio
    async def test_empty_read(self):
        """An empty first read is handled without raising.

        NOTE(review): this test makes no assertion about what, if anything,
        is written to the client; it only checks that no exception escapes.
        """
        task = _make_task()
        reader = AsyncMock()
        reader.readline = AsyncMock(return_value=b"")
        writer = _mock_writer()
        await task.handle_client(reader, writer)

    @pytest.mark.asyncio
    async def test_bad_request_line(self):
        """An unparseable request line is answered with 400."""
        task = _make_task()
        reader = AsyncMock()
        reader.readline = AsyncMock(
            side_effect=[
                b"INVALID\r\n",
            ]
        )
        writer = _mock_writer()
        await task.handle_client(reader, writer)
        writer.write.assert_called_with(b"HTTP/1.1 400 Bad Request\r\n\r\n")

    @pytest.mark.asyncio
    async def test_non_connect_method_rejected(self):
        """A non-CONNECT method is answered with 405."""
        task = _make_task()
        reader = AsyncMock()
        reader.readline = AsyncMock(
            side_effect=[
                b"GET http://example.i2p/ HTTP/1.1\r\n",
                b"Host: example.i2p\r\n",
                b"\r\n",
            ]
        )
        writer = _mock_writer()
        await task.handle_client(reader, writer)
        writer.write.assert_called_with(b"HTTP/1.1 405 Method Not Allowed\r\n\r\n")

    @pytest.mark.asyncio
    async def test_connect_non_i2p_no_outproxy(self):
        """CONNECT to a non-I2P host with an empty proxy list is answered with 503."""
        task = _make_task()
        reader = AsyncMock()
        reader.readline = AsyncMock(
            side_effect=[
                b"CONNECT example.com:443 HTTP/1.1\r\n",
                b"\r\n",
            ]
        )
        writer = _mock_writer()
        await task.handle_client(reader, writer)
        writer.write.assert_called_with(b"HTTP/1.1 503 No Outproxy\r\n\r\n")

    @pytest.mark.asyncio
    async def test_connect_i2p_resolve_fail(self):
        """A lookup that returns ``None`` is answered with 503 Destination Not Found."""
        task = _make_task()
        task._session.lookup = AsyncMock(return_value=None)
        reader = AsyncMock()
        reader.readline = AsyncMock(
            side_effect=[
                b"CONNECT unknown.i2p:443 HTTP/1.1\r\n",
                b"\r\n",
            ]
        )
        writer = _mock_writer()
        await task.handle_client(reader, writer)
        writer.write.assert_called_with(b"HTTP/1.1 503 Destination Not Found\r\n\r\n")

    @pytest.mark.asyncio
    async def test_connect_i2p_connect_fail(self):
        """A ``ConnectionError`` from ``session.connect`` is answered with 504."""
        task = _make_task()
        task._session.connect = AsyncMock(side_effect=ConnectionError("fail"))
        reader = AsyncMock()
        reader.readline = AsyncMock(
            side_effect=[
                b"CONNECT abcd.b32.i2p:443 HTTP/1.1\r\n",
                b"\r\n",
            ]
        )
        writer = _mock_writer()
        await task.handle_client(reader, writer)
        writer.write.assert_called_with(b"HTTP/1.1 504 Connection Timeout\r\n\r\n")