A Python port of the Invisible Internet Project (I2P)
at main 185 lines 6.7 kB view raw
"""Tests for I2CP Tier 3: lease set message types."""

import struct
import os

from i2p_client.i2cp_messages import (
    I2CPMessage,
    RequestLeaseSetMessage,
    RequestVariableLeaseSetMessage,
    CreateLeaseSetMessage,
    CreateLeaseSet2Message,
)


def _roundtrip(message):
    """Serialize *message* with ``to_wire`` and parse the bytes back.

    Parsing goes through ``I2CPMessage.from_wire``; the parsed message
    is returned.
    """
    return I2CPMessage.from_wire(message.to_wire())


# --- RequestLeaseSetMessage (Type 21) ---

class TestRequestLeaseSetMessage:
    """Wire round-trip and encoding checks for RequestLeaseSetMessage."""

    def test_roundtrip_single_tunnel(self):
        """A single (router hash, tunnel id) pair survives a round trip."""
        router_hash = os.urandom(32)
        original = RequestLeaseSetMessage(
            session_id=1,
            tunnels=[(router_hash, 9999)],
            end_date=1700000000000,
        )
        decoded = _roundtrip(original)
        assert isinstance(decoded, RequestLeaseSetMessage)
        assert decoded.session_id == 1
        assert len(decoded.tunnels) == 1
        assert decoded.tunnels[0][0] == router_hash
        assert decoded.tunnels[0][1] == 9999
        assert decoded.end_date == 1700000000000

    def test_roundtrip_multiple_tunnels(self):
        """Five tunnels come back in the same order with the same values."""
        tunnels = [(os.urandom(32), idx * 100) for idx in range(5)]
        original = RequestLeaseSetMessage(
            session_id=42,
            tunnels=tunnels,
            end_date=9999999999999,
        )
        decoded = _roundtrip(original)
        assert decoded.session_id == 42
        assert len(decoded.tunnels) == 5
        for expected, actual in zip(tunnels, decoded.tunnels):
            assert actual[0] == expected[0]
            assert actual[1] == expected[1]

    def test_end_date_encoding(self):
        """end_date is an 8-byte big-endian value at the end of the payload."""
        original = RequestLeaseSetMessage(
            session_id=0,
            tunnels=[(os.urandom(32), 1)],
            end_date=0xDEADBEEFCAFE,
        )
        payload = original.payload_bytes()
        # The trailing eight bytes hold the date as an unsigned 64-bit integer.
        (encoded_end_date,) = struct.unpack("!Q", payload[-8:])
        assert encoded_end_date == 0xDEADBEEFCAFE

    def test_type_code(self):
        """The class advertises I2CP message type 21."""
        assert RequestLeaseSetMessage.TYPE == 21

    def test_zero_tunnels(self):
        """An empty tunnel list round-trips to an empty list."""
        original = RequestLeaseSetMessage(
            session_id=7,
            tunnels=[],
            end_date=100,
        )
        decoded = _roundtrip(original)
        assert decoded.tunnels == []
        assert decoded.end_date == 100


# --- RequestVariableLeaseSetMessage (Type 37) ---

class TestRequestVariableLeaseSetMessage:
    """Wire round-trip checks for RequestVariableLeaseSetMessage."""

    def test_roundtrip_single_lease(self):
        """One (gateway, tunnel id, end date) lease survives a round trip."""
        gateway = os.urandom(32)
        original = RequestVariableLeaseSetMessage(
            session_id=3,
            leases=[(gateway, 555, 1700000000000)],
        )
        decoded = _roundtrip(original)
        assert isinstance(decoded, RequestVariableLeaseSetMessage)
        assert decoded.session_id == 3
        assert len(decoded.leases) == 1
        assert decoded.leases[0] == (gateway, 555, 1700000000000)

    def test_roundtrip_multiple_leases(self):
        """Four leases come back equal to, and in the order of, the input."""
        leases = [
            (os.urandom(32), idx * 10, 1700000000000 + idx * 60000)
            for idx in range(4)
        ]
        original = RequestVariableLeaseSetMessage(session_id=10, leases=leases)
        decoded = _roundtrip(original)
        assert decoded.session_id == 10
        assert len(decoded.leases) == 4
        for expected, actual in zip(leases, decoded.leases):
            assert actual == expected

    def test_type_code(self):
        """The class advertises I2CP message type 37."""
        assert RequestVariableLeaseSetMessage.TYPE == 37


# --- CreateLeaseSetMessage (Type 4) ---

