"""Tests for net.i2p.util Tier 0 — standalone utilities. TDD — tests for PortMapper, Clock, ConvertToHash, VersionComparator, RFC822Date, SecureFile, ObjectCounter, ConcurrentHashSet, ArraySet. """ from __future__ import annotations import os import tempfile import threading import time from concurrent.futures import ThreadPoolExecutor import pytest from i2p_util.port_mapper import PortMapper from i2p_util.clock import Clock from i2p_util.convert import convert_to_hash from i2p_util.version import compare_versions from i2p_util.rfc822 import to_rfc822, parse_rfc822 from i2p_util.secure_file import SecureFile, SecureDirectory from i2p_util.object_counter import ObjectCounter from i2p_util.concurrent_set import ConcurrentHashSet from i2p_util.array_set import ArraySet # --------------------------------------------------------------------------- # PortMapper # --------------------------------------------------------------------------- class TestPortMapper: """Thread-safe service name → port registry.""" def test_register_and_get_port(self): pm = PortMapper() pm.register("HTTP_PROXY", "127.0.0.1", 4444) assert pm.get_port("HTTP_PROXY") == 4444 def test_get_host(self): pm = PortMapper() pm.register("HTTP_PROXY", "127.0.0.1", 4444) assert pm.get_host("HTTP_PROXY") == "127.0.0.1" def test_get_port_default(self): pm = PortMapper() assert pm.get_port("MISSING", default=0) == 0 def test_get_host_default(self): pm = PortMapper() assert pm.get_host("MISSING", default="") == "" def test_unregister(self): pm = PortMapper() pm.register("IRC", "127.0.0.1", 6668) pm.unregister("IRC") assert pm.get_port("IRC", default=-1) == -1 def test_get_all_services(self): pm = PortMapper() pm.register("A", "localhost", 1) pm.register("B", "localhost", 2) services = pm.get_all_services() assert len(services) == 2 assert "A" in services assert "B" in services def test_thread_safety(self): pm = PortMapper() def register_range(start): for i in range(start, start + 50): pm.register(f"SVC_{i}", "127.0.0.1", i) with ThreadPoolExecutor(max_workers=4) as ex: ex.submit(register_range, 0) ex.submit(register_range, 50) ex.submit(register_range, 100) ex.submit(register_range, 150) assert len(pm.get_all_services()) == 200 def test_overwrite_registration(self): pm = PortMapper() pm.register("HTTP", "127.0.0.1", 4444) pm.register("HTTP", "0.0.0.0", 8080) assert pm.get_port("HTTP") == 8080 assert pm.get_host("HTTP") == "0.0.0.0" # --------------------------------------------------------------------------- # Clock # --------------------------------------------------------------------------- class TestClock: """NTP-adjusted time singleton.""" def test_now_returns_float(self): c = Clock() assert isinstance(c.now(), float) def test_offset_default_zero(self): c = Clock() assert c.get_offset() == 0.0 def test_set_offset(self): c = Clock() c.set_offset(500.0) assert c.get_offset() == 500.0 def test_now_includes_offset(self): c = Clock() c.set_offset(0.0) t1 = c.now() c.set_offset(1000.0) # +1 second in ms t2 = c.now() # t2 should be noticeably larger due to offset assert t2 > t1 def test_was_updated(self): c = Clock() assert c.was_updated() is False c.set_offset(100.0) assert c.was_updated() is True # --------------------------------------------------------------------------- # ConvertToHash # --------------------------------------------------------------------------- class TestConvertToHash: """Convert various I2P address formats to 32-byte hash.""" def test_base64_hash(self): """43-char I2P Base64 string → 32-byte hash.""" # 32 zero bytes = 43 chars of 'A' in I2P Base64 (no padding) b64 = "A" * 43 result = convert_to_hash(b64) assert result is not None assert len(result) == 32 def test_base32_address(self): """52-char .b32.i2p → 32-byte hash.""" b32 = "a" * 52 + ".b32.i2p" result = convert_to_hash(b32) assert result is not None assert len(result) == 32 def test_invalid_returns_none(self): assert convert_to_hash("") is None assert convert_to_hash("not-valid") is None def test_short_base64_returns_none(self): assert convert_to_hash("AAAA") is None def test_base64_invalid_chars(self): """43-char string with invalid base64 chars → None.""" assert convert_to_hash("!" * 43) is None def test_base64_44_chars_valid(self): """44-char base64 string that decodes to 32 bytes.""" import base64 # 32 zero bytes → standard base64 with I2P alphabet raw = b"\x00" * 32 b64 = base64.b64encode(raw).decode().replace("+", "-").replace("/", "~") result = convert_to_hash(b64) assert result is not None assert len(result) == 32 def test_base64_44_chars_wrong_size(self): """44-char base64 that decodes to non-32 bytes → None.""" b64 = "A" * 44 # Decodes to 33 bytes assert convert_to_hash(b64) is None def test_base32_wrong_length(self): """b32.i2p with wrong-length prefix → None.""" assert convert_to_hash("aaaa.b32.i2p") is None def test_base32_invalid_chars(self): """52-char b32.i2p with invalid base32 chars → None.""" assert convert_to_hash("!" * 52 + ".b32.i2p") is None def test_43_char_i2p_suffix(self): """43-char string ending in .i2p should not be treated as base64.""" s = "A" * 39 + ".i2p" # 43 chars total assert convert_to_hash(s) is None # --------------------------------------------------------------------------- # VersionComparator # --------------------------------------------------------------------------- class TestVersionComparator: """Semantic version comparison.""" def test_equal(self): assert compare_versions("0.9.50", "0.9.50") == 0 def test_less_than(self): assert compare_versions("0.9.49", "0.9.50") == -1 def test_greater_than(self): assert compare_versions("0.9.51", "0.9.50") == 1 def test_different_lengths(self): assert compare_versions("0.9", "0.9.1") == -1 def test_suffix(self): assert compare_versions("0.9.50-1", "0.9.50") == 1 def test_major_difference(self): assert compare_versions("2.0.0", "1.99.99") == 1 # --------------------------------------------------------------------------- # RFC822Date # --------------------------------------------------------------------------- class TestRFC822Date: """HTTP date formatting.""" def test_to_rfc822_format(self): # 2026-03-21 12:00:00 UTC in ms ts_ms = 1774267200000 result = to_rfc822(ts_ms) assert "2026" in result assert "GMT" in result def test_round_trip(self): ts_ms = 1774267200000 formatted = to_rfc822(ts_ms) parsed = parse_rfc822(formatted) assert abs(parsed - ts_ms) < 1000 # within 1 second def test_parse_invalid_returns_none(self): assert parse_rfc822("not a date") is None # --------------------------------------------------------------------------- # SecureFile / SecureDirectory # --------------------------------------------------------------------------- class TestSecureFile: """File/directory creation with restrictive permissions.""" def test_create_secure_file(self): with tempfile.TemporaryDirectory() as td: path = os.path.join(td, "secret.txt") SecureFile.create(path) assert os.path.exists(path) mode = os.stat(path).st_mode & 0o777 assert mode == 0o600 def test_create_secure_directory(self): with tempfile.TemporaryDirectory() as td: path = os.path.join(td, "secure_dir") SecureDirectory.create(path) assert os.path.isdir(path) mode = os.stat(path).st_mode & 0o777 assert mode == 0o700 # --------------------------------------------------------------------------- # ObjectCounter # --------------------------------------------------------------------------- class TestObjectCounter: """Thread-safe counter.""" def test_increment(self): c = ObjectCounter() c.increment("foo") c.increment("foo") assert c.count("foo") == 2 def test_decrement(self): c = ObjectCounter() c.increment("foo") c.increment("foo") c.decrement("foo") assert c.count("foo") == 1 def test_missing_key_zero(self): c = ObjectCounter() assert c.count("missing") == 0 def test_total(self): c = ObjectCounter() c.increment("a") c.increment("b") c.increment("b") assert c.total() == 3 def test_thread_safety(self): c = ObjectCounter() def inc_many(): for _ in range(1000): c.increment("x") threads = [threading.Thread(target=inc_many) for _ in range(4)] for t in threads: t.start() for t in threads: t.join() assert c.count("x") == 4000 # --------------------------------------------------------------------------- # ConcurrentHashSet # --------------------------------------------------------------------------- class TestConcurrentHashSet: """Thread-safe set.""" def test_add_and_contains(self): s = ConcurrentHashSet() s.add("a") assert "a" in s assert "b" not in s def test_remove(self): s = ConcurrentHashSet() s.add("a") s.remove("a") assert "a" not in s def test_len(self): s = ConcurrentHashSet() s.add("a") s.add("b") assert len(s) == 2 def test_thread_safety(self): s = ConcurrentHashSet() def add_range(start): for i in range(start, start + 100): s.add(i) with ThreadPoolExecutor(max_workers=4) as ex: for i in range(0, 400, 100): ex.submit(add_range, i) assert len(s) == 400 # --------------------------------------------------------------------------- # ArraySet # --------------------------------------------------------------------------- class TestArraySet: """Sorted small-set backed by array.""" def test_add_and_contains(self): s = ArraySet() s.add(3) s.add(1) s.add(2) assert 1 in s assert 4 not in s def test_sorted_iteration(self): s = ArraySet() s.add(3) s.add(1) s.add(2) assert list(s) == [1, 2, 3] def test_no_duplicates(self): s = ArraySet() s.add(1) s.add(1) s.add(1) assert len(s) == 1 def test_remove(self): s = ArraySet() s.add(1) s.add(2) s.remove(1) assert 1 not in s assert len(s) == 1 def test_remove_missing_raises(self): s = ArraySet() with pytest.raises(KeyError): s.remove(999)