"""Fixtures for I2PTunnel Tier 1 tests. Provides MockSAMServer, sample config content, and TunnelDefinition helpers. """ import asyncio import textwrap from pathlib import Path import pytest from i2p_apps.i2ptunnel.config import TunnelDefinition, TunnelType # --------------------------------------------------------------------------- # Sample config matching Java I2P defaults # --------------------------------------------------------------------------- SAMPLE_CONFIG = textwrap.dedent("""\ # I2P tunnel configuration # Default tunnels tunnel.0.name=I2P HTTP Proxy tunnel.0.description=HTTP proxy for browsing I2P and the web tunnel.0.type=httpclient tunnel.0.interface=127.0.0.1 tunnel.0.listenPort=4444 tunnel.0.sharedClient=true tunnel.0.proxyList=false.i2p tunnel.0.startOnLoad=true tunnel.0.option.inbound.length=3 tunnel.0.option.outbound.length=3 tunnel.1.name=I2P HTTPS/CONNECT Proxy tunnel.1.type=connectclient tunnel.1.interface=127.0.0.1 tunnel.1.listenPort=4445 tunnel.1.sharedClient=true tunnel.1.proxyList=false.i2p tunnel.1.startOnLoad=true tunnel.2.name=IRC Tunnel tunnel.2.type=ircclient tunnel.2.interface=127.0.0.1 tunnel.2.listenPort=6668 tunnel.2.targetDestination=irc.postman.i2p tunnel.2.startOnLoad=false tunnel.3.name=I2P Webserver tunnel.3.description=Local webserver accessible over I2P tunnel.3.type=httpserver tunnel.3.interface=127.0.0.1 tunnel.3.targetHost=127.0.0.1 tunnel.3.targetPort=7658 tunnel.3.spoofedHost=mysite.i2p tunnel.3.privKeyFile=eepsite/eepPriv.dat tunnel.3.startOnLoad=true tunnel.4.name=SMTP Tunnel tunnel.4.type=client tunnel.4.interface=127.0.0.1 tunnel.4.listenPort=7659 tunnel.4.targetDestination=smtp.postman.i2p tunnel.4.startOnLoad=true tunnel.5.name=POP3 Tunnel tunnel.5.type=client tunnel.5.interface=127.0.0.1 tunnel.5.listenPort=7660 tunnel.5.targetDestination=pop3.postman.i2p tunnel.5.startOnLoad=true tunnel.6.name=SOCKS Proxy tunnel.6.type=sockstunnel tunnel.6.interface=127.0.0.1 tunnel.6.listenPort=4446 tunnel.6.startOnLoad=false """) @pytest.fixture def sample_config_path(tmp_path: Path) -> Path: """Write sample config to a temp file and return its path.""" p = tmp_path / "i2ptunnel.config" p.write_text(SAMPLE_CONFIG) return p @pytest.fixture def sample_config_dir(tmp_path: Path) -> Path: """Create a config.d/ directory with individual tunnel files.""" d = tmp_path / "i2ptunnel.config.d" d.mkdir() (d / "http-proxy.config").write_text(textwrap.dedent("""\ tunnel.0.name=HTTP Proxy tunnel.0.type=httpclient tunnel.0.interface=127.0.0.1 tunnel.0.listenPort=4444 tunnel.0.startOnLoad=true """)) (d / "server.config").write_text(textwrap.dedent("""\ tunnel.0.name=My Server tunnel.0.type=server tunnel.0.targetHost=127.0.0.1 tunnel.0.targetPort=8080 tunnel.0.privKeyFile=server.dat tunnel.0.startOnLoad=true """)) return d def make_tunnel_def(**overrides) -> TunnelDefinition: """Create a TunnelDefinition with sensible defaults.""" defaults = dict( name="test-tunnel", description="", type=TunnelType.CLIENT, interface="127.0.0.1", listen_port=12345, target_host="", target_port=0, target_destination="", proxy_list=[], priv_key_file="", start_on_load=False, shared_client=False, spoofed_host="", options={}, ) defaults.update(overrides) return TunnelDefinition(**defaults) # --------------------------------------------------------------------------- # MockSAMServer — lightweight asyncio server speaking SAM protocol # --------------------------------------------------------------------------- class MockSAMServer: """Minimal SAM server for testing session creation and naming lookup. Speaks just enough SAM protocol for I2PTunnel testing: - HELLO VERSION -> HELLO REPLY RESULT=OK VERSION=3.3 - SESSION CREATE -> SESSION STATUS RESULT=OK DESTINATION= - NAMING LOOKUP -> NAMING REPLY RESULT=OK NAME= VALUE= - STREAM CONNECT -> STREAM STATUS RESULT=OK (then echoes data) - STREAM ACCEPT -> waits, then sends remote dest + echoes data """ MOCK_DEST_B64 = "A" * 516 # fake base64 destination (387 bytes -> 516 chars) def __init__(self) -> None: self.host = "127.0.0.1" self.port = 0 # auto-assign self._server: asyncio.Server | None = None self._name_map: dict[str, str] = {} self._sessions: dict[str, str] = {} # id -> destination self._connect_echo = True # if True, echo data back on STREAM CONNECT def add_name(self, name: str, dest: str) -> None: self._name_map[name] = dest async def start(self) -> int: """Start server, return the assigned port.""" self._server = await asyncio.start_server( self._handle_client, self.host, self.port ) addr = self._server.sockets[0].getsockname() self.port = addr[1] return self.port async def stop(self) -> None: if self._server: self._server.close() await self._server.wait_closed() async def _handle_client( self, reader: asyncio.StreamReader, writer: asyncio.StreamWriter ) -> None: try: while True: line = await asyncio.wait_for(reader.readline(), timeout=5.0) if not line: break text = line.decode("utf-8").strip() if not text: continue parts = text.split() verb = parts[0].upper() if parts else "" if verb == "HELLO": writer.write(b"HELLO REPLY RESULT=OK VERSION=3.3\n") await writer.drain() elif verb == "SESSION": # SESSION CREATE STYLE=STREAM ID= DESTINATION=TRANSIENT params = self._parse_params(parts[2:]) sid = params.get("ID", "default") dest = params.get("DESTINATION", "TRANSIENT") if dest == "TRANSIENT": dest = self.MOCK_DEST_B64 self._sessions[sid] = dest writer.write( f"SESSION STATUS RESULT=OK DESTINATION={dest}\n".encode() ) await writer.drain() elif verb == "NAMING": # NAMING LOOKUP NAME= params = self._parse_params(parts[2:]) name = params.get("NAME", "") if name in self._name_map: writer.write( f"NAMING REPLY RESULT=OK NAME={name} VALUE={self._name_map[name]}\n".encode() ) else: writer.write( f"NAMING REPLY RESULT=KEY_NOT_FOUND NAME={name}\n".encode() ) await writer.drain() elif verb == "STREAM": opcode = parts[1].upper() if len(parts) > 1 else "" if opcode == "CONNECT": writer.write(b"STREAM STATUS RESULT=OK\n") await writer.drain() # Echo mode: read and echo data back if self._connect_echo: try: while True: data = await asyncio.wait_for( reader.read(4096), timeout=2.0 ) if not data: break writer.write(data) await writer.drain() except (asyncio.TimeoutError, ConnectionError): pass break # socket consumed after STREAM CONNECT elif opcode == "ACCEPT": writer.write(b"STREAM STATUS RESULT=OK\n") await writer.drain() # Send fake remote destination writer.write(f"{self.MOCK_DEST_B64}\n".encode()) await writer.drain() # Echo mode try: while True: data = await asyncio.wait_for( reader.read(4096), timeout=2.0 ) if not data: break writer.write(data) await writer.drain() except (asyncio.TimeoutError, ConnectionError): pass break else: # Unknown command writer.write(b"ERROR unknown command\n") await writer.drain() except (asyncio.TimeoutError, ConnectionError, OSError): pass finally: try: writer.close() await writer.wait_closed() except Exception: pass @staticmethod def _parse_params(tokens: list[str]) -> dict[str, str]: """Parse KEY=VALUE tokens into a dict.""" params: dict[str, str] = {} for tok in tokens: if "=" in tok: key, _, value = tok.partition("=") # Strip quotes if value.startswith('"') and value.endswith('"'): value = value[1:-1] params[key.upper()] = value return params @pytest.fixture async def mock_sam_server(): """Provide a running MockSAMServer and clean it up after.""" server = MockSAMServer() port = await server.start() yield server await server.stop()