# A Python port of the Invisible Internet Project (I2P)
1"""HTTP server tunnel — HTTP-aware hidden service hosting.
2
3Extends ServerTunnelTask to add HTTP-specific processing:
4host header spoofing, I2P destination headers, response stripping,
5POST rate limiting, and inproxy rejection.
6
7Ported from net.i2p.i2ptunnel.I2PTunnelHTTPServer.
8"""
9
10from __future__ import annotations
11
12import hashlib
13import logging
14import time
15from collections import defaultdict, deque
16
17from i2p_apps.i2ptunnel.config import TunnelDefinition
18from i2p_apps.i2ptunnel.tasks import ServerTunnelTask
19
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# Lower-case names of the response headers that are stripped for privacy.
# Lookups against this set must lower-case the header name first.
_STRIPPED_RESPONSE = {
    "server",
    "date",
    "x-powered-by",
    "x-runtime",
    "proxy",
}
24
25
class HTTPServerTask(ServerTunnelTask):
    """HTTP-aware server tunnel for hidden service hosting.

    Accepts I2P connections and forwards to a local HTTP server with:
    - Host header spoofing
    - X-I2P-Dest header injection
    - Response header stripping
    - POST rate limiting
    - Inproxy request rejection

    HTTP header names are case-insensitive, so every helper here that sets
    or looks up a header matches names without regard to case.
    """

    # Lower-case request header names whose presence marks an inproxy request.
    _INPROXY_HEADERS = frozenset({"via", "x-forwarded-for"})

    def __init__(self, config: TunnelDefinition, session) -> None:
        """Read the HTTP-specific tunnel settings from *config*.

        Args:
            config: Tunnel definition. ``spoofed_host`` and the options
                ``rejectInproxy``, ``maxPosts``, ``maxTotalPosts``,
                ``postCheckTime`` and ``postBanTime`` are read from it.
            session: Passed through unchanged to ``ServerTunnelTask``.

        Raises:
            ValueError: If one of the numeric options is not an integer.
        """
        super().__init__(config, session)
        options = config.options
        self._spoofed_host = config.spoofed_host
        self._reject_inproxy = options.get("rejectInproxy", "false").lower() == "true"
        # A limit of 0 (the default) disables the corresponding check.
        self._max_posts = int(options.get("maxPosts", "0"))
        self._max_total_posts = int(options.get("maxTotalPosts", "0"))
        # Length of the sliding window, in seconds of time.monotonic().
        self._post_check_time = int(options.get("postCheckTime", "300"))
        # NOTE(review): parsed but not referenced anywhere else in this
        # class, so no ban period is enforced here - confirm whether the
        # base class or a caller is expected to use it.
        self._post_ban_time = int(options.get("postBanTime", "1800"))
        # Per-peer and global POST timestamps, oldest first.
        self._post_records: dict[str, deque[float]] = defaultdict(deque)
        self._total_post_records: deque[float] = deque()

    # --- Header helpers ---

    @staticmethod
    def _replace_header_ci(
        headers: dict[str, str], name: str, value: str
    ) -> dict[str, str]:
        """Return a copy of *headers* in which *name* is set to *value*.

        Any existing entry whose name equals *name* ignoring case is
        dropped, so the result never carries two spellings of one header.
        The new entry takes the position of the first such entry, or is
        appended when there was none. The input mapping is not modified.
        """
        wanted = name.lower()
        result: dict[str, str] = {}
        placed = False
        for key, current in headers.items():
            if key.lower() != wanted:
                result[key] = current
            elif not placed:
                result[name] = value
                placed = True
            # Further case-variants of the same header are discarded.
        if not placed:
            result[name] = value
        return result

    # --- Host Spoofing ---

    def _spoof_host(self, headers: dict[str, str]) -> dict[str, str]:
        """Replace the Host header with the spoofed host, if one is configured.

        Returns a new dict. When no spoofed host is configured the copy is
        otherwise identical to *headers*. An existing ``host`` header in
        any letter case is replaced rather than duplicated.
        """
        if not self._spoofed_host:
            return dict(headers)
        return self._replace_header_ci(headers, "Host", self._spoofed_host)

    # --- I2P Destination Headers ---

    def _inject_i2p_headers(
        self, headers: dict[str, str], remote_dest: str
    ) -> dict[str, str]:
        """Inject X-I2P-DestB64, X-I2P-DestHash and X-I2P-DestB32 headers.

        Args:
            headers: Request headers; not modified.
            remote_dest: Base64 text of the remote destination.

        Returns:
            A new dict holding the three headers. Any client-supplied
            header with the same name in any letter case is replaced, so
            the peer cannot forward its own values. If the hash cannot be
            computed, the hash and B32 headers are set to empty strings.
        """
        import base64  # kept function-local, as in the original code

        try:
            # NOTE(review): this hashes the Base64 *text*, not the decoded
            # destination bytes (a deliberate simplification), so the values
            # presumably differ from those other I2P software derives -
            # confirm before relying on them for peer identification.
            digest = hashlib.sha256(remote_dest.encode("utf-8"))
            dest_hash = digest.hexdigest()
            encoded = base64.b32encode(digest.digest()).decode("ascii")
            dest_b32 = f"{encoded.lower().rstrip('=')}.b32.i2p"
        except (AttributeError, TypeError, UnicodeError):
            # Best effort: still forward the request, with empty values.
            logger.warning(
                "Could not derive I2P destination hash headers", exc_info=True
            )
            dest_hash = ""
            dest_b32 = ""

        result = self._replace_header_ci(headers, "X-I2P-DestB64", remote_dest)
        result = self._replace_header_ci(result, "X-I2P-DestHash", dest_hash)
        return self._replace_header_ci(result, "X-I2P-DestB32", dest_b32)

    # --- Response Header Stripping ---

    @staticmethod
    def _strip_response_headers(headers: dict[str, str]) -> dict[str, str]:
        """Return a copy of *headers* without the privacy-sensitive entries.

        Names are compared in lower case against ``_STRIPPED_RESPONSE``.
        """
        return {
            name: value
            for name, value in headers.items()
            if name.lower() not in _STRIPPED_RESPONSE
        }

    # --- POST Rate Limiting ---

    @staticmethod
    def _drop_expired_posts(records: deque[float], cutoff: float) -> bool:
        """Pop timestamps older than *cutoff* from the left of *records*.

        Returns True if at least one timestamp was removed.
        """
        removed = False
        while records and records[0] < cutoff:
            records.popleft()
            removed = True
        return removed

    def _check_post_limit(self, peer_hash: str) -> bool:
        """Check if a POST from this peer is allowed.

        Records older than ``postCheckTime`` seconds are discarded first.
        Returns False when the peer has reached ``maxPosts`` or all peers
        together have reached ``maxTotalPosts`` within the window. A limit
        of 0 disables that check. This method does not record the POST
        itself; ``_record_post`` does.
        """
        cutoff = time.monotonic() - self._post_check_time

        # _record_post appends each timestamp to both the global deque and
        # a per-peer deque, so peers can only hold expired records when the
        # global deque did too. Sweep them then, so peers that never come
        # back do not stay in memory forever.
        if self._drop_expired_posts(self._total_post_records, cutoff):
            for peer in list(self._post_records):
                stale = self._post_records[peer]
                self._drop_expired_posts(stale, cutoff)
                if not stale:
                    del self._post_records[peer]

        # Use .get() so merely checking a peer does not create a
        # defaultdict entry for it.
        peer_recs = self._post_records.get(peer_hash)
        if peer_recs is not None:
            self._drop_expired_posts(peer_recs, cutoff)
            if not peer_recs:
                del self._post_records[peer_hash]
        peer_count = len(peer_recs) if peer_recs is not None else 0

        # Check per-peer limit
        if self._max_posts > 0 and peer_count >= self._max_posts:
            return False

        # Check global limit
        if (
            self._max_total_posts > 0
            and len(self._total_post_records) >= self._max_total_posts
        ):
            return False

        return True

    def _record_post(self, peer_hash: str) -> None:
        """Record a POST from this peer at the current monotonic time."""
        now = time.monotonic()
        self._post_records[peer_hash].append(now)
        self._total_post_records.append(now)

    # --- Inproxy Rejection ---

    def _is_inproxy_request(self, headers: dict[str, str]) -> bool:
        """Check if the request came through an inproxy (clearnet gateway).

        Always False when ``rejectInproxy`` is off. Otherwise True when a
        ``Via`` or ``X-Forwarded-For`` header is present in any letter case.
        """
        if not self._reject_inproxy:
            return False
        return any(name.lower() in self._INPROXY_HEADERS for name in headers)