A Python port of the Invisible Internet Project (I2P)
at main 466 lines 16 kB view raw
1"""Tests for i2p_util IO module — addresses, threads, timer, events, streams.""" 2 3import io 4import threading 5import time 6 7import pytest 8 9 10# === Addresses === 11 12class TestAddresses: 13 def test_is_ipv4_valid(self): 14 from i2p_util.addresses import Addresses 15 assert Addresses.is_ipv4("192.168.1.1") 16 assert Addresses.is_ipv4("0.0.0.0") 17 assert Addresses.is_ipv4("255.255.255.255") 18 19 def test_is_ipv4_invalid(self): 20 from i2p_util.addresses import Addresses 21 assert not Addresses.is_ipv4("not.an.ip") 22 assert not Addresses.is_ipv4("256.1.1.1") 23 assert not Addresses.is_ipv4("::1") 24 assert not Addresses.is_ipv4("") 25 26 def test_is_ipv6_valid(self): 27 from i2p_util.addresses import Addresses 28 assert Addresses.is_ipv6("::1") 29 assert Addresses.is_ipv6("fe80::1") 30 assert Addresses.is_ipv6("2001:db8::1") 31 32 def test_is_ipv6_invalid(self): 33 from i2p_util.addresses import Addresses 34 assert not Addresses.is_ipv6("192.168.1.1") 35 assert not Addresses.is_ipv6("not-ipv6") 36 37 def test_is_ip_address(self): 38 from i2p_util.addresses import Addresses 39 assert Addresses.is_ip_address("10.0.0.1") 40 assert Addresses.is_ip_address("::1") 41 assert not Addresses.is_ip_address("example.com") 42 43 def test_is_public(self): 44 from i2p_util.addresses import Addresses 45 assert not Addresses.is_public("192.168.1.1") 46 assert not Addresses.is_public("10.0.0.1") 47 assert not Addresses.is_public("127.0.0.1") 48 assert Addresses.is_public("8.8.8.8") 49 50 def test_is_local(self): 51 from i2p_util.addresses import Addresses 52 assert Addresses.is_local("127.0.0.1") 53 assert Addresses.is_local("192.168.1.1") 54 assert Addresses.is_local("10.0.0.1") 55 assert not Addresses.is_local("8.8.8.8") 56 57 def test_to_ip_string(self): 58 from i2p_util.addresses import Addresses 59 assert Addresses.to_ip_string(b"\x7f\x00\x00\x01") == "127.0.0.1" 60 assert Addresses.to_ip_string(b"\xc0\xa8\x01\x01") == "192.168.1.1" 61 62 def test_get_ip_localhost(self): 63 from i2p_util.addresses import Addresses 64 ip = Addresses.get_ip("localhost") 65 # Should resolve to 127.0.0.1 or ::1 66 assert ip is not None 67 68 def test_get_ip_invalid(self): 69 from i2p_util.addresses import Addresses 70 ip = Addresses.get_ip("this.host.does.not.exist.invalid") 71 assert ip is None 72 73 def test_address_type_enum(self): 74 from i2p_util.addresses import AddressType 75 assert AddressType.IPV4 != AddressType.IPV6 76 assert AddressType.YGG is not None 77 78 79# === I2PThread === 80 81class TestI2PThread: 82 def test_basic_execution(self): 83 from i2p_util.threads import I2PThread 84 results = [] 85 t = I2PThread(target=lambda: results.append(42), name="test-basic") 86 t.start() 87 t.join(timeout=5) 88 assert results == [42] 89 90 def test_daemon_by_default(self): 91 from i2p_util.threads import I2PThread 92 t = I2PThread(target=lambda: None, name="test-daemon") 93 assert t.daemon is True 94 95 def test_exception_does_not_propagate(self): 96 from i2p_util.threads import I2PThread 97 def bad(): 98 raise ValueError("boom") 99 t = I2PThread(target=bad, name="test-exception") 100 t.start() 101 t.join(timeout=5) 102 # Should complete without raising 103 104 def test_custom_name(self): 105 from i2p_util.threads import I2PThread 106 t = I2PThread(target=lambda: None, name="my-thread") 107 assert t.name == "my-thread" 108 109 def test_non_daemon(self): 110 from i2p_util.threads import I2PThread 111 t = I2PThread(target=lambda: None, daemon=False) 112 assert t.daemon is False 113 114 115# === SimpleTimer2 === 116 117class TestSimpleTimer2: 118 def test_add_event_fires(self): 119 from i2p_util.timer import SimpleTimer2 120 timer = SimpleTimer2(name="test-timer") 121 try: 122 results = [] 123 event = threading.Event() 124 def task(): 125 results.append(1) 126 event.set() 127 timer.add_event(task, 50) # 50ms 128 event.wait(timeout=5) 129 assert results == [1] 130 finally: 131 timer.shutdown() 132 133 def test_cancel_event(self): 134 from i2p_util.timer import SimpleTimer2 135 timer = SimpleTimer2(name="test-cancel") 136 try: 137 results = [] 138 t = timer.add_event(lambda: results.append(1), 500) 139 timer.cancel(t) 140 time.sleep(0.8) 141 assert results == [] 142 finally: 143 timer.shutdown() 144 145 def test_periodic_event(self): 146 from i2p_util.timer import SimpleTimer2 147 timer = SimpleTimer2(name="test-periodic") 148 try: 149 count = [] 150 event = threading.Event() 151 def task(): 152 count.append(1) 153 if len(count) >= 3: 154 event.set() 155 pe = timer.add_periodic_event(task, 100, initial_delay_ms=50) 156 event.wait(timeout=5) 157 pe.cancel() 158 assert len(count) >= 3 159 finally: 160 timer.shutdown() 161 162 def test_shutdown_prevents_new_events(self): 163 from i2p_util.timer import SimpleTimer2 164 timer = SimpleTimer2(name="test-shutdown") 165 timer.shutdown() 166 with pytest.raises(RuntimeError): 167 timer.add_event(lambda: None, 100) 168 169 170# === EventDispatcher === 171 172class TestEventDispatcher: 173 def test_notify_and_get(self): 174 from i2p_util.events import EventDispatcher 175 ed = EventDispatcher() 176 ed.notify_event("test", "value") 177 assert ed.get_event_value("test") == "value" 178 179 def test_get_events(self): 180 from i2p_util.events import EventDispatcher 181 ed = EventDispatcher() 182 ed.notify_event("a", 1) 183 ed.notify_event("b", 2) 184 assert ed.get_events() == {"a", "b"} 185 186 def test_missing_event_returns_none(self): 187 from i2p_util.events import EventDispatcher 188 ed = EventDispatcher() 189 assert ed.get_event_value("missing") is None 190 191 def test_ignore_events(self): 192 from i2p_util.events import EventDispatcher 193 ed = EventDispatcher() 194 ed.ignore_events() 195 ed.notify_event("test", "value") 196 assert ed.get_event_value("test") is None 197 198 def test_un_ignore_events(self): 199 from i2p_util.events import EventDispatcher 200 ed = EventDispatcher() 201 ed.ignore_events() 202 ed.un_ignore_events() 203 ed.notify_event("test", "value") 204 assert ed.get_event_value("test") == "value" 205 206 def test_attach_dispatcher(self): 207 from i2p_util.events import EventDispatcher 208 parent = EventDispatcher() 209 child = EventDispatcher() 210 parent.attach_event_dispatcher(child) 211 parent.notify_event("propagated", 42) 212 assert child.get_event_value("propagated") == 42 213 214 def test_detach_dispatcher(self): 215 from i2p_util.events import EventDispatcher 216 parent = EventDispatcher() 217 child = EventDispatcher() 218 parent.attach_event_dispatcher(child) 219 parent.detach_event_dispatcher(child) 220 parent.notify_event("not_propagated", 99) 221 assert child.get_event_value("not_propagated") is None 222 223 def test_wait_event_value_already_set(self): 224 from i2p_util.events import EventDispatcher 225 ed = EventDispatcher() 226 ed.notify_event("ready", "go") 227 assert ed.wait_event_value("ready", timeout=1) == "go" 228 229 def test_wait_event_value_timeout(self): 230 from i2p_util.events import EventDispatcher 231 ed = EventDispatcher() 232 result = ed.wait_event_value("never", timeout=0.1) 233 assert result is None 234 235 236# === Streams === 237 238class TestLookaheadInputStream: 239 def test_basic_lookahead(self): 240 from i2p_util.streams import LookaheadInputStream 241 data = b"Hello, World!XY" # XY is the 2-byte footer 242 lis = LookaheadInputStream(2) 243 lis.initialize(io.BytesIO(data)) 244 result = lis.read(13) # Read "Hello, World!" 245 assert result == b"Hello, World!" 246 # Read past EOF 247 remaining = lis.read(100) 248 assert remaining == b"" 249 assert lis.eof_reached 250 assert lis.get_footer() == b"XY" 251 252 def test_footer_size_1(self): 253 from i2p_util.streams import LookaheadInputStream 254 data = b"ABCZ" 255 lis = LookaheadInputStream(1) 256 lis.initialize(io.BytesIO(data)) 257 result = lis.read(3) 258 assert result == b"ABC" 259 lis.read(100) # drain 260 assert lis.get_footer() == b"Z" 261 262 def test_footer_exact_size(self): 263 from i2p_util.streams import LookaheadInputStream 264 # Data is exactly the footer size 265 data = b"XY" 266 lis = LookaheadInputStream(2) 267 lis.initialize(io.BytesIO(data)) 268 result = lis.read(100) 269 assert result == b"" 270 assert lis.get_footer() == b"XY" 271 272 def test_initialize_too_short(self): 273 from i2p_util.streams import LookaheadInputStream 274 lis = LookaheadInputStream(5) 275 with pytest.raises(IOError): 276 lis.initialize(io.BytesIO(b"AB")) 277 278 279class TestByteArrayStream: 280 def test_write_and_read_back(self): 281 from i2p_util.streams import ByteArrayStream 282 bas = ByteArrayStream() 283 bas.write(b"hello") 284 bas.write(b" world") 285 result = bas.getvalue() 286 assert result == b"hello world" 287 288 def test_as_input_stream(self): 289 from i2p_util.streams import ByteArrayStream 290 bas = ByteArrayStream() 291 bas.write(b"test data") 292 reader = bas.as_input_stream() 293 assert reader.read() == b"test data" 294 295 296class TestStreamUtils: 297 def test_read_fully(self): 298 from i2p_util.streams import read_fully 299 data = b"0123456789" 300 result = read_fully(io.BytesIO(data), 10) 301 assert result == data 302 303 def test_read_fully_short(self): 304 from i2p_util.streams import read_fully 305 with pytest.raises(IOError): 306 read_fully(io.BytesIO(b"AB"), 10) 307 308 def test_copy_stream(self): 309 from i2p_util.streams import copy_stream 310 src = io.BytesIO(b"copy this data") 311 dst = io.BytesIO() 312 n = copy_stream(src, dst) 313 assert n == 14 314 assert dst.getvalue() == b"copy this data" 315 316 def test_copy_stream_empty(self): 317 from i2p_util.streams import copy_stream 318 src = io.BytesIO(b"") 319 dst = io.BytesIO() 320 n = copy_stream(src, dst) 321 assert n == 0 322 323 324# === Additional Addresses coverage (lines 67-68, 76-77, 91-109, 114-115, 120, 125-128) === 325 326class TestAddressesCoverage: 327 def test_is_public_invalid(self): 328 from i2p_util.addresses import Addresses 329 assert Addresses.is_public("not-valid") is False 330 331 def test_is_local_invalid(self): 332 from i2p_util.addresses import Addresses 333 assert Addresses.is_local("not-valid") is False 334 335 def test_get_addresses_with_local(self): 336 from i2p_util.addresses import Addresses 337 addrs = Addresses.get_addresses(include_local=True) 338 assert isinstance(addrs, set) 339 340 def test_get_addresses_with_ipv6(self): 341 from i2p_util.addresses import Addresses 342 addrs = Addresses.get_addresses(include_local=True, include_ipv6=True) 343 assert isinstance(addrs, set) 344 345 def test_get_any_address(self): 346 from i2p_util.addresses import Addresses 347 result = Addresses.get_any_address() 348 # May be None or an IP string depending on host 349 assert result is None or isinstance(result, str) 350 351 def test_is_connected(self): 352 from i2p_util.addresses import Addresses 353 result = Addresses.is_connected() 354 assert isinstance(result, bool) 355 356 def test_is_connected_ipv6(self): 357 from i2p_util.addresses import Addresses 358 result = Addresses.is_connected_ipv6() 359 assert isinstance(result, bool) 360 361 def test_is_local_link_local(self): 362 from i2p_util.addresses import Addresses 363 assert Addresses.is_local("169.254.0.1") # link-local 364 365 def test_is_public_ipv6(self): 366 from i2p_util.addresses import Addresses 367 assert Addresses.is_public("2001:db8::1") is False # doc range, not global 368 369 370# === Additional I2PThread coverage (lines 36, 42-47, 51-52, 56-57) === 371 372class TestI2PThreadOOM: 373 def test_oom_fires_listeners(self): 374 from i2p_util.threads import I2PThread 375 376 fired = [] 377 class Listener: 378 def out_of_memory(self, error): 379 fired.append(error) 380 381 listener = Listener() 382 I2PThread.add_oom_listener(listener) 383 try: 384 def oom_raiser(): 385 raise MemoryError("test OOM") 386 t = I2PThread(target=oom_raiser, name="test-oom") 387 t.start() 388 t.join(timeout=5) 389 assert len(fired) == 1 390 assert isinstance(fired[0], MemoryError) 391 finally: 392 I2PThread.remove_oom_listener(listener) 393 394 def test_remove_oom_listener(self): 395 from i2p_util.threads import I2PThread 396 397 fired = [] 398 class Listener: 399 def out_of_memory(self, error): 400 fired.append(error) 401 402 listener = Listener() 403 I2PThread.add_oom_listener(listener) 404 I2PThread.remove_oom_listener(listener) 405 def oom_raiser(): 406 raise MemoryError("test OOM") 407 t = I2PThread(target=oom_raiser, name="test-oom-removed") 408 t.start() 409 t.join(timeout=5) 410 assert len(fired) == 0 411 412 413# === Additional SimpleTimer2 coverage (lines 32-36, 47-49, 82, 105, 113, 116-118) === 414 415class TestSimpleTimer2Coverage: 416 def test_singleton_instance(self): 417 from i2p_util.timer import SimpleTimer2 418 # Reset singleton for test isolation 419 orig = SimpleTimer2._instance 420 SimpleTimer2._instance = None 421 try: 422 inst1 = SimpleTimer2.get_instance() 423 inst2 = SimpleTimer2.get_instance() 424 assert inst1 is inst2 425 finally: 426 if SimpleTimer2._instance: 427 SimpleTimer2._instance.shutdown() 428 SimpleTimer2._instance = orig 429 430 def test_task_exception_logged(self): 431 from i2p_util.timer import SimpleTimer2 432 timer = SimpleTimer2(name="test-exception") 433 try: 434 event = threading.Event() 435 def bad_task(): 436 event.set() 437 raise ValueError("task error") 438 timer.add_event(bad_task, 50) 439 event.wait(timeout=5) 440 time.sleep(0.2) # let the exception handler run 441 finally: 442 timer.shutdown() 443 444 def test_periodic_cancel(self): 445 from i2p_util.timer import SimpleTimer2 446 timer = SimpleTimer2(name="test-periodic-cancel") 447 try: 448 count = [] 449 pe = timer.add_periodic_event(lambda: count.append(1), 1000, initial_delay_ms=0) 450 pe.cancel() 451 time.sleep(0.2) 452 assert pe._cancelled is True 453 finally: 454 timer.shutdown() 455 456 def test_periodic_initial_delay_zero(self): 457 from i2p_util.timer import SimpleTimer2 458 timer = SimpleTimer2(name="test-periodic-init0") 459 try: 460 event = threading.Event() 461 pe = timer.add_periodic_event(lambda: event.set(), 5000, initial_delay_ms=0) 462 event.wait(timeout=10) 463 pe.cancel() 464 assert event.is_set() 465 finally: 466 timer.shutdown()