# A Python port of the Invisible Internet Project (I2P).
1"""SAM session integration for I2PTunnel.
2
3Provides TunnelSession protocol and SAMTunnelSession implementation that
4speaks SAM v3.3 for tunnel session management.
5
6Ported from net.i2p.i2ptunnel.I2PTunnel SAM bridge usage.
7"""
8
9from __future__ import annotations
10
11import asyncio
12import logging
13from typing import Protocol, runtime_checkable
14
15logger = logging.getLogger(__name__)
16
17
@runtime_checkable
class TunnelSession(Protocol):
    """Structural interface for a tunnel session (SAM- or I2CP-backed).

    Any object exposing the members below satisfies the protocol; because it
    is runtime-checkable, ``isinstance(obj, TunnelSession)`` verifies that the
    members are present (it does not verify their signatures).
    """

    @property
    def destination(self) -> str:
        """Base64 destination of the local end of this session."""

    async def connect(self, dest: str) -> tuple[asyncio.StreamReader, asyncio.StreamWriter]:
        """Open a stream to the remote I2P destination *dest*.

        Returns the ``(reader, writer)`` pair of the established stream.
        """

    async def accept(self) -> tuple[str, asyncio.StreamReader, asyncio.StreamWriter]:
        """Wait for one inbound stream.

        Returns ``(remote_dest, reader, writer)`` for the accepted stream.
        """

    async def lookup(self, name: str) -> str | None:
        """Translate an I2P host name into its base64 destination.

        The return type allows ``None`` when no destination is available.
        """

    async def close(self) -> None:
        """Shut the session down."""
