A Python port of the Invisible Internet Project (I2P)
at main 305 lines 11 kB view raw
1"""Tests for tunnel build pipeline wiring — TunnelBuildPipeline.""" 2 3import os 4import time 5 6import pytest 7 8from i2p_data.tunnel import TunnelId, HopConfig 9from i2p_tunnel.build_executor import TunnelBuildExecutor, TunnelManager 10from i2p_tunnel.data_handler import TunnelCryptoRegistry 11from i2p_tunnel.builder import TunnelEntry 12from i2p_peer.profile import PeerProfile, PeerSelector 13 14 15def _make_peer_profiles(count): 16 """Create peer profiles with random router hashes.""" 17 profiles = [] 18 for _ in range(count): 19 profile = PeerProfile(router_hash=os.urandom(32)) 20 profiles.append(profile) 21 return profiles 22 23 24class TestBuildTunnel: 25 """Tests for TunnelBuildPipeline.build_tunnel.""" 26 27 def test_build_tunnel_generates_configs_with_random_keys(self): 28 """build_tunnel must generate HopConfigs with random keys for each peer.""" 29 from i2p_tunnel.build_pipeline import TunnelBuildPipeline 30 31 executor = TunnelBuildExecutor() 32 manager = TunnelManager() 33 registry = TunnelCryptoRegistry() 34 35 pipeline = TunnelBuildPipeline(executor, manager, registry) 36 37 peers = _make_peer_profiles(3) 38 result = pipeline.build_tunnel(peers, is_inbound=True) 39 40 assert "hop_configs" in result 41 assert "build_msg" in result 42 43 hop_configs = result["hop_configs"] 44 assert len(hop_configs) == 3 45 46 # Each hop config must have non-empty random keys 47 for hop in hop_configs: 48 assert isinstance(hop, HopConfig) 49 assert len(hop.layer_key) == 32 50 assert len(hop.iv_key) == 32 51 assert len(hop.reply_key) == 32 52 assert len(hop.reply_iv) == 16 53 54 # Keys must differ between hops (random, not shared) 55 keys = [hop.layer_key for hop in hop_configs] 56 assert len(set(keys)) == 3, "Each hop should have a unique layer_key" 57 58 def test_build_tunnel_outbound(self): 59 """build_tunnel works for outbound tunnels.""" 60 from i2p_tunnel.build_pipeline import TunnelBuildPipeline 61 62 executor = TunnelBuildExecutor() 63 manager = TunnelManager() 64 registry = TunnelCryptoRegistry() 65 66 pipeline = TunnelBuildPipeline(executor, manager, registry) 67 68 peers = _make_peer_profiles(2) 69 result = pipeline.build_tunnel(peers, is_inbound=False) 70 71 assert len(result["hop_configs"]) == 2 72 assert result["build_msg"] is not None 73 74 75class TestProcessReply: 76 """Tests for TunnelBuildPipeline.process_reply.""" 77 78 def _make_hop_configs(self, count): 79 """Create HopConfigs with random keys.""" 80 configs = [] 81 for i in range(count): 82 hop = HopConfig( 83 receive_tunnel_id=TunnelId(1000 + i), 84 send_tunnel_id=TunnelId(2000 + i), 85 receive_key=os.urandom(32), 86 send_key=os.urandom(32), 87 iv_key=os.urandom(32), 88 reply_key=os.urandom(32), 89 reply_iv=os.urandom(16), 90 layer_key=os.urandom(32), 91 ) 92 configs.append(hop) 93 return configs 94 95 def test_process_reply_accepted_creates_tunnel_entry(self): 96 """When all hops accept, process_reply returns a TunnelEntry.""" 97 from i2p_tunnel.build_pipeline import TunnelBuildPipeline 98 99 executor = TunnelBuildExecutor() 100 manager = TunnelManager() 101 registry = TunnelCryptoRegistry() 102 pipeline = TunnelBuildPipeline(executor, manager, registry) 103 104 hop_configs = self._make_hop_configs(3) 105 now_ms = int(time.time() * 1000) 106 expiration_ms = now_ms + 600_000 107 108 # Simulate all-accepted reply 109 reply_result = {"accepted": True} 110 tunnel_id = 42 111 112 entry = pipeline.process_reply( 113 reply_result, hop_configs, tunnel_id, 114 is_inbound=True, expiration_ms=expiration_ms, 115 ) 116 117 assert entry is not None 118 assert isinstance(entry, TunnelEntry) 119 assert int(entry.tunnel_id) == 42 120 assert entry.length == 3 121 assert entry.expiration == expiration_ms 122 123 def test_process_reply_accepted_registers_keys_in_crypto_registry(self): 124 """Accepted reply registers each hop's keys in the crypto registry.""" 125 from i2p_tunnel.build_pipeline import TunnelBuildPipeline 126 127 executor = TunnelBuildExecutor() 128 manager = TunnelManager() 129 registry = TunnelCryptoRegistry() 130 pipeline = TunnelBuildPipeline(executor, manager, registry) 131 132 hop_configs = self._make_hop_configs(3) 133 now_ms = int(time.time() * 1000) 134 expiration_ms = now_ms + 600_000 135 136 reply_result = {"accepted": True} 137 tunnel_id = 55 138 139 pipeline.process_reply( 140 reply_result, hop_configs, tunnel_id, 141 is_inbound=True, expiration_ms=expiration_ms, 142 ) 143 144 # Each hop's receive_tunnel_id should be registered 145 for hop in hop_configs: 146 tid = int(hop.receive_tunnel_id) 147 keys = registry.get_keys(tid) 148 assert keys is not None, f"Tunnel {tid} not registered" 149 layer_key, iv_key, is_endpoint = keys 150 assert layer_key == hop.layer_key 151 assert iv_key == hop.iv_key 152 153 def test_process_reply_accepted_adds_to_tunnel_manager(self): 154 """Accepted reply adds tunnel to the manager's pool.""" 155 from i2p_tunnel.build_pipeline import TunnelBuildPipeline 156 157 executor = TunnelBuildExecutor() 158 manager = TunnelManager() 159 registry = TunnelCryptoRegistry() 160 pipeline = TunnelBuildPipeline(executor, manager, registry) 161 162 hop_configs = self._make_hop_configs(2) 163 now_ms = int(time.time() * 1000) 164 expiration_ms = now_ms + 600_000 165 166 reply_result = {"accepted": True} 167 168 # Inbound 169 pipeline.process_reply( 170 reply_result, hop_configs, tunnel_id=10, 171 is_inbound=True, expiration_ms=expiration_ms, 172 ) 173 assert manager.inbound_count() == 1 174 assert manager.outbound_count() == 0 175 176 # Outbound 177 hop_configs2 = self._make_hop_configs(2) 178 pipeline.process_reply( 179 reply_result, hop_configs2, tunnel_id=11, 180 is_inbound=False, expiration_ms=expiration_ms, 181 ) 182 assert manager.outbound_count() == 1 183 184 def test_process_reply_rejected_returns_none(self): 185 """When any hop rejects, process_reply returns None.""" 186 from i2p_tunnel.build_pipeline import TunnelBuildPipeline 187 188 executor = TunnelBuildExecutor() 189 manager = TunnelManager() 190 registry = TunnelCryptoRegistry() 191 pipeline = TunnelBuildPipeline(executor, manager, registry) 192 193 hop_configs = self._make_hop_configs(3) 194 now_ms = int(time.time() * 1000) 195 196 reply_result = {"accepted": False} 197 198 entry = pipeline.process_reply( 199 reply_result, hop_configs, tunnel_id=99, 200 is_inbound=True, expiration_ms=now_ms + 600_000, 201 ) 202 203 assert entry is None 204 # Nothing should be registered or added 205 assert manager.inbound_count() == 0 206 assert len(registry.registered_tunnels()) == 0 207 208 209class TestMaintainPools: 210 """Tests for TunnelBuildPipeline.maintain_pools.""" 211 212 def test_maintain_pools_reports_needs_correctly(self): 213 """maintain_pools reports pool status relative to targets.""" 214 from i2p_tunnel.build_pipeline import TunnelBuildPipeline 215 216 executor = TunnelBuildExecutor() 217 manager = TunnelManager(target_inbound=2, target_outbound=2) 218 registry = TunnelCryptoRegistry() 219 pipeline = TunnelBuildPipeline(executor, manager, registry) 220 221 status = pipeline.maintain_pools() 222 223 assert status["needs_inbound"] is True 224 assert status["needs_outbound"] is True 225 assert status["inbound_count"] == 0 226 assert status["outbound_count"] == 0 227 228 def test_maintain_pools_satisfied_after_adding(self): 229 """After adding enough tunnels, maintain_pools shows no needs.""" 230 from i2p_tunnel.build_pipeline import TunnelBuildPipeline 231 232 executor = TunnelBuildExecutor() 233 manager = TunnelManager(target_inbound=1, target_outbound=1) 234 registry = TunnelCryptoRegistry() 235 pipeline = TunnelBuildPipeline(executor, manager, registry) 236 237 now_ms = int(time.time() * 1000) 238 entry_in = TunnelEntry( 239 tunnel_id=TunnelId(1), gateway=os.urandom(32), 240 length=3, creation_time=now_ms, expiration=now_ms + 600_000, 241 ) 242 entry_out = TunnelEntry( 243 tunnel_id=TunnelId(2), gateway=os.urandom(32), 244 length=3, creation_time=now_ms, expiration=now_ms + 600_000, 245 ) 246 manager.add_tunnel(entry_in, is_inbound=True) 247 manager.add_tunnel(entry_out, is_inbound=False) 248 249 status = pipeline.maintain_pools() 250 251 assert status["needs_inbound"] is False 252 assert status["needs_outbound"] is False 253 assert status["inbound_count"] == 1 254 assert status["outbound_count"] == 1 255 256 257class TestFullLifecycle: 258 """Full lifecycle: build -> accept -> registered and pooled.""" 259 260 def test_full_lifecycle_build_accept_registered_and_pooled(self): 261 """End-to-end: build tunnel, process accepted reply, verify registered and pooled.""" 262 from i2p_tunnel.build_pipeline import TunnelBuildPipeline 263 264 executor = TunnelBuildExecutor() 265 manager = TunnelManager(target_inbound=2, target_outbound=2) 266 registry = TunnelCryptoRegistry() 267 pipeline = TunnelBuildPipeline(executor, manager, registry) 268 269 # Step 1: Build 270 peers = _make_peer_profiles(3) 271 build_result = pipeline.build_tunnel(peers, is_inbound=True) 272 hop_configs = build_result["hop_configs"] 273 assert len(hop_configs) == 3 274 275 # Step 2: Simulate accepted reply 276 now_ms = int(time.time() * 1000) 277 expiration_ms = now_ms + 600_000 278 reply_result = {"accepted": True} 279 tunnel_id = 777 280 281 entry = pipeline.process_reply( 282 reply_result, hop_configs, tunnel_id, 283 is_inbound=True, expiration_ms=expiration_ms, 284 ) 285 286 # Step 3: Verify entry created 287 assert entry is not None 288 assert int(entry.tunnel_id) == 777 289 assert entry.length == 3 290 291 # Step 4: Verify keys registered 292 for hop in hop_configs: 293 tid = int(hop.receive_tunnel_id) 294 keys = registry.get_keys(tid) 295 assert keys is not None 296 assert keys[0] == hop.layer_key 297 assert keys[1] == hop.iv_key 298 299 # Step 5: Verify added to pool 300 assert manager.inbound_count() == 1 301 302 # Step 6: Pool status reflects the addition 303 status = pipeline.maintain_pools() 304 assert status["needs_inbound"] is True # 1 < target 2 305 assert status["inbound_count"] == 1