"""Destination — I2P client/service endpoint.

Ported from net.i2p.data.Destination.

A Destination is a KeysAndCert that represents a client or service
endpoint in the I2P network. It adds Base64 and Base32 encoding for
human-readable addressing.
"""

from __future__ import annotations

import base64
import hashlib

from i2p_data.data_helper import to_base64 as _i2p_b64enc, from_base64 as _i2p_b64dec
from i2p_data.keys_and_cert import KeysAndCert


class Destination(KeysAndCert):
    """I2P Destination — endpoint identity with Base64/Base32 support.

    The Base64 and Base32 renderings are computed lazily on first use and
    memoised on the instance (``_cached_b64`` / ``_cached_b32``).
    """

    __slots__ = ("_cached_b64", "_cached_b32")

    def __init__(self, public_key, signing_public_key, certificate,
                 raw: bytes | None = None) -> None:
        """Create a Destination from its keys and certificate.

        Args:
            public_key: Encryption public key, forwarded to ``KeysAndCert``.
            signing_public_key: Signing public key, forwarded to ``KeysAndCert``.
            certificate: Certificate, forwarded to ``KeysAndCert``.
            raw: Optional serialized bytes, forwarded to ``KeysAndCert``.
        """
        super().__init__(public_key, signing_public_key, certificate, raw=raw)
        # Both encodings start empty and are filled in on first request.
        self._cached_b64: str | None = None
        self._cached_b32: str | None = None

    def to_base64(self) -> str:
        """Encode to I2P-style Base64 (I2P alphabet: - instead of +, ~ instead of /).

        Returns:
            The Base64 text of ``self.to_bytes()``; cached after the first call.
        """
        if self._cached_b64 is None:
            self._cached_b64 = _i2p_b64enc(self.to_bytes())
        return self._cached_b64

    @classmethod
    def from_base64(cls, s: str) -> "Destination":
        """Decode a Destination from I2P-style Base64.

        Args:
            s: I2P Base64 text.

        Returns:
            The Destination parsed from the decoded bytes via ``from_bytes``.
        """
        data = _i2p_b64dec(s)
        return cls.from_bytes(data)

    def to_base32(self) -> str:
        """Get the .b32.i2p address (SHA-256 of destination, base32-encoded).

        Returns:
            Lower-case, unpadded Base32 of the SHA-256 digest of
            ``self.to_bytes()``, followed by ``.b32.i2p``; cached after the
            first call.
        """
        if self._cached_b32 is None:
            digest = hashlib.sha256(self.to_bytes()).digest()
            b32 = base64.b32encode(digest).rstrip(b"=").decode("ascii").lower()
            self._cached_b32 = f"{b32}.b32.i2p"
        return self._cached_b32

    @classmethod
    def _typed_keys(cls, pub_area: bytes, sig_area: bytes, cert) -> tuple:
        """Build the typed key objects from the two fixed-size key areas.

        Shared by ``from_stream`` and ``from_bytes`` so both parsers resolve
        key types and slice the areas in exactly the same way.

        Args:
            pub_area: The ``PUBKEY_AREA_SIZE`` bytes of the public-key area.
            sig_area: The ``SIGKEY_AREA_SIZE`` bytes of the signing-key area.
            cert: The parsed certificate. When it is a ``KeyCertificate`` its
                declared key types are used; otherwise ElGamal / DSA-SHA1.

        Returns:
            A ``(PublicKey, SigningPublicKey)`` pair.
        """
        # Imported lazily, as in the rest of this class.
        from i2p_data.key_types import PublicKey, SigningPublicKey, EncType
        from i2p_data.certificate import KeyCertificate
        from i2p_crypto.dsa import SigType

        enc_type = EncType.ELGAMAL
        sig_type = SigType.DSA_SHA1
        if isinstance(cert, KeyCertificate):
            # A falsy type from the certificate falls back to the default.
            enc_type = cert.get_enc_type() or EncType.ELGAMAL
            sig_type = cert.get_sig_type() or SigType.DSA_SHA1

        # Each key is taken from the trailing ``pubkey_len`` bytes of its area.
        # NOTE(review): the I2P common-structures spec appears to align the
        # crypto public key at the START of its area (only the signing key is
        # end-aligned) — confirm against KeysAndCert before relying on
        # non-ElGamal encryption types.
        # NOTE(review): if ``pubkey_len`` exceeds the area size the start index
        # goes negative and the slice silently yields the wrong bytes — confirm
        # how oversized signing keys are meant to be handled.
        pub_len = enc_type.pubkey_len
        pub_key = PublicKey(pub_area[cls.PUBKEY_AREA_SIZE - pub_len:], enc_type)
        sig_len = sig_type.pubkey_len
        sig_key = SigningPublicKey(sig_area[cls.SIGKEY_AREA_SIZE - sig_len:], sig_type)
        return pub_key, sig_key

    @classmethod
    def from_stream(cls, stream) -> "Destination":
        """Read a Destination from a stream (self-delimiting).

        Reads 256 + 128 bytes for key areas, then the certificate
        (which is self-delimiting via its 3-byte header).

        Args:
            stream: Object with a ``read(n)`` method returning bytes.

        Returns:
            The parsed Destination; its ``raw`` is the two key areas followed
            by the certificate's ``to_bytes()``.

        Raises:
            ValueError: If either key area is shorter than expected.
        """
        from i2p_data.certificate import Certificate

        pub_area = stream.read(cls.PUBKEY_AREA_SIZE)
        if len(pub_area) != cls.PUBKEY_AREA_SIZE:
            raise ValueError(f"Expected {cls.PUBKEY_AREA_SIZE} pub bytes, got {len(pub_area)}")

        sig_area = stream.read(cls.SIGKEY_AREA_SIZE)
        if len(sig_area) != cls.SIGKEY_AREA_SIZE:
            raise ValueError(f"Expected {cls.SIGKEY_AREA_SIZE} sig bytes, got {len(sig_area)}")

        cert = Certificate.from_stream(stream)
        pub_key, sig_key = cls._typed_keys(pub_area, sig_area, cert)

        raw = pub_area + sig_area + cert.to_bytes()
        return cls(pub_key, sig_key, cert, raw=raw)

    @classmethod
    def from_bytes(cls, data: bytes) -> "Destination":
        """Deserialize from wire format.

        Args:
            data: Serialized Destination; bytes after the certificate are
                ignored and excluded from the stored ``raw``.

        Returns:
            The parsed Destination.

        Raises:
            ValueError: If ``data`` is shorter than 387 bytes.
        """
        from i2p_data.certificate import Certificate

        # 387 = 256 + 128 key-area bytes plus the 3-byte certificate header.
        if len(data) < 387:
            raise ValueError(f"Destination requires at least 387 bytes, got {len(data)}")

        keys_end = cls.PUBKEY_AREA_SIZE + cls.SIGKEY_AREA_SIZE
        pub_area = data[:cls.PUBKEY_AREA_SIZE]
        sig_area = data[cls.PUBKEY_AREA_SIZE:keys_end]
        cert_data = data[keys_end:]

        cert = Certificate.from_bytes(cert_data)
        pub_key, sig_key = cls._typed_keys(pub_area, sig_area, cert)

        # Trim to exactly this Destination's bytes.
        # assumes len(cert) is the certificate's serialized size — TODO confirm
        raw = data[:keys_end + len(cert)]
        return cls(pub_key, sig_key, cert, raw=raw)

    def __repr__(self) -> str:
        """Return a short debug form showing the first 16 chars of the b32 address."""
        b32 = self.to_base32()
        return f"Destination({b32[:16]}...)"