"""Tests for MetaLease and MetaLeaseSet (Type 7) — TDD tests written before implementation.""" import struct import time import pytest from cryptography.hazmat.primitives.asymmetric.ed25519 import Ed25519PrivateKey from cryptography.hazmat.primitives.serialization import ( Encoding, PublicFormat, PrivateFormat, NoEncryption, ) from i2p_crypto.dsa import SigType, DSAEngine from i2p_data.key_types import ( PublicKey, SigningPublicKey, SigningPrivateKey, EncType, ) from i2p_data.certificate import KeyCertificate from i2p_data.destination import Destination from i2p_data.lease_set2 import MetaLease, MetaLeaseSet # --------------------------------------------------------------------------- # Helpers # --------------------------------------------------------------------------- def _make_ed25519_keypair(): """Return (pub_bytes_32, priv_bytes_32) for Ed25519.""" priv = Ed25519PrivateKey.generate() pub_bytes = priv.public_key().public_bytes(Encoding.Raw, PublicFormat.Raw) priv_bytes = priv.private_bytes(Encoding.Raw, PrivateFormat.Raw, NoEncryption()) return pub_bytes, priv_bytes def _make_destination(): """Create a Destination with X25519 enc + Ed25519 signing (KeyCertificate).""" pub_bytes, priv_bytes = _make_ed25519_keypair() sig_type = SigType.EdDSA_SHA512_Ed25519 signing_pub = SigningPublicKey(pub_bytes, sig_type=sig_type) signing_priv = SigningPrivateKey(priv_bytes, sig_type=sig_type) # KeyCert payload: sig_type_code(2) + enc_type_code(2) # Ed25519 = code 7, ECIES_X25519 = code 4 cert = KeyCertificate(struct.pack("!HH", 7, 4)) # ECIES_X25519 public key is 32 bytes enc_key = PublicKey(b"\x00" * 32, EncType.ECIES_X25519) dest = Destination(enc_key, signing_pub, cert) return dest, signing_priv, priv_bytes def _make_meta_lease(index: int = 0, cost: int = 10, ls_type: int = 3) -> MetaLease: """Create a MetaLease with deterministic data.""" gw = bytes([index & 0xFF]) * 32 end_date = int(time.time()) + 600 # 10 min from now, seconds return MetaLease( gateway_hash=gw, flags=0, ls_type=ls_type, cost=cost, end_date=end_date, ) # --------------------------------------------------------------------------- # Tests # --------------------------------------------------------------------------- class TestMetaLease: def test_meta_lease_roundtrip(self): """Single MetaLease to_bytes/from_bytes roundtrip.""" ml = _make_meta_lease(index=5, cost=20, ls_type=3) data = ml.to_bytes() assert len(data) == MetaLease.SIZE restored = MetaLease.from_bytes(data) assert restored.gateway_hash == ml.gateway_hash assert restored.flags == ml.flags assert restored.ls_type == ml.ls_type assert restored.cost == ml.cost assert restored.end_date == ml.end_date def test_meta_lease_size_40(self): """Verify MetaLease.SIZE == 40.""" assert MetaLease.SIZE == 40 ml = _make_meta_lease() assert len(ml.to_bytes()) == 40 class TestMetaLeaseSet: def test_meta_lease_set_roundtrip(self): """Create with 2 meta-leases, serialize, deserialize.""" dest, signing_priv, priv_bytes = _make_destination() published = int(time.time()) expires = published + 600 mls = MetaLeaseSet( destination=dest, published=published, expires=expires, flags=0, meta_leases=[_make_meta_lease(0), _make_meta_lease(1)], ) mls.sign(priv_bytes, SigType.EdDSA_SHA512_Ed25519) assert mls.verify() data = mls.to_bytes() restored = MetaLeaseSet.from_bytes(data) assert restored.destination == dest assert restored.published == published assert restored.expires == expires assert len(restored.meta_leases) == 2 assert restored.meta_leases[0].gateway_hash == bytes([0]) * 32 assert restored.meta_leases[1].gateway_hash == bytes([1]) * 32 assert restored.verify() def test_meta_lease_set_with_revocations(self): """Create with 1 meta-lease + 2 revocations, verify roundtrip.""" dest, signing_priv, priv_bytes = _make_destination() published = int(time.time()) expires = published + 600 rev1 = b"\xaa" * 32 rev2 = b"\xbb" * 32 mls = MetaLeaseSet( destination=dest, published=published, expires=expires, flags=0, meta_leases=[_make_meta_lease(0)], revocations=[rev1, rev2], ) mls.sign(priv_bytes, SigType.EdDSA_SHA512_Ed25519) assert mls.verify() data = mls.to_bytes() restored = MetaLeaseSet.from_bytes(data) assert len(restored.meta_leases) == 1 assert len(restored.revocations) == 2 assert restored.revocations[0] == rev1 assert restored.revocations[1] == rev2 assert restored.verify() def test_no_encryption_keys(self): """Verify MetaLeaseSet body has no enc keys section. The body should be: options_len(2) + options + num_meta_leases(1) + meta_leases + num_revocations(1) + revocations. There is no num_enc_keys byte or encryption key data. """ dest, signing_priv, priv_bytes = _make_destination() published = int(time.time()) expires = published + 600 mls = MetaLeaseSet( destination=dest, published=published, expires=expires, flags=0, meta_leases=[_make_meta_lease(0)], ) body = mls._body_bytes() # Body starts with options_len (2 bytes, value 0) assert body[0:2] == b"\x00\x00" # Next byte is num_meta_leases (1 byte, value 1) assert body[2:3] == b"\x01" # Then 40 bytes of MetaLease # Then num_revocations (1 byte, value 0) assert body[2 + 1 + 40] == 0 # num_revocations # Total body: 2 + 1 + 40 + 1 = 44 bytes assert len(body) == 44 def test_cost_ordering(self): """Verify lower cost is preferred (sorted ordering).""" ml_high = _make_meta_lease(index=0, cost=200) ml_low = _make_meta_lease(index=1, cost=10) ml_mid = _make_meta_lease(index=2, cost=50) leases = [ml_high, ml_low, ml_mid] by_cost = sorted(leases, key=lambda m: m.cost) assert by_cost[0].cost == 10 assert by_cost[1].cost == 50 assert by_cost[2].cost == 200 def test_type_byte_7(self): """Verify signable bytes start with 0x07.""" dest, signing_priv, priv_bytes = _make_destination() published = int(time.time()) expires = published + 600 mls = MetaLeaseSet( destination=dest, published=published, expires=expires, flags=0, meta_leases=[_make_meta_lease(0)], ) signable = mls._signable_bytes() assert signable[0:1] == b"\x07"