A Python port of the Invisible Internet Project (I2P)
1"""Certificate — I2P certificate type system.
2
3Ported from net.i2p.data.Certificate and net.i2p.data.KeyCertificate.
4
5Wire format: 1 byte type + 2 bytes big-endian payload length + payload
6NULL certificate: type=0, length=0 (3 bytes total: 0x00 0x00 0x00)
7KEY certificate: type=5, payload contains SigType code (2 bytes) + EncType code (2 bytes) + extra key data
8"""
9from __future__ import annotations
10import enum
11import io
12import struct
13from typing import ClassVar
14
15
16class CertificateType(enum.IntEnum):
17 NULL = 0
18 HASHCASH = 1
19 HIDDEN = 2
20 SIGNED = 3
21 MULTIPLE = 4
22 KEY = 5
23
24
25class Certificate:
26 """I2P Certificate — flexible key type specification."""
27 NULL: ClassVar[Certificate]
28 __slots__ = ("_type", "_payload")
29
30 def __init__(self, cert_type: CertificateType | int, payload: bytes = b"") -> None:
31 if isinstance(cert_type, int) and not isinstance(cert_type, CertificateType):
32 cert_type = CertificateType(cert_type)
33 self._type = cert_type
34 self._payload = bytes(payload)
35
36 @property
37 def cert_type(self) -> CertificateType:
38 return self._type
39
40 @property
41 def payload(self) -> bytes:
42 return self._payload
43
44 def to_bytes(self) -> bytes:
45 """Serialize: 1 byte type + 2 bytes length + payload."""
46 return struct.pack("!BH", self._type, len(self._payload)) + self._payload
47
48 @classmethod
49 def from_bytes(cls, data: bytes) -> "Certificate":
50 """Deserialize from bytes."""
51 stream = io.BytesIO(data)
52 return cls.from_stream(stream)
53
54 @classmethod
55 def from_stream(cls, stream: io.IOBase) -> "Certificate":
56 """Read a certificate from a stream."""
57 header = stream.read(3)
58 if len(header) != 3:
59 raise ValueError(f"Certificate header requires 3 bytes, got {len(header)}")
60 cert_type, length = struct.unpack("!BH", header)
61 payload = b""
62 if length > 0:
63 payload = stream.read(length)
64 if len(payload) != length:
65 raise ValueError(f"Expected {length} payload bytes, got {len(payload)}")
66 cert_type_enum = CertificateType(cert_type)
67 if cert_type_enum == CertificateType.KEY:
68 return KeyCertificate(payload)
69 return cls(cert_type_enum, payload)
70
71 def __eq__(self, other: object) -> bool:
72 if not isinstance(other, Certificate):
73 return NotImplemented
74 return self._type == other._type and self._payload == other._payload
75
76 def __hash__(self) -> int:
77 return hash((self._type, self._payload))
78
79 def __repr__(self) -> str:
80 return f"Certificate(type={self._type.name}, payload={len(self._payload)} bytes)"
81
82 def __len__(self) -> int:
83 """Total serialized length."""
84 return 3 + len(self._payload)
85
86
87# Class-level NULL constant
88Certificate.NULL = Certificate(CertificateType.NULL)
89
90
91class KeyCertificate(Certificate):
92 """KEY certificate (type=5) with structured access to SigType, EncType, and extra key data.
93
94 Payload format: 2 bytes SigType code + 2 bytes EncType code + extra key data
95 """
96
97 def __init__(self, payload: bytes) -> None:
98 super().__init__(CertificateType.KEY, payload)
99 if len(payload) < 4:
100 raise ValueError(f"KeyCertificate payload must be >= 4 bytes, got {len(payload)}")
101
102 def get_sig_type_code(self) -> int:
103 """Get the SigType code from the payload."""
104 return struct.unpack("!H", self._payload[0:2])[0]
105
106 def get_enc_type_code(self) -> int:
107 """Get the EncType code from the payload."""
108 return struct.unpack("!H", self._payload[2:4])[0]
109
110 def get_sig_type(self):
111 """Get the SigType enum value."""
112 from i2p_crypto.dsa import SigType
113 return SigType.by_code(self.get_sig_type_code())
114
115 def get_enc_type(self):
116 """Get the EncType enum value."""
117 from i2p_data.key_types import EncType
118 return EncType.by_code(self.get_enc_type_code())
119
120 def get_extra_key_data(self) -> bytes:
121 """Get any extra key data beyond the type codes."""
122 return self._payload[4:]
123
124 def __repr__(self) -> str:
125 return (f"KeyCertificate(sig_type_code={self.get_sig_type_code()}, "
126 f"enc_type_code={self.get_enc_type_code()}, "
127 f"extra={len(self.get_extra_key_data())} bytes)")