"""MessageWrapper — wrap messages in single-clove garlic for privacy. Ported from net.i2p.router.message.MessageWrapper. Provides utility methods to wrap a message in a garlic clove with LOCAL delivery, encrypted with a session key and tag. """ from __future__ import annotations import os import struct import time from i2p_crypto.garlic_crypto import GarlicEncryptor from i2p_crypto.session_key_manager import SessionKeyManager, TagSet from i2p_data.garlic import DeliveryType class MessageWrapper: """Wrap messages in garlic encryption for tunnel delivery.""" @staticmethod def wrap( msg_bytes: bytes, session_key: bytes, session_tag: bytes, ) -> bytes: """Wrap message bytes in a single-clove garlic, encrypted with key/tag. Creates LOCAL delivery instructions + message + certificate + cloveId + expiration, then encrypts with AES using the session key and tag. """ # Build single-clove garlic plaintext: # clove_count(1) + [delivery_instructions + msg_len(4) + msg + cert(3) + cloveId(4) + exp(8)] # + trailing cert(3) + messageId(4) + expiration(8) flag = (DeliveryType.LOCAL & 0x03) << 5 # LOCAL = 0 clove_id = int.from_bytes(os.urandom(4), "big") exp = int(time.time() * 1000) + 60_000 # 1 minute clove = ( struct.pack("!B", flag) # delivery instructions (LOCAL) + struct.pack("!I", len(msg_bytes)) + msg_bytes + b"\x00\x00\x00" # NULL certificate + struct.pack("!I", clove_id) + struct.pack("!Q", exp) ) garlic_msg_id = int.from_bytes(os.urandom(4), "big") garlic_exp = exp plaintext = ( struct.pack("!B", 1) # 1 clove + clove + b"\x00\x00\x00" # trailing certificate + struct.pack("!I", garlic_msg_id) + struct.pack("!Q", garlic_exp) ) # Pad to 16-byte boundary pad_len = (16 - len(plaintext) % 16) % 16 plaintext += b"\x00" * pad_len return GarlicEncryptor.encrypt(plaintext, session_key, session_tag) @staticmethod def generate_session(skm: SessionKeyManager) -> tuple[bytes, bytes]: """Create a one-time session key + tag pair, registered in the SKM. Returns (session_key, session_tag). """ session_key = os.urandom(32) tag = os.urandom(32) now_ms = int(time.time() * 1000) skm.add_tags(session_key, [tag], now_ms + TagSet.DEFAULT_LIFETIME_MS) return session_key, tag