A Python port of the Invisible Internet Project (I2P)
1"""SAM utility functions."""
2
3import os
4import base64
5import re
6
7
# One KEY=VALUE pair. The key is any run of characters other than
# whitespace, '=' and '"', so dotted option names such as
# ``inbound.length`` are captured whole instead of being cut at the dot.
# The value is either a double-quoted string (group 2, may contain spaces)
# or a run of non-whitespace characters (group 3, may be empty).
_PARAM_PATTERN = re.compile(r'([^\s="]+)=(?:"([^"]*)"|(\S*))')


def parse_params(text: str) -> dict[str, str]:
    """Parse KEY=VALUE KEY2=VALUE2 pairs from a SAM protocol line.

    Handles quoted values: KEY="value with spaces". The surrounding
    quotes are not part of the returned value.
    Keys are normalized to uppercase (this includes dotted option names,
    e.g. ``inbound.length`` is returned as ``INBOUND.LENGTH``).
    If a key occurs more than once, the last occurrence wins.
    Tokens without an ``=`` are ignored.

    Args:
        text: The parameter portion of a SAM protocol line.

    Returns:
        Dictionary of uppercase keys to string values. Empty when *text*
        is empty, whitespace-only, or contains no KEY=VALUE pairs.
    """
    params: dict[str, str] = {}
    if not text:
        return params

    for match in _PARAM_PATTERN.finditer(text):
        key, quoted, bare = match.groups()
        # ``quoted`` is None (not "") when the unquoted branch matched, so
        # test identity: KEY="" must yield an empty string, not fall through.
        params[key.upper()] = quoted if quoted is not None else bare

    return params
33
34
def generate_transient_destination() -> tuple[bytes, str]:
    """Build a throwaway Destination out of random bytes.

    The result is 387 bytes long and laid out as a 256-byte public key
    area, a 128-byte signing key area, and a 3-byte null certificate.
    Both key areas are filled directly from ``os.urandom``; no matching
    private key material is produced by this function.

    Returns:
        Tuple of (raw_bytes, base64_string), where the string is the
        result of ``to_base64`` applied to the raw bytes.
    """
    segments = (
        os.urandom(256),  # public key area
        os.urandom(128),  # signing key area
        b"\x00\x00\x00",  # null certificate: type=0, length=0
    )
    destination = b"".join(segments)

    # Imported lazily, inside the function, as in the original code.
    from i2p_data.data_helper import to_base64
    return destination, to_base64(destination)
55
56
57def negotiate_version(client_min: str, client_max: str,
58 server_versions: list[str]) -> str | None:
59 """Find best mutually supported version.
60
61 Selects the highest version that is within the client's requested
62 range [client_min, client_max] and also in server_versions.
63
64 Args:
65 client_min: Client's minimum acceptable version.
66 client_max: Client's maximum acceptable version.
67 server_versions: List of versions the server supports.
68
69 Returns:
70 The negotiated version string, or None if no match.
71 """
72 # Parse versions as tuples for comparison
73 def ver_tuple(v: str) -> tuple[int, ...]:
74 return tuple(int(x) for x in v.split("."))
75
76 min_t = ver_tuple(client_min)
77 max_t = ver_tuple(client_max)
78
79 # Filter server versions to those within client's range
80 candidates = []
81 for v in server_versions:
82 vt = ver_tuple(v)
83 if min_t <= vt <= max_t:
84 candidates.append((vt, v))
85
86 if not candidates:
87 return None
88
89 # Return the highest matching version
90 candidates.sort(reverse=True)
91 return candidates[0][1]