"""Fragment builder: splits I2NP messages into 1024-byte tunnel data blocks.

Produces blocks with padding, control bytes, delivery instructions, and
fragment headers per the I2P tunnel message spec.
"""

from __future__ import annotations

import os
import struct


class FragmentBuilder:
    """Builds 1024-byte tunnel data blocks from I2NP messages.

    Every block is laid out as::

        [random non-zero padding][0x00 terminator][instructions][payload]

    and is exactly ``BLOCK_SIZE`` bytes long. A message that does not fit in
    one block is split into a first fragment followed by numbered follow-on
    fragments that all carry the same message id.
    """

    BLOCK_SIZE = 1024

    # Delivery type constants
    DELIVERY_LOCAL = 0
    DELIVERY_TUNNEL = 1
    DELIVERY_ROUTER = 2

    # Size of a router hash as assumed by the instruction-size arithmetic.
    _HASH_SIZE = 32
    # Minimum 1 byte padding + 1 byte terminator in every block.
    _MIN_OVERHEAD = 2
    # Follow-on header: 1 (control) + 4 (message_id) + 2 (size).
    _FOLLOWON_OVERHEAD = 7
    # The fragment number occupies 6 bits of the follow-on control byte.
    _MAX_FRAGMENT_NUM = 0x3F

    # Control-byte flags as emitted by this builder.
    # NOTE(review): these positions look like an MSB-0 reading of the flag
    # byte in the I2P tunnel message spec; confirm against the reassembler
    # before changing them. They are kept exactly as the original emitted.
    _FLAG_FOLLOW_ON = 0x01
    _FLAG_FRAGMENTED = 0x10
    _FLAG_LAST = 0x80

    @classmethod
    def build(
        cls,
        msg: bytes,
        delivery_type: int = 0,
        message_id: int | None = None,
        target_tunnel_id: int | None = None,
        target_router_hash: bytes | None = None,
    ) -> list[bytes]:
        """Split message into 1024-byte tunnel data blocks.

        Args:
            msg: The I2NP message payload to fragment.
            delivery_type: 0=local, 1=tunnel, 2=router.
            message_id: Used only for multi-fragment messages.
                Auto-generated from ``os.urandom`` if None.
            target_tunnel_id: Required for tunnel delivery.
            target_router_hash: Required for tunnel and router delivery;
                must be 32 bytes.

        Returns:
            List of 1024-byte blocks, first fragment first.

        Raises:
            ValueError: If the delivery arguments are missing or malformed,
                or if the message needs more follow-on fragments than the
                6-bit fragment number can address.
        """
        cls._validate_delivery(delivery_type, target_tunnel_id, target_router_hash)

        usable = cls.BLOCK_SIZE - cls._MIN_OVERHEAD
        max_single_payload = usable - cls._first_instruction_size(
            delivery_type, fragmented=False
        )
        if len(msg) <= max_single_payload:
            # Fits in one block: no message id is written at all.
            return [
                cls._build_first_block(
                    msg,
                    delivery_type,
                    fragmented=False,
                    message_id=None,
                    target_tunnel_id=target_tunnel_id,
                    target_router_hash=target_router_hash,
                )
            ]

        max_first_payload = usable - cls._first_instruction_size(
            delivery_type, fragmented=True
        )
        max_followon_payload = usable - cls._FOLLOWON_OVERHEAD

        # Refuse up front instead of letting the fragment number wrap around,
        # which would make the fragments impossible to reassemble.
        followon_count = -(-(len(msg) - max_first_payload) // max_followon_payload)
        if followon_count > cls._MAX_FRAGMENT_NUM:
            max_len = max_first_payload + cls._MAX_FRAGMENT_NUM * max_followon_payload
            raise ValueError(
                f"message of {len(msg)} bytes needs {followon_count} follow-on "
                f"fragments; at most {cls._MAX_FRAGMENT_NUM} are supported "
                f"({max_len} bytes for this delivery type)"
            )

        if message_id is None:
            message_id = struct.unpack("!I", os.urandom(4))[0]

        blocks = [
            cls._build_first_block(
                msg[:max_first_payload],
                delivery_type,
                fragmented=True,
                message_id=message_id,
                target_tunnel_id=target_tunnel_id,
                target_router_hash=target_router_hash,
            )
        ]

        # Follow-on fragments are numbered from 1; the first block is 0.
        starts = range(max_first_payload, len(msg), max_followon_payload)
        for frag_num, start in enumerate(starts, start=1):
            end = start + max_followon_payload
            blocks.append(
                cls._build_followon_block(
                    msg[start:end], message_id, frag_num, end >= len(msg)
                )
            )
        return blocks

    @classmethod
    def _validate_delivery(
        cls,
        delivery_type: int,
        target_tunnel_id: int | None,
        target_router_hash: bytes | None,
    ) -> None:
        """Raise ValueError unless the delivery arguments are consistent."""
        known = (cls.DELIVERY_LOCAL, cls.DELIVERY_TUNNEL, cls.DELIVERY_ROUTER)
        if delivery_type not in known:
            raise ValueError(f"unknown delivery_type: {delivery_type!r}")
        if delivery_type == cls.DELIVERY_LOCAL:
            return
        if delivery_type == cls.DELIVERY_TUNNEL and target_tunnel_id is None:
            raise ValueError("target_tunnel_id is required for tunnel delivery")
        if target_router_hash is None:
            raise ValueError(
                "target_router_hash is required for tunnel and router delivery"
            )
        # The size arithmetic assumes a fixed-size hash; any other length
        # would yield blocks that are not BLOCK_SIZE bytes long.
        if len(target_router_hash) != cls._HASH_SIZE:
            raise ValueError(
                f"target_router_hash must be {cls._HASH_SIZE} bytes, "
                f"got {len(target_router_hash)}"
            )

    @classmethod
    def _first_instruction_size(cls, delivery_type: int, fragmented: bool) -> int:
        """Calculate instruction header size for a first fragment."""
        size = 1  # control byte
        if delivery_type == cls.DELIVERY_TUNNEL:
            size += 4 + cls._HASH_SIZE  # tunnel_id + router_hash
        elif delivery_type == cls.DELIVERY_ROUTER:
            size += cls._HASH_SIZE  # router_hash
        if fragmented:
            size += 4  # message_id
        size += 2  # payload size
        return size

    @classmethod
    def _build_first_block(
        cls,
        payload: bytes,
        delivery_type: int,
        fragmented: bool,
        message_id: int | None,
        target_tunnel_id: int | None,
        target_router_hash: bytes | None,
    ) -> bytes:
        """Build a 1024-byte block for a first (or only) fragment.

        Header order: control byte, optional tunnel id, optional router hash,
        optional message id (only when ``fragmented``), 2-byte payload size.

        Raises:
            ValueError: If delivery arguments are invalid, ``message_id`` is
                missing for a fragmented block, or the payload does not fit.
        """
        # Explicit checks rather than ``assert``: asserts vanish under -O.
        cls._validate_delivery(delivery_type, target_tunnel_id, target_router_hash)
        if fragmented and message_id is None:
            raise ValueError("message_id is required for a fragmented first block")

        control = delivery_type << 1
        if fragmented:
            control |= cls._FLAG_FRAGMENTED

        parts = [bytes([control])]
        if delivery_type == cls.DELIVERY_TUNNEL:
            parts.append(struct.pack("!I", target_tunnel_id))
            parts.append(target_router_hash)
        elif delivery_type == cls.DELIVERY_ROUTER:
            parts.append(target_router_hash)
        if fragmented:
            parts.append(struct.pack("!I", message_id))
        parts.append(struct.pack("!H", len(payload)))
        parts.append(payload)

        return cls._pad_block(b"".join(parts))

    @classmethod
    def _build_followon_block(
        cls,
        payload: bytes,
        message_id: int,
        fragment_num: int,
        is_last: bool,
    ) -> bytes:
        """Build a 1024-byte block for a follow-on fragment.

        Header order: control byte (follow-on flag, 6-bit fragment number,
        last-fragment flag), 4-byte message id, 2-byte payload size.

        Raises:
            ValueError: If ``fragment_num`` does not fit in 6 bits or the
                payload does not fit in one block.
        """
        if not 0 <= fragment_num <= cls._MAX_FRAGMENT_NUM:
            raise ValueError(
                f"fragment_num must be in 0..{cls._MAX_FRAGMENT_NUM}, "
                f"got {fragment_num}"
            )

        control = cls._FLAG_FOLLOW_ON | (fragment_num << 1)
        if is_last:
            control |= cls._FLAG_LAST

        header = (
            bytes([control])
            + struct.pack("!I", message_id)
            + struct.pack("!H", len(payload))
        )
        return cls._pad_block(header + payload)

    @classmethod
    def _pad_block(cls, fragment_data: bytes) -> bytes:
        """Prefix fragment data with non-zero padding and a 0x00 terminator."""
        pad_len = cls.BLOCK_SIZE - 1 - len(fragment_data)  # 1 for terminator
        if pad_len < 0:
            raise ValueError(
                f"fragment of {len(fragment_data)} bytes does not fit in a "
                f"{cls.BLOCK_SIZE}-byte block"
            )
        return cls._random_nonzero(pad_len) + b"\x00" + fragment_data

    @staticmethod
    def _random_nonzero(length: int) -> bytes:
        """Generate random bytes with no zero bytes (padding)."""
        if length <= 0:
            return b""
        out = bytearray()
        # Draw the shortfall in bulk and drop zero bytes; the survivors stay
        # uniformly distributed over 1..255. Usually one or two rounds.
        while len(out) < length:
            out += os.urandom(length - len(out)).translate(None, b"\x00")
        return bytes(out)