# A Python port of the Invisible Internet Project (I2P).
"""I2PTunnel config parser — Java Properties format for tunnel definitions.

Parses the `i2ptunnel.config` format used by Java I2P:
    tunnel.N.key=value

Ported from net.i2p.i2ptunnel.TunnelController + TunnelControllerGroup config loading.
"""

from __future__ import annotations

import enum
import logging
from dataclasses import dataclass, field
from pathlib import Path

logger = logging.getLogger(__name__)

# Prefixes used by the on-disk key layout: tunnel.N.<property> and
# tunnel.N.option.<name>.
_TUNNEL_PREFIX = "tunnel."
_OPTION_PREFIX = "option."

# Config files are read and written as UTF-8 rather than the platform's
# locale encoding, so non-ASCII names/descriptions survive a round trip.
_ENCODING = "utf-8"


class TunnelType(enum.Enum):
    """All 12 tunnel types supported by Java I2P."""

    HTTPCLIENT = "httpclient"
    CONNECTCLIENT = "connectclient"
    CLIENT = "client"
    SERVER = "server"
    HTTPSERVER = "httpserver"
    IRCCLIENT = "ircclient"
    IRCSERVER = "ircserver"
    SOCKSTUNNEL = "sockstunnel"
    SOCKSIRCTUNNEL = "socksirctunnel"
    HTTPBIDIRSERVER = "httpbidirserver"
    STREAMRCLIENT = "streamrclient"
    STREAMRSERVER = "streamrserver"

    @property
    def is_client(self) -> bool:
        """True if this type is a member of ``_CLIENT_TYPES``."""
        return self in _CLIENT_TYPES

    @property
    def is_server(self) -> bool:
        """True if this type is a member of ``_SERVER_TYPES``."""
        return self in _SERVER_TYPES


_CLIENT_TYPES = {
    TunnelType.HTTPCLIENT,
    TunnelType.CONNECTCLIENT,
    TunnelType.CLIENT,
    TunnelType.IRCCLIENT,
    TunnelType.SOCKSTUNNEL,
    TunnelType.SOCKSIRCTUNNEL,
    TunnelType.STREAMRCLIENT,
}

_SERVER_TYPES = {
    TunnelType.SERVER,
    TunnelType.HTTPSERVER,
    TunnelType.IRCSERVER,
    TunnelType.HTTPBIDIRSERVER,
    TunnelType.STREAMRSERVER,
}


@dataclass
class TunnelDefinition:
    """Parsed tunnel configuration entry.

    Each field mirrors one ``tunnel.N.<key>`` property; ``options`` holds the
    ``tunnel.N.option.<name>`` entries with the ``option.`` prefix removed.
    """

    name: str
    type: TunnelType
    listen_port: int = 0
    description: str = ""
    interface: str = "127.0.0.1"
    target_host: str = ""
    target_port: int = 0
    target_destination: str = ""
    proxy_list: list[str] = field(default_factory=list)
    priv_key_file: str = ""
    start_on_load: bool = False
    shared_client: bool = False
    spoofed_host: str = ""
    options: dict[str, str] = field(default_factory=dict)

    @property
    def is_client(self) -> bool:
        """True if the tunnel's type is a client type."""
        return self.type.is_client

    @property
    def is_server(self) -> bool:
        """True if the tunnel's type is a server type."""
        return self.type.is_server


def _group_properties(text: str) -> dict[int, dict[str, str]]:
    """Group ``tunnel.N.key=value`` lines of *text* by tunnel index N.

    Blank lines, ``#`` comments, lines without ``=``, keys that do not start
    with ``tunnel.`` and keys whose index is not an integer are ignored.
    """
    groups: dict[int, dict[str, str]] = {}
    for raw_line in text.splitlines():
        line = raw_line.strip()
        if not line or line.startswith("#"):
            continue

        # Split on the first "=" only; values may themselves contain "=".
        key, sep, value = line.partition("=")
        if not sep:
            continue
        key = key.strip()
        if not key.startswith(_TUNNEL_PREFIX):
            continue

        # tunnel, N, property[.sub...] — keep everything after N intact.
        parts = key.split(".", 2)
        if len(parts) < 3:
            continue
        try:
            idx = int(parts[1])
        except ValueError:
            continue

        groups.setdefault(idx, {})[parts[2]] = value.strip()
    return groups


def _parse_port(props: dict[str, str], key: str, idx: int) -> int:
    """Return ``props[key]`` as an int, or 0 if missing, empty or malformed.

    A malformed port is logged and treated as unset so that one bad entry
    does not abort loading of every tunnel in the file.
    """
    raw = props.get(key, "")
    if not raw:
        return 0
    try:
        return int(raw)
    except ValueError:
        logger.warning("Invalid %s %r for tunnel.%d, using 0", key, raw, idx)
        return 0


