A Python port of the Invisible Internet Project (I2P)
at main 293 lines 10 kB view raw
1"""Tests for SSU2 payload block types.""" 2 3import os 4import struct 5import time 6 7import pytest 8 9 10class TestDateTimeBlock: 11 def test_roundtrip(self): 12 from i2p_transport.ssu2_payload import DateTimeBlock, SSU2BlockType, parse_payload 13 14 ts = int(time.time()) 15 block = DateTimeBlock(timestamp=ts) 16 data = block.to_block() 17 18 parsed = parse_payload(data) 19 assert len(parsed) == 1 20 assert parsed[0].block_type == SSU2BlockType.DATETIME 21 assert parsed[0].timestamp == ts 22 23 24class TestRouterInfoBlock: 25 def test_roundtrip(self): 26 from i2p_transport.ssu2_payload import RouterInfoBlock, SSU2BlockType, parse_payload 27 28 ri_data = os.urandom(256) 29 block = RouterInfoBlock(flag=1, router_info_data=ri_data) 30 data = block.to_block() 31 32 parsed = parse_payload(data) 33 assert len(parsed) == 1 34 assert parsed[0].block_type == SSU2BlockType.ROUTER_INFO 35 assert parsed[0].flag == 1 36 assert parsed[0].router_info_data == ri_data 37 38 39class TestI2NPBlock: 40 def test_roundtrip(self): 41 from i2p_transport.ssu2_payload import I2NPBlock, SSU2BlockType, parse_payload 42 43 msg = os.urandom(128) 44 block = I2NPBlock(i2np_data=msg) 45 data = block.to_block() 46 47 parsed = parse_payload(data) 48 assert len(parsed) == 1 49 assert parsed[0].block_type == SSU2BlockType.I2NP 50 assert parsed[0].i2np_data == msg 51 52 53class TestFirstFragmentBlock: 54 def test_roundtrip(self): 55 from i2p_transport.ssu2_payload import FirstFragmentBlock, SSU2BlockType, parse_payload 56 57 frag_data = os.urandom(512) 58 block = FirstFragmentBlock(msg_id=0xDEADBEEF, total_fragments=5, fragment_data=frag_data) 59 data = block.to_block() 60 61 parsed = parse_payload(data) 62 assert len(parsed) == 1 63 assert parsed[0].block_type == SSU2BlockType.FIRST_FRAGMENT 64 assert parsed[0].msg_id == 0xDEADBEEF 65 assert parsed[0].total_fragments == 5 66 assert parsed[0].fragment_data == frag_data 67 68 69class TestFollowOnFragmentBlock: 70 def test_roundtrip(self): 71 from i2p_transport.ssu2_payload import FollowOnFragmentBlock, SSU2BlockType, parse_payload 72 73 frag_data = os.urandom(256) 74 block = FollowOnFragmentBlock( 75 msg_id=0x12345678, fragment_num=3, is_last=True, fragment_data=frag_data 76 ) 77 data = block.to_block() 78 79 parsed = parse_payload(data) 80 assert len(parsed) == 1 81 assert parsed[0].block_type == SSU2BlockType.FOLLOW_ON_FRAGMENT 82 assert parsed[0].msg_id == 0x12345678 83 assert parsed[0].fragment_num == 3 84 assert parsed[0].is_last is True 85 assert parsed[0].fragment_data == frag_data 86 87 def test_not_last(self): 88 from i2p_transport.ssu2_payload import FollowOnFragmentBlock, parse_payload 89 90 block = FollowOnFragmentBlock( 91 msg_id=1, fragment_num=2, is_last=False, fragment_data=b"data" 92 ) 93 data = block.to_block() 94 parsed = parse_payload(data) 95 assert parsed[0].is_last is False 96 97 98class TestAckBlock: 99 def test_roundtrip(self): 100 from i2p_transport.ssu2_payload import AckBlock, SSU2BlockType, parse_payload 101 102 ranges = [(5, 2), (3, 0)] 103 block = AckBlock(ack_through=100, ack_count=2, ranges=ranges) 104 data = block.to_block() 105 106 parsed = parse_payload(data) 107 assert len(parsed) == 1 108 assert parsed[0].block_type == SSU2BlockType.ACK 109 assert parsed[0].ack_through == 100 110 assert parsed[0].ack_count == 2 111 assert parsed[0].ranges == ranges 112 113 114class TestAddressBlock: 115 def test_ipv4(self): 116 from i2p_transport.ssu2_payload import AddressBlock, SSU2BlockType, parse_payload 117 118 ip = bytes([192, 168, 1, 1]) 119 block = AddressBlock(ip_address=ip, port=8080) 120 data = block.to_block() 121 122 parsed = parse_payload(data) 123 assert len(parsed) == 1 124 assert parsed[0].block_type == SSU2BlockType.ADDRESS 125 assert parsed[0].ip_address == ip 126 assert parsed[0].port == 8080 127 128 def test_ipv6(self): 129 from i2p_transport.ssu2_payload import AddressBlock, SSU2BlockType, parse_payload 130 131 ip = bytes(range(16)) # 16-byte IPv6 address 132 block = AddressBlock(ip_address=ip, port=443) 133 data = block.to_block() 134 135 parsed = parse_payload(data) 136 assert len(parsed) == 1 137 assert parsed[0].ip_address == ip 138 assert parsed[0].port == 443 139 140 141class TestNewTokenBlock: 142 def test_roundtrip(self): 143 from i2p_transport.ssu2_payload import NewTokenBlock, SSU2BlockType, parse_payload 144 145 block = NewTokenBlock(expires=1700000000, token=0xFEDCBA9876543210) 146 data = block.to_block() 147 148 parsed = parse_payload(data) 149 assert len(parsed) == 1 150 assert parsed[0].block_type == SSU2BlockType.NEW_TOKEN 151 assert parsed[0].expires == 1700000000 152 assert parsed[0].token == 0xFEDCBA9876543210 153 154 155class TestPathChallengeResponseBlock: 156 def test_roundtrip(self): 157 from i2p_transport.ssu2_payload import ( 158 PathChallengeBlock, PathResponseBlock, SSU2BlockType, parse_payload 159 ) 160 161 challenge_data = os.urandom(8) 162 challenge = PathChallengeBlock(challenge_data=challenge_data) 163 response = PathResponseBlock(response_data=challenge_data) 164 165 data = challenge.to_block() + response.to_block() 166 parsed = parse_payload(data) 167 assert len(parsed) == 2 168 assert parsed[0].block_type == SSU2BlockType.PATH_CHALLENGE 169 assert parsed[0].challenge_data == challenge_data 170 assert parsed[1].block_type == SSU2BlockType.PATH_RESPONSE 171 assert parsed[1].response_data == challenge_data 172 173 174class TestTerminationBlock: 175 def test_roundtrip(self): 176 from i2p_transport.ssu2_payload import TerminationBlock, SSU2BlockType, parse_payload 177 178 block = TerminationBlock(reason=3, valid_frames_received=9999) 179 data = block.to_block() 180 181 parsed = parse_payload(data) 182 assert len(parsed) == 1 183 assert parsed[0].block_type == SSU2BlockType.TERMINATION 184 assert parsed[0].reason == 3 185 assert parsed[0].valid_frames_received == 9999 186 187 188class TestPaddingBlock: 189 def test_padding(self): 190 from i2p_transport.ssu2_payload import PaddingBlock, SSU2BlockType, parse_payload 191 192 pad = os.urandom(64) 193 block = PaddingBlock(padding=pad) 194 data = block.to_block() 195 196 parsed = parse_payload(data) 197 assert len(parsed) == 1 198 assert parsed[0].block_type == SSU2BlockType.PADDING 199 assert parsed[0].padding == pad 200 201 202class TestParsePayload: 203 def test_multiple_blocks(self): 204 from i2p_transport.ssu2_payload import ( 205 DateTimeBlock, I2NPBlock, PaddingBlock, 206 build_payload, parse_payload, 207 ) 208 209 blocks = [ 210 DateTimeBlock(timestamp=1000000), 211 I2NPBlock(i2np_data=b"hello i2np"), 212 PaddingBlock(padding=b"\x00" * 16), 213 ] 214 payload = build_payload(blocks) 215 parsed = parse_payload(payload) 216 217 assert len(parsed) == 3 218 assert parsed[0].timestamp == 1000000 219 assert parsed[1].i2np_data == b"hello i2np" 220 assert parsed[2].padding == b"\x00" * 16 221 222 def test_build_payload_roundtrip(self): 223 from i2p_transport.ssu2_payload import ( 224 DateTimeBlock, AckBlock, AddressBlock, TerminationBlock, 225 build_payload, parse_payload, 226 ) 227 228 blocks = [ 229 DateTimeBlock(timestamp=42), 230 AckBlock(ack_through=50, ack_count=1, ranges=[(10, 5)]), 231 AddressBlock(ip_address=bytes([10, 0, 0, 1]), port=1234), 232 TerminationBlock(reason=0, valid_frames_received=100), 233 ] 234 payload = build_payload(blocks) 235 parsed = parse_payload(payload) 236 237 assert len(parsed) == 4 238 assert parsed[0].timestamp == 42 239 assert parsed[1].ack_through == 50 240 assert parsed[2].ip_address == bytes([10, 0, 0, 1]) 241 assert parsed[3].valid_frames_received == 100 242 243 def test_empty_payload(self): 244 from i2p_transport.ssu2_payload import parse_payload 245 246 parsed = parse_payload(b"") 247 assert parsed == [] 248 249 def test_options_block_roundtrip(self): 250 from i2p_transport.ssu2_payload import OptionsBlock, parse_payload 251 252 opts = os.urandom(32) 253 block = OptionsBlock(options=opts) 254 parsed = parse_payload(block.to_block()) 255 assert parsed[0].options == opts 256 257 def test_relay_tag_request_roundtrip(self): 258 from i2p_transport.ssu2_payload import RelayTagRequestBlock, SSU2BlockType, parse_payload 259 260 block = RelayTagRequestBlock() 261 parsed = parse_payload(block.to_block()) 262 assert len(parsed) == 1 263 assert parsed[0].block_type == SSU2BlockType.RELAY_TAG_REQUEST 264 265 def test_relay_tag_roundtrip(self): 266 from i2p_transport.ssu2_payload import RelayTagBlock, parse_payload 267 268 block = RelayTagBlock(relay_tag=0xCAFEBABE) 269 parsed = parse_payload(block.to_block()) 270 assert parsed[0].relay_tag == 0xCAFEBABE 271 272 def test_intro_key_roundtrip(self): 273 from i2p_transport.ssu2_payload import IntroKeyBlock, parse_payload 274 275 key = os.urandom(32) 276 block = IntroKeyBlock(intro_key=key) 277 parsed = parse_payload(block.to_block()) 278 assert parsed[0].intro_key == key 279 280 def test_first_packet_number_roundtrip(self): 281 from i2p_transport.ssu2_payload import FirstPacketNumberBlock, parse_payload 282 283 block = FirstPacketNumberBlock(packet_number=42) 284 parsed = parse_payload(block.to_block()) 285 assert parsed[0].packet_number == 42 286 287 def test_congestion_roundtrip(self): 288 from i2p_transport.ssu2_payload import CongestionBlock, parse_payload 289 290 cdata = os.urandom(8) 291 block = CongestionBlock(congestion_data=cdata) 292 parsed = parse_payload(block.to_block()) 293 assert parsed[0].congestion_data == cdata