"""Tests for tunnel build pipeline wiring — TunnelBuildPipeline.""" import os import time import pytest from i2p_data.tunnel import TunnelId, HopConfig from i2p_tunnel.build_executor import TunnelBuildExecutor, TunnelManager from i2p_tunnel.data_handler import TunnelCryptoRegistry from i2p_tunnel.builder import TunnelEntry from i2p_peer.profile import PeerProfile, PeerSelector def _make_peer_profiles(count): """Create peer profiles with random router hashes.""" profiles = [] for _ in range(count): profile = PeerProfile(router_hash=os.urandom(32)) profiles.append(profile) return profiles class TestBuildTunnel: """Tests for TunnelBuildPipeline.build_tunnel.""" def test_build_tunnel_generates_configs_with_random_keys(self): """build_tunnel must generate HopConfigs with random keys for each peer.""" from i2p_tunnel.build_pipeline import TunnelBuildPipeline executor = TunnelBuildExecutor() manager = TunnelManager() registry = TunnelCryptoRegistry() pipeline = TunnelBuildPipeline(executor, manager, registry) peers = _make_peer_profiles(3) result = pipeline.build_tunnel(peers, is_inbound=True) assert "hop_configs" in result assert "build_msg" in result hop_configs = result["hop_configs"] assert len(hop_configs) == 3 # Each hop config must have non-empty random keys for hop in hop_configs: assert isinstance(hop, HopConfig) assert len(hop.layer_key) == 32 assert len(hop.iv_key) == 32 assert len(hop.reply_key) == 32 assert len(hop.reply_iv) == 16 # Keys must differ between hops (random, not shared) keys = [hop.layer_key for hop in hop_configs] assert len(set(keys)) == 3, "Each hop should have a unique layer_key" def test_build_tunnel_outbound(self): """build_tunnel works for outbound tunnels.""" from i2p_tunnel.build_pipeline import TunnelBuildPipeline executor = TunnelBuildExecutor() manager = TunnelManager() registry = TunnelCryptoRegistry() pipeline = TunnelBuildPipeline(executor, manager, registry) peers = _make_peer_profiles(2) result = pipeline.build_tunnel(peers, is_inbound=False) assert len(result["hop_configs"]) == 2 assert result["build_msg"] is not None class TestProcessReply: """Tests for TunnelBuildPipeline.process_reply.""" def _make_hop_configs(self, count): """Create HopConfigs with random keys.""" configs = [] for i in range(count): hop = HopConfig( receive_tunnel_id=TunnelId(1000 + i), send_tunnel_id=TunnelId(2000 + i), receive_key=os.urandom(32), send_key=os.urandom(32), iv_key=os.urandom(32), reply_key=os.urandom(32), reply_iv=os.urandom(16), layer_key=os.urandom(32), ) configs.append(hop) return configs def test_process_reply_accepted_creates_tunnel_entry(self): """When all hops accept, process_reply returns a TunnelEntry.""" from i2p_tunnel.build_pipeline import TunnelBuildPipeline executor = TunnelBuildExecutor() manager = TunnelManager() registry = TunnelCryptoRegistry() pipeline = TunnelBuildPipeline(executor, manager, registry) hop_configs = self._make_hop_configs(3) now_ms = int(time.time() * 1000) expiration_ms = now_ms + 600_000 # Simulate all-accepted reply reply_result = {"accepted": True} tunnel_id = 42 entry = pipeline.process_reply( reply_result, hop_configs, tunnel_id, is_inbound=True, expiration_ms=expiration_ms, ) assert entry is not None assert isinstance(entry, TunnelEntry) assert int(entry.tunnel_id) == 42 assert entry.length == 3 assert entry.expiration == expiration_ms def test_process_reply_accepted_registers_keys_in_crypto_registry(self): """Accepted reply registers each hop's keys in the crypto registry.""" from i2p_tunnel.build_pipeline import TunnelBuildPipeline executor = TunnelBuildExecutor() manager = TunnelManager() registry = TunnelCryptoRegistry() pipeline = TunnelBuildPipeline(executor, manager, registry) hop_configs = self._make_hop_configs(3) now_ms = int(time.time() * 1000) expiration_ms = now_ms + 600_000 reply_result = {"accepted": True} tunnel_id = 55 pipeline.process_reply( reply_result, hop_configs, tunnel_id, is_inbound=True, expiration_ms=expiration_ms, ) # Each hop's receive_tunnel_id should be registered for hop in hop_configs: tid = int(hop.receive_tunnel_id) keys = registry.get_keys(tid) assert keys is not None, f"Tunnel {tid} not registered" layer_key, iv_key, is_endpoint = keys assert layer_key == hop.layer_key assert iv_key == hop.iv_key def test_process_reply_accepted_adds_to_tunnel_manager(self): """Accepted reply adds tunnel to the manager's pool.""" from i2p_tunnel.build_pipeline import TunnelBuildPipeline executor = TunnelBuildExecutor() manager = TunnelManager() registry = TunnelCryptoRegistry() pipeline = TunnelBuildPipeline(executor, manager, registry) hop_configs = self._make_hop_configs(2) now_ms = int(time.time() * 1000) expiration_ms = now_ms + 600_000 reply_result = {"accepted": True} # Inbound pipeline.process_reply( reply_result, hop_configs, tunnel_id=10, is_inbound=True, expiration_ms=expiration_ms, ) assert manager.inbound_count() == 1 assert manager.outbound_count() == 0 # Outbound hop_configs2 = self._make_hop_configs(2) pipeline.process_reply( reply_result, hop_configs2, tunnel_id=11, is_inbound=False, expiration_ms=expiration_ms, ) assert manager.outbound_count() == 1 def test_process_reply_rejected_returns_none(self): """When any hop rejects, process_reply returns None.""" from i2p_tunnel.build_pipeline import TunnelBuildPipeline executor = TunnelBuildExecutor() manager = TunnelManager() registry = TunnelCryptoRegistry() pipeline = TunnelBuildPipeline(executor, manager, registry) hop_configs = self._make_hop_configs(3) now_ms = int(time.time() * 1000) reply_result = {"accepted": False} entry = pipeline.process_reply( reply_result, hop_configs, tunnel_id=99, is_inbound=True, expiration_ms=now_ms + 600_000, ) assert entry is None # Nothing should be registered or added assert manager.inbound_count() == 0 assert len(registry.registered_tunnels()) == 0 class TestMaintainPools: """Tests for TunnelBuildPipeline.maintain_pools.""" def test_maintain_pools_reports_needs_correctly(self): """maintain_pools reports pool status relative to targets.""" from i2p_tunnel.build_pipeline import TunnelBuildPipeline executor = TunnelBuildExecutor() manager = TunnelManager(target_inbound=2, target_outbound=2) registry = TunnelCryptoRegistry() pipeline = TunnelBuildPipeline(executor, manager, registry) status = pipeline.maintain_pools() assert status["needs_inbound"] is True assert status["needs_outbound"] is True assert status["inbound_count"] == 0 assert status["outbound_count"] == 0 def test_maintain_pools_satisfied_after_adding(self): """After adding enough tunnels, maintain_pools shows no needs.""" from i2p_tunnel.build_pipeline import TunnelBuildPipeline executor = TunnelBuildExecutor() manager = TunnelManager(target_inbound=1, target_outbound=1) registry = TunnelCryptoRegistry() pipeline = TunnelBuildPipeline(executor, manager, registry) now_ms = int(time.time() * 1000) entry_in = TunnelEntry( tunnel_id=TunnelId(1), gateway=os.urandom(32), length=3, creation_time=now_ms, expiration=now_ms + 600_000, ) entry_out = TunnelEntry( tunnel_id=TunnelId(2), gateway=os.urandom(32), length=3, creation_time=now_ms, expiration=now_ms + 600_000, ) manager.add_tunnel(entry_in, is_inbound=True) manager.add_tunnel(entry_out, is_inbound=False) status = pipeline.maintain_pools() assert status["needs_inbound"] is False assert status["needs_outbound"] is False assert status["inbound_count"] == 1 assert status["outbound_count"] == 1 class TestFullLifecycle: """Full lifecycle: build -> accept -> registered and pooled.""" def test_full_lifecycle_build_accept_registered_and_pooled(self): """End-to-end: build tunnel, process accepted reply, verify registered and pooled.""" from i2p_tunnel.build_pipeline import TunnelBuildPipeline executor = TunnelBuildExecutor() manager = TunnelManager(target_inbound=2, target_outbound=2) registry = TunnelCryptoRegistry() pipeline = TunnelBuildPipeline(executor, manager, registry) # Step 1: Build peers = _make_peer_profiles(3) build_result = pipeline.build_tunnel(peers, is_inbound=True) hop_configs = build_result["hop_configs"] assert len(hop_configs) == 3 # Step 2: Simulate accepted reply now_ms = int(time.time() * 1000) expiration_ms = now_ms + 600_000 reply_result = {"accepted": True} tunnel_id = 777 entry = pipeline.process_reply( reply_result, hop_configs, tunnel_id, is_inbound=True, expiration_ms=expiration_ms, ) # Step 3: Verify entry created assert entry is not None assert int(entry.tunnel_id) == 777 assert entry.length == 3 # Step 4: Verify keys registered for hop in hop_configs: tid = int(hop.receive_tunnel_id) keys = registry.get_keys(tid) assert keys is not None assert keys[0] == hop.layer_key assert keys[1] == hop.iv_key # Step 5: Verify added to pool assert manager.inbound_count() == 1 # Step 6: Pool status reflects the addition status = pipeline.maintain_pools() assert status["needs_inbound"] is True # 1 < target 2 assert status["inbound_count"] == 1