A Python port of the Invisible Internet Project (I2P)
1"""Fragment handler: reassembles I2NP messages from 1024-byte tunnel data blocks.
2
3Parses fragment control bytes, tracks partial messages by message_id,
4and expires incomplete messages after MAX_DEFRAGMENT_TIME_MS.
5"""
6
7from __future__ import annotations
8
9import struct
10import time
11from dataclasses import dataclass, field
12
13
@dataclass
class ReassembledMessage:
    """Result of reassembly: the message bytes plus where to deliver them.

    Instances are produced by ``FragmentHandler`` once every fragment of a
    message has been received (or immediately for unfragmented messages).
    """

    # Complete message bytes (all fragments joined in order).
    payload: bytes
    # Delivery type parsed from the control byte; FragmentHandler treats
    # 0 as LOCAL, 1 as TUNNEL and 2 as ROUTER.
    delivery_type: int = 0
    # Destination tunnel id; only populated for TUNNEL delivery.
    tunnel_id: int | None = None
    # Destination router hash; populated for TUNNEL and ROUTER delivery.
    router_hash: bytes | None = None
22
23
@dataclass
class FragmentedMessage:
    """Tracks fragments of a single in-progress message.

    Fragments are stored by fragment number.  The number of fragments is only
    known once the fragment flagged as last has been seen, at which point
    ``total_fragments`` is set.
    """

    delivery_type: int = 0
    tunnel_id: int | None = None
    router_hash: bytes | None = None
    # fragment number -> payload bytes of that fragment
    fragments: dict[int, bytes] = field(default_factory=dict)
    total_fragments: int | None = None  # set when last fragment received
    # Creation timestamp in milliseconds; FragmentHandler.expire_old compares
    # it against the caller-supplied ``now_ms``.
    created_ms: float = 0.0

    def is_complete(self) -> bool:
        """Return True once every fragment 0..total_fragments-1 is present."""
        if self.total_fragments is None:
            return False
        # Check the indices themselves rather than the number of stored
        # fragments: a stray fragment numbered past the last one must not
        # be able to stand in for a fragment that is still missing.
        return all(index in self.fragments for index in range(self.total_fragments))

    def reassemble(self) -> bytes:
        """Concatenate fragments in order.

        Returns:
            The payloads of fragments 0..total_fragments-1 joined together.

        Raises:
            ValueError: if the last fragment has not been seen yet, so the
                fragment count is unknown.
            KeyError: if a fragment in the expected range is missing.
        """
        # Explicit raise instead of ``assert`` so the check survives ``-O``.
        if self.total_fragments is None:
            raise ValueError("Cannot reassemble: last fragment not received")
        return b"".join(self.fragments[i] for i in range(self.total_fragments))
