"""SessionTag and Payload — session-layer data structures. Ported from net.i2p.data.SessionTag and I2P payload wire format. SessionTag: 32-byte random tag used in AES+SessionTag sessions. Payload: Variable-length byte container with 4-byte big-endian length prefix. """ from __future__ import annotations import os import struct class SessionTag: """A 32-byte random tag used in AES+SessionTag encrypted sessions. Each tag is single-use and identifies which session key to use when decrypting an incoming message. """ __slots__ = ("_data",) SIZE = 32 def __init__(self, data: bytes) -> None: if len(data) != self.SIZE: raise ValueError(f"SessionTag must be exactly 32 bytes, got {len(data)}") self._data = data def to_bytes(self) -> bytes: """Return the raw 32 bytes.""" return self._data @classmethod def from_bytes(cls, data: bytes) -> SessionTag: """Construct a SessionTag from exactly 32 bytes.""" return cls(data) @classmethod def random(cls) -> SessionTag: """Generate a random 32-byte session tag.""" return cls(os.urandom(cls.SIZE)) def __eq__(self, other: object) -> bool: if not isinstance(other, SessionTag): return NotImplemented return self._data == other._data def __hash__(self) -> int: return hash(self._data) def __repr__(self) -> str: return f"SessionTag({self._data[:4].hex()}...)" class Payload: """Variable-length byte container with a 4-byte big-endian length prefix. Wire format: [4 bytes length][length bytes data] """ __slots__ = ("_data",) def __init__(self, data: bytes) -> None: self._data = data def to_bytes(self) -> bytes: """Serialize to wire format: 4-byte big-endian length + data.""" return struct.pack("!I", len(self._data)) + self._data @classmethod def from_bytes(cls, data: bytes) -> Payload: """Deserialize from wire format bytes.""" if len(data) < 4: raise ValueError(f"Need at least 4 bytes for length prefix, got {len(data)}") length = struct.unpack("!I", data[:4])[0] if len(data) < 4 + length: raise ValueError( f"Truncated payload: need {length} bytes but only {len(data) - 4} available" ) return cls(data[4:4 + length]) @classmethod def from_stream(cls, stream) -> Payload: """Read a payload from a file-like object.""" header = stream.read(4) if len(header) < 4: raise ValueError(f"Truncated header: need 4 bytes, got {len(header)}") length = struct.unpack("!I", header)[0] body = stream.read(length) if len(body) < length: raise ValueError( f"Truncated payload body: need {length} bytes, got {len(body)}" ) return cls(body) def __len__(self) -> int: return len(self._data) def __eq__(self, other: object) -> bool: if not isinstance(other, Payload): return NotImplemented return self._data == other._data def __repr__(self) -> str: return f"Payload({len(self._data)} bytes)"