A Python port of the Invisible Internet Project (I2P)
1"""Lease and LeaseSet — tunnel authorization data structures.
2
3Ported from net.i2p.data.Lease and net.i2p.data.LeaseSet.
4
5Lease: 44 bytes = 32 (gateway hash) + 4 (tunnel ID) + 8 (end date ms)
6LeaseSet: destination + encryption key + signing key + leases + signature
7"""
8
9from __future__ import annotations
10
11import io
12import struct
13import time
14
15
class Lease:
    """A single tunnel authorization.

    Binds a gateway router's 32-byte identity hash to a tunnel ID and an
    expiration timestamp expressed in milliseconds.

    Wire format (``SIZE`` = 44 bytes, network byte order):
    32-byte gateway hash, 4-byte unsigned tunnel ID, 8-byte unsigned end date.
    """

    __slots__ = ("_gateway_hash", "_tunnel_id", "_end_date")

    SIZE = 44  # 32 + 4 + 8

    def __init__(self, gateway_hash: bytes, tunnel_id: int, end_date: int) -> None:
        """Create a lease.

        Args:
            gateway_hash: Exactly 32 bytes identifying the gateway router.
            tunnel_id: Tunnel ID (packed as an unsigned 32-bit value).
            end_date: Expiration timestamp in milliseconds.

        Raises:
            ValueError: If ``gateway_hash`` is not 32 bytes long.
        """
        if len(gateway_hash) != 32:
            raise ValueError(f"Gateway hash must be 32 bytes, got {len(gateway_hash)}")
        # Copy into immutable bytes: a bytearray/memoryview slice (e.g. handed
        # over by from_bytes) would otherwise make the lease unhashable or
        # break the concatenation in to_bytes().
        self._gateway_hash = bytes(gateway_hash)
        self._tunnel_id = tunnel_id
        self._end_date = end_date

    @property
    def gateway_hash(self) -> bytes:
        """The 32-byte gateway hash."""
        return self._gateway_hash

    @property
    def tunnel_id(self) -> int:
        """The tunnel ID."""
        return self._tunnel_id

    @property
    def end_date(self) -> int:
        """Expiration timestamp in milliseconds."""
        return self._end_date

    def is_expired(self, now_ms: int | None = None) -> bool:
        """Return True if the end date is at or before ``now_ms``.

        Args:
            now_ms: Reference time in milliseconds; defaults to the current
                wall-clock time.
        """
        if now_ms is None:
            now_ms = int(time.time() * 1000)
        return self._end_date <= now_ms

    def to_bytes(self) -> bytes:
        """Serialize to 44 bytes."""
        return self._gateway_hash + struct.pack("!IQ", self._tunnel_id, self._end_date)

    @classmethod
    def from_bytes(cls, data: bytes) -> "Lease":
        """Deserialize from the first 44 bytes of ``data``.

        Trailing bytes beyond ``SIZE`` are ignored.

        Raises:
            ValueError: If ``data`` is shorter than ``SIZE``.
        """
        if len(data) < cls.SIZE:
            raise ValueError(f"Lease requires {cls.SIZE} bytes, got {len(data)}")
        tunnel_id, end_date = struct.unpack_from("!IQ", data, 32)
        return cls(data[:32], tunnel_id, end_date)

    def _key(self) -> tuple:
        """Fields that define equality and hashing."""
        return (self._gateway_hash, self._tunnel_id, self._end_date)

    def __eq__(self, other: object) -> bool:
        if not isinstance(other, Lease):
            return NotImplemented
        return self._key() == other._key()

    def __hash__(self) -> int:
        return hash(self._key())

    def __repr__(self) -> str:
        # Use the runtime class name so subclasses are not reported as "Lease".
        return (f"{type(self).__name__}(gw={self._gateway_hash[:4].hex()}..., "
                f"tid={self._tunnel_id})")
