"""BlindData — container for blinded destination state. Ported from net.i2p.data.BlindData. Holds the unblinded signing public key and configuration for computing blinded keys, credentials, and subcredentials. """ from __future__ import annotations import enum import hashlib from datetime import datetime, timezone from i2p_crypto.dsa import SigType from i2p_data.key_types import SigningPublicKey class AuthType(enum.IntEnum): """EncryptedLeaseSet authentication type.""" NONE = 0 DH = 1 PSK = 3 class BlindData: """Container for blinded destination state with daily key caching.""" def __init__( self, sig_type_in: SigType, sig_type_out: SigType, unblinded_spk: SigningPublicKey, secret: str | None = None, auth_type: AuthType = AuthType.NONE, auth_private_key: bytes | None = None, ) -> None: self.sig_type_in = sig_type_in self.sig_type_out = sig_type_out self.unblinded_spk = unblinded_spk self.secret = secret self.auth_type = auth_type self.auth_private_key = auth_private_key self._cached_blinded: dict[str, SigningPublicKey] = {} def get_blinded_pubkey(self, now_sec: int | None = None) -> SigningPublicKey: """Get the blinded public key for the current (or given) day.""" from i2p_crypto.blinding import Blinding if now_sec is None: now_sec = int(datetime.now(timezone.utc).timestamp()) dt = datetime.fromtimestamp(now_sec, tz=timezone.utc) date_key = dt.strftime("%Y%m%d") if date_key in self._cached_blinded: return self._cached_blinded[date_key] alpha = Blinding.generate_alpha( self.unblinded_spk, self.sig_type_in, self.sig_type_out, now_sec, self.secret, ) blinded = Blinding.blind(self.unblinded_spk, alpha) self._cached_blinded[date_key] = blinded return blinded def get_credential(self) -> bytes: """Compute the credential: SHA-256("credential" + spk + sig_types).""" data = ( b"credential" + self.unblinded_spk.to_bytes() + self.sig_type_in.code.to_bytes(2, "big") + self.sig_type_out.code.to_bytes(2, "big") ) return hashlib.sha256(data).digest() def get_subcredential(self, now_sec: int | None = None) -> bytes: """Compute the subcredential (changes daily with blinded key).""" blinded_pub = self.get_blinded_pubkey(now_sec) data = b"subcredential" + self.get_credential() + blinded_pub.to_bytes() return hashlib.sha256(data).digest() def get_destination_hash(self, now_sec: int | None = None) -> bytes: """Compute the blinded destination hash for NetDB lookup.""" blinded_pub = self.get_blinded_pubkey(now_sec) data = ( self.sig_type_out.code.to_bytes(2, "big") + blinded_pub.to_bytes() ) return hashlib.sha256(data).digest()