A Python port of the Invisible Internet Project (I2P)
"""Tier 4 protocol gap tests: EncryptedLeaseSet (Type 5).

Tests two-layer ChaCha20 encryption, subcredential derivation,
per-client auth (DH/PSK), and wire format roundtrip.
"""
6
7import hashlib
8import os
9import struct
10import time
11
12import pytest
13
14from i2p_crypto.blinding import Blinding
15from i2p_crypto.dsa import SigType, KeyGenerator
16from i2p_crypto.hkdf import HKDF
17from i2p_data.encrypted_lease_set import (
18 EncryptedLeaseSet,
19 compute_credential,
20 compute_subcredential,
21)
22from i2p_data.key_types import SigningPublicKey
23
24
25# -- Helpers --
26
def _make_ed25519_keys():
    """Return a freshly generated Ed25519 ``(public, private)`` signing key pair.

    The pair is produced by ``KeyGenerator.generate`` called with
    ``SigType.EdDSA_SHA512_Ed25519``.
    """
    public_key, private_key = KeyGenerator.generate(
        SigType.EdDSA_SHA512_Ed25519
    )
    return public_key, private_key
31
32
33def _make_inner_ls_bytes():
34 """Create a minimal inner LeaseSet2-like byte string for testing.
35
36 In real usage this would be LeaseSet2.to_bytes(), but for testing
37 the encryption layer we just need arbitrary bytes.
38 """
39 return os.urandom(200)
40
41
42# -- Subcredential computation --
43
class TestSubcredential:
    """Credential and subcredential derivation checks."""

    @staticmethod
    def _credential_for(pub):
        """Wrap *pub* in a SigningPublicKey and derive its credential.

        Uses EdDSA_SHA512_Ed25519 as the unblinded signature type and
        RedDSA_SHA512_Ed25519 as the blinded signature type.
        """
        spk = SigningPublicKey(pub, sig_type=SigType.EdDSA_SHA512_Ed25519)
        return compute_credential(
            spk,
            SigType.EdDSA_SHA512_Ed25519,
            SigType.RedDSA_SHA512_Ed25519,
        )

    def test_credential_deterministic(self):
        """The same public key always yields the same 32-byte credential."""
        pub, _ = _make_ed25519_keys()
        first = self._credential_for(pub)
        second = self._credential_for(pub)
        assert first == second
        assert len(first) == 32

    def test_subcredential_deterministic(self):
        """The same (credential, blinded key) pair yields the same 32 bytes."""
        pub, _ = _make_ed25519_keys()
        cred = self._credential_for(pub)
        blinded_spk = os.urandom(32)
        first = compute_subcredential(cred, blinded_spk)
        second = compute_subcredential(cred, blinded_spk)
        assert first == second
        assert len(first) == 32

    def test_different_keys_different_subcredential(self):
        """Distinct signing keys give distinct credentials and subcredentials."""
        pub1, _ = _make_ed25519_keys()
        pub2, _ = _make_ed25519_keys()
        cred1 = self._credential_for(pub1)
        cred2 = self._credential_for(pub2)
        # Different keys should produce different credentials
        assert cred1 != cred2

        # Hold the blinded key fixed so that any difference in the
        # subcredential can only come from the credential input.
        blinded_spk = os.urandom(32)
        assert (
            compute_subcredential(cred1, blinded_spk)
            != compute_subcredential(cred2, blinded_spk)
        )
72
73
74# -- Wire format --
75
class TestWireFormat:
    """Serialization checks for the outer EncryptedLeaseSet structure."""

    def test_type_code(self):
        """The class advertises type code 5."""
        assert EncryptedLeaseSet.TYPE == 5

    def test_roundtrip_no_auth(self):
        """Serialize and deserialize without auth."""
        blinded_spk = os.urandom(32)
        published = int(time.time())
        expires = published + 600
        encrypted_data = os.urandom(200)
        signature = os.urandom(64)

        els = EncryptedLeaseSet(
            blinded_sig_type=SigType.RedDSA_SHA512_Ed25519,
            blinded_spk=blinded_spk,
            published=published,
            expires=expires,
            flags=0,
            encrypted_data=encrypted_data,
            signature=signature,
        )

        parsed = EncryptedLeaseSet.from_bytes(els.to_bytes())

        assert parsed.blinded_sig_type.code == SigType.RedDSA_SHA512_Ed25519.code
        assert parsed.blinded_spk == blinded_spk
        assert parsed.published == published
        assert parsed.expires == expires
        assert parsed.flags == 0
        assert parsed.encrypted_data == encrypted_data
        assert parsed.signature == signature

    def test_wire_layout(self):
        """Verify byte layout matches spec."""
        blinded_spk = b"\xaa" * 32
        published = 1000000
        expires = 1000600
        encrypted_data = b"\xbb" * 50
        signature = b"\xcc" * 64

        els = EncryptedLeaseSet(
            blinded_sig_type=SigType.RedDSA_SHA512_Ed25519,
            blinded_spk=blinded_spk,
            published=published,
            expires=expires,
            flags=0,
            encrypted_data=encrypted_data,
            signature=signature,
        )

        wire = els.to_bytes()
        # blinded_sig_type(2) + blinded_spk(32) + published(4) + expires_offset(2) + flags(2)
        # + encrypted_len(2) + encrypted_data(50) + signature(64)
        assert len(wire) == 2 + 32 + 4 + 2 + 2 + 2 + 50 + 64

        assert struct.unpack("!H", wire[0:2])[0] == 11  # RedDSA code
        assert wire[2:34] == blinded_spk
        assert struct.unpack("!I", wire[34:38])[0] == published
        assert struct.unpack("!H", wire[38:40])[0] == 600  # offset
        assert struct.unpack("!H", wire[40:42])[0] == 0  # flags

        # Trailing fields, at the offsets implied by the layout comment above:
        # a 2-byte length prefix, the ciphertext itself, then the signature.
        assert struct.unpack("!H", wire[42:44])[0] == len(encrypted_data)
        assert wire[44:94] == encrypted_data
        assert wire[94:] == signature

    def test_hash_computation(self):
        """Hash = SHA256(sig_type_code(2) + blinded_spk)."""
        blinded_spk = os.urandom(32)
        els = EncryptedLeaseSet(
            blinded_sig_type=SigType.RedDSA_SHA512_Ed25519,
            blinded_spk=blinded_spk,
            published=1000,
            expires=1600,
            flags=0,
            encrypted_data=b"",
            signature=b"\x00" * 64,
        )
        # 11 is the RedDSA signature type code, packed big-endian.
        expected = hashlib.sha256(
            struct.pack("!H", 11) + blinded_spk
        ).digest()
        assert els.compute_hash() == expected
