A Python port of the Invisible Internet Project (I2P)
at main 110 lines 3.9 kB view raw
1"""Connection manager for I2P streaming. 2 3Ported from net.i2p.client.streaming.impl.ConnectionManager. 4""" 5 6import asyncio 7 8from i2p_streaming.bandwidth import BandwidthEstimator 9from i2p_streaming.connection import ConnectionState, StreamConnection 10from i2p_streaming.options import StreamOptions 11from i2p_streaming.packet import Flags, StreamPacket 12 13 14class ConnectionManager: 15 """Manages multiple streaming connections. 16 17 Handles connection lifecycle, packet routing, and resource limits. 18 """ 19 20 def __init__(self, options: StreamOptions | None = None): 21 self._connections: dict[int, StreamConnection] = {} 22 self._accept_queue: asyncio.Queue[StreamConnection] = asyncio.Queue() 23 self._options = options or StreamOptions() 24 self._next_id = 1 25 self._bandwidth = BandwidthEstimator() 26 self._lock = asyncio.Lock() 27 28 async def connect(self, destination: bytes) -> StreamConnection: 29 """Initiate outbound connection. 30 31 Args: 32 destination: The remote I2P destination. 33 34 Returns: 35 A new StreamConnection in SYN_SENT state. 36 37 Raises: 38 ConnectionError: If max concurrent streams is exceeded. 39 """ 40 async with self._lock: 41 if len(self._connections) >= self._options.max_concurrent_streams: 42 raise ConnectionError( 43 f"Max concurrent streams ({self._options.max_concurrent_streams}) exceeded" 44 ) 45 conn = StreamConnection(options=self._options) 46 conn.destination = destination 47 conn_id = self._next_id 48 self._next_id += 1 49 self._connections[conn_id] = conn 50 conn.send_syn() 51 return conn 52 53 async def accept(self) -> StreamConnection: 54 """Wait for and accept incoming connection. 55 56 Returns: 57 A StreamConnection for the accepted connection. 58 """ 59 return await self._accept_queue.get() 60 61 def receive_packet(self, packet: StreamPacket) -> None: 62 """Route incoming packet to correct connection. 63 64 If the packet is a SYN for an unknown connection, create a new 65 connection and place it on the accept queue. 66 """ 67 conn_id = packet.recv_id 68 69 if conn_id in self._connections: 70 conn = self._connections[conn_id] 71 self._bandwidth.record_received(len(packet.payload)) 72 return 73 74 # New incoming connection (SYN with recv_id=0) 75 if packet.flags & Flags.SYNCHRONIZE: 76 conn = StreamConnection(options=self._options) 77 conn.receive_syn() 78 new_id = self._next_id 79 self._next_id += 1 80 self._connections[new_id] = conn 81 self._bandwidth.record_received(len(packet.payload)) 82 try: 83 self._accept_queue.put_nowait(conn) 84 except asyncio.QueueFull: 85 # Accept backlog full, drop the connection 86 del self._connections[new_id] 87 88 def get_connection(self, conn_id: int) -> StreamConnection | None: 89 """Get a connection by its ID.""" 90 return self._connections.get(conn_id) 91 92 async def close_all(self) -> None: 93 """Gracefully close all connections.""" 94 async with self._lock: 95 for conn_id in list(self._connections.keys()): 96 conn = self._connections[conn_id] 97 if conn.state not in (ConnectionState.CLOSED, 98 ConnectionState.RESET): 99 conn.reset() 100 self._connections.clear() 101 102 @property 103 def active_count(self) -> int: 104 """Number of active (non-closed) connections.""" 105 return len(self._connections) 106 107 @property 108 def bandwidth(self) -> BandwidthEstimator: 109 """The bandwidth estimator for this manager.""" 110 return self._bandwidth