A Python port of the Invisible Internet Project (I2P)
at main 354 lines 13 kB view raw
1"""UPnP port mapping via SSDP discovery and IGD control. 2 3Provides automatic port forwarding for NTCP2 and SSU2 transports 4when behind a UPnP-capable router. 5 6Ported from net.i2p.router.transport.UPnP. 7""" 8 9import asyncio 10import logging 11import socket 12import xml.etree.ElementTree as ET 13from urllib.parse import urlparse 14 15logger = logging.getLogger(__name__) 16 17# SSDP constants 18SSDP_ADDR = "239.255.255.250" 19SSDP_PORT = 1900 20SSDP_MX = 3 # Max wait time in seconds 21 22# UPnP service types to search for (in preference order) 23_SERVICE_TYPES = [ 24 "urn:schemas-upnp-org:service:WANIPConnection:1", 25 "urn:schemas-upnp-org:service:WANPPPConnection:1", 26] 27 28# SSDP search target 29_SEARCH_TARGET = "urn:schemas-upnp-org:device:InternetGatewayDevice:1" 30 31 32class UPnPMapping: 33 """Represents a single port mapping.""" 34 35 def __init__(self, external_port: int, internal_port: int, 36 protocol: str, description: str = "I2P") -> None: 37 self.external_port = external_port 38 self.internal_port = internal_port 39 self.protocol = protocol # "TCP" or "UDP" 40 self.description = description 41 42 43class UPnPManager: 44 """Manages UPnP port mappings for I2P transports. 45 46 Discovery flow: 47 1. Send SSDP M-SEARCH for InternetGatewayDevice 48 2. Fetch and parse device description XML for control URL 49 3. Send SOAP AddPortMapping / DeletePortMapping requests 50 """ 51 52 def __init__(self) -> None: 53 self._control_url: str | None = None 54 self._service_type: str | None = None 55 self._mappings: list[UPnPMapping] = [] 56 self._external_ip: str | None = None 57 58 # ------------------------------------------------------------------ 59 # Discovery 60 # ------------------------------------------------------------------ 61 62 async def discover(self, timeout: float = 5.0) -> bool: 63 """Discover UPnP gateway via SSDP. 64 65 Returns True if an IGD was found and a control URL was obtained. 66 """ 67 loop = asyncio.get_event_loop() 68 69 try: 70 # Send SSDP M-SEARCH 71 sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM, 72 socket.IPPROTO_UDP) 73 sock.settimeout(timeout) 74 sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1) 75 76 request = self.build_ssdp_request() 77 sock.sendto(request, (SSDP_ADDR, SSDP_PORT)) 78 79 try: 80 data, _ = await asyncio.wait_for( 81 loop.run_in_executor(None, lambda: sock.recvfrom(4096)), 82 timeout=timeout, 83 ) 84 except (asyncio.TimeoutError, socket.timeout): 85 logger.debug("SSDP discovery timed out") 86 return False 87 finally: 88 sock.close() 89 90 location = self.parse_ssdp_response(data) 91 if location is None: 92 logger.debug("No LOCATION in SSDP response") 93 return False 94 95 # Fetch device description and extract control URL 96 return await self._fetch_control_url(location) 97 98 except Exception: 99 logger.debug("UPnP discovery failed", exc_info=True) 100 return False 101 102 async def _fetch_control_url(self, location: str) -> bool: 103 """Fetch device description XML and extract the control URL.""" 104 try: 105 import urllib.request 106 loop = asyncio.get_event_loop() 107 response = await loop.run_in_executor( 108 None, 109 lambda: urllib.request.urlopen(location, timeout=5).read(), 110 ) 111 root = ET.fromstring(response) 112 113 # Find WANIPConnection or WANPPPConnection service 114 ns = {"upnp": "urn:schemas-upnp-org:device-1-0"} 115 for service_el in root.iter("{urn:schemas-upnp-org:device-1-0}service"): 116 stype_el = service_el.find( 117 "{urn:schemas-upnp-org:device-1-0}serviceType") 118 ctrl_el = service_el.find( 119 "{urn:schemas-upnp-org:device-1-0}controlURL") 120 if stype_el is not None and ctrl_el is not None: 121 stype = stype_el.text or "" 122 if stype in _SERVICE_TYPES: 123 self._service_type = stype 124 # Build absolute control URL 125 parsed = urlparse(location) 126 base = f"{parsed.scheme}://{parsed.netloc}" 127 ctrl_path = ctrl_el.text or "" 128 if ctrl_path.startswith("http"): 129 self._control_url = ctrl_path 130 else: 131 self._control_url = base + ctrl_path 132 logger.info("UPnP control URL: %s", self._control_url) 133 return True 134 135 logger.debug("No supported service found in device description") 136 return False 137 138 except Exception: 139 logger.debug("Failed to fetch UPnP device description", exc_info=True) 140 return False 141 142 # ------------------------------------------------------------------ 143 # External IP 144 # ------------------------------------------------------------------ 145 146 async def get_external_ip(self) -> str | None: 147 """Get external IP from gateway via SOAP.""" 148 if not self._control_url or not self._service_type: 149 return None 150 151 try: 152 xml_body = self.build_soap_request( 153 action="GetExternalIPAddress", 154 args={}, 155 ) 156 response = await self._soap_call(xml_body, "GetExternalIPAddress") 157 if response is not None: 158 root = ET.fromstring(response) 159 for elem in root.iter(): 160 if "NewExternalIPAddress" in (elem.tag or ""): 161 self._external_ip = elem.text 162 return self._external_ip 163 except Exception: 164 logger.debug("Failed to get external IP", exc_info=True) 165 166 return None 167 168 # ------------------------------------------------------------------ 169 # Port mapping management 170 # ------------------------------------------------------------------ 171 172 async def add_mapping(self, mapping: UPnPMapping) -> bool: 173 """Add a port mapping. Returns True on success.""" 174 if not self._control_url or not self._service_type: 175 return False 176 177 # Get local IP for internal client 178 local_ip = self._get_local_ip() 179 180 xml_body = self.build_soap_request( 181 action="AddPortMapping", 182 args={ 183 "NewRemoteHost": "", 184 "NewExternalPort": str(mapping.external_port), 185 "NewProtocol": mapping.protocol, 186 "NewInternalPort": str(mapping.internal_port), 187 "NewInternalClient": local_ip, 188 "NewEnabled": "1", 189 "NewPortMappingDescription": mapping.description, 190 "NewLeaseDuration": "0", 191 }, 192 ) 193 194 try: 195 response = await self._soap_call(xml_body, "AddPortMapping") 196 if response is not None: 197 self._mappings.append(mapping) 198 logger.info( 199 "UPnP mapping added: %s %d -> %d", 200 mapping.protocol, mapping.external_port, 201 mapping.internal_port, 202 ) 203 return True 204 except Exception: 205 logger.debug("Failed to add UPnP mapping", exc_info=True) 206 207 return False 208 209 async def remove_mapping(self, mapping: UPnPMapping) -> bool: 210 """Remove a port mapping. Returns True on success.""" 211 if not self._control_url or not self._service_type: 212 return False 213 214 xml_body = self.build_soap_request( 215 action="DeletePortMapping", 216 args={ 217 "NewRemoteHost": "", 218 "NewExternalPort": str(mapping.external_port), 219 "NewProtocol": mapping.protocol, 220 }, 221 ) 222 223 try: 224 response = await self._soap_call(xml_body, "DeletePortMapping") 225 if response is not None: 226 self._mappings = [ 227 m for m in self._mappings 228 if not (m.external_port == mapping.external_port 229 and m.protocol == mapping.protocol) 230 ] 231 logger.info( 232 "UPnP mapping removed: %s %d", 233 mapping.protocol, mapping.external_port, 234 ) 235 return True 236 except Exception: 237 logger.debug("Failed to remove UPnP mapping", exc_info=True) 238 239 return False 240 241 async def remove_all_mappings(self) -> None: 242 """Remove all registered mappings.""" 243 for mapping in list(self._mappings): 244 await self.remove_mapping(mapping) 245 246 # ------------------------------------------------------------------ 247 # Message builders 248 # ------------------------------------------------------------------ 249 250 def build_ssdp_request(self) -> bytes: 251 """Build SSDP M-SEARCH request for Internet Gateway Devices.""" 252 lines = [ 253 "M-SEARCH * HTTP/1.1", 254 f"HOST: {SSDP_ADDR}:{SSDP_PORT}", 255 'MAN: "ssdp:discover"', 256 f"MX: {SSDP_MX}", 257 f"ST: {_SEARCH_TARGET}", 258 "", 259 "", 260 ] 261 return "\r\n".join(lines).encode("utf-8") 262 263 def parse_ssdp_response(self, data: bytes) -> str | None: 264 """Parse SSDP response and return the LOCATION URL, or None.""" 265 try: 266 text = data.decode("utf-8", errors="replace") 267 for line in text.split("\r\n"): 268 if line.lower().startswith("location:"): 269 return line.split(":", 1)[1].strip() 270 except Exception: 271 pass 272 return None 273 274 def build_soap_request(self, action: str, args: dict) -> str: 275 """Build SOAP XML request for an IGD action. 276 277 Args: 278 action: The UPnP action name (e.g. "AddPortMapping"). 279 args: Dictionary of argument name -> value pairs. 280 281 Returns: 282 SOAP XML envelope as a string. 283 """ 284 service_type = self._service_type or _SERVICE_TYPES[0] 285 286 args_xml = "" 287 for name, value in args.items(): 288 args_xml += f"<{name}>{value}</{name}>\n" 289 290 return ( 291 '<?xml version="1.0" encoding="utf-8"?>\n' 292 '<s:Envelope xmlns:s="http://schemas.xmlsoap.org/soap/envelope/"' 293 ' s:encodingStyle="http://schemas.xmlsoap.org/soap/encoding/">\n' 294 "<s:Body>\n" 295 f'<u:{action} xmlns:u="{service_type}">\n' 296 f"{args_xml}" 297 f"</u:{action}>\n" 298 "</s:Body>\n" 299 "</s:Envelope>" 300 ) 301 302 # ------------------------------------------------------------------ 303 # Internal helpers 304 # ------------------------------------------------------------------ 305 306 async def _soap_call(self, xml_body: str, action: str) -> str | None: 307 """Send a SOAP request and return the response body.""" 308 if not self._control_url or not self._service_type: 309 return None 310 311 import urllib.request 312 313 headers = { 314 "Content-Type": "text/xml; charset=utf-8", 315 "SOAPAction": f'"{self._service_type}#{action}"', 316 } 317 req = urllib.request.Request( 318 self._control_url, 319 data=xml_body.encode("utf-8"), 320 headers=headers, 321 method="POST", 322 ) 323 324 loop = asyncio.get_event_loop() 325 try: 326 response = await loop.run_in_executor( 327 None, 328 lambda: urllib.request.urlopen(req, timeout=5).read(), 329 ) 330 return response.decode("utf-8", errors="replace") 331 except Exception: 332 logger.debug("SOAP call %s failed", action, exc_info=True) 333 return None 334 335 @staticmethod 336 def _get_local_ip() -> str: 337 """Get the local IP address used for default route.""" 338 try: 339 s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM) 340 s.connect(("8.8.8.8", 80)) 341 ip = s.getsockname()[0] 342 s.close() 343 return ip 344 except Exception: 345 return "0.0.0.0" 346 347 # ------------------------------------------------------------------ 348 # Properties 349 # ------------------------------------------------------------------ 350 351 @property 352 def is_available(self) -> bool: 353 """True if a UPnP gateway has been discovered.""" 354 return self._control_url is not None