A Python port of the Invisible Internet Project (I2P)
1"""Tunnel pool policies and build orchestration.
2
3Ported from net.i2p.router.tunnel.pool.TunnelPool and
4net.i2p.router.tunnel.pool.TunnelPoolManager.
5
6Provides:
7- TunnelEntry: a single active tunnel with lifetime tracking
8- TunnelPool: manages a pool of tunnels with policy-based maintenance
9- TunnelPoolManager: orchestrates inbound/outbound pools with build scheduling
10"""
11
12from __future__ import annotations
13
14import random
15import time
16from dataclasses import dataclass, field
17from typing import TYPE_CHECKING
18
19if TYPE_CHECKING:
20 from i2p_peer.hop_config import TunnelHopConfig
21 from i2p_peer.selector import PeerSelector
22
23
@dataclass
class TunnelEntry:
    """A single built tunnel together with its lifetime bookkeeping.

    All timing is measured against ``time.monotonic()``; ``created_at``
    defaults to the monotonic clock reading taken at construction.
    """

    tunnel_id: int
    hops: list[bytes]  # peer hashes
    created_at: float = field(default_factory=time.monotonic)
    lifetime_seconds: float = 600.0
    is_inbound: bool = False

    @property
    def is_expired(self) -> bool:
        """Whether the monotonic clock has reached the end of the lifetime."""
        deadline = self.created_at + self.lifetime_seconds
        return time.monotonic() >= deadline

    @property
    def remaining_seconds(self) -> float:
        """Seconds left before expiry; negative once the tunnel has expired."""
        deadline = self.created_at + self.lifetime_seconds
        return deadline - time.monotonic()

    @property
    def lifetime_fraction(self) -> float:
        """Share of the lifetime already used: 0.0 when fresh, 1.0+ when expired.

        A non-positive ``lifetime_seconds`` is reported as fully used (1.0)
        rather than dividing by it.
        """
        if self.lifetime_seconds <= 0:
            return 1.0
        age = time.monotonic() - self.created_at
        return age / self.lifetime_seconds
49
50
class TunnelPool:
    """Manages a pool of tunnels with policy-based maintenance.

    Terminology used by the methods below:

    - *active*: the tunnel has not expired.
    - *healthy*: the tunnel is active and has used less than
      ``rebuild_threshold`` of its lifetime.
    """

    def __init__(
        self,
        target_count: int = 3,
        min_count: int = 1,
        tunnel_lifetime: float = 600.0,
        rebuild_threshold: float = 0.80,
        is_inbound: bool = False,
    ):
        """Create an empty pool.

        Args:
            target_count: Number of active tunnels the pool aims to hold.
            min_count: Stored as ``_min_count``; not read by this class.
            tunnel_lifetime: Stored as ``_tunnel_lifetime``; not read by
                this class (each entry carries its own lifetime).
            rebuild_threshold: Lifetime fraction at or above which a tunnel
                counts as nearing expiry.
            is_inbound: Direction flag reported by ``get_statistics``.
        """
        self._target_count = target_count
        self._min_count = min_count
        self._tunnel_lifetime = tunnel_lifetime
        self._rebuild_threshold = rebuild_threshold
        self._is_inbound = is_inbound
        self._tunnels: list[TunnelEntry] = []
        # Kept for compatibility; selection below is random, not round-robin.
        self._round_robin_idx: int = 0

    def add_tunnel(self, entry: TunnelEntry) -> None:
        """Append ``entry`` to the pool."""
        self._tunnels.append(entry)

    def remove_expired(self) -> int:
        """Drop expired tunnels from the pool and return how many were dropped."""
        before = len(self._tunnels)
        self._tunnels = [t for t in self._tunnels if not t.is_expired]
        return before - len(self._tunnels)

    def needs_rebuild(self) -> bool:
        """True if the active (non-expired) tunnel count is below target."""
        return self.active_count < self._target_count

    def needs_preemptive_rebuild(self) -> bool:
        """True if any tunnel has used at least rebuild_threshold of its lifetime.

        This allows starting a replacement build before the tunnel actually
        expires. Expired tunnels that have not been removed yet are included
        in the check.
        """
        return any(
            t.lifetime_fraction >= self._rebuild_threshold for t in self._tunnels
        )

    def select_for_routing(self) -> TunnelEntry | None:
        """Pick a tunnel at random for routing traffic.

        Preference order: healthy tunnels, then any active tunnel, then any
        tunnel at all (the caller must handle expiry in that last case).

        Returns:
            The chosen tunnel, or ``None`` when the pool is empty.
        """
        if not self._tunnels:
            return None

        healthy: list[TunnelEntry] = []
        alive: list[TunnelEntry] = []
        for tunnel in self._tunnels:
            if tunnel.is_expired:
                continue
            alive.append(tunnel)
            if tunnel.lifetime_fraction < self._rebuild_threshold:
                healthy.append(tunnel)

        # Fall through the tiers; the pool itself is non-empty at this point.
        return random.choice(healthy or alive or self._tunnels)

    def get_statistics(self) -> dict:
        """Return a snapshot of the pool's counters and rebuild flags."""
        return {
            "active_count": self.active_count,
            "target_count": self._target_count,
            "is_inbound": self._is_inbound,
            "needs_rebuild": self.needs_rebuild(),
            "needs_preemptive_rebuild": self.needs_preemptive_rebuild(),
        }

    @property
    def active_count(self) -> int:
        """Number of tunnels in the pool that have not expired.

        Expired entries still awaiting ``remove_expired`` are not counted, so
        this value always agrees with ``needs_rebuild``.
        """
        return sum(1 for t in self._tunnels if not t.is_expired)
