A Python port of the Invisible Internet Project (I2P)
at main 87 lines 2.2 kB view raw
1"""I2Ping — destination reachability testing over I2P. 2 3Uses SAM STREAM CONNECT to measure if an I2P destination is reachable 4and estimate round-trip time. 5 6Ported from net.i2p.i2ptunnel.I2Ping. 7""" 8 9from __future__ import annotations 10 11import asyncio 12import logging 13import time 14from dataclasses import dataclass 15 16logger = logging.getLogger(__name__) 17 18 19@dataclass 20class PingResult: 21 """Result of a single ping attempt.""" 22 23 destination: str 24 reachable: bool 25 rtt_ms: float | None = None 26 error: str | None = None 27 28 def __str__(self) -> str: 29 if self.reachable: 30 return f"{self.destination}: reachable, rtt={self.rtt_ms:.1f}ms" 31 return f"{self.destination}: unreachable ({self.error})" 32 33 34async def i2p_ping( 35 session, 36 destination: str, 37 timeout: float = 30.0, 38) -> PingResult: 39 """Ping an I2P destination by attempting a STREAM CONNECT. 40 41 Returns a PingResult with reachability and RTT. 42 """ 43 start = time.monotonic() 44 try: 45 reader, writer = await asyncio.wait_for( 46 session.connect(destination), 47 timeout=timeout, 48 ) 49 rtt_ms = (time.monotonic() - start) * 1000 50 try: 51 writer.close() 52 await writer.wait_closed() 53 except Exception: 54 pass 55 return PingResult( 56 destination=destination, 57 reachable=True, 58 rtt_ms=rtt_ms, 59 ) 60 except asyncio.TimeoutError: 61 return PingResult( 62 destination=destination, 63 reachable=False, 64 error="timeout", 65 ) 66 except Exception as e: 67 return PingResult( 68 destination=destination, 69 reachable=False, 70 error=str(e), 71 ) 72 73 74async def i2p_ping_multi( 75 session, 76 destination: str, 77 count: int = 4, 78 timeout: float = 30.0, 79) -> list[PingResult]: 80 """Send multiple sequential pings with 1s delay between them.""" 81 results = [] 82 for i in range(count): 83 if i > 0: 84 await asyncio.sleep(1.0) 85 result = await i2p_ping(session, destination, timeout=timeout) 86 results.append(result) 87 return results