"""Tier 4 protocol gap tests: FragmentHandler + FragmentBuilder.

Tests tunnel message fragmentation and reassembly per I2P spec.
1024-byte tunnel data blocks, multi-fragment messages, expiry.
"""

import os
import time

import pytest

from i2p_tunnel.fragment_handler import FragmentHandler, FragmentedMessage
from i2p_tunnel.fragment_builder import FragmentBuilder


def _reassemble(handler, blocks):
    """Feed ``blocks`` to ``handler`` in order and collect completed messages.

    Args:
        handler: Object exposing ``receive_block(block)``; a return value of
            ``None`` is treated as "no message completed by this block".
        blocks: Iterable of blocks, delivered in iteration order.

    Returns:
        List of every non-``None`` value returned by ``receive_block``, in
        the order the messages completed.
    """
    completed = []
    for block in blocks:
        message = handler.receive_block(block)
        # Compare against None explicitly so that a completed message is
        # never discarded merely because it happens to evaluate as falsy.
        if message is not None:
            completed.append(message)
    return completed


# -- Constants --


class TestConstants:
    """Pin the public constants exposed by the builder and the handler."""

    def test_block_size(self):
        """Builder block size is 1024 bytes."""
        assert FragmentBuilder.BLOCK_SIZE == 1024

    def test_delivery_types(self):
        """Delivery type codes are local=0, tunnel=1, router=2."""
        assert FragmentBuilder.DELIVERY_LOCAL == 0
        assert FragmentBuilder.DELIVERY_TUNNEL == 1
        assert FragmentBuilder.DELIVERY_ROUTER == 2

    def test_max_defragment_time(self):
        """Handler defragmentation window is 60 000 ms."""
        assert FragmentHandler.MAX_DEFRAGMENT_TIME_MS == 60_000


# -- FragmentBuilder: single-fragment messages --


class TestBuildSingleFragment:
    """Messages small enough to be carried by a single block."""

    def test_small_message_single_block(self):
        """Message < max payload -> single 1024-byte block."""
        msg = os.urandom(200)
        blocks = FragmentBuilder.build(msg, delivery_type=0)
        assert len(blocks) == 1
        assert len(blocks[0]) == 1024

    def test_single_fragment_control_byte(self):
        """A small local-delivery message is emitted as exactly one block.

        NOTE(review): despite the test name, the control byte itself is not
        inspected; only the block count is asserted.
        """
        msg = os.urandom(100)
        blocks = FragmentBuilder.build(msg, delivery_type=0)
        # Author's note on the expected layout (not verified by this test):
        # block format is padding + 0x00 + control + size(2) + payload, and
        # for local delivery the control byte should carry delivery_type=0
        # with the fragmented flag clear.
        assert len(blocks) == 1

    def test_local_delivery(self):
        """Local delivery (type 0) roundtrip."""
        msg = os.urandom(200)
        blocks = FragmentBuilder.build(msg, delivery_type=0)
        results = _reassemble(FragmentHandler(), blocks)
        assert len(results) == 1
        assert results[0].payload == msg

    def test_tunnel_delivery(self):
        """Tunnel delivery (type 1) with tunnel_id and router_hash."""
        msg = os.urandom(150)
        router_hash = os.urandom(32)
        blocks = FragmentBuilder.build(
            msg,
            delivery_type=1,
            target_tunnel_id=0xDEADBEEF,
            target_router_hash=router_hash,
        )
        handler = FragmentHandler()
        result = handler.receive_block(blocks[0])
        assert result is not None
        assert result.payload == msg
        assert result.delivery_type == 1
        assert result.tunnel_id == 0xDEADBEEF
        assert result.router_hash == router_hash

    def test_router_delivery(self):
        """Router delivery (type 2) with router_hash."""
        msg = os.urandom(150)
        router_hash = os.urandom(32)
        blocks = FragmentBuilder.build(
            msg,
            delivery_type=2,
            target_router_hash=router_hash,
        )
        handler = FragmentHandler()
        result = handler.receive_block(blocks[0])
        assert result is not None
        assert result.payload == msg
        assert result.delivery_type == 2
        assert result.router_hash == router_hash


# -- FragmentBuilder: multi-fragment messages --


class TestBuildMultiFragment:
    """Messages that must be split across several blocks."""

    def test_large_message_multiple_blocks(self):
        """Message > max single payload -> multiple 1024-byte blocks."""
        msg = os.urandom(3000)
        blocks = FragmentBuilder.build(msg, delivery_type=0, message_id=0x12345678)
        assert len(blocks) > 1
        for block in blocks:
            assert len(block) == 1024

    def test_all_blocks_1024_bytes(self):
        """Every block must be exactly 1024 bytes."""
        for size in [500, 1000, 2000, 5000, 10000]:
            msg = os.urandom(size)
            blocks = FragmentBuilder.build(msg, delivery_type=0, message_id=1)
            for i, block in enumerate(blocks):
                assert len(block) == 1024, f"Block {i} for msg size {size} is {len(block)} bytes"


