"""GarlicMessageBuilder — build encrypted garlic messages from GarlicConfig. Ported from net.i2p.router.crypto.GarlicMessageBuilder. Takes a GarlicConfig with assembled cloves, serializes them into the garlic wire format, and encrypts using either an existing session (AES + tag) or a new session (ElGamal + AES). """ from __future__ import annotations import os import struct import time from i2p_crypto.garlic_crypto import GarlicEncryptor from i2p_crypto.session_key_manager import SessionKeyManager, TagSet from i2p_data.garlic import DeliveryType from i2p_data.garlic_config import GarlicConfig, CloveConfig class GarlicMessageBuilder: """Build encrypted garlic messages from configuration objects.""" @staticmethod def build_message( config: GarlicConfig, skm: SessionKeyManager, ) -> tuple[bytes, int | None]: """Build an encrypted garlic message. Returns (encrypted_bytes, reply_token_or_none). Uses existing session if tags are available, otherwise new session. """ dest_hash = config.recipient_public_key[:32] # Get or create session session_key, is_new = skm.get_current_or_new_key(dest_hash) # Try to get an existing tag tag = skm.consume_next_available_tag(dest_hash) # Build the clove set plaintext plaintext = GarlicMessageBuilder._build_clove_set(config) # Pad to 16-byte boundary plaintext = GarlicMessageBuilder._pad_to_block_boundary(plaintext) if tag is not None: # Existing session path encrypted = GarlicEncryptor.encrypt(plaintext, session_key, tag) else: # New session path — generate tags to deliver new_tags = [os.urandom(32) for _ in range(5)] encrypted = GarlicEncryptor.encrypt_new_session( plaintext, session_key, new_tags, config.recipient_public_key, ) return encrypted, None @staticmethod def _build_clove_set(config: GarlicConfig) -> bytes: """Serialize cloves into garlic wire format. Format: clove_count(1) + [clove]* + trailing_cert(3) + msg_id(4) + exp(8) Each clove: delivery_instructions + msg_len(4) + msg + cert(3) + clove_id(4) + exp(8) """ parts = [struct.pack("!B", config.clove_count())] for clove in config.cloves: parts.append(GarlicMessageBuilder._serialize_clove(clove)) # Trailing certificate (NULL) + message ID + expiration parts.append(b"\x00\x00\x00") # NULL cert parts.append(struct.pack("!I", config.message_id)) parts.append(struct.pack("!Q", config.expiration)) return b"".join(parts) @staticmethod def _serialize_clove(clove: CloveConfig) -> bytes: """Serialize a single clove to wire format.""" # Delivery instructions flag = (clove.delivery_type & 0x03) << 5 parts = [struct.pack("!B", flag)] if clove.delivery_type == 1: # DESTINATION assert clove.dest_hash is not None parts.append(clove.dest_hash) elif clove.delivery_type == 2: # ROUTER assert clove.router_hash is not None parts.append(clove.router_hash) elif clove.delivery_type == 3: # TUNNEL assert clove.router_hash is not None parts.append(clove.router_hash) parts.append(struct.pack("!I", clove.tunnel_id)) # Message data parts.append(struct.pack("!I", len(clove.message_data))) parts.append(clove.message_data) # Certificate + clove ID + expiration parts.append(clove.certificate) parts.append(struct.pack("!I", clove.clove_id)) parts.append(struct.pack("!Q", clove.expiration)) return b"".join(parts) @staticmethod def _pad_to_block_boundary(data: bytes) -> bytes: """Pad to 16-byte AES block boundary.""" pad_len = (16 - len(data) % 16) % 16 return data + b"\x00" * pad_len