A Python port of the Invisible Internet Project (I2P)
at main 424 lines 16 kB view raw
1"""SAM v3 connection handler -- one per client TCP connection. 2 3Ported from net.i2p.sam.SAMv3Handler. 4""" 5 6from __future__ import annotations 7 8import asyncio 9import logging 10 11from i2p_sam.protocol import SAMCommand, SAMReply, SUPPORTED_VERSIONS 12from i2p_sam.sessions_db import SessionsDB, SessionRecord 13from i2p_sam.stream_session import SAMStreamSession 14from i2p_sam.datagram_session import SAMDatagramSession 15from i2p_sam.raw_session import SAMRawSession 16from i2p_sam.primary_session import PrimarySession 17from i2p_sam.utils import generate_transient_destination, negotiate_version 18 19logger = logging.getLogger(__name__) 20 21 22class SAMHandler: 23 """Handles one SAM client connection. 24 25 Lifecycle: 26 1. HELLO negotiation (mandatory first message) 27 2. SESSION CREATE (one session per handler, or PRIMARY for multi) 28 3. Command loop: STREAM, DATAGRAM, RAW, DEST, NAMING, PING 29 4. Socket stealing on STREAM CONNECT/ACCEPT 30 """ 31 32 # Timeout for initial HELLO negotiation 33 HELLO_TIMEOUT = 60.0 34 # Timeout for subsequent commands (keepalive) 35 COMMAND_TIMEOUT = 180.0 36 37 def __init__( 38 self, 39 reader: asyncio.StreamReader, 40 writer: asyncio.StreamWriter, 41 sessions_db: SessionsDB, 42 ) -> None: 43 self._reader = reader 44 self._writer = writer 45 self._sessions_db = sessions_db 46 self._negotiated_version: str | None = None 47 self._session_nickname: str | None = None 48 self._session_style: str | None = None 49 self._stream_session: SAMStreamSession | None = None 50 self._datagram_session: SAMDatagramSession | None = None 51 self._raw_session: SAMRawSession | None = None 52 self._primary_session: PrimarySession | None = None 53 self._running = True 54 55 async def run(self) -> None: 56 """Main handler loop. 57 58 First negotiates protocol version via HELLO, then enters 59 command dispatch loop until the client disconnects or sends QUIT. 60 """ 61 try: 62 # Step 1: HELLO negotiation 63 if not await self._handle_hello(): 64 return 65 # Step 2: Command loop 66 while self._running: 67 line = await asyncio.wait_for( 68 self._reader.readline(), timeout=self.COMMAND_TIMEOUT 69 ) 70 if not line: 71 break 72 text = line.decode("utf-8", errors="replace").strip() 73 if text: 74 await self._dispatch(text) 75 except asyncio.TimeoutError: 76 logger.debug("SAM client timed out") 77 except ConnectionResetError: 78 logger.debug("SAM client connection reset") 79 finally: 80 # Clean up session on disconnect 81 if self._session_nickname: 82 await self._sessions_db.remove(self._session_nickname) 83 try: 84 self._writer.close() 85 await self._writer.wait_closed() 86 except Exception: 87 pass 88 89 async def _handle_hello(self) -> bool: 90 """Negotiate protocol version. Returns True on success. 91 92 Reads the HELLO VERSION line, negotiates version, and sends reply. 93 The client must send HELLO as the first command within HELLO_TIMEOUT. 94 """ 95 try: 96 line = await asyncio.wait_for( 97 self._reader.readline(), timeout=self.HELLO_TIMEOUT 98 ) 99 except asyncio.TimeoutError: 100 logger.debug("SAM client did not send HELLO within timeout") 101 return False 102 103 if not line: 104 return False 105 106 text = line.decode("utf-8", errors="replace").strip() 107 if not text: 108 return False 109 110 try: 111 cmd = SAMCommand.parse(text) 112 except ValueError: 113 await self._write(SAMReply.hello_noversion()) 114 return False 115 116 if cmd.verb != "HELLO" or cmd.opcode != "VERSION": 117 await self._write(SAMReply.hello_noversion()) 118 return False 119 120 client_min = cmd.params.get("MIN", "3.0") 121 client_max = cmd.params.get("MAX", "3.3") 122 123 version = negotiate_version(client_min, client_max, SUPPORTED_VERSIONS) 124 if version is None: 125 await self._write(SAMReply.hello_noversion()) 126 return False 127 128 self._negotiated_version = version 129 await self._write(SAMReply.hello_ok(version)) 130 return True 131 132 async def _dispatch(self, line: str) -> None: 133 """Route command to handler method. 134 135 Args: 136 line: The raw text line from the client (already stripped). 137 """ 138 try: 139 cmd = SAMCommand.parse(line) 140 except ValueError: 141 logger.warning("Failed to parse SAM command: %s", line) 142 return 143 144 match cmd.verb: 145 case "SESSION": 146 await self._handle_session(cmd) 147 case "STREAM": 148 await self._handle_stream(cmd) 149 case "DATAGRAM": 150 await self._handle_datagram(cmd) 151 case "RAW": 152 await self._handle_raw(cmd) 153 case "DEST": 154 await self._handle_dest(cmd) 155 case "NAMING": 156 await self._handle_naming(cmd) 157 case "PING": 158 await self._handle_ping(cmd) 159 case "QUIT" | "STOP" | "EXIT": 160 self._running = False 161 case _: 162 logger.warning("Unknown SAM verb: %s", cmd.verb) 163 164 async def _handle_session(self, cmd: SAMCommand) -> None: 165 """SESSION CREATE/ADD/REMOVE. 166 167 CREATE: generate or load destination, register in SessionsDB. 168 ADD: add subsession to existing PRIMARY session. 169 REMOVE: remove subsession from PRIMARY session. 170 """ 171 opcode = cmd.opcode.upper() 172 173 if opcode == "CREATE": 174 await self._session_create(cmd) 175 elif opcode == "ADD": 176 await self._session_add(cmd) 177 elif opcode == "REMOVE": 178 await self._session_remove(cmd) 179 else: 180 await self._write(SAMReply.session_error( 181 "I2P_ERROR", f"Unknown SESSION opcode: {opcode}")) 182 183 async def _session_create(self, cmd: SAMCommand) -> None: 184 """Handle SESSION CREATE command.""" 185 nickname = cmd.params.get("ID", "") 186 style = cmd.params.get("STYLE", "STREAM").upper() 187 dest_param = cmd.params.get("DESTINATION", "TRANSIENT") 188 189 if not nickname: 190 await self._write(SAMReply.session_error( 191 "I2P_ERROR", "Missing session ID")) 192 return 193 194 # Check for duplicate nickname 195 if await self._sessions_db.has(nickname): 196 await self._write(SAMReply.session_error("DUPLICATED_ID")) 197 return 198 199 # Generate or load destination 200 if dest_param.upper() == "TRANSIENT": 201 raw_dest, dest_b64 = generate_transient_destination() 202 else: 203 # Use provided destination key (I2P base64) 204 from i2p_data.data_helper import from_base64 205 try: 206 raw_dest = from_base64(dest_param) 207 dest_b64 = dest_param 208 except Exception: 209 await self._write(SAMReply.session_error( 210 "INVALID_KEY", "Cannot decode destination")) 211 return 212 213 # Create session record 214 record = SessionRecord( 215 nickname=nickname, 216 style=style, 217 destination=raw_dest, 218 destination_b64=dest_b64, 219 handler=self, 220 ) 221 222 if not await self._sessions_db.add(record): 223 await self._write(SAMReply.session_error("DUPLICATED_ID")) 224 return 225 226 self._session_nickname = nickname 227 self._session_style = style 228 229 # Create style-specific session object 230 if style == "STREAM": 231 self._stream_session = SAMStreamSession(nickname, dest_b64) 232 elif style == "DATAGRAM": 233 listen_port = int(cmd.params.get("PORT", "0")) 234 listen_host = cmd.params.get("HOST", "127.0.0.1") 235 self._datagram_session = SAMDatagramSession( 236 nickname, dest_b64, listen_port, listen_host) 237 elif style == "RAW": 238 protocol = int(cmd.params.get("PROTOCOL", "18")) 239 self._raw_session = SAMRawSession(nickname, dest_b64, protocol) 240 elif style == "PRIMARY": 241 self._primary_session = PrimarySession(nickname, dest_b64) 242 243 await self._write(SAMReply.session_ok(dest_b64)) 244 245 async def _session_add(self, cmd: SAMCommand) -> None: 246 """Handle SESSION ADD (for PRIMARY sessions).""" 247 nickname = cmd.params.get("ID", "") 248 style = cmd.params.get("STYLE", "STREAM").upper() 249 from_port = cmd.params.get("FROM_PORT", "0") 250 251 if not self._primary_session or self._session_nickname != nickname: 252 await self._write(SAMReply.session_error( 253 "I2P_ERROR", "No PRIMARY session with that ID")) 254 return 255 256 try: 257 record = await self._primary_session.add_subsession( 258 from_port, style, self) 259 await self._write(SAMReply.session_ok(record.destination_b64)) 260 except ValueError: 261 await self._write(SAMReply.session_error("DUPLICATED_ID")) 262 263 async def _session_remove(self, cmd: SAMCommand) -> None: 264 """Handle SESSION REMOVE (for PRIMARY sessions).""" 265 nickname = cmd.params.get("ID", "") 266 from_port = cmd.params.get("FROM_PORT", "0") 267 268 if not self._primary_session or self._session_nickname != nickname: 269 await self._write(SAMReply.session_error( 270 "I2P_ERROR", "No PRIMARY session with that ID")) 271 return 272 273 if await self._primary_session.remove_subsession(from_port): 274 await self._write(SAMReply.session_ok( 275 self._primary_session._destination_b64)) 276 else: 277 await self._write(SAMReply.session_error( 278 "I2P_ERROR", "Subsession not found")) 279 280 async def _handle_stream(self, cmd: SAMCommand) -> None: 281 """STREAM CONNECT/ACCEPT/FORWARD. 282 283 CONNECT: connect to remote destination, steal socket for raw tunnel. 284 ACCEPT: wait for incoming connection, steal socket. 285 FORWARD: listen on local port, forward connections. 286 """ 287 opcode = cmd.opcode.upper() 288 nickname = cmd.params.get("ID", "") 289 290 if not self._stream_session: 291 await self._write(SAMReply.stream_error( 292 "I2P_ERROR", "No STREAM session active")) 293 return 294 295 if opcode == "CONNECT": 296 target = cmd.params.get("DESTINATION", "") 297 if not target: 298 await self._write(SAMReply.stream_error( 299 "I2P_ERROR", "Missing DESTINATION")) 300 return 301 silent = cmd.params.get("SILENT", "false").lower() == "true" 302 from_port = int(cmd.params.get("FROM_PORT", "0")) 303 to_port = int(cmd.params.get("TO_PORT", "0")) 304 await self._stream_session.connect( 305 target, self._reader, self._writer, 306 silent=silent, from_port=from_port, to_port=to_port) 307 self._running = False # Socket is stolen 308 309 elif opcode == "ACCEPT": 310 silent = cmd.params.get("SILENT", "false").lower() == "true" 311 await self._stream_session.accept( 312 self._reader, self._writer, silent=silent) 313 self._running = False # Socket is stolen 314 315 elif opcode == "FORWARD": 316 port = int(cmd.params.get("PORT", "0")) 317 host = cmd.params.get("HOST", "127.0.0.1") 318 silent = cmd.params.get("SILENT", "false").lower() == "true" 319 await self._stream_session.forward(port, host, silent) 320 await self._write(SAMReply.stream_ok()) 321 322 else: 323 await self._write(SAMReply.stream_error( 324 "I2P_ERROR", f"Unknown STREAM opcode: {opcode}")) 325 326 async def _handle_datagram(self, cmd: SAMCommand) -> None: 327 """DATAGRAM SEND.""" 328 if not self._datagram_session: 329 return 330 331 opcode = cmd.opcode.upper() 332 if opcode == "SEND": 333 target = cmd.params.get("DESTINATION", "") 334 size = int(cmd.params.get("SIZE", "0")) 335 from_port = int(cmd.params.get("FROM_PORT", "0")) 336 to_port = int(cmd.params.get("TO_PORT", "0")) 337 338 # Read the payload 339 if size > 0: 340 data = await self._reader.readexactly(size) 341 else: 342 data = b"" 343 344 await self._datagram_session.send( 345 target, data, from_port=from_port, to_port=to_port) 346 347 async def _handle_raw(self, cmd: SAMCommand) -> None: 348 """RAW SEND.""" 349 if not self._raw_session: 350 return 351 352 opcode = cmd.opcode.upper() 353 if opcode == "SEND": 354 target = cmd.params.get("DESTINATION", "") 355 size = int(cmd.params.get("SIZE", "0")) 356 protocol = int(cmd.params.get("PROTOCOL", "18")) 357 from_port = int(cmd.params.get("FROM_PORT", "0")) 358 to_port = int(cmd.params.get("TO_PORT", "0")) 359 360 if size > 0: 361 data = await self._reader.readexactly(size) 362 else: 363 data = b"" 364 365 await self._raw_session.send( 366 target, data, protocol=protocol, 367 from_port=from_port, to_port=to_port) 368 369 async def _handle_dest(self, cmd: SAMCommand) -> None: 370 """DEST LOOKUP — resolve a hostname to a destination. 371 372 In production, this would query the I2P network database. 373 """ 374 opcode = cmd.opcode.upper() 375 if opcode == "GENERATE": 376 # Generate a new transient destination 377 _, dest_b64 = generate_transient_destination() 378 await self._write(SAMReply.dest_reply("TRANSIENT", dest_b64)) 379 elif opcode == "LOOKUP": 380 name = cmd.params.get("NAME", "") 381 if not name: 382 await self._write(SAMReply.dest_not_found("")) 383 return 384 # Without a real naming service, we can only resolve "ME" 385 if name == "ME" and self._session_nickname: 386 session = await self._sessions_db.get(self._session_nickname) 387 if session: 388 await self._write(SAMReply.dest_reply(name, session.destination_b64)) 389 return 390 await self._write(SAMReply.dest_not_found(name)) 391 392 async def _handle_naming(self, cmd: SAMCommand) -> None: 393 """NAMING LOOKUP — resolve a name via the naming service.""" 394 name = cmd.params.get("NAME", "") 395 if not name: 396 await self._write(SAMReply.naming_not_found("")) 397 return 398 399 # "ME" resolves to this session's destination 400 if name == "ME" and self._session_nickname: 401 session = await self._sessions_db.get(self._session_nickname) 402 if session: 403 await self._write(SAMReply.naming_reply(name, session.destination_b64)) 404 return 405 406 # Without a real naming service, all other lookups fail 407 await self._write(SAMReply.naming_not_found(name)) 408 409 async def _handle_ping(self, cmd: SAMCommand) -> None: 410 """PING -> PONG. 411 412 The opcode field contains the ping data to echo back. 413 """ 414 data = cmd.opcode if cmd.opcode else "" 415 await self._write(SAMReply.pong(data)) 416 417 async def _write(self, message: str) -> None: 418 """Write message to client. 419 420 Args: 421 message: The SAM protocol message to send. 422 """ 423 self._writer.write(message.encode("utf-8")) 424 await self._writer.drain()