# A Python port of the Invisible Internet Project (I2P)
1"""Tests for i2p_data.su3 — SU3 signed update file parser."""
2import io
3import struct
4import zipfile
5
6import pytest
7
8from i2p_data.su3 import SU3File
9
10
11def _build_su3(
12 *,
13 magic: bytes = b"I2Psu3",
14 file_format_version: int = 0,
15 sig_type_code: int = 0x000B, # EdDSA_SHA512_Ed25519
16 sig_length: int = 64,
17 version_string: str = "0.9.62",
18 signer_id: str = "admin@stormycloud.org",
19 content: bytes = b"",
20 file_type: int = 0, # ZIP
21 content_type: int = 3, # RESEED
22 signature: bytes | None = None,
23) -> bytes:
24 """Build a synthetic SU3 binary for testing."""
25 version_bytes = version_string.encode("utf-8")
26 signer_bytes = signer_id.encode("utf-8")
27 vlen = len(version_bytes)
28 slen = len(signer_bytes)
29 clen = len(content)
30 if signature is None:
31 signature = b"\x00" * sig_length
32
33 buf = bytearray()
34 # 0-5: magic
35 buf.extend(magic)
36 # 6: reserved
37 buf.append(0)
38 # 7: file format version
39 buf.append(file_format_version)
40 # 8-9: sig type code (big-endian)
41 buf.extend(struct.pack("!H", sig_type_code))
42 # 10-11: sig length (big-endian)
43 buf.extend(struct.pack("!H", sig_length))
44 # 12: reserved
45 buf.append(0)
46 # 13: version string length
47 buf.append(vlen)
48 # 14: reserved
49 buf.append(0)
50 # 15: signer id length
51 buf.append(slen)
52 # 16-23: content length (big-endian, 8 bytes)
53 buf.extend(struct.pack("!Q", clen))
54 # 24: reserved
55 buf.append(0)
56 # 25: file type
57 buf.append(file_type)
58 # 26: reserved
59 buf.append(0)
60 # 27: content type
61 buf.append(content_type)
62 # 28-39: reserved (12 bytes)
63 buf.extend(b"\x00" * 12)
64 # 40: version string
65 buf.extend(version_bytes)
66 # 40+vlen: signer id
67 buf.extend(signer_bytes)
68 # content
69 buf.extend(content)
70 # signature
71 buf.extend(signature)
72 return bytes(buf)
73
74
75def _make_zip_with_routerinfos(names: list[str], data_list: list[bytes] | None = None) -> bytes:
76 """Create an in-memory ZIP containing the given filenames."""
77 if data_list is None:
78 data_list = [b"routerinfo-data-" + name.encode() for name in names]
79 buf = io.BytesIO()
80 with zipfile.ZipFile(buf, "w", zipfile.ZIP_DEFLATED) as zf:
81 for name, data in zip(names, data_list):
82 zf.writestr(name, data)
83 return buf.getvalue()
84
85
class TestSU3Magic:
    """Checks on the leading magic bytes."""

    def test_valid_magic(self):
        parsed = SU3File.from_bytes(_build_su3())
        assert parsed.magic == b"I2Psu3"

    def test_invalid_magic_raises(self):
        bad_blob = _build_su3(magic=b"BADMAG")
        with pytest.raises(ValueError, match="[Mm]agic"):
            SU3File.from_bytes(bad_blob)

    def test_short_magic_raises(self):
        # Only three bytes: shorter than the magic itself.
        too_short = b"I2P"
        with pytest.raises(ValueError):
            SU3File.from_bytes(too_short)
