"""Layer 1: Local-only system probes — zero network traffic. Measures CPU crypto throughput, available RAM, disk I/O speed, and other host characteristics to inform router configuration. """ from __future__ import annotations import os import shutil import tempfile import time from dataclasses import dataclass @dataclass class LocalProbeResult: """Results from local system assessment.""" crypto_ops_per_sec: int available_ram_bytes: int total_ram_bytes: int cpu_count: int disk_write_mbps: float disk_free_bytes: int def measure_crypto_throughput(duration_seconds: float = 2.0) -> int: """Measure X25519 key agreement throughput. Times how many ECDH operations the CPU can perform per second. This directly maps to tunnel participation capacity. """ from cryptography.hazmat.primitives.asymmetric.x25519 import X25519PrivateKey peer_key = X25519PrivateKey.generate().public_key() count = 0 end = time.monotonic() + duration_seconds while time.monotonic() < end: key = X25519PrivateKey.generate() key.exchange(peer_key) count += 1 elapsed = time.monotonic() - (end - duration_seconds) return int(count / elapsed) if elapsed > 0 else 0 def measure_ram() -> tuple[int, int]: """Measure available and total RAM. Returns (available_bytes, total_bytes). Uses /proc/meminfo on Linux, falls back to os.sysconf. """ try: with open("/proc/meminfo") as f: mem = {} for line in f: parts = line.split() if len(parts) >= 2: mem[parts[0].rstrip(":")] = int(parts[1]) * 1024 # kB to bytes available = mem.get("MemAvailable", mem.get("MemFree", 0)) total = mem.get("MemTotal", 0) if total > 0: return available, total except (OSError, ValueError): pass # Fallback: os.sysconf (works on most Unix) try: page_size = os.sysconf("SC_PAGE_SIZE") total = os.sysconf("SC_PHYS_PAGES") * page_size available = os.sysconf("SC_AVPHYS_PAGES") * page_size return available, total except (ValueError, OSError): pass # Last resort return 0, 0 def measure_disk_speed(data_dir: str, size_mb: int = 10) -> float: """Measure disk write speed by writing a temporary file. Returns MB/s. Cleans up after itself. """ data = os.urandom(size_mb * 1024 * 1024) fd, path = tempfile.mkstemp(dir=data_dir, prefix=".i2p_disktest_") try: start = time.monotonic() os.write(fd, data) os.fsync(fd) elapsed = time.monotonic() - start return size_mb / elapsed if elapsed > 0 else 0.0 finally: os.close(fd) os.unlink(path) def run_local_probes(data_dir: str = "~/.i2p-python") -> LocalProbeResult: """Run all local probes and return aggregated results.""" data_dir = os.path.expanduser(data_dir) os.makedirs(data_dir, exist_ok=True) crypto = measure_crypto_throughput(duration_seconds=1.0) available_ram, total_ram = measure_ram() disk_speed = measure_disk_speed(data_dir, size_mb=1) disk_usage = shutil.disk_usage(data_dir) return LocalProbeResult( crypto_ops_per_sec=crypto, available_ram_bytes=available_ram, total_ram_bytes=total_ram, cpu_count=os.cpu_count() or 1, disk_write_mbps=disk_speed, disk_free_bytes=disk_usage.free, )