A Python port of the Invisible Internet Project (I2P)
at main 217 lines 6.9 kB view raw
"""Lease and LeaseSet — tunnel authorization data structures.

Ported from net.i2p.data.Lease and net.i2p.data.LeaseSet.

Lease: 44 bytes = 32 (gateway hash) + 4 (tunnel ID) + 8 (end date ms)
LeaseSet: destination + encryption key + signing key + leases + signature
"""

from __future__ import annotations

import io
import struct
import time


class Lease:
    """A single tunnel authorization.

    Contains the gateway router's identity hash, the tunnel ID, and
    the expiration timestamp (milliseconds).

    Instances compare equal and hash by (gateway hash, tunnel ID, end date).
    """

    __slots__ = ("_gateway_hash", "_tunnel_id", "_end_date")

    SIZE = 44  # 32 + 4 + 8

    def __init__(self, gateway_hash: bytes, tunnel_id: int, end_date: int) -> None:
        """Create a lease.

        Args:
            gateway_hash: 32-byte hash; any of bytes / bytearray / memoryview
                is accepted and stored as immutable ``bytes``.
            tunnel_id: Tunnel ID (packed as an unsigned 32-bit int on the wire).
            end_date: Expiration timestamp in milliseconds.

        Raises:
            TypeError: If ``gateway_hash`` is not bytes-like.
            ValueError: If ``gateway_hash`` is not exactly 32 bytes long.
        """
        # Reject str/int early: a 32-char str would pass the length check and
        # only blow up later in to_bytes(); bytes(int) would fabricate zeros.
        if not isinstance(gateway_hash, (bytes, bytearray, memoryview)):
            raise TypeError(
                f"Gateway hash must be bytes-like, got {type(gateway_hash).__name__}"
            )
        # Normalize to immutable bytes so __hash__ and to_bytes() always work,
        # and later mutation of a caller's buffer cannot change this lease.
        gateway_hash = bytes(gateway_hash)
        if len(gateway_hash) != 32:
            raise ValueError(f"Gateway hash must be 32 bytes, got {len(gateway_hash)}")
        self._gateway_hash = gateway_hash
        self._tunnel_id = tunnel_id
        self._end_date = end_date

    @property
    def gateway_hash(self) -> bytes:
        """The 32-byte gateway hash."""
        return self._gateway_hash

    @property
    def tunnel_id(self) -> int:
        """The tunnel ID."""
        return self._tunnel_id

    @property
    def end_date(self) -> int:
        """Expiration timestamp in milliseconds."""
        return self._end_date

    def is_expired(self, now_ms: int | None = None) -> bool:
        """Check if this lease has expired.

        Args:
            now_ms: Reference time in milliseconds; defaults to the current
                wall-clock time.

        Returns:
            True when the end date is at or before ``now_ms``.
        """
        if now_ms is None:
            now_ms = int(time.time() * 1000)
        return self._end_date <= now_ms

    def to_bytes(self) -> bytes:
        """Serialize to 44 bytes: hash, then big-endian u32 tunnel ID and u64 end date."""
        # "!" = network byte order with no padding, so "!IQ" is exactly 12 bytes.
        return self._gateway_hash + struct.pack("!IQ", self._tunnel_id, self._end_date)

    @classmethod
    def from_bytes(cls, data: bytes) -> "Lease":
        """Deserialize from the first 44 bytes of ``data``.

        Extra trailing bytes are ignored.

        Raises:
            ValueError: If ``data`` is shorter than ``SIZE`` bytes.
        """
        if len(data) < cls.SIZE:
            raise ValueError(f"Lease requires {cls.SIZE} bytes, got {len(data)}")
        tunnel_id, end_date = struct.unpack_from("!IQ", data, 32)
        return cls(data[:32], tunnel_id, end_date)

    def __eq__(self, other: object) -> bool:
        if not isinstance(other, Lease):
            return NotImplemented
        return (self._gateway_hash == other._gateway_hash
                and self._tunnel_id == other._tunnel_id
                and self._end_date == other._end_date)

    def __hash__(self) -> int:
        return hash((self._gateway_hash, self._tunnel_id, self._end_date))

    def __repr__(self) -> str:
        return f"Lease(gw={self._gateway_hash[:4].hex()}..., tid={self._tunnel_id})"


