A Python port of the Invisible Internet Project (I2P)
1"""Tests for SSU2 header protection and constants."""
2
3import os
4
5import pytest
6
7
8class TestSSU2Constants:
9 def test_short_header_size(self):
10 from i2p_transport.ssu2 import SHORT_HEADER_SIZE
11 assert SHORT_HEADER_SIZE == 16
12
13 def test_long_header_size(self):
14 from i2p_transport.ssu2 import LONG_HEADER_SIZE
15 assert LONG_HEADER_SIZE == 32
16
17 def test_session_header_size(self):
18 from i2p_transport.ssu2 import SESSION_HEADER_SIZE
19 assert SESSION_HEADER_SIZE == 64
20
21 def test_mac_len(self):
22 from i2p_transport.ssu2 import MAC_LEN
23 assert MAC_LEN == 16
24
25 def test_key_len(self):
26 from i2p_transport.ssu2 import KEY_LEN
27 assert KEY_LEN == 32
28
29 def test_header_prot_sample_len(self):
30 from i2p_transport.ssu2 import HEADER_PROT_SAMPLE_LEN
31 assert HEADER_PROT_SAMPLE_LEN == 24
32
33 def test_protocol_version(self):
34 from i2p_transport.ssu2 import PROTOCOL_VERSION
35 assert PROTOCOL_VERSION == 2
36
37
38class TestSSU2HeaderProtection:
39 def _make_keys(self):
40 return os.urandom(32), os.urandom(32)
41
42 def test_short_header_protection_roundtrip(self):
43 from i2p_transport.ssu2 import SSU2HeaderProtection, SHORT_HEADER_SIZE
44
45 key1, key2 = self._make_keys()
46 hp = SSU2HeaderProtection(key1, key2)
47
48 # Build a fake packet: 16-byte header + at least 12 bytes of "encrypted body"
49 original_header = os.urandom(SHORT_HEADER_SIZE)
50 body = os.urandom(64)
51 packet = bytearray(original_header + body)
52
53 # Save original first 8 bytes
54 original_first8 = bytes(packet[:8])
55
56 hp.encrypt_short_header(packet)
57 # First 8 bytes should be different after encryption
58 assert bytes(packet[:8]) != original_first8
59
60 hp.decrypt_short_header(packet)
61 # After decrypt, first 8 bytes should be restored
62 assert bytes(packet[:8]) == original_first8
63 # Body unchanged
64 assert bytes(packet[SHORT_HEADER_SIZE:]) == body
65
66 def test_long_header_protection_roundtrip(self):
67 from i2p_transport.ssu2 import SSU2HeaderProtection, LONG_HEADER_SIZE
68
69 key1, key2 = self._make_keys()
70 hp = SSU2HeaderProtection(key1, key2)
71
72 # Build a fake packet: 32-byte long header + at least 12 bytes body
73 original_header = os.urandom(LONG_HEADER_SIZE)
74 body = os.urandom(64)
75 packet = bytearray(original_header + body)
76
77 original_first8 = bytes(packet[:8])
78 original_mid8 = bytes(packet[12:20])
79
80 hp.encrypt_long_header(packet)
81 # Both protected regions should change
82 assert bytes(packet[:8]) != original_first8 or bytes(packet[12:20]) != original_mid8
83
84 hp.decrypt_long_header(packet)
85 # Both regions restored
86 assert bytes(packet[:8]) == original_first8
87 assert bytes(packet[12:20]) == original_mid8
88 # Body unchanged
89 assert bytes(packet[LONG_HEADER_SIZE:]) == body
90
91 def test_header_protection_different_keys(self):
92 from i2p_transport.ssu2 import SSU2HeaderProtection, SHORT_HEADER_SIZE
93
94 key1a, key2a = os.urandom(32), os.urandom(32)
95 key1b, key2b = os.urandom(32), os.urandom(32)
96 hp_a = SSU2HeaderProtection(key1a, key2a)
97 hp_b = SSU2HeaderProtection(key1b, key2b)
98
99 header = os.urandom(SHORT_HEADER_SIZE)
100 body = os.urandom(64)
101
102 packet_a = bytearray(header + body)
103 packet_b = bytearray(header + body)
104
105 hp_a.encrypt_short_header(packet_a)
106 hp_b.encrypt_short_header(packet_b)
107
108 # Different keys should produce different encrypted headers
109 assert bytes(packet_a[:8]) != bytes(packet_b[:8])
110
111 def test_short_header_idempotent_double_encrypt_decrypt(self):
112 """Encrypt twice, decrypt twice should restore original."""
113 from i2p_transport.ssu2 import SSU2HeaderProtection, SHORT_HEADER_SIZE
114
115 key1, key2 = self._make_keys()
116 hp = SSU2HeaderProtection(key1, key2)
117
118 original_header = os.urandom(SHORT_HEADER_SIZE)
119 body = os.urandom(64)
120 packet = bytearray(original_header + body)
121
122 original_first8 = bytes(packet[:8])
123
124 hp.encrypt_short_header(packet)
125 hp.encrypt_short_header(packet)
126 hp.decrypt_short_header(packet)
127 hp.decrypt_short_header(packet)
128
129 assert bytes(packet[:8]) == original_first8