"""Tests for tunnel build orchestration — TunnelBuildExecutor and TunnelManager.""" import os import struct import time import pytest class TestTunnelBuildExecutor: """Tests for TunnelBuildExecutor.""" def _make_hop_configs(self, count): """Create hop configs with ElGamal keypairs for testing.""" from i2p_crypto.elgamal import ElGamalEngine from i2p_data.tunnel import TunnelId, HopConfig hops = [] keypairs = [] for i in range(count): pub, priv = ElGamalEngine.generate_keypair() keypairs.append((pub, priv)) hop = HopConfig( receive_tunnel_id=TunnelId(1000 + i), send_tunnel_id=TunnelId(2000 + i), receive_key=os.urandom(32), send_key=os.urandom(32), iv_key=os.urandom(32), reply_key=os.urandom(32), reply_iv=os.urandom(16), layer_key=os.urandom(32), ) hops.append(hop) return hops, keypairs def test_build_tunnel_creates_8_records(self): """build_tunnel must produce a TunnelBuildMessage with exactly 8 records.""" from i2p_tunnel.build_executor import TunnelBuildExecutor from i2p_data.i2np_tunnel import TunnelBuildMessage hops, keypairs = self._make_hop_configs(3) public_keys = [kp[0] for kp in keypairs] executor = TunnelBuildExecutor() msg = executor.build_tunnel(hops, public_keys, is_inbound=False) assert isinstance(msg, TunnelBuildMessage) assert len(msg.records) == 8 for rec in msg.records: assert len(rec) == 528 def test_build_tunnel_single_hop(self): """build_tunnel works with a single hop tunnel.""" from i2p_tunnel.build_executor import TunnelBuildExecutor hops, keypairs = self._make_hop_configs(1) public_keys = [kp[0] for kp in keypairs] executor = TunnelBuildExecutor() msg = executor.build_tunnel(hops, public_keys, is_inbound=True) assert len(msg.records) == 8 def test_build_tunnel_fills_unused_slots_with_random(self): """Unused record slots (beyond hop count) should be random, not all zeros.""" from i2p_tunnel.build_executor import TunnelBuildExecutor hops, keypairs = self._make_hop_configs(2) public_keys = [kp[0] for kp in keypairs] executor = TunnelBuildExecutor() msg = executor.build_tunnel(hops, public_keys, is_inbound=False) # Records beyond the 2 hops should not be all zeros for i in range(2, 8): assert msg.records[i] != b"\x00" * 528 def test_process_reply_all_accepted_returns_tunnel_entry(self): """When all hops accept (status=0), process_reply returns a TunnelEntry.""" from i2p_tunnel.build_executor import TunnelBuildExecutor from i2p_tunnel.builder import BuildReplyRecord, TunnelEntry from i2p_data.i2np_tunnel import TunnelBuildReplyMessage from i2p_crypto.aes import AESEngine hops, keypairs = self._make_hop_configs(3) # Build accepted reply records, AES-encrypted with each hop's reply_key/iv encrypted_records = [] for hop in hops: reply = BuildReplyRecord(status=0, reply_data=os.urandom(495)) plaintext = reply.to_bytes() # Pad plaintext to multiple of 16 if needed pad_len = (16 - len(plaintext) % 16) % 16 padded = plaintext + b"\x00" * pad_len iv = hop.reply_iv[:16] encrypted = AESEngine.encrypt(padded, hop.reply_key, iv) # Pad to 528 bytes encrypted = encrypted.ljust(528, b"\x00") encrypted_records.append(encrypted) # Fill remaining slots with random for _ in range(5): encrypted_records.append(os.urandom(528)) reply_msg = TunnelBuildReplyMessage(encrypted_records) executor = TunnelBuildExecutor() result = executor.process_reply(reply_msg, hops) assert result is not None assert isinstance(result, TunnelEntry) assert result.length == 3 def test_process_reply_one_rejected_returns_none(self): """When any hop rejects (status != 0), process_reply returns None.""" from i2p_tunnel.build_executor import TunnelBuildExecutor from i2p_tunnel.builder import BuildReplyRecord from i2p_data.i2np_tunnel import TunnelBuildReplyMessage from i2p_crypto.aes import AESEngine hops, keypairs = self._make_hop_configs(3) encrypted_records = [] for i, hop in enumerate(hops): # Second hop rejects status = 30 if i == 1 else 0 reply = BuildReplyRecord(status=status, reply_data=os.urandom(495)) plaintext = reply.to_bytes() pad_len = (16 - len(plaintext) % 16) % 16 padded = plaintext + b"\x00" * pad_len iv = hop.reply_iv[:16] encrypted = AESEngine.encrypt(padded, hop.reply_key, iv) encrypted = encrypted.ljust(528, b"\x00") encrypted_records.append(encrypted) for _ in range(5): encrypted_records.append(os.urandom(528)) reply_msg = TunnelBuildReplyMessage(encrypted_records) executor = TunnelBuildExecutor() result = executor.process_reply(reply_msg, hops) assert result is None def test_process_reply_empty_hops_returns_entry(self): """With zero hops to check, process_reply should still return a TunnelEntry.""" from i2p_tunnel.build_executor import TunnelBuildExecutor from i2p_data.i2np_tunnel import TunnelBuildReplyMessage records = [os.urandom(528) for _ in range(8)] reply_msg = TunnelBuildReplyMessage(records) executor = TunnelBuildExecutor() result = executor.process_reply(reply_msg, []) assert result is not None assert result.length == 0 class TestTunnelManager: """Tests for TunnelManager pool maintenance.""" def _make_entry(self, tunnel_id_val, creation_ms, expiration_ms): from i2p_tunnel.builder import TunnelEntry from i2p_data.tunnel import TunnelId return TunnelEntry( tunnel_id=TunnelId(tunnel_id_val), gateway=os.urandom(32), length=3, creation_time=creation_ms, expiration=expiration_ms, ) def test_initial_counts_zero(self): from i2p_tunnel.build_executor import TunnelManager mgr = TunnelManager(target_inbound=3, target_outbound=3) assert mgr.inbound_count() == 0 assert mgr.outbound_count() == 0 def test_add_inbound_tunnel(self): from i2p_tunnel.build_executor import TunnelManager mgr = TunnelManager() now = int(time.time() * 1000) entry = self._make_entry(1, now, now + 600_000) mgr.add_tunnel(entry, is_inbound=True) assert mgr.inbound_count() == 1 assert mgr.outbound_count() == 0 def test_add_outbound_tunnel(self): from i2p_tunnel.build_executor import TunnelManager mgr = TunnelManager() now = int(time.time() * 1000) entry = self._make_entry(2, now, now + 600_000) mgr.add_tunnel(entry, is_inbound=False) assert mgr.outbound_count() == 1 assert mgr.inbound_count() == 0 def test_needs_more_when_below_target(self): from i2p_tunnel.build_executor import TunnelManager mgr = TunnelManager(target_inbound=2, target_outbound=2) now = int(time.time() * 1000) assert mgr.needs_more_inbound() is True assert mgr.needs_more_outbound() is True mgr.add_tunnel(self._make_entry(1, now, now + 600_000), is_inbound=True) mgr.add_tunnel(self._make_entry(2, now, now + 600_000), is_inbound=True) assert mgr.needs_more_inbound() is False mgr.add_tunnel(self._make_entry(3, now, now + 600_000), is_inbound=False) assert mgr.needs_more_outbound() is True mgr.add_tunnel(self._make_entry(4, now, now + 600_000), is_inbound=False) assert mgr.needs_more_outbound() is False def test_remove_expired(self): from i2p_tunnel.build_executor import TunnelManager mgr = TunnelManager() now = int(time.time() * 1000) # One expired, one fresh expired_entry = self._make_entry(10, now - 700_000, now - 100_000) fresh_entry = self._make_entry(11, now, now + 600_000) mgr.add_tunnel(expired_entry, is_inbound=True) mgr.add_tunnel(fresh_entry, is_inbound=True) assert mgr.inbound_count() == 2 mgr.remove_expired(now) assert mgr.inbound_count() == 1 def test_remove_expired_outbound(self): from i2p_tunnel.build_executor import TunnelManager mgr = TunnelManager() now = int(time.time() * 1000) expired = self._make_entry(20, now - 700_000, now - 100_000) mgr.add_tunnel(expired, is_inbound=False) assert mgr.outbound_count() == 1 mgr.remove_expired(now) assert mgr.outbound_count() == 0 def test_get_inbound_tunnels(self): from i2p_tunnel.build_executor import TunnelManager mgr = TunnelManager() now = int(time.time() * 1000) e1 = self._make_entry(1, now, now + 600_000) e2 = self._make_entry(2, now, now + 600_000) mgr.add_tunnel(e1, is_inbound=True) mgr.add_tunnel(e2, is_inbound=True) tunnels = mgr.get_inbound_tunnels() assert len(tunnels) == 2 def test_get_outbound_tunnels(self): from i2p_tunnel.build_executor import TunnelManager mgr = TunnelManager() now = int(time.time() * 1000) e1 = self._make_entry(1, now, now + 600_000) mgr.add_tunnel(e1, is_inbound=False) tunnels = mgr.get_outbound_tunnels() assert len(tunnels) == 1