A Python port of the Invisible Internet Project (I2P)
1"""Service name → host/port registry.
2
3Thread-safe registry mapping service names (HTTP_PROXY, SAM, etc.)
4to local host:port pairs.
5
6Ported from net.i2p.util.PortMapper.
7"""
8
9from __future__ import annotations
10
11import threading
12from dataclasses import dataclass
13
14
@dataclass
class _Entry:
    """Host/port pair stored in the registry for one service name."""

    # Host string supplied when the service was registered.
    host: str
    # Port number supplied when the service was registered.
    port: int
19
20
class PortMapper:
    """Registry associating a service name with a ``(host, port)`` pair.

    All reads and writes of the internal table are performed while holding
    a single ``threading.Lock``.
    """

    def __init__(self) -> None:
        """Create an empty registry together with the lock that guards it."""
        self._services: dict[str, _Entry] = {}
        self._lock = threading.Lock()

    def register(self, service: str, host: str, port: int) -> None:
        """Store ``host`` and ``port`` under ``service``.

        An existing entry with the same service name is overwritten.
        """
        record = _Entry(host=host, port=port)
        with self._lock:
            self._services[service] = record

    def unregister(self, service: str) -> None:
        """Remove ``service`` from the registry; unknown names are ignored."""
        with self._lock:
            self._services.pop(service, None)

    def get_port(self, service: str, default: int = -1) -> int:
        """Return the port registered for ``service``.

        ``default`` is returned when the service has no entry.
        """
        with self._lock:
            found = self._services.get(service)
            if found is None:
                return default
            return found.port

    def get_host(self, service: str, default: str = "127.0.0.1") -> str:
        """Return the host registered for ``service``.

        ``default`` is returned when the service has no entry.
        """
        with self._lock:
            found = self._services.get(service)
            if found is None:
                return default
            return found.host

    def get_all_services(self) -> dict[str, tuple[str, int]]:
        """Return a new dict mapping each service name to ``(host, port)``.

        The result is built under the lock and is independent of the
        internal table, so callers may modify it freely.
        """
        with self._lock:
            snapshot: dict[str, tuple[str, int]] = {}
            for name, record in self._services.items():
                snapshot[name] = (record.host, record.port)
            return snapshot