A Python port of the Invisible Internet Project (I2P)
at main 172 lines 6.1 kB view raw
"""Multi-transport coordinator.

Ported from net.i2p.router.transport.TransportManager.

Manages NTCP2 and SSU2 transport instances, selects the best transport
for each outbound message via competitive bidding, and publishes
addresses in RouterInfo.
"""

import asyncio
import logging

from i2p_transport.transport_base import (
    Transport,
    TransportBid,
    TransportStyle,
    ReachabilityStatus,
    _reachability_rank,
)

logger = logging.getLogger(__name__)


class TransportManager:
    """Coordinates multiple transports for message delivery.

    Each registered transport bids on outbound messages. The manager
    picks the lowest bid and delegates the send to that transport.
    """

    # send() refuses a peer once its recorded failure count exceeds this
    # value; the count is only reset by clear_failed_count() (or by a
    # successful send, which cannot happen while the peer is refused).
    _MAX_TOLERATED_FAILURES = 1

    def __init__(self, banlist=None, router_hash: bytes | None = None) -> None:
        """Create an empty manager.

        Args:
            banlist: Optional object exposing ``is_banlisted(peer_hash)``;
                when set, send() consults it before bidding.
            router_hash: Optional hash of the local router; when set,
                send() refuses to send to it.
        """
        self._transports: dict[TransportStyle, Transport] = {}
        self._running = False
        self._banlist = banlist
        self._router_hash = router_hash
        # Failed-send counter per peer hash (see send()).
        self._failed_counts: dict[bytes, int] = {}

    # ------------------------------------------------------------------
    # Registration
    # ------------------------------------------------------------------

    def register(self, transport: Transport) -> None:
        """Register a transport instance (keyed by style).

        Registering a second transport with the same style replaces the
        earlier one.
        """
        self._transports[transport.style] = transport

    def get_transport(self, style: TransportStyle) -> Transport | None:
        """Return the transport registered for ``style``, or None."""
        return self._transports.get(style)

    # ------------------------------------------------------------------
    # Lifecycle
    # ------------------------------------------------------------------

    async def start_all(self) -> None:
        """Start all registered transports concurrently.

        Does nothing (and does not mark the manager as running) when no
        transport is registered.
        """
        if not self._transports:
            return
        await asyncio.gather(*(t.start() for t in self._transports.values()))
        self._running = True
        logger.info("TransportManager started %d transports", len(self._transports))

    async def stop_all(self) -> None:
        """Stop all registered transports concurrently.

        Every transport's ``stop()`` is awaited to completion even if
        another one fails, and the manager is marked as not running
        afterwards.

        Raises:
            BaseException: If any ``stop()`` raised, each failure is logged
                and the first one (in registration order) is re-raised.
        """
        transports = list(self._transports.values())
        # return_exceptions=True: a failing stop() must not make us return
        # while the remaining transports are still shutting down.
        results = await asyncio.gather(
            *(t.stop() for t in transports),
            return_exceptions=True,
        )
        self._running = False

        failures = [
            (transport, result)
            for transport, result in zip(transports, results)
            if isinstance(result, BaseException)
        ]
        for transport, error in failures:
            logger.warning(
                "Transport %s failed to stop",
                transport.style.value,
                exc_info=error,
            )
        if failures:
            raise failures[0][1]
        logger.info("TransportManager stopped")

    # ------------------------------------------------------------------
    # Sending
    # ------------------------------------------------------------------

    async def send(self, peer_hash: bytes, data: bytes) -> bool:
        """Send data to peer using the best available transport.

        Collects bids from all transports, selects the lowest bid,
        and sends via that transport.

        Returns:
            The result of the chosen transport's ``send()``. False is
            returned without attempting a send when the peer is the local
            router, is banlisted, has failed too often, or no transport
            bids for it.
        """
        # Reject self-send
        if self._router_hash and peer_hash == self._router_hash:
            logger.debug("Rejecting self-send to %s", peer_hash.hex()[:16])
            return False

        # Reject banlisted peers
        if self._banlist and self._banlist.is_banlisted(peer_hash):
            logger.debug("Rejecting send to banlisted peer %s", peer_hash.hex()[:16])
            return False

        # Reject peers with repeated failures
        if self._failed_counts.get(peer_hash, 0) > self._MAX_TOLERATED_FAILURES:
            logger.debug("Rejecting send to repeatedly-failed peer %s", peer_hash.hex()[:16])
            return False

        best = await self.get_best_transport(peer_hash)
        if best is None:
            logger.debug("No transport available for peer %s", peer_hash.hex()[:16])
            return False

        result = await best.send(peer_hash, data)
        if result:
            # A success wipes the peer's failure history.
            self._failed_counts.pop(peer_hash, None)
        else:
            self._failed_counts[peer_hash] = self._failed_counts.get(peer_hash, 0) + 1
        return result

    def clear_failed_count(self, peer_hash: bytes) -> None:
        """Clear failure tracking for a peer."""
        self._failed_counts.pop(peer_hash, None)

    async def get_best_transport(self, peer_hash: bytes) -> Transport | None:
        """Select the best transport for a peer via competitive bidding.

        Each registered transport is asked for a bid in turn. A transport
        whose ``bid()`` raises is skipped (logged at debug level), as is one
        that declines to bid.

        Returns:
            The transport attached to the smallest bid, or None if no
            transport is willing/able to send.
        """
        bids: list[TransportBid] = []
        for transport in self._transports.values():
            try:
                bid = await transport.bid(peer_hash)
            except Exception:
                logger.debug(
                    "Transport %s failed to bid for peer %s",
                    transport.style.value,
                    peer_hash.hex()[:16],
                    exc_info=True,
                )
                continue
            # NOTE(review): a None bid is treated as "will not send", as in
            # the Java original -- confirm against Transport.bid()'s contract.
            if bid is None or bid.latency_ms == TransportBid.WILL_NOT_SEND:
                continue
            bids.append(bid)

        if not bids:
            return None

        # Relies on the ordering TransportBid defines for itself.
        return min(bids).transport

    # ------------------------------------------------------------------
    # Address publication
    # ------------------------------------------------------------------

    def get_addresses(self) -> list[dict]:
        """Get published addresses from all transports.

        Transports whose ``current_address`` is None are omitted.
        """
        return [
            addr
            for transport in self._transports.values()
            if (addr := transport.current_address) is not None
        ]

    # ------------------------------------------------------------------
    # Properties
    # ------------------------------------------------------------------

    @property
    def reachability(self) -> ReachabilityStatus:
        """Overall reachability — best (lowest rank) of all transports.

        Returns ``ReachabilityStatus.UNKNOWN`` when no transport is
        registered.
        """
        if not self._transports:
            return ReachabilityStatus.UNKNOWN
        return min(
            (t.reachability for t in self._transports.values()),
            key=_reachability_rank,
        )

    @property
    def is_running(self) -> bool:
        """Whether start_all() has completed and stop_all() has not run since."""
        return self._running

    @property
    def transport_count(self) -> int:
        """Number of registered transports."""
        return len(self._transports)