"""NTCP2 transport protocol types.""" import enum import struct class FrameType(enum.IntEnum): DATA = 0 PADDING = 254 DATETIME = 1 OPTIONS = 2 ROUTER_INFO = 3 I2NP = 4 TERMINATION = 5 class TransportState(enum.Enum): UNKNOWN = 0 HANDSHAKING = 1 ESTABLISHED = 2 TERMINATED = 3 ERROR = 4 class NTCP2Frame: """NTCP2 block/frame: type(1) + length(2) + payload.""" def __init__(self, frame_type: FrameType, payload: bytes): self.frame_type = frame_type self.payload = payload def to_bytes(self) -> bytes: return struct.pack("!BH", self.frame_type, len(self.payload)) + self.payload @classmethod def from_bytes(cls, data: bytes) -> "NTCP2Frame": frame_type, length = struct.unpack("!BH", data[:3]) return cls(FrameType(frame_type), data[3:3 + length]) @classmethod def from_stream(cls, stream) -> "NTCP2Frame": header = stream.read(3) frame_type, length = struct.unpack("!BH", header) payload = stream.read(length) return cls(FrameType(frame_type), payload) class NTCP2SessionState: """NTCP2 session state machine.""" def __init__(self): self.state = TransportState.UNKNOWN self.error_message: str | None = None self.messages_sent = 0 self.messages_received = 0 self._termination_reason: int | None = None def begin_handshake(self): if self.state != TransportState.UNKNOWN: raise RuntimeError(f"Cannot begin handshake from {self.state}") self.state = TransportState.HANDSHAKING def complete_handshake(self): if self.state != TransportState.HANDSHAKING: raise RuntimeError(f"Cannot complete handshake from {self.state}") self.state = TransportState.ESTABLISHED def terminate(self, reason: int = 0): self._termination_reason = reason self.state = TransportState.TERMINATED def set_error(self, message: str): self.error_message = message self.state = TransportState.ERROR def on_message_sent(self): self.messages_sent += 1 def on_message_received(self): self.messages_received += 1