A Python port of the Invisible Internet Project (I2P)
1"""EncryptedLeaseSet (Type 5) — two-layer ChaCha20 encrypted lease set.
2
3Ported from net.i2p.data.EncryptedLeaseSet.
4
5Wire format (outer):
6 blinded_sig_type(2) + blinded_spk(var) + published(4) +
7 expires_offset(2) + flags(2) + [offline_block] +
8 encrypted_size(2) + encrypted_data + outer_signature
9
10The encrypted_data contains two encryption layers:
11 Layer 1: ChaCha20 with key derived from subcredential + timestamp
12 Layer 2: ChaCha20 with key derived from subcredential + timestamp + optional cookie
13
14The inner plaintext is a LeaseSet2 (Type 3) or MetaLeaseSet (Type 7).
15"""
16
17from __future__ import annotations
18
19import hashlib
20import io
21import os
22import struct
23
24from i2p_crypto.chacha20 import ChaCha20
25from i2p_crypto.dsa import SigType
26from i2p_crypto.hkdf import HKDF
27from i2p_data.key_types import SigningPublicKey
28
29_HKDF = HKDF()
30
31
32# ---------------------------------------------------------------------------
33# Subcredential computation
34# ---------------------------------------------------------------------------
35
36def compute_credential(
37 spk: SigningPublicKey,
38 sig_type_in: SigType = SigType.EdDSA_SHA512_Ed25519,
39 sig_type_out: SigType = SigType.RedDSA_SHA512_Ed25519,
40) -> bytes:
41 """Compute the credential: SHA256("credential" + spk + sig_type_in(2) + sig_type_out(2))."""
42 return hashlib.sha256(
43 b"credential"
44 + spk.to_bytes()
45 + struct.pack("!H", sig_type_in.code)
46 + struct.pack("!H", sig_type_out.code)
47 ).digest()
48
49
50def compute_subcredential(credential: bytes, blinded_spk: bytes) -> bytes:
51 """Compute subcredential: SHA256("subcredential" + credential + blinded_spk)."""
52 return hashlib.sha256(
53 b"subcredential" + credential + blinded_spk
54 ).digest()
55
56
57# ---------------------------------------------------------------------------
58# EncryptedLeaseSet
59# ---------------------------------------------------------------------------
60
61class EncryptedLeaseSet:
62 """EncryptedLeaseSet (database type 5).
63
64 Uses a blinded signing public key instead of a Destination.
65 The inner LeaseSet2 is encrypted with two layers of ChaCha20.
66 """
67
68 TYPE = 5
69
70 # Auth types
71 AUTH_NONE = 0
72 AUTH_DH = 1
73 AUTH_PSK = 2
74
75 __slots__ = (
76 "_blinded_sig_type", "_blinded_spk", "_published", "_expires",
77 "_flags", "_offline_block", "_encrypted_data", "_signature",
78 )
79
80 def __init__(
81 self,
82 blinded_sig_type: SigType,
83 blinded_spk: bytes,
84 published: int,
85 expires: int,
86 flags: int = 0,
87 encrypted_data: bytes = b"",
88 signature: bytes = b"",
89 offline_block=None,
90 ) -> None:
91 self._blinded_sig_type = blinded_sig_type
92 self._blinded_spk = blinded_spk
93 self._published = published
94 self._expires = expires
95 self._flags = flags
96 self._encrypted_data = encrypted_data
97 self._signature = signature
98 self._offline_block = offline_block
99
100 # -- Properties ---------------------------------------------------------
101
102 @property
103 def blinded_sig_type(self) -> SigType:
104 return self._blinded_sig_type
105
106 @property
107 def blinded_spk(self) -> bytes:
108 return self._blinded_spk
109
110 @property
111 def published(self) -> int:
112 return self._published
113
114 @property
115 def expires(self) -> int:
116 return self._expires
117
118 @property
119 def flags(self) -> int:
120 return self._flags
121
122 @property
123 def encrypted_data(self) -> bytes:
124 return self._encrypted_data
125
126 @property
127 def signature(self) -> bytes:
128 return self._signature
129
130 # -- Hash ---------------------------------------------------------------
131
132 def compute_hash(self) -> bytes:
133 """Compute the NetDB hash: SHA256(blinded_sig_type(2) + blinded_spk)."""
134 return hashlib.sha256(
135 struct.pack("!H", self._blinded_sig_type.code) + self._blinded_spk
136 ).digest()
137
138 # -- Wire format --------------------------------------------------------
139
140 def _header_bytes(self) -> bytes:
141 """Serialize header: sig_type(2) + spk + published(4) + offset(2) + flags(2)."""
142 buf = io.BytesIO()
143 buf.write(struct.pack("!H", self._blinded_sig_type.code))
144 buf.write(self._blinded_spk)
145 buf.write(struct.pack("!I", self._published))
146 buf.write(struct.pack("!H", self._expires - self._published))
147 buf.write(struct.pack("!H", self._flags))
148 if self._offline_block is not None:
149 buf.write(self._offline_block.to_bytes())
150 return buf.getvalue()
151
152 def _signable_bytes(self) -> bytes:
153 """Bytes covered by the outer signature: type(1) + header + enc_len(2) + enc_data."""
154 header = self._header_bytes()
155 return (
156 bytes([self.TYPE])
157 + header
158 + struct.pack("!H", len(self._encrypted_data))
159 + self._encrypted_data
160 )
161
162 def to_bytes(self) -> bytes:
163 """Serialize to full wire format (without type byte)."""
164 header = self._header_bytes()
165 return (
166 header
167 + struct.pack("!H", len(self._encrypted_data))
168 + self._encrypted_data
169 + self._signature
170 )
171
172 @classmethod
173 def from_bytes(cls, data: bytes) -> "EncryptedLeaseSet":
174 """Deserialize from wire format."""
175 stream = io.BytesIO(data)
176
177 # Blinded sig type
178 raw = stream.read(2)
179 if len(raw) != 2:
180 raise ValueError("Truncated EncryptedLeaseSet (sig_type)")
181 sig_code = struct.unpack("!H", raw)[0]
182 blinded_sig_type = SigType.by_code(sig_code)
183 if blinded_sig_type is None:
184 raise ValueError(f"Unknown blinded sig type code: {sig_code}")
185
186 # Blinded SPK
187 spk_len = blinded_sig_type.pubkey_len
188 blinded_spk = stream.read(spk_len)
189 if len(blinded_spk) != spk_len:
190 raise ValueError("Truncated EncryptedLeaseSet (blinded_spk)")
191
192 # Published timestamp
193 raw = stream.read(4)
194 if len(raw) != 4:
195 raise ValueError("Truncated EncryptedLeaseSet (published)")
196 published = struct.unpack("!I", raw)[0]
197
198 # Expires offset
199 raw = stream.read(2)
200 if len(raw) != 2:
201 raise ValueError("Truncated EncryptedLeaseSet (expires_offset)")
202 expires_offset = struct.unpack("!H", raw)[0]
203 expires = published + expires_offset
204
205 # Flags
206 raw = stream.read(2)
207 if len(raw) != 2:
208 raise ValueError("Truncated EncryptedLeaseSet (flags)")
209 flags = struct.unpack("!H", raw)[0]
210
211 # Offline block (if flagged)
212 offline_block = None
213 if flags & 0x0001:
214 from i2p_data.lease_set2 import OfflineBlock
215 offline_block = OfflineBlock.from_stream(stream, blinded_sig_type, published)
216
217 # Encrypted data
218 raw = stream.read(2)
219 if len(raw) != 2:
220 raise ValueError("Truncated EncryptedLeaseSet (encrypted_len)")
221 enc_len = struct.unpack("!H", raw)[0]
222 encrypted_data = stream.read(enc_len)
223 if len(encrypted_data) != enc_len:
224 raise ValueError("Truncated EncryptedLeaseSet (encrypted_data)")
225
226 # Signature
227 sig_type = blinded_sig_type
228 if offline_block is not None:
229 sig_type = offline_block.transient_sig_type
230 sig_len = sig_type.sig_len
231 signature = stream.read(sig_len)
232 if len(signature) != sig_len:
233 raise ValueError(f"Truncated EncryptedLeaseSet (signature), expected {sig_len}")
234
235 return cls(
236 blinded_sig_type=blinded_sig_type,
237 blinded_spk=blinded_spk,
238 published=published,
239 expires=expires,
240 flags=flags,
241 encrypted_data=encrypted_data,
242 signature=signature,
243 offline_block=offline_block,
244 )
245
246 def sign(self, private_key_bytes: bytes, sig_type: SigType) -> None:
247 """Sign with the blinded private key."""
248 from i2p_crypto.dsa import DSAEngine
249 self._signature = DSAEngine.sign(
250 self._signable_bytes(), private_key_bytes, sig_type
251 )
252
253 def verify(self) -> bool:
254 """Verify the outer signature."""
255 if not self._signature:
256 return False
257 from i2p_crypto.dsa import DSAEngine
258 sig_type = self._blinded_sig_type
259 if self._offline_block is not None:
260 sig_type = self._offline_block.transient_sig_type
261 # Verify offline block first
262 if not self._offline_block.verify(
263 self._published, self._blinded_spk, self._blinded_sig_type
264 ):
265 return False
266 pub_key = self._offline_block.transient_spk
267 else:
268 pub_key = self._blinded_spk
269 return DSAEngine.verify(
270 self._signable_bytes(), self._signature, pub_key, sig_type
271 )
272
273 # -- Encryption ---------------------------------------------------------
274
275 @staticmethod
276 def encrypt_inner(
277 inner_bytes: bytes,
278 subcredential: bytes,
279 published: int,
280 auth_type: int = 0,
281 auth_cookie: bytes | None = None,
282 ) -> bytes:
283 """Encrypt inner LS bytes with two layers of ChaCha20.
284
285 Returns the encrypted_data field (outer_salt + layer1_ciphertext).
286
287 Layer 2 (inner): encrypt inner_bytes with key from subcredential + timestamp
288 Layer 1 (outer): encrypt (auth_flag + inner_salt + layer2_ciphertext)
289 """
290 published_bytes = struct.pack("!I", published)
291
292 # Layer 2: encrypt inner_bytes
293 inner_salt = os.urandom(32)
294
295 if auth_type == 0:
296 ikm2 = subcredential + published_bytes
297 else:
298 if auth_cookie is None:
299 raise ValueError("auth_cookie required for auth_type != 0")
300 ikm2 = auth_cookie + subcredential + published_bytes
301
302 key2 = bytearray(32)
303 iv2 = bytearray(32)
304 _HKDF.calculate(inner_salt, ikm2, "ELS2_L2K", out=key2, out2=iv2)
305 layer2_ciphertext = ChaCha20.encrypt(bytes(key2), bytes(iv2[:12]), inner_bytes)
306
307 # Build layer 1 plaintext: auth_flag(1) + inner_salt(32) + layer2_ciphertext
308 if auth_type == 0:
309 auth_section = bytes([0x00])
310 else:
311 # Simplified: store auth_type flag only (full per-client auth
312 # with encrypted cookies would go here in a production impl)
313 auth_section = bytes([auth_type & 0x0F])
314
315 layer1_plaintext = auth_section + inner_salt + layer2_ciphertext
316
317 # Layer 1: encrypt with outer salt
318 outer_salt = os.urandom(32)
319 ikm1 = subcredential + published_bytes
320 key1 = bytearray(32)
321 iv1 = bytearray(32)
322 _HKDF.calculate(outer_salt, ikm1, "ELS2_L1K", out=key1, out2=iv1)
323 layer1_ciphertext = ChaCha20.encrypt(bytes(key1), bytes(iv1[:12]), layer1_plaintext)
324
325 return outer_salt + layer1_ciphertext
326
327 @staticmethod
328 def decrypt_inner(
329 encrypted_data: bytes,
330 subcredential: bytes,
331 published: int,
332 auth_type: int = 0,
333 auth_cookie: bytes | None = None,
334 ) -> bytes:
335 """Decrypt the encrypted_data field to recover inner LS bytes.
336
337 Args:
338 encrypted_data: outer_salt(32) + layer1_ciphertext
339 subcredential: 32-byte subcredential
340 published: published timestamp (seconds)
341 auth_type: 0=none, 1=DH, 2=PSK
342 auth_cookie: 32-byte cookie (required if auth_type != 0)
343
344 Returns:
345 Decrypted inner LS bytes.
346 """
347 published_bytes = struct.pack("!I", published)
348
349 # Layer 1: decrypt
350 outer_salt = encrypted_data[:32]
351 layer1_ciphertext = encrypted_data[32:]
352
353 ikm1 = subcredential + published_bytes
354 key1 = bytearray(32)
355 iv1 = bytearray(32)
356 _HKDF.calculate(outer_salt, ikm1, "ELS2_L1K", out=key1, out2=iv1)
357 layer1_plaintext = ChaCha20.decrypt(bytes(key1), bytes(iv1[:12]), layer1_ciphertext)
358
359 # Parse layer 1 plaintext: auth_flag(1) + inner_salt(32) + layer2_ciphertext
360 auth_flag = layer1_plaintext[0]
361 offset = 1
362
363 # Skip auth section (simplified — in full impl, would parse per-client entries)
364 inner_salt = layer1_plaintext[offset:offset + 32]
365 offset += 32
366 layer2_ciphertext = layer1_plaintext[offset:]
367
368 # Layer 2: decrypt
369 if auth_type == 0:
370 ikm2 = subcredential + published_bytes
371 else:
372 if auth_cookie is None:
373 raise ValueError("auth_cookie required for auth_type != 0")
374 ikm2 = auth_cookie + subcredential + published_bytes
375
376 key2 = bytearray(32)
377 iv2 = bytearray(32)
378 _HKDF.calculate(inner_salt, ikm2, "ELS2_L2K", out=key2, out2=iv2)
379 return ChaCha20.decrypt(bytes(key2), bytes(iv2[:12]), layer2_ciphertext)
380
381 def __repr__(self) -> str:
382 return (
383 f"EncryptedLeaseSet(sig_type={self._blinded_sig_type.name}, "
384 f"enc_len={len(self._encrypted_data)})"
385 )