"""Layer 2b+2c: Network probes — DNS health, TCP RTT, bandwidth estimation. All traffic looks like normal HTTPS/DNS usage. No speed-test protocols. """ from __future__ import annotations import logging import socket import statistics import time import urllib.request from dataclasses import dataclass log = logging.getLogger(__name__) # Endpoints chosen for global accessibility including behind GFW DNS_NEUTRAL = "example.com" DNS_CDN = "cloudflare.com" DNS_RESEED = "reseed.i2p-projekt.de" RTT_ENDPOINTS = [ ("cloudflare.com", 443), ("example.com", 443), ("mozilla.org", 443), ] # Small Cloudflare resource for bandwidth estimation (~100KB) BANDWIDTH_URL = "https://www.cloudflare.com/favicon.ico" @dataclass class DnsProbeResult: """DNS resolution test results.""" can_resolve_neutral: bool can_resolve_cdn: bool can_resolve_reseed: bool resolver_latency_ms: float @dataclass class LatencyProbeResult: """TCP RTT measurement results.""" rtt_ms: float # median jitter_ms: float # IQR endpoints_reached: int endpoints_tested: int @dataclass class BandwidthEstimate: """Estimated download bandwidth.""" download_mbps: float method: str # "cdn_download" or "reseed_download" or "manual" @dataclass class NetworkProbeResult: """Aggregated network probe results.""" dns: DnsProbeResult latency: LatencyProbeResult bandwidth: BandwidthEstimate | None reseed_reachable: bool def check_dns(domain: str) -> tuple[bool, float]: """Resolve a domain via system DNS. Returns (success, latency_ms). """ try: start = time.monotonic() socket.getaddrinfo(domain, None) elapsed = (time.monotonic() - start) * 1000 return True, elapsed except (socket.gaierror, OSError): return False, 0.0 def probe_dns() -> DnsProbeResult: """Test DNS resolution for neutral, CDN, and reseed domains.""" neutral_ok, neutral_ms = check_dns(DNS_NEUTRAL) cdn_ok, cdn_ms = check_dns(DNS_CDN) reseed_ok, reseed_ms = check_dns(DNS_RESEED) latencies = [ms for ms in (neutral_ms, cdn_ms, reseed_ms) if ms > 0] avg_latency = statistics.mean(latencies) if latencies else 0.0 return DnsProbeResult( can_resolve_neutral=neutral_ok, can_resolve_cdn=cdn_ok, can_resolve_reseed=reseed_ok, resolver_latency_ms=avg_latency, ) def measure_tcp_rtt(host: str, port: int = 443) -> float | None: """Measure RTT via TCP SYN-ACK timing. Returns seconds or None on failure. """ sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) sock.settimeout(5.0) try: start = time.monotonic() result = sock.connect_ex((host, port)) elapsed = time.monotonic() - start if result == 0: return elapsed return None except OSError: return None finally: sock.close() def probe_latency( endpoints: list[str] | None = None, ) -> LatencyProbeResult: """Measure RTT to multiple endpoints. Returns median + jitter. Endpoints are "host:port" strings. """ if endpoints is None: endpoints = [f"{h}:{p}" for h, p in RTT_ENDPOINTS] rtts: list[float] = [] for ep in endpoints: host, _, port_str = ep.rpartition(":") port = int(port_str) if port_str else 443 rtt = measure_tcp_rtt(host, port) if rtt is not None: rtts.append(rtt * 1000) # convert to ms if not rtts: return LatencyProbeResult( rtt_ms=0.0, jitter_ms=0.0, endpoints_reached=0, endpoints_tested=len(endpoints), ) median = statistics.median(rtts) if len(rtts) >= 4: sorted_rtts = sorted(rtts) q1 = sorted_rtts[len(sorted_rtts) // 4] q3 = sorted_rtts[3 * len(sorted_rtts) // 4] jitter = q3 - q1 elif len(rtts) >= 2: jitter = max(rtts) - min(rtts) else: jitter = 0.0 return LatencyProbeResult( rtt_ms=median, jitter_ms=jitter, endpoints_reached=len(rtts), endpoints_tested=len(endpoints), ) def estimate_bandwidth( url: str | None = None, size_hint: int = 102400, ) -> BandwidthEstimate | None: """Download a small resource and estimate bandwidth. Returns BandwidthEstimate or None on failure. """ if url is None: url = BANDWIDTH_URL try: start = time.monotonic() with urllib.request.urlopen(url, timeout=10) as resp: data = resp.read() elapsed = time.monotonic() - start if elapsed <= 0 or not data: return None bytes_downloaded = len(data) mbps = (bytes_downloaded * 8) / (elapsed * 1_000_000) return BandwidthEstimate(download_mbps=mbps, method="cdn_download") except (OSError, urllib.error.URLError): return None def run_network_probes( include_reseed: bool = True, include_bandwidth: bool = True, ) -> NetworkProbeResult: """Run all network probes.""" dns = probe_dns() latency = probe_latency() bandwidth = None if include_bandwidth: bandwidth = estimate_bandwidth() reseed_reachable = dns.can_resolve_reseed if include_reseed and reseed_reachable: ok, _ = check_dns(DNS_RESEED) reseed_reachable = ok return NetworkProbeResult( dns=dns, latency=latency, bandwidth=bandwidth, reseed_reachable=reseed_reachable, )