A Python port of the Invisible Internet Project (I2P)
1"""Multi-transport coordinator.
2
3Ported from net.i2p.router.transport.TransportManager.
4
5Manages NTCP2 and SSU2 transport instances, selects the best transport
6for each outbound message via competitive bidding, and publishes
7addresses in RouterInfo.
8"""
9
10import asyncio
11import logging
12
13from i2p_transport.transport_base import (
14 Transport,
15 TransportBid,
16 TransportStyle,
17 ReachabilityStatus,
18 _reachability_rank,
19)
20
21logger = logging.getLogger(__name__)
22
23
class TransportManager:
    """Coordinate multiple transports for message delivery.

    Each registered transport bids on outbound messages. The manager
    picks the lowest bid and delegates the send to that transport.

    Failed sends are counted per peer. Once a peer has accumulated
    ``MAX_FAILED_SENDS`` failures without an intervening success, further
    sends to it are refused until ``clear_failed_count`` is called.
    """

    # Failed sends (since the last success or explicit clear) after which
    # a peer is refused without consulting any transport.
    MAX_FAILED_SENDS = 2

    def __init__(self, banlist=None, router_hash: bytes | None = None) -> None:
        """Create an empty manager.

        Args:
            banlist: Optional object exposing ``is_banlisted(peer_hash)``;
                when given, sends to peers it reports as banned are refused.
            router_hash: Optional hash of the local router; when set, sends
                addressed to it are refused.
        """
        self._transports: dict[TransportStyle, Transport] = {}
        self._running = False
        self._banlist = banlist
        self._router_hash = router_hash
        # peer hash -> number of failed sends since the last success/clear.
        self._failed_counts: dict[bytes, int] = {}

    @staticmethod
    def _short(peer_hash: bytes) -> str:
        """Return the abbreviated hex form of a peer hash used in log lines."""
        return peer_hash.hex()[:16]

    # ------------------------------------------------------------------
    # Registration
    # ------------------------------------------------------------------

    def register(self, transport: Transport) -> None:
        """Register a transport instance, keyed by its style.

        Registering a second transport with the same style replaces the
        earlier one.
        """
        self._transports[transport.style] = transport

    def get_transport(self, style: TransportStyle) -> Transport | None:
        """Return the transport registered for ``style``, or None."""
        return self._transports.get(style)

    # ------------------------------------------------------------------
    # Lifecycle
    # ------------------------------------------------------------------

    async def start_all(self) -> None:
        """Start all registered transports concurrently.

        Does nothing (and leaves the manager not running) when no transport
        is registered. An exception raised by any transport's ``start()``
        propagates, and the manager is then not marked as running.
        """
        if not self._transports:
            return
        await asyncio.gather(*(t.start() for t in self._transports.values()))
        self._running = True
        logger.info("TransportManager started %d transports", len(self._transports))

    async def stop_all(self) -> None:
        """Stop all registered transports concurrently.

        Every transport's ``stop()`` is awaited to completion even if some
        of them fail, and the manager is always marked as not running
        afterwards. Each failure is logged.

        Raises:
            The first exception (in registration order) raised by a
            transport's ``stop()``, re-raised after all stops have finished.
        """
        transports = list(self._transports.values())
        # return_exceptions=True keeps one failing transport from cutting
        # short the wait on the others during shutdown.
        outcomes = await asyncio.gather(
            *(t.stop() for t in transports),
            return_exceptions=True,
        )
        self._running = False

        first_failure: BaseException | None = None
        for transport, outcome in zip(transports, outcomes):
            if not isinstance(outcome, BaseException):
                continue
            logger.error(
                "Transport %s failed to stop",
                transport.style.value,
                exc_info=outcome,
            )
            if first_failure is None:
                first_failure = outcome

        logger.info("TransportManager stopped")
        if first_failure is not None:
            raise first_failure

    # ------------------------------------------------------------------
    # Sending
    # ------------------------------------------------------------------

    async def send(self, peer_hash: bytes, data: bytes) -> bool:
        """Send data to a peer using the best available transport.

        The send is refused (returning False) when the peer is the local
        router, is banlisted, has reached ``MAX_FAILED_SENDS`` failures, or
        when no transport places a usable bid. Otherwise the winning
        transport's ``send()`` result is returned; a truthy result clears
        the peer's failure count and a falsy one increments it.
        """
        # Reject self-send
        if self._router_hash and peer_hash == self._router_hash:
            logger.debug("Rejecting self-send to %s", self._short(peer_hash))
            return False

        # Reject banlisted peers. Compare against None explicitly so that a
        # banlist object which happens to evaluate falsy is still consulted.
        if self._banlist is not None and self._banlist.is_banlisted(peer_hash):
            logger.debug(
                "Rejecting send to banlisted peer %s", self._short(peer_hash)
            )
            return False

        # Reject peers with repeated failures
        if self._failed_counts.get(peer_hash, 0) >= self.MAX_FAILED_SENDS:
            logger.debug(
                "Rejecting send to repeatedly-failed peer %s",
                self._short(peer_hash),
            )
            return False

        best = await self.get_best_transport(peer_hash)
        if best is None:
            logger.debug(
                "No transport available for peer %s", self._short(peer_hash)
            )
            return False

        result = await best.send(peer_hash, data)
        if result:
            self._failed_counts.pop(peer_hash, None)
        else:
            self._failed_counts[peer_hash] = (
                self._failed_counts.get(peer_hash, 0) + 1
            )
        return result

    def clear_failed_count(self, peer_hash: bytes) -> None:
        """Forget any recorded send failures for a peer."""
        self._failed_counts.pop(peer_hash, None)

    async def get_best_transport(self, peer_hash: bytes) -> Transport | None:
        """Select the best transport for a peer via competitive bidding.

        Transports are asked to bid one after another. Bids whose latency
        equals ``TransportBid.WILL_NOT_SEND`` are discarded, and a transport
        whose bidding raises is logged at debug level and skipped.

        Returns:
            The transport attached to the lowest remaining bid, or None if
            no transport is registered or none is willing/able to send.
        """
        if not self._transports:
            return None

        bids: list[TransportBid] = []
        for transport in self._transports.values():
            # Inspecting the bid stays inside the try block so a malformed
            # bid is skipped the same way as a failed bid() call.
            try:
                bid = await transport.bid(peer_hash)
                if bid.latency_ms != TransportBid.WILL_NOT_SEND:
                    bids.append(bid)
            except Exception:
                logger.debug(
                    "Transport %s failed to bid for peer %s",
                    transport.style.value,
                    self._short(peer_hash),
                    exc_info=True,
                )

        if not bids:
            return None

        # Relies on TransportBid's own ordering to pick the lowest bid.
        return min(bids).transport

    # ------------------------------------------------------------------
    # Address publication
    # ------------------------------------------------------------------

    def get_addresses(self) -> list[dict]:
        """Collect the current address of every transport that has one."""
        return [
            addr
            for transport in self._transports.values()
            if (addr := transport.current_address) is not None
        ]

    # ------------------------------------------------------------------
    # Properties
    # ------------------------------------------------------------------

    @property
    def reachability(self) -> ReachabilityStatus:
        """Overall reachability — best (lowest rank) of all transports.

        Returns ``ReachabilityStatus.UNKNOWN`` when no transport is
        registered.
        """
        if not self._transports:
            return ReachabilityStatus.UNKNOWN
        return min(
            (t.reachability for t in self._transports.values()),
            key=_reachability_rank,
        )

    @property
    def is_running(self) -> bool:
        """Whether the transports were started and not stopped since."""
        return self._running

    @property
    def transport_count(self) -> int:
        """Number of registered transports."""
        return len(self._transports)