A Python port of the Invisible Internet Project (I2P)
at main 284 lines 8.9 kB view raw
1"""Noise protocol compliance verification tests. 2 3Verifies that the Noise_XK and Noise_IK implementations conform to the 4Noise Protocol Framework specification (noiseprotocol.org/noise.html). 5 6Covers: message sequence correctness, MixHash/MixKey calls, Split 7correctness, prologue binding, nonce overflow detection, and static 8key validation. 9""" 10from __future__ import annotations 11 12import hashlib 13import os 14 15import pytest 16 17from i2p_crypto.noise import CipherState, SymmetricState, HandshakeState 18from i2p_crypto.x25519 import X25519DH 19 20 21def _make_keypair(): 22 return X25519DH.generate_keypair() 23 24 25# ---------- Prologue binding ---------- 26 27 28class TestPrologueBinding: 29 """Noise spec: prologue is MixHash'd into h before any DH. 30 Mismatched prologues must cause handshake failure. 31 """ 32 33 def test_prologue_mismatch_causes_failure(self) -> None: 34 """XK handshake with different prologues must fail. 35 36 In XK, the 'es' token in message 1 runs MixKey, giving the 37 CipherState a key. The payload in message 1 is then encrypted 38 with AD = h (which includes the prologue). If prologues differ, 39 h values diverge, causing AEAD decryption to fail at message 1. 40 """ 41 i_s = _make_keypair() 42 r_s = _make_keypair() 43 44 initiator = HandshakeState( 45 "Noise_XK", initiator=True, 46 s=i_s, rs=r_s[1], prologue=b"prologue-A", 47 ) 48 responder = HandshakeState( 49 "Noise_XK", initiator=False, 50 s=r_s, prologue=b"prologue-B", 51 ) 52 53 msg1 = initiator.write_message(b"test") 54 55 # Responder fails to read message 1 — hash chains diverged 56 with pytest.raises(Exception): 57 responder.read_message(msg1) 58 59 def test_matching_prologue_succeeds(self) -> None: 60 """Same prologue on both sides must produce successful handshake.""" 61 i_s = _make_keypair() 62 r_s = _make_keypair() 63 64 initiator = HandshakeState( 65 "Noise_XK", initiator=True, 66 s=i_s, rs=r_s[1], prologue=b"same-prologue", 67 ) 68 responder = HandshakeState( 69 "Noise_XK", initiator=False, 70 s=r_s, prologue=b"same-prologue", 71 ) 72 73 msg1 = initiator.write_message(b"hello") 74 p1 = responder.read_message(msg1) 75 assert p1 == b"hello" 76 77 msg2 = responder.write_message(b"world") 78 p2 = initiator.read_message(msg2) 79 assert p2 == b"world" 80 81 msg3 = initiator.write_message(b"final") 82 p3 = responder.read_message(msg3) 83 assert p3 == b"final" 84 85 assert initiator.complete and responder.complete 86 87 def test_empty_prologue_default(self) -> None: 88 """Default empty prologue must match another empty prologue.""" 89 i_s = _make_keypair() 90 r_s = _make_keypair() 91 92 initiator = HandshakeState( 93 "Noise_XK", initiator=True, 94 s=i_s, rs=r_s[1], 95 ) 96 responder = HandshakeState( 97 "Noise_XK", initiator=False, 98 s=r_s, 99 ) 100 101 msg1 = initiator.write_message() 102 responder.read_message(msg1) 103 msg2 = responder.write_message() 104 initiator.read_message(msg2) 105 msg3 = initiator.write_message() 106 responder.read_message(msg3) 107 108 assert initiator.complete and responder.complete 109 110 111# ---------- Nonce overflow ---------- 112 113 114class TestNonceOverflow: 115 def test_nonce_overflow_encrypt_raises(self) -> None: 116 """CipherState must reject encryption at MAX_NONCE.""" 117 cs = CipherState(os.urandom(32)) 118 cs.set_nonce(CipherState.MAX_NONCE) 119 with pytest.raises(RuntimeError, match="Nonce exhausted"): 120 cs.encrypt_with_ad(b"", b"data") 121 122 def test_nonce_overflow_decrypt_raises(self) -> None: 123 """CipherState must reject decryption at MAX_NONCE.""" 124 key = os.urandom(32) 125 cs_enc = CipherState(key) 126 ct = cs_enc.encrypt_with_ad(b"", b"data") 127 128 cs_dec = CipherState(key) 129 cs_dec.set_nonce(CipherState.MAX_NONCE) 130 with pytest.raises(RuntimeError, match="Nonce exhausted"): 131 cs_dec.decrypt_with_ad(b"", ct) 132 133 def test_nonce_just_below_max_works(self) -> None: 134 """Nonce at MAX_NONCE - 1 should still work.""" 135 key = os.urandom(32) 136 cs_enc = CipherState(key) 137 cs_enc.set_nonce(CipherState.MAX_NONCE - 1) 138 ct = cs_enc.encrypt_with_ad(b"", b"last-message") 139 140 cs_dec = CipherState(key) 141 cs_dec.set_nonce(CipherState.MAX_NONCE - 1) 142 pt = cs_dec.decrypt_with_ad(b"", ct) 143 assert pt == b"last-message" 144 145 146# ---------- Split correctness ---------- 147 148 149class TestSplitCorrectness: 150 def test_split_produces_distinct_keys(self) -> None: 151 """Split must produce two CipherStates with different keys.""" 152 ss = SymmetricState(b"test-protocol") 153 ss.mix_key(os.urandom(32)) 154 c1, c2 = ss.split() 155 assert c1.has_key() and c2.has_key() 156 assert c1._key != c2._key 157 158 def test_split_ciphers_start_at_nonce_zero(self) -> None: 159 """Both CipherStates from Split must start with nonce = 0.""" 160 ss = SymmetricState(b"test-protocol") 161 ss.mix_key(os.urandom(32)) 162 c1, c2 = ss.split() 163 assert c1._n == 0 164 assert c2._n == 0 165 166 def test_split_is_deterministic(self) -> None: 167 """Two SymmetricStates with same history produce same Split keys.""" 168 ikm = os.urandom(32) 169 ss1 = SymmetricState(b"test") 170 ss1.mix_key(ikm) 171 c1a, c1b = ss1.split() 172 173 ss2 = SymmetricState(b"test") 174 ss2.mix_key(ikm) 175 c2a, c2b = ss2.split() 176 177 assert c1a._key == c2a._key 178 assert c1b._key == c2b._key 179 180 181# ---------- XK message sequence compliance ---------- 182 183 184class TestXKMessageSequence: 185 """Verify XK pattern: -> e, es / <- e, ee / -> s, se""" 186 187 def test_message_1_contains_ephemeral(self) -> None: 188 """Message 1 must contain initiator's ephemeral public key (32 bytes).""" 189 i_s = _make_keypair() 190 r_s = _make_keypair() 191 192 initiator = HandshakeState( 193 "Noise_XK", initiator=True, 194 s=i_s, rs=r_s[1], 195 ) 196 197 msg1 = initiator.write_message(b"") 198 # Message 1 for XK with empty payload: 199 # e(32) + encrypted_payload(0 bytes + 16 byte tag from es having set cipher key) 200 # After MixKey from es, the cipher has a key, so payload gets encrypted 201 assert len(msg1) >= 32 202 203 def test_message_3_contains_encrypted_static(self) -> None: 204 """Message 3 must contain initiator's encrypted static key.""" 205 i_s = _make_keypair() 206 r_s = _make_keypair() 207 208 initiator = HandshakeState( 209 "Noise_XK", initiator=True, 210 s=i_s, rs=r_s[1], 211 ) 212 responder = HandshakeState( 213 "Noise_XK", initiator=False, 214 s=r_s, 215 ) 216 217 msg1 = initiator.write_message() 218 responder.read_message(msg1) 219 msg2 = responder.write_message() 220 initiator.read_message(msg2) 221 222 msg3 = initiator.write_message() 223 # Message 3: encrypted static (32 + 16 tag) + encrypted payload (0 + 16 tag) = 64 224 assert len(msg3) == 64 225 226 # Responder recovers initiator's static key 227 responder.read_message(msg3) 228 assert responder.remote_static == i_s[1] 229 230 231# ---------- MixHash chain integrity ---------- 232 233 234class TestMixHashChainIntegrity: 235 def test_handshake_hash_chains_match(self) -> None: 236 """After handshake, both sides must have identical h (handshake hash).""" 237 i_s = _make_keypair() 238 r_s = _make_keypair() 239 240 initiator = HandshakeState( 241 "Noise_XK", initiator=True, 242 s=i_s, rs=r_s[1], 243 ) 244 responder = HandshakeState( 245 "Noise_XK", initiator=False, 246 s=r_s, 247 ) 248 249 msg1 = initiator.write_message() 250 responder.read_message(msg1) 251 msg2 = responder.write_message() 252 initiator.read_message(msg2) 253 msg3 = initiator.write_message() 254 responder.read_message(msg3) 255 256 # Both sides must have identical handshake hash 257 assert initiator._ss.h == responder._ss.h 258 259 260# ---------- Rekey correctness ---------- 261 262 263class TestRekey: 264 def test_rekey_changes_key(self) -> None: 265 key = os.urandom(32) 266 cs = CipherState(key) 267 old_key = cs._key 268 cs.rekey() 269 assert cs._key != old_key 270 assert len(cs._key) == 32 271 272 def test_rekey_preserves_nonce(self) -> None: 273 cs = CipherState(os.urandom(32)) 274 cs.set_nonce(42) 275 cs.rekey() 276 assert cs._n == 42 277 278 def test_rekey_is_deterministic(self) -> None: 279 key = os.urandom(32) 280 cs1 = CipherState(key) 281 cs2 = CipherState(key) 282 cs1.rekey() 283 cs2.rekey() 284 assert cs1._key == cs2._key