A Python port of the Invisible Internet Project (I2P)
1"""LeaseSet2 (Type 3) — modern lease set with multiple encryption keys.
2
3Ported from net.i2p.data.LeaseSet2.
4
5Wire format:
6 HEADER: Destination + published(4) + expires_offset(2) + flags(2) + [offline_block]
7 BODY: options_len(2) + [options] + num_enc_keys(1) + enc_keys + num_leases(1) + leases
8 SIGNATURE: variable (from sig type)
9
10Signature covers: type_byte(0x03) + header + body
11"""
12
13from __future__ import annotations
14
15import io
16import struct
17from dataclasses import dataclass
18from typing import TYPE_CHECKING
19
20if TYPE_CHECKING:
21 from i2p_crypto.dsa import SigType
22
23from i2p_data.destination import Destination
24from i2p_data.lease import Lease2
25
26
27# ---------------------------------------------------------------------------
28# OfflineBlock
29# ---------------------------------------------------------------------------
30
@dataclass
class OfflineBlock:
    """Offline signing block: hands signing over to a transient key.

    Wire layout, in order:
        transient_expires   uint32, seconds
        transient_sig_type  uint16 signature-type code
        transient_spk       transient signing public key; its length comes
                            from the transient sig type
        offline_signature   signature whose length comes from the
                            destination's sig type

    ``verify`` checks ``offline_signature`` over:
        published(4) + transient_expires(4) + transient_sig_type(2) + transient_spk
    """

    transient_expires: int
    transient_sig_type: "SigType"
    transient_spk: bytes
    offline_signature: bytes

    def to_bytes(self) -> bytes:
        """Return the wire encoding of this block.

        The published timestamp is not part of the output; only the four
        fields of the block are emitted, in declaration order.
        """
        pieces = [
            struct.pack("!I", self.transient_expires),
            struct.pack("!H", self.transient_sig_type.code),
            self.transient_spk,
            self.offline_signature,
        ]
        return b"".join(pieces)

    @classmethod
    def from_stream(cls, stream: io.IOBase, dest_sig_type, published: int) -> "OfflineBlock":
        """Parse an OfflineBlock from ``stream``.

        Args:
            stream: byte stream positioned just after the flags field.
            dest_sig_type: the destination's SigType; its ``sig_len`` gives
                the length of the offline signature.
            published: published timestamp. It is accepted for the caller's
                convenience but is not consumed while parsing.

        Returns:
            The decoded OfflineBlock.

        Raises:
            ValueError: if the stream ends early or the transient signature
                type code is not known to ``SigType.by_code``.
        """
        from i2p_crypto.dsa import SigType

        def take(count: int, problem: str) -> bytes:
            # Read exactly ``count`` bytes or fail with the given message.
            chunk = stream.read(count)
            if len(chunk) != count:
                raise ValueError(problem)
            return chunk

        (expires_at,) = struct.unpack(
            "!I", take(4, "Truncated offline block (transient_expires)")
        )
        (type_code,) = struct.unpack(
            "!H", take(2, "Truncated offline block (transient_sig_type)")
        )

        transient_type = SigType.by_code(type_code)
        if transient_type is None:
            raise ValueError(f"Unknown transient sig type code: {type_code}")

        # The transient key is sized by its own sig type ...
        key_size = transient_type.pubkey_len
        key_bytes = take(
            key_size,
            f"Truncated offline block (transient_spk), expected {key_size}",
        )

        # ... while the offline signature is sized by the destination's.
        sig_size = dest_sig_type.sig_len
        sig_bytes = take(
            sig_size,
            f"Truncated offline block (signature), expected {sig_size}",
        )

        return cls(
            transient_expires=expires_at,
            transient_sig_type=transient_type,
            transient_spk=key_bytes,
            offline_signature=sig_bytes,
        )

    def verify(self, published: int, dest_pub_key: bytes, dest_sig_type) -> bool:
        """Check the offline signature against the destination's signing key.

        Args:
            published: published timestamp, packed as the first 4 bytes of
                the signed payload.
            dest_pub_key: raw signing public key of the destination.
            dest_sig_type: SigType of the destination key.

        Returns:
            Whatever ``DSAEngine.verify`` reports for the payload
            published(4) + transient_expires(4) + transient_sig_type(2)
            + transient_spk.
        """
        from i2p_crypto.dsa import DSAEngine

        # NOTE(review): the payload here starts with ``published``; the I2P
        # OfflineSignature definition appears to sign only expires + sig
        # type + transient key — confirm against the spec / Java router.
        signed = b"".join(
            (
                struct.pack("!I", published),
                struct.pack("!I", self.transient_expires),
                struct.pack("!H", self.transient_sig_type.code),
                self.transient_spk,
            )
        )
        return DSAEngine.verify(signed, self.offline_signature, dest_pub_key, dest_sig_type)
