# A Python port of the Invisible Internet Project (I2P).
# Repository viewer header: branch "main", 397 lines, 12 kB (raw view).
"""Tests for net.i2p.util Tier 0 — standalone utilities.

TDD — tests for PortMapper, Clock, ConvertToHash, VersionComparator,
RFC822Date, SecureFile, ObjectCounter, ConcurrentHashSet, ArraySet.
"""

from __future__ import annotations

import base64
import os
import tempfile
import threading
import time
from concurrent.futures import ThreadPoolExecutor

import pytest

from i2p_util.port_mapper import PortMapper
from i2p_util.clock import Clock
from i2p_util.convert import convert_to_hash
from i2p_util.version import compare_versions
from i2p_util.rfc822 import to_rfc822, parse_rfc822
from i2p_util.secure_file import SecureFile, SecureDirectory
from i2p_util.object_counter import ObjectCounter
from i2p_util.concurrent_set import ConcurrentHashSet
from i2p_util.array_set import ArraySet


# ---------------------------------------------------------------------------
# PortMapper
# ---------------------------------------------------------------------------


class TestPortMapper:
    """Thread-safe service name → port registry."""

    def test_register_and_get_port(self):
        """A registered service reports the port it was registered with."""
        pm = PortMapper()
        pm.register("HTTP_PROXY", "127.0.0.1", 4444)
        assert pm.get_port("HTTP_PROXY") == 4444

    def test_get_host(self):
        """A registered service reports the host it was registered with."""
        pm = PortMapper()
        pm.register("HTTP_PROXY", "127.0.0.1", 4444)
        assert pm.get_host("HTTP_PROXY") == "127.0.0.1"

    def test_get_port_default(self):
        """An unknown service yields the caller-supplied default port."""
        pm = PortMapper()
        assert pm.get_port("MISSING", default=0) == 0

    def test_get_host_default(self):
        """An unknown service yields the caller-supplied default host."""
        pm = PortMapper()
        assert pm.get_host("MISSING", default="") == ""

    def test_unregister(self):
        """After unregistering, lookups fall back to the default."""
        pm = PortMapper()
        pm.register("IRC", "127.0.0.1", 6668)
        pm.unregister("IRC")
        assert pm.get_port("IRC", default=-1) == -1

    def test_get_all_services(self):
        """All registered service names are reported."""
        pm = PortMapper()
        pm.register("A", "localhost", 1)
        pm.register("B", "localhost", 2)
        services = pm.get_all_services()
        assert len(services) == 2
        assert "A" in services
        assert "B" in services

    def test_thread_safety(self):
        """Four workers registering disjoint ranges lose no registrations."""
        pm = PortMapper()

        def register_range(start):
            for i in range(start, start + 50):
                pm.register(f"SVC_{i}", "127.0.0.1", i)

        with ThreadPoolExecutor(max_workers=4) as ex:
            futures = [
                ex.submit(register_range, start) for start in range(0, 200, 50)
            ]

        # result() re-raises any exception raised inside a worker; without
        # it a crash would only surface as an unexplained count mismatch.
        for future in futures:
            future.result()

        assert len(pm.get_all_services()) == 200

    def test_overwrite_registration(self):
        """Registering an existing name replaces both host and port."""
        pm = PortMapper()
        pm.register("HTTP", "127.0.0.1", 4444)
        pm.register("HTTP", "0.0.0.0", 8080)
        assert pm.get_port("HTTP") == 8080
        assert pm.get_host("HTTP") == "0.0.0.0"


# ---------------------------------------------------------------------------
# Clock
# ---------------------------------------------------------------------------


class TestClock:
    """NTP-adjusted time singleton."""

    def test_now_returns_float(self):
        """now() returns a float."""
        c = Clock()
        assert isinstance(c.now(), float)

    def test_offset_default_zero(self):
        """A fresh clock has a zero offset."""
        c = Clock()
        assert c.get_offset() == 0.0

    def test_set_offset(self):
        """set_offset() stores the value returned by get_offset()."""
        c = Clock()
        c.set_offset(500.0)
        assert c.get_offset() == 500.0

    def test_now_includes_offset(self):
        """now() shifts by the configured offset."""
        c = Clock()
        c.set_offset(0.0)
        t1 = c.now()
        c.set_offset(1000.0)  # +1 second in ms
        t2 = c.now()
        # A bare ``t2 > t1`` would pass from elapsed wall time alone even if
        # the offset were ignored.  A 1000 ms offset moves now() by about 1.0
        # if it reports seconds and about 1000.0 if it reports milliseconds,
        # so 0.9 is a lower bound that holds for either unit.
        assert t2 - t1 >= 0.9

    def test_was_updated(self):
        """was_updated() flips to True once an offset has been set."""
        c = Clock()
        assert c.was_updated() is False
        c.set_offset(100.0)
        assert c.was_updated() is True


