A Python port of the Invisible Internet Project (I2P)
at main 260 lines 9.3 kB view raw
"""SAM session integration for I2PTunnel.

Provides TunnelSession protocol and SAMTunnelSession implementation that
speaks SAM v3.3 for tunnel session management.

Ported from net.i2p.i2ptunnel.I2PTunnel SAM bridge usage.
"""

from __future__ import annotations

import asyncio
import logging
from typing import Protocol, runtime_checkable

logger = logging.getLogger(__name__)

# Handshake line sent first on every socket opened to the SAM bridge.
_HELLO_LINE = b"HELLO VERSION MIN=3.0 MAX=3.3\n"
# Seconds to wait for the bridge to answer a command.
_REPLY_TIMEOUT = 10.0
# Seconds STREAM ACCEPT waits for a peer to show up.
_ACCEPT_TIMEOUT = 300.0


def _single_line(label: str, value: str) -> str:
    """Return *value* unchanged, or raise ValueError if it has a line break.

    SAM is a line-oriented protocol, so a CR/LF inside an interpolated value
    would terminate the command early and let the remainder be read as a
    second command. The value itself is kept out of the error message because
    it may contain private key material.
    """
    if "\n" in value or "\r" in value:
        raise ValueError(f"{label} must not contain line breaks")
    return value


def _reply_value(reply: str, key: str) -> str | None:
    """Return the value of the first ``key=value`` token in *reply*, if any."""
    prefix = f"{key}="
    for token in reply.split():
        if token.startswith(prefix):
            return token[len(prefix):]
    return None


async def _read_reply(reader: asyncio.StreamReader, timeout: float) -> str:
    """Read one reply line from the bridge and return it stripped.

    Undecodable bytes are replaced rather than raised so that a garbled reply
    surfaces as a failed ``RESULT=OK`` check instead of a UnicodeDecodeError.
    An empty string means the bridge closed the socket (or sent a blank line).
    """
    raw = await asyncio.wait_for(reader.readline(), timeout=timeout)
    return raw.decode("utf-8", errors="replace").strip()


async def _hello(reader: asyncio.StreamReader, writer: asyncio.StreamWriter) -> str:
    """Send the HELLO handshake and return the bridge's reply text."""
    writer.write(_HELLO_LINE)
    await writer.drain()
    return await _read_reply(reader, _REPLY_TIMEOUT)


@runtime_checkable
class TunnelSession(Protocol):
    """Abstract tunnel session — can be SAM or I2CP backed."""

    @property
    def destination(self) -> str:
        """Our base64 destination."""
        ...

    async def connect(self, dest: str) -> tuple[asyncio.StreamReader, asyncio.StreamWriter]:
        """Connect to a remote I2P destination. Returns (reader, writer)."""
        ...

    async def accept(self) -> tuple[str, asyncio.StreamReader, asyncio.StreamWriter]:
        """Accept an incoming connection. Returns (remote_dest, reader, writer)."""
        ...

    async def lookup(self, name: str) -> str | None:
        """Resolve an I2P hostname to base64 destination."""
        ...

    async def close(self) -> None:
        """Close the session."""
        ...


