"""Certificate — I2P certificate type system. Ported from net.i2p.data.Certificate and net.i2p.data.KeyCertificate. Wire format: 1 byte type + 2 bytes big-endian payload length + payload NULL certificate: type=0, length=0 (3 bytes total: 0x00 0x00 0x00) KEY certificate: type=5, payload contains SigType code (2 bytes) + EncType code (2 bytes) + extra key data """ from __future__ import annotations import enum import io import struct from typing import ClassVar class CertificateType(enum.IntEnum): NULL = 0 HASHCASH = 1 HIDDEN = 2 SIGNED = 3 MULTIPLE = 4 KEY = 5 class Certificate: """I2P Certificate — flexible key type specification.""" NULL: ClassVar[Certificate] __slots__ = ("_type", "_payload") def __init__(self, cert_type: CertificateType | int, payload: bytes = b"") -> None: if isinstance(cert_type, int) and not isinstance(cert_type, CertificateType): cert_type = CertificateType(cert_type) self._type = cert_type self._payload = bytes(payload) @property def cert_type(self) -> CertificateType: return self._type @property def payload(self) -> bytes: return self._payload def to_bytes(self) -> bytes: """Serialize: 1 byte type + 2 bytes length + payload.""" return struct.pack("!BH", self._type, len(self._payload)) + self._payload @classmethod def from_bytes(cls, data: bytes) -> "Certificate": """Deserialize from bytes.""" stream = io.BytesIO(data) return cls.from_stream(stream) @classmethod def from_stream(cls, stream: io.IOBase) -> "Certificate": """Read a certificate from a stream.""" header = stream.read(3) if len(header) != 3: raise ValueError(f"Certificate header requires 3 bytes, got {len(header)}") cert_type, length = struct.unpack("!BH", header) payload = b"" if length > 0: payload = stream.read(length) if len(payload) != length: raise ValueError(f"Expected {length} payload bytes, got {len(payload)}") cert_type_enum = CertificateType(cert_type) if cert_type_enum == CertificateType.KEY: return KeyCertificate(payload) return cls(cert_type_enum, payload) def __eq__(self, other: object) -> bool: if not isinstance(other, Certificate): return NotImplemented return self._type == other._type and self._payload == other._payload def __hash__(self) -> int: return hash((self._type, self._payload)) def __repr__(self) -> str: return f"Certificate(type={self._type.name}, payload={len(self._payload)} bytes)" def __len__(self) -> int: """Total serialized length.""" return 3 + len(self._payload) # Class-level NULL constant Certificate.NULL = Certificate(CertificateType.NULL) class KeyCertificate(Certificate): """KEY certificate (type=5) with structured access to SigType, EncType, and extra key data. Payload format: 2 bytes SigType code + 2 bytes EncType code + extra key data """ def __init__(self, payload: bytes) -> None: super().__init__(CertificateType.KEY, payload) if len(payload) < 4: raise ValueError(f"KeyCertificate payload must be >= 4 bytes, got {len(payload)}") def get_sig_type_code(self) -> int: """Get the SigType code from the payload.""" return struct.unpack("!H", self._payload[0:2])[0] def get_enc_type_code(self) -> int: """Get the EncType code from the payload.""" return struct.unpack("!H", self._payload[2:4])[0] def get_sig_type(self): """Get the SigType enum value.""" from i2p_crypto.dsa import SigType return SigType.by_code(self.get_sig_type_code()) def get_enc_type(self): """Get the EncType enum value.""" from i2p_data.key_types import EncType return EncType.by_code(self.get_enc_type_code()) def get_extra_key_data(self) -> bytes: """Get any extra key data beyond the type codes.""" return self._payload[4:] def __repr__(self) -> str: return (f"KeyCertificate(sig_type_code={self.get_sig_type_code()}, " f"enc_type_code={self.get_enc_type_code()}, " f"extra={len(self.get_extra_key_data())} bytes)")