A Python port of the Invisible Internet Project (I2P)
1"""Tests for Lease and LeaseSet."""
2
3import os
4import struct
5import time
6
7import pytest
8
9
class TestLease:
    """Behavioral tests for ``Lease``: construction, wire format, expiry, equality."""

    def test_construct(self):
        """Constructor arguments are readable back as attributes."""
        from i2p_data.lease import Lease

        gateway = os.urandom(32)
        tunnel, expires = 12345, 1710000000000

        subject = Lease(gateway, tunnel, expires)

        assert subject.gateway_hash == gateway
        assert subject.tunnel_id == tunnel
        assert subject.end_date == expires

    def test_roundtrip(self):
        """``to_bytes`` followed by ``from_bytes`` preserves every field."""
        from i2p_data.lease import Lease

        gateway = os.urandom(32)
        tunnel, expires = 99999, 1710000000000

        wire = Lease(gateway, tunnel, expires).to_bytes()
        assert len(wire) == 44

        decoded = Lease.from_bytes(wire)
        assert decoded.gateway_hash == gateway
        assert decoded.tunnel_id == tunnel
        assert decoded.end_date == expires

    def test_size_is_44(self):
        """A serialized lease occupies exactly 44 bytes."""
        from i2p_data.lease import Lease

        encoded = Lease(os.urandom(32), 1, 0).to_bytes()
        assert len(encoded) == 44

    def test_expired(self):
        """Expiry is judged relative to the ``now_ms`` passed by the caller."""
        from i2p_data.lease import Lease

        # The lease ends at t=1000 ms: expired when asked at 2000, live at 500.
        stale = Lease(os.urandom(32), 1, 1000)

        assert stale.is_expired(now_ms=2000)
        assert not stale.is_expired(now_ms=500)

    def test_not_expired(self):
        """With no explicit ``now_ms``, a lease ending in an hour is not expired."""
        from i2p_data.lease import Lease

        one_hour_ahead_ms = int(time.time() * 1000) + 3600_000
        fresh = Lease(os.urandom(32), 1, one_hour_ahead_ms)

        assert not fresh.is_expired()

    def test_wrong_hash_size(self):
        """A gateway hash of the wrong length (9 bytes here) raises ``ValueError``."""
        from i2p_data.lease import Lease

        bad_hash = b"too short"
        with pytest.raises(ValueError):
            Lease(bad_hash, 1, 0)

    def test_equality(self):
        """Leases with identical fields are equal; a different tunnel id is not."""
        from i2p_data.lease import Lease

        gateway = os.urandom(32)
        first = Lease(gateway, 100, 5000)
        twin = Lease(gateway, 100, 5000)
        other_tunnel = Lease(gateway, 200, 5000)

        assert first == twin
        assert first != other_tunnel

    def test_from_bytes_too_short(self):
        """Decoding a 10-byte buffer raises ``ValueError``."""
        from i2p_data.lease import Lease

        truncated = b"\x00" * 10
        with pytest.raises(ValueError):
            Lease.from_bytes(truncated)
