# A Python port of the Invisible Internet Project (I2P)
# Source listing: branch "main", 277 lines, 11 kB (raw view)
"""Tests for I2NP message types."""

import hashlib
import os
import struct

import pytest


class TestI2NPHeader:
    """Construction, serialization and parsing of ``I2NPHeader``."""

    def test_construct(self):
        """Constructor keyword arguments are readable back as attributes."""
        from i2p_data.i2np import I2NPHeader

        fields = {
            "msg_type": 10,
            "msg_id": 12345,
            "expiration": 1710000000000,
            "size": 100,
            "checksum": 0xAB,
        }
        header = I2NPHeader(**fields)
        for attr, value in fields.items():
            assert getattr(header, attr) == value

    def test_header_size_is_16(self):
        """A serialized header is expected to occupy 16 bytes."""
        from i2p_data.i2np import I2NPHeader

        header = I2NPHeader(
            msg_type=1, msg_id=0, expiration=0, size=0, checksum=0
        )
        encoded = header.to_bytes()
        assert len(encoded) == 16

    def test_roundtrip(self):
        """``from_bytes(to_bytes())`` yields the same field values."""
        from i2p_data.i2np import I2NPHeader

        original = I2NPHeader(
            msg_type=20,
            msg_id=0xDEADBEEF,
            expiration=9999999999999,
            size=512,
            checksum=0xFF,
        )
        restored = I2NPHeader.from_bytes(original.to_bytes())
        for attr in ("msg_type", "msg_id", "expiration", "size", "checksum"):
            assert getattr(restored, attr) == getattr(original, attr)

    def test_from_stream(self):
        """``from_stream`` parses a header and leaves the rest unread."""
        import io
        from i2p_data.i2np import I2NPHeader

        source = I2NPHeader(
            msg_type=2, msg_id=42, expiration=1000, size=10, checksum=5
        )
        stream = io.BytesIO(source.to_bytes() + b"extra data")
        parsed = I2NPHeader.from_stream(stream)
        assert parsed.msg_type == 2
        assert parsed.msg_id == 42
        # Only the header bytes may have been consumed from the stream.
        leftover = stream.read()
        assert leftover == b"extra data"

    def test_from_bytes_truncated(self):
        """Ten bytes of input must be rejected with ``ValueError``."""
        from i2p_data.i2np import I2NPHeader

        too_short = b"\x00" * 10
        with pytest.raises(ValueError):
            I2NPHeader.from_bytes(too_short)


class TestChecksum:
    """``I2NPMessage.calculate_checksum`` against a SHA-256 reference."""

    def test_calculate_checksum(self):
        """Checksum equals the first byte of the body's SHA-256 digest."""
        from i2p_data.i2np import I2NPMessage

        body = b"hello world"
        reference = hashlib.sha256(body).digest()[0]
        assert I2NPMessage.calculate_checksum(body) == reference

    def test_checksum_empty_body(self):
        """An empty body is checksummed the same way."""
        from i2p_data.i2np import I2NPMessage

        reference = hashlib.sha256(b"").digest()[0]
        assert I2NPMessage.calculate_checksum(b"") == reference


class TestTypeRegistry:
    """Type-code lookup through ``I2NPMessage._registry``."""

    def test_delivery_status_registered(self):
        """Type code 10 maps to ``DeliveryStatusMessage``."""
        from i2p_data.i2np import I2NPMessage, DeliveryStatusMessage

        registry = I2NPMessage._registry
        assert registry[10] is DeliveryStatusMessage

    def test_data_message_registered(self):
        """Type code 20 maps to ``DataMessage``."""
        from i2p_data.i2np import I2NPMessage, DataMessage

        registry = I2NPMessage._registry
        assert registry[20] is DataMessage

    def test_database_store_registered(self):
        """Type code 1 maps to ``DatabaseStoreMessage``."""
        from i2p_data.i2np import I2NPMessage, DatabaseStoreMessage

        registry = I2NPMessage._registry
        assert registry[1] is DatabaseStoreMessage

    def test_database_lookup_registered(self):
        """Type code 2 maps to ``DatabaseLookupMessage``."""
        from i2p_data.i2np import I2NPMessage, DatabaseLookupMessage

        registry = I2NPMessage._registry
        assert registry[2] is DatabaseLookupMessage

    def test_unknown_type(self):
        """Parsing a message whose type code is 255 raises ``ValueError``."""
        from i2p_data.i2np import I2NPMessage, I2NPHeader

        # Well-formed message (matching size and checksum) with type 255.
        body = b"\x00" * 4
        header = I2NPHeader(
            msg_type=255,
            msg_id=1,
            expiration=0,
            size=len(body),
            checksum=I2NPMessage.calculate_checksum(body),
        )
        wire = header.to_bytes() + body
        with pytest.raises(ValueError, match="Unknown"):
            I2NPMessage.from_bytes(wire)