102
103
class TestSU3HeaderFields:
    """Round-trip checks: each header field written by the builder is read back."""

    # NOTE(review): the algorithm names used in the sig-type test names below
    # are this file's own labels.  The I2P signature-type table appears to
    # number them differently (e.g. 0x0006 as RSA-SHA512-4096) — confirm
    # against the spec before relying on the names.

    def test_sig_type_code_eddsa(self):
        blob = _build_su3(sig_type_code=0x000B)
        assert SU3File.from_bytes(blob).sig_type_code == 0x000B

    def test_sig_type_code_dsa_sha1(self):
        blob = _build_su3(sig_type_code=0x0000, sig_length=40)
        assert SU3File.from_bytes(blob).sig_type_code == 0x0000

    def test_sig_type_code_ecdsa_p256(self):
        blob = _build_su3(sig_type_code=0x0003, sig_length=64)
        assert SU3File.from_bytes(blob).sig_type_code == 0x0003

    def test_sig_type_code_rsa_2048(self):
        blob = _build_su3(sig_type_code=0x0006, sig_length=256)
        assert SU3File.from_bytes(blob).sig_type_code == 0x0006

    def test_sig_length(self):
        blob = _build_su3(sig_length=64)
        assert SU3File.from_bytes(blob).sig_length == 64

    def test_sig_length_rsa_4096(self):
        blob = _build_su3(sig_type_code=0x0008, sig_length=512)
        assert SU3File.from_bytes(blob).sig_length == 512

    def test_version_string(self):
        blob = _build_su3(version_string="0.9.62")
        assert SU3File.from_bytes(blob).version == "0.9.62"

    def test_version_string_long(self):
        expected = "1.2.3-beta4"
        blob = _build_su3(version_string=expected)
        assert SU3File.from_bytes(blob).version == expected

    def test_version_null_padded(self):
        """Trailing NUL padding on the version string is stripped by the parser."""
        padded = "0.9.62" + "\x00" * 3
        blob = _build_su3(version_string=padded)
        assert SU3File.from_bytes(blob).version == "0.9.62"

    def test_signer_id(self):
        blob = _build_su3(signer_id="admin@stormycloud.org")
        assert SU3File.from_bytes(blob).signer_id == "admin@stormycloud.org"

    def test_signer_id_different(self):
        blob = _build_su3(signer_id="zzz@mail.i2p")
        assert SU3File.from_bytes(blob).signer_id == "zzz@mail.i2p"

    def test_content_length_zero(self):
        blob = _build_su3(content=b"")
        assert SU3File.from_bytes(blob).content_length == 0

    def test_content_length_nonzero(self):
        body = b"X" * 1024
        blob = _build_su3(content=body)
        assert SU3File.from_bytes(blob).content_length == 1024

    def test_file_type_zip(self):
        blob = _build_su3(file_type=SU3File.TYPE_ZIP)
        assert SU3File.from_bytes(blob).file_type == SU3File.TYPE_ZIP

    def test_file_type_xml(self):
        blob = _build_su3(file_type=SU3File.TYPE_XML)
        assert SU3File.from_bytes(blob).file_type == SU3File.TYPE_XML

    def test_file_type_html(self):
        blob = _build_su3(file_type=SU3File.TYPE_HTML)
        assert SU3File.from_bytes(blob).file_type == SU3File.TYPE_HTML

    def test_file_type_xml_gz(self):
        blob = _build_su3(file_type=SU3File.TYPE_XML_GZ)
        assert SU3File.from_bytes(blob).file_type == SU3File.TYPE_XML_GZ

    def test_file_type_txt_gz(self):
        blob = _build_su3(file_type=SU3File.TYPE_TXT_GZ)
        assert SU3File.from_bytes(blob).file_type == SU3File.TYPE_TXT_GZ

    def test_content_type_reseed(self):
        blob = _build_su3(content_type=SU3File.CONTENT_RESEED)
        assert SU3File.from_bytes(blob).content_type == SU3File.CONTENT_RESEED

    def test_content_type_router(self):
        blob = _build_su3(content_type=SU3File.CONTENT_ROUTER)
        assert SU3File.from_bytes(blob).content_type == SU3File.CONTENT_ROUTER

    def test_content_type_plugin(self):
        blob = _build_su3(content_type=SU3File.CONTENT_PLUGIN)
        assert SU3File.from_bytes(blob).content_type == SU3File.CONTENT_PLUGIN

    def test_content_type_news(self):
        blob = _build_su3(content_type=SU3File.CONTENT_NEWS)
        assert SU3File.from_bytes(blob).content_type == SU3File.CONTENT_NEWS

    def test_content_type_blocklist(self):
        blob = _build_su3(content_type=SU3File.CONTENT_BLOCKLIST)
        assert SU3File.from_bytes(blob).content_type == SU3File.CONTENT_BLOCKLIST
