"""SAM session integration for I2PTunnel.

Provides TunnelSession protocol and SAMTunnelSession implementation
that speaks SAM v3.3 for tunnel session management.

Ported from net.i2p.i2ptunnel.I2PTunnel SAM bridge usage.
"""

from __future__ import annotations

import asyncio
import logging
from typing import Protocol, runtime_checkable

logger = logging.getLogger(__name__)

# Version handshake sent as the first line on every socket opened to the bridge.
_HELLO_COMMAND = b"HELLO VERSION MIN=3.0 MAX=3.3\n"
# Seconds to wait for the bridge to answer a single command.
_REPLY_TIMEOUT = 10.0
# Seconds accept() waits for a remote peer to show up after STREAM ACCEPT.
_ACCEPT_TIMEOUT = 300.0


@runtime_checkable
class TunnelSession(Protocol):
    """Abstract tunnel session — can be SAM or I2CP backed."""

    @property
    def destination(self) -> str:
        """Our base64 destination."""
        ...

    async def connect(self, dest: str) -> tuple[asyncio.StreamReader, asyncio.StreamWriter]:
        """Connect to a remote I2P destination. Returns (reader, writer)."""
        ...

    async def accept(self) -> tuple[str, asyncio.StreamReader, asyncio.StreamWriter]:
        """Accept an incoming connection. Returns (remote_dest, reader, writer)."""
        ...

    async def lookup(self, name: str) -> str | None:
        """Resolve an I2P hostname to base64 destination."""
        ...

    async def close(self) -> None:
        """Close the session."""
        ...


