A Python port of the Invisible Internet Project (I2P)
1"""Tests for Noise protocol framework."""
2
3import os
4
5import pytest
6
7
class TestCipherState:
    """Checks for CipherState: key presence, AEAD round-trips, nonces, rekey."""

    def test_construct_no_key(self):
        """A CipherState created with no arguments reports that it has no key."""
        from i2p_crypto.noise import CipherState

        state = CipherState()
        assert not state.has_key()

    def test_construct_with_key(self):
        """A CipherState created from 32 random bytes reports that it has a key."""
        from i2p_crypto.noise import CipherState

        key_bytes = os.urandom(32)
        state = CipherState(key_bytes)
        assert state.has_key()

    def test_no_key_passthrough(self):
        """Without a key, encrypt_with_ad must hand the plaintext back as-is."""
        from i2p_crypto.noise import CipherState

        plaintext = b"hello noise"
        state = CipherState()
        output = state.encrypt_with_ad(b"", plaintext)
        # Keyless state: the output is expected to equal the input.
        assert output == plaintext

    def test_encrypt_decrypt_roundtrip(self):
        """Two states sharing one key can encrypt and then decrypt a message."""
        from i2p_crypto.noise import CipherState

        shared_key = os.urandom(32)
        encryptor = CipherState(shared_key)
        decryptor = CipherState(shared_key)
        associated = b"additional data"
        message = b"hello noise protocol"

        sealed = encryptor.encrypt_with_ad(associated, message)
        assert sealed != message

        opened = decryptor.decrypt_with_ad(associated, sealed)
        assert opened == message

    def test_nonce_increment(self):
        """Encrypting the same input twice must not yield the same output."""
        from i2p_crypto.noise import CipherState

        state = CipherState(os.urandom(32))
        # Two consecutive calls on the same state with identical arguments;
        # the outputs are expected to differ because the nonce advances.
        first, second = (state.encrypt_with_ad(b"", b"data") for _ in range(2))
        assert first != second

    def test_set_nonce(self):
        """States sharing a key interoperate once both are set to nonce 5."""
        from i2p_crypto.noise import CipherState

        shared_key = os.urandom(32)

        sender = CipherState(shared_key)
        sender.set_nonce(5)
        sealed = sender.encrypt_with_ad(b"", b"test")

        receiver = CipherState(shared_key)
        receiver.set_nonce(5)
        opened = receiver.decrypt_with_ad(b"", sealed)

        assert opened == b"test"

    def test_wrong_key_fails(self):
        """Decrypting under a different random key must raise."""
        from i2p_crypto.noise import CipherState

        sender = CipherState(os.urandom(32))
        stranger = CipherState(os.urandom(32))
        sealed = sender.encrypt_with_ad(b"", b"secret")
        with pytest.raises(Exception):
            stranger.decrypt_with_ad(b"", sealed)

    def test_rekey(self):
        """rekey() replaces the stored key with a different 32-byte key."""
        from i2p_crypto.noise import CipherState

        state = CipherState(os.urandom(32))
        key_before = state._key
        state.rekey()
        key_after = state._key
        assert key_after != key_before
        assert len(state._key) == 32
