A Python port of the Invisible Internet Project (I2P)
1"""Tests for TunnelPool and TunnelPoolManager — tunnel pool policies and build orchestration.
2
3TDD: tests written first, implementation follows.
4"""
5
6from __future__ import annotations
7
8import time
9from unittest.mock import MagicMock, patch
10
11import pytest
12
13from i2p_tunnel.pool import TunnelEntry, TunnelPool, TunnelPoolManager
14
15
16# ---------------------------------------------------------------------------
17# TunnelEntry tests
18# ---------------------------------------------------------------------------
19
class TestTunnelEntry:
    """Expiry and lifetime-accounting checks for ``TunnelEntry``."""

    def test_not_expired_when_fresh(self):
        """An entry created at the current monotonic instant is not expired."""
        fresh = TunnelEntry(
            tunnel_id=1,
            hops=[b"\x01" * 32],
            created_at=time.monotonic(),
            lifetime_seconds=600.0,
            is_inbound=False,
        )
        assert not fresh.is_expired

    def test_expired_after_lifetime(self):
        """An entry created 700 s ago with a 600 s lifetime is expired."""
        stale = TunnelEntry(
            tunnel_id=1,
            hops=[b"\x01" * 32],
            created_at=time.monotonic() - 700,
            lifetime_seconds=600.0,
        )
        assert stale.is_expired

    def test_remaining_seconds(self):
        """A just-created 600 s entry has between 599 and 600 seconds left."""
        entry = TunnelEntry(
            tunnel_id=1,
            hops=[],
            created_at=time.monotonic(),
            lifetime_seconds=600.0,
        )
        # Read the property once so both bounds apply to the same sample.
        left = entry.remaining_seconds
        assert 599.0 <= left <= 600.0

    def test_remaining_seconds_negative_when_expired(self):
        """Remaining time goes below zero once the lifetime has elapsed."""
        overdue = TunnelEntry(
            tunnel_id=1,
            hops=[],
            created_at=time.monotonic() - 700,
            lifetime_seconds=600.0,
        )
        assert overdue.remaining_seconds < 0

    def test_lifetime_fraction_fresh(self):
        """A just-created entry has consumed at most 1% of its lifetime."""
        entry = TunnelEntry(
            tunnel_id=1,
            hops=[],
            created_at=time.monotonic(),
            lifetime_seconds=600.0,
        )
        used = entry.lifetime_fraction
        assert 0.0 <= used <= 0.01

    def test_lifetime_fraction_halfway(self):
        """300 s into a 600 s lifetime the consumed fraction is about 0.5."""
        entry = TunnelEntry(
            tunnel_id=1,
            hops=[],
            created_at=time.monotonic() - 300,
            lifetime_seconds=600.0,
        )
        used = entry.lifetime_fraction
        assert 0.49 <= used <= 0.51

    def test_lifetime_fraction_expired(self):
        """Past the end of the lifetime the consumed fraction is at least 1.0."""
        overdue = TunnelEntry(
            tunnel_id=1,
            hops=[],
            created_at=time.monotonic() - 700,
            lifetime_seconds=600.0,
        )
        assert overdue.lifetime_fraction >= 1.0