114
115
116# ---------------------------------------------------------------------------
117# LeaseSet2
118# ---------------------------------------------------------------------------
119
class LeaseSet2:
    """LeaseSet2 (database type 3) — modern lease set.

    Holds a destination, a published/expires pair, a flags word, an optional
    offline signing block, string options, typed encryption keys and a list
    of leases, followed by a signature.

    Layout produced by ``to_bytes``:
        header:    destination + published(4) + expires_offset(2) + flags(2)
                   + [offline block, when FLAG_OFFLINE_KEYS is set]
        body:      options_len(2) + options + num_enc_keys(1) + enc keys
                   + num_leases(1) + leases
        signature: length given by the signing key's sig type

    Options are written as UTF-8 text, one ``key=value;\\n`` entry per
    option, sorted by key. The signature covers ``TYPE`` byte + header +
    body, and ``verify`` re-serializes the parsed fields, so parsing has to
    round-trip byte-for-byte.
    """

    TYPE = 3

    FLAG_OFFLINE_KEYS = 0x0001
    FLAG_UNPUBLISHED = 0x0002
    FLAG_BLINDED = 0x0004

    MAX_LEASES = 16
    MAX_ENC_KEYS = 8

    __slots__ = (
        "_destination", "_published", "_expires", "_flags",
        "_offline_block", "_options", "_encryption_keys", "_leases",
        "_signature",
    )

    def __init__(
        self,
        destination: Destination,
        published: int,
        expires: int,
        flags: int = 0,
        encryption_keys: list[tuple[int, bytes]] | None = None,
        leases: list[Lease2] | None = None,
        options: dict[str, str] | None = None,
        offline_block: OfflineBlock | None = None,
        signature: bytes = b"",
    ) -> None:
        """Build a lease set from already-decoded fields.

        Args:
            destination: the destination this lease set belongs to.
            published: published timestamp (packed as uint32).
            expires: absolute expiry; serialized as ``expires - published``.
            flags: bitwise OR of the ``FLAG_*`` constants.
            encryption_keys: ``(type_code, key_bytes)`` pairs.
            leases: lease entries.
            options: string options.
            offline_block: offline signing block, if any.
            signature: existing signature bytes, empty when unsigned.

        The list and dict arguments are copied, so later changes to the
        caller's containers do not affect this object.
        """
        self._destination = destination
        self._published = published
        self._expires = expires
        self._flags = flags
        self._encryption_keys = list(encryption_keys) if encryption_keys else []
        self._leases = list(leases) if leases else []
        self._options = dict(options) if options else {}
        self._offline_block = offline_block
        self._signature = signature

    # -- Properties ---------------------------------------------------------

    @property
    def destination(self) -> Destination:
        """The destination this lease set belongs to."""
        return self._destination

    @property
    def published(self) -> int:
        """Published timestamp."""
        return self._published

    @property
    def expires(self) -> int:
        """Absolute expiry (published + serialized offset)."""
        return self._expires

    @property
    def flags(self) -> int:
        """Raw flags word."""
        return self._flags

    @property
    def encryption_keys(self) -> list[tuple[int, bytes]]:
        """Copy of the ``(type_code, key_bytes)`` pairs."""
        return list(self._encryption_keys)

    @property
    def leases(self) -> list[Lease2]:
        """Copy of the lease list."""
        return list(self._leases)

    @property
    def options(self) -> dict[str, str]:
        """Copy of the options mapping."""
        return dict(self._options)

    @property
    def offline_block(self) -> OfflineBlock | None:
        """The offline signing block, or None."""
        return self._offline_block

    @property
    def signature(self) -> bytes:
        """Signature bytes; empty when the lease set is unsigned."""
        return self._signature

    @property
    def is_offline(self) -> bool:
        """True when FLAG_OFFLINE_KEYS is set."""
        return bool(self._flags & self.FLAG_OFFLINE_KEYS)

    @property
    def is_unpublished(self) -> bool:
        """True when FLAG_UNPUBLISHED is set."""
        return bool(self._flags & self.FLAG_UNPUBLISHED)

    @property
    def is_blinded(self) -> bool:
        """True when FLAG_BLINDED is set."""
        return bool(self._flags & self.FLAG_BLINDED)

    def get_signing_key(self) -> bytes:
        """Return the effective signing public key bytes.

        When the offline flag is set and an offline block is present this is
        the transient key; otherwise it is the destination's signing key.
        """
        if self.is_offline and self._offline_block is not None:
            return self._offline_block.transient_spk
        return self._destination.signing_public_key.to_bytes()

    def _get_signing_sig_type(self):
        """Return the SigType matching the key from ``get_signing_key``."""
        if self.is_offline and self._offline_block is not None:
            return self._offline_block.transient_sig_type
        return self._destination.signing_public_key.sig_type

    # -- Wire format --------------------------------------------------------

    def _header_bytes(self) -> bytes:
        """Serialize the header (destination through optional offline block).

        The expiry is written as the uint16 difference ``expires -
        published``; ``struct.pack`` raises ``struct.error`` if that
        difference does not fit.
        """
        buf = io.BytesIO()
        buf.write(self._destination.to_bytes())
        buf.write(struct.pack("!I", self._published))
        buf.write(struct.pack("!H", self._expires - self._published))
        buf.write(struct.pack("!H", self._flags))

        if self.is_offline and self._offline_block is not None:
            buf.write(self._offline_block.to_bytes())

        return buf.getvalue()

    def _options_bytes(self) -> bytes:
        """Serialize the options as options_len(2) + UTF-8 option text."""
        if not self._options:
            return struct.pack("!H", 0)
        opts_data = "".join(
            f"{k}={v};\n" for k, v in sorted(self._options.items())
        ).encode("utf-8")
        return struct.pack("!H", len(opts_data)) + opts_data

    @staticmethod
    def _parse_options(opts_data: bytes) -> dict[str, str]:
        """Decode option text written by ``_options_bytes`` (without length).

        Each newline-separated entry loses exactly one trailing ``;`` and is
        split at its first ``=``. Entries without ``=`` are ignored.

        Raises:
            UnicodeDecodeError: (a ValueError) if the data is not UTF-8.
        """
        options: dict[str, str] = {}
        for entry in opts_data.decode("utf-8").split("\n"):
            # Remove only the terminator that serialization appended; a
            # value that itself ends in ';' must survive, otherwise the
            # re-serialized bytes (and thus the signature check) differ.
            if entry.endswith(";"):
                entry = entry[:-1]
            key, sep, value = entry.partition("=")
            if sep:
                options[key] = value
        return options

    def _body_bytes(self) -> bytes:
        """Serialize the body portion (options + enc keys + leases)."""
        buf = io.BytesIO()

        # Options
        buf.write(self._options_bytes())

        # Encryption keys: type(2) + key_len(2) + key for each entry
        buf.write(struct.pack("!B", len(self._encryption_keys)))
        for enc_type_code, key_data in self._encryption_keys:
            buf.write(struct.pack("!H", enc_type_code))
            buf.write(struct.pack("!H", len(key_data)))
            buf.write(key_data)

        # Leases
        buf.write(struct.pack("!B", len(self._leases)))
        for lease in self._leases:
            buf.write(lease.to_bytes())

        return buf.getvalue()

    def _signable_bytes(self) -> bytes:
        """Bytes covered by the signature: type_byte + header + body."""
        return bytes([self.TYPE]) + self._header_bytes() + self._body_bytes()

    def to_bytes(self) -> bytes:
        """Serialize to full wire format: header + body + signature."""
        return self._header_bytes() + self._body_bytes() + self._signature

    def sign(self, private_key_bytes: bytes, sig_type) -> None:
        """Sign this LeaseSet2 and store the resulting signature.

        Args:
            private_key_bytes: raw private key for signing
            sig_type: SigType to use for signing
        """
        from i2p_crypto.dsa import DSAEngine
        self._signature = DSAEngine.sign(
            self._signable_bytes(), private_key_bytes, sig_type
        )

    def verify(self) -> bool:
        """Verify the LS2 signature.

        Returns False immediately when there is no signature. When the
        offline flag is set and an offline block is present, the block's own
        signature is checked against the destination key first; the lease
        set signature is then checked with the transient key. Otherwise the
        destination's signing key is used directly.
        """
        if not self._signature:
            return False

        from i2p_crypto.dsa import DSAEngine

        sig_type = self._get_signing_sig_type()
        pub_key = self.get_signing_key()

        # The transient key is only trustworthy if the destination vouched
        # for it, so the offline block has to pass before anything else.
        if self.is_offline and self._offline_block is not None:
            dest_sig_type = self._destination.signing_public_key.sig_type
            dest_pub = self._destination.signing_public_key.to_bytes()
            if not self._offline_block.verify(self._published, dest_pub, dest_sig_type):
                return False

        return DSAEngine.verify(
            self._signable_bytes(), self._signature, pub_key, sig_type
        )

    # -- Deserialization ----------------------------------------------------

    @classmethod
    def from_bytes(cls, data: bytes) -> "LeaseSet2":
        """Deserialize a LeaseSet2 from wire format.

        Args:
            data: header + body + signature, as produced by ``to_bytes``.
                Bytes after the signature are not examined.

        Returns:
            The decoded lease set, signature included (not verified).

        Raises:
            ValueError: if the input is truncated, the options are not valid
                UTF-8, or the number of encryption keys / leases exceeds
                ``MAX_ENC_KEYS`` / ``MAX_LEASES``.
        """
        stream = io.BytesIO(data)

        def take(size: int, field: str) -> bytes:
            # Read exactly ``size`` bytes or report which field was cut off.
            chunk = stream.read(size)
            if len(chunk) != size:
                raise ValueError(f"Truncated LeaseSet2 ({field})")
            return chunk

        # Header
        dest = Destination.from_stream(stream)
        dest_sig_type = dest.signing_public_key.sig_type

        (published,) = struct.unpack("!I", take(4, "published"))
        (expires_offset,) = struct.unpack("!H", take(2, "expires_offset"))
        expires = published + expires_offset
        (flags,) = struct.unpack("!H", take(2, "flags"))

        # Offline block
        offline_block = None
        if flags & cls.FLAG_OFFLINE_KEYS:
            offline_block = OfflineBlock.from_stream(stream, dest_sig_type, published)

        # Body — options
        (opts_len,) = struct.unpack("!H", take(2, "options_len"))
        options = cls._parse_options(take(opts_len, "options data"))

        # Encryption keys
        num_enc_keys = take(1, "num_enc_keys")[0]
        if num_enc_keys > cls.MAX_ENC_KEYS:
            raise ValueError(
                f"Too many encryption keys in LeaseSet2: {num_enc_keys} "
                f"(max {cls.MAX_ENC_KEYS})"
            )

        encryption_keys = []
        for _ in range(num_enc_keys):
            (enc_type_code,) = struct.unpack("!H", take(2, "enc_type"))
            (key_len,) = struct.unpack("!H", take(2, "enc_key_len"))
            encryption_keys.append((enc_type_code, take(key_len, "enc_key_data")))

        # Leases
        num_leases = take(1, "num_leases")[0]
        if num_leases > cls.MAX_LEASES:
            raise ValueError(
                f"Too many leases in LeaseSet2: {num_leases} (max {cls.MAX_LEASES})"
            )

        leases = [
            Lease2.from_bytes(take(Lease2.SIZE, "lease data"))
            for _ in range(num_leases)
        ]

        # Signature — its length follows the key that signed the lease set:
        # the transient key when an offline block was read, else the
        # destination key.
        if offline_block is not None:
            sig_type = offline_block.transient_sig_type
        else:
            sig_type = dest_sig_type
        sig_len = sig_type.sig_len
        signature = stream.read(sig_len)
        if len(signature) != sig_len:
            raise ValueError(f"Truncated LeaseSet2 (signature), expected {sig_len}")

        return cls(
            destination=dest,
            published=published,
            expires=expires,
            flags=flags,
            encryption_keys=encryption_keys,
            leases=leases,
            options=options,
            offline_block=offline_block,
            signature=signature,
        )

    def __repr__(self) -> str:
        return (
            f"LeaseSet2(leases={len(self._leases)}, "
            f"enc_keys={len(self._encryption_keys)}, "
            f"offline={self.is_offline})"
        )
