A Python port of the Invisible Internet Project (I2P)
at main 264 lines 10 kB view raw
1"""Tests for tunnel build orchestration — TunnelBuildExecutor and TunnelManager.""" 2 3import os 4import struct 5import time 6 7import pytest 8 9 10class TestTunnelBuildExecutor: 11 """Tests for TunnelBuildExecutor.""" 12 13 def _make_hop_configs(self, count): 14 """Create hop configs with ElGamal keypairs for testing.""" 15 from i2p_crypto.elgamal import ElGamalEngine 16 from i2p_data.tunnel import TunnelId, HopConfig 17 18 hops = [] 19 keypairs = [] 20 for i in range(count): 21 pub, priv = ElGamalEngine.generate_keypair() 22 keypairs.append((pub, priv)) 23 hop = HopConfig( 24 receive_tunnel_id=TunnelId(1000 + i), 25 send_tunnel_id=TunnelId(2000 + i), 26 receive_key=os.urandom(32), 27 send_key=os.urandom(32), 28 iv_key=os.urandom(32), 29 reply_key=os.urandom(32), 30 reply_iv=os.urandom(16), 31 layer_key=os.urandom(32), 32 ) 33 hops.append(hop) 34 return hops, keypairs 35 36 def test_build_tunnel_creates_8_records(self): 37 """build_tunnel must produce a TunnelBuildMessage with exactly 8 records.""" 38 from i2p_tunnel.build_executor import TunnelBuildExecutor 39 from i2p_data.i2np_tunnel import TunnelBuildMessage 40 41 hops, keypairs = self._make_hop_configs(3) 42 public_keys = [kp[0] for kp in keypairs] 43 44 executor = TunnelBuildExecutor() 45 msg = executor.build_tunnel(hops, public_keys, is_inbound=False) 46 47 assert isinstance(msg, TunnelBuildMessage) 48 assert len(msg.records) == 8 49 for rec in msg.records: 50 assert len(rec) == 528 51 52 def test_build_tunnel_single_hop(self): 53 """build_tunnel works with a single hop tunnel.""" 54 from i2p_tunnel.build_executor import TunnelBuildExecutor 55 56 hops, keypairs = self._make_hop_configs(1) 57 public_keys = [kp[0] for kp in keypairs] 58 59 executor = TunnelBuildExecutor() 60 msg = executor.build_tunnel(hops, public_keys, is_inbound=True) 61 62 assert len(msg.records) == 8 63 64 def test_build_tunnel_fills_unused_slots_with_random(self): 65 """Unused record slots (beyond hop count) should be random, not all zeros.""" 66 from i2p_tunnel.build_executor import TunnelBuildExecutor 67 68 hops, keypairs = self._make_hop_configs(2) 69 public_keys = [kp[0] for kp in keypairs] 70 71 executor = TunnelBuildExecutor() 72 msg = executor.build_tunnel(hops, public_keys, is_inbound=False) 73 74 # Records beyond the 2 hops should not be all zeros 75 for i in range(2, 8): 76 assert msg.records[i] != b"\x00" * 528 77 78 def test_process_reply_all_accepted_returns_tunnel_entry(self): 79 """When all hops accept (status=0), process_reply returns a TunnelEntry.""" 80 from i2p_tunnel.build_executor import TunnelBuildExecutor 81 from i2p_tunnel.builder import BuildReplyRecord, TunnelEntry 82 from i2p_data.i2np_tunnel import TunnelBuildReplyMessage 83 from i2p_crypto.aes import AESEngine 84 85 hops, keypairs = self._make_hop_configs(3) 86 87 # Build accepted reply records, AES-encrypted with each hop's reply_key/iv 88 encrypted_records = [] 89 for hop in hops: 90 reply = BuildReplyRecord(status=0, reply_data=os.urandom(495)) 91 plaintext = reply.to_bytes() 92 # Pad plaintext to multiple of 16 if needed 93 pad_len = (16 - len(plaintext) % 16) % 16 94 padded = plaintext + b"\x00" * pad_len 95 iv = hop.reply_iv[:16] 96 encrypted = AESEngine.encrypt(padded, hop.reply_key, iv) 97 # Pad to 528 bytes 98 encrypted = encrypted.ljust(528, b"\x00") 99 encrypted_records.append(encrypted) 100 101 # Fill remaining slots with random 102 for _ in range(5): 103 encrypted_records.append(os.urandom(528)) 104 105 reply_msg = TunnelBuildReplyMessage(encrypted_records) 106 107 executor = TunnelBuildExecutor() 108 result = executor.process_reply(reply_msg, hops) 109 110 assert result is not None 111 assert isinstance(result, TunnelEntry) 112 assert result.length == 3 113 114 def test_process_reply_one_rejected_returns_none(self): 115 """When any hop rejects (status != 0), process_reply returns None.""" 116 from i2p_tunnel.build_executor import TunnelBuildExecutor 117 from i2p_tunnel.builder import BuildReplyRecord 118 from i2p_data.i2np_tunnel import TunnelBuildReplyMessage 119 from i2p_crypto.aes import AESEngine 120 121 hops, keypairs = self._make_hop_configs(3) 122 123 encrypted_records = [] 124 for i, hop in enumerate(hops): 125 # Second hop rejects 126 status = 30 if i == 1 else 0 127 reply = BuildReplyRecord(status=status, reply_data=os.urandom(495)) 128 plaintext = reply.to_bytes() 129 pad_len = (16 - len(plaintext) % 16) % 16 130 padded = plaintext + b"\x00" * pad_len 131 iv = hop.reply_iv[:16] 132 encrypted = AESEngine.encrypt(padded, hop.reply_key, iv) 133 encrypted = encrypted.ljust(528, b"\x00") 134 encrypted_records.append(encrypted) 135 136 for _ in range(5): 137 encrypted_records.append(os.urandom(528)) 138 139 reply_msg = TunnelBuildReplyMessage(encrypted_records) 140 141 executor = TunnelBuildExecutor() 142 result = executor.process_reply(reply_msg, hops) 143 144 assert result is None 145 146 def test_process_reply_empty_hops_returns_entry(self): 147 """With zero hops to check, process_reply should still return a TunnelEntry.""" 148 from i2p_tunnel.build_executor import TunnelBuildExecutor 149 from i2p_data.i2np_tunnel import TunnelBuildReplyMessage 150 151 records = [os.urandom(528) for _ in range(8)] 152 reply_msg = TunnelBuildReplyMessage(records) 153 154 executor = TunnelBuildExecutor() 155 result = executor.process_reply(reply_msg, []) 156 157 assert result is not None 158 assert result.length == 0 159 160 161class TestTunnelManager: 162 """Tests for TunnelManager pool maintenance.""" 163 164 def _make_entry(self, tunnel_id_val, creation_ms, expiration_ms): 165 from i2p_tunnel.builder import TunnelEntry 166 from i2p_data.tunnel import TunnelId 167 return TunnelEntry( 168 tunnel_id=TunnelId(tunnel_id_val), 169 gateway=os.urandom(32), 170 length=3, 171 creation_time=creation_ms, 172 expiration=expiration_ms, 173 ) 174 175 def test_initial_counts_zero(self): 176 from i2p_tunnel.build_executor import TunnelManager 177 mgr = TunnelManager(target_inbound=3, target_outbound=3) 178 assert mgr.inbound_count() == 0 179 assert mgr.outbound_count() == 0 180 181 def test_add_inbound_tunnel(self): 182 from i2p_tunnel.build_executor import TunnelManager 183 mgr = TunnelManager() 184 now = int(time.time() * 1000) 185 entry = self._make_entry(1, now, now + 600_000) 186 mgr.add_tunnel(entry, is_inbound=True) 187 assert mgr.inbound_count() == 1 188 assert mgr.outbound_count() == 0 189 190 def test_add_outbound_tunnel(self): 191 from i2p_tunnel.build_executor import TunnelManager 192 mgr = TunnelManager() 193 now = int(time.time() * 1000) 194 entry = self._make_entry(2, now, now + 600_000) 195 mgr.add_tunnel(entry, is_inbound=False) 196 assert mgr.outbound_count() == 1 197 assert mgr.inbound_count() == 0 198 199 def test_needs_more_when_below_target(self): 200 from i2p_tunnel.build_executor import TunnelManager 201 mgr = TunnelManager(target_inbound=2, target_outbound=2) 202 now = int(time.time() * 1000) 203 204 assert mgr.needs_more_inbound() is True 205 assert mgr.needs_more_outbound() is True 206 207 mgr.add_tunnel(self._make_entry(1, now, now + 600_000), is_inbound=True) 208 mgr.add_tunnel(self._make_entry(2, now, now + 600_000), is_inbound=True) 209 assert mgr.needs_more_inbound() is False 210 211 mgr.add_tunnel(self._make_entry(3, now, now + 600_000), is_inbound=False) 212 assert mgr.needs_more_outbound() is True 213 mgr.add_tunnel(self._make_entry(4, now, now + 600_000), is_inbound=False) 214 assert mgr.needs_more_outbound() is False 215 216 def test_remove_expired(self): 217 from i2p_tunnel.build_executor import TunnelManager 218 mgr = TunnelManager() 219 now = int(time.time() * 1000) 220 221 # One expired, one fresh 222 expired_entry = self._make_entry(10, now - 700_000, now - 100_000) 223 fresh_entry = self._make_entry(11, now, now + 600_000) 224 mgr.add_tunnel(expired_entry, is_inbound=True) 225 mgr.add_tunnel(fresh_entry, is_inbound=True) 226 227 assert mgr.inbound_count() == 2 228 mgr.remove_expired(now) 229 assert mgr.inbound_count() == 1 230 231 def test_remove_expired_outbound(self): 232 from i2p_tunnel.build_executor import TunnelManager 233 mgr = TunnelManager() 234 now = int(time.time() * 1000) 235 236 expired = self._make_entry(20, now - 700_000, now - 100_000) 237 mgr.add_tunnel(expired, is_inbound=False) 238 assert mgr.outbound_count() == 1 239 mgr.remove_expired(now) 240 assert mgr.outbound_count() == 0 241 242 def test_get_inbound_tunnels(self): 243 from i2p_tunnel.build_executor import TunnelManager 244 mgr = TunnelManager() 245 now = int(time.time() * 1000) 246 247 e1 = self._make_entry(1, now, now + 600_000) 248 e2 = self._make_entry(2, now, now + 600_000) 249 mgr.add_tunnel(e1, is_inbound=True) 250 mgr.add_tunnel(e2, is_inbound=True) 251 252 tunnels = mgr.get_inbound_tunnels() 253 assert len(tunnels) == 2 254 255 def test_get_outbound_tunnels(self): 256 from i2p_tunnel.build_executor import TunnelManager 257 mgr = TunnelManager() 258 now = int(time.time() * 1000) 259 260 e1 = self._make_entry(1, now, now + 600_000) 261 mgr.add_tunnel(e1, is_inbound=False) 262 263 tunnels = mgr.get_outbound_tunnels() 264 assert len(tunnels) == 1