A Python port of the Invisible Internet Project (I2P)
at main 142 lines 4.2 kB view raw
"""TCP-like socket interface for I2P streaming.

Ported from net.i2p.client.streaming.I2PSocket and
net.i2p.client.streaming.I2PSocketManager.
"""

import asyncio

from i2p_streaming.connection import ConnectionState, StreamConnection
from i2p_streaming.manager import ConnectionManager
from i2p_streaming.options import StreamOptions


class I2PSocket:
    """TCP-like socket interface for I2P streaming.

    Wraps a StreamConnection with a familiar socket API
    (``send`` / ``recv`` / ``close``).
    """

    def __init__(self, connection: StreamConnection):
        self._connection = connection
        # Bytes waiting to be handed out by recv(); nothing in this class
        # fills it, so it is presumably populated by other code.
        self._recv_buffer: bytearray = bytearray()
        self._closed = False

    async def send(self, data: bytes) -> int:
        """Send data over the connection.

        Args:
            data: Bytes to send.

        Returns:
            Number of bytes sent (currently always ``len(data)``).

        Raises:
            ConnectionError: If the socket is not connected.
        """
        if not self.is_connected:
            raise ConnectionError("Socket is not connected")
        # In a real implementation this would go through the output stream
        # and packet scheduler. Here we record the intent.
        return len(data)

    async def recv(self, max_bytes: int = 4096) -> bytes:
        """Receive buffered data from the connection.

        Args:
            max_bytes: Maximum number of bytes to receive. Must be >= 0.

        Returns:
            Up to ``max_bytes`` bytes taken from the front of the receive
            buffer (may be empty if no data is available).

        Raises:
            ValueError: If ``max_bytes`` is negative.
            ConnectionError: If the socket is not connected and no buffered
                data remains to be drained.
        """
        # A negative bound would turn the slices below into "everything
        # except the last N bytes", returning and discarding the wrong data.
        if max_bytes < 0:
            raise ValueError("max_bytes must be non-negative")
        # Buffered data may still be read after the connection has gone away.
        if not self.is_connected and not self._recv_buffer:
            raise ConnectionError("Socket is not connected")
        chunk = bytes(self._recv_buffer[:max_bytes])
        del self._recv_buffer[:max_bytes]
        return chunk

    async def close(self) -> None:
        """Close the socket.

        Calling this more than once is harmless: only the first call has
        any effect. A close is sent on the underlying connection only when
        it is in the ESTABLISHED state.
        """
        if self._closed:
            return
        self._closed = True
        if self._connection.state == ConnectionState.ESTABLISHED:
            self._connection.send_close()

    @property
    def is_connected(self) -> bool:
        """True if the socket is open and the connection is in an active state.

        Active states are ESTABLISHED, SYN_SENT and SYN_RECEIVED.
        """
        if self._closed:
            return False
        return self._connection.state in (
            ConnectionState.ESTABLISHED,
            ConnectionState.SYN_SENT,
            ConnectionState.SYN_RECEIVED,
        )

    @property
    def destination(self) -> bytes:
        """The remote destination address.

        Falls back to ``b""`` when the wrapped connection has no
        ``destination`` attribute.
        """
        return getattr(self._connection, "destination", b"")


class I2PServerSocket:
    """Server socket that accepts incoming I2P connections."""

    def __init__(self, manager: ConnectionManager):
        self._manager = manager
        self._closed = False

    async def accept(self) -> I2PSocket:
        """Accept an incoming connection.

        Returns:
            An I2PSocket wrapping the accepted connection.

        Raises:
            ConnectionError: If the server socket is closed.
        """
        if self._closed:
            raise ConnectionError("Server socket is closed")
        incoming = await self._manager.accept()
        return I2PSocket(incoming)

    async def close(self) -> None:
        """Close the server socket (stop accepting).

        Only marks this server socket as closed; the shared manager is
        left untouched.
        """
        self._closed = True


class I2PSocketManager:
    """Factory and lifecycle manager for I2P sockets.

    Ported from net.i2p.client.streaming.I2PSocketManager.
    """

    def __init__(self, options: StreamOptions | None = None):
        self._manager = ConnectionManager(options=options)

    async def connect(self, destination: bytes) -> I2PSocket:
        """Create an outbound connection and return a socket.

        Args:
            destination: The remote I2P destination.

        Returns:
            An I2PSocket for the new connection.
        """
        outbound = await self._manager.connect(destination)
        return I2PSocket(outbound)

    def get_server_socket(self) -> I2PServerSocket:
        """Get a server socket bound to this manager.

        Returns:
            An I2PServerSocket for accepting connections.
        """
        return I2PServerSocket(self._manager)

    async def destroy(self) -> None:
        """Close all connections and release resources."""
        await self._manager.close_all()