A Python port of the Invisible Internet Project (I2P)
1"""Tests for I2Ping — TDD: tests before implementation."""
2
3import pytest
4
5from i2p_apps.i2ptunnel.ping import PingResult
6
7
class TestPingResult:
    """Checks the fields and string rendering of PingResult."""

    def test_reachable(self):
        """A reachable result exposes its RTT and is expected to have no error."""
        result = PingResult(destination="test.i2p", reachable=True, rtt_ms=150.5)
        assert result.reachable is True
        assert result.rtt_ms == 150.5
        assert result.error is None

    def test_unreachable(self):
        """An unreachable result exposes its error and is expected to have no RTT."""
        result = PingResult(
            destination="test.i2p",
            reachable=False,
            error="timeout",
        )
        assert result.reachable is False
        assert result.rtt_ms is None
        assert result.error == "timeout"

    def test_str_reachable(self):
        """str() of a reachable result mentions the destination and the RTT."""
        rendered = str(
            PingResult(destination="test.i2p", reachable=True, rtt_ms=100.0)
        )
        assert "test.i2p" in rendered
        assert "100.0" in rendered

    def test_str_unreachable(self):
        """str() of an unreachable result mentions 'unreachable' or the error."""
        rendered = str(
            PingResult(destination="test.i2p", reachable=False, error="timeout")
        )
        # Case-insensitive: either keyword is an acceptable failure marker.
        lowered = rendered.lower()
        assert any(marker in lowered for marker in ("unreachable", "timeout"))
31
32
class TestPingFunction:
    """Exercises i2p_ping against an in-memory stand-in session."""

    @pytest.mark.asyncio
    async def test_ping_returns_result(self):
        """i2p_ping should hand back a reachable PingResult with an RTT set."""
        from i2p_apps.i2ptunnel.ping import i2p_ping

        class _NullWriter:
            """Writer stub whose close() and wait_closed() do nothing."""

            def close(self):
                pass

            async def wait_closed(self):
                pass

        class MockSession:
            """Session stub whose connect() always succeeds."""

            async def connect(self, dest):
                import asyncio

                # Reader is already at EOF, so there is no data to read.
                reader = asyncio.StreamReader()
                reader.feed_eof()
                return reader, _NullWriter()

        session = MockSession()
        outcome = await i2p_ping(session, "test.i2p", timeout=5.0)

        assert isinstance(outcome, PingResult)
        assert outcome.reachable is True
        assert outcome.rtt_ms is not None