# A Python port of the Invisible Internet Project (I2P)
# at main 205 lines 7.2 kB view raw
"""Tests for the tunnel data processing wiring.

Exercises TunnelCryptoRegistry (per-tunnel key bookkeeping) and
TunnelDataHandler (inbound per-hop handling and outbound layering).
"""

import os

import pytest

from i2p_tunnel.data_handler import TunnelCryptoRegistry, TunnelDataHandler
from i2p_tunnel.crypto import (
    TunnelLayerDecryptor,
    TunnelLayerEncryptor,
    OutboundTunnelEncryptor,
)


# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------

# Size in bytes of every payload pushed through the handler in these tests.
TUNNEL_DATA_SIZE = 1024


def _random_key() -> bytes:
    """Return 32 fresh random bytes to serve as an AES layer key."""
    return os.urandom(32)


def _random_iv_key() -> bytes:
    """Return 32 fresh random bytes to serve as an IV key.

    Only the first 16 bytes are used as the IV.
    """
    return os.urandom(32)


def _make_hop_keys(count: int) -> list:
    """Build ``count`` independent ``(layer_key, iv_key)`` pairs, one per hop."""
    return [(_random_key(), _random_iv_key()) for _ in range(count)]


def _single_hop_inbound(tunnel_id: int, is_endpoint: bool) -> tuple:
    """Push one singly-encrypted payload through ``handle_inbound``.

    A fresh registry and handler are created, ``tunnel_id`` is registered
    with random keys and the given ``is_endpoint`` flag, and a random
    payload encrypted with those same keys is handed to the handler.

    Returns:
        ``(plaintext, result)`` where ``plaintext`` is the payload before
        encryption and ``result`` is whatever ``handle_inbound`` returned.
    """
    registry = TunnelCryptoRegistry()
    handler = TunnelDataHandler(registry)

    layer_key, iv_key = _random_key(), _random_iv_key()
    registry.register(tunnel_id, layer_key, iv_key, is_endpoint=is_endpoint)

    plaintext = os.urandom(TUNNEL_DATA_SIZE)
    ciphertext = TunnelLayerEncryptor.encrypt_layer(plaintext, layer_key, iv_key)

    return plaintext, handler.handle_inbound(tunnel_id, ciphertext)


# ---------------------------------------------------------------------------
# TunnelCryptoRegistry
# ---------------------------------------------------------------------------

class TestTunnelCryptoRegistry:
    """Behaviour of the registry that stores per-tunnel crypto keys."""

    def test_register_and_get_keys(self):
        """A registered tunnel yields back exactly the keys and flag stored."""
        tunnel_id = 42
        keys = (_random_key(), _random_iv_key())
        reg = TunnelCryptoRegistry()

        reg.register(tunnel_id, *keys, is_endpoint=True)
        stored = reg.get_keys(tunnel_id)

        assert stored is not None
        assert stored == (*keys, True)

    def test_unknown_tunnel_returns_none(self):
        """A tunnel ID that was never registered resolves to None."""
        reg = TunnelCryptoRegistry()
        missing = reg.get_keys(999)
        assert missing is None

    def test_remove_tunnel(self):
        """After remove(), the tunnel is no longer resolvable."""
        tunnel_id = 10
        reg = TunnelCryptoRegistry()
        reg.register(tunnel_id, _random_key(), _random_iv_key())

        # Sanity check: the entry exists before it is removed.
        assert reg.get_keys(tunnel_id) is not None

        reg.remove(tunnel_id)
        assert reg.get_keys(tunnel_id) is None

    def test_registered_tunnels_lists_all(self):
        """registered_tunnels() reports every ID that was registered."""
        reg = TunnelCryptoRegistry()
        reg.register(1, _random_key(), _random_iv_key())
        reg.register(2, _random_key(), _random_iv_key(), is_endpoint=True)
        reg.register(3, _random_key(), _random_iv_key())

        # Sorted before comparing, so the order of the returned IDs is
        # not part of what this test checks.
        reported = sorted(reg.registered_tunnels())
        assert reported == [1, 2, 3]

    def test_register_defaults_not_endpoint(self):
        """Omitting is_endpoint stores the tunnel as a non-endpoint."""
        reg = TunnelCryptoRegistry()
        reg.register(7, _random_key(), _random_iv_key())

        stored = reg.get_keys(7)
        assert stored is not None
        # Third element of the stored tuple is the endpoint flag.
        assert stored[2] is False


