A Python port of the Invisible Internet Project (I2P)
at main 295 lines 10 kB view raw
"""Fixtures for I2PTunnel Tier 1 tests.

Provides MockSAMServer, sample config content, and TunnelDefinition helpers.
"""

import asyncio
import textwrap
from pathlib import Path

import pytest

from i2p_apps.i2ptunnel.config import TunnelDefinition, TunnelType


# ---------------------------------------------------------------------------
# Sample config matching Java I2P defaults
# ---------------------------------------------------------------------------

SAMPLE_CONFIG = textwrap.dedent("""\
    # I2P tunnel configuration
    # Default tunnels

    tunnel.0.name=I2P HTTP Proxy
    tunnel.0.description=HTTP proxy for browsing I2P and the web
    tunnel.0.type=httpclient
    tunnel.0.interface=127.0.0.1
    tunnel.0.listenPort=4444
    tunnel.0.sharedClient=true
    tunnel.0.proxyList=false.i2p
    tunnel.0.startOnLoad=true
    tunnel.0.option.inbound.length=3
    tunnel.0.option.outbound.length=3

    tunnel.1.name=I2P HTTPS/CONNECT Proxy
    tunnel.1.type=connectclient
    tunnel.1.interface=127.0.0.1
    tunnel.1.listenPort=4445
    tunnel.1.sharedClient=true
    tunnel.1.proxyList=false.i2p
    tunnel.1.startOnLoad=true

    tunnel.2.name=IRC Tunnel
    tunnel.2.type=ircclient
    tunnel.2.interface=127.0.0.1
    tunnel.2.listenPort=6668
    tunnel.2.targetDestination=irc.postman.i2p
    tunnel.2.startOnLoad=false

    tunnel.3.name=I2P Webserver
    tunnel.3.description=Local webserver accessible over I2P
    tunnel.3.type=httpserver
    tunnel.3.interface=127.0.0.1
    tunnel.3.targetHost=127.0.0.1
    tunnel.3.targetPort=7658
    tunnel.3.spoofedHost=mysite.i2p
    tunnel.3.privKeyFile=eepsite/eepPriv.dat
    tunnel.3.startOnLoad=true

    tunnel.4.name=SMTP Tunnel
    tunnel.4.type=client
    tunnel.4.interface=127.0.0.1
    tunnel.4.listenPort=7659
    tunnel.4.targetDestination=smtp.postman.i2p
    tunnel.4.startOnLoad=true

    tunnel.5.name=POP3 Tunnel
    tunnel.5.type=client
    tunnel.5.interface=127.0.0.1
    tunnel.5.listenPort=7660
    tunnel.5.targetDestination=pop3.postman.i2p
    tunnel.5.startOnLoad=true

    tunnel.6.name=SOCKS Proxy
    tunnel.6.type=sockstunnel
    tunnel.6.interface=127.0.0.1
    tunnel.6.listenPort=4446
    tunnel.6.startOnLoad=false
""")


@pytest.fixture
def sample_config_path(tmp_path: Path) -> Path:
    """Write SAMPLE_CONFIG to ``i2ptunnel.config`` under tmp_path.

    Returns:
        Path of the file that was written.
    """
    config_file = tmp_path / "i2ptunnel.config"
    config_file.write_text(SAMPLE_CONFIG, encoding="utf-8")
    return config_file


@pytest.fixture
def sample_config_dir(tmp_path: Path) -> Path:
    """Create an ``i2ptunnel.config.d`` directory with one file per tunnel.

    Two files are written, ``http-proxy.config`` and ``server.config``; each
    describes a single tunnel under the ``tunnel.0.`` prefix.

    Returns:
        Path of the created directory.
    """
    config_dir = tmp_path / "i2ptunnel.config.d"
    config_dir.mkdir()
    (config_dir / "http-proxy.config").write_text(
        textwrap.dedent("""\
            tunnel.0.name=HTTP Proxy
            tunnel.0.type=httpclient
            tunnel.0.interface=127.0.0.1
            tunnel.0.listenPort=4444
            tunnel.0.startOnLoad=true
        """),
        encoding="utf-8",
    )
    (config_dir / "server.config").write_text(
        textwrap.dedent("""\
            tunnel.0.name=My Server
            tunnel.0.type=server
            tunnel.0.targetHost=127.0.0.1
            tunnel.0.targetPort=8080
            tunnel.0.privKeyFile=server.dat
            tunnel.0.startOnLoad=true
        """),
        encoding="utf-8",
    )
    return config_dir


