"""I2PTunnel config parser — Java Properties format for tunnel definitions. Parses the `i2ptunnel.config` format used by Java I2P: tunnel.N.key=value Ported from net.i2p.i2ptunnel.TunnelController + TunnelControllerGroup config loading. """ from __future__ import annotations import enum import logging from dataclasses import dataclass, field from pathlib import Path logger = logging.getLogger(__name__) class TunnelType(enum.Enum): """All 12 tunnel types supported by Java I2P.""" HTTPCLIENT = "httpclient" CONNECTCLIENT = "connectclient" CLIENT = "client" SERVER = "server" HTTPSERVER = "httpserver" IRCCLIENT = "ircclient" IRCSERVER = "ircserver" SOCKSTUNNEL = "sockstunnel" SOCKSIRCTUNNEL = "socksirctunnel" HTTPBIDIRSERVER = "httpbidirserver" STREAMRCLIENT = "streamrclient" STREAMRSERVER = "streamrserver" @property def is_client(self) -> bool: return self in _CLIENT_TYPES @property def is_server(self) -> bool: return self in _SERVER_TYPES _CLIENT_TYPES = { TunnelType.HTTPCLIENT, TunnelType.CONNECTCLIENT, TunnelType.CLIENT, TunnelType.IRCCLIENT, TunnelType.SOCKSTUNNEL, TunnelType.SOCKSIRCTUNNEL, TunnelType.STREAMRCLIENT, } _SERVER_TYPES = { TunnelType.SERVER, TunnelType.HTTPSERVER, TunnelType.IRCSERVER, TunnelType.HTTPBIDIRSERVER, TunnelType.STREAMRSERVER, } @dataclass class TunnelDefinition: """Parsed tunnel configuration entry.""" name: str type: TunnelType listen_port: int = 0 description: str = "" interface: str = "127.0.0.1" target_host: str = "" target_port: int = 0 target_destination: str = "" proxy_list: list[str] = field(default_factory=list) priv_key_file: str = "" start_on_load: bool = False shared_client: bool = False spoofed_host: str = "" options: dict[str, str] = field(default_factory=dict) @property def is_client(self) -> bool: return self.type.is_client @property def is_server(self) -> bool: return self.type.is_server class TunnelConfigParser: """Parse and write i2ptunnel.config Java Properties format.""" @staticmethod def load(path: Path) -> list[TunnelDefinition]: """Load tunnel definitions from an i2ptunnel.config file. Returns empty list if file doesn't exist. """ if not path.exists(): return [] # Parse all tunnel.N.key=value lines, grouped by N groups: dict[int, dict[str, str]] = {} for line in path.read_text().splitlines(): line = line.strip() if not line or line.startswith("#"): continue eq_idx = line.find("=") if eq_idx < 0: continue key = line[:eq_idx].strip() value = line[eq_idx + 1:].strip() if not key.startswith("tunnel."): continue parts = key.split(".", 3) # tunnel, N, property[.sub] if len(parts) < 3: continue try: idx = int(parts[1]) except ValueError: continue prop = ".".join(parts[2:]) groups.setdefault(idx, {})[prop] = value # Convert groups to TunnelDefinition objects result = [] for idx in sorted(groups): props = groups[idx] try: tunnel_type = TunnelType(props.get("type", "client")) except ValueError: logger.warning("Unknown tunnel type %r for tunnel.%d, skipping", props.get("type"), idx) continue # Extract option.* properties options = {} for k, v in props.items(): if k.startswith("option."): options[k[7:]] = v # strip "option." prefix # Parse proxy list proxy_str = props.get("proxyList", "") proxy_list = [p.strip() for p in proxy_str.split(",") if p.strip()] if proxy_str else [] td = TunnelDefinition( name=props.get("name", f"tunnel-{idx}"), description=props.get("description", ""), type=tunnel_type, interface=props.get("interface", "127.0.0.1"), listen_port=int(props.get("listenPort", "0")), target_host=props.get("targetHost", ""), target_port=int(props.get("targetPort", "0")), target_destination=props.get("targetDestination", ""), proxy_list=proxy_list, priv_key_file=props.get("privKeyFile", ""), start_on_load=props.get("startOnLoad", "false").lower() == "true", shared_client=props.get("sharedClient", "false").lower() == "true", spoofed_host=props.get("spoofedHost", ""), options=options, ) result.append(td) return result @staticmethod def load_dir(path: Path) -> list[TunnelDefinition]: """Load tunnel definitions from all .config files in a directory.""" if not path.exists() or not path.is_dir(): return [] result = [] for config_file in sorted(path.glob("*.config")): result.extend(TunnelConfigParser.load(config_file)) return result @staticmethod def save(path: Path, tunnels: list[TunnelDefinition]) -> None: """Write tunnel definitions to i2ptunnel.config format.""" lines = ["# I2P tunnel configuration"] for idx, td in enumerate(tunnels): lines.append("") prefix = f"tunnel.{idx}" lines.append(f"{prefix}.name={td.name}") if td.description: lines.append(f"{prefix}.description={td.description}") lines.append(f"{prefix}.type={td.type.value}") lines.append(f"{prefix}.interface={td.interface}") if td.listen_port: lines.append(f"{prefix}.listenPort={td.listen_port}") if td.target_host: lines.append(f"{prefix}.targetHost={td.target_host}") if td.target_port: lines.append(f"{prefix}.targetPort={td.target_port}") if td.target_destination: lines.append(f"{prefix}.targetDestination={td.target_destination}") if td.proxy_list: lines.append(f"{prefix}.proxyList={','.join(td.proxy_list)}") if td.priv_key_file: lines.append(f"{prefix}.privKeyFile={td.priv_key_file}") if td.shared_client: lines.append(f"{prefix}.sharedClient=true") if td.spoofed_host: lines.append(f"{prefix}.spoofedHost={td.spoofed_host}") lines.append(f"{prefix}.startOnLoad={'true' if td.start_on_load else 'false'}") for opt_key, opt_val in sorted(td.options.items()): lines.append(f"{prefix}.option.{opt_key}={opt_val}") lines.append("") path.write_text("\n".join(lines))