A Python port of the Invisible Internet Project (I2P)
at main 356 lines 12 kB view raw
1"""Tests for i2p_data.su3 — SU3 signed update file parser.""" 2import io 3import struct 4import zipfile 5 6import pytest 7 8from i2p_data.su3 import SU3File 9 10 11def _build_su3( 12 *, 13 magic: bytes = b"I2Psu3", 14 file_format_version: int = 0, 15 sig_type_code: int = 0x000B, # EdDSA_SHA512_Ed25519 16 sig_length: int = 64, 17 version_string: str = "0.9.62", 18 signer_id: str = "admin@stormycloud.org", 19 content: bytes = b"", 20 file_type: int = 0, # ZIP 21 content_type: int = 3, # RESEED 22 signature: bytes | None = None, 23) -> bytes: 24 """Build a synthetic SU3 binary for testing.""" 25 version_bytes = version_string.encode("utf-8") 26 signer_bytes = signer_id.encode("utf-8") 27 vlen = len(version_bytes) 28 slen = len(signer_bytes) 29 clen = len(content) 30 if signature is None: 31 signature = b"\x00" * sig_length 32 33 buf = bytearray() 34 # 0-5: magic 35 buf.extend(magic) 36 # 6: reserved 37 buf.append(0) 38 # 7: file format version 39 buf.append(file_format_version) 40 # 8-9: sig type code (big-endian) 41 buf.extend(struct.pack("!H", sig_type_code)) 42 # 10-11: sig length (big-endian) 43 buf.extend(struct.pack("!H", sig_length)) 44 # 12: reserved 45 buf.append(0) 46 # 13: version string length 47 buf.append(vlen) 48 # 14: reserved 49 buf.append(0) 50 # 15: signer id length 51 buf.append(slen) 52 # 16-23: content length (big-endian, 8 bytes) 53 buf.extend(struct.pack("!Q", clen)) 54 # 24: reserved 55 buf.append(0) 56 # 25: file type 57 buf.append(file_type) 58 # 26: reserved 59 buf.append(0) 60 # 27: content type 61 buf.append(content_type) 62 # 28-39: reserved (12 bytes) 63 buf.extend(b"\x00" * 12) 64 # 40: version string 65 buf.extend(version_bytes) 66 # 40+vlen: signer id 67 buf.extend(signer_bytes) 68 # content 69 buf.extend(content) 70 # signature 71 buf.extend(signature) 72 return bytes(buf) 73 74 75def _make_zip_with_routerinfos(names: list[str], data_list: list[bytes] | None = None) -> bytes: 76 """Create an in-memory ZIP containing the given filenames.""" 77 if data_list is None: 78 data_list = [b"routerinfo-data-" + name.encode() for name in names] 79 buf = io.BytesIO() 80 with zipfile.ZipFile(buf, "w", zipfile.ZIP_DEFLATED) as zf: 81 for name, data in zip(names, data_list): 82 zf.writestr(name, data) 83 return buf.getvalue() 84 85 86class TestSU3Magic: 87 """Magic number validation.""" 88 89 def test_valid_magic(self): 90 data = _build_su3() 91 su3 = SU3File.from_bytes(data) 92 assert su3.magic == b"I2Psu3" 93 94 def test_invalid_magic_raises(self): 95 data = _build_su3(magic=b"BADMAG") 96 with pytest.raises(ValueError, match="[Mm]agic"): 97 SU3File.from_bytes(data) 98 99 def test_short_magic_raises(self): 100 with pytest.raises(ValueError): 101 SU3File.from_bytes(b"I2P") 102 103 104class TestSU3HeaderFields: 105 """Header field parsing.""" 106 107 def test_sig_type_code_eddsa(self): 108 su3 = SU3File.from_bytes(_build_su3(sig_type_code=0x000B)) 109 assert su3.sig_type_code == 0x000B 110 111 def test_sig_type_code_dsa_sha1(self): 112 su3 = SU3File.from_bytes(_build_su3(sig_type_code=0x0000, sig_length=40)) 113 assert su3.sig_type_code == 0x0000 114 115 def test_sig_type_code_ecdsa_p256(self): 116 su3 = SU3File.from_bytes(_build_su3(sig_type_code=0x0003, sig_length=64)) 117 assert su3.sig_type_code == 0x0003 118 119 def test_sig_type_code_rsa_2048(self): 120 su3 = SU3File.from_bytes(_build_su3(sig_type_code=0x0006, sig_length=256)) 121 assert su3.sig_type_code == 0x0006 122 123 def test_sig_length(self): 124 su3 = SU3File.from_bytes(_build_su3(sig_length=64)) 125 assert su3.sig_length == 64 126 127 def test_sig_length_rsa_4096(self): 128 su3 = SU3File.from_bytes(_build_su3(sig_type_code=0x0008, sig_length=512)) 129 assert su3.sig_length == 512 130 131 def test_version_string(self): 132 su3 = SU3File.from_bytes(_build_su3(version_string="0.9.62")) 133 assert su3.version == "0.9.62" 134 135 def test_version_string_long(self): 136 ver = "1.2.3-beta4" 137 su3 = SU3File.from_bytes(_build_su3(version_string=ver)) 138 assert su3.version == ver 139 140 def test_version_null_padded(self): 141 """Version strings may contain null padding — strip it.""" 142 ver_padded = "0.9.62\x00\x00\x00" 143 su3 = SU3File.from_bytes(_build_su3(version_string=ver_padded)) 144 assert su3.version == "0.9.62" 145 146 def test_signer_id(self): 147 su3 = SU3File.from_bytes(_build_su3(signer_id="admin@stormycloud.org")) 148 assert su3.signer_id == "admin@stormycloud.org" 149 150 def test_signer_id_different(self): 151 su3 = SU3File.from_bytes(_build_su3(signer_id="zzz@mail.i2p")) 152 assert su3.signer_id == "zzz@mail.i2p" 153 154 def test_content_length_zero(self): 155 su3 = SU3File.from_bytes(_build_su3(content=b"")) 156 assert su3.content_length == 0 157 158 def test_content_length_nonzero(self): 159 payload = b"X" * 1024 160 su3 = SU3File.from_bytes(_build_su3(content=payload)) 161 assert su3.content_length == 1024 162 163 def test_file_type_zip(self): 164 su3 = SU3File.from_bytes(_build_su3(file_type=SU3File.TYPE_ZIP)) 165 assert su3.file_type == SU3File.TYPE_ZIP 166 167 def test_file_type_xml(self): 168 su3 = SU3File.from_bytes(_build_su3(file_type=SU3File.TYPE_XML)) 169 assert su3.file_type == SU3File.TYPE_XML 170 171 def test_file_type_html(self): 172 su3 = SU3File.from_bytes(_build_su3(file_type=SU3File.TYPE_HTML)) 173 assert su3.file_type == SU3File.TYPE_HTML 174 175 def test_file_type_xml_gz(self): 176 su3 = SU3File.from_bytes(_build_su3(file_type=SU3File.TYPE_XML_GZ)) 177 assert su3.file_type == SU3File.TYPE_XML_GZ 178 179 def test_file_type_txt_gz(self): 180 su3 = SU3File.from_bytes(_build_su3(file_type=SU3File.TYPE_TXT_GZ)) 181 assert su3.file_type == SU3File.TYPE_TXT_GZ 182 183 def test_content_type_reseed(self): 184 su3 = SU3File.from_bytes(_build_su3(content_type=SU3File.CONTENT_RESEED)) 185 assert su3.content_type == SU3File.CONTENT_RESEED 186 187 def test_content_type_router(self): 188 su3 = SU3File.from_bytes(_build_su3(content_type=SU3File.CONTENT_ROUTER)) 189 assert su3.content_type == SU3File.CONTENT_ROUTER 190 191 def test_content_type_plugin(self): 192 su3 = SU3File.from_bytes(_build_su3(content_type=SU3File.CONTENT_PLUGIN)) 193 assert su3.content_type == SU3File.CONTENT_PLUGIN 194 195 def test_content_type_news(self): 196 su3 = SU3File.from_bytes(_build_su3(content_type=SU3File.CONTENT_NEWS)) 197 assert su3.content_type == SU3File.CONTENT_NEWS 198 199 def test_content_type_blocklist(self): 200 su3 = SU3File.from_bytes(_build_su3(content_type=SU3File.CONTENT_BLOCKLIST)) 201 assert su3.content_type == SU3File.CONTENT_BLOCKLIST 202 203 204class TestSU3ContentAndSignature: 205 """Content and signature extraction.""" 206 207 def test_get_content_empty(self): 208 su3 = SU3File.from_bytes(_build_su3(content=b"")) 209 assert su3.get_content() == b"" 210 211 def test_get_content_nonempty(self): 212 payload = b"\xDE\xAD\xBE\xEF" * 100 213 su3 = SU3File.from_bytes(_build_su3(content=payload)) 214 assert su3.get_content() == payload 215 216 def test_get_signature(self): 217 sig = b"\xAB" * 64 218 su3 = SU3File.from_bytes(_build_su3(signature=sig, sig_length=64)) 219 assert su3.get_signature() == sig 220 221 def test_get_signature_rsa(self): 222 sig = b"\xCD" * 256 223 su3 = SU3File.from_bytes( 224 _build_su3(sig_type_code=0x0006, sig_length=256, signature=sig) 225 ) 226 assert su3.get_signature() == sig 227 assert len(su3.get_signature()) == 256 228 229 def test_content_and_signature_do_not_overlap(self): 230 content = b"A" * 200 231 sig = b"S" * 64 232 su3 = SU3File.from_bytes(_build_su3(content=content, signature=sig)) 233 assert su3.get_content() == content 234 assert su3.get_signature() == sig 235 236 237class TestSU3IsReseed: 238 """is_reseed() convenience method.""" 239 240 def test_reseed_returns_true(self): 241 su3 = SU3File.from_bytes(_build_su3(content_type=SU3File.CONTENT_RESEED)) 242 assert su3.is_reseed() is True 243 244 def test_router_returns_false(self): 245 su3 = SU3File.from_bytes(_build_su3(content_type=SU3File.CONTENT_ROUTER)) 246 assert su3.is_reseed() is False 247 248 def test_unknown_returns_false(self): 249 su3 = SU3File.from_bytes(_build_su3(content_type=SU3File.CONTENT_UNKNOWN)) 250 assert su3.is_reseed() is False 251 252 253class TestSU3ExtractRouterInfos: 254 """extract_routerinfos() — ZIP extraction for reseed bundles.""" 255 256 def test_extracts_routerinfo_files(self): 257 names = ["routerInfo-AAAA.dat", "routerInfo-BBBB.dat", "routerInfo-CCCC.dat"] 258 data_list = [b"ri-data-1", b"ri-data-2", b"ri-data-3"] 259 zip_bytes = _make_zip_with_routerinfos(names, data_list) 260 su3 = SU3File.from_bytes( 261 _build_su3( 262 content=zip_bytes, 263 file_type=SU3File.TYPE_ZIP, 264 content_type=SU3File.CONTENT_RESEED, 265 ) 266 ) 267 infos = su3.extract_routerinfos() 268 assert len(infos) == 3 269 assert set(infos) == {b"ri-data-1", b"ri-data-2", b"ri-data-3"} 270 271 def test_ignores_non_routerinfo_files(self): 272 """Only files matching routerInfo-*.dat should be extracted.""" 273 zip_buf = io.BytesIO() 274 with zipfile.ZipFile(zip_buf, "w") as zf: 275 zf.writestr("routerInfo-AAAA.dat", b"good") 276 zf.writestr("other-file.txt", b"ignored") 277 zf.writestr("README.md", b"also ignored") 278 su3 = SU3File.from_bytes( 279 _build_su3( 280 content=zip_buf.getvalue(), 281 file_type=SU3File.TYPE_ZIP, 282 content_type=SU3File.CONTENT_RESEED, 283 ) 284 ) 285 infos = su3.extract_routerinfos() 286 assert len(infos) == 1 287 assert infos[0] == b"good" 288 289 def test_empty_zip_returns_empty_list(self): 290 zip_buf = io.BytesIO() 291 with zipfile.ZipFile(zip_buf, "w") as zf: 292 pass # empty zip 293 su3 = SU3File.from_bytes( 294 _build_su3( 295 content=zip_buf.getvalue(), 296 file_type=SU3File.TYPE_ZIP, 297 content_type=SU3File.CONTENT_RESEED, 298 ) 299 ) 300 assert su3.extract_routerinfos() == [] 301 302 def test_non_zip_file_type_raises(self): 303 """extract_routerinfos should raise if file_type is not ZIP.""" 304 su3 = SU3File.from_bytes( 305 _build_su3( 306 content=b"<xml/>", 307 file_type=SU3File.TYPE_XML, 308 content_type=SU3File.CONTENT_RESEED, 309 ) 310 ) 311 with pytest.raises(ValueError, match="[Zz][Ii][Pp]"): 312 su3.extract_routerinfos() 313 314 def test_non_reseed_content_type_raises(self): 315 """extract_routerinfos should raise if content_type is not RESEED.""" 316 zip_buf = io.BytesIO() 317 with zipfile.ZipFile(zip_buf, "w") as zf: 318 zf.writestr("routerInfo-AAAA.dat", b"data") 319 su3 = SU3File.from_bytes( 320 _build_su3( 321 content=zip_buf.getvalue(), 322 file_type=SU3File.TYPE_ZIP, 323 content_type=SU3File.CONTENT_ROUTER, 324 ) 325 ) 326 with pytest.raises(ValueError, match="(?i)reseed"): 327 su3.extract_routerinfos() 328 329 330class TestSU3Validation: 331 """Error handling for malformed data.""" 332 333 def test_truncated_header_raises(self): 334 """Data shorter than 40 bytes must raise.""" 335 with pytest.raises(ValueError): 336 SU3File.from_bytes(b"I2Psu3" + b"\x00" * 10) 337 338 def test_truncated_at_version_raises(self): 339 """Header complete but not enough data for version string.""" 340 data = _build_su3(version_string="0.9.62") 341 # Truncate in the middle of version string area 342 with pytest.raises(ValueError): 343 SU3File.from_bytes(data[:42]) 344 345 def test_truncated_at_content_raises(self): 346 """Header + version + signer complete but content truncated.""" 347 full = _build_su3(content=b"X" * 100) 348 # Cut off in content area 349 with pytest.raises(ValueError): 350 SU3File.from_bytes(full[:50]) 351 352 def test_wrong_file_format_version_raises(self): 353 """File format version must be 0.""" 354 data = _build_su3(file_format_version=99) 355 with pytest.raises(ValueError, match="[Vv]ersion|[Ff]ormat"): 356 SU3File.from_bytes(data)