A Python port of the Invisible Internet Project (I2P)
1"""I2CP session configuration.
2
3Provides both the config-holder SessionConfig and the wire-format
4WireSessionConfig used in CreateSessionMessage/ReconfigureSessionMessage.
5"""
6
7import struct
8import time
9
10from i2p_client.i2cp_messages import _encode_properties, _decode_properties
11
12
13_DEFAULTS = {
14 "inbound.quantity": "3",
15 "inbound.length": "3",
16 "inbound.backupQuantity": "1",
17 "outbound.quantity": "3",
18 "outbound.length": "3",
19 "outbound.backupQuantity": "1",
20 "crypto.tagsToSend": "40",
21 "crypto.lowTagThreshold": "30",
22}
23
24
25class SessionConfig:
26 """I2CP session configuration with defaults and merge support."""
27
28 def __init__(self, overrides: dict[str, str] | None = None):
29 self._options: dict[str, str] = dict(_DEFAULTS)
30 if overrides:
31 self._options.update(overrides)
32
33 def get(self, key: str, default: str | None = None) -> str | None:
34 return self._options.get(key, default)
35
36 def set(self, key: str, value: str):
37 self._options[key] = value
38
39 def merge(self, other: dict[str, str]):
40 self._options.update(other)
41
42 def to_properties(self) -> dict[str, str]:
43 return dict(self._options)
44
45 @classmethod
46 def from_properties(cls, props: dict[str, str]) -> "SessionConfig":
47 return cls(props)
48
49 def __eq__(self, other) -> bool:
50 if not isinstance(other, SessionConfig):
51 return NotImplemented
52 return self._options == other._options
53
54 def __repr__(self) -> str:
55 return f"SessionConfig({self._options})"
56
57
58class WireSessionConfig:
59 """I2CP SessionConfig wire format for CreateSession/ReconfigureSession.
60
61 Wire format:
62 destination: variable (self-delimiting, 387+ bytes)
63 signature: 2-byte len prefix + signature bytes
64 date: 8 bytes (ms since epoch)
65 options: I2P properties (count-based encoding)
66 """
67
68 def __init__(
69 self,
70 destination_data: bytes,
71 signature: bytes = b"",
72 date_ms: int = 0,
73 options: dict[str, str] | None = None,
74 ) -> None:
75 self.destination_data = destination_data
76 self.signature = signature
77 self.date_ms = date_ms if date_ms else int(time.time() * 1000)
78 self.options = options or {}
79
80 def to_bytes(self) -> bytes:
81 """Serialize to wire format."""
82 parts = [
83 self.destination_data,
84 struct.pack("!H", len(self.signature)),
85 self.signature,
86 struct.pack("!Q", self.date_ms),
87 _encode_properties(self.options),
88 ]
89 return b"".join(parts)
90
91 @classmethod
92 def from_bytes(cls, data: bytes) -> "WireSessionConfig":
93 """Deserialize from wire format."""
94 # Parse destination — cert at offset 384 (256+128)
95 cert_offset = 384
96 if len(data) < cert_offset + 3:
97 raise ValueError("Data too short for destination")
98 cert_payload_len = struct.unpack("!H", data[cert_offset + 1:cert_offset + 3])[0]
99 dest_len = cert_offset + 3 + cert_payload_len
100 destination_data = data[:dest_len]
101 offset = dest_len
102
103 # Signature (2-byte length prefix + data)
104 sig_len = struct.unpack("!H", data[offset:offset + 2])[0]
105 offset += 2
106 signature = data[offset:offset + sig_len]
107 offset += sig_len
108
109 # Date (8 bytes)
110 date_ms = struct.unpack("!Q", data[offset:offset + 8])[0]
111 offset += 8
112
113 # Options
114 options, _ = _decode_properties(data[offset:])
115
116 return cls(destination_data, signature, date_ms, options)