"""I2PTunnel controller — manages lifecycle of a single tunnel. Ported from net.i2p.i2ptunnel.TunnelController. """ from __future__ import annotations import enum import logging from i2p_apps.i2ptunnel.config import TunnelDefinition logger = logging.getLogger(__name__) class TunnelState(enum.Enum): STOPPED = "stopped" STARTING = "starting" RUNNING = "running" STOPPING = "stopping" DESTROYED = "destroyed" class TunnelController: """Manages the lifecycle of a single I2P tunnel. State machine: STOPPED -> STARTING -> RUNNING -> STOPPING -> STOPPED DESTROYED is terminal. """ def __init__(self, config: TunnelDefinition, session_factory) -> None: self._config = config self._session_factory = session_factory self._state = TunnelState.STOPPED self._session = None self._tunnel_task = None @property def state(self) -> TunnelState: return self._state @property def config(self) -> TunnelDefinition: return self._config @property def session(self): return self._session async def start(self) -> None: """Start the tunnel: create session and tunnel task.""" if self._state == TunnelState.DESTROYED: raise RuntimeError("Cannot start a destroyed tunnel") if self._state == TunnelState.RUNNING: return # already running self._state = TunnelState.STARTING logger.info("Starting tunnel %r", self._config.name) try: self._session = await self._session_factory.create( self._config, priv_key=self._config.priv_key_file or None, ) self._state = TunnelState.RUNNING logger.info("Tunnel %r is RUNNING", self._config.name) except Exception: self._state = TunnelState.STOPPED logger.exception("Failed to start tunnel %r", self._config.name) raise async def stop(self) -> None: """Stop the tunnel: release session and clean up.""" if self._state in (TunnelState.STOPPED, TunnelState.DESTROYED): return # nothing to do self._state = TunnelState.STOPPING logger.info("Stopping tunnel %r", self._config.name) if self._session is not None: try: await self._session_factory.release(self._session) except Exception: logger.exception("Error releasing session for tunnel %r", self._config.name) self._session = None self._tunnel_task = None self._state = TunnelState.STOPPED logger.info("Tunnel %r is STOPPED", self._config.name) async def restart(self) -> None: """Stop then start the tunnel.""" await self.stop() await self.start() def destroy(self) -> None: """Mark tunnel as destroyed — cannot be restarted.""" self._state = TunnelState.DESTROYED logger.info("Tunnel %r DESTROYED", self._config.name)