66
67
class TestLeaseSet:
    """Tests for ``LeaseSet``: signing, verification, lease bookkeeping, currency."""

    def _make_destination(self):
        """Build a destination backed by a freshly generated Ed25519 keypair.

        Shared by the fixtures below so the keypair/certificate/destination
        setup lives in one place.

        Returns:
            A ``(destination, pub_sig, priv_sig)`` tuple, where ``pub_sig`` and
            ``priv_sig`` are the pair returned by ``KeyGenerator.generate``.
        """
        from i2p_data.destination import Destination
        from i2p_data.key_types import PublicKey, SigningPublicKey, EncType
        from i2p_data.certificate import KeyCertificate
        from i2p_crypto.dsa import SigType, KeyGenerator

        pub_sig, priv_sig = KeyGenerator.generate(SigType.EdDSA_SHA512_Ed25519)

        # NOTE(review): the payload is two big-endian uint16 values (7, 4),
        # presumably the signing/crypto type codes matching the keys below;
        # confirm against KeyCertificate.
        cert = KeyCertificate(struct.pack("!HH", 7, 4))
        dest = Destination(
            PublicKey(os.urandom(32), EncType.ECIES_X25519),
            SigningPublicKey(pub_sig, SigType.EdDSA_SHA512_Ed25519),
            cert,
        )
        return dest, pub_sig, priv_sig

    def _make_lease_set(self, num_leases=2):
        """Create a signed ``LeaseSet`` holding ``num_leases`` unexpired leases.

        Leases get tunnel ids ``1..num_leases`` and an end date one hour from
        now (in milliseconds).

        Args:
            num_leases: Number of leases to put in the set.

        Returns:
            A ``(lease_set, priv_sig)`` tuple; ``priv_sig`` is the key the set
            was signed with.
        """
        from i2p_data.lease import Lease, LeaseSet
        from i2p_data.key_types import PublicKey, SigningPublicKey, EncType
        from i2p_crypto.dsa import SigType

        dest, pub_sig, priv_sig = self._make_destination()

        enc_key = PublicKey(os.urandom(256), EncType.ELGAMAL)
        sign_key = SigningPublicKey(pub_sig, SigType.EdDSA_SHA512_Ed25519)

        future_ms = int(time.time() * 1000) + 3600_000
        leases = [
            Lease(os.urandom(32), i + 1, future_ms)
            for i in range(num_leases)
        ]

        ls = LeaseSet(dest, enc_key, sign_key, leases)
        ls.sign(priv_sig)
        return ls, priv_sig

    def test_sign_and_verify(self):
        """A freshly signed lease set verifies."""
        ls, _ = self._make_lease_set()
        assert ls.verify()

    def test_lease_count(self):
        """``lease_count`` reports the number of leases supplied."""
        ls, _ = self._make_lease_set(3)
        assert ls.lease_count() == 3

    def test_get_lease(self):
        """``get_lease`` returns leases by index, in the order supplied."""
        ls, _ = self._make_lease_set(2)
        lease = ls.get_lease(0)
        assert lease.tunnel_id == 1
        lease = ls.get_lease(1)
        assert lease.tunnel_id == 2

    def test_is_current(self):
        """A set whose only lease ends in the future is current."""
        ls, _ = self._make_lease_set(1)
        assert ls.is_current()  # Future expiration

    def test_is_not_current_when_expired(self):
        """A set whose only lease ended before ``now_ms`` is not current."""
        from i2p_data.lease import Lease, LeaseSet
        from i2p_data.key_types import PublicKey, SigningPublicKey, EncType
        from i2p_crypto.dsa import SigType

        # The private key is not needed: this set is never signed.
        dest, pub_sig, _ = self._make_destination()

        # Expired lease: ends at t=1000 ms, queried at t=2000 ms.
        lease = Lease(os.urandom(32), 1, 1000)
        ls = LeaseSet(
            dest,
            PublicKey(os.urandom(256), EncType.ELGAMAL),
            SigningPublicKey(pub_sig, SigType.EdDSA_SHA512_Ed25519),
            [lease],
        )
        assert not ls.is_current(now_ms=2000)

    def test_add_lease(self):
        """``add_lease`` grows an empty set to one lease."""
        from i2p_data.lease import Lease

        ls, _ = self._make_lease_set(0)
        assert ls.lease_count() == 0
        ls.add_lease(Lease(os.urandom(32), 42, int(time.time() * 1000) + 60000))
        assert ls.lease_count() == 1

    def test_max_leases_enforced(self):
        """Adding a 17th lease to a set of 16 raises ``ValueError``."""
        from i2p_data.lease import Lease

        ls, _ = self._make_lease_set(16)
        assert ls.lease_count() == 16
        with pytest.raises(ValueError):
            ls.add_lease(Lease(os.urandom(32), 99, 0))

    def test_verify_fails_modified(self):
        """Replacing a lease after signing makes verification fail."""
        from i2p_data.lease import Lease

        ls, _ = self._make_lease_set()
        # Corrupt a lease by writing to the private list directly, so the
        # stored signature no longer matches the content.
        ls._leases[0] = Lease(os.urandom(32), 9999, ls._leases[0].end_date)
        assert not ls.verify()

    def test_unsigned_verify_fails(self):
        """A lease set that was never signed does not verify."""
        from i2p_data.lease import LeaseSet
        from i2p_data.destination import Destination
        from i2p_data.key_types import PublicKey, SigningPublicKey, EncType
        from i2p_data.certificate import Certificate
        from i2p_crypto.dsa import SigType

        dest = Destination(
            PublicKey(os.urandom(256), EncType.ELGAMAL),
            SigningPublicKey(os.urandom(128), SigType.DSA_SHA1),
            Certificate.NULL,
        )
        ls = LeaseSet(
            dest,
            PublicKey(os.urandom(256), EncType.ELGAMAL),
            SigningPublicKey(os.urandom(128), SigType.DSA_SHA1),
        )
        assert not ls.verify()