# A Python port of the Invisible Internet Project (I2P)
# (repository file view: at main, 287 lines, 9.7 kB)
"""Tests for Noise protocol framework.

Covers the three layers exposed by ``i2p_crypto.noise``:

* ``CipherState``     -- keyed AEAD with an incrementing nonce,
* ``SymmetricState``  -- chaining key / handshake hash bookkeeping,
* ``HandshakeState``  -- the ``Noise_IK`` and ``Noise_XK`` patterns.

Project imports are kept function-local, so importing this module does not
itself require ``i2p_crypto``.
"""

import hashlib
import os

import pytest


def _make_keypair():
    """Return a fresh keypair from ``X25519DH.generate_keypair()``.

    The tests pass element ``[1]`` of the result as the peer's static key,
    so the result is presumably ``(private, public)`` -- TODO confirm against
    ``i2p_crypto.x25519``.
    """
    from i2p_crypto.x25519 import X25519DH
    return X25519DH.generate_keypair()


def _make_handshake_pair(pattern):
    """Build a matching initiator/responder ``HandshakeState`` pair.

    The initiator is given the responder's static key (``rs``) up front;
    the responder only knows its own static keypair.

    Args:
        pattern: Handshake pattern name, e.g. ``"Noise_IK"`` or ``"Noise_XK"``.

    Returns:
        ``(initiator, responder, i_static, r_static)`` where the last two are
        the keypairs used to build each side.
    """
    from i2p_crypto.noise import HandshakeState
    i_static = _make_keypair()
    r_static = _make_keypair()
    initiator = HandshakeState(pattern, initiator=True,
                               s=i_static, rs=r_static[1])
    responder = HandshakeState(pattern, initiator=False,
                               s=r_static)
    return initiator, responder, i_static, r_static


def _run_handshake(initiator, responder, message_count):
    """Exchange ``message_count`` payload-less handshake messages.

    The initiator writes first and the two sides alternate after every
    message.
    """
    sender, receiver = initiator, responder
    for _ in range(message_count):
        receiver.read_message(sender.write_message())
        sender, receiver = receiver, sender


class TestCipherState:
    """Behaviour of a single ``CipherState``."""

    def test_construct_no_key(self):
        """A CipherState built without arguments reports no key."""
        from i2p_crypto.noise import CipherState
        cs = CipherState()
        assert not cs.has_key()

    def test_construct_with_key(self):
        """A CipherState built with a 32-byte key reports having a key."""
        from i2p_crypto.noise import CipherState
        cs = CipherState(os.urandom(32))
        assert cs.has_key()

    def test_no_key_passthrough(self):
        """Without a key, ``encrypt_with_ad`` returns the plaintext as-is."""
        from i2p_crypto.noise import CipherState
        cs = CipherState()
        pt = b"hello noise"
        ct = cs.encrypt_with_ad(b"", pt)
        assert ct == pt  # No encryption without key

    def test_encrypt_decrypt_roundtrip(self):
        """Two states sharing a key round-trip a message with associated data."""
        from i2p_crypto.noise import CipherState
        key = os.urandom(32)
        cs_enc = CipherState(key)
        cs_dec = CipherState(key)
        pt = b"hello noise protocol"
        ad = b"additional data"
        ct = cs_enc.encrypt_with_ad(ad, pt)
        assert ct != pt
        result = cs_dec.decrypt_with_ad(ad, ct)
        assert result == pt

    def test_nonce_increment(self):
        """Encrypting the same plaintext twice yields different ciphertexts."""
        from i2p_crypto.noise import CipherState
        key = os.urandom(32)
        cs = CipherState(key)
        # Encrypt twice -- different ciphertexts due to nonce increment
        ct1 = cs.encrypt_with_ad(b"", b"data")
        ct2 = cs.encrypt_with_ad(b"", b"data")
        assert ct1 != ct2

    def test_set_nonce(self):
        """States with the same key and the same explicit nonce interoperate."""
        from i2p_crypto.noise import CipherState
        key = os.urandom(32)
        cs1 = CipherState(key)
        cs1.set_nonce(5)
        ct = cs1.encrypt_with_ad(b"", b"test")
        cs2 = CipherState(key)
        cs2.set_nonce(5)
        pt = cs2.decrypt_with_ad(b"", ct)
        assert pt == b"test"

    def test_wrong_key_fails(self):
        """Decrypting with a different key must raise."""
        from i2p_crypto.noise import CipherState
        cs1 = CipherState(os.urandom(32))
        cs2 = CipherState(os.urandom(32))
        ct = cs1.encrypt_with_ad(b"", b"secret")
        # NOTE(review): ``Exception`` is very broad; narrow this to the
        # concrete authentication-failure exception once it is confirmed
        # which type ``decrypt_with_ad`` raises.
        with pytest.raises(Exception):
            cs2.decrypt_with_ad(b"", ct)

    def test_rekey(self):
        """``rekey()`` replaces the key with a different 32-byte key."""
        from i2p_crypto.noise import CipherState
        key = os.urandom(32)
        cs = CipherState(key)
        old_key = cs._key
        cs.rekey()
        assert cs._key != old_key
        assert len(cs._key) == 32


