A Python port of the Invisible Internet Project (I2P)
at main 177 lines 6.1 kB view raw
1"""Tests for Lease and LeaseSet.""" 2 3import os 4import struct 5import time 6 7import pytest 8 9 10class TestLease: 11 def test_construct(self): 12 from i2p_data.lease import Lease 13 gw = os.urandom(32) 14 lease = Lease(gw, 12345, 1710000000000) 15 assert lease.gateway_hash == gw 16 assert lease.tunnel_id == 12345 17 assert lease.end_date == 1710000000000 18 19 def test_roundtrip(self): 20 from i2p_data.lease import Lease 21 gw = os.urandom(32) 22 lease = Lease(gw, 99999, 1710000000000) 23 data = lease.to_bytes() 24 assert len(data) == 44 25 lease2 = Lease.from_bytes(data) 26 assert lease2.gateway_hash == gw 27 assert lease2.tunnel_id == 99999 28 assert lease2.end_date == 1710000000000 29 30 def test_size_is_44(self): 31 from i2p_data.lease import Lease 32 lease = Lease(os.urandom(32), 1, 0) 33 assert len(lease.to_bytes()) == 44 34 35 def test_expired(self): 36 from i2p_data.lease import Lease 37 # Past timestamp 38 lease = Lease(os.urandom(32), 1, 1000) 39 assert lease.is_expired(now_ms=2000) 40 assert not lease.is_expired(now_ms=500) 41 42 def test_not_expired(self): 43 from i2p_data.lease import Lease 44 future_ms = int(time.time() * 1000) + 3600_000 45 lease = Lease(os.urandom(32), 1, future_ms) 46 assert not lease.is_expired() 47 48 def test_wrong_hash_size(self): 49 from i2p_data.lease import Lease 50 with pytest.raises(ValueError): 51 Lease(b"too short", 1, 0) 52 53 def test_equality(self): 54 from i2p_data.lease import Lease 55 gw = os.urandom(32) 56 l1 = Lease(gw, 100, 5000) 57 l2 = Lease(gw, 100, 5000) 58 l3 = Lease(gw, 200, 5000) 59 assert l1 == l2 60 assert l1 != l3 61 62 def test_from_bytes_too_short(self): 63 from i2p_data.lease import Lease 64 with pytest.raises(ValueError): 65 Lease.from_bytes(b"\x00" * 10) 66 67 68class TestLeaseSet: 69 def _make_lease_set(self, num_leases=2): 70 from i2p_data.lease import Lease, LeaseSet 71 from i2p_data.destination import Destination 72 from i2p_data.key_types import PublicKey, SigningPublicKey, EncType 73 from i2p_data.certificate import KeyCertificate 74 from i2p_crypto.dsa import SigType, KeyGenerator 75 76 # Generate signing keypair 77 pub_sig, priv_sig = KeyGenerator.generate(SigType.EdDSA_SHA512_Ed25519) 78 79 # Create destination 80 cert = KeyCertificate(struct.pack("!HH", 7, 4)) 81 dest = Destination( 82 PublicKey(os.urandom(32), EncType.ECIES_X25519), 83 SigningPublicKey(pub_sig, SigType.EdDSA_SHA512_Ed25519), 84 cert 85 ) 86 87 enc_key = PublicKey(os.urandom(256), EncType.ELGAMAL) 88 sign_key = SigningPublicKey(pub_sig, SigType.EdDSA_SHA512_Ed25519) 89 90 future_ms = int(time.time() * 1000) + 3600_000 91 leases = [ 92 Lease(os.urandom(32), i + 1, future_ms) 93 for i in range(num_leases) 94 ] 95 96 ls = LeaseSet(dest, enc_key, sign_key, leases) 97 ls.sign(priv_sig) 98 return ls, priv_sig 99 100 def test_sign_and_verify(self): 101 ls, _ = self._make_lease_set() 102 assert ls.verify() 103 104 def test_lease_count(self): 105 ls, _ = self._make_lease_set(3) 106 assert ls.lease_count() == 3 107 108 def test_get_lease(self): 109 ls, _ = self._make_lease_set(2) 110 lease = ls.get_lease(0) 111 assert lease.tunnel_id == 1 112 lease = ls.get_lease(1) 113 assert lease.tunnel_id == 2 114 115 def test_is_current(self): 116 ls, _ = self._make_lease_set(1) 117 assert ls.is_current() # Future expiration 118 119 def test_is_not_current_when_expired(self): 120 from i2p_data.lease import Lease, LeaseSet 121 from i2p_data.destination import Destination 122 from i2p_data.key_types import PublicKey, SigningPublicKey, EncType 123 from i2p_data.certificate import KeyCertificate 124 from i2p_crypto.dsa import SigType, KeyGenerator 125 126 pub_sig, priv_sig = KeyGenerator.generate(SigType.EdDSA_SHA512_Ed25519) 127 cert = KeyCertificate(struct.pack("!HH", 7, 4)) 128 dest = Destination( 129 PublicKey(os.urandom(32), EncType.ECIES_X25519), 130 SigningPublicKey(pub_sig, SigType.EdDSA_SHA512_Ed25519), 131 cert 132 ) 133 134 # Expired lease 135 lease = Lease(os.urandom(32), 1, 1000) 136 ls = LeaseSet(dest, PublicKey(os.urandom(256), EncType.ELGAMAL), 137 SigningPublicKey(pub_sig, SigType.EdDSA_SHA512_Ed25519), 138 [lease]) 139 assert not ls.is_current(now_ms=2000) 140 141 def test_add_lease(self): 142 from i2p_data.lease import Lease 143 ls, _ = self._make_lease_set(0) 144 assert ls.lease_count() == 0 145 ls.add_lease(Lease(os.urandom(32), 42, int(time.time() * 1000) + 60000)) 146 assert ls.lease_count() == 1 147 148 def test_max_leases_enforced(self): 149 from i2p_data.lease import Lease 150 ls, _ = self._make_lease_set(16) 151 assert ls.lease_count() == 16 152 with pytest.raises(ValueError): 153 ls.add_lease(Lease(os.urandom(32), 99, 0)) 154 155 def test_verify_fails_modified(self): 156 ls, _ = self._make_lease_set() 157 # Corrupt a lease 158 ls._leases[0] = type(ls._leases[0])( 159 os.urandom(32), 9999, ls._leases[0].end_date 160 ) 161 assert not ls.verify() 162 163 def test_unsigned_verify_fails(self): 164 from i2p_data.lease import Lease, LeaseSet 165 from i2p_data.destination import Destination 166 from i2p_data.key_types import PublicKey, SigningPublicKey, EncType 167 from i2p_data.certificate import Certificate 168 from i2p_crypto.dsa import SigType 169 170 dest = Destination( 171 PublicKey(os.urandom(256), EncType.ELGAMAL), 172 SigningPublicKey(os.urandom(128), SigType.DSA_SHA1), 173 Certificate.NULL 174 ) 175 ls = LeaseSet(dest, PublicKey(os.urandom(256), EncType.ELGAMAL), 176 SigningPublicKey(os.urandom(128), SigType.DSA_SHA1)) 177 assert not ls.verify()