A Python port of the Invisible Internet Project (I2P)
1"""Tests for ConnectionManager."""
2
3import asyncio
4
5import pytest
6
7
8class TestConnectionManager:
9 def test_create_manager(self):
10 from i2p_streaming.manager import ConnectionManager
11 mgr = ConnectionManager()
12 assert mgr.active_count == 0
13
14 def test_create_manager_with_options(self):
15 from i2p_streaming.manager import ConnectionManager
16 from i2p_streaming.options import StreamOptions
17 opts = StreamOptions(max_window_size=64)
18 mgr = ConnectionManager(options=opts)
19 assert mgr._options.max_window_size == 64
20
21 @pytest.mark.asyncio
22 async def test_connect_creates_connection(self):
23 from i2p_streaming.manager import ConnectionManager
24 mgr = ConnectionManager()
25 conn = await mgr.connect(b"destination1")
26 assert conn is not None
27 assert mgr.active_count == 1
28
29 @pytest.mark.asyncio
30 async def test_connect_assigns_unique_ids(self):
31 from i2p_streaming.manager import ConnectionManager
32 mgr = ConnectionManager()
33 c1 = await mgr.connect(b"dest1")
34 c2 = await mgr.connect(b"dest2")
35 assert c1 is not c2
36 assert mgr.active_count == 2
37
38 @pytest.mark.asyncio
39 async def test_get_connection(self):
40 from i2p_streaming.manager import ConnectionManager
41 mgr = ConnectionManager()
42 conn = await mgr.connect(b"dest1")
43 retrieved = mgr.get_connection(1)
44 assert retrieved is conn
45
46 def test_get_nonexistent_connection(self):
47 from i2p_streaming.manager import ConnectionManager
48 mgr = ConnectionManager()
49 assert mgr.get_connection(999) is None
50
51 @pytest.mark.asyncio
52 async def test_receive_packet_routes_to_connection(self):
53 from i2p_streaming.manager import ConnectionManager
54 from i2p_streaming.packet import StreamPacket, Flags
55 mgr = ConnectionManager()
56 conn = await mgr.connect(b"dest1")
57 conn_id = 1
58 # Create a packet addressed to this connection
59 pkt = StreamPacket(
60 send_id=42, recv_id=conn_id, seq_num=0,
61 ack_through=0, flags=0, payload=b"hello"
62 )
63 mgr.receive_packet(pkt)
64 # Packet should have been routed (no error)
65
66 @pytest.mark.asyncio
67 async def test_accept_queue(self):
68 from i2p_streaming.manager import ConnectionManager
69 from i2p_streaming.packet import StreamPacket, Flags
70 mgr = ConnectionManager()
71 # Simulate incoming SYN packet for unknown connection
72 pkt = StreamPacket(
73 send_id=100, recv_id=0, seq_num=0,
74 ack_through=0, flags=Flags.SYNCHRONIZE, payload=b""
75 )
76 mgr.receive_packet(pkt)
77 # Accept should return the new connection
78 conn = await asyncio.wait_for(mgr.accept(), timeout=1.0)
79 assert conn is not None
80
81 @pytest.mark.asyncio
82 async def test_close_all(self):
83 from i2p_streaming.manager import ConnectionManager
84 mgr = ConnectionManager()
85 await mgr.connect(b"dest1")
86 await mgr.connect(b"dest2")
87 assert mgr.active_count == 2
88 await mgr.close_all()
89 assert mgr.active_count == 0
90
91 def test_bandwidth_estimator_accessible(self):
92 from i2p_streaming.manager import ConnectionManager
93 from i2p_streaming.bandwidth import BandwidthEstimator
94 mgr = ConnectionManager()
95 assert isinstance(mgr.bandwidth, BandwidthEstimator)
96
97 @pytest.mark.asyncio
98 async def test_max_concurrent_streams_respected(self):
99 from i2p_streaming.manager import ConnectionManager
100 from i2p_streaming.options import StreamOptions
101 opts = StreamOptions(max_concurrent_streams=2)
102 mgr = ConnectionManager(options=opts)
103 await mgr.connect(b"dest1")
104 await mgr.connect(b"dest2")
105 with pytest.raises(ConnectionError):
106 await mgr.connect(b"dest3")