A Python port of the Invisible Internet Project (I2P)
1"""Connection manager for I2P streaming.
2
3Ported from net.i2p.client.streaming.impl.ConnectionManager.
4"""
5
6import asyncio
7
8from i2p_streaming.bandwidth import BandwidthEstimator
9from i2p_streaming.connection import ConnectionState, StreamConnection
10from i2p_streaming.options import StreamOptions
11from i2p_streaming.packet import Flags, StreamPacket
12
13
class ConnectionManager:
    """Manages multiple streaming connections.

    Connections are tracked in a dict keyed by a locally allocated integer
    ID. The manager applies ``max_concurrent_streams`` to outbound and
    inbound connections alike, queues inbound connections for
    :meth:`accept`, and reports received payload sizes to a shared
    ``BandwidthEstimator``.
    """

    def __init__(self, options: StreamOptions | None = None):
        """Create an empty manager.

        Args:
            options: Settings passed to every connection this manager
                creates. A default ``StreamOptions()`` is used when omitted.
        """
        self._connections: dict[int, StreamConnection] = {}
        # Created without a maxsize, so the accept backlog is unbounded.
        self._accept_queue: asyncio.Queue[StreamConnection] = asyncio.Queue()
        self._options = options or StreamOptions()
        self._next_id = 1
        self._bandwidth = BandwidthEstimator()
        self._lock = asyncio.Lock()

    @staticmethod
    def _is_live(conn: StreamConnection) -> bool:
        """Return True unless *conn* is in the CLOSED or RESET state."""
        return conn.state not in (ConnectionState.CLOSED, ConnectionState.RESET)

    def _live_count(self) -> int:
        """Count tracked connections that are neither CLOSED nor RESET."""
        return sum(1 for conn in self._connections.values() if self._is_live(conn))

    def _at_capacity(self) -> bool:
        """Return True when live connections have reached the configured limit.

        Only live connections count: finished entries stay in the table, and
        counting them would make the limit a lifetime quota instead of a
        concurrency limit.

        NOTE(review): upstream I2P appears to treat a non-positive limit as
        "unlimited"; this port compares the value literally -- confirm which
        behaviour is intended.
        """
        return self._live_count() >= self._options.max_concurrent_streams

    def _register(self, conn: StreamConnection) -> int:
        """Store *conn* under the next free ID and return that ID."""
        conn_id = self._next_id
        self._next_id += 1
        self._connections[conn_id] = conn
        return conn_id

    async def connect(self, destination: bytes) -> StreamConnection:
        """Initiate outbound connection.

        Args:
            destination: The remote I2P destination.

        Returns:
            The new StreamConnection, registered with this manager and with
            ``send_syn()`` already called on it.

        Raises:
            ConnectionError: If the number of live connections has already
                reached ``max_concurrent_streams``.
        """
        async with self._lock:
            if self._at_capacity():
                raise ConnectionError(
                    f"Max concurrent streams ({self._options.max_concurrent_streams}) exceeded"
                )
            conn = StreamConnection(options=self._options)
            conn.destination = destination
            self._register(conn)
            conn.send_syn()
            return conn

    async def accept(self) -> StreamConnection:
        """Wait for and accept incoming connection.

        Returns:
            The oldest inbound StreamConnection that has not been accepted yet.
        """
        return await self._accept_queue.get()

    def receive_packet(self, packet: StreamPacket) -> None:
        """Route incoming packet to correct connection.

        A packet whose ``recv_id`` matches a tracked connection only has its
        payload size recorded. A packet for an unknown connection is dropped
        unless it carries the SYNCHRONIZE flag, in which case a new
        connection is created, registered and placed on the accept queue.
        Inbound SYNs are dropped while the manager is at
        ``max_concurrent_streams``, mirroring the check in :meth:`connect`.
        """
        if packet.recv_id in self._connections:
            # NOTE(review): nothing visible here hands the packet to the
            # connection itself; only the byte count is recorded. Confirm
            # whether delivery happens elsewhere.
            self._bandwidth.record_received(len(packet.payload))
            return

        if not (packet.flags & Flags.SYNCHRONIZE):
            # Not a SYN and not for a known connection: nothing to route to.
            return

        if self._at_capacity():
            # Inbound connections share the limit with outbound ones, so a
            # remote peer cannot grow the connection table without bound.
            return

        conn = StreamConnection(options=self._options)
        conn.receive_syn()
        new_id = self._register(conn)
        self._bandwidth.record_received(len(packet.payload))
        try:
            self._accept_queue.put_nowait(conn)
        except asyncio.QueueFull:
            # Accept backlog full, drop the connection
            del self._connections[new_id]

    def get_connection(self, conn_id: int) -> StreamConnection | None:
        """Return the connection stored under *conn_id*, or None if unknown."""
        return self._connections.get(conn_id)

    async def close_all(self) -> None:
        """Reset every live connection and forget all tracked connections.

        Connections already in the CLOSED or RESET state are left untouched;
        every other connection has ``reset()`` called on it. The connection
        table is emptied afterwards.
        """
        async with self._lock:
            # Iterate over a snapshot so the table is not walked while live.
            for conn in list(self._connections.values()):
                if self._is_live(conn):
                    conn.reset()
            self._connections.clear()

    @property
    def active_count(self) -> int:
        """Number of active (non-closed) connections.

        Tracked connections in the CLOSED or RESET state are not counted.
        """
        return self._live_count()

    @property
    def bandwidth(self) -> BandwidthEstimator:
        """The bandwidth estimator for this manager."""
        return self._bandwidth