61
62
63# ---------------------------------------------------------------------------
64# TunnelPool tests
65# ---------------------------------------------------------------------------
66
class TestTunnelPool:
    """Policy checks for ``TunnelPool``: counting, rebuild triggers, selection."""

    def _make_entry(self, tunnel_id=1, age=0.0, lifetime=600.0, is_inbound=False):
        """Build a single-hop ``TunnelEntry`` created ``age`` seconds ago.

        Args:
            tunnel_id: Identifier stored on the entry.
            age: Seconds subtracted from the current monotonic time to
                produce ``created_at``.
            lifetime: Value passed as ``lifetime_seconds``.
            is_inbound: Direction flag passed through to the entry.
        """
        born = time.monotonic() - age
        return TunnelEntry(
            tunnel_id=tunnel_id,
            hops=[b"\xaa" * 32],
            created_at=born,
            lifetime_seconds=lifetime,
            is_inbound=is_inbound,
        )

    def test_initial_state_empty(self):
        """A new pool reports zero active tunnels."""
        assert TunnelPool(target_count=3).active_count == 0

    def test_needs_rebuild_when_empty(self):
        """An empty pool asks for a rebuild."""
        empty_pool = TunnelPool(target_count=3, min_count=1)
        assert empty_pool.needs_rebuild()

    def test_add_tunnel_increases_count(self):
        """Adding one tunnel raises the active count to one."""
        pool = TunnelPool(target_count=3)
        entry = self._make_entry(tunnel_id=1)
        pool.add_tunnel(entry)
        assert pool.active_count == 1

    def test_needs_rebuild_true_below_target(self):
        """Two tunnels against a target of three still needs a rebuild."""
        pool = TunnelPool(target_count=3, min_count=1)
        for tid in (1, 2):
            pool.add_tunnel(self._make_entry(tunnel_id=tid))
        assert pool.needs_rebuild()

    def test_needs_rebuild_false_at_target(self):
        """Exactly the target number of fresh tunnels needs no rebuild."""
        pool = TunnelPool(target_count=3, min_count=1)
        for tid in range(3):
            pool.add_tunnel(self._make_entry(tunnel_id=tid))
        assert not pool.needs_rebuild()

    def test_needs_rebuild_false_above_target(self):
        """More fresh tunnels than the target needs no rebuild."""
        pool = TunnelPool(target_count=2, min_count=1)
        for tid in range(4):
            pool.add_tunnel(self._make_entry(tunnel_id=tid))
        assert not pool.needs_rebuild()

    def test_remove_expired(self):
        """``remove_expired`` drops the expired entry and reports the count."""
        pool = TunnelPool(target_count=3)
        live = self._make_entry(tunnel_id=1, age=0.0)
        dead = self._make_entry(tunnel_id=2, age=700.0, lifetime=600.0)
        pool.add_tunnel(live)
        pool.add_tunnel(dead)
        assert pool.remove_expired() == 1
        assert pool.active_count == 1

    def test_remove_expired_returns_zero_when_none_expired(self):
        """With only a fresh tunnel present nothing is removed."""
        pool = TunnelPool(target_count=3)
        pool.add_tunnel(self._make_entry(tunnel_id=1))
        purged = pool.remove_expired()
        assert purged == 0

    def test_select_for_routing_returns_tunnel(self):
        """The only tunnel in the pool is the one selected."""
        pool = TunnelPool(target_count=3)
        pool.add_tunnel(self._make_entry(tunnel_id=42))
        chosen = pool.select_for_routing()
        assert chosen is not None
        assert chosen.tunnel_id == 42

    def test_select_for_routing_none_when_empty(self):
        """Selection on an empty pool yields ``None``."""
        assert TunnelPool(target_count=3).select_for_routing() is None

    def test_select_for_routing_avoids_nearly_expired(self):
        """A fresh tunnel is preferred over one past the rebuild threshold."""
        pool = TunnelPool(target_count=3, rebuild_threshold=0.80)
        # 570 s of 600 s used (95%): beyond the 0.80 threshold, expect skip.
        nearly_done = self._make_entry(tunnel_id=1, age=570.0, lifetime=600.0)
        # Brand-new tunnel: the expected pick.
        brand_new = self._make_entry(tunnel_id=2, age=0.0)
        pool.add_tunnel(nearly_done)
        pool.add_tunnel(brand_new)
        chosen = pool.select_for_routing()
        assert chosen is not None
        assert chosen.tunnel_id == 2

    def test_select_for_routing_returns_nearly_expired_if_only_option(self):
        """If all tunnels are near expiry, still return one rather than None."""
        pool = TunnelPool(target_count=3, rebuild_threshold=0.80)
        only_choice = self._make_entry(tunnel_id=1, age=570.0, lifetime=600.0)
        pool.add_tunnel(only_choice)
        assert pool.select_for_routing() is not None

    def test_get_statistics(self):
        """The statistics dict exposes counts, direction and rebuild state."""
        pool = TunnelPool(target_count=3, min_count=1, is_inbound=True)
        pool.add_tunnel(self._make_entry(tunnel_id=1, age=0.0))
        pool.add_tunnel(self._make_entry(tunnel_id=2, age=700.0, lifetime=600.0))
        snapshot = pool.get_statistics()
        # The expired entry has not been purged yet, so it is still counted.
        assert snapshot["active_count"] == 2
        assert snapshot["target_count"] == 3
        assert snapshot["is_inbound"] is True
        assert "needs_rebuild" in snapshot

    def test_needs_preemptive_rebuild_when_tunnel_nearing_expiry(self):
        """One tunnel past the threshold triggers a preemptive rebuild."""
        pool = TunnelPool(target_count=3, min_count=1, rebuild_threshold=0.80)
        # Pool is at target, but the third tunnel has used 510/600 s (85%).
        ages = {1: 0.0, 2: 0.0}
        for tid, age in ages.items():
            pool.add_tunnel(self._make_entry(tunnel_id=tid, age=age))
        pool.add_tunnel(self._make_entry(tunnel_id=3, age=510.0, lifetime=600.0))
        assert pool.needs_preemptive_rebuild()

    def test_no_preemptive_rebuild_when_all_fresh(self):
        """A pool of only fresh tunnels needs no preemptive rebuild."""
        pool = TunnelPool(target_count=3, min_count=1, rebuild_threshold=0.80)
        for tid in range(3):
            pool.add_tunnel(self._make_entry(tunnel_id=tid, age=0.0))
        assert not pool.needs_preemptive_rebuild()