130
131
class TunnelPoolManager:
    """Manages inbound and outbound tunnel pools with build orchestration."""

    def __init__(
        self,
        peer_selector: PeerSelector,
        target_count: int = 3,
        min_count: int = 1,
        tunnel_lifetime: float = 600.0,
        rebuild_threshold: float = 0.80,
        tunnel_length: int = 3,
        max_failures: int = 3,
    ):
        """Create the inbound and outbound pools with identical policies.

        Args:
            peer_selector: Object whose ``select_hops(length, exclude=...)``
                is called to choose hops for each tunnel to build.
            target_count: Target tunnel count passed to both pools.
            min_count: Minimum tunnel count passed to both pools.
            tunnel_lifetime: Tunnel lifetime passed to both pools.
            rebuild_threshold: Lifetime fraction passed to both pools as the
                preemptive rebuild threshold.
            tunnel_length: Number of hops requested per tunnel.
            max_failures: A peer is excluded from hop selection once its
                recorded build failures exceed this number.
        """
        self.inbound_pool = TunnelPool(
            target_count=target_count, min_count=min_count,
            tunnel_lifetime=tunnel_lifetime, rebuild_threshold=rebuild_threshold,
            is_inbound=True,
        )
        self.outbound_pool = TunnelPool(
            target_count=target_count, min_count=min_count,
            tunnel_lifetime=tunnel_lifetime, rebuild_threshold=rebuild_threshold,
            is_inbound=False,
        )
        self._peer_selector = peer_selector
        self._build_failures: dict[bytes, int] = {}
        self._tunnel_length = tunnel_length
        self._max_failures = max_failures

    def maintain_pools(self) -> list[list[TunnelHopConfig]]:
        """Check pools, return list of hop configs for tunnels that need building.

        Steps:
        1. Remove expired tunnels from both pools.
        2. Compute how many new tunnels each pool needs (shortfall against
           target, otherwise at most one preemptive replacement).
        3. Select hops for each needed tunnel, excluding failed peers.

        Inbound configs come first in the result, followed by outbound ones.
        A build for which the selector returns no hops is skipped.
        """
        self.inbound_pool.remove_expired()
        self.outbound_pool.remove_expired()

        excluded = self._get_excluded_peers()
        configs: list[list[TunnelHopConfig]] = []

        for pool in (self.inbound_pool, self.outbound_pool):
            for _ in range(self._builds_needed(pool)):
                hops = self._peer_selector.select_hops(
                    self._tunnel_length, exclude=excluded
                )
                if hops:
                    configs.append(hops)

        return configs

    @staticmethod
    def _builds_needed(pool: TunnelPool) -> int:
        """Return how many tunnels should be built for ``pool`` right now.

        A pool below target needs its full shortfall. A pool at or above
        target gets a single preemptive build when some tunnel is nearing
        expiry, but only while fewer than ``target_count`` tunnels are still
        below the rebuild threshold. Without that last condition every call
        would request another replacement for as long as one aging tunnel
        remained in the pool.
        """
        target = pool._target_count
        shortfall = target - pool.active_count
        if shortfall > 0:
            return shortfall
        if not pool.needs_preemptive_rebuild():
            return 0
        below_threshold = sum(
            1 for t in pool._tunnels
            if t.lifetime_fraction < pool._rebuild_threshold
        )
        return 1 if below_threshold < target else 0

    def select_inbound_outbound_pair(self) -> tuple[TunnelEntry, TunnelEntry] | None:
        """Select one inbound and one outbound tunnel for communication.

        Returns:
            ``(inbound, outbound)``, or ``None`` if either pool yields no tunnel.
        """
        inbound = self.inbound_pool.select_for_routing()
        outbound = self.outbound_pool.select_for_routing()
        if inbound is None or outbound is None:
            return None
        return (inbound, outbound)

    def record_build_failure(self, peer_hash: bytes) -> None:
        """Increment the build-failure counter for ``peer_hash``."""
        self._build_failures[peer_hash] = self._build_failures.get(peer_hash, 0) + 1

    def record_build_success(self, tunnel_entry: TunnelEntry) -> None:
        """Record a successful build by adding the tunnel to the appropriate pool."""
        if tunnel_entry.is_inbound:
            self.inbound_pool.add_tunnel(tunnel_entry)
        else:
            self.outbound_pool.add_tunnel(tunnel_entry)

    def get_build_failure_count(self, peer_hash: bytes) -> int:
        """Return the recorded failure count for ``peer_hash`` (0 if unknown)."""
        return self._build_failures.get(peer_hash, 0)

    def _get_excluded_peers(self) -> set[bytes]:
        """Return peer hashes whose failure count exceeds ``max_failures``."""
        return {h for h, count in self._build_failures.items()
                if count > self._max_failures}