"""Stream packet types — wire format for I2P streaming protocol.""" import struct class Flags: """Stream packet flag constants.""" SYNCHRONIZE = 0x0001 CLOSE = 0x0002 RESET = 0x0004 SIGNATURE_INCLUDED = 0x0008 SIGNATURE_REQUESTED = 0x0010 FROM_INCLUDED = 0x0020 DELAY_REQUESTED = 0x0040 MAX_PACKET_SIZE_INCLUDED = 0x0080 PROFILE_INTERACTIVE = 0x0100 ECHO = 0x0200 NO_ACK = 0x0400 class StreamPacket: """I2P streaming protocol packet. Wire format: send_id(4) + recv_id(4) + seq_num(4) + ack_through(4) + nack_count(1) + nack_list(nack_count*4) + resend_delay(1) + flags(2) + option_size(2) + options + payload """ def __init__(self, send_id: int, recv_id: int, seq_num: int, ack_through: int, flags: int, payload: bytes = b"", nacks: list[int] | None = None, resend_delay: int = 0, options: bytes = b""): self.send_id = send_id self.recv_id = recv_id self.seq_num = seq_num self.ack_through = ack_through self.nacks = nacks or [] self.resend_delay = resend_delay self.flags = flags self.options = options self.payload = payload def to_bytes(self) -> bytes: parts = [ struct.pack("!IIII", self.send_id, self.recv_id, self.seq_num, self.ack_through), struct.pack("!B", len(self.nacks)), ] for nack in self.nacks: parts.append(struct.pack("!I", nack)) parts.append(struct.pack("!BHH", self.resend_delay, self.flags, len(self.options))) parts.append(self.options) parts.append(self.payload) return b"".join(parts) @classmethod def from_bytes(cls, data: bytes) -> "StreamPacket": send_id, recv_id, seq_num, ack_through = struct.unpack("!IIII", data[:16]) offset = 16 nack_count = data[offset] offset += 1 nacks = [] for _ in range(nack_count): nacks.append(struct.unpack("!I", data[offset:offset + 4])[0]) offset += 4 resend_delay, flags, option_size = struct.unpack("!BHH", data[offset:offset + 5]) offset += 5 options = data[offset:offset + option_size] offset += option_size payload = data[offset:] return cls(send_id, recv_id, seq_num, ack_through, flags, payload, nacks, resend_delay, options)