A Python port of the Invisible Internet Project (I2P)
1"""Tests for i2p_util IO module — addresses, threads, timer, events, streams."""
2
3import io
4import threading
5import time
6
7import pytest
8
9
10# === Addresses ===
11
12class TestAddresses:
13 def test_is_ipv4_valid(self):
14 from i2p_util.addresses import Addresses
15 assert Addresses.is_ipv4("192.168.1.1")
16 assert Addresses.is_ipv4("0.0.0.0")
17 assert Addresses.is_ipv4("255.255.255.255")
18
19 def test_is_ipv4_invalid(self):
20 from i2p_util.addresses import Addresses
21 assert not Addresses.is_ipv4("not.an.ip")
22 assert not Addresses.is_ipv4("256.1.1.1")
23 assert not Addresses.is_ipv4("::1")
24 assert not Addresses.is_ipv4("")
25
26 def test_is_ipv6_valid(self):
27 from i2p_util.addresses import Addresses
28 assert Addresses.is_ipv6("::1")
29 assert Addresses.is_ipv6("fe80::1")
30 assert Addresses.is_ipv6("2001:db8::1")
31
32 def test_is_ipv6_invalid(self):
33 from i2p_util.addresses import Addresses
34 assert not Addresses.is_ipv6("192.168.1.1")
35 assert not Addresses.is_ipv6("not-ipv6")
36
37 def test_is_ip_address(self):
38 from i2p_util.addresses import Addresses
39 assert Addresses.is_ip_address("10.0.0.1")
40 assert Addresses.is_ip_address("::1")
41 assert not Addresses.is_ip_address("example.com")
42
43 def test_is_public(self):
44 from i2p_util.addresses import Addresses
45 assert not Addresses.is_public("192.168.1.1")
46 assert not Addresses.is_public("10.0.0.1")
47 assert not Addresses.is_public("127.0.0.1")
48 assert Addresses.is_public("8.8.8.8")
49
50 def test_is_local(self):
51 from i2p_util.addresses import Addresses
52 assert Addresses.is_local("127.0.0.1")
53 assert Addresses.is_local("192.168.1.1")
54 assert Addresses.is_local("10.0.0.1")
55 assert not Addresses.is_local("8.8.8.8")
56
57 def test_to_ip_string(self):
58 from i2p_util.addresses import Addresses
59 assert Addresses.to_ip_string(b"\x7f\x00\x00\x01") == "127.0.0.1"
60 assert Addresses.to_ip_string(b"\xc0\xa8\x01\x01") == "192.168.1.1"
61
62 def test_get_ip_localhost(self):
63 from i2p_util.addresses import Addresses
64 ip = Addresses.get_ip("localhost")
65 # Should resolve to 127.0.0.1 or ::1
66 assert ip is not None
67
68 def test_get_ip_invalid(self):
69 from i2p_util.addresses import Addresses
70 ip = Addresses.get_ip("this.host.does.not.exist.invalid")
71 assert ip is None
72
73 def test_address_type_enum(self):
74 from i2p_util.addresses import AddressType
75 assert AddressType.IPV4 != AddressType.IPV6
76 assert AddressType.YGG is not None
77
78
79# === I2PThread ===
80
81class TestI2PThread:
82 def test_basic_execution(self):
83 from i2p_util.threads import I2PThread
84 results = []
85 t = I2PThread(target=lambda: results.append(42), name="test-basic")
86 t.start()
87 t.join(timeout=5)
88 assert results == [42]
89
90 def test_daemon_by_default(self):
91 from i2p_util.threads import I2PThread
92 t = I2PThread(target=lambda: None, name="test-daemon")
93 assert t.daemon is True
94
95 def test_exception_does_not_propagate(self):
96 from i2p_util.threads import I2PThread
97 def bad():
98 raise ValueError("boom")
99 t = I2PThread(target=bad, name="test-exception")
100 t.start()
101 t.join(timeout=5)
102 # Should complete without raising
103
104 def test_custom_name(self):
105 from i2p_util.threads import I2PThread
106 t = I2PThread(target=lambda: None, name="my-thread")
107 assert t.name == "my-thread"
108
109 def test_non_daemon(self):
110 from i2p_util.threads import I2PThread
111 t = I2PThread(target=lambda: None, daemon=False)
112 assert t.daemon is False
113
114
115# === SimpleTimer2 ===
116
117class TestSimpleTimer2:
118 def test_add_event_fires(self):
119 from i2p_util.timer import SimpleTimer2
120 timer = SimpleTimer2(name="test-timer")
121 try:
122 results = []
123 event = threading.Event()
124 def task():
125 results.append(1)
126 event.set()
127 timer.add_event(task, 50) # 50ms
128 event.wait(timeout=5)
129 assert results == [1]
130 finally:
131 timer.shutdown()
132
133 def test_cancel_event(self):
134 from i2p_util.timer import SimpleTimer2
135 timer = SimpleTimer2(name="test-cancel")
136 try:
137 results = []
138 t = timer.add_event(lambda: results.append(1), 500)
139 timer.cancel(t)
140 time.sleep(0.8)
141 assert results == []
142 finally:
143 timer.shutdown()
144
145 def test_periodic_event(self):
146 from i2p_util.timer import SimpleTimer2
147 timer = SimpleTimer2(name="test-periodic")
148 try:
149 count = []
150 event = threading.Event()
151 def task():
152 count.append(1)
153 if len(count) >= 3:
154 event.set()
155 pe = timer.add_periodic_event(task, 100, initial_delay_ms=50)
156 event.wait(timeout=5)
157 pe.cancel()
158 assert len(count) >= 3
159 finally:
160 timer.shutdown()
161
162 def test_shutdown_prevents_new_events(self):
163 from i2p_util.timer import SimpleTimer2
164 timer = SimpleTimer2(name="test-shutdown")
165 timer.shutdown()
166 with pytest.raises(RuntimeError):
167 timer.add_event(lambda: None, 100)
168
169
170# === EventDispatcher ===
171
172class TestEventDispatcher:
173 def test_notify_and_get(self):
174 from i2p_util.events import EventDispatcher
175 ed = EventDispatcher()
176 ed.notify_event("test", "value")
177 assert ed.get_event_value("test") == "value"
178
179 def test_get_events(self):
180 from i2p_util.events import EventDispatcher
181 ed = EventDispatcher()
182 ed.notify_event("a", 1)
183 ed.notify_event("b", 2)
184 assert ed.get_events() == {"a", "b"}
185
186 def test_missing_event_returns_none(self):
187 from i2p_util.events import EventDispatcher
188 ed = EventDispatcher()
189 assert ed.get_event_value("missing") is None
190
191 def test_ignore_events(self):
192 from i2p_util.events import EventDispatcher
193 ed = EventDispatcher()
194 ed.ignore_events()
195 ed.notify_event("test", "value")
196 assert ed.get_event_value("test") is None
197
198 def test_un_ignore_events(self):
199 from i2p_util.events import EventDispatcher
200 ed = EventDispatcher()
201 ed.ignore_events()
202 ed.un_ignore_events()
203 ed.notify_event("test", "value")
204 assert ed.get_event_value("test") == "value"
205
206 def test_attach_dispatcher(self):
207 from i2p_util.events import EventDispatcher
208 parent = EventDispatcher()
209 child = EventDispatcher()
210 parent.attach_event_dispatcher(child)
211 parent.notify_event("propagated", 42)
212 assert child.get_event_value("propagated") == 42
213
214 def test_detach_dispatcher(self):
215 from i2p_util.events import EventDispatcher
216 parent = EventDispatcher()
217 child = EventDispatcher()
218 parent.attach_event_dispatcher(child)
219 parent.detach_event_dispatcher(child)
220 parent.notify_event("not_propagated", 99)
221 assert child.get_event_value("not_propagated") is None
222
223 def test_wait_event_value_already_set(self):
224 from i2p_util.events import EventDispatcher
225 ed = EventDispatcher()
226 ed.notify_event("ready", "go")
227 assert ed.wait_event_value("ready", timeout=1) == "go"
228
229 def test_wait_event_value_timeout(self):
230 from i2p_util.events import EventDispatcher
231 ed = EventDispatcher()
232 result = ed.wait_event_value("never", timeout=0.1)
233 assert result is None
234
235
236# === Streams ===
237
238class TestLookaheadInputStream:
239 def test_basic_lookahead(self):
240 from i2p_util.streams import LookaheadInputStream
241 data = b"Hello, World!XY" # XY is the 2-byte footer
242 lis = LookaheadInputStream(2)
243 lis.initialize(io.BytesIO(data))
244 result = lis.read(13) # Read "Hello, World!"
245 assert result == b"Hello, World!"
246 # Read past EOF
247 remaining = lis.read(100)
248 assert remaining == b""
249 assert lis.eof_reached
250 assert lis.get_footer() == b"XY"
251
252 def test_footer_size_1(self):
253 from i2p_util.streams import LookaheadInputStream
254 data = b"ABCZ"
255 lis = LookaheadInputStream(1)
256 lis.initialize(io.BytesIO(data))
257 result = lis.read(3)
258 assert result == b"ABC"
259 lis.read(100) # drain
260 assert lis.get_footer() == b"Z"
261
262 def test_footer_exact_size(self):
263 from i2p_util.streams import LookaheadInputStream
264 # Data is exactly the footer size
265 data = b"XY"
266 lis = LookaheadInputStream(2)
267 lis.initialize(io.BytesIO(data))
268 result = lis.read(100)
269 assert result == b""
270 assert lis.get_footer() == b"XY"
271
272 def test_initialize_too_short(self):
273 from i2p_util.streams import LookaheadInputStream
274 lis = LookaheadInputStream(5)
275 with pytest.raises(IOError):
276 lis.initialize(io.BytesIO(b"AB"))
277
278
279class TestByteArrayStream:
280 def test_write_and_read_back(self):
281 from i2p_util.streams import ByteArrayStream
282 bas = ByteArrayStream()
283 bas.write(b"hello")
284 bas.write(b" world")
285 result = bas.getvalue()
286 assert result == b"hello world"
287
288 def test_as_input_stream(self):
289 from i2p_util.streams import ByteArrayStream
290 bas = ByteArrayStream()
291 bas.write(b"test data")
292 reader = bas.as_input_stream()
293 assert reader.read() == b"test data"
294
295
296class TestStreamUtils:
297 def test_read_fully(self):
298 from i2p_util.streams import read_fully
299 data = b"0123456789"
300 result = read_fully(io.BytesIO(data), 10)
301 assert result == data
302
303 def test_read_fully_short(self):
304 from i2p_util.streams import read_fully
305 with pytest.raises(IOError):
306 read_fully(io.BytesIO(b"AB"), 10)
307
308 def test_copy_stream(self):
309 from i2p_util.streams import copy_stream
310 src = io.BytesIO(b"copy this data")
311 dst = io.BytesIO()
312 n = copy_stream(src, dst)
313 assert n == 14
314 assert dst.getvalue() == b"copy this data"
315
316 def test_copy_stream_empty(self):
317 from i2p_util.streams import copy_stream
318 src = io.BytesIO(b"")
319 dst = io.BytesIO()
320 n = copy_stream(src, dst)
321 assert n == 0
322
323
324# === Additional Addresses coverage (lines 67-68, 76-77, 91-109, 114-115, 120, 125-128) ===
325
326class TestAddressesCoverage:
327 def test_is_public_invalid(self):
328 from i2p_util.addresses import Addresses
329 assert Addresses.is_public("not-valid") is False
330
331 def test_is_local_invalid(self):
332 from i2p_util.addresses import Addresses
333 assert Addresses.is_local("not-valid") is False
334
335 def test_get_addresses_with_local(self):
336 from i2p_util.addresses import Addresses
337 addrs = Addresses.get_addresses(include_local=True)
338 assert isinstance(addrs, set)
339
340 def test_get_addresses_with_ipv6(self):
341 from i2p_util.addresses import Addresses
342 addrs = Addresses.get_addresses(include_local=True, include_ipv6=True)
343 assert isinstance(addrs, set)
344
345 def test_get_any_address(self):
346 from i2p_util.addresses import Addresses
347 result = Addresses.get_any_address()
348 # May be None or an IP string depending on host
349 assert result is None or isinstance(result, str)
350
351 def test_is_connected(self):
352 from i2p_util.addresses import Addresses
353 result = Addresses.is_connected()
354 assert isinstance(result, bool)
355
356 def test_is_connected_ipv6(self):
357 from i2p_util.addresses import Addresses
358 result = Addresses.is_connected_ipv6()
359 assert isinstance(result, bool)
360
361 def test_is_local_link_local(self):
362 from i2p_util.addresses import Addresses
363 assert Addresses.is_local("169.254.0.1") # link-local
364
365 def test_is_public_ipv6(self):
366 from i2p_util.addresses import Addresses
367 assert Addresses.is_public("2001:db8::1") is False # doc range, not global
368
369
370# === Additional I2PThread coverage (lines 36, 42-47, 51-52, 56-57) ===
371
372class TestI2PThreadOOM:
373 def test_oom_fires_listeners(self):
374 from i2p_util.threads import I2PThread
375
376 fired = []
377 class Listener:
378 def out_of_memory(self, error):
379 fired.append(error)
380
381 listener = Listener()
382 I2PThread.add_oom_listener(listener)
383 try:
384 def oom_raiser():
385 raise MemoryError("test OOM")
386 t = I2PThread(target=oom_raiser, name="test-oom")
387 t.start()
388 t.join(timeout=5)
389 assert len(fired) == 1
390 assert isinstance(fired[0], MemoryError)
391 finally:
392 I2PThread.remove_oom_listener(listener)
393
394 def test_remove_oom_listener(self):
395 from i2p_util.threads import I2PThread
396
397 fired = []
398 class Listener:
399 def out_of_memory(self, error):
400 fired.append(error)
401
402 listener = Listener()
403 I2PThread.add_oom_listener(listener)
404 I2PThread.remove_oom_listener(listener)
405 def oom_raiser():
406 raise MemoryError("test OOM")
407 t = I2PThread(target=oom_raiser, name="test-oom-removed")
408 t.start()
409 t.join(timeout=5)
410 assert len(fired) == 0
411
412
413# === Additional SimpleTimer2 coverage (lines 32-36, 47-49, 82, 105, 113, 116-118) ===
414
415class TestSimpleTimer2Coverage:
416 def test_singleton_instance(self):
417 from i2p_util.timer import SimpleTimer2
418 # Reset singleton for test isolation
419 orig = SimpleTimer2._instance
420 SimpleTimer2._instance = None
421 try:
422 inst1 = SimpleTimer2.get_instance()
423 inst2 = SimpleTimer2.get_instance()
424 assert inst1 is inst2
425 finally:
426 if SimpleTimer2._instance:
427 SimpleTimer2._instance.shutdown()
428 SimpleTimer2._instance = orig
429
430 def test_task_exception_logged(self):
431 from i2p_util.timer import SimpleTimer2
432 timer = SimpleTimer2(name="test-exception")
433 try:
434 event = threading.Event()
435 def bad_task():
436 event.set()
437 raise ValueError("task error")
438 timer.add_event(bad_task, 50)
439 event.wait(timeout=5)
440 time.sleep(0.2) # let the exception handler run
441 finally:
442 timer.shutdown()
443
444 def test_periodic_cancel(self):
445 from i2p_util.timer import SimpleTimer2
446 timer = SimpleTimer2(name="test-periodic-cancel")
447 try:
448 count = []
449 pe = timer.add_periodic_event(lambda: count.append(1), 1000, initial_delay_ms=0)
450 pe.cancel()
451 time.sleep(0.2)
452 assert pe._cancelled is True
453 finally:
454 timer.shutdown()
455
456 def test_periodic_initial_delay_zero(self):
457 from i2p_util.timer import SimpleTimer2
458 timer = SimpleTimer2(name="test-periodic-init0")
459 try:
460 event = threading.Event()
461 pe = timer.add_periodic_event(lambda: event.set(), 5000, initial_delay_ms=0)
462 event.wait(timeout=10)
463 pe.cancel()
464 assert event.is_set()
465 finally:
466 timer.shutdown()