"""Tunnel task base classes — abstract client and server tunnel patterns. Ported from net.i2p.i2ptunnel.I2PTunnelClientBase and I2PTunnelServer. """ from __future__ import annotations import asyncio import logging from abc import ABC, abstractmethod from dataclasses import dataclass, field from i2p_apps.i2ptunnel.config import TunnelDefinition, TunnelType from i2p_apps.i2ptunnel.forwarder import bridge logger = logging.getLogger(__name__) @dataclass class TunnelStats: """Connection statistics for a tunnel task.""" total_connections: int = 0 active_connections: int = 0 total_bytes: int = 0 class TunnelTask(ABC): """Abstract base for all tunnel tasks.""" def __init__(self, config: TunnelDefinition, session) -> None: self._config = config self._session = session self._open = False self._stats = TunnelStats() @property def is_open(self) -> bool: return self._open @property def tunnel_type(self) -> TunnelType: return self._config.type @property def stats(self) -> TunnelStats: return self._stats @abstractmethod async def open(self) -> None: ... @abstractmethod async def close(self) -> None: ... class ClientTunnelTask(TunnelTask): """Base for client tunnels — listens on local port, forwards to I2P. Manages asyncio.Server on interface:listen_port. Subclasses implement handle_client() for per-tunnel-type logic. """ def __init__(self, config: TunnelDefinition, session) -> None: super().__init__(config, session) self._server: asyncio.Server | None = None self._active: set[asyncio.Task] = set() async def open(self) -> None: self._server = await asyncio.start_server( self._on_connect, host=self._config.interface, port=self._config.listen_port, ) self._open = True logger.info("Client tunnel %r listening on %s:%d", self._config.name, self._config.interface, self._config.listen_port) async def close(self) -> None: self._open = False if self._server: self._server.close() await self._server.wait_closed() self._server = None # Cancel active connection tasks for task in self._active: task.cancel() if self._active: await asyncio.gather(*self._active, return_exceptions=True) self._active.clear() async def _on_connect( self, reader: asyncio.StreamReader, writer: asyncio.StreamWriter ) -> None: self._stats.total_connections += 1 self._stats.active_connections += 1 task = asyncio.current_task() if task: self._active.add(task) try: await self.handle_client(reader, writer) except Exception: logger.exception("Error handling client in tunnel %r", self._config.name) finally: self._stats.active_connections -= 1 if task: self._active.discard(task) try: writer.close() await writer.wait_closed() except Exception: pass @abstractmethod async def handle_client( self, reader: asyncio.StreamReader, writer: asyncio.StreamWriter ) -> None: """Handle a single client connection. Implemented by subclasses.""" ... class ServerTunnelTask(TunnelTask): """Base for server tunnels — accepts I2P connections, forwards to local service. Runs an accept loop on the SAM session. Subclasses can override handle_incoming() for per-tunnel-type processing. """ def __init__(self, config: TunnelDefinition, session) -> None: super().__init__(config, session) self._accept_task: asyncio.Task | None = None async def open(self) -> None: self._open = True self._accept_task = asyncio.create_task(self._accept_loop()) logger.info("Server tunnel %r started, target=%s:%d", self._config.name, self._config.target_host, self._config.target_port) async def close(self) -> None: self._open = False if self._accept_task: self._accept_task.cancel() try: await self._accept_task except asyncio.CancelledError: pass self._accept_task = None async def _accept_loop(self) -> None: while self._open: try: remote_dest, i2p_reader, i2p_writer = await self._session.accept() asyncio.create_task( self._handle_connection(remote_dest, i2p_reader, i2p_writer) ) except asyncio.CancelledError: break except Exception: logger.exception("Error in accept loop for tunnel %r", self._config.name) await asyncio.sleep(1.0) async def _handle_connection( self, remote_dest: str, i2p_reader: asyncio.StreamReader, i2p_writer: asyncio.StreamWriter, ) -> None: self._stats.total_connections += 1 self._stats.active_connections += 1 try: local_reader, local_writer = await asyncio.open_connection( self._config.target_host, self._config.target_port ) await self.handle_incoming( remote_dest, i2p_reader, i2p_writer, local_reader, local_writer ) except ConnectionRefusedError: logger.warning("Local service %s:%d refused connection", self._config.target_host, self._config.target_port) except Exception: logger.exception("Error handling incoming connection for tunnel %r", self._config.name) finally: self._stats.active_connections -= 1 try: i2p_writer.close() except Exception: pass async def handle_incoming( self, remote_dest: str, i2p_reader: asyncio.StreamReader, i2p_writer: asyncio.StreamWriter, local_reader: asyncio.StreamReader, local_writer: asyncio.StreamWriter, ) -> None: """Handle an incoming I2P connection. Override for custom processing.""" def _count(n: int) -> None: self._stats.total_bytes += n await bridge(i2p_reader, i2p_writer, local_reader, local_writer, on_data=_count)