# -- FragmentHandler: defragmentation --


class TestDefragmentSingle:
    """Reassembly of a message carried by one block."""

    def test_single_fragment_immediate(self):
        """Single fragment -> immediate complete message."""
        msg = os.urandom(200)
        blocks = FragmentBuilder.build(msg, delivery_type=0)
        handler = FragmentHandler()
        result = handler.receive_block(blocks[0])
        assert result is not None
        assert result.payload == msg


class TestDefragmentMulti:
    """Reassembly of messages spread over several blocks."""

    def test_multi_fragment_in_order(self):
        """Multiple fragments in order -> complete message."""
        msg = os.urandom(3000)
        blocks = FragmentBuilder.build(msg, delivery_type=0, message_id=42)
        results = _reassemble(FragmentHandler(), blocks)
        assert len(results) == 1
        assert results[0].payload == msg

    def test_multi_fragment_out_of_order(self):
        """Fragments arrive out of order -> still reassembles."""
        msg = os.urandom(3000)
        blocks = FragmentBuilder.build(msg, delivery_type=0, message_id=99)
        # At least three blocks are needed for "last, first, middle" to be a
        # genuine reordering.
        assert len(blocks) >= 3
        # Send last, first, middle
        reordered = [blocks[-1], blocks[0], *blocks[1:-1]]
        results = _reassemble(FragmentHandler(), reordered)
        assert len(results) == 1
        assert results[0].payload == msg

    def test_two_interleaved_messages(self):
        """Two messages interleaved -> both reassemble correctly."""
        msg_a = os.urandom(2500)
        msg_b = os.urandom(2500)
        blocks_a = FragmentBuilder.build(msg_a, delivery_type=0, message_id=100)
        blocks_b = FragmentBuilder.build(msg_b, delivery_type=0, message_id=200)

        # Alternate a[0], b[0], a[1], b[1], ...; slicing yields an empty list
        # once one of the two messages has run out of blocks.
        interleaved = []
        for i in range(max(len(blocks_a), len(blocks_b))):
            interleaved.extend(blocks_a[i:i + 1])
            interleaved.extend(blocks_b[i:i + 1])

        results = _reassemble(FragmentHandler(), interleaved)
        assert len(results) == 2
        payloads = {r.payload for r in results}
        assert msg_a in payloads
        assert msg_b in payloads


class TestDefragmentExpiry:
    """Cleanup of incomplete messages held by the handler."""

    def test_expire_incomplete(self):
        """Incomplete fragments are cleaned up after timeout."""
        msg = os.urandom(3000)
        blocks = FragmentBuilder.build(msg, delivery_type=0, message_id=77)
        handler = FragmentHandler()
        # Send only the first block (incomplete message)
        handler.receive_block(blocks[0])
        assert len(handler.pending) == 1
        # Expire with a timestamp 61 s in the future (milliseconds)
        dropped = handler.expire_old(time.time() * 1000 + 61_000)
        assert dropped == 1
        assert len(handler.pending) == 0

    def test_no_expire_recent(self):
        """Recent fragments are not expired."""
        msg = os.urandom(3000)
        blocks = FragmentBuilder.build(msg, delivery_type=0, message_id=88)
        handler = FragmentHandler()
        handler.receive_block(blocks[0])
        # Try to expire at current time (should not expire)
        dropped = handler.expire_old(time.time() * 1000)
        assert dropped == 0
        assert len(handler.pending) == 1


# -- Roundtrip tests --


class TestRoundtrip:
    """End-to-end build -> receive roundtrips."""

    # NOTE(review): 996/997 presumably straddle the single-block payload
    # limit -- confirm against FragmentBuilder.
    @pytest.mark.parametrize("size", [1, 100, 500, 996, 997, 1500, 3000, 10000])
    def test_roundtrip_various_sizes(self, size):
        """Fragment and defragment messages of various sizes."""
        msg = os.urandom(size)
        blocks = FragmentBuilder.build(msg, delivery_type=0, message_id=size)
        results = _reassemble(FragmentHandler(), blocks)
        assert len(results) == 1
        assert results[0].payload == msg

    def test_roundtrip_with_delivery_info(self):
        """Delivery info preserved through fragment/defragment cycle."""
        msg = os.urandom(2500)
        router_hash = os.urandom(32)
        blocks = FragmentBuilder.build(
            msg,
            delivery_type=1,
            target_tunnel_id=0xCAFEBABE,
            target_router_hash=router_hash,
            message_id=555,
        )
        results = _reassemble(FragmentHandler(), blocks)
        assert len(results) == 1
        r = results[0]
        assert r.payload == msg
        assert r.delivery_type == 1
        assert r.tunnel_id == 0xCAFEBABE
        assert r.router_hash == router_hash