"""Tests for Lease and LeaseSet.""" import os import struct import time import pytest class TestLease: def test_construct(self): from i2p_data.lease import Lease gw = os.urandom(32) lease = Lease(gw, 12345, 1710000000000) assert lease.gateway_hash == gw assert lease.tunnel_id == 12345 assert lease.end_date == 1710000000000 def test_roundtrip(self): from i2p_data.lease import Lease gw = os.urandom(32) lease = Lease(gw, 99999, 1710000000000) data = lease.to_bytes() assert len(data) == 44 lease2 = Lease.from_bytes(data) assert lease2.gateway_hash == gw assert lease2.tunnel_id == 99999 assert lease2.end_date == 1710000000000 def test_size_is_44(self): from i2p_data.lease import Lease lease = Lease(os.urandom(32), 1, 0) assert len(lease.to_bytes()) == 44 def test_expired(self): from i2p_data.lease import Lease # Past timestamp lease = Lease(os.urandom(32), 1, 1000) assert lease.is_expired(now_ms=2000) assert not lease.is_expired(now_ms=500) def test_not_expired(self): from i2p_data.lease import Lease future_ms = int(time.time() * 1000) + 3600_000 lease = Lease(os.urandom(32), 1, future_ms) assert not lease.is_expired() def test_wrong_hash_size(self): from i2p_data.lease import Lease with pytest.raises(ValueError): Lease(b"too short", 1, 0) def test_equality(self): from i2p_data.lease import Lease gw = os.urandom(32) l1 = Lease(gw, 100, 5000) l2 = Lease(gw, 100, 5000) l3 = Lease(gw, 200, 5000) assert l1 == l2 assert l1 != l3 def test_from_bytes_too_short(self): from i2p_data.lease import Lease with pytest.raises(ValueError): Lease.from_bytes(b"\x00" * 10) class TestLeaseSet: def _make_lease_set(self, num_leases=2): from i2p_data.lease import Lease, LeaseSet from i2p_data.destination import Destination from i2p_data.key_types import PublicKey, SigningPublicKey, EncType from i2p_data.certificate import KeyCertificate from i2p_crypto.dsa import SigType, KeyGenerator # Generate signing keypair pub_sig, priv_sig = KeyGenerator.generate(SigType.EdDSA_SHA512_Ed25519) # Create destination cert = KeyCertificate(struct.pack("!HH", 7, 4)) dest = Destination( PublicKey(os.urandom(32), EncType.ECIES_X25519), SigningPublicKey(pub_sig, SigType.EdDSA_SHA512_Ed25519), cert ) enc_key = PublicKey(os.urandom(256), EncType.ELGAMAL) sign_key = SigningPublicKey(pub_sig, SigType.EdDSA_SHA512_Ed25519) future_ms = int(time.time() * 1000) + 3600_000 leases = [ Lease(os.urandom(32), i + 1, future_ms) for i in range(num_leases) ] ls = LeaseSet(dest, enc_key, sign_key, leases) ls.sign(priv_sig) return ls, priv_sig def test_sign_and_verify(self): ls, _ = self._make_lease_set() assert ls.verify() def test_lease_count(self): ls, _ = self._make_lease_set(3) assert ls.lease_count() == 3 def test_get_lease(self): ls, _ = self._make_lease_set(2) lease = ls.get_lease(0) assert lease.tunnel_id == 1 lease = ls.get_lease(1) assert lease.tunnel_id == 2 def test_is_current(self): ls, _ = self._make_lease_set(1) assert ls.is_current() # Future expiration def test_is_not_current_when_expired(self): from i2p_data.lease import Lease, LeaseSet from i2p_data.destination import Destination from i2p_data.key_types import PublicKey, SigningPublicKey, EncType from i2p_data.certificate import KeyCertificate from i2p_crypto.dsa import SigType, KeyGenerator pub_sig, priv_sig = KeyGenerator.generate(SigType.EdDSA_SHA512_Ed25519) cert = KeyCertificate(struct.pack("!HH", 7, 4)) dest = Destination( PublicKey(os.urandom(32), EncType.ECIES_X25519), SigningPublicKey(pub_sig, SigType.EdDSA_SHA512_Ed25519), cert ) # Expired lease lease = Lease(os.urandom(32), 1, 1000) ls = LeaseSet(dest, PublicKey(os.urandom(256), EncType.ELGAMAL), SigningPublicKey(pub_sig, SigType.EdDSA_SHA512_Ed25519), [lease]) assert not ls.is_current(now_ms=2000) def test_add_lease(self): from i2p_data.lease import Lease ls, _ = self._make_lease_set(0) assert ls.lease_count() == 0 ls.add_lease(Lease(os.urandom(32), 42, int(time.time() * 1000) + 60000)) assert ls.lease_count() == 1 def test_max_leases_enforced(self): from i2p_data.lease import Lease ls, _ = self._make_lease_set(16) assert ls.lease_count() == 16 with pytest.raises(ValueError): ls.add_lease(Lease(os.urandom(32), 99, 0)) def test_verify_fails_modified(self): ls, _ = self._make_lease_set() # Corrupt a lease ls._leases[0] = type(ls._leases[0])( os.urandom(32), 9999, ls._leases[0].end_date ) assert not ls.verify() def test_unsigned_verify_fails(self): from i2p_data.lease import Lease, LeaseSet from i2p_data.destination import Destination from i2p_data.key_types import PublicKey, SigningPublicKey, EncType from i2p_data.certificate import Certificate from i2p_crypto.dsa import SigType dest = Destination( PublicKey(os.urandom(256), EncType.ELGAMAL), SigningPublicKey(os.urandom(128), SigType.DSA_SHA1), Certificate.NULL ) ls = LeaseSet(dest, PublicKey(os.urandom(256), EncType.ELGAMAL), SigningPublicKey(os.urandom(128), SigType.DSA_SHA1)) assert not ls.verify()