A Python port of the Invisible Internet Project (I2P)
at main 274 lines 9.2 kB view raw
"""Tier 4 protocol gap tests: EncryptedLeaseSet (Type 5).

Tests two-layer ChaCha20 encryption, subcredential derivation,
per-client auth (DH/PSK), wire format roundtrip.
"""

import hashlib
import os
import struct
import time

import pytest

from i2p_crypto.blinding import Blinding
from i2p_crypto.dsa import SigType, KeyGenerator
from i2p_crypto.hkdf import HKDF
from i2p_data.encrypted_lease_set import (
    EncryptedLeaseSet,
    compute_credential,
    compute_subcredential,
)
from i2p_data.key_types import SigningPublicKey


# -- Helpers --

def _make_ed25519_keys():
    """Generate an Ed25519 signing key pair.

    Returns:
        A ``(public, private)`` pair as produced by
        ``KeyGenerator.generate(SigType.EdDSA_SHA512_Ed25519)``.
    """
    pub, priv = KeyGenerator.generate(SigType.EdDSA_SHA512_Ed25519)
    return pub, priv


def _make_inner_ls_bytes():
    """Create a minimal inner LeaseSet2-like byte string for testing.

    In real usage this would be LeaseSet2.to_bytes(), but for testing
    the encryption layer we just need arbitrary bytes.

    Returns:
        200 random bytes.
    """
    return os.urandom(200)


# -- Subcredential computation --

class TestSubcredential:
    """Credential / subcredential derivation: determinism and key separation."""

    def test_credential_deterministic(self):
        """The same key and sig types always yield the same 32-byte credential."""
        pub, _ = _make_ed25519_keys()
        spk = SigningPublicKey(pub, sig_type=SigType.EdDSA_SHA512_Ed25519)
        c1 = compute_credential(
            spk, SigType.EdDSA_SHA512_Ed25519, SigType.RedDSA_SHA512_Ed25519
        )
        c2 = compute_credential(
            spk, SigType.EdDSA_SHA512_Ed25519, SigType.RedDSA_SHA512_Ed25519
        )
        assert c1 == c2
        assert len(c1) == 32

    def test_subcredential_deterministic(self):
        """The same credential and blinded key always yield the same 32 bytes."""
        pub, _ = _make_ed25519_keys()
        spk = SigningPublicKey(pub, sig_type=SigType.EdDSA_SHA512_Ed25519)
        cred = compute_credential(
            spk, SigType.EdDSA_SHA512_Ed25519, SigType.RedDSA_SHA512_Ed25519
        )
        blinded_spk = os.urandom(32)
        sc1 = compute_subcredential(cred, blinded_spk)
        sc2 = compute_subcredential(cred, blinded_spk)
        assert sc1 == sc2
        assert len(sc1) == 32

    def test_different_keys_different_subcredential(self):
        """Distinct signing keys give distinct credentials and subcredentials."""
        pub1, _ = _make_ed25519_keys()
        pub2, _ = _make_ed25519_keys()
        spk1 = SigningPublicKey(pub1, sig_type=SigType.EdDSA_SHA512_Ed25519)
        spk2 = SigningPublicKey(pub2, sig_type=SigType.EdDSA_SHA512_Ed25519)
        cred1 = compute_credential(
            spk1, SigType.EdDSA_SHA512_Ed25519, SigType.RedDSA_SHA512_Ed25519
        )
        cred2 = compute_credential(
            spk2, SigType.EdDSA_SHA512_Ed25519, SigType.RedDSA_SHA512_Ed25519
        )
        # Different keys should produce different credentials
        assert cred1 != cred2

        # Use one shared blinded key so that any difference between the two
        # subcredentials can only come from the differing credentials.
        blinded_spk = os.urandom(32)
        sc1 = compute_subcredential(cred1, blinded_spk)
        sc2 = compute_subcredential(cred2, blinded_spk)
        assert sc1 != sc2


# -- Wire format --

