"""Fragment handler: reassembles I2NP messages from 1024-byte tunnel data blocks. Parses fragment control bytes, tracks partial messages by message_id, and expires incomplete messages after MAX_DEFRAGMENT_TIME_MS. """ from __future__ import annotations import struct import time from dataclasses import dataclass, field @dataclass class ReassembledMessage: """A fully reassembled message with delivery instructions.""" payload: bytes delivery_type: int = 0 tunnel_id: int | None = None router_hash: bytes | None = None @dataclass class FragmentedMessage: """Tracks fragments of a single in-progress message.""" delivery_type: int = 0 tunnel_id: int | None = None router_hash: bytes | None = None fragments: dict[int, bytes] = field(default_factory=dict) total_fragments: int | None = None # set when last fragment received created_ms: float = 0.0 def is_complete(self) -> bool: if self.total_fragments is None: return False return len(self.fragments) == self.total_fragments def reassemble(self) -> bytes: """Concatenate fragments in order.""" assert self.total_fragments is not None return b"".join(self.fragments[i] for i in range(self.total_fragments)) class FragmentHandler: """Receives 1024-byte tunnel data blocks and reassembles fragmented messages.""" MAX_DEFRAGMENT_TIME_MS = 60_000 def __init__(self) -> None: self.pending: dict[int, FragmentedMessage] = {} def receive_block(self, block: bytes) -> ReassembledMessage | None: """Parse a 1024-byte block and return a ReassembledMessage if complete. Returns None if the message is still incomplete (waiting for more fragments). """ if len(block) != 1024: raise ValueError(f"Block must be 1024 bytes, got {len(block)}") # Skip padding: find the 0x00 terminator offset = 0 while offset < len(block) and block[offset] != 0x00: offset += 1 if offset >= len(block): raise ValueError("No padding terminator found") offset += 1 # skip the 0x00 control = block[offset] offset += 1 if control & 0x01: # Follow-on fragment return self._receive_followon(block, offset, control) else: # First (or only) fragment return self._receive_first(block, offset, control) def _receive_first( self, block: bytes, offset: int, control: int ) -> ReassembledMessage | None: delivery_type = (control >> 1) & 0x03 fragmented = bool(control & 0x10) tunnel_id = None router_hash = None # Parse delivery instructions if delivery_type == 1: # TUNNEL tunnel_id = struct.unpack("!I", block[offset:offset + 4])[0] offset += 4 router_hash = block[offset:offset + 32] offset += 32 elif delivery_type == 2: # ROUTER router_hash = block[offset:offset + 32] offset += 32 # delivery_type 0 (LOCAL): no extra fields # Skip delay (unimplemented in I2P, but check flag) if control & 0x08: offset += 1 # 1-byte delay message_id = None if fragmented: message_id = struct.unpack("!I", block[offset:offset + 4])[0] offset += 4 size = struct.unpack("!H", block[offset:offset + 2])[0] offset += 2 payload = block[offset:offset + size] if not fragmented: # Complete single-fragment message return ReassembledMessage( payload=payload, delivery_type=delivery_type, tunnel_id=tunnel_id, router_hash=router_hash, ) # First fragment of multi-fragment message assert message_id is not None frag_msg = self.pending.get(message_id) if frag_msg is None: frag_msg = FragmentedMessage(created_ms=time.time() * 1000) self.pending[message_id] = frag_msg frag_msg.delivery_type = delivery_type frag_msg.tunnel_id = tunnel_id frag_msg.router_hash = router_hash frag_msg.fragments[0] = payload if frag_msg.is_complete(): del self.pending[message_id] return ReassembledMessage( payload=frag_msg.reassemble(), delivery_type=frag_msg.delivery_type, tunnel_id=frag_msg.tunnel_id, router_hash=frag_msg.router_hash, ) return None def _receive_followon( self, block: bytes, offset: int, control: int ) -> ReassembledMessage | None: fragment_num = (control >> 1) & 0x3F is_last = bool(control & 0x80) message_id = struct.unpack("!I", block[offset:offset + 4])[0] offset += 4 size = struct.unpack("!H", block[offset:offset + 2])[0] offset += 2 payload = block[offset:offset + size] frag_msg = self.pending.get(message_id) if frag_msg is None: # First fragment hasn't arrived yet — create placeholder frag_msg = FragmentedMessage(created_ms=time.time() * 1000) self.pending[message_id] = frag_msg frag_msg.fragments[fragment_num] = payload if is_last: frag_msg.total_fragments = fragment_num + 1 if frag_msg.is_complete(): del self.pending[message_id] return ReassembledMessage( payload=frag_msg.reassemble(), delivery_type=frag_msg.delivery_type, tunnel_id=frag_msg.tunnel_id, router_hash=frag_msg.router_hash, ) return None def expire_old(self, now_ms: float) -> int: """Drop incomplete messages older than MAX_DEFRAGMENT_TIME_MS. Return count dropped.""" expired = [ mid for mid, frag in self.pending.items() if now_ms - frag.created_ms > self.MAX_DEFRAGMENT_TIME_MS ] for mid in expired: del self.pending[mid] return len(expired)