"""I2CP message types — wire format and type registry.""" import struct from abc import ABC, abstractmethod class I2CPMessage(ABC): """Base class for I2CP messages. Wire format: 4-byte big-endian length + 1-byte type + payload. Length includes the type byte but not itself. """ TYPE: int = -1 _registry: dict[int, type["I2CPMessage"]] = {} @classmethod def __init_subclass__(cls, **kwargs): super().__init_subclass__(**kwargs) if hasattr(cls, 'TYPE') and cls.TYPE >= 0: I2CPMessage._registry[cls.TYPE] = cls @abstractmethod def payload_bytes(self) -> bytes: """Serialize message payload (after type byte).""" @classmethod @abstractmethod def _from_payload(cls, payload: bytes) -> "I2CPMessage": """Deserialize from payload bytes.""" def to_wire(self) -> bytes: payload = self.payload_bytes() length = 1 + len(payload) # type byte + payload return struct.pack("!IB", length, self.TYPE) + payload @classmethod def from_wire(cls, data: bytes) -> "I2CPMessage": if len(data) < 5: raise ValueError(f"I2CP message needs at least 5 bytes, got {len(data)}") length = struct.unpack("!I", data[:4])[0] msg_type = data[4] payload = data[5:4 + length] msg_cls = cls._registry.get(msg_type) if msg_cls is None: raise ValueError(f"Unknown I2CP message type: {msg_type}") return msg_cls._from_payload(payload) class GetDateMessage(I2CPMessage): """Type 32: C→R request current date. Payload: version string + optional options.""" TYPE = 32 def __init__(self, version: str = "0.9.62", options: dict[str, str] | None = None): self.version = version self.options = options or {} def payload_bytes(self) -> bytes: vb = self.version.encode("utf-8") result = struct.pack("!B", len(vb)) + vb if self.options: result += _encode_properties(self.options) return result @classmethod def _from_payload(cls, payload: bytes) -> "GetDateMessage": vlen = payload[0] version = payload[1:1 + vlen].decode("utf-8") offset = 1 + vlen options: dict[str, str] = {} if offset < len(payload): options, _ = _decode_properties(payload[offset:]) return cls(version, options) class SetDateMessage(I2CPMessage): """Type 33: R→C send current date. Payload: date(8) + version.""" TYPE = 33 def __init__(self, date_ms: int, version: str = "0.9.62"): self.date_ms = date_ms self.version = version def payload_bytes(self) -> bytes: vb = self.version.encode("utf-8") return struct.pack("!Q", self.date_ms) + struct.pack("!B", len(vb)) + vb @classmethod def _from_payload(cls, payload: bytes) -> "SetDateMessage": date_ms = struct.unpack("!Q", payload[:8])[0] vlen = payload[8] version = payload[9:9 + vlen].decode("utf-8") return cls(date_ms, version) class SessionStatusMessage(I2CPMessage): """Type 20: R→C session creation result. session_id(2) + status(1).""" TYPE = 20 STATUS_DESTROYED = 0 STATUS_CREATED = 1 STATUS_UPDATED = 2 STATUS_INVALID = 3 STATUS_REFUSED = 4 def __init__(self, session_id: int, status: int): self.session_id = session_id self.status = status def payload_bytes(self) -> bytes: return struct.pack("!HB", self.session_id, self.status) @classmethod def _from_payload(cls, payload: bytes) -> "SessionStatusMessage": session_id, status = struct.unpack("!HB", payload[:3]) return cls(session_id, status) class CreateSessionMessage(I2CPMessage): """Type 1: C→R create session. destination_data + options.""" TYPE = 1 def __init__(self, destination_data: bytes, options: dict[str, str] | None = None): self.destination_data = destination_data self.options = options or {} def payload_bytes(self) -> bytes: opts = _encode_properties(self.options) return (struct.pack("!H", len(self.destination_data)) + self.destination_data + opts) @classmethod def _from_payload(cls, payload: bytes) -> "CreateSessionMessage": dest_len = struct.unpack("!H", payload[:2])[0] dest_data = payload[2:2 + dest_len] opts, _ = _decode_properties(payload[2 + dest_len:]) return cls(dest_data, opts) class DestroySessionMessage(I2CPMessage): """Type 3: C→R destroy session. session_id(2).""" TYPE = 3 def __init__(self, session_id: int): self.session_id = session_id def payload_bytes(self) -> bytes: return struct.pack("!H", self.session_id) @classmethod def _from_payload(cls, payload: bytes) -> "DestroySessionMessage": session_id = struct.unpack("!H", payload[:2])[0] return cls(session_id) class SendMessageMessage(I2CPMessage): """Type 5: C→R send message. Wire format: session_id(2) + Destination(self-delimiting) + Payload(4-byte size + data) + nonce(4). """ TYPE = 5 def __init__(self, session_id: int, destination_data: bytes, payload: bytes, nonce: int = 0): self.session_id = session_id self.destination_data = destination_data self.payload = payload self.nonce = nonce def payload_bytes(self) -> bytes: return (struct.pack("!H", self.session_id) + self.destination_data + struct.pack("!I", len(self.payload)) + self.payload + struct.pack("!I", self.nonce)) @classmethod def _from_payload(cls, payload: bytes) -> "SendMessageMessage": session_id = struct.unpack("!H", payload[:2])[0] dest_data, dest_len = _parse_destination_from(payload, 2) off = 2 + dest_len pl_len = struct.unpack("!I", payload[off:off + 4])[0] off += 4 pl = payload[off:off + pl_len] off += pl_len nonce = struct.unpack("!I", payload[off:off + 4])[0] return cls(session_id, dest_data, pl, nonce) class MessageStatusMessage(I2CPMessage): """Type 22: R→C delivery status. session_id(2) + msg_id(4) + nonce(4) + status(1) + size(4).""" TYPE = 22 STATUS_AVAILABLE = 0 STATUS_SEND_ACCEPTED = 1 STATUS_SEND_BEST_EFFORT_SUCCESS = 2 STATUS_SEND_BEST_EFFORT_FAILURE = 3 STATUS_SEND_GUARANTEED_SUCCESS = 4 STATUS_SEND_GUARANTEED_FAILURE = 5 STATUS_SEND_SUCCESS_LOCAL = 6 STATUS_SEND_FAILURE_LOCAL = 7 STATUS_SEND_FAILURE_ROUTER = 8 STATUS_SEND_FAILURE_NETWORK = 9 STATUS_SEND_FAILURE_BAD_SESSION = 10 STATUS_SEND_FAILURE_BAD_MESSAGE = 11 STATUS_SEND_FAILURE_BAD_OPTIONS = 12 STATUS_SEND_FAILURE_OVERFLOW = 13 STATUS_SEND_FAILURE_EXPIRED = 14 STATUS_SEND_FAILURE_LOCAL_LEASESET = 15 STATUS_SEND_FAILURE_NO_TUNNELS = 16 STATUS_SEND_FAILURE_UNSUPPORTED_ENCRYPTION = 17 STATUS_SEND_FAILURE_DESTINATION = 18 STATUS_SEND_FAILURE_BAD_LEASESET = 19 STATUS_SEND_FAILURE_EXPIRED_LEASESET = 20 STATUS_SEND_FAILURE_NO_LEASESET = 21 _SUCCESS_STATUSES = frozenset({1, 2, 4, 6}) _FAILURE_STATUSES = frozenset({3, 5, 7, 8, 9, 10, 11, 12, 13, 14, 15, 16, 17, 18, 19, 20, 21}) _STATUS_NAMES = { 0: "AVAILABLE", 1: "SEND_ACCEPTED", 2: "SEND_BEST_EFFORT_SUCCESS", 3: "SEND_BEST_EFFORT_FAILURE", 4: "SEND_GUARANTEED_SUCCESS", 5: "SEND_GUARANTEED_FAILURE", 6: "SEND_SUCCESS_LOCAL", 7: "SEND_FAILURE_LOCAL", 8: "SEND_FAILURE_ROUTER", 9: "SEND_FAILURE_NETWORK", 10: "SEND_FAILURE_BAD_SESSION", 11: "SEND_FAILURE_BAD_MESSAGE", 12: "SEND_FAILURE_BAD_OPTIONS", 13: "SEND_FAILURE_OVERFLOW", 14: "SEND_FAILURE_EXPIRED", 15: "SEND_FAILURE_LOCAL_LEASESET", 16: "SEND_FAILURE_NO_TUNNELS", 17: "SEND_FAILURE_UNSUPPORTED_ENCRYPTION", 18: "SEND_FAILURE_DESTINATION", 19: "SEND_FAILURE_BAD_LEASESET", 20: "SEND_FAILURE_EXPIRED_LEASESET", 21: "SEND_FAILURE_NO_LEASESET", } def __init__(self, session_id: int, msg_id: int, nonce: int, status: int, size: int): self.session_id = session_id self.msg_id = msg_id self.nonce = nonce self.status = status self.size = size def is_success(self) -> bool: return self.status in self._SUCCESS_STATUSES def is_failure(self) -> bool: return self.status in self._FAILURE_STATUSES def status_name(self) -> str: return self._STATUS_NAMES.get(self.status, f"UNKNOWN_{self.status}") def payload_bytes(self) -> bytes: return struct.pack("!HIIBI", self.session_id, self.msg_id, self.nonce, self.status, self.size) @classmethod def _from_payload(cls, payload: bytes) -> "MessageStatusMessage": session_id, msg_id, nonce, status, size = struct.unpack("!HIIBI", payload[:15]) return cls(session_id, msg_id, nonce, status, size) class MessagePayloadMessage(I2CPMessage): """Type 31: R→C message payload. session_id(2) + msg_id(4) + payload_len(4) + payload.""" TYPE = 31 def __init__(self, session_id: int, msg_id: int, payload: bytes): self.session_id = session_id self.msg_id = msg_id self.payload = payload def payload_bytes(self) -> bytes: return (struct.pack("!HII", self.session_id, self.msg_id, len(self.payload)) + self.payload) @classmethod def _from_payload(cls, payload: bytes) -> "MessagePayloadMessage": session_id, msg_id, pl_len = struct.unpack("!HII", payload[:10]) pl = payload[10:10 + pl_len] return cls(session_id, msg_id, pl) class ReceiveMessageBeginMessage(I2CPMessage): """Type 6: R→C notify message available. session_id(2) + msg_id(4).""" TYPE = 6 def __init__(self, session_id: int, msg_id: int): self.session_id = session_id self.msg_id = msg_id def payload_bytes(self) -> bytes: return struct.pack("!HI", self.session_id, self.msg_id) @classmethod def _from_payload(cls, payload: bytes) -> "ReceiveMessageBeginMessage": session_id, msg_id = struct.unpack("!HI", payload[:6]) return cls(session_id, msg_id) class ReceiveMessageEndMessage(I2CPMessage): """Type 7: C→R acknowledge message received. session_id(2) + msg_id(4).""" TYPE = 7 def __init__(self, session_id: int, msg_id: int): self.session_id = session_id self.msg_id = msg_id def payload_bytes(self) -> bytes: return struct.pack("!HI", self.session_id, self.msg_id) @classmethod def _from_payload(cls, payload: bytes) -> "ReceiveMessageEndMessage": session_id, msg_id = struct.unpack("!HI", payload[:6]) return cls(session_id, msg_id) class HostLookupMessage(I2CPMessage): """Type 38: C→R resolve hostname. Wire format: session_id(2) + request_id(4) + timeout(4) + lookup_type(1) + [hash(32) | hostname(1-byte len + str)]. Java type codes: 0=LOOKUP_HASH, 1=LOOKUP_HOST. """ TYPE = 38 LOOKUP_HASH = 0 LOOKUP_HOST = 1 def __init__(self, session_id: int, request_id: int, hostname: str | None = None, dest_hash: bytes | None = None, timeout: int = 10000): self.session_id = session_id self.request_id = request_id self.hostname = hostname self.dest_hash = dest_hash self.timeout = timeout def payload_bytes(self) -> bytes: parts = [struct.pack("!HII", self.session_id, self.request_id, self.timeout)] if self.dest_hash is not None: parts.append(struct.pack("!B", self.LOOKUP_HASH)) # 0 = hash parts.append(self.dest_hash) elif self.hostname is not None: hb = self.hostname.encode("utf-8") parts.append(struct.pack("!BB", self.LOOKUP_HOST, len(hb))) # 1 = host, 1-byte len parts.append(hb) return b"".join(parts) @classmethod def _from_payload(cls, payload: bytes) -> "HostLookupMessage": session_id, request_id, timeout = struct.unpack("!HII", payload[:10]) lookup_type = payload[10] hostname = None dest_hash = None if lookup_type == cls.LOOKUP_HASH: dest_hash = payload[11:11 + 32] elif lookup_type == cls.LOOKUP_HOST: name_len = payload[11] hostname = payload[12:12 + name_len].decode("utf-8") return cls(session_id, request_id, hostname=hostname, dest_hash=dest_hash, timeout=timeout) class HostReplyMessage(I2CPMessage): """Type 39: R→C hostname resolution result.""" TYPE = 39 RESULT_SUCCESS = 0 RESULT_FAILURE = 1 RESULT_SECRET_REQUIRED = 2 RESULT_KEY_REQUIRED = 3 RESULT_SECRET_AND_KEY_REQUIRED = 4 RESULT_DECRYPTION_FAILURE = 5 def __init__(self, session_id: int, request_id: int, result_code: int, destination_data: bytes | None = None): self.session_id = session_id self.request_id = request_id self.result_code = result_code self.destination_data = destination_data def payload_bytes(self) -> bytes: parts = [struct.pack("!HIB", self.session_id, self.request_id, self.result_code)] if self.result_code == 0 and self.destination_data is not None: parts.append(self.destination_data) # self-delimiting, no length prefix return b"".join(parts) @classmethod def _from_payload(cls, payload: bytes) -> "HostReplyMessage": session_id, request_id, result_code = struct.unpack("!HIB", payload[:7]) dest_data = None if result_code == 0 and len(payload) > 7: dest_data = payload[7:] # remaining bytes are the destination return cls(session_id, request_id, result_code, dest_data) class DisconnectMessage(I2CPMessage): """Type 30: R→C disconnect. reason string.""" TYPE = 30 def __init__(self, reason: str = ""): self.reason = reason def payload_bytes(self) -> bytes: rb = self.reason.encode("utf-8") return struct.pack("!B", len(rb)) + rb @classmethod def _from_payload(cls, payload: bytes) -> "DisconnectMessage": if not payload: return cls("") rlen = payload[0] reason = payload[1:1 + rlen].decode("utf-8") return cls(reason) class GetBandwidthLimitsMessage(I2CPMessage): """Type 8: C→R request bandwidth limits. Empty payload.""" TYPE = 8 def payload_bytes(self) -> bytes: return b"" @classmethod def _from_payload(cls, payload: bytes) -> "GetBandwidthLimitsMessage": return cls() class BandwidthLimitsMessage(I2CPMessage): """Type 23: R→C bandwidth limits. 16 x int32 = 64 bytes.""" TYPE = 23 def __init__(self, limits: list[int]): if len(limits) != 16: raise ValueError(f"Expected 16 limit values, got {len(limits)}") self.limits = list(limits) def payload_bytes(self) -> bytes: return struct.pack("!16i", *self.limits) @classmethod def _from_payload(cls, payload: bytes) -> "BandwidthLimitsMessage": values = list(struct.unpack("!16i", payload[:64])) return cls(values) @property def client_inbound(self) -> int: return self.limits[0] @property def client_outbound(self) -> int: return self.limits[1] @property def router_inbound(self) -> int: return self.limits[2] @property def router_outbound(self) -> int: return self.limits[4] class DestLookupMessage(I2CPMessage): """Type 34: C→R legacy dest lookup. dest_hash(32).""" TYPE = 34 def __init__(self, dest_hash: bytes): if len(dest_hash) != 32: raise ValueError(f"dest_hash must be 32 bytes, got {len(dest_hash)}") self.dest_hash = dest_hash def payload_bytes(self) -> bytes: return self.dest_hash @classmethod def _from_payload(cls, payload: bytes) -> "DestLookupMessage": return cls(payload[:32]) class DestReplyMessage(I2CPMessage): """Type 35: R→C dest lookup result. Variable length.""" TYPE = 35 def __init__(self, destination_data: bytes | None = None, hash_data: bytes | None = None): self.destination_data = destination_data self.hash_data = hash_data def payload_bytes(self) -> bytes: if self.destination_data is not None: return self.destination_data if self.hash_data is not None: return self.hash_data return b"" @classmethod def _from_payload(cls, payload: bytes) -> "DestReplyMessage": if len(payload) == 0: return cls() # failure, no data if len(payload) == 32: return cls(hash_data=payload) # hash echo (failure) return cls(destination_data=payload) # success class ReportAbuseMessage(I2CPMessage): """Type 29: bidirectional abuse report. session_id(2) + severity(1) + reason + msg_id(4).""" TYPE = 29 def __init__(self, session_id: int, severity: int, reason: str, message_id: int): self.session_id = session_id self.severity = severity self.reason = reason self.message_id = message_id def payload_bytes(self) -> bytes: rb = self.reason.encode("utf-8") return (struct.pack("!HB", self.session_id, self.severity) + struct.pack("!B", len(rb)) + rb + struct.pack("!I", self.message_id)) @classmethod def _from_payload(cls, payload: bytes) -> "ReportAbuseMessage": session_id, severity = struct.unpack("!HB", payload[:3]) rlen = payload[3] reason = payload[4:4 + rlen].decode("utf-8") message_id = struct.unpack("!I", payload[4 + rlen:8 + rlen])[0] return cls(session_id, severity, reason, message_id) class RequestLeaseSetMessage(I2CPMessage): """Type 21: R→C request lease set. Wire format: session_id(2) + num_tunnels(1) + [router_hash(32) + tunnel_id(4)] * N + end_date(8). """ TYPE = 21 def __init__(self, session_id: int, tunnels: list[tuple[bytes, int]], end_date: int): self.session_id = session_id self.tunnels = tunnels self.end_date = end_date def payload_bytes(self) -> bytes: parts = [struct.pack("!HB", self.session_id, len(self.tunnels))] for router_hash, tunnel_id in self.tunnels: parts.append(router_hash) parts.append(struct.pack("!I", tunnel_id)) parts.append(struct.pack("!Q", self.end_date)) return b"".join(parts) @classmethod def _from_payload(cls, payload: bytes) -> "RequestLeaseSetMessage": session_id, num_tunnels = struct.unpack("!HB", payload[:3]) offset = 3 tunnels = [] for _ in range(num_tunnels): router_hash = payload[offset:offset + 32] tunnel_id = struct.unpack("!I", payload[offset + 32:offset + 36])[0] tunnels.append((router_hash, tunnel_id)) offset += 36 end_date = struct.unpack("!Q", payload[offset:offset + 8])[0] return cls(session_id, tunnels, end_date) class RequestVariableLeaseSetMessage(I2CPMessage): """Type 37: R→C request variable lease set. Wire format: session_id(2) + num_leases(1) + [tunnel_gw(32) + tunnel_id(4) + end_date(8)] * N. """ TYPE = 37 def __init__(self, session_id: int, leases: list[tuple[bytes, int, int]]): self.session_id = session_id self.leases = leases def payload_bytes(self) -> bytes: parts = [struct.pack("!HB", self.session_id, len(self.leases))] for gw_hash, tunnel_id, end_date_ms in self.leases: parts.append(gw_hash) parts.append(struct.pack("!IQ", tunnel_id, end_date_ms)) return b"".join(parts) @classmethod def _from_payload(cls, payload: bytes) -> "RequestVariableLeaseSetMessage": session_id, num_leases = struct.unpack("!HB", payload[:3]) offset = 3 leases = [] for _ in range(num_leases): gw_hash = payload[offset:offset + 32] tunnel_id, end_date_ms = struct.unpack( "!IQ", payload[offset + 32:offset + 44]) leases.append((gw_hash, tunnel_id, end_date_ms)) offset += 44 return cls(session_id, leases) class CreateLeaseSetMessage(I2CPMessage): """Type 4: C→R create lease set. Wire format: session_id(2) + signing_private_key_len(2) + signing_private_key + private_key_len(2) + private_key + lease_set_data. """ TYPE = 4 def __init__(self, session_id: int, signing_private_key: bytes, private_key: bytes, lease_set_data: bytes): self.session_id = session_id self.signing_private_key = signing_private_key self.private_key = private_key self.lease_set_data = lease_set_data def payload_bytes(self) -> bytes: return (struct.pack("!HH", self.session_id, len(self.signing_private_key)) + self.signing_private_key + struct.pack("!H", len(self.private_key)) + self.private_key + self.lease_set_data) @classmethod def _from_payload(cls, payload: bytes) -> "CreateLeaseSetMessage": session_id, spk_len = struct.unpack("!HH", payload[:4]) offset = 4 signing_private_key = payload[offset:offset + spk_len] offset += spk_len pk_len = struct.unpack("!H", payload[offset:offset + 2])[0] offset += 2 private_key = payload[offset:offset + pk_len] offset += pk_len lease_set_data = payload[offset:] return cls(session_id, signing_private_key, private_key, lease_set_data) class CreateLeaseSet2Message(I2CPMessage): """Type 41: C→R create lease set 2. Wire format: session_id(2) + ls_type(1) + lease_set_data_len(4) + lease_set_data + num_keys(1) + [enc_type(2) + enc_len(2) + priv_key(enc_len)] * N. """ TYPE = 41 LS_TYPE_LEASESET = 1 LS_TYPE_LS2 = 3 LS_TYPE_ENCRYPTED_LS2 = 5 LS_TYPE_META_LS2 = 7 def __init__(self, session_id: int, ls_type: int, lease_set_data: bytes, private_keys: list[tuple[int, bytes]]): self.session_id = session_id self.ls_type = ls_type self.lease_set_data = lease_set_data self.private_keys = private_keys def payload_bytes(self) -> bytes: parts = [ struct.pack("!HBI", self.session_id, self.ls_type, len(self.lease_set_data)), self.lease_set_data, struct.pack("!B", len(self.private_keys)), ] for enc_type, priv_key in self.private_keys: parts.append(struct.pack("!HH", enc_type, len(priv_key))) parts.append(priv_key) return b"".join(parts) @classmethod def _from_payload(cls, payload: bytes) -> "CreateLeaseSet2Message": session_id, ls_type, ls_data_len = struct.unpack( "!HBI", payload[:7]) offset = 7 lease_set_data = payload[offset:offset + ls_data_len] offset += ls_data_len num_keys = payload[offset] offset += 1 private_keys = [] for _ in range(num_keys): enc_type, enc_len = struct.unpack( "!HH", payload[offset:offset + 4]) offset += 4 priv_key = payload[offset:offset + enc_len] offset += enc_len private_keys.append((enc_type, priv_key)) return cls(session_id, ls_type, lease_set_data, private_keys) def _parse_destination_from(data: bytes, offset: int) -> tuple[bytes, int]: """Parse a self-delimiting Destination from data at offset. Returns (destination_bytes, total_dest_length). The destination is: 256 (pub) + 128 (sig) + cert (3 + cert_payload_len). """ cert_start = offset + 384 # 256 + 128 if len(data) < cert_start + 3: raise ValueError("Data too short for destination") cert_payload_len = struct.unpack("!H", data[cert_start + 1:cert_start + 3])[0] dest_len = 384 + 3 + cert_payload_len return data[offset:offset + dest_len], dest_len class DateAndFlags: """Combined date + flags for SendMessageExpiresMessage. Wire format: 8 bytes total. Upper 16 bits = flags, lower 48 bits = date_ms. """ SEND_RELIABLE = 0x0001 REQUEST_LEASESET = 0x0002 def __init__(self, date_ms: int = 0, flags: int = 0): self.date_ms = date_ms self.flags = flags def to_bytes(self) -> bytes: combined = (self.flags << 48) | (self.date_ms & 0xFFFFFFFFFFFF) return struct.pack("!Q", combined) @classmethod def from_bytes(cls, data: bytes) -> "DateAndFlags": combined = struct.unpack("!Q", data[:8])[0] flags = (combined >> 48) & 0xFFFF date_ms = combined & 0xFFFFFFFFFFFF return cls(date_ms, flags) class ReconfigureSessionMessage(I2CPMessage): """Type 2: C→R reconfigure session. session_id(2) + SessionConfig(var).""" TYPE = 2 def __init__(self, session_id: int, session_config): self.session_id = session_id self.session_config = session_config def payload_bytes(self) -> bytes: return struct.pack("!H", self.session_id) + self.session_config.to_bytes() @classmethod def _from_payload(cls, payload: bytes) -> "ReconfigureSessionMessage": from i2p_client.session_config import WireSessionConfig session_id = struct.unpack("!H", payload[:2])[0] sc = WireSessionConfig.from_bytes(payload[2:]) return cls(session_id, sc) class SendMessageExpiresMessage(I2CPMessage): """Type 36: C→R send message with expiration. Wire format: session_id(2) + Destination(self-delimiting) + Payload(4-byte size + data) + nonce(4) + expiration(8). """ TYPE = 36 def __init__(self, session_id: int, destination_data: bytes, payload: bytes, nonce: int = 0, expiration: DateAndFlags | None = None): self.session_id = session_id self.destination_data = destination_data self.payload = payload self.nonce = nonce self.expiration = expiration or DateAndFlags() def payload_bytes(self) -> bytes: return (struct.pack("!H", self.session_id) + self.destination_data + struct.pack("!I", len(self.payload)) + self.payload + struct.pack("!I", self.nonce) + self.expiration.to_bytes()) @classmethod def _from_payload(cls, payload: bytes) -> "SendMessageExpiresMessage": session_id = struct.unpack("!H", payload[:2])[0] dest_data, dest_len = _parse_destination_from(payload, 2) off = 2 + dest_len pl_len = struct.unpack("!I", payload[off:off + 4])[0] off += 4 pl = payload[off:off + pl_len] off += pl_len nonce = struct.unpack("!I", payload[off:off + 4])[0] off += 4 expiration = DateAndFlags.from_bytes(payload[off:off + 8]) return cls(session_id, dest_data, pl, nonce, expiration) # SigType code -> public key length (bytes) for BlindingInfoMessage endpoint type 3 _SIGTYPE_PUBKEY_LEN = { 0: 128, # DSA_SHA1 1: 64, # ECDSA_SHA256_P256 2: 96, # ECDSA_SHA384_P384 3: 132, # ECDSA_SHA512_P521 7: 32, # EdDSA_SHA512_Ed25519 11: 32, # RedDSA_SHA512_Ed25519 } class BlindingInfoMessage(I2CPMessage): """I2CP BlindingInfo message (Type 42, C->R). Tells the router about blinding configuration for encrypted LeaseSet2. Wire format: session_id(2) + endpoint_type(1) + auth_type(1) + blind_type(2) + expiration(4) + [endpoint_data] + [auth_key(32)] """ TYPE = 42 # Endpoint type constants ENDPOINT_HASH = 0 ENDPOINT_HOST = 1 ENDPOINT_DEST = 2 ENDPOINT_PUBKEY = 3 # Auth type constants AUTH_NONE = 0 AUTH_DH = 1 AUTH_PSK = 2 def __init__( self, session_id: int, endpoint_type: int, auth_type: int, blind_type: int, expiration: int, endpoint_data: bytes, auth_key: bytes | None = None, ) -> None: self.session_id = session_id self.endpoint_type = endpoint_type self.auth_type = auth_type self.blind_type = blind_type self.expiration = expiration self.endpoint_data = endpoint_data self.auth_key = auth_key def payload_bytes(self) -> bytes: parts = [ struct.pack("!H", self.session_id), struct.pack("!B", self.endpoint_type), struct.pack("!B", self.auth_type), struct.pack("!H", self.blind_type), struct.pack("!I", self.expiration), self.endpoint_data, ] if self.auth_type != self.AUTH_NONE and self.auth_key is not None: parts.append(self.auth_key) return b"".join(parts) @classmethod def _from_payload(cls, payload: bytes) -> "BlindingInfoMessage": if len(payload) < 10: raise ValueError("BlindingInfoMessage payload too short") session_id = struct.unpack("!H", payload[0:2])[0] endpoint_type = payload[2] auth_type = payload[3] blind_type = struct.unpack("!H", payload[4:6])[0] expiration = struct.unpack("!I", payload[6:10])[0] offset = 10 # Parse endpoint data based on type if endpoint_type == cls.ENDPOINT_HASH: endpoint_data = payload[offset:offset + 32] offset += 32 elif endpoint_type == cls.ENDPOINT_HOST: host_len = payload[offset] endpoint_data = payload[offset:offset + 1 + host_len] offset += 1 + host_len elif endpoint_type == cls.ENDPOINT_DEST: endpoint_data, dest_len = _parse_destination_from(payload, offset) offset += dest_len elif endpoint_type == cls.ENDPOINT_PUBKEY: sig_type_code = struct.unpack("!H", payload[offset:offset + 2])[0] pubkey_len = _SIGTYPE_PUBKEY_LEN.get(sig_type_code) if pubkey_len is None: raise ValueError(f"Unknown SigType code {sig_type_code}") ep_len = 2 + pubkey_len endpoint_data = payload[offset:offset + ep_len] offset += ep_len else: raise ValueError(f"Unknown endpoint type {endpoint_type}") # Parse auth key if present auth_key = None if auth_type != cls.AUTH_NONE: auth_key = payload[offset:offset + 32] offset += 32 return cls( session_id=session_id, endpoint_type=endpoint_type, auth_type=auth_type, blind_type=blind_type, expiration=expiration, endpoint_data=endpoint_data, auth_key=auth_key, ) def _encode_properties(props: dict[str, str]) -> bytes: """Encode properties as I2P wire format: count(2) + (key_len(2) + key + val_len(2) + val)*.""" parts = [struct.pack("!H", len(props))] for k, v in sorted(props.items()): kb = k.encode("utf-8") vb = v.encode("utf-8") parts.append(struct.pack("!H", len(kb)) + kb + struct.pack("!H", len(vb)) + vb) return b"".join(parts) def _decode_properties(data: bytes) -> tuple[dict[str, str], int]: """Decode properties from I2P wire format. Returns (dict, bytes_consumed).""" if len(data) < 2: return {}, 0 count = struct.unpack("!H", data[:2])[0] offset = 2 props = {} for _ in range(count): klen = struct.unpack("!H", data[offset:offset + 2])[0] offset += 2 key = data[offset:offset + klen].decode("utf-8") offset += klen vlen = struct.unpack("!H", data[offset:offset + 2])[0] offset += 2 val = data[offset:offset + vlen].decode("utf-8") offset += vlen props[key] = val return props, offset