154
155
156# -- Two-layer encryption/decryption --
157
class TestEncryptDecrypt:
    """Round-trip checks for the inner-LeaseSet encryption layer."""

    @staticmethod
    def _roundtrip(inner_bytes, subcredential, published, auth_type, **auth_kwargs):
        """Encrypt *inner_bytes*, decrypt the result, and return the plaintext.

        ``auth_kwargs`` (e.g. ``auth_cookie``) is forwarded unchanged to both
        ``encrypt_inner`` and ``decrypt_inner``. Omitting it leaves the
        library's own defaults in effect on both sides.
        """
        encrypted_data = EncryptedLeaseSet.encrypt_inner(
            inner_bytes=inner_bytes,
            subcredential=subcredential,
            published=published,
            auth_type=auth_type,
            **auth_kwargs,
        )
        return EncryptedLeaseSet.decrypt_inner(
            encrypted_data=encrypted_data,
            subcredential=subcredential,
            published=published,
            auth_type=auth_type,
            **auth_kwargs,
        )

    def test_roundtrip_no_auth(self):
        """Encrypt inner LS bytes, then decrypt and verify identical."""
        inner_bytes = _make_inner_ls_bytes()
        decrypted = self._roundtrip(
            inner_bytes,
            subcredential=os.urandom(32),
            published=int(time.time()),
            auth_type=0,
        )
        assert decrypted == inner_bytes

    def test_roundtrip_dh_auth(self):
        """Encrypt/decrypt with DH per-client auth."""
        inner_bytes = _make_inner_ls_bytes()
        decrypted = self._roundtrip(
            inner_bytes,
            subcredential=os.urandom(32),
            published=int(time.time()),
            auth_type=1,
            auth_cookie=os.urandom(32),
        )
        assert decrypted == inner_bytes

    def test_roundtrip_psk_auth(self):
        """Encrypt/decrypt with PSK per-client auth."""
        inner_bytes = _make_inner_ls_bytes()
        decrypted = self._roundtrip(
            inner_bytes,
            subcredential=os.urandom(32),
            published=int(time.time()),
            auth_type=2,
            auth_cookie=os.urandom(32),
        )
        assert decrypted == inner_bytes

    def test_wrong_subcredential_fails(self):
        """Decryption with wrong subcredential produces different bytes."""
        inner_bytes = _make_inner_ls_bytes()
        subcredential = os.urandom(32)
        published = int(time.time())

        encrypted_data = EncryptedLeaseSet.encrypt_inner(
            inner_bytes=inner_bytes,
            subcredential=subcredential,
            published=published,
            auth_type=0,
        )

        # Flip the first byte rather than drawing a second random value, so
        # the "wrong" subcredential is guaranteed to differ from the real one.
        wrong_subcredential = bytes([subcredential[0] ^ 0xFF]) + subcredential[1:]
        decrypted = EncryptedLeaseSet.decrypt_inner(
            encrypted_data=encrypted_data,
            subcredential=wrong_subcredential,
            published=published,
            auth_type=0,
        )

        # ChaCha20 doesn't authenticate, so it will "decrypt" to garbage
        assert decrypted != inner_bytes

    def test_various_inner_sizes(self):
        """Encrypt/decrypt works for various inner LS sizes."""
        subcredential = os.urandom(32)
        published = int(time.time())
        for size in [1, 50, 200, 500, 1000, 4000]:
            inner_bytes = os.urandom(size)
            decrypted = self._roundtrip(
                inner_bytes,
                subcredential=subcredential,
                published=published,
                auth_type=0,
            )
            assert decrypted == inner_bytes, f"Failed for size {size}"