# A Python port of the Invisible Internet Project (I2P)
# Source listing header: at main, 289 lines, 9.8 kB (view raw)
"""Tests for SessionKeyManager and TagSet.

TDD: these tests are written before the implementation.
"""

import os
import time

import pytest

from i2p_crypto.session_key_manager import SessionKeyManager, TagSet

# Lifetime (in milliseconds) given to every tag set built explicitly in this
# module; test_default_tag_lifetime checks TagSet's own default separately.
_TAG_LIFETIME_MS = 720_000


def _make_tag_set(tag_count, now_ms=None):
    """Build a TagSet holding ``tag_count`` random 32-byte tags.

    Args:
        tag_count: Number of random tags to generate.
        now_ms: Creation time in milliseconds. When omitted, the current
            wall-clock time is used.

    Returns:
        A ``(tag_set, tags)`` pair, where ``tags`` is the list that was
        handed to the TagSet constructor. The set expires
        ``_TAG_LIFETIME_MS`` after ``now_ms``.
    """
    if now_ms is None:
        now_ms = int(time.time() * 1000)
    tags = [os.urandom(32) for _ in range(tag_count)]
    tag_set = TagSet(
        session_key=os.urandom(32),
        tags=tags,
        creation_ms=now_ms,
        expiration_ms=now_ms + _TAG_LIFETIME_MS,
    )
    return tag_set, tags


class TestTagSet:
    """Tests for TagSet."""

    def test_init_stores_key_and_tags(self):
        """A new set reports as many remaining tags as it was given."""
        ts, _ = _make_tag_set(5)
        assert ts.remaining() == 5

    def test_consume_existing_tag(self):
        """Consuming a tag in the set returns True and lowers remaining()."""
        ts, tags = _make_tag_set(3)
        assert ts.consume(tags[1]) is True
        assert ts.remaining() == 2

    def test_consume_missing_tag(self):
        """Consuming an unknown tag returns False and leaves the count alone."""
        ts, _ = _make_tag_set(3)
        assert ts.consume(os.urandom(32)) is False
        assert ts.remaining() == 3

    def test_consume_same_tag_twice(self):
        """A tag can be consumed only once."""
        ts, tags = _make_tag_set(3)
        assert ts.consume(tags[0]) is True
        assert ts.consume(tags[0]) is False

    def test_is_expired(self):
        """is_expired() flips to True exactly at the expiration timestamp."""
        now = 1000000
        ts, _ = _make_tag_set(1, now_ms=now)
        assert ts.is_expired(now) is False
        assert ts.is_expired(now + _TAG_LIFETIME_MS - 1) is False
        assert ts.is_expired(now + _TAG_LIFETIME_MS) is True
        assert ts.is_expired(now + 999999) is True

    def test_default_tag_lifetime(self):
        """Default lifetime should be 720000ms (12 minutes)."""
        assert TagSet.DEFAULT_LIFETIME_MS == 720000


class TestSessionKeyManagerCreateSession:
    """Tests for create_session."""

    def test_create_returns_32_byte_key(self):
        """create_session returns a 32-byte ``bytes`` session key."""
        mgr = SessionKeyManager()
        dest = os.urandom(32)
        key, _ = mgr.create_session(dest)
        assert isinstance(key, bytes)
        assert len(key) == 32

    def test_create_returns_20_tags(self):
        """create_session returns 20 tags, each a 32-byte ``bytes`` value."""
        mgr = SessionKeyManager()
        dest = os.urandom(32)
        _, tags = mgr.create_session(dest)
        assert len(tags) == 20
        for t in tags:
            assert isinstance(t, bytes)
            assert len(t) == 32

    def test_create_session_registers_destination(self):
        """has_session is True for a destination after create_session."""
        mgr = SessionKeyManager()
        dest = os.urandom(32)
        mgr.create_session(dest)
        assert mgr.has_session(dest) is True

    def test_get_session_key(self):
        """get_session_key returns the key that create_session produced."""
        mgr = SessionKeyManager()
        dest = os.urandom(32)
        key, _ = mgr.create_session(dest)
        assert mgr.get_session_key(dest) == key

    def test_has_session_unknown_dest(self):
        """has_session is False for a destination never seen before."""
        mgr = SessionKeyManager()
        assert mgr.has_session(os.urandom(32)) is False

    def test_get_session_key_unknown_dest(self):
        """get_session_key returns None for a destination never seen before."""
        mgr = SessionKeyManager()
        assert mgr.get_session_key(os.urandom(32)) is None