44
45
class FragmentHandler:
    """Receives 1024-byte tunnel data blocks and reassembles fragmented messages.

    Each block is parsed as: non-zero padding, a 0x00 terminator, one control
    byte, delivery instructions, and then a single fragment.  Partial messages
    are kept in ``pending`` keyed by message id until all of their fragments
    have arrived or ``expire_old`` drops them.

    NOTE(review): the control-byte bit positions used here (follow-on = 0x01,
    fragmented = 0x10, delay = 0x08, last = 0x80) are kept exactly as this
    implementation defined them.  The I2P tunnel-message specification numbers
    bit 7 as the most significant bit -- confirm this layout against whatever
    code builds these blocks.
    """

    MAX_DEFRAGMENT_TIME_MS = 60_000

    def __init__(self) -> None:
        # message_id -> partially received message
        self.pending: dict[int, FragmentedMessage] = {}

    @staticmethod
    def _need(block: bytes, offset: int, length: int, what: str) -> None:
        """Raise ValueError unless ``length`` bytes are available at ``offset``."""
        if offset + length > len(block):
            raise ValueError(
                f"Truncated block: {what} needs {length} bytes at offset {offset}"
            )

    def _pending_for(self, message_id: int) -> FragmentedMessage:
        """Return the in-progress entry for ``message_id``, creating it if absent."""
        frag_msg = self.pending.get(message_id)
        if frag_msg is None:
            frag_msg = FragmentedMessage(created_ms=time.time() * 1000)
            self.pending[message_id] = frag_msg
        return frag_msg

    def _finish_if_complete(
        self, message_id: int, frag_msg: FragmentedMessage
    ) -> ReassembledMessage | None:
        """Remove and return the message if all fragments are in, else None."""
        if not frag_msg.is_complete():
            return None
        del self.pending[message_id]
        return ReassembledMessage(
            payload=frag_msg.reassemble(),
            delivery_type=frag_msg.delivery_type,
            tunnel_id=frag_msg.tunnel_id,
            router_hash=frag_msg.router_hash,
        )

    def receive_block(self, block: bytes) -> ReassembledMessage | None:
        """Parse a 1024-byte block and return a ReassembledMessage if complete.

        Returns None if the message is still incomplete (waiting for more
        fragments).

        Raises:
            ValueError: if the block is not exactly 1024 bytes, has no padding
                terminator, or is too short for the fields its control byte
                and size field announce.
        """
        if len(block) != 1024:
            raise ValueError(f"Block must be 1024 bytes, got {len(block)}")

        # Skip padding: find the 0x00 terminator
        offset = 0
        while offset < len(block) and block[offset] != 0x00:
            offset += 1
        if offset >= len(block):
            raise ValueError("No padding terminator found")
        offset += 1  # skip the 0x00

        # The terminator may be the very last byte, leaving no control byte.
        self._need(block, offset, 1, "control byte")
        control = block[offset]
        offset += 1

        if control & 0x01:
            # Follow-on fragment
            return self._receive_followon(block, offset, control)
        # First (or only) fragment
        return self._receive_first(block, offset, control)

    def _receive_first(
        self, block: bytes, offset: int, control: int
    ) -> ReassembledMessage | None:
        """Parse a first (or only) fragment starting right after the control byte.

        Returns the complete message for an unfragmented block, or when this
        fragment was the last one missing; otherwise stores it and returns None.

        Raises:
            ValueError: if the block ends before a field that the control
                byte or size field says is present.
        """
        delivery_type = (control >> 1) & 0x03
        fragmented = bool(control & 0x10)

        tunnel_id = None
        router_hash = None

        # Parse delivery instructions
        if delivery_type == 1:  # TUNNEL: 4-byte tunnel id + 32-byte router hash
            self._need(block, offset, 4 + 32, "tunnel delivery instructions")
            tunnel_id = struct.unpack_from("!I", block, offset)[0]
            offset += 4
            router_hash = block[offset:offset + 32]
            offset += 32
        elif delivery_type == 2:  # ROUTER: 32-byte router hash
            self._need(block, offset, 32, "router hash")
            router_hash = block[offset:offset + 32]
            offset += 32
        # delivery_type 0 (LOCAL): no extra fields

        # Skip delay (unimplemented in I2P, but check flag)
        if control & 0x08:
            offset += 1  # 1-byte delay; bounds are checked by the next read

        message_id = None
        if fragmented:
            self._need(block, offset, 4, "message id")
            message_id = struct.unpack_from("!I", block, offset)[0]
            offset += 4

        self._need(block, offset, 2, "fragment size")
        size = struct.unpack_from("!H", block, offset)[0]
        offset += 2

        # Reject an oversized length instead of silently truncating the payload.
        self._need(block, offset, size, "fragment payload")
        payload = block[offset:offset + size]

        if message_id is None:
            # Complete single-fragment message (no message id on the wire)
            return ReassembledMessage(
                payload=payload,
                delivery_type=delivery_type,
                tunnel_id=tunnel_id,
                router_hash=router_hash,
            )

        # First fragment of multi-fragment message; follow-ons may already
        # have created the pending entry.
        frag_msg = self._pending_for(message_id)
        frag_msg.delivery_type = delivery_type
        frag_msg.tunnel_id = tunnel_id
        frag_msg.router_hash = router_hash
        frag_msg.fragments[0] = payload

        return self._finish_if_complete(message_id, frag_msg)

    def _receive_followon(
        self, block: bytes, offset: int, control: int
    ) -> ReassembledMessage | None:
        """Parse a follow-on fragment starting right after the control byte.

        Returns the complete message if this fragment was the last one
        missing; otherwise stores it and returns None.

        Raises:
            ValueError: if the fragment number is 0 or the block ends before
                the message id, size, or payload.
        """
        fragment_num = (control >> 1) & 0x3F
        is_last = bool(control & 0x80)

        # Slot 0 belongs to the first fragment, which carries the delivery
        # instructions; a follow-on must never be able to fill or replace it.
        if fragment_num == 0:
            raise ValueError("Follow-on fragment number must be at least 1")

        self._need(block, offset, 4 + 2, "follow-on fragment header")
        message_id = struct.unpack_from("!I", block, offset)[0]
        offset += 4
        size = struct.unpack_from("!H", block, offset)[0]
        offset += 2

        # Reject an oversized length instead of silently truncating the payload.
        self._need(block, offset, size, "fragment payload")
        payload = block[offset:offset + size]

        # If the first fragment hasn't arrived yet this creates a placeholder.
        frag_msg = self._pending_for(message_id)
        frag_msg.fragments[fragment_num] = payload

        if is_last:
            frag_msg.total_fragments = fragment_num + 1

        return self._finish_if_complete(message_id, frag_msg)

    def expire_old(self, now_ms: float) -> int:
        """Drop incomplete messages older than MAX_DEFRAGMENT_TIME_MS. Return count dropped.

        A message is dropped when ``now_ms`` minus its ``created_ms`` is
        strictly greater than the limit.
        """
        expired = [
            mid
            for mid, frag in self.pending.items()
            if now_ms - frag.created_ms > self.MAX_DEFRAGMENT_TIME_MS
        ]
        for mid in expired:
            del self.pending[mid]
        return len(expired)