A Python port of the Invisible Internet Project (I2P)
at main 91 lines 2.7 kB view raw
1"""SAM utility functions.""" 2 3import os 4import base64 5import re 6 7 8def parse_params(text: str) -> dict[str, str]: 9 """Parse KEY=VALUE KEY2=VALUE2 from SAM protocol line. 10 11 Handles quoted values: KEY="value with spaces". 12 Keys are normalized to uppercase. 13 14 Args: 15 text: The parameter portion of a SAM protocol line. 16 17 Returns: 18 Dictionary of uppercase keys to string values. 19 """ 20 params: dict[str, str] = {} 21 if not text or not text.strip(): 22 return params 23 24 # Match KEY=VALUE or KEY="quoted value" 25 pattern = re.compile(r'(\w+)=(?:"([^"]*)"|([\S]*))') 26 for match in pattern.finditer(text): 27 key = match.group(1).upper() 28 # Group 2 is quoted value, group 3 is unquoted 29 value = match.group(2) if match.group(2) is not None else match.group(3) 30 params[key] = value 31 32 return params 33 34 35def generate_transient_destination() -> tuple[bytes, str]: 36 """Generate a new transient Destination keypair. 37 38 Creates a random 387-byte destination (256 pubkey + 128 signing key + 39 3 null certificate) and returns both raw bytes and base64 encoding. 40 41 Returns: 42 Tuple of (raw_bytes, base64_string). 43 """ 44 # Generate random key material: 45 # 256 bytes public key area + 128 bytes signing key area + 3 bytes null cert 46 pub_key = os.urandom(256) 47 sig_key = os.urandom(128) 48 # Null certificate: type=0, length=0 49 null_cert = b"\x00\x00\x00" 50 51 raw = pub_key + sig_key + null_cert 52 from i2p_data.data_helper import to_base64 53 b64 = to_base64(raw) 54 return raw, b64 55 56 57def negotiate_version(client_min: str, client_max: str, 58 server_versions: list[str]) -> str | None: 59 """Find best mutually supported version. 60 61 Selects the highest version that is within the client's requested 62 range [client_min, client_max] and also in server_versions. 63 64 Args: 65 client_min: Client's minimum acceptable version. 66 client_max: Client's maximum acceptable version. 67 server_versions: List of versions the server supports. 68 69 Returns: 70 The negotiated version string, or None if no match. 71 """ 72 # Parse versions as tuples for comparison 73 def ver_tuple(v: str) -> tuple[int, ...]: 74 return tuple(int(x) for x in v.split(".")) 75 76 min_t = ver_tuple(client_min) 77 max_t = ver_tuple(client_max) 78 79 # Filter server versions to those within client's range 80 candidates = [] 81 for v in server_versions: 82 vt = ver_tuple(v) 83 if min_t <= vt <= max_t: 84 candidates.append((vt, v)) 85 86 if not candidates: 87 return None 88 89 # Return the highest matching version 90 candidates.sort(reverse=True) 91 return candidates[0][1]