A Python port of the Invisible Internet Project (I2P)
1"""Layer 1: Local-only system probes — zero network traffic.
2
3Measures CPU crypto throughput, available RAM, disk I/O speed,
4and other host characteristics to inform router configuration.
5"""
6
7from __future__ import annotations
8
9import os
10import shutil
11import tempfile
12import time
13from dataclasses import dataclass
14
15
@dataclass
class LocalProbeResult:
    """Host characteristics collected by the local (no-network) probes.

    Fields are declared in a fixed order, so instances can be constructed
    either positionally or by keyword.
    """

    # Benchmark result: key-agreement operations completed per second.
    crypto_ops_per_sec: int
    # RAM currently available, in bytes.
    available_ram_bytes: int
    # Total installed RAM, in bytes.
    total_ram_bytes: int
    # Number of CPUs reported for the host.
    cpu_count: int
    # Measured write throughput of the data directory, in MB/s.
    disk_write_mbps: float
    # Free space on the filesystem holding the data directory, in bytes.
    disk_free_bytes: int
25
26
def measure_crypto_throughput(duration_seconds: float = 2.0) -> int:
    """Benchmark X25519 on this CPU and return operations per second.

    Every iteration generates a fresh private key and performs one exchange
    against a fixed peer public key.  Iterations repeat until
    ``duration_seconds`` of monotonic time have passed.

    Returns:
        Completed iterations divided by the time actually elapsed, truncated
        to an int; 0 when no positive amount of time elapsed.
    """
    # Function-scope import: the third-party dependency is only needed when
    # this probe actually runs.
    from cryptography.hazmat.primitives.asymmetric.x25519 import X25519PrivateKey

    remote_public = X25519PrivateKey.generate().public_key()
    completed = 0
    deadline = time.monotonic() + duration_seconds
    while time.monotonic() < deadline:
        ephemeral = X25519PrivateKey.generate()
        ephemeral.exchange(remote_public)
        completed += 1

    # Recover the start instant from the deadline and divide by the real
    # elapsed time, which may slightly overshoot the requested duration.
    started_at = deadline - duration_seconds
    elapsed = time.monotonic() - started_at
    if elapsed > 0:
        return int(completed / elapsed)
    return 0
44
45
def measure_ram() -> tuple[int, int]:
    """Measure available and total RAM.

    Reads ``/proc/meminfo`` first (Linux) and falls back to ``os.sysconf``.
    This is a best-effort probe: it never raises for a missing or unreadable
    source, it just moves on to the next one.

    Returns:
        ``(available_bytes, total_bytes)``.  ``(0, 0)`` when neither source
        yields a usable total.
    """
    # Only these fields are used, so only these are parsed; an unexpected
    # line elsewhere in the file cannot force the fallback path.
    wanted = ("MemTotal", "MemAvailable", "MemFree")
    mem: dict[str, int] = {}
    try:
        with open("/proc/meminfo") as f:
            for line in f:
                key, _, rest = line.partition(":")
                if key in wanted:
                    fields = rest.split()
                    if fields:
                        mem[key] = int(fields[0]) * 1024  # kB to bytes
    except (OSError, ValueError):
        # Missing file (non-Linux), unreadable, or malformed: use the fallback.
        mem = {}

    total = mem.get("MemTotal", 0)
    if total > 0:
        # MemAvailable is preferred; MemFree is used when it is absent.
        return mem.get("MemAvailable", mem.get("MemFree", 0)), total

    # Fallback: os.sysconf (works on most Unix).
    try:
        page_size = os.sysconf("SC_PAGE_SIZE")
        phys_pages = os.sysconf("SC_PHYS_PAGES")
        avail_pages = os.sysconf("SC_AVPHYS_PAGES")
    except (AttributeError, ValueError, OSError):
        # AttributeError: os.sysconf does not exist on this platform.
        # ValueError: the configuration name is not known here.
        return 0, 0

    # sysconf reports -1 for values that are not defined; do not turn that
    # into a bogus byte count.
    if page_size <= 0 or phys_pages <= 0 or avail_pages < 0:
        return 0, 0
    return avail_pages * page_size, phys_pages * page_size
77
78
def measure_disk_speed(data_dir: str, size_mb: int = 10) -> float:
    """Measure disk write speed by writing a temporary file.

    Writes ``size_mb`` MiB of random data to a temporary file inside
    ``data_dir``, fsyncs it, and times the write plus the fsync.  The
    temporary file is always closed and removed, even on failure.

    Args:
        data_dir: Existing directory in which the test file is created.
        size_mb: Amount of data to write, in MiB.

    Returns:
        Throughput in MB/s, or 0.0 when no measurable time elapsed.

    Raises:
        OSError: If the file cannot be created, written, synced or removed.
    """
    # Generate the payload before starting the clock so that only the write
    # and fsync are timed.
    payload = os.urandom(size_mb * 1024 * 1024)
    fd, path = tempfile.mkstemp(dir=data_dir, prefix=".i2p_disktest_")
    try:
        try:
            remaining = memoryview(payload)
            start = time.monotonic()
            # os.write may write fewer bytes than requested; keep going until
            # the whole payload is written so the figure reflects size_mb.
            while remaining:
                written = os.write(fd, remaining)
                remaining = remaining[written:]
            os.fsync(fd)
            elapsed = time.monotonic() - start
        finally:
            os.close(fd)
    finally:
        # Nested so the file is removed even if closing the descriptor fails.
        os.unlink(path)
    return size_mb / elapsed if elapsed > 0 else 0.0
95
96
def run_local_probes(data_dir: str = "~/.i2p-python") -> LocalProbeResult:
    """Execute every local probe and bundle the readings into one result.

    ``data_dir`` has ``~`` expanded and is created if it does not exist; the
    disk probes then run against that directory.
    """
    probe_dir = os.path.expanduser(data_dir)
    os.makedirs(probe_dir, exist_ok=True)

    # Short benchmark settings: one second of crypto and a 1 MB disk write.
    ops_per_sec = measure_crypto_throughput(duration_seconds=1.0)
    ram_available, ram_total = measure_ram()
    write_mbps = measure_disk_speed(probe_dir, size_mb=1)
    free_bytes = shutil.disk_usage(probe_dir).free

    # os.cpu_count() may return None; report at least one CPU.
    cores = os.cpu_count() or 1

    return LocalProbeResult(
        crypto_ops_per_sec=ops_per_sec,
        available_ram_bytes=ram_available,
        total_ram_bytes=ram_total,
        cpu_count=cores,
        disk_write_mbps=write_mbps,
        disk_free_bytes=free_bytes,
    )
114 )