A Python port of the Invisible Internet Project (I2P)
1"""Tunnel task base classes — abstract client and server tunnel patterns.
2
3Ported from net.i2p.i2ptunnel.I2PTunnelClientBase and I2PTunnelServer.
4"""
5
6from __future__ import annotations
7
8import asyncio
9import logging
10from abc import ABC, abstractmethod
11from dataclasses import dataclass, field
12
13from i2p_apps.i2ptunnel.config import TunnelDefinition, TunnelType
14from i2p_apps.i2ptunnel.forwarder import bridge
15
16logger = logging.getLogger(__name__)
17
18
19@dataclass
20class TunnelStats:
21 """Connection statistics for a tunnel task."""
22 total_connections: int = 0
23 active_connections: int = 0
24 total_bytes: int = 0
25
26
27class TunnelTask(ABC):
28 """Abstract base for all tunnel tasks."""
29
30 def __init__(self, config: TunnelDefinition, session) -> None:
31 self._config = config
32 self._session = session
33 self._open = False
34 self._stats = TunnelStats()
35
36 @property
37 def is_open(self) -> bool:
38 return self._open
39
40 @property
41 def tunnel_type(self) -> TunnelType:
42 return self._config.type
43
44 @property
45 def stats(self) -> TunnelStats:
46 return self._stats
47
48 @abstractmethod
49 async def open(self) -> None:
50 ...
51
52 @abstractmethod
53 async def close(self) -> None:
54 ...
55
56
57class ClientTunnelTask(TunnelTask):
58 """Base for client tunnels — listens on local port, forwards to I2P.
59
60 Manages asyncio.Server on interface:listen_port.
61 Subclasses implement handle_client() for per-tunnel-type logic.
62 """
63
64 def __init__(self, config: TunnelDefinition, session) -> None:
65 super().__init__(config, session)
66 self._server: asyncio.Server | None = None
67 self._active: set[asyncio.Task] = set()
68
69 async def open(self) -> None:
70 self._server = await asyncio.start_server(
71 self._on_connect,
72 host=self._config.interface,
73 port=self._config.listen_port,
74 )
75 self._open = True
76 logger.info("Client tunnel %r listening on %s:%d",
77 self._config.name, self._config.interface,
78 self._config.listen_port)
79
80 async def close(self) -> None:
81 self._open = False
82 if self._server:
83 self._server.close()
84 await self._server.wait_closed()
85 self._server = None
86
87 # Cancel active connection tasks
88 for task in self._active:
89 task.cancel()
90 if self._active:
91 await asyncio.gather(*self._active, return_exceptions=True)
92 self._active.clear()
93
94 async def _on_connect(
95 self, reader: asyncio.StreamReader, writer: asyncio.StreamWriter
96 ) -> None:
97 self._stats.total_connections += 1
98 self._stats.active_connections += 1
99 task = asyncio.current_task()
100 if task:
101 self._active.add(task)
102 try:
103 await self.handle_client(reader, writer)
104 except Exception:
105 logger.exception("Error handling client in tunnel %r",
106 self._config.name)
107 finally:
108 self._stats.active_connections -= 1
109 if task:
110 self._active.discard(task)
111 try:
112 writer.close()
113 await writer.wait_closed()
114 except Exception:
115 pass
116
117 @abstractmethod
118 async def handle_client(
119 self, reader: asyncio.StreamReader, writer: asyncio.StreamWriter
120 ) -> None:
121 """Handle a single client connection. Implemented by subclasses."""
122 ...
123
124
125class ServerTunnelTask(TunnelTask):
126 """Base for server tunnels — accepts I2P connections, forwards to local service.
127
128 Runs an accept loop on the SAM session.
129 Subclasses can override handle_incoming() for per-tunnel-type processing.
130 """
131
132 def __init__(self, config: TunnelDefinition, session) -> None:
133 super().__init__(config, session)
134 self._accept_task: asyncio.Task | None = None
135
136 async def open(self) -> None:
137 self._open = True
138 self._accept_task = asyncio.create_task(self._accept_loop())
139 logger.info("Server tunnel %r started, target=%s:%d",
140 self._config.name, self._config.target_host,
141 self._config.target_port)
142
143 async def close(self) -> None:
144 self._open = False
145 if self._accept_task:
146 self._accept_task.cancel()
147 try:
148 await self._accept_task
149 except asyncio.CancelledError:
150 pass
151 self._accept_task = None
152
153 async def _accept_loop(self) -> None:
154 while self._open:
155 try:
156 remote_dest, i2p_reader, i2p_writer = await self._session.accept()
157 asyncio.create_task(
158 self._handle_connection(remote_dest, i2p_reader, i2p_writer)
159 )
160 except asyncio.CancelledError:
161 break
162 except Exception:
163 logger.exception("Error in accept loop for tunnel %r",
164 self._config.name)
165 await asyncio.sleep(1.0)
166
167 async def _handle_connection(
168 self,
169 remote_dest: str,
170 i2p_reader: asyncio.StreamReader,
171 i2p_writer: asyncio.StreamWriter,
172 ) -> None:
173 self._stats.total_connections += 1
174 self._stats.active_connections += 1
175 try:
176 local_reader, local_writer = await asyncio.open_connection(
177 self._config.target_host, self._config.target_port
178 )
179 await self.handle_incoming(
180 remote_dest, i2p_reader, i2p_writer, local_reader, local_writer
181 )
182 except ConnectionRefusedError:
183 logger.warning("Local service %s:%d refused connection",
184 self._config.target_host, self._config.target_port)
185 except Exception:
186 logger.exception("Error handling incoming connection for tunnel %r",
187 self._config.name)
188 finally:
189 self._stats.active_connections -= 1
190 try:
191 i2p_writer.close()
192 except Exception:
193 pass
194
195 async def handle_incoming(
196 self,
197 remote_dest: str,
198 i2p_reader: asyncio.StreamReader,
199 i2p_writer: asyncio.StreamWriter,
200 local_reader: asyncio.StreamReader,
201 local_writer: asyncio.StreamWriter,
202 ) -> None:
203 """Handle an incoming I2P connection. Override for custom processing."""
204 def _count(n: int) -> None:
205 self._stats.total_bytes += n
206
207 await bridge(i2p_reader, i2p_writer, local_reader, local_writer, on_data=_count)