A Python port of the Invisible Internet Project (I2P)
at main 90 lines 3.0 kB view raw
"""BlindData — container for blinded destination state.

Ported from net.i2p.data.BlindData.

Holds the unblinded signing public key and configuration for
computing blinded keys, credentials, and subcredentials.
"""

from __future__ import annotations

import enum
import hashlib
from datetime import datetime, timezone

from i2p_crypto.dsa import SigType
from i2p_data.key_types import SigningPublicKey


def _sha256_concat(*chunks: bytes) -> bytes:
    """Return the SHA-256 digest of *chunks* hashed back to back."""
    hasher = hashlib.sha256()
    for chunk in chunks:
        hasher.update(chunk)
    return hasher.digest()


class AuthType(enum.IntEnum):
    """Authentication type of an EncryptedLeaseSet."""

    NONE = 0
    DH = 1
    PSK = 3  # note: the value 2 is not assigned in this enum


class BlindData:
    """Blinded destination state, caching one blinded key per UTC day."""

    def __init__(
        self,
        sig_type_in: SigType,
        sig_type_out: SigType,
        unblinded_spk: SigningPublicKey,
        secret: str | None = None,
        auth_type: AuthType = AuthType.NONE,
        auth_private_key: bytes | None = None,
    ) -> None:
        """Record the blinding configuration and start with an empty cache.

        Args:
            sig_type_in: Signature type handed to ``Blinding.generate_alpha``
                as the input type; also hashed into the credential.
            sig_type_out: Signature type handed to ``Blinding.generate_alpha``
                as the output type; also hashed into the credential and the
                destination hash.
            unblinded_spk: The unblinded signing public key.
            secret: Optional secret forwarded to ``Blinding.generate_alpha``.
            auth_type: Authentication type; stored only, not read by the
                methods of this class.
            auth_private_key: Optional private key material; stored only,
                not read by the methods of this class.
        """
        # Inputs to the blinding computation.
        self.unblinded_spk = unblinded_spk
        self.sig_type_in = sig_type_in
        self.sig_type_out = sig_type_out
        self.secret = secret

        # Authentication settings, kept for callers.
        self.auth_type = auth_type
        self.auth_private_key = auth_private_key

        # "YYYYMMDD" (UTC) -> blinded key computed for that day.
        self._cached_blinded: dict[str, SigningPublicKey] = {}

    def get_blinded_pubkey(self, now_sec: int | None = None) -> SigningPublicKey:
        """Return the blinded public key for the UTC day containing *now_sec*.

        Args:
            now_sec: Unix timestamp in seconds; the current time is used
                when it is ``None``.

        Returns:
            The blinded key, computed at most once per UTC calendar day and
            served from the per-instance cache afterwards.
        """
        # Imported here rather than at module level, as in the original.
        from i2p_crypto.blinding import Blinding

        if now_sec is None:
            now_sec = int(datetime.now(tz=timezone.utc).timestamp())
        day = datetime.fromtimestamp(now_sec, tz=timezone.utc).strftime("%Y%m%d")

        cache = self._cached_blinded
        if day not in cache:
            alpha = Blinding.generate_alpha(
                self.unblinded_spk,
                self.sig_type_in,
                self.sig_type_out,
                now_sec,
                self.secret,
            )
            cache[day] = Blinding.blind(self.unblinded_spk, alpha)
        return cache[day]

    def get_credential(self) -> bytes:
        """Compute the credential: SHA-256("credential" + spk + sig_types).

        Each signature type contributes its ``code`` as a 2-byte big-endian
        integer, input type first and output type second.
        """
        return _sha256_concat(
            b"credential",
            self.unblinded_spk.to_bytes(),
            self.sig_type_in.code.to_bytes(2, "big"),
            self.sig_type_out.code.to_bytes(2, "big"),
        )

    def get_subcredential(self, now_sec: int | None = None) -> bytes:
        """Compute the subcredential for the day containing *now_sec*.

        The result is SHA-256("subcredential" + credential + blinded key),
        so it changes whenever the daily blinded key changes.
        """
        blinded = self.get_blinded_pubkey(now_sec)
        return _sha256_concat(
            b"subcredential",
            self.get_credential(),
            blinded.to_bytes(),
        )

    def get_destination_hash(self, now_sec: int | None = None) -> bytes:
        """Compute the blinded destination hash for NetDB lookup.

        The result is SHA-256 over the 2-byte big-endian code of
        ``sig_type_out`` followed by the blinded key bytes.
        """
        blinded = self.get_blinded_pubkey(now_sec)
        return _sha256_concat(
            self.sig_type_out.code.to_bytes(2, "big"),
            blinded.to_bytes(),
        )