# A Python port of the Invisible Internet Project (I2P)
"""Tests for tunnel data processing wiring.

Covers: TunnelCryptoRegistry, TunnelDataHandler.
"""
5
6import os
7
8import pytest
9
10from i2p_tunnel.data_handler import TunnelCryptoRegistry, TunnelDataHandler
11from i2p_tunnel.crypto import (
12 TunnelLayerDecryptor,
13 TunnelLayerEncryptor,
14 OutboundTunnelEncryptor,
15)
16
17
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
21
22def _random_key() -> bytes:
23 """32-byte random AES key."""
24 return os.urandom(32)
25
26
27def _random_iv_key() -> bytes:
28 """32-byte random IV key (first 16 bytes used as IV)."""
29 return os.urandom(32)
30
31
# Size in bytes of the payloads these tests push through the handler.
TUNNEL_DATA_SIZE = 1024
33
34
# ---------------------------------------------------------------------------
# TunnelCryptoRegistry
# ---------------------------------------------------------------------------
38
class TestTunnelCryptoRegistry:
    """Registry for tunnel crypto keys."""

    def test_register_and_get_keys(self):
        """Register a tunnel and retrieve its keys."""
        registry = TunnelCryptoRegistry()
        layer_key = _random_key()
        iv_key = _random_iv_key()

        registry.register(42, layer_key, iv_key, is_endpoint=True)
        result = registry.get_keys(42)

        # get_keys hands back the registered values as a
        # (layer_key, iv_key, is_endpoint) triple.
        assert result is not None
        assert result == (layer_key, iv_key, True)

    def test_unknown_tunnel_returns_none(self):
        """Looking up an unregistered tunnel returns None."""
        registry = TunnelCryptoRegistry()
        assert registry.get_keys(999) is None

    def test_remove_tunnel(self):
        """Removing a tunnel makes it unknown."""
        registry = TunnelCryptoRegistry()
        layer_key = _random_key()
        iv_key = _random_iv_key()

        registry.register(10, layer_key, iv_key)
        # Confirm the entry exists first so the removal check is meaningful.
        assert registry.get_keys(10) is not None

        registry.remove(10)
        assert registry.get_keys(10) is None

    def test_registered_tunnels_lists_all(self):
        """registered_tunnels returns all registered tunnel IDs."""
        registry = TunnelCryptoRegistry()
        registry.register(1, _random_key(), _random_iv_key())
        registry.register(2, _random_key(), _random_iv_key(), is_endpoint=True)
        registry.register(3, _random_key(), _random_iv_key())

        ids = registry.registered_tunnels()
        # Sort before comparing: the test does not rely on any ordering.
        assert sorted(ids) == [1, 2, 3]

    def test_register_defaults_not_endpoint(self):
        """is_endpoint defaults to False."""
        registry = TunnelCryptoRegistry()
        registry.register(7, _random_key(), _random_iv_key())
        result = registry.get_keys(7)
        assert result is not None
        assert result[2] is False
