A Python port of the Invisible Internet Project (I2P)
at main 237 lines 7.5 kB view raw
1"""Tier 4 protocol gap tests: BlindingInfoMessage (Type 42).""" 2 3import os 4import struct 5import pytest 6 7from i2p_client.i2cp_messages import I2CPMessage, BlindingInfoMessage 8 9 10class TestBlindingInfoMessageConstants: 11 """Verify type code and endpoint/auth constants.""" 12 13 def test_type_code(self): 14 assert BlindingInfoMessage.TYPE == 42 15 16 def test_endpoint_constants(self): 17 assert BlindingInfoMessage.ENDPOINT_HASH == 0 18 assert BlindingInfoMessage.ENDPOINT_HOST == 1 19 assert BlindingInfoMessage.ENDPOINT_DEST == 2 20 assert BlindingInfoMessage.ENDPOINT_PUBKEY == 3 21 22 def test_auth_constants(self): 23 assert BlindingInfoMessage.AUTH_NONE == 0 24 assert BlindingInfoMessage.AUTH_DH == 1 25 assert BlindingInfoMessage.AUTH_PSK == 2 26 27 28class TestBlindingInfoHashNoAuth: 29 """Endpoint type 0 (hash), no auth.""" 30 31 def test_roundtrip(self): 32 hash_data = os.urandom(32) 33 msg = BlindingInfoMessage( 34 session_id=1, 35 endpoint_type=0, 36 auth_type=0, 37 blind_type=11, # Ed25519 38 expiration=0, 39 endpoint_data=hash_data, 40 auth_key=None, 41 ) 42 wire = msg.to_wire() 43 msg2 = I2CPMessage.from_wire(wire) 44 assert isinstance(msg2, BlindingInfoMessage) 45 assert msg2.session_id == 1 46 assert msg2.endpoint_type == 0 47 assert msg2.auth_type == 0 48 assert msg2.blind_type == 11 49 assert msg2.expiration == 0 50 assert msg2.endpoint_data == hash_data 51 assert msg2.auth_key is None 52 53 def test_wire_layout(self): 54 hash_data = b"\xaa" * 32 55 msg = BlindingInfoMessage( 56 session_id=0x0102, 57 endpoint_type=0, 58 auth_type=0, 59 blind_type=11, 60 expiration=1000, 61 endpoint_data=hash_data, 62 ) 63 payload = msg.payload_bytes() 64 # session_id(2) + endpoint_type(1) + auth_type(1) + blind_type(2) + expiration(4) + hash(32) = 42 65 assert len(payload) == 42 66 assert struct.unpack("!H", payload[0:2])[0] == 0x0102 # session_id 67 assert payload[2] == 0 # endpoint_type 68 assert payload[3] == 0 # auth_type 69 assert struct.unpack("!H", payload[4:6])[0] == 11 # blind_type 70 assert struct.unpack("!I", payload[6:10])[0] == 1000 # expiration 71 assert payload[10:42] == hash_data 72 73 74class TestBlindingInfoHostDH: 75 """Endpoint type 1 (host), DH auth.""" 76 77 def test_roundtrip(self): 78 hostname = b"\x08test.i2p" # 1-byte len + hostname 79 dh_key = os.urandom(32) 80 msg = BlindingInfoMessage( 81 session_id=5, 82 endpoint_type=1, 83 auth_type=1, 84 blind_type=7, # Ed25519-SHA-512 85 expiration=999999, 86 endpoint_data=hostname, 87 auth_key=dh_key, 88 ) 89 wire = msg.to_wire() 90 msg2 = I2CPMessage.from_wire(wire) 91 assert isinstance(msg2, BlindingInfoMessage) 92 assert msg2.session_id == 5 93 assert msg2.endpoint_type == 1 94 assert msg2.auth_type == 1 95 assert msg2.endpoint_data == hostname 96 assert msg2.auth_key == dh_key 97 98 def test_wire_includes_auth_key(self): 99 hostname = b"\x04test" 100 dh_key = b"\xbb" * 32 101 msg = BlindingInfoMessage( 102 session_id=1, 103 endpoint_type=1, 104 auth_type=1, 105 blind_type=11, 106 expiration=0, 107 endpoint_data=hostname, 108 auth_key=dh_key, 109 ) 110 payload = msg.payload_bytes() 111 # header(10) + hostname(5) + dh_key(32) = 47 112 assert len(payload) == 47 113 assert payload[-32:] == dh_key 114 115 116class TestBlindingInfoDestPSK: 117 """Endpoint type 2 (dest), PSK auth.""" 118 119 def test_roundtrip(self): 120 # Valid dest: 384 key bytes + null cert 121 dest_data = os.urandom(384) + b"\x00\x00\x00" 122 psk = os.urandom(32) 123 msg = BlindingInfoMessage( 124 session_id=10, 125 endpoint_type=2, 126 auth_type=2, 127 blind_type=11, 128 expiration=500000, 129 endpoint_data=dest_data, 130 auth_key=psk, 131 ) 132 wire = msg.to_wire() 133 msg2 = I2CPMessage.from_wire(wire) 134 assert isinstance(msg2, BlindingInfoMessage) 135 assert msg2.endpoint_type == 2 136 assert msg2.auth_type == 2 137 assert msg2.endpoint_data == dest_data 138 assert msg2.auth_key == psk 139 140 141class TestBlindingInfoPubkey: 142 """Endpoint type 3 (pubkey): SigType(2) + key bytes.""" 143 144 def test_roundtrip(self): 145 # SigType 7 (Ed25519) = 32-byte key 146 sig_type = struct.pack("!H", 7) 147 key_bytes = os.urandom(32) 148 endpoint_data = sig_type + key_bytes 149 msg = BlindingInfoMessage( 150 session_id=3, 151 endpoint_type=3, 152 auth_type=0, 153 blind_type=7, 154 expiration=0, 155 endpoint_data=endpoint_data, 156 ) 157 wire = msg.to_wire() 158 msg2 = I2CPMessage.from_wire(wire) 159 assert isinstance(msg2, BlindingInfoMessage) 160 assert msg2.endpoint_type == 3 161 assert msg2.endpoint_data == endpoint_data 162 163 164class TestBlindingInfoDefaultExpiration: 165 """Expiration=0 means use default.""" 166 167 def test_zero_expiration(self): 168 msg = BlindingInfoMessage( 169 session_id=1, 170 endpoint_type=0, 171 auth_type=0, 172 blind_type=11, 173 expiration=0, 174 endpoint_data=os.urandom(32), 175 ) 176 wire = msg.to_wire() 177 msg2 = I2CPMessage.from_wire(wire) 178 assert msg2.expiration == 0 179 180 181class TestBlindingInfoRegistry: 182 """Type 42 is registered in the I2CP message registry.""" 183 184 def test_registry(self): 185 assert I2CPMessage._registry.get(42) is BlindingInfoMessage 186 187 def test_factory_dispatch(self): 188 msg = BlindingInfoMessage( 189 session_id=1, 190 endpoint_type=0, 191 auth_type=0, 192 blind_type=11, 193 expiration=0, 194 endpoint_data=os.urandom(32), 195 ) 196 wire = msg.to_wire() 197 msg2 = I2CPMessage.from_wire(wire) 198 assert type(msg2).__name__ == "BlindingInfoMessage" 199 200 201@pytest.mark.parametrize("endpoint_type,auth_type", [ 202 (0, 0), (0, 1), (0, 2), 203 (1, 0), (1, 1), (1, 2), 204 (3, 0), (3, 1), (3, 2), 205]) 206def test_blinding_info_roundtrip_combos(endpoint_type, auth_type): 207 """Parametrized roundtrip over endpoint/auth combinations (excluding type 2 dest for simplicity).""" 208 if endpoint_type == 0: 209 endpoint_data = os.urandom(32) 210 elif endpoint_type == 1: 211 hostname = b"example.i2p" 212 endpoint_data = bytes([len(hostname)]) + hostname 213 elif endpoint_type == 3: 214 endpoint_data = struct.pack("!H", 7) + os.urandom(32) 215 else: 216 endpoint_data = os.urandom(32) 217 218 auth_key = os.urandom(32) if auth_type != 0 else None 219 220 msg = BlindingInfoMessage( 221 session_id=42, 222 endpoint_type=endpoint_type, 223 auth_type=auth_type, 224 blind_type=11, 225 expiration=123456, 226 endpoint_data=endpoint_data, 227 auth_key=auth_key, 228 ) 229 wire = msg.to_wire() 230 msg2 = I2CPMessage.from_wire(wire) 231 assert msg2.session_id == 42 232 assert msg2.endpoint_type == endpoint_type 233 assert msg2.auth_type == auth_type 234 assert msg2.blind_type == 11 235 assert msg2.expiration == 123456 236 assert msg2.endpoint_data == endpoint_data 237 assert msg2.auth_key == auth_key