"""Tests for tunnel data processing wiring. Covers: TunnelCryptoRegistry, TunnelDataHandler. """ import os import pytest from i2p_tunnel.data_handler import TunnelCryptoRegistry, TunnelDataHandler from i2p_tunnel.crypto import ( TunnelLayerDecryptor, TunnelLayerEncryptor, OutboundTunnelEncryptor, ) # --------------------------------------------------------------------------- # Helpers # --------------------------------------------------------------------------- def _random_key() -> bytes: """32-byte random AES key.""" return os.urandom(32) def _random_iv_key() -> bytes: """32-byte random IV key (first 16 bytes used as IV).""" return os.urandom(32) TUNNEL_DATA_SIZE = 1024 # --------------------------------------------------------------------------- # TunnelCryptoRegistry # --------------------------------------------------------------------------- class TestTunnelCryptoRegistry: """Registry for tunnel crypto keys.""" def test_register_and_get_keys(self): """Register a tunnel and retrieve its keys.""" registry = TunnelCryptoRegistry() layer_key = _random_key() iv_key = _random_iv_key() registry.register(42, layer_key, iv_key, is_endpoint=True) result = registry.get_keys(42) assert result is not None assert result == (layer_key, iv_key, True) def test_unknown_tunnel_returns_none(self): """Looking up an unregistered tunnel returns None.""" registry = TunnelCryptoRegistry() assert registry.get_keys(999) is None def test_remove_tunnel(self): """Removing a tunnel makes it unknown.""" registry = TunnelCryptoRegistry() layer_key = _random_key() iv_key = _random_iv_key() registry.register(10, layer_key, iv_key) assert registry.get_keys(10) is not None registry.remove(10) assert registry.get_keys(10) is None def test_registered_tunnels_lists_all(self): """registered_tunnels returns all registered tunnel IDs.""" registry = TunnelCryptoRegistry() registry.register(1, _random_key(), _random_iv_key()) registry.register(2, _random_key(), _random_iv_key(), is_endpoint=True) registry.register(3, _random_key(), _random_iv_key()) ids = registry.registered_tunnels() assert sorted(ids) == [1, 2, 3] def test_register_defaults_not_endpoint(self): """is_endpoint defaults to False.""" registry = TunnelCryptoRegistry() registry.register(7, _random_key(), _random_iv_key()) result = registry.get_keys(7) assert result is not None assert result[2] is False # --------------------------------------------------------------------------- # TunnelDataHandler — inbound # --------------------------------------------------------------------------- class TestTunnelDataHandlerInbound: """Inbound data processing: decrypt one layer and route.""" def test_endpoint_tunnel_delivers(self): """Endpoint tunnel decrypts one layer and returns deliver action.""" registry = TunnelCryptoRegistry() handler = TunnelDataHandler(registry) layer_key = _random_key() iv_key = _random_iv_key() registry.register(100, layer_key, iv_key, is_endpoint=True) plaintext = os.urandom(TUNNEL_DATA_SIZE) encrypted = TunnelLayerEncryptor.encrypt_layer(plaintext, layer_key, iv_key) result = handler.handle_inbound(100, encrypted) assert result["action"] == "deliver" assert result["data"] == plaintext def test_intermediate_tunnel_forwards(self): """Intermediate tunnel decrypts one layer and returns forward action.""" registry = TunnelCryptoRegistry() handler = TunnelDataHandler(registry) layer_key = _random_key() iv_key = _random_iv_key() registry.register(200, layer_key, iv_key, is_endpoint=False) plaintext = os.urandom(TUNNEL_DATA_SIZE) encrypted = TunnelLayerEncryptor.encrypt_layer(plaintext, layer_key, iv_key) result = handler.handle_inbound(200, encrypted) assert result["action"] == "forward" assert result["data"] == plaintext def test_unknown_tunnel_returns_unknown_action(self): """Unknown tunnel ID returns unknown action with the tunnel_id.""" registry = TunnelCryptoRegistry() handler = TunnelDataHandler(registry) result = handler.handle_inbound(999, b"\x00" * TUNNEL_DATA_SIZE) assert result["action"] == "unknown" assert result["tunnel_id"] == 999 # --------------------------------------------------------------------------- # TunnelDataHandler — outbound # --------------------------------------------------------------------------- class TestTunnelDataHandlerOutbound: """Outbound data processing: encrypt all layers.""" def test_outbound_encrypts_all_layers(self): """Outbound encrypt applies all hop layers in reverse order.""" registry = TunnelCryptoRegistry() handler = TunnelDataHandler(registry) plaintext = os.urandom(TUNNEL_DATA_SIZE) hop_keys = [(_random_key(), _random_iv_key()) for _ in range(3)] encrypted = handler.handle_outbound(plaintext, hop_keys) # Verify by peeling layers in forward order data = encrypted for layer_key, iv_key in hop_keys: data = TunnelLayerDecryptor.decrypt_layer(data, layer_key, iv_key) assert data == plaintext # --------------------------------------------------------------------------- # Full roundtrip # --------------------------------------------------------------------------- class TestFullRoundtrip: """Outbound encrypt -> per-hop inbound decrypt -> endpoint deliver.""" def test_three_hop_roundtrip(self): """Encrypt outbound, then simulate each hop handling inbound.""" plaintext = os.urandom(TUNNEL_DATA_SIZE) # 3 hops: gateway (0), middle (1), endpoint (2) hop_keys = [(_random_key(), _random_iv_key()) for _ in range(3)] # Register all hops in a registry registry = TunnelCryptoRegistry() tunnel_ids = [10, 20, 30] for i, (tid, (lk, ik)) in enumerate(zip(tunnel_ids, hop_keys)): registry.register(tid, lk, ik, is_endpoint=(i == 2)) handler = TunnelDataHandler(registry) # Outbound encrypt encrypted = handler.handle_outbound(plaintext, hop_keys) assert encrypted != plaintext # Hop 0 (gateway) — intermediate, forwards result = handler.handle_inbound(tunnel_ids[0], encrypted) assert result["action"] == "forward" data = result["data"] # Hop 1 (middle) — intermediate, forwards result = handler.handle_inbound(tunnel_ids[1], data) assert result["action"] == "forward" data = result["data"] # Hop 2 (endpoint) — delivers original plaintext result = handler.handle_inbound(tunnel_ids[2], data) assert result["action"] == "deliver" assert result["data"] == plaintext