A Python port of the Invisible Internet Project (I2P)
at main 260 lines 7.5 kB view raw
1"""SU3 — I2P signed update file parser. 2 3Ported from net.i2p.crypto.SU3File. 4 5Wire format: 6 0-5 Magic "I2Psu3" (ASCII) 7 6 Reserved (0) 8 7 File format version (0) 9 8-9 Signature type code (big-endian) 10 10-11 Signature length (big-endian) 11 12 Reserved (0) 12 13 Version string length (1-255) 13 14 Reserved (0) 14 15 Signer ID length (1-255) 15 16-23 Content length (big-endian, 8 bytes) 16 24 Reserved (0) 17 25 File type 18 26 Reserved (0) 19 27 Content type 20 28-39 Reserved (12 zero bytes) 21 40 Version string (UTF-8, possibly null-padded) 22 40+vl Signer ID (UTF-8) 23 ... Content bytes (length = content_length) 24 ... Signature bytes (length = sig_length) 25""" 26from __future__ import annotations 27 28import io 29import struct 30import zipfile 31 32 33class SU3File: 34 """Parser for the I2P SU3 signed update file format.""" 35 36 MAGIC = b"I2Psu3" 37 HEADER_SIZE = 40 # Fixed header before variable-length fields 38 39 # File types 40 TYPE_ZIP = 0 41 TYPE_XML = 1 42 TYPE_HTML = 2 43 TYPE_XML_GZ = 3 44 TYPE_TXT_GZ = 4 45 46 # Content types 47 CONTENT_UNKNOWN = 0 48 CONTENT_ROUTER = 1 49 CONTENT_PLUGIN = 2 50 CONTENT_RESEED = 3 51 CONTENT_NEWS = 4 52 CONTENT_BLOCKLIST = 5 53 54 # Signature type codes and their expected lengths 55 SIG_TYPES = { 56 0x0000: ("DSA_SHA1", 40), 57 0x0003: ("ECDSA_SHA256_P256", 64), 58 0x0004: ("ECDSA_SHA384_P384", 96), 59 0x0005: ("ECDSA_SHA512_P521", 132), 60 0x0006: ("RSA_SHA256_2048", 256), 61 0x0007: ("RSA_SHA384_3072", 384), 62 0x0008: ("RSA_SHA512_4096", 512), 63 0x000B: ("EdDSA_SHA512_Ed25519", 64), 64 } 65 66 __slots__ = ( 67 "_magic", 68 "_sig_type_code", 69 "_sig_length", 70 "_version", 71 "_signer_id", 72 "_file_type", 73 "_content_type", 74 "_content_length", 75 "_content", 76 "_signature", 77 ) 78 79 def __init__( 80 self, 81 *, 82 magic: bytes, 83 sig_type_code: int, 84 sig_length: int, 85 version: str, 86 signer_id: str, 87 file_type: int, 88 content_type: int, 89 content_length: int, 90 content: bytes, 91 signature: bytes, 92 ) -> None: 93 self._magic = magic 94 self._sig_type_code = sig_type_code 95 self._sig_length = sig_length 96 self._version = version 97 self._signer_id = signer_id 98 self._file_type = file_type 99 self._content_type = content_type 100 self._content_length = content_length 101 self._content = content 102 self._signature = signature 103 104 @classmethod 105 def from_bytes(cls, data: bytes) -> SU3File: 106 """Parse an SU3 file from raw bytes. 107 108 Raises ValueError on invalid magic, unsupported format version, 109 or truncated data. 110 """ 111 if len(data) < cls.HEADER_SIZE: 112 raise ValueError( 113 f"SU3 data too short for header: {len(data)} < {cls.HEADER_SIZE}" 114 ) 115 116 magic = data[0:6] 117 if magic != cls.MAGIC: 118 raise ValueError( 119 f"Invalid SU3 magic: expected {cls.MAGIC!r}, got {magic!r}" 120 ) 121 122 file_format_version = data[7] 123 if file_format_version != 0: 124 raise ValueError( 125 f"Unsupported SU3 file format version: {file_format_version}" 126 ) 127 128 sig_type_code = struct.unpack_from("!H", data, 8)[0] 129 sig_length = struct.unpack_from("!H", data, 10)[0] 130 version_length = data[13] 131 signer_length = data[15] 132 content_length = struct.unpack_from("!Q", data, 16)[0] 133 file_type = data[25] 134 content_type = data[27] 135 136 # Variable-length fields start at offset 40 137 offset = cls.HEADER_SIZE 138 139 # Version string 140 needed = offset + version_length 141 if len(data) < needed: 142 raise ValueError( 143 f"SU3 data truncated at version string: need {needed}, have {len(data)}" 144 ) 145 version_raw = data[offset : offset + version_length] 146 version = version_raw.decode("utf-8").rstrip("\x00") 147 offset += version_length 148 149 # Signer ID 150 needed = offset + signer_length 151 if len(data) < needed: 152 raise ValueError( 153 f"SU3 data truncated at signer ID: need {needed}, have {len(data)}" 154 ) 155 signer_id = data[offset : offset + signer_length].decode("utf-8") 156 offset += signer_length 157 158 # Content 159 needed = offset + content_length 160 if len(data) < needed: 161 raise ValueError( 162 f"SU3 data truncated at content: need {needed}, have {len(data)}" 163 ) 164 content = data[offset : offset + content_length] 165 offset += content_length 166 167 # Signature 168 needed = offset + sig_length 169 if len(data) < needed: 170 raise ValueError( 171 f"SU3 data truncated at signature: need {needed}, have {len(data)}" 172 ) 173 signature = data[offset : offset + sig_length] 174 175 return cls( 176 magic=magic, 177 sig_type_code=sig_type_code, 178 sig_length=sig_length, 179 version=version, 180 signer_id=signer_id, 181 file_type=file_type, 182 content_type=content_type, 183 content_length=content_length, 184 content=content, 185 signature=signature, 186 ) 187 188 # --- Properties --- 189 190 @property 191 def magic(self) -> bytes: 192 return self._magic 193 194 @property 195 def sig_type_code(self) -> int: 196 return self._sig_type_code 197 198 @property 199 def sig_length(self) -> int: 200 return self._sig_length 201 202 @property 203 def version(self) -> str: 204 return self._version 205 206 @property 207 def signer_id(self) -> str: 208 return self._signer_id 209 210 @property 211 def file_type(self) -> int: 212 return self._file_type 213 214 @property 215 def content_type(self) -> int: 216 return self._content_type 217 218 @property 219 def content_length(self) -> int: 220 return self._content_length 221 222 # --- Methods --- 223 224 def get_content(self) -> bytes: 225 """Return the raw content bytes (e.g. ZIP data for reseed).""" 226 return self._content 227 228 def get_signature(self) -> bytes: 229 """Return the raw signature bytes.""" 230 return self._signature 231 232 def is_reseed(self) -> bool: 233 """Return True if this SU3 file is a reseed bundle.""" 234 return self._content_type == self.CONTENT_RESEED 235 236 def extract_routerinfos(self) -> list[bytes]: 237 """Extract RouterInfo data from a reseed ZIP bundle. 238 239 Returns a list of raw RouterInfo bytes, one per routerInfo-*.dat 240 entry in the ZIP. 241 242 Raises ValueError if file_type is not ZIP or content_type is not RESEED. 243 """ 244 if self._content_type != self.CONTENT_RESEED: 245 raise ValueError( 246 "extract_routerinfos() requires content_type RESEED, " 247 f"got {self._content_type}" 248 ) 249 if self._file_type != self.TYPE_ZIP: 250 raise ValueError( 251 "extract_routerinfos() requires file_type ZIP, " 252 f"got {self._file_type}" 253 ) 254 255 result: list[bytes] = [] 256 with zipfile.ZipFile(io.BytesIO(self._content), "r") as zf: 257 for name in zf.namelist(): 258 if name.startswith("routerInfo-") and name.endswith(".dat"): 259 result.append(zf.read(name)) 260 return result