"""SAMBridge -- main SAM protocol server. Ported from net.i2p.sam.SAMBridge. """ from __future__ import annotations import asyncio import logging from i2p_sam.handler import SAMHandler from i2p_sam.sessions_db import SessionsDB logger = logging.getLogger(__name__) class SAMBridge: """SAM protocol server -- listens for client connections on TCP port. Each connection gets its own SAMHandler for protocol negotiation and command dispatch. Default port is 7656 (standard SAM port). Use port=0 to let the OS assign a random available port. """ def __init__(self, host: str = "127.0.0.1", port: int = 7656) -> None: self._host = host self._port = port self._sessions_db = SessionsDB() self._server: asyncio.Server | None = None self._handlers: set[SAMHandler] = set() self._handler_tasks: set[asyncio.Task] = set() async def start(self) -> None: """Start listening for SAM client connections.""" self._server = await asyncio.start_server( self._accept_client, self._host, self._port ) # Update port in case 0 was used (OS-assigned) sockets = self._server.sockets if sockets: self._port = sockets[0].getsockname()[1] logger.info("SAMBridge listening on %s:%d", self._host, self._port) async def _accept_client( self, reader: asyncio.StreamReader, writer: asyncio.StreamWriter, ) -> None: """Handle new client connection. Spawns a tracked background task for the handler so it can be cancelled on shutdown (e.g. if blocked on STREAM ACCEPT). """ handler = SAMHandler(reader, writer, self._sessions_db) self._handlers.add(handler) task = asyncio.create_task(self._run_handler(handler)) self._handler_tasks.add(task) task.add_done_callback(self._handler_tasks.discard) async def _run_handler(self, handler: SAMHandler) -> None: """Run a handler, cleaning up on completion.""" try: await handler.run() finally: self._handlers.discard(handler) async def stop(self) -> None: """Stop server and cancel all active handler tasks.""" if self._server: self._server.close() await self._server.wait_closed() self._server = None # Cancel any handlers still blocked (e.g. STREAM ACCEPT) for task in list(self._handler_tasks): task.cancel() if self._handler_tasks: await asyncio.gather(*self._handler_tasks, return_exceptions=True) self._handler_tasks.clear() logger.info("SAMBridge stopped") @property def sessions_db(self) -> SessionsDB: """The session registry.""" return self._sessions_db @property def port(self) -> int: """The port the server is listening on.""" return self._port