A Python port of the Invisible Internet Project (I2P)
at main 106 lines 3.9 kB view raw
1"""Tests for ConnectionManager.""" 2 3import asyncio 4 5import pytest 6 7 8class TestConnectionManager: 9 def test_create_manager(self): 10 from i2p_streaming.manager import ConnectionManager 11 mgr = ConnectionManager() 12 assert mgr.active_count == 0 13 14 def test_create_manager_with_options(self): 15 from i2p_streaming.manager import ConnectionManager 16 from i2p_streaming.options import StreamOptions 17 opts = StreamOptions(max_window_size=64) 18 mgr = ConnectionManager(options=opts) 19 assert mgr._options.max_window_size == 64 20 21 @pytest.mark.asyncio 22 async def test_connect_creates_connection(self): 23 from i2p_streaming.manager import ConnectionManager 24 mgr = ConnectionManager() 25 conn = await mgr.connect(b"destination1") 26 assert conn is not None 27 assert mgr.active_count == 1 28 29 @pytest.mark.asyncio 30 async def test_connect_assigns_unique_ids(self): 31 from i2p_streaming.manager import ConnectionManager 32 mgr = ConnectionManager() 33 c1 = await mgr.connect(b"dest1") 34 c2 = await mgr.connect(b"dest2") 35 assert c1 is not c2 36 assert mgr.active_count == 2 37 38 @pytest.mark.asyncio 39 async def test_get_connection(self): 40 from i2p_streaming.manager import ConnectionManager 41 mgr = ConnectionManager() 42 conn = await mgr.connect(b"dest1") 43 retrieved = mgr.get_connection(1) 44 assert retrieved is conn 45 46 def test_get_nonexistent_connection(self): 47 from i2p_streaming.manager import ConnectionManager 48 mgr = ConnectionManager() 49 assert mgr.get_connection(999) is None 50 51 @pytest.mark.asyncio 52 async def test_receive_packet_routes_to_connection(self): 53 from i2p_streaming.manager import ConnectionManager 54 from i2p_streaming.packet import StreamPacket, Flags 55 mgr = ConnectionManager() 56 conn = await mgr.connect(b"dest1") 57 conn_id = 1 58 # Create a packet addressed to this connection 59 pkt = StreamPacket( 60 send_id=42, recv_id=conn_id, seq_num=0, 61 ack_through=0, flags=0, payload=b"hello" 62 ) 63 mgr.receive_packet(pkt) 64 # Packet should have been routed (no error) 65 66 @pytest.mark.asyncio 67 async def test_accept_queue(self): 68 from i2p_streaming.manager import ConnectionManager 69 from i2p_streaming.packet import StreamPacket, Flags 70 mgr = ConnectionManager() 71 # Simulate incoming SYN packet for unknown connection 72 pkt = StreamPacket( 73 send_id=100, recv_id=0, seq_num=0, 74 ack_through=0, flags=Flags.SYNCHRONIZE, payload=b"" 75 ) 76 mgr.receive_packet(pkt) 77 # Accept should return the new connection 78 conn = await asyncio.wait_for(mgr.accept(), timeout=1.0) 79 assert conn is not None 80 81 @pytest.mark.asyncio 82 async def test_close_all(self): 83 from i2p_streaming.manager import ConnectionManager 84 mgr = ConnectionManager() 85 await mgr.connect(b"dest1") 86 await mgr.connect(b"dest2") 87 assert mgr.active_count == 2 88 await mgr.close_all() 89 assert mgr.active_count == 0 90 91 def test_bandwidth_estimator_accessible(self): 92 from i2p_streaming.manager import ConnectionManager 93 from i2p_streaming.bandwidth import BandwidthEstimator 94 mgr = ConnectionManager() 95 assert isinstance(mgr.bandwidth, BandwidthEstimator) 96 97 @pytest.mark.asyncio 98 async def test_max_concurrent_streams_respected(self): 99 from i2p_streaming.manager import ConnectionManager 100 from i2p_streaming.options import StreamOptions 101 opts = StreamOptions(max_concurrent_streams=2) 102 mgr = ConnectionManager(options=opts) 103 await mgr.connect(b"dest1") 104 await mgr.connect(b"dest2") 105 with pytest.raises(ConnectionError): 106 await mgr.connect(b"dest3")