"""HTTP server tunnel — HTTP-aware hidden service hosting.

Extends ServerTunnelTask to add HTTP-specific processing:
host header spoofing, I2P destination headers, response stripping,
POST rate limiting, and inproxy rejection.

Ported from net.i2p.i2ptunnel.I2PTunnelHTTPServer.
"""

from __future__ import annotations

import base64
import hashlib
import logging
import time
from collections import defaultdict, deque

from i2p_apps.i2ptunnel.config import TunnelDefinition
from i2p_apps.i2ptunnel.tasks import ServerTunnelTask

logger = logging.getLogger(__name__)

# Headers stripped from responses for privacy
_STRIPPED_RESPONSE = {"server", "date", "x-powered-by", "x-runtime", "proxy"}

# Lower-cased names of the headers this tunnel sets itself. HTTP field names
# are case-insensitive, so any client-supplied variant must be dropped before
# injection or the backend could read a forged value.
_I2P_DEST_HEADERS = frozenset({"x-i2p-desthash", "x-i2p-destb64", "x-i2p-destb32"})

# Lower-cased request headers whose presence marks a request as relayed.
_INPROXY_HEADERS = frozenset({"via", "x-forwarded-for"})


class HTTPServerTask(ServerTunnelTask):
    """HTTP-aware server tunnel for hidden service hosting.

    Accepts I2P connections and forwards to a local HTTP server with:
    - Host header spoofing
    - X-I2P-Dest header injection
    - Response header stripping
    - POST rate limiting
    - Inproxy request rejection
    """

    def __init__(self, config: TunnelDefinition, session) -> None:
        """Read the HTTP-specific tunnel options from *config*.

        Options read from ``config.options`` (all string-valued):

        - ``rejectInproxy``: ``"true"`` (any case) enables inproxy rejection.
        - ``maxPosts``: per-peer POST cap inside the window; ``0`` disables.
        - ``maxTotalPosts``: cap across all peers; ``0`` disables.
        - ``postCheckTime``: length of the sliding window, in seconds.
        - ``postBanTime``: parsed and stored only.

        Raises:
            ValueError: if a numeric option is not a valid integer.
        """
        super().__init__(config, session)
        options = config.options

        self._spoofed_host = config.spoofed_host
        self._reject_inproxy = options.get("rejectInproxy", "false").lower() == "true"
        self._max_posts = int(options.get("maxPosts", "0"))
        self._max_total_posts = int(options.get("maxTotalPosts", "0"))
        self._post_check_time = int(options.get("postCheckTime", "300"))
        # NOTE(review): not enforced by any method visible in this class —
        # confirm whether a ban period is applied elsewhere.
        self._post_ban_time = int(options.get("postBanTime", "1800"))

        # Monotonic timestamps of recorded POSTs, oldest first.
        self._post_records: dict[str, deque[float]] = defaultdict(deque)
        self._total_post_records: deque[float] = deque()

    # --- Host Spoofing ---

    def _spoof_host(self, headers: dict[str, str]) -> dict[str, str]:
        """Return a copy of *headers* with Host replaced by the spoofed host.

        When no spoofed host is configured the copy is returned unchanged.
        Otherwise every case variant of the Host header (``host``, ``HOST``,
        ...) is collapsed into a single ``Host`` entry, so the backend never
        sees two conflicting Host headers. The input dict is not mutated.
        """
        if not self._spoofed_host:
            return dict(headers)

        result: dict[str, str] = {}
        for name, value in headers.items():
            if name.lower() == "host":
                # Re-assigning an existing key keeps its original position.
                result["Host"] = self._spoofed_host
            else:
                result[name] = value
        # No Host header at all in the request: add one.
        result.setdefault("Host", self._spoofed_host)
        return result

    # --- I2P Destination Headers ---

    def _inject_i2p_headers(
        self, headers: dict[str, str], remote_dest: str
    ) -> dict[str, str]:
        """Return a copy of *headers* with the X-I2P-Dest* headers set.

        Sets ``X-I2P-DestB64`` to *remote_dest*, ``X-I2P-DestHash`` to the
        hex SHA-256 of the UTF-8 bytes of *remote_dest*, and
        ``X-I2P-DestB32`` to the lower-case, unpadded base32 of that same
        digest followed by ``.b32.i2p``.

        Any client-supplied header with one of these names, in any letter
        case, is removed first so a peer cannot forge its identity.

        If *remote_dest* cannot be hashed, the hash and B32 headers are set
        to empty strings (best effort) and a warning is logged.
        """
        result = {
            name: value
            for name, value in headers.items()
            if name.lower() not in _I2P_DEST_HEADERS
        }
        result["X-I2P-DestB64"] = remote_dest

        try:
            # Simplified: hashes the base64 text itself, not the decoded
            # binary destination.
            digest = hashlib.sha256(remote_dest.encode("utf-8")).digest()
        except (AttributeError, TypeError, UnicodeError):
            # Log only the type; the destination identifies the peer.
            logger.warning(
                "Cannot hash remote destination of type %s; "
                "sending empty X-I2P-DestHash/X-I2P-DestB32",
                type(remote_dest).__name__,
            )
            result["X-I2P-DestHash"] = ""
            result["X-I2P-DestB32"] = ""
            return result

        result["X-I2P-DestHash"] = digest.hex()
        b32 = base64.b32encode(digest).decode("ascii").lower().rstrip("=")
        result["X-I2P-DestB32"] = f"{b32}.b32.i2p"
        return result

    # --- Response Header Stripping ---

    @staticmethod
    def _strip_response_headers(headers: dict[str, str]) -> dict[str, str]:
        """Return a copy of *headers* without the privacy-sensitive ones.

        Names are compared case-insensitively against ``_STRIPPED_RESPONSE``.
        """
        return {
            name: value
            for name, value in headers.items()
            if name.lower() not in _STRIPPED_RESPONSE
        }

    # --- POST Rate Limiting ---

    @staticmethod
    def _purge_before(records: deque[float], cutoff: float) -> None:
        """Drop timestamps older than *cutoff* from the left of *records*."""
        while records and records[0] < cutoff:
            records.popleft()

    def _check_post_limit(self, peer_hash: str) -> bool:
        """Return True if a POST from *peer_hash* is currently allowed.

        Expired timestamps (older than ``postCheckTime`` seconds) are purged
        from the peer's window and from the global window before the counts
        are compared with ``maxPosts`` and ``maxTotalPosts``. A limit of 0
        disables that check. This method does not record the POST.
        """
        cutoff = time.monotonic() - self._post_check_time

        # Use .get() so that merely checking an unknown peer does not create
        # a permanent empty entry in the defaultdict.
        peer_count = 0
        peer_recs = self._post_records.get(peer_hash)
        if peer_recs is not None:
            self._purge_before(peer_recs, cutoff)
            peer_count = len(peer_recs)
            if not peer_count:
                # Window fully expired: forget the peer to bound memory use.
                del self._post_records[peer_hash]

        self._purge_before(self._total_post_records, cutoff)

        if self._max_posts > 0 and peer_count >= self._max_posts:
            return False
        if (
            self._max_total_posts > 0
            and len(self._total_post_records) >= self._max_total_posts
        ):
            return False
        return True

    def _record_post(self, peer_hash: str) -> None:
        """Record a POST from *peer_hash* in its own and the global window."""
        now = time.monotonic()
        self._post_records[peer_hash].append(now)
        self._total_post_records.append(now)

    # --- Inproxy Rejection ---

    def _is_inproxy_request(self, headers: dict[str, str]) -> bool:
        """Check if request came through an inproxy (clearnet gateway).

        Always False when ``rejectInproxy`` is disabled. Otherwise True when
        *headers* contains a ``Via`` or ``X-Forwarded-For`` header, matched
        case-insensitively.
        """
        if not self._reject_inproxy:
            return False
        return any(name.lower() in _INPROXY_HEADERS for name in headers)