A Python port of the Invisible Internet Project (I2P)
1"""SessionTag and Payload — session-layer data structures.
2
3Ported from net.i2p.data.SessionTag and I2P payload wire format.
4
5SessionTag: 32-byte random tag used in AES+SessionTag sessions.
6Payload: Variable-length byte container with 4-byte big-endian length prefix.
7"""
8
9from __future__ import annotations
10
11import os
12import struct
13
14
class SessionTag:
    """A 32-byte random tag used in AES+SessionTag encrypted sessions.

    Each tag is single-use and identifies which session key to use
    when decrypting an incoming message.

    The tag bytes are copied into an immutable ``bytes`` object on
    construction, so equality and hashing stay consistent and tags can be
    used as dictionary keys or set members.
    """

    __slots__ = ("_data",)

    SIZE = 32

    def __init__(self, data: bytes) -> None:
        """Create a tag from exactly ``SIZE`` bytes.

        Args:
            data: The raw tag. ``bytes``, ``bytearray`` and ``memoryview``
                are accepted; the content is snapshotted as ``bytes``.

        Raises:
            TypeError: If *data* is not a bytes-like object.
            ValueError: If *data* is not exactly ``SIZE`` bytes long.
        """
        if not isinstance(data, (bytes, bytearray, memoryview)):
            raise TypeError(
                f"SessionTag data must be bytes-like, got {type(data).__name__}"
            )
        # Snapshot as immutable bytes: a bytearray kept by reference would be
        # unhashable and could be mutated by the caller after construction.
        raw = bytes(data)
        if len(raw) != self.SIZE:
            raise ValueError(
                f"SessionTag must be exactly {self.SIZE} bytes, got {len(raw)}"
            )
        self._data = raw

    def to_bytes(self) -> bytes:
        """Return the raw 32 bytes."""
        return self._data

    @classmethod
    def from_bytes(cls, data: bytes) -> SessionTag:
        """Construct a SessionTag from exactly 32 bytes.

        Raises the same errors as the constructor.
        """
        return cls(data)

    @classmethod
    def random(cls) -> SessionTag:
        """Generate a random 32-byte session tag from ``os.urandom``."""
        return cls(os.urandom(cls.SIZE))

    def __eq__(self, other: object) -> bool:
        """Two tags are equal when their raw bytes are equal."""
        if not isinstance(other, SessionTag):
            return NotImplemented
        return self._data == other._data

    def __hash__(self) -> int:
        """Hash the raw bytes, consistent with ``__eq__``."""
        return hash(self._data)

    def __repr__(self) -> str:
        # Only the first four bytes are shown to keep log output short.
        return f"SessionTag({self._data[:4].hex()}...)"
55
56
class Payload:
    """Variable-length byte container with a 4-byte big-endian length prefix.

    Wire format: [4 bytes length][length bytes data]

    The payload content is stored as an immutable ``bytes`` object, so
    instances compare and hash by content.
    """

    __slots__ = ("_data",)

    # Largest single ``read`` request issued by ``from_stream``. The length
    # prefix comes off the wire, so it is not used directly as a read size.
    _MAX_READ = 1 << 20

    def __init__(self, data: bytes) -> None:
        """Wrap *data* as a payload.

        Args:
            data: Payload content. ``bytes``, ``bytearray`` and
                ``memoryview`` are accepted; the content is copied to
                ``bytes``.

        Raises:
            TypeError: If *data* is not a bytes-like object.
        """
        if not isinstance(data, (bytes, bytearray, memoryview)):
            raise TypeError(
                f"Payload data must be bytes-like, got {type(data).__name__}"
            )
        self._data = bytes(data)

    def to_bytes(self) -> bytes:
        """Serialize to wire format: 4-byte big-endian length + data."""
        return struct.pack("!I", len(self._data)) + self._data

    @classmethod
    def from_bytes(cls, data: bytes) -> Payload:
        """Deserialize from wire format bytes.

        Bytes following the encoded payload are ignored.

        Raises:
            ValueError: If *data* is shorter than the 4-byte prefix or
                shorter than the length the prefix announces.
        """
        if len(data) < 4:
            raise ValueError(f"Need at least 4 bytes for length prefix, got {len(data)}")
        (length,) = struct.unpack_from("!I", data)
        if len(data) < 4 + length:
            raise ValueError(
                f"Truncated payload: need {length} bytes but only {len(data) - 4} available"
            )
        return cls(data[4:4 + length])

    @classmethod
    def from_stream(cls, stream) -> Payload:
        """Read a payload from a file-like object.

        *stream* only needs a ``read(n)`` method returning bytes. Short
        reads are tolerated: reading continues until the requested amount
        has arrived or the stream reports no more data.

        Raises:
            ValueError: If the stream ends before the 4-byte header or the
                announced body has been fully read.
        """
        header = cls._read_exact(stream, 4)
        if len(header) < 4:
            raise ValueError(f"Truncated header: need 4 bytes, got {len(header)}")
        (length,) = struct.unpack("!I", header)
        body = cls._read_exact(stream, length)
        if len(body) < length:
            raise ValueError(
                f"Truncated payload body: need {length} bytes, got {len(body)}"
            )
        return cls(body)

    @classmethod
    def _read_exact(cls, stream, count: int) -> bytes:
        """Read up to *count* bytes, looping over short reads.

        Returns fewer than *count* bytes only when ``stream.read`` returns
        an empty result (or ``None``) before the count is reached.
        """
        pieces = []
        remaining = count
        while remaining > 0:
            piece = stream.read(min(remaining, cls._MAX_READ))
            if not piece:
                break
            pieces.append(piece)
            remaining -= len(piece)
        return b"".join(pieces)

    def __len__(self) -> int:
        """Return the payload size in bytes, excluding the length prefix."""
        return len(self._data)

    def __eq__(self, other: object) -> bool:
        """Two payloads are equal when their content bytes are equal."""
        if not isinstance(other, Payload):
            return NotImplemented
        return self._data == other._data

    def __hash__(self) -> int:
        # Defining __eq__ alone would make the class unhashable; hash by
        # content so equal payloads hash equally.
        return hash(self._data)

    def __repr__(self) -> str:
        return f"Payload({len(self._data)} bytes)"
107 return f"Payload({len(self._data)} bytes)"