"""Tier 4 protocol gap tests: BlindingInfoMessage (Type 42)."""

import os
import struct

import pytest

from i2p_client.i2cp_messages import I2CPMessage, BlindingInfoMessage


class TestBlindingInfoMessageConstants:
    """Verify type code and endpoint/auth constants."""

    def test_type_code(self):
        """The message type code is 42."""
        assert BlindingInfoMessage.TYPE == 42

    def test_endpoint_constants(self):
        """Endpoint selector constants map to 0..3."""
        assert BlindingInfoMessage.ENDPOINT_HASH == 0
        assert BlindingInfoMessage.ENDPOINT_HOST == 1
        assert BlindingInfoMessage.ENDPOINT_DEST == 2
        assert BlindingInfoMessage.ENDPOINT_PUBKEY == 3

    def test_auth_constants(self):
        """Auth scheme constants map to 0..2."""
        assert BlindingInfoMessage.AUTH_NONE == 0
        assert BlindingInfoMessage.AUTH_DH == 1
        assert BlindingInfoMessage.AUTH_PSK == 2


class TestBlindingInfoHashNoAuth:
    """Endpoint type 0 (hash), no auth."""

    def test_roundtrip(self):
        """Every field survives to_wire() -> from_wire()."""
        hash_data = os.urandom(32)
        msg = BlindingInfoMessage(
            session_id=1,
            endpoint_type=0,
            auth_type=0,
            blind_type=11,  # RedDSA_SHA512_Ed25519
            expiration=0,
            endpoint_data=hash_data,
            auth_key=None,
        )
        wire = msg.to_wire()
        msg2 = I2CPMessage.from_wire(wire)

        assert isinstance(msg2, BlindingInfoMessage)
        assert msg2.session_id == 1
        assert msg2.endpoint_type == 0
        assert msg2.auth_type == 0
        assert msg2.blind_type == 11
        assert msg2.expiration == 0
        assert msg2.endpoint_data == hash_data
        assert msg2.auth_key is None

    def test_wire_layout(self):
        """Pin the byte offsets of each field in the payload."""
        hash_data = b"\xaa" * 32
        msg = BlindingInfoMessage(
            session_id=0x0102,
            endpoint_type=0,
            auth_type=0,
            blind_type=11,
            expiration=1000,
            endpoint_data=hash_data,
        )
        payload = msg.payload_bytes()

        # session_id(2) + endpoint_type(1) + auth_type(1) + blind_type(2)
        # + expiration(4) + hash(32) = 42
        assert len(payload) == 42
        assert struct.unpack("!H", payload[0:2])[0] == 0x0102  # session_id
        assert payload[2] == 0  # endpoint_type
        assert payload[3] == 0  # auth_type
        assert struct.unpack("!H", payload[4:6])[0] == 11  # blind_type
        assert struct.unpack("!I", payload[6:10])[0] == 1000  # expiration
        assert payload[10:42] == hash_data


class TestBlindingInfoHostDH:
    """Endpoint type 1 (host), DH auth."""

    def test_roundtrip(self):
        """Every field, including the DH key, survives a wire roundtrip."""
        hostname = b"\x08test.i2p"  # 1-byte len + hostname
        dh_key = os.urandom(32)
        msg = BlindingInfoMessage(
            session_id=5,
            endpoint_type=1,
            auth_type=1,
            blind_type=7,  # EdDSA_SHA512_Ed25519
            expiration=999999,
            endpoint_data=hostname,
            auth_key=dh_key,
        )
        wire = msg.to_wire()
        msg2 = I2CPMessage.from_wire(wire)

        assert isinstance(msg2, BlindingInfoMessage)
        assert msg2.session_id == 5
        assert msg2.endpoint_type == 1
        assert msg2.auth_type == 1
        assert msg2.blind_type == 7
        assert msg2.expiration == 999999
        assert msg2.endpoint_data == hostname
        assert msg2.auth_key == dh_key

    def test_wire_includes_auth_key(self):
        """The 32-byte auth key is appended after the endpoint data."""
        hostname = b"\x04test"
        dh_key = b"\xbb" * 32
        msg = BlindingInfoMessage(
            session_id=1,
            endpoint_type=1,
            auth_type=1,
            blind_type=11,
            expiration=0,
            endpoint_data=hostname,
            auth_key=dh_key,
        )
        payload = msg.payload_bytes()

        # header(10) + hostname(5) + dh_key(32) = 47
        assert len(payload) == 47
        assert payload[-32:] == dh_key


