A Python port of the Invisible Internet Project (I2P)
at main 127 lines 4.2 kB view raw
1"""Tunnel data processing wiring. 2 3Ported from: 4 net.i2p.router.tunnel.TunnelDataHandler (conceptual) 5 6Connects tunnel crypto operations to tunnel routing decisions: 7the TunnelCryptoRegistry stores per-tunnel keys, and the 8TunnelDataHandler uses those keys to decrypt inbound data 9(peeling one layer per hop) and encrypt outbound data 10(applying all layers for all hops). 11""" 12 13from __future__ import annotations 14 15from i2p_tunnel.crypto import TunnelLayerDecryptor, OutboundTunnelEncryptor 16 17 18class TunnelCryptoRegistry: 19 """Registry mapping tunnel IDs to their crypto keys. 20 21 Each tunnel we participate in has a layer key, an IV key, and 22 a flag indicating whether we are the endpoint of that tunnel. 23 """ 24 25 def __init__(self) -> None: 26 # tunnel_id -> (layer_key, iv_key, is_endpoint) 27 self._tunnels: dict[int, tuple[bytes, bytes, bool]] = {} 28 29 def register( 30 self, 31 tunnel_id: int, 32 layer_key: bytes, 33 iv_key: bytes, 34 is_endpoint: bool = False, 35 ) -> None: 36 """Store crypto keys for a tunnel we participate in. 37 38 Parameters 39 ---------- 40 tunnel_id: 41 The numeric tunnel identifier. 42 layer_key: 43 32-byte AES key for this tunnel's layer encryption. 44 iv_key: 45 Key from which the 16-byte IV is derived. 46 is_endpoint: 47 True if we are the endpoint of this tunnel. 48 """ 49 self._tunnels[tunnel_id] = (layer_key, iv_key, is_endpoint) 50 51 def get_keys(self, tunnel_id: int) -> tuple[bytes, bytes, bool] | None: 52 """Return (layer_key, iv_key, is_endpoint) or None if unknown.""" 53 return self._tunnels.get(tunnel_id) 54 55 def remove(self, tunnel_id: int) -> None: 56 """Unregister a tunnel, discarding its keys.""" 57 self._tunnels.pop(tunnel_id, None) 58 59 def registered_tunnels(self) -> list[int]: 60 """Return a list of all registered tunnel IDs.""" 61 return list(self._tunnels.keys()) 62 63 64class TunnelDataHandler: 65 """Process tunnel data: decrypt inbound, encrypt outbound. 66 67 Inbound processing peels one encryption layer using the keys 68 registered for the given tunnel ID and returns a routing decision 69 (deliver at endpoint, forward to next hop, or unknown tunnel). 70 71 Outbound processing applies all encryption layers for a set of 72 hop keys, producing fully-encrypted tunnel data ready to send. 73 """ 74 75 def __init__(self, crypto_registry: TunnelCryptoRegistry) -> None: 76 self._registry = crypto_registry 77 78 def handle_inbound(self, tunnel_id: int, encrypted_data: bytes) -> dict: 79 """Decrypt one layer of inbound tunnel data and decide routing. 80 81 Parameters 82 ---------- 83 tunnel_id: 84 The tunnel ID this data arrived on. 85 encrypted_data: 86 The encrypted tunnel data (multiple of 16 bytes). 87 88 Returns 89 ------- 90 dict 91 One of: 92 - ``{"action": "deliver", "data": <bytes>}`` if we are the endpoint 93 - ``{"action": "forward", "data": <bytes>}`` if we are intermediate 94 - ``{"action": "unknown", "tunnel_id": <int>}`` if tunnel is unknown 95 """ 96 keys = self._registry.get_keys(tunnel_id) 97 if keys is None: 98 return {"action": "unknown", "tunnel_id": tunnel_id} 99 100 layer_key, iv_key, is_endpoint = keys 101 decrypted = TunnelLayerDecryptor.decrypt_layer( 102 encrypted_data, layer_key, iv_key 103 ) 104 105 if is_endpoint: 106 return {"action": "deliver", "data": decrypted} 107 return {"action": "forward", "data": decrypted} 108 109 def handle_outbound( 110 self, data: bytes, hop_keys: list[tuple[bytes, bytes]] 111 ) -> bytes: 112 """Encrypt data for all hops of an outbound tunnel. 113 114 Parameters 115 ---------- 116 data: 117 Plaintext tunnel data (multiple of 16 bytes). 118 hop_keys: 119 List of ``(layer_key, iv_key)`` tuples ordered from 120 gateway to endpoint. 121 122 Returns 123 ------- 124 bytes 125 Fully-encrypted tunnel data. 126 """ 127 return OutboundTunnelEncryptor.encrypt(data, hop_keys)