A Python port of the Invisible Internet Project (I2P)
1"""SU3 — I2P signed update file parser.
2
3Ported from net.i2p.crypto.SU3File.
4
5Wire format:
6 0-5 Magic "I2Psu3" (ASCII)
7 6 Reserved (0)
8 7 File format version (0)
9 8-9 Signature type code (big-endian)
10 10-11 Signature length (big-endian)
11 12 Reserved (0)
12 13 Version string length (1-255)
13 14 Reserved (0)
14 15 Signer ID length (1-255)
15 16-23 Content length (big-endian, 8 bytes)
16 24 Reserved (0)
17 25 File type
18 26 Reserved (0)
19 27 Content type
20 28-39 Reserved (12 zero bytes)
21 40 Version string (UTF-8, possibly null-padded)
22 40+vl Signer ID (UTF-8)
23 ... Content bytes (length = content_length)
24 ... Signature bytes (length = sig_length)
25"""
26from __future__ import annotations
27
28import io
29import struct
30import zipfile
31
32
33class SU3File:
34 """Parser for the I2P SU3 signed update file format."""
35
36 MAGIC = b"I2Psu3"
37 HEADER_SIZE = 40 # Fixed header before variable-length fields
38
39 # File types
40 TYPE_ZIP = 0
41 TYPE_XML = 1
42 TYPE_HTML = 2
43 TYPE_XML_GZ = 3
44 TYPE_TXT_GZ = 4
45
46 # Content types
47 CONTENT_UNKNOWN = 0
48 CONTENT_ROUTER = 1
49 CONTENT_PLUGIN = 2
50 CONTENT_RESEED = 3
51 CONTENT_NEWS = 4
52 CONTENT_BLOCKLIST = 5
53
54 # Signature type codes and their expected lengths
55 SIG_TYPES = {
56 0x0000: ("DSA_SHA1", 40),
57 0x0003: ("ECDSA_SHA256_P256", 64),
58 0x0004: ("ECDSA_SHA384_P384", 96),
59 0x0005: ("ECDSA_SHA512_P521", 132),
60 0x0006: ("RSA_SHA256_2048", 256),
61 0x0007: ("RSA_SHA384_3072", 384),
62 0x0008: ("RSA_SHA512_4096", 512),
63 0x000B: ("EdDSA_SHA512_Ed25519", 64),
64 }
65
66 __slots__ = (
67 "_magic",
68 "_sig_type_code",
69 "_sig_length",
70 "_version",
71 "_signer_id",
72 "_file_type",
73 "_content_type",
74 "_content_length",
75 "_content",
76 "_signature",
77 )
78
79 def __init__(
80 self,
81 *,
82 magic: bytes,
83 sig_type_code: int,
84 sig_length: int,
85 version: str,
86 signer_id: str,
87 file_type: int,
88 content_type: int,
89 content_length: int,
90 content: bytes,
91 signature: bytes,
92 ) -> None:
93 self._magic = magic
94 self._sig_type_code = sig_type_code
95 self._sig_length = sig_length
96 self._version = version
97 self._signer_id = signer_id
98 self._file_type = file_type
99 self._content_type = content_type
100 self._content_length = content_length
101 self._content = content
102 self._signature = signature
103
104 @classmethod
105 def from_bytes(cls, data: bytes) -> SU3File:
106 """Parse an SU3 file from raw bytes.
107
108 Raises ValueError on invalid magic, unsupported format version,
109 or truncated data.
110 """
111 if len(data) < cls.HEADER_SIZE:
112 raise ValueError(
113 f"SU3 data too short for header: {len(data)} < {cls.HEADER_SIZE}"
114 )
115
116 magic = data[0:6]
117 if magic != cls.MAGIC:
118 raise ValueError(
119 f"Invalid SU3 magic: expected {cls.MAGIC!r}, got {magic!r}"
120 )
121
122 file_format_version = data[7]
123 if file_format_version != 0:
124 raise ValueError(
125 f"Unsupported SU3 file format version: {file_format_version}"
126 )
127
128 sig_type_code = struct.unpack_from("!H", data, 8)[0]
129 sig_length = struct.unpack_from("!H", data, 10)[0]
130 version_length = data[13]
131 signer_length = data[15]
132 content_length = struct.unpack_from("!Q", data, 16)[0]
133 file_type = data[25]
134 content_type = data[27]
135
136 # Variable-length fields start at offset 40
137 offset = cls.HEADER_SIZE
138
139 # Version string
140 needed = offset + version_length
141 if len(data) < needed:
142 raise ValueError(
143 f"SU3 data truncated at version string: need {needed}, have {len(data)}"
144 )
145 version_raw = data[offset : offset + version_length]
146 version = version_raw.decode("utf-8").rstrip("\x00")
147 offset += version_length
148
149 # Signer ID
150 needed = offset + signer_length
151 if len(data) < needed:
152 raise ValueError(
153 f"SU3 data truncated at signer ID: need {needed}, have {len(data)}"
154 )
155 signer_id = data[offset : offset + signer_length].decode("utf-8")
156 offset += signer_length
157
158 # Content
159 needed = offset + content_length
160 if len(data) < needed:
161 raise ValueError(
162 f"SU3 data truncated at content: need {needed}, have {len(data)}"
163 )
164 content = data[offset : offset + content_length]
165 offset += content_length
166
167 # Signature
168 needed = offset + sig_length
169 if len(data) < needed:
170 raise ValueError(
171 f"SU3 data truncated at signature: need {needed}, have {len(data)}"
172 )
173 signature = data[offset : offset + sig_length]
174
175 return cls(
176 magic=magic,
177 sig_type_code=sig_type_code,
178 sig_length=sig_length,
179 version=version,
180 signer_id=signer_id,
181 file_type=file_type,
182 content_type=content_type,
183 content_length=content_length,
184 content=content,
185 signature=signature,
186 )
187
188 # --- Properties ---
189
190 @property
191 def magic(self) -> bytes:
192 return self._magic
193
194 @property
195 def sig_type_code(self) -> int:
196 return self._sig_type_code
197
198 @property
199 def sig_length(self) -> int:
200 return self._sig_length
201
202 @property
203 def version(self) -> str:
204 return self._version
205
206 @property
207 def signer_id(self) -> str:
208 return self._signer_id
209
210 @property
211 def file_type(self) -> int:
212 return self._file_type
213
214 @property
215 def content_type(self) -> int:
216 return self._content_type
217
218 @property
219 def content_length(self) -> int:
220 return self._content_length
221
222 # --- Methods ---
223
224 def get_content(self) -> bytes:
225 """Return the raw content bytes (e.g. ZIP data for reseed)."""
226 return self._content
227
228 def get_signature(self) -> bytes:
229 """Return the raw signature bytes."""
230 return self._signature
231
232 def is_reseed(self) -> bool:
233 """Return True if this SU3 file is a reseed bundle."""
234 return self._content_type == self.CONTENT_RESEED
235
236 def extract_routerinfos(self) -> list[bytes]:
237 """Extract RouterInfo data from a reseed ZIP bundle.
238
239 Returns a list of raw RouterInfo bytes, one per routerInfo-*.dat
240 entry in the ZIP.
241
242 Raises ValueError if file_type is not ZIP or content_type is not RESEED.
243 """
244 if self._content_type != self.CONTENT_RESEED:
245 raise ValueError(
246 "extract_routerinfos() requires content_type RESEED, "
247 f"got {self._content_type}"
248 )
249 if self._file_type != self.TYPE_ZIP:
250 raise ValueError(
251 "extract_routerinfos() requires file_type ZIP, "
252 f"got {self._file_type}"
253 )
254
255 result: list[bytes] = []
256 with zipfile.ZipFile(io.BytesIO(self._content), "r") as zf:
257 for name in zf.namelist():
258 if name.startswith("routerInfo-") and name.endswith(".dat"):
259 result.append(zf.read(name))
260 return result