class SAMTunnelSession:
    """SAM v3.3 backed tunnel session.

    Manages a control connection to SAM for session lifecycle, and creates
    new connections for STREAM CONNECT / STREAM ACCEPT.
    """

    def __init__(self, sam_host: str = "127.0.0.1", sam_port: int = 7656) -> None:
        """Remember the bridge address; no I/O happens until :meth:`open`."""
        self._sam_host = sam_host
        self._sam_port = sam_port
        self._destination: str | None = None
        self._nickname: str | None = None
        self._ctrl_reader: asyncio.StreamReader | None = None
        self._ctrl_writer: asyncio.StreamWriter | None = None
        self._closed = False

    @property
    def destination(self) -> str | None:
        """DESTINATION value reported by SESSION CREATE, or None before open."""
        return self._destination

    async def open(
        self,
        nickname: str,
        private_key: str | None = None,
        options: dict[str, str] | None = None,
    ) -> None:
        """Open SAM session: HELLO + SESSION CREATE.

        Args:
            nickname: Session ID sent to the bridge; reused by later
                STREAM CONNECT / STREAM ACCEPT commands.
            private_key: Value sent as DESTINATION; ``TRANSIENT`` is sent
                when it is None or empty.
            options: Extra ``key=value`` pairs appended to SESSION CREATE.

        Raises:
            ValueError: If any argument contains a line break.
            ConnectionError: If the bridge does not answer ``RESULT=OK``.
            asyncio.TimeoutError: If the bridge does not reply in time.
        """
        # Build and validate the command before touching the network so bad
        # input never costs a connection.
        dest = private_key if private_key else "TRANSIENT"
        pieces = [f"SESSION CREATE STYLE=STREAM ID={nickname} DESTINATION={dest}"]
        if options:
            pieces.extend(f"{k}={v}" for k, v in options.items())
        command = _single_line("SESSION CREATE arguments", " ".join(pieces)) + "\n"

        reader, writer = await asyncio.open_connection(self._sam_host, self._sam_port)
        try:
            hello_text = await _hello(reader, writer)
            if "RESULT=OK" not in hello_text:
                raise ConnectionError(f"SAM HELLO failed: {hello_text}")

            writer.write(command.encode("utf-8"))
            await writer.drain()
            session_text = await _read_reply(reader, _REPLY_TIMEOUT)
            if "RESULT=OK" not in session_text:
                raise ConnectionError(f"SAM SESSION CREATE failed: {session_text}")
        except BaseException:
            # Includes timeouts and cancellation: never leave a half-opened
            # control socket behind.
            writer.close()
            raise

        # Publish state only once the session really exists on the bridge.
        self._nickname = nickname
        self._ctrl_reader = reader
        self._ctrl_writer = writer
        # Allow close() to work again if this object is reopened after close().
        self._closed = False
        # NOTE(review): the SAM v3 spec describes the DESTINATION value of a
        # SESSION STATUS reply as the session's private key blob — confirm
        # that callers of `destination` treat it accordingly.
        self._destination = _reply_value(session_text, "DESTINATION")

        logger.info("SAM session %r opened, dest=%s...",
                    nickname, (self._destination or "")[:20])

    def _require_nickname(self) -> str:
        """Return the session ID, or raise RuntimeError if never opened."""
        if self._nickname is None:
            raise RuntimeError("Session not open")
        return self._nickname

    async def _open_sam_socket(
        self, role: str
    ) -> tuple[asyncio.StreamReader, asyncio.StreamWriter]:
        """Open a fresh socket to the bridge and complete the HELLO handshake.

        *role* only labels the error message ("stream" or "accept"). The
        socket is closed before any exception propagates.
        """
        reader, writer = await asyncio.open_connection(self._sam_host, self._sam_port)
        try:
            reply = await _hello(reader, writer)
            if "RESULT=OK" not in reply:
                raise ConnectionError(f"SAM HELLO failed on {role} socket")
        except BaseException:
            writer.close()
            raise
        return reader, writer

    async def connect(self, dest: str) -> tuple[asyncio.StreamReader, asyncio.StreamWriter]:
        """Open a new SAM connection and STREAM CONNECT to dest.

        Returns the reader/writer pair for the connected stream.
        The SAM control connection remains separate.

        Raises:
            RuntimeError: If the session was never opened.
            ValueError: If *dest* contains a line break.
            ConnectionError: If HELLO or STREAM CONNECT is rejected.
            asyncio.TimeoutError: If the bridge does not reply in time.
        """
        nickname = self._require_nickname()
        _single_line("dest", dest)
        reader, writer = await self._open_sam_socket("stream")
        try:
            writer.write(f"STREAM CONNECT ID={nickname} DESTINATION={dest}\n".encode())
            await writer.drain()
            status_text = await _read_reply(reader, _REPLY_TIMEOUT)
            if "RESULT=OK" not in status_text:
                raise ConnectionError(f"STREAM CONNECT failed: {status_text}")
        except BaseException:
            writer.close()
            raise

        # Socket is now a raw data tunnel
        return reader, writer

    async def accept(self) -> tuple[str, asyncio.StreamReader, asyncio.StreamWriter]:
        """Open a new SAM connection and STREAM ACCEPT.

        Returns (remote_dest, reader, writer) when a connection arrives.

        Raises:
            RuntimeError: If the session was never opened.
            ConnectionError: If HELLO or STREAM ACCEPT is rejected, or the
                bridge closes the socket before a peer connects.
            asyncio.TimeoutError: If no reply or no peer arrives in time.
        """
        nickname = self._require_nickname()
        reader, writer = await self._open_sam_socket("accept")
        try:
            writer.write(f"STREAM ACCEPT ID={nickname}\n".encode())
            await writer.drain()
            status_text = await _read_reply(reader, _REPLY_TIMEOUT)
            if "RESULT=OK" not in status_text:
                raise ConnectionError(f"STREAM ACCEPT failed: {status_text}")

            # Next line announces the remote peer.
            dest_line = await _read_reply(reader, _ACCEPT_TIMEOUT)
            if not dest_line:
                raise ConnectionError("STREAM ACCEPT closed before a peer connected")
            # With SAM 3.2+ the bridge may append FROM_PORT=/TO_PORT= after
            # the destination; keep only the destination token.
            remote_dest = dest_line.split()[0]
        except BaseException:
            writer.close()
            raise

        return remote_dest, reader, writer

    async def lookup(self, name: str) -> str | None:
        """Resolve a name via SAM NAMING LOOKUP on the control connection.

        Returns the VALUE field of a ``RESULT=OK`` reply, or None when the
        lookup fails or the reply carries no VALUE.

        Raises:
            RuntimeError: If the control connection is not open.
            ValueError: If *name* contains a line break.
            asyncio.TimeoutError: If the bridge does not reply in time.
        """
        reader, writer = self._ctrl_reader, self._ctrl_writer
        if reader is None or writer is None:
            raise RuntimeError("Session not open")
        _single_line("name", name)

        # NOTE: request and reply share the control socket, so overlapping
        # lookups from several tasks could read each other's replies.
        writer.write(f"NAMING LOOKUP NAME={name}\n".encode())
        await writer.drain()
        text = await _read_reply(reader, _REPLY_TIMEOUT)
        if "RESULT=OK" in text:
            return _reply_value(text, "VALUE")
        return None

    async def close(self) -> None:
        """Close the SAM control connection.

        Idempotent and best-effort: errors raised while closing are logged
        at debug level and not propagated.
        """
        if self._closed:
            return
        self._closed = True
        writer = self._ctrl_writer
        self._ctrl_writer = None
        self._ctrl_reader = None
        if writer is None:
            return
        try:
            writer.close()
            await writer.wait_closed()
        except Exception:
            logger.debug("Ignoring error while closing SAM control connection",
                         exc_info=True)


