# A Python port of the Invisible Internet Project (I2P)
"""UPnP port mapping via SSDP discovery and IGD control.

Provides automatic port forwarding for NTCP2 and SSU2 transports
when behind a UPnP-capable router.

Ported from net.i2p.router.transport.UPnP.
"""
8
9import asyncio
10import logging
11import socket
12import xml.etree.ElementTree as ET
13from urllib.parse import urlparse
14
logger = logging.getLogger(__name__)

# SSDP multicast endpoint that the M-SEARCH request is sent to.
SSDP_ADDR = "239.255.255.250"
SSDP_PORT = 1900
SSDP_MX = 3  # Max wait time in seconds (value of the MX header)

# UPnP service types to search for (in preference order).  The first entry
# is also the fallback namespace when no service has been discovered yet.
_SERVICE_TYPES = [
    "urn:schemas-upnp-org:service:WANIPConnection:1",
    "urn:schemas-upnp-org:service:WANPPPConnection:1",
]

# SSDP search target (value of the ST header)
_SEARCH_TARGET = "urn:schemas-upnp-org:device:InternetGatewayDevice:1"
30
31
class UPnPMapping:
    """A single port-forwarding rule to request from the gateway.

    Attributes:
        external_port: Port on the gateway's external side.
        internal_port: Port on the local host that traffic is forwarded to.
        protocol: Transport protocol name, "TCP" or "UDP".
        description: Label sent along with the mapping request.
    """

    def __init__(
        self,
        external_port: int,
        internal_port: int,
        protocol: str,
        description: str = "I2P",
    ) -> None:
        # Plain value holder: store the arguments unchanged.
        self.external_port, self.internal_port = external_port, internal_port
        self.protocol, self.description = protocol, description
