A Python port of the Invisible Internet Project (I2P)
1"""Garlic encryption/decryption pipeline.
2
3Ported from net.i2p.crypto.ElGamalAESEngine.
4
5Garlic messages use two modes:
6
71. **Existing session** -- the sender and receiver already share a session
8 key and session tags. The sender picks a tag, prepends it (32 bytes)
9 to the AES-256-CBC ciphertext. The receiver looks up the tag to find
10 the session key.
11
122. **New session** -- no shared state yet. The sender ElGamal-encrypts
13 a "tag delivery block" (session key + tags) with the receiver's public
14 key, then AES-encrypts the cloves data with the session key.
15
16Wire formats:
17
18 Existing session:
19 session_tag(32) || AES-CBC(cloves, key, iv=tag[:16])
20
21 New session:
22 ElGamal(tag_delivery_block)(514) || AES-CBC(cloves, key, iv=zeros(16))
23
24 Tag delivery block (plaintext inside ElGamal):
25 session_key(32) || tag_count(2, big-endian) || tags(32 each)
26"""
27
28from __future__ import annotations
29
30import hashlib
31import hmac as _hmac
32import os
33import struct
34import time
35from collections import defaultdict
36
37from i2p_crypto.aes import AESEngine
38from i2p_crypto.elgamal import ElGamalEngine
39
# Upper bound on session tags carried in one ElGamal-wrapped delivery block.
# The block holds a 32-byte session key, a 2-byte tag count and N 32-byte
# tags, and must fit in the 222-byte ElGamal plaintext:
#     32 + 2 + N*32 <= 222  =>  N <= 5
_MAX_TAGS_PER_ELGAMAL = 5

# All-zero 16-byte IV for new-session AES-CBC. A new-session message carries
# no leading session tag, so the IV cannot be taken from tag[:16] as it is on
# the existing-session path; the session key arrives inside the ElGamal block.
_ZERO_IV = b"\x00" * 16
45
46
class GarlicEncryptor:
    """Produce garlic ciphertexts for existing-session and new-session delivery."""

    @staticmethod
    def encrypt(
        cloves_data: bytes,
        session_key: bytes,
        session_tag: bytes,
    ) -> bytes:
        """Seal *cloves_data* under an already-established session.

        Parameters
        ----------
        cloves_data:
            Plaintext to protect (must be a multiple of 16 bytes).
        session_key:
            32-byte AES session key the receiver already holds.
        session_tag:
            32-byte single-use session tag; its first 16 bytes double as
            the CBC initialisation vector.

        Returns
        -------
        bytes
            ``session_tag(32) || AES-CBC(cloves_data, session_key, iv=tag[:16])``
        """
        # The tag travels in the clear so the receiver can look up the key.
        return session_tag + AESEngine.encrypt(
            cloves_data, session_key, session_tag[:16]
        )

    @staticmethod
    def encrypt_new_session(
        cloves_data: bytes,
        session_key: bytes,
        tags: list[bytes],
        peer_public_key: bytes,
    ) -> bytes:
        """Seal *cloves_data* for a peer that shares no session state yet.

        The session key and the supplied tags are packed into a tag delivery
        block, which is ElGamal-encrypted to the receiver; the cloves are then
        AES-encrypted under that session key with an all-zero IV.

        Parameters
        ----------
        cloves_data:
            Plaintext to protect (must be a multiple of 16 bytes).
        session_key:
            32-byte AES session key being delivered to the receiver.
        tags:
            32-byte session tags to deliver alongside the key (at most 5).
        peer_public_key:
            256-byte ElGamal public key of the receiver.

        Returns
        -------
        bytes
            ``ElGamal(tag_delivery)(514) || AES-CBC(cloves_data, session_key, iv=zeros)``

        Raises
        ------
        ValueError
            If more than 5 tags are provided.
        """
        tag_total = len(tags)
        if tag_total > _MAX_TAGS_PER_ELGAMAL:
            raise ValueError(
                f"Too many tags: {tag_total} > {_MAX_TAGS_PER_ELGAMAL}. "
                f"Tag delivery block must fit in 222-byte ElGamal plaintext."
            )

        # Layout: session_key(32) || tag_count(2, big-endian) || tags(32 each)
        delivery_block = session_key + struct.pack(">H", tag_total) + b"".join(tags)

        # Wrap the key material for the receiver first, then encrypt the
        # payload; no tag precedes the message, hence the fixed zero IV.
        wrapped_block = ElGamalEngine.encrypt(delivery_block, peer_public_key)
        payload_ciphertext = AESEngine.encrypt(cloves_data, session_key, _ZERO_IV)
        return wrapped_block + payload_ciphertext