def make_tunnel_def(**overrides) -> TunnelDefinition:
    """Build a TunnelDefinition from test defaults plus keyword overrides.

    Args:
        **overrides: Field values that replace (or extend) the defaults below
            before they are passed to ``TunnelDefinition``.

    Returns:
        The constructed TunnelDefinition.
    """
    # Built fresh on every call so the list/dict defaults are never shared.
    fields = {
        "name": "test-tunnel",
        "description": "",
        "type": TunnelType.CLIENT,
        "interface": "127.0.0.1",
        "listen_port": 12345,
        "target_host": "",
        "target_port": 0,
        "target_destination": "",
        "proxy_list": [],
        "priv_key_file": "",
        "start_on_load": False,
        "shared_client": False,
        "spoofed_host": "",
        "options": {},
    }
    fields.update(overrides)
    return TunnelDefinition(**fields)


# ---------------------------------------------------------------------------
# MockSAMServer — lightweight asyncio server speaking SAM protocol
# ---------------------------------------------------------------------------

class MockSAMServer:
    """Minimal SAM server for testing session creation and naming lookup.

    Speaks just enough SAM protocol for I2PTunnel testing:
    - HELLO VERSION  -> HELLO REPLY RESULT=OK VERSION=3.3
    - SESSION CREATE -> SESSION STATUS RESULT=OK DESTINATION=<transient>
    - NAMING LOOKUP  -> NAMING REPLY RESULT=OK NAME=<name> VALUE=<dest>
    - STREAM CONNECT -> STREAM STATUS RESULT=OK (then echoes data)
    - STREAM ACCEPT  -> STREAM STATUS RESULT=OK, then sends remote dest
                        + echoes data

    Only the first word of a command selects the handler (plus the second
    word for STREAM); e.g. every ``SESSION ...`` line is answered as if it
    were SESSION CREATE.
    """

    MOCK_DEST_B64 = "A" * 516  # fake base64 destination (387 bytes -> 516 chars)

    # Seconds to wait for the next command line before dropping the client.
    _COMMAND_TIMEOUT = 5.0
    # Seconds of silence after which an echo session is ended.
    _ECHO_TIMEOUT = 2.0

    def __init__(self) -> None:
        self.host = "127.0.0.1"
        self.port = 0  # auto-assign
        self._server: asyncio.Server | None = None
        self._name_map: dict[str, str] = {}
        self._sessions: dict[str, str] = {}  # id -> destination
        self._connect_echo = True  # if True, echo data back on STREAM CONNECT
        # Writers of currently connected clients, so stop() can close them.
        self._clients: set[asyncio.StreamWriter] = set()

    def add_name(self, name: str, dest: str) -> None:
        """Make NAMING LOOKUP for *name* answer with *dest*."""
        self._name_map[name] = dest

    async def start(self) -> int:
        """Start server, return the assigned port."""
        self._server = await asyncio.start_server(
            self._handle_client, self.host, self.port
        )
        # With port 0 the OS picks a free port; read the real one back.
        self.port = self._server.sockets[0].getsockname()[1]
        return self.port

    async def stop(self) -> None:
        """Stop listening and drop any client connections still open.

        Does nothing if the server is not running.
        """
        if self._server is None:
            return
        self._server.close()
        # On Python 3.12+ wait_closed() also waits for active connections;
        # close them now instead of waiting for each handler's read timeout.
        for writer in list(self._clients):
            writer.close()
        await self._server.wait_closed()
        self._server = None

    @staticmethod
    async def _send(writer: asyncio.StreamWriter, message: str) -> None:
        """Write *message* to the client and flush it."""
        writer.write(message.encode())
        await writer.drain()

    async def _echo(
        self, reader: asyncio.StreamReader, writer: asyncio.StreamWriter
    ) -> None:
        """Echo incoming data back until EOF, idle timeout, or disconnect."""
        try:
            while True:
                data = await asyncio.wait_for(
                    reader.read(4096), timeout=self._ECHO_TIMEOUT
                )
                if not data:
                    break
                writer.write(data)
                await writer.drain()
        except (asyncio.TimeoutError, ConnectionError):
            pass

    async def _handle_client(
        self, reader: asyncio.StreamReader, writer: asyncio.StreamWriter
    ) -> None:
        """Serve one client connection: read command lines and reply to each.

        The loop ends on EOF, on a read timeout, on a connection error, or
        after a STREAM CONNECT / STREAM ACCEPT exchange (the socket then
        carries stream data and no further commands). The connection is
        always closed on exit.
        """
        self._clients.add(writer)
        try:
            while True:
                line = await asyncio.wait_for(
                    reader.readline(), timeout=self._COMMAND_TIMEOUT
                )
                if not line:
                    break
                # errors="replace": undecodable bytes must fall through to
                # the "unknown command" reply, not raise out of the handler.
                text = line.decode("utf-8", errors="replace").strip()
                if not text:
                    continue

                parts = text.split()
                verb = parts[0].upper()

                if verb == "HELLO":
                    await self._send(writer, "HELLO REPLY RESULT=OK VERSION=3.3\n")

                elif verb == "SESSION":
                    # SESSION CREATE STYLE=STREAM ID=<id> DESTINATION=TRANSIENT
                    params = self._parse_params(parts[2:])
                    sid = params.get("ID", "default")
                    dest = params.get("DESTINATION", "TRANSIENT")
                    if dest == "TRANSIENT":
                        dest = self.MOCK_DEST_B64
                    self._sessions[sid] = dest
                    await self._send(
                        writer, f"SESSION STATUS RESULT=OK DESTINATION={dest}\n"
                    )

                elif verb == "NAMING":
                    # NAMING LOOKUP NAME=<name>
                    params = self._parse_params(parts[2:])
                    name = params.get("NAME", "")
                    if name in self._name_map:
                        reply = (
                            f"NAMING REPLY RESULT=OK NAME={name} "
                            f"VALUE={self._name_map[name]}\n"
                        )
                    else:
                        reply = f"NAMING REPLY RESULT=KEY_NOT_FOUND NAME={name}\n"
                    await self._send(writer, reply)

                elif verb == "STREAM":
                    opcode = parts[1].upper() if len(parts) > 1 else ""
                    if opcode == "CONNECT":
                        await self._send(writer, "STREAM STATUS RESULT=OK\n")
                        if self._connect_echo:
                            await self._echo(reader, writer)
                        break  # socket consumed after STREAM CONNECT
                    if opcode == "ACCEPT":
                        await self._send(writer, "STREAM STATUS RESULT=OK\n")
                        # Send fake remote destination
                        await self._send(writer, f"{self.MOCK_DEST_B64}\n")
                        # ACCEPT always echoes; _connect_echo is not consulted.
                        await self._echo(reader, writer)
                        break
                    # Any other opcode: answer explicitly so the client is
                    # not left waiting for a reply that never comes.
                    await self._send(
                        writer,
                        "STREAM STATUS RESULT=I2P_ERROR "
                        'MESSAGE="unsupported STREAM opcode"\n',
                    )

                else:
                    # Unknown command
                    await self._send(writer, "ERROR unknown command\n")

        except (asyncio.TimeoutError, ConnectionError, OSError):
            pass
        finally:
            self._clients.discard(writer)
            # Best-effort close: the peer may already be gone, and a failure
            # here must not mask whatever ended the loop above.
            try:
                writer.close()
                await writer.wait_closed()
            except Exception:
                pass

    @staticmethod
    def _parse_params(tokens: list[str]) -> dict[str, str]:
        """Parse KEY=VALUE tokens into a dict.

        Tokens without ``=`` are ignored. Keys are upper-cased; a value
        wrapped in double quotes has the quotes removed. When a key repeats,
        the last value wins.
        """
        params: dict[str, str] = {}
        for tok in tokens:
            if "=" not in tok:
                continue
            key, _, value = tok.partition("=")
            # Strip quotes; len check keeps a lone '"' from matching as both
            # the opening and the closing quote.
            if len(value) >= 2 and value.startswith('"') and value.endswith('"'):
                value = value[1:-1]
            params[key.upper()] = value
        return params


@pytest.fixture
async def mock_sam_server():
    """Provide a running MockSAMServer and clean it up after.

    NOTE(review): this is an async generator under plain ``@pytest.fixture``;
    it presumably relies on an async pytest plugin configured elsewhere
    (e.g. pytest-asyncio in auto mode) — confirm.
    """
    server = MockSAMServer()
    await server.start()
    yield server
    await server.stop()