"""I2PTunnel controller group — manages multiple tunnels from config. Ported from net.i2p.i2ptunnel.TunnelControllerGroup. """ from __future__ import annotations import logging from pathlib import Path from i2p_apps.i2ptunnel.config import TunnelConfigParser, TunnelDefinition from i2p_apps.i2ptunnel.controller import TunnelController, TunnelState from i2p_apps.i2ptunnel.sam_session import SessionFactory logger = logging.getLogger(__name__) class TunnelControllerGroup: """Manages a group of TunnelControllers loaded from config.""" def __init__( self, config_path: Path, sam_host: str = "127.0.0.1", sam_port: int = 7656, ) -> None: self._config_path = config_path self._session_factory = SessionFactory(sam_host, sam_port) self._controllers: list[TunnelController] = [] @property def controllers(self) -> list[TunnelController]: return list(self._controllers) def get_controller(self, name: str) -> TunnelController | None: for c in self._controllers: if c.config.name == name: return c return None def list_controllers(self) -> list[TunnelController]: return list(self._controllers) async def load_and_start(self) -> None: """Load config, create controllers, start startOnLoad tunnels.""" tunnels = TunnelConfigParser.load(self._config_path) # Also load from config.d/ if it exists config_d = self._config_path.parent / (self._config_path.name + ".d") if config_d.exists(): tunnels.extend(TunnelConfigParser.load_dir(config_d)) for td in tunnels: controller = TunnelController(td, self._session_factory) self._controllers.append(controller) # Start tunnels marked startOnLoad for controller in self._controllers: if controller.config.start_on_load: try: await controller.start() except Exception: logger.exception("Failed to start tunnel %r", controller.config.name) async def start_all(self) -> None: """Start all tunnels.""" for controller in self._controllers: if controller.state == TunnelState.STOPPED: try: await controller.start() except Exception: logger.exception("Failed to start tunnel %r", controller.config.name) async def stop_all(self) -> None: """Stop all running tunnels.""" for controller in self._controllers: if controller.state == TunnelState.RUNNING: try: await controller.stop() except Exception: logger.exception("Failed to stop tunnel %r", controller.config.name) # Release shared sessions await self._session_factory.close_all()