# ---------------------------------------------------------------------------
# ConvertToHash
# ---------------------------------------------------------------------------


class TestConvertToHash:
    """Convert various I2P address formats to 32-byte hash."""

    def test_base64_hash(self):
        """43-char I2P Base64 string → 32-byte hash."""
        # 32 zero bytes = 43 chars of 'A' in I2P Base64 (no padding)
        b64 = "A" * 43
        result = convert_to_hash(b64)
        assert result is not None
        assert len(result) == 32

    def test_base32_address(self):
        """52-char .b32.i2p → 32-byte hash."""
        b32 = "a" * 52 + ".b32.i2p"
        result = convert_to_hash(b32)
        assert result is not None
        assert len(result) == 32

    def test_invalid_returns_none(self):
        """Empty and unrecognised strings → None."""
        assert convert_to_hash("") is None
        assert convert_to_hash("not-valid") is None

    def test_short_base64_returns_none(self):
        """Base64 string that is too short → None."""
        assert convert_to_hash("AAAA") is None

    def test_base64_invalid_chars(self):
        """43-char string with invalid base64 chars → None."""
        assert convert_to_hash("!" * 43) is None

    def test_base64_44_chars_valid(self):
        """44-char base64 string that decodes to 32 bytes."""
        # 32 zero bytes → standard base64 with I2P alphabet
        # ('-' and '~' replace '+' and '/').
        raw = b"\x00" * 32
        b64 = base64.b64encode(raw, altchars=b"-~").decode("ascii")
        result = convert_to_hash(b64)
        assert result is not None
        assert len(result) == 32

    def test_base64_44_chars_wrong_size(self):
        """44-char base64 that decodes to non-32 bytes → None."""
        b64 = "A" * 44  # Decodes to 33 bytes
        assert convert_to_hash(b64) is None

    def test_base32_wrong_length(self):
        """b32.i2p with wrong-length prefix → None."""
        assert convert_to_hash("aaaa.b32.i2p") is None

    def test_base32_invalid_chars(self):
        """52-char b32.i2p with invalid base32 chars → None."""
        assert convert_to_hash("!" * 52 + ".b32.i2p") is None

    def test_43_char_i2p_suffix(self):
        """43-char string ending in .i2p should not be treated as base64."""
        s = "A" * 39 + ".i2p"  # 43 chars total
        assert convert_to_hash(s) is None


# ---------------------------------------------------------------------------
# VersionComparator
# ---------------------------------------------------------------------------


class TestVersionComparator:
    """Semantic version comparison."""

    def test_equal(self):
        """Identical versions compare as 0."""
        assert compare_versions("0.9.50", "0.9.50") == 0

    def test_less_than(self):
        """A lower last component compares as -1."""
        assert compare_versions("0.9.49", "0.9.50") == -1

    def test_greater_than(self):
        """A higher last component compares as 1."""
        assert compare_versions("0.9.51", "0.9.50") == 1

    def test_different_lengths(self):
        """A version with fewer components sorts before its extension."""
        assert compare_versions("0.9", "0.9.1") == -1

    def test_suffix(self):
        """A '-N' suffix sorts after the bare version."""
        assert compare_versions("0.9.50-1", "0.9.50") == 1

    def test_major_difference(self):
        """The leading component dominates later ones."""
        assert compare_versions("2.0.0", "1.99.99") == 1


# ---------------------------------------------------------------------------
# RFC822Date
# ---------------------------------------------------------------------------


class TestRFC822Date:
    """HTTP date formatting."""

    def test_to_rfc822_format(self):
        """Formatted output carries the year and the GMT zone marker."""
        # 2026-03-23 12:00:00 UTC in ms
        ts_ms = 1774267200000
        result = to_rfc822(ts_ms)
        assert "2026" in result
        assert "GMT" in result

    def test_round_trip(self):
        """Formatting then parsing returns the original timestamp."""
        ts_ms = 1774267200000
        formatted = to_rfc822(ts_ms)
        parsed = parse_rfc822(formatted)
        assert abs(parsed - ts_ms) < 1000  # within 1 second

    def test_parse_invalid_returns_none(self):
        """Unparseable input → None."""
        assert parse_rfc822("not a date") is None


# ---------------------------------------------------------------------------
# SecureFile / SecureDirectory
# ---------------------------------------------------------------------------