class TestBlindingInfoDestPSK:
    """Endpoint type 2 (dest), PSK auth."""

    def test_roundtrip(self):
        """Every field, including the PSK, survives a wire roundtrip."""
        # Valid dest: 384 key bytes + null cert
        dest_data = os.urandom(384) + b"\x00\x00\x00"
        psk = os.urandom(32)
        msg = BlindingInfoMessage(
            session_id=10,
            endpoint_type=2,
            auth_type=2,
            blind_type=11,
            expiration=500000,
            endpoint_data=dest_data,
            auth_key=psk,
        )
        wire = msg.to_wire()
        msg2 = I2CPMessage.from_wire(wire)

        assert isinstance(msg2, BlindingInfoMessage)
        assert msg2.session_id == 10
        assert msg2.endpoint_type == 2
        assert msg2.auth_type == 2
        assert msg2.blind_type == 11
        assert msg2.expiration == 500000
        assert msg2.endpoint_data == dest_data
        assert msg2.auth_key == psk


class TestBlindingInfoPubkey:
    """Endpoint type 3 (pubkey): SigType(2) + key bytes."""

    def test_roundtrip(self):
        """Every field survives a wire roundtrip; no auth key is present."""
        # SigType 7 (Ed25519) = 32-byte key
        sig_type = struct.pack("!H", 7)
        key_bytes = os.urandom(32)
        endpoint_data = sig_type + key_bytes
        msg = BlindingInfoMessage(
            session_id=3,
            endpoint_type=3,
            auth_type=0,
            blind_type=7,
            expiration=0,
            endpoint_data=endpoint_data,
        )
        wire = msg.to_wire()
        msg2 = I2CPMessage.from_wire(wire)

        assert isinstance(msg2, BlindingInfoMessage)
        assert msg2.session_id == 3
        assert msg2.endpoint_type == 3
        assert msg2.auth_type == 0
        assert msg2.blind_type == 7
        assert msg2.expiration == 0
        assert msg2.endpoint_data == endpoint_data
        assert msg2.auth_key is None


class TestBlindingInfoDefaultExpiration:
    """Expiration=0 means use default."""

    def test_zero_expiration(self):
        """A zero expiration is carried over the wire unchanged."""
        msg = BlindingInfoMessage(
            session_id=1,
            endpoint_type=0,
            auth_type=0,
            blind_type=11,
            expiration=0,
            endpoint_data=os.urandom(32),
        )
        wire = msg.to_wire()
        msg2 = I2CPMessage.from_wire(wire)

        assert isinstance(msg2, BlindingInfoMessage)
        assert msg2.expiration == 0


class TestBlindingInfoRegistry:
    """Type 42 is registered in the I2CP message registry."""

    def test_registry(self):
        """The registry maps type code 42 to BlindingInfoMessage."""
        assert I2CPMessage._registry.get(42) is BlindingInfoMessage

    def test_factory_dispatch(self):
        """from_wire() dispatches to exactly the BlindingInfoMessage class."""
        msg = BlindingInfoMessage(
            session_id=1,
            endpoint_type=0,
            auth_type=0,
            blind_type=11,
            expiration=0,
            endpoint_data=os.urandom(32),
        )
        wire = msg.to_wire()
        msg2 = I2CPMessage.from_wire(wire)

        # Identity check on the class rather than comparing its name string.
        assert type(msg2) is BlindingInfoMessage


@pytest.mark.parametrize("endpoint_type,auth_type", [
    (0, 0), (0, 1), (0, 2),
    (1, 0), (1, 1), (1, 2),
    (3, 0), (3, 1), (3, 2),
])
def test_blinding_info_roundtrip_combos(endpoint_type, auth_type):
    """Parametrized roundtrip over endpoint/auth combinations (excluding type 2 dest for simplicity)."""
    if endpoint_type == 0:
        endpoint_data = os.urandom(32)
    elif endpoint_type == 1:
        hostname = b"example.i2p"
        endpoint_data = bytes([len(hostname)]) + hostname
    elif endpoint_type == 3:
        endpoint_data = struct.pack("!H", 7) + os.urandom(32)
    else:
        # Fail loudly instead of fabricating 32 random bytes: a newly added
        # endpoint type needs its own correctly shaped fixture.
        pytest.fail(f"no endpoint_data fixture for endpoint_type={endpoint_type}")

    auth_key = os.urandom(32) if auth_type != 0 else None

    msg = BlindingInfoMessage(
        session_id=42,
        endpoint_type=endpoint_type,
        auth_type=auth_type,
        blind_type=11,
        expiration=123456,
        endpoint_data=endpoint_data,
        auth_key=auth_key,
    )
    wire = msg.to_wire()
    msg2 = I2CPMessage.from_wire(wire)

    assert isinstance(msg2, BlindingInfoMessage)
    assert msg2.session_id == 42
    assert msg2.endpoint_type == endpoint_type
    assert msg2.auth_type == auth_type
    assert msg2.blind_type == 11
    assert msg2.expiration == 123456
    assert msg2.endpoint_data == endpoint_data
    assert msg2.auth_key == auth_key