"""EncryptedLeaseSet (Type 5) — two-layer ChaCha20 encrypted lease set.

Ported from net.i2p.data.EncryptedLeaseSet.

Wire format (outer):
    blinded_sig_type(2) + blinded_spk(var) + published(4) +
    expires_offset(2) + flags(2) + [offline_block] +
    encrypted_size(2) + encrypted_data + outer_signature

The encrypted_data contains two encryption layers:
    Layer 1: ChaCha20 with key derived from subcredential + timestamp
    Layer 2: ChaCha20 with key derived from subcredential + timestamp
             + optional cookie

The inner plaintext is a LeaseSet2 (Type 3) or MetaLeaseSet (Type 7).
"""

from __future__ import annotations

import hashlib
import io
import os
import struct

from i2p_crypto.chacha20 import ChaCha20
from i2p_crypto.dsa import SigType
from i2p_crypto.hkdf import HKDF
from i2p_data.key_types import SigningPublicKey

_HKDF = HKDF()

# Sizes used by the two encryption layers.
_SALT_LEN = 32  # random salt prepended to each layer
_KEY_LEN = 32   # size of each HKDF output buffer
_IV_LEN = 12    # leading bytes of the second HKDF output used as the IV

# Bit in the header ``flags`` field announcing that an offline block follows.
_FLAG_OFFLINE = 0x0001


# ---------------------------------------------------------------------------
# Subcredential computation
# ---------------------------------------------------------------------------

def compute_credential(
    spk: SigningPublicKey,
    sig_type_in: SigType = SigType.EdDSA_SHA512_Ed25519,
    sig_type_out: SigType = SigType.RedDSA_SHA512_Ed25519,
) -> bytes:
    """Compute the credential.

    The credential is
    ``SHA256("credential" + spk + sig_type_in(2) + sig_type_out(2))``,
    with both signature type codes packed as big-endian unsigned 16-bit
    integers.

    Args:
        spk: Signing public key; its ``to_bytes()`` output is hashed.
        sig_type_in: Signature type whose code is hashed first.
        sig_type_out: Signature type whose code is hashed second.

    Returns:
        The 32-byte SHA-256 digest.
    """
    return hashlib.sha256(
        b"credential"
        + spk.to_bytes()
        + struct.pack("!H", sig_type_in.code)
        + struct.pack("!H", sig_type_out.code)
    ).digest()


def compute_subcredential(credential: bytes, blinded_spk: bytes) -> bytes:
    """Compute subcredential: SHA256("subcredential" + credential + blinded_spk).

    Args:
        credential: Output of :func:`compute_credential`.
        blinded_spk: Raw bytes of the blinded signing public key.

    Returns:
        The 32-byte SHA-256 digest.
    """
    return hashlib.sha256(
        b"subcredential" + credential + blinded_spk
    ).digest()


# ---------------------------------------------------------------------------
# Private helpers
# ---------------------------------------------------------------------------

def _derive_key_iv(salt: bytes, ikm: bytes, info: str) -> tuple[bytes, bytes]:
    """Derive a ChaCha20 (key, iv) pair for one encryption layer.

    Runs the module-level HKDF over ``salt``/``ikm``/``info`` into two
    32-byte output buffers. The first buffer is the key; the first 12 bytes
    of the second buffer are the IV.
    """
    key = bytearray(_KEY_LEN)
    iv = bytearray(_KEY_LEN)
    _HKDF.calculate(salt, ikm, info, out=key, out2=iv)
    return bytes(key), bytes(iv[:_IV_LEN])


def _layer2_ikm(
    subcredential: bytes,
    published_bytes: bytes,
    auth_type: int,
    auth_cookie: bytes | None,
) -> bytes:
    """Build the HKDF input key material for layer 2.

    Without authentication the material is ``subcredential + published``;
    with authentication the cookie is prepended.

    Raises:
        ValueError: If ``auth_type`` is non-zero and no cookie was supplied.
    """
    if auth_type == 0:
        return subcredential + published_bytes
    if auth_cookie is None:
        raise ValueError("auth_cookie required for auth_type != 0")
    return auth_cookie + subcredential + published_bytes


def _read_exact(stream: io.BytesIO, size: int, message: str) -> bytes:
    """Read exactly ``size`` bytes or raise ``ValueError(message)``."""
    chunk = stream.read(size)
    if len(chunk) != size:
        raise ValueError(message)
    return chunk