202
203
class TestSU3ContentAndSignature:
    """Payload and trailing-signature accessors."""

    def test_get_content_empty(self):
        blob = _build_su3(content=b"")
        assert SU3File.from_bytes(blob).get_content() == b""

    def test_get_content_nonempty(self):
        body = b"\xDE\xAD\xBE\xEF" * 100
        blob = _build_su3(content=body)
        assert SU3File.from_bytes(blob).get_content() == body

    def test_get_signature(self):
        expected_sig = b"\xAB" * 64
        blob = _build_su3(signature=expected_sig, sig_length=64)
        assert SU3File.from_bytes(blob).get_signature() == expected_sig

    def test_get_signature_rsa(self):
        expected_sig = b"\xCD" * 256
        blob = _build_su3(sig_type_code=0x0006, sig_length=256, signature=expected_sig)
        parsed = SU3File.from_bytes(blob)
        assert parsed.get_signature() == expected_sig
        assert len(parsed.get_signature()) == 256

    def test_content_and_signature_do_not_overlap(self):
        # Distinct fill bytes make any bleed between the two regions visible.
        body = b"A" * 200
        expected_sig = b"S" * 64
        parsed = SU3File.from_bytes(_build_su3(content=body, signature=expected_sig))
        assert parsed.get_content() == body
        assert parsed.get_signature() == expected_sig
235
236
class TestSU3IsReseed:
    """Behaviour of the is_reseed() helper for different content types."""

    def test_reseed_returns_true(self):
        blob = _build_su3(content_type=SU3File.CONTENT_RESEED)
        assert SU3File.from_bytes(blob).is_reseed() is True

    def test_router_returns_false(self):
        blob = _build_su3(content_type=SU3File.CONTENT_ROUTER)
        assert SU3File.from_bytes(blob).is_reseed() is False

    def test_unknown_returns_false(self):
        blob = _build_su3(content_type=SU3File.CONTENT_UNKNOWN)
        assert SU3File.from_bytes(blob).is_reseed() is False