# The assertions compare exact POSIX permission bits, which Windows cannot
# represent (only the read-only flag is honoured there), so these tests could
# never pass on that platform.
@pytest.mark.skipif(
    os.name == "nt", reason="POSIX permission bits are not available on Windows"
)
class TestSecureFile:
    """File/directory creation with restrictive permissions."""

    def test_create_secure_file(self):
        """A created file exists and has mode 0o600."""
        with tempfile.TemporaryDirectory() as td:
            path = os.path.join(td, "secret.txt")
            SecureFile.create(path)
            assert os.path.exists(path)
            mode = os.stat(path).st_mode & 0o777
            assert mode == 0o600

    def test_create_secure_directory(self):
        """A created directory exists and has mode 0o700."""
        with tempfile.TemporaryDirectory() as td:
            path = os.path.join(td, "secure_dir")
            SecureDirectory.create(path)
            assert os.path.isdir(path)
            mode = os.stat(path).st_mode & 0o777
            assert mode == 0o700


# ---------------------------------------------------------------------------
# ObjectCounter
# ---------------------------------------------------------------------------


class TestObjectCounter:
    """Thread-safe counter."""

    def test_increment(self):
        """Two increments of one key give a count of 2."""
        c = ObjectCounter()
        c.increment("foo")
        c.increment("foo")
        assert c.count("foo") == 2

    def test_decrement(self):
        """A decrement lowers the count by one."""
        c = ObjectCounter()
        c.increment("foo")
        c.increment("foo")
        c.decrement("foo")
        assert c.count("foo") == 1

    def test_missing_key_zero(self):
        """A key that was never incremented counts as 0."""
        c = ObjectCounter()
        assert c.count("missing") == 0

    def test_total(self):
        """total() sums the counts of all keys."""
        c = ObjectCounter()
        c.increment("a")
        c.increment("b")
        c.increment("b")
        assert c.total() == 3

    def test_thread_safety(self):
        """Four threads doing 1000 increments each lose no updates."""
        c = ObjectCounter()

        def inc_many():
            for _ in range(1000):
                c.increment("x")

        threads = [threading.Thread(target=inc_many) for _ in range(4)]
        for t in threads:
            t.start()
        for t in threads:
            t.join()
        assert c.count("x") == 4000


# ---------------------------------------------------------------------------
# ConcurrentHashSet
# ---------------------------------------------------------------------------


class TestConcurrentHashSet:
    """Thread-safe set."""

    def test_add_and_contains(self):
        """Added items are members; others are not."""
        s = ConcurrentHashSet()
        s.add("a")
        assert "a" in s
        assert "b" not in s

    def test_remove(self):
        """A removed item is no longer a member."""
        s = ConcurrentHashSet()
        s.add("a")
        s.remove("a")
        assert "a" not in s

    def test_len(self):
        """len() reports the number of distinct items added."""
        s = ConcurrentHashSet()
        s.add("a")
        s.add("b")
        assert len(s) == 2

    def test_thread_safety(self):
        """Four workers adding disjoint ranges lose no items."""
        s = ConcurrentHashSet()

        def add_range(start):
            for i in range(start, start + 100):
                s.add(i)

        with ThreadPoolExecutor(max_workers=4) as ex:
            futures = [ex.submit(add_range, start) for start in range(0, 400, 100)]

        # Surface worker exceptions instead of letting them be discarded
        # along with the futures.
        for future in futures:
            future.result()

        assert len(s) == 400


# ---------------------------------------------------------------------------
# ArraySet
# ---------------------------------------------------------------------------


class TestArraySet:
    """Sorted small-set backed by array."""

    def test_add_and_contains(self):
        """Added items are members; others are not."""
        s = ArraySet()
        s.add(3)
        s.add(1)
        s.add(2)
        assert 1 in s
        assert 4 not in s

    def test_sorted_iteration(self):
        """Iteration yields items in ascending order, not insertion order."""
        s = ArraySet()
        s.add(3)
        s.add(1)
        s.add(2)
        assert list(s) == [1, 2, 3]

    def test_no_duplicates(self):
        """Adding the same item repeatedly stores it once."""
        s = ArraySet()
        s.add(1)
        s.add(1)
        s.add(1)
        assert len(s) == 1

    def test_remove(self):
        """Removing an item drops membership and shrinks the set."""
        s = ArraySet()
        s.add(1)
        s.add(2)
        s.remove(1)
        assert 1 not in s
        assert len(s) == 1

    def test_remove_missing_raises(self):
        """Removing an absent item raises KeyError."""
        s = ArraySet()
        with pytest.raises(KeyError):
            s.remove(999)