# A Python port of the Invisible Internet Project (I2P).
1"""Tier 4 protocol gap tests: BlindingInfoMessage (Type 42)."""
2
3import os
4import struct
5import pytest
6
7from i2p_client.i2cp_messages import I2CPMessage, BlindingInfoMessage
8
9
class TestBlindingInfoMessageConstants:
    """Pin the message type code and the endpoint/auth selector values."""

    def test_type_code(self):
        # The class-level TYPE attribute must equal 42.
        expected_type = 42
        assert BlindingInfoMessage.TYPE == expected_type

    def test_endpoint_constants(self):
        # Gather the four endpoint selectors and compare them in one shot.
        endpoint_codes = (
            BlindingInfoMessage.ENDPOINT_HASH,
            BlindingInfoMessage.ENDPOINT_HOST,
            BlindingInfoMessage.ENDPOINT_DEST,
            BlindingInfoMessage.ENDPOINT_PUBKEY,
        )
        assert endpoint_codes == (0, 1, 2, 3)

    def test_auth_constants(self):
        # Same approach for the three auth selectors.
        auth_codes = (
            BlindingInfoMessage.AUTH_NONE,
            BlindingInfoMessage.AUTH_DH,
            BlindingInfoMessage.AUTH_PSK,
        )
        assert auth_codes == (0, 1, 2)
26
27
class TestBlindingInfoHashNoAuth:
    """Hash endpoint (type 0) with authentication disabled (auth type 0)."""

    def test_roundtrip(self):
        """Serialize, re-parse through the base-class factory, compare every field."""
        digest = os.urandom(32)
        original = BlindingInfoMessage(
            session_id=1,
            endpoint_type=0,
            auth_type=0,
            blind_type=11,  # Ed25519
            expiration=0,
            endpoint_data=digest,
            auth_key=None,
        )
        decoded = I2CPMessage.from_wire(original.to_wire())
        assert isinstance(decoded, BlindingInfoMessage)
        header_fields = (
            decoded.session_id,
            decoded.endpoint_type,
            decoded.auth_type,
            decoded.blind_type,
            decoded.expiration,
        )
        assert header_fields == (1, 0, 0, 11, 0)
        assert decoded.endpoint_data == digest
        assert decoded.auth_key is None

    def test_wire_layout(self):
        """Check the exact byte layout produced by payload_bytes()."""
        digest = b"\xaa" * 32
        payload = BlindingInfoMessage(
            session_id=0x0102,
            endpoint_type=0,
            auth_type=0,
            blind_type=11,
            expiration=1000,
            endpoint_data=digest,
        ).payload_bytes()
        # session_id(2) + endpoint_type(1) + auth_type(1) + blind_type(2)
        # + expiration(4) + hash(32) = 42
        assert len(payload) == 42
        # Decode the 10-byte fixed header in one pass (network byte order).
        session_id, endpoint_type, auth_type, blind_type, expiration = struct.unpack(
            "!HBBHI", payload[:10]
        )
        assert session_id == 0x0102
        assert endpoint_type == 0
        assert auth_type == 0
        assert blind_type == 11
        assert expiration == 1000
        # Everything after the header is the raw 32-byte hash.
        assert payload[10:] == digest
72
73
class TestBlindingInfoHostDH:
    """Endpoint type 1 (host), DH auth."""

    def test_roundtrip(self):
        """Round-trip a host endpoint with a DH key and verify every field."""
        hostname = b"\x08test.i2p"  # 1-byte len + hostname
        dh_key = os.urandom(32)
        msg = BlindingInfoMessage(
            session_id=5,
            endpoint_type=1,
            auth_type=1,
            blind_type=7,  # Ed25519-SHA-512
            expiration=999999,
            endpoint_data=hostname,
            auth_key=dh_key,
        )
        wire = msg.to_wire()
        msg2 = I2CPMessage.from_wire(wire)
        assert isinstance(msg2, BlindingInfoMessage)
        assert msg2.session_id == 5
        assert msg2.endpoint_type == 1
        assert msg2.auth_type == 1
        # Non-default header values must survive too; the hash round-trip
        # test checks these fields, so this one should as well.
        assert msg2.blind_type == 7
        assert msg2.expiration == 999999
        assert msg2.endpoint_data == hostname
        assert msg2.auth_key == dh_key

    def test_wire_includes_auth_key(self):
        """The 32-byte auth key is appended after the endpoint data."""
        hostname = b"\x04test"
        dh_key = b"\xbb" * 32
        msg = BlindingInfoMessage(
            session_id=1,
            endpoint_type=1,
            auth_type=1,
            blind_type=11,
            expiration=0,
            endpoint_data=hostname,
            auth_key=dh_key,
        )
        payload = msg.payload_bytes()
        # header(10) + hostname(5) + dh_key(32) = 47
        assert len(payload) == 47
        assert payload[-32:] == dh_key