251
252
class TestSU3ExtractRouterInfos:
    """extract_routerinfos(): pulling router infos out of a reseed ZIP payload."""

    def test_extracts_routerinfo_files(self):
        members = {
            "routerInfo-AAAA.dat": b"ri-data-1",
            "routerInfo-BBBB.dat": b"ri-data-2",
            "routerInfo-CCCC.dat": b"ri-data-3",
        }
        archive = _make_zip_with_routerinfos(list(members), list(members.values()))
        blob = _build_su3(
            content=archive,
            file_type=SU3File.TYPE_ZIP,
            content_type=SU3File.CONTENT_RESEED,
        )
        extracted = SU3File.from_bytes(blob).extract_routerinfos()
        assert len(extracted) == 3
        assert set(extracted) == {b"ri-data-1", b"ri-data-2", b"ri-data-3"}

    def test_ignores_non_routerinfo_files(self):
        """Members not named like routerInfo-*.dat are left out of the result."""
        members = (
            ("routerInfo-AAAA.dat", b"good"),
            ("other-file.txt", b"ignored"),
            ("README.md", b"also ignored"),
        )
        archive_buf = io.BytesIO()
        with zipfile.ZipFile(archive_buf, "w") as archive:
            for member_name, member_data in members:
                archive.writestr(member_name, member_data)
        blob = _build_su3(
            content=archive_buf.getvalue(),
            file_type=SU3File.TYPE_ZIP,
            content_type=SU3File.CONTENT_RESEED,
        )
        extracted = SU3File.from_bytes(blob).extract_routerinfos()
        assert len(extracted) == 1
        assert extracted[0] == b"good"

    def test_empty_zip_returns_empty_list(self):
        archive_buf = io.BytesIO()
        # Opening and closing the writer with no members yields an empty archive.
        with zipfile.ZipFile(archive_buf, "w"):
            pass
        blob = _build_su3(
            content=archive_buf.getvalue(),
            file_type=SU3File.TYPE_ZIP,
            content_type=SU3File.CONTENT_RESEED,
        )
        assert SU3File.from_bytes(blob).extract_routerinfos() == []

    def test_non_zip_file_type_raises(self):
        """A payload whose file_type is not ZIP is rejected."""
        blob = _build_su3(
            content=b"<xml/>",
            file_type=SU3File.TYPE_XML,
            content_type=SU3File.CONTENT_RESEED,
        )
        parsed = SU3File.from_bytes(blob)
        with pytest.raises(ValueError, match="[Zz][Ii][Pp]"):
            parsed.extract_routerinfos()

    def test_non_reseed_content_type_raises(self):
        """A ZIP payload whose content_type is not RESEED is rejected."""
        archive_buf = io.BytesIO()
        with zipfile.ZipFile(archive_buf, "w") as archive:
            archive.writestr("routerInfo-AAAA.dat", b"data")
        blob = _build_su3(
            content=archive_buf.getvalue(),
            file_type=SU3File.TYPE_ZIP,
            content_type=SU3File.CONTENT_ROUTER,
        )
        parsed = SU3File.from_bytes(blob)
        with pytest.raises(ValueError, match="(?i)reseed"):
            parsed.extract_routerinfos()
328
329
class TestSU3Validation:
    """Error handling for malformed or truncated input.

    Offsets follow the layout written by ``_build_su3``: a 40-byte fixed
    header, then the version string, the signer id, the content and the
    signature.
    """

    def test_truncated_header_raises(self):
        """Data shorter than the 40-byte fixed header must raise."""
        with pytest.raises(ValueError):
            SU3File.from_bytes(b"I2Psu3" + b"\x00" * 10)

    def test_truncated_at_version_raises(self):
        """Header complete but not enough data for the version string."""
        data = _build_su3(version_string="0.9.62")
        # 42 bytes = full header plus only 2 of the 6 version bytes.
        with pytest.raises(ValueError):
            SU3File.from_bytes(data[:42])

    def test_truncated_at_signer_raises(self):
        """Header + version complete but the signer id is cut short."""
        version = "0.9.62"
        signer = "admin@stormycloud.org"
        full = _build_su3(version_string=version, signer_id=signer, content=b"X" * 100)
        signer_start = 40 + len(version.encode("utf-8"))
        # Byte 50 — the cut the content test used before it was corrected;
        # it lands 4 bytes into the signer id.
        with pytest.raises(ValueError):
            SU3File.from_bytes(full[: signer_start + 4])

    def test_truncated_at_content_raises(self):
        """Header + version + signer complete but content truncated."""
        version = "0.9.62"
        signer = "admin@stormycloud.org"
        full = _build_su3(version_string=version, signer_id=signer, content=b"X" * 100)
        # Content starts after the header, version and signer id (byte 67
        # here).  A fixed cut at byte 50 would land inside the signer id, so
        # derive the offset and keep half of the 100 content bytes.
        content_start = 40 + len(version.encode("utf-8")) + len(signer.encode("utf-8"))
        with pytest.raises(ValueError):
            SU3File.from_bytes(full[: content_start + 50])

    def test_wrong_file_format_version_raises(self):
        """A file format version other than 0 is rejected."""
        data = _build_su3(file_format_version=99)
        with pytest.raises(ValueError, match="[Vv]ersion|[Ff]ormat"):
            SU3File.from_bytes(data)