A Python port of the Invisible Internet Project (I2P)
at main 219 lines 7.9 kB view raw
1"""Tunnel pool policies and build orchestration. 2 3Ported from net.i2p.router.tunnel.pool.TunnelPool and 4net.i2p.router.tunnel.pool.TunnelPoolManager. 5 6Provides: 7- TunnelEntry: a single active tunnel with lifetime tracking 8- TunnelPool: manages a pool of tunnels with policy-based maintenance 9- TunnelPoolManager: orchestrates inbound/outbound pools with build scheduling 10""" 11 12from __future__ import annotations 13 14import random 15import time 16from dataclasses import dataclass, field 17from typing import TYPE_CHECKING 18 19if TYPE_CHECKING: 20 from i2p_peer.hop_config import TunnelHopConfig 21 from i2p_peer.selector import PeerSelector 22 23 24@dataclass 25class TunnelEntry: 26 """One active tunnel.""" 27 28 tunnel_id: int 29 hops: list[bytes] # peer hashes 30 created_at: float = field(default_factory=time.monotonic) 31 lifetime_seconds: float = 600.0 32 is_inbound: bool = False 33 34 @property 35 def is_expired(self) -> bool: 36 return time.monotonic() >= self.created_at + self.lifetime_seconds 37 38 @property 39 def remaining_seconds(self) -> float: 40 return (self.created_at + self.lifetime_seconds) - time.monotonic() 41 42 @property 43 def lifetime_fraction(self) -> float: 44 """Fraction of lifetime elapsed, 0.0 (fresh) to 1.0+ (expired).""" 45 elapsed = time.monotonic() - self.created_at 46 if self.lifetime_seconds <= 0: 47 return 1.0 48 return elapsed / self.lifetime_seconds 49 50 51class TunnelPool: 52 """Manages a pool of tunnels with policy-based maintenance.""" 53 54 def __init__( 55 self, 56 target_count: int = 3, 57 min_count: int = 1, 58 tunnel_lifetime: float = 600.0, 59 rebuild_threshold: float = 0.80, 60 is_inbound: bool = False, 61 ): 62 self._target_count = target_count 63 self._min_count = min_count 64 self._tunnel_lifetime = tunnel_lifetime 65 self._rebuild_threshold = rebuild_threshold 66 self._is_inbound = is_inbound 67 self._tunnels: list[TunnelEntry] = [] 68 self._round_robin_idx: int = 0 69 70 def add_tunnel(self, entry: TunnelEntry) -> None: 71 self._tunnels.append(entry) 72 73 def remove_expired(self) -> int: 74 """Remove expired tunnels, return count removed.""" 75 before = len(self._tunnels) 76 self._tunnels = [t for t in self._tunnels if not t.is_expired] 77 removed = before - len(self._tunnels) 78 return removed 79 80 def needs_rebuild(self) -> bool: 81 """True if active (non-expired) tunnel count is below target.""" 82 active = sum(1 for t in self._tunnels if not t.is_expired) 83 return active < self._target_count 84 85 def needs_preemptive_rebuild(self) -> bool: 86 """True if any tunnel has used more than rebuild_threshold of its lifetime. 87 88 This allows starting a replacement build before the tunnel actually expires. 89 """ 90 for t in self._tunnels: 91 if t.lifetime_fraction >= self._rebuild_threshold: 92 return True 93 return False 94 95 def select_for_routing(self) -> TunnelEntry | None: 96 """Select a tunnel for routing traffic. 97 98 Prefers tunnels that are not near expiry. Falls back to any tunnel 99 if all are near expiry. 100 """ 101 if not self._tunnels: 102 return None 103 104 # Filter to tunnels below the rebuild threshold 105 healthy = [t for t in self._tunnels 106 if not t.is_expired and t.lifetime_fraction < self._rebuild_threshold] 107 if healthy: 108 return random.choice(healthy) 109 110 # Fall back to any non-expired tunnel 111 alive = [t for t in self._tunnels if not t.is_expired] 112 if alive: 113 return random.choice(alive) 114 115 # All expired — return any (caller should handle expiry) 116 return random.choice(self._tunnels) 117 118 def get_statistics(self) -> dict: 119 return { 120 "active_count": self.active_count, 121 "target_count": self._target_count, 122 "is_inbound": self._is_inbound, 123 "needs_rebuild": self.needs_rebuild(), 124 "needs_preemptive_rebuild": self.needs_preemptive_rebuild(), 125 } 126 127 @property 128 def active_count(self) -> int: 129 return len(self._tunnels) 130 131 132class TunnelPoolManager: 133 """Manages inbound and outbound tunnel pools with build orchestration.""" 134 135 def __init__( 136 self, 137 peer_selector: PeerSelector, 138 target_count: int = 3, 139 min_count: int = 1, 140 tunnel_lifetime: float = 600.0, 141 rebuild_threshold: float = 0.80, 142 tunnel_length: int = 3, 143 max_failures: int = 3, 144 ): 145 self.inbound_pool = TunnelPool( 146 target_count=target_count, min_count=min_count, 147 tunnel_lifetime=tunnel_lifetime, rebuild_threshold=rebuild_threshold, 148 is_inbound=True, 149 ) 150 self.outbound_pool = TunnelPool( 151 target_count=target_count, min_count=min_count, 152 tunnel_lifetime=tunnel_lifetime, rebuild_threshold=rebuild_threshold, 153 is_inbound=False, 154 ) 155 self._peer_selector = peer_selector 156 self._build_failures: dict[bytes, int] = {} 157 self._tunnel_length = tunnel_length 158 self._max_failures = max_failures 159 160 def maintain_pools(self) -> list[list[TunnelHopConfig]]: 161 """Check pools, return list of hop configs for tunnels that need building. 162 163 Steps: 164 1. Remove expired tunnels from both pools. 165 2. Compute how many new tunnels each pool needs. 166 3. Add preemptive rebuilds for tunnels nearing expiry. 167 4. Select hops for each needed tunnel, excluding failed peers. 168 """ 169 self.inbound_pool.remove_expired() 170 self.outbound_pool.remove_expired() 171 172 excluded = self._get_excluded_peers() 173 configs: list[list[TunnelHopConfig]] = [] 174 175 # Inbound builds 176 inbound_needed = max(0, self.inbound_pool._target_count - self.inbound_pool.active_count) 177 if inbound_needed == 0 and self.inbound_pool.needs_preemptive_rebuild(): 178 inbound_needed = 1 179 for _ in range(inbound_needed): 180 hops = self._peer_selector.select_hops(self._tunnel_length, exclude=excluded) 181 if hops: 182 configs.append(hops) 183 184 # Outbound builds 185 outbound_needed = max(0, self.outbound_pool._target_count - self.outbound_pool.active_count) 186 if outbound_needed == 0 and self.outbound_pool.needs_preemptive_rebuild(): 187 outbound_needed = 1 188 for _ in range(outbound_needed): 189 hops = self._peer_selector.select_hops(self._tunnel_length, exclude=excluded) 190 if hops: 191 configs.append(hops) 192 193 return configs 194 195 def select_inbound_outbound_pair(self) -> tuple[TunnelEntry, TunnelEntry] | None: 196 """Select one inbound and one outbound tunnel for communication.""" 197 inbound = self.inbound_pool.select_for_routing() 198 outbound = self.outbound_pool.select_for_routing() 199 if inbound is None or outbound is None: 200 return None 201 return (inbound, outbound) 202 203 def record_build_failure(self, peer_hash: bytes) -> None: 204 self._build_failures[peer_hash] = self._build_failures.get(peer_hash, 0) + 1 205 206 def record_build_success(self, tunnel_entry: TunnelEntry) -> None: 207 """Record a successful build by adding the tunnel to the appropriate pool.""" 208 if tunnel_entry.is_inbound: 209 self.inbound_pool.add_tunnel(tunnel_entry) 210 else: 211 self.outbound_pool.add_tunnel(tunnel_entry) 212 213 def get_build_failure_count(self, peer_hash: bytes) -> int: 214 return self._build_failures.get(peer_hash, 0) 215 216 def _get_excluded_peers(self) -> set[bytes]: 217 """Return peer hashes that have exceeded the failure threshold.""" 218 return {h for h, count in self._build_failures.items() 219 if count > self._max_failures}