A Python port of the Invisible Internet Project (I2P)
at main 122 lines 4.7 kB view raw
1"""Destination — I2P client/service endpoint. 2 3Ported from net.i2p.data.Destination. 4 5A Destination is a KeysAndCert that represents a client or service 6endpoint in the I2P network. It adds Base64 and Base32 encoding 7for human-readable addressing. 8""" 9 10from __future__ import annotations 11 12import base64 13import hashlib 14 15from i2p_data.data_helper import to_base64 as _i2p_b64enc, from_base64 as _i2p_b64dec 16from i2p_data.keys_and_cert import KeysAndCert 17 18 19class Destination(KeysAndCert): 20 """I2P Destination — endpoint identity with Base64/Base32 support.""" 21 22 __slots__ = ("_cached_b64", "_cached_b32") 23 24 def __init__(self, public_key, signing_public_key, certificate, 25 raw: bytes | None = None) -> None: 26 super().__init__(public_key, signing_public_key, certificate, raw=raw) 27 self._cached_b64: str | None = None 28 self._cached_b32: str | None = None 29 30 def to_base64(self) -> str: 31 """Encode to I2P-style Base64 (I2P alphabet: - instead of +, ~ instead of /).""" 32 if self._cached_b64 is None: 33 self._cached_b64 = _i2p_b64enc(self.to_bytes()) 34 return self._cached_b64 35 36 @classmethod 37 def from_base64(cls, s: str) -> "Destination": 38 """Decode from I2P-style Base64.""" 39 data = _i2p_b64dec(s) 40 return cls.from_bytes(data) 41 42 def to_base32(self) -> str: 43 """Get the .b32.i2p address (SHA-256 of destination, base32-encoded).""" 44 if self._cached_b32 is None: 45 h = hashlib.sha256(self.to_bytes()).digest() 46 b32 = base64.b32encode(h).rstrip(b"=").decode("ascii").lower() 47 self._cached_b32 = f"{b32}.b32.i2p" 48 return self._cached_b32 49 50 @classmethod 51 def from_stream(cls, stream) -> "Destination": 52 """Read a Destination from a stream (self-delimiting). 53 54 Reads 256 + 128 bytes for key areas, then the certificate 55 (which is self-delimiting via its 3-byte header). 56 """ 57 import io 58 from i2p_data.key_types import PublicKey, SigningPublicKey, EncType 59 from i2p_data.certificate import Certificate, KeyCertificate 60 from i2p_crypto.dsa import SigType 61 62 pub_area = stream.read(cls.PUBKEY_AREA_SIZE) 63 if len(pub_area) != cls.PUBKEY_AREA_SIZE: 64 raise ValueError(f"Expected {cls.PUBKEY_AREA_SIZE} pub bytes, got {len(pub_area)}") 65 sig_area = stream.read(cls.SIGKEY_AREA_SIZE) 66 if len(sig_area) != cls.SIGKEY_AREA_SIZE: 67 raise ValueError(f"Expected {cls.SIGKEY_AREA_SIZE} sig bytes, got {len(sig_area)}") 68 69 cert = Certificate.from_stream(stream) 70 71 if isinstance(cert, KeyCertificate): 72 enc_type = cert.get_enc_type() or EncType.ELGAMAL 73 sig_type = cert.get_sig_type() or SigType.DSA_SHA1 74 else: 75 enc_type = EncType.ELGAMAL 76 sig_type = SigType.DSA_SHA1 77 78 pub_len = enc_type.pubkey_len 79 pub_key = PublicKey(pub_area[cls.PUBKEY_AREA_SIZE - pub_len:], enc_type) 80 81 sig_len = sig_type.pubkey_len 82 sig_key = SigningPublicKey(sig_area[cls.SIGKEY_AREA_SIZE - sig_len:], sig_type) 83 84 raw = pub_area + sig_area + cert.to_bytes() 85 return cls(pub_key, sig_key, cert, raw=raw) 86 87 @classmethod 88 def from_bytes(cls, data: bytes) -> "Destination": 89 """Deserialize from wire format.""" 90 import io 91 from i2p_data.key_types import PublicKey, SigningPublicKey, EncType 92 from i2p_data.certificate import Certificate, KeyCertificate 93 from i2p_crypto.dsa import SigType 94 95 if len(data) < 387: 96 raise ValueError(f"Destination requires at least 387 bytes, got {len(data)}") 97 98 pub_area = data[:cls.PUBKEY_AREA_SIZE] 99 sig_area = data[cls.PUBKEY_AREA_SIZE:cls.PUBKEY_AREA_SIZE + cls.SIGKEY_AREA_SIZE] 100 cert_data = data[cls.PUBKEY_AREA_SIZE + cls.SIGKEY_AREA_SIZE:] 101 102 cert = Certificate.from_bytes(cert_data) 103 104 if isinstance(cert, KeyCertificate): 105 enc_type = cert.get_enc_type() or EncType.ELGAMAL 106 sig_type = cert.get_sig_type() or SigType.DSA_SHA1 107 else: 108 enc_type = EncType.ELGAMAL 109 sig_type = SigType.DSA_SHA1 110 111 pub_len = enc_type.pubkey_len 112 pub_key = PublicKey(pub_area[cls.PUBKEY_AREA_SIZE - pub_len:], enc_type) 113 114 sig_len = sig_type.pubkey_len 115 sig_key = SigningPublicKey(sig_area[cls.SIGKEY_AREA_SIZE - sig_len:], sig_type) 116 117 raw = data[:cls.PUBKEY_AREA_SIZE + cls.SIGKEY_AREA_SIZE + len(cert)] 118 return cls(pub_key, sig_key, cert, raw=raw) 119 120 def __repr__(self) -> str: 121 b32 = self.to_base32() 122 return f"Destination({b32[:16]}...)"