A Python port of the Invisible Internet Project (I2P)
1"""GarlicMessageBuilder — build encrypted garlic messages from GarlicConfig.
2
3Ported from net.i2p.router.crypto.GarlicMessageBuilder.
4
5Takes a GarlicConfig with assembled cloves, serializes them into
6the garlic wire format, and encrypts using either an existing
7session (AES + tag) or a new session (ElGamal + AES).
8"""
9
10from __future__ import annotations
11
12import os
13import struct
14import time
15
16from i2p_crypto.garlic_crypto import GarlicEncryptor
17from i2p_crypto.session_key_manager import SessionKeyManager, TagSet
18from i2p_data.garlic import DeliveryType
19from i2p_data.garlic_config import GarlicConfig, CloveConfig
20
21
class GarlicMessageBuilder:
    """Build encrypted garlic messages from configuration objects.

    Every method is static; the class is used purely as a namespace.
    """

    # Number of fresh session tags generated when no existing tag is available.
    _NEW_SESSION_TAG_COUNT = 5
    # Size in bytes of each generated session tag.
    _SESSION_TAG_SIZE = 32
    # Plaintext is zero-padded to a multiple of this many bytes (AES block).
    _BLOCK_SIZE = 16

    # Delivery-type codes that carry extra fields in the delivery
    # instructions. Any other value is serialized as the flag byte only.
    _DELIVERY_DESTINATION = 1
    _DELIVERY_ROUTER = 2
    _DELIVERY_TUNNEL = 3

    @staticmethod
    def build_message(
        config: GarlicConfig,
        skm: SessionKeyManager,
    ) -> tuple[bytes, int | None]:
        """Build an encrypted garlic message.

        The cloves in *config* are serialized and padded, then encrypted
        with ``GarlicEncryptor.encrypt`` when the session key manager has a
        tag available, or with ``GarlicEncryptor.encrypt_new_session``
        (together with freshly generated tags) when it does not.

        Args:
            config: Garlic configuration holding the cloves, message ID,
                expiration and the recipient public key.
            skm: Session key manager used to look up the session key and
                to consume an existing session tag.

        Returns:
            ``(encrypted_bytes, reply_token)``. The reply token is
            currently always ``None``.

        Raises:
            ValueError: If a clove lacks a field its delivery type needs.
        """
        # Serialize and pad before touching the session key manager: if a
        # clove is malformed we fail here, and no session tag is consumed
        # for a message that will never be sent.
        plaintext = GarlicMessageBuilder._pad_to_block_boundary(
            GarlicMessageBuilder._build_clove_set(config)
        )

        # The first 32 bytes of the recipient public key are used as the
        # lookup key for the session key manager.
        dest_hash = config.recipient_public_key[:32]

        # The "is new" indicator returned alongside the key is not needed:
        # the path is chosen solely by whether a tag is available.
        session_key, _is_new = skm.get_current_or_new_key(dest_hash)
        tag = skm.consume_next_available_tag(dest_hash)

        if tag is not None:
            # Existing session path.
            encrypted = GarlicEncryptor.encrypt(plaintext, session_key, tag)
        else:
            # New session path: generate random tags to deliver.
            # NOTE(review): these tags are only handed to the encryptor and
            # are not registered with `skm` here; confirm that is intended.
            new_tags = [
                os.urandom(GarlicMessageBuilder._SESSION_TAG_SIZE)
                for _ in range(GarlicMessageBuilder._NEW_SESSION_TAG_COUNT)
            ]
            encrypted = GarlicEncryptor.encrypt_new_session(
                plaintext, session_key, new_tags, config.recipient_public_key,
            )

        return encrypted, None

    @staticmethod
    def _build_clove_set(config: GarlicConfig) -> bytes:
        """Serialize all cloves of *config* into the garlic wire format.

        Format: clove_count(1) + [clove]* + trailing_cert(3) + msg_id(4) + exp(8)
        Each clove: delivery_instructions + msg_len(4) + msg + cert(3) + clove_id(4) + exp(8)

        All integers are written big-endian (network byte order).
        """
        parts = [struct.pack("!B", config.clove_count())]
        parts.extend(
            GarlicMessageBuilder._serialize_clove(clove)
            for clove in config.cloves
        )

        # Trailer: NULL certificate, then message ID and expiration.
        parts.append(b"\x00\x00\x00")
        parts.append(struct.pack("!I", config.message_id))
        parts.append(struct.pack("!Q", config.expiration))

        return b"".join(parts)

    @staticmethod
    def _serialize_clove(clove: CloveConfig) -> bytes:
        """Serialize a single clove to wire format.

        Layout: flag(1) + [hash] + [tunnel_id(4)] + msg_len(4) + msg
        + certificate + clove_id(4) + expiration(8).

        Raises:
            ValueError: If the delivery type requires ``dest_hash``,
                ``router_hash`` or ``tunnel_id`` and it is ``None``.
        """
        builder = GarlicMessageBuilder
        delivery_type = clove.delivery_type

        # Delivery instructions: the two-bit delivery type sits in bits 5-6
        # of the flag byte.
        parts = [struct.pack("!B", (delivery_type & 0x03) << 5)]

        # Explicit checks instead of `assert`: assertions are removed under
        # `python -O`, which would let a None hash reach b"".join() below.
        if delivery_type == builder._DELIVERY_DESTINATION:
            if clove.dest_hash is None:
                raise ValueError("DESTINATION delivery requires dest_hash")
            parts.append(clove.dest_hash)
        elif delivery_type == builder._DELIVERY_ROUTER:
            if clove.router_hash is None:
                raise ValueError("ROUTER delivery requires router_hash")
            parts.append(clove.router_hash)
        elif delivery_type == builder._DELIVERY_TUNNEL:
            if clove.router_hash is None:
                raise ValueError("TUNNEL delivery requires router_hash")
            if clove.tunnel_id is None:
                raise ValueError("TUNNEL delivery requires tunnel_id")
            parts.append(clove.router_hash)
            parts.append(struct.pack("!I", clove.tunnel_id))

        # Length-prefixed message payload.
        parts.append(struct.pack("!I", len(clove.message_data)))
        parts.append(clove.message_data)

        # Certificate + clove ID + expiration.
        parts.append(clove.certificate)
        parts.append(struct.pack("!I", clove.clove_id))
        parts.append(struct.pack("!Q", clove.expiration))

        return b"".join(parts)

    @staticmethod
    def _pad_to_block_boundary(data: bytes) -> bytes:
        """Zero-pad *data* to the next 16-byte AES block boundary.

        Data whose length is already a multiple of 16 (including empty
        data) is returned without any padding added.
        """
        block = GarlicMessageBuilder._BLOCK_SIZE
        pad_len = -len(data) % block
        return data + b"\x00" * pad_len