74
75
class TestSymmetricState:
    """Checks for SymmetricState: initial hash, mixing, encrypt/decrypt, split."""

    def test_construct_short_name(self):
        """A 5-byte protocol name is zero-padded on the right to 32 bytes."""
        from i2p_crypto.noise import SymmetricState

        state = SymmetricState(b"short")
        expected_h = b"short" + b"\x00" * 27
        assert len(state.h) == 32
        assert state.h == expected_h

    def test_construct_long_name(self):
        """A 64-byte protocol name is replaced by its SHA-256 digest."""
        import hashlib

        from i2p_crypto.noise import SymmetricState

        protocol_name = b"A" * 64
        state = SymmetricState(protocol_name)
        expected_h = hashlib.sha256(protocol_name).digest()
        assert state.h == expected_h

    def test_mix_hash_changes_h(self):
        """mix_hash() must leave ``h`` different from its previous value."""
        from i2p_crypto.noise import SymmetricState

        state = SymmetricState(b"test")
        previous_h = state.h
        state.mix_hash(b"some data")
        assert state.h != previous_h

    def test_mix_key_changes_ck(self):
        """mix_key() must leave ``ck`` different from its previous value."""
        from i2p_crypto.noise import SymmetricState

        state = SymmetricState(b"test")
        previous_ck = state.ck
        state.mix_key(os.urandom(32))
        assert state.ck != previous_ck

    def test_mix_key_enables_encryption(self):
        """The inner cipher has no key before mix_key() and has one after."""
        from i2p_crypto.noise import SymmetricState

        state = SymmetricState(b"test")
        assert not state._cipher.has_key()
        key_material = os.urandom(32)
        state.mix_key(key_material)
        assert state._cipher.has_key()

    def test_encrypt_decrypt_and_hash_roundtrip(self):
        """Two identically keyed states round-trip a payload and agree on ``h``."""
        from i2p_crypto.noise import SymmetricState

        writer = SymmetricState(b"test")
        reader = SymmetricState(b"test")

        # Feed the same input key material to both sides, writer first.
        key_material = os.urandom(32)
        for side in (writer, reader):
            side.mix_key(key_material)

        sealed = writer.encrypt_and_hash(b"payload")
        opened = reader.decrypt_and_hash(sealed)
        assert opened == b"payload"
        # After one encrypt and one matching decrypt the hashes must be equal.
        assert writer.h == reader.h

    def test_split_produces_two_ciphers(self):
        """split() returns two keyed ciphers whose keys differ."""
        from i2p_crypto.noise import SymmetricState

        state = SymmetricState(b"test")
        state.mix_key(os.urandom(32))
        first, second = state.split()
        assert first.has_key()
        assert second.has_key()
        # The two ciphers must not share a key.
        assert first._key != second._key
134
135
class TestHandshakeIK:
    """Full IK handshake: initiator knows responder's static key."""

    def _make_keypair(self):
        """Return a fresh keypair from ``X25519DH.generate_keypair()``."""
        from i2p_crypto.x25519 import X25519DH

        keypair = X25519DH.generate_keypair()
        return keypair

    def _new_session(self):
        """Build a matching Noise_IK initiator and responder.

        Returns:
            ``(initiator, responder, initiator_keys)``; the initiator's
            keypair is returned so a test can compare against element ``[1]``.
        """
        from i2p_crypto.noise import HandshakeState

        initiator_keys = self._make_keypair()
        responder_keys = self._make_keypair()

        # The initiator is given element [1] of the responder's keypair as
        # the remote static key.
        initiator = HandshakeState(
            "Noise_IK",
            initiator=True,
            s=initiator_keys,
            rs=responder_keys[1],
        )
        responder = HandshakeState(
            "Noise_IK",
            initiator=False,
            s=responder_keys,
        )
        return initiator, responder, initiator_keys

    def test_full_handshake(self):
        """Both messages carry their payloads and both sides finish."""
        initiator, responder, initiator_keys = self._new_session()

        # First message travels initiator -> responder.
        first_wire = initiator.write_message(b"hello")
        first_payload = responder.read_message(first_wire)
        assert first_payload == b"hello"

        # Second message travels responder -> initiator.
        second_wire = responder.write_message(b"world")
        second_payload = initiator.read_message(second_wire)
        assert second_payload == b"world"

        # The handshake is finished on both ends.
        assert initiator.complete
        assert responder.complete

        # The responder now holds the initiator's static key.
        assert responder.remote_static == initiator_keys[1]

    def test_transport_after_handshake(self):
        """Ciphers from split() carry transport data in both directions."""
        initiator, responder, _ = self._new_session()

        # Run the two handshake messages with default (no-argument) payloads.
        responder.read_message(initiator.write_message())
        initiator.read_message(responder.write_message())

        # The responder unpacks split() in the opposite order to the initiator.
        i_send, i_recv = initiator.split()
        r_recv, r_send = responder.split()

        # Initiator -> responder.
        outbound = i_send.encrypt_with_ad(b"", b"transport data")
        received = r_recv.decrypt_with_ad(b"", outbound)
        assert received == b"transport data"

        # Responder -> initiator.
        reply = r_send.encrypt_with_ad(b"", b"reply data")
        reply_received = i_recv.decrypt_with_ad(b"", reply)
        assert reply_received == b"reply data"