class TestSymmetricState:
    """Behaviour of ``SymmetricState`` (handshake hash and chaining key)."""

    def test_construct_short_name(self):
        """A short protocol name is zero-padded to a 32-byte ``h``."""
        from i2p_crypto.noise import SymmetricState
        ss = SymmetricState(b"short")
        assert len(ss.h) == 32
        assert ss.h == b"short" + b"\x00" * 27

    def test_construct_long_name(self):
        """A 64-byte protocol name is hashed with SHA-256 to form ``h``."""
        from i2p_crypto.noise import SymmetricState
        name = b"A" * 64
        ss = SymmetricState(name)
        assert ss.h == hashlib.sha256(name).digest()

    def test_mix_hash_changes_h(self):
        """``mix_hash`` changes the handshake hash."""
        from i2p_crypto.noise import SymmetricState
        ss = SymmetricState(b"test")
        h_before = ss.h
        ss.mix_hash(b"some data")
        assert ss.h != h_before

    def test_mix_key_changes_ck(self):
        """``mix_key`` changes the chaining key."""
        from i2p_crypto.noise import SymmetricState
        ss = SymmetricState(b"test")
        ck_before = ss.ck
        ss.mix_key(os.urandom(32))
        assert ss.ck != ck_before

    def test_mix_key_enables_encryption(self):
        """The inner cipher has no key until ``mix_key`` is called."""
        from i2p_crypto.noise import SymmetricState
        ss = SymmetricState(b"test")
        assert not ss._cipher.has_key()
        ss.mix_key(os.urandom(32))
        assert ss._cipher.has_key()

    def test_encrypt_decrypt_and_hash_roundtrip(self):
        """Two identically-keyed states round-trip a payload and agree on ``h``."""
        from i2p_crypto.noise import SymmetricState
        ss1 = SymmetricState(b"test")
        ss2 = SymmetricState(b"test")
        # mix_key to enable encryption
        ikm = os.urandom(32)
        ss1.mix_key(ikm)
        ss2.mix_key(ikm)
        ct = ss1.encrypt_and_hash(b"payload")
        pt = ss2.decrypt_and_hash(ct)
        assert pt == b"payload"
        # Hashes should match after both operations
        assert ss1.h == ss2.h

    def test_split_produces_two_ciphers(self):
        """``split()`` returns two keyed ciphers with different keys."""
        from i2p_crypto.noise import SymmetricState
        ss = SymmetricState(b"test")
        ss.mix_key(os.urandom(32))
        c1, c2 = ss.split()
        assert c1.has_key()
        assert c2.has_key()
        # Different keys
        assert c1._key != c2._key


