A Python port of the Invisible Internet Project (I2P)
1"""I2Ping — destination reachability testing over I2P.
2
Uses SAM STREAM CONNECT to check whether an I2P destination is reachable
and to estimate its round-trip time.
5
6Ported from net.i2p.i2ptunnel.I2Ping.
7"""
8
9from __future__ import annotations
10
11import asyncio
12import logging
13import time
14from dataclasses import dataclass
15
16logger = logging.getLogger(__name__)
17
18
@dataclass
class PingResult:
    """Outcome of a single ping attempt against a destination.

    ``str()`` on an instance yields a one-line, human-readable summary.
    """

    # Destination string that was pinged.
    destination: str
    # True when the attempt was recorded as successful.
    reachable: bool
    # Round-trip time in milliseconds; None when no RTT was recorded.
    rtt_ms: float | None = None
    # Failure description; None when no error was recorded.
    error: str | None = None

    def __str__(self) -> str:
        """Return a one-line summary of this result."""
        if not self.reachable:
            return f"{self.destination}: unreachable ({self.error})"
        if self.rtt_ms is None:
            # rtt_ms defaults to None; applying the ".1f" format spec to
            # None raises TypeError, so leave the RTT out when it is unknown.
            return f"{self.destination}: reachable"
        return f"{self.destination}: reachable, rtt={self.rtt_ms:.1f}ms"
32
33
async def i2p_ping(
    session,
    destination: str,
    timeout: float = 30.0,
) -> PingResult:
    """Ping an I2P destination by attempting a STREAM CONNECT.

    Args:
        session: Object whose awaitable ``connect(destination)`` yields a
            ``(reader, writer)`` pair; the writer must support ``close()``
            and an awaitable ``wait_closed()``.
        destination: Destination passed to ``session.connect``.
        timeout: Maximum number of seconds to wait for the connect.

    Returns:
        A PingResult. On success ``reachable`` is True and ``rtt_ms`` is the
        time the connect took, in milliseconds. On failure ``reachable`` is
        False and ``error`` is ``"timeout"`` or a description of the
        exception raised by the connect.
    """
    start = time.monotonic()
    try:
        _reader, writer = await asyncio.wait_for(
            session.connect(destination),
            timeout=timeout,
        )
    except asyncio.TimeoutError:
        return PingResult(
            destination=destination,
            reachable=False,
            error="timeout",
        )
    except Exception as e:
        # Exceptions raised without a message stringify to "", which would
        # leave the caller with no hint about what failed; fall back to the
        # exception class name in that case.
        return PingResult(
            destination=destination,
            reachable=False,
            error=str(e) or type(e).__name__,
        )

    # Take the measurement before closing so teardown time is not counted.
    rtt_ms = (time.monotonic() - start) * 1000

    try:
        writer.close()
        await writer.wait_closed()
    except Exception:
        # Closing is best-effort: the connect already succeeded, so a failed
        # close must not turn the result into "unreachable". Record it for
        # debugging instead of silently discarding it.
        logger.debug(
            "Error while closing ping stream to %s",
            destination,
            exc_info=True,
        )

    return PingResult(
        destination=destination,
        reachable=True,
        rtt_ms=rtt_ms,
    )
72
73
async def i2p_ping_multi(
    session,
    destination: str,
    count: int = 4,
    timeout: float = 30.0,
    *,
    interval: float = 1.0,
) -> list[PingResult]:
    """Send several sequential pings with a pause between them.

    Args:
        session: Passed through unchanged to ``i2p_ping``.
        destination: Destination to ping.
        count: Number of pings to send; values <= 0 yield an empty list.
        timeout: Per-ping timeout in seconds, forwarded to ``i2p_ping``.
        interval: Seconds to sleep between consecutive pings. No sleep
            happens before the first ping. Defaults to 1.0.

    Returns:
        One PingResult per attempt, in the order the pings were sent.
    """
    results: list[PingResult] = []
    for attempt in range(count):
        if attempt > 0:
            # Pause only between pings, never before the first one.
            await asyncio.sleep(interval)
        results.append(await i2p_ping(session, destination, timeout=timeout))
    return results
87 return results