200
201
class TestHandshakeXK:
    """Full XK handshake: initiator knows responder's static key, 3 messages."""

    def _make_keypair(self):
        """Return a fresh keypair from ``X25519DH.generate_keypair()``.

        The tests pass element ``[1]`` of the result as the remote static key.
        """
        from i2p_crypto.x25519 import X25519DH
        return X25519DH.generate_keypair()

    def test_full_handshake(self):
        """All three XK messages deliver their payloads and both sides finish."""
        from i2p_crypto.noise import HandshakeState
        i_static = self._make_keypair()
        r_static = self._make_keypair()

        initiator = HandshakeState("Noise_XK", initiator=True,
                                   s=i_static, rs=r_static[1])
        responder = HandshakeState("Noise_XK", initiator=False,
                                   s=r_static)

        # Message 1: initiator -> responder (e, es)
        msg1 = initiator.write_message(b"msg1")
        p1 = responder.read_message(msg1)
        assert p1 == b"msg1"

        # Message 2: responder -> initiator (e, ee)
        msg2 = responder.write_message(b"msg2")
        p2 = initiator.read_message(msg2)
        assert p2 == b"msg2"

        # Message 3: initiator -> responder (s, se)
        msg3 = initiator.write_message(b"msg3")
        p3 = responder.read_message(msg3)
        assert p3 == b"msg3"

        assert initiator.complete
        assert responder.complete

        # Responder learned initiator's static
        assert responder.remote_static == i_static[1]

    def test_transport_after_xk(self):
        """Transport ciphers from split() work in both directions after XK.

        The responder unpacks ``split()`` in the opposite order to the
        initiator, so each side's send cipher pairs with the peer's receive
        cipher. Both directions are exercised so that a swapped or duplicated
        cipher pair cannot go unnoticed.
        """
        from i2p_crypto.noise import HandshakeState
        i_static = self._make_keypair()
        r_static = self._make_keypair()

        initiator = HandshakeState("Noise_XK", initiator=True,
                                   s=i_static, rs=r_static[1])
        responder = HandshakeState("Noise_XK", initiator=False,
                                   s=r_static)

        # Run the three handshake messages with default payloads.
        msg1 = initiator.write_message()
        responder.read_message(msg1)
        msg2 = responder.write_message()
        initiator.read_message(msg2)
        msg3 = initiator.write_message()
        responder.read_message(msg3)

        i_send, i_recv = initiator.split()
        r_recv, r_send = responder.split()

        # Initiator sends to responder
        ct = i_send.encrypt_with_ad(b"", b"xk transport")
        pt = r_recv.decrypt_with_ad(b"", ct)
        assert pt == b"xk transport"

        # Responder sends to initiator (previously unpacked but never checked)
        ct2 = r_send.encrypt_with_ad(b"", b"xk reply")
        pt2 = i_recv.decrypt_with_ad(b"", ct2)
        assert pt2 == b"xk reply"
263
264
class TestHandshakeErrors:
    """Misuse of HandshakeState must raise the expected exception types."""

    def test_write_wrong_turn(self):
        """A responder that writes before reading raises RuntimeError."""
        from i2p_crypto.noise import HandshakeState
        from i2p_crypto.x25519 import X25519DH

        responder_keys = X25519DH.generate_keypair()
        out_of_turn = HandshakeState(
            "Noise_IK",
            initiator=False,
            s=responder_keys,
        )
        # The responder has received nothing yet, so writing is out of turn.
        with pytest.raises(RuntimeError):
            out_of_turn.write_message()

    def test_split_before_complete(self):
        """split() on a handshake that has not run raises RuntimeError."""
        from i2p_crypto.noise import HandshakeState
        from i2p_crypto.x25519 import X25519DH

        local_keys = X25519DH.generate_keypair()
        remote_keys = X25519DH.generate_keypair()
        unfinished = HandshakeState(
            "Noise_IK",
            initiator=True,
            s=local_keys,
            rs=remote_keys[1],
        )
        with pytest.raises(RuntimeError):
            unfinished.split()

    def test_unknown_pattern(self):
        """Constructing with the pattern name "Noise_NN" raises ValueError."""
        from i2p_crypto.noise import HandshakeState

        expect_rejection = pytest.raises(ValueError)
        with expect_rejection:
            HandshakeState("Noise_NN", initiator=True)