"""Tests for i2p_data.su3 — SU3 signed update file parser.""" import io import struct import zipfile import pytest from i2p_data.su3 import SU3File def _build_su3( *, magic: bytes = b"I2Psu3", file_format_version: int = 0, sig_type_code: int = 0x000B, # EdDSA_SHA512_Ed25519 sig_length: int = 64, version_string: str = "0.9.62", signer_id: str = "admin@stormycloud.org", content: bytes = b"", file_type: int = 0, # ZIP content_type: int = 3, # RESEED signature: bytes | None = None, ) -> bytes: """Build a synthetic SU3 binary for testing.""" version_bytes = version_string.encode("utf-8") signer_bytes = signer_id.encode("utf-8") vlen = len(version_bytes) slen = len(signer_bytes) clen = len(content) if signature is None: signature = b"\x00" * sig_length buf = bytearray() # 0-5: magic buf.extend(magic) # 6: reserved buf.append(0) # 7: file format version buf.append(file_format_version) # 8-9: sig type code (big-endian) buf.extend(struct.pack("!H", sig_type_code)) # 10-11: sig length (big-endian) buf.extend(struct.pack("!H", sig_length)) # 12: reserved buf.append(0) # 13: version string length buf.append(vlen) # 14: reserved buf.append(0) # 15: signer id length buf.append(slen) # 16-23: content length (big-endian, 8 bytes) buf.extend(struct.pack("!Q", clen)) # 24: reserved buf.append(0) # 25: file type buf.append(file_type) # 26: reserved buf.append(0) # 27: content type buf.append(content_type) # 28-39: reserved (12 bytes) buf.extend(b"\x00" * 12) # 40: version string buf.extend(version_bytes) # 40+vlen: signer id buf.extend(signer_bytes) # content buf.extend(content) # signature buf.extend(signature) return bytes(buf) def _make_zip_with_routerinfos(names: list[str], data_list: list[bytes] | None = None) -> bytes: """Create an in-memory ZIP containing the given filenames.""" if data_list is None: data_list = [b"routerinfo-data-" + name.encode() for name in names] buf = io.BytesIO() with zipfile.ZipFile(buf, "w", zipfile.ZIP_DEFLATED) as zf: for name, data in zip(names, data_list): zf.writestr(name, data) return buf.getvalue() class TestSU3Magic: """Magic number validation.""" def test_valid_magic(self): data = _build_su3() su3 = SU3File.from_bytes(data) assert su3.magic == b"I2Psu3" def test_invalid_magic_raises(self): data = _build_su3(magic=b"BADMAG") with pytest.raises(ValueError, match="[Mm]agic"): SU3File.from_bytes(data) def test_short_magic_raises(self): with pytest.raises(ValueError): SU3File.from_bytes(b"I2P") class TestSU3HeaderFields: """Header field parsing.""" def test_sig_type_code_eddsa(self): su3 = SU3File.from_bytes(_build_su3(sig_type_code=0x000B)) assert su3.sig_type_code == 0x000B def test_sig_type_code_dsa_sha1(self): su3 = SU3File.from_bytes(_build_su3(sig_type_code=0x0000, sig_length=40)) assert su3.sig_type_code == 0x0000 def test_sig_type_code_ecdsa_p256(self): su3 = SU3File.from_bytes(_build_su3(sig_type_code=0x0003, sig_length=64)) assert su3.sig_type_code == 0x0003 def test_sig_type_code_rsa_2048(self): su3 = SU3File.from_bytes(_build_su3(sig_type_code=0x0006, sig_length=256)) assert su3.sig_type_code == 0x0006 def test_sig_length(self): su3 = SU3File.from_bytes(_build_su3(sig_length=64)) assert su3.sig_length == 64 def test_sig_length_rsa_4096(self): su3 = SU3File.from_bytes(_build_su3(sig_type_code=0x0008, sig_length=512)) assert su3.sig_length == 512 def test_version_string(self): su3 = SU3File.from_bytes(_build_su3(version_string="0.9.62")) assert su3.version == "0.9.62" def test_version_string_long(self): ver = "1.2.3-beta4" su3 = SU3File.from_bytes(_build_su3(version_string=ver)) assert su3.version == ver def test_version_null_padded(self): """Version strings may contain null padding — strip it.""" ver_padded = "0.9.62\x00\x00\x00" su3 = SU3File.from_bytes(_build_su3(version_string=ver_padded)) assert su3.version == "0.9.62" def test_signer_id(self): su3 = SU3File.from_bytes(_build_su3(signer_id="admin@stormycloud.org")) assert su3.signer_id == "admin@stormycloud.org" def test_signer_id_different(self): su3 = SU3File.from_bytes(_build_su3(signer_id="zzz@mail.i2p")) assert su3.signer_id == "zzz@mail.i2p" def test_content_length_zero(self): su3 = SU3File.from_bytes(_build_su3(content=b"")) assert su3.content_length == 0 def test_content_length_nonzero(self): payload = b"X" * 1024 su3 = SU3File.from_bytes(_build_su3(content=payload)) assert su3.content_length == 1024 def test_file_type_zip(self): su3 = SU3File.from_bytes(_build_su3(file_type=SU3File.TYPE_ZIP)) assert su3.file_type == SU3File.TYPE_ZIP def test_file_type_xml(self): su3 = SU3File.from_bytes(_build_su3(file_type=SU3File.TYPE_XML)) assert su3.file_type == SU3File.TYPE_XML def test_file_type_html(self): su3 = SU3File.from_bytes(_build_su3(file_type=SU3File.TYPE_HTML)) assert su3.file_type == SU3File.TYPE_HTML def test_file_type_xml_gz(self): su3 = SU3File.from_bytes(_build_su3(file_type=SU3File.TYPE_XML_GZ)) assert su3.file_type == SU3File.TYPE_XML_GZ def test_file_type_txt_gz(self): su3 = SU3File.from_bytes(_build_su3(file_type=SU3File.TYPE_TXT_GZ)) assert su3.file_type == SU3File.TYPE_TXT_GZ def test_content_type_reseed(self): su3 = SU3File.from_bytes(_build_su3(content_type=SU3File.CONTENT_RESEED)) assert su3.content_type == SU3File.CONTENT_RESEED def test_content_type_router(self): su3 = SU3File.from_bytes(_build_su3(content_type=SU3File.CONTENT_ROUTER)) assert su3.content_type == SU3File.CONTENT_ROUTER def test_content_type_plugin(self): su3 = SU3File.from_bytes(_build_su3(content_type=SU3File.CONTENT_PLUGIN)) assert su3.content_type == SU3File.CONTENT_PLUGIN def test_content_type_news(self): su3 = SU3File.from_bytes(_build_su3(content_type=SU3File.CONTENT_NEWS)) assert su3.content_type == SU3File.CONTENT_NEWS def test_content_type_blocklist(self): su3 = SU3File.from_bytes(_build_su3(content_type=SU3File.CONTENT_BLOCKLIST)) assert su3.content_type == SU3File.CONTENT_BLOCKLIST class TestSU3ContentAndSignature: """Content and signature extraction.""" def test_get_content_empty(self): su3 = SU3File.from_bytes(_build_su3(content=b"")) assert su3.get_content() == b"" def test_get_content_nonempty(self): payload = b"\xDE\xAD\xBE\xEF" * 100 su3 = SU3File.from_bytes(_build_su3(content=payload)) assert su3.get_content() == payload def test_get_signature(self): sig = b"\xAB" * 64 su3 = SU3File.from_bytes(_build_su3(signature=sig, sig_length=64)) assert su3.get_signature() == sig def test_get_signature_rsa(self): sig = b"\xCD" * 256 su3 = SU3File.from_bytes( _build_su3(sig_type_code=0x0006, sig_length=256, signature=sig) ) assert su3.get_signature() == sig assert len(su3.get_signature()) == 256 def test_content_and_signature_do_not_overlap(self): content = b"A" * 200 sig = b"S" * 64 su3 = SU3File.from_bytes(_build_su3(content=content, signature=sig)) assert su3.get_content() == content assert su3.get_signature() == sig class TestSU3IsReseed: """is_reseed() convenience method.""" def test_reseed_returns_true(self): su3 = SU3File.from_bytes(_build_su3(content_type=SU3File.CONTENT_RESEED)) assert su3.is_reseed() is True def test_router_returns_false(self): su3 = SU3File.from_bytes(_build_su3(content_type=SU3File.CONTENT_ROUTER)) assert su3.is_reseed() is False def test_unknown_returns_false(self): su3 = SU3File.from_bytes(_build_su3(content_type=SU3File.CONTENT_UNKNOWN)) assert su3.is_reseed() is False class TestSU3ExtractRouterInfos: """extract_routerinfos() — ZIP extraction for reseed bundles.""" def test_extracts_routerinfo_files(self): names = ["routerInfo-AAAA.dat", "routerInfo-BBBB.dat", "routerInfo-CCCC.dat"] data_list = [b"ri-data-1", b"ri-data-2", b"ri-data-3"] zip_bytes = _make_zip_with_routerinfos(names, data_list) su3 = SU3File.from_bytes( _build_su3( content=zip_bytes, file_type=SU3File.TYPE_ZIP, content_type=SU3File.CONTENT_RESEED, ) ) infos = su3.extract_routerinfos() assert len(infos) == 3 assert set(infos) == {b"ri-data-1", b"ri-data-2", b"ri-data-3"} def test_ignores_non_routerinfo_files(self): """Only files matching routerInfo-*.dat should be extracted.""" zip_buf = io.BytesIO() with zipfile.ZipFile(zip_buf, "w") as zf: zf.writestr("routerInfo-AAAA.dat", b"good") zf.writestr("other-file.txt", b"ignored") zf.writestr("README.md", b"also ignored") su3 = SU3File.from_bytes( _build_su3( content=zip_buf.getvalue(), file_type=SU3File.TYPE_ZIP, content_type=SU3File.CONTENT_RESEED, ) ) infos = su3.extract_routerinfos() assert len(infos) == 1 assert infos[0] == b"good" def test_empty_zip_returns_empty_list(self): zip_buf = io.BytesIO() with zipfile.ZipFile(zip_buf, "w") as zf: pass # empty zip su3 = SU3File.from_bytes( _build_su3( content=zip_buf.getvalue(), file_type=SU3File.TYPE_ZIP, content_type=SU3File.CONTENT_RESEED, ) ) assert su3.extract_routerinfos() == [] def test_non_zip_file_type_raises(self): """extract_routerinfos should raise if file_type is not ZIP.""" su3 = SU3File.from_bytes( _build_su3( content=b"", file_type=SU3File.TYPE_XML, content_type=SU3File.CONTENT_RESEED, ) ) with pytest.raises(ValueError, match="[Zz][Ii][Pp]"): su3.extract_routerinfos() def test_non_reseed_content_type_raises(self): """extract_routerinfos should raise if content_type is not RESEED.""" zip_buf = io.BytesIO() with zipfile.ZipFile(zip_buf, "w") as zf: zf.writestr("routerInfo-AAAA.dat", b"data") su3 = SU3File.from_bytes( _build_su3( content=zip_buf.getvalue(), file_type=SU3File.TYPE_ZIP, content_type=SU3File.CONTENT_ROUTER, ) ) with pytest.raises(ValueError, match="(?i)reseed"): su3.extract_routerinfos() class TestSU3Validation: """Error handling for malformed data.""" def test_truncated_header_raises(self): """Data shorter than 40 bytes must raise.""" with pytest.raises(ValueError): SU3File.from_bytes(b"I2Psu3" + b"\x00" * 10) def test_truncated_at_version_raises(self): """Header complete but not enough data for version string.""" data = _build_su3(version_string="0.9.62") # Truncate in the middle of version string area with pytest.raises(ValueError): SU3File.from_bytes(data[:42]) def test_truncated_at_content_raises(self): """Header + version + signer complete but content truncated.""" full = _build_su3(content=b"X" * 100) # Cut off in content area with pytest.raises(ValueError): SU3File.from_bytes(full[:50]) def test_wrong_file_format_version_raises(self): """File format version must be 0.""" data = _build_su3(file_format_version=99) with pytest.raises(ValueError, match="[Vv]ersion|[Ff]ormat"): SU3File.from_bytes(data)