# A Python port of the Invisible Internet Project (I2P).
1"""Tests for tunnel build pipeline wiring — TunnelBuildPipeline."""
2
3import os
4import time
5
6import pytest
7
8from i2p_data.tunnel import TunnelId, HopConfig
9from i2p_tunnel.build_executor import TunnelBuildExecutor, TunnelManager
10from i2p_tunnel.data_handler import TunnelCryptoRegistry
11from i2p_tunnel.builder import TunnelEntry
12from i2p_peer.profile import PeerProfile, PeerSelector
13
14
15def _make_peer_profiles(count):
16 """Create peer profiles with random router hashes."""
17 profiles = []
18 for _ in range(count):
19 profile = PeerProfile(router_hash=os.urandom(32))
20 profiles.append(profile)
21 return profiles
22
23
class TestBuildTunnel:
    """Checks on TunnelBuildPipeline.build_tunnel for inbound and outbound builds."""

    @staticmethod
    def _new_pipeline():
        """Return a TunnelBuildPipeline built from a new executor, manager and registry."""
        # The pipeline class is imported at call time, not at module import.
        from i2p_tunnel.build_pipeline import TunnelBuildPipeline

        return TunnelBuildPipeline(
            TunnelBuildExecutor(), TunnelManager(), TunnelCryptoRegistry()
        )

    def test_build_tunnel_generates_configs_with_random_keys(self):
        """build_tunnel yields one HopConfig per peer, each with its own random keys."""
        pipeline = self._new_pipeline()

        result = pipeline.build_tunnel(_make_peer_profiles(3), is_inbound=True)

        assert "hop_configs" in result
        assert "build_msg" in result

        hops = result["hop_configs"]
        assert len(hops) == 3

        # Expected byte length of every key field carried by a hop config.
        expected_sizes = (
            ("layer_key", 32),
            ("iv_key", 32),
            ("reply_key", 32),
            ("reply_iv", 16),
        )
        for hop in hops:
            assert isinstance(hop, HopConfig)
            for attr, size in expected_sizes:
                assert len(getattr(hop, attr)) == size

        # Randomly generated layer keys must not repeat across hops.
        distinct_layer_keys = {hop.layer_key for hop in hops}
        assert len(distinct_layer_keys) == 3, "Each hop should have a unique layer_key"

    def test_build_tunnel_outbound(self):
        """build_tunnel also produces hop configs and a build message when outbound."""
        pipeline = self._new_pipeline()

        result = pipeline.build_tunnel(_make_peer_profiles(2), is_inbound=False)

        assert len(result["hop_configs"]) == 2
        assert result["build_msg"] is not None
73
74
class TestProcessReply:
    """Checks on TunnelBuildPipeline.process_reply for accepted and rejected replies."""

    @staticmethod
    def _wire():
        """Return ``(pipeline, manager, registry)`` built from fresh collaborators."""
        # The pipeline class is imported at call time, not at module import.
        from i2p_tunnel.build_pipeline import TunnelBuildPipeline

        executor = TunnelBuildExecutor()
        manager = TunnelManager()
        registry = TunnelCryptoRegistry()
        return TunnelBuildPipeline(executor, manager, registry), manager, registry

    def _make_hop_configs(self, count):
        """Return ``count`` HopConfigs with sequential tunnel ids and random keys."""
        return [
            HopConfig(
                receive_tunnel_id=TunnelId(1000 + index),
                send_tunnel_id=TunnelId(2000 + index),
                receive_key=os.urandom(32),
                send_key=os.urandom(32),
                iv_key=os.urandom(32),
                reply_key=os.urandom(32),
                reply_iv=os.urandom(16),
                layer_key=os.urandom(32),
            )
            for index in range(count)
        ]

    def test_process_reply_accepted_creates_tunnel_entry(self):
        """An accepted reply yields a TunnelEntry describing the new tunnel."""
        pipeline, _, _ = self._wire()

        hops = self._make_hop_configs(3)
        expires_at = int(time.time() * 1000) + 600_000

        # Reply in which the build was accepted.
        accepted = {"accepted": True}

        entry = pipeline.process_reply(
            accepted,
            hops,
            42,
            is_inbound=True,
            expiration_ms=expires_at,
        )

        assert entry is not None
        assert isinstance(entry, TunnelEntry)
        assert int(entry.tunnel_id) == 42
        assert entry.length == 3
        assert entry.expiration == expires_at

    def test_process_reply_accepted_registers_keys_in_crypto_registry(self):
        """An accepted reply stores every hop's layer and IV key in the registry."""
        pipeline, _, registry = self._wire()

        hops = self._make_hop_configs(3)
        expires_at = int(time.time() * 1000) + 600_000

        accepted = {"accepted": True}

        pipeline.process_reply(
            accepted,
            hops,
            55,
            is_inbound=True,
            expiration_ms=expires_at,
        )

        # The registry is looked up by each hop's receive tunnel id.
        for hop in hops:
            tid = int(hop.receive_tunnel_id)
            keys = registry.get_keys(tid)
            assert keys is not None, f"Tunnel {tid} not registered"
            layer_key, iv_key, _ = keys
            assert layer_key == hop.layer_key
            assert iv_key == hop.iv_key

    def test_process_reply_accepted_adds_to_tunnel_manager(self):
        """An accepted reply puts the tunnel into the pool matching its direction."""
        pipeline, manager, _ = self._wire()

        inbound_hops = self._make_hop_configs(2)
        expires_at = int(time.time() * 1000) + 600_000

        accepted = {"accepted": True}

        # Inbound tunnel first: only the inbound pool should grow.
        pipeline.process_reply(
            accepted,
            inbound_hops,
            tunnel_id=10,
            is_inbound=True,
            expiration_ms=expires_at,
        )
        assert manager.inbound_count() == 1
        assert manager.outbound_count() == 0

        # Then an outbound tunnel built from a separate set of hops.
        outbound_hops = self._make_hop_configs(2)
        pipeline.process_reply(
            accepted,
            outbound_hops,
            tunnel_id=11,
            is_inbound=False,
            expiration_ms=expires_at,
        )
        assert manager.outbound_count() == 1

    def test_process_reply_rejected_returns_none(self):
        """A rejected reply returns None and leaves manager and registry empty."""
        pipeline, manager, registry = self._wire()

        hops = self._make_hop_configs(3)
        now_ms = int(time.time() * 1000)

        rejected = {"accepted": False}

        entry = pipeline.process_reply(
            rejected,
            hops,
            tunnel_id=99,
            is_inbound=True,
            expiration_ms=now_ms + 600_000,
        )

        assert entry is None
        # A rejection must not pool a tunnel or register any keys.
        assert manager.inbound_count() == 0
        assert len(registry.registered_tunnels()) == 0
