"""Garlic encryption/decryption pipeline. Ported from net.i2p.crypto.ElGamalAESEngine. Garlic messages use two modes: 1. **Existing session** -- the sender and receiver already share a session key and session tags. The sender picks a tag, prepends it (32 bytes) to the AES-256-CBC ciphertext. The receiver looks up the tag to find the session key. 2. **New session** -- no shared state yet. The sender ElGamal-encrypts a "tag delivery block" (session key + tags) with the receiver's public key, then AES-encrypts the cloves data with the session key. Wire formats: Existing session: session_tag(32) || AES-CBC(cloves, key, iv=tag[:16]) New session: ElGamal(tag_delivery_block)(514) || AES-CBC(cloves, key, iv=zeros(16)) Tag delivery block (plaintext inside ElGamal): session_key(32) || tag_count(2, big-endian) || tags(32 each) """ from __future__ import annotations import hashlib import hmac as _hmac import os import struct import time from collections import defaultdict from i2p_crypto.aes import AESEngine from i2p_crypto.elgamal import ElGamalEngine # Maximum tags per ElGamal block: 32 + 2 + N*32 <= 222 => N <= 5 _MAX_TAGS_PER_ELGAMAL = 5 # Zero IV for new-session AES (session key is wrapped, not tag-derived) _ZERO_IV = b"\x00" * 16 class GarlicEncryptor: """Encrypts garlic messages for existing or new sessions.""" @staticmethod def encrypt( cloves_data: bytes, session_key: bytes, session_tag: bytes, ) -> bytes: """Encrypt *cloves_data* for an existing session. Parameters ---------- cloves_data: Plaintext (must be a multiple of 16 bytes). session_key: 32-byte AES session key shared with the receiver. session_tag: 32-byte session tag (single-use). Returns ------- bytes ``session_tag(32) || AES-CBC(cloves_data, session_key, iv=tag[:16])`` """ iv = session_tag[:16] ciphertext = AESEngine.encrypt(cloves_data, session_key, iv) return session_tag + ciphertext @staticmethod def encrypt_new_session( cloves_data: bytes, session_key: bytes, tags: list[bytes], peer_public_key: bytes, ) -> bytes: """Encrypt *cloves_data* for a new session with ElGamal key wrapping. Parameters ---------- cloves_data: Plaintext (must be a multiple of 16 bytes). session_key: 32-byte AES session key to deliver to the receiver. tags: List of 32-byte session tags to deliver (max 5). peer_public_key: 256-byte ElGamal public key of the receiver. Returns ------- bytes ``ElGamal(tag_delivery)(514) || AES-CBC(cloves_data, session_key, iv=zeros)`` Raises ------ ValueError If more than 5 tags are provided. """ if len(tags) > _MAX_TAGS_PER_ELGAMAL: raise ValueError( f"Too many tags: {len(tags)} > {_MAX_TAGS_PER_ELGAMAL}. " f"Tag delivery block must fit in 222-byte ElGamal plaintext." ) # Build tag delivery block tag_count = struct.pack(">H", len(tags)) tag_delivery = session_key + tag_count + b"".join(tags) # ElGamal encrypt the tag delivery block elgamal_block = ElGamalEngine.encrypt(tag_delivery, peer_public_key) # AES encrypt the cloves data with a zero IV # (the session key is delivered via ElGamal, not via tag) aes_ciphertext = AESEngine.encrypt(cloves_data, session_key, _ZERO_IV) return elgamal_block + aes_ciphertext class GarlicDecryptor: """Decrypts garlic messages for existing or new sessions.""" @staticmethod def decrypt_existing( encrypted: bytes, session_key_mgr, ) -> bytes | None: """Decrypt an existing-session garlic message. Parameters ---------- encrypted: The full encrypted message: ``tag(32) || aes_ciphertext``. session_key_mgr: A ``SessionKeyManager`` instance with ``consume_tag(tag)`` returning the session key or None. Returns ------- bytes or None Decrypted cloves data, or None if the tag is unknown. """ if len(encrypted) < 48: # 32 tag + at least 16 AES block return None session_tag = encrypted[:32] aes_ciphertext = encrypted[32:] session_key = session_key_mgr.consume_tag(session_tag) if session_key is None: return None iv = session_tag[:16] return AESEngine.decrypt(aes_ciphertext, session_key, iv) @staticmethod def decrypt_new_session( encrypted: bytes, our_private_key: bytes, session_key_mgr, ) -> bytes | None: """Decrypt a new-session garlic message with ElGamal unwrapping. Parameters ---------- encrypted: The full message: ``elgamal_block(514) || aes_ciphertext``. our_private_key: 256-byte ElGamal private key. session_key_mgr: A ``SessionKeyManager`` with ``add_tags(key, tags, expiration_ms)``. Returns ------- bytes or None Decrypted cloves data, or None if ElGamal decryption fails. """ if len(encrypted) < 514 + 16: # ElGamal block + at least 1 AES block return None elgamal_block = encrypted[:514] aes_ciphertext = encrypted[514:] # Decrypt the tag delivery block tag_delivery = ElGamalEngine.decrypt(elgamal_block, our_private_key) if tag_delivery is None: return None # Parse: session_key(32) + tag_count(2) + tags(32 each) if len(tag_delivery) < 34: return None session_key = tag_delivery[:32] tag_count = struct.unpack(">H", tag_delivery[32:34])[0] expected_len = 34 + tag_count * 32 if len(tag_delivery) < expected_len: return None tags = [] offset = 34 for _ in range(tag_count): tags.append(tag_delivery[offset : offset + 32]) offset += 32 # Register the delivered tags now_ms = int(time.time() * 1000) expiration_ms = now_ms + 720_000 # 12 minutes default session_key_mgr.add_tags(session_key, tags, expiration_ms) # Decrypt the cloves return AESEngine.decrypt(aes_ciphertext, session_key, _ZERO_IV) def apply_aes_padding(plaintext: bytes) -> bytes: """Java-compatible AES garlic padding. Format: pad_len(2, big-endian) + random_pad(pad_len) + payload + SHA-256(payload) Total size padded to multiple of 16. """ checksum = hashlib.sha256(plaintext).digest() inner = plaintext + checksum # payload + 32-byte hash # Calculate pad needed to reach next 16-byte boundary # Total: 2 (pad_len) + pad + len(inner) must be multiple of 16 min_total = 2 + len(inner) pad_len = (16 - (min_total % 16)) % 16 random_pad = os.urandom(pad_len) return struct.pack("!H", pad_len) + random_pad + inner def verify_aes_padding(decrypted: bytes) -> bytes | None: """Verify and strip AES garlic padding. Returns the payload, or None if the checksum doesn't match. """ if len(decrypted) < 34: # 2 (pad_len) + 0 (payload) + 32 (hash) return None pad_len = struct.unpack("!H", decrypted[:2])[0] offset = 2 + pad_len if offset + 32 > len(decrypted): return None inner = decrypted[offset:] payload = inner[:-32] expected_hash = inner[-32:] actual_hash = hashlib.sha256(payload).digest() if not _hmac.compare_digest(actual_hash, expected_hash): return None return payload class SessionTagManager: """Proactive session tag lifecycle management. Manages per-session tag pools with automatic generation, consumption, and replenishment tracking. """ DEFAULT_MAX_TAGS = 200 REPLENISH_THRESHOLD = 10 def __init__(self, max_tags_per_session: int = DEFAULT_MAX_TAGS) -> None: self._max_tags = max_tags_per_session self._tags: dict[bytes, list[bytes]] = defaultdict(list) def generate_tag_bundle(self, session_key: bytes, count: int = 5) -> list[bytes]: """Generate and store a bundle of new tags for a session. Returns the generated tags (also stored internally). """ tags = [os.urandom(32) for _ in range(count)] current = self._tags[session_key] space = self._max_tags - len(current) to_add = tags[:space] # respect max current.extend(to_add) return tags def consume_tag(self, session_key: bytes) -> bytes | None: """Consume the next available tag for a session. Returns the tag, or None if no tags remain. """ pool = self._tags.get(session_key) if not pool: return None return pool.pop(0) def receive_tags(self, session_key: bytes, tags: list[bytes]) -> None: """Add received tags to a session's pool.""" current = self._tags[session_key] space = self._max_tags - len(current) current.extend(tags[:space]) def remaining_tags(self, session_key: bytes) -> int: """Number of remaining tags for a session.""" return len(self._tags.get(session_key, [])) def should_replenish(self, session_key: bytes) -> bool: """True if the tag pool is running low.""" return self.remaining_tags(session_key) < self.REPLENISH_THRESHOLD def has_tags(self, session_key: bytes) -> bool: """True if at least one tag is available.""" return self.remaining_tags(session_key) > 0