"""Config advisor — maps probe results to recommended RouterConfig values. Takes LocalProbeResult, NatProbeResult, and NetworkProbeResult and produces a ConfigRecommendation with explanations and warnings. """ from __future__ import annotations import socket from dataclasses import dataclass, field from i2p_apps.setup.local_probe import LocalProbeResult from i2p_apps.setup.stun_probe import NatProbeResult from i2p_apps.setup.network_probe import NetworkProbeResult GB = 1_073_741_824 # 1 GiB in bytes @dataclass class ConfigRecommendation: """Recommended config values with explanations.""" bandwidth_limit_kbps: int = 0 share_percentage: int = 50 inbound_tunnel_count: int = 5 outbound_tunnel_count: int = 5 floodfill: bool = False nat_type: str = "unknown" external_ip: str | None = None upnp_enabled: bool = True listen_port: int = 9700 warnings: list[str] = field(default_factory=list) notes: list[str] = field(default_factory=list) def to_router_config_overrides(self) -> dict: """Return dict of RouterConfig field overrides.""" return { "bandwidth_limit_kbps": self.bandwidth_limit_kbps, "share_percentage": self.share_percentage, "inbound_tunnel_count": self.inbound_tunnel_count, "outbound_tunnel_count": self.outbound_tunnel_count, "floodfill": self.floodfill, "nat_type": self.nat_type, "external_ip": self.external_ip, "upnp_enabled": self.upnp_enabled, "listen_port": self.listen_port, } def _find_available_port(preferred: int) -> int: """Check if preferred port is available, return it or next available.""" try: with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s: s.bind(("0.0.0.0", preferred)) return preferred except OSError: # Port in use, try next with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s: s.bind(("0.0.0.0", 0)) return s.getsockname()[1] def advise( local: LocalProbeResult, nat: NatProbeResult | None = None, network: NetworkProbeResult | None = None, ) -> ConfigRecommendation: """Map probe results to recommended config values.""" warnings: list[str] = [] notes: list[str] = [] # CPU → tunnel count + floodfill eligibility crypto = local.crypto_ops_per_sec if crypto > 50000: tunnels = 20 floodfill_eligible = True elif crypto > 10000: tunnels = 10 floodfill_eligible = True elif crypto > 2000: tunnels = 5 floodfill_eligible = False else: tunnels = 2 floodfill_eligible = False # Bandwidth → share percentage + limit share = 50 limit = 0 if network and network.bandwidth: bw = network.bandwidth.download_mbps if bw >= 100: share = 80 limit = 80_000 elif bw >= 10: share = 60 limit = int(bw * 600) elif bw >= 1: share = 40 limit = int(bw * 400) else: share = 30 limit = int(bw * 300) # NAT → UPnP + warnings upnp = True nat_type = "unknown" external_ip = None if nat: nat_type = nat.nat_type external_ip = nat.external_ip if nat.nat_type == "symmetric": warnings.append( "Symmetric NAT detected — inbound connections will not work. " "SSU2 relay mode recommended." ) upnp = False elif nat.nat_type == "cone": upnp = True elif not nat.nat_present: upnp = False notes.append("Open internet detected — no NAT traversal needed.") # Floodfill: requires good CPU + good bandwidth + non-symmetric NAT floodfill = ( floodfill_eligible and share >= 60 and (nat is None or nat.nat_type != "symmetric") ) # Port selection listen_port = _find_available_port(9700) return ConfigRecommendation( bandwidth_limit_kbps=limit, share_percentage=share, inbound_tunnel_count=tunnels, outbound_tunnel_count=tunnels, floodfill=floodfill, nat_type=nat_type, external_ip=external_ip, upnp_enabled=upnp, listen_port=listen_port, warnings=warnings, notes=notes, )