class SAMTunnelSession:
    """SAM v3.3 backed tunnel session.

    Manages a control connection to SAM for session lifecycle, and
    creates new connections for STREAM CONNECT / STREAM ACCEPT.
    """

    def __init__(self, sam_host: str = "127.0.0.1", sam_port: int = 7656) -> None:
        self._sam_host = sam_host
        self._sam_port = sam_port
        self._destination: str | None = None
        self._nickname: str | None = None
        self._ctrl_reader: asyncio.StreamReader | None = None
        self._ctrl_writer: asyncio.StreamWriter | None = None
        self._closed = False

    @property
    def destination(self) -> str | None:
        """Destination reported by SESSION CREATE, or None before open()."""
        return self._destination

    @staticmethod
    def _reply_value(text: str, key: str) -> str | None:
        """Return the value of the first ``key=value`` token in *text*, or None."""
        prefix = f"{key}="
        for token in text.split():
            if token.startswith(prefix):
                return token[len(prefix):]
        return None

    @staticmethod
    def _parse_peer_line(line: bytes) -> str:
        """Extract the peer destination from the line that follows STREAM ACCEPT.

        The SAM specification says bridges speaking 3.2 or later append
        ``FROM_PORT=... TO_PORT=...`` after the destination, so only the
        first whitespace-separated token is the destination itself.
        Returns an empty string when the line is blank (e.g. EOF).
        """
        tokens = line.decode("utf-8").split()
        return tokens[0] if tokens else ""

    @staticmethod
    async def _hello(reader: asyncio.StreamReader, writer: asyncio.StreamWriter) -> bytes:
        """Send the HELLO handshake and return the raw reply line."""
        writer.write(_HELLO_COMMAND)
        await writer.drain()
        return await asyncio.wait_for(reader.readline(), timeout=_REPLY_TIMEOUT)

    @staticmethod
    async def _read_reply(reader: asyncio.StreamReader) -> str:
        """Read one reply line, decoded and stripped, within the reply timeout."""
        raw = await asyncio.wait_for(reader.readline(), timeout=_REPLY_TIMEOUT)
        return raw.decode("utf-8").strip()

    async def open(
        self,
        nickname: str,
        private_key: str | None = None,
        options: dict[str, str] | None = None,
    ) -> None:
        """Open SAM session: HELLO + SESSION CREATE.

        Args:
            nickname: Session ID sent to the bridge and reused by
                connect() / accept().
            private_key: Value sent as DESTINATION; "TRANSIENT" is sent
                when omitted or empty.
            options: Extra ``key=value`` pairs appended to SESSION CREATE.

        Raises:
            ConnectionError: The bridge did not answer RESULT=OK to HELLO
                or to SESSION CREATE.
            asyncio.TimeoutError: The bridge did not reply in time.
        """
        self._nickname = nickname
        reader, writer = await asyncio.open_connection(self._sam_host, self._sam_port)
        try:
            # HELLO handshake
            hello_text = (await self._hello(reader, writer)).decode("utf-8").strip()
            if "RESULT=OK" not in hello_text:
                raise ConnectionError(f"SAM HELLO failed: {hello_text}")

            # SESSION CREATE
            dest = private_key if private_key else "TRANSIENT"
            fields = [f"SESSION CREATE STYLE=STREAM ID={nickname} DESTINATION={dest}"]
            if options:
                fields.extend(f"{k}={v}" for k, v in options.items())
            writer.write((" ".join(fields) + "\n").encode("utf-8"))
            await writer.drain()

            session_text = await self._read_reply(reader)
            if "RESULT=OK" not in session_text:
                raise ConnectionError(f"SAM SESSION CREATE failed: {session_text}")
        except BaseException:
            # Covers timeouts and cancellation too: never leave a half-open
            # control socket behind when the session could not be set up.
            writer.close()
            raise

        # Publish the control connection only once the session really exists.
        self._ctrl_reader, self._ctrl_writer = reader, writer

        # Extract destination from reply (left untouched if the reply has none)
        reported = self._reply_value(session_text, "DESTINATION")
        if reported is not None:
            self._destination = reported

        logger.info("SAM session %r opened, dest=%s...",
                     nickname, (self._destination or "")[:20])

    async def connect(self, dest: str) -> tuple[asyncio.StreamReader, asyncio.StreamWriter]:
        """Open a new SAM connection and STREAM CONNECT to dest.

        Returns the reader/writer pair for the connected stream.
        The SAM control connection remains separate.

        Raises:
            ConnectionError: HELLO or STREAM CONNECT was not answered
                with RESULT=OK.
            asyncio.TimeoutError: The bridge did not reply in time.
        """
        reader, writer = await asyncio.open_connection(self._sam_host, self._sam_port)
        try:
            # HELLO on new connection
            if b"RESULT=OK" not in await self._hello(reader, writer):
                raise ConnectionError("SAM HELLO failed on stream socket")

            # STREAM CONNECT
            writer.write(f"STREAM CONNECT ID={self._nickname} DESTINATION={dest}\n".encode())
            await writer.drain()
            status_text = await self._read_reply(reader)
            if "RESULT=OK" not in status_text:
                raise ConnectionError(f"STREAM CONNECT failed: {status_text}")
        except BaseException:
            # Close on every failure path (including timeout / cancellation),
            # not only on an explicit error reply.
            writer.close()
            raise

        # Socket is now a raw data tunnel
        return reader, writer

    async def accept(self) -> tuple[str, asyncio.StreamReader, asyncio.StreamWriter]:
        """Open a new SAM connection and STREAM ACCEPT.

        Returns (remote_dest, reader, writer) when a connection arrives.
        remote_dest is the first token of the peer line, so any port
        information the bridge appends is not included.

        Raises:
            ConnectionError: HELLO or STREAM ACCEPT was not answered with
                RESULT=OK, or the socket closed before a peer line arrived.
            asyncio.TimeoutError: The bridge did not reply in time, or no
                peer connected within the accept timeout.
        """
        reader, writer = await asyncio.open_connection(self._sam_host, self._sam_port)
        try:
            if b"RESULT=OK" not in await self._hello(reader, writer):
                raise ConnectionError("SAM HELLO failed on accept socket")

            writer.write(f"STREAM ACCEPT ID={self._nickname}\n".encode())
            await writer.drain()
            status_text = await self._read_reply(reader)
            if "RESULT=OK" not in status_text:
                raise ConnectionError(f"STREAM ACCEPT failed: {status_text}")

            # Next line is the remote destination
            dest_line = await asyncio.wait_for(reader.readline(), timeout=_ACCEPT_TIMEOUT)
            remote_dest = self._parse_peer_line(dest_line)
            if not remote_dest:
                # readline() yields an empty line when the bridge hangs up.
                raise ConnectionError(
                    "STREAM ACCEPT failed: connection closed before peer destination"
                )
        except BaseException:
            writer.close()
            raise

        return remote_dest, reader, writer

    async def lookup(self, name: str) -> str | None:
        """Resolve a name via SAM NAMING LOOKUP on the control connection.

        Returns the VALUE field of a RESULT=OK reply, or None when the
        lookup fails or the reply carries no VALUE.

        Raises:
            RuntimeError: The session has no open control connection.
            asyncio.TimeoutError: The bridge did not reply in time.
        """
        reader, writer = self._ctrl_reader, self._ctrl_writer
        if reader is None or writer is None:
            raise RuntimeError("Session not open")

        writer.write(f"NAMING LOOKUP NAME={name}\n".encode())
        await writer.drain()
        text = await self._read_reply(reader)
        if "RESULT=OK" not in text:
            return None
        return self._reply_value(text, "VALUE")

    async def close(self) -> None:
        """Close the SAM control connection.

        Safe to call more than once and on a session that was never
        opened. Errors raised while closing are logged, not propagated.
        """
        if self._closed:
            return
        self._closed = True
        writer = self._ctrl_writer
        self._ctrl_writer = None
        self._ctrl_reader = None
        if writer is None:
            return
        try:
            writer.close()
            await writer.wait_closed()
        except Exception:
            # Best effort: the peer may already have dropped the socket.
            logger.debug("Error while closing SAM control connection", exc_info=True)


