# A Python port of the Invisible Internet Project (I2P)
1"""Layer 2b+2c: Network probes — DNS health, TCP RTT, bandwidth estimation.
2
3All traffic looks like normal HTTPS/DNS usage. No speed-test protocols.
4"""
5
6from __future__ import annotations
7
8import logging
9import socket
10import statistics
11import time
12import urllib.request
13from dataclasses import dataclass
14
15log = logging.getLogger(__name__)
16
# Endpoints chosen for global accessibility including behind GFW.
# Each DNS_* name is resolved by probe_dns() and reported as its own flag.
DNS_NEUTRAL = "example.com"
DNS_CDN = "cloudflare.com"
DNS_RESEED = "reseed.i2p-projekt.de"

# Default (host, port) pairs used by probe_latency() when the caller passes
# no endpoint list.
RTT_ENDPOINTS = [
    ("cloudflare.com", 443),
    ("example.com", 443),
    ("mozilla.org", 443),
]

# Small Cloudflare resource for bandwidth estimation (~100KB).
# NOTE(review): the ~100KB figure is not verified anywhere in this file —
# confirm the real size of the resource.
BANDWIDTH_URL = "https://www.cloudflare.com/favicon.ico"
30
31
@dataclass
class DnsProbeResult:
    """DNS resolution test results.

    Built by probe_dns(): one success flag per probed domain plus an
    aggregate lookup latency.
    """

    can_resolve_neutral: bool  # DNS_NEUTRAL resolved
    can_resolve_cdn: bool  # DNS_CDN resolved
    can_resolve_reseed: bool  # DNS_RESEED resolved
    resolver_latency_ms: float  # mean lookup latency in ms; 0.0 when no sample
40
41
@dataclass
class LatencyProbeResult:
    """TCP RTT measurement results.

    Built by probe_latency(); all times are in milliseconds.
    """

    rtt_ms: float  # median over reached endpoints; 0.0 when none was reached
    jitter_ms: float  # IQR with >= 4 samples, max - min with 2-3, else 0.0
    endpoints_reached: int  # endpoints whose TCP connect succeeded
    endpoints_tested: int  # endpoints that were attempted
50
51
@dataclass
class BandwidthEstimate:
    """Estimated download bandwidth."""

    download_mbps: float  # megabits per second: bytes * 8 / (seconds * 1e6)
    # "cdn_download" or "reseed_download" or "manual"; the code visible in this
    # file only ever produces "cdn_download".
    method: str
58
59
@dataclass
class NetworkProbeResult:
    """Aggregated network probe results, as returned by run_network_probes()."""

    dns: DnsProbeResult
    latency: LatencyProbeResult
    bandwidth: BandwidthEstimate | None  # None when skipped or the download failed
    reseed_reachable: bool  # derived from DNS resolution of DNS_RESEED only
68
69
def check_dns(domain: str) -> tuple[bool, float]:
    """Resolve a domain via system DNS.

    Args:
        domain: Host name to look up with the system resolver.

    Returns:
        ``(success, latency_ms)``. ``latency_ms`` is the wall time of the
        lookup in milliseconds, or ``0.0`` when the lookup failed.
    """
    start = time.monotonic()
    try:
        socket.getaddrinfo(domain, None)
    except (OSError, UnicodeError):
        # OSError covers socket.gaierror (resolver failure). UnicodeError is
        # raised before any lookup happens when the name cannot be
        # IDNA-encoded, e.g. an empty label or a label over 63 characters.
        return False, 0.0
    return True, (time.monotonic() - start) * 1000
82
83
def probe_dns() -> DnsProbeResult:
    """Test DNS resolution for neutral, CDN, and reseed domains.

    Returns:
        DnsProbeResult with one success flag per domain and the mean latency
        (milliseconds) of the lookups that succeeded, or ``0.0`` when none
        succeeded.
    """
    neutral_ok, neutral_ms = check_dns(DNS_NEUTRAL)
    cdn_ok, cdn_ms = check_dns(DNS_CDN)
    reseed_ok, reseed_ms = check_dns(DNS_RESEED)

    # Select samples by the success flag, not by "latency > 0": a successful
    # lookup can legitimately be timed as 0.0 on a coarse monotonic clock and
    # must still count towards the average.
    outcomes = (
        (neutral_ok, neutral_ms),
        (cdn_ok, cdn_ms),
        (reseed_ok, reseed_ms),
    )
    latencies = [ms for ok, ms in outcomes if ok]
    avg_latency = statistics.mean(latencies) if latencies else 0.0

    return DnsProbeResult(
        can_resolve_neutral=neutral_ok,
        can_resolve_cdn=cdn_ok,
        can_resolve_reseed=reseed_ok,
        resolver_latency_ms=avg_latency,
    )
