A Python port of the Invisible Internet Project (I2P)
1"""Tests for net.i2p.util Tier 0 — standalone utilities.
2
3TDD — tests for PortMapper, Clock, ConvertToHash, VersionComparator,
4RFC822Date, SecureFile, ObjectCounter, ConcurrentHashSet, ArraySet.
5"""
6
7from __future__ import annotations
8
9import os
10import tempfile
11import threading
12import time
13from concurrent.futures import ThreadPoolExecutor
14
15import pytest
16
17from i2p_util.port_mapper import PortMapper
18from i2p_util.clock import Clock
19from i2p_util.convert import convert_to_hash
20from i2p_util.version import compare_versions
21from i2p_util.rfc822 import to_rfc822, parse_rfc822
22from i2p_util.secure_file import SecureFile, SecureDirectory
23from i2p_util.object_counter import ObjectCounter
24from i2p_util.concurrent_set import ConcurrentHashSet
25from i2p_util.array_set import ArraySet
26
27
28# ---------------------------------------------------------------------------
29# PortMapper
30# ---------------------------------------------------------------------------
31
32
33class TestPortMapper:
34 """Thread-safe service name → port registry."""
35
36 def test_register_and_get_port(self):
37 pm = PortMapper()
38 pm.register("HTTP_PROXY", "127.0.0.1", 4444)
39 assert pm.get_port("HTTP_PROXY") == 4444
40
41 def test_get_host(self):
42 pm = PortMapper()
43 pm.register("HTTP_PROXY", "127.0.0.1", 4444)
44 assert pm.get_host("HTTP_PROXY") == "127.0.0.1"
45
46 def test_get_port_default(self):
47 pm = PortMapper()
48 assert pm.get_port("MISSING", default=0) == 0
49
50 def test_get_host_default(self):
51 pm = PortMapper()
52 assert pm.get_host("MISSING", default="") == ""
53
54 def test_unregister(self):
55 pm = PortMapper()
56 pm.register("IRC", "127.0.0.1", 6668)
57 pm.unregister("IRC")
58 assert pm.get_port("IRC", default=-1) == -1
59
60 def test_get_all_services(self):
61 pm = PortMapper()
62 pm.register("A", "localhost", 1)
63 pm.register("B", "localhost", 2)
64 services = pm.get_all_services()
65 assert len(services) == 2
66 assert "A" in services
67 assert "B" in services
68
69 def test_thread_safety(self):
70 pm = PortMapper()
71
72 def register_range(start):
73 for i in range(start, start + 50):
74 pm.register(f"SVC_{i}", "127.0.0.1", i)
75
76 with ThreadPoolExecutor(max_workers=4) as ex:
77 ex.submit(register_range, 0)
78 ex.submit(register_range, 50)
79 ex.submit(register_range, 100)
80 ex.submit(register_range, 150)
81
82 assert len(pm.get_all_services()) == 200
83
84 def test_overwrite_registration(self):
85 pm = PortMapper()
86 pm.register("HTTP", "127.0.0.1", 4444)
87 pm.register("HTTP", "0.0.0.0", 8080)
88 assert pm.get_port("HTTP") == 8080
89 assert pm.get_host("HTTP") == "0.0.0.0"
90
91
92# ---------------------------------------------------------------------------
93# Clock
94# ---------------------------------------------------------------------------
95
96
97class TestClock:
98 """NTP-adjusted time singleton."""
99
100 def test_now_returns_float(self):
101 c = Clock()
102 assert isinstance(c.now(), float)
103
104 def test_offset_default_zero(self):
105 c = Clock()
106 assert c.get_offset() == 0.0
107
108 def test_set_offset(self):
109 c = Clock()
110 c.set_offset(500.0)
111 assert c.get_offset() == 500.0
112
113 def test_now_includes_offset(self):
114 c = Clock()
115 c.set_offset(0.0)
116 t1 = c.now()
117 c.set_offset(1000.0) # +1 second in ms
118 t2 = c.now()
119 # t2 should be noticeably larger due to offset
120 assert t2 > t1
121
122 def test_was_updated(self):
123 c = Clock()
124 assert c.was_updated() is False
125 c.set_offset(100.0)
126 assert c.was_updated() is True
127
128
129# ---------------------------------------------------------------------------
130# ConvertToHash
131# ---------------------------------------------------------------------------
132
133
134class TestConvertToHash:
135 """Convert various I2P address formats to 32-byte hash."""
136
137 def test_base64_hash(self):
138 """43-char I2P Base64 string → 32-byte hash."""
139 # 32 zero bytes = 43 chars of 'A' in I2P Base64 (no padding)
140 b64 = "A" * 43
141 result = convert_to_hash(b64)
142 assert result is not None
143 assert len(result) == 32
144
145 def test_base32_address(self):
146 """52-char .b32.i2p → 32-byte hash."""
147 b32 = "a" * 52 + ".b32.i2p"
148 result = convert_to_hash(b32)
149 assert result is not None
150 assert len(result) == 32
151
152 def test_invalid_returns_none(self):
153 assert convert_to_hash("") is None
154 assert convert_to_hash("not-valid") is None
155
156 def test_short_base64_returns_none(self):
157 assert convert_to_hash("AAAA") is None
158
159 def test_base64_invalid_chars(self):
160 """43-char string with invalid base64 chars → None."""
161 assert convert_to_hash("!" * 43) is None
162
163 def test_base64_44_chars_valid(self):
164 """44-char base64 string that decodes to 32 bytes."""
165 import base64
166 # 32 zero bytes → standard base64 with I2P alphabet
167 raw = b"\x00" * 32
168 b64 = base64.b64encode(raw).decode().replace("+", "-").replace("/", "~")
169 result = convert_to_hash(b64)
170 assert result is not None
171 assert len(result) == 32
172
173 def test_base64_44_chars_wrong_size(self):
174 """44-char base64 that decodes to non-32 bytes → None."""
175 b64 = "A" * 44 # Decodes to 33 bytes
176 assert convert_to_hash(b64) is None
177
178 def test_base32_wrong_length(self):
179 """b32.i2p with wrong-length prefix → None."""
180 assert convert_to_hash("aaaa.b32.i2p") is None
181
182 def test_base32_invalid_chars(self):
183 """52-char b32.i2p with invalid base32 chars → None."""
184 assert convert_to_hash("!" * 52 + ".b32.i2p") is None
185
186 def test_43_char_i2p_suffix(self):
187 """43-char string ending in .i2p should not be treated as base64."""
188 s = "A" * 39 + ".i2p" # 43 chars total
189 assert convert_to_hash(s) is None
190
191
192# ---------------------------------------------------------------------------
193# VersionComparator
194# ---------------------------------------------------------------------------
195
196
197class TestVersionComparator:
198 """Semantic version comparison."""
199
200 def test_equal(self):
201 assert compare_versions("0.9.50", "0.9.50") == 0
202
203 def test_less_than(self):
204 assert compare_versions("0.9.49", "0.9.50") == -1
205
206 def test_greater_than(self):
207 assert compare_versions("0.9.51", "0.9.50") == 1
208
209 def test_different_lengths(self):
210 assert compare_versions("0.9", "0.9.1") == -1
211
212 def test_suffix(self):
213 assert compare_versions("0.9.50-1", "0.9.50") == 1
214
215 def test_major_difference(self):
216 assert compare_versions("2.0.0", "1.99.99") == 1
217
218
219# ---------------------------------------------------------------------------
220# RFC822Date
221# ---------------------------------------------------------------------------
222
223
224class TestRFC822Date:
225 """HTTP date formatting."""
226
227 def test_to_rfc822_format(self):
228 # 2026-03-21 12:00:00 UTC in ms
229 ts_ms = 1774267200000
230 result = to_rfc822(ts_ms)
231 assert "2026" in result
232 assert "GMT" in result
233
234 def test_round_trip(self):
235 ts_ms = 1774267200000
236 formatted = to_rfc822(ts_ms)
237 parsed = parse_rfc822(formatted)
238 assert abs(parsed - ts_ms) < 1000 # within 1 second
239
240 def test_parse_invalid_returns_none(self):
241 assert parse_rfc822("not a date") is None
242
243
244# ---------------------------------------------------------------------------
245# SecureFile / SecureDirectory
246# ---------------------------------------------------------------------------
247
248
249class TestSecureFile:
250 """File/directory creation with restrictive permissions."""
251
252 def test_create_secure_file(self):
253 with tempfile.TemporaryDirectory() as td:
254 path = os.path.join(td, "secret.txt")
255 SecureFile.create(path)
256 assert os.path.exists(path)
257 mode = os.stat(path).st_mode & 0o777
258 assert mode == 0o600
259
260 def test_create_secure_directory(self):
261 with tempfile.TemporaryDirectory() as td:
262 path = os.path.join(td, "secure_dir")
263 SecureDirectory.create(path)
264 assert os.path.isdir(path)
265 mode = os.stat(path).st_mode & 0o777
266 assert mode == 0o700
267
268
269# ---------------------------------------------------------------------------
270# ObjectCounter
271# ---------------------------------------------------------------------------
272
273
274class TestObjectCounter:
275 """Thread-safe counter."""
276
277 def test_increment(self):
278 c = ObjectCounter()
279 c.increment("foo")
280 c.increment("foo")
281 assert c.count("foo") == 2
282
283 def test_decrement(self):
284 c = ObjectCounter()
285 c.increment("foo")
286 c.increment("foo")
287 c.decrement("foo")
288 assert c.count("foo") == 1
289
290 def test_missing_key_zero(self):
291 c = ObjectCounter()
292 assert c.count("missing") == 0
293
294 def test_total(self):
295 c = ObjectCounter()
296 c.increment("a")
297 c.increment("b")
298 c.increment("b")
299 assert c.total() == 3
300
301 def test_thread_safety(self):
302 c = ObjectCounter()
303
304 def inc_many():
305 for _ in range(1000):
306 c.increment("x")
307
308 threads = [threading.Thread(target=inc_many) for _ in range(4)]
309 for t in threads:
310 t.start()
311 for t in threads:
312 t.join()
313 assert c.count("x") == 4000
314
315
316# ---------------------------------------------------------------------------
317# ConcurrentHashSet
318# ---------------------------------------------------------------------------
319
320
321class TestConcurrentHashSet:
322 """Thread-safe set."""
323
324 def test_add_and_contains(self):
325 s = ConcurrentHashSet()
326 s.add("a")
327 assert "a" in s
328 assert "b" not in s
329
330 def test_remove(self):
331 s = ConcurrentHashSet()
332 s.add("a")
333 s.remove("a")
334 assert "a" not in s
335
336 def test_len(self):
337 s = ConcurrentHashSet()
338 s.add("a")
339 s.add("b")
340 assert len(s) == 2
341
342 def test_thread_safety(self):
343 s = ConcurrentHashSet()
344
345 def add_range(start):
346 for i in range(start, start + 100):
347 s.add(i)
348
349 with ThreadPoolExecutor(max_workers=4) as ex:
350 for i in range(0, 400, 100):
351 ex.submit(add_range, i)
352
353 assert len(s) == 400
354
355
356# ---------------------------------------------------------------------------
357# ArraySet
358# ---------------------------------------------------------------------------
359
360
361class TestArraySet:
362 """Sorted small-set backed by array."""
363
364 def test_add_and_contains(self):
365 s = ArraySet()
366 s.add(3)
367 s.add(1)
368 s.add(2)
369 assert 1 in s
370 assert 4 not in s
371
372 def test_sorted_iteration(self):
373 s = ArraySet()
374 s.add(3)
375 s.add(1)
376 s.add(2)
377 assert list(s) == [1, 2, 3]
378
379 def test_no_duplicates(self):
380 s = ArraySet()
381 s.add(1)
382 s.add(1)
383 s.add(1)
384 assert len(s) == 1
385
386 def test_remove(self):
387 s = ArraySet()
388 s.add(1)
389 s.add(2)
390 s.remove(1)
391 assert 1 not in s
392 assert len(s) == 1
393
394 def test_remove_missing_raises(self):
395 s = ArraySet()
396 with pytest.raises(KeyError):
397 s.remove(999)