class TestSessionKeyManagerConsume:
    """Tests for consume_tag."""

    def test_consume_known_tag_returns_key(self):
        """Consuming a tag from a created session yields that session's key."""
        mgr = SessionKeyManager()
        dest = os.urandom(32)
        key, tags = mgr.create_session(dest)
        result = mgr.consume_tag(tags[0])
        assert result == key

    def test_consume_unknown_tag_returns_none(self):
        """Consuming a tag the manager never issued yields None."""
        mgr = SessionKeyManager()
        assert mgr.consume_tag(os.urandom(32)) is None

    def test_replay_protection(self):
        """Consumed tag cannot be reused."""
        mgr = SessionKeyManager()
        dest = os.urandom(32)
        key, tags = mgr.create_session(dest)
        assert mgr.consume_tag(tags[5]) == key
        assert mgr.consume_tag(tags[5]) is None

    def test_consume_all_tags(self):
        """Every tag resolves to the key once, then to None afterwards."""
        mgr = SessionKeyManager()
        dest = os.urandom(32)
        key, tags = mgr.create_session(dest)
        for t in tags:
            assert mgr.consume_tag(t) == key
        # All consumed, none left
        for t in tags:
            assert mgr.consume_tag(t) is None


class TestSessionKeyManagerAddTags:
    """Tests for add_tags (receiving tags from a remote peer)."""

    def test_add_tags_makes_them_consumable(self):
        """Tags added with a future expiration resolve to their session key."""
        mgr = SessionKeyManager()
        session_key = os.urandom(32)
        new_tags = [os.urandom(32) for _ in range(5)]
        now = int(time.time() * 1000)
        mgr.add_tags(session_key, new_tags, expiration_ms=now + _TAG_LIFETIME_MS)
        for t in new_tags:
            assert mgr.consume_tag(t) == session_key

    def test_add_tags_expired_not_consumable(self):
        """Tags whose expiration has passed are gone after expire_old."""
        mgr = SessionKeyManager()
        session_key = os.urandom(32)
        new_tags = [os.urandom(32) for _ in range(3)]
        past = 1000
        mgr.add_tags(session_key, new_tags, expiration_ms=past)
        # Expire them
        mgr.expire_old(now_ms=2000)
        for t in new_tags:
            assert mgr.consume_tag(t) is None


class TestSessionKeyManagerExpire:
    """Tests for expire_old."""

    def test_expire_removes_old_tagsets(self):
        """expire_old with a far-future clock drops all of a session's tags."""
        mgr = SessionKeyManager()
        dest = os.urandom(32)
        _, tags = mgr.create_session(dest)
        # Expire far in the future
        future = int(time.time() * 1000) + 10_000_000
        mgr.expire_old(now_ms=future)
        # All tags should be gone
        for t in tags:
            assert mgr.consume_tag(t) is None

    def test_expire_keeps_fresh_tagsets(self):
        """expire_old with the current clock leaves a new session's tags."""
        mgr = SessionKeyManager()
        dest = os.urandom(32)
        key, tags = mgr.create_session(dest)
        now = int(time.time() * 1000)
        mgr.expire_old(now_ms=now)
        # Tags should still be there
        assert mgr.consume_tag(tags[0]) == key