class SessionFactory:
    """Creates and manages SAMTunnelSession instances.

    Supports shared sessions for tunnels with sharedClient=true.
    """

    def __init__(self, sam_host: str = "127.0.0.1", sam_port: int = 7656) -> None:
        self._sam_host = sam_host
        self._sam_port = sam_port
        self._shared: dict[str, SAMTunnelSession] = {}
        self._all_sessions: list[SAMTunnelSession] = []

    def _forget(self, session: SAMTunnelSession) -> None:
        """Drop every reference this factory holds to *session*."""
        self._all_sessions = [s for s in self._all_sessions if s is not session]
        for group in [g for g, s in self._shared.items() if s is session]:
            del self._shared[group]

    async def create(
        self,
        nickname: str | None = None,
        tunnel_def=None,
        priv_key: str | None = None,
    ) -> SAMTunnelSession:
        """Create a new SAM session.

        The session nickname is *nickname*, else ``tunnel_def.name``,
        else ``"tunnel"``. The session is tracked only once open()
        has succeeded.
        """
        session = SAMTunnelSession(self._sam_host, self._sam_port)
        nick = nickname or (tunnel_def.name if tunnel_def else "tunnel")
        await session.open(nickname=nick, private_key=priv_key)
        self._all_sessions.append(session)
        return session

    async def release(self, session: SAMTunnelSession) -> None:
        """Release a session (close it) and stop tracking it."""
        await session.close()
        # Without this the factory keeps closed sessions alive forever and
        # get_shared() could hand out a session that is already closed.
        self._forget(session)

    async def get_shared(self, group_name: str) -> SAMTunnelSession:
        """Get or create a shared session for the given group."""
        if group_name not in self._shared:
            session = await self.create(nickname=f"shared-{group_name}")
            self._shared[group_name] = session
        return self._shared[group_name]

    async def release_shared(self, group_name: str) -> None:
        """Release a shared session; a no-op for an unknown group."""
        session = self._shared.pop(group_name, None)
        if session is None:
            return
        await session.close()
        self._forget(session)

    async def close_all(self) -> None:
        """Close all sessions and forget them.

        A failure while closing one session is logged and does not stop
        the remaining sessions from being closed.
        """
        # Snapshot first: the containers may change while we await.
        pending = [*self._all_sessions, *self._shared.values()]
        for session in pending:
            try:
                await session.close()
            except Exception:
                logger.debug("Error while closing SAM session", exc_info=True)
        self._shared.clear()
        self._all_sessions.clear()