88
89
# ---------------------------------------------------------------------------
# TunnelDataHandler — inbound
# ---------------------------------------------------------------------------
93
class TestTunnelDataHandlerInbound:
    """Inbound data processing: decrypt one layer and route."""

    @staticmethod
    def _handle_single_layer(tunnel_id, *, is_endpoint):
        """Push one singly-encrypted random payload through handle_inbound.

        Registers ``tunnel_id`` with fresh keys and the given ``is_endpoint``
        flag, encrypts a random payload with those keys, and returns
        ``(plaintext, result)`` where ``result`` is whatever
        ``handle_inbound`` returned for the encrypted payload.

        The leading underscore keeps pytest from collecting this as a test.
        """
        registry = TunnelCryptoRegistry()
        handler = TunnelDataHandler(registry)

        layer_key = _random_key()
        iv_key = _random_iv_key()
        registry.register(tunnel_id, layer_key, iv_key, is_endpoint=is_endpoint)

        plaintext = os.urandom(TUNNEL_DATA_SIZE)
        encrypted = TunnelLayerEncryptor.encrypt_layer(plaintext, layer_key, iv_key)

        return plaintext, handler.handle_inbound(tunnel_id, encrypted)

    def test_endpoint_tunnel_delivers(self):
        """Endpoint tunnel decrypts one layer and returns deliver action."""
        plaintext, result = self._handle_single_layer(100, is_endpoint=True)

        assert result["action"] == "deliver"
        assert result["data"] == plaintext

    def test_intermediate_tunnel_forwards(self):
        """Intermediate tunnel decrypts one layer and returns forward action."""
        plaintext, result = self._handle_single_layer(200, is_endpoint=False)

        assert result["action"] == "forward"
        assert result["data"] == plaintext

    def test_unknown_tunnel_returns_unknown_action(self):
        """Unknown tunnel ID returns unknown action with the tunnel_id."""
        registry = TunnelCryptoRegistry()
        handler = TunnelDataHandler(registry)

        # Nothing is registered, so the payload content is irrelevant here.
        result = handler.handle_inbound(999, b"\x00" * TUNNEL_DATA_SIZE)

        assert result["action"] == "unknown"
        assert result["tunnel_id"] == 999
140
141
# ---------------------------------------------------------------------------
# TunnelDataHandler — outbound
# ---------------------------------------------------------------------------
145
class TestTunnelDataHandlerOutbound:
    """Outbound data processing: encrypt all layers."""

    def test_outbound_encrypts_all_layers(self):
        """Outbound encrypt applies all hop layers in reverse order."""
        registry = TunnelCryptoRegistry()
        handler = TunnelDataHandler(registry)

        plaintext = os.urandom(TUNNEL_DATA_SIZE)
        hop_keys = [(_random_key(), _random_iv_key()) for _ in range(3)]

        encrypted = handler.handle_outbound(plaintext, hop_keys)
        # Same sanity check the roundtrip test makes: the output must not be
        # the untouched input.
        assert encrypted != plaintext

        # Verify by peeling layers in forward order
        data = encrypted
        for layer_key, iv_key in hop_keys:
            data = TunnelLayerDecryptor.decrypt_layer(data, layer_key, iv_key)
        assert data == plaintext
164
165
# ---------------------------------------------------------------------------
# Full roundtrip
# ---------------------------------------------------------------------------
169
class TestFullRoundtrip:
    """Outbound encrypt -> per-hop inbound decrypt -> endpoint deliver."""

    def test_three_hop_roundtrip(self):
        """Encrypt outbound, then simulate each hop handling inbound."""
        plaintext = os.urandom(TUNNEL_DATA_SIZE)

        # 3 hops: gateway (0), middle (1), endpoint (2)
        hop_keys = [(_random_key(), _random_iv_key()) for _ in range(3)]

        # Register all hops in a registry; only the final hop is the endpoint.
        registry = TunnelCryptoRegistry()
        tunnel_ids = [10, 20, 30]
        last_hop = len(hop_keys) - 1
        for index, (tid, (layer_key, iv_key)) in enumerate(zip(tunnel_ids, hop_keys)):
            registry.register(tid, layer_key, iv_key, is_endpoint=(index == last_hop))

        handler = TunnelDataHandler(registry)

        # Outbound encrypt
        encrypted = handler.handle_outbound(plaintext, hop_keys)
        assert encrypted != plaintext

        # Gateway and middle hops are intermediate: each one forwards the
        # data it produced on to the next hop.
        data = encrypted
        for tid in tunnel_ids[:-1]:
            result = handler.handle_inbound(tid, data)
            assert result["action"] == "forward"
            data = result["data"]

        # Final hop (endpoint) delivers the original plaintext.
        result = handler.handle_inbound(tunnel_ids[-1], data)
        assert result["action"] == "deliver"
        assert result["data"] == plaintext
205 assert result["data"] == plaintext