"""HTTP proxy tunnel — routes .i2p requests through SAM. Extends the existing parsing from http_proxy.py with full SAM integration, header filtering, CONNECT tunneling, address helpers, and outproxy support. Ported from net.i2p.i2ptunnel.I2PTunnelHTTPClient. """ from __future__ import annotations import logging import random from urllib.parse import urlparse, parse_qs from i2p_apps.i2ptunnel.config import TunnelDefinition from i2p_apps.i2ptunnel.forwarder import bridge, bridge_with_initial_data from i2p_apps.i2ptunnel.http_proxy import parse_http_request, extract_i2p_destination from i2p_apps.i2ptunnel.tasks import ClientTunnelTask logger = logging.getLogger(__name__) # Headers stripped from outbound requests for privacy _STRIPPED_OUTBOUND = { "referer", "via", "x-forwarded-for", "from", "proxy-connection", "proxy-authorization", } # I2P-specific User-Agent replacement _I2P_USER_AGENT = "MYOB/6.66 (AN/ON)" # Error page types _ERROR_MESSAGES = { "dnf": "Destination Not Found", "noproxy": "No Outproxy Configured", "ssl": "SSL Not Allowed", "timeout": "Connection Timeout", "protocol": "Protocol Error", "localhost": "Localhost Access Denied", "baduri": "Bad Request URI", "denied": "Access Denied", } class HTTPClientTask(ClientTunnelTask): """HTTP proxy that routes .i2p requests through SAM. Features: - .i2p hostname resolution via SAM NAMING LOOKUP - Header filtering (strips Referer, Via, X-Forwarded-For, etc.) - User-Agent replacement with MYOB/6.66 for .i2p sites - CONNECT method for HTTPS tunneling - Address helper support (?i2paddresshelper=) - Outproxy support for non-.i2p hosts """ def __init__(self, config: TunnelDefinition, session) -> None: super().__init__(config, session) self._proxy_list = list(config.proxy_list) self._address_cache: dict[str, str] = {} self._failed_outproxies: set[str] = set() # --- Header Filtering --- def _filter_outbound_headers( self, headers: dict[str, str], is_i2p: bool ) -> dict[str, str]: """Filter outbound headers for privacy.""" filtered = {} for key, value in headers.items(): if key.lower() in _STRIPPED_OUTBOUND: continue if key.lower() == "user-agent": filtered[key] = _I2P_USER_AGENT if is_i2p else value continue filtered[key] = value return filtered # --- Address Helpers --- def _extract_address_helper(self, url: str) -> tuple[str, str | None]: """Extract ?i2paddresshelper= value from URL. Returns (host, helper_value_or_None). """ parsed = urlparse(url) params = parse_qs(parsed.query) helper = params.get("i2paddresshelper", [None])[0] return parsed.hostname or "", helper def _cache_address_helper(self, hostname: str, dest: str) -> None: self._address_cache[hostname] = dest def _get_cached_helper(self, hostname: str) -> str | None: return self._address_cache.get(hostname) # --- Outproxy --- def _has_outproxy(self) -> bool: return len(self._proxy_list) > 0 def _pick_outproxy(self) -> str: """Pick an outproxy destination, avoiding recently failed ones.""" available = [p for p in self._proxy_list if p not in self._failed_outproxies] if not available: # All failed — reset and try again self._failed_outproxies.clear() available = list(self._proxy_list) if not available: return "" return random.choice(available) # --- Request Classification --- @staticmethod def _is_i2p_request(host: str) -> bool: return host.endswith(".i2p") @staticmethod def _is_localhost(host: str) -> bool: return host in ("127.0.0.1", "localhost", "::1", "0.0.0.0") # --- Error Pages --- def _error_page(self, error_type: str, status_code: int) -> bytes: """Generate an HTML error response.""" message = _ERROR_MESSAGES.get(error_type, "Error") body = ( f"I2P Proxy Error" f"

{status_code} {message}

" f"

The I2P HTTP proxy was unable to process your request.

" f"

Error: {error_type}

