A Python port of the Invisible Internet Project (I2P)
1"""Destination — I2P client/service endpoint.
2
3Ported from net.i2p.data.Destination.
4
5A Destination is a KeysAndCert that represents a client or service
6endpoint in the I2P network. It adds Base64 and Base32 encoding
7for human-readable addressing.
8"""
9
10from __future__ import annotations
11
12import base64
13import hashlib
14
15from i2p_data.data_helper import to_base64 as _i2p_b64enc, from_base64 as _i2p_b64dec
16from i2p_data.keys_and_cert import KeysAndCert
17
18
class Destination(KeysAndCert):
    """I2P Destination — endpoint identity with Base64/Base32 support.

    A Destination is a KeysAndCert (public key, signing public key and
    certificate) that can additionally render itself as an I2P Base64
    string or as a ``.b32.i2p`` address. Both textual forms are computed
    on first use and cached on the instance.
    """

    __slots__ = ("_cached_b64", "_cached_b32")

    def __init__(self, public_key, signing_public_key, certificate,
                 raw: bytes | None = None) -> None:
        """Create a Destination from its parts.

        Args:
            public_key: Encryption public key, forwarded to KeysAndCert.
            signing_public_key: Signing public key, forwarded to KeysAndCert.
            certificate: Certificate, forwarded to KeysAndCert.
            raw: Optional serialized form, forwarded to KeysAndCert unchanged.
        """
        super().__init__(public_key, signing_public_key, certificate, raw=raw)
        # Lazily filled by to_base64() / to_base32().
        self._cached_b64: str | None = None
        self._cached_b32: str | None = None

    def to_base64(self) -> str:
        """Encode to I2P-style Base64 (I2P alphabet: - instead of +, ~ instead of /).

        Returns:
            The Base64 text of ``self.to_bytes()``; cached after the first call.
        """
        if self._cached_b64 is None:
            self._cached_b64 = _i2p_b64enc(self.to_bytes())
        return self._cached_b64

    @classmethod
    def from_base64(cls, s: str) -> "Destination":
        """Decode a Destination from I2P-style Base64.

        Args:
            s: I2P Base64 text of a serialized Destination.

        Returns:
            The Destination parsed by :meth:`from_bytes` from the decoded data.
        """
        return cls.from_bytes(_i2p_b64dec(s))

    def to_base32(self) -> str:
        """Get the .b32.i2p address (SHA-256 of destination, base32-encoded).

        Returns:
            The lowercase, unpadded Base32 text of the SHA-256 digest of
            ``self.to_bytes()`` followed by ``.b32.i2p``; cached after the
            first call.
        """
        if self._cached_b32 is None:
            digest = hashlib.sha256(self.to_bytes()).digest()
            b32 = base64.b32encode(digest).rstrip(b"=").decode("ascii").lower()
            self._cached_b32 = f"{b32}.b32.i2p"
        return self._cached_b32

    @classmethod
    def _extract_keys(cls, pub_area: bytes, sig_area: bytes, cert):
        """Build the typed key objects from the two fixed-size key areas.

        The key types come from ``cert`` when it is a KeyCertificate and
        default to ElGamal / DSA-SHA1 otherwise. Each key is taken from the
        tail of its area, skipping any leading bytes the key type does not
        use.

        Args:
            pub_area: The ``PUBKEY_AREA_SIZE`` bytes holding the public key.
            sig_area: The ``SIGKEY_AREA_SIZE`` bytes holding the signing key.
            cert: The certificate parsed from the same serialized form.

        Returns:
            A ``(PublicKey, SigningPublicKey)`` tuple.
        """
        # Kept function-local as in the original code (presumably to avoid
        # import cycles between the data modules — confirm before hoisting).
        from i2p_data.key_types import PublicKey, SigningPublicKey, EncType
        from i2p_data.certificate import KeyCertificate
        from i2p_crypto.dsa import SigType

        if isinstance(cert, KeyCertificate):
            enc_type = cert.get_enc_type() or EncType.ELGAMAL
            sig_type = cert.get_sig_type() or SigType.DSA_SHA1
        else:
            enc_type = EncType.ELGAMAL
            sig_type = SigType.DSA_SHA1

        # Clamp at 0: when a key type is longer than its area, a negative
        # start would silently select only the last few bytes of the area.
        # NOTE(review): for such oversized keys the I2P spec carries the
        # excess bytes in the key certificate; they are not reassembled
        # here — confirm how KeyCertificate exposes them.
        pub_start = max(0, cls.PUBKEY_AREA_SIZE - enc_type.pubkey_len)
        sig_start = max(0, cls.SIGKEY_AREA_SIZE - sig_type.pubkey_len)

        pub_key = PublicKey(pub_area[pub_start:], enc_type)
        sig_key = SigningPublicKey(sig_area[sig_start:], sig_type)
        return pub_key, sig_key

    @classmethod
    def from_stream(cls, stream) -> "Destination":
        """Read a Destination from a stream (self-delimiting).

        Reads 256 + 128 bytes for key areas, then the certificate
        (which is self-delimiting via its 3-byte header).

        Args:
            stream: Object whose ``read(n)`` returns bytes.

        Returns:
            The parsed Destination; its raw form is the two key areas
            followed by the certificate's own serialization.

        Raises:
            ValueError: If either key area could not be read in full.
        """
        from i2p_data.certificate import Certificate

        pub_area = stream.read(cls.PUBKEY_AREA_SIZE)
        if len(pub_area) != cls.PUBKEY_AREA_SIZE:
            raise ValueError(f"Expected {cls.PUBKEY_AREA_SIZE} pub bytes, got {len(pub_area)}")
        sig_area = stream.read(cls.SIGKEY_AREA_SIZE)
        if len(sig_area) != cls.SIGKEY_AREA_SIZE:
            raise ValueError(f"Expected {cls.SIGKEY_AREA_SIZE} sig bytes, got {len(sig_area)}")

        cert = Certificate.from_stream(stream)
        pub_key, sig_key = cls._extract_keys(pub_area, sig_area, cert)

        raw = pub_area + sig_area + cert.to_bytes()
        return cls(pub_key, sig_key, cert, raw=raw)

    @classmethod
    def from_bytes(cls, data: bytes) -> "Destination":
        """Deserialize from wire format.

        Args:
            data: Serialized Destination; bytes after the certificate are
                ignored and excluded from the stored raw form.

        Returns:
            The parsed Destination.

        Raises:
            ValueError: If ``data`` is shorter than the two key areas plus
                the 3-byte certificate header.
        """
        from i2p_data.certificate import Certificate

        keys_len = cls.PUBKEY_AREA_SIZE + cls.SIGKEY_AREA_SIZE
        # Smallest valid form: both key areas plus the 3-byte cert header.
        min_len = keys_len + 3
        if len(data) < min_len:
            raise ValueError(f"Destination requires at least {min_len} bytes, got {len(data)}")

        pub_area = data[:cls.PUBKEY_AREA_SIZE]
        sig_area = data[cls.PUBKEY_AREA_SIZE:keys_len]

        cert = Certificate.from_bytes(data[keys_len:])
        pub_key, sig_key = cls._extract_keys(pub_area, sig_area, cert)

        # Keep only the bytes that belong to this Destination.
        raw = data[:keys_len + len(cert)]
        return cls(pub_key, sig_key, cert, raw=raw)

    def __repr__(self) -> str:
        """Return a short debug form showing the start of the b32 address."""
        b32 = self.to_base32()
        return f"Destination({b32[:16]}...)"