"""I2CP session configuration. Provides both the config-holder SessionConfig and the wire-format WireSessionConfig used in CreateSessionMessage/ReconfigureSessionMessage. """ import struct import time from i2p_client.i2cp_messages import _encode_properties, _decode_properties _DEFAULTS = { "inbound.quantity": "3", "inbound.length": "3", "inbound.backupQuantity": "1", "outbound.quantity": "3", "outbound.length": "3", "outbound.backupQuantity": "1", "crypto.tagsToSend": "40", "crypto.lowTagThreshold": "30", } class SessionConfig: """I2CP session configuration with defaults and merge support.""" def __init__(self, overrides: dict[str, str] | None = None): self._options: dict[str, str] = dict(_DEFAULTS) if overrides: self._options.update(overrides) def get(self, key: str, default: str | None = None) -> str | None: return self._options.get(key, default) def set(self, key: str, value: str): self._options[key] = value def merge(self, other: dict[str, str]): self._options.update(other) def to_properties(self) -> dict[str, str]: return dict(self._options) @classmethod def from_properties(cls, props: dict[str, str]) -> "SessionConfig": return cls(props) def __eq__(self, other) -> bool: if not isinstance(other, SessionConfig): return NotImplemented return self._options == other._options def __repr__(self) -> str: return f"SessionConfig({self._options})" class WireSessionConfig: """I2CP SessionConfig wire format for CreateSession/ReconfigureSession. Wire format: destination: variable (self-delimiting, 387+ bytes) signature: 2-byte len prefix + signature bytes date: 8 bytes (ms since epoch) options: I2P properties (count-based encoding) """ def __init__( self, destination_data: bytes, signature: bytes = b"", date_ms: int = 0, options: dict[str, str] | None = None, ) -> None: self.destination_data = destination_data self.signature = signature self.date_ms = date_ms if date_ms else int(time.time() * 1000) self.options = options or {} def to_bytes(self) -> bytes: """Serialize to wire format.""" parts = [ self.destination_data, struct.pack("!H", len(self.signature)), self.signature, struct.pack("!Q", self.date_ms), _encode_properties(self.options), ] return b"".join(parts) @classmethod def from_bytes(cls, data: bytes) -> "WireSessionConfig": """Deserialize from wire format.""" # Parse destination — cert at offset 384 (256+128) cert_offset = 384 if len(data) < cert_offset + 3: raise ValueError("Data too short for destination") cert_payload_len = struct.unpack("!H", data[cert_offset + 1:cert_offset + 3])[0] dest_len = cert_offset + 3 + cert_payload_len destination_data = data[:dest_len] offset = dest_len # Signature (2-byte length prefix + data) sig_len = struct.unpack("!H", data[offset:offset + 2])[0] offset += 2 signature = data[offset:offset + sig_len] offset += sig_len # Date (8 bytes) date_ms = struct.unpack("!Q", data[offset:offset + 8])[0] offset += 8 # Options options, _ = _decode_properties(data[offset:]) return cls(destination_data, signature, date_ms, options)