A Python port of the Invisible Internet Project (I2P)
1"""SAMBridge -- main SAM protocol server.
2
3Ported from net.i2p.sam.SAMBridge.
4"""
5
6from __future__ import annotations
7
8import asyncio
9import logging
10
11from i2p_sam.handler import SAMHandler
12from i2p_sam.sessions_db import SessionsDB
13
14logger = logging.getLogger(__name__)
15
16
17class SAMBridge:
18 """SAM protocol server -- listens for client connections on TCP port.
19
20 Each connection gets its own SAMHandler for protocol negotiation
21 and command dispatch.
22
23 Default port is 7656 (standard SAM port). Use port=0 to let the OS
24 assign a random available port.
25 """
26
27 def __init__(self, host: str = "127.0.0.1", port: int = 7656) -> None:
28 self._host = host
29 self._port = port
30 self._sessions_db = SessionsDB()
31 self._server: asyncio.Server | None = None
32 self._handlers: set[SAMHandler] = set()
33 self._handler_tasks: set[asyncio.Task] = set()
34
35 async def start(self) -> None:
36 """Start listening for SAM client connections."""
37 self._server = await asyncio.start_server(
38 self._accept_client, self._host, self._port
39 )
40 # Update port in case 0 was used (OS-assigned)
41 sockets = self._server.sockets
42 if sockets:
43 self._port = sockets[0].getsockname()[1]
44 logger.info("SAMBridge listening on %s:%d", self._host, self._port)
45
46 async def _accept_client(
47 self,
48 reader: asyncio.StreamReader,
49 writer: asyncio.StreamWriter,
50 ) -> None:
51 """Handle new client connection.
52
53 Spawns a tracked background task for the handler so it can
54 be cancelled on shutdown (e.g. if blocked on STREAM ACCEPT).
55 """
56 handler = SAMHandler(reader, writer, self._sessions_db)
57 self._handlers.add(handler)
58 task = asyncio.create_task(self._run_handler(handler))
59 self._handler_tasks.add(task)
60 task.add_done_callback(self._handler_tasks.discard)
61
62 async def _run_handler(self, handler: SAMHandler) -> None:
63 """Run a handler, cleaning up on completion."""
64 try:
65 await handler.run()
66 finally:
67 self._handlers.discard(handler)
68
69 async def stop(self) -> None:
70 """Stop server and cancel all active handler tasks."""
71 if self._server:
72 self._server.close()
73 await self._server.wait_closed()
74 self._server = None
75
76 # Cancel any handlers still blocked (e.g. STREAM ACCEPT)
77 for task in list(self._handler_tasks):
78 task.cancel()
79 if self._handler_tasks:
80 await asyncio.gather(*self._handler_tasks, return_exceptions=True)
81 self._handler_tasks.clear()
82
83 logger.info("SAMBridge stopped")
84
85 @property
86 def sessions_db(self) -> SessionsDB:
87 """The session registry."""
88 return self._sessions_db
89
90 @property
91 def port(self) -> int:
92 """The port the server is listening on."""
93 return self._port