A Python port of the Invisible Internet Project (I2P)
at main 147 lines 5.1 kB view raw
"""SAM stream session -- TCP-like connections over I2P tunnels.

Ported from net.i2p.sam.SAMStreamSession.
"""

import asyncio
import logging

logger = logging.getLogger(__name__)


class SAMStreamSession:
    """Manages stream connections for a SAM session.

    On STREAM CONNECT: would connect to the remote destination and then
    "steal" the client socket for bidirectional raw tunnel I/O.  Without a
    live I2P router this currently only reports an error (see ``connect``).

    On STREAM ACCEPT: waits for an incoming connection to appear on the
    internal accept queue, then steals the client socket and bridges it.
    """

    # Default number of seconds ``accept`` waits for an incoming connection.
    DEFAULT_ACCEPT_TIMEOUT = 300.0

    # Chunk size used when copying bytes between the bridged streams.
    _COPY_CHUNK_SIZE = 8192

    def __init__(self, nickname: str, destination_b64: str) -> None:
        """Create a stream session.

        Args:
            nickname: Session nickname (used in log messages).
            destination_b64: Base64 destination associated with this session.
        """
        self._nickname = nickname
        self._destination_b64 = destination_b64
        # Incoming (reader, writer) pairs waiting to be picked up by accept().
        # Nothing in this class enqueues entries; the producer lives elsewhere.
        self._accept_queue: asyncio.Queue[
            tuple[asyncio.StreamReader, asyncio.StreamWriter]
        ] = asyncio.Queue()

    @staticmethod
    async def _send_stream_error(
        writer: asyncio.StreamWriter, result: str, message: str
    ) -> None:
        """Write a SAM stream error reply to *writer* and flush it."""
        # Kept function-local exactly as in the original code.
        # NOTE(review): presumably this avoids an import cycle with
        # i2p_sam.protocol -- confirm before hoisting to module level.
        from i2p_sam.protocol import SAMReply

        writer.write(SAMReply.stream_error(result, message).encode("utf-8"))
        await writer.drain()

    async def connect(
        self,
        target_dest: str,
        reader: asyncio.StreamReader,
        writer: asyncio.StreamWriter,
        silent: bool = False,
        from_port: int = 0,
        to_port: int = 0,
    ) -> None:
        """Handle STREAM CONNECT for the client socket.

        In this implementation (without a live I2P router) the connection
        cannot actually be established, so no bridging takes place: unless
        ``silent`` is set, a ``CANT_REACH_PEER`` stream error is written to
        the client and the method returns.

        In production, this would:
          1. Look up the target destination in the I2P network database
          2. Build tunnels to reach the destination
          3. Establish a streaming connection
          4. Bridge the client socket <-> I2P tunnel bidirectionally

        Args:
            target_dest: Base64-encoded target destination (currently unused).
            reader: Client's stream reader (currently unused).
            writer: Client's stream writer.
            silent: If True, suppress the status reply.
            from_port: Source port (virtual, currently unused).
            to_port: Destination port (virtual, currently unused).
        """
        if silent:
            return
        await self._send_stream_error(
            writer, "CANT_REACH_PEER", "No I2P router available"
        )

    async def accept(
        self,
        reader: asyncio.StreamReader,
        writer: asyncio.StreamWriter,
        silent: bool = False,
        *,
        timeout: float = DEFAULT_ACCEPT_TIMEOUT,
    ) -> None:
        """Wait for an incoming connection and bridge it to the client.

        Takes the next ``(reader, writer)`` pair from the accept queue.  If
        none arrives within ``timeout`` seconds, a ``TIMEOUT`` stream error
        is sent (unless ``silent``) and the method returns.  Otherwise a
        destination line is sent to the client (unless ``silent``) and the
        two connections are piped together until both directions finish.

        Args:
            reader: Client's stream reader.
            writer: Client's stream writer.
            silent: If True, write nothing to the client before bridging --
                neither the destination line nor a timeout error.
            timeout: Seconds to wait for an incoming connection.
        """
        try:
            incoming_reader, incoming_writer = await asyncio.wait_for(
                self._accept_queue.get(), timeout=timeout
            )
        except asyncio.TimeoutError:
            # Honour ``silent`` on the failure path too, matching connect().
            if not silent:
                await self._send_stream_error(
                    writer, "TIMEOUT", "No incoming connection"
                )
            return

        if not silent:
            # NOTE(review): this sends the session's own destination; queue
            # entries carry no peer destination.  Confirm whether the remote
            # peer's destination should be sent here instead.
            try:
                writer.write(f"{self._destination_b64}\n".encode("utf-8"))
                await writer.drain()
            except BaseException:
                # The client vanished (or we were cancelled) before bridging
                # started; don't leak the connection we already dequeued.
                incoming_writer.close()
                raise

        await self._bridge(reader, writer, incoming_reader, incoming_writer)

    async def forward(
        self,
        listen_port: int,
        host: str = "127.0.0.1",
        silent: bool = False,
    ) -> None:
        """Handle STREAM FORWARD (currently a logging-only placeholder).

        In production, this would start a local TCP listener and forward
        each connection through the I2P tunnel.  At present it only logs
        the request.

        Args:
            listen_port: Local TCP port to listen on.
            host: Local host to bind to.
            silent: If True, suppress status messages (currently unused).
        """
        logger.info(
            "STREAM FORWARD on %s:%d for session %s",
            host,
            listen_port,
            self._nickname,
        )

    async def _bridge(
        self,
        r1: asyncio.StreamReader,
        w1: asyncio.StreamWriter,
        r2: asyncio.StreamReader,
        w2: asyncio.StreamWriter,
    ) -> None:
        """Bidirectional data pipe between two stream pairs.

        Copies ``r1 -> w2`` and ``r2 -> w1`` concurrently.  Each direction
        stops on EOF or a connection error and then closes its destination
        writer.  Returns once both directions have finished; unexpected
        errors are logged rather than raised.
        """

        async def _copy(
            src: asyncio.StreamReader, dst: asyncio.StreamWriter
        ) -> None:
            try:
                while data := await src.read(self._COPY_CHUNK_SIZE):
                    dst.write(data)
                    await dst.drain()
            except ConnectionError:
                # Reset / broken pipe / abort: the peer is gone, which is a
                # normal way for a tunnel to end.
                pass
            finally:
                # Best-effort close: a failure here must not mask the outcome
                # of the copy loop, but it should not vanish silently either.
                try:
                    dst.close()
                except Exception:
                    logger.debug("Error closing stream writer", exc_info=True)

        outcomes = await asyncio.gather(
            _copy(r1, w2),
            _copy(r2, w1),
            return_exceptions=True,
        )
        for outcome in outcomes:
            if isinstance(outcome, Exception):
                logger.warning(
                    "Stream bridge for session %s ended with error: %r",
                    self._nickname,
                    outcome,
                )