"""Tests for Noise protocol framework.""" import os import pytest class TestCipherState: def test_construct_no_key(self): from i2p_crypto.noise import CipherState cs = CipherState() assert not cs.has_key() def test_construct_with_key(self): from i2p_crypto.noise import CipherState cs = CipherState(os.urandom(32)) assert cs.has_key() def test_no_key_passthrough(self): from i2p_crypto.noise import CipherState cs = CipherState() pt = b"hello noise" ct = cs.encrypt_with_ad(b"", pt) assert ct == pt # No encryption without key def test_encrypt_decrypt_roundtrip(self): from i2p_crypto.noise import CipherState key = os.urandom(32) cs_enc = CipherState(key) cs_dec = CipherState(key) pt = b"hello noise protocol" ad = b"additional data" ct = cs_enc.encrypt_with_ad(ad, pt) assert ct != pt result = cs_dec.decrypt_with_ad(ad, ct) assert result == pt def test_nonce_increment(self): from i2p_crypto.noise import CipherState key = os.urandom(32) cs = CipherState(key) # Encrypt twice — different ciphertexts due to nonce increment ct1 = cs.encrypt_with_ad(b"", b"data") ct2 = cs.encrypt_with_ad(b"", b"data") assert ct1 != ct2 def test_set_nonce(self): from i2p_crypto.noise import CipherState key = os.urandom(32) cs1 = CipherState(key) cs1.set_nonce(5) ct = cs1.encrypt_with_ad(b"", b"test") cs2 = CipherState(key) cs2.set_nonce(5) pt = cs2.decrypt_with_ad(b"", ct) assert pt == b"test" def test_wrong_key_fails(self): from i2p_crypto.noise import CipherState cs1 = CipherState(os.urandom(32)) cs2 = CipherState(os.urandom(32)) ct = cs1.encrypt_with_ad(b"", b"secret") with pytest.raises(Exception): cs2.decrypt_with_ad(b"", ct) def test_rekey(self): from i2p_crypto.noise import CipherState key = os.urandom(32) cs = CipherState(key) old_key = cs._key cs.rekey() assert cs._key != old_key assert len(cs._key) == 32 class TestSymmetricState: def test_construct_short_name(self): from i2p_crypto.noise import SymmetricState ss = SymmetricState(b"short") assert len(ss.h) == 32 assert ss.h == b"short" + b"\x00" * 27 def test_construct_long_name(self): from i2p_crypto.noise import SymmetricState name = b"A" * 64 ss = SymmetricState(name) import hashlib assert ss.h == hashlib.sha256(name).digest() def test_mix_hash_changes_h(self): from i2p_crypto.noise import SymmetricState ss = SymmetricState(b"test") h_before = ss.h ss.mix_hash(b"some data") assert ss.h != h_before def test_mix_key_changes_ck(self): from i2p_crypto.noise import SymmetricState ss = SymmetricState(b"test") ck_before = ss.ck ss.mix_key(os.urandom(32)) assert ss.ck != ck_before def test_mix_key_enables_encryption(self): from i2p_crypto.noise import SymmetricState ss = SymmetricState(b"test") assert not ss._cipher.has_key() ss.mix_key(os.urandom(32)) assert ss._cipher.has_key() def test_encrypt_decrypt_and_hash_roundtrip(self): from i2p_crypto.noise import SymmetricState ss1 = SymmetricState(b"test") ss2 = SymmetricState(b"test") # mix_key to enable encryption ikm = os.urandom(32) ss1.mix_key(ikm) ss2.mix_key(ikm) ct = ss1.encrypt_and_hash(b"payload") pt = ss2.decrypt_and_hash(ct) assert pt == b"payload" # Hashes should match after both operations assert ss1.h == ss2.h def test_split_produces_two_ciphers(self): from i2p_crypto.noise import SymmetricState ss = SymmetricState(b"test") ss.mix_key(os.urandom(32)) c1, c2 = ss.split() assert c1.has_key() assert c2.has_key() # Different keys assert c1._key != c2._key class TestHandshakeIK: """Full IK handshake: initiator knows responder's static key.""" def _make_keypair(self): from i2p_crypto.x25519 import X25519DH return X25519DH.generate_keypair() def test_full_handshake(self): from i2p_crypto.noise import HandshakeState # Generate keys i_static = self._make_keypair() r_static = self._make_keypair() # Initiator knows responder's static public initiator = HandshakeState("Noise_IK", initiator=True, s=i_static, rs=r_static[1]) responder = HandshakeState("Noise_IK", initiator=False, s=r_static) # Message 1: initiator -> responder msg1 = initiator.write_message(b"hello") payload1 = responder.read_message(msg1) assert payload1 == b"hello" # Message 2: responder -> initiator msg2 = responder.write_message(b"world") payload2 = initiator.read_message(msg2) assert payload2 == b"world" # Both complete assert initiator.complete assert responder.complete # Responder learned initiator's static key assert responder.remote_static == i_static[1] def test_transport_after_handshake(self): from i2p_crypto.noise import HandshakeState i_static = self._make_keypair() r_static = self._make_keypair() initiator = HandshakeState("Noise_IK", initiator=True, s=i_static, rs=r_static[1]) responder = HandshakeState("Noise_IK", initiator=False, s=r_static) msg1 = initiator.write_message() responder.read_message(msg1) msg2 = responder.write_message() initiator.read_message(msg2) # Split into transport ciphers i_send, i_recv = initiator.split() r_recv, r_send = responder.split() # Initiator sends to responder ct = i_send.encrypt_with_ad(b"", b"transport data") pt = r_recv.decrypt_with_ad(b"", ct) assert pt == b"transport data" # Responder sends to initiator ct2 = r_send.encrypt_with_ad(b"", b"reply data") pt2 = i_recv.decrypt_with_ad(b"", ct2) assert pt2 == b"reply data" class TestHandshakeXK: """Full XK handshake: initiator knows responder's static key, 3 messages.""" def _make_keypair(self): from i2p_crypto.x25519 import X25519DH return X25519DH.generate_keypair() def test_full_handshake(self): from i2p_crypto.noise import HandshakeState i_static = self._make_keypair() r_static = self._make_keypair() initiator = HandshakeState("Noise_XK", initiator=True, s=i_static, rs=r_static[1]) responder = HandshakeState("Noise_XK", initiator=False, s=r_static) # Message 1: initiator -> responder (e, es) msg1 = initiator.write_message(b"msg1") p1 = responder.read_message(msg1) assert p1 == b"msg1" # Message 2: responder -> initiator (e, ee) msg2 = responder.write_message(b"msg2") p2 = initiator.read_message(msg2) assert p2 == b"msg2" # Message 3: initiator -> responder (s, se) msg3 = initiator.write_message(b"msg3") p3 = responder.read_message(msg3) assert p3 == b"msg3" assert initiator.complete assert responder.complete # Responder learned initiator's static assert responder.remote_static == i_static[1] def test_transport_after_xk(self): from i2p_crypto.noise import HandshakeState i_static = self._make_keypair() r_static = self._make_keypair() initiator = HandshakeState("Noise_XK", initiator=True, s=i_static, rs=r_static[1]) responder = HandshakeState("Noise_XK", initiator=False, s=r_static) msg1 = initiator.write_message() responder.read_message(msg1) msg2 = responder.write_message() initiator.read_message(msg2) msg3 = initiator.write_message() responder.read_message(msg3) i_send, i_recv = initiator.split() r_recv, r_send = responder.split() ct = i_send.encrypt_with_ad(b"", b"xk transport") pt = r_recv.decrypt_with_ad(b"", ct) assert pt == b"xk transport" class TestHandshakeErrors: def test_write_wrong_turn(self): from i2p_crypto.noise import HandshakeState from i2p_crypto.x25519 import X25519DH r_static = X25519DH.generate_keypair() # Responder tries to write first — should fail responder = HandshakeState("Noise_IK", initiator=False, s=r_static) with pytest.raises(RuntimeError): responder.write_message() def test_split_before_complete(self): from i2p_crypto.noise import HandshakeState from i2p_crypto.x25519 import X25519DH s = X25519DH.generate_keypair() rs = X25519DH.generate_keypair() hs = HandshakeState("Noise_IK", initiator=True, s=s, rs=rs[1]) with pytest.raises(RuntimeError): hs.split() def test_unknown_pattern(self): from i2p_crypto.noise import HandshakeState with pytest.raises(ValueError): HandshakeState("Noise_NN", initiator=True)