class TestSessionKeyManagerMultipleSessions:
    """Tests for multiple destinations."""

    def test_different_destinations_independent(self):
        """Two destinations get distinct keys, each reachable by its own tags."""
        mgr = SessionKeyManager()
        dest1 = os.urandom(32)
        dest2 = os.urandom(32)
        key1, tags1 = mgr.create_session(dest1)
        key2, tags2 = mgr.create_session(dest2)
        assert key1 != key2
        assert mgr.consume_tag(tags1[0]) == key1
        assert mgr.consume_tag(tags2[0]) == key2

    def test_has_session_both(self):
        """has_session is True for each of two created destinations."""
        mgr = SessionKeyManager()
        dest1 = os.urandom(32)
        dest2 = os.urandom(32)
        mgr.create_session(dest1)
        mgr.create_session(dest2)
        assert mgr.has_session(dest1) is True
        assert mgr.has_session(dest2) is True


class TestSessionKeyManagerPerformance:
    """Performance test for O(1) tag lookup."""

    def test_large_tag_count_o1_lookup(self):
        """Consuming 1000 tags across 50 sessions finishes in under a second."""
        mgr = SessionKeyManager()
        all_tags = []
        all_keys = []
        # Create 50 sessions with 20 tags each = 1000 tags
        for _ in range(50):
            dest = os.urandom(32)
            key, tags = mgr.create_session(dest)
            all_tags.extend(tags)
            all_keys.extend([key] * len(tags))

        assert len(all_tags) == 1000

        # Lookup should be fast (O(1) via dict)
        start = time.monotonic()
        for tag, expected_key in zip(all_tags, all_keys):
            result = mgr.consume_tag(tag)
            assert result == expected_key
        elapsed = time.monotonic() - start
        # 1000 lookups should complete in well under 1 second
        assert elapsed < 1.0, f"1000 tag lookups took {elapsed:.3f}s, expected < 1s"


class TestSessionKeyManagerExtended:
    """Tests for Tier 0 extended API (message routing support)."""

    def test_get_current_or_new_key_creates(self):
        """The first call for a destination returns a 32-byte key flagged new."""
        skm = SessionKeyManager()
        dest = os.urandom(32)
        key, is_new = skm.get_current_or_new_key(dest)
        assert len(key) == 32
        assert is_new is True

    def test_get_current_or_new_key_reuses(self):
        """A second call returns the same key and is no longer flagged new."""
        skm = SessionKeyManager()
        dest = os.urandom(32)
        key1, new1 = skm.get_current_or_new_key(dest)
        key2, new2 = skm.get_current_or_new_key(dest)
        assert key1 == key2
        assert new1 is True
        assert new2 is False

    def test_consume_next_available_tag(self):
        """A created session hands out a 32-byte tag on request."""
        skm = SessionKeyManager()
        dest = os.urandom(32)
        skm.create_session(dest)
        tag = skm.consume_next_available_tag(dest)
        assert tag is not None and len(tag) == 32

    def test_should_send_tags_low(self):
        """should_send_tags is falsy on a fresh session, truthy once depleted."""
        skm = SessionKeyManager()
        dest = os.urandom(32)
        skm.create_session(dest)  # 20 tags
        assert not skm.should_send_tags(dest, low_threshold=20)
        # Consume most tags
        for _ in range(19):
            skm.consume_next_available_tag(dest)
        assert skm.should_send_tags(dest, low_threshold=2)

    def test_tags_delivered_and_acked(self):
        """Delivered tags resolve to the key once their token is acked."""
        skm = SessionKeyManager()
        dest = os.urandom(32)
        key, _ = skm.create_session(dest)
        new_tags = [os.urandom(32) for _ in range(5)]
        skm.tags_delivered(dest, key, new_tags, token=999)
        skm.tags_acked(999)
        # After ACK, tags should be in the system
        for tag in new_tags:
            assert skm.consume_tag(tag) == key

    def test_fail_tags(self):
        """Delivered tags are not consumable after their token is failed."""
        skm = SessionKeyManager()
        dest = os.urandom(32)
        key, _ = skm.create_session(dest)
        new_tags = [os.urandom(32) for _ in range(5)]
        skm.tags_delivered(dest, key, new_tags, token=888)
        skm.fail_tags(888)
        # After fail, tags should NOT be usable
        for tag in new_tags:
            assert skm.consume_tag(tag) is None