114
115
class TestBlindingInfoDestPSK:
    """Endpoint type 2 (dest), PSK auth."""

    def test_roundtrip(self):
        """Round-trip a destination endpoint with a PSK and verify every field."""
        # Valid dest: 384 key bytes + null cert
        dest_data = os.urandom(384) + b"\x00\x00\x00"
        psk = os.urandom(32)
        msg = BlindingInfoMessage(
            session_id=10,
            endpoint_type=2,
            auth_type=2,
            blind_type=11,
            expiration=500000,
            endpoint_data=dest_data,
            auth_key=psk,
        )
        wire = msg.to_wire()
        msg2 = I2CPMessage.from_wire(wire)
        assert isinstance(msg2, BlindingInfoMessage)
        # Check the fixed header fields as well, so a parser that misreads
        # them ahead of the variable-length destination is caught here.
        assert msg2.session_id == 10
        assert msg2.endpoint_type == 2
        assert msg2.auth_type == 2
        assert msg2.blind_type == 11
        assert msg2.expiration == 500000
        assert msg2.endpoint_data == dest_data
        assert msg2.auth_key == psk
139
140
class TestBlindingInfoPubkey:
    """Endpoint type 3 (pubkey): SigType(2) + key bytes."""

    def test_roundtrip(self):
        """Round-trip a pubkey endpoint without auth and verify every field."""
        # SigType 7 (Ed25519) = 32-byte key
        sig_type = struct.pack("!H", 7)
        key_bytes = os.urandom(32)
        endpoint_data = sig_type + key_bytes
        msg = BlindingInfoMessage(
            session_id=3,
            endpoint_type=3,
            auth_type=0,
            blind_type=7,
            expiration=0,
            endpoint_data=endpoint_data,
        )
        wire = msg.to_wire()
        msg2 = I2CPMessage.from_wire(wire)
        assert isinstance(msg2, BlindingInfoMessage)
        # Verify the whole message, not only the endpoint, so a decoding
        # slip in the header or a spurious auth key is detected.
        assert msg2.session_id == 3
        assert msg2.endpoint_type == 3
        assert msg2.auth_type == 0
        assert msg2.blind_type == 7
        assert msg2.expiration == 0
        assert msg2.endpoint_data == endpoint_data
        assert msg2.auth_key is None
162
163
class TestBlindingInfoDefaultExpiration:
    """Expiration=0 (meaning "use default") must survive a wire round-trip."""

    def test_zero_expiration(self):
        # Build the message from a keyword mapping, then encode and decode it.
        fields = dict(
            session_id=1,
            endpoint_type=0,
            auth_type=0,
            blind_type=11,
            expiration=0,
            endpoint_data=os.urandom(32),
        )
        encoded = BlindingInfoMessage(**fields).to_wire()
        decoded = I2CPMessage.from_wire(encoded)
        assert decoded.expiration == 0
179
180
class TestBlindingInfoRegistry:
    """Type 42 is registered in the I2CP message registry."""

    def test_registry(self):
        """The registry maps type code 42 to the BlindingInfoMessage class."""
        assert I2CPMessage._registry.get(42) is BlindingInfoMessage

    def test_factory_dispatch(self):
        """I2CPMessage.from_wire() returns exactly a BlindingInfoMessage."""
        msg = BlindingInfoMessage(
            session_id=1,
            endpoint_type=0,
            auth_type=0,
            blind_type=11,
            expiration=0,
            endpoint_data=os.urandom(32),
        )
        wire = msg.to_wire()
        msg2 = I2CPMessage.from_wire(wire)
        # Compare the class object itself rather than its name string: an
        # unrelated class that merely shares the name must not pass.
        assert type(msg2) is BlindingInfoMessage
199
200
@pytest.mark.parametrize("endpoint_type,auth_type", [
    (0, 0), (0, 1), (0, 2),
    (1, 0), (1, 1), (1, 2),
    (3, 0), (3, 1), (3, 2),
])
def test_blinding_info_roundtrip_combos(endpoint_type, auth_type):
    """Parametrized roundtrip over endpoint/auth combinations.

    Endpoint type 2 (dest) is excluded for simplicity. An endpoint type
    without a fixture below fails the test outright, so extending the
    parameter list cannot silently exercise a bogus payload.
    """
    if endpoint_type == 0:
        # Hash endpoint: 32 raw bytes.
        endpoint_data = os.urandom(32)
    elif endpoint_type == 1:
        # Host endpoint: 1-byte length prefix followed by the hostname.
        hostname = b"example.i2p"
        endpoint_data = bytes([len(hostname)]) + hostname
    elif endpoint_type == 3:
        # Pubkey endpoint: 2-byte sig type (7) followed by a 32-byte key.
        endpoint_data = struct.pack("!H", 7) + os.urandom(32)
    else:
        pytest.fail(f"no endpoint_data fixture for endpoint_type={endpoint_type}")

    # A 32-byte key accompanies every auth type other than 0.
    auth_key = os.urandom(32) if auth_type != 0 else None

    msg = BlindingInfoMessage(
        session_id=42,
        endpoint_type=endpoint_type,
        auth_type=auth_type,
        blind_type=11,
        expiration=123456,
        endpoint_data=endpoint_data,
        auth_key=auth_key,
    )
    wire = msg.to_wire()
    msg2 = I2CPMessage.from_wire(wire)
    # Confirm the factory dispatched to the right class before comparing
    # fields, matching the class-based round-trip tests.
    assert isinstance(msg2, BlindingInfoMessage)
    assert msg2.session_id == 42
    assert msg2.endpoint_type == endpoint_type
    assert msg2.auth_type == auth_type
    assert msg2.blind_type == 11
    assert msg2.expiration == 123456
    assert msg2.endpoint_data == endpoint_data
    assert msg2.auth_key == auth_key
237 assert msg2.auth_key == auth_key