"""Tunnel data processing wiring. Ported from: net.i2p.router.tunnel.TunnelDataHandler (conceptual) Connects tunnel crypto operations to tunnel routing decisions: the TunnelCryptoRegistry stores per-tunnel keys, and the TunnelDataHandler uses those keys to decrypt inbound data (peeling one layer per hop) and encrypt outbound data (applying all layers for all hops). """ from __future__ import annotations from i2p_tunnel.crypto import TunnelLayerDecryptor, OutboundTunnelEncryptor class TunnelCryptoRegistry: """Registry mapping tunnel IDs to their crypto keys. Each tunnel we participate in has a layer key, an IV key, and a flag indicating whether we are the endpoint of that tunnel. """ def __init__(self) -> None: # tunnel_id -> (layer_key, iv_key, is_endpoint) self._tunnels: dict[int, tuple[bytes, bytes, bool]] = {} def register( self, tunnel_id: int, layer_key: bytes, iv_key: bytes, is_endpoint: bool = False, ) -> None: """Store crypto keys for a tunnel we participate in. Parameters ---------- tunnel_id: The numeric tunnel identifier. layer_key: 32-byte AES key for this tunnel's layer encryption. iv_key: Key from which the 16-byte IV is derived. is_endpoint: True if we are the endpoint of this tunnel. """ self._tunnels[tunnel_id] = (layer_key, iv_key, is_endpoint) def get_keys(self, tunnel_id: int) -> tuple[bytes, bytes, bool] | None: """Return (layer_key, iv_key, is_endpoint) or None if unknown.""" return self._tunnels.get(tunnel_id) def remove(self, tunnel_id: int) -> None: """Unregister a tunnel, discarding its keys.""" self._tunnels.pop(tunnel_id, None) def registered_tunnels(self) -> list[int]: """Return a list of all registered tunnel IDs.""" return list(self._tunnels.keys()) class TunnelDataHandler: """Process tunnel data: decrypt inbound, encrypt outbound. Inbound processing peels one encryption layer using the keys registered for the given tunnel ID and returns a routing decision (deliver at endpoint, forward to next hop, or unknown tunnel). Outbound processing applies all encryption layers for a set of hop keys, producing fully-encrypted tunnel data ready to send. """ def __init__(self, crypto_registry: TunnelCryptoRegistry) -> None: self._registry = crypto_registry def handle_inbound(self, tunnel_id: int, encrypted_data: bytes) -> dict: """Decrypt one layer of inbound tunnel data and decide routing. Parameters ---------- tunnel_id: The tunnel ID this data arrived on. encrypted_data: The encrypted tunnel data (multiple of 16 bytes). Returns ------- dict One of: - ``{"action": "deliver", "data": }`` if we are the endpoint - ``{"action": "forward", "data": }`` if we are intermediate - ``{"action": "unknown", "tunnel_id": }`` if tunnel is unknown """ keys = self._registry.get_keys(tunnel_id) if keys is None: return {"action": "unknown", "tunnel_id": tunnel_id} layer_key, iv_key, is_endpoint = keys decrypted = TunnelLayerDecryptor.decrypt_layer( encrypted_data, layer_key, iv_key ) if is_endpoint: return {"action": "deliver", "data": decrypted} return {"action": "forward", "data": decrypted} def handle_outbound( self, data: bytes, hop_keys: list[tuple[bytes, bytes]] ) -> bytes: """Encrypt data for all hops of an outbound tunnel. Parameters ---------- data: Plaintext tunnel data (multiple of 16 bytes). hop_keys: List of ``(layer_key, iv_key)`` tuples ordered from gateway to endpoint. Returns ------- bytes Fully-encrypted tunnel data. """ return OutboundTunnelEncryptor.encrypt(data, hop_keys)