"""Tests for SessionKeyManager and TagSet.

TDD: these tests are written before the implementation.
"""

import os
import time

import pytest

from i2p_crypto.session_key_manager import SessionKeyManager, TagSet

# Lifetime used when building tag sets by hand in these tests (12 minutes).
_LIFETIME_MS = 720_000
# Size in bytes of the random keys, tags and destinations used as fixtures.
_BLOB_LEN = 32
# Number of tags the tests expect create_session() to hand out.
_TAGS_PER_SESSION = 20


def _now_ms():
    """Return the current wall-clock time in integer milliseconds."""
    return int(time.time() * 1000)


def _random_blobs(count):
    """Return ``count`` independent random 32-byte values."""
    return [os.urandom(_BLOB_LEN) for _ in range(count)]


def _make_tag_set(tag_count, now_ms=None):
    """Build a TagSet holding ``tag_count`` random tags.

    The set is created at ``now_ms`` (current time when omitted) and expires
    ``_LIFETIME_MS`` later.  Returns ``(tag_set, tags)`` so a test can refer
    to the individual tags it put in.
    """
    if now_ms is None:
        now_ms = _now_ms()
    tags = _random_blobs(tag_count)
    tag_set = TagSet(
        session_key=os.urandom(_BLOB_LEN),
        tags=tags,
        creation_ms=now_ms,
        expiration_ms=now_ms + _LIFETIME_MS,
    )
    return tag_set, tags


class TestTagSet:
    """Tests for TagSet."""

    def test_init_stores_key_and_tags(self):
        ts, _ = _make_tag_set(5)
        assert ts.remaining() == 5

    def test_consume_existing_tag(self):
        ts, tags = _make_tag_set(3)
        assert ts.consume(tags[1]) is True
        assert ts.remaining() == 2

    def test_consume_missing_tag(self):
        ts, _ = _make_tag_set(3)
        # A tag that was never added must be rejected and leave the set intact.
        assert ts.consume(os.urandom(_BLOB_LEN)) is False
        assert ts.remaining() == 3

    def test_consume_same_tag_twice(self):
        ts, tags = _make_tag_set(3)
        assert ts.consume(tags[0]) is True
        assert ts.consume(tags[0]) is False

    def test_is_expired(self):
        now = 1000000
        ts, _ = _make_tag_set(1, now_ms=now)
        assert ts.is_expired(now) is False
        # Last millisecond before the expiration timestamp is still valid ...
        assert ts.is_expired(now + 719999) is False
        # ... and the expiration timestamp itself already counts as expired.
        assert ts.is_expired(now + 720000) is True
        assert ts.is_expired(now + 999999) is True

    def test_default_tag_lifetime(self):
        """Default lifetime should be 720000ms (12 minutes)."""
        assert TagSet.DEFAULT_LIFETIME_MS == 720000


class TestSessionKeyManagerCreateSession:
    """Tests for create_session."""

    def test_create_returns_32_byte_key(self):
        mgr = SessionKeyManager()
        key, _ = mgr.create_session(os.urandom(_BLOB_LEN))
        assert isinstance(key, bytes)
        assert len(key) == 32

    def test_create_returns_20_tags(self):
        mgr = SessionKeyManager()
        _, tags = mgr.create_session(os.urandom(_BLOB_LEN))
        assert len(tags) == 20
        for t in tags:
            assert isinstance(t, bytes)
            assert len(t) == 32

    def test_create_session_registers_destination(self):
        mgr = SessionKeyManager()
        dest = os.urandom(_BLOB_LEN)
        mgr.create_session(dest)
        assert mgr.has_session(dest) is True

    def test_get_session_key(self):
        mgr = SessionKeyManager()
        dest = os.urandom(_BLOB_LEN)
        key, _ = mgr.create_session(dest)
        assert mgr.get_session_key(dest) == key

    def test_has_session_unknown_dest(self):
        mgr = SessionKeyManager()
        assert mgr.has_session(os.urandom(_BLOB_LEN)) is False

    def test_get_session_key_unknown_dest(self):
        mgr = SessionKeyManager()
        assert mgr.get_session_key(os.urandom(_BLOB_LEN)) is None


class TestSessionKeyManagerConsume:
    """Tests for consume_tag."""

    def test_consume_known_tag_returns_key(self):
        mgr = SessionKeyManager()
        key, tags = mgr.create_session(os.urandom(_BLOB_LEN))
        result = mgr.consume_tag(tags[0])
        assert result == key

    def test_consume_unknown_tag_returns_none(self):
        mgr = SessionKeyManager()
        assert mgr.consume_tag(os.urandom(_BLOB_LEN)) is None

    def test_replay_protection(self):
        """Consumed tag cannot be reused."""
        mgr = SessionKeyManager()
        key, tags = mgr.create_session(os.urandom(_BLOB_LEN))
        assert mgr.consume_tag(tags[5]) == key
        assert mgr.consume_tag(tags[5]) is None

    def test_consume_all_tags(self):
        mgr = SessionKeyManager()
        key, tags = mgr.create_session(os.urandom(_BLOB_LEN))
        for t in tags:
            assert mgr.consume_tag(t) == key
        # All consumed, none left
        for t in tags:
            assert mgr.consume_tag(t) is None


