"""Tests for i2p_util IO module — addresses, threads, timer, events, streams.""" import io import threading import time import pytest # === Addresses === class TestAddresses: def test_is_ipv4_valid(self): from i2p_util.addresses import Addresses assert Addresses.is_ipv4("192.168.1.1") assert Addresses.is_ipv4("0.0.0.0") assert Addresses.is_ipv4("255.255.255.255") def test_is_ipv4_invalid(self): from i2p_util.addresses import Addresses assert not Addresses.is_ipv4("not.an.ip") assert not Addresses.is_ipv4("256.1.1.1") assert not Addresses.is_ipv4("::1") assert not Addresses.is_ipv4("") def test_is_ipv6_valid(self): from i2p_util.addresses import Addresses assert Addresses.is_ipv6("::1") assert Addresses.is_ipv6("fe80::1") assert Addresses.is_ipv6("2001:db8::1") def test_is_ipv6_invalid(self): from i2p_util.addresses import Addresses assert not Addresses.is_ipv6("192.168.1.1") assert not Addresses.is_ipv6("not-ipv6") def test_is_ip_address(self): from i2p_util.addresses import Addresses assert Addresses.is_ip_address("10.0.0.1") assert Addresses.is_ip_address("::1") assert not Addresses.is_ip_address("example.com") def test_is_public(self): from i2p_util.addresses import Addresses assert not Addresses.is_public("192.168.1.1") assert not Addresses.is_public("10.0.0.1") assert not Addresses.is_public("127.0.0.1") assert Addresses.is_public("8.8.8.8") def test_is_local(self): from i2p_util.addresses import Addresses assert Addresses.is_local("127.0.0.1") assert Addresses.is_local("192.168.1.1") assert Addresses.is_local("10.0.0.1") assert not Addresses.is_local("8.8.8.8") def test_to_ip_string(self): from i2p_util.addresses import Addresses assert Addresses.to_ip_string(b"\x7f\x00\x00\x01") == "127.0.0.1" assert Addresses.to_ip_string(b"\xc0\xa8\x01\x01") == "192.168.1.1" def test_get_ip_localhost(self): from i2p_util.addresses import Addresses ip = Addresses.get_ip("localhost") # Should resolve to 127.0.0.1 or ::1 assert ip is not None def test_get_ip_invalid(self): from i2p_util.addresses import Addresses ip = Addresses.get_ip("this.host.does.not.exist.invalid") assert ip is None def test_address_type_enum(self): from i2p_util.addresses import AddressType assert AddressType.IPV4 != AddressType.IPV6 assert AddressType.YGG is not None # === I2PThread === class TestI2PThread: def test_basic_execution(self): from i2p_util.threads import I2PThread results = [] t = I2PThread(target=lambda: results.append(42), name="test-basic") t.start() t.join(timeout=5) assert results == [42] def test_daemon_by_default(self): from i2p_util.threads import I2PThread t = I2PThread(target=lambda: None, name="test-daemon") assert t.daemon is True def test_exception_does_not_propagate(self): from i2p_util.threads import I2PThread def bad(): raise ValueError("boom") t = I2PThread(target=bad, name="test-exception") t.start() t.join(timeout=5) # Should complete without raising def test_custom_name(self): from i2p_util.threads import I2PThread t = I2PThread(target=lambda: None, name="my-thread") assert t.name == "my-thread" def test_non_daemon(self): from i2p_util.threads import I2PThread t = I2PThread(target=lambda: None, daemon=False) assert t.daemon is False # === SimpleTimer2 === class TestSimpleTimer2: def test_add_event_fires(self): from i2p_util.timer import SimpleTimer2 timer = SimpleTimer2(name="test-timer") try: results = [] event = threading.Event() def task(): results.append(1) event.set() timer.add_event(task, 50) # 50ms event.wait(timeout=5) assert results == [1] finally: timer.shutdown() def test_cancel_event(self): from i2p_util.timer import SimpleTimer2 timer = SimpleTimer2(name="test-cancel") try: results = [] t = timer.add_event(lambda: results.append(1), 500) timer.cancel(t) time.sleep(0.8) assert results == [] finally: timer.shutdown() def test_periodic_event(self): from i2p_util.timer import SimpleTimer2 timer = SimpleTimer2(name="test-periodic") try: count = [] event = threading.Event() def task(): count.append(1) if len(count) >= 3: event.set() pe = timer.add_periodic_event(task, 100, initial_delay_ms=50) event.wait(timeout=5) pe.cancel() assert len(count) >= 3 finally: timer.shutdown() def test_shutdown_prevents_new_events(self): from i2p_util.timer import SimpleTimer2 timer = SimpleTimer2(name="test-shutdown") timer.shutdown() with pytest.raises(RuntimeError): timer.add_event(lambda: None, 100) # === EventDispatcher === class TestEventDispatcher: def test_notify_and_get(self): from i2p_util.events import EventDispatcher ed = EventDispatcher() ed.notify_event("test", "value") assert ed.get_event_value("test") == "value" def test_get_events(self): from i2p_util.events import EventDispatcher ed = EventDispatcher() ed.notify_event("a", 1) ed.notify_event("b", 2) assert ed.get_events() == {"a", "b"} def test_missing_event_returns_none(self): from i2p_util.events import EventDispatcher ed = EventDispatcher() assert ed.get_event_value("missing") is None def test_ignore_events(self): from i2p_util.events import EventDispatcher ed = EventDispatcher() ed.ignore_events() ed.notify_event("test", "value") assert ed.get_event_value("test") is None def test_un_ignore_events(self): from i2p_util.events import EventDispatcher ed = EventDispatcher() ed.ignore_events() ed.un_ignore_events() ed.notify_event("test", "value") assert ed.get_event_value("test") == "value" def test_attach_dispatcher(self): from i2p_util.events import EventDispatcher parent = EventDispatcher() child = EventDispatcher() parent.attach_event_dispatcher(child) parent.notify_event("propagated", 42) assert child.get_event_value("propagated") == 42 def test_detach_dispatcher(self): from i2p_util.events import EventDispatcher parent = EventDispatcher() child = EventDispatcher() parent.attach_event_dispatcher(child) parent.detach_event_dispatcher(child) parent.notify_event("not_propagated", 99) assert child.get_event_value("not_propagated") is None def test_wait_event_value_already_set(self): from i2p_util.events import EventDispatcher ed = EventDispatcher() ed.notify_event("ready", "go") assert ed.wait_event_value("ready", timeout=1) == "go" def test_wait_event_value_timeout(self): from i2p_util.events import EventDispatcher ed = EventDispatcher() result = ed.wait_event_value("never", timeout=0.1) assert result is None # === Streams === class TestLookaheadInputStream: def test_basic_lookahead(self): from i2p_util.streams import LookaheadInputStream data = b"Hello, World!XY" # XY is the 2-byte footer lis = LookaheadInputStream(2) lis.initialize(io.BytesIO(data)) result = lis.read(13) # Read "Hello, World!" assert result == b"Hello, World!" # Read past EOF remaining = lis.read(100) assert remaining == b"" assert lis.eof_reached assert lis.get_footer() == b"XY" def test_footer_size_1(self): from i2p_util.streams import LookaheadInputStream data = b"ABCZ" lis = LookaheadInputStream(1) lis.initialize(io.BytesIO(data)) result = lis.read(3) assert result == b"ABC" lis.read(100) # drain assert lis.get_footer() == b"Z" def test_footer_exact_size(self): from i2p_util.streams import LookaheadInputStream # Data is exactly the footer size data = b"XY" lis = LookaheadInputStream(2) lis.initialize(io.BytesIO(data)) result = lis.read(100) assert result == b"" assert lis.get_footer() == b"XY" def test_initialize_too_short(self): from i2p_util.streams import LookaheadInputStream lis = LookaheadInputStream(5) with pytest.raises(IOError): lis.initialize(io.BytesIO(b"AB")) class TestByteArrayStream: def test_write_and_read_back(self): from i2p_util.streams import ByteArrayStream bas = ByteArrayStream() bas.write(b"hello") bas.write(b" world") result = bas.getvalue() assert result == b"hello world" def test_as_input_stream(self): from i2p_util.streams import ByteArrayStream bas = ByteArrayStream() bas.write(b"test data") reader = bas.as_input_stream() assert reader.read() == b"test data" class TestStreamUtils: def test_read_fully(self): from i2p_util.streams import read_fully data = b"0123456789" result = read_fully(io.BytesIO(data), 10) assert result == data def test_read_fully_short(self): from i2p_util.streams import read_fully with pytest.raises(IOError): read_fully(io.BytesIO(b"AB"), 10) def test_copy_stream(self): from i2p_util.streams import copy_stream src = io.BytesIO(b"copy this data") dst = io.BytesIO() n = copy_stream(src, dst) assert n == 14 assert dst.getvalue() == b"copy this data" def test_copy_stream_empty(self): from i2p_util.streams import copy_stream src = io.BytesIO(b"") dst = io.BytesIO() n = copy_stream(src, dst) assert n == 0 # === Additional Addresses coverage (lines 67-68, 76-77, 91-109, 114-115, 120, 125-128) === class TestAddressesCoverage: def test_is_public_invalid(self): from i2p_util.addresses import Addresses assert Addresses.is_public("not-valid") is False def test_is_local_invalid(self): from i2p_util.addresses import Addresses assert Addresses.is_local("not-valid") is False def test_get_addresses_with_local(self): from i2p_util.addresses import Addresses addrs = Addresses.get_addresses(include_local=True) assert isinstance(addrs, set) def test_get_addresses_with_ipv6(self): from i2p_util.addresses import Addresses addrs = Addresses.get_addresses(include_local=True, include_ipv6=True) assert isinstance(addrs, set) def test_get_any_address(self): from i2p_util.addresses import Addresses result = Addresses.get_any_address() # May be None or an IP string depending on host assert result is None or isinstance(result, str) def test_is_connected(self): from i2p_util.addresses import Addresses result = Addresses.is_connected() assert isinstance(result, bool) def test_is_connected_ipv6(self): from i2p_util.addresses import Addresses result = Addresses.is_connected_ipv6() assert isinstance(result, bool) def test_is_local_link_local(self): from i2p_util.addresses import Addresses assert Addresses.is_local("169.254.0.1") # link-local def test_is_public_ipv6(self): from i2p_util.addresses import Addresses assert Addresses.is_public("2001:db8::1") is False # doc range, not global # === Additional I2PThread coverage (lines 36, 42-47, 51-52, 56-57) === class TestI2PThreadOOM: def test_oom_fires_listeners(self): from i2p_util.threads import I2PThread fired = [] class Listener: def out_of_memory(self, error): fired.append(error) listener = Listener() I2PThread.add_oom_listener(listener) try: def oom_raiser(): raise MemoryError("test OOM") t = I2PThread(target=oom_raiser, name="test-oom") t.start() t.join(timeout=5) assert len(fired) == 1 assert isinstance(fired[0], MemoryError) finally: I2PThread.remove_oom_listener(listener) def test_remove_oom_listener(self): from i2p_util.threads import I2PThread fired = [] class Listener: def out_of_memory(self, error): fired.append(error) listener = Listener() I2PThread.add_oom_listener(listener) I2PThread.remove_oom_listener(listener) def oom_raiser(): raise MemoryError("test OOM") t = I2PThread(target=oom_raiser, name="test-oom-removed") t.start() t.join(timeout=5) assert len(fired) == 0 # === Additional SimpleTimer2 coverage (lines 32-36, 47-49, 82, 105, 113, 116-118) === class TestSimpleTimer2Coverage: def test_singleton_instance(self): from i2p_util.timer import SimpleTimer2 # Reset singleton for test isolation orig = SimpleTimer2._instance SimpleTimer2._instance = None try: inst1 = SimpleTimer2.get_instance() inst2 = SimpleTimer2.get_instance() assert inst1 is inst2 finally: if SimpleTimer2._instance: SimpleTimer2._instance.shutdown() SimpleTimer2._instance = orig def test_task_exception_logged(self): from i2p_util.timer import SimpleTimer2 timer = SimpleTimer2(name="test-exception") try: event = threading.Event() def bad_task(): event.set() raise ValueError("task error") timer.add_event(bad_task, 50) event.wait(timeout=5) time.sleep(0.2) # let the exception handler run finally: timer.shutdown() def test_periodic_cancel(self): from i2p_util.timer import SimpleTimer2 timer = SimpleTimer2(name="test-periodic-cancel") try: count = [] pe = timer.add_periodic_event(lambda: count.append(1), 1000, initial_delay_ms=0) pe.cancel() time.sleep(0.2) assert pe._cancelled is True finally: timer.shutdown() def test_periodic_initial_delay_zero(self): from i2p_util.timer import SimpleTimer2 timer = SimpleTimer2(name="test-periodic-init0") try: event = threading.Event() pe = timer.add_periodic_event(lambda: event.set(), 5000, initial_delay_ms=0) event.wait(timeout=10) pe.cancel() assert event.is_set() finally: timer.shutdown()