A Python port of the Invisible Internet Project (I2P)
1"""MessageWrapper — wrap messages in single-clove garlic for privacy.
2
3Ported from net.i2p.router.message.MessageWrapper.
4
5Provides utility methods to wrap a message in a garlic clove
6with LOCAL delivery, encrypted with a session key and tag.
7"""
8
9from __future__ import annotations
10
11import os
12import struct
13import time
14
15from i2p_crypto.garlic_crypto import GarlicEncryptor
16from i2p_crypto.session_key_manager import SessionKeyManager, TagSet
17from i2p_data.garlic import DeliveryType
18
19
20class MessageWrapper:
21 """Wrap messages in garlic encryption for tunnel delivery."""
22
23 @staticmethod
24 def wrap(
25 msg_bytes: bytes,
26 session_key: bytes,
27 session_tag: bytes,
28 ) -> bytes:
29 """Wrap message bytes in a single-clove garlic, encrypted with key/tag.
30
31 Creates LOCAL delivery instructions + message + certificate + cloveId + expiration,
32 then encrypts with AES using the session key and tag.
33 """
34 # Build single-clove garlic plaintext:
35 # clove_count(1) + [delivery_instructions + msg_len(4) + msg + cert(3) + cloveId(4) + exp(8)]
36 # + trailing cert(3) + messageId(4) + expiration(8)
37 flag = (DeliveryType.LOCAL & 0x03) << 5 # LOCAL = 0
38 clove_id = int.from_bytes(os.urandom(4), "big")
39 exp = int(time.time() * 1000) + 60_000 # 1 minute
40
41 clove = (
42 struct.pack("!B", flag) # delivery instructions (LOCAL)
43 + struct.pack("!I", len(msg_bytes))
44 + msg_bytes
45 + b"\x00\x00\x00" # NULL certificate
46 + struct.pack("!I", clove_id)
47 + struct.pack("!Q", exp)
48 )
49
50 garlic_msg_id = int.from_bytes(os.urandom(4), "big")
51 garlic_exp = exp
52
53 plaintext = (
54 struct.pack("!B", 1) # 1 clove
55 + clove
56 + b"\x00\x00\x00" # trailing certificate
57 + struct.pack("!I", garlic_msg_id)
58 + struct.pack("!Q", garlic_exp)
59 )
60
61 # Pad to 16-byte boundary
62 pad_len = (16 - len(plaintext) % 16) % 16
63 plaintext += b"\x00" * pad_len
64
65 return GarlicEncryptor.encrypt(plaintext, session_key, session_tag)
66
67 @staticmethod
68 def generate_session(skm: SessionKeyManager) -> tuple[bytes, bytes]:
69 """Create a one-time session key + tag pair, registered in the SKM.
70
71 Returns (session_key, session_tag).
72 """
73 session_key = os.urandom(32)
74 tag = os.urandom(32)
75
76 now_ms = int(time.time() * 1000)
77 skm.add_tags(session_key, [tag], now_ms + TagSet.DEFAULT_LIFETIME_MS)
78
79 return session_key, tag