"""Stream utilities — lookahead stream, byte array stream helpers. Ported from net.i2p.util.LookaheadInputStream, ByteArrayStream. """ import io from typing import Optional class LookaheadInputStream: """Stream wrapper that hides the last N bytes (footer). The last `lookahead_size` bytes are never returned by read(), but can be retrieved via get_footer() after EOF. """ def __init__(self, lookahead_size: int) -> None: self._size = lookahead_size self._footer = bytearray(lookahead_size) self._source: io.RawIOBase | io.BufferedIOBase | None = None self._eof_reached = False self._index = 0 @property def eof_reached(self) -> bool: return self._eof_reached def initialize(self, source: io.RawIOBase | io.BufferedIOBase) -> None: """Start with a new source stream. Reads lookahead_size bytes.""" self._source = source self._eof_reached = False self._index = 0 # Read initial footer data = _read_fully(source, self._size) if len(data) < self._size: raise IOError(f"Could not read {self._size} lookahead bytes, got {len(data)}") self._footer[:] = data def read(self, n: int = 1) -> bytes: """Read up to n bytes, keeping footer reserved.""" if self._eof_reached or self._source is None: return b"" result = bytearray() for _ in range(n): b = self._source.read(1) if not b: self._eof_reached = True break # Rotate: output oldest footer byte, store new byte result.append(self._footer[self._index]) self._footer[self._index] = b[0] self._index = (self._index + 1) % self._size return bytes(result) def get_footer(self) -> bytes: """Get the footer bytes (last lookahead_size bytes of stream). Only valid after EOF has been reached. Returns bytes in correct order (reordered from circular buffer). """ # Reorder from circular buffer position out = bytearray(self._size) for i in range(self._size): out[i] = self._footer[(self._index + i) % self._size] return bytes(out) class ByteArrayStream(io.BytesIO): """ByteArrayOutputStream equivalent with zero-copy access. Extends BytesIO to provide an as_input_stream() method. """ def as_input_stream(self) -> io.BytesIO: """Return a read-only BytesIO over the written data. Zero-copy.""" data = self.getvalue() return io.BytesIO(data) def _read_fully(stream, length: int) -> bytes: """Read exactly `length` bytes from stream, or fewer if EOF.""" data = bytearray() while len(data) < length: chunk = stream.read(length - len(data)) if not chunk: break data.extend(chunk) return bytes(data) def read_fully(stream, length: int) -> bytes: """Read exactly `length` bytes from stream. Raises IOError if not enough data.""" data = _read_fully(stream, length) if len(data) < length: raise IOError(f"Expected {length} bytes, got {len(data)}") return data def copy_stream(source, dest, buf_size: int = 8192) -> int: """Copy all data from source to dest. Returns bytes copied.""" total = 0 while True: chunk = source.read(buf_size) if not chunk: break dest.write(chunk) total += len(chunk) return total