A Python port of the Invisible Internet Project (I2P)
at main 217 lines 5.5 kB view raw
"""Layer 2b+2c: Network probes — DNS health, TCP RTT, bandwidth estimation.

All traffic looks like normal HTTPS/DNS usage. No speed-test protocols.
"""

from __future__ import annotations

import http.client
import logging
import socket
import statistics
import time
import urllib.error
import urllib.request
from dataclasses import dataclass

log = logging.getLogger(__name__)

# Endpoints chosen for global accessibility including behind GFW
DNS_NEUTRAL = "example.com"
DNS_CDN = "cloudflare.com"
DNS_RESEED = "reseed.i2p-projekt.de"

RTT_ENDPOINTS = [
    ("cloudflare.com", 443),
    ("example.com", 443),
    ("mozilla.org", 443),
]

# Small Cloudflare resource for bandwidth estimation (~100KB)
BANDWIDTH_URL = "https://www.cloudflare.com/favicon.ico"

# Port assumed for "host" endpoints that carry no explicit ":port" suffix.
_DEFAULT_PORT = 443
# Upper bound, in seconds, on a single TCP connect attempt.
_CONNECT_TIMEOUT_S = 5.0
# Timeout, in seconds, handed to urlopen() for the bandwidth download.
_DOWNLOAD_TIMEOUT_S = 10


@dataclass
class DnsProbeResult:
    """DNS resolution test results."""

    can_resolve_neutral: bool  # DNS_NEUTRAL resolved
    can_resolve_cdn: bool  # DNS_CDN resolved
    can_resolve_reseed: bool  # DNS_RESEED resolved
    resolver_latency_ms: float  # mean over successful lookups, 0.0 if none


@dataclass
class LatencyProbeResult:
    """TCP RTT measurement results."""

    rtt_ms: float  # median
    jitter_ms: float  # IQR
    endpoints_reached: int
    endpoints_tested: int


@dataclass
class BandwidthEstimate:
    """Estimated download bandwidth."""

    download_mbps: float
    method: str  # "cdn_download" or "reseed_download" or "manual"


@dataclass
class NetworkProbeResult:
    """Aggregated network probe results."""

    dns: DnsProbeResult
    latency: LatencyProbeResult
    bandwidth: BandwidthEstimate | None  # None when skipped or failed
    reseed_reachable: bool


def check_dns(domain: str) -> tuple[bool, float]:
    """Resolve *domain* via system DNS and time the lookup.

    Returns:
        ``(True, latency_ms)`` on success, or ``(False, 0.0)`` when the
        lookup fails or *domain* is not a well-formed hostname.
    """
    start = time.monotonic()
    try:
        socket.getaddrinfo(domain, None)
    except (OSError, UnicodeError):
        # socket.gaierror is an OSError. UnicodeError is raised by the IDNA
        # encoding step for malformed names (e.g. an empty label in "a..b")
        # before any query is sent; treat it as a failed lookup, not a crash.
        return False, 0.0
    return True, (time.monotonic() - start) * 1000


def probe_dns() -> DnsProbeResult:
    """Test DNS resolution for the neutral, CDN, and reseed domains.

    The reported latency is the mean over the lookups that succeeded
    (0.0 when none did).
    """
    outcomes = [check_dns(name) for name in (DNS_NEUTRAL, DNS_CDN, DNS_RESEED)]
    (neutral_ok, _), (cdn_ok, _), (reseed_ok, _) = outcomes

    # Select on the success flag rather than on "ms > 0": a successful lookup
    # can legitimately measure 0.0 on a coarse monotonic clock.
    latencies = [ms for ok, ms in outcomes if ok]

    return DnsProbeResult(
        can_resolve_neutral=neutral_ok,
        can_resolve_cdn=cdn_ok,
        can_resolve_reseed=reseed_ok,
        resolver_latency_ms=statistics.mean(latencies) if latencies else 0.0,
    )


def measure_tcp_rtt(host: str, port: int = 443) -> float | None:
    """Measure RTT as the duration of a TCP connect to ``host:port``.

    The hostname is resolved *before* the clock starts, so the result
    reflects the TCP handshake only and not DNS latency. Only IPv4 is used,
    and only the first resolved address is tried.

    Returns:
        Elapsed seconds, or None if resolution or the connection fails
        (including a connect timeout).
    """
    try:
        candidates = socket.getaddrinfo(
            host, port, family=socket.AF_INET, type=socket.SOCK_STREAM
        )
    except (OSError, UnicodeError, OverflowError):
        return None
    if not candidates:
        return None

    family, socktype, proto, _canonname, sockaddr = candidates[0]
    try:
        with socket.socket(family, socktype, proto) as sock:
            sock.settimeout(_CONNECT_TIMEOUT_S)
            start = time.monotonic()
            sock.connect(sockaddr)
            return time.monotonic() - start
    except (OSError, OverflowError):
        # OSError covers refusals and timeouts; OverflowError is raised for
        # a port outside 0-65535.
        return None


