A Python port of the Invisible Internet Project (I2P)
at main 108 lines 3.5 kB view raw
1"""Stream utilities — lookahead stream, byte array stream helpers. 2 3Ported from net.i2p.util.LookaheadInputStream, ByteArrayStream. 4""" 5 6import io 7from typing import Optional 8 9 10class LookaheadInputStream: 11 """Stream wrapper that hides the last N bytes (footer). 12 13 The last `lookahead_size` bytes are never returned by read(), 14 but can be retrieved via get_footer() after EOF. 15 """ 16 17 def __init__(self, lookahead_size: int) -> None: 18 self._size = lookahead_size 19 self._footer = bytearray(lookahead_size) 20 self._source: io.RawIOBase | io.BufferedIOBase | None = None 21 self._eof_reached = False 22 self._index = 0 23 24 @property 25 def eof_reached(self) -> bool: 26 return self._eof_reached 27 28 def initialize(self, source: io.RawIOBase | io.BufferedIOBase) -> None: 29 """Start with a new source stream. Reads lookahead_size bytes.""" 30 self._source = source 31 self._eof_reached = False 32 self._index = 0 33 # Read initial footer 34 data = _read_fully(source, self._size) 35 if len(data) < self._size: 36 raise IOError(f"Could not read {self._size} lookahead bytes, got {len(data)}") 37 self._footer[:] = data 38 39 def read(self, n: int = 1) -> bytes: 40 """Read up to n bytes, keeping footer reserved.""" 41 if self._eof_reached or self._source is None: 42 return b"" 43 result = bytearray() 44 for _ in range(n): 45 b = self._source.read(1) 46 if not b: 47 self._eof_reached = True 48 break 49 # Rotate: output oldest footer byte, store new byte 50 result.append(self._footer[self._index]) 51 self._footer[self._index] = b[0] 52 self._index = (self._index + 1) % self._size 53 return bytes(result) 54 55 def get_footer(self) -> bytes: 56 """Get the footer bytes (last lookahead_size bytes of stream). 57 58 Only valid after EOF has been reached. 59 Returns bytes in correct order (reordered from circular buffer). 60 """ 61 # Reorder from circular buffer position 62 out = bytearray(self._size) 63 for i in range(self._size): 64 out[i] = self._footer[(self._index + i) % self._size] 65 return bytes(out) 66 67 68class ByteArrayStream(io.BytesIO): 69 """ByteArrayOutputStream equivalent with zero-copy access. 70 71 Extends BytesIO to provide an as_input_stream() method. 72 """ 73 74 def as_input_stream(self) -> io.BytesIO: 75 """Return a read-only BytesIO over the written data. Zero-copy.""" 76 data = self.getvalue() 77 return io.BytesIO(data) 78 79 80def _read_fully(stream, length: int) -> bytes: 81 """Read exactly `length` bytes from stream, or fewer if EOF.""" 82 data = bytearray() 83 while len(data) < length: 84 chunk = stream.read(length - len(data)) 85 if not chunk: 86 break 87 data.extend(chunk) 88 return bytes(data) 89 90 91def read_fully(stream, length: int) -> bytes: 92 """Read exactly `length` bytes from stream. Raises IOError if not enough data.""" 93 data = _read_fully(stream, length) 94 if len(data) < length: 95 raise IOError(f"Expected {length} bytes, got {len(data)}") 96 return data 97 98 99def copy_stream(source, dest, buf_size: int = 8192) -> int: 100 """Copy all data from source to dest. Returns bytes copied.""" 101 total = 0 102 while True: 103 chunk = source.read(buf_size) 104 if not chunk: 105 break 106 dest.write(chunk) 107 total += len(chunk) 108 return total