class TestHandshakeIK:
    """Full IK handshake: initiator knows responder's static key."""

    def test_full_handshake(self):
        """Both payloads arrive and the responder learns the initiator's key."""
        initiator, responder, i_static, _ = _make_handshake_pair("Noise_IK")

        # Message 1: initiator -> responder
        msg1 = initiator.write_message(b"hello")
        payload1 = responder.read_message(msg1)
        assert payload1 == b"hello"

        # Message 2: responder -> initiator
        msg2 = responder.write_message(b"world")
        payload2 = initiator.read_message(msg2)
        assert payload2 == b"world"

        # Both complete
        assert initiator.complete
        assert responder.complete

        # Responder learned initiator's static key
        assert responder.remote_static == i_static[1]

    def test_transport_after_handshake(self):
        """After a two-message handshake, split ciphers work both ways."""
        initiator, responder, _, _ = _make_handshake_pair("Noise_IK")
        _run_handshake(initiator, responder, 2)

        # Split into transport ciphers; the responder's pair is mirrored
        # (its first cipher receives what the initiator's first one sends).
        i_send, i_recv = initiator.split()
        r_recv, r_send = responder.split()

        # Initiator sends to responder
        ct = i_send.encrypt_with_ad(b"", b"transport data")
        pt = r_recv.decrypt_with_ad(b"", ct)
        assert pt == b"transport data"

        # Responder sends to initiator
        ct2 = r_send.encrypt_with_ad(b"", b"reply data")
        pt2 = i_recv.decrypt_with_ad(b"", ct2)
        assert pt2 == b"reply data"


class TestHandshakeXK:
    """Full XK handshake: initiator knows responder's static key, 3 messages."""

    def test_full_handshake(self):
        """All three payloads arrive and the responder learns the initiator's key."""
        initiator, responder, i_static, _ = _make_handshake_pair("Noise_XK")

        # Message 1: initiator -> responder (e, es)
        msg1 = initiator.write_message(b"msg1")
        p1 = responder.read_message(msg1)
        assert p1 == b"msg1"

        # Message 2: responder -> initiator (e, ee)
        msg2 = responder.write_message(b"msg2")
        p2 = initiator.read_message(msg2)
        assert p2 == b"msg2"

        # Message 3: initiator -> responder (s, se)
        msg3 = initiator.write_message(b"msg3")
        p3 = responder.read_message(msg3)
        assert p3 == b"msg3"

        assert initiator.complete
        assert responder.complete

        # Responder learned initiator's static
        assert responder.remote_static == i_static[1]

    def test_transport_after_xk(self):
        """After a three-message handshake, split ciphers work both ways."""
        initiator, responder, _, _ = _make_handshake_pair("Noise_XK")
        _run_handshake(initiator, responder, 3)

        i_send, i_recv = initiator.split()
        r_recv, r_send = responder.split()

        # Initiator sends to responder
        ct = i_send.encrypt_with_ad(b"", b"xk transport")
        pt = r_recv.decrypt_with_ad(b"", ct)
        assert pt == b"xk transport"

        # Responder sends to initiator (mirrors the IK transport test)
        ct2 = r_send.encrypt_with_ad(b"", b"xk reply")
        pt2 = i_recv.decrypt_with_ad(b"", ct2)
        assert pt2 == b"xk reply"


class TestHandshakeErrors:
    """Misuse of ``HandshakeState`` must raise rather than proceed."""

    def test_write_wrong_turn(self):
        """A responder that writes before reading raises ``RuntimeError``."""
        from i2p_crypto.noise import HandshakeState
        r_static = _make_keypair()
        # Responder tries to write first -- should fail
        responder = HandshakeState("Noise_IK", initiator=False, s=r_static)
        with pytest.raises(RuntimeError):
            responder.write_message()

    def test_split_before_complete(self):
        """``split()`` on an unfinished handshake raises ``RuntimeError``."""
        from i2p_crypto.noise import HandshakeState
        s = _make_keypair()
        rs = _make_keypair()
        hs = HandshakeState("Noise_IK", initiator=True, s=s, rs=rs[1])
        with pytest.raises(RuntimeError):
            hs.split()

    def test_unknown_pattern(self):
        """An unsupported pattern name raises ``ValueError``."""
        from i2p_crypto.noise import HandshakeState
        with pytest.raises(ValueError):
            HandshakeState("Noise_NN", initiator=True)