def _parse_bool(props: dict[str, str], key: str) -> bool:
    """Return True only if ``props[key]`` equals "true" (case-insensitive)."""
    return props.get(key, "false").lower() == "true"


def _build_definition(idx: int, props: dict[str, str]) -> TunnelDefinition | None:
    """Build a TunnelDefinition from one tunnel's properties.

    Returns None (after logging a warning) when the ``type`` value is not a
    known TunnelType. A missing ``type`` defaults to ``client``.
    """
    try:
        tunnel_type = TunnelType(props.get("type", "client"))
    except ValueError:
        logger.warning("Unknown tunnel type %r for tunnel.%d, skipping",
                       props.get("type"), idx)
        return None

    options = {
        key[len(_OPTION_PREFIX):]: value
        for key, value in props.items()
        if key.startswith(_OPTION_PREFIX)
    }
    # Comma-separated list; empty items are dropped.
    proxy_list = [
        item.strip()
        for item in props.get("proxyList", "").split(",")
        if item.strip()
    ]

    return TunnelDefinition(
        name=props.get("name", f"tunnel-{idx}"),
        description=props.get("description", ""),
        type=tunnel_type,
        interface=props.get("interface", "127.0.0.1"),
        listen_port=_parse_port(props, "listenPort", idx),
        target_host=props.get("targetHost", ""),
        target_port=_parse_port(props, "targetPort", idx),
        target_destination=props.get("targetDestination", ""),
        proxy_list=proxy_list,
        priv_key_file=props.get("privKeyFile", ""),
        start_on_load=_parse_bool(props, "startOnLoad"),
        shared_client=_parse_bool(props, "sharedClient"),
        spoofed_host=props.get("spoofedHost", ""),
        options=options,
    )


class TunnelConfigParser:
    """Parse and write i2ptunnel.config Java Properties format."""

    @staticmethod
    def load(path: Path) -> list[TunnelDefinition]:
        """Load tunnel definitions from an i2ptunnel.config file.

        Tunnels are returned in ascending order of their index N. Tunnels
        with an unknown type are skipped with a warning; malformed port
        values are logged and read as 0.

        Returns empty list if file doesn't exist.
        """
        try:
            text = path.read_text(encoding=_ENCODING)
        except FileNotFoundError:
            return []

        groups = _group_properties(text)
        result = []
        for idx in sorted(groups):
            definition = _build_definition(idx, groups[idx])
            if definition is not None:
                result.append(definition)
        return result

    @staticmethod
    def load_dir(path: Path) -> list[TunnelDefinition]:
        """Load tunnel definitions from all .config files in a directory.

        Files are processed in sorted name order. Returns an empty list if
        *path* does not exist or is not a directory.
        """
        if not path.exists() or not path.is_dir():
            return []

        result = []
        for config_file in sorted(path.glob("*.config")):
            result.extend(TunnelConfigParser.load(config_file))
        return result

    @staticmethod
    def save(path: Path, tunnels: list[TunnelDefinition]) -> None:
        """Write tunnel definitions to i2ptunnel.config format.

        Tunnels are numbered from 0 in list order. Empty/zero/false optional
        fields are omitted, except ``startOnLoad`` which is always written.
        Options are written sorted by key.
        """
        lines = ["# I2P tunnel configuration"]

        for idx, td in enumerate(tunnels):
            lines.append("")
            prefix = f"tunnel.{idx}"
            lines.append(f"{prefix}.name={td.name}")
            if td.description:
                lines.append(f"{prefix}.description={td.description}")
            lines.append(f"{prefix}.type={td.type.value}")
            lines.append(f"{prefix}.interface={td.interface}")
            if td.listen_port:
                lines.append(f"{prefix}.listenPort={td.listen_port}")
            if td.target_host:
                lines.append(f"{prefix}.targetHost={td.target_host}")
            if td.target_port:
                lines.append(f"{prefix}.targetPort={td.target_port}")
            if td.target_destination:
                lines.append(f"{prefix}.targetDestination={td.target_destination}")
            if td.proxy_list:
                lines.append(f"{prefix}.proxyList={','.join(td.proxy_list)}")
            if td.priv_key_file:
                lines.append(f"{prefix}.privKeyFile={td.priv_key_file}")
            if td.shared_client:
                lines.append(f"{prefix}.sharedClient=true")
            if td.spoofed_host:
                lines.append(f"{prefix}.spoofedHost={td.spoofed_host}")
            lines.append(f"{prefix}.startOnLoad={'true' if td.start_on_load else 'false'}")
            for opt_key, opt_val in sorted(td.options.items()):
                lines.append(f"{prefix}.option.{opt_key}={opt_val}")

        # Trailing empty element makes the file end with a newline.
        lines.append("")
        path.write_text("\n".join(lines), encoding=_ENCODING)