" f"" ) return ( f"HTTP/1.1 {status_code} {message}\r\n" f"Content-Type: text/html; charset=UTF-8\r\n" f"Content-Length: {len(body)}\r\n" f"Connection: close\r\n" f"\r\n" f"{body}" ).encode("utf-8") # --- Header Parsing Helpers --- @staticmethod async def _read_headers(reader) -> dict[str, str]: """Read HTTP headers until blank line.""" headers: dict[str, str] = {} while True: line = await reader.readline() if not line or line == b"\r\n" or line == b"\n": break text = line.decode("utf-8", errors="replace").strip() if ":" in text: key, _, value = text.partition(":") headers[key.strip()] = value.strip() return headers @staticmethod async def _read_body(reader, headers: dict[str, str]) -> bytes: """Read request body based on Content-Length header.""" cl = headers.get("Content-Length", headers.get("content-length", "0")) try: length = int(cl) except ValueError: length = 0 if length > 0: return await reader.read(length) return b"" # --- Resolution --- async def _resolve(self, hostname: str) -> str | None: """Resolve an I2P hostname to base64 destination.""" # Check address helper cache first cached = self._get_cached_helper(hostname) if cached: return cached if hostname.endswith(".b32.i2p"): return hostname return await self._session.lookup(hostname) # --- Request Rebuilding --- @staticmethod def _rebuild_request(method: str, path: str, headers: dict[str, str], body: bytes) -> bytes: """Rebuild an HTTP request with relative path and filtered headers.""" lines = [f"{method} {path} HTTP/1.1"] for key, value in headers.items(): lines.append(f"{key}: {value}") lines.append("") lines.append("") request = "\r\n".join(lines).encode("utf-8") if body: request += body return request # --- Main Handler --- async def handle_client(self, reader, writer) -> None: try: first_line_bytes = await reader.readline() if not first_line_bytes: return first_line = first_line_bytes.decode("utf-8", errors="replace").strip() try: request = parse_http_request(first_line) except ValueError: writer.write(self._error_page("baduri", 400)) await writer.drain() return headers = await self._read_headers(reader) # Reject localhost if self._is_localhost(request.host): writer.write(self._error_page("localhost", 403)) await writer.drain() return if request.is_connect: await self._handle_connect(request, reader, writer) elif self._is_i2p_request(request.host): body = await self._read_body(reader, headers) await self._handle_i2p(request, headers, body, reader, writer) elif self._has_outproxy(): body = await self._read_body(reader, headers) await self._handle_outproxy(request, headers, body, reader, writer) else: writer.write(self._error_page("noproxy", 503)) await writer.drain() except Exception: logger.exception("Error in HTTP proxy handler") async def _handle_i2p(self, request, headers, body, reader, writer) -> None: """Route request to .i2p destination via SAM.""" # Check for address helper _, helper = self._extract_address_helper(request.raw_first_line) if helper: self._cache_address_helper(request.host, helper) dest = await self._resolve(request.host) if dest is None: writer.write(self._error_page("dnf", 503)) await writer.drain() return try: remote_reader, remote_writer = await self._session.connect(dest) except Exception: writer.write(self._error_page("timeout", 504)) await writer.drain() return # Filter headers and rebuild request with relative path filtered = self._filter_outbound_headers(headers, is_i2p=True) if "Host" not in filtered and "host" not in filtered: filtered["Host"] = request.host path = request.path or "/" request_bytes = self._rebuild_request(request.method, path, filtered, body) await bridge_with_initial_data( reader, writer, remote_reader, remote_writer, request_bytes ) async def _handle_connect(self, request, reader, writer) -> None: """Handle CONNECT method for HTTPS tunneling.""" # Consume remaining headers await self._read_headers(reader) if self._is_i2p_request(request.host): dest = await self._resolve(request.host) if dest is None: writer.write(self._error_page("dnf", 503)) await writer.drain() return try: remote_reader, remote_writer = await self._session.connect(dest) except Exception: writer.write(self._error_page("timeout", 504)) await writer.drain() return writer.write(b"HTTP/1.1 200 Connection Established\r\n\r\n") await writer.drain() await bridge(reader, writer, remote_reader, remote_writer) else: writer.write(self._error_page("ssl", 403)) await writer.drain() async def _handle_outproxy(self, request, headers, body, reader, writer) -> None: """Forward non-.i2p request to outproxy.""" outproxy = self._pick_outproxy() if not outproxy: writer.write(self._error_page("noproxy", 503)) await writer.drain() return dest = await self._resolve(outproxy) if dest is None: self._failed_outproxies.add(outproxy) writer.write(self._error_page("dnf", 503)) await writer.drain() return try: remote_reader, remote_writer = await self._session.connect(dest) except Exception: self._failed_outproxies.add(outproxy) writer.write(self._error_page("timeout", 504)) await writer.drain() return # Forward the original request (with absolute URL) to the outproxy filtered = self._filter_outbound_headers(headers, is_i2p=False) request_bytes = self._rebuild_request( request.method, request.raw_first_line.split()[1], filtered, body ) await bridge_with_initial_data( reader, writer, remote_reader, remote_writer, request_bytes )