A Python port of the Invisible Internet Project (I2P)
at main 191 lines 6.2 kB view raw
"""Fragment handler: reassembles I2NP messages from 1024-byte tunnel data blocks.

Parses fragment control bytes, tracks partial messages by message_id,
and expires incomplete messages after MAX_DEFRAGMENT_TIME_MS.
"""

from __future__ import annotations

import struct
import time
from dataclasses import dataclass, field


@dataclass
class ReassembledMessage:
    """A fully reassembled message with delivery instructions."""

    payload: bytes
    delivery_type: int = 0
    tunnel_id: int | None = None
    router_hash: bytes | None = None


@dataclass
class FragmentedMessage:
    """Tracks fragments of a single in-progress message."""

    delivery_type: int = 0
    tunnel_id: int | None = None
    router_hash: bytes | None = None
    fragments: dict[int, bytes] = field(default_factory=dict)
    total_fragments: int | None = None  # set when last fragment received
    created_ms: float = 0.0

    def is_complete(self) -> bool:
        """Return True once the last fragment is known and none are missing.

        The fragment count must equal ``total_fragments`` AND every index
        ``0 .. total_fragments - 1`` must be present.  Comparing counts alone
        is not enough: a stray fragment with an index beyond the last one
        could make the count match while a real fragment is still missing.
        """
        total = self.total_fragments
        if total is None:
            return False
        if len(self.fragments) != total:
            return False
        return all(index in self.fragments for index in range(total))

    def reassemble(self) -> bytes:
        """Concatenate fragments in order.

        Call only after ``is_complete()`` has returned True.
        """
        assert self.total_fragments is not None
        return b"".join(self.fragments[i] for i in range(self.total_fragments))