# ---------------------------------------------------------------------------
# EncryptedLeaseSet
# ---------------------------------------------------------------------------

class EncryptedLeaseSet:
    """EncryptedLeaseSet (database type 5).

    Uses a blinded signing public key instead of a Destination.
    The inner LeaseSet2 is encrypted with two layers of ChaCha20.
    """

    TYPE = 5

    # Auth types
    AUTH_NONE = 0
    AUTH_DH = 1
    AUTH_PSK = 2

    __slots__ = (
        "_blinded_sig_type",
        "_blinded_spk",
        "_published",
        "_expires",
        "_flags",
        "_offline_block",
        "_encrypted_data",
        "_signature",
    )

    def __init__(
        self,
        blinded_sig_type: SigType,
        blinded_spk: bytes,
        published: int,
        expires: int,
        flags: int = 0,
        encrypted_data: bytes = b"",
        signature: bytes = b"",
        offline_block=None,
    ) -> None:
        """Store the lease set fields as given; no validation is performed.

        Args:
            blinded_sig_type: Signature type of the blinded key.
            blinded_spk: Raw bytes of the blinded signing public key.
            published: Published timestamp (written as 4 bytes on the wire).
            expires: Absolute expiry; serialized as ``expires - published``.
            flags: 16-bit header flags.
            encrypted_data: Two-layer ciphertext (see :meth:`encrypt_inner`).
            signature: Outer signature bytes, empty until signed.
            offline_block: Optional offline block object, or ``None``.
        """
        self._blinded_sig_type = blinded_sig_type
        self._blinded_spk = blinded_spk
        self._published = published
        self._expires = expires
        self._flags = flags
        self._encrypted_data = encrypted_data
        self._signature = signature
        self._offline_block = offline_block

    # -- Properties ---------------------------------------------------------

    @property
    def blinded_sig_type(self) -> SigType:
        return self._blinded_sig_type

    @property
    def blinded_spk(self) -> bytes:
        return self._blinded_spk

    @property
    def published(self) -> int:
        return self._published

    @property
    def expires(self) -> int:
        return self._expires

    @property
    def flags(self) -> int:
        return self._flags

    @property
    def offline_block(self):
        """The offline block, or ``None`` when the lease set has none."""
        return self._offline_block

    @property
    def encrypted_data(self) -> bytes:
        return self._encrypted_data

    @property
    def signature(self) -> bytes:
        return self._signature

    # -- Hash ---------------------------------------------------------------

    def compute_hash(self) -> bytes:
        """Compute the NetDB hash: SHA256(blinded_sig_type(2) + blinded_spk)."""
        return hashlib.sha256(
            struct.pack("!H", self._blinded_sig_type.code) + self._blinded_spk
        ).digest()

    # -- Wire format --------------------------------------------------------

    def _header_bytes(self) -> bytes:
        """Serialize header: sig_type(2) + spk + published(4) + offset(2) + flags(2).

        The offline block's bytes are appended whenever one is attached.
        ``struct.pack`` raises ``struct.error`` if ``expires - published``
        does not fit in an unsigned 16-bit field.
        """
        parts = [
            struct.pack("!H", self._blinded_sig_type.code),
            self._blinded_spk,
            struct.pack("!I", self._published),
            struct.pack("!H", self._expires - self._published),
            struct.pack("!H", self._flags),
        ]
        if self._offline_block is not None:
            parts.append(self._offline_block.to_bytes())
        return b"".join(parts)

    def _signable_bytes(self) -> bytes:
        """Bytes covered by the outer signature: type(1) + header + enc_len(2) + enc_data."""
        return (
            bytes([self.TYPE])
            + self._header_bytes()
            + struct.pack("!H", len(self._encrypted_data))
            + self._encrypted_data
        )

    def to_bytes(self) -> bytes:
        """Serialize to full wire format (without type byte)."""
        return (
            self._header_bytes()
            + struct.pack("!H", len(self._encrypted_data))
            + self._encrypted_data
            + self._signature
        )

    @classmethod
    def from_bytes(cls, data: bytes) -> "EncryptedLeaseSet":
        """Deserialize from wire format.

        Bytes following the signature are ignored.

        Raises:
            ValueError: If the signature type code is unknown or any field
                is truncated.
        """
        stream = io.BytesIO(data)

        # Blinded sig type
        (sig_code,) = struct.unpack(
            "!H", _read_exact(stream, 2, "Truncated EncryptedLeaseSet (sig_type)")
        )
        blinded_sig_type = SigType.by_code(sig_code)
        if blinded_sig_type is None:
            raise ValueError(f"Unknown blinded sig type code: {sig_code}")

        # Blinded SPK (length is fixed by the signature type)
        blinded_spk = _read_exact(
            stream,
            blinded_sig_type.pubkey_len,
            "Truncated EncryptedLeaseSet (blinded_spk)",
        )

        # Published timestamp
        (published,) = struct.unpack(
            "!I", _read_exact(stream, 4, "Truncated EncryptedLeaseSet (published)")
        )

        # Expires is carried as an offset from the published timestamp
        (expires_offset,) = struct.unpack(
            "!H",
            _read_exact(stream, 2, "Truncated EncryptedLeaseSet (expires_offset)"),
        )
        expires = published + expires_offset

        # Flags
        (flags,) = struct.unpack(
            "!H", _read_exact(stream, 2, "Truncated EncryptedLeaseSet (flags)")
        )

        # Offline block (if flagged)
        offline_block = None
        if flags & _FLAG_OFFLINE:
            from i2p_data.lease_set2 import OfflineBlock

            offline_block = OfflineBlock.from_stream(
                stream, blinded_sig_type, published
            )

        # Encrypted data, length-prefixed
        (enc_len,) = struct.unpack(
            "!H",
            _read_exact(stream, 2, "Truncated EncryptedLeaseSet (encrypted_len)"),
        )
        encrypted_data = _read_exact(
            stream, enc_len, "Truncated EncryptedLeaseSet (encrypted_data)"
        )

        # Signature: sized by the transient key type when an offline block
        # is present, otherwise by the blinded key type.
        sig_type = blinded_sig_type
        if offline_block is not None:
            sig_type = offline_block.transient_sig_type
        sig_len = sig_type.sig_len
        signature = _read_exact(
            stream,
            sig_len,
            f"Truncated EncryptedLeaseSet (signature), expected {sig_len}",
        )

        return cls(
            blinded_sig_type=blinded_sig_type,
            blinded_spk=blinded_spk,
            published=published,
            expires=expires,
            flags=flags,
            encrypted_data=encrypted_data,
            signature=signature,
            offline_block=offline_block,
        )

    def sign(self, private_key_bytes: bytes, sig_type: SigType) -> None:
        """Sign with the blinded private key.

        Signs :meth:`_signable_bytes` and stores the result as the outer
        signature, replacing any previous one.
        """
        from i2p_crypto.dsa import DSAEngine

        self._signature = DSAEngine.sign(
            self._signable_bytes(), private_key_bytes, sig_type
        )

    def verify(self) -> bool:
        """Verify the outer signature.

        Returns:
            ``False`` if no signature is set, if an attached offline block
            fails its own verification, or if the outer signature check
            fails; ``True`` otherwise.
        """
        if not self._signature:
            return False

        from i2p_crypto.dsa import DSAEngine

        if self._offline_block is not None:
            # Verify offline block first; the outer signature is then
            # checked against the transient key it carries.
            if not self._offline_block.verify(
                self._published, self._blinded_spk, self._blinded_sig_type
            ):
                return False
            sig_type = self._offline_block.transient_sig_type
            pub_key = self._offline_block.transient_spk
        else:
            sig_type = self._blinded_sig_type
            pub_key = self._blinded_spk

        return DSAEngine.verify(
            self._signable_bytes(), self._signature, pub_key, sig_type
        )

    # -- Encryption ---------------------------------------------------------

    @staticmethod
    def encrypt_inner(
        inner_bytes: bytes,
        subcredential: bytes,
        published: int,
        auth_type: int = 0,
        auth_cookie: bytes | None = None,
    ) -> bytes:
        """Encrypt inner LS bytes with two layers of ChaCha20.

        Layer 2 (inner): encrypt inner_bytes with key from subcredential + timestamp
        Layer 1 (outer): encrypt (auth_flag + inner_salt + layer2_ciphertext)

        Args:
            inner_bytes: Serialized inner lease set.
            subcredential: 32-byte subcredential.
            published: Published timestamp, packed as 4 big-endian bytes.
            auth_type: 0=none, 1=DH, 2=PSK.
            auth_cookie: 32-byte cookie (required if auth_type != 0).

        Returns:
            The encrypted_data field (outer_salt + layer1_ciphertext).

        Raises:
            ValueError: If ``auth_type`` is non-zero and ``auth_cookie`` is
                ``None``.
        """
        published_bytes = struct.pack("!I", published)

        # Layer 2: encrypt inner_bytes under a fresh random salt
        inner_salt = os.urandom(_SALT_LEN)
        ikm2 = _layer2_ikm(subcredential, published_bytes, auth_type, auth_cookie)
        key2, iv2 = _derive_key_iv(inner_salt, ikm2, "ELS2_L2K")
        layer2_ciphertext = ChaCha20.encrypt(key2, iv2, inner_bytes)

        # Build layer 1 plaintext: auth_flag(1) + inner_salt(32) + layer2_ciphertext
        # Simplified: store auth_type flag only (full per-client auth
        # with encrypted cookies would go here in a production impl).
        # For auth_type == 0 the masked value is 0x00.
        auth_section = bytes([auth_type & 0x0F])
        layer1_plaintext = auth_section + inner_salt + layer2_ciphertext

        # Layer 1: encrypt with outer salt
        outer_salt = os.urandom(_SALT_LEN)
        key1, iv1 = _derive_key_iv(
            outer_salt, subcredential + published_bytes, "ELS2_L1K"
        )
        layer1_ciphertext = ChaCha20.encrypt(key1, iv1, layer1_plaintext)

        return outer_salt + layer1_ciphertext

    @staticmethod
    def decrypt_inner(
        encrypted_data: bytes,
        subcredential: bytes,
        published: int,
        auth_type: int = 0,
        auth_cookie: bytes | None = None,
    ) -> bytes:
        """Decrypt the encrypted_data field to recover inner LS bytes.

        Args:
            encrypted_data: outer_salt(32) + layer1_ciphertext
            subcredential: 32-byte subcredential
            published: published timestamp (seconds)
            auth_type: 0=none, 1=DH, 2=PSK
            auth_cookie: 32-byte cookie (required if auth_type != 0)

        Returns:
            Decrypted inner LS bytes.

        Raises:
            ValueError: If ``encrypted_data`` is too short to hold the outer
                salt, if the decrypted layer 1 payload is too short to hold
                the auth flag and inner salt, or if ``auth_type`` is
                non-zero and ``auth_cookie`` is ``None``.
        """
        if len(encrypted_data) < _SALT_LEN:
            raise ValueError("Truncated encrypted_data (outer_salt)")

        published_bytes = struct.pack("!I", published)

        # Layer 1: decrypt
        outer_salt = encrypted_data[:_SALT_LEN]
        layer1_ciphertext = encrypted_data[_SALT_LEN:]
        key1, iv1 = _derive_key_iv(
            outer_salt, subcredential + published_bytes, "ELS2_L1K"
        )
        layer1_plaintext = ChaCha20.decrypt(key1, iv1, layer1_ciphertext)

        # Parse layer 1 plaintext: auth_flag(1) + inner_salt(32) + layer2_ciphertext
        # A short payload would otherwise yield a truncated salt and a
        # meaningless layer 2 key, so reject it explicitly.
        if len(layer1_plaintext) < 1 + _SALT_LEN:
            raise ValueError("Truncated encrypted_data (layer 1 payload)")

        # Skip the 1-byte auth section (simplified — in full impl, would
        # parse per-client entries)
        salt_start = 1
        salt_end = salt_start + _SALT_LEN
        inner_salt = layer1_plaintext[salt_start:salt_end]
        layer2_ciphertext = layer1_plaintext[salt_end:]

        # Layer 2: decrypt
        ikm2 = _layer2_ikm(subcredential, published_bytes, auth_type, auth_cookie)
        key2, iv2 = _derive_key_iv(inner_salt, ikm2, "ELS2_L2K")
        return ChaCha20.decrypt(key2, iv2, layer2_ciphertext)

    def __repr__(self) -> str:
        return (
            f"EncryptedLeaseSet(sig_type={self._blinded_sig_type.name}, "
            f"enc_len={len(self._encrypted_data)})"
        )