431
432
433# ---------------------------------------------------------------------------
434# MetaLease (40 bytes)
435# ---------------------------------------------------------------------------
436
class MetaLease:
    """One meta-lease entry of a MetaLeaseSet (Type 7).

    Serialized form, 40 bytes in total:
        gateway_hash  32 bytes
        flags          2 bytes, uint16
        ls_type        1 byte,  uint8
        cost           1 byte,  uint8
        end_date       4 bytes, uint32
    """

    __slots__ = ("_gateway_hash", "_flags", "_ls_type", "_cost", "_end_date")

    SIZE = 40  # 32 + 2 + 1 + 1 + 4

    def __init__(
        self,
        gateway_hash: bytes,
        flags: int = 0,
        ls_type: int = 3,
        cost: int = 0,
        end_date: int = 0,
    ) -> None:
        """Store the entry's fields.

        Raises:
            ValueError: if ``gateway_hash`` is not exactly 32 bytes long.
        """
        hash_len = len(gateway_hash)
        if hash_len != 32:
            raise ValueError(f"Gateway hash must be 32 bytes, got {hash_len}")
        self._gateway_hash = gateway_hash
        self._flags = flags
        self._ls_type = ls_type
        self._cost = cost
        self._end_date = end_date

    def _fields(self) -> tuple:
        """All fields in wire order; the basis for equality and hashing."""
        return (
            self._gateway_hash,
            self._flags,
            self._ls_type,
            self._cost,
            self._end_date,
        )

    @property
    def gateway_hash(self) -> bytes:
        """The 32-byte gateway hash."""
        return self._gateway_hash

    @property
    def flags(self) -> int:
        """The flags word."""
        return self._flags

    @property
    def ls_type(self) -> int:
        """The lease-set type byte."""
        return self._ls_type

    @property
    def cost(self) -> int:
        """The cost byte."""
        return self._cost

    @property
    def end_date(self) -> int:
        """The end date."""
        return self._end_date

    def to_bytes(self) -> bytes:
        """Return the 40-byte serialized entry."""
        tail = struct.pack(
            "!HBBI", self._flags, self._ls_type, self._cost, self._end_date
        )
        return self._gateway_hash + tail

    @classmethod
    def from_bytes(cls, data: bytes) -> "MetaLease":
        """Build an entry from the first 40 bytes of ``data``.

        Raises:
            ValueError: if fewer than ``SIZE`` bytes are supplied.
        """
        available = len(data)
        if available < cls.SIZE:
            raise ValueError(f"MetaLease requires {cls.SIZE} bytes, got {available}")
        # Bytes 32..39 hold flags(2) + ls_type(1) + cost(1) + end_date(4).
        flags, ls_type, cost, end_date = struct.unpack("!HBBI", data[32:40])
        return cls(
            gateway_hash=data[:32],
            flags=flags,
            ls_type=ls_type,
            cost=cost,
            end_date=end_date,
        )

    def __eq__(self, other: object) -> bool:
        if isinstance(other, MetaLease):
            return self._fields() == other._fields()
        return NotImplemented

    def __hash__(self) -> int:
        return hash(self._fields())

    def __repr__(self) -> str:
        gw_prefix = self._gateway_hash[:4].hex()
        return f"MetaLease(gw={gw_prefix}..., type={self._ls_type}, cost={self._cost})"
