A Python port of the Invisible Internet Project (I2P)
1"""Fragment builder: splits I2NP messages into 1024-byte tunnel data blocks.
2
3Produces blocks with padding, control bytes, delivery instructions,
4and fragment headers per the I2P tunnel message spec.
5"""
6
7from __future__ import annotations
8
9import os
10import struct
11
12
class FragmentBuilder:
    """Builds 1024-byte tunnel data blocks from I2NP messages.

    Every block produced here has the layout::

        <random non-zero padding> 0x00 <instruction header> <payload>

    and is exactly ``BLOCK_SIZE`` bytes long. A message that does not fit in
    one block is split into a first fragment followed by numbered follow-on
    fragments that all carry the same message id.
    """

    BLOCK_SIZE = 1024

    # Delivery type constants
    DELIVERY_LOCAL = 0
    DELIVERY_TUNNEL = 1
    DELIVERY_ROUTER = 2

    # Width of the router hash field; header sizes are computed from it.
    ROUTER_HASH_SIZE = 32
    # Largest fragment number that fits the 6-bit field of a follow-on
    # control byte.
    MAX_FRAGMENT_NUM = 0x3F

    # Minimum 1 byte padding + 1 byte 0x00 terminator in every block.
    _MIN_OVERHEAD = 2
    # Follow-on header: 1 (control) + 4 (message_id) + 2 (size).
    _FOLLOWON_OVERHEAD = 7

    @classmethod
    def build(
        cls,
        msg: bytes,
        delivery_type: int = 0,
        message_id: int | None = None,
        target_tunnel_id: int | None = None,
        target_router_hash: bytes | None = None,
    ) -> list[bytes]:
        """Split message into 1024-byte tunnel data blocks.

        Args:
            msg: The I2NP message payload to fragment.
            delivery_type: 0=local, 1=tunnel, 2=router.
            message_id: Used only for multi-fragment messages.
                Auto-generated (random 32-bit value) if None.
            target_tunnel_id: Required for tunnel delivery.
            target_router_hash: Required for tunnel/router delivery;
                must be 32 bytes.

        Returns:
            List of 1024-byte blocks: one block if the message fits,
            otherwise a first fragment followed by follow-on fragments.

        Raises:
            ValueError: If delivery_type is unknown, a required delivery
                target is missing or malformed, or the message would need
                more than MAX_FRAGMENT_NUM follow-on fragments.
        """
        # Fail before producing any block so callers never see partial output.
        cls._validate_delivery(delivery_type, target_tunnel_id, target_router_hash)

        usable = cls.BLOCK_SIZE - cls._MIN_OVERHEAD

        single_capacity = usable - cls._first_instruction_size(
            delivery_type, fragmented=False
        )
        if len(msg) <= single_capacity:
            # Single fragment: no message id is written.
            return [
                cls._build_first_block(
                    msg,
                    delivery_type,
                    fragmented=False,
                    message_id=None,
                    target_tunnel_id=target_tunnel_id,
                    target_router_hash=target_router_hash,
                )
            ]

        # Multi-fragment. The fragmented first header is 4 bytes larger
        # (message id), so the first payload is correspondingly smaller.
        first_capacity = usable - cls._first_instruction_size(
            delivery_type, fragmented=True
        )
        followon_capacity = usable - cls._FOLLOWON_OVERHEAD

        # Start offsets of every follow-on chunk; never empty here because
        # len(msg) > single_capacity > first_capacity.
        followon_starts = range(first_capacity, len(msg), followon_capacity)
        if len(followon_starts) > cls.MAX_FRAGMENT_NUM:
            # The fragment number is a 6-bit field; beyond 63 it would wrap
            # and produce duplicate fragment numbers.
            raise ValueError(
                f"message of {len(msg)} bytes needs {len(followon_starts)} "
                f"follow-on fragments; at most {cls.MAX_FRAGMENT_NUM} are supported"
            )

        if message_id is None:
            message_id = struct.unpack("!I", os.urandom(4))[0]

        blocks = [
            cls._build_first_block(
                msg[:first_capacity],
                delivery_type,
                fragmented=True,
                message_id=message_id,
                target_tunnel_id=target_tunnel_id,
                target_router_hash=target_router_hash,
            )
        ]

        # Follow-on fragments are numbered from 1; the first fragment is 0.
        for frag_num, start in enumerate(followon_starts, start=1):
            end = start + followon_capacity
            blocks.append(
                cls._build_followon_block(
                    msg[start:end], message_id, frag_num, end >= len(msg)
                )
            )

        return blocks

    @classmethod
    def _validate_delivery(
        cls,
        delivery_type: int,
        target_tunnel_id: int | None,
        target_router_hash: bytes | None,
    ) -> None:
        """Raise ValueError unless the delivery arguments are usable."""
        known_types = (cls.DELIVERY_LOCAL, cls.DELIVERY_TUNNEL, cls.DELIVERY_ROUTER)
        if delivery_type not in known_types:
            raise ValueError(f"unknown delivery_type: {delivery_type!r}")
        if delivery_type == cls.DELIVERY_LOCAL:
            return  # local delivery carries no target fields
        if delivery_type == cls.DELIVERY_TUNNEL and target_tunnel_id is None:
            raise ValueError("target_tunnel_id is required for tunnel delivery")
        if target_router_hash is None:
            raise ValueError("target_router_hash is required for tunnel/router delivery")
        if len(target_router_hash) != cls.ROUTER_HASH_SIZE:
            # Header sizes assume a fixed-width hash; any other length would
            # yield a block that is not BLOCK_SIZE bytes or cannot be parsed.
            raise ValueError(
                f"target_router_hash must be {cls.ROUTER_HASH_SIZE} bytes, "
                f"got {len(target_router_hash)}"
            )

    @classmethod
    def _first_instruction_size(cls, delivery_type: int, fragmented: bool) -> int:
        """Calculate instruction header size for a first fragment."""
        size = 1  # control byte
        if delivery_type == cls.DELIVERY_TUNNEL:
            size += 4 + cls.ROUTER_HASH_SIZE  # tunnel_id + router_hash
        elif delivery_type == cls.DELIVERY_ROUTER:
            size += cls.ROUTER_HASH_SIZE  # router_hash
        if fragmented:
            size += 4  # message_id
        size += 2  # payload size
        return size

    @classmethod
    def _build_first_block(
        cls,
        payload: bytes,
        delivery_type: int,
        fragmented: bool,
        message_id: int | None,
        target_tunnel_id: int | None,
        target_router_hash: bytes | None,
    ) -> bytes:
        """Build a 1024-byte block for a first (or only) fragment.

        Header layout: control byte, then tunnel id (tunnel delivery only),
        router hash (tunnel/router delivery), message id (fragmented only),
        and a 2-byte big-endian payload size, followed by the payload.

        Raises:
            ValueError: If the delivery arguments are invalid, message_id is
                missing for a fragmented block, or the payload does not fit.
        """
        # Explicit checks instead of assert: asserts vanish under ``python -O``.
        cls._validate_delivery(delivery_type, target_tunnel_id, target_router_hash)
        if fragmented and message_id is None:
            raise ValueError("message_id is required for a fragmented first block")

        # NOTE(review): these bit positions (delivery type << 1, fragmented
        # flag 0x10) look different from the flag layout in the I2P tunnel
        # message spec; left unchanged because the matching parser is not
        # visible here -- confirm against the reassembler.
        control = (delivery_type & 0x03) << 1
        if fragmented:
            control |= 0x10  # fragmented flag

        parts = [bytes([control])]

        if delivery_type == cls.DELIVERY_TUNNEL:
            parts.append(struct.pack("!I", target_tunnel_id))
            parts.append(target_router_hash)
        elif delivery_type == cls.DELIVERY_ROUTER:
            parts.append(target_router_hash)

        if fragmented:
            parts.append(struct.pack("!I", message_id))

        parts.append(struct.pack("!H", len(payload)))
        parts.append(payload)

        return cls._pad_block(b"".join(parts))

    @classmethod
    def _build_followon_block(
        cls,
        payload: bytes,
        message_id: int,
        fragment_num: int,
        is_last: bool,
    ) -> bytes:
        """Build a 1024-byte block for a follow-on fragment.

        Header layout: control byte, 4-byte message id, 2-byte big-endian
        payload size, followed by the payload.

        Raises:
            ValueError: If fragment_num does not fit the 6-bit field or the
                payload does not fit in one block.
        """
        if not 0 <= fragment_num <= cls.MAX_FRAGMENT_NUM:
            # Masking would silently alias this fragment onto another number.
            raise ValueError(
                f"fragment_num must be in 0..{cls.MAX_FRAGMENT_NUM}, got {fragment_num}"
            )

        # NOTE(review): follow-on flag in 0x01 and last-fragment flag in 0x80
        # look swapped relative to the I2P tunnel message spec; left unchanged
        # because the matching parser is not visible here -- confirm.
        control = 0x01  # follow-on flag
        control |= fragment_num << 1
        if is_last:
            control |= 0x80

        fragment_data = b"".join(
            (
                bytes([control]),
                struct.pack("!I", message_id),
                struct.pack("!H", len(payload)),
                payload,
            )
        )
        return cls._pad_block(fragment_data)

    @classmethod
    def _pad_block(cls, fragment_data: bytes) -> bytes:
        """Prefix fragment_data with non-zero padding and a 0x00 terminator.

        The result is exactly BLOCK_SIZE bytes. Padding must not contain a
        zero byte because the first 0x00 marks where the fragment data begins.
        """
        pad_len = cls.BLOCK_SIZE - 1 - len(fragment_data)  # 1 for 0x00 terminator
        if pad_len < 0:
            raise ValueError(
                f"fragment data of {len(fragment_data)} bytes does not fit in a "
                f"{cls.BLOCK_SIZE}-byte block"
            )
        return cls._random_nonzero(pad_len) + b"\x00" + fragment_data

    @staticmethod
    def _random_nonzero(length: int) -> bytes:
        """Generate random bytes with no zero bytes (padding).

        Returns an empty bytes object when length is zero or negative.
        """
        if length <= 0:
            return b""
        # Bulk rejection sampling: draw what is still missing, drop the zero
        # bytes, repeat. Keeps values uniform over 1..255 without issuing one
        # urandom call per rejected byte.
        result = bytearray()
        while len(result) < length:
            result += os.urandom(length - len(result)).replace(b"\x00", b"")
        return bytes(result)