"""Tests for ConnectionManager.""" import asyncio import pytest class TestConnectionManager: def test_create_manager(self): from i2p_streaming.manager import ConnectionManager mgr = ConnectionManager() assert mgr.active_count == 0 def test_create_manager_with_options(self): from i2p_streaming.manager import ConnectionManager from i2p_streaming.options import StreamOptions opts = StreamOptions(max_window_size=64) mgr = ConnectionManager(options=opts) assert mgr._options.max_window_size == 64 @pytest.mark.asyncio async def test_connect_creates_connection(self): from i2p_streaming.manager import ConnectionManager mgr = ConnectionManager() conn = await mgr.connect(b"destination1") assert conn is not None assert mgr.active_count == 1 @pytest.mark.asyncio async def test_connect_assigns_unique_ids(self): from i2p_streaming.manager import ConnectionManager mgr = ConnectionManager() c1 = await mgr.connect(b"dest1") c2 = await mgr.connect(b"dest2") assert c1 is not c2 assert mgr.active_count == 2 @pytest.mark.asyncio async def test_get_connection(self): from i2p_streaming.manager import ConnectionManager mgr = ConnectionManager() conn = await mgr.connect(b"dest1") retrieved = mgr.get_connection(1) assert retrieved is conn def test_get_nonexistent_connection(self): from i2p_streaming.manager import ConnectionManager mgr = ConnectionManager() assert mgr.get_connection(999) is None @pytest.mark.asyncio async def test_receive_packet_routes_to_connection(self): from i2p_streaming.manager import ConnectionManager from i2p_streaming.packet import StreamPacket, Flags mgr = ConnectionManager() conn = await mgr.connect(b"dest1") conn_id = 1 # Create a packet addressed to this connection pkt = StreamPacket( send_id=42, recv_id=conn_id, seq_num=0, ack_through=0, flags=0, payload=b"hello" ) mgr.receive_packet(pkt) # Packet should have been routed (no error) @pytest.mark.asyncio async def test_accept_queue(self): from i2p_streaming.manager import ConnectionManager from i2p_streaming.packet import StreamPacket, Flags mgr = ConnectionManager() # Simulate incoming SYN packet for unknown connection pkt = StreamPacket( send_id=100, recv_id=0, seq_num=0, ack_through=0, flags=Flags.SYNCHRONIZE, payload=b"" ) mgr.receive_packet(pkt) # Accept should return the new connection conn = await asyncio.wait_for(mgr.accept(), timeout=1.0) assert conn is not None @pytest.mark.asyncio async def test_close_all(self): from i2p_streaming.manager import ConnectionManager mgr = ConnectionManager() await mgr.connect(b"dest1") await mgr.connect(b"dest2") assert mgr.active_count == 2 await mgr.close_all() assert mgr.active_count == 0 def test_bandwidth_estimator_accessible(self): from i2p_streaming.manager import ConnectionManager from i2p_streaming.bandwidth import BandwidthEstimator mgr = ConnectionManager() assert isinstance(mgr.bandwidth, BandwidthEstimator) @pytest.mark.asyncio async def test_max_concurrent_streams_respected(self): from i2p_streaming.manager import ConnectionManager from i2p_streaming.options import StreamOptions opts = StreamOptions(max_concurrent_streams=2) mgr = ConnectionManager(options=opts) await mgr.connect(b"dest1") await mgr.connect(b"dest2") with pytest.raises(ConnectionError): await mgr.connect(b"dest3")