207
208
class TestMaintainPools:
    """Checks on the pool status returned by TunnelBuildPipeline.maintain_pools."""

    @staticmethod
    def _wire(inbound, outbound):
        """Return ``(pipeline, manager)`` with the manager set to the given targets."""
        # The pipeline class is imported at call time, not at module import.
        from i2p_tunnel.build_pipeline import TunnelBuildPipeline

        executor = TunnelBuildExecutor()
        manager = TunnelManager(target_inbound=inbound, target_outbound=outbound)
        registry = TunnelCryptoRegistry()
        return TunnelBuildPipeline(executor, manager, registry), manager

    def test_maintain_pools_reports_needs_correctly(self):
        """With empty pools and targets of two, both directions are reported as needed."""
        pipeline, _ = self._wire(2, 2)

        status = pipeline.maintain_pools()

        assert status["needs_inbound"] is True
        assert status["needs_outbound"] is True
        assert status["inbound_count"] == 0
        assert status["outbound_count"] == 0

    def test_maintain_pools_satisfied_after_adding(self):
        """Once each pool holds its target of one tunnel, nothing more is needed."""
        pipeline, manager = self._wire(1, 1)

        now_ms = int(time.time() * 1000)
        # Build both entries first, then add them: tunnel 1 inbound, tunnel 2 outbound.
        entries = [
            TunnelEntry(
                tunnel_id=TunnelId(number),
                gateway=os.urandom(32),
                length=3,
                creation_time=now_ms,
                expiration=now_ms + 600_000,
            )
            for number in (1, 2)
        ]
        for entry, inbound in zip(entries, (True, False)):
            manager.add_tunnel(entry, is_inbound=inbound)

        status = pipeline.maintain_pools()

        assert status["needs_inbound"] is False
        assert status["needs_outbound"] is False
        assert status["inbound_count"] == 1
        assert status["outbound_count"] == 1
255
256
class TestFullLifecycle:
    """End-to-end path: build a tunnel, accept the reply, check registry and pool."""

    def test_full_lifecycle_build_accept_registered_and_pooled(self):
        """Build, accept, then confirm the keys are registered and the tunnel is pooled."""
        from i2p_tunnel.build_pipeline import TunnelBuildPipeline

        executor = TunnelBuildExecutor()
        manager = TunnelManager(target_inbound=2, target_outbound=2)
        registry = TunnelCryptoRegistry()
        pipeline = TunnelBuildPipeline(executor, manager, registry)

        # Build an inbound tunnel through three peers.
        built = pipeline.build_tunnel(_make_peer_profiles(3), is_inbound=True)
        hops = built["hop_configs"]
        assert len(hops) == 3

        # Feed the pipeline a reply in which the build was accepted.
        expires_at = int(time.time() * 1000) + 600_000
        accepted = {"accepted": True}

        entry = pipeline.process_reply(
            accepted,
            hops,
            777,
            is_inbound=True,
            expiration_ms=expires_at,
        )

        # The resulting entry describes the tunnel that was just built.
        assert entry is not None
        assert int(entry.tunnel_id) == 777
        assert entry.length == 3

        # Every hop's layer and IV key is retrievable by its receive tunnel id.
        for hop in hops:
            stored = registry.get_keys(int(hop.receive_tunnel_id))
            assert stored is not None
            assert stored[0] == hop.layer_key
            assert stored[1] == hop.iv_key

        # The tunnel landed in the inbound pool.
        assert manager.inbound_count() == 1

        # The pool status reports the new tunnel but is still below target.
        status = pipeline.maintain_pools()
        assert status["needs_inbound"] is True  # 1 < target 2
        assert status["inbound_count"] == 1