class TestCreateLeaseSetMessage:
    """Wire round-trip and layout checks for CreateLeaseSetMessage."""

    def test_roundtrip(self):
        """Both keys and the lease set blob survive a round trip."""
        signing_key = os.urandom(64)
        private_key = os.urandom(32)
        lease_set_blob = os.urandom(200)
        original = CreateLeaseSetMessage(
            session_id=5,
            signing_private_key=signing_key,
            private_key=private_key,
            lease_set_data=lease_set_blob,
        )
        decoded = _roundtrip(original)
        assert isinstance(decoded, CreateLeaseSetMessage)
        assert decoded.session_id == 5
        assert decoded.signing_private_key == signing_key
        assert decoded.private_key == private_key
        assert decoded.lease_set_data == lease_set_blob

    def test_key_length_fields(self):
        """Key lengths are encoded as 2-byte big-endian prefixes."""
        signing_key = os.urandom(48)
        private_key = os.urandom(128)
        original = CreateLeaseSetMessage(
            session_id=0,
            signing_private_key=signing_key,
            private_key=private_key,
            lease_set_data=b"",
        )
        payload = original.payload_bytes()
        # Layout under test: session_id(2) then spk_len(2).
        (signing_key_len,) = struct.unpack("!H", payload[2:4])
        assert signing_key_len == 48
        # The private key length prefix follows the 48 signing key bytes.
        pk_offset = 4 + len(signing_key)
        (private_key_len,) = struct.unpack(
            "!H", payload[pk_offset:pk_offset + 2])
        assert private_key_len == 128

    def test_type_code(self):
        """The class advertises I2CP message type 4."""
        assert CreateLeaseSetMessage.TYPE == 4

    def test_empty_lease_set_data(self):
        """Empty lease set data round-trips without disturbing the keys."""
        original = CreateLeaseSetMessage(
            session_id=1,
            signing_private_key=b"\x01" * 20,
            private_key=b"\x02" * 10,
            lease_set_data=b"",
        )
        decoded = _roundtrip(original)
        assert decoded.lease_set_data == b""
        assert decoded.signing_private_key == b"\x01" * 20
        assert decoded.private_key == b"\x02" * 10


# --- CreateLeaseSet2Message (Type 41) ---

class TestCreateLeaseSet2Message:
    """Wire round-trip, constant and registry checks for CreateLeaseSet2Message."""

    def test_roundtrip(self):
        """Lease set type, data and two private keys survive a round trip."""
        lease_set_blob = os.urandom(300)
        keys = [(0, os.urandom(32)), (4, os.urandom(64))]
        original = CreateLeaseSet2Message(
            session_id=8,
            ls_type=CreateLeaseSet2Message.LS_TYPE_LS2,
            lease_set_data=lease_set_blob,
            private_keys=keys,
        )
        decoded = _roundtrip(original)
        assert isinstance(decoded, CreateLeaseSet2Message)
        assert decoded.session_id == 8
        assert decoded.ls_type == 3
        assert decoded.lease_set_data == lease_set_blob
        assert len(decoded.private_keys) == 2
        assert decoded.private_keys[0] == keys[0]
        assert decoded.private_keys[1] == keys[1]

    def test_type_code(self):
        """The class advertises I2CP message type 41."""
        assert CreateLeaseSet2Message.TYPE == 41

    def test_multiple_private_keys(self):
        """Three keys of differing type and length round-trip in order."""
        keys = [
            (enc_type, os.urandom(16 + enc_type))
            for enc_type in (0, 4, 6)
        ]
        original = CreateLeaseSet2Message(
            session_id=2,
            ls_type=1,
            lease_set_data=b"test",
            private_keys=keys,
        )
        decoded = _roundtrip(original)
        assert len(decoded.private_keys) == 3
        for expected, actual in zip(keys, decoded.private_keys):
            assert actual[0] == expected[0]
            assert actual[1] == expected[1]

    def test_ls_type_constants(self):
        """The four LS_TYPE_* constants have the expected numeric values."""
        assert CreateLeaseSet2Message.LS_TYPE_LEASESET == 1
        assert CreateLeaseSet2Message.LS_TYPE_LS2 == 3
        assert CreateLeaseSet2Message.LS_TYPE_ENCRYPTED_LS2 == 5
        assert CreateLeaseSet2Message.LS_TYPE_META_LS2 == 7

    def test_registry_lookup(self):
        """Type 41 should be in the I2CP registry."""
        assert I2CPMessage._registry[41] is CreateLeaseSet2Message