A Python port of the Invisible Internet Project (I2P)
at main 114 lines 3.5 kB view raw
"""Layer 1: Local-only system probes — zero network traffic.

Measures CPU crypto throughput, available RAM, disk I/O speed,
and other host characteristics to inform router configuration.
"""

from __future__ import annotations

import os
import shutil
import tempfile
import time
from dataclasses import dataclass


@dataclass
class LocalProbeResult:
    """Results from local system assessment."""

    # X25519 generate+exchange iterations per second (measure_crypto_throughput).
    crypto_ops_per_sec: int
    # Both RAM figures are 0 when no probe could determine them (measure_ram).
    available_ram_bytes: int
    total_ram_bytes: int
    # os.cpu_count(), or 1 when the interpreter cannot determine it.
    cpu_count: int
    # Sequential write speed in MB/s (measure_disk_speed).
    disk_write_mbps: float
    # Free bytes reported by shutil.disk_usage for the data directory.
    disk_free_bytes: int


def measure_crypto_throughput(duration_seconds: float = 2.0) -> int:
    """Measure X25519 key agreement throughput.

    Repeatedly generates a fresh private key and performs an ECDH exchange
    against a fixed peer public key until ``duration_seconds`` has passed.

    Args:
        duration_seconds: How long to run the benchmark loop. A value of
            zero or less performs no iterations.

    Returns:
        Completed generate+exchange iterations per second, truncated to an
        int; 0 if no time elapsed.
    """
    # Imported lazily so the rest of the module is usable without the
    # ``cryptography`` package installed.
    from cryptography.hazmat.primitives.asymmetric.x25519 import X25519PrivateKey

    peer_key = X25519PrivateKey.generate().public_key()
    count = 0
    # Capture the start explicitly rather than reconstructing it from the
    # deadline afterwards.
    start = time.monotonic()
    deadline = start + duration_seconds
    while time.monotonic() < deadline:
        X25519PrivateKey.generate().exchange(peer_key)
        count += 1
    elapsed = time.monotonic() - start
    return int(count / elapsed) if elapsed > 0 else 0


def _ram_from_proc_meminfo() -> tuple[int, int] | None:
    """Read (available, total) bytes from /proc/meminfo, or None if unusable."""
    wanted = ("MemTotal", "MemAvailable", "MemFree")
    mem: dict[str, int] = {}
    try:
        with open("/proc/meminfo", encoding="utf-8") as f:
            for line in f:
                parts = line.split()
                if len(parts) < 2:
                    continue
                key = parts[0].rstrip(":")
                # Only parse the fields we use, so an oddly formatted
                # unrelated line cannot invalidate the whole file.
                if key in wanted:
                    mem[key] = int(parts[1]) * 1024  # kB to bytes
    except (OSError, ValueError):
        return None

    total = mem.get("MemTotal", 0)
    if total <= 0:
        return None
    # MemAvailable may be absent; fall back to MemFree, then 0.
    available = mem.get("MemAvailable", mem.get("MemFree", 0))
    return available, total


def _ram_from_sysconf() -> tuple[int, int] | None:
    """Read (available, total) bytes via os.sysconf, or None if unsupported."""
    try:
        page_size = os.sysconf("SC_PAGE_SIZE")
        total_pages = os.sysconf("SC_PHYS_PAGES")
        available_pages = os.sysconf("SC_AVPHYS_PAGES")
    except (AttributeError, ValueError, OSError):
        # AttributeError: os.sysconf is missing on this platform.
        # ValueError: the configuration name is not known here.
        return None
    # Non-positive or negative results are treated as "unknown" rather than
    # being multiplied into a meaningless byte count.
    if page_size <= 0 or total_pages <= 0 or available_pages < 0:
        return None
    return available_pages * page_size, total_pages * page_size


def measure_ram() -> tuple[int, int]:
    """Measure available and total RAM.

    Tries /proc/meminfo first, then os.sysconf.

    Returns:
        ``(available_bytes, total_bytes)``, or ``(0, 0)`` when neither
        probe produced a usable answer.
    """
    for probe in (_ram_from_proc_meminfo, _ram_from_sysconf):
        result = probe()
        if result is not None:
            return result

    # Last resort
    return 0, 0


def measure_disk_speed(data_dir: str, size_mb: int = 10) -> float:
    """Measure disk write speed by writing a temporary file.

    Writes ``size_mb`` MiB of random data to a temporary file inside
    ``data_dir``, fsyncs it, and always removes the file afterwards.

    Args:
        data_dir: Existing directory in which the temporary file is created.
        size_mb: Amount of data to write, in MiB.

    Returns:
        Write speed in MB/s, or 0.0 if no time elapsed.

    Raises:
        OSError: If the temporary file cannot be created, written or removed.
        ValueError: If ``size_mb`` is negative.
    """
    # Generate the payload before starting the clock so that only the write
    # and the fsync are timed.
    data = os.urandom(size_mb * 1024 * 1024)
    fd, path = tempfile.mkstemp(dir=data_dir, prefix=".i2p_disktest_")
    try:
        try:
            remaining = memoryview(data)
            start = time.monotonic()
            # os.write may write fewer bytes than requested; keep going until
            # the whole payload is on its way to disk.
            while len(remaining) > 0:
                written = os.write(fd, remaining)
                if written <= 0:
                    raise OSError("short write while measuring disk speed")
                remaining = remaining[written:]
            os.fsync(fd)
            elapsed = time.monotonic() - start
        finally:
            os.close(fd)
    finally:
        # Separate finally so the file is removed even if close() fails.
        os.unlink(path)
    return size_mb / elapsed if elapsed > 0 else 0.0


def run_local_probes(data_dir: str = "~/.i2p-python") -> LocalProbeResult:
    """Run all local probes and return aggregated results.

    Args:
        data_dir: Directory used for the disk probes; ``~`` is expanded and
            the directory is created if it does not exist.

    Returns:
        A LocalProbeResult populated from a 1-second crypto benchmark, the
        RAM probe, a 1 MiB disk write test and the directory's disk usage.
    """
    data_dir = os.path.expanduser(data_dir)
    os.makedirs(data_dir, exist_ok=True)

    crypto = measure_crypto_throughput(duration_seconds=1.0)
    available_ram, total_ram = measure_ram()
    disk_speed = measure_disk_speed(data_dir, size_mb=1)
    disk_usage = shutil.disk_usage(data_dir)

    return LocalProbeResult(
        crypto_ops_per_sec=crypto,
        available_ram_bytes=available_ram,
        total_ram_bytes=total_ram,
        cpu_count=os.cpu_count() or 1,
        disk_write_mbps=disk_speed,
        disk_free_bytes=disk_usage.free,
    )