A Python port of the Invisible Internet Project (I2P)
at main 207 lines 7.1 kB view raw
1"""Tests for MetaLease and MetaLeaseSet (Type 7) — TDD tests written before implementation.""" 2 3import struct 4import time 5 6import pytest 7from cryptography.hazmat.primitives.asymmetric.ed25519 import Ed25519PrivateKey 8from cryptography.hazmat.primitives.serialization import ( 9 Encoding, PublicFormat, PrivateFormat, NoEncryption, 10) 11 12from i2p_crypto.dsa import SigType, DSAEngine 13from i2p_data.key_types import ( 14 PublicKey, SigningPublicKey, SigningPrivateKey, EncType, 15) 16from i2p_data.certificate import KeyCertificate 17from i2p_data.destination import Destination 18from i2p_data.lease_set2 import MetaLease, MetaLeaseSet 19 20 21# --------------------------------------------------------------------------- 22# Helpers 23# --------------------------------------------------------------------------- 24 25def _make_ed25519_keypair(): 26 """Return (pub_bytes_32, priv_bytes_32) for Ed25519.""" 27 priv = Ed25519PrivateKey.generate() 28 pub_bytes = priv.public_key().public_bytes(Encoding.Raw, PublicFormat.Raw) 29 priv_bytes = priv.private_bytes(Encoding.Raw, PrivateFormat.Raw, NoEncryption()) 30 return pub_bytes, priv_bytes 31 32 33def _make_destination(): 34 """Create a Destination with X25519 enc + Ed25519 signing (KeyCertificate).""" 35 pub_bytes, priv_bytes = _make_ed25519_keypair() 36 sig_type = SigType.EdDSA_SHA512_Ed25519 37 38 signing_pub = SigningPublicKey(pub_bytes, sig_type=sig_type) 39 signing_priv = SigningPrivateKey(priv_bytes, sig_type=sig_type) 40 41 # KeyCert payload: sig_type_code(2) + enc_type_code(2) 42 # Ed25519 = code 7, ECIES_X25519 = code 4 43 cert = KeyCertificate(struct.pack("!HH", 7, 4)) 44 45 # ECIES_X25519 public key is 32 bytes 46 enc_key = PublicKey(b"\x00" * 32, EncType.ECIES_X25519) 47 dest = Destination(enc_key, signing_pub, cert) 48 return dest, signing_priv, priv_bytes 49 50 51def _make_meta_lease(index: int = 0, cost: int = 10, ls_type: int = 3) -> MetaLease: 52 """Create a MetaLease with deterministic data.""" 53 gw = bytes([index & 0xFF]) * 32 54 end_date = int(time.time()) + 600 # 10 min from now, seconds 55 return MetaLease( 56 gateway_hash=gw, 57 flags=0, 58 ls_type=ls_type, 59 cost=cost, 60 end_date=end_date, 61 ) 62 63 64# --------------------------------------------------------------------------- 65# Tests 66# --------------------------------------------------------------------------- 67 68class TestMetaLease: 69 70 def test_meta_lease_roundtrip(self): 71 """Single MetaLease to_bytes/from_bytes roundtrip.""" 72 ml = _make_meta_lease(index=5, cost=20, ls_type=3) 73 74 data = ml.to_bytes() 75 assert len(data) == MetaLease.SIZE 76 77 restored = MetaLease.from_bytes(data) 78 assert restored.gateway_hash == ml.gateway_hash 79 assert restored.flags == ml.flags 80 assert restored.ls_type == ml.ls_type 81 assert restored.cost == ml.cost 82 assert restored.end_date == ml.end_date 83 84 def test_meta_lease_size_40(self): 85 """Verify MetaLease.SIZE == 40.""" 86 assert MetaLease.SIZE == 40 87 ml = _make_meta_lease() 88 assert len(ml.to_bytes()) == 40 89 90 91class TestMetaLeaseSet: 92 93 def test_meta_lease_set_roundtrip(self): 94 """Create with 2 meta-leases, serialize, deserialize.""" 95 dest, signing_priv, priv_bytes = _make_destination() 96 published = int(time.time()) 97 expires = published + 600 98 99 mls = MetaLeaseSet( 100 destination=dest, 101 published=published, 102 expires=expires, 103 flags=0, 104 meta_leases=[_make_meta_lease(0), _make_meta_lease(1)], 105 ) 106 mls.sign(priv_bytes, SigType.EdDSA_SHA512_Ed25519) 107 assert mls.verify() 108 109 data = mls.to_bytes() 110 restored = MetaLeaseSet.from_bytes(data) 111 112 assert restored.destination == dest 113 assert restored.published == published 114 assert restored.expires == expires 115 assert len(restored.meta_leases) == 2 116 assert restored.meta_leases[0].gateway_hash == bytes([0]) * 32 117 assert restored.meta_leases[1].gateway_hash == bytes([1]) * 32 118 assert restored.verify() 119 120 def test_meta_lease_set_with_revocations(self): 121 """Create with 1 meta-lease + 2 revocations, verify roundtrip.""" 122 dest, signing_priv, priv_bytes = _make_destination() 123 published = int(time.time()) 124 expires = published + 600 125 126 rev1 = b"\xaa" * 32 127 rev2 = b"\xbb" * 32 128 129 mls = MetaLeaseSet( 130 destination=dest, 131 published=published, 132 expires=expires, 133 flags=0, 134 meta_leases=[_make_meta_lease(0)], 135 revocations=[rev1, rev2], 136 ) 137 mls.sign(priv_bytes, SigType.EdDSA_SHA512_Ed25519) 138 assert mls.verify() 139 140 data = mls.to_bytes() 141 restored = MetaLeaseSet.from_bytes(data) 142 143 assert len(restored.meta_leases) == 1 144 assert len(restored.revocations) == 2 145 assert restored.revocations[0] == rev1 146 assert restored.revocations[1] == rev2 147 assert restored.verify() 148 149 def test_no_encryption_keys(self): 150 """Verify MetaLeaseSet body has no enc keys section. 151 152 The body should be: options_len(2) + options + num_meta_leases(1) + 153 meta_leases + num_revocations(1) + revocations. 154 There is no num_enc_keys byte or encryption key data. 155 """ 156 dest, signing_priv, priv_bytes = _make_destination() 157 published = int(time.time()) 158 expires = published + 600 159 160 mls = MetaLeaseSet( 161 destination=dest, 162 published=published, 163 expires=expires, 164 flags=0, 165 meta_leases=[_make_meta_lease(0)], 166 ) 167 168 body = mls._body_bytes() 169 170 # Body starts with options_len (2 bytes, value 0) 171 assert body[0:2] == b"\x00\x00" 172 # Next byte is num_meta_leases (1 byte, value 1) 173 assert body[2:3] == b"\x01" 174 # Then 40 bytes of MetaLease 175 # Then num_revocations (1 byte, value 0) 176 assert body[2 + 1 + 40] == 0 # num_revocations 177 # Total body: 2 + 1 + 40 + 1 = 44 bytes 178 assert len(body) == 44 179 180 def test_cost_ordering(self): 181 """Verify lower cost is preferred (sorted ordering).""" 182 ml_high = _make_meta_lease(index=0, cost=200) 183 ml_low = _make_meta_lease(index=1, cost=10) 184 ml_mid = _make_meta_lease(index=2, cost=50) 185 186 leases = [ml_high, ml_low, ml_mid] 187 by_cost = sorted(leases, key=lambda m: m.cost) 188 assert by_cost[0].cost == 10 189 assert by_cost[1].cost == 50 190 assert by_cost[2].cost == 200 191 192 def test_type_byte_7(self): 193 """Verify signable bytes start with 0x07.""" 194 dest, signing_priv, priv_bytes = _make_destination() 195 published = int(time.time()) 196 expires = published + 600 197 198 mls = MetaLeaseSet( 199 destination=dest, 200 published=published, 201 expires=expires, 202 flags=0, 203 meta_leases=[_make_meta_lease(0)], 204 ) 205 206 signable = mls._signable_bytes() 207 assert signable[0:1] == b"\x07"