42
43
class SAMTunnelSession:
    """SAM v3.3 backed tunnel session.

    Manages a control connection to SAM for session lifecycle, and creates
    new connections for STREAM CONNECT / STREAM ACCEPT.

    Every socket opened by this class is closed again when the SAM exchange
    on it fails (error reply, timeout, EOF or cancellation), so a failed
    call never leaves a dangling connection to the bridge.
    """

    # Handshake line sent first on every SAM socket (control and stream).
    _HELLO = b"HELLO VERSION MIN=3.0 MAX=3.3\n"
    # Seconds to wait for SAM to answer a single command.
    _REPLY_TIMEOUT = 10.0
    # Seconds STREAM ACCEPT waits for a remote peer to connect.
    _ACCEPT_TIMEOUT = 300.0

    def __init__(self, sam_host: str = "127.0.0.1", sam_port: int = 7656) -> None:
        """Remember the SAM bridge address; no I/O happens until open()."""
        self._sam_host = sam_host
        self._sam_port = sam_port
        self._destination: str | None = None
        self._nickname: str | None = None
        self._ctrl_reader: asyncio.StreamReader | None = None
        self._ctrl_writer: asyncio.StreamWriter | None = None
        self._closed = False

    @property
    def destination(self) -> str | None:
        """DESTINATION value from the SESSION CREATE reply (None before open()).

        NOTE(review): the SAM v3 spec describes this reply field as carrying
        the session's key material — confirm callers expect that rather than
        only the public destination.
        """
        return self._destination

    # ------------------------------------------------------------------
    # Wire helpers
    # ------------------------------------------------------------------

    async def _read_reply(self, reader: asyncio.StreamReader, timeout: float) -> str:
        """Read one reply line, stripped; an empty string means EOF/blank."""
        raw = await asyncio.wait_for(reader.readline(), timeout=timeout)
        # Undecodable bytes are replaced so that garbage from the peer shows
        # up in the ConnectionError text instead of as a UnicodeDecodeError.
        return raw.decode("utf-8", errors="replace").strip()

    async def _exchange(
        self,
        reader: asyncio.StreamReader,
        writer: asyncio.StreamWriter,
        request: bytes,
    ) -> str:
        """Send one command line and return SAM's one-line reply."""
        writer.write(request)
        await writer.drain()
        return await self._read_reply(reader, self._REPLY_TIMEOUT)

    @staticmethod
    def _reply_value(reply: str, key: str) -> str | None:
        """Return the value of the first ``key=value`` token in *reply*."""
        prefix = f"{key}="
        for token in reply.split():
            if token.startswith(prefix):
                return token[len(prefix):]
        return None

    async def _connect_on(
        self,
        reader: asyncio.StreamReader,
        writer: asyncio.StreamWriter,
        dest: str,
    ) -> None:
        """Run HELLO + STREAM CONNECT on a fresh socket; close it on failure."""
        try:
            hello = await self._exchange(reader, writer, self._HELLO)
            if "RESULT=OK" not in hello:
                raise ConnectionError("SAM HELLO failed on stream socket")

            request = f"STREAM CONNECT ID={self._nickname} DESTINATION={dest}\n"
            status_text = await self._exchange(reader, writer, request.encode())
            if "RESULT=OK" not in status_text:
                raise ConnectionError(f"STREAM CONNECT failed: {status_text}")
        except BaseException:
            # Also covers timeouts and cancellation, not only error replies.
            writer.close()
            raise

    async def _accept_on(
        self,
        reader: asyncio.StreamReader,
        writer: asyncio.StreamWriter,
    ) -> str:
        """Run HELLO + STREAM ACCEPT on a fresh socket; close it on failure.

        Returns the remote peer's destination once a connection arrives.
        """
        try:
            hello = await self._exchange(reader, writer, self._HELLO)
            if "RESULT=OK" not in hello:
                raise ConnectionError("SAM HELLO failed on accept socket")

            request = f"STREAM ACCEPT ID={self._nickname}\n"
            status_text = await self._exchange(reader, writer, request.encode())
            if "RESULT=OK" not in status_text:
                raise ConnectionError(f"STREAM ACCEPT failed: {status_text}")

            # The next line announces the peer. With SAM >= 3.2 it has the
            # form "<dest> FROM_PORT=n TO_PORT=n"; only the first token is
            # the destination.
            announcement = await self._read_reply(reader, self._ACCEPT_TIMEOUT)
            if not announcement:
                raise ConnectionError(
                    "STREAM ACCEPT failed: connection closed before a peer arrived"
                )
            return announcement.split(maxsplit=1)[0]
        except BaseException:
            writer.close()
            raise

    # ------------------------------------------------------------------
    # Public API
    # ------------------------------------------------------------------

    async def open(
        self,
        nickname: str,
        private_key: str | None = None,
        options: dict[str, str] | None = None,
    ) -> None:
        """Open SAM session: HELLO + SESSION CREATE.

        Args:
            nickname: SAM session ID; reused by connect() and accept().
            private_key: Key material to pass as DESTINATION; ``TRANSIENT``
                is requested when omitted or empty.
            options: Extra ``key=value`` pairs appended to SESSION CREATE.

        Raises:
            ConnectionError: SAM rejected HELLO or SESSION CREATE.
            asyncio.TimeoutError: SAM did not answer within the reply timeout.
            OSError: The bridge could not be reached.

        On any failure the control socket is closed and the session stays
        in the "not open" state.
        """
        self._nickname = nickname
        reader, writer = await asyncio.open_connection(
            self._sam_host, self._sam_port
        )
        try:
            hello_text = await self._exchange(reader, writer, self._HELLO)
            if "RESULT=OK" not in hello_text:
                raise ConnectionError(f"SAM HELLO failed: {hello_text}")

            fields = [
                f"SESSION CREATE STYLE=STREAM ID={nickname}",
                f"DESTINATION={private_key if private_key else 'TRANSIENT'}",
            ]
            fields.extend(f"{k}={v}" for k, v in (options or {}).items())
            request = (" ".join(fields) + "\n").encode("utf-8")

            session_text = await self._exchange(reader, writer, request)
            if "RESULT=OK" not in session_text:
                raise ConnectionError(f"SAM SESSION CREATE failed: {session_text}")
        except BaseException:
            writer.close()
            raise

        # Publish the control connection only once the session really exists.
        self._ctrl_reader, self._ctrl_writer = reader, writer
        self._closed = False  # allow close() to work again after a reopen

        reported = self._reply_value(session_text, "DESTINATION")
        if reported is not None:
            self._destination = reported

        logger.info("SAM session %r opened, dest=%s...",
                    nickname, (self._destination or "")[:20])

    async def connect(self, dest: str) -> tuple[asyncio.StreamReader, asyncio.StreamWriter]:
        """Open a new SAM connection and STREAM CONNECT to dest.

        Returns the reader/writer pair for the connected stream; after a
        successful reply the socket is a raw data tunnel. The SAM control
        connection remains separate.

        Raises:
            ConnectionError: SAM rejected HELLO or STREAM CONNECT.
            asyncio.TimeoutError: SAM did not answer within the reply timeout.
        """
        reader, writer = await asyncio.open_connection(
            self._sam_host, self._sam_port
        )
        await self._connect_on(reader, writer, dest)
        return reader, writer

    async def accept(self) -> tuple[str, asyncio.StreamReader, asyncio.StreamWriter]:
        """Open a new SAM connection and STREAM ACCEPT.

        Returns (remote_dest, reader, writer) when a connection arrives.

        Raises:
            ConnectionError: SAM rejected HELLO or STREAM ACCEPT, or closed
                the socket before announcing a peer.
            asyncio.TimeoutError: no reply, or no peer, within the timeouts.
        """
        reader, writer = await asyncio.open_connection(
            self._sam_host, self._sam_port
        )
        remote_dest = await self._accept_on(reader, writer)
        return remote_dest, reader, writer

    async def lookup(self, name: str) -> str | None:
        """Resolve a name via SAM NAMING LOOKUP on the control connection.

        Returns the VALUE field of a successful reply, or None when SAM does
        not report RESULT=OK (or the reply carries no VALUE).

        Raises:
            RuntimeError: the session is not open.
        """
        reader, writer = self._ctrl_reader, self._ctrl_writer
        # Explicit check rather than `assert`, which vanishes under -O.
        if reader is None or writer is None:
            raise RuntimeError("Session not open")

        request = f"NAMING LOOKUP NAME={name}\n".encode()
        text = await self._exchange(reader, writer, request)
        if "RESULT=OK" not in text:
            return None
        return self._reply_value(text, "VALUE")

    async def close(self) -> None:
        """Close the SAM control connection (idempotent, best effort)."""
        if self._closed:
            return
        self._closed = True

        writer = self._ctrl_writer
        self._ctrl_writer = None
        self._ctrl_reader = None
        if writer is None:
            return
        try:
            writer.close()
            await writer.wait_closed()
        except Exception:
            # Closing is best effort: the peer may already be gone. Keep the
            # failure visible for debugging instead of discarding it.
            logger.debug("Error while closing SAM control connection",
                         exc_info=True)
