"""Tier 4 protocol gap tests: EncryptedLeaseSet (Type 5). Tests two-layer ChaCha20 encryption, subcredential derivation, per-client auth (DH/PSK), wire format roundtrip. """ import hashlib import os import struct import time import pytest from i2p_crypto.blinding import Blinding from i2p_crypto.dsa import SigType, KeyGenerator from i2p_crypto.hkdf import HKDF from i2p_data.encrypted_lease_set import ( EncryptedLeaseSet, compute_credential, compute_subcredential, ) from i2p_data.key_types import SigningPublicKey # -- Helpers -- def _make_ed25519_keys(): """Generate an Ed25519 signing key pair.""" pub, priv = KeyGenerator.generate(SigType.EdDSA_SHA512_Ed25519) return pub, priv def _make_inner_ls_bytes(): """Create a minimal inner LeaseSet2-like byte string for testing. In real usage this would be LeaseSet2.to_bytes(), but for testing the encryption layer we just need arbitrary bytes. """ return os.urandom(200) # -- Subcredential computation -- class TestSubcredential: def test_credential_deterministic(self): pub, _ = _make_ed25519_keys() spk = SigningPublicKey(pub, sig_type=SigType.EdDSA_SHA512_Ed25519) c1 = compute_credential(spk, SigType.EdDSA_SHA512_Ed25519, SigType.RedDSA_SHA512_Ed25519) c2 = compute_credential(spk, SigType.EdDSA_SHA512_Ed25519, SigType.RedDSA_SHA512_Ed25519) assert c1 == c2 assert len(c1) == 32 def test_subcredential_deterministic(self): pub, _ = _make_ed25519_keys() spk = SigningPublicKey(pub, sig_type=SigType.EdDSA_SHA512_Ed25519) cred = compute_credential(spk, SigType.EdDSA_SHA512_Ed25519, SigType.RedDSA_SHA512_Ed25519) blinded_spk = os.urandom(32) sc1 = compute_subcredential(cred, blinded_spk) sc2 = compute_subcredential(cred, blinded_spk) assert sc1 == sc2 assert len(sc1) == 32 def test_different_keys_different_subcredential(self): pub1, _ = _make_ed25519_keys() pub2, _ = _make_ed25519_keys() spk1 = SigningPublicKey(pub1, sig_type=SigType.EdDSA_SHA512_Ed25519) spk2 = SigningPublicKey(pub2, sig_type=SigType.EdDSA_SHA512_Ed25519) cred1 = compute_credential(spk1, SigType.EdDSA_SHA512_Ed25519, SigType.RedDSA_SHA512_Ed25519) cred2 = compute_credential(spk2, SigType.EdDSA_SHA512_Ed25519, SigType.RedDSA_SHA512_Ed25519) # Different keys should produce different credentials assert cred1 != cred2 # -- Wire format -- class TestWireFormat: def test_type_code(self): assert EncryptedLeaseSet.TYPE == 5 def test_roundtrip_no_auth(self): """Serialize and deserialize without auth.""" blinded_spk = os.urandom(32) published = int(time.time()) expires = published + 600 encrypted_data = os.urandom(200) signature = os.urandom(64) els = EncryptedLeaseSet( blinded_sig_type=SigType.RedDSA_SHA512_Ed25519, blinded_spk=blinded_spk, published=published, expires=expires, flags=0, encrypted_data=encrypted_data, signature=signature, ) wire = els.to_bytes() els2 = EncryptedLeaseSet.from_bytes(wire) assert els2.blinded_sig_type.code == SigType.RedDSA_SHA512_Ed25519.code assert els2.blinded_spk == blinded_spk assert els2.published == published assert els2.expires == expires assert els2.flags == 0 assert els2.encrypted_data == encrypted_data assert els2.signature == signature def test_wire_layout(self): """Verify byte layout matches spec.""" blinded_spk = b"\xaa" * 32 published = 1000000 expires = 1000600 encrypted_data = b"\xbb" * 50 signature = b"\xcc" * 64 els = EncryptedLeaseSet( blinded_sig_type=SigType.RedDSA_SHA512_Ed25519, blinded_spk=blinded_spk, published=published, expires=expires, flags=0, encrypted_data=encrypted_data, signature=signature, ) wire = els.to_bytes() # blinded_sig_type(2) + blinded_spk(32) + published(4) + expires_offset(2) + flags(2) # + encrypted_len(2) + encrypted_data(50) + signature(64) assert len(wire) == 2 + 32 + 4 + 2 + 2 + 2 + 50 + 64 assert struct.unpack("!H", wire[0:2])[0] == 11 # RedDSA code assert wire[2:34] == blinded_spk assert struct.unpack("!I", wire[34:38])[0] == published assert struct.unpack("!H", wire[38:40])[0] == 600 # offset assert struct.unpack("!H", wire[40:42])[0] == 0 # flags def test_hash_computation(self): """Hash = SHA256(sig_type_code(2) + blinded_spk).""" blinded_spk = os.urandom(32) els = EncryptedLeaseSet( blinded_sig_type=SigType.RedDSA_SHA512_Ed25519, blinded_spk=blinded_spk, published=1000, expires=1600, flags=0, encrypted_data=b"", signature=b"\x00" * 64, ) expected = hashlib.sha256( struct.pack("!H", 11) + blinded_spk ).digest() assert els.compute_hash() == expected # -- Two-layer encryption/decryption -- class TestEncryptDecrypt: def test_roundtrip_no_auth(self): """Encrypt inner LS bytes, then decrypt and verify identical.""" inner_bytes = _make_inner_ls_bytes() subcredential = os.urandom(32) published = int(time.time()) blinded_spk = os.urandom(32) encrypted_data = EncryptedLeaseSet.encrypt_inner( inner_bytes=inner_bytes, subcredential=subcredential, published=published, auth_type=0, ) decrypted = EncryptedLeaseSet.decrypt_inner( encrypted_data=encrypted_data, subcredential=subcredential, published=published, auth_type=0, ) assert decrypted == inner_bytes def test_roundtrip_dh_auth(self): """Encrypt/decrypt with DH per-client auth.""" inner_bytes = _make_inner_ls_bytes() subcredential = os.urandom(32) published = int(time.time()) cookie = os.urandom(32) encrypted_data = EncryptedLeaseSet.encrypt_inner( inner_bytes=inner_bytes, subcredential=subcredential, published=published, auth_type=1, auth_cookie=cookie, ) decrypted = EncryptedLeaseSet.decrypt_inner( encrypted_data=encrypted_data, subcredential=subcredential, published=published, auth_type=1, auth_cookie=cookie, ) assert decrypted == inner_bytes def test_roundtrip_psk_auth(self): """Encrypt/decrypt with PSK per-client auth.""" inner_bytes = _make_inner_ls_bytes() subcredential = os.urandom(32) published = int(time.time()) cookie = os.urandom(32) encrypted_data = EncryptedLeaseSet.encrypt_inner( inner_bytes=inner_bytes, subcredential=subcredential, published=published, auth_type=2, auth_cookie=cookie, ) decrypted = EncryptedLeaseSet.decrypt_inner( encrypted_data=encrypted_data, subcredential=subcredential, published=published, auth_type=2, auth_cookie=cookie, ) assert decrypted == inner_bytes def test_wrong_subcredential_fails(self): """Decryption with wrong subcredential produces different bytes.""" inner_bytes = _make_inner_ls_bytes() subcredential = os.urandom(32) published = int(time.time()) encrypted_data = EncryptedLeaseSet.encrypt_inner( inner_bytes=inner_bytes, subcredential=subcredential, published=published, auth_type=0, ) wrong_subcredential = os.urandom(32) decrypted = EncryptedLeaseSet.decrypt_inner( encrypted_data=encrypted_data, subcredential=wrong_subcredential, published=published, auth_type=0, ) # ChaCha20 doesn't authenticate, so it will "decrypt" to garbage assert decrypted != inner_bytes def test_various_inner_sizes(self): """Encrypt/decrypt works for various inner LS sizes.""" subcredential = os.urandom(32) published = int(time.time()) for size in [1, 50, 200, 500, 1000, 4000]: inner_bytes = os.urandom(size) encrypted_data = EncryptedLeaseSet.encrypt_inner( inner_bytes=inner_bytes, subcredential=subcredential, published=published, auth_type=0, ) decrypted = EncryptedLeaseSet.decrypt_inner( encrypted_data=encrypted_data, subcredential=subcredential, published=published, auth_type=0, ) assert decrypted == inner_bytes, f"Failed for size {size}"