class SessionFactory:
    """Creates and manages SAMTunnelSession instances.

    Supports shared sessions for tunnels with sharedClient=true.
    """

    def __init__(self, sam_host: str = "127.0.0.1", sam_port: int = 7656) -> None:
        """Remember the bridge address used for every session created here."""
        self._sam_host = sam_host
        self._sam_port = sam_port
        self._shared: dict[str, SAMTunnelSession] = {}
        self._all_sessions: list[SAMTunnelSession] = []

    def _forget(self, session: SAMTunnelSession) -> None:
        """Drop every bookkeeping reference to *session* (matched by identity)."""
        self._all_sessions[:] = [s for s in self._all_sessions if s is not session]
        stale_groups = [g for g, s in self._shared.items() if s is session]
        for group in stale_groups:
            del self._shared[group]

    async def create(
        self,
        nickname: str | None = None,
        tunnel_def=None,
        priv_key: str | None = None,
    ) -> SAMTunnelSession:
        """Create and open a new SAM session.

        The session ID is *nickname* if given, else ``tunnel_def.name`` when
        *tunnel_def* is truthy, else ``"tunnel"``. The session is tracked so
        that :meth:`close_all` can close it later.
        """
        session = SAMTunnelSession(self._sam_host, self._sam_port)
        nick = nickname or (tunnel_def.name if tunnel_def else "tunnel")
        await session.open(nickname=nick, private_key=priv_key)
        self._all_sessions.append(session)
        return session

    async def release(self, session: SAMTunnelSession) -> None:
        """Close *session* and stop tracking it.

        Also removes it from the shared table so :meth:`get_shared` never
        hands out a session that has already been closed.
        """
        try:
            await session.close()
        finally:
            self._forget(session)

    async def get_shared(self, group_name: str) -> SAMTunnelSession:
        """Get or create a shared session for the given group."""
        # NOTE: two tasks asking for the same new group at once will both
        # try to create it; callers should serialise first use of a group.
        if group_name not in self._shared:
            session = await self.create(nickname=f"shared-{group_name}")
            self._shared[group_name] = session
        return self._shared[group_name]

    async def release_shared(self, group_name: str) -> None:
        """Close the shared session of *group_name*, if any, and forget it."""
        session = self._shared.pop(group_name, None)
        if session is None:
            return
        try:
            await session.close()
        finally:
            self._forget(session)

    async def close_all(self) -> None:
        """Close every tracked session, ignoring individual close failures."""
        # Shared sessions normally appear in both containers; de-duplicate by
        # identity so each one is closed once.
        unique = {id(s): s for s in (*self._all_sessions, *self._shared.values())}
        for session in unique.values():
            try:
                await session.close()
            except Exception:
                logger.debug("Ignoring error while closing SAM session", exc_info=True)
        self._shared.clear()
        self._all_sessions.clear()