class FragmentHandler:
    """Receives 1024-byte tunnel data blocks and reassembles fragmented messages.

    Control byte layout as parsed by this class:

    * bit 0 set     -> follow-on fragment; bits 1-6 hold the fragment number
      and bit 7 (0x80) marks the last fragment.
    * bit 0 clear   -> first (or only) fragment; bits 1-2 hold the delivery
      type (0 local, 1 tunnel, 2 router), 0x08 signals a 1-byte delay field
      and 0x10 signals that the message is fragmented.
    """

    MAX_DEFRAGMENT_TIME_MS = 60_000

    def __init__(self) -> None:
        # In-progress messages keyed by message_id.
        self.pending: dict[int, FragmentedMessage] = {}

    def receive_block(self, block: bytes) -> ReassembledMessage | None:
        """Parse a 1024-byte block and return a ReassembledMessage if complete.

        Returns None if the message is still incomplete (waiting for more fragments).

        Raises:
            ValueError: if the block is not 1024 bytes long, has no 0x00
                padding terminator, or ends before a field it declares.
        """
        if len(block) != 1024:
            raise ValueError(f"Block must be 1024 bytes, got {len(block)}")

        # Skip padding: everything up to and including the first 0x00 byte.
        terminator = block.find(b"\x00")
        if terminator < 0:
            raise ValueError("No padding terminator found")
        offset = terminator + 1

        # The terminator may be the very last byte, leaving no control byte.
        if offset >= len(block):
            raise ValueError("Block truncated: no control byte after padding")
        control = block[offset]
        offset += 1

        if control & 0x01:
            # Follow-on fragment
            return self._receive_followon(block, offset, control)
        # First (or only) fragment
        return self._receive_first(block, offset, control)

    @staticmethod
    def _read_uint(block: bytes, offset: int, fmt: str, what: str) -> tuple[int, int]:
        """Read one big-endian integer; return (value, offset after it)."""
        end = offset + struct.calcsize(fmt)
        if end > len(block):
            raise ValueError(f"Block truncated while reading {what}")
        return struct.unpack_from(fmt, block, offset)[0], end

    @staticmethod
    def _read_bytes(block: bytes, offset: int, length: int, what: str) -> tuple[bytes, int]:
        """Read exactly ``length`` bytes; return (data, offset after it)."""
        end = offset + length
        if end > len(block):
            # A plain slice would silently hand back fewer bytes than declared.
            raise ValueError(f"Block truncated while reading {what}")
        return block[offset:end], end

    def _pending_entry(self, message_id: int) -> FragmentedMessage:
        """Return the in-progress record for message_id, creating it if needed."""
        frag_msg = self.pending.get(message_id)
        if frag_msg is None:
            frag_msg = FragmentedMessage(created_ms=time.time() * 1000)
            self.pending[message_id] = frag_msg
        return frag_msg

    def _finish_if_complete(
        self, message_id: int, frag_msg: FragmentedMessage
    ) -> ReassembledMessage | None:
        """Remove and return the message if all fragments arrived, else None."""
        if not frag_msg.is_complete():
            return None
        del self.pending[message_id]
        return ReassembledMessage(
            payload=frag_msg.reassemble(),
            delivery_type=frag_msg.delivery_type,
            tunnel_id=frag_msg.tunnel_id,
            router_hash=frag_msg.router_hash,
        )

    def _receive_first(
        self, block: bytes, offset: int, control: int
    ) -> ReassembledMessage | None:
        """Handle a first (or only) fragment starting at ``offset``.

        Returns the message directly when it is unfragmented, or when this
        fragment completes a message whose follow-ons arrived earlier.
        Raises ValueError if the block ends before a declared field.
        """
        delivery_type = (control >> 1) & 0x03
        fragmented = bool(control & 0x10)

        tunnel_id = None
        router_hash = None

        # Parse delivery instructions
        if delivery_type == 1:  # TUNNEL
            tunnel_id, offset = self._read_uint(block, offset, "!I", "tunnel id")
            router_hash, offset = self._read_bytes(block, offset, 32, "router hash")
        elif delivery_type == 2:  # ROUTER
            router_hash, offset = self._read_bytes(block, offset, 32, "router hash")
        # delivery_type 0 (LOCAL): no extra fields

        # Skip delay (unimplemented in I2P, but check flag)
        if control & 0x08:
            offset += 1  # 1-byte delay

        message_id = None
        if fragmented:
            message_id, offset = self._read_uint(block, offset, "!I", "message id")

        size, offset = self._read_uint(block, offset, "!H", "fragment size")
        payload, _ = self._read_bytes(block, offset, size, "fragment payload")

        if message_id is None:
            # Complete single-fragment message
            return ReassembledMessage(
                payload=payload,
                delivery_type=delivery_type,
                tunnel_id=tunnel_id,
                router_hash=router_hash,
            )

        # First fragment of multi-fragment message.  Only the first fragment
        # carries delivery instructions, so record them on the pending entry.
        frag_msg = self._pending_entry(message_id)
        frag_msg.delivery_type = delivery_type
        frag_msg.tunnel_id = tunnel_id
        frag_msg.router_hash = router_hash
        frag_msg.fragments[0] = payload

        return self._finish_if_complete(message_id, frag_msg)

    def _receive_followon(
        self, block: bytes, offset: int, control: int
    ) -> ReassembledMessage | None:
        """Handle a follow-on fragment starting at ``offset``.

        Returns the reassembled message when this fragment completes it,
        otherwise None.  Raises ValueError if the block ends before a
        declared field.
        """
        fragment_num = (control >> 1) & 0x3F
        is_last = bool(control & 0x80)

        message_id, offset = self._read_uint(block, offset, "!I", "message id")
        size, offset = self._read_uint(block, offset, "!H", "fragment size")
        payload, _ = self._read_bytes(block, offset, size, "fragment payload")

        # If the first fragment hasn't arrived yet this creates a placeholder.
        frag_msg = self._pending_entry(message_id)
        frag_msg.fragments[fragment_num] = payload

        if is_last:
            frag_msg.total_fragments = fragment_num + 1

        return self._finish_if_complete(message_id, frag_msg)

    def expire_old(self, now_ms: float) -> int:
        """Drop incomplete messages older than MAX_DEFRAGMENT_TIME_MS. Return count dropped.

        ``now_ms`` is compared against each entry's ``created_ms``.
        """
        expired = [
            mid
            for mid, frag in self.pending.items()
            if now_ms - frag.created_ms > self.MAX_DEFRAGMENT_TIME_MS
        ]
        for mid in expired:
            del self.pending[mid]
        return len(expired)