A Python port of the Invisible Internet Project (I2P)
at main 79 lines 2.7 kB view raw
1"""MessageWrapper — wrap messages in single-clove garlic for privacy. 2 3Ported from net.i2p.router.message.MessageWrapper. 4 5Provides utility methods to wrap a message in a garlic clove 6with LOCAL delivery, encrypted with a session key and tag. 7""" 8 9from __future__ import annotations 10 11import os 12import struct 13import time 14 15from i2p_crypto.garlic_crypto import GarlicEncryptor 16from i2p_crypto.session_key_manager import SessionKeyManager, TagSet 17from i2p_data.garlic import DeliveryType 18 19 20class MessageWrapper: 21 """Wrap messages in garlic encryption for tunnel delivery.""" 22 23 @staticmethod 24 def wrap( 25 msg_bytes: bytes, 26 session_key: bytes, 27 session_tag: bytes, 28 ) -> bytes: 29 """Wrap message bytes in a single-clove garlic, encrypted with key/tag. 30 31 Creates LOCAL delivery instructions + message + certificate + cloveId + expiration, 32 then encrypts with AES using the session key and tag. 33 """ 34 # Build single-clove garlic plaintext: 35 # clove_count(1) + [delivery_instructions + msg_len(4) + msg + cert(3) + cloveId(4) + exp(8)] 36 # + trailing cert(3) + messageId(4) + expiration(8) 37 flag = (DeliveryType.LOCAL & 0x03) << 5 # LOCAL = 0 38 clove_id = int.from_bytes(os.urandom(4), "big") 39 exp = int(time.time() * 1000) + 60_000 # 1 minute 40 41 clove = ( 42 struct.pack("!B", flag) # delivery instructions (LOCAL) 43 + struct.pack("!I", len(msg_bytes)) 44 + msg_bytes 45 + b"\x00\x00\x00" # NULL certificate 46 + struct.pack("!I", clove_id) 47 + struct.pack("!Q", exp) 48 ) 49 50 garlic_msg_id = int.from_bytes(os.urandom(4), "big") 51 garlic_exp = exp 52 53 plaintext = ( 54 struct.pack("!B", 1) # 1 clove 55 + clove 56 + b"\x00\x00\x00" # trailing certificate 57 + struct.pack("!I", garlic_msg_id) 58 + struct.pack("!Q", garlic_exp) 59 ) 60 61 # Pad to 16-byte boundary 62 pad_len = (16 - len(plaintext) % 16) % 16 63 plaintext += b"\x00" * pad_len 64 65 return GarlicEncryptor.encrypt(plaintext, session_key, session_tag) 66 67 @staticmethod 68 def generate_session(skm: SessionKeyManager) -> tuple[bytes, bytes]: 69 """Create a one-time session key + tag pair, registered in the SKM. 70 71 Returns (session_key, session_tag). 72 """ 73 session_key = os.urandom(32) 74 tag = os.urandom(32) 75 76 now_ms = int(time.time() * 1000) 77 skm.add_tags(session_key, [tag], now_ms + TagSet.DEFAULT_LIFETIME_MS) 78 79 return session_key, tag