class LeaseSet:
    """A set of leases for a destination.

    Contains the destination identity, an encryption key, a signing key,
    up to 16 leases, and a signature.

    The destination, encryption key and signing key objects must each provide
    ``to_bytes()``. ``sign()`` and ``verify()`` additionally read
    ``destination.signing_public_key`` (its ``sig_type`` and ``to_bytes()``).
    """

    __slots__ = ("_destination", "_encryption_key", "_signing_key",
                 "_leases", "_signature")

    MAX_LEASES = 16

    def __init__(self, destination, encryption_key, signing_key,
                 leases: list[Lease] | None = None,
                 signature: bytes = b"") -> None:
        """Create a lease set.

        Args:
            destination: Destination identity object.
            encryption_key: Encryption key object.
            signing_key: Signing key object.
            leases: Optional initial leases; copied, so later changes to the
                caller's list do not affect this lease set.
            signature: Signature bytes; empty means unsigned.

        Raises:
            ValueError: If more than ``MAX_LEASES`` leases are supplied.
        """
        initial = list(leases) if leases else []
        # Enforce the same cap as add_lease() so the constructor cannot be
        # used to build an over-full lease set.
        if len(initial) > self.MAX_LEASES:
            raise ValueError(f"Maximum {self.MAX_LEASES} leases allowed")
        self._destination = destination
        self._encryption_key = encryption_key
        self._signing_key = signing_key
        self._leases = initial
        self._signature = signature

    @property
    def destination(self):
        """The destination identity object."""
        return self._destination

    @property
    def encryption_key(self):
        """The encryption key object."""
        return self._encryption_key

    @property
    def signing_key(self):
        """The signing key object."""
        return self._signing_key

    @property
    def leases(self) -> list[Lease]:
        """A copy of the lease list (mutating it does not affect this set)."""
        return list(self._leases)

    @property
    def signature(self) -> bytes:
        """The signature bytes (empty if unsigned)."""
        return self._signature

    def add_lease(self, lease: Lease) -> None:
        """Add a lease. Raises ValueError if max leases exceeded."""
        if len(self._leases) >= self.MAX_LEASES:
            raise ValueError(f"Maximum {self.MAX_LEASES} leases allowed")
        self._leases.append(lease)

    def get_lease(self, index: int) -> Lease:
        """Get lease by index (list indexing rules; IndexError if out of range)."""
        return self._leases[index]

    def lease_count(self) -> int:
        """Return the number of leases currently held."""
        return len(self._leases)

    def is_current(self, now_ms: int | None = None) -> bool:
        """True if any lease is not expired.

        Args:
            now_ms: Reference time in milliseconds; defaults to the current
                wall-clock time. An empty lease set is never current.
        """
        if now_ms is None:
            now_ms = int(time.time() * 1000)
        return any(not lease.is_expired(now_ms) for lease in self._leases)

    def _signable_bytes(self) -> bytes:
        """Get bytes that are signed: everything in the wire format but the signature."""
        buf = io.BytesIO()

        # Destination, encryption key, signing key — in wire order.
        buf.write(self._destination.to_bytes())
        buf.write(self._encryption_key.to_bytes())
        buf.write(self._signing_key.to_bytes())

        # Number of leases (1 byte), followed by each serialized lease.
        buf.write(struct.pack("!B", len(self._leases)))
        for lease in self._leases:
            buf.write(lease.to_bytes())

        return buf.getvalue()

    def to_bytes(self) -> bytes:
        """Serialize to wire format (signable bytes followed by the signature)."""
        return self._signable_bytes() + self._signature

    def sign(self, private_key: bytes) -> None:
        """Sign this LeaseSet, replacing any stored signature.

        The signature type is taken from the destination's signing public key.
        """
        # Imported lazily so the data structures work without the crypto package.
        from i2p_crypto.dsa import DSAEngine
        sig_type = self._destination.signing_public_key.sig_type
        self._signature = DSAEngine.sign(self._signable_bytes(), private_key, sig_type)

    def verify(self) -> bool:
        """Verify the signature using the destination's signing key.

        Returns False immediately when no signature is present.
        """
        if not self._signature:
            return False
        from i2p_crypto.dsa import DSAEngine
        sig_type = self._destination.signing_public_key.sig_type
        pub_key = self._destination.signing_public_key.to_bytes()
        return DSAEngine.verify(self._signable_bytes(), self._signature, pub_key, sig_type)

    def __repr__(self) -> str:
        return f"LeaseSet(leases={len(self._leases)}, current={self.is_current()})"


class Lease2(Lease):
    """LS2 lease with 4-byte second-resolution timestamps (40 bytes total).

    Ported from net.i2p.data.Lease2.

    Wire format: 32 (gateway hash) + 4 (tunnel ID) + 4 (end date seconds)
    Internal API uses milliseconds for compatibility with Lease.
    """

    # No new attributes; declaring empty slots keeps instances dict-free,
    # as the parent's __slots__ intends.
    __slots__ = ()

    SIZE = 40  # 32 + 4 + 4

    def to_bytes(self) -> bytes:
        """Serialize to 40 bytes (end_date stored as seconds on wire).

        Sub-second precision is dropped (floor division by 1000).
        """
        return self._gateway_hash + struct.pack(
            "!II", self._tunnel_id, self._end_date // 1000
        )

    @classmethod
    def from_bytes(cls, data: bytes) -> "Lease2":
        """Deserialize from the first 40 bytes of ``data``.

        The on-wire seconds value is converted back to milliseconds.

        Raises:
            ValueError: If ``data`` is shorter than ``SIZE`` bytes.
        """
        if len(data) < cls.SIZE:
            raise ValueError(f"Lease2 requires {cls.SIZE} bytes, got {len(data)}")
        tunnel_id, end_seconds = struct.unpack_from("!II", data, 32)
        return cls(data[:32], tunnel_id, end_seconds * 1000)