class TestDeliveryStatusMessage:
    """Behaviour of ``DeliveryStatusMessage`` (type 10)."""

    def test_construct(self):
        """Constructor arguments and the type constant are exposed."""
        from i2p_data.i2np import DeliveryStatusMessage

        status = DeliveryStatusMessage(msg_id=1234, arrival_time=1710000000000)
        assert status.msg_id == 1234
        assert status.arrival_time == 1710000000000
        assert status.TYPE == 10

    def test_body_size(self):
        """The body is 12 bytes long."""
        from i2p_data.i2np import DeliveryStatusMessage

        status = DeliveryStatusMessage(msg_id=1, arrival_time=0)
        body = status.body_bytes()
        assert len(body) == 12  # 4 + 8

    def test_roundtrip(self):
        """Serialize, then parse through the generic entry point."""
        from i2p_data.i2np import DeliveryStatusMessage, I2NPMessage

        sent = DeliveryStatusMessage(msg_id=0xCAFE, arrival_time=1710000000000)
        received = I2NPMessage.from_bytes(sent.to_bytes())
        assert isinstance(received, DeliveryStatusMessage)
        assert received.msg_id == 0xCAFE
        assert received.arrival_time == 1710000000000

    def test_checksum_validated(self):
        """A flipped checksum byte makes parsing fail."""
        from i2p_data.i2np import DeliveryStatusMessage, I2NPMessage

        status = DeliveryStatusMessage(msg_id=1, arrival_time=100)
        tampered = bytearray(status.to_bytes())
        # Byte 15 of the header holds the checksum; invert every bit of it.
        tampered[15] = tampered[15] ^ 0xFF
        with pytest.raises(ValueError, match="[Cc]hecksum"):
            I2NPMessage.from_bytes(bytes(tampered))


class TestDataMessage:
    """Behaviour of ``DataMessage`` (type 20)."""

    def test_construct(self):
        """The payload and the type constant are exposed."""
        from i2p_data.i2np import DataMessage

        content = b"hello i2p"
        message = DataMessage(payload=content)
        assert message.payload == content
        assert message.TYPE == 20

    def test_body_format(self):
        """Body is a 4-byte big-endian length followed by the payload."""
        from i2p_data.i2np import DataMessage

        content = b"\x01\x02\x03"
        body = DataMessage(payload=content).body_bytes()
        (declared_length,) = struct.unpack("!I", body[:4])
        assert declared_length == 3
        assert body[4:] == content

    def test_roundtrip(self):
        """A random 128-byte payload survives serialize + parse."""
        from i2p_data.i2np import DataMessage, I2NPMessage

        content = os.urandom(128)
        wire = DataMessage(payload=content).to_bytes()
        received = I2NPMessage.from_bytes(wire)
        assert isinstance(received, DataMessage)
        assert received.payload == content

    def test_empty_payload(self):
        """An empty payload round-trips as empty bytes."""
        from i2p_data.i2np import DataMessage, I2NPMessage

        wire = DataMessage(payload=b"").to_bytes()
        received = I2NPMessage.from_bytes(wire)
        assert isinstance(received, DataMessage)
        assert received.payload == b""


class TestDatabaseStoreMessage:
    """Behaviour of ``DatabaseStoreMessage`` (type 1)."""

    def test_construct(self):
        """Constructor arguments and the type constant are exposed."""
        from i2p_data.i2np import DatabaseStoreMessage

        store_key = os.urandom(32)
        message = DatabaseStoreMessage(
            key=store_key, ds_type=1, reply_token=42, data=b"some data"
        )
        assert message.key == store_key
        assert message.ds_type == 1
        assert message.reply_token == 42
        assert message.data == b"some data"
        assert message.TYPE == 1

    def test_roundtrip(self):
        """Every field survives serialize + parse."""
        from i2p_data.i2np import DatabaseStoreMessage, I2NPMessage

        store_key = os.urandom(32)
        blob = os.urandom(64)
        sent = DatabaseStoreMessage(
            key=store_key, ds_type=0, reply_token=9999, data=blob
        )
        received = I2NPMessage.from_bytes(sent.to_bytes())
        assert isinstance(received, DatabaseStoreMessage)
        assert received.key == store_key
        assert received.ds_type == 0
        assert received.reply_token == 9999
        assert received.data == blob

    def test_body_structure(self):
        """Body layout: key (32), type (1), reply token (4), then data."""
        from i2p_data.i2np import DatabaseStoreMessage

        store_key = os.urandom(32)
        message = DatabaseStoreMessage(
            key=store_key, ds_type=3, reply_token=0, data=b"xyz"
        )
        body = message.body_bytes()
        assert body[:32] == store_key
        assert body[32] == 3  # type byte
        (reply_token,) = struct.unpack("!I", body[33:37])
        assert reply_token == 0
        assert body[37:] == b"xyz"