class TestWireFormat:
    """Serialization layout, roundtrip and hash of EncryptedLeaseSet."""

    def test_type_code(self):
        """EncryptedLeaseSet advertises type code 5."""
        assert EncryptedLeaseSet.TYPE == 5

    def test_roundtrip_no_auth(self):
        """Serialize and deserialize without auth."""
        blinded_spk = os.urandom(32)
        published = int(time.time())
        expires = published + 600
        encrypted_data = os.urandom(200)
        signature = os.urandom(64)

        els = EncryptedLeaseSet(
            blinded_sig_type=SigType.RedDSA_SHA512_Ed25519,
            blinded_spk=blinded_spk,
            published=published,
            expires=expires,
            flags=0,
            encrypted_data=encrypted_data,
            signature=signature,
        )

        wire = els.to_bytes()
        els2 = EncryptedLeaseSet.from_bytes(wire)

        assert els2.blinded_sig_type.code == SigType.RedDSA_SHA512_Ed25519.code
        assert els2.blinded_spk == blinded_spk
        assert els2.published == published
        assert els2.expires == expires
        assert els2.flags == 0
        assert els2.encrypted_data == encrypted_data
        assert els2.signature == signature

    def test_wire_layout(self):
        """Verify byte layout matches spec."""
        blinded_spk = b"\xaa" * 32
        published = 1000000
        expires = 1000600
        encrypted_data = b"\xbb" * 50
        signature = b"\xcc" * 64

        els = EncryptedLeaseSet(
            blinded_sig_type=SigType.RedDSA_SHA512_Ed25519,
            blinded_spk=blinded_spk,
            published=published,
            expires=expires,
            flags=0,
            encrypted_data=encrypted_data,
            signature=signature,
        )

        wire = els.to_bytes()
        # blinded_sig_type(2) + blinded_spk(32) + published(4) + expires_offset(2) + flags(2)
        # + encrypted_len(2) + encrypted_data(50) + signature(64)
        assert len(wire) == 2 + 32 + 4 + 2 + 2 + 2 + 50 + 64

        assert struct.unpack("!H", wire[0:2])[0] == 11  # RedDSA code
        assert wire[2:34] == blinded_spk
        assert struct.unpack("!I", wire[34:38])[0] == published
        assert struct.unpack("!H", wire[38:40])[0] == 600  # offset
        assert struct.unpack("!H", wire[40:42])[0] == 0  # flags
        # Trailing fields, at the offsets implied by the layout comment above.
        assert struct.unpack("!H", wire[42:44])[0] == len(encrypted_data)
        assert wire[44:94] == encrypted_data
        assert wire[94:158] == signature

    def test_hash_computation(self):
        """Hash = SHA256(sig_type_code(2) + blinded_spk)."""
        blinded_spk = os.urandom(32)
        els = EncryptedLeaseSet(
            blinded_sig_type=SigType.RedDSA_SHA512_Ed25519,
            blinded_spk=blinded_spk,
            published=1000,
            expires=1600,
            flags=0,
            encrypted_data=b"",
            signature=b"\x00" * 64,
        )
        expected = hashlib.sha256(
            struct.pack("!H", 11) + blinded_spk
        ).digest()
        assert els.compute_hash() == expected


# -- Two-layer encryption/decryption --

class TestEncryptDecrypt:
    """Roundtrip behaviour of encrypt_inner / decrypt_inner."""

    def test_roundtrip_no_auth(self):
        """Encrypt inner LS bytes, then decrypt and verify identical."""
        inner_bytes = _make_inner_ls_bytes()
        subcredential = os.urandom(32)
        published = int(time.time())

        encrypted_data = EncryptedLeaseSet.encrypt_inner(
            inner_bytes=inner_bytes,
            subcredential=subcredential,
            published=published,
            auth_type=0,
        )

        decrypted = EncryptedLeaseSet.decrypt_inner(
            encrypted_data=encrypted_data,
            subcredential=subcredential,
            published=published,
            auth_type=0,
        )

        assert decrypted == inner_bytes

    def test_roundtrip_dh_auth(self):
        """Encrypt/decrypt with DH per-client auth."""
        inner_bytes = _make_inner_ls_bytes()
        subcredential = os.urandom(32)
        published = int(time.time())
        cookie = os.urandom(32)

        encrypted_data = EncryptedLeaseSet.encrypt_inner(
            inner_bytes=inner_bytes,
            subcredential=subcredential,
            published=published,
            auth_type=1,
            auth_cookie=cookie,
        )

        decrypted = EncryptedLeaseSet.decrypt_inner(
            encrypted_data=encrypted_data,
            subcredential=subcredential,
            published=published,
            auth_type=1,
            auth_cookie=cookie,
        )

        assert decrypted == inner_bytes

    def test_roundtrip_psk_auth(self):
        """Encrypt/decrypt with PSK per-client auth."""
        inner_bytes = _make_inner_ls_bytes()
        subcredential = os.urandom(32)
        published = int(time.time())
        cookie = os.urandom(32)

        encrypted_data = EncryptedLeaseSet.encrypt_inner(
            inner_bytes=inner_bytes,
            subcredential=subcredential,
            published=published,
            auth_type=2,
            auth_cookie=cookie,
        )

        decrypted = EncryptedLeaseSet.decrypt_inner(
            encrypted_data=encrypted_data,
            subcredential=subcredential,
            published=published,
            auth_type=2,
            auth_cookie=cookie,
        )

        assert decrypted == inner_bytes

    def test_wrong_subcredential_fails(self):
        """Decryption with wrong subcredential produces different bytes."""
        inner_bytes = _make_inner_ls_bytes()
        subcredential = os.urandom(32)
        published = int(time.time())

        encrypted_data = EncryptedLeaseSet.encrypt_inner(
            inner_bytes=inner_bytes,
            subcredential=subcredential,
            published=published,
            auth_type=0,
        )

        wrong_subcredential = os.urandom(32)
        decrypted = EncryptedLeaseSet.decrypt_inner(
            encrypted_data=encrypted_data,
            subcredential=wrong_subcredential,
            published=published,
            auth_type=0,
        )

        # ChaCha20 doesn't authenticate, so it will "decrypt" to garbage
        assert decrypted != inner_bytes

    # Parametrized so each payload size is reported as its own test case.
    @pytest.mark.parametrize("size", [1, 50, 200, 500, 1000, 4000])
    def test_various_inner_sizes(self, size):
        """Encrypt/decrypt works for various inner LS sizes."""
        subcredential = os.urandom(32)
        published = int(time.time())
        inner_bytes = os.urandom(size)

        encrypted_data = EncryptedLeaseSet.encrypt_inner(
            inner_bytes=inner_bytes,
            subcredential=subcredential,
            published=published,
            auth_type=0,
        )
        decrypted = EncryptedLeaseSet.decrypt_inner(
            encrypted_data=encrypted_data,
            subcredential=subcredential,
            published=published,
            auth_type=0,
        )

        assert decrypted == inner_bytes, f"Failed for size {size}"