A Python port of the Invisible Internet Project (I2P)
at main 183 lines 5.4 kB view raw
1"""Tunnel hop encryption and decryption. 2 3Ported from: 4 net.i2p.router.tunnel.TunnelCryptoUtil 5 net.i2p.router.tunnel.OutboundTunnelEndpoint 6 net.i2p.router.tunnel.InboundEndpointProcessor 7 8Provides layer encryption for tunnel data (AES-256-CBC) and 9ElGamal-based build record encryption. 10""" 11 12from __future__ import annotations 13 14import hashlib 15 16from i2p_crypto.aes import AESEngine 17from i2p_crypto.elgamal import ElGamalEngine 18 19 20def _derive_iv(iv_key: bytes) -> bytes: 21 """Derive a 16-byte IV from an iv_key. 22 23 Uses the first 16 bytes of the iv_key directly, matching I2P's 24 tunnel layer encryption behaviour. 25 """ 26 return iv_key[:16] 27 28 29class TunnelLayerEncryptor: 30 """Encrypt a single tunnel layer using AES-256-CBC.""" 31 32 @staticmethod 33 def encrypt_layer(data: bytes, layer_key: bytes, iv_key: bytes) -> bytes: 34 """AES-256-CBC encrypt *data* using *layer_key*. 35 36 Parameters 37 ---------- 38 data: 39 Plaintext, must be a multiple of 16 bytes (typically 1024). 40 layer_key: 41 32-byte AES key for this layer. 42 iv_key: 43 Key from which the 16-byte IV is derived (first 16 bytes used). 44 45 Returns 46 ------- 47 bytes 48 Ciphertext of the same length as *data*. 49 """ 50 iv = _derive_iv(iv_key) 51 return AESEngine.encrypt(data, layer_key, iv) 52 53 54class TunnelLayerDecryptor: 55 """Decrypt a single tunnel layer using AES-256-CBC.""" 56 57 @staticmethod 58 def decrypt_layer(data: bytes, layer_key: bytes, iv_key: bytes) -> bytes: 59 """AES-256-CBC decrypt *data* using *layer_key*. 60 61 Reverse of :meth:`TunnelLayerEncryptor.encrypt_layer`. 62 """ 63 iv = _derive_iv(iv_key) 64 return AESEngine.decrypt(data, layer_key, iv) 65 66 67class OutboundTunnelEncryptor: 68 """Encrypt tunnel data for an outbound tunnel (multiple hops). 69 70 Layers are applied in **reverse** order: the endpoint's layer is 71 applied first, then the next-to-last hop, and so on, finishing with 72 the gateway's layer. Each hop in the tunnel will decrypt (peel) one 73 layer in forward order, revealing the plaintext at the endpoint. 74 """ 75 76 @staticmethod 77 def encrypt(data: bytes, hop_keys: list[tuple[bytes, bytes]]) -> bytes: 78 """Apply encryption layers in reverse hop order. 79 80 Parameters 81 ---------- 82 data: 83 Plaintext tunnel data (must be multiple of 16 bytes). 84 hop_keys: 85 List of ``(layer_key, iv_key)`` tuples ordered from gateway 86 to endpoint. 87 88 Returns 89 ------- 90 bytes 91 Fully-encrypted tunnel data. 92 """ 93 result = data 94 for layer_key, iv_key in reversed(hop_keys): 95 result = TunnelLayerEncryptor.encrypt_layer(result, layer_key, iv_key) 96 return result 97 98 99class InboundTunnelDecryptor: 100 """Decrypt tunnel data for an inbound tunnel (multiple hops). 101 102 Each hop in an inbound tunnel encrypts a layer (forward order). 103 The tunnel endpoint decrypts all layers in forward order to recover 104 the plaintext. 105 """ 106 107 @staticmethod 108 def decrypt(data: bytes, hop_keys: list[tuple[bytes, bytes]]) -> bytes: 109 """Remove all encryption layers in forward hop order. 110 111 Parameters 112 ---------- 113 data: 114 Encrypted tunnel data with all layers applied. 115 hop_keys: 116 List of ``(layer_key, iv_key)`` tuples ordered from first 117 hop to last hop. 118 119 Returns 120 ------- 121 bytes 122 Decrypted plaintext. 123 """ 124 result = data 125 for layer_key, iv_key in reversed(hop_keys): 126 result = TunnelLayerDecryptor.decrypt_layer(result, layer_key, iv_key) 127 return result 128 129 130class BuildRecordEncryptor: 131 """Encrypt a tunnel build request record using ElGamal. 132 133 The 222-byte ``BuildRecord`` serialization is ElGamal-encrypted to 134 the target hop's public key, producing 514 bytes of ciphertext which 135 is then zero-padded to 528 bytes (I2P's fixed record slot size). 136 """ 137 138 @staticmethod 139 def encrypt_record(record_bytes: bytes, peer_public_key: bytes) -> bytes: 140 """ElGamal-encrypt a build record. 141 142 Parameters 143 ---------- 144 record_bytes: 145 222-byte serialized ``BuildRecord``. 146 peer_public_key: 147 256-byte ElGamal public key of the target hop. 148 149 Returns 150 ------- 151 bytes 152 528-byte encrypted record (514 ciphertext + 14 zero padding). 153 """ 154 ciphertext = ElGamalEngine.encrypt(record_bytes, peer_public_key) 155 # ciphertext is 514 bytes; pad to 528 156 return ciphertext.ljust(528, b"\x00") 157 158 159class BuildReplyDecryptor: 160 """Decrypt a tunnel build reply record using AES-256-CBC.""" 161 162 @staticmethod 163 def decrypt_reply( 164 encrypted_reply: bytes, reply_key: bytes, reply_iv: bytes 165 ) -> bytes: 166 """AES-decrypt a build reply record. 167 168 Parameters 169 ---------- 170 encrypted_reply: 171 Encrypted reply data (multiple of 16 bytes). 172 reply_key: 173 32-byte AES key provided in the original build record. 174 reply_iv: 175 IV bytes; first 16 bytes are used as the AES IV. 176 177 Returns 178 ------- 179 bytes 180 Decrypted reply record. 181 """ 182 iv = reply_iv[:16] 183 return AESEngine.decrypt(encrypted_reply, reply_key, iv)