A Python port of the Invisible Internet Project (I2P)
1"""SAM v3 connection handler -- one per client TCP connection.
2
3Ported from net.i2p.sam.SAMv3Handler.
4"""
5
6from __future__ import annotations
7
8import asyncio
9import logging
10
11from i2p_sam.protocol import SAMCommand, SAMReply, SUPPORTED_VERSIONS
12from i2p_sam.sessions_db import SessionsDB, SessionRecord
13from i2p_sam.stream_session import SAMStreamSession
14from i2p_sam.datagram_session import SAMDatagramSession
15from i2p_sam.raw_session import SAMRawSession
16from i2p_sam.primary_session import PrimarySession
17from i2p_sam.utils import generate_transient_destination, negotiate_version
18
19logger = logging.getLogger(__name__)
20
21
22class SAMHandler:
23 """Handles one SAM client connection.
24
25 Lifecycle:
26 1. HELLO negotiation (mandatory first message)
27 2. SESSION CREATE (one session per handler, or PRIMARY for multi)
28 3. Command loop: STREAM, DATAGRAM, RAW, DEST, NAMING, PING
29 4. Socket stealing on STREAM CONNECT/ACCEPT
30 """
31
32 # Timeout for initial HELLO negotiation
33 HELLO_TIMEOUT = 60.0
34 # Timeout for subsequent commands (keepalive)
35 COMMAND_TIMEOUT = 180.0
36
37 def __init__(
38 self,
39 reader: asyncio.StreamReader,
40 writer: asyncio.StreamWriter,
41 sessions_db: SessionsDB,
42 ) -> None:
43 self._reader = reader
44 self._writer = writer
45 self._sessions_db = sessions_db
46 self._negotiated_version: str | None = None
47 self._session_nickname: str | None = None
48 self._session_style: str | None = None
49 self._stream_session: SAMStreamSession | None = None
50 self._datagram_session: SAMDatagramSession | None = None
51 self._raw_session: SAMRawSession | None = None
52 self._primary_session: PrimarySession | None = None
53 self._running = True
54
55 async def run(self) -> None:
56 """Main handler loop.
57
58 First negotiates protocol version via HELLO, then enters
59 command dispatch loop until the client disconnects or sends QUIT.
60 """
61 try:
62 # Step 1: HELLO negotiation
63 if not await self._handle_hello():
64 return
65 # Step 2: Command loop
66 while self._running:
67 line = await asyncio.wait_for(
68 self._reader.readline(), timeout=self.COMMAND_TIMEOUT
69 )
70 if not line:
71 break
72 text = line.decode("utf-8", errors="replace").strip()
73 if text:
74 await self._dispatch(text)
75 except asyncio.TimeoutError:
76 logger.debug("SAM client timed out")
77 except ConnectionResetError:
78 logger.debug("SAM client connection reset")
79 finally:
80 # Clean up session on disconnect
81 if self._session_nickname:
82 await self._sessions_db.remove(self._session_nickname)
83 try:
84 self._writer.close()
85 await self._writer.wait_closed()
86 except Exception:
87 pass
88
89 async def _handle_hello(self) -> bool:
90 """Negotiate protocol version. Returns True on success.
91
92 Reads the HELLO VERSION line, negotiates version, and sends reply.
93 The client must send HELLO as the first command within HELLO_TIMEOUT.
94 """
95 try:
96 line = await asyncio.wait_for(
97 self._reader.readline(), timeout=self.HELLO_TIMEOUT
98 )
99 except asyncio.TimeoutError:
100 logger.debug("SAM client did not send HELLO within timeout")
101 return False
102
103 if not line:
104 return False
105
106 text = line.decode("utf-8", errors="replace").strip()
107 if not text:
108 return False
109
110 try:
111 cmd = SAMCommand.parse(text)
112 except ValueError:
113 await self._write(SAMReply.hello_noversion())
114 return False
115
116 if cmd.verb != "HELLO" or cmd.opcode != "VERSION":
117 await self._write(SAMReply.hello_noversion())
118 return False
119
120 client_min = cmd.params.get("MIN", "3.0")
121 client_max = cmd.params.get("MAX", "3.3")
122
123 version = negotiate_version(client_min, client_max, SUPPORTED_VERSIONS)
124 if version is None:
125 await self._write(SAMReply.hello_noversion())
126 return False
127
128 self._negotiated_version = version
129 await self._write(SAMReply.hello_ok(version))
130 return True
131
132 async def _dispatch(self, line: str) -> None:
133 """Route command to handler method.
134
135 Args:
136 line: The raw text line from the client (already stripped).
137 """
138 try:
139 cmd = SAMCommand.parse(line)
140 except ValueError:
141 logger.warning("Failed to parse SAM command: %s", line)
142 return
143
144 match cmd.verb:
145 case "SESSION":
146 await self._handle_session(cmd)
147 case "STREAM":
148 await self._handle_stream(cmd)
149 case "DATAGRAM":
150 await self._handle_datagram(cmd)
151 case "RAW":
152 await self._handle_raw(cmd)
153 case "DEST":
154 await self._handle_dest(cmd)
155 case "NAMING":
156 await self._handle_naming(cmd)
157 case "PING":
158 await self._handle_ping(cmd)
159 case "QUIT" | "STOP" | "EXIT":
160 self._running = False
161 case _:
162 logger.warning("Unknown SAM verb: %s", cmd.verb)
163
164 async def _handle_session(self, cmd: SAMCommand) -> None:
165 """SESSION CREATE/ADD/REMOVE.
166
167 CREATE: generate or load destination, register in SessionsDB.
168 ADD: add subsession to existing PRIMARY session.
169 REMOVE: remove subsession from PRIMARY session.
170 """
171 opcode = cmd.opcode.upper()
172
173 if opcode == "CREATE":
174 await self._session_create(cmd)
175 elif opcode == "ADD":
176 await self._session_add(cmd)
177 elif opcode == "REMOVE":
178 await self._session_remove(cmd)
179 else:
180 await self._write(SAMReply.session_error(
181 "I2P_ERROR", f"Unknown SESSION opcode: {opcode}"))
182
183 async def _session_create(self, cmd: SAMCommand) -> None:
184 """Handle SESSION CREATE command."""
185 nickname = cmd.params.get("ID", "")
186 style = cmd.params.get("STYLE", "STREAM").upper()
187 dest_param = cmd.params.get("DESTINATION", "TRANSIENT")
188
189 if not nickname:
190 await self._write(SAMReply.session_error(
191 "I2P_ERROR", "Missing session ID"))
192 return
193
194 # Check for duplicate nickname
195 if await self._sessions_db.has(nickname):
196 await self._write(SAMReply.session_error("DUPLICATED_ID"))
197 return
198
199 # Generate or load destination
200 if dest_param.upper() == "TRANSIENT":
201 raw_dest, dest_b64 = generate_transient_destination()
202 else:
203 # Use provided destination key (I2P base64)
204 from i2p_data.data_helper import from_base64
205 try:
206 raw_dest = from_base64(dest_param)
207 dest_b64 = dest_param
208 except Exception:
209 await self._write(SAMReply.session_error(
210 "INVALID_KEY", "Cannot decode destination"))
211 return
212
213 # Create session record
214 record = SessionRecord(
215 nickname=nickname,
216 style=style,
217 destination=raw_dest,
218 destination_b64=dest_b64,
219 handler=self,
220 )
221
222 if not await self._sessions_db.add(record):
223 await self._write(SAMReply.session_error("DUPLICATED_ID"))
224 return
225
226 self._session_nickname = nickname
227 self._session_style = style
228
229 # Create style-specific session object
230 if style == "STREAM":
231 self._stream_session = SAMStreamSession(nickname, dest_b64)
232 elif style == "DATAGRAM":
233 listen_port = int(cmd.params.get("PORT", "0"))
234 listen_host = cmd.params.get("HOST", "127.0.0.1")
235 self._datagram_session = SAMDatagramSession(
236 nickname, dest_b64, listen_port, listen_host)
237 elif style == "RAW":
238 protocol = int(cmd.params.get("PROTOCOL", "18"))
239 self._raw_session = SAMRawSession(nickname, dest_b64, protocol)
240 elif style == "PRIMARY":
241 self._primary_session = PrimarySession(nickname, dest_b64)
242
243 await self._write(SAMReply.session_ok(dest_b64))
244
245 async def _session_add(self, cmd: SAMCommand) -> None:
246 """Handle SESSION ADD (for PRIMARY sessions)."""
247 nickname = cmd.params.get("ID", "")
248 style = cmd.params.get("STYLE", "STREAM").upper()
249 from_port = cmd.params.get("FROM_PORT", "0")
250
251 if not self._primary_session or self._session_nickname != nickname:
252 await self._write(SAMReply.session_error(
253 "I2P_ERROR", "No PRIMARY session with that ID"))
254 return
255
256 try:
257 record = await self._primary_session.add_subsession(
258 from_port, style, self)
259 await self._write(SAMReply.session_ok(record.destination_b64))
260 except ValueError:
261 await self._write(SAMReply.session_error("DUPLICATED_ID"))
262
263 async def _session_remove(self, cmd: SAMCommand) -> None:
264 """Handle SESSION REMOVE (for PRIMARY sessions)."""
265 nickname = cmd.params.get("ID", "")
266 from_port = cmd.params.get("FROM_PORT", "0")
267
268 if not self._primary_session or self._session_nickname != nickname:
269 await self._write(SAMReply.session_error(
270 "I2P_ERROR", "No PRIMARY session with that ID"))
271 return
272
273 if await self._primary_session.remove_subsession(from_port):
274 await self._write(SAMReply.session_ok(
275 self._primary_session._destination_b64))
276 else:
277 await self._write(SAMReply.session_error(
278 "I2P_ERROR", "Subsession not found"))
279
280 async def _handle_stream(self, cmd: SAMCommand) -> None:
281 """STREAM CONNECT/ACCEPT/FORWARD.
282
283 CONNECT: connect to remote destination, steal socket for raw tunnel.
284 ACCEPT: wait for incoming connection, steal socket.
285 FORWARD: listen on local port, forward connections.
286 """
287 opcode = cmd.opcode.upper()
288 nickname = cmd.params.get("ID", "")
289
290 if not self._stream_session:
291 await self._write(SAMReply.stream_error(
292 "I2P_ERROR", "No STREAM session active"))
293 return
294
295 if opcode == "CONNECT":
296 target = cmd.params.get("DESTINATION", "")
297 if not target:
298 await self._write(SAMReply.stream_error(
299 "I2P_ERROR", "Missing DESTINATION"))
300 return
301 silent = cmd.params.get("SILENT", "false").lower() == "true"
302 from_port = int(cmd.params.get("FROM_PORT", "0"))
303 to_port = int(cmd.params.get("TO_PORT", "0"))
304 await self._stream_session.connect(
305 target, self._reader, self._writer,
306 silent=silent, from_port=from_port, to_port=to_port)
307 self._running = False # Socket is stolen
308
309 elif opcode == "ACCEPT":
310 silent = cmd.params.get("SILENT", "false").lower() == "true"
311 await self._stream_session.accept(
312 self._reader, self._writer, silent=silent)
313 self._running = False # Socket is stolen
314
315 elif opcode == "FORWARD":
316 port = int(cmd.params.get("PORT", "0"))
317 host = cmd.params.get("HOST", "127.0.0.1")
318 silent = cmd.params.get("SILENT", "false").lower() == "true"
319 await self._stream_session.forward(port, host, silent)
320 await self._write(SAMReply.stream_ok())
321
322 else:
323 await self._write(SAMReply.stream_error(
324 "I2P_ERROR", f"Unknown STREAM opcode: {opcode}"))
325
326 async def _handle_datagram(self, cmd: SAMCommand) -> None:
327 """DATAGRAM SEND."""
328 if not self._datagram_session:
329 return
330
331 opcode = cmd.opcode.upper()
332 if opcode == "SEND":
333 target = cmd.params.get("DESTINATION", "")
334 size = int(cmd.params.get("SIZE", "0"))
335 from_port = int(cmd.params.get("FROM_PORT", "0"))
336 to_port = int(cmd.params.get("TO_PORT", "0"))
337
338 # Read the payload
339 if size > 0:
340 data = await self._reader.readexactly(size)
341 else:
342 data = b""
343
344 await self._datagram_session.send(
345 target, data, from_port=from_port, to_port=to_port)
346
347 async def _handle_raw(self, cmd: SAMCommand) -> None:
348 """RAW SEND."""
349 if not self._raw_session:
350 return
351
352 opcode = cmd.opcode.upper()
353 if opcode == "SEND":
354 target = cmd.params.get("DESTINATION", "")
355 size = int(cmd.params.get("SIZE", "0"))
356 protocol = int(cmd.params.get("PROTOCOL", "18"))
357 from_port = int(cmd.params.get("FROM_PORT", "0"))
358 to_port = int(cmd.params.get("TO_PORT", "0"))
359
360 if size > 0:
361 data = await self._reader.readexactly(size)
362 else:
363 data = b""
364
365 await self._raw_session.send(
366 target, data, protocol=protocol,
367 from_port=from_port, to_port=to_port)
368
369 async def _handle_dest(self, cmd: SAMCommand) -> None:
370 """DEST LOOKUP — resolve a hostname to a destination.
371
372 In production, this would query the I2P network database.
373 """
374 opcode = cmd.opcode.upper()
375 if opcode == "GENERATE":
376 # Generate a new transient destination
377 _, dest_b64 = generate_transient_destination()
378 await self._write(SAMReply.dest_reply("TRANSIENT", dest_b64))
379 elif opcode == "LOOKUP":
380 name = cmd.params.get("NAME", "")
381 if not name:
382 await self._write(SAMReply.dest_not_found(""))
383 return
384 # Without a real naming service, we can only resolve "ME"
385 if name == "ME" and self._session_nickname:
386 session = await self._sessions_db.get(self._session_nickname)
387 if session:
388 await self._write(SAMReply.dest_reply(name, session.destination_b64))
389 return
390 await self._write(SAMReply.dest_not_found(name))
391
392 async def _handle_naming(self, cmd: SAMCommand) -> None:
393 """NAMING LOOKUP — resolve a name via the naming service."""
394 name = cmd.params.get("NAME", "")
395 if not name:
396 await self._write(SAMReply.naming_not_found(""))
397 return
398
399 # "ME" resolves to this session's destination
400 if name == "ME" and self._session_nickname:
401 session = await self._sessions_db.get(self._session_nickname)
402 if session:
403 await self._write(SAMReply.naming_reply(name, session.destination_b64))
404 return
405
406 # Without a real naming service, all other lookups fail
407 await self._write(SAMReply.naming_not_found(name))
408
409 async def _handle_ping(self, cmd: SAMCommand) -> None:
410 """PING -> PONG.
411
412 The opcode field contains the ping data to echo back.
413 """
414 data = cmd.opcode if cmd.opcode else ""
415 await self._write(SAMReply.pong(data))
416
417 async def _write(self, message: str) -> None:
418 """Write message to client.
419
420 Args:
421 message: The SAM protocol message to send.
422 """
423 self._writer.write(message.encode("utf-8"))
424 await self._writer.drain()