A Python port of the Invisible Internet Project (I2P)
at main 48 lines 1.4 kB view raw
"""Service name → host/port registry.

Thread-safe registry mapping service names (HTTP_PROXY, SAM, etc.)
to local host:port pairs.

Ported from net.i2p.util.PortMapper.
"""

from __future__ import annotations

import threading
from dataclasses import dataclass


@dataclass(frozen=True)
class _Entry:
    """Immutable host/port pair stored for one registered service."""

    # Frozen so a stored entry can only be replaced (under the registry
    # lock, via ``register``), never modified in place.
    host: str
    port: int


class PortMapper:
    """Thread-safe service name → (host, port) registry.

    Every read and write of the internal mapping happens while holding a
    single ``threading.Lock``.
    """

    def __init__(self) -> None:
        """Create an empty registry."""
        self._lock = threading.Lock()
        self._services: dict[str, _Entry] = {}

    def register(self, service: str, host: str, port: int) -> None:
        """Record *host* and *port* for *service*.

        An existing registration under the same name is overwritten.
        """
        with self._lock:
            self._services[service] = _Entry(host=host, port=port)

    def unregister(self, service: str) -> None:
        """Remove *service* from the registry; unknown names are ignored."""
        with self._lock:
            self._services.pop(service, None)

    def get_port(self, service: str, default: int = -1) -> int:
        """Return the port registered for *service*.

        Returns *default* (``-1`` unless overridden) when *service* is not
        registered.
        """
        with self._lock:
            entry = self._services.get(service)
            # Explicit None test: "not registered" is the only fallback case.
            return default if entry is None else entry.port

    def get_host(self, service: str, default: str = "127.0.0.1") -> str:
        """Return the host registered for *service*.

        Returns *default* (``"127.0.0.1"`` unless overridden) when *service*
        is not registered.
        """
        with self._lock:
            entry = self._services.get(service)
            # Explicit None test: "not registered" is the only fallback case.
            return default if entry is None else entry.host

    def get_all_services(self) -> dict[str, tuple[str, int]]:
        """Return a snapshot of all registrations as ``{name: (host, port)}``.

        The returned dict is newly built on each call, so modifying it does
        not affect the registry.
        """
        with self._lock:
            return {
                name: (entry.host, entry.port)
                for name, entry in self._services.items()
            }