def _parse_endpoint(endpoint: str) -> tuple[str, int]:
    """Split a ``host[:port]`` string into ``(host, port)``.

    The port defaults to 443 when omitted (``"host"`` or ``"host:"``).

    Raises:
        ValueError: if the host is empty or the port is not an integer in
            the range 1-65535.
    """
    host, sep, port_str = endpoint.strip().rpartition(":")
    if not sep:
        # No colon at all: rpartition puts the whole string in the last slot.
        host, port_str = port_str, ""
    port = int(port_str) if port_str else _DEFAULT_PORT
    if not host or not 0 < port < 65536:
        raise ValueError(f"invalid endpoint: {endpoint!r}")
    return host, port


def _jitter_ms(rtts_ms: list[float]) -> float:
    """Return the spread of the samples: IQR for >= 4, range for 2-3, else 0."""
    if len(rtts_ms) >= 4:
        # Interpolated quartiles keep the IQR symmetric for every sample
        # count; plain index picks (n // 4, 3 * n // 4) select the maximum
        # as Q3 when n == 4.
        q1, _median, q3 = statistics.quantiles(rtts_ms, n=4, method="inclusive")
        return q3 - q1
    if len(rtts_ms) >= 2:
        return max(rtts_ms) - min(rtts_ms)
    return 0.0


def probe_latency(
    endpoints: list[str] | None = None,
) -> LatencyProbeResult:
    """Measure RTT to multiple endpoints. Returns median + jitter.

    Args:
        endpoints: ``"host:port"`` or bare ``"host"`` strings (port 443 is
            assumed when omitted). Defaults to RTT_ENDPOINTS.

    Returns:
        A LatencyProbeResult in milliseconds. Malformed and unreachable
        endpoints both count as tested but not reached; when nothing is
        reached, ``rtt_ms`` and ``jitter_ms`` are 0.0.
    """
    if endpoints is None:
        endpoints = [f"{h}:{p}" for h, p in RTT_ENDPOINTS]

    rtts_ms: list[float] = []
    for endpoint in endpoints:
        try:
            host, port = _parse_endpoint(endpoint)
        except ValueError:
            # One bad entry must not abort the whole probe run.
            log.warning("Skipping malformed RTT endpoint %r", endpoint)
            continue
        rtt = measure_tcp_rtt(host, port)
        if rtt is not None:
            rtts_ms.append(rtt * 1000)  # convert to ms

    return LatencyProbeResult(
        rtt_ms=statistics.median(rtts_ms) if rtts_ms else 0.0,
        jitter_ms=_jitter_ms(rtts_ms),
        endpoints_reached=len(rtts_ms),
        endpoints_tested=len(endpoints),
    )


def estimate_bandwidth(
    url: str | None = None,
    size_hint: int = 102400,
) -> BandwidthEstimate | None:
    """Download a small resource and estimate bandwidth.

    The timed region spans the whole request (connection setup included),
    so the figure is a lower bound for small resources.

    Args:
        url: Resource to fetch; defaults to BANDWIDTH_URL.
        size_hint: Currently unused; kept for interface compatibility.

    Returns:
        BandwidthEstimate, or None on any failure (malformed URL, network
        or HTTP error, truncated or empty response).
    """
    if url is None:
        url = BANDWIDTH_URL

    try:
        start = time.monotonic()
        with urllib.request.urlopen(url, timeout=_DOWNLOAD_TIMEOUT_S) as resp:
            data = resp.read()
        elapsed = time.monotonic() - start
    except (
        OSError,
        urllib.error.URLError,
        http.client.HTTPException,  # e.g. IncompleteRead on a cut-off body
        ValueError,  # malformed URL ("unknown url type")
    ) as exc:
        log.debug("Bandwidth probe against %s failed: %s", url, exc)
        return None

    if elapsed <= 0 or not data:
        return None

    mbps = (len(data) * 8) / (elapsed * 1_000_000)
    # NOTE(review): the method label is fixed even when a custom url is given.
    return BandwidthEstimate(download_mbps=mbps, method="cdn_download")


def run_network_probes(
    include_reseed: bool = True,
    include_bandwidth: bool = True,
) -> NetworkProbeResult:
    """Run all network probes.

    Args:
        include_reseed: Re-check the reseed domain after the DNS probe.
        include_bandwidth: Run the bandwidth download; when False the
            result's ``bandwidth`` is None.
    """
    dns = probe_dns()
    latency = probe_latency()
    bandwidth = estimate_bandwidth() if include_bandwidth else None

    reseed_reachable = dns.can_resolve_reseed
    if include_reseed and reseed_reachable:
        # NOTE(review): this repeats the DNS lookup probe_dns() already did;
        # it confirms resolution only, not that the reseed host accepts
        # connections.
        reseed_reachable, _ = check_dns(DNS_RESEED)

    return NetworkProbeResult(
        dns=dns,
        latency=latency,
        bandwidth=bandwidth,
        reseed_reachable=reseed_reachable,
    )