"""Tests for TransportManager — multi-transport coordinator. TDD: tests written before implementation. """ import asyncio import pytest from i2p_transport.transport_base import ( Transport, TransportBid, TransportStyle, ReachabilityStatus, ) from i2p_transport.manager import TransportManager class MockTransport(Transport): """Mock transport for testing TransportManager.""" def __init__( self, style: TransportStyle, bid_latency: int = 100, reachability: ReachabilityStatus = ReachabilityStatus.OK, address: dict | None = None, ): self._style = style self._bid_latency = bid_latency self._reachability = reachability self._running = False self._sent: list[tuple[bytes, bytes]] = [] self._address = address or {"host": "127.0.0.1", "port": 15000} self._send_result = True @property def style(self) -> TransportStyle: return self._style async def start(self) -> None: self._running = True async def stop(self) -> None: self._running = False @property def is_running(self) -> bool: return self._running async def bid(self, peer_hash: bytes) -> TransportBid: if self._bid_latency == TransportBid.WILL_NOT_SEND: return TransportBid( latency_ms=TransportBid.WILL_NOT_SEND, transport=self, ) return TransportBid(latency_ms=self._bid_latency, transport=self) async def send(self, peer_hash: bytes, data: bytes) -> bool: self._sent.append((peer_hash, data)) return self._send_result @property def reachability(self) -> ReachabilityStatus: return self._reachability @property def current_address(self) -> dict | None: return self._address # --------------------------------------------------------------------------- # Tests # --------------------------------------------------------------------------- class TestTransportManager: def test_register_transport(self): mgr = TransportManager() t = MockTransport(TransportStyle.NTCP2) mgr.register(t) assert mgr.get_transport(TransportStyle.NTCP2) is t def test_register_multiple(self): mgr = TransportManager() ntcp2 = MockTransport(TransportStyle.NTCP2) ssu2 = MockTransport(TransportStyle.SSU2) mgr.register(ntcp2) mgr.register(ssu2) assert mgr.get_transport(TransportStyle.NTCP2) is ntcp2 assert mgr.get_transport(TransportStyle.SSU2) is ssu2 @pytest.mark.asyncio async def test_start_stop_all(self): mgr = TransportManager() ntcp2 = MockTransport(TransportStyle.NTCP2) ssu2 = MockTransport(TransportStyle.SSU2) mgr.register(ntcp2) mgr.register(ssu2) await mgr.start_all() assert ntcp2.is_running assert ssu2.is_running assert mgr.is_running await mgr.stop_all() assert not ntcp2.is_running assert not ssu2.is_running assert not mgr.is_running @pytest.mark.asyncio async def test_send_selects_best_bid(self): mgr = TransportManager() slow = MockTransport(TransportStyle.NTCP2, bid_latency=200) fast = MockTransport(TransportStyle.SSU2, bid_latency=50) mgr.register(slow) mgr.register(fast) await mgr.start_all() peer = b"\x01" * 32 result = await mgr.send(peer, b"hello") assert result is True # Fast transport should have been selected assert len(fast._sent) == 1 assert len(slow._sent) == 0 assert fast._sent[0] == (peer, b"hello") @pytest.mark.asyncio async def test_send_no_transport(self): mgr = TransportManager() peer = b"\x01" * 32 result = await mgr.send(peer, b"hello") assert result is False @pytest.mark.asyncio async def test_send_will_not_send(self): mgr = TransportManager() t = MockTransport(TransportStyle.NTCP2, bid_latency=TransportBid.WILL_NOT_SEND) mgr.register(t) await mgr.start_all() peer = b"\x01" * 32 result = await mgr.send(peer, b"hello") assert result is False assert len(t._sent) == 0 def test_get_addresses(self): mgr = TransportManager() ntcp2 = MockTransport( TransportStyle.NTCP2, address={"host": "1.2.3.4", "port": 15000}, ) ssu2 = MockTransport( TransportStyle.SSU2, address={"host": "1.2.3.4", "port": 15001}, ) mgr.register(ntcp2) mgr.register(ssu2) addrs = mgr.get_addresses() assert len(addrs) == 2 ports = {a["port"] for a in addrs} assert ports == {15000, 15001} def test_reachability_best_of_all(self): mgr = TransportManager() firewalled = MockTransport( TransportStyle.NTCP2, reachability=ReachabilityStatus.FIREWALLED, ) ok = MockTransport( TransportStyle.SSU2, reachability=ReachabilityStatus.OK, ) mgr.register(firewalled) mgr.register(ok) assert mgr.reachability == ReachabilityStatus.OK def test_reachability_no_transports(self): mgr = TransportManager() assert mgr.reachability == ReachabilityStatus.UNKNOWN def test_transport_count(self): mgr = TransportManager() assert mgr.transport_count == 0 mgr.register(MockTransport(TransportStyle.NTCP2)) assert mgr.transport_count == 1 mgr.register(MockTransport(TransportStyle.SSU2)) assert mgr.transport_count == 2 @pytest.mark.asyncio async def test_get_best_transport(self): mgr = TransportManager() slow = MockTransport(TransportStyle.NTCP2, bid_latency=200) fast = MockTransport(TransportStyle.SSU2, bid_latency=50) mgr.register(slow) mgr.register(fast) await mgr.start_all() peer = b"\x01" * 32 best = await mgr.get_best_transport(peer) assert best is fast def test_get_addresses_skips_none(self): """Transports with no address should not appear in address list.""" mgr = TransportManager() t = MockTransport(TransportStyle.NTCP2, address=None) t._address = None mgr.register(t) addrs = mgr.get_addresses() assert len(addrs) == 0