124
125
class GarlicDecryptor:
    """Open garlic ciphertexts for existing-session and new-session delivery."""

    @staticmethod
    def decrypt_existing(
        encrypted: bytes,
        session_key_mgr,
    ) -> bytes | None:
        """Open a message that is prefixed with a session tag.

        Parameters
        ----------
        encrypted:
            Complete message, ``tag(32) || aes_ciphertext``.
        session_key_mgr:
            Object exposing ``consume_tag(tag)``, which yields the session
            key bound to the tag or None when the tag is not recognised.

        Returns
        -------
        bytes or None
            The decrypted cloves data; None if the message is shorter than
            48 bytes or the tag is unknown.
        """
        # Need the 32-byte tag plus no less than one 16-byte AES block.
        if len(encrypted) < 48:
            return None

        tag, body = encrypted[:32], encrypted[32:]

        key = session_key_mgr.consume_tag(tag)
        if key is None:
            return None

        # The IV is the leading half of the tag, mirroring the sender.
        return AESEngine.decrypt(body, key, tag[:16])

    @staticmethod
    def decrypt_new_session(
        encrypted: bytes,
        our_private_key: bytes,
        session_key_mgr,
    ) -> bytes | None:
        """Open a message whose session key is wrapped with ElGamal.

        Parameters
        ----------
        encrypted:
            Complete message, ``elgamal_block(514) || aes_ciphertext``.
        our_private_key:
            256-byte ElGamal private key.
        session_key_mgr:
            Object exposing ``add_tags(key, tags, expiration_ms)``; the
            delivered tags are registered on it with a 12-minute expiry
            before the cloves are decrypted.

        Returns
        -------
        bytes or None
            The decrypted cloves data; None if the message is too short,
            ElGamal decryption fails, or the tag delivery block is truncated.
        """
        # Need the 514-byte ElGamal block plus no less than one AES block.
        if len(encrypted) < 514 + 16:
            return None

        wrapped_block, body = encrypted[:514], encrypted[514:]

        delivery = ElGamalEngine.decrypt(wrapped_block, our_private_key)
        # Header is session_key(32) + tag_count(2); anything shorter is junk.
        if delivery is None or len(delivery) < 34:
            return None

        key = delivery[:32]
        (count,) = struct.unpack(">H", delivery[32:34])

        tags_end = 34 + count * 32
        if len(delivery) < tags_end:
            return None

        # Slice out each 32-byte tag that follows the 34-byte header.
        tags = [delivery[start : start + 32] for start in range(34, tags_end, 32)]

        # Delivered tags live for 12 minutes from now (milliseconds).
        expires_at_ms = int(time.time() * 1000) + 720_000
        session_key_mgr.add_tags(key, tags, expires_at_ms)

        return AESEngine.decrypt(body, key, _ZERO_IV)
219
220
def apply_aes_padding(plaintext: bytes) -> bytes:
    """Frame *plaintext* with Java-compatible AES garlic padding.

    Layout: ``pad_len(2, big-endian) || random_pad(pad_len) || payload ||
    SHA-256(payload)``. ``pad_len`` is the smallest value (0-15) that makes
    the whole frame a multiple of 16 bytes.
    """
    digest = hashlib.sha256(plaintext).digest()
    payload_and_hash = plaintext + digest

    # Bytes still missing to reach a 16-byte boundary once the 2-byte
    # length prefix is included.
    pad_len = -(2 + len(payload_and_hash)) % 16

    header = struct.pack("!H", pad_len)
    filler = os.urandom(pad_len)
    return header + filler + payload_and_hash
