"""I2NP message types — header, base class, and concrete messages.""" import hashlib import struct from abc import ABC, abstractmethod class I2NPHeader: """I2NP message header: type(1) + msg_id(4) + expiration(8) + size(2) + checksum(1) = 16 bytes.""" SIZE = 16 def __init__(self, msg_type: int, msg_id: int, expiration: int, size: int, checksum: int): self.msg_type = msg_type self.msg_id = msg_id self.expiration = expiration self.size = size self.checksum = checksum def to_bytes(self) -> bytes: return struct.pack("!BIQHB", self.msg_type, self.msg_id, self.expiration, self.size, self.checksum) @classmethod def from_bytes(cls, data: bytes) -> "I2NPHeader": if len(data) < cls.SIZE: raise ValueError(f"I2NPHeader requires {cls.SIZE} bytes, got {len(data)}") msg_type, msg_id, expiration, size, checksum = struct.unpack("!BIQHB", data[:cls.SIZE]) return cls(msg_type, msg_id, expiration, size, checksum) @classmethod def from_stream(cls, stream) -> "I2NPHeader": data = stream.read(cls.SIZE) if len(data) < cls.SIZE: raise ValueError(f"I2NPHeader requires {cls.SIZE} bytes") return cls.from_bytes(data) class I2NPMessage(ABC): """Base class for I2NP messages with type registry.""" TYPE: int = -1 _registry: dict[int, type["I2NPMessage"]] = {} _header_msg_id: int = 0 _header_expiration: int = 0 @classmethod def __init_subclass__(cls, **kwargs): super().__init_subclass__(**kwargs) if hasattr(cls, 'TYPE') and cls.TYPE >= 0: I2NPMessage._registry[cls.TYPE] = cls @abstractmethod def body_bytes(self) -> bytes: """Serialize the message body (without header).""" @staticmethod def calculate_checksum(body: bytes) -> int: return hashlib.sha256(body).digest()[0] def to_bytes(self) -> bytes: body = self.body_bytes() checksum = self.calculate_checksum(body) header = I2NPHeader( msg_type=self.TYPE, msg_id=getattr(self, '_header_msg_id', 0), expiration=getattr(self, '_header_expiration', 0), size=len(body), checksum=checksum, ) return header.to_bytes() + body @classmethod def from_bytes(cls, data: bytes) -> "I2NPMessage": if len(data) < I2NPHeader.SIZE: raise ValueError(f"Need at least {I2NPHeader.SIZE} bytes for I2NP message") header = I2NPHeader.from_bytes(data) body_start = I2NPHeader.SIZE body_end = body_start + header.size if len(data) < body_end: raise ValueError(f"Message body truncated: need {header.size} bytes, got {len(data) - body_start}") body = data[body_start:body_end] expected_checksum = I2NPMessage.calculate_checksum(body) if header.checksum != expected_checksum: raise ValueError(f"Checksum mismatch: expected {expected_checksum}, got {header.checksum}") msg_cls = cls._registry.get(header.msg_type) if msg_cls is None: raise ValueError(f"Unknown I2NP message type: {header.msg_type}") msg = msg_cls._from_body(body) msg._header_msg_id = header.msg_id msg._header_expiration = header.expiration return msg @classmethod @abstractmethod def _from_body(cls, body: bytes) -> "I2NPMessage": """Deserialize from body bytes.""" class DeliveryStatusMessage(I2NPMessage): """Type 10: msg_id(4) + arrival_time(8).""" TYPE = 10 def __init__(self, msg_id: int, arrival_time: int): self.msg_id = msg_id self.arrival_time = arrival_time self._header_msg_id = msg_id self._header_expiration = 0 def body_bytes(self) -> bytes: return struct.pack("!IQ", self.msg_id, self.arrival_time) @classmethod def _from_body(cls, body: bytes) -> "DeliveryStatusMessage": msg_id, arrival_time = struct.unpack("!IQ", body[:12]) return cls(msg_id, arrival_time) class DataMessage(I2NPMessage): """Type 20: length(4) + payload.""" TYPE = 20 def __init__(self, payload: bytes): self.payload = payload self._header_msg_id = 0 self._header_expiration = 0 def body_bytes(self) -> bytes: return struct.pack("!I", len(self.payload)) + self.payload @classmethod def _from_body(cls, body: bytes) -> "DataMessage": length = struct.unpack("!I", body[:4])[0] return cls(body[4:4 + length]) class DatabaseStoreMessage(I2NPMessage): """Type 1: key(32) + type(1) + reply_token(4) + data.""" TYPE = 1 def __init__(self, key: bytes, ds_type: int, reply_token: int, data: bytes): self.key = key self.ds_type = ds_type self.reply_token = reply_token self.data = data self._header_msg_id = 0 self._header_expiration = 0 def body_bytes(self) -> bytes: return self.key + struct.pack("!BI", self.ds_type, self.reply_token) + self.data @classmethod def _from_body(cls, body: bytes) -> "DatabaseStoreMessage": key = body[:32] ds_type = body[32] reply_token = struct.unpack("!I", body[33:37])[0] data = body[37:] return cls(key, ds_type, reply_token, data) class DatabaseLookupMessage(I2NPMessage): """Type 2: key(32) + from_hash(32) + flags(1) + [reply_tunnel_id(4)] + size(1) + exclude_list.""" TYPE = 2 def __init__(self, key: bytes, from_hash: bytes, flags: int, reply_tunnel_id: int = 0, exclude_list: list[bytes] | None = None): self.key = key self.from_hash = from_hash self.flags = flags self.reply_tunnel_id = reply_tunnel_id self.exclude_list = exclude_list or [] self._header_msg_id = 0 self._header_expiration = 0 def body_bytes(self) -> bytes: parts = [self.key, self.from_hash, struct.pack("!B", self.flags)] if self.flags & 0x01: parts.append(struct.pack("!I", self.reply_tunnel_id)) parts.append(struct.pack("!B", len(self.exclude_list))) for ex in self.exclude_list: parts.append(ex) return b"".join(parts) @classmethod def _from_body(cls, body: bytes) -> "DatabaseLookupMessage": key = body[:32] from_hash = body[32:64] flags = body[64] offset = 65 reply_tunnel_id = 0 if flags & 0x01: reply_tunnel_id = struct.unpack("!I", body[offset:offset + 4])[0] offset += 4 num_excludes = body[offset] offset += 1 exclude_list = [] for _ in range(num_excludes): exclude_list.append(body[offset:offset + 32]) offset += 32 return cls(key, from_hash, flags, reply_tunnel_id, exclude_list)