"""SAM utility functions.""" import os import base64 import re def parse_params(text: str) -> dict[str, str]: """Parse KEY=VALUE KEY2=VALUE2 from SAM protocol line. Handles quoted values: KEY="value with spaces". Keys are normalized to uppercase. Args: text: The parameter portion of a SAM protocol line. Returns: Dictionary of uppercase keys to string values. """ params: dict[str, str] = {} if not text or not text.strip(): return params # Match KEY=VALUE or KEY="quoted value" pattern = re.compile(r'(\w+)=(?:"([^"]*)"|([\S]*))') for match in pattern.finditer(text): key = match.group(1).upper() # Group 2 is quoted value, group 3 is unquoted value = match.group(2) if match.group(2) is not None else match.group(3) params[key] = value return params def generate_transient_destination() -> tuple[bytes, str]: """Generate a new transient Destination keypair. Creates a random 387-byte destination (256 pubkey + 128 signing key + 3 null certificate) and returns both raw bytes and base64 encoding. Returns: Tuple of (raw_bytes, base64_string). """ # Generate random key material: # 256 bytes public key area + 128 bytes signing key area + 3 bytes null cert pub_key = os.urandom(256) sig_key = os.urandom(128) # Null certificate: type=0, length=0 null_cert = b"\x00\x00\x00" raw = pub_key + sig_key + null_cert from i2p_data.data_helper import to_base64 b64 = to_base64(raw) return raw, b64 def negotiate_version(client_min: str, client_max: str, server_versions: list[str]) -> str | None: """Find best mutually supported version. Selects the highest version that is within the client's requested range [client_min, client_max] and also in server_versions. Args: client_min: Client's minimum acceptable version. client_max: Client's maximum acceptable version. server_versions: List of versions the server supports. Returns: The negotiated version string, or None if no match. """ # Parse versions as tuples for comparison def ver_tuple(v: str) -> tuple[int, ...]: return tuple(int(x) for x in v.split(".")) min_t = ver_tuple(client_min) max_t = ver_tuple(client_max) # Filter server versions to those within client's range candidates = [] for v in server_versions: vt = ver_tuple(v) if min_t <= vt <= max_t: candidates.append((vt, v)) if not candidates: return None # Return the highest matching version candidates.sort(reverse=True) return candidates[0][1]