237
238
def verify_aes_padding(decrypted: bytes) -> bytes | None:
    """Check the trailing SHA-256 and peel off AES garlic padding.

    Expects ``pad_len(2, big-endian) || pad(pad_len) || payload ||
    SHA-256(payload)``. Returns the payload, or None when the input is too
    short, the declared padding overruns the data, or the checksum differs.
    """
    total = len(decrypted)
    # Smallest valid frame: 2-byte length prefix + empty payload + 32-byte hash.
    if total < 34:
        return None

    (pad_len,) = struct.unpack("!H", decrypted[:2])
    body_start = 2 + pad_len

    # Padding must leave room for the 32-byte hash at the very least.
    if total - body_start < 32:
        return None

    payload = decrypted[body_start:-32]
    claimed_hash = decrypted[-32:]

    # Constant-time comparison of the recomputed digest with the claimed one.
    if _hmac.compare_digest(hashlib.sha256(payload).digest(), claimed_hash):
        return payload
    return None
262
263
class SessionTagManager:
    """Proactive session tag lifecycle management.

    Keeps one FIFO pool of tags per session key, capped at a configurable
    size, and offers helpers for generating, consuming, receiving and
    counting tags.
    """

    DEFAULT_MAX_TAGS = 200
    REPLENISH_THRESHOLD = 10

    def __init__(self, max_tags_per_session: int = DEFAULT_MAX_TAGS) -> None:
        """Create an empty manager whose pools hold at most *max_tags_per_session* tags."""
        self._max_tags = max_tags_per_session
        self._tags: dict[bytes, list[bytes]] = defaultdict(list)

    def _free_slots(self, session_key: bytes) -> int:
        """Return how many more tags the session's pool can take (never negative)."""
        # .get() avoids creating an empty defaultdict entry just by asking.
        return max(0, self._max_tags - len(self._tags.get(session_key, ())))

    def generate_tag_bundle(self, session_key: bytes, count: int = 5) -> list[bytes]:
        """Generate and store a bundle of new random 32-byte tags for a session.

        At most *count* tags are created, limited further by the room left
        in the session's pool. Only tags that were actually stored are
        returned, so the result may be shorter than *count* (or empty) when
        the pool is at or near capacity.
        """
        # Generate only what fits, so every returned tag is tracked here.
        wanted = min(count, self._free_slots(session_key))
        tags = [os.urandom(32) for _ in range(wanted)]
        if tags:
            self._tags[session_key].extend(tags)
        return tags

    def consume_tag(self, session_key: bytes) -> bytes | None:
        """Consume the oldest available tag for a session.

        Returns the tag, or None if no tags remain.
        """
        pool = self._tags.get(session_key)
        if not pool:
            return None
        tag = pool.pop(0)
        if not pool:
            # Drop exhausted pools so dead sessions do not pile up as
            # empty lists in the mapping.
            del self._tags[session_key]
        return tag

    def receive_tags(self, session_key: bytes, tags: list[bytes]) -> None:
        """Add received tags to a session's pool, discarding any beyond the cap."""
        accepted = tags[: self._free_slots(session_key)]
        if accepted:
            self._tags[session_key].extend(accepted)

    def remaining_tags(self, session_key: bytes) -> int:
        """Number of remaining tags for a session."""
        return len(self._tags.get(session_key, ()))

    def should_replenish(self, session_key: bytes) -> bool:
        """True if the pool holds fewer than ``REPLENISH_THRESHOLD`` tags."""
        return self.remaining_tags(session_key) < self.REPLENISH_THRESHOLD

    def has_tags(self, session_key: bytes) -> bool:
        """True if at least one tag is available."""
        return self.remaining_tags(session_key) > 0
316 return self.remaining_tags(session_key) > 0