A Python port of the Invisible Internet Project (I2P)
1"""Tests for SSU2 payload block types."""
2
3import os
4import struct
5import time
6
7import pytest
8
9
10class TestDateTimeBlock:
11 def test_roundtrip(self):
12 from i2p_transport.ssu2_payload import DateTimeBlock, SSU2BlockType, parse_payload
13
14 ts = int(time.time())
15 block = DateTimeBlock(timestamp=ts)
16 data = block.to_block()
17
18 parsed = parse_payload(data)
19 assert len(parsed) == 1
20 assert parsed[0].block_type == SSU2BlockType.DATETIME
21 assert parsed[0].timestamp == ts
22
23
24class TestRouterInfoBlock:
25 def test_roundtrip(self):
26 from i2p_transport.ssu2_payload import RouterInfoBlock, SSU2BlockType, parse_payload
27
28 ri_data = os.urandom(256)
29 block = RouterInfoBlock(flag=1, router_info_data=ri_data)
30 data = block.to_block()
31
32 parsed = parse_payload(data)
33 assert len(parsed) == 1
34 assert parsed[0].block_type == SSU2BlockType.ROUTER_INFO
35 assert parsed[0].flag == 1
36 assert parsed[0].router_info_data == ri_data
37
38
39class TestI2NPBlock:
40 def test_roundtrip(self):
41 from i2p_transport.ssu2_payload import I2NPBlock, SSU2BlockType, parse_payload
42
43 msg = os.urandom(128)
44 block = I2NPBlock(i2np_data=msg)
45 data = block.to_block()
46
47 parsed = parse_payload(data)
48 assert len(parsed) == 1
49 assert parsed[0].block_type == SSU2BlockType.I2NP
50 assert parsed[0].i2np_data == msg
51
52
53class TestFirstFragmentBlock:
54 def test_roundtrip(self):
55 from i2p_transport.ssu2_payload import FirstFragmentBlock, SSU2BlockType, parse_payload
56
57 frag_data = os.urandom(512)
58 block = FirstFragmentBlock(msg_id=0xDEADBEEF, total_fragments=5, fragment_data=frag_data)
59 data = block.to_block()
60
61 parsed = parse_payload(data)
62 assert len(parsed) == 1
63 assert parsed[0].block_type == SSU2BlockType.FIRST_FRAGMENT
64 assert parsed[0].msg_id == 0xDEADBEEF
65 assert parsed[0].total_fragments == 5
66 assert parsed[0].fragment_data == frag_data
67
68
69class TestFollowOnFragmentBlock:
70 def test_roundtrip(self):
71 from i2p_transport.ssu2_payload import FollowOnFragmentBlock, SSU2BlockType, parse_payload
72
73 frag_data = os.urandom(256)
74 block = FollowOnFragmentBlock(
75 msg_id=0x12345678, fragment_num=3, is_last=True, fragment_data=frag_data
76 )
77 data = block.to_block()
78
79 parsed = parse_payload(data)
80 assert len(parsed) == 1
81 assert parsed[0].block_type == SSU2BlockType.FOLLOW_ON_FRAGMENT
82 assert parsed[0].msg_id == 0x12345678
83 assert parsed[0].fragment_num == 3
84 assert parsed[0].is_last is True
85 assert parsed[0].fragment_data == frag_data
86
87 def test_not_last(self):
88 from i2p_transport.ssu2_payload import FollowOnFragmentBlock, parse_payload
89
90 block = FollowOnFragmentBlock(
91 msg_id=1, fragment_num=2, is_last=False, fragment_data=b"data"
92 )
93 data = block.to_block()
94 parsed = parse_payload(data)
95 assert parsed[0].is_last is False
96
97
98class TestAckBlock:
99 def test_roundtrip(self):
100 from i2p_transport.ssu2_payload import AckBlock, SSU2BlockType, parse_payload
101
102 ranges = [(5, 2), (3, 0)]
103 block = AckBlock(ack_through=100, ack_count=2, ranges=ranges)
104 data = block.to_block()
105
106 parsed = parse_payload(data)
107 assert len(parsed) == 1
108 assert parsed[0].block_type == SSU2BlockType.ACK
109 assert parsed[0].ack_through == 100
110 assert parsed[0].ack_count == 2
111 assert parsed[0].ranges == ranges
112
113
114class TestAddressBlock:
115 def test_ipv4(self):
116 from i2p_transport.ssu2_payload import AddressBlock, SSU2BlockType, parse_payload
117
118 ip = bytes([192, 168, 1, 1])
119 block = AddressBlock(ip_address=ip, port=8080)
120 data = block.to_block()
121
122 parsed = parse_payload(data)
123 assert len(parsed) == 1
124 assert parsed[0].block_type == SSU2BlockType.ADDRESS
125 assert parsed[0].ip_address == ip
126 assert parsed[0].port == 8080
127
128 def test_ipv6(self):
129 from i2p_transport.ssu2_payload import AddressBlock, SSU2BlockType, parse_payload
130
131 ip = bytes(range(16)) # 16-byte IPv6 address
132 block = AddressBlock(ip_address=ip, port=443)
133 data = block.to_block()
134
135 parsed = parse_payload(data)
136 assert len(parsed) == 1
137 assert parsed[0].ip_address == ip
138 assert parsed[0].port == 443
139
140
141class TestNewTokenBlock:
142 def test_roundtrip(self):
143 from i2p_transport.ssu2_payload import NewTokenBlock, SSU2BlockType, parse_payload
144
145 block = NewTokenBlock(expires=1700000000, token=0xFEDCBA9876543210)
146 data = block.to_block()
147
148 parsed = parse_payload(data)
149 assert len(parsed) == 1
150 assert parsed[0].block_type == SSU2BlockType.NEW_TOKEN
151 assert parsed[0].expires == 1700000000
152 assert parsed[0].token == 0xFEDCBA9876543210
153
154
155class TestPathChallengeResponseBlock:
156 def test_roundtrip(self):
157 from i2p_transport.ssu2_payload import (
158 PathChallengeBlock, PathResponseBlock, SSU2BlockType, parse_payload
159 )
160
161 challenge_data = os.urandom(8)
162 challenge = PathChallengeBlock(challenge_data=challenge_data)
163 response = PathResponseBlock(response_data=challenge_data)
164
165 data = challenge.to_block() + response.to_block()
166 parsed = parse_payload(data)
167 assert len(parsed) == 2
168 assert parsed[0].block_type == SSU2BlockType.PATH_CHALLENGE
169 assert parsed[0].challenge_data == challenge_data
170 assert parsed[1].block_type == SSU2BlockType.PATH_RESPONSE
171 assert parsed[1].response_data == challenge_data
172
173
174class TestTerminationBlock:
175 def test_roundtrip(self):
176 from i2p_transport.ssu2_payload import TerminationBlock, SSU2BlockType, parse_payload
177
178 block = TerminationBlock(reason=3, valid_frames_received=9999)
179 data = block.to_block()
180
181 parsed = parse_payload(data)
182 assert len(parsed) == 1
183 assert parsed[0].block_type == SSU2BlockType.TERMINATION
184 assert parsed[0].reason == 3
185 assert parsed[0].valid_frames_received == 9999
186
187
188class TestPaddingBlock:
189 def test_padding(self):
190 from i2p_transport.ssu2_payload import PaddingBlock, SSU2BlockType, parse_payload
191
192 pad = os.urandom(64)
193 block = PaddingBlock(padding=pad)
194 data = block.to_block()
195
196 parsed = parse_payload(data)
197 assert len(parsed) == 1
198 assert parsed[0].block_type == SSU2BlockType.PADDING
199 assert parsed[0].padding == pad
200
201
202class TestParsePayload:
203 def test_multiple_blocks(self):
204 from i2p_transport.ssu2_payload import (
205 DateTimeBlock, I2NPBlock, PaddingBlock,
206 build_payload, parse_payload,
207 )
208
209 blocks = [
210 DateTimeBlock(timestamp=1000000),
211 I2NPBlock(i2np_data=b"hello i2np"),
212 PaddingBlock(padding=b"\x00" * 16),
213 ]
214 payload = build_payload(blocks)
215 parsed = parse_payload(payload)
216
217 assert len(parsed) == 3
218 assert parsed[0].timestamp == 1000000
219 assert parsed[1].i2np_data == b"hello i2np"
220 assert parsed[2].padding == b"\x00" * 16
221
222 def test_build_payload_roundtrip(self):
223 from i2p_transport.ssu2_payload import (
224 DateTimeBlock, AckBlock, AddressBlock, TerminationBlock,
225 build_payload, parse_payload,
226 )
227
228 blocks = [
229 DateTimeBlock(timestamp=42),
230 AckBlock(ack_through=50, ack_count=1, ranges=[(10, 5)]),
231 AddressBlock(ip_address=bytes([10, 0, 0, 1]), port=1234),
232 TerminationBlock(reason=0, valid_frames_received=100),
233 ]
234 payload = build_payload(blocks)
235 parsed = parse_payload(payload)
236
237 assert len(parsed) == 4
238 assert parsed[0].timestamp == 42
239 assert parsed[1].ack_through == 50
240 assert parsed[2].ip_address == bytes([10, 0, 0, 1])
241 assert parsed[3].valid_frames_received == 100
242
243 def test_empty_payload(self):
244 from i2p_transport.ssu2_payload import parse_payload
245
246 parsed = parse_payload(b"")
247 assert parsed == []
248
249 def test_options_block_roundtrip(self):
250 from i2p_transport.ssu2_payload import OptionsBlock, parse_payload
251
252 opts = os.urandom(32)
253 block = OptionsBlock(options=opts)
254 parsed = parse_payload(block.to_block())
255 assert parsed[0].options == opts
256
257 def test_relay_tag_request_roundtrip(self):
258 from i2p_transport.ssu2_payload import RelayTagRequestBlock, SSU2BlockType, parse_payload
259
260 block = RelayTagRequestBlock()
261 parsed = parse_payload(block.to_block())
262 assert len(parsed) == 1
263 assert parsed[0].block_type == SSU2BlockType.RELAY_TAG_REQUEST
264
265 def test_relay_tag_roundtrip(self):
266 from i2p_transport.ssu2_payload import RelayTagBlock, parse_payload
267
268 block = RelayTagBlock(relay_tag=0xCAFEBABE)
269 parsed = parse_payload(block.to_block())
270 assert parsed[0].relay_tag == 0xCAFEBABE
271
272 def test_intro_key_roundtrip(self):
273 from i2p_transport.ssu2_payload import IntroKeyBlock, parse_payload
274
275 key = os.urandom(32)
276 block = IntroKeyBlock(intro_key=key)
277 parsed = parse_payload(block.to_block())
278 assert parsed[0].intro_key == key
279
280 def test_first_packet_number_roundtrip(self):
281 from i2p_transport.ssu2_payload import FirstPacketNumberBlock, parse_payload
282
283 block = FirstPacketNumberBlock(packet_number=42)
284 parsed = parse_payload(block.to_block())
285 assert parsed[0].packet_number == 42
286
287 def test_congestion_roundtrip(self):
288 from i2p_transport.ssu2_payload import CongestionBlock, parse_payload
289
290 cdata = os.urandom(8)
291 block = CongestionBlock(congestion_data=cdata)
292 parsed = parse_payload(block.to_block())
293 assert parsed[0].congestion_data == cdata