"""Tests for SSU2 header protection and constants."""

import os

import pytest


class TestSSU2Constants:
    """Pin the values of the constants exported by ``i2p_transport.ssu2``.

    Each test imports its constant locally, so a missing name fails only
    the test that needs it instead of breaking collection of the module.
    """

    def test_short_header_size(self):
        from i2p_transport.ssu2 import SHORT_HEADER_SIZE

        assert SHORT_HEADER_SIZE == 16

    def test_long_header_size(self):
        from i2p_transport.ssu2 import LONG_HEADER_SIZE

        assert LONG_HEADER_SIZE == 32

    def test_session_header_size(self):
        from i2p_transport.ssu2 import SESSION_HEADER_SIZE

        assert SESSION_HEADER_SIZE == 64

    def test_mac_len(self):
        from i2p_transport.ssu2 import MAC_LEN

        assert MAC_LEN == 16

    def test_key_len(self):
        from i2p_transport.ssu2 import KEY_LEN

        assert KEY_LEN == 32

    def test_header_prot_sample_len(self):
        from i2p_transport.ssu2 import HEADER_PROT_SAMPLE_LEN

        assert HEADER_PROT_SAMPLE_LEN == 24

    def test_protocol_version(self):
        from i2p_transport.ssu2 import PROTOCOL_VERSION

        assert PROTOCOL_VERSION == 2


class TestSSU2HeaderProtection:
    """Round-trip and key-separation tests for ``SSU2HeaderProtection``.

    Every test builds a fake packet from a random header followed by 64
    random bytes standing in for the encrypted body, then applies the
    in-place ``encrypt_*``/``decrypt_*`` methods to a ``bytearray``.
    """

    def _make_keys(self):
        """Return a pair of independent random 32-byte keys."""
        return os.urandom(32), os.urandom(32)

    def test_short_header_protection_roundtrip(self):
        """Encrypt then decrypt must restore the short header and body."""
        from i2p_transport.ssu2 import SSU2HeaderProtection, SHORT_HEADER_SIZE

        key1, key2 = self._make_keys()
        hp = SSU2HeaderProtection(key1, key2)

        # Fake packet: short header + 64 bytes of stand-in "encrypted body".
        original_header = os.urandom(SHORT_HEADER_SIZE)
        body = os.urandom(64)
        packet = bytearray(original_header + body)
        original_first8 = bytes(packet[:8])

        hp.encrypt_short_header(packet)
        # The first 8 bytes must differ once protection is applied.
        assert bytes(packet[:8]) != original_first8

        hp.decrypt_short_header(packet)
        # The protected prefix is restored...
        assert bytes(packet[:8]) == original_first8
        # ...and so is the rest of the header; a round trip must not leave
        # any header byte altered.
        assert bytes(packet[:SHORT_HEADER_SIZE]) == original_header
        # The body is untouched.
        assert bytes(packet[SHORT_HEADER_SIZE:]) == body

    def test_long_header_protection_roundtrip(self):
        """Encrypt then decrypt must restore the long header and body."""
        from i2p_transport.ssu2 import SSU2HeaderProtection, LONG_HEADER_SIZE

        key1, key2 = self._make_keys()
        hp = SSU2HeaderProtection(key1, key2)

        # Fake packet: long header + 64 bytes of stand-in "encrypted body".
        original_header = os.urandom(LONG_HEADER_SIZE)
        body = os.urandom(64)
        packet = bytearray(original_header + body)
        original_first8 = bytes(packet[:8])
        original_mid8 = bytes(packet[12:20])

        hp.encrypt_long_header(packet)
        # BOTH regions must change. These are asserted separately because a
        # combined `or` would still pass if only one region were protected.
        assert bytes(packet[:8]) != original_first8
        assert bytes(packet[12:20]) != original_mid8

        hp.decrypt_long_header(packet)
        # Both regions are restored...
        assert bytes(packet[:8]) == original_first8
        assert bytes(packet[12:20]) == original_mid8
        # ...along with every other header byte.
        assert bytes(packet[:LONG_HEADER_SIZE]) == original_header
        # The body is untouched.
        assert bytes(packet[LONG_HEADER_SIZE:]) == body

    def test_header_protection_different_keys(self):
        """Two independent key pairs must yield different protected headers."""
        from i2p_transport.ssu2 import SSU2HeaderProtection, SHORT_HEADER_SIZE

        hp_a = SSU2HeaderProtection(*self._make_keys())
        hp_b = SSU2HeaderProtection(*self._make_keys())

        # Identical plaintext packets, so any difference comes from the keys.
        header = os.urandom(SHORT_HEADER_SIZE)
        body = os.urandom(64)
        packet_a = bytearray(header + body)
        packet_b = bytearray(header + body)

        hp_a.encrypt_short_header(packet_a)
        hp_b.encrypt_short_header(packet_b)

        assert bytes(packet_a[:8]) != bytes(packet_b[:8])

    def test_short_header_idempotent_double_encrypt_decrypt(self):
        """Encrypt twice, decrypt twice should restore original."""
        from i2p_transport.ssu2 import SSU2HeaderProtection, SHORT_HEADER_SIZE

        key1, key2 = self._make_keys()
        hp = SSU2HeaderProtection(key1, key2)

        original_header = os.urandom(SHORT_HEADER_SIZE)
        body = os.urandom(64)
        packet = bytearray(original_header + body)
        original_first8 = bytes(packet[:8])

        hp.encrypt_short_header(packet)
        hp.encrypt_short_header(packet)
        hp.decrypt_short_header(packet)
        hp.decrypt_short_header(packet)

        assert bytes(packet[:8]) == original_first8