99
100
def measure_tcp_rtt(host: str, port: int = 443) -> float | None:
    """Measure RTT via TCP SYN-ACK timing.

    The host name is resolved before the timer starts, so the result covers
    only the TCP connect and not the DNS lookup. Only IPv4 (AF_INET) is used.

    Args:
        host: Host name or IPv4 literal to connect to.
        port: TCP port to connect to.

    Returns:
        Connect time in seconds, or None when the name cannot be resolved,
        the socket cannot be created, or the connect fails or exceeds the
        5 second timeout.
    """
    try:
        # Resolve up front: connect_ex() on a host name would perform the
        # lookup inside the timed region and inflate the RTT.
        candidates = socket.getaddrinfo(
            host, port, socket.AF_INET, socket.SOCK_STREAM
        )
    except (OSError, UnicodeError):
        # UnicodeError: the name cannot be IDNA-encoded (bad label length).
        return None
    if not candidates:
        return None
    address = candidates[0][4]  # (ip, port) of the first IPv4 result

    try:
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
            sock.settimeout(5.0)
            start = time.monotonic()
            status = sock.connect_ex(address)
            elapsed = time.monotonic() - start
    except OSError:
        return None

    # connect_ex() reports connect failures as a non-zero errno value.
    return elapsed if status == 0 else None
119
120
def probe_latency(
    endpoints: list[str] | None = None,
) -> LatencyProbeResult:
    """Measure RTT to multiple endpoints. Returns median + jitter.

    Args:
        endpoints: ``"host:port"`` strings. A bare ``"host"`` or a trailing
            ``"host:"`` falls back to port 443. When None, the pairs in
            RTT_ENDPOINTS are used.

    Returns:
        LatencyProbeResult with times in milliseconds. ``rtt_ms`` is the
        median over reached endpoints. ``jitter_ms`` is an index-based
        interquartile range with four or more samples, ``max - min`` with
        two or three, and ``0.0`` otherwise. Both are ``0.0`` when no
        endpoint was reached.

    Raises:
        ValueError: If the text after the last ":" is not an integer.
    """
    if endpoints is None:
        endpoints = [f"{h}:{p}" for h, p in RTT_ENDPOINTS]

    rtts: list[float] = []
    for ep in endpoints:
        host, sep, port_str = ep.rpartition(":")
        if not sep:
            # Without a ":" rpartition() returns ("", "", ep): the whole
            # string lands in the last slot, so treat it as a bare host.
            host, port_str = ep, ""
        port = int(port_str) if port_str else 443
        rtt = measure_tcp_rtt(host, port)
        if rtt is not None:
            rtts.append(rtt * 1000)  # convert to ms

    if not rtts:
        return LatencyProbeResult(
            rtt_ms=0.0,
            jitter_ms=0.0,
            endpoints_reached=0,
            endpoints_tested=len(endpoints),
        )

    median = statistics.median(rtts)
    if len(rtts) >= 4:
        # Quartiles are picked by index from the sorted samples.
        sorted_rtts = sorted(rtts)
        q1 = sorted_rtts[len(sorted_rtts) // 4]
        q3 = sorted_rtts[3 * len(sorted_rtts) // 4]
        jitter = q3 - q1
    elif len(rtts) >= 2:
        # Too few samples for quartiles: fall back to the full range.
        jitter = max(rtts) - min(rtts)
    else:
        jitter = 0.0

    return LatencyProbeResult(
        rtt_ms=median,
        jitter_ms=jitter,
        endpoints_reached=len(rtts),
        endpoints_tested=len(endpoints),
    )
164
165
def estimate_bandwidth(
    url: str | None = None,
    size_hint: int = 102400,
) -> BandwidthEstimate | None:
    """Download a small resource and estimate bandwidth.

    The timer spans the whole ``urlopen`` call plus the body read, so
    connection setup is included in the measured time.

    Args:
        url: Resource to download. Defaults to BANDWIDTH_URL.
        size_hint: Expected size in bytes. Currently unused; kept so existing
            callers that pass it keep working.

    Returns:
        BandwidthEstimate with ``method="cdn_download"``, or None when the
        URL is malformed, the download fails, or the body is empty.
    """
    # Local import keeps this block self-contained; the module itself only
    # imports urllib.request.
    import http.client

    if url is None:
        url = BANDWIDTH_URL

    try:
        start = time.monotonic()
        with urllib.request.urlopen(url, timeout=10) as resp:
            data = resp.read()
        elapsed = time.monotonic() - start
    except (OSError, ValueError, http.client.HTTPException):
        # OSError covers urllib's URLError/HTTPError and socket timeouts.
        # ValueError is raised for malformed URLs ("unknown url type").
        # HTTPException covers protocol failures such as a truncated body.
        return None

    if elapsed <= 0 or not data:
        return None

    mbps = (len(data) * 8) / (elapsed * 1_000_000)
    return BandwidthEstimate(download_mbps=mbps, method="cdn_download")
193
194
def run_network_probes(
    include_reseed: bool = True,
    include_bandwidth: bool = True,
) -> NetworkProbeResult:
    """Run the DNS, latency and (optionally) bandwidth probes in sequence.

    Args:
        include_reseed: When true and the reseed domain already resolved,
            resolve it a second time and use that outcome.
        include_bandwidth: When true, run the download-based estimate;
            otherwise ``bandwidth`` is None.

    Returns:
        NetworkProbeResult combining all individual probe results.
    """
    dns_result = probe_dns()
    latency_result = probe_latency()
    bandwidth_result = estimate_bandwidth() if include_bandwidth else None

    reseed_ok = dns_result.can_resolve_reseed
    if include_reseed and reseed_ok:
        # NOTE(review): this only repeats the DNS lookup probe_dns() already
        # made; no connection to the reseed host is attempted.
        reseed_ok, _ = check_dns(DNS_RESEED)

    return NetworkProbeResult(
        dns=dns_result,
        latency=latency_result,
        bandwidth=bandwidth_result,
        reseed_reachable=reseed_ok,
    )
217 )