class TestDatabaseLookupMessage:
    """Behaviour of ``DatabaseLookupMessage`` (type 2)."""

    def test_construct_no_tunnel(self):
        """Constructor arguments and the type constant are exposed."""
        from i2p_data.i2np import DatabaseLookupMessage

        lookup_key = os.urandom(32)
        requester = os.urandom(32)
        message = DatabaseLookupMessage(
            key=lookup_key,
            from_hash=requester,
            flags=0,
            reply_tunnel_id=0,
            exclude_list=[],
        )
        assert message.key == lookup_key
        assert message.from_hash == requester
        assert message.flags == 0
        assert message.TYPE == 2

    def test_roundtrip_no_excludes(self):
        """Round-trip with flags cleared and an empty exclude list."""
        from i2p_data.i2np import DatabaseLookupMessage, I2NPMessage

        lookup_key = os.urandom(32)
        requester = os.urandom(32)
        sent = DatabaseLookupMessage(
            key=lookup_key,
            from_hash=requester,
            flags=0,
            reply_tunnel_id=0,
            exclude_list=[],
        )
        received = I2NPMessage.from_bytes(sent.to_bytes())
        assert isinstance(received, DatabaseLookupMessage)
        assert received.key == lookup_key
        assert received.from_hash == requester
        assert received.flags == 0
        assert received.exclude_list == []

    def test_roundtrip_with_tunnel_and_excludes(self):
        """Round-trip with the tunnel flag set and three excluded hashes."""
        from i2p_data.i2np import DatabaseLookupMessage, I2NPMessage

        lookup_key = os.urandom(32)
        requester = os.urandom(32)
        excluded = [os.urandom(32) for _ in range(3)]
        # Flag bit 0 set means reply to tunnel
        sent = DatabaseLookupMessage(
            key=lookup_key,
            from_hash=requester,
            flags=0x01,
            reply_tunnel_id=5555,
            exclude_list=excluded,
        )
        received = I2NPMessage.from_bytes(sent.to_bytes())
        assert isinstance(received, DatabaseLookupMessage)
        assert received.key == lookup_key
        assert received.from_hash == requester
        assert received.flags == 0x01
        assert received.reply_tunnel_id == 5555
        assert len(received.exclude_list) == 3
        # Length is pinned above, so pairing covers all three positions.
        for actual, wanted in zip(received.exclude_list, excluded):
            assert actual == wanted

    def test_body_structure_with_tunnel(self):
        """Body layout with the tunnel flag set and no excludes."""
        from i2p_data.i2np import DatabaseLookupMessage

        lookup_key = os.urandom(32)
        requester = os.urandom(32)
        message = DatabaseLookupMessage(
            key=lookup_key,
            from_hash=requester,
            flags=0x01,
            reply_tunnel_id=100,
            exclude_list=[],
        )
        body = message.body_bytes()
        assert body[:32] == lookup_key
        assert body[32:64] == requester
        assert body[64] == 0x01  # flags
        # When flag bit 0 set, next 4 bytes are tunnel ID
        (tunnel_id,) = struct.unpack("!I", body[65:69])
        assert tunnel_id == 100
        assert body[69] == 0  # exclude count


class TestI2NPMessageBase:
    """Rejection of truncated input by ``I2NPMessage.from_bytes``."""

    def test_from_bytes_truncated_header(self):
        """Five bytes of input must be rejected with ``ValueError``."""
        from i2p_data.i2np import I2NPMessage

        too_short = b"\x00" * 5
        with pytest.raises(ValueError):
            I2NPMessage.from_bytes(too_short)

    def test_from_bytes_truncated_body(self):
        """A header declaring more body bytes than are present is rejected."""
        from i2p_data.i2np import I2NPMessage, I2NPHeader

        # Header says size=100 but body is only 2 bytes
        body = b"\x00\x00"
        header = I2NPHeader(
            msg_type=10,
            msg_id=1,
            expiration=0,
            size=100,
            checksum=I2NPMessage.calculate_checksum(body),
        )
        wire = header.to_bytes() + body
        with pytest.raises(ValueError):
            I2NPMessage.from_bytes(wire)