A Python port of the Invisible Internet Project (I2P)
1"""Tests for MetaLease and MetaLeaseSet (Type 7) — TDD tests written before implementation."""
2
3import struct
4import time
5
6import pytest
7from cryptography.hazmat.primitives.asymmetric.ed25519 import Ed25519PrivateKey
8from cryptography.hazmat.primitives.serialization import (
9 Encoding, PublicFormat, PrivateFormat, NoEncryption,
10)
11
12from i2p_crypto.dsa import SigType, DSAEngine
13from i2p_data.key_types import (
14 PublicKey, SigningPublicKey, SigningPrivateKey, EncType,
15)
16from i2p_data.certificate import KeyCertificate
17from i2p_data.destination import Destination
18from i2p_data.lease_set2 import MetaLease, MetaLeaseSet
19
20
21# ---------------------------------------------------------------------------
22# Helpers
23# ---------------------------------------------------------------------------
24
25def _make_ed25519_keypair():
26 """Return (pub_bytes_32, priv_bytes_32) for Ed25519."""
27 priv = Ed25519PrivateKey.generate()
28 pub_bytes = priv.public_key().public_bytes(Encoding.Raw, PublicFormat.Raw)
29 priv_bytes = priv.private_bytes(Encoding.Raw, PrivateFormat.Raw, NoEncryption())
30 return pub_bytes, priv_bytes
31
32
33def _make_destination():
34 """Create a Destination with X25519 enc + Ed25519 signing (KeyCertificate)."""
35 pub_bytes, priv_bytes = _make_ed25519_keypair()
36 sig_type = SigType.EdDSA_SHA512_Ed25519
37
38 signing_pub = SigningPublicKey(pub_bytes, sig_type=sig_type)
39 signing_priv = SigningPrivateKey(priv_bytes, sig_type=sig_type)
40
41 # KeyCert payload: sig_type_code(2) + enc_type_code(2)
42 # Ed25519 = code 7, ECIES_X25519 = code 4
43 cert = KeyCertificate(struct.pack("!HH", 7, 4))
44
45 # ECIES_X25519 public key is 32 bytes
46 enc_key = PublicKey(b"\x00" * 32, EncType.ECIES_X25519)
47 dest = Destination(enc_key, signing_pub, cert)
48 return dest, signing_priv, priv_bytes
49
50
51def _make_meta_lease(index: int = 0, cost: int = 10, ls_type: int = 3) -> MetaLease:
52 """Create a MetaLease with deterministic data."""
53 gw = bytes([index & 0xFF]) * 32
54 end_date = int(time.time()) + 600 # 10 min from now, seconds
55 return MetaLease(
56 gateway_hash=gw,
57 flags=0,
58 ls_type=ls_type,
59 cost=cost,
60 end_date=end_date,
61 )
62
63
64# ---------------------------------------------------------------------------
65# Tests
66# ---------------------------------------------------------------------------
67
68class TestMetaLease:
69
70 def test_meta_lease_roundtrip(self):
71 """Single MetaLease to_bytes/from_bytes roundtrip."""
72 ml = _make_meta_lease(index=5, cost=20, ls_type=3)
73
74 data = ml.to_bytes()
75 assert len(data) == MetaLease.SIZE
76
77 restored = MetaLease.from_bytes(data)
78 assert restored.gateway_hash == ml.gateway_hash
79 assert restored.flags == ml.flags
80 assert restored.ls_type == ml.ls_type
81 assert restored.cost == ml.cost
82 assert restored.end_date == ml.end_date
83
84 def test_meta_lease_size_40(self):
85 """Verify MetaLease.SIZE == 40."""
86 assert MetaLease.SIZE == 40
87 ml = _make_meta_lease()
88 assert len(ml.to_bytes()) == 40
89
90
91class TestMetaLeaseSet:
92
93 def test_meta_lease_set_roundtrip(self):
94 """Create with 2 meta-leases, serialize, deserialize."""
95 dest, signing_priv, priv_bytes = _make_destination()
96 published = int(time.time())
97 expires = published + 600
98
99 mls = MetaLeaseSet(
100 destination=dest,
101 published=published,
102 expires=expires,
103 flags=0,
104 meta_leases=[_make_meta_lease(0), _make_meta_lease(1)],
105 )
106 mls.sign(priv_bytes, SigType.EdDSA_SHA512_Ed25519)
107 assert mls.verify()
108
109 data = mls.to_bytes()
110 restored = MetaLeaseSet.from_bytes(data)
111
112 assert restored.destination == dest
113 assert restored.published == published
114 assert restored.expires == expires
115 assert len(restored.meta_leases) == 2
116 assert restored.meta_leases[0].gateway_hash == bytes([0]) * 32
117 assert restored.meta_leases[1].gateway_hash == bytes([1]) * 32
118 assert restored.verify()
119
120 def test_meta_lease_set_with_revocations(self):
121 """Create with 1 meta-lease + 2 revocations, verify roundtrip."""
122 dest, signing_priv, priv_bytes = _make_destination()
123 published = int(time.time())
124 expires = published + 600
125
126 rev1 = b"\xaa" * 32
127 rev2 = b"\xbb" * 32
128
129 mls = MetaLeaseSet(
130 destination=dest,
131 published=published,
132 expires=expires,
133 flags=0,
134 meta_leases=[_make_meta_lease(0)],
135 revocations=[rev1, rev2],
136 )
137 mls.sign(priv_bytes, SigType.EdDSA_SHA512_Ed25519)
138 assert mls.verify()
139
140 data = mls.to_bytes()
141 restored = MetaLeaseSet.from_bytes(data)
142
143 assert len(restored.meta_leases) == 1
144 assert len(restored.revocations) == 2
145 assert restored.revocations[0] == rev1
146 assert restored.revocations[1] == rev2
147 assert restored.verify()
148
149 def test_no_encryption_keys(self):
150 """Verify MetaLeaseSet body has no enc keys section.
151
152 The body should be: options_len(2) + options + num_meta_leases(1) +
153 meta_leases + num_revocations(1) + revocations.
154 There is no num_enc_keys byte or encryption key data.
155 """
156 dest, signing_priv, priv_bytes = _make_destination()
157 published = int(time.time())
158 expires = published + 600
159
160 mls = MetaLeaseSet(
161 destination=dest,
162 published=published,
163 expires=expires,
164 flags=0,
165 meta_leases=[_make_meta_lease(0)],
166 )
167
168 body = mls._body_bytes()
169
170 # Body starts with options_len (2 bytes, value 0)
171 assert body[0:2] == b"\x00\x00"
172 # Next byte is num_meta_leases (1 byte, value 1)
173 assert body[2:3] == b"\x01"
174 # Then 40 bytes of MetaLease
175 # Then num_revocations (1 byte, value 0)
176 assert body[2 + 1 + 40] == 0 # num_revocations
177 # Total body: 2 + 1 + 40 + 1 = 44 bytes
178 assert len(body) == 44
179
180 def test_cost_ordering(self):
181 """Verify lower cost is preferred (sorted ordering)."""
182 ml_high = _make_meta_lease(index=0, cost=200)
183 ml_low = _make_meta_lease(index=1, cost=10)
184 ml_mid = _make_meta_lease(index=2, cost=50)
185
186 leases = [ml_high, ml_low, ml_mid]
187 by_cost = sorted(leases, key=lambda m: m.cost)
188 assert by_cost[0].cost == 10
189 assert by_cost[1].cost == 50
190 assert by_cost[2].cost == 200
191
192 def test_type_byte_7(self):
193 """Verify signable bytes start with 0x07."""
194 dest, signing_priv, priv_bytes = _make_destination()
195 published = int(time.time())
196 expires = published + 600
197
198 mls = MetaLeaseSet(
199 destination=dest,
200 published=published,
201 expires=expires,
202 flags=0,
203 meta_leases=[_make_meta_lease(0)],
204 )
205
206 signable = mls._signable_bytes()
207 assert signable[0:1] == b"\x07"