41
42
43class UPnPManager:
44 """Manages UPnP port mappings for I2P transports.
45
46 Discovery flow:
47 1. Send SSDP M-SEARCH for InternetGatewayDevice
48 2. Fetch and parse device description XML for control URL
49 3. Send SOAP AddPortMapping / DeletePortMapping requests
50 """
51
52 def __init__(self) -> None:
53 self._control_url: str | None = None
54 self._service_type: str | None = None
55 self._mappings: list[UPnPMapping] = []
56 self._external_ip: str | None = None
57
58 # ------------------------------------------------------------------
59 # Discovery
60 # ------------------------------------------------------------------
61
62 async def discover(self, timeout: float = 5.0) -> bool:
63 """Discover UPnP gateway via SSDP.
64
65 Returns True if an IGD was found and a control URL was obtained.
66 """
67 loop = asyncio.get_event_loop()
68
69 try:
70 # Send SSDP M-SEARCH
71 sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM,
72 socket.IPPROTO_UDP)
73 sock.settimeout(timeout)
74 sock.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
75
76 request = self.build_ssdp_request()
77 sock.sendto(request, (SSDP_ADDR, SSDP_PORT))
78
79 try:
80 data, _ = await asyncio.wait_for(
81 loop.run_in_executor(None, lambda: sock.recvfrom(4096)),
82 timeout=timeout,
83 )
84 except (asyncio.TimeoutError, socket.timeout):
85 logger.debug("SSDP discovery timed out")
86 return False
87 finally:
88 sock.close()
89
90 location = self.parse_ssdp_response(data)
91 if location is None:
92 logger.debug("No LOCATION in SSDP response")
93 return False
94
95 # Fetch device description and extract control URL
96 return await self._fetch_control_url(location)
97
98 except Exception:
99 logger.debug("UPnP discovery failed", exc_info=True)
100 return False
101
102 async def _fetch_control_url(self, location: str) -> bool:
103 """Fetch device description XML and extract the control URL."""
104 try:
105 import urllib.request
106 loop = asyncio.get_event_loop()
107 response = await loop.run_in_executor(
108 None,
109 lambda: urllib.request.urlopen(location, timeout=5).read(),
110 )
111 root = ET.fromstring(response)
112
113 # Find WANIPConnection or WANPPPConnection service
114 ns = {"upnp": "urn:schemas-upnp-org:device-1-0"}
115 for service_el in root.iter("{urn:schemas-upnp-org:device-1-0}service"):
116 stype_el = service_el.find(
117 "{urn:schemas-upnp-org:device-1-0}serviceType")
118 ctrl_el = service_el.find(
119 "{urn:schemas-upnp-org:device-1-0}controlURL")
120 if stype_el is not None and ctrl_el is not None:
121 stype = stype_el.text or ""
122 if stype in _SERVICE_TYPES:
123 self._service_type = stype
124 # Build absolute control URL
125 parsed = urlparse(location)
126 base = f"{parsed.scheme}://{parsed.netloc}"
127 ctrl_path = ctrl_el.text or ""
128 if ctrl_path.startswith("http"):
129 self._control_url = ctrl_path
130 else:
131 self._control_url = base + ctrl_path
132 logger.info("UPnP control URL: %s", self._control_url)
133 return True
134
135 logger.debug("No supported service found in device description")
136 return False
137
138 except Exception:
139 logger.debug("Failed to fetch UPnP device description", exc_info=True)
140 return False
141
142 # ------------------------------------------------------------------
143 # External IP
144 # ------------------------------------------------------------------
145
146 async def get_external_ip(self) -> str | None:
147 """Get external IP from gateway via SOAP."""
148 if not self._control_url or not self._service_type:
149 return None
150
151 try:
152 xml_body = self.build_soap_request(
153 action="GetExternalIPAddress",
154 args={},
155 )
156 response = await self._soap_call(xml_body, "GetExternalIPAddress")
157 if response is not None:
158 root = ET.fromstring(response)
159 for elem in root.iter():
160 if "NewExternalIPAddress" in (elem.tag or ""):
161 self._external_ip = elem.text
162 return self._external_ip
163 except Exception:
164 logger.debug("Failed to get external IP", exc_info=True)
165
166 return None
167
168 # ------------------------------------------------------------------
169 # Port mapping management
170 # ------------------------------------------------------------------
171
172 async def add_mapping(self, mapping: UPnPMapping) -> bool:
173 """Add a port mapping. Returns True on success."""
174 if not self._control_url or not self._service_type:
175 return False
176
177 # Get local IP for internal client
178 local_ip = self._get_local_ip()
179
180 xml_body = self.build_soap_request(
181 action="AddPortMapping",
182 args={
183 "NewRemoteHost": "",
184 "NewExternalPort": str(mapping.external_port),
185 "NewProtocol": mapping.protocol,
186 "NewInternalPort": str(mapping.internal_port),
187 "NewInternalClient": local_ip,
188 "NewEnabled": "1",
189 "NewPortMappingDescription": mapping.description,
190 "NewLeaseDuration": "0",
191 },
192 )
193
194 try:
195 response = await self._soap_call(xml_body, "AddPortMapping")
196 if response is not None:
197 self._mappings.append(mapping)
198 logger.info(
199 "UPnP mapping added: %s %d -> %d",
200 mapping.protocol, mapping.external_port,
201 mapping.internal_port,
202 )
203 return True
204 except Exception:
205 logger.debug("Failed to add UPnP mapping", exc_info=True)
206
207 return False
208
209 async def remove_mapping(self, mapping: UPnPMapping) -> bool:
210 """Remove a port mapping. Returns True on success."""
211 if not self._control_url or not self._service_type:
212 return False
213
214 xml_body = self.build_soap_request(
215 action="DeletePortMapping",
216 args={
217 "NewRemoteHost": "",
218 "NewExternalPort": str(mapping.external_port),
219 "NewProtocol": mapping.protocol,
220 },
221 )
222
223 try:
224 response = await self._soap_call(xml_body, "DeletePortMapping")
225 if response is not None:
226 self._mappings = [
227 m for m in self._mappings
228 if not (m.external_port == mapping.external_port
229 and m.protocol == mapping.protocol)
230 ]
231 logger.info(
232 "UPnP mapping removed: %s %d",
233 mapping.protocol, mapping.external_port,
234 )
235 return True
236 except Exception:
237 logger.debug("Failed to remove UPnP mapping", exc_info=True)
238
239 return False
240
241 async def remove_all_mappings(self) -> None:
242 """Remove all registered mappings."""
243 for mapping in list(self._mappings):
244 await self.remove_mapping(mapping)
245
246 # ------------------------------------------------------------------
247 # Message builders
248 # ------------------------------------------------------------------
249
250 def build_ssdp_request(self) -> bytes:
251 """Build SSDP M-SEARCH request for Internet Gateway Devices."""
252 lines = [
253 "M-SEARCH * HTTP/1.1",
254 f"HOST: {SSDP_ADDR}:{SSDP_PORT}",
255 'MAN: "ssdp:discover"',
256 f"MX: {SSDP_MX}",
257 f"ST: {_SEARCH_TARGET}",
258 "",
259 "",
260 ]
261 return "\r\n".join(lines).encode("utf-8")
262
263 def parse_ssdp_response(self, data: bytes) -> str | None:
264 """Parse SSDP response and return the LOCATION URL, or None."""
265 try:
266 text = data.decode("utf-8", errors="replace")
267 for line in text.split("\r\n"):
268 if line.lower().startswith("location:"):
269 return line.split(":", 1)[1].strip()
270 except Exception:
271 pass
272 return None
273
274 def build_soap_request(self, action: str, args: dict) -> str:
275 """Build SOAP XML request for an IGD action.
276
277 Args:
278 action: The UPnP action name (e.g. "AddPortMapping").
279 args: Dictionary of argument name -> value pairs.
280
281 Returns:
282 SOAP XML envelope as a string.
283 """
284 service_type = self._service_type or _SERVICE_TYPES[0]
285
286 args_xml = ""
287 for name, value in args.items():
288 args_xml += f"<{name}>{value}</{name}>\n"
289
290 return (
291 '<?xml version="1.0" encoding="utf-8"?>\n'
292 '<s:Envelope xmlns:s="http://schemas.xmlsoap.org/soap/envelope/"'
293 ' s:encodingStyle="http://schemas.xmlsoap.org/soap/encoding/">\n'
294 "<s:Body>\n"
295 f'<u:{action} xmlns:u="{service_type}">\n'
296 f"{args_xml}"
297 f"</u:{action}>\n"
298 "</s:Body>\n"
299 "</s:Envelope>"
300 )
301
302 # ------------------------------------------------------------------
303 # Internal helpers
304 # ------------------------------------------------------------------
305
306 async def _soap_call(self, xml_body: str, action: str) -> str | None:
307 """Send a SOAP request and return the response body."""
308 if not self._control_url or not self._service_type:
309 return None
310
311 import urllib.request
312
313 headers = {
314 "Content-Type": "text/xml; charset=utf-8",
315 "SOAPAction": f'"{self._service_type}#{action}"',
316 }
317 req = urllib.request.Request(
318 self._control_url,
319 data=xml_body.encode("utf-8"),
320 headers=headers,
321 method="POST",
322 )
323
324 loop = asyncio.get_event_loop()
325 try:
326 response = await loop.run_in_executor(
327 None,
328 lambda: urllib.request.urlopen(req, timeout=5).read(),
329 )
330 return response.decode("utf-8", errors="replace")
331 except Exception:
332 logger.debug("SOAP call %s failed", action, exc_info=True)
333 return None
334
335 @staticmethod
336 def _get_local_ip() -> str:
337 """Get the local IP address used for default route."""
338 try:
339 s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
340 s.connect(("8.8.8.8", 80))
341 ip = s.getsockname()[0]
342 s.close()
343 return ip
344 except Exception:
345 return "0.0.0.0"
346
347 # ------------------------------------------------------------------
348 # Properties
349 # ------------------------------------------------------------------
350
351 @property
352 def is_available(self) -> bool:
353 """True if a UPnP gateway has been discovered."""
354 return self._control_url is not None