A Python port of the Invisible Internet Project (I2P)
at main 385 lines 13 kB view raw
1"""EncryptedLeaseSet (Type 5) — two-layer ChaCha20 encrypted lease set. 2 3Ported from net.i2p.data.EncryptedLeaseSet. 4 5Wire format (outer): 6 blinded_sig_type(2) + blinded_spk(var) + published(4) + 7 expires_offset(2) + flags(2) + [offline_block] + 8 encrypted_size(2) + encrypted_data + outer_signature 9 10The encrypted_data contains two encryption layers: 11 Layer 1: ChaCha20 with key derived from subcredential + timestamp 12 Layer 2: ChaCha20 with key derived from subcredential + timestamp + optional cookie 13 14The inner plaintext is a LeaseSet2 (Type 3) or MetaLeaseSet (Type 7). 15""" 16 17from __future__ import annotations 18 19import hashlib 20import io 21import os 22import struct 23 24from i2p_crypto.chacha20 import ChaCha20 25from i2p_crypto.dsa import SigType 26from i2p_crypto.hkdf import HKDF 27from i2p_data.key_types import SigningPublicKey 28 29_HKDF = HKDF() 30 31 32# --------------------------------------------------------------------------- 33# Subcredential computation 34# --------------------------------------------------------------------------- 35 36def compute_credential( 37 spk: SigningPublicKey, 38 sig_type_in: SigType = SigType.EdDSA_SHA512_Ed25519, 39 sig_type_out: SigType = SigType.RedDSA_SHA512_Ed25519, 40) -> bytes: 41 """Compute the credential: SHA256("credential" + spk + sig_type_in(2) + sig_type_out(2)).""" 42 return hashlib.sha256( 43 b"credential" 44 + spk.to_bytes() 45 + struct.pack("!H", sig_type_in.code) 46 + struct.pack("!H", sig_type_out.code) 47 ).digest() 48 49 50def compute_subcredential(credential: bytes, blinded_spk: bytes) -> bytes: 51 """Compute subcredential: SHA256("subcredential" + credential + blinded_spk).""" 52 return hashlib.sha256( 53 b"subcredential" + credential + blinded_spk 54 ).digest() 55 56 57# --------------------------------------------------------------------------- 58# EncryptedLeaseSet 59# --------------------------------------------------------------------------- 60 61class EncryptedLeaseSet: 62 """EncryptedLeaseSet (database type 5). 63 64 Uses a blinded signing public key instead of a Destination. 65 The inner LeaseSet2 is encrypted with two layers of ChaCha20. 66 """ 67 68 TYPE = 5 69 70 # Auth types 71 AUTH_NONE = 0 72 AUTH_DH = 1 73 AUTH_PSK = 2 74 75 __slots__ = ( 76 "_blinded_sig_type", "_blinded_spk", "_published", "_expires", 77 "_flags", "_offline_block", "_encrypted_data", "_signature", 78 ) 79 80 def __init__( 81 self, 82 blinded_sig_type: SigType, 83 blinded_spk: bytes, 84 published: int, 85 expires: int, 86 flags: int = 0, 87 encrypted_data: bytes = b"", 88 signature: bytes = b"", 89 offline_block=None, 90 ) -> None: 91 self._blinded_sig_type = blinded_sig_type 92 self._blinded_spk = blinded_spk 93 self._published = published 94 self._expires = expires 95 self._flags = flags 96 self._encrypted_data = encrypted_data 97 self._signature = signature 98 self._offline_block = offline_block 99 100 # -- Properties --------------------------------------------------------- 101 102 @property 103 def blinded_sig_type(self) -> SigType: 104 return self._blinded_sig_type 105 106 @property 107 def blinded_spk(self) -> bytes: 108 return self._blinded_spk 109 110 @property 111 def published(self) -> int: 112 return self._published 113 114 @property 115 def expires(self) -> int: 116 return self._expires 117 118 @property 119 def flags(self) -> int: 120 return self._flags 121 122 @property 123 def encrypted_data(self) -> bytes: 124 return self._encrypted_data 125 126 @property 127 def signature(self) -> bytes: 128 return self._signature 129 130 # -- Hash --------------------------------------------------------------- 131 132 def compute_hash(self) -> bytes: 133 """Compute the NetDB hash: SHA256(blinded_sig_type(2) + blinded_spk).""" 134 return hashlib.sha256( 135 struct.pack("!H", self._blinded_sig_type.code) + self._blinded_spk 136 ).digest() 137 138 # -- Wire format -------------------------------------------------------- 139 140 def _header_bytes(self) -> bytes: 141 """Serialize header: sig_type(2) + spk + published(4) + offset(2) + flags(2).""" 142 buf = io.BytesIO() 143 buf.write(struct.pack("!H", self._blinded_sig_type.code)) 144 buf.write(self._blinded_spk) 145 buf.write(struct.pack("!I", self._published)) 146 buf.write(struct.pack("!H", self._expires - self._published)) 147 buf.write(struct.pack("!H", self._flags)) 148 if self._offline_block is not None: 149 buf.write(self._offline_block.to_bytes()) 150 return buf.getvalue() 151 152 def _signable_bytes(self) -> bytes: 153 """Bytes covered by the outer signature: type(1) + header + enc_len(2) + enc_data.""" 154 header = self._header_bytes() 155 return ( 156 bytes([self.TYPE]) 157 + header 158 + struct.pack("!H", len(self._encrypted_data)) 159 + self._encrypted_data 160 ) 161 162 def to_bytes(self) -> bytes: 163 """Serialize to full wire format (without type byte).""" 164 header = self._header_bytes() 165 return ( 166 header 167 + struct.pack("!H", len(self._encrypted_data)) 168 + self._encrypted_data 169 + self._signature 170 ) 171 172 @classmethod 173 def from_bytes(cls, data: bytes) -> "EncryptedLeaseSet": 174 """Deserialize from wire format.""" 175 stream = io.BytesIO(data) 176 177 # Blinded sig type 178 raw = stream.read(2) 179 if len(raw) != 2: 180 raise ValueError("Truncated EncryptedLeaseSet (sig_type)") 181 sig_code = struct.unpack("!H", raw)[0] 182 blinded_sig_type = SigType.by_code(sig_code) 183 if blinded_sig_type is None: 184 raise ValueError(f"Unknown blinded sig type code: {sig_code}") 185 186 # Blinded SPK 187 spk_len = blinded_sig_type.pubkey_len 188 blinded_spk = stream.read(spk_len) 189 if len(blinded_spk) != spk_len: 190 raise ValueError("Truncated EncryptedLeaseSet (blinded_spk)") 191 192 # Published timestamp 193 raw = stream.read(4) 194 if len(raw) != 4: 195 raise ValueError("Truncated EncryptedLeaseSet (published)") 196 published = struct.unpack("!I", raw)[0] 197 198 # Expires offset 199 raw = stream.read(2) 200 if len(raw) != 2: 201 raise ValueError("Truncated EncryptedLeaseSet (expires_offset)") 202 expires_offset = struct.unpack("!H", raw)[0] 203 expires = published + expires_offset 204 205 # Flags 206 raw = stream.read(2) 207 if len(raw) != 2: 208 raise ValueError("Truncated EncryptedLeaseSet (flags)") 209 flags = struct.unpack("!H", raw)[0] 210 211 # Offline block (if flagged) 212 offline_block = None 213 if flags & 0x0001: 214 from i2p_data.lease_set2 import OfflineBlock 215 offline_block = OfflineBlock.from_stream(stream, blinded_sig_type, published) 216 217 # Encrypted data 218 raw = stream.read(2) 219 if len(raw) != 2: 220 raise ValueError("Truncated EncryptedLeaseSet (encrypted_len)") 221 enc_len = struct.unpack("!H", raw)[0] 222 encrypted_data = stream.read(enc_len) 223 if len(encrypted_data) != enc_len: 224 raise ValueError("Truncated EncryptedLeaseSet (encrypted_data)") 225 226 # Signature 227 sig_type = blinded_sig_type 228 if offline_block is not None: 229 sig_type = offline_block.transient_sig_type 230 sig_len = sig_type.sig_len 231 signature = stream.read(sig_len) 232 if len(signature) != sig_len: 233 raise ValueError(f"Truncated EncryptedLeaseSet (signature), expected {sig_len}") 234 235 return cls( 236 blinded_sig_type=blinded_sig_type, 237 blinded_spk=blinded_spk, 238 published=published, 239 expires=expires, 240 flags=flags, 241 encrypted_data=encrypted_data, 242 signature=signature, 243 offline_block=offline_block, 244 ) 245 246 def sign(self, private_key_bytes: bytes, sig_type: SigType) -> None: 247 """Sign with the blinded private key.""" 248 from i2p_crypto.dsa import DSAEngine 249 self._signature = DSAEngine.sign( 250 self._signable_bytes(), private_key_bytes, sig_type 251 ) 252 253 def verify(self) -> bool: 254 """Verify the outer signature.""" 255 if not self._signature: 256 return False 257 from i2p_crypto.dsa import DSAEngine 258 sig_type = self._blinded_sig_type 259 if self._offline_block is not None: 260 sig_type = self._offline_block.transient_sig_type 261 # Verify offline block first 262 if not self._offline_block.verify( 263 self._published, self._blinded_spk, self._blinded_sig_type 264 ): 265 return False 266 pub_key = self._offline_block.transient_spk 267 else: 268 pub_key = self._blinded_spk 269 return DSAEngine.verify( 270 self._signable_bytes(), self._signature, pub_key, sig_type 271 ) 272 273 # -- Encryption --------------------------------------------------------- 274 275 @staticmethod 276 def encrypt_inner( 277 inner_bytes: bytes, 278 subcredential: bytes, 279 published: int, 280 auth_type: int = 0, 281 auth_cookie: bytes | None = None, 282 ) -> bytes: 283 """Encrypt inner LS bytes with two layers of ChaCha20. 284 285 Returns the encrypted_data field (outer_salt + layer1_ciphertext). 286 287 Layer 2 (inner): encrypt inner_bytes with key from subcredential + timestamp 288 Layer 1 (outer): encrypt (auth_flag + inner_salt + layer2_ciphertext) 289 """ 290 published_bytes = struct.pack("!I", published) 291 292 # Layer 2: encrypt inner_bytes 293 inner_salt = os.urandom(32) 294 295 if auth_type == 0: 296 ikm2 = subcredential + published_bytes 297 else: 298 if auth_cookie is None: 299 raise ValueError("auth_cookie required for auth_type != 0") 300 ikm2 = auth_cookie + subcredential + published_bytes 301 302 key2 = bytearray(32) 303 iv2 = bytearray(32) 304 _HKDF.calculate(inner_salt, ikm2, "ELS2_L2K", out=key2, out2=iv2) 305 layer2_ciphertext = ChaCha20.encrypt(bytes(key2), bytes(iv2[:12]), inner_bytes) 306 307 # Build layer 1 plaintext: auth_flag(1) + inner_salt(32) + layer2_ciphertext 308 if auth_type == 0: 309 auth_section = bytes([0x00]) 310 else: 311 # Simplified: store auth_type flag only (full per-client auth 312 # with encrypted cookies would go here in a production impl) 313 auth_section = bytes([auth_type & 0x0F]) 314 315 layer1_plaintext = auth_section + inner_salt + layer2_ciphertext 316 317 # Layer 1: encrypt with outer salt 318 outer_salt = os.urandom(32) 319 ikm1 = subcredential + published_bytes 320 key1 = bytearray(32) 321 iv1 = bytearray(32) 322 _HKDF.calculate(outer_salt, ikm1, "ELS2_L1K", out=key1, out2=iv1) 323 layer1_ciphertext = ChaCha20.encrypt(bytes(key1), bytes(iv1[:12]), layer1_plaintext) 324 325 return outer_salt + layer1_ciphertext 326 327 @staticmethod 328 def decrypt_inner( 329 encrypted_data: bytes, 330 subcredential: bytes, 331 published: int, 332 auth_type: int = 0, 333 auth_cookie: bytes | None = None, 334 ) -> bytes: 335 """Decrypt the encrypted_data field to recover inner LS bytes. 336 337 Args: 338 encrypted_data: outer_salt(32) + layer1_ciphertext 339 subcredential: 32-byte subcredential 340 published: published timestamp (seconds) 341 auth_type: 0=none, 1=DH, 2=PSK 342 auth_cookie: 32-byte cookie (required if auth_type != 0) 343 344 Returns: 345 Decrypted inner LS bytes. 346 """ 347 published_bytes = struct.pack("!I", published) 348 349 # Layer 1: decrypt 350 outer_salt = encrypted_data[:32] 351 layer1_ciphertext = encrypted_data[32:] 352 353 ikm1 = subcredential + published_bytes 354 key1 = bytearray(32) 355 iv1 = bytearray(32) 356 _HKDF.calculate(outer_salt, ikm1, "ELS2_L1K", out=key1, out2=iv1) 357 layer1_plaintext = ChaCha20.decrypt(bytes(key1), bytes(iv1[:12]), layer1_ciphertext) 358 359 # Parse layer 1 plaintext: auth_flag(1) + inner_salt(32) + layer2_ciphertext 360 auth_flag = layer1_plaintext[0] 361 offset = 1 362 363 # Skip auth section (simplified — in full impl, would parse per-client entries) 364 inner_salt = layer1_plaintext[offset:offset + 32] 365 offset += 32 366 layer2_ciphertext = layer1_plaintext[offset:] 367 368 # Layer 2: decrypt 369 if auth_type == 0: 370 ikm2 = subcredential + published_bytes 371 else: 372 if auth_cookie is None: 373 raise ValueError("auth_cookie required for auth_type != 0") 374 ikm2 = auth_cookie + subcredential + published_bytes 375 376 key2 = bytearray(32) 377 iv2 = bytearray(32) 378 _HKDF.calculate(inner_salt, ikm2, "ELS2_L2K", out=key2, out2=iv2) 379 return ChaCha20.decrypt(bytes(key2), bytes(iv2[:12]), layer2_ciphertext) 380 381 def __repr__(self) -> str: 382 return ( 383 f"EncryptedLeaseSet(sig_type={self._blinded_sig_type.name}, " 384 f"enc_len={len(self._encrypted_data)})" 385 )