A Python port of the Invisible Internet Project (I2P)
at main 127 lines 4.4 kB view raw
1"""Certificate — I2P certificate type system. 2 3Ported from net.i2p.data.Certificate and net.i2p.data.KeyCertificate. 4 5Wire format: 1 byte type + 2 bytes big-endian payload length + payload 6NULL certificate: type=0, length=0 (3 bytes total: 0x00 0x00 0x00) 7KEY certificate: type=5, payload contains SigType code (2 bytes) + EncType code (2 bytes) + extra key data 8""" 9from __future__ import annotations 10import enum 11import io 12import struct 13from typing import ClassVar 14 15 16class CertificateType(enum.IntEnum): 17 NULL = 0 18 HASHCASH = 1 19 HIDDEN = 2 20 SIGNED = 3 21 MULTIPLE = 4 22 KEY = 5 23 24 25class Certificate: 26 """I2P Certificate — flexible key type specification.""" 27 NULL: ClassVar[Certificate] 28 __slots__ = ("_type", "_payload") 29 30 def __init__(self, cert_type: CertificateType | int, payload: bytes = b"") -> None: 31 if isinstance(cert_type, int) and not isinstance(cert_type, CertificateType): 32 cert_type = CertificateType(cert_type) 33 self._type = cert_type 34 self._payload = bytes(payload) 35 36 @property 37 def cert_type(self) -> CertificateType: 38 return self._type 39 40 @property 41 def payload(self) -> bytes: 42 return self._payload 43 44 def to_bytes(self) -> bytes: 45 """Serialize: 1 byte type + 2 bytes length + payload.""" 46 return struct.pack("!BH", self._type, len(self._payload)) + self._payload 47 48 @classmethod 49 def from_bytes(cls, data: bytes) -> "Certificate": 50 """Deserialize from bytes.""" 51 stream = io.BytesIO(data) 52 return cls.from_stream(stream) 53 54 @classmethod 55 def from_stream(cls, stream: io.IOBase) -> "Certificate": 56 """Read a certificate from a stream.""" 57 header = stream.read(3) 58 if len(header) != 3: 59 raise ValueError(f"Certificate header requires 3 bytes, got {len(header)}") 60 cert_type, length = struct.unpack("!BH", header) 61 payload = b"" 62 if length > 0: 63 payload = stream.read(length) 64 if len(payload) != length: 65 raise ValueError(f"Expected {length} payload bytes, got {len(payload)}") 66 cert_type_enum = CertificateType(cert_type) 67 if cert_type_enum == CertificateType.KEY: 68 return KeyCertificate(payload) 69 return cls(cert_type_enum, payload) 70 71 def __eq__(self, other: object) -> bool: 72 if not isinstance(other, Certificate): 73 return NotImplemented 74 return self._type == other._type and self._payload == other._payload 75 76 def __hash__(self) -> int: 77 return hash((self._type, self._payload)) 78 79 def __repr__(self) -> str: 80 return f"Certificate(type={self._type.name}, payload={len(self._payload)} bytes)" 81 82 def __len__(self) -> int: 83 """Total serialized length.""" 84 return 3 + len(self._payload) 85 86 87# Class-level NULL constant 88Certificate.NULL = Certificate(CertificateType.NULL) 89 90 91class KeyCertificate(Certificate): 92 """KEY certificate (type=5) with structured access to SigType, EncType, and extra key data. 93 94 Payload format: 2 bytes SigType code + 2 bytes EncType code + extra key data 95 """ 96 97 def __init__(self, payload: bytes) -> None: 98 super().__init__(CertificateType.KEY, payload) 99 if len(payload) < 4: 100 raise ValueError(f"KeyCertificate payload must be >= 4 bytes, got {len(payload)}") 101 102 def get_sig_type_code(self) -> int: 103 """Get the SigType code from the payload.""" 104 return struct.unpack("!H", self._payload[0:2])[0] 105 106 def get_enc_type_code(self) -> int: 107 """Get the EncType code from the payload.""" 108 return struct.unpack("!H", self._payload[2:4])[0] 109 110 def get_sig_type(self): 111 """Get the SigType enum value.""" 112 from i2p_crypto.dsa import SigType 113 return SigType.by_code(self.get_sig_type_code()) 114 115 def get_enc_type(self): 116 """Get the EncType enum value.""" 117 from i2p_data.key_types import EncType 118 return EncType.by_code(self.get_enc_type_code()) 119 120 def get_extra_key_data(self) -> bytes: 121 """Get any extra key data beyond the type codes.""" 122 return self._payload[4:] 123 124 def __repr__(self) -> str: 125 return (f"KeyCertificate(sig_type_code={self.get_sig_type_code()}, " 126 f"enc_type_code={self.get_enc_type_code()}, " 127 f"extra={len(self.get_extra_key_data())} bytes)")