"""Lease and LeaseSet — tunnel authorization data structures. Ported from net.i2p.data.Lease and net.i2p.data.LeaseSet. Lease: 44 bytes = 32 (gateway hash) + 4 (tunnel ID) + 8 (end date ms) LeaseSet: destination + encryption key + signing key + leases + signature """ from __future__ import annotations import io import struct import time class Lease: """A single tunnel authorization. Contains the gateway router's identity hash, the tunnel ID, and the expiration timestamp. """ __slots__ = ("_gateway_hash", "_tunnel_id", "_end_date") SIZE = 44 # 32 + 4 + 8 def __init__(self, gateway_hash: bytes, tunnel_id: int, end_date: int) -> None: if len(gateway_hash) != 32: raise ValueError(f"Gateway hash must be 32 bytes, got {len(gateway_hash)}") self._gateway_hash = gateway_hash self._tunnel_id = tunnel_id self._end_date = end_date @property def gateway_hash(self) -> bytes: return self._gateway_hash @property def tunnel_id(self) -> int: return self._tunnel_id @property def end_date(self) -> int: """Expiration timestamp in milliseconds.""" return self._end_date def is_expired(self, now_ms: int | None = None) -> bool: """Check if this lease has expired.""" if now_ms is None: now_ms = int(time.time() * 1000) return self._end_date <= now_ms def to_bytes(self) -> bytes: """Serialize to 44 bytes.""" return ( self._gateway_hash + struct.pack("!I", self._tunnel_id) + struct.pack("!Q", self._end_date) ) @classmethod def from_bytes(cls, data: bytes) -> "Lease": """Deserialize from 44 bytes.""" if len(data) < cls.SIZE: raise ValueError(f"Lease requires {cls.SIZE} bytes, got {len(data)}") gateway_hash = data[:32] tunnel_id = struct.unpack("!I", data[32:36])[0] end_date = struct.unpack("!Q", data[36:44])[0] return cls(gateway_hash, tunnel_id, end_date) def __eq__(self, other: object) -> bool: if not isinstance(other, Lease): return NotImplemented return (self._gateway_hash == other._gateway_hash and self._tunnel_id == other._tunnel_id and self._end_date == other._end_date) def __hash__(self) -> int: return hash((self._gateway_hash, self._tunnel_id, self._end_date)) def __repr__(self) -> str: return f"Lease(gw={self._gateway_hash[:4].hex()}..., tid={self._tunnel_id})" class LeaseSet: """A set of leases for a destination. Contains the destination identity, an encryption key, a signing key, up to 16 leases, and a signature. """ __slots__ = ("_destination", "_encryption_key", "_signing_key", "_leases", "_signature") MAX_LEASES = 16 def __init__(self, destination, encryption_key, signing_key, leases: list[Lease] | None = None, signature: bytes = b"") -> None: self._destination = destination self._encryption_key = encryption_key self._signing_key = signing_key self._leases = list(leases) if leases else [] self._signature = signature @property def destination(self): return self._destination @property def encryption_key(self): return self._encryption_key @property def signing_key(self): return self._signing_key @property def leases(self) -> list[Lease]: return list(self._leases) @property def signature(self) -> bytes: return self._signature def add_lease(self, lease: Lease) -> None: """Add a lease. Raises ValueError if max leases exceeded.""" if len(self._leases) >= self.MAX_LEASES: raise ValueError(f"Maximum {self.MAX_LEASES} leases allowed") self._leases.append(lease) def get_lease(self, index: int) -> Lease: """Get lease by index.""" return self._leases[index] def lease_count(self) -> int: return len(self._leases) def is_current(self, now_ms: int | None = None) -> bool: """True if any lease is not expired.""" if now_ms is None: now_ms = int(time.time() * 1000) return any(not lease.is_expired(now_ms) for lease in self._leases) def _signable_bytes(self) -> bytes: """Get bytes that are signed.""" buf = io.BytesIO() # Destination buf.write(self._destination.to_bytes()) # Encryption key buf.write(self._encryption_key.to_bytes()) # Signing key buf.write(self._signing_key.to_bytes()) # Number of leases (1 byte) buf.write(struct.pack("!B", len(self._leases))) # Leases for lease in self._leases: buf.write(lease.to_bytes()) return buf.getvalue() def to_bytes(self) -> bytes: """Serialize to wire format.""" return self._signable_bytes() + self._signature def sign(self, private_key: bytes) -> None: """Sign this LeaseSet.""" from i2p_crypto.dsa import DSAEngine sig_type = self._destination.signing_public_key.sig_type self._signature = DSAEngine.sign(self._signable_bytes(), private_key, sig_type) def verify(self) -> bool: """Verify the signature using the destination's signing key.""" if not self._signature: return False from i2p_crypto.dsa import DSAEngine sig_type = self._destination.signing_public_key.sig_type pub_key = self._destination.signing_public_key.to_bytes() return DSAEngine.verify(self._signable_bytes(), self._signature, pub_key, sig_type) def __repr__(self) -> str: return f"LeaseSet(leases={len(self._leases)}, current={self.is_current()})" class Lease2(Lease): """LS2 lease with 4-byte second-resolution timestamps (40 bytes total). Ported from net.i2p.data.Lease2. Wire format: 32 (gateway hash) + 4 (tunnel ID) + 4 (end date seconds) Internal API uses milliseconds for compatibility with Lease. """ SIZE = 40 # 32 + 4 + 4 def to_bytes(self) -> bytes: """Serialize to 40 bytes (end_date stored as seconds on wire).""" return ( self._gateway_hash + struct.pack("!I", self._tunnel_id) + struct.pack("!I", self._end_date // 1000) ) @classmethod def from_bytes(cls, data: bytes) -> "Lease2": """Deserialize from 40 bytes.""" if len(data) < cls.SIZE: raise ValueError(f"Lease2 requires {cls.SIZE} bytes, got {len(data)}") gateway_hash = data[:32] tunnel_id = struct.unpack("!I", data[32:36])[0] end_seconds = struct.unpack("!I", data[36:40])[0] return cls(gateway_hash, tunnel_id, end_seconds * 1000)