"""TCP-like socket interface for I2P streaming. Ported from net.i2p.client.streaming.I2PSocket and net.i2p.client.streaming.I2PSocketManager. """ import asyncio from i2p_streaming.connection import ConnectionState, StreamConnection from i2p_streaming.manager import ConnectionManager from i2p_streaming.options import StreamOptions class I2PSocket: """TCP-like socket interface for I2P streaming. Wraps StreamConnection with familiar socket API. """ def __init__(self, connection: StreamConnection): self._connection = connection self._recv_buffer: bytearray = bytearray() self._closed = False async def send(self, data: bytes) -> int: """Send data over the connection. Args: data: Bytes to send. Returns: Number of bytes sent. Raises: ConnectionError: If the socket is not connected. """ if not self.is_connected: raise ConnectionError("Socket is not connected") # In a real implementation this would go through the output stream # and packet scheduler. Here we record the intent. return len(data) async def recv(self, max_bytes: int = 4096) -> bytes: """Receive data from the connection. Args: max_bytes: Maximum number of bytes to receive. Returns: Received bytes (may be empty if no data available). Raises: ConnectionError: If the socket is not connected. """ if not self.is_connected and not self._recv_buffer: raise ConnectionError("Socket is not connected") result = bytes(self._recv_buffer[:max_bytes]) del self._recv_buffer[:max_bytes] return result async def close(self) -> None: """Close the socket.""" if not self._closed: self._closed = True if self._connection.state == ConnectionState.ESTABLISHED: self._connection.send_close() @property def is_connected(self) -> bool: """True if the connection is in an active state.""" if self._closed: return False return self._connection.state in ( ConnectionState.ESTABLISHED, ConnectionState.SYN_SENT, ConnectionState.SYN_RECEIVED, ) @property def destination(self) -> bytes: """The remote destination address.""" return getattr(self._connection, "destination", b"") class I2PServerSocket: """Server socket that accepts incoming I2P connections.""" def __init__(self, manager: ConnectionManager): self._manager = manager self._closed = False async def accept(self) -> I2PSocket: """Accept an incoming connection. Returns: An I2PSocket wrapping the accepted connection. Raises: ConnectionError: If the server socket is closed. """ if self._closed: raise ConnectionError("Server socket is closed") conn = await self._manager.accept() return I2PSocket(conn) async def close(self) -> None: """Close the server socket (stop accepting).""" self._closed = True class I2PSocketManager: """Factory and lifecycle manager for I2P sockets. Ported from net.i2p.client.streaming.I2PSocketManager. """ def __init__(self, options: StreamOptions | None = None): self._manager = ConnectionManager(options=options) async def connect(self, destination: bytes) -> I2PSocket: """Create an outbound connection and return a socket. Args: destination: The remote I2P destination. Returns: An I2PSocket for the new connection. """ conn = await self._manager.connect(destination) return I2PSocket(conn) def get_server_socket(self) -> I2PServerSocket: """Get a server socket bound to this manager. Returns: An I2PServerSocket for accepting connections. """ return I2PServerSocket(self._manager) async def destroy(self) -> None: """Close all connections and release resources.""" await self._manager.close_all()