A Python port of the Invisible Internet Project (I2P)
at main 113 lines 4.0 kB view raw
1"""GarlicMessageBuilder — build encrypted garlic messages from GarlicConfig. 2 3Ported from net.i2p.router.crypto.GarlicMessageBuilder. 4 5Takes a GarlicConfig with assembled cloves, serializes them into 6the garlic wire format, and encrypts using either an existing 7session (AES + tag) or a new session (ElGamal + AES). 8""" 9 10from __future__ import annotations 11 12import os 13import struct 14import time 15 16from i2p_crypto.garlic_crypto import GarlicEncryptor 17from i2p_crypto.session_key_manager import SessionKeyManager, TagSet 18from i2p_data.garlic import DeliveryType 19from i2p_data.garlic_config import GarlicConfig, CloveConfig 20 21 22class GarlicMessageBuilder: 23 """Build encrypted garlic messages from configuration objects.""" 24 25 @staticmethod 26 def build_message( 27 config: GarlicConfig, 28 skm: SessionKeyManager, 29 ) -> tuple[bytes, int | None]: 30 """Build an encrypted garlic message. 31 32 Returns (encrypted_bytes, reply_token_or_none). 33 Uses existing session if tags are available, otherwise new session. 34 """ 35 dest_hash = config.recipient_public_key[:32] 36 37 # Get or create session 38 session_key, is_new = skm.get_current_or_new_key(dest_hash) 39 40 # Try to get an existing tag 41 tag = skm.consume_next_available_tag(dest_hash) 42 43 # Build the clove set plaintext 44 plaintext = GarlicMessageBuilder._build_clove_set(config) 45 46 # Pad to 16-byte boundary 47 plaintext = GarlicMessageBuilder._pad_to_block_boundary(plaintext) 48 49 if tag is not None: 50 # Existing session path 51 encrypted = GarlicEncryptor.encrypt(plaintext, session_key, tag) 52 else: 53 # New session path — generate tags to deliver 54 new_tags = [os.urandom(32) for _ in range(5)] 55 encrypted = GarlicEncryptor.encrypt_new_session( 56 plaintext, session_key, new_tags, config.recipient_public_key, 57 ) 58 59 return encrypted, None 60 61 @staticmethod 62 def _build_clove_set(config: GarlicConfig) -> bytes: 63 """Serialize cloves into garlic wire format. 64 65 Format: clove_count(1) + [clove]* + trailing_cert(3) + msg_id(4) + exp(8) 66 Each clove: delivery_instructions + msg_len(4) + msg + cert(3) + clove_id(4) + exp(8) 67 """ 68 parts = [struct.pack("!B", config.clove_count())] 69 70 for clove in config.cloves: 71 parts.append(GarlicMessageBuilder._serialize_clove(clove)) 72 73 # Trailing certificate (NULL) + message ID + expiration 74 parts.append(b"\x00\x00\x00") # NULL cert 75 parts.append(struct.pack("!I", config.message_id)) 76 parts.append(struct.pack("!Q", config.expiration)) 77 78 return b"".join(parts) 79 80 @staticmethod 81 def _serialize_clove(clove: CloveConfig) -> bytes: 82 """Serialize a single clove to wire format.""" 83 # Delivery instructions 84 flag = (clove.delivery_type & 0x03) << 5 85 parts = [struct.pack("!B", flag)] 86 87 if clove.delivery_type == 1: # DESTINATION 88 assert clove.dest_hash is not None 89 parts.append(clove.dest_hash) 90 elif clove.delivery_type == 2: # ROUTER 91 assert clove.router_hash is not None 92 parts.append(clove.router_hash) 93 elif clove.delivery_type == 3: # TUNNEL 94 assert clove.router_hash is not None 95 parts.append(clove.router_hash) 96 parts.append(struct.pack("!I", clove.tunnel_id)) 97 98 # Message data 99 parts.append(struct.pack("!I", len(clove.message_data))) 100 parts.append(clove.message_data) 101 102 # Certificate + clove ID + expiration 103 parts.append(clove.certificate) 104 parts.append(struct.pack("!I", clove.clove_id)) 105 parts.append(struct.pack("!Q", clove.expiration)) 106 107 return b"".join(parts) 108 109 @staticmethod 110 def _pad_to_block_boundary(data: bytes) -> bytes: 111 """Pad to 16-byte AES block boundary.""" 112 pad_len = (16 - len(data) % 16) % 16 113 return data + b"\x00" * pad_len