"""SU3 — I2P signed update file parser. Ported from net.i2p.crypto.SU3File. Wire format: 0-5 Magic "I2Psu3" (ASCII) 6 Reserved (0) 7 File format version (0) 8-9 Signature type code (big-endian) 10-11 Signature length (big-endian) 12 Reserved (0) 13 Version string length (1-255) 14 Reserved (0) 15 Signer ID length (1-255) 16-23 Content length (big-endian, 8 bytes) 24 Reserved (0) 25 File type 26 Reserved (0) 27 Content type 28-39 Reserved (12 zero bytes) 40 Version string (UTF-8, possibly null-padded) 40+vl Signer ID (UTF-8) ... Content bytes (length = content_length) ... Signature bytes (length = sig_length) """ from __future__ import annotations import io import struct import zipfile class SU3File: """Parser for the I2P SU3 signed update file format.""" MAGIC = b"I2Psu3" HEADER_SIZE = 40 # Fixed header before variable-length fields # File types TYPE_ZIP = 0 TYPE_XML = 1 TYPE_HTML = 2 TYPE_XML_GZ = 3 TYPE_TXT_GZ = 4 # Content types CONTENT_UNKNOWN = 0 CONTENT_ROUTER = 1 CONTENT_PLUGIN = 2 CONTENT_RESEED = 3 CONTENT_NEWS = 4 CONTENT_BLOCKLIST = 5 # Signature type codes and their expected lengths SIG_TYPES = { 0x0000: ("DSA_SHA1", 40), 0x0003: ("ECDSA_SHA256_P256", 64), 0x0004: ("ECDSA_SHA384_P384", 96), 0x0005: ("ECDSA_SHA512_P521", 132), 0x0006: ("RSA_SHA256_2048", 256), 0x0007: ("RSA_SHA384_3072", 384), 0x0008: ("RSA_SHA512_4096", 512), 0x000B: ("EdDSA_SHA512_Ed25519", 64), } __slots__ = ( "_magic", "_sig_type_code", "_sig_length", "_version", "_signer_id", "_file_type", "_content_type", "_content_length", "_content", "_signature", ) def __init__( self, *, magic: bytes, sig_type_code: int, sig_length: int, version: str, signer_id: str, file_type: int, content_type: int, content_length: int, content: bytes, signature: bytes, ) -> None: self._magic = magic self._sig_type_code = sig_type_code self._sig_length = sig_length self._version = version self._signer_id = signer_id self._file_type = file_type self._content_type = content_type self._content_length = content_length self._content = content self._signature = signature @classmethod def from_bytes(cls, data: bytes) -> SU3File: """Parse an SU3 file from raw bytes. Raises ValueError on invalid magic, unsupported format version, or truncated data. """ if len(data) < cls.HEADER_SIZE: raise ValueError( f"SU3 data too short for header: {len(data)} < {cls.HEADER_SIZE}" ) magic = data[0:6] if magic != cls.MAGIC: raise ValueError( f"Invalid SU3 magic: expected {cls.MAGIC!r}, got {magic!r}" ) file_format_version = data[7] if file_format_version != 0: raise ValueError( f"Unsupported SU3 file format version: {file_format_version}" ) sig_type_code = struct.unpack_from("!H", data, 8)[0] sig_length = struct.unpack_from("!H", data, 10)[0] version_length = data[13] signer_length = data[15] content_length = struct.unpack_from("!Q", data, 16)[0] file_type = data[25] content_type = data[27] # Variable-length fields start at offset 40 offset = cls.HEADER_SIZE # Version string needed = offset + version_length if len(data) < needed: raise ValueError( f"SU3 data truncated at version string: need {needed}, have {len(data)}" ) version_raw = data[offset : offset + version_length] version = version_raw.decode("utf-8").rstrip("\x00") offset += version_length # Signer ID needed = offset + signer_length if len(data) < needed: raise ValueError( f"SU3 data truncated at signer ID: need {needed}, have {len(data)}" ) signer_id = data[offset : offset + signer_length].decode("utf-8") offset += signer_length # Content needed = offset + content_length if len(data) < needed: raise ValueError( f"SU3 data truncated at content: need {needed}, have {len(data)}" ) content = data[offset : offset + content_length] offset += content_length # Signature needed = offset + sig_length if len(data) < needed: raise ValueError( f"SU3 data truncated at signature: need {needed}, have {len(data)}" ) signature = data[offset : offset + sig_length] return cls( magic=magic, sig_type_code=sig_type_code, sig_length=sig_length, version=version, signer_id=signer_id, file_type=file_type, content_type=content_type, content_length=content_length, content=content, signature=signature, ) # --- Properties --- @property def magic(self) -> bytes: return self._magic @property def sig_type_code(self) -> int: return self._sig_type_code @property def sig_length(self) -> int: return self._sig_length @property def version(self) -> str: return self._version @property def signer_id(self) -> str: return self._signer_id @property def file_type(self) -> int: return self._file_type @property def content_type(self) -> int: return self._content_type @property def content_length(self) -> int: return self._content_length # --- Methods --- def get_content(self) -> bytes: """Return the raw content bytes (e.g. ZIP data for reseed).""" return self._content def get_signature(self) -> bytes: """Return the raw signature bytes.""" return self._signature def is_reseed(self) -> bool: """Return True if this SU3 file is a reseed bundle.""" return self._content_type == self.CONTENT_RESEED def extract_routerinfos(self) -> list[bytes]: """Extract RouterInfo data from a reseed ZIP bundle. Returns a list of raw RouterInfo bytes, one per routerInfo-*.dat entry in the ZIP. Raises ValueError if file_type is not ZIP or content_type is not RESEED. """ if self._content_type != self.CONTENT_RESEED: raise ValueError( "extract_routerinfos() requires content_type RESEED, " f"got {self._content_type}" ) if self._file_type != self.TYPE_ZIP: raise ValueError( "extract_routerinfos() requires file_type ZIP, " f"got {self._file_type}" ) result: list[bytes] = [] with zipfile.ZipFile(io.BytesIO(self._content), "r") as zf: for name in zf.namelist(): if name.startswith("routerInfo-") and name.endswith(".dat"): result.append(zf.read(name)) return result