A Python port of the Invisible Internet Project (I2P)
1"""Noise protocol compliance verification tests.
2
3Verifies that the Noise_XK and Noise_IK implementations conform to the
4Noise Protocol Framework specification (noiseprotocol.org/noise.html).
5
6Covers: message sequence correctness, MixHash/MixKey calls, Split
7correctness, prologue binding, nonce overflow detection, and static
8key validation.
9"""
10from __future__ import annotations
11
12import hashlib
13import os
14
15import pytest
16
17from i2p_crypto.noise import CipherState, SymmetricState, HandshakeState
18from i2p_crypto.x25519 import X25519DH
19
20
21def _make_keypair():
22 return X25519DH.generate_keypair()
23
24
25# ---------- Prologue binding ----------
26
27
28class TestPrologueBinding:
29 """Noise spec: prologue is MixHash'd into h before any DH.
30 Mismatched prologues must cause handshake failure.
31 """
32
33 def test_prologue_mismatch_causes_failure(self) -> None:
34 """XK handshake with different prologues must fail.
35
36 In XK, the 'es' token in message 1 runs MixKey, giving the
37 CipherState a key. The payload in message 1 is then encrypted
38 with AD = h (which includes the prologue). If prologues differ,
39 h values diverge, causing AEAD decryption to fail at message 1.
40 """
41 i_s = _make_keypair()
42 r_s = _make_keypair()
43
44 initiator = HandshakeState(
45 "Noise_XK", initiator=True,
46 s=i_s, rs=r_s[1], prologue=b"prologue-A",
47 )
48 responder = HandshakeState(
49 "Noise_XK", initiator=False,
50 s=r_s, prologue=b"prologue-B",
51 )
52
53 msg1 = initiator.write_message(b"test")
54
55 # Responder fails to read message 1 — hash chains diverged
56 with pytest.raises(Exception):
57 responder.read_message(msg1)
58
59 def test_matching_prologue_succeeds(self) -> None:
60 """Same prologue on both sides must produce successful handshake."""
61 i_s = _make_keypair()
62 r_s = _make_keypair()
63
64 initiator = HandshakeState(
65 "Noise_XK", initiator=True,
66 s=i_s, rs=r_s[1], prologue=b"same-prologue",
67 )
68 responder = HandshakeState(
69 "Noise_XK", initiator=False,
70 s=r_s, prologue=b"same-prologue",
71 )
72
73 msg1 = initiator.write_message(b"hello")
74 p1 = responder.read_message(msg1)
75 assert p1 == b"hello"
76
77 msg2 = responder.write_message(b"world")
78 p2 = initiator.read_message(msg2)
79 assert p2 == b"world"
80
81 msg3 = initiator.write_message(b"final")
82 p3 = responder.read_message(msg3)
83 assert p3 == b"final"
84
85 assert initiator.complete and responder.complete
86
87 def test_empty_prologue_default(self) -> None:
88 """Default empty prologue must match another empty prologue."""
89 i_s = _make_keypair()
90 r_s = _make_keypair()
91
92 initiator = HandshakeState(
93 "Noise_XK", initiator=True,
94 s=i_s, rs=r_s[1],
95 )
96 responder = HandshakeState(
97 "Noise_XK", initiator=False,
98 s=r_s,
99 )
100
101 msg1 = initiator.write_message()
102 responder.read_message(msg1)
103 msg2 = responder.write_message()
104 initiator.read_message(msg2)
105 msg3 = initiator.write_message()
106 responder.read_message(msg3)
107
108 assert initiator.complete and responder.complete
109
110
111# ---------- Nonce overflow ----------
112
113
114class TestNonceOverflow:
115 def test_nonce_overflow_encrypt_raises(self) -> None:
116 """CipherState must reject encryption at MAX_NONCE."""
117 cs = CipherState(os.urandom(32))
118 cs.set_nonce(CipherState.MAX_NONCE)
119 with pytest.raises(RuntimeError, match="Nonce exhausted"):
120 cs.encrypt_with_ad(b"", b"data")
121
122 def test_nonce_overflow_decrypt_raises(self) -> None:
123 """CipherState must reject decryption at MAX_NONCE."""
124 key = os.urandom(32)
125 cs_enc = CipherState(key)
126 ct = cs_enc.encrypt_with_ad(b"", b"data")
127
128 cs_dec = CipherState(key)
129 cs_dec.set_nonce(CipherState.MAX_NONCE)
130 with pytest.raises(RuntimeError, match="Nonce exhausted"):
131 cs_dec.decrypt_with_ad(b"", ct)
132
133 def test_nonce_just_below_max_works(self) -> None:
134 """Nonce at MAX_NONCE - 1 should still work."""
135 key = os.urandom(32)
136 cs_enc = CipherState(key)
137 cs_enc.set_nonce(CipherState.MAX_NONCE - 1)
138 ct = cs_enc.encrypt_with_ad(b"", b"last-message")
139
140 cs_dec = CipherState(key)
141 cs_dec.set_nonce(CipherState.MAX_NONCE - 1)
142 pt = cs_dec.decrypt_with_ad(b"", ct)
143 assert pt == b"last-message"
144
145
146# ---------- Split correctness ----------
147
148
149class TestSplitCorrectness:
150 def test_split_produces_distinct_keys(self) -> None:
151 """Split must produce two CipherStates with different keys."""
152 ss = SymmetricState(b"test-protocol")
153 ss.mix_key(os.urandom(32))
154 c1, c2 = ss.split()
155 assert c1.has_key() and c2.has_key()
156 assert c1._key != c2._key
157
158 def test_split_ciphers_start_at_nonce_zero(self) -> None:
159 """Both CipherStates from Split must start with nonce = 0."""
160 ss = SymmetricState(b"test-protocol")
161 ss.mix_key(os.urandom(32))
162 c1, c2 = ss.split()
163 assert c1._n == 0
164 assert c2._n == 0
165
166 def test_split_is_deterministic(self) -> None:
167 """Two SymmetricStates with same history produce same Split keys."""
168 ikm = os.urandom(32)
169 ss1 = SymmetricState(b"test")
170 ss1.mix_key(ikm)
171 c1a, c1b = ss1.split()
172
173 ss2 = SymmetricState(b"test")
174 ss2.mix_key(ikm)
175 c2a, c2b = ss2.split()
176
177 assert c1a._key == c2a._key
178 assert c1b._key == c2b._key
179
180
181# ---------- XK message sequence compliance ----------
182
183
184class TestXKMessageSequence:
185 """Verify XK pattern: -> e, es / <- e, ee / -> s, se"""
186
187 def test_message_1_contains_ephemeral(self) -> None:
188 """Message 1 must contain initiator's ephemeral public key (32 bytes)."""
189 i_s = _make_keypair()
190 r_s = _make_keypair()
191
192 initiator = HandshakeState(
193 "Noise_XK", initiator=True,
194 s=i_s, rs=r_s[1],
195 )
196
197 msg1 = initiator.write_message(b"")
198 # Message 1 for XK with empty payload:
199 # e(32) + encrypted_payload(0 bytes + 16 byte tag from es having set cipher key)
200 # After MixKey from es, the cipher has a key, so payload gets encrypted
201 assert len(msg1) >= 32
202
203 def test_message_3_contains_encrypted_static(self) -> None:
204 """Message 3 must contain initiator's encrypted static key."""
205 i_s = _make_keypair()
206 r_s = _make_keypair()
207
208 initiator = HandshakeState(
209 "Noise_XK", initiator=True,
210 s=i_s, rs=r_s[1],
211 )
212 responder = HandshakeState(
213 "Noise_XK", initiator=False,
214 s=r_s,
215 )
216
217 msg1 = initiator.write_message()
218 responder.read_message(msg1)
219 msg2 = responder.write_message()
220 initiator.read_message(msg2)
221
222 msg3 = initiator.write_message()
223 # Message 3: encrypted static (32 + 16 tag) + encrypted payload (0 + 16 tag) = 64
224 assert len(msg3) == 64
225
226 # Responder recovers initiator's static key
227 responder.read_message(msg3)
228 assert responder.remote_static == i_s[1]
229
230
231# ---------- MixHash chain integrity ----------
232
233
234class TestMixHashChainIntegrity:
235 def test_handshake_hash_chains_match(self) -> None:
236 """After handshake, both sides must have identical h (handshake hash)."""
237 i_s = _make_keypair()
238 r_s = _make_keypair()
239
240 initiator = HandshakeState(
241 "Noise_XK", initiator=True,
242 s=i_s, rs=r_s[1],
243 )
244 responder = HandshakeState(
245 "Noise_XK", initiator=False,
246 s=r_s,
247 )
248
249 msg1 = initiator.write_message()
250 responder.read_message(msg1)
251 msg2 = responder.write_message()
252 initiator.read_message(msg2)
253 msg3 = initiator.write_message()
254 responder.read_message(msg3)
255
256 # Both sides must have identical handshake hash
257 assert initiator._ss.h == responder._ss.h
258
259
260# ---------- Rekey correctness ----------
261
262
263class TestRekey:
264 def test_rekey_changes_key(self) -> None:
265 key = os.urandom(32)
266 cs = CipherState(key)
267 old_key = cs._key
268 cs.rekey()
269 assert cs._key != old_key
270 assert len(cs._key) == 32
271
272 def test_rekey_preserves_nonce(self) -> None:
273 cs = CipherState(os.urandom(32))
274 cs.set_nonce(42)
275 cs.rekey()
276 assert cs._n == 42
277
278 def test_rekey_is_deterministic(self) -> None:
279 key = os.urandom(32)
280 cs1 = CipherState(key)
281 cs2 = CipherState(key)
282 cs1.rekey()
283 cs2.rekey()
284 assert cs1._key == cs2._key