173
174
175# ---------------------------------------------------------------------------
176# TunnelPoolManager tests
177# ---------------------------------------------------------------------------
178
class TestTunnelPoolManager:
    """Build-orchestration checks for ``TunnelPoolManager`` with a mocked selector."""

    def _make_selector(self, peer_count=5):
        """Create a mock PeerSelector with ``peer_count`` peers.

        Returns:
            A ``(selector, peers)`` tuple. ``selector`` is a ``MagicMock``
            whose ``select_hops(length, exclude=None)`` returns a list of
            ``TunnelHopConfig`` objects built from the non-excluded peers;
            ``peers`` is the list of random 32-byte peer hashes it draws from.
        """
        from i2p_peer.hop_config import TunnelHopConfig
        import os
        import struct

        selector = MagicMock()
        peers = [os.urandom(32) for _ in range(peer_count)]

        def fake_select_hops(length, exclude=None):
            exclude = exclude or set()
            available = [p for p in peers if p not in exclude]
            hops = []
            # NOTE: if fewer than ``length`` peers remain after exclusion, the
            # returned list is shorter and no hop is flagged as the endpoint.
            for i, ph in enumerate(available[:length]):
                # Random 32-bit tunnel id; ``or 1`` avoids an id of zero.
                tid = struct.unpack("!I", os.urandom(4))[0] or 1
                hops.append(TunnelHopConfig(
                    peer_hash=ph,
                    receive_tunnel_id=tid,
                    layer_key=os.urandom(32),
                    iv_key=os.urandom(32),
                    reply_key=os.urandom(32),
                    reply_iv=os.urandom(16),
                    is_gateway=(i == 0),
                    is_endpoint=(i == length - 1),
                ))
            return hops

        selector.select_hops = MagicMock(side_effect=fake_select_hops)
        return selector, peers

    def test_initial_pools_need_rebuild(self):
        """Both pools of a freshly constructed manager need a rebuild."""
        selector, _ = self._make_selector()
        mgr = TunnelPoolManager(peer_selector=selector)
        assert mgr.inbound_pool.needs_rebuild()
        assert mgr.outbound_pool.needs_rebuild()

    def test_maintain_pools_returns_hop_configs(self):
        """``maintain_pools`` returns hop lists of the configured length."""
        selector, _ = self._make_selector()
        mgr = TunnelPoolManager(peer_selector=selector, tunnel_length=2)
        configs = mgr.maintain_pools()
        # Both pools start empty, so at least one build must be requested.
        assert len(configs) > 0
        # Each config is a list of TunnelHopConfig, one per hop.
        for cfg in configs:
            assert len(cfg) == 2  # tunnel_length=2

    def test_select_inbound_outbound_pair_none_when_empty(self):
        """No pair can be selected while the pools are empty."""
        selector, _ = self._make_selector()
        mgr = TunnelPoolManager(peer_selector=selector)
        assert mgr.select_inbound_outbound_pair() is None

    def test_select_inbound_outbound_pair(self):
        """A selected pair is ordered (inbound, outbound)."""
        selector, _ = self._make_selector()
        mgr = TunnelPoolManager(peer_selector=selector)
        # Populate each pool by hand instead of running a build.
        inbound = TunnelEntry(tunnel_id=10, hops=[b"\x01" * 32],
                              created_at=time.monotonic(), is_inbound=True)
        outbound = TunnelEntry(tunnel_id=20, hops=[b"\x02" * 32],
                               created_at=time.monotonic(), is_inbound=False)
        mgr.inbound_pool.add_tunnel(inbound)
        mgr.outbound_pool.add_tunnel(outbound)
        pair = mgr.select_inbound_outbound_pair()
        assert pair is not None
        assert pair[0].is_inbound
        assert not pair[1].is_inbound

    def test_record_build_failure(self):
        """Failure counts start at zero and go up by one per recorded failure."""
        selector, peers = self._make_selector()
        mgr = TunnelPoolManager(peer_selector=selector)
        peer = peers[0]
        assert mgr.get_build_failure_count(peer) == 0
        mgr.record_build_failure(peer)
        assert mgr.get_build_failure_count(peer) == 1
        mgr.record_build_failure(peer)
        mgr.record_build_failure(peer)
        assert mgr.get_build_failure_count(peer) == 3

    def test_record_build_success_adds_to_pool(self):
        """A successful inbound build lands in the inbound pool."""
        selector, _ = self._make_selector()
        mgr = TunnelPoolManager(peer_selector=selector)
        entry = TunnelEntry(tunnel_id=99, hops=[b"\xbb" * 32],
                            created_at=time.monotonic(), is_inbound=True)
        mgr.record_build_success(entry)
        assert mgr.inbound_pool.active_count == 1

    def test_record_build_success_outbound(self):
        """A successful outbound build lands in the outbound pool."""
        selector, _ = self._make_selector()
        mgr = TunnelPoolManager(peer_selector=selector)
        entry = TunnelEntry(tunnel_id=99, hops=[b"\xbb" * 32],
                            created_at=time.monotonic(), is_inbound=False)
        mgr.record_build_success(entry)
        assert mgr.outbound_pool.active_count == 1

    def test_maintain_pools_excludes_failed_peers(self):
        """A peer with more than ``max_failures`` failures is excluded from hops."""
        selector, peers = self._make_selector()
        mgr = TunnelPoolManager(peer_selector=selector, max_failures=3)
        # Fail a peer 4 times (> max_failures=3)
        for _ in range(4):
            mgr.record_build_failure(peers[0])
        mgr.maintain_pools()
        calls = selector.select_hops.call_args_list
        # Guard against a vacuous pass: the per-call assertion below proves
        # nothing if the selector was never consulted.
        assert calls, "maintain_pools() never called select_hops"
        # The excluded peer should be passed to select_hops, either as the
        # ``exclude`` keyword or as the second positional argument.
        for call_args in calls:
            exclude = call_args.kwargs.get("exclude") or (
                call_args.args[1] if len(call_args.args) > 1 else set()
            )
            assert peers[0] in exclude

    def test_maintain_pools_handles_preemptive_rebuild(self):
        """Pools at target build nothing until a tunnel passes the threshold."""
        selector, _ = self._make_selector()
        mgr = TunnelPoolManager(peer_selector=selector, tunnel_length=2,
                                target_count=2, rebuild_threshold=0.80)
        # Fill both pools to target
        for i in range(2):
            mgr.inbound_pool.add_tunnel(TunnelEntry(
                tunnel_id=i, hops=[b"\x01" * 32],
                created_at=time.monotonic(), is_inbound=True))
            mgr.outbound_pool.add_tunnel(TunnelEntry(
                tunnel_id=i + 100, hops=[b"\x02" * 32],
                created_at=time.monotonic(), is_inbound=False))
        # No builds needed — pools at target
        configs = mgr.maintain_pools()
        assert len(configs) == 0

        # Now age one inbound tunnel past threshold: 500 s of 600 s is about
        # 83%, above the 0.80 rebuild threshold. This swaps the entry in the
        # pool's private ``_tunnels`` list directly.
        mgr.inbound_pool._tunnels[0] = TunnelEntry(
            tunnel_id=0, hops=[b"\x01" * 32],
            created_at=time.monotonic() - 500, lifetime_seconds=600.0,
            is_inbound=True)
        configs = mgr.maintain_pools()
        assert len(configs) >= 1  # preemptive rebuild triggered