535
536
537# ---------------------------------------------------------------------------
538# MetaLeaseSet (Type 7)
539# ---------------------------------------------------------------------------
540
class MetaLeaseSet(LeaseSet2):
    """MetaLeaseSet (database type 7) — hierarchical lease set.

    Same header as LeaseSet2 (destination + published + expires + flags +
    [offline block]).

    Body differs from LeaseSet2:
        options_len(2) + [options]
        num_meta_leases(1) + MetaLease * N   (40 bytes each)
        num_revocations(1) + revocation_hash * N   (32 bytes each)

    Key difference: NO encryption keys section.

    Options are written as UTF-8 text, one ``key=value;\\n`` entry per
    option, sorted by key.

    Signature covers: type_byte(7) + header + body
    """

    TYPE = 7

    MAX_META_LEASES = 16
    MAX_REVOCATIONS = 16

    __slots__ = ("_meta_leases", "_revocations")

    def __init__(
        self,
        destination: "Destination",
        published: int,
        expires: int,
        flags: int = 0,
        meta_leases: list[MetaLease] | None = None,
        revocations: list[bytes] | None = None,
        options: dict[str, str] | None = None,
        offline_block: OfflineBlock | None = None,
        signature: bytes = b"",
    ) -> None:
        """Build a meta lease set from already-decoded fields.

        Args:
            destination: the destination this lease set belongs to.
            published: published timestamp.
            expires: absolute expiry.
            flags: flags word.
            meta_leases: MetaLease entries.
            revocations: revocation hashes, written verbatim to the body.
            options: string options.
            offline_block: offline signing block, if any.
            signature: existing signature bytes, empty when unsigned.

        ``meta_leases`` and ``revocations`` are copied into new lists.
        """
        # The parent stores the header fields; a meta lease set has neither
        # encryption keys nor plain leases, so those are passed as None.
        super().__init__(
            destination=destination,
            published=published,
            expires=expires,
            flags=flags,
            encryption_keys=None,
            leases=None,
            options=options,
            offline_block=offline_block,
            signature=signature,
        )
        self._meta_leases = list(meta_leases) if meta_leases else []
        self._revocations = list(revocations) if revocations else []

    @property
    def meta_leases(self) -> list[MetaLease]:
        """Copy of the meta-lease list."""
        return list(self._meta_leases)

    @property
    def revocations(self) -> list[bytes]:
        """Copy of the revocation-hash list."""
        return list(self._revocations)

    def _body_bytes(self) -> bytes:
        """Serialize the body: options + meta-leases + revocations (no enc keys)."""
        buf = io.BytesIO()

        # Options
        if self._options:
            opts_str = "".join(
                f"{k}={v};\n" for k, v in sorted(self._options.items())
            )
            opts_data = opts_str.encode("utf-8")
            buf.write(struct.pack("!H", len(opts_data)))
            buf.write(opts_data)
        else:
            buf.write(struct.pack("!H", 0))

        # Meta-leases (no encryption keys section)
        buf.write(struct.pack("!B", len(self._meta_leases)))
        for ml in self._meta_leases:
            buf.write(ml.to_bytes())

        # Revocations
        buf.write(struct.pack("!B", len(self._revocations)))
        for rev_hash in self._revocations:
            buf.write(rev_hash)

        return buf.getvalue()

    @classmethod
    def from_bytes(cls, data: bytes) -> "MetaLeaseSet":
        """Deserialize a MetaLeaseSet from wire format.

        Args:
            data: header + body + signature, as produced by ``to_bytes``.
                Bytes after the signature are not examined.

        Returns:
            The decoded meta lease set, signature included (not verified).

        Raises:
            ValueError: if the input is truncated or the options are not
                valid UTF-8.
        """
        stream = io.BytesIO(data)

        def take(size: int, field: str) -> bytes:
            # Read exactly ``size`` bytes or report which field was cut off.
            chunk = stream.read(size)
            if len(chunk) != size:
                raise ValueError(f"Truncated MetaLeaseSet ({field})")
            return chunk

        # Header (same layout as LeaseSet2)
        dest = Destination.from_stream(stream)
        dest_sig_type = dest.signing_public_key.sig_type

        (published,) = struct.unpack("!I", take(4, "published"))
        (expires_offset,) = struct.unpack("!H", take(2, "expires_offset"))
        expires = published + expires_offset
        (flags,) = struct.unpack("!H", take(2, "flags"))

        offline_block = None
        if flags & cls.FLAG_OFFLINE_KEYS:
            offline_block = OfflineBlock.from_stream(stream, dest_sig_type, published)

        # Body — options
        (opts_len,) = struct.unpack("!H", take(2, "options_len"))

        options: dict[str, str] = {}
        if opts_len > 0:
            opts_text = take(opts_len, "options data").decode("utf-8")
            for entry in opts_text.split("\n"):
                # Remove only the single ';' terminator appended during
                # serialization. Stripping every trailing ';' (or outer
                # whitespace) would alter values, and the signature check
                # re-serializes these options, so it would then fail.
                if entry.endswith(";"):
                    entry = entry[:-1]
                key, sep, value = entry.partition("=")
                if sep:
                    options[key] = value

        # Meta-leases (NOT encryption keys)
        # NOTE(review): MAX_META_LEASES / MAX_REVOCATIONS are declared on the
        # class but not enforced here — confirm the limits before rejecting.
        num_meta_leases = take(1, "num_meta_leases")[0]
        meta_leases = [
            MetaLease.from_bytes(take(MetaLease.SIZE, "meta lease data"))
            for _ in range(num_meta_leases)
        ]

        # Revocations
        num_revocations = take(1, "num_revocations")[0]
        revocations = [
            take(32, "revocation hash") for _ in range(num_revocations)
        ]

        # Signature — sized by the transient key's sig type when an offline
        # block was read, otherwise by the destination's.
        if offline_block is not None:
            sig_type = offline_block.transient_sig_type
        else:
            sig_type = dest_sig_type
        sig_len = sig_type.sig_len
        signature = stream.read(sig_len)
        if len(signature) != sig_len:
            raise ValueError(f"Truncated MetaLeaseSet (signature), expected {sig_len}")

        return cls(
            destination=dest,
            published=published,
            expires=expires,
            flags=flags,
            meta_leases=meta_leases,
            revocations=revocations,
            options=options,
            offline_block=offline_block,
            signature=signature,
        )

    def __repr__(self) -> str:
        return (
            f"MetaLeaseSet(meta_leases={len(self._meta_leases)}, "
            f"revocations={len(self._revocations)}, "
            f"offline={self.is_offline})"
        )
728 )