"""Tests for SSU2 payload block types.""" import os import struct import time import pytest class TestDateTimeBlock: def test_roundtrip(self): from i2p_transport.ssu2_payload import DateTimeBlock, SSU2BlockType, parse_payload ts = int(time.time()) block = DateTimeBlock(timestamp=ts) data = block.to_block() parsed = parse_payload(data) assert len(parsed) == 1 assert parsed[0].block_type == SSU2BlockType.DATETIME assert parsed[0].timestamp == ts class TestRouterInfoBlock: def test_roundtrip(self): from i2p_transport.ssu2_payload import RouterInfoBlock, SSU2BlockType, parse_payload ri_data = os.urandom(256) block = RouterInfoBlock(flag=1, router_info_data=ri_data) data = block.to_block() parsed = parse_payload(data) assert len(parsed) == 1 assert parsed[0].block_type == SSU2BlockType.ROUTER_INFO assert parsed[0].flag == 1 assert parsed[0].router_info_data == ri_data class TestI2NPBlock: def test_roundtrip(self): from i2p_transport.ssu2_payload import I2NPBlock, SSU2BlockType, parse_payload msg = os.urandom(128) block = I2NPBlock(i2np_data=msg) data = block.to_block() parsed = parse_payload(data) assert len(parsed) == 1 assert parsed[0].block_type == SSU2BlockType.I2NP assert parsed[0].i2np_data == msg class TestFirstFragmentBlock: def test_roundtrip(self): from i2p_transport.ssu2_payload import FirstFragmentBlock, SSU2BlockType, parse_payload frag_data = os.urandom(512) block = FirstFragmentBlock(msg_id=0xDEADBEEF, total_fragments=5, fragment_data=frag_data) data = block.to_block() parsed = parse_payload(data) assert len(parsed) == 1 assert parsed[0].block_type == SSU2BlockType.FIRST_FRAGMENT assert parsed[0].msg_id == 0xDEADBEEF assert parsed[0].total_fragments == 5 assert parsed[0].fragment_data == frag_data class TestFollowOnFragmentBlock: def test_roundtrip(self): from i2p_transport.ssu2_payload import FollowOnFragmentBlock, SSU2BlockType, parse_payload frag_data = os.urandom(256) block = FollowOnFragmentBlock( msg_id=0x12345678, fragment_num=3, is_last=True, fragment_data=frag_data ) data = block.to_block() parsed = parse_payload(data) assert len(parsed) == 1 assert parsed[0].block_type == SSU2BlockType.FOLLOW_ON_FRAGMENT assert parsed[0].msg_id == 0x12345678 assert parsed[0].fragment_num == 3 assert parsed[0].is_last is True assert parsed[0].fragment_data == frag_data def test_not_last(self): from i2p_transport.ssu2_payload import FollowOnFragmentBlock, parse_payload block = FollowOnFragmentBlock( msg_id=1, fragment_num=2, is_last=False, fragment_data=b"data" ) data = block.to_block() parsed = parse_payload(data) assert parsed[0].is_last is False class TestAckBlock: def test_roundtrip(self): from i2p_transport.ssu2_payload import AckBlock, SSU2BlockType, parse_payload ranges = [(5, 2), (3, 0)] block = AckBlock(ack_through=100, ack_count=2, ranges=ranges) data = block.to_block() parsed = parse_payload(data) assert len(parsed) == 1 assert parsed[0].block_type == SSU2BlockType.ACK assert parsed[0].ack_through == 100 assert parsed[0].ack_count == 2 assert parsed[0].ranges == ranges class TestAddressBlock: def test_ipv4(self): from i2p_transport.ssu2_payload import AddressBlock, SSU2BlockType, parse_payload ip = bytes([192, 168, 1, 1]) block = AddressBlock(ip_address=ip, port=8080) data = block.to_block() parsed = parse_payload(data) assert len(parsed) == 1 assert parsed[0].block_type == SSU2BlockType.ADDRESS assert parsed[0].ip_address == ip assert parsed[0].port == 8080 def test_ipv6(self): from i2p_transport.ssu2_payload import AddressBlock, SSU2BlockType, parse_payload ip = bytes(range(16)) # 16-byte IPv6 address block = AddressBlock(ip_address=ip, port=443) data = block.to_block() parsed = parse_payload(data) assert len(parsed) == 1 assert parsed[0].ip_address == ip assert parsed[0].port == 443 class TestNewTokenBlock: def test_roundtrip(self): from i2p_transport.ssu2_payload import NewTokenBlock, SSU2BlockType, parse_payload block = NewTokenBlock(expires=1700000000, token=0xFEDCBA9876543210) data = block.to_block() parsed = parse_payload(data) assert len(parsed) == 1 assert parsed[0].block_type == SSU2BlockType.NEW_TOKEN assert parsed[0].expires == 1700000000 assert parsed[0].token == 0xFEDCBA9876543210 class TestPathChallengeResponseBlock: def test_roundtrip(self): from i2p_transport.ssu2_payload import ( PathChallengeBlock, PathResponseBlock, SSU2BlockType, parse_payload ) challenge_data = os.urandom(8) challenge = PathChallengeBlock(challenge_data=challenge_data) response = PathResponseBlock(response_data=challenge_data) data = challenge.to_block() + response.to_block() parsed = parse_payload(data) assert len(parsed) == 2 assert parsed[0].block_type == SSU2BlockType.PATH_CHALLENGE assert parsed[0].challenge_data == challenge_data assert parsed[1].block_type == SSU2BlockType.PATH_RESPONSE assert parsed[1].response_data == challenge_data class TestTerminationBlock: def test_roundtrip(self): from i2p_transport.ssu2_payload import TerminationBlock, SSU2BlockType, parse_payload block = TerminationBlock(reason=3, valid_frames_received=9999) data = block.to_block() parsed = parse_payload(data) assert len(parsed) == 1 assert parsed[0].block_type == SSU2BlockType.TERMINATION assert parsed[0].reason == 3 assert parsed[0].valid_frames_received == 9999 class TestPaddingBlock: def test_padding(self): from i2p_transport.ssu2_payload import PaddingBlock, SSU2BlockType, parse_payload pad = os.urandom(64) block = PaddingBlock(padding=pad) data = block.to_block() parsed = parse_payload(data) assert len(parsed) == 1 assert parsed[0].block_type == SSU2BlockType.PADDING assert parsed[0].padding == pad class TestParsePayload: def test_multiple_blocks(self): from i2p_transport.ssu2_payload import ( DateTimeBlock, I2NPBlock, PaddingBlock, build_payload, parse_payload, ) blocks = [ DateTimeBlock(timestamp=1000000), I2NPBlock(i2np_data=b"hello i2np"), PaddingBlock(padding=b"\x00" * 16), ] payload = build_payload(blocks) parsed = parse_payload(payload) assert len(parsed) == 3 assert parsed[0].timestamp == 1000000 assert parsed[1].i2np_data == b"hello i2np" assert parsed[2].padding == b"\x00" * 16 def test_build_payload_roundtrip(self): from i2p_transport.ssu2_payload import ( DateTimeBlock, AckBlock, AddressBlock, TerminationBlock, build_payload, parse_payload, ) blocks = [ DateTimeBlock(timestamp=42), AckBlock(ack_through=50, ack_count=1, ranges=[(10, 5)]), AddressBlock(ip_address=bytes([10, 0, 0, 1]), port=1234), TerminationBlock(reason=0, valid_frames_received=100), ] payload = build_payload(blocks) parsed = parse_payload(payload) assert len(parsed) == 4 assert parsed[0].timestamp == 42 assert parsed[1].ack_through == 50 assert parsed[2].ip_address == bytes([10, 0, 0, 1]) assert parsed[3].valid_frames_received == 100 def test_empty_payload(self): from i2p_transport.ssu2_payload import parse_payload parsed = parse_payload(b"") assert parsed == [] def test_options_block_roundtrip(self): from i2p_transport.ssu2_payload import OptionsBlock, parse_payload opts = os.urandom(32) block = OptionsBlock(options=opts) parsed = parse_payload(block.to_block()) assert parsed[0].options == opts def test_relay_tag_request_roundtrip(self): from i2p_transport.ssu2_payload import RelayTagRequestBlock, SSU2BlockType, parse_payload block = RelayTagRequestBlock() parsed = parse_payload(block.to_block()) assert len(parsed) == 1 assert parsed[0].block_type == SSU2BlockType.RELAY_TAG_REQUEST def test_relay_tag_roundtrip(self): from i2p_transport.ssu2_payload import RelayTagBlock, parse_payload block = RelayTagBlock(relay_tag=0xCAFEBABE) parsed = parse_payload(block.to_block()) assert parsed[0].relay_tag == 0xCAFEBABE def test_intro_key_roundtrip(self): from i2p_transport.ssu2_payload import IntroKeyBlock, parse_payload key = os.urandom(32) block = IntroKeyBlock(intro_key=key) parsed = parse_payload(block.to_block()) assert parsed[0].intro_key == key def test_first_packet_number_roundtrip(self): from i2p_transport.ssu2_payload import FirstPacketNumberBlock, parse_payload block = FirstPacketNumberBlock(packet_number=42) parsed = parse_payload(block.to_block()) assert parsed[0].packet_number == 42 def test_congestion_roundtrip(self): from i2p_transport.ssu2_payload import CongestionBlock, parse_payload cdata = os.urandom(8) block = CongestionBlock(congestion_data=cdata) parsed = parse_payload(block.to_block()) assert parsed[0].congestion_data == cdata