A Python port of the Invisible Internet Project (I2P)
1"""BlindData — container for blinded destination state.
2
3Ported from net.i2p.data.BlindData.
4
5Holds the unblinded signing public key and configuration for
6computing blinded keys, credentials, and subcredentials.
7"""
8
9from __future__ import annotations
10
11import enum
12import hashlib
13from datetime import datetime, timezone
14
15from i2p_crypto.dsa import SigType
16from i2p_data.key_types import SigningPublicKey
17
18
19class AuthType(enum.IntEnum):
20 """EncryptedLeaseSet authentication type."""
21 NONE = 0
22 DH = 1
23 PSK = 3
24
25
26class BlindData:
27 """Container for blinded destination state with daily key caching."""
28
29 def __init__(
30 self,
31 sig_type_in: SigType,
32 sig_type_out: SigType,
33 unblinded_spk: SigningPublicKey,
34 secret: str | None = None,
35 auth_type: AuthType = AuthType.NONE,
36 auth_private_key: bytes | None = None,
37 ) -> None:
38 self.sig_type_in = sig_type_in
39 self.sig_type_out = sig_type_out
40 self.unblinded_spk = unblinded_spk
41 self.secret = secret
42 self.auth_type = auth_type
43 self.auth_private_key = auth_private_key
44 self._cached_blinded: dict[str, SigningPublicKey] = {}
45
46 def get_blinded_pubkey(self, now_sec: int | None = None) -> SigningPublicKey:
47 """Get the blinded public key for the current (or given) day."""
48 from i2p_crypto.blinding import Blinding
49
50 if now_sec is None:
51 now_sec = int(datetime.now(timezone.utc).timestamp())
52
53 dt = datetime.fromtimestamp(now_sec, tz=timezone.utc)
54 date_key = dt.strftime("%Y%m%d")
55
56 if date_key in self._cached_blinded:
57 return self._cached_blinded[date_key]
58
59 alpha = Blinding.generate_alpha(
60 self.unblinded_spk, self.sig_type_in, self.sig_type_out,
61 now_sec, self.secret,
62 )
63 blinded = Blinding.blind(self.unblinded_spk, alpha)
64 self._cached_blinded[date_key] = blinded
65 return blinded
66
67 def get_credential(self) -> bytes:
68 """Compute the credential: SHA-256("credential" + spk + sig_types)."""
69 data = (
70 b"credential"
71 + self.unblinded_spk.to_bytes()
72 + self.sig_type_in.code.to_bytes(2, "big")
73 + self.sig_type_out.code.to_bytes(2, "big")
74 )
75 return hashlib.sha256(data).digest()
76
77 def get_subcredential(self, now_sec: int | None = None) -> bytes:
78 """Compute the subcredential (changes daily with blinded key)."""
79 blinded_pub = self.get_blinded_pubkey(now_sec)
80 data = b"subcredential" + self.get_credential() + blinded_pub.to_bytes()
81 return hashlib.sha256(data).digest()
82
83 def get_destination_hash(self, now_sec: int | None = None) -> bytes:
84 """Compute the blinded destination hash for NetDB lookup."""
85 blinded_pub = self.get_blinded_pubkey(now_sec)
86 data = (
87 self.sig_type_out.code.to_bytes(2, "big")
88 + blinded_pub.to_bytes()
89 )
90 return hashlib.sha256(data).digest()