"""Connection manager for I2P streaming. Ported from net.i2p.client.streaming.impl.ConnectionManager. """ import asyncio from i2p_streaming.bandwidth import BandwidthEstimator from i2p_streaming.connection import ConnectionState, StreamConnection from i2p_streaming.options import StreamOptions from i2p_streaming.packet import Flags, StreamPacket class ConnectionManager: """Manages multiple streaming connections. Handles connection lifecycle, packet routing, and resource limits. """ def __init__(self, options: StreamOptions | None = None): self._connections: dict[int, StreamConnection] = {} self._accept_queue: asyncio.Queue[StreamConnection] = asyncio.Queue() self._options = options or StreamOptions() self._next_id = 1 self._bandwidth = BandwidthEstimator() self._lock = asyncio.Lock() async def connect(self, destination: bytes) -> StreamConnection: """Initiate outbound connection. Args: destination: The remote I2P destination. Returns: A new StreamConnection in SYN_SENT state. Raises: ConnectionError: If max concurrent streams is exceeded. """ async with self._lock: if len(self._connections) >= self._options.max_concurrent_streams: raise ConnectionError( f"Max concurrent streams ({self._options.max_concurrent_streams}) exceeded" ) conn = StreamConnection(options=self._options) conn.destination = destination conn_id = self._next_id self._next_id += 1 self._connections[conn_id] = conn conn.send_syn() return conn async def accept(self) -> StreamConnection: """Wait for and accept incoming connection. Returns: A StreamConnection for the accepted connection. """ return await self._accept_queue.get() def receive_packet(self, packet: StreamPacket) -> None: """Route incoming packet to correct connection. If the packet is a SYN for an unknown connection, create a new connection and place it on the accept queue. """ conn_id = packet.recv_id if conn_id in self._connections: conn = self._connections[conn_id] self._bandwidth.record_received(len(packet.payload)) return # New incoming connection (SYN with recv_id=0) if packet.flags & Flags.SYNCHRONIZE: conn = StreamConnection(options=self._options) conn.receive_syn() new_id = self._next_id self._next_id += 1 self._connections[new_id] = conn self._bandwidth.record_received(len(packet.payload)) try: self._accept_queue.put_nowait(conn) except asyncio.QueueFull: # Accept backlog full, drop the connection del self._connections[new_id] def get_connection(self, conn_id: int) -> StreamConnection | None: """Get a connection by its ID.""" return self._connections.get(conn_id) async def close_all(self) -> None: """Gracefully close all connections.""" async with self._lock: for conn_id in list(self._connections.keys()): conn = self._connections[conn_id] if conn.state not in (ConnectionState.CLOSED, ConnectionState.RESET): conn.reset() self._connections.clear() @property def active_count(self) -> int: """Number of active (non-closed) connections.""" return len(self._connections) @property def bandwidth(self) -> BandwidthEstimator: """The bandwidth estimator for this manager.""" return self._bandwidth