"""SAM stream session -- TCP-like connections over I2P tunnels.

Ported from net.i2p.sam.SAMStreamSession.
"""

import asyncio
import logging

logger = logging.getLogger(__name__)

# Seconds ``accept`` waits for an incoming connection unless overridden.
_DEFAULT_ACCEPT_TIMEOUT = 300.0

# Maximum number of bytes moved per read while bridging two streams.
_BRIDGE_CHUNK_SIZE = 8192


def _close_quietly(writer: asyncio.StreamWriter) -> None:
    """Close *writer*, logging (rather than raising) any failure.

    Closing is best-effort cleanup: a failure here must never mask the
    error or the normal completion that triggered the cleanup.
    """
    try:
        writer.close()
    except Exception:
        logger.debug("Ignoring error while closing stream writer", exc_info=True)


class SAMStreamSession:
    """Manages stream connections for a SAM session.

    On STREAM CONNECT: connects to remote destination, then "steals" the
    client socket for bidirectional raw tunnel I/O.

    On STREAM ACCEPT: waits for incoming connection, then steals socket.
    """

    def __init__(
        self,
        nickname: str,
        destination_b64: str,
        *,
        accept_timeout: float = _DEFAULT_ACCEPT_TIMEOUT,
    ) -> None:
        """Create a stream session.

        Args:
            nickname: Session nickname; used in log messages.
            destination_b64: Base64 destination stored on this session; it is
                the line written to the client by ``accept``.
            accept_timeout: Seconds ``accept`` waits for an incoming
                connection before replying with a TIMEOUT error.
        """
        self._nickname = nickname
        self._destination_b64 = destination_b64
        self._accept_timeout = accept_timeout
        # Incoming (reader, writer) pairs consumed by ``accept``. Nothing in
        # this module puts items on the queue; the producer lives elsewhere.
        self._accept_queue: asyncio.Queue[
            tuple[asyncio.StreamReader, asyncio.StreamWriter]
        ] = asyncio.Queue()

    async def connect(
        self,
        target_dest: str,
        reader: asyncio.StreamReader,
        writer: asyncio.StreamWriter,
        silent: bool = False,
        from_port: int = 0,
        to_port: int = 0,
    ) -> None:
        """Connect to target and bridge client socket with I2P tunnel.

        In this implementation (without a live I2P router), the connection
        cannot actually be established, so the only effect is an error status
        written to the client (unless ``silent``). ``target_dest``, ``reader``,
        ``from_port`` and ``to_port`` are currently unused.

        Args:
            target_dest: Base64-encoded target destination.
            reader: Client's stream reader.
            writer: Client's stream writer.
            silent: If True, suppress the status reply.
            from_port: Source port (virtual).
            to_port: Destination port (virtual).
        """
        # Without a real I2P router, we cannot establish tunnel connections.
        # In production, this would:
        # 1. Look up the target destination in the I2P network database
        # 2. Build tunnels to reach the destination
        # 3. Establish a streaming connection
        # 4. Bridge the client socket <-> I2P tunnel bidirectionally
        if silent:
            return

        # Imported lazily, as in the rest of this module.
        from i2p_sam.protocol import SAMReply

        writer.write(
            SAMReply.stream_error(
                "CANT_REACH_PEER", "No I2P router available"
            ).encode("utf-8")
        )
        await writer.drain()

    async def accept(
        self,
        reader: asyncio.StreamReader,
        writer: asyncio.StreamWriter,
        silent: bool = False,
    ) -> None:
        """Wait for incoming connection and bridge sockets.

        Takes the next connection from the accept queue. If none arrives
        within the configured timeout, a TIMEOUT stream error is written to
        the client and the method returns. Otherwise a destination line is
        written to the client (unless ``silent``) and data is piped in both
        directions until the streams close.

        The incoming connection's writer is always closed before this method
        returns or raises, so a client that disappears before the pipe starts
        does not leak the incoming connection.

        Args:
            reader: Client's stream reader.
            writer: Client's stream writer.
            silent: If True, suppress the initial destination line.
        """
        try:
            incoming_reader, incoming_writer = await asyncio.wait_for(
                self._accept_queue.get(), timeout=self._accept_timeout
            )
        except asyncio.TimeoutError:
            from i2p_sam.protocol import SAMReply

            writer.write(
                SAMReply.stream_error(
                    "TIMEOUT", "No incoming connection"
                ).encode("utf-8")
            )
            await writer.drain()
            return

        try:
            if not silent:
                # NOTE(review): this writes the destination stored on this
                # session, not one obtained from the incoming connection (the
                # queue item carries no peer destination) -- confirm intended.
                writer.write(f"{self._destination_b64}\n".encode("utf-8"))
                await writer.drain()

            # Bridge the two connections
            await self._bridge(reader, writer, incoming_reader, incoming_writer)
        finally:
            # We own the dequeued connection: release it on every exit path,
            # including a failed destination write or task cancellation.
            # Closing an already-closed writer is harmless.
            _close_quietly(incoming_writer)

    async def forward(
        self,
        listen_port: int,
        host: str = "127.0.0.1",
        silent: bool = False,
    ) -> None:
        """Listen on local TCP port and forward connections to I2P tunnel.

        Currently this only logs the request; no listener is started and
        ``silent`` is unused.

        Args:
            listen_port: Local TCP port to listen on.
            host: Local host to bind to.
            silent: If True, suppress status messages.
        """
        logger.info(
            "STREAM FORWARD on %s:%d for session %s",
            host,
            listen_port,
            self._nickname,
        )
        # In production, this would start a local TCP listener and forward
        # each connection through the I2P tunnel.

    async def _bridge(
        self,
        r1: asyncio.StreamReader,
        w1: asyncio.StreamWriter,
        r2: asyncio.StreamReader,
        w2: asyncio.StreamWriter,
    ) -> None:
        """Bidirectional data pipe between two stream pairs.

        Copies ``r1 -> w2`` and ``r2 -> w1`` concurrently. Each direction
        stops at end-of-stream or on a connection error and then closes its
        destination writer. Errors are logged, never raised to the caller.
        """

        async def _copy(
            src: asyncio.StreamReader, dst: asyncio.StreamWriter
        ) -> None:
            try:
                while True:
                    data = await src.read(_BRIDGE_CHUNK_SIZE)
                    if not data:
                        break
                    dst.write(data)
                    await dst.drain()
            except ConnectionError:
                # Peer reset/aborted/broken pipe: a normal way for a pipe
                # to end, nothing to report.
                pass
            finally:
                _close_quietly(dst)

        results = await asyncio.gather(
            _copy(r1, w2),
            _copy(r2, w1),
            return_exceptions=True,
        )
        for result in results:
            # Only real errors are reported; a cancelled copy is not an
            # ``Exception`` on current Python versions.
            if isinstance(result, Exception):
                logger.warning(
                    "Stream bridge for session %s ended with error: %r",
                    self._nickname,
                    result,
                )