A Python port of the Invisible Internet Project (I2P)
1"""Tests for tunnel build orchestration — TunnelBuildExecutor and TunnelManager."""
2
3import os
4import struct
5import time
6
7import pytest
8
9
10class TestTunnelBuildExecutor:
11 """Tests for TunnelBuildExecutor."""
12
13 def _make_hop_configs(self, count):
14 """Create hop configs with ElGamal keypairs for testing."""
15 from i2p_crypto.elgamal import ElGamalEngine
16 from i2p_data.tunnel import TunnelId, HopConfig
17
18 hops = []
19 keypairs = []
20 for i in range(count):
21 pub, priv = ElGamalEngine.generate_keypair()
22 keypairs.append((pub, priv))
23 hop = HopConfig(
24 receive_tunnel_id=TunnelId(1000 + i),
25 send_tunnel_id=TunnelId(2000 + i),
26 receive_key=os.urandom(32),
27 send_key=os.urandom(32),
28 iv_key=os.urandom(32),
29 reply_key=os.urandom(32),
30 reply_iv=os.urandom(16),
31 layer_key=os.urandom(32),
32 )
33 hops.append(hop)
34 return hops, keypairs
35
36 def test_build_tunnel_creates_8_records(self):
37 """build_tunnel must produce a TunnelBuildMessage with exactly 8 records."""
38 from i2p_tunnel.build_executor import TunnelBuildExecutor
39 from i2p_data.i2np_tunnel import TunnelBuildMessage
40
41 hops, keypairs = self._make_hop_configs(3)
42 public_keys = [kp[0] for kp in keypairs]
43
44 executor = TunnelBuildExecutor()
45 msg = executor.build_tunnel(hops, public_keys, is_inbound=False)
46
47 assert isinstance(msg, TunnelBuildMessage)
48 assert len(msg.records) == 8
49 for rec in msg.records:
50 assert len(rec) == 528
51
52 def test_build_tunnel_single_hop(self):
53 """build_tunnel works with a single hop tunnel."""
54 from i2p_tunnel.build_executor import TunnelBuildExecutor
55
56 hops, keypairs = self._make_hop_configs(1)
57 public_keys = [kp[0] for kp in keypairs]
58
59 executor = TunnelBuildExecutor()
60 msg = executor.build_tunnel(hops, public_keys, is_inbound=True)
61
62 assert len(msg.records) == 8
63
64 def test_build_tunnel_fills_unused_slots_with_random(self):
65 """Unused record slots (beyond hop count) should be random, not all zeros."""
66 from i2p_tunnel.build_executor import TunnelBuildExecutor
67
68 hops, keypairs = self._make_hop_configs(2)
69 public_keys = [kp[0] for kp in keypairs]
70
71 executor = TunnelBuildExecutor()
72 msg = executor.build_tunnel(hops, public_keys, is_inbound=False)
73
74 # Records beyond the 2 hops should not be all zeros
75 for i in range(2, 8):
76 assert msg.records[i] != b"\x00" * 528
77
78 def test_process_reply_all_accepted_returns_tunnel_entry(self):
79 """When all hops accept (status=0), process_reply returns a TunnelEntry."""
80 from i2p_tunnel.build_executor import TunnelBuildExecutor
81 from i2p_tunnel.builder import BuildReplyRecord, TunnelEntry
82 from i2p_data.i2np_tunnel import TunnelBuildReplyMessage
83 from i2p_crypto.aes import AESEngine
84
85 hops, keypairs = self._make_hop_configs(3)
86
87 # Build accepted reply records, AES-encrypted with each hop's reply_key/iv
88 encrypted_records = []
89 for hop in hops:
90 reply = BuildReplyRecord(status=0, reply_data=os.urandom(495))
91 plaintext = reply.to_bytes()
92 # Pad plaintext to multiple of 16 if needed
93 pad_len = (16 - len(plaintext) % 16) % 16
94 padded = plaintext + b"\x00" * pad_len
95 iv = hop.reply_iv[:16]
96 encrypted = AESEngine.encrypt(padded, hop.reply_key, iv)
97 # Pad to 528 bytes
98 encrypted = encrypted.ljust(528, b"\x00")
99 encrypted_records.append(encrypted)
100
101 # Fill remaining slots with random
102 for _ in range(5):
103 encrypted_records.append(os.urandom(528))
104
105 reply_msg = TunnelBuildReplyMessage(encrypted_records)
106
107 executor = TunnelBuildExecutor()
108 result = executor.process_reply(reply_msg, hops)
109
110 assert result is not None
111 assert isinstance(result, TunnelEntry)
112 assert result.length == 3
113
114 def test_process_reply_one_rejected_returns_none(self):
115 """When any hop rejects (status != 0), process_reply returns None."""
116 from i2p_tunnel.build_executor import TunnelBuildExecutor
117 from i2p_tunnel.builder import BuildReplyRecord
118 from i2p_data.i2np_tunnel import TunnelBuildReplyMessage
119 from i2p_crypto.aes import AESEngine
120
121 hops, keypairs = self._make_hop_configs(3)
122
123 encrypted_records = []
124 for i, hop in enumerate(hops):
125 # Second hop rejects
126 status = 30 if i == 1 else 0
127 reply = BuildReplyRecord(status=status, reply_data=os.urandom(495))
128 plaintext = reply.to_bytes()
129 pad_len = (16 - len(plaintext) % 16) % 16
130 padded = plaintext + b"\x00" * pad_len
131 iv = hop.reply_iv[:16]
132 encrypted = AESEngine.encrypt(padded, hop.reply_key, iv)
133 encrypted = encrypted.ljust(528, b"\x00")
134 encrypted_records.append(encrypted)
135
136 for _ in range(5):
137 encrypted_records.append(os.urandom(528))
138
139 reply_msg = TunnelBuildReplyMessage(encrypted_records)
140
141 executor = TunnelBuildExecutor()
142 result = executor.process_reply(reply_msg, hops)
143
144 assert result is None
145
146 def test_process_reply_empty_hops_returns_entry(self):
147 """With zero hops to check, process_reply should still return a TunnelEntry."""
148 from i2p_tunnel.build_executor import TunnelBuildExecutor
149 from i2p_data.i2np_tunnel import TunnelBuildReplyMessage
150
151 records = [os.urandom(528) for _ in range(8)]
152 reply_msg = TunnelBuildReplyMessage(records)
153
154 executor = TunnelBuildExecutor()
155 result = executor.process_reply(reply_msg, [])
156
157 assert result is not None
158 assert result.length == 0
159
160
161class TestTunnelManager:
162 """Tests for TunnelManager pool maintenance."""
163
164 def _make_entry(self, tunnel_id_val, creation_ms, expiration_ms):
165 from i2p_tunnel.builder import TunnelEntry
166 from i2p_data.tunnel import TunnelId
167 return TunnelEntry(
168 tunnel_id=TunnelId(tunnel_id_val),
169 gateway=os.urandom(32),
170 length=3,
171 creation_time=creation_ms,
172 expiration=expiration_ms,
173 )
174
175 def test_initial_counts_zero(self):
176 from i2p_tunnel.build_executor import TunnelManager
177 mgr = TunnelManager(target_inbound=3, target_outbound=3)
178 assert mgr.inbound_count() == 0
179 assert mgr.outbound_count() == 0
180
181 def test_add_inbound_tunnel(self):
182 from i2p_tunnel.build_executor import TunnelManager
183 mgr = TunnelManager()
184 now = int(time.time() * 1000)
185 entry = self._make_entry(1, now, now + 600_000)
186 mgr.add_tunnel(entry, is_inbound=True)
187 assert mgr.inbound_count() == 1
188 assert mgr.outbound_count() == 0
189
190 def test_add_outbound_tunnel(self):
191 from i2p_tunnel.build_executor import TunnelManager
192 mgr = TunnelManager()
193 now = int(time.time() * 1000)
194 entry = self._make_entry(2, now, now + 600_000)
195 mgr.add_tunnel(entry, is_inbound=False)
196 assert mgr.outbound_count() == 1
197 assert mgr.inbound_count() == 0
198
199 def test_needs_more_when_below_target(self):
200 from i2p_tunnel.build_executor import TunnelManager
201 mgr = TunnelManager(target_inbound=2, target_outbound=2)
202 now = int(time.time() * 1000)
203
204 assert mgr.needs_more_inbound() is True
205 assert mgr.needs_more_outbound() is True
206
207 mgr.add_tunnel(self._make_entry(1, now, now + 600_000), is_inbound=True)
208 mgr.add_tunnel(self._make_entry(2, now, now + 600_000), is_inbound=True)
209 assert mgr.needs_more_inbound() is False
210
211 mgr.add_tunnel(self._make_entry(3, now, now + 600_000), is_inbound=False)
212 assert mgr.needs_more_outbound() is True
213 mgr.add_tunnel(self._make_entry(4, now, now + 600_000), is_inbound=False)
214 assert mgr.needs_more_outbound() is False
215
216 def test_remove_expired(self):
217 from i2p_tunnel.build_executor import TunnelManager
218 mgr = TunnelManager()
219 now = int(time.time() * 1000)
220
221 # One expired, one fresh
222 expired_entry = self._make_entry(10, now - 700_000, now - 100_000)
223 fresh_entry = self._make_entry(11, now, now + 600_000)
224 mgr.add_tunnel(expired_entry, is_inbound=True)
225 mgr.add_tunnel(fresh_entry, is_inbound=True)
226
227 assert mgr.inbound_count() == 2
228 mgr.remove_expired(now)
229 assert mgr.inbound_count() == 1
230
231 def test_remove_expired_outbound(self):
232 from i2p_tunnel.build_executor import TunnelManager
233 mgr = TunnelManager()
234 now = int(time.time() * 1000)
235
236 expired = self._make_entry(20, now - 700_000, now - 100_000)
237 mgr.add_tunnel(expired, is_inbound=False)
238 assert mgr.outbound_count() == 1
239 mgr.remove_expired(now)
240 assert mgr.outbound_count() == 0
241
242 def test_get_inbound_tunnels(self):
243 from i2p_tunnel.build_executor import TunnelManager
244 mgr = TunnelManager()
245 now = int(time.time() * 1000)
246
247 e1 = self._make_entry(1, now, now + 600_000)
248 e2 = self._make_entry(2, now, now + 600_000)
249 mgr.add_tunnel(e1, is_inbound=True)
250 mgr.add_tunnel(e2, is_inbound=True)
251
252 tunnels = mgr.get_inbound_tunnels()
253 assert len(tunnels) == 2
254
255 def test_get_outbound_tunnels(self):
256 from i2p_tunnel.build_executor import TunnelManager
257 mgr = TunnelManager()
258 now = int(time.time() * 1000)
259
260 e1 = self._make_entry(1, now, now + 600_000)
261 mgr.add_tunnel(e1, is_inbound=False)
262
263 tunnels = mgr.get_outbound_tunnels()
264 assert len(tunnels) == 1