A Python port of the Invisible Internet Project (I2P)
1"""Tunnel data processing wiring.
2
3Ported from:
4 net.i2p.router.tunnel.TunnelDataHandler (conceptual)
5
6Connects tunnel crypto operations to tunnel routing decisions:
7the TunnelCryptoRegistry stores per-tunnel keys, and the
8TunnelDataHandler uses those keys to decrypt inbound data
9(peeling one layer per hop) and encrypt outbound data
10(applying all layers for all hops).
11"""
12
13from __future__ import annotations
14
15from i2p_tunnel.crypto import TunnelLayerDecryptor, OutboundTunnelEncryptor
16
17
18class TunnelCryptoRegistry:
19 """Registry mapping tunnel IDs to their crypto keys.
20
21 Each tunnel we participate in has a layer key, an IV key, and
22 a flag indicating whether we are the endpoint of that tunnel.
23 """
24
25 def __init__(self) -> None:
26 # tunnel_id -> (layer_key, iv_key, is_endpoint)
27 self._tunnels: dict[int, tuple[bytes, bytes, bool]] = {}
28
29 def register(
30 self,
31 tunnel_id: int,
32 layer_key: bytes,
33 iv_key: bytes,
34 is_endpoint: bool = False,
35 ) -> None:
36 """Store crypto keys for a tunnel we participate in.
37
38 Parameters
39 ----------
40 tunnel_id:
41 The numeric tunnel identifier.
42 layer_key:
43 32-byte AES key for this tunnel's layer encryption.
44 iv_key:
45 Key from which the 16-byte IV is derived.
46 is_endpoint:
47 True if we are the endpoint of this tunnel.
48 """
49 self._tunnels[tunnel_id] = (layer_key, iv_key, is_endpoint)
50
51 def get_keys(self, tunnel_id: int) -> tuple[bytes, bytes, bool] | None:
52 """Return (layer_key, iv_key, is_endpoint) or None if unknown."""
53 return self._tunnels.get(tunnel_id)
54
55 def remove(self, tunnel_id: int) -> None:
56 """Unregister a tunnel, discarding its keys."""
57 self._tunnels.pop(tunnel_id, None)
58
59 def registered_tunnels(self) -> list[int]:
60 """Return a list of all registered tunnel IDs."""
61 return list(self._tunnels.keys())
62
63
64class TunnelDataHandler:
65 """Process tunnel data: decrypt inbound, encrypt outbound.
66
67 Inbound processing peels one encryption layer using the keys
68 registered for the given tunnel ID and returns a routing decision
69 (deliver at endpoint, forward to next hop, or unknown tunnel).
70
71 Outbound processing applies all encryption layers for a set of
72 hop keys, producing fully-encrypted tunnel data ready to send.
73 """
74
75 def __init__(self, crypto_registry: TunnelCryptoRegistry) -> None:
76 self._registry = crypto_registry
77
78 def handle_inbound(self, tunnel_id: int, encrypted_data: bytes) -> dict:
79 """Decrypt one layer of inbound tunnel data and decide routing.
80
81 Parameters
82 ----------
83 tunnel_id:
84 The tunnel ID this data arrived on.
85 encrypted_data:
86 The encrypted tunnel data (multiple of 16 bytes).
87
88 Returns
89 -------
90 dict
91 One of:
92 - ``{"action": "deliver", "data": <bytes>}`` if we are the endpoint
93 - ``{"action": "forward", "data": <bytes>}`` if we are intermediate
94 - ``{"action": "unknown", "tunnel_id": <int>}`` if tunnel is unknown
95 """
96 keys = self._registry.get_keys(tunnel_id)
97 if keys is None:
98 return {"action": "unknown", "tunnel_id": tunnel_id}
99
100 layer_key, iv_key, is_endpoint = keys
101 decrypted = TunnelLayerDecryptor.decrypt_layer(
102 encrypted_data, layer_key, iv_key
103 )
104
105 if is_endpoint:
106 return {"action": "deliver", "data": decrypted}
107 return {"action": "forward", "data": decrypted}
108
109 def handle_outbound(
110 self, data: bytes, hop_keys: list[tuple[bytes, bytes]]
111 ) -> bytes:
112 """Encrypt data for all hops of an outbound tunnel.
113
114 Parameters
115 ----------
116 data:
117 Plaintext tunnel data (multiple of 16 bytes).
118 hop_keys:
119 List of ``(layer_key, iv_key)`` tuples ordered from
120 gateway to endpoint.
121
122 Returns
123 -------
124 bytes
125 Fully-encrypted tunnel data.
126 """
127 return OutboundTunnelEncryptor.encrypt(data, hop_keys)