A Python port of the Invisible Internet Project (I2P)
at main 107 lines 3.3 kB view raw
1"""SessionTag and Payload — session-layer data structures. 2 3Ported from net.i2p.data.SessionTag and I2P payload wire format. 4 5SessionTag: 32-byte random tag used in AES+SessionTag sessions. 6Payload: Variable-length byte container with 4-byte big-endian length prefix. 7""" 8 9from __future__ import annotations 10 11import os 12import struct 13 14 15class SessionTag: 16 """A 32-byte random tag used in AES+SessionTag encrypted sessions. 17 18 Each tag is single-use and identifies which session key to use 19 when decrypting an incoming message. 20 """ 21 22 __slots__ = ("_data",) 23 24 SIZE = 32 25 26 def __init__(self, data: bytes) -> None: 27 if len(data) != self.SIZE: 28 raise ValueError(f"SessionTag must be exactly 32 bytes, got {len(data)}") 29 self._data = data 30 31 def to_bytes(self) -> bytes: 32 """Return the raw 32 bytes.""" 33 return self._data 34 35 @classmethod 36 def from_bytes(cls, data: bytes) -> SessionTag: 37 """Construct a SessionTag from exactly 32 bytes.""" 38 return cls(data) 39 40 @classmethod 41 def random(cls) -> SessionTag: 42 """Generate a random 32-byte session tag.""" 43 return cls(os.urandom(cls.SIZE)) 44 45 def __eq__(self, other: object) -> bool: 46 if not isinstance(other, SessionTag): 47 return NotImplemented 48 return self._data == other._data 49 50 def __hash__(self) -> int: 51 return hash(self._data) 52 53 def __repr__(self) -> str: 54 return f"SessionTag({self._data[:4].hex()}...)" 55 56 57class Payload: 58 """Variable-length byte container with a 4-byte big-endian length prefix. 59 60 Wire format: [4 bytes length][length bytes data] 61 """ 62 63 __slots__ = ("_data",) 64 65 def __init__(self, data: bytes) -> None: 66 self._data = data 67 68 def to_bytes(self) -> bytes: 69 """Serialize to wire format: 4-byte big-endian length + data.""" 70 return struct.pack("!I", len(self._data)) + self._data 71 72 @classmethod 73 def from_bytes(cls, data: bytes) -> Payload: 74 """Deserialize from wire format bytes.""" 75 if len(data) < 4: 76 raise ValueError(f"Need at least 4 bytes for length prefix, got {len(data)}") 77 length = struct.unpack("!I", data[:4])[0] 78 if len(data) < 4 + length: 79 raise ValueError( 80 f"Truncated payload: need {length} bytes but only {len(data) - 4} available" 81 ) 82 return cls(data[4:4 + length]) 83 84 @classmethod 85 def from_stream(cls, stream) -> Payload: 86 """Read a payload from a file-like object.""" 87 header = stream.read(4) 88 if len(header) < 4: 89 raise ValueError(f"Truncated header: need 4 bytes, got {len(header)}") 90 length = struct.unpack("!I", header)[0] 91 body = stream.read(length) 92 if len(body) < length: 93 raise ValueError( 94 f"Truncated payload body: need {length} bytes, got {len(body)}" 95 ) 96 return cls(body) 97 98 def __len__(self) -> int: 99 return len(self._data) 100 101 def __eq__(self, other: object) -> bool: 102 if not isinstance(other, Payload): 103 return NotImplemented 104 return self._data == other._data 105 106 def __repr__(self) -> str: 107 return f"Payload({len(self._data)} bytes)"