A Python port of the Invisible Internet Project (I2P)
at main 316 lines 9.9 kB view raw
1"""Garlic encryption/decryption pipeline. 2 3Ported from net.i2p.crypto.ElGamalAESEngine. 4 5Garlic messages use two modes: 6 71. **Existing session** -- the sender and receiver already share a session 8 key and session tags. The sender picks a tag, prepends it (32 bytes) 9 to the AES-256-CBC ciphertext. The receiver looks up the tag to find 10 the session key. 11 122. **New session** -- no shared state yet. The sender ElGamal-encrypts 13 a "tag delivery block" (session key + tags) with the receiver's public 14 key, then AES-encrypts the cloves data with the session key. 15 16Wire formats: 17 18 Existing session: 19 session_tag(32) || AES-CBC(cloves, key, iv=tag[:16]) 20 21 New session: 22 ElGamal(tag_delivery_block)(514) || AES-CBC(cloves, key, iv=zeros(16)) 23 24 Tag delivery block (plaintext inside ElGamal): 25 session_key(32) || tag_count(2, big-endian) || tags(32 each) 26""" 27 28from __future__ import annotations 29 30import hashlib 31import hmac as _hmac 32import os 33import struct 34import time 35from collections import defaultdict 36 37from i2p_crypto.aes import AESEngine 38from i2p_crypto.elgamal import ElGamalEngine 39 40# Maximum tags per ElGamal block: 32 + 2 + N*32 <= 222 => N <= 5 41_MAX_TAGS_PER_ELGAMAL = 5 42 43# Zero IV for new-session AES (session key is wrapped, not tag-derived) 44_ZERO_IV = b"\x00" * 16 45 46 47class GarlicEncryptor: 48 """Encrypts garlic messages for existing or new sessions.""" 49 50 @staticmethod 51 def encrypt( 52 cloves_data: bytes, 53 session_key: bytes, 54 session_tag: bytes, 55 ) -> bytes: 56 """Encrypt *cloves_data* for an existing session. 57 58 Parameters 59 ---------- 60 cloves_data: 61 Plaintext (must be a multiple of 16 bytes). 62 session_key: 63 32-byte AES session key shared with the receiver. 64 session_tag: 65 32-byte session tag (single-use). 66 67 Returns 68 ------- 69 bytes 70 ``session_tag(32) || AES-CBC(cloves_data, session_key, iv=tag[:16])`` 71 """ 72 iv = session_tag[:16] 73 ciphertext = AESEngine.encrypt(cloves_data, session_key, iv) 74 return session_tag + ciphertext 75 76 @staticmethod 77 def encrypt_new_session( 78 cloves_data: bytes, 79 session_key: bytes, 80 tags: list[bytes], 81 peer_public_key: bytes, 82 ) -> bytes: 83 """Encrypt *cloves_data* for a new session with ElGamal key wrapping. 84 85 Parameters 86 ---------- 87 cloves_data: 88 Plaintext (must be a multiple of 16 bytes). 89 session_key: 90 32-byte AES session key to deliver to the receiver. 91 tags: 92 List of 32-byte session tags to deliver (max 5). 93 peer_public_key: 94 256-byte ElGamal public key of the receiver. 95 96 Returns 97 ------- 98 bytes 99 ``ElGamal(tag_delivery)(514) || AES-CBC(cloves_data, session_key, iv=zeros)`` 100 101 Raises 102 ------ 103 ValueError 104 If more than 5 tags are provided. 105 """ 106 if len(tags) > _MAX_TAGS_PER_ELGAMAL: 107 raise ValueError( 108 f"Too many tags: {len(tags)} > {_MAX_TAGS_PER_ELGAMAL}. " 109 f"Tag delivery block must fit in 222-byte ElGamal plaintext." 110 ) 111 112 # Build tag delivery block 113 tag_count = struct.pack(">H", len(tags)) 114 tag_delivery = session_key + tag_count + b"".join(tags) 115 116 # ElGamal encrypt the tag delivery block 117 elgamal_block = ElGamalEngine.encrypt(tag_delivery, peer_public_key) 118 119 # AES encrypt the cloves data with a zero IV 120 # (the session key is delivered via ElGamal, not via tag) 121 aes_ciphertext = AESEngine.encrypt(cloves_data, session_key, _ZERO_IV) 122 123 return elgamal_block + aes_ciphertext 124 125 126class GarlicDecryptor: 127 """Decrypts garlic messages for existing or new sessions.""" 128 129 @staticmethod 130 def decrypt_existing( 131 encrypted: bytes, 132 session_key_mgr, 133 ) -> bytes | None: 134 """Decrypt an existing-session garlic message. 135 136 Parameters 137 ---------- 138 encrypted: 139 The full encrypted message: ``tag(32) || aes_ciphertext``. 140 session_key_mgr: 141 A ``SessionKeyManager`` instance with ``consume_tag(tag)`` 142 returning the session key or None. 143 144 Returns 145 ------- 146 bytes or None 147 Decrypted cloves data, or None if the tag is unknown. 148 """ 149 if len(encrypted) < 48: # 32 tag + at least 16 AES block 150 return None 151 152 session_tag = encrypted[:32] 153 aes_ciphertext = encrypted[32:] 154 155 session_key = session_key_mgr.consume_tag(session_tag) 156 if session_key is None: 157 return None 158 159 iv = session_tag[:16] 160 return AESEngine.decrypt(aes_ciphertext, session_key, iv) 161 162 @staticmethod 163 def decrypt_new_session( 164 encrypted: bytes, 165 our_private_key: bytes, 166 session_key_mgr, 167 ) -> bytes | None: 168 """Decrypt a new-session garlic message with ElGamal unwrapping. 169 170 Parameters 171 ---------- 172 encrypted: 173 The full message: ``elgamal_block(514) || aes_ciphertext``. 174 our_private_key: 175 256-byte ElGamal private key. 176 session_key_mgr: 177 A ``SessionKeyManager`` with ``add_tags(key, tags, expiration_ms)``. 178 179 Returns 180 ------- 181 bytes or None 182 Decrypted cloves data, or None if ElGamal decryption fails. 183 """ 184 if len(encrypted) < 514 + 16: # ElGamal block + at least 1 AES block 185 return None 186 187 elgamal_block = encrypted[:514] 188 aes_ciphertext = encrypted[514:] 189 190 # Decrypt the tag delivery block 191 tag_delivery = ElGamalEngine.decrypt(elgamal_block, our_private_key) 192 if tag_delivery is None: 193 return None 194 195 # Parse: session_key(32) + tag_count(2) + tags(32 each) 196 if len(tag_delivery) < 34: 197 return None 198 199 session_key = tag_delivery[:32] 200 tag_count = struct.unpack(">H", tag_delivery[32:34])[0] 201 202 expected_len = 34 + tag_count * 32 203 if len(tag_delivery) < expected_len: 204 return None 205 206 tags = [] 207 offset = 34 208 for _ in range(tag_count): 209 tags.append(tag_delivery[offset : offset + 32]) 210 offset += 32 211 212 # Register the delivered tags 213 now_ms = int(time.time() * 1000) 214 expiration_ms = now_ms + 720_000 # 12 minutes default 215 session_key_mgr.add_tags(session_key, tags, expiration_ms) 216 217 # Decrypt the cloves 218 return AESEngine.decrypt(aes_ciphertext, session_key, _ZERO_IV) 219 220 221def apply_aes_padding(plaintext: bytes) -> bytes: 222 """Java-compatible AES garlic padding. 223 224 Format: pad_len(2, big-endian) + random_pad(pad_len) + payload + SHA-256(payload) 225 Total size padded to multiple of 16. 226 """ 227 checksum = hashlib.sha256(plaintext).digest() 228 inner = plaintext + checksum # payload + 32-byte hash 229 230 # Calculate pad needed to reach next 16-byte boundary 231 # Total: 2 (pad_len) + pad + len(inner) must be multiple of 16 232 min_total = 2 + len(inner) 233 pad_len = (16 - (min_total % 16)) % 16 234 random_pad = os.urandom(pad_len) 235 236 return struct.pack("!H", pad_len) + random_pad + inner 237 238 239def verify_aes_padding(decrypted: bytes) -> bytes | None: 240 """Verify and strip AES garlic padding. 241 242 Returns the payload, or None if the checksum doesn't match. 243 """ 244 if len(decrypted) < 34: # 2 (pad_len) + 0 (payload) + 32 (hash) 245 return None 246 247 pad_len = struct.unpack("!H", decrypted[:2])[0] 248 offset = 2 + pad_len 249 250 if offset + 32 > len(decrypted): 251 return None 252 253 inner = decrypted[offset:] 254 payload = inner[:-32] 255 expected_hash = inner[-32:] 256 257 actual_hash = hashlib.sha256(payload).digest() 258 if not _hmac.compare_digest(actual_hash, expected_hash): 259 return None 260 261 return payload 262 263 264class SessionTagManager: 265 """Proactive session tag lifecycle management. 266 267 Manages per-session tag pools with automatic generation, 268 consumption, and replenishment tracking. 269 """ 270 271 DEFAULT_MAX_TAGS = 200 272 REPLENISH_THRESHOLD = 10 273 274 def __init__(self, max_tags_per_session: int = DEFAULT_MAX_TAGS) -> None: 275 self._max_tags = max_tags_per_session 276 self._tags: dict[bytes, list[bytes]] = defaultdict(list) 277 278 def generate_tag_bundle(self, session_key: bytes, count: int = 5) -> list[bytes]: 279 """Generate and store a bundle of new tags for a session. 280 281 Returns the generated tags (also stored internally). 282 """ 283 tags = [os.urandom(32) for _ in range(count)] 284 current = self._tags[session_key] 285 space = self._max_tags - len(current) 286 to_add = tags[:space] # respect max 287 current.extend(to_add) 288 return tags 289 290 def consume_tag(self, session_key: bytes) -> bytes | None: 291 """Consume the next available tag for a session. 292 293 Returns the tag, or None if no tags remain. 294 """ 295 pool = self._tags.get(session_key) 296 if not pool: 297 return None 298 return pool.pop(0) 299 300 def receive_tags(self, session_key: bytes, tags: list[bytes]) -> None: 301 """Add received tags to a session's pool.""" 302 current = self._tags[session_key] 303 space = self._max_tags - len(current) 304 current.extend(tags[:space]) 305 306 def remaining_tags(self, session_key: bytes) -> int: 307 """Number of remaining tags for a session.""" 308 return len(self._tags.get(session_key, [])) 309 310 def should_replenish(self, session_key: bytes) -> bool: 311 """True if the tag pool is running low.""" 312 return self.remaining_tags(session_key) < self.REPLENISH_THRESHOLD 313 314 def has_tags(self, session_key: bytes) -> bool: 315 """True if at least one tag is available.""" 316 return self.remaining_tags(session_key) > 0