"""LeaseSet2 (Type 3) — modern lease set with multiple encryption keys. Ported from net.i2p.data.LeaseSet2. Wire format: HEADER: Destination + published(4) + expires_offset(2) + flags(2) + [offline_block] BODY: options_len(2) + [options] + num_enc_keys(1) + enc_keys + num_leases(1) + leases SIGNATURE: variable (from sig type) Signature covers: type_byte(0x03) + header + body """ from __future__ import annotations import io import struct from dataclasses import dataclass from typing import TYPE_CHECKING if TYPE_CHECKING: from i2p_crypto.dsa import SigType from i2p_data.destination import Destination from i2p_data.lease import Lease2 # --------------------------------------------------------------------------- # OfflineBlock # --------------------------------------------------------------------------- @dataclass class OfflineBlock: """Offline signing block — delegates signing to a transient key. Wire format: transient_expires (uint32, seconds) transient_sig_type (uint16) transient_spk (variable, from sig type) offline_signature (variable, from dest's sig type) The offline_signature signs: published(4) + transient_expires(4) + transient_sig_type(2) + transient_spk """ transient_expires: int transient_sig_type: "SigType" transient_spk: bytes offline_signature: bytes def to_bytes(self) -> bytes: """Serialize to wire format (without leading published timestamp).""" return ( struct.pack("!I", self.transient_expires) + struct.pack("!H", self.transient_sig_type.code) + self.transient_spk + self.offline_signature ) @classmethod def from_stream(cls, stream: io.IOBase, dest_sig_type, published: int) -> "OfflineBlock": """Read an OfflineBlock from a stream. Args: stream: byte stream positioned after the flags field dest_sig_type: SigType of the destination (for offline_signature length) published: the published timestamp (needed for verify) """ from i2p_crypto.dsa import SigType raw = stream.read(4) if len(raw) != 4: raise ValueError("Truncated offline block (transient_expires)") transient_expires = struct.unpack("!I", raw)[0] raw = stream.read(2) if len(raw) != 2: raise ValueError("Truncated offline block (transient_sig_type)") trans_sig_code = struct.unpack("!H", raw)[0] trans_sig_type = SigType.by_code(trans_sig_code) if trans_sig_type is None: raise ValueError(f"Unknown transient sig type code: {trans_sig_code}") spk_len = trans_sig_type.pubkey_len transient_spk = stream.read(spk_len) if len(transient_spk) != spk_len: raise ValueError(f"Truncated offline block (transient_spk), expected {spk_len}") sig_len = dest_sig_type.sig_len offline_sig = stream.read(sig_len) if len(offline_sig) != sig_len: raise ValueError(f"Truncated offline block (signature), expected {sig_len}") return cls( transient_expires=transient_expires, transient_sig_type=trans_sig_type, transient_spk=transient_spk, offline_signature=offline_sig, ) def verify(self, published: int, dest_pub_key: bytes, dest_sig_type) -> bool: """Verify the offline signature using the destination's signing key. The signed payload is: published(4) + transient_expires(4) + transient_sig_type(2) + transient_spk """ from i2p_crypto.dsa import DSAEngine payload = ( struct.pack("!I", published) + struct.pack("!I", self.transient_expires) + struct.pack("!H", self.transient_sig_type.code) + self.transient_spk ) return DSAEngine.verify(payload, self.offline_signature, dest_pub_key, dest_sig_type) # --------------------------------------------------------------------------- # LeaseSet2 # --------------------------------------------------------------------------- class LeaseSet2: """LeaseSet2 (database type 3) — modern lease set. Supports multiple encryption key types, options, and offline signing. """ TYPE = 3 FLAG_OFFLINE_KEYS = 0x0001 FLAG_UNPUBLISHED = 0x0002 FLAG_BLINDED = 0x0004 MAX_LEASES = 16 MAX_ENC_KEYS = 8 __slots__ = ( "_destination", "_published", "_expires", "_flags", "_offline_block", "_options", "_encryption_keys", "_leases", "_signature", ) def __init__( self, destination: Destination, published: int, expires: int, flags: int = 0, encryption_keys: list[tuple[int, bytes]] | None = None, leases: list[Lease2] | None = None, options: dict[str, str] | None = None, offline_block: OfflineBlock | None = None, signature: bytes = b"", ) -> None: self._destination = destination self._published = published self._expires = expires self._flags = flags self._encryption_keys = list(encryption_keys) if encryption_keys else [] self._leases = list(leases) if leases else [] self._options = dict(options) if options else {} self._offline_block = offline_block self._signature = signature # -- Properties --------------------------------------------------------- @property def destination(self) -> Destination: return self._destination @property def published(self) -> int: return self._published @property def expires(self) -> int: return self._expires @property def flags(self) -> int: return self._flags @property def encryption_keys(self) -> list[tuple[int, bytes]]: return list(self._encryption_keys) @property def leases(self) -> list[Lease2]: return list(self._leases) @property def options(self) -> dict[str, str]: return dict(self._options) @property def offline_block(self) -> OfflineBlock | None: return self._offline_block @property def signature(self) -> bytes: return self._signature @property def is_offline(self) -> bool: return bool(self._flags & self.FLAG_OFFLINE_KEYS) @property def is_unpublished(self) -> bool: return bool(self._flags & self.FLAG_UNPUBLISHED) @property def is_blinded(self) -> bool: return bool(self._flags & self.FLAG_BLINDED) def get_signing_key(self) -> bytes: """Return the effective signing public key bytes. If offline, returns the transient SPK; otherwise returns the destination's signing public key. """ if self.is_offline and self._offline_block is not None: return self._offline_block.transient_spk return self._destination.signing_public_key.to_bytes() def _get_signing_sig_type(self): """Return the SigType used for the LS2 signature.""" if self.is_offline and self._offline_block is not None: return self._offline_block.transient_sig_type return self._destination.signing_public_key.sig_type # -- Wire format -------------------------------------------------------- def _header_bytes(self) -> bytes: """Serialize the header portion (destination through offline block).""" buf = io.BytesIO() buf.write(self._destination.to_bytes()) buf.write(struct.pack("!I", self._published)) buf.write(struct.pack("!H", self._expires - self._published)) buf.write(struct.pack("!H", self._flags)) if self.is_offline and self._offline_block is not None: buf.write(self._offline_block.to_bytes()) return buf.getvalue() def _body_bytes(self) -> bytes: """Serialize the body portion (options + enc keys + leases).""" buf = io.BytesIO() # Options if self._options: opts_str = "".join( f"{k}={v};\n" for k, v in sorted(self._options.items()) ) opts_data = opts_str.encode("utf-8") buf.write(struct.pack("!H", len(opts_data))) buf.write(opts_data) else: buf.write(struct.pack("!H", 0)) # Encryption keys buf.write(struct.pack("!B", len(self._encryption_keys))) for enc_type_code, key_data in self._encryption_keys: buf.write(struct.pack("!H", enc_type_code)) buf.write(struct.pack("!H", len(key_data))) buf.write(key_data) # Leases buf.write(struct.pack("!B", len(self._leases))) for lease in self._leases: buf.write(lease.to_bytes()) return buf.getvalue() def _signable_bytes(self) -> bytes: """Bytes covered by the signature: type_byte + header + body.""" return bytes([self.TYPE]) + self._header_bytes() + self._body_bytes() def to_bytes(self) -> bytes: """Serialize to full wire format: header + body + signature.""" return self._header_bytes() + self._body_bytes() + self._signature def sign(self, private_key_bytes: bytes, sig_type) -> None: """Sign this LeaseSet2. Args: private_key_bytes: raw private key for signing sig_type: SigType to use for signing """ from i2p_crypto.dsa import DSAEngine self._signature = DSAEngine.sign( self._signable_bytes(), private_key_bytes, sig_type ) def verify(self) -> bool: """Verify the LS2 signature. Uses the transient key if offline, else the destination's signing key. """ if not self._signature: return False from i2p_crypto.dsa import DSAEngine sig_type = self._get_signing_sig_type() pub_key = self.get_signing_key() # First verify the offline block if present if self.is_offline and self._offline_block is not None: dest_sig_type = self._destination.signing_public_key.sig_type dest_pub = self._destination.signing_public_key.to_bytes() if not self._offline_block.verify(self._published, dest_pub, dest_sig_type): return False return DSAEngine.verify( self._signable_bytes(), self._signature, pub_key, sig_type ) # -- Deserialization ---------------------------------------------------- @classmethod def from_bytes(cls, data: bytes) -> "LeaseSet2": """Deserialize a LeaseSet2 from wire format.""" from i2p_crypto.dsa import SigType stream = io.BytesIO(data) # Header dest = Destination.from_stream(stream) dest_sig_type = dest.signing_public_key.sig_type raw = stream.read(4) if len(raw) != 4: raise ValueError("Truncated LeaseSet2 (published)") published = struct.unpack("!I", raw)[0] raw = stream.read(2) if len(raw) != 2: raise ValueError("Truncated LeaseSet2 (expires_offset)") expires_offset = struct.unpack("!H", raw)[0] expires = published + expires_offset raw = stream.read(2) if len(raw) != 2: raise ValueError("Truncated LeaseSet2 (flags)") flags = struct.unpack("!H", raw)[0] # Offline block offline_block = None if flags & cls.FLAG_OFFLINE_KEYS: offline_block = OfflineBlock.from_stream(stream, dest_sig_type, published) # Body — options raw = stream.read(2) if len(raw) != 2: raise ValueError("Truncated LeaseSet2 (options_len)") opts_len = struct.unpack("!H", raw)[0] options = {} if opts_len > 0: opts_data = stream.read(opts_len) if len(opts_data) != opts_len: raise ValueError("Truncated LeaseSet2 (options data)") opts_str = opts_data.decode("utf-8") for line in opts_str.strip().split("\n"): line = line.rstrip(";") if "=" in line: k, v = line.split("=", 1) options[k] = v # Encryption keys raw = stream.read(1) if len(raw) != 1: raise ValueError("Truncated LeaseSet2 (num_enc_keys)") num_enc_keys = raw[0] encryption_keys = [] for _ in range(num_enc_keys): raw = stream.read(2) if len(raw) != 2: raise ValueError("Truncated LeaseSet2 (enc_type)") enc_type_code = struct.unpack("!H", raw)[0] raw = stream.read(2) if len(raw) != 2: raise ValueError("Truncated LeaseSet2 (enc_key_len)") key_len = struct.unpack("!H", raw)[0] key_data = stream.read(key_len) if len(key_data) != key_len: raise ValueError("Truncated LeaseSet2 (enc_key_data)") encryption_keys.append((enc_type_code, key_data)) # Leases raw = stream.read(1) if len(raw) != 1: raise ValueError("Truncated LeaseSet2 (num_leases)") num_leases = raw[0] leases = [] for _ in range(num_leases): lease_data = stream.read(Lease2.SIZE) if len(lease_data) != Lease2.SIZE: raise ValueError("Truncated LeaseSet2 (lease data)") leases.append(Lease2.from_bytes(lease_data)) # Signature — determine sig type for length if offline_block is not None: sig_type = offline_block.transient_sig_type else: sig_type = dest_sig_type sig_len = sig_type.sig_len signature = stream.read(sig_len) if len(signature) != sig_len: raise ValueError(f"Truncated LeaseSet2 (signature), expected {sig_len}") return cls( destination=dest, published=published, expires=expires, flags=flags, encryption_keys=encryption_keys, leases=leases, options=options, offline_block=offline_block, signature=signature, ) def __repr__(self) -> str: return ( f"LeaseSet2(leases={len(self._leases)}, " f"enc_keys={len(self._encryption_keys)}, " f"offline={self.is_offline})" ) # --------------------------------------------------------------------------- # MetaLease (40 bytes) # --------------------------------------------------------------------------- class MetaLease: """A single meta-lease entry in a MetaLeaseSet (Type 7). Wire format (40 bytes): gateway_hash: 32 bytes (hash of the sub-LeaseSet holder) flags: 2 bytes (uint16, reserved — currently 0) ls_type: 1 byte (uint8: 3=LS2, 7=MetaLS2) cost: 1 byte (uint8: routing metric, lower=preferred) end_date: 4 bytes (uint32, seconds since epoch) """ __slots__ = ("_gateway_hash", "_flags", "_ls_type", "_cost", "_end_date") SIZE = 40 # 32 + 2 + 1 + 1 + 4 def __init__( self, gateway_hash: bytes, flags: int = 0, ls_type: int = 3, cost: int = 0, end_date: int = 0, ) -> None: if len(gateway_hash) != 32: raise ValueError(f"Gateway hash must be 32 bytes, got {len(gateway_hash)}") self._gateway_hash = gateway_hash self._flags = flags self._ls_type = ls_type self._cost = cost self._end_date = end_date @property def gateway_hash(self) -> bytes: return self._gateway_hash @property def flags(self) -> int: return self._flags @property def ls_type(self) -> int: return self._ls_type @property def cost(self) -> int: return self._cost @property def end_date(self) -> int: return self._end_date def to_bytes(self) -> bytes: """Serialize to 40 bytes.""" return ( self._gateway_hash + struct.pack("!H", self._flags) + struct.pack("!B", self._ls_type) + struct.pack("!B", self._cost) + struct.pack("!I", self._end_date) ) @classmethod def from_bytes(cls, data: bytes) -> "MetaLease": """Deserialize from 40 bytes.""" if len(data) < cls.SIZE: raise ValueError(f"MetaLease requires {cls.SIZE} bytes, got {len(data)}") gateway_hash = data[:32] flags = struct.unpack("!H", data[32:34])[0] ls_type = data[34] cost = data[35] end_date = struct.unpack("!I", data[36:40])[0] return cls( gateway_hash=gateway_hash, flags=flags, ls_type=ls_type, cost=cost, end_date=end_date, ) def __eq__(self, other: object) -> bool: if not isinstance(other, MetaLease): return NotImplemented return ( self._gateway_hash == other._gateway_hash and self._flags == other._flags and self._ls_type == other._ls_type and self._cost == other._cost and self._end_date == other._end_date ) def __hash__(self) -> int: return hash((self._gateway_hash, self._flags, self._ls_type, self._cost, self._end_date)) def __repr__(self) -> str: return ( f"MetaLease(gw={self._gateway_hash[:4].hex()}..., " f"type={self._ls_type}, cost={self._cost})" ) # --------------------------------------------------------------------------- # MetaLeaseSet (Type 7) # --------------------------------------------------------------------------- class MetaLeaseSet(LeaseSet2): """MetaLeaseSet (database type 7) — hierarchical lease set. Same header as LeaseSet2 (destination + published + expires + flags + [offline block]). Body differs from LeaseSet2: options_len(2) + [options] num_meta_leases(1) + MetaLease * N (40 bytes each) num_revocations(1) + revocation_hash * N (32 bytes each) Key difference: NO encryption keys section. Signature covers: type_byte(7) + header + body """ TYPE = 7 MAX_META_LEASES = 16 MAX_REVOCATIONS = 16 __slots__ = ("_meta_leases", "_revocations") def __init__( self, destination: "Destination", published: int, expires: int, flags: int = 0, meta_leases: list[MetaLease] | None = None, revocations: list[bytes] | None = None, options: dict[str, str] | None = None, offline_block: OfflineBlock | None = None, signature: bytes = b"", ) -> None: # Initialize the parent for header fields — pass empty enc_keys/leases super().__init__( destination=destination, published=published, expires=expires, flags=flags, encryption_keys=None, leases=None, options=options, offline_block=offline_block, signature=signature, ) self._meta_leases = list(meta_leases) if meta_leases else [] self._revocations = list(revocations) if revocations else [] @property def meta_leases(self) -> list[MetaLease]: return list(self._meta_leases) @property def revocations(self) -> list[bytes]: return list(self._revocations) def _body_bytes(self) -> bytes: """Serialize the body: options + meta-leases + revocations (no enc keys).""" buf = io.BytesIO() # Options if self._options: opts_str = "".join( f"{k}={v};\n" for k, v in sorted(self._options.items()) ) opts_data = opts_str.encode("utf-8") buf.write(struct.pack("!H", len(opts_data))) buf.write(opts_data) else: buf.write(struct.pack("!H", 0)) # Meta-leases (no encryption keys section) buf.write(struct.pack("!B", len(self._meta_leases))) for ml in self._meta_leases: buf.write(ml.to_bytes()) # Revocations buf.write(struct.pack("!B", len(self._revocations))) for rev_hash in self._revocations: buf.write(rev_hash) return buf.getvalue() @classmethod def from_bytes(cls, data: bytes) -> "MetaLeaseSet": """Deserialize a MetaLeaseSet from wire format.""" from i2p_crypto.dsa import SigType stream = io.BytesIO(data) # Header (reuse same parsing as LeaseSet2) dest = Destination.from_stream(stream) dest_sig_type = dest.signing_public_key.sig_type raw = stream.read(4) if len(raw) != 4: raise ValueError("Truncated MetaLeaseSet (published)") published = struct.unpack("!I", raw)[0] raw = stream.read(2) if len(raw) != 2: raise ValueError("Truncated MetaLeaseSet (expires_offset)") expires_offset = struct.unpack("!H", raw)[0] expires = published + expires_offset raw = stream.read(2) if len(raw) != 2: raise ValueError("Truncated MetaLeaseSet (flags)") flags = struct.unpack("!H", raw)[0] offline_block = None if flags & cls.FLAG_OFFLINE_KEYS: offline_block = OfflineBlock.from_stream(stream, dest_sig_type, published) # Body — options raw = stream.read(2) if len(raw) != 2: raise ValueError("Truncated MetaLeaseSet (options_len)") opts_len = struct.unpack("!H", raw)[0] options = {} if opts_len > 0: opts_data = stream.read(opts_len) if len(opts_data) != opts_len: raise ValueError("Truncated MetaLeaseSet (options data)") opts_str = opts_data.decode("utf-8") for line in opts_str.strip().split("\n"): line = line.rstrip(";") if "=" in line: k, v = line.split("=", 1) options[k] = v # Meta-leases (NOT encryption keys) raw = stream.read(1) if len(raw) != 1: raise ValueError("Truncated MetaLeaseSet (num_meta_leases)") num_meta_leases = raw[0] meta_leases = [] for _ in range(num_meta_leases): ml_data = stream.read(MetaLease.SIZE) if len(ml_data) != MetaLease.SIZE: raise ValueError("Truncated MetaLeaseSet (meta lease data)") meta_leases.append(MetaLease.from_bytes(ml_data)) # Revocations raw = stream.read(1) if len(raw) != 1: raise ValueError("Truncated MetaLeaseSet (num_revocations)") num_revocations = raw[0] revocations = [] for _ in range(num_revocations): rev_hash = stream.read(32) if len(rev_hash) != 32: raise ValueError("Truncated MetaLeaseSet (revocation hash)") revocations.append(rev_hash) # Signature if offline_block is not None: sig_type = offline_block.transient_sig_type else: sig_type = dest_sig_type sig_len = sig_type.sig_len signature = stream.read(sig_len) if len(signature) != sig_len: raise ValueError(f"Truncated MetaLeaseSet (signature), expected {sig_len}") return cls( destination=dest, published=published, expires=expires, flags=flags, meta_leases=meta_leases, revocations=revocations, options=options, offline_block=offline_block, signature=signature, ) def __repr__(self) -> str: return ( f"MetaLeaseSet(meta_leases={len(self._meta_leases)}, " f"revocations={len(self._revocations)}, " f"offline={self.is_offline})" )