A Python port of the Invisible Internet Project (I2P)
1"""Tunnel hop encryption and decryption.
2
3Ported from:
4 net.i2p.router.tunnel.TunnelCryptoUtil
5 net.i2p.router.tunnel.OutboundTunnelEndpoint
6 net.i2p.router.tunnel.InboundEndpointProcessor
7
8Provides layer encryption for tunnel data (AES-256-CBC) and
9ElGamal-based build record encryption.
10"""
11
12from __future__ import annotations
13
14import hashlib
15
16from i2p_crypto.aes import AESEngine
17from i2p_crypto.elgamal import ElGamalEngine
18
19
20def _derive_iv(iv_key: bytes) -> bytes:
21 """Derive a 16-byte IV from an iv_key.
22
23 Uses the first 16 bytes of the iv_key directly, matching I2P's
24 tunnel layer encryption behaviour.
25 """
26 return iv_key[:16]
27
28
class TunnelLayerEncryptor:
    """Apply one layer of tunnel encryption (AES-256-CBC via ``AESEngine``).

    Stateless: the only operation is exposed as a static method.
    """

    @staticmethod
    def encrypt_layer(data: bytes, layer_key: bytes, iv_key: bytes) -> bytes:
        """Encrypt *data* for a single hop.

        Parameters
        ----------
        data:
            Plaintext for this layer; expected to be a multiple of 16 bytes
            (typically 1024).  The length is not checked here.
        layer_key:
            32-byte AES key belonging to this hop.
        iv_key:
            Key material whose first 16 bytes become the IV
            (see ``_derive_iv``).

        Returns
        -------
        bytes
            Whatever ``AESEngine.encrypt`` produces for the given key and
            IV; expected to have the same length as *data*.
        """
        # The IV is derived inline; the cipher work is delegated entirely.
        return AESEngine.encrypt(data, layer_key, _derive_iv(iv_key))
52
53
class TunnelLayerDecryptor:
    """Remove one layer of tunnel encryption (AES-256-CBC via ``AESEngine``).

    Stateless counterpart of :class:`TunnelLayerEncryptor`.
    """

    @staticmethod
    def decrypt_layer(data: bytes, layer_key: bytes, iv_key: bytes) -> bytes:
        """Decrypt *data* for a single hop.

        Inverse of :meth:`TunnelLayerEncryptor.encrypt_layer` when called
        with the same *layer_key* and *iv_key*.

        Parameters
        ----------
        data:
            Ciphertext for this layer.
        layer_key:
            32-byte AES key belonging to this hop.
        iv_key:
            Key material whose first 16 bytes become the IV
            (see ``_derive_iv``).

        Returns
        -------
        bytes
            Whatever ``AESEngine.decrypt`` produces for the given key and IV.
        """
        # Same IV derivation as on the encrypt side, then delegate.
        return AESEngine.decrypt(data, layer_key, _derive_iv(iv_key))
65
66
class OutboundTunnelEncryptor:
    """Wrap outbound tunnel data in one encryption layer per hop.

    Wrapping starts with the endpoint's keys and ends with the gateway's,
    i.e. in **reverse** hop order.  The gateway's layer therefore ends up
    outermost, so each hop along the path can peel exactly one layer in
    forward order and the endpoint is left with the plaintext.
    """

    @staticmethod
    def encrypt(data: bytes, hop_keys: list[tuple[bytes, bytes]]) -> bytes:
        """Encrypt *data* once for every hop, last hop first.

        Parameters
        ----------
        data:
            Plaintext tunnel data (must be a multiple of 16 bytes).
        hop_keys:
            ``(layer_key, iv_key)`` pairs ordered from gateway to
            endpoint.  An empty list leaves *data* untouched.

        Returns
        -------
        bytes
            *data* wrapped in ``len(hop_keys)`` layers.
        """
        wrapped = data
        # Walk the hops back-to-front so the gateway's layer is applied last.
        for hop in reversed(hop_keys):
            layer_key, iv_key = hop
            wrapped = TunnelLayerEncryptor.encrypt_layer(
                wrapped, layer_key, iv_key
            )
        return wrapped
97
98
class InboundTunnelDecryptor:
    """Decrypt tunnel data for an inbound tunnel (multiple hops).

    Each hop in an inbound tunnel adds one encryption layer as the
    message travels from the first hop to the last, so the last hop's
    layer is the outermost one.  The tunnel endpoint therefore removes
    the layers in **reverse** hop order (last hop first) to recover the
    plaintext.
    """

    @staticmethod
    def decrypt(data: bytes, hop_keys: list[tuple[bytes, bytes]]) -> bytes:
        """Remove all encryption layers, outermost (last hop) first.

        Parameters
        ----------
        data:
            Encrypted tunnel data with all layers applied.
        hop_keys:
            List of ``(layer_key, iv_key)`` tuples ordered from first
            hop to last hop.  The list is traversed back-to-front; an
            empty list leaves *data* untouched.

        Returns
        -------
        bytes
            Decrypted plaintext.
        """
        result = data
        # Nested layers can only be undone in the opposite order to the
        # one in which they were applied, hence the reversed traversal.
        for layer_key, iv_key in reversed(hop_keys):
            result = TunnelLayerDecryptor.decrypt_layer(result, layer_key, iv_key)
        return result
128
129
class BuildRecordEncryptor:
    """ElGamal-encrypt a tunnel build request record into a fixed slot.

    The serialized ``BuildRecord`` (222 bytes) is encrypted to the target
    hop's public key.  The resulting ciphertext (expected to be 514 bytes)
    is right-padded with zero bytes up to the 528-byte record slot size.

    NOTE(review): the published I2P tunnel-creation specification describes
    the 528-byte record as a 16-byte identity-hash prefix followed by 512
    bytes of ElGamal data.  Confirm that the zero-padded layout produced
    here is what the consuming code expects.
    """

    @staticmethod
    def encrypt_record(record_bytes: bytes, peer_public_key: bytes) -> bytes:
        """Encrypt *record_bytes* and pad the result to the slot size.

        Parameters
        ----------
        record_bytes:
            222-byte serialized ``BuildRecord``.
        peer_public_key:
            256-byte ElGamal public key of the target hop.

        Returns
        -------
        bytes
            The ciphertext followed by zero padding, 528 bytes in total
            when the ciphertext is 514 bytes.  A ciphertext of 528 bytes
            or more is returned without padding or truncation.
        """
        slot_size = 528
        zero_byte = b"\x00"
        encrypted = ElGamalEngine.encrypt(record_bytes, peer_public_key)
        # ljust only ever appends; it never shortens an over-long value.
        return encrypted.ljust(slot_size, zero_byte)
157
158
class BuildReplyDecryptor:
    """Decrypt a tunnel build reply record (AES-256-CBC via ``AESEngine``)."""

    @staticmethod
    def decrypt_reply(
        encrypted_reply: bytes, reply_key: bytes, reply_iv: bytes
    ) -> bytes:
        """Decrypt *encrypted_reply* with the reply key and IV.

        Parameters
        ----------
        encrypted_reply:
            Encrypted reply data (multiple of 16 bytes).
        reply_key:
            32-byte AES key that was supplied in the original build record.
        reply_iv:
            IV material; only its first 16 bytes are passed to the cipher.

        Returns
        -------
        bytes
            Whatever ``AESEngine.decrypt`` produces for the given key and IV.
        """
        # Trim the IV to one AES block and delegate the actual decryption.
        return AESEngine.decrypt(encrypted_reply, reply_key, reply_iv[:16])