A Python port of the Invisible Internet Project (I2P)
1"""Tests for SessionKeyManager and TagSet.
2
3TDD: these tests are written before the implementation.
4"""
5
6import os
7import time
8
9import pytest
10
11from i2p_crypto.session_key_manager import SessionKeyManager, TagSet
12
13
14class TestTagSet:
15 """Tests for TagSet."""
16
17 def test_init_stores_key_and_tags(self):
18 key = os.urandom(32)
19 tags = [os.urandom(32) for _ in range(5)]
20 now = int(time.time() * 1000)
21 ts = TagSet(session_key=key, tags=tags, creation_ms=now, expiration_ms=now + 720000)
22 assert ts.remaining() == 5
23
24 def test_consume_existing_tag(self):
25 key = os.urandom(32)
26 tags = [os.urandom(32) for _ in range(3)]
27 now = int(time.time() * 1000)
28 ts = TagSet(session_key=key, tags=tags, creation_ms=now, expiration_ms=now + 720000)
29 assert ts.consume(tags[1]) is True
30 assert ts.remaining() == 2
31
32 def test_consume_missing_tag(self):
33 key = os.urandom(32)
34 tags = [os.urandom(32) for _ in range(3)]
35 now = int(time.time() * 1000)
36 ts = TagSet(session_key=key, tags=tags, creation_ms=now, expiration_ms=now + 720000)
37 assert ts.consume(os.urandom(32)) is False
38 assert ts.remaining() == 3
39
40 def test_consume_same_tag_twice(self):
41 key = os.urandom(32)
42 tags = [os.urandom(32) for _ in range(3)]
43 now = int(time.time() * 1000)
44 ts = TagSet(session_key=key, tags=tags, creation_ms=now, expiration_ms=now + 720000)
45 assert ts.consume(tags[0]) is True
46 assert ts.consume(tags[0]) is False
47
48 def test_is_expired(self):
49 key = os.urandom(32)
50 tags = [os.urandom(32)]
51 now = 1000000
52 ts = TagSet(session_key=key, tags=tags, creation_ms=now, expiration_ms=now + 720000)
53 assert ts.is_expired(now) is False
54 assert ts.is_expired(now + 719999) is False
55 assert ts.is_expired(now + 720000) is True
56 assert ts.is_expired(now + 999999) is True
57
58 def test_default_tag_lifetime(self):
59 """Default lifetime should be 720000ms (12 minutes)."""
60 assert TagSet.DEFAULT_LIFETIME_MS == 720000
61
62
63class TestSessionKeyManagerCreateSession:
64 """Tests for create_session."""
65
66 def test_create_returns_32_byte_key(self):
67 mgr = SessionKeyManager()
68 dest = os.urandom(32)
69 key, tags = mgr.create_session(dest)
70 assert isinstance(key, bytes)
71 assert len(key) == 32
72
73 def test_create_returns_20_tags(self):
74 mgr = SessionKeyManager()
75 dest = os.urandom(32)
76 key, tags = mgr.create_session(dest)
77 assert len(tags) == 20
78 for t in tags:
79 assert isinstance(t, bytes)
80 assert len(t) == 32
81
82 def test_create_session_registers_destination(self):
83 mgr = SessionKeyManager()
84 dest = os.urandom(32)
85 mgr.create_session(dest)
86 assert mgr.has_session(dest) is True
87
88 def test_get_session_key(self):
89 mgr = SessionKeyManager()
90 dest = os.urandom(32)
91 key, _ = mgr.create_session(dest)
92 assert mgr.get_session_key(dest) == key
93
94 def test_has_session_unknown_dest(self):
95 mgr = SessionKeyManager()
96 assert mgr.has_session(os.urandom(32)) is False
97
98 def test_get_session_key_unknown_dest(self):
99 mgr = SessionKeyManager()
100 assert mgr.get_session_key(os.urandom(32)) is None
101
102
103class TestSessionKeyManagerConsume:
104 """Tests for consume_tag."""
105
106 def test_consume_known_tag_returns_key(self):
107 mgr = SessionKeyManager()
108 dest = os.urandom(32)
109 key, tags = mgr.create_session(dest)
110 result = mgr.consume_tag(tags[0])
111 assert result == key
112
113 def test_consume_unknown_tag_returns_none(self):
114 mgr = SessionKeyManager()
115 assert mgr.consume_tag(os.urandom(32)) is None
116
117 def test_replay_protection(self):
118 """Consumed tag cannot be reused."""
119 mgr = SessionKeyManager()
120 dest = os.urandom(32)
121 key, tags = mgr.create_session(dest)
122 assert mgr.consume_tag(tags[5]) == key
123 assert mgr.consume_tag(tags[5]) is None
124
125 def test_consume_all_tags(self):
126 mgr = SessionKeyManager()
127 dest = os.urandom(32)
128 key, tags = mgr.create_session(dest)
129 for t in tags:
130 assert mgr.consume_tag(t) == key
131 # All consumed, none left
132 for t in tags:
133 assert mgr.consume_tag(t) is None
134
135
136class TestSessionKeyManagerAddTags:
137 """Tests for add_tags (receiving tags from a remote peer)."""
138
139 def test_add_tags_makes_them_consumable(self):
140 mgr = SessionKeyManager()
141 session_key = os.urandom(32)
142 new_tags = [os.urandom(32) for _ in range(5)]
143 now = int(time.time() * 1000)
144 mgr.add_tags(session_key, new_tags, expiration_ms=now + 720000)
145 for t in new_tags:
146 assert mgr.consume_tag(t) == session_key
147
148 def test_add_tags_expired_not_consumable(self):
149 mgr = SessionKeyManager()
150 session_key = os.urandom(32)
151 new_tags = [os.urandom(32) for _ in range(3)]
152 past = 1000
153 mgr.add_tags(session_key, new_tags, expiration_ms=past)
154 # Expire them
155 mgr.expire_old(now_ms=2000)
156 for t in new_tags:
157 assert mgr.consume_tag(t) is None
158
159
160class TestSessionKeyManagerExpire:
161 """Tests for expire_old."""
162
163 def test_expire_removes_old_tagsets(self):
164 mgr = SessionKeyManager()
165 dest = os.urandom(32)
166 key, tags = mgr.create_session(dest)
167 # Expire far in the future
168 future = int(time.time() * 1000) + 10_000_000
169 mgr.expire_old(now_ms=future)
170 # All tags should be gone
171 for t in tags:
172 assert mgr.consume_tag(t) is None
173
174 def test_expire_keeps_fresh_tagsets(self):
175 mgr = SessionKeyManager()
176 dest = os.urandom(32)
177 key, tags = mgr.create_session(dest)
178 now = int(time.time() * 1000)
179 mgr.expire_old(now_ms=now)
180 # Tags should still be there
181 assert mgr.consume_tag(tags[0]) == key
182
183
184class TestSessionKeyManagerMultipleSessions:
185 """Tests for multiple destinations."""
186
187 def test_different_destinations_independent(self):
188 mgr = SessionKeyManager()
189 dest1 = os.urandom(32)
190 dest2 = os.urandom(32)
191 key1, tags1 = mgr.create_session(dest1)
192 key2, tags2 = mgr.create_session(dest2)
193 assert key1 != key2
194 assert mgr.consume_tag(tags1[0]) == key1
195 assert mgr.consume_tag(tags2[0]) == key2
196
197 def test_has_session_both(self):
198 mgr = SessionKeyManager()
199 dest1 = os.urandom(32)
200 dest2 = os.urandom(32)
201 mgr.create_session(dest1)
202 mgr.create_session(dest2)
203 assert mgr.has_session(dest1) is True
204 assert mgr.has_session(dest2) is True
205
206
207class TestSessionKeyManagerPerformance:
208 """Performance test for O(1) tag lookup."""
209
210 def test_large_tag_count_o1_lookup(self):
211 mgr = SessionKeyManager()
212 all_tags = []
213 all_keys = []
214 # Create 50 sessions with 20 tags each = 1000 tags
215 for _ in range(50):
216 dest = os.urandom(32)
217 key, tags = mgr.create_session(dest)
218 all_tags.extend(tags)
219 all_keys.extend([key] * len(tags))
220
221 assert len(all_tags) == 1000
222
223 # Lookup should be fast (O(1) via dict)
224 start = time.monotonic()
225 for tag, expected_key in zip(all_tags, all_keys):
226 result = mgr.consume_tag(tag)
227 assert result == expected_key
228 elapsed = time.monotonic() - start
229 # 1000 lookups should complete in well under 1 second
230 assert elapsed < 1.0, f"1000 tag lookups took {elapsed:.3f}s, expected < 1s"
231
232
233class TestSessionKeyManagerExtended:
234 """Tests for Tier 0 extended API (message routing support)."""
235
236 def test_get_current_or_new_key_creates(self):
237 skm = SessionKeyManager()
238 dest = os.urandom(32)
239 key, is_new = skm.get_current_or_new_key(dest)
240 assert len(key) == 32
241 assert is_new is True
242
243 def test_get_current_or_new_key_reuses(self):
244 skm = SessionKeyManager()
245 dest = os.urandom(32)
246 key1, new1 = skm.get_current_or_new_key(dest)
247 key2, new2 = skm.get_current_or_new_key(dest)
248 assert key1 == key2
249 assert new1 is True
250 assert new2 is False
251
252 def test_consume_next_available_tag(self):
253 skm = SessionKeyManager()
254 dest = os.urandom(32)
255 skm.create_session(dest)
256 tag = skm.consume_next_available_tag(dest)
257 assert tag is not None and len(tag) == 32
258
259 def test_should_send_tags_low(self):
260 skm = SessionKeyManager()
261 dest = os.urandom(32)
262 skm.create_session(dest) # 20 tags
263 assert not skm.should_send_tags(dest, low_threshold=20)
264 # Consume most tags
265 for _ in range(19):
266 skm.consume_next_available_tag(dest)
267 assert skm.should_send_tags(dest, low_threshold=2)
268
269 def test_tags_delivered_and_acked(self):
270 skm = SessionKeyManager()
271 dest = os.urandom(32)
272 key, _ = skm.create_session(dest)
273 new_tags = [os.urandom(32) for _ in range(5)]
274 skm.tags_delivered(dest, key, new_tags, token=999)
275 skm.tags_acked(999)
276 # After ACK, tags should be in the system
277 for tag in new_tags:
278 assert skm.consume_tag(tag) == key
279
280 def test_fail_tags(self):
281 skm = SessionKeyManager()
282 dest = os.urandom(32)
283 key, _ = skm.create_session(dest)
284 new_tags = [os.urandom(32) for _ in range(5)]
285 skm.tags_delivered(dest, key, new_tags, token=888)
286 skm.fail_tags(888)
287 # After fail, tags should NOT be usable
288 for tag in new_tags:
289 assert skm.consume_tag(tag) is None