# A Python port of the Invisible Internet Project (I2P)
# (at main, 116 lines, 3.6 kB)
"""I2CP session configuration.

Provides both the config-holder SessionConfig and the wire-format
WireSessionConfig used in CreateSessionMessage/ReconfigureSessionMessage.
"""

import struct
import time

from i2p_client.i2cp_messages import _encode_properties, _decode_properties


# Option values every SessionConfig starts with; caller overrides are
# layered on top of a copy of this mapping.
_DEFAULTS = {
    "inbound.quantity": "3",
    "inbound.length": "3",
    "inbound.backupQuantity": "1",
    "outbound.quantity": "3",
    "outbound.length": "3",
    "outbound.backupQuantity": "1",
    "crypto.tagsToSend": "40",
    "crypto.lowTagThreshold": "30",
}

# Offset of the certificate inside a serialized destination (256 + 128).
_CERT_OFFSET = 384
# Certificate header: 1 type byte followed by a 2-byte big-endian payload length.
_CERT_HEADER_LEN = 3


class _TruncatedConfigError(ValueError, struct.error):
    """Raised when serialized session-config bytes end before a field does.

    Inherits from both ``ValueError`` (what ``from_bytes`` raises for a short
    destination) and ``struct.error`` (what short reads used to leak), so
    callers catching either type keep working.
    """


def _ensure_available(data: bytes, offset: int, size: int, what: str) -> None:
    """Raise ``_TruncatedConfigError`` unless ``data[offset:offset + size]`` is complete."""
    if len(data) < offset + size:
        raise _TruncatedConfigError(
            f"Data too short for {what}: need {size} bytes at offset "
            f"{offset}, have {max(len(data) - offset, 0)}"
        )


class SessionConfig:
    """I2CP session configuration with defaults and merge support."""

    def __init__(self, overrides: dict[str, str] | None = None):
        """Start from a copy of the module defaults, then apply *overrides*."""
        self._options: dict[str, str] = dict(_DEFAULTS)
        if overrides:
            self._options.update(overrides)

    def get(self, key: str, default: str | None = None) -> str | None:
        """Return the value stored for *key*, or *default* if it is not set."""
        return self._options.get(key, default)

    def set(self, key: str, value: str):
        """Store *value* under *key*, replacing any existing value."""
        self._options[key] = value

    def merge(self, other: dict[str, str]):
        """Copy every entry of *other* into this config; *other* wins on conflicts."""
        self._options.update(other)

    def to_properties(self) -> dict[str, str]:
        """Return a shallow copy of the options as a plain dict."""
        return dict(self._options)

    @classmethod
    def from_properties(cls, props: dict[str, str]) -> "SessionConfig":
        """Build a config from *props*; defaults fill in any key *props* omits."""
        return cls(props)

    def __eq__(self, other) -> bool:
        if not isinstance(other, SessionConfig):
            return NotImplemented
        return self._options == other._options

    def __repr__(self) -> str:
        return f"SessionConfig({self._options})"


class WireSessionConfig:
    """I2CP SessionConfig wire format for CreateSession/ReconfigureSession.

    Wire format:
        destination: variable (self-delimiting, 387+ bytes)
        signature: 2-byte len prefix + signature bytes
        date: 8 bytes (ms since epoch)
        options: I2P properties (count-based encoding)
    """

    def __init__(
        self,
        destination_data: bytes,
        signature: bytes = b"",
        date_ms: int = 0,
        options: dict[str, str] | None = None,
    ) -> None:
        """Store the fields of a session config.

        Args:
            destination_data: Serialized destination bytes, stored as given.
            signature: Signature bytes (without the length prefix).
            date_ms: Milliseconds since the epoch. A falsy value (the default
                ``0``) is replaced by the current time.
            options: Option mapping; a falsy value becomes an empty dict.
        """
        self.destination_data = destination_data
        self.signature = signature
        self.date_ms = date_ms if date_ms else int(time.time() * 1000)
        self.options = options or {}

    def to_bytes(self) -> bytes:
        """Serialize to wire format."""
        parts = [
            self.destination_data,
            struct.pack("!H", len(self.signature)),
            self.signature,
            struct.pack("!Q", self.date_ms),
            _encode_properties(self.options),
        ]
        return b"".join(parts)

    @classmethod
    def from_bytes(cls, data: bytes) -> "WireSessionConfig":
        """Deserialize from wire format.

        Args:
            data: Bytes starting with the destination, followed by the
                length-prefixed signature, the 8-byte date and the options.

        Returns:
            A WireSessionConfig holding exactly the decoded field values.

        Raises:
            ValueError: If *data* ends before the destination, signature or
                date is complete. (The raised error is also a
                ``struct.error`` for callers that caught short reads that way.)
        """
        # Parse destination — cert at offset 384 (256+128)
        if len(data) < _CERT_OFFSET + _CERT_HEADER_LEN:
            raise ValueError("Data too short for destination")
        # Skip the 1-byte certificate type; the next two bytes are the payload length.
        (cert_payload_len,) = struct.unpack_from("!H", data, _CERT_OFFSET + 1)
        dest_len = _CERT_OFFSET + _CERT_HEADER_LEN + cert_payload_len
        _ensure_available(data, 0, dest_len, "destination certificate")
        destination_data = data[:dest_len]
        offset = dest_len

        # Signature (2-byte length prefix + data)
        _ensure_available(data, offset, 2, "signature length")
        (sig_len,) = struct.unpack_from("!H", data, offset)
        offset += 2
        _ensure_available(data, offset, sig_len, "signature")
        signature = data[offset:offset + sig_len]
        offset += sig_len

        # Date (8 bytes)
        _ensure_available(data, offset, 8, "date")
        (date_ms,) = struct.unpack_from("!Q", data, offset)
        offset += 8

        # Options
        options, _ = _decode_properties(data[offset:])

        config = cls(destination_data, signature, date_ms, options)
        # __init__ swaps a zero date for "now"; a parsed message must keep the
        # date that was actually on the wire so it re-serializes unchanged.
        config.date_ms = date_ms
        return config