"""Tunnel pool policies and build orchestration. Ported from net.i2p.router.tunnel.pool.TunnelPool and net.i2p.router.tunnel.pool.TunnelPoolManager. Provides: - TunnelEntry: a single active tunnel with lifetime tracking - TunnelPool: manages a pool of tunnels with policy-based maintenance - TunnelPoolManager: orchestrates inbound/outbound pools with build scheduling """ from __future__ import annotations import random import time from dataclasses import dataclass, field from typing import TYPE_CHECKING if TYPE_CHECKING: from i2p_peer.hop_config import TunnelHopConfig from i2p_peer.selector import PeerSelector @dataclass class TunnelEntry: """One active tunnel.""" tunnel_id: int hops: list[bytes] # peer hashes created_at: float = field(default_factory=time.monotonic) lifetime_seconds: float = 600.0 is_inbound: bool = False @property def is_expired(self) -> bool: return time.monotonic() >= self.created_at + self.lifetime_seconds @property def remaining_seconds(self) -> float: return (self.created_at + self.lifetime_seconds) - time.monotonic() @property def lifetime_fraction(self) -> float: """Fraction of lifetime elapsed, 0.0 (fresh) to 1.0+ (expired).""" elapsed = time.monotonic() - self.created_at if self.lifetime_seconds <= 0: return 1.0 return elapsed / self.lifetime_seconds class TunnelPool: """Manages a pool of tunnels with policy-based maintenance.""" def __init__( self, target_count: int = 3, min_count: int = 1, tunnel_lifetime: float = 600.0, rebuild_threshold: float = 0.80, is_inbound: bool = False, ): self._target_count = target_count self._min_count = min_count self._tunnel_lifetime = tunnel_lifetime self._rebuild_threshold = rebuild_threshold self._is_inbound = is_inbound self._tunnels: list[TunnelEntry] = [] self._round_robin_idx: int = 0 def add_tunnel(self, entry: TunnelEntry) -> None: self._tunnels.append(entry) def remove_expired(self) -> int: """Remove expired tunnels, return count removed.""" before = len(self._tunnels) self._tunnels = [t for t in self._tunnels if not t.is_expired] removed = before - len(self._tunnels) return removed def needs_rebuild(self) -> bool: """True if active (non-expired) tunnel count is below target.""" active = sum(1 for t in self._tunnels if not t.is_expired) return active < self._target_count def needs_preemptive_rebuild(self) -> bool: """True if any tunnel has used more than rebuild_threshold of its lifetime. This allows starting a replacement build before the tunnel actually expires. """ for t in self._tunnels: if t.lifetime_fraction >= self._rebuild_threshold: return True return False def select_for_routing(self) -> TunnelEntry | None: """Select a tunnel for routing traffic. Prefers tunnels that are not near expiry. Falls back to any tunnel if all are near expiry. """ if not self._tunnels: return None # Filter to tunnels below the rebuild threshold healthy = [t for t in self._tunnels if not t.is_expired and t.lifetime_fraction < self._rebuild_threshold] if healthy: return random.choice(healthy) # Fall back to any non-expired tunnel alive = [t for t in self._tunnels if not t.is_expired] if alive: return random.choice(alive) # All expired — return any (caller should handle expiry) return random.choice(self._tunnels) def get_statistics(self) -> dict: return { "active_count": self.active_count, "target_count": self._target_count, "is_inbound": self._is_inbound, "needs_rebuild": self.needs_rebuild(), "needs_preemptive_rebuild": self.needs_preemptive_rebuild(), } @property def active_count(self) -> int: return len(self._tunnels) class TunnelPoolManager: """Manages inbound and outbound tunnel pools with build orchestration.""" def __init__( self, peer_selector: PeerSelector, target_count: int = 3, min_count: int = 1, tunnel_lifetime: float = 600.0, rebuild_threshold: float = 0.80, tunnel_length: int = 3, max_failures: int = 3, ): self.inbound_pool = TunnelPool( target_count=target_count, min_count=min_count, tunnel_lifetime=tunnel_lifetime, rebuild_threshold=rebuild_threshold, is_inbound=True, ) self.outbound_pool = TunnelPool( target_count=target_count, min_count=min_count, tunnel_lifetime=tunnel_lifetime, rebuild_threshold=rebuild_threshold, is_inbound=False, ) self._peer_selector = peer_selector self._build_failures: dict[bytes, int] = {} self._tunnel_length = tunnel_length self._max_failures = max_failures def maintain_pools(self) -> list[list[TunnelHopConfig]]: """Check pools, return list of hop configs for tunnels that need building. Steps: 1. Remove expired tunnels from both pools. 2. Compute how many new tunnels each pool needs. 3. Add preemptive rebuilds for tunnels nearing expiry. 4. Select hops for each needed tunnel, excluding failed peers. """ self.inbound_pool.remove_expired() self.outbound_pool.remove_expired() excluded = self._get_excluded_peers() configs: list[list[TunnelHopConfig]] = [] # Inbound builds inbound_needed = max(0, self.inbound_pool._target_count - self.inbound_pool.active_count) if inbound_needed == 0 and self.inbound_pool.needs_preemptive_rebuild(): inbound_needed = 1 for _ in range(inbound_needed): hops = self._peer_selector.select_hops(self._tunnel_length, exclude=excluded) if hops: configs.append(hops) # Outbound builds outbound_needed = max(0, self.outbound_pool._target_count - self.outbound_pool.active_count) if outbound_needed == 0 and self.outbound_pool.needs_preemptive_rebuild(): outbound_needed = 1 for _ in range(outbound_needed): hops = self._peer_selector.select_hops(self._tunnel_length, exclude=excluded) if hops: configs.append(hops) return configs def select_inbound_outbound_pair(self) -> tuple[TunnelEntry, TunnelEntry] | None: """Select one inbound and one outbound tunnel for communication.""" inbound = self.inbound_pool.select_for_routing() outbound = self.outbound_pool.select_for_routing() if inbound is None or outbound is None: return None return (inbound, outbound) def record_build_failure(self, peer_hash: bytes) -> None: self._build_failures[peer_hash] = self._build_failures.get(peer_hash, 0) + 1 def record_build_success(self, tunnel_entry: TunnelEntry) -> None: """Record a successful build by adding the tunnel to the appropriate pool.""" if tunnel_entry.is_inbound: self.inbound_pool.add_tunnel(tunnel_entry) else: self.outbound_pool.add_tunnel(tunnel_entry) def get_build_failure_count(self, peer_hash: bytes) -> int: return self._build_failures.get(peer_hash, 0) def _get_excluded_peers(self) -> set[bytes]: """Return peer hashes that have exceeded the failure threshold.""" return {h for h, count in self._build_failures.items() if count > self._max_failures}