"""Tunnel hop encryption and decryption. Ported from: net.i2p.router.tunnel.TunnelCryptoUtil net.i2p.router.tunnel.OutboundTunnelEndpoint net.i2p.router.tunnel.InboundEndpointProcessor Provides layer encryption for tunnel data (AES-256-CBC) and ElGamal-based build record encryption. """ from __future__ import annotations import hashlib from i2p_crypto.aes import AESEngine from i2p_crypto.elgamal import ElGamalEngine def _derive_iv(iv_key: bytes) -> bytes: """Derive a 16-byte IV from an iv_key. Uses the first 16 bytes of the iv_key directly, matching I2P's tunnel layer encryption behaviour. """ return iv_key[:16] class TunnelLayerEncryptor: """Encrypt a single tunnel layer using AES-256-CBC.""" @staticmethod def encrypt_layer(data: bytes, layer_key: bytes, iv_key: bytes) -> bytes: """AES-256-CBC encrypt *data* using *layer_key*. Parameters ---------- data: Plaintext, must be a multiple of 16 bytes (typically 1024). layer_key: 32-byte AES key for this layer. iv_key: Key from which the 16-byte IV is derived (first 16 bytes used). Returns ------- bytes Ciphertext of the same length as *data*. """ iv = _derive_iv(iv_key) return AESEngine.encrypt(data, layer_key, iv) class TunnelLayerDecryptor: """Decrypt a single tunnel layer using AES-256-CBC.""" @staticmethod def decrypt_layer(data: bytes, layer_key: bytes, iv_key: bytes) -> bytes: """AES-256-CBC decrypt *data* using *layer_key*. Reverse of :meth:`TunnelLayerEncryptor.encrypt_layer`. """ iv = _derive_iv(iv_key) return AESEngine.decrypt(data, layer_key, iv) class OutboundTunnelEncryptor: """Encrypt tunnel data for an outbound tunnel (multiple hops). Layers are applied in **reverse** order: the endpoint's layer is applied first, then the next-to-last hop, and so on, finishing with the gateway's layer. Each hop in the tunnel will decrypt (peel) one layer in forward order, revealing the plaintext at the endpoint. """ @staticmethod def encrypt(data: bytes, hop_keys: list[tuple[bytes, bytes]]) -> bytes: """Apply encryption layers in reverse hop order. Parameters ---------- data: Plaintext tunnel data (must be multiple of 16 bytes). hop_keys: List of ``(layer_key, iv_key)`` tuples ordered from gateway to endpoint. Returns ------- bytes Fully-encrypted tunnel data. """ result = data for layer_key, iv_key in reversed(hop_keys): result = TunnelLayerEncryptor.encrypt_layer(result, layer_key, iv_key) return result class InboundTunnelDecryptor: """Decrypt tunnel data for an inbound tunnel (multiple hops). Each hop in an inbound tunnel encrypts a layer (forward order). The tunnel endpoint decrypts all layers in forward order to recover the plaintext. """ @staticmethod def decrypt(data: bytes, hop_keys: list[tuple[bytes, bytes]]) -> bytes: """Remove all encryption layers in forward hop order. Parameters ---------- data: Encrypted tunnel data with all layers applied. hop_keys: List of ``(layer_key, iv_key)`` tuples ordered from first hop to last hop. Returns ------- bytes Decrypted plaintext. """ result = data for layer_key, iv_key in reversed(hop_keys): result = TunnelLayerDecryptor.decrypt_layer(result, layer_key, iv_key) return result class BuildRecordEncryptor: """Encrypt a tunnel build request record using ElGamal. The 222-byte ``BuildRecord`` serialization is ElGamal-encrypted to the target hop's public key, producing 514 bytes of ciphertext which is then zero-padded to 528 bytes (I2P's fixed record slot size). """ @staticmethod def encrypt_record(record_bytes: bytes, peer_public_key: bytes) -> bytes: """ElGamal-encrypt a build record. Parameters ---------- record_bytes: 222-byte serialized ``BuildRecord``. peer_public_key: 256-byte ElGamal public key of the target hop. Returns ------- bytes 528-byte encrypted record (514 ciphertext + 14 zero padding). """ ciphertext = ElGamalEngine.encrypt(record_bytes, peer_public_key) # ciphertext is 514 bytes; pad to 528 return ciphertext.ljust(528, b"\x00") class BuildReplyDecryptor: """Decrypt a tunnel build reply record using AES-256-CBC.""" @staticmethod def decrypt_reply( encrypted_reply: bytes, reply_key: bytes, reply_iv: bytes ) -> bytes: """AES-decrypt a build reply record. Parameters ---------- encrypted_reply: Encrypted reply data (multiple of 16 bytes). reply_key: 32-byte AES key provided in the original build record. reply_iv: IV bytes; first 16 bytes are used as the AES IV. Returns ------- bytes Decrypted reply record. """ iv = reply_iv[:16] return AESEngine.decrypt(encrypted_reply, reply_key, iv)