# ---------------------------------------------------------------------------
# TunnelDataHandler — inbound
# ---------------------------------------------------------------------------

class TestTunnelDataHandlerInbound:
    """Inbound handling: strip one layer, then deliver or forward."""

    def test_endpoint_tunnel_delivers(self):
        """An endpoint tunnel decrypts its layer and reports "deliver"."""
        plaintext, outcome = _single_hop_inbound(100, is_endpoint=True)

        assert outcome["action"] == "deliver"
        assert outcome["data"] == plaintext

    def test_intermediate_tunnel_forwards(self):
        """A non-endpoint tunnel decrypts its layer and reports "forward"."""
        plaintext, outcome = _single_hop_inbound(200, is_endpoint=False)

        assert outcome["action"] == "forward"
        assert outcome["data"] == plaintext

    def test_unknown_tunnel_returns_unknown_action(self):
        """An unregistered tunnel ID yields "unknown" plus the offending ID."""
        handler = TunnelDataHandler(TunnelCryptoRegistry())
        zero_payload = b"\x00" * TUNNEL_DATA_SIZE

        outcome = handler.handle_inbound(999, zero_payload)

        assert outcome["action"] == "unknown"
        assert outcome["tunnel_id"] == 999


# ---------------------------------------------------------------------------
# TunnelDataHandler — outbound
# ---------------------------------------------------------------------------

class TestTunnelDataHandlerOutbound:
    """Outbound handling: wrap the payload in every hop's layer."""

    def test_outbound_encrypts_all_layers(self):
        """Peeling each hop's layer in forward order recovers the payload."""
        handler = TunnelDataHandler(TunnelCryptoRegistry())

        plaintext = os.urandom(TUNNEL_DATA_SIZE)
        hop_keys = _make_hop_keys(3)

        wrapped = handler.handle_outbound(plaintext, hop_keys)

        # Undo one layer per hop, first hop first.
        remaining = wrapped
        for hop_layer_key, hop_iv_key in hop_keys:
            remaining = TunnelLayerDecryptor.decrypt_layer(
                remaining, hop_layer_key, hop_iv_key
            )
        assert remaining == plaintext


# ---------------------------------------------------------------------------
# Full roundtrip
# ---------------------------------------------------------------------------

class TestFullRoundtrip:
    """Outbound encryption followed by inbound handling at every hop."""

    def test_three_hop_roundtrip(self):
        """Hops 0 and 1 forward; hop 2 delivers the original payload."""
        plaintext = os.urandom(TUNNEL_DATA_SIZE)

        # Three hops: gateway (0), middle (1), endpoint (2).
        hop_keys = _make_hop_keys(3)
        tunnel_ids = [10, 20, 30]

        # One registry entry per hop; only the final hop is an endpoint.
        registry = TunnelCryptoRegistry()
        last_hop = len(tunnel_ids) - 1
        for position, tid in enumerate(tunnel_ids):
            hop_layer_key, hop_iv_key = hop_keys[position]
            registry.register(
                tid,
                hop_layer_key,
                hop_iv_key,
                is_endpoint=(position == last_hop),
            )

        handler = TunnelDataHandler(registry)

        # Outbound side: apply every layer.
        payload = handler.handle_outbound(plaintext, hop_keys)
        assert payload != plaintext

        # Gateway and middle hops each strip a layer and pass the rest on.
        for tid in tunnel_ids[:-1]:
            outcome = handler.handle_inbound(tid, payload)
            assert outcome["action"] == "forward"
            payload = outcome["data"]

        # The endpoint strips the final layer and hands back the original.
        outcome = handler.handle_inbound(tunnel_ids[-1], payload)
        assert outcome["action"] == "deliver"
        assert outcome["data"] == plaintext