A Python port of the Invisible Internet Project (I2P)
1"""Fixtures for I2PTunnel Tier 1 tests.
2
3Provides MockSAMServer, sample config content, and TunnelDefinition helpers.
4"""
5
6import asyncio
7import textwrap
8from pathlib import Path
9
10import pytest
11
12from i2p_apps.i2ptunnel.config import TunnelDefinition, TunnelType
13
14
15# ---------------------------------------------------------------------------
16# Sample config matching Java I2P defaults
17# ---------------------------------------------------------------------------
18
19SAMPLE_CONFIG = textwrap.dedent("""\
20 # I2P tunnel configuration
21 # Default tunnels
22
23 tunnel.0.name=I2P HTTP Proxy
24 tunnel.0.description=HTTP proxy for browsing I2P and the web
25 tunnel.0.type=httpclient
26 tunnel.0.interface=127.0.0.1
27 tunnel.0.listenPort=4444
28 tunnel.0.sharedClient=true
29 tunnel.0.proxyList=false.i2p
30 tunnel.0.startOnLoad=true
31 tunnel.0.option.inbound.length=3
32 tunnel.0.option.outbound.length=3
33
34 tunnel.1.name=I2P HTTPS/CONNECT Proxy
35 tunnel.1.type=connectclient
36 tunnel.1.interface=127.0.0.1
37 tunnel.1.listenPort=4445
38 tunnel.1.sharedClient=true
39 tunnel.1.proxyList=false.i2p
40 tunnel.1.startOnLoad=true
41
42 tunnel.2.name=IRC Tunnel
43 tunnel.2.type=ircclient
44 tunnel.2.interface=127.0.0.1
45 tunnel.2.listenPort=6668
46 tunnel.2.targetDestination=irc.postman.i2p
47 tunnel.2.startOnLoad=false
48
49 tunnel.3.name=I2P Webserver
50 tunnel.3.description=Local webserver accessible over I2P
51 tunnel.3.type=httpserver
52 tunnel.3.interface=127.0.0.1
53 tunnel.3.targetHost=127.0.0.1
54 tunnel.3.targetPort=7658
55 tunnel.3.spoofedHost=mysite.i2p
56 tunnel.3.privKeyFile=eepsite/eepPriv.dat
57 tunnel.3.startOnLoad=true
58
59 tunnel.4.name=SMTP Tunnel
60 tunnel.4.type=client
61 tunnel.4.interface=127.0.0.1
62 tunnel.4.listenPort=7659
63 tunnel.4.targetDestination=smtp.postman.i2p
64 tunnel.4.startOnLoad=true
65
66 tunnel.5.name=POP3 Tunnel
67 tunnel.5.type=client
68 tunnel.5.interface=127.0.0.1
69 tunnel.5.listenPort=7660
70 tunnel.5.targetDestination=pop3.postman.i2p
71 tunnel.5.startOnLoad=true
72
73 tunnel.6.name=SOCKS Proxy
74 tunnel.6.type=sockstunnel
75 tunnel.6.interface=127.0.0.1
76 tunnel.6.listenPort=4446
77 tunnel.6.startOnLoad=false
78""")
79
80
81@pytest.fixture
82def sample_config_path(tmp_path: Path) -> Path:
83 """Write sample config to a temp file and return its path."""
84 p = tmp_path / "i2ptunnel.config"
85 p.write_text(SAMPLE_CONFIG)
86 return p
87
88
89@pytest.fixture
90def sample_config_dir(tmp_path: Path) -> Path:
91 """Create a config.d/ directory with individual tunnel files."""
92 d = tmp_path / "i2ptunnel.config.d"
93 d.mkdir()
94 (d / "http-proxy.config").write_text(textwrap.dedent("""\
95 tunnel.0.name=HTTP Proxy
96 tunnel.0.type=httpclient
97 tunnel.0.interface=127.0.0.1
98 tunnel.0.listenPort=4444
99 tunnel.0.startOnLoad=true
100 """))
101 (d / "server.config").write_text(textwrap.dedent("""\
102 tunnel.0.name=My Server
103 tunnel.0.type=server
104 tunnel.0.targetHost=127.0.0.1
105 tunnel.0.targetPort=8080
106 tunnel.0.privKeyFile=server.dat
107 tunnel.0.startOnLoad=true
108 """))
109 return d
110
111
112def make_tunnel_def(**overrides) -> TunnelDefinition:
113 """Create a TunnelDefinition with sensible defaults."""
114 defaults = dict(
115 name="test-tunnel",
116 description="",
117 type=TunnelType.CLIENT,
118 interface="127.0.0.1",
119 listen_port=12345,
120 target_host="",
121 target_port=0,
122 target_destination="",
123 proxy_list=[],
124 priv_key_file="",
125 start_on_load=False,
126 shared_client=False,
127 spoofed_host="",
128 options={},
129 )
130 defaults.update(overrides)
131 return TunnelDefinition(**defaults)
132
133
134# ---------------------------------------------------------------------------
135# MockSAMServer — lightweight asyncio server speaking SAM protocol
136# ---------------------------------------------------------------------------
137
138class MockSAMServer:
139 """Minimal SAM server for testing session creation and naming lookup.
140
141 Speaks just enough SAM protocol for I2PTunnel testing:
142 - HELLO VERSION -> HELLO REPLY RESULT=OK VERSION=3.3
143 - SESSION CREATE -> SESSION STATUS RESULT=OK DESTINATION=<transient>
144 - NAMING LOOKUP -> NAMING REPLY RESULT=OK NAME=<name> VALUE=<dest>
145 - STREAM CONNECT -> STREAM STATUS RESULT=OK (then echoes data)
146 - STREAM ACCEPT -> waits, then sends remote dest + echoes data
147 """
148
149 MOCK_DEST_B64 = "A" * 516 # fake base64 destination (387 bytes -> 516 chars)
150
151 def __init__(self) -> None:
152 self.host = "127.0.0.1"
153 self.port = 0 # auto-assign
154 self._server: asyncio.Server | None = None
155 self._name_map: dict[str, str] = {}
156 self._sessions: dict[str, str] = {} # id -> destination
157 self._connect_echo = True # if True, echo data back on STREAM CONNECT
158
159 def add_name(self, name: str, dest: str) -> None:
160 self._name_map[name] = dest
161
162 async def start(self) -> int:
163 """Start server, return the assigned port."""
164 self._server = await asyncio.start_server(
165 self._handle_client, self.host, self.port
166 )
167 addr = self._server.sockets[0].getsockname()
168 self.port = addr[1]
169 return self.port
170
171 async def stop(self) -> None:
172 if self._server:
173 self._server.close()
174 await self._server.wait_closed()
175
176 async def _handle_client(
177 self, reader: asyncio.StreamReader, writer: asyncio.StreamWriter
178 ) -> None:
179 try:
180 while True:
181 line = await asyncio.wait_for(reader.readline(), timeout=5.0)
182 if not line:
183 break
184 text = line.decode("utf-8").strip()
185 if not text:
186 continue
187
188 parts = text.split()
189 verb = parts[0].upper() if parts else ""
190
191 if verb == "HELLO":
192 writer.write(b"HELLO REPLY RESULT=OK VERSION=3.3\n")
193 await writer.drain()
194
195 elif verb == "SESSION":
196 # SESSION CREATE STYLE=STREAM ID=<id> DESTINATION=TRANSIENT
197 params = self._parse_params(parts[2:])
198 sid = params.get("ID", "default")
199 dest = params.get("DESTINATION", "TRANSIENT")
200 if dest == "TRANSIENT":
201 dest = self.MOCK_DEST_B64
202 self._sessions[sid] = dest
203 writer.write(
204 f"SESSION STATUS RESULT=OK DESTINATION={dest}\n".encode()
205 )
206 await writer.drain()
207
208 elif verb == "NAMING":
209 # NAMING LOOKUP NAME=<name>
210 params = self._parse_params(parts[2:])
211 name = params.get("NAME", "")
212 if name in self._name_map:
213 writer.write(
214 f"NAMING REPLY RESULT=OK NAME={name} VALUE={self._name_map[name]}\n".encode()
215 )
216 else:
217 writer.write(
218 f"NAMING REPLY RESULT=KEY_NOT_FOUND NAME={name}\n".encode()
219 )
220 await writer.drain()
221
222 elif verb == "STREAM":
223 opcode = parts[1].upper() if len(parts) > 1 else ""
224 if opcode == "CONNECT":
225 writer.write(b"STREAM STATUS RESULT=OK\n")
226 await writer.drain()
227 # Echo mode: read and echo data back
228 if self._connect_echo:
229 try:
230 while True:
231 data = await asyncio.wait_for(
232 reader.read(4096), timeout=2.0
233 )
234 if not data:
235 break
236 writer.write(data)
237 await writer.drain()
238 except (asyncio.TimeoutError, ConnectionError):
239 pass
240 break # socket consumed after STREAM CONNECT
241 elif opcode == "ACCEPT":
242 writer.write(b"STREAM STATUS RESULT=OK\n")
243 await writer.drain()
244 # Send fake remote destination
245 writer.write(f"{self.MOCK_DEST_B64}\n".encode())
246 await writer.drain()
247 # Echo mode
248 try:
249 while True:
250 data = await asyncio.wait_for(
251 reader.read(4096), timeout=2.0
252 )
253 if not data:
254 break
255 writer.write(data)
256 await writer.drain()
257 except (asyncio.TimeoutError, ConnectionError):
258 pass
259 break
260
261 else:
262 # Unknown command
263 writer.write(b"ERROR unknown command\n")
264 await writer.drain()
265
266 except (asyncio.TimeoutError, ConnectionError, OSError):
267 pass
268 finally:
269 try:
270 writer.close()
271 await writer.wait_closed()
272 except Exception:
273 pass
274
275 @staticmethod
276 def _parse_params(tokens: list[str]) -> dict[str, str]:
277 """Parse KEY=VALUE tokens into a dict."""
278 params: dict[str, str] = {}
279 for tok in tokens:
280 if "=" in tok:
281 key, _, value = tok.partition("=")
282 # Strip quotes
283 if value.startswith('"') and value.endswith('"'):
284 value = value[1:-1]
285 params[key.upper()] = value
286 return params
287
288
289@pytest.fixture
290async def mock_sam_server():
291 """Provide a running MockSAMServer and clean it up after."""
292 server = MockSAMServer()
293 port = await server.start()
294 yield server
295 await server.stop()