"""Base transport interface. All transports (NTCP2, SSU2) implement this interface so TransportManager can coordinate between them. Ported from net.i2p.router.transport.Transport. """ import enum from abc import ABC, abstractmethod class TransportStyle(enum.Enum): NTCP2 = "NTCP2" SSU2 = "SSU2" class ReachabilityStatus(enum.Enum): OK = "ok" # Directly reachable FIREWALLED = "firewalled" # Behind NAT, needs relay UNKNOWN = "unknown" # Not yet determined TESTING = "testing" # Currently running peer test SYMMETRIC_NAT = "symmetric_nat" # Symmetric NAT (hardest case) # Reachability ordering — lower index is better _REACHABILITY_ORDER = [ ReachabilityStatus.OK, ReachabilityStatus.TESTING, ReachabilityStatus.FIREWALLED, ReachabilityStatus.SYMMETRIC_NAT, ReachabilityStatus.UNKNOWN, ] def _reachability_rank(status: ReachabilityStatus) -> int: """Return numeric rank for a reachability status (lower is better).""" try: return _REACHABILITY_ORDER.index(status) except ValueError: return len(_REACHABILITY_ORDER) class TransportBid: """A bid from a transport to carry a message to a peer. The TransportManager collects bids from all registered transports and selects the lowest (best) bid. """ # Sentinel values TRANSIENT_FAIL = 999 WILL_NOT_SEND = -1 def __init__(self, latency_ms: int, transport: "Transport", preference: int = 0): self.latency_ms = latency_ms self.transport = transport self.preference = preference # Tiebreaker (lower = preferred) def __lt__(self, other: "TransportBid") -> bool: if self.latency_ms != other.latency_ms: return self.latency_ms < other.latency_ms return self.preference < other.preference def __repr__(self) -> str: return (f"TransportBid(latency_ms={self.latency_ms}, " f"transport={self.transport.style.value}, " f"preference={self.preference})") class Transport(ABC): """Abstract base for I2P transports.""" @property @abstractmethod def style(self) -> TransportStyle: """The transport style (NTCP2 or SSU2).""" @abstractmethod async def start(self) -> None: """Start the transport.""" @abstractmethod async def stop(self) -> None: """Stop the transport.""" @property @abstractmethod def is_running(self) -> bool: """Whether the transport is currently running.""" @abstractmethod async def bid(self, peer_hash: bytes) -> TransportBid: """Bid to send a message to the given peer. Returns a TransportBid with latency_ms=WILL_NOT_SEND if this transport cannot reach the peer. """ @abstractmethod async def send(self, peer_hash: bytes, data: bytes) -> bool: """Send data to peer. Returns True on success.""" @property @abstractmethod def reachability(self) -> ReachabilityStatus: """Current reachability status of this transport.""" @property @abstractmethod def current_address(self) -> dict | None: """Published address for this transport (IP, port, options). Returns None if no address is available (e.g. transport not started). """