"""UPnP port mapping via SSDP discovery and IGD control. Provides automatic port forwarding for NTCP2 and SSU2 transports when behind a UPnP-capable router. Ported from net.i2p.router.transport.UPnP. """ import asyncio import logging import socket import xml.etree.ElementTree as ET from urllib.parse import urlparse logger = logging.getLogger(__name__) # SSDP constants SSDP_ADDR = "239.255.255.250" SSDP_PORT = 1900 SSDP_MX = 3 # Max wait time in seconds # UPnP service types to search for (in preference order) _SERVICE_TYPES = [ "urn:schemas-upnp-org:service:WANIPConnection:1", "urn:schemas-upnp-org:service:WANPPPConnection:1", ] # SSDP search target _SEARCH_TARGET = "urn:schemas-upnp-org:device:InternetGatewayDevice:1" class UPnPMapping: """Represents a single port mapping.""" def __init__(self, external_port: int, internal_port: int, protocol: str, description: str = "I2P") -> None: self.external_port = external_port self.internal_port = internal_port self.protocol = protocol # "TCP" or "UDP" self.description = description class UPnPManager: """Manages UPnP port mappings for I2P transports. Discovery flow: 1. Send SSDP M-SEARCH for InternetGatewayDevice 2. Fetch and parse device description XML for control URL 3. Send SOAP AddPortMapping / DeletePortMapping requests """ def __init__(self) -> None: self._control_url: str | None = None self._service_type: str | None = None self._mappings: list[UPnPMapping] = [] self._external_ip: str | None = None # ------------------------------------------------------------------ # Discovery # ------------------------------------------------------------------ async def discover(self, timeout: float = 5.0) -> bool: """Discover UPnP gateway via SSDP. Returns True if an IGD was found and a control URL was obtained. """ loop = asyncio.get_event_loop() try: # Send SSDP M-SEARCH sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, socket.IPPROTO_UDP) sock.settimeout(timeout) sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) request = self.build_ssdp_request() sock.sendto(request, (SSDP_ADDR, SSDP_PORT)) try: data, _ = await asyncio.wait_for( loop.run_in_executor(None, lambda: sock.recvfrom(4096)), timeout=timeout, ) except (asyncio.TimeoutError, socket.timeout): logger.debug("SSDP discovery timed out") return False finally: sock.close() location = self.parse_ssdp_response(data) if location is None: logger.debug("No LOCATION in SSDP response") return False # Fetch device description and extract control URL return await self._fetch_control_url(location) except Exception: logger.debug("UPnP discovery failed", exc_info=True) return False async def _fetch_control_url(self, location: str) -> bool: """Fetch device description XML and extract the control URL.""" try: import urllib.request loop = asyncio.get_event_loop() response = await loop.run_in_executor( None, lambda: urllib.request.urlopen(location, timeout=5).read(), ) root = ET.fromstring(response) # Find WANIPConnection or WANPPPConnection service ns = {"upnp": "urn:schemas-upnp-org:device-1-0"} for service_el in root.iter("{urn:schemas-upnp-org:device-1-0}service"): stype_el = service_el.find( "{urn:schemas-upnp-org:device-1-0}serviceType") ctrl_el = service_el.find( "{urn:schemas-upnp-org:device-1-0}controlURL") if stype_el is not None and ctrl_el is not None: stype = stype_el.text or "" if stype in _SERVICE_TYPES: self._service_type = stype # Build absolute control URL parsed = urlparse(location) base = f"{parsed.scheme}://{parsed.netloc}" ctrl_path = ctrl_el.text or "" if ctrl_path.startswith("http"): self._control_url = ctrl_path else: self._control_url = base + ctrl_path logger.info("UPnP control URL: %s", self._control_url) return True logger.debug("No supported service found in device description") return False except Exception: logger.debug("Failed to fetch UPnP device description", exc_info=True) return False # ------------------------------------------------------------------ # External IP # ------------------------------------------------------------------ async def get_external_ip(self) -> str | None: """Get external IP from gateway via SOAP.""" if not self._control_url or not self._service_type: return None try: xml_body = self.build_soap_request( action="GetExternalIPAddress", args={}, ) response = await self._soap_call(xml_body, "GetExternalIPAddress") if response is not None: root = ET.fromstring(response) for elem in root.iter(): if "NewExternalIPAddress" in (elem.tag or ""): self._external_ip = elem.text return self._external_ip except Exception: logger.debug("Failed to get external IP", exc_info=True) return None # ------------------------------------------------------------------ # Port mapping management # ------------------------------------------------------------------ async def add_mapping(self, mapping: UPnPMapping) -> bool: """Add a port mapping. Returns True on success.""" if not self._control_url or not self._service_type: return False # Get local IP for internal client local_ip = self._get_local_ip() xml_body = self.build_soap_request( action="AddPortMapping", args={ "NewRemoteHost": "", "NewExternalPort": str(mapping.external_port), "NewProtocol": mapping.protocol, "NewInternalPort": str(mapping.internal_port), "NewInternalClient": local_ip, "NewEnabled": "1", "NewPortMappingDescription": mapping.description, "NewLeaseDuration": "0", }, ) try: response = await self._soap_call(xml_body, "AddPortMapping") if response is not None: self._mappings.append(mapping) logger.info( "UPnP mapping added: %s %d -> %d", mapping.protocol, mapping.external_port, mapping.internal_port, ) return True except Exception: logger.debug("Failed to add UPnP mapping", exc_info=True) return False async def remove_mapping(self, mapping: UPnPMapping) -> bool: """Remove a port mapping. Returns True on success.""" if not self._control_url or not self._service_type: return False xml_body = self.build_soap_request( action="DeletePortMapping", args={ "NewRemoteHost": "", "NewExternalPort": str(mapping.external_port), "NewProtocol": mapping.protocol, }, ) try: response = await self._soap_call(xml_body, "DeletePortMapping") if response is not None: self._mappings = [ m for m in self._mappings if not (m.external_port == mapping.external_port and m.protocol == mapping.protocol) ] logger.info( "UPnP mapping removed: %s %d", mapping.protocol, mapping.external_port, ) return True except Exception: logger.debug("Failed to remove UPnP mapping", exc_info=True) return False async def remove_all_mappings(self) -> None: """Remove all registered mappings.""" for mapping in list(self._mappings): await self.remove_mapping(mapping) # ------------------------------------------------------------------ # Message builders # ------------------------------------------------------------------ def build_ssdp_request(self) -> bytes: """Build SSDP M-SEARCH request for Internet Gateway Devices.""" lines = [ "M-SEARCH * HTTP/1.1", f"HOST: {SSDP_ADDR}:{SSDP_PORT}", 'MAN: "ssdp:discover"', f"MX: {SSDP_MX}", f"ST: {_SEARCH_TARGET}", "", "", ] return "\r\n".join(lines).encode("utf-8") def parse_ssdp_response(self, data: bytes) -> str | None: """Parse SSDP response and return the LOCATION URL, or None.""" try: text = data.decode("utf-8", errors="replace") for line in text.split("\r\n"): if line.lower().startswith("location:"): return line.split(":", 1)[1].strip() except Exception: pass return None def build_soap_request(self, action: str, args: dict) -> str: """Build SOAP XML request for an IGD action. Args: action: The UPnP action name (e.g. "AddPortMapping"). args: Dictionary of argument name -> value pairs. Returns: SOAP XML envelope as a string. """ service_type = self._service_type or _SERVICE_TYPES[0] args_xml = "" for name, value in args.items(): args_xml += f"<{name}>{value}\n" return ( '\n' '\n' "\n" f'\n' f"{args_xml}" f"\n" "\n" "" ) # ------------------------------------------------------------------ # Internal helpers # ------------------------------------------------------------------ async def _soap_call(self, xml_body: str, action: str) -> str | None: """Send a SOAP request and return the response body.""" if not self._control_url or not self._service_type: return None import urllib.request headers = { "Content-Type": "text/xml; charset=utf-8", "SOAPAction": f'"{self._service_type}#{action}"', } req = urllib.request.Request( self._control_url, data=xml_body.encode("utf-8"), headers=headers, method="POST", ) loop = asyncio.get_event_loop() try: response = await loop.run_in_executor( None, lambda: urllib.request.urlopen(req, timeout=5).read(), ) return response.decode("utf-8", errors="replace") except Exception: logger.debug("SOAP call %s failed", action, exc_info=True) return None @staticmethod def _get_local_ip() -> str: """Get the local IP address used for default route.""" try: s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) s.connect(("8.8.8.8", 80)) ip = s.getsockname()[0] s.close() return ip except Exception: return "0.0.0.0" # ------------------------------------------------------------------ # Properties # ------------------------------------------------------------------ @property def is_available(self) -> bool: """True if a UPnP gateway has been discovered.""" return self._control_url is not None