A Python port of the Invisible Internet Project (I2P)
at main 474 lines 16 kB view raw
1"""Tests for HTTPClientTask — full HTTP proxy with SAM integration.""" 2 3import asyncio 4from unittest.mock import AsyncMock, MagicMock, patch 5 6import pytest 7 8from i2p_apps.i2ptunnel.http_client import HTTPClientTask, _STRIPPED_OUTBOUND, _I2P_USER_AGENT 9 10 11def _make_config(**overrides): 12 config = MagicMock() 13 config.proxy_list = overrides.get("proxy_list", []) 14 config.name = "test-http" 15 config.interface = "127.0.0.1" 16 config.listen_port = 0 17 config.type = MagicMock() 18 return config 19 20 21def _make_task(**overrides): 22 config = _make_config(**overrides) 23 session = AsyncMock() 24 return HTTPClientTask(config, session) 25 26 27def _mock_writer(): 28 writer = MagicMock() 29 writer.write = MagicMock() 30 writer.drain = AsyncMock() 31 writer.close = MagicMock() 32 writer.wait_closed = AsyncMock() 33 return writer 34 35 36class TestHeaderFiltering: 37 def test_strips_referer(self): 38 task = _make_task() 39 h = {"Referer": "http://evil.com", "Host": "example.i2p"} 40 filtered = task._filter_outbound_headers(h, is_i2p=True) 41 assert "Referer" not in filtered 42 assert "Host" in filtered 43 44 def test_strips_via(self): 45 task = _make_task() 46 h = {"Via": "1.1 proxy", "Accept": "*/*"} 47 filtered = task._filter_outbound_headers(h, is_i2p=True) 48 assert "Via" not in filtered 49 assert "Accept" in filtered 50 51 def test_strips_x_forwarded_for(self): 52 task = _make_task() 53 h = {"X-Forwarded-For": "1.2.3.4"} 54 filtered = task._filter_outbound_headers(h, is_i2p=True) 55 assert "X-Forwarded-For" not in filtered 56 57 def test_replaces_user_agent_for_i2p(self): 58 task = _make_task() 59 h = {"User-Agent": "Mozilla/5.0"} 60 filtered = task._filter_outbound_headers(h, is_i2p=True) 61 assert filtered["User-Agent"] == _I2P_USER_AGENT 62 63 def test_keeps_user_agent_for_clearnet(self): 64 task = _make_task() 65 h = {"User-Agent": "Mozilla/5.0"} 66 filtered = task._filter_outbound_headers(h, is_i2p=False) 67 assert filtered["User-Agent"] == "Mozilla/5.0" 68 69 def test_empty_headers(self): 70 task = _make_task() 71 assert task._filter_outbound_headers({}, is_i2p=True) == {} 72 73 74class TestAddressHelper: 75 def test_extract_helper(self): 76 task = _make_task() 77 host, helper = task._extract_address_helper( 78 "http://example.i2p/?i2paddresshelper=AAAA" 79 ) 80 assert host == "example.i2p" 81 assert helper == "AAAA" 82 83 def test_no_helper(self): 84 task = _make_task() 85 host, helper = task._extract_address_helper("http://example.i2p/path") 86 assert host == "example.i2p" 87 assert helper is None 88 89 def test_cache_and_retrieve(self): 90 task = _make_task() 91 task._cache_address_helper("example.i2p", "dest-base64") 92 assert task._get_cached_helper("example.i2p") == "dest-base64" 93 94 def test_cache_miss(self): 95 task = _make_task() 96 assert task._get_cached_helper("unknown.i2p") is None 97 98 99class TestOutproxy: 100 def test_has_outproxy(self): 101 task = _make_task(proxy_list=["outproxy.i2p"]) 102 assert task._has_outproxy() is True 103 104 def test_no_outproxy(self): 105 task = _make_task() 106 assert task._has_outproxy() is False 107 108 def test_pick_outproxy(self): 109 task = _make_task(proxy_list=["a.i2p", "b.i2p"]) 110 result = task._pick_outproxy() 111 assert result in ("a.i2p", "b.i2p") 112 113 def test_pick_avoids_failed(self): 114 task = _make_task(proxy_list=["a.i2p", "b.i2p"]) 115 task._failed_outproxies.add("a.i2p") 116 # With only b.i2p available, should always pick it 117 for _ in range(10): 118 assert task._pick_outproxy() == "b.i2p" 119 120 def test_pick_resets_when_all_failed(self): 121 task = _make_task(proxy_list=["a.i2p"]) 122 task._failed_outproxies.add("a.i2p") 123 result = task._pick_outproxy() 124 assert result == "a.i2p" 125 assert len(task._failed_outproxies) == 0 126 127 def test_pick_empty_list(self): 128 task = _make_task() 129 assert task._pick_outproxy() == "" 130 131 132class TestClassification: 133 def test_is_i2p(self): 134 assert HTTPClientTask._is_i2p_request("example.i2p") is True 135 assert HTTPClientTask._is_i2p_request("abc.b32.i2p") is True 136 137 def test_not_i2p(self): 138 assert HTTPClientTask._is_i2p_request("example.com") is False 139 140 def test_is_localhost(self): 141 assert HTTPClientTask._is_localhost("127.0.0.1") is True 142 assert HTTPClientTask._is_localhost("localhost") is True 143 assert HTTPClientTask._is_localhost("::1") is True 144 assert HTTPClientTask._is_localhost("0.0.0.0") is True 145 146 def test_not_localhost(self): 147 assert HTTPClientTask._is_localhost("example.com") is False 148 149 150class TestErrorPage: 151 def test_dnf(self): 152 task = _make_task() 153 page = task._error_page("dnf", 503) 154 text = page.decode() 155 assert "503" in text 156 assert "Destination Not Found" in text 157 assert "Content-Type: text/html" in text 158 159 def test_timeout(self): 160 task = _make_task() 161 page = task._error_page("timeout", 504) 162 assert b"504" in page 163 164 def test_unknown_type(self): 165 task = _make_task() 166 page = task._error_page("???", 500) 167 assert b"Error" in page 168 169 170class TestReadHeaders: 171 @pytest.mark.asyncio 172 async def test_reads_headers(self): 173 reader = AsyncMock() 174 reader.readline = AsyncMock(side_effect=[ 175 b"Host: example.i2p\r\n", 176 b"Accept: text/html\r\n", 177 b"\r\n", 178 ]) 179 headers = await HTTPClientTask._read_headers(reader) 180 assert headers["Host"] == "example.i2p" 181 assert headers["Accept"] == "text/html" 182 183 @pytest.mark.asyncio 184 async def test_empty_headers(self): 185 reader = AsyncMock() 186 reader.readline = AsyncMock(side_effect=[b"\r\n"]) 187 headers = await HTTPClientTask._read_headers(reader) 188 assert headers == {} 189 190 @pytest.mark.asyncio 191 async def test_eof_headers(self): 192 reader = AsyncMock() 193 reader.readline = AsyncMock(side_effect=[b""]) 194 headers = await HTTPClientTask._read_headers(reader) 195 assert headers == {} 196 197 198class TestReadBody: 199 @pytest.mark.asyncio 200 async def test_with_content_length(self): 201 reader = AsyncMock() 202 reader.read = AsyncMock(return_value=b"hello") 203 body = await HTTPClientTask._read_body(reader, {"Content-Length": "5"}) 204 assert body == b"hello" 205 reader.read.assert_called_once_with(5) 206 207 @pytest.mark.asyncio 208 async def test_no_content_length(self): 209 reader = AsyncMock() 210 body = await HTTPClientTask._read_body(reader, {}) 211 assert body == b"" 212 213 @pytest.mark.asyncio 214 async def test_invalid_content_length(self): 215 reader = AsyncMock() 216 body = await HTTPClientTask._read_body(reader, {"Content-Length": "abc"}) 217 assert body == b"" 218 219 220class TestResolve: 221 @pytest.mark.asyncio 222 async def test_b32_passthrough(self): 223 task = _make_task() 224 result = await task._resolve("abc.b32.i2p") 225 assert result == "abc.b32.i2p" 226 227 @pytest.mark.asyncio 228 async def test_cached_helper(self): 229 task = _make_task() 230 task._cache_address_helper("example.i2p", "cached-dest") 231 result = await task._resolve("example.i2p") 232 assert result == "cached-dest" 233 234 @pytest.mark.asyncio 235 async def test_lookup(self): 236 task = _make_task() 237 task._session.lookup = AsyncMock(return_value="looked-up-dest") 238 result = await task._resolve("example.i2p") 239 assert result == "looked-up-dest" 240 241 242class TestRebuildRequest: 243 def test_basic(self): 244 result = HTTPClientTask._rebuild_request("GET", "/path", {"Host": "example.i2p"}, b"") 245 text = result.decode() 246 assert text.startswith("GET /path HTTP/1.1\r\n") 247 assert "Host: example.i2p" in text 248 249 def test_with_body(self): 250 result = HTTPClientTask._rebuild_request("POST", "/", {}, b"data") 251 assert result.endswith(b"data") 252 253 254class TestHandleClient: 255 @pytest.mark.asyncio 256 async def test_empty_read(self): 257 task = _make_task() 258 reader = AsyncMock() 259 reader.readline = AsyncMock(return_value=b"") 260 writer = _mock_writer() 261 await task.handle_client(reader, writer) 262 writer.write.assert_not_called() 263 264 @pytest.mark.asyncio 265 async def test_bad_request(self): 266 task = _make_task() 267 reader = AsyncMock() 268 reader.readline = AsyncMock(side_effect=[ 269 b"INVALID\r\n", 270 ]) 271 writer = _mock_writer() 272 await task.handle_client(reader, writer) 273 written = writer.write.call_args[0][0] 274 assert b"400" in written 275 276 @pytest.mark.asyncio 277 async def test_localhost_rejected(self): 278 task = _make_task() 279 reader = AsyncMock() 280 reader.readline = AsyncMock(side_effect=[ 281 b"GET http://127.0.0.1/ HTTP/1.1\r\n", 282 b"\r\n", 283 ]) 284 writer = _mock_writer() 285 await task.handle_client(reader, writer) 286 written = writer.write.call_args[0][0] 287 assert b"403" in written 288 289 @pytest.mark.asyncio 290 async def test_no_outproxy(self): 291 task = _make_task() 292 reader = AsyncMock() 293 reader.readline = AsyncMock(side_effect=[ 294 b"GET http://example.com/ HTTP/1.1\r\n", 295 b"\r\n", 296 ]) 297 writer = _mock_writer() 298 await task.handle_client(reader, writer) 299 written = writer.write.call_args[0][0] 300 assert b"503" in written 301 302 @pytest.mark.asyncio 303 async def test_i2p_dnf(self): 304 task = _make_task() 305 task._session.lookup = AsyncMock(return_value=None) 306 reader = AsyncMock() 307 reader.readline = AsyncMock(side_effect=[ 308 b"GET http://unknown.i2p/ HTTP/1.1\r\n", 309 b"\r\n", 310 ]) 311 writer = _mock_writer() 312 await task.handle_client(reader, writer) 313 written = writer.write.call_args[0][0] 314 assert b"503" in written 315 316 @pytest.mark.asyncio 317 async def test_i2p_connect_fail(self): 318 task = _make_task() 319 task._session.connect = AsyncMock(side_effect=ConnectionError("fail")) 320 reader = AsyncMock() 321 reader.readline = AsyncMock(side_effect=[ 322 b"GET http://abc.b32.i2p/path HTTP/1.1\r\n", 323 b"\r\n", 324 ]) 325 writer = _mock_writer() 326 await task.handle_client(reader, writer) 327 written = writer.write.call_args[0][0] 328 assert b"504" in written 329 330 @pytest.mark.asyncio 331 async def test_i2p_success(self): 332 task = _make_task() 333 remote_reader = AsyncMock() 334 remote_writer = MagicMock() 335 remote_writer.close = MagicMock() 336 remote_writer.wait_closed = AsyncMock() 337 task._session.connect = AsyncMock(return_value=(remote_reader, remote_writer)) 338 reader = AsyncMock() 339 reader.readline = AsyncMock(side_effect=[ 340 b"GET http://abc.b32.i2p/path HTTP/1.1\r\n", 341 b"Host: abc.b32.i2p\r\n", 342 b"\r\n", 343 ]) 344 reader.read = AsyncMock(return_value=b"") 345 writer = _mock_writer() 346 347 with patch("i2p_apps.i2ptunnel.http_client.bridge_with_initial_data", new_callable=AsyncMock) as mock_bridge: 348 await task.handle_client(reader, writer) 349 mock_bridge.assert_called_once() 350 351 @pytest.mark.asyncio 352 async def test_connect_ssl_non_i2p(self): 353 task = _make_task() 354 reader = AsyncMock() 355 reader.readline = AsyncMock(side_effect=[ 356 b"CONNECT example.com:443 HTTP/1.1\r\n", 357 b"\r\n", # consumed by handle_client's _read_headers 358 b"\r\n", # consumed by _handle_connect's _read_headers 359 ]) 360 writer = _mock_writer() 361 await task.handle_client(reader, writer) 362 written = writer.write.call_args[0][0] 363 assert b"403" in written 364 365 @pytest.mark.asyncio 366 async def test_connect_i2p_success(self): 367 task = _make_task() 368 remote_reader = AsyncMock() 369 remote_writer = MagicMock() 370 remote_writer.close = MagicMock() 371 task._session.connect = AsyncMock(return_value=(remote_reader, remote_writer)) 372 reader = AsyncMock() 373 reader.readline = AsyncMock(side_effect=[ 374 b"CONNECT abc.b32.i2p:443 HTTP/1.1\r\n", 375 b"\r\n", 376 b"\r\n", 377 ]) 378 writer = _mock_writer() 379 380 with patch("i2p_apps.i2ptunnel.http_client.bridge", new_callable=AsyncMock) as mock_bridge: 381 await task.handle_client(reader, writer) 382 written = writer.write.call_args[0][0] 383 assert b"200" in written 384 mock_bridge.assert_called_once() 385 386 @pytest.mark.asyncio 387 async def test_connect_i2p_dnf(self): 388 task = _make_task() 389 task._session.lookup = AsyncMock(return_value=None) 390 reader = AsyncMock() 391 reader.readline = AsyncMock(side_effect=[ 392 b"CONNECT unknown.i2p:443 HTTP/1.1\r\n", 393 b"\r\n", 394 b"\r\n", 395 ]) 396 writer = _mock_writer() 397 await task.handle_client(reader, writer) 398 written = writer.write.call_args[0][0] 399 assert b"503" in written 400 401 @pytest.mark.asyncio 402 async def test_connect_i2p_timeout(self): 403 task = _make_task() 404 task._session.connect = AsyncMock(side_effect=ConnectionError("timeout")) 405 reader = AsyncMock() 406 reader.readline = AsyncMock(side_effect=[ 407 b"CONNECT abc.b32.i2p:443 HTTP/1.1\r\n", 408 b"\r\n", 409 b"\r\n", 410 ]) 411 writer = _mock_writer() 412 await task.handle_client(reader, writer) 413 written = writer.write.call_args[0][0] 414 assert b"504" in written 415 416 @pytest.mark.asyncio 417 async def test_outproxy_success(self): 418 task = _make_task(proxy_list=["outproxy.i2p"]) 419 task._session.lookup = AsyncMock(return_value="outproxy-dest") 420 remote_reader = AsyncMock() 421 remote_writer = MagicMock() 422 remote_writer.close = MagicMock() 423 task._session.connect = AsyncMock(return_value=(remote_reader, remote_writer)) 424 reader = AsyncMock() 425 reader.readline = AsyncMock(side_effect=[ 426 b"GET http://example.com/path HTTP/1.1\r\n", 427 b"\r\n", 428 ]) 429 reader.read = AsyncMock(return_value=b"") 430 writer = _mock_writer() 431 432 with patch("i2p_apps.i2ptunnel.http_client.bridge_with_initial_data", new_callable=AsyncMock) as mock_bridge: 433 await task.handle_client(reader, writer) 434 mock_bridge.assert_called_once() 435 436 @pytest.mark.asyncio 437 async def test_outproxy_dnf(self): 438 task = _make_task(proxy_list=["outproxy.i2p"]) 439 task._session.lookup = AsyncMock(return_value=None) 440 reader = AsyncMock() 441 reader.readline = AsyncMock(side_effect=[ 442 b"GET http://example.com/ HTTP/1.1\r\n", 443 b"\r\n", 444 ]) 445 reader.read = AsyncMock(return_value=b"") 446 writer = _mock_writer() 447 await task.handle_client(reader, writer) 448 written = writer.write.call_args[0][0] 449 assert b"503" in written 450 451 @pytest.mark.asyncio 452 async def test_outproxy_connect_fail(self): 453 task = _make_task(proxy_list=["outproxy.i2p"]) 454 task._session.lookup = AsyncMock(return_value="outproxy-dest") 455 task._session.connect = AsyncMock(side_effect=ConnectionError("fail")) 456 reader = AsyncMock() 457 reader.readline = AsyncMock(side_effect=[ 458 b"GET http://example.com/ HTTP/1.1\r\n", 459 b"\r\n", 460 ]) 461 reader.read = AsyncMock(return_value=b"") 462 writer = _mock_writer() 463 await task.handle_client(reader, writer) 464 written = writer.write.call_args[0][0] 465 assert b"504" in written 466 467 @pytest.mark.asyncio 468 async def test_exception_in_handler(self): 469 task = _make_task() 470 reader = AsyncMock() 471 reader.readline = AsyncMock(side_effect=RuntimeError("boom")) 472 writer = _mock_writer() 473 # Should not raise 474 await task.handle_client(reader, writer)