A Python port of the Invisible Internet Project (I2P)
at main 93 lines 3.0 kB view raw
1"""SAMBridge -- main SAM protocol server. 2 3Ported from net.i2p.sam.SAMBridge. 4""" 5 6from __future__ import annotations 7 8import asyncio 9import logging 10 11from i2p_sam.handler import SAMHandler 12from i2p_sam.sessions_db import SessionsDB 13 14logger = logging.getLogger(__name__) 15 16 17class SAMBridge: 18 """SAM protocol server -- listens for client connections on TCP port. 19 20 Each connection gets its own SAMHandler for protocol negotiation 21 and command dispatch. 22 23 Default port is 7656 (standard SAM port). Use port=0 to let the OS 24 assign a random available port. 25 """ 26 27 def __init__(self, host: str = "127.0.0.1", port: int = 7656) -> None: 28 self._host = host 29 self._port = port 30 self._sessions_db = SessionsDB() 31 self._server: asyncio.Server | None = None 32 self._handlers: set[SAMHandler] = set() 33 self._handler_tasks: set[asyncio.Task] = set() 34 35 async def start(self) -> None: 36 """Start listening for SAM client connections.""" 37 self._server = await asyncio.start_server( 38 self._accept_client, self._host, self._port 39 ) 40 # Update port in case 0 was used (OS-assigned) 41 sockets = self._server.sockets 42 if sockets: 43 self._port = sockets[0].getsockname()[1] 44 logger.info("SAMBridge listening on %s:%d", self._host, self._port) 45 46 async def _accept_client( 47 self, 48 reader: asyncio.StreamReader, 49 writer: asyncio.StreamWriter, 50 ) -> None: 51 """Handle new client connection. 52 53 Spawns a tracked background task for the handler so it can 54 be cancelled on shutdown (e.g. if blocked on STREAM ACCEPT). 55 """ 56 handler = SAMHandler(reader, writer, self._sessions_db) 57 self._handlers.add(handler) 58 task = asyncio.create_task(self._run_handler(handler)) 59 self._handler_tasks.add(task) 60 task.add_done_callback(self._handler_tasks.discard) 61 62 async def _run_handler(self, handler: SAMHandler) -> None: 63 """Run a handler, cleaning up on completion.""" 64 try: 65 await handler.run() 66 finally: 67 self._handlers.discard(handler) 68 69 async def stop(self) -> None: 70 """Stop server and cancel all active handler tasks.""" 71 if self._server: 72 self._server.close() 73 await self._server.wait_closed() 74 self._server = None 75 76 # Cancel any handlers still blocked (e.g. STREAM ACCEPT) 77 for task in list(self._handler_tasks): 78 task.cancel() 79 if self._handler_tasks: 80 await asyncio.gather(*self._handler_tasks, return_exceptions=True) 81 self._handler_tasks.clear() 82 83 logger.info("SAMBridge stopped") 84 85 @property 86 def sessions_db(self) -> SessionsDB: 87 """The session registry.""" 88 return self._sessions_db 89 90 @property 91 def port(self) -> int: 92 """The port the server is listening on.""" 93 return self._port