"""I2Ping — destination reachability testing over I2P.

Uses SAM STREAM CONNECT to measure if an I2P destination is reachable
and estimate round-trip time.

Ported from net.i2p.i2ptunnel.I2Ping.
"""

from __future__ import annotations

import asyncio
import logging
import time
from dataclasses import dataclass

logger = logging.getLogger(__name__)


@dataclass
class PingResult:
    """Result of a single ping attempt."""

    # Destination string exactly as it was passed to the ping call.
    destination: str
    # True when the connect attempt completed before the timeout.
    reachable: bool
    # Elapsed time of the connect attempt in milliseconds; None when the
    # destination was not reached (or when no measurement was recorded).
    rtt_ms: float | None = None
    # Short failure description ("timeout" or the exception text); None on
    # success.
    error: str | None = None

    def __str__(self) -> str:
        """Return a one-line, human-readable summary of the result."""
        if not self.reachable:
            return f"{self.destination}: unreachable ({self.error})"
        if self.rtt_ms is None:
            # A reachable result built without an RTT must not crash the
            # float format spec below.
            return f"{self.destination}: reachable"
        return f"{self.destination}: reachable, rtt={self.rtt_ms:.1f}ms"


async def i2p_ping(
    session,
    destination: str,
    timeout: float = 30.0,
) -> PingResult:
    """Ping an I2P destination by attempting a STREAM CONNECT.

    Args:
        session: Object exposing an awaitable ``connect(destination)`` that
            yields a ``(reader, writer)`` pair.
        destination: Destination to connect to; passed to
            ``session.connect`` unchanged.
        timeout: Maximum number of seconds to wait for the connect.

    Returns:
        A PingResult. On success ``reachable`` is True and ``rtt_ms`` holds
        the elapsed connect time. On failure ``reachable`` is False and
        ``error`` is ``"timeout"`` or a description of the exception.
        Connect failures deriving from ``Exception`` are captured in the
        result rather than raised.
    """
    start = time.monotonic()
    try:
        _reader, writer = await asyncio.wait_for(
            session.connect(destination),
            timeout=timeout,
        )
    except asyncio.TimeoutError:
        return PingResult(
            destination=destination,
            reachable=False,
            error="timeout",
        )
    except Exception as exc:
        # Deliberately broad: any connect failure is reported as an
        # unreachable result. Some exceptions carry no message, so fall
        # back to the class name instead of recording an empty string.
        return PingResult(
            destination=destination,
            reachable=False,
            error=str(exc) or type(exc).__name__,
        )

    # Take the measurement before teardown so closing the stream does not
    # inflate the reported RTT.
    rtt_ms = (time.monotonic() - start) * 1000

    try:
        writer.close()
        await writer.wait_closed()
    except Exception:
        # Best-effort cleanup: the destination already answered, so a
        # failing close must not turn the result into a failure.
        logger.debug(
            "Ignoring error while closing ping stream to %s",
            destination,
            exc_info=True,
        )

    return PingResult(
        destination=destination,
        reachable=True,
        rtt_ms=rtt_ms,
    )


async def i2p_ping_multi(
    session,
    destination: str,
    count: int = 4,
    timeout: float = 30.0,
    delay: float = 1.0,
) -> list[PingResult]:
    """Send multiple sequential pings with a delay between them.

    Args:
        session: Session object forwarded to ``i2p_ping``.
        destination: Destination to ping.
        count: Number of pings to send; zero or a negative value yields an
            empty list.
        timeout: Per-ping timeout in seconds.
        delay: Seconds to sleep between consecutive pings (default 1s).
            No sleep happens before the first ping.

    Returns:
        One PingResult per attempt, in the order the pings were sent.
    """
    results: list[PingResult] = []
    for attempt in range(count):
        if attempt > 0:
            await asyncio.sleep(delay)
        results.append(await i2p_ping(session, destination, timeout=timeout))
    return results