A Python port of the Invisible Internet Project (I2P)
at main 130 lines 4.7 kB view raw
1"""KeysAndCert — immutable container combining public key, signing key, and certificate. 2 3Ported from net.i2p.data.KeysAndCert. 4 5Serialized form (387+ bytes): 6- 256 bytes: public key area (ElGamal key or padding + shorter key) 7- 128 bytes: signing key area (DSA key or padding + shorter key) 8- 3+ bytes: certificate 9 10With KEY certificate and shorter keys (e.g., X25519 + EdDSA), the key 11data is right-aligned in each area, and the leading padding bytes are 12included in the certificate as extra key data. 13""" 14 15from __future__ import annotations 16 17import hashlib 18 19 20class KeysAndCert: 21 """Immutable container for public key + signing public key + certificate. 22 23 This is the base class for Destination and RouterIdentity. 24 """ 25 26 __slots__ = ("_public_key", "_signing_public_key", "_certificate", "_cached_hash", "_raw") 27 28 # Standard area sizes in serialized form 29 PUBKEY_AREA_SIZE = 256 30 SIGKEY_AREA_SIZE = 128 31 32 def __init__(self, public_key, signing_public_key, certificate, raw: bytes | None = None) -> None: 33 self._public_key = public_key 34 self._signing_public_key = signing_public_key 35 self._certificate = certificate 36 self._raw = raw 37 self._cached_hash: bytes | None = None 38 39 @property 40 def public_key(self): 41 return self._public_key 42 43 @property 44 def signing_public_key(self): 45 return self._signing_public_key 46 47 @property 48 def certificate(self): 49 return self._certificate 50 51 def to_bytes(self) -> bytes: 52 """Serialize to wire format: 256 + 128 + cert bytes.""" 53 if self._raw is not None: 54 return self._raw 55 56 from i2p_data.key_types import PublicKey, SigningPublicKey 57 from i2p_data.certificate import CertificateType 58 59 pub_data = self._public_key.to_bytes() 60 sig_data = self._signing_public_key.to_bytes() 61 62 # Pad public key area to 256 bytes (right-aligned) 63 if len(pub_data) < self.PUBKEY_AREA_SIZE: 64 pub_area = b"\x00" * (self.PUBKEY_AREA_SIZE - len(pub_data)) + pub_data 65 else: 66 pub_area = pub_data[:self.PUBKEY_AREA_SIZE] 67 68 # Pad signing key area to 128 bytes (right-aligned) 69 if len(sig_data) < self.SIGKEY_AREA_SIZE: 70 sig_area = b"\x00" * (self.SIGKEY_AREA_SIZE - len(sig_data)) + sig_data 71 else: 72 sig_area = sig_data[:self.SIGKEY_AREA_SIZE] 73 74 cert_data = self._certificate.to_bytes() 75 return pub_area + sig_area + cert_data 76 77 @classmethod 78 def from_bytes(cls, data: bytes) -> "KeysAndCert": 79 """Deserialize from wire format.""" 80 import io 81 from i2p_data.key_types import PublicKey, SigningPublicKey, EncType 82 from i2p_data.certificate import Certificate, CertificateType, KeyCertificate 83 from i2p_crypto.dsa import SigType 84 85 if len(data) < 387: 86 raise ValueError(f"KeysAndCert requires at least 387 bytes, got {len(data)}") 87 88 pub_area = data[:cls.PUBKEY_AREA_SIZE] 89 sig_area = data[cls.PUBKEY_AREA_SIZE:cls.PUBKEY_AREA_SIZE + cls.SIGKEY_AREA_SIZE] 90 cert_data = data[cls.PUBKEY_AREA_SIZE + cls.SIGKEY_AREA_SIZE:] 91 92 # Parse certificate 93 cert = Certificate.from_bytes(cert_data) 94 95 # Determine key types from certificate 96 if isinstance(cert, KeyCertificate): 97 enc_type = cert.get_enc_type() or EncType.ELGAMAL 98 sig_type = cert.get_sig_type() or SigType.DSA_SHA1 99 else: 100 enc_type = EncType.ELGAMAL 101 sig_type = SigType.DSA_SHA1 102 103 # Extract actual key data (right-aligned in area) 104 pub_len = enc_type.pubkey_len 105 pub_key_data = pub_area[cls.PUBKEY_AREA_SIZE - pub_len:] 106 pub_key = PublicKey(pub_key_data, enc_type) 107 108 sig_len = sig_type.pubkey_len 109 sig_key_data = sig_area[cls.SIGKEY_AREA_SIZE - sig_len:] 110 sig_key = SigningPublicKey(sig_key_data, sig_type) 111 112 raw = data[:cls.PUBKEY_AREA_SIZE + cls.SIGKEY_AREA_SIZE + len(cert)] 113 return cls(pub_key, sig_key, cert, raw=raw) 114 115 def hash(self) -> bytes: 116 """SHA-256 hash of the serialized form (cached).""" 117 if self._cached_hash is None: 118 self._cached_hash = hashlib.sha256(self.to_bytes()).digest() 119 return self._cached_hash 120 121 def __eq__(self, other: object) -> bool: 122 if not isinstance(other, KeysAndCert): 123 return NotImplemented 124 return self.to_bytes() == other.to_bytes() 125 126 def __hash__(self) -> int: 127 return int.from_bytes(self.hash()[:4], "big") 128 129 def __repr__(self) -> str: 130 return f"{self.__class__.__name__}(hash={self.hash()[:4].hex()}...)"