203
204
class SessionFactory:
    """Creates and manages SAMTunnelSession instances.

    Supports shared sessions for tunnels with sharedClient=true.

    The factory tracks every session it creates so close_all() can shut
    them down; releasing a session also removes it from that bookkeeping,
    so released sessions are neither retained nor handed out again.
    """

    def __init__(self, sam_host: str = "127.0.0.1", sam_port: int = 7656) -> None:
        """Remember the SAM bridge address used for every created session."""
        self._sam_host = sam_host
        self._sam_port = sam_port
        # Shared sessions keyed by group name (also present in _all_sessions
        # when they were made through create()).
        self._shared: dict[str, SAMTunnelSession] = {}
        # Every live session created by this factory.
        self._all_sessions: list[SAMTunnelSession] = []

    def _forget(self, session: SAMTunnelSession) -> None:
        """Drop *session* from all bookkeeping (compared by identity)."""
        self._all_sessions = [s for s in self._all_sessions if s is not session]
        for group in [g for g, s in self._shared.items() if s is session]:
            del self._shared[group]

    async def create(
        self,
        nickname: str | None = None,
        tunnel_def=None,
        priv_key: str | None = None,
    ) -> SAMTunnelSession:
        """Create a new SAM session.

        Args:
            nickname: Session ID; falls back to ``tunnel_def.name`` and then
                to ``"tunnel"``.
            tunnel_def: Optional object whose ``name`` attribute supplies the
                nickname when none is given.
            priv_key: Key material forwarded to the session's open().

        Returns:
            The opened session, registered with this factory.

        Any exception raised while opening propagates after the half-open
        session has been closed.
        """
        session = SAMTunnelSession(self._sam_host, self._sam_port)
        nick = nickname or (tunnel_def.name if tunnel_def else "tunnel")
        try:
            await session.open(nickname=nick, private_key=priv_key)
        except BaseException:
            # Do not leave the control socket open behind a failed create.
            await session.close()
            raise
        self._all_sessions.append(session)
        return session

    async def release(self, session: SAMTunnelSession) -> None:
        """Release a session: stop tracking it, then close it."""
        self._forget(session)
        await session.close()

    async def get_shared(self, group_name: str) -> SAMTunnelSession:
        """Get or create a shared session for the given group."""
        if group_name not in self._shared:
            session = await self.create(nickname=f"shared-{group_name}")
            self._shared[group_name] = session
        return self._shared[group_name]

    async def release_shared(self, group_name: str) -> None:
        """Release a shared session; unknown group names are ignored."""
        session = self._shared.pop(group_name, None)
        if session is None:
            return
        self._forget(session)
        await session.close()

    async def close_all(self) -> None:
        """Close all sessions, each exactly once, on a best-effort basis."""
        # Snapshot first: the containers are mutated while we await, and a
        # shared session normally appears in both of them.
        unique = {
            id(s): s for s in (*self._all_sessions, *self._shared.values())
        }
        for session in unique.values():
            self._forget(session)
            try:
                await session.close()
            except Exception:
                # One failing close must not prevent closing the rest.
                logger.debug("Error while closing SAM session", exc_info=True)