A Python port of the Invisible Internet Project (I2P)
at main 259 lines 9.3 kB view raw
"""Tier 4 protocol gap tests: FragmentHandler + FragmentBuilder.

Tests tunnel message fragmentation and reassembly per I2P spec.
1024-byte tunnel data blocks, multi-fragment messages, expiry.
"""

import os
import time
from itertools import zip_longest

import pytest

from i2p_tunnel.fragment_builder import FragmentBuilder
from i2p_tunnel.fragment_handler import FragmentHandler, FragmentedMessage


def _feed_blocks(handler, blocks):
    """Feed each block to *handler* in order and return the completed messages.

    The tests in this module expect ``handler.receive_block`` to return
    ``None`` until a message is fully reassembled, so only non-``None``
    results are collected.  The check is an identity check on purpose: a
    truthiness test would silently drop a completed message whose object
    happens to be falsy.
    """
    completed = []
    for block in blocks:
        message = handler.receive_block(block)
        if message is not None:
            completed.append(message)
    return completed


# -- Constants --

class TestConstants:
    """Pin the public constants exposed by the builder and the handler."""

    def test_block_size(self):
        assert FragmentBuilder.BLOCK_SIZE == 1024

    def test_delivery_types(self):
        assert FragmentBuilder.DELIVERY_LOCAL == 0
        assert FragmentBuilder.DELIVERY_TUNNEL == 1
        assert FragmentBuilder.DELIVERY_ROUTER == 2

    def test_max_defragment_time(self):
        assert FragmentHandler.MAX_DEFRAGMENT_TIME_MS == 60_000


# -- FragmentBuilder: single-fragment messages --

class TestBuildSingleFragment:
    """Messages small enough to fit in one tunnel data block."""

    def test_small_message_single_block(self):
        """Message < max payload -> single 1024-byte block."""
        msg = os.urandom(200)
        blocks = FragmentBuilder.build(msg, delivery_type=0)
        assert len(blocks) == 1
        assert len(blocks[0]) == 1024

    def test_single_fragment_control_byte(self):
        """A small local-delivery message is emitted as exactly one block.

        Intended check (from the original author's notes): the block is laid
        out as padding + 0x00 + control + size(2) + payload, and for an
        unfragmented local delivery the control byte should carry
        delivery_type=0 and fragmented=0.
        """
        msg = os.urandom(100)
        blocks = FragmentBuilder.build(msg, delivery_type=0)
        # NOTE(review): the control byte itself is not inspected yet; only the
        # block count is verified.  TODO: locate the byte after the 0x00
        # terminator and assert its flags once the exact offset is confirmed.
        assert len(blocks) == 1

    def test_local_delivery(self):
        """Local delivery (type 0) roundtrip."""
        msg = os.urandom(200)
        blocks = FragmentBuilder.build(msg, delivery_type=0)
        results = _feed_blocks(FragmentHandler(), blocks)
        assert len(results) == 1
        assert results[0].payload == msg

    def test_tunnel_delivery(self):
        """Tunnel delivery (type 1) with tunnel_id and router_hash."""
        msg = os.urandom(150)
        router_hash = os.urandom(32)
        blocks = FragmentBuilder.build(
            msg,
            delivery_type=1,
            target_tunnel_id=0xDEADBEEF,
            target_router_hash=router_hash,
        )
        handler = FragmentHandler()
        result = handler.receive_block(blocks[0])
        assert result is not None
        assert result.payload == msg
        assert result.delivery_type == 1
        assert result.tunnel_id == 0xDEADBEEF
        assert result.router_hash == router_hash

    def test_router_delivery(self):
        """Router delivery (type 2) with router_hash."""
        msg = os.urandom(150)
        router_hash = os.urandom(32)
        blocks = FragmentBuilder.build(
            msg,
            delivery_type=2,
            target_router_hash=router_hash,
        )
        handler = FragmentHandler()
        result = handler.receive_block(blocks[0])
        assert result is not None
        assert result.payload == msg
        assert result.delivery_type == 2
        assert result.router_hash == router_hash


# -- FragmentBuilder: multi-fragment messages --

class TestBuildMultiFragment:
    """Messages that must be split across several tunnel data blocks."""

    def test_large_message_multiple_blocks(self):
        """Message > max single payload -> multiple 1024-byte blocks."""
        msg = os.urandom(3000)
        blocks = FragmentBuilder.build(msg, delivery_type=0, message_id=0x12345678)
        assert len(blocks) > 1
        for block in blocks:
            assert len(block) == 1024

    def test_all_blocks_1024_bytes(self):
        """Every block must be exactly 1024 bytes."""
        for size in [500, 1000, 2000, 5000, 10000]:
            msg = os.urandom(size)
            blocks = FragmentBuilder.build(msg, delivery_type=0, message_id=1)
            for i, block in enumerate(blocks):
                assert len(block) == 1024, f"Block {i} for msg size {size} is {len(block)} bytes"


