# A Python port of the Invisible Internet Project (I2P)
"""TCP-like socket interface for I2P streaming.

Ported from net.i2p.client.streaming.I2PSocket and
net.i2p.client.streaming.I2PSocketManager.
"""
6
7import asyncio
8
9from i2p_streaming.connection import ConnectionState, StreamConnection
10from i2p_streaming.manager import ConnectionManager
11from i2p_streaming.options import StreamOptions
12
13
class I2PSocket:
    """TCP-like socket interface for I2P streaming.

    Wraps a StreamConnection with a familiar socket-style API:
    ``send`` / ``recv`` / ``close`` plus read-only status properties.
    """

    def __init__(self, connection: StreamConnection):
        """Wrap *connection*; the socket starts open with an empty buffer.

        Args:
            connection: The underlying stream connection to wrap.
        """
        self._connection = connection
        # Pending inbound bytes; recv() hands them out and removes them.
        self._recv_buffer: bytearray = bytearray()
        # Flipped once by close(); a closed socket never reports connected.
        self._closed = False

    async def send(self, data: bytes) -> int:
        """Send data over the connection.

        This is currently a placeholder: nothing is transmitted, the
        method only validates the connection state and reports the
        length of *data*.

        Args:
            data: Bytes to send.

        Returns:
            Number of bytes accepted (always ``len(data)``).

        Raises:
            ConnectionError: If the socket is not connected.
        """
        if not self.is_connected:
            raise ConnectionError("Socket is not connected")
        # In a real implementation this would go through the output stream
        # and packet scheduler. Here we record the intent.
        return len(data)

    async def recv(self, max_bytes: int = 4096) -> bytes:
        """Receive data from the connection.

        Does not wait for data: it returns whatever is already buffered,
        up to *max_bytes*, and removes those bytes from the buffer.
        Buffered data can still be drained after the socket stops being
        connected.

        Args:
            max_bytes: Maximum number of bytes to receive (must be >= 0).

        Returns:
            Received bytes (may be empty if no data available).

        Raises:
            ValueError: If *max_bytes* is negative.
            ConnectionError: If the socket is not connected and no
                buffered data remains.
        """
        # A negative bound would turn the slices below into "everything
        # except the last N bytes", silently returning and discarding the
        # wrong data, so reject it up front and leave the buffer intact.
        if max_bytes < 0:
            raise ValueError("max_bytes must be non-negative")
        if not self.is_connected and not self._recv_buffer:
            raise ConnectionError("Socket is not connected")
        chunk = bytes(self._recv_buffer[:max_bytes])
        del self._recv_buffer[:max_bytes]
        return chunk

    async def close(self) -> None:
        """Close the socket.

        Only the first call has any effect. A close is sent on the
        underlying connection only when it is in the ESTABLISHED state.
        """
        if self._closed:
            return
        self._closed = True
        if self._connection.state == ConnectionState.ESTABLISHED:
            self._connection.send_close()

    @property
    def is_connected(self) -> bool:
        """True if the connection is in an active state.

        A socket that has been closed locally is never connected;
        otherwise the handshake states (SYN_SENT, SYN_RECEIVED) count as
        active alongside ESTABLISHED.
        """
        if self._closed:
            return False
        return self._connection.state in (
            ConnectionState.ESTABLISHED,
            ConnectionState.SYN_SENT,
            ConnectionState.SYN_RECEIVED,
        )

    @property
    def destination(self) -> bytes:
        """The remote destination address.

        Falls back to ``b""`` when the wrapped connection exposes no
        ``destination`` attribute.
        """
        return getattr(self._connection, "destination", b"")
83
84
class I2PServerSocket:
    """Listening endpoint that hands out sockets for inbound I2P connections."""

    def __init__(self, manager: ConnectionManager):
        """Bind the server socket to *manager*; it starts in the open state."""
        # accept() is permitted until close() flips this flag.
        self._closed = False
        self._manager = manager

    async def accept(self) -> I2PSocket:
        """Wait for the next inbound connection and wrap it in a socket.

        Returns:
            An I2PSocket built around the connection the manager yields.

        Raises:
            ConnectionError: If close() has already been called.
        """
        if not self._closed:
            return I2PSocket(await self._manager.accept())
        raise ConnectionError("Server socket is closed")

    async def close(self) -> None:
        """Mark the server socket closed so that later accept() calls fail."""
        self._closed = True
109
110
class I2PSocketManager:
    """Creates I2P sockets and owns the connection manager behind them.

    Python counterpart of net.i2p.client.streaming.I2PSocketManager.
    """

    def __init__(self, options: StreamOptions | None = None):
        """Build the backing ConnectionManager, forwarding *options* to it."""
        # Every socket handed out by this object shares this one manager.
        self._manager = ConnectionManager(options=options)

    async def connect(self, destination: bytes) -> I2PSocket:
        """Open an outbound connection and wrap it in a socket.

        Args:
            destination: The remote I2P destination.

        Returns:
            An I2PSocket around the newly created connection.
        """
        return I2PSocket(await self._manager.connect(destination))

    def get_server_socket(self) -> I2PServerSocket:
        """Create a server socket that accepts through this manager.

        Returns:
            A new I2PServerSocket sharing this object's connection manager.
        """
        server = I2PServerSocket(self._manager)
        return server

    async def destroy(self) -> None:
        """Close all connections by delegating to the connection manager."""
        manager = self._manager
        await manager.close_all()