class TestSessionKeyManagerAddTags:
    """Tests for add_tags (receiving tags from a remote peer)."""

    def test_add_tags_makes_them_consumable(self):
        mgr = SessionKeyManager()
        session_key = os.urandom(_BLOB_LEN)
        new_tags = _random_blobs(5)
        mgr.add_tags(session_key, new_tags, expiration_ms=_now_ms() + _LIFETIME_MS)
        for t in new_tags:
            assert mgr.consume_tag(t) == session_key

    def test_add_tags_expired_not_consumable(self):
        mgr = SessionKeyManager()
        session_key = os.urandom(_BLOB_LEN)
        new_tags = _random_blobs(3)
        past = 1000
        mgr.add_tags(session_key, new_tags, expiration_ms=past)
        # Expire them
        mgr.expire_old(now_ms=2000)
        for t in new_tags:
            assert mgr.consume_tag(t) is None


class TestSessionKeyManagerExpire:
    """Tests for expire_old."""

    def test_expire_removes_old_tagsets(self):
        mgr = SessionKeyManager()
        _, tags = mgr.create_session(os.urandom(_BLOB_LEN))
        # Expire far in the future
        future = _now_ms() + 10_000_000
        mgr.expire_old(now_ms=future)
        # All tags should be gone
        for t in tags:
            assert mgr.consume_tag(t) is None

    def test_expire_keeps_fresh_tagsets(self):
        mgr = SessionKeyManager()
        key, tags = mgr.create_session(os.urandom(_BLOB_LEN))
        mgr.expire_old(now_ms=_now_ms())
        # Tags should still be there
        assert mgr.consume_tag(tags[0]) == key


class TestSessionKeyManagerMultipleSessions:
    """Tests for multiple destinations."""

    def test_different_destinations_independent(self):
        mgr = SessionKeyManager()
        key1, tags1 = mgr.create_session(os.urandom(_BLOB_LEN))
        key2, tags2 = mgr.create_session(os.urandom(_BLOB_LEN))
        assert key1 != key2
        assert mgr.consume_tag(tags1[0]) == key1
        assert mgr.consume_tag(tags2[0]) == key2

    def test_has_session_both(self):
        mgr = SessionKeyManager()
        dest1 = os.urandom(_BLOB_LEN)
        dest2 = os.urandom(_BLOB_LEN)
        mgr.create_session(dest1)
        mgr.create_session(dest2)
        assert mgr.has_session(dest1) is True
        assert mgr.has_session(dest2) is True


class TestSessionKeyManagerPerformance:
    """Performance test for O(1) tag lookup."""

    def test_large_tag_count_o1_lookup(self):
        mgr = SessionKeyManager()
        # (tag, key that tag must resolve to) for every tag handed out.
        expected = []
        # Create 50 sessions with 20 tags each = 1000 tags
        for _ in range(50):
            key, tags = mgr.create_session(os.urandom(_BLOB_LEN))
            expected.extend((tag, key) for tag in tags)
        assert len(expected) == 1000

        # Lookup should be fast (O(1) via dict)
        start = time.monotonic()
        for tag, expected_key in expected:
            result = mgr.consume_tag(tag)
            assert result == expected_key
        elapsed = time.monotonic() - start

        # 1000 lookups should complete in well under 1 second
        assert elapsed < 1.0, f"1000 tag lookups took {elapsed:.3f}s, expected < 1s"


class TestSessionKeyManagerExtended:
    """Tests for Tier 0 extended API (message routing support)."""

    def test_get_current_or_new_key_creates(self):
        skm = SessionKeyManager()
        key, is_new = skm.get_current_or_new_key(os.urandom(_BLOB_LEN))
        assert len(key) == 32
        assert is_new is True

    def test_get_current_or_new_key_reuses(self):
        skm = SessionKeyManager()
        dest = os.urandom(_BLOB_LEN)
        key1, new1 = skm.get_current_or_new_key(dest)
        key2, new2 = skm.get_current_or_new_key(dest)
        assert key1 == key2
        assert new1 is True
        assert new2 is False

    def test_consume_next_available_tag(self):
        skm = SessionKeyManager()
        dest = os.urandom(_BLOB_LEN)
        skm.create_session(dest)
        tag = skm.consume_next_available_tag(dest)
        assert tag is not None and len(tag) == 32

    def test_should_send_tags_low(self):
        skm = SessionKeyManager()
        dest = os.urandom(_BLOB_LEN)
        skm.create_session(dest)  # 20 tags
        # A full set of tags at a threshold of 20 must not request a resend.
        assert not skm.should_send_tags(dest, low_threshold=_TAGS_PER_SESSION)
        # Consume most tags
        for _ in range(_TAGS_PER_SESSION - 1):
            skm.consume_next_available_tag(dest)
        assert skm.should_send_tags(dest, low_threshold=2)

    def test_tags_delivered_and_acked(self):
        skm = SessionKeyManager()
        dest = os.urandom(_BLOB_LEN)
        key, _ = skm.create_session(dest)
        new_tags = _random_blobs(5)
        skm.tags_delivered(dest, key, new_tags, token=999)
        skm.tags_acked(999)
        # After ACK, tags should be in the system
        for tag in new_tags:
            assert skm.consume_tag(tag) == key

    def test_fail_tags(self):
        skm = SessionKeyManager()
        dest = os.urandom(_BLOB_LEN)
        key, _ = skm.create_session(dest)
        new_tags = _random_blobs(5)
        skm.tags_delivered(dest, key, new_tags, token=888)
        skm.fail_tags(888)
        # After fail, tags should NOT be usable
        for tag in new_tags:
            assert skm.consume_tag(tag) is None