# -- FragmentHandler: defragmentation --

class TestDefragmentSingle:
    """Reassembly when the whole message arrives in one block."""

    def test_single_fragment_immediate(self):
        """Single fragment -> immediate complete message."""
        msg = os.urandom(200)
        blocks = FragmentBuilder.build(msg, delivery_type=0)
        handler = FragmentHandler()
        result = handler.receive_block(blocks[0])
        assert result is not None
        assert result.payload == msg


class TestDefragmentMulti:
    """Reassembly of messages spread over several blocks."""

    def test_multi_fragment_in_order(self):
        """Multiple fragments in order -> complete message."""
        msg = os.urandom(3000)
        blocks = FragmentBuilder.build(msg, delivery_type=0, message_id=42)
        results = _feed_blocks(FragmentHandler(), blocks)
        assert len(results) == 1
        assert results[0].payload == msg

    def test_multi_fragment_out_of_order(self):
        """Fragments arrive out of order -> still reassembles."""
        msg = os.urandom(3000)
        blocks = FragmentBuilder.build(msg, delivery_type=0, message_id=99)
        assert len(blocks) >= 3

        # Send last, first, middle
        reordered = [blocks[-1], blocks[0], *blocks[1:-1]]
        results = _feed_blocks(FragmentHandler(), reordered)
        assert len(results) == 1
        assert results[0].payload == msg

    def test_two_interleaved_messages(self):
        """Two messages interleaved -> both reassemble correctly."""
        msg_a = os.urandom(2500)
        msg_b = os.urandom(2500)
        blocks_a = FragmentBuilder.build(msg_a, delivery_type=0, message_id=100)
        blocks_b = FragmentBuilder.build(msg_b, delivery_type=0, message_id=200)

        # Alternate a[0], b[0], a[1], b[1], ...; zip_longest pads the shorter
        # sequence with None so a length mismatch does not drop trailing blocks.
        interleaved = []
        for block_a, block_b in zip_longest(blocks_a, blocks_b):
            interleaved.extend(b for b in (block_a, block_b) if b is not None)

        results = _feed_blocks(FragmentHandler(), interleaved)
        assert len(results) == 2
        payloads = {r.payload for r in results}
        assert msg_a in payloads
        assert msg_b in payloads


class TestDefragmentExpiry:
    """Cleanup of partially received messages."""

    def test_expire_incomplete(self):
        """Incomplete fragments are cleaned up after timeout."""
        msg = os.urandom(3000)
        blocks = FragmentBuilder.build(msg, delivery_type=0, message_id=77)
        handler = FragmentHandler()

        # Send only the first block (incomplete message)
        handler.receive_block(blocks[0])
        assert len(handler.pending) == 1

        # Expire with a timestamp one second past the defragment window
        # (60_000 + 1_000 ms, i.e. the same 61_000 ms offset as before).
        now_ms = time.time() * 1000
        dropped = handler.expire_old(now_ms + FragmentHandler.MAX_DEFRAGMENT_TIME_MS + 1_000)
        assert dropped == 1
        assert len(handler.pending) == 0

    def test_no_expire_recent(self):
        """Recent fragments are not expired."""
        msg = os.urandom(3000)
        blocks = FragmentBuilder.build(msg, delivery_type=0, message_id=88)
        handler = FragmentHandler()
        handler.receive_block(blocks[0])

        # Try to expire at current time (should not expire)
        dropped = handler.expire_old(time.time() * 1000)
        assert dropped == 0
        assert len(handler.pending) == 1


# -- Roundtrip tests --

class TestRoundtrip:
    """End-to-end build -> receive cycles."""

    @pytest.mark.parametrize("size", [1, 100, 500, 996, 997, 1500, 3000, 10000])
    def test_roundtrip_various_sizes(self, size):
        """Fragment and defragment messages of various sizes."""
        msg = os.urandom(size)
        blocks = FragmentBuilder.build(msg, delivery_type=0, message_id=size)
        results = _feed_blocks(FragmentHandler(), blocks)
        assert len(results) == 1
        assert results[0].payload == msg

    def test_roundtrip_with_delivery_info(self):
        """Delivery info preserved through fragment/defragment cycle."""
        msg = os.urandom(2500)
        router_hash = os.urandom(32)
        blocks = FragmentBuilder.build(
            msg,
            delivery_type=1,
            target_tunnel_id=0xCAFEBABE,
            target_router_hash=router_hash,
            message_id=555,
        )
        results = _feed_blocks(FragmentHandler(), blocks)
        assert len(results) == 1
        r = results[0]
        assert r.payload == msg
        assert r.delivery_type == 1
        assert r.tunnel_id == 0xCAFEBABE
        assert r.router_hash == router_hash