A Python port of the Invisible Internet Project (I2P)
at main 159 lines 5.5 kB view raw
1"""Blinding API — generate blinding factors and blind/unblind Ed25519 keys. 2 3Ported from net.i2p.crypto.Blinding. 4 5Used for EncryptedLeaseSet (LS2) where the published signing key is 6blinded so observers cannot link it to the real destination. 7""" 8 9from __future__ import annotations 10 11import hashlib 12import zlib 13from datetime import datetime, timezone 14 15from i2p_crypto.dsa import SigType 16from i2p_crypto.eddsa_blinding import EdDSABlinding 17from i2p_crypto.hkdf import HKDF 18from i2p_data.key_types import SigningPublicKey, SigningPrivateKey 19 20_HKDF = HKDF() 21 22 23class Blinding: 24 """Static methods for I2P destination blinding.""" 25 26 @staticmethod 27 def generate_alpha( 28 dest_spk: SigningPublicKey, 29 sig_type_in: SigType = SigType.EdDSA_SHA512_Ed25519, 30 sig_type_out: SigType = SigType.RedDSA_SHA512_Ed25519, 31 timestamp_sec: int | None = None, 32 secret: str | None = None, 33 ) -> SigningPrivateKey: 34 """Generate the daily blinding factor (alpha) as a scalar. 35 36 Parameters 37 ---------- 38 dest_spk: unblinded Ed25519 signing public key 39 sig_type_in: source signature type (usually Ed25519) 40 sig_type_out: blinded signature type (usually RedDSA) 41 timestamp_sec: seconds since epoch (UTC) 42 secret: optional shared secret string 43 """ 44 if timestamp_sec is None: 45 timestamp_sec = int(datetime.now(timezone.utc).timestamp()) 46 47 # Date string: "yyyyMMdd" UTC 48 dt = datetime.fromtimestamp(timestamp_sec, tz=timezone.utc) 49 date_str = dt.strftime("%Y%m%d").encode("ascii") 50 51 # HKDF input data = date_str [+ secret] 52 data = date_str 53 if secret: 54 data += secret.encode("utf-8") 55 56 # Salt = SHA-256("I2PGenerateAlpha" + spk + sig_type_in(2) + sig_type_out(2)) 57 salt_input = ( 58 b"I2PGenerateAlpha" 59 + dest_spk.to_bytes() 60 + sig_type_in.code.to_bytes(2, "big") 61 + sig_type_out.code.to_bytes(2, "big") 62 ) 63 salt = hashlib.sha256(salt_input).digest() 64 65 # HKDF extract-and-expand to get 64 bytes 66 seed = _HKDF.extract_and_expand(salt, data, b"i2pblinding1", 64) 67 68 # Reduce 64 bytes to 32-byte scalar 69 alpha = EdDSABlinding.reduce(seed) 70 return SigningPrivateKey(alpha, sig_type=sig_type_out) 71 72 @staticmethod 73 def blind( 74 spk: SigningPublicKey, 75 alpha: SigningPrivateKey, 76 ) -> SigningPublicKey: 77 """Blind a public key: blinded = spk + alpha*G.""" 78 alpha_pub = EdDSABlinding.scalar_mult_base(alpha.to_bytes()) 79 blinded = EdDSABlinding.blind_public(spk.to_bytes(), alpha_pub) 80 return SigningPublicKey(blinded, sig_type=SigType.RedDSA_SHA512_Ed25519) 81 82 @staticmethod 83 def unblind( 84 spk_seed: bytes, 85 alpha: SigningPrivateKey, 86 ) -> SigningPrivateKey: 87 """Unblind (derive blinded private key from seed + alpha).""" 88 scalar = EdDSABlinding.seed_to_scalar(spk_seed) 89 blinded_scalar = EdDSABlinding.blind_private(scalar, alpha.to_bytes()) 90 return SigningPrivateKey(blinded_scalar, sig_type=SigType.RedDSA_SHA512_Ed25519) 91 92 @staticmethod 93 def encode_b32( 94 spk: SigningPublicKey, 95 sig_type_in: SigType = SigType.EdDSA_SHA512_Ed25519, 96 sig_type_out: SigType = SigType.RedDSA_SHA512_Ed25519, 97 secret_required: bool = False, 98 per_client_auth: bool = False, 99 ) -> str: 100 """Encode a blinded destination as a .b32.i2p address. 101 102 Format: flag(1) + sig_type_in(1) + sig_type_out(1) + spk(32) 103 First 3 bytes XORed with CRC32 of bytes[3:35] for obfuscation. 104 """ 105 flags = 0x00 106 if secret_required: 107 flags |= 0x02 108 if per_client_auth: 109 flags |= 0x04 110 111 raw = bytearray(35) 112 raw[0] = flags 113 raw[1] = sig_type_in.code & 0xFF 114 raw[2] = sig_type_out.code & 0xFF 115 raw[3:35] = spk.to_bytes()[:32] 116 117 # CRC32 of the key portion 118 crc = zlib.crc32(bytes(raw[3:35])) & 0xFFFFFFFF 119 crc_bytes = crc.to_bytes(4, "big") 120 121 # XOR first 3 bytes with CRC bytes[1:4] 122 raw[0] ^= crc_bytes[1] 123 raw[1] ^= crc_bytes[2] 124 raw[2] ^= crc_bytes[3] 125 126 import base64 127 b32 = base64.b32encode(bytes(raw)).rstrip(b"=").decode("ascii").lower() 128 return f"{b32}.b32.i2p" 129 130 @staticmethod 131 def decode_b32(address: str) -> tuple[SigType, SigType, SigningPublicKey, int]: 132 """Decode a blinded .b32.i2p address. 133 134 Returns (sig_type_in, sig_type_out, spk, flags). 135 """ 136 import base64 137 hostname = address.removesuffix(".b32.i2p") 138 # Add padding 139 padding = (8 - len(hostname) % 8) % 8 140 b32_padded = hostname.upper() + "=" * padding 141 raw = bytearray(base64.b32decode(b32_padded)) 142 143 if len(raw) < 35: 144 raise ValueError(f"Blinded b32 too short: {len(raw)} bytes") 145 146 # Reverse CRC XOR 147 crc = zlib.crc32(bytes(raw[3:35])) & 0xFFFFFFFF 148 crc_bytes = crc.to_bytes(4, "big") 149 raw[0] ^= crc_bytes[1] 150 raw[1] ^= crc_bytes[2] 151 raw[2] ^= crc_bytes[3] 152 153 flags = raw[0] 154 sig_type_in = SigType.by_code(raw[1]) 155 sig_type_out = SigType.by_code(raw[2]) 156 spk = SigningPublicKey(bytes(raw[3:35]), sig_type=sig_type_in) 157 158 assert sig_type_in is not None and sig_type_out is not None 159 return sig_type_in, sig_type_out, spk, flags