83
84
class LeaseSet:
    """A set of leases for a destination.

    Holds the destination, an encryption key, a signing key, up to
    ``MAX_LEASES`` leases, and a signature over the serialized fields.
    """

    __slots__ = ("_destination", "_encryption_key", "_signing_key",
                 "_leases", "_signature")

    MAX_LEASES = 16

    def __init__(self, destination, encryption_key, signing_key,
                 leases: list[Lease] | None = None,
                 signature: bytes = b"") -> None:
        """Create a lease set.

        Args:
            destination: Object serialized via ``to_bytes()``; its
                ``signing_public_key`` is used by ``sign`` and ``verify``.
            encryption_key: Object serialized via ``to_bytes()``.
            signing_key: Object serialized via ``to_bytes()``.
            leases: Optional initial leases (copied).
            signature: Optional existing signature bytes.

        Raises:
            ValueError: If more than ``MAX_LEASES`` leases are supplied.
        """
        initial = list(leases) if leases else []
        # Enforce the same limit as add_lease() so the constructor cannot be
        # used to build an oversized set.
        if len(initial) > self.MAX_LEASES:
            raise ValueError(f"Maximum {self.MAX_LEASES} leases allowed")
        self._destination = destination
        self._encryption_key = encryption_key
        self._signing_key = signing_key
        self._leases = initial
        self._signature = signature

    @property
    def destination(self):
        return self._destination

    @property
    def encryption_key(self):
        return self._encryption_key

    @property
    def signing_key(self):
        return self._signing_key

    @property
    def leases(self) -> list[Lease]:
        """A copy of the lease list; mutating it does not affect this set."""
        return list(self._leases)

    @property
    def signature(self) -> bytes:
        return self._signature

    def add_lease(self, lease: Lease) -> None:
        """Add a lease. Raises ValueError if max leases exceeded."""
        if len(self._leases) >= self.MAX_LEASES:
            raise ValueError(f"Maximum {self.MAX_LEASES} leases allowed")
        self._leases.append(lease)

    def get_lease(self, index: int) -> Lease:
        """Get lease by index (raises IndexError when out of range)."""
        return self._leases[index]

    def lease_count(self) -> int:
        """Number of leases currently held."""
        return len(self._leases)

    def is_current(self, now_ms: int | None = None) -> bool:
        """True if any lease is not expired.

        Args:
            now_ms: Reference time in milliseconds; defaults to the current
                wall-clock time. An empty set is never current.
        """
        if now_ms is None:
            now_ms = int(time.time() * 1000)
        return any(not lease.is_expired(now_ms) for lease in self._leases)

    def _signable_bytes(self) -> bytes:
        """Get bytes that are signed.

        Layout: destination, encryption key, signing key, a 1-byte lease
        count, then each lease in order.
        """
        parts = [
            self._destination.to_bytes(),
            self._encryption_key.to_bytes(),
            self._signing_key.to_bytes(),
            struct.pack("!B", len(self._leases)),
        ]
        parts.extend(lease.to_bytes() for lease in self._leases)
        return b"".join(parts)

    def to_bytes(self) -> bytes:
        """Serialize to wire format: signable bytes followed by the signature."""
        return self._signable_bytes() + self._signature

    def sign(self, private_key: bytes) -> None:
        """Sign this LeaseSet and store the resulting signature.

        The signature type is taken from the destination's signing public key.
        """
        from i2p_crypto.dsa import DSAEngine
        sig_type = self._destination.signing_public_key.sig_type
        self._signature = DSAEngine.sign(self._signable_bytes(), private_key, sig_type)

    def verify(self) -> bool:
        """Verify the signature using the destination's signing key.

        Returns False without invoking the engine when no signature is set.
        """
        if not self._signature:
            return False
        from i2p_crypto.dsa import DSAEngine
        signing_pub = self._destination.signing_public_key
        return DSAEngine.verify(self._signable_bytes(), self._signature,
                                signing_pub.to_bytes(), signing_pub.sig_type)

    def __repr__(self) -> str:
        return f"LeaseSet(leases={len(self._leases)}, current={self.is_current()})"
188
189
class Lease2(Lease):
    """LS2 lease with 4-byte second-resolution timestamps (40 bytes total).

    Ported from net.i2p.data.Lease2.

    Wire format: 32 (gateway hash) + 4 (tunnel ID) + 4 (end date seconds)
    Internal API uses milliseconds for compatibility with Lease, so any
    sub-second part of the end date is dropped on serialization.
    """

    # Declare empty slots so instances stay dict-free like the slotted base
    # class; without this every Lease2 would get a per-instance __dict__.
    __slots__ = ()

    SIZE = 40  # 32 + 4 + 4

    def to_bytes(self) -> bytes:
        """Serialize to 40 bytes (end_date stored as seconds on wire)."""
        end_seconds = self._end_date // 1000
        return self._gateway_hash + struct.pack("!II", self._tunnel_id, end_seconds)

    @classmethod
    def from_bytes(cls, data: bytes) -> "Lease2":
        """Deserialize from the first 40 bytes of ``data``.

        Raises:
            ValueError: If ``data`` is shorter than ``SIZE``.
        """
        if len(data) < cls.SIZE:
            raise ValueError(f"Lease2 requires {cls.SIZE} bytes, got {len(data)}")
        tunnel_id, end_seconds = struct.unpack_from("!II", data, 32)
        # Convert wire seconds back to the millisecond-based internal value.
        return cls(data[:32], tunnel_id, end_seconds * 1000)