A Python port of the Invisible Internet Project (I2P)
at main 207 lines 6.7 kB view raw
"""Tunnel task base classes — abstract client and server tunnel patterns.

Ported from net.i2p.i2ptunnel.I2PTunnelClientBase and I2PTunnelServer.
"""

from __future__ import annotations

import asyncio
import logging
from abc import ABC, abstractmethod
from dataclasses import dataclass, field

from i2p_apps.i2ptunnel.config import TunnelDefinition, TunnelType
from i2p_apps.i2ptunnel.forwarder import bridge

logger = logging.getLogger(__name__)


async def _close_writers(*writers: asyncio.StreamWriter | None) -> None:
    """Best-effort close of every non-``None`` writer.

    All writers are closed synchronously first, and only then awaited, so
    that an interruption while waiting on one writer cannot leave another
    one un-closed. Teardown errors are logged at debug level, never raised.
    """
    closing = []
    for writer in writers:
        if writer is None:
            continue
        try:
            writer.close()
        except Exception:
            logger.debug("Ignoring error while closing stream", exc_info=True)
        else:
            closing.append(writer)

    for writer in closing:
        try:
            await writer.wait_closed()
        except Exception:
            logger.debug("Ignoring error while waiting for stream close",
                         exc_info=True)


async def _cancel_and_drain(tasks: list[asyncio.Task]) -> None:
    """Cancel every task in *tasks* and wait until all of them have finished."""
    for task in tasks:
        task.cancel()
    if tasks:
        # return_exceptions=True: a handler's CancelledError (or any other
        # failure) must not abort the shutdown of the remaining handlers.
        await asyncio.gather(*tasks, return_exceptions=True)


@dataclass
class TunnelStats:
    """Connection statistics for a tunnel task."""

    total_connections: int = 0
    active_connections: int = 0
    total_bytes: int = 0


class TunnelTask(ABC):
    """Abstract base for all tunnel tasks.

    Holds the tunnel configuration, the session object, an open/closed flag
    and a :class:`TunnelStats` instance. Subclasses implement :meth:`open`
    and :meth:`close`.
    """

    def __init__(self, config: TunnelDefinition, session) -> None:
        self._config = config
        self._session = session
        self._open = False
        self._stats = TunnelStats()

    @property
    def is_open(self) -> bool:
        """Whether the tunnel is currently marked open."""
        return self._open

    @property
    def tunnel_type(self) -> TunnelType:
        """The tunnel type taken from the configuration."""
        return self._config.type

    @property
    def stats(self) -> TunnelStats:
        """Live connection statistics for this tunnel."""
        return self._stats

    @abstractmethod
    async def open(self) -> None:
        """Start the tunnel."""
        ...

    @abstractmethod
    async def close(self) -> None:
        """Stop the tunnel."""
        ...


class ClientTunnelTask(TunnelTask):
    """Base for client tunnels — listens on local port, forwards to I2P.

    Manages asyncio.Server on interface:listen_port.
    Subclasses implement handle_client() for per-tunnel-type logic.
    """

    def __init__(self, config: TunnelDefinition, session) -> None:
        super().__init__(config, session)
        self._server: asyncio.Server | None = None
        # Tasks currently running _on_connect; cancelled by close().
        self._active: set[asyncio.Task] = set()

    async def open(self) -> None:
        """Start listening on the configured interface and port."""
        self._server = await asyncio.start_server(
            self._on_connect,
            host=self._config.interface,
            port=self._config.listen_port,
        )
        self._open = True
        logger.info("Client tunnel %r listening on %s:%d",
                    self._config.name, self._config.interface,
                    self._config.listen_port)

    async def close(self) -> None:
        """Stop listening, cancel active client handlers and wait for them."""
        self._open = False
        server, self._server = self._server, None
        if server is not None:
            # Stop accepting new connections right away.
            server.close()

        # Cancel handlers BEFORE awaiting wait_closed(): since Python 3.12
        # Server.wait_closed() blocks until every accepted connection has
        # finished, so waiting first would hang while clients are connected.
        await _cancel_and_drain(list(self._active))
        self._active.clear()

        if server is not None:
            await server.wait_closed()

    async def _on_connect(
        self, reader: asyncio.StreamReader, writer: asyncio.StreamWriter
    ) -> None:
        """Per-connection callback: track stats, run handle_client, clean up."""
        self._stats.total_connections += 1
        self._stats.active_connections += 1
        task = asyncio.current_task()
        if task is not None:
            self._active.add(task)
        try:
            await self.handle_client(reader, writer)
        except Exception:
            logger.exception("Error handling client in tunnel %r",
                             self._config.name)
        finally:
            self._stats.active_connections -= 1
            if task is not None:
                self._active.discard(task)
            await _close_writers(writer)

    @abstractmethod
    async def handle_client(
        self, reader: asyncio.StreamReader, writer: asyncio.StreamWriter
    ) -> None:
        """Handle a single client connection. Implemented by subclasses."""
        ...


class ServerTunnelTask(TunnelTask):
    """Base for server tunnels — accepts I2P connections, forwards to local service.

    Runs an accept loop on the SAM session.
    Subclasses can override handle_incoming() for per-tunnel-type processing.
    """

    def __init__(self, config: TunnelDefinition, session) -> None:
        super().__init__(config, session)
        self._accept_task: asyncio.Task | None = None
        # Strong references to in-flight connection tasks. The event loop
        # only keeps weak references, so an untracked task could be
        # garbage-collected before it finishes.
        self._connections: set[asyncio.Task] = set()

    async def open(self) -> None:
        """Mark the tunnel open and start the background accept loop."""
        self._open = True
        self._accept_task = asyncio.create_task(self._accept_loop())
        logger.info("Server tunnel %r started, target=%s:%d",
                    self._config.name, self._config.target_host,
                    self._config.target_port)

    async def close(self) -> None:
        """Stop the accept loop, then cancel and await in-flight connections."""
        self._open = False
        accept_task, self._accept_task = self._accept_task, None
        if accept_task is not None:
            accept_task.cancel()
            try:
                await accept_task
            except asyncio.CancelledError:
                pass

        await _cancel_and_drain(list(self._connections))
        self._connections.clear()

    async def _accept_loop(self) -> None:
        """Accept I2P connections until closed, one task per connection."""
        while self._open:
            try:
                remote_dest, i2p_reader, i2p_writer = await self._session.accept()
            except asyncio.CancelledError:
                break
            except Exception:
                logger.exception("Error in accept loop for tunnel %r",
                                 self._config.name)
                # Back off so a persistently failing accept() does not spin.
                await asyncio.sleep(1.0)
                continue

            conn_task = asyncio.create_task(
                self._handle_connection(remote_dest, i2p_reader, i2p_writer)
            )
            self._connections.add(conn_task)
            conn_task.add_done_callback(self._connections.discard)

    async def _handle_connection(
        self,
        remote_dest: str,
        i2p_reader: asyncio.StreamReader,
        i2p_writer: asyncio.StreamWriter,
    ) -> None:
        """Connect to the local target and hand both stream pairs to handle_incoming.

        Both the I2P writer and (if it was opened) the local writer are
        closed when the handler finishes, fails or is cancelled.
        """
        self._stats.total_connections += 1
        self._stats.active_connections += 1
        local_writer: asyncio.StreamWriter | None = None
        try:
            local_reader, local_writer = await asyncio.open_connection(
                self._config.target_host, self._config.target_port
            )
            await self.handle_incoming(
                remote_dest, i2p_reader, i2p_writer, local_reader, local_writer
            )
        except ConnectionRefusedError:
            logger.warning("Local service %s:%d refused connection",
                           self._config.target_host, self._config.target_port)
        except Exception:
            logger.exception("Error handling incoming connection for tunnel %r",
                             self._config.name)
        finally:
            self._stats.active_connections -= 1
            await _close_writers(local_writer, i2p_writer)

    async def handle_incoming(
        self,
        remote_dest: str,
        i2p_reader: asyncio.StreamReader,
        i2p_writer: asyncio.StreamWriter,
        local_reader: asyncio.StreamReader,
        local_writer: asyncio.StreamWriter,
    ) -> None:
        """Handle an incoming I2P connection. Override for custom processing."""
        def _count(n: int) -> None:
            # Passed to bridge() as on_data; accumulates the reported sizes.
            self._stats.total_bytes += n

        await bridge(i2p_reader, i2p_writer, local_reader, local_writer, on_data=_count)