A Python port of the Invisible Internet Project (I2P)
1"""Stream utilities — lookahead stream, byte array stream helpers.
2
3Ported from net.i2p.util.LookaheadInputStream, ByteArrayStream.
4"""
5
6import io
7from typing import Optional
8
9
10class LookaheadInputStream:
11 """Stream wrapper that hides the last N bytes (footer).
12
13 The last `lookahead_size` bytes are never returned by read(),
14 but can be retrieved via get_footer() after EOF.
15 """
16
17 def __init__(self, lookahead_size: int) -> None:
18 self._size = lookahead_size
19 self._footer = bytearray(lookahead_size)
20 self._source: io.RawIOBase | io.BufferedIOBase | None = None
21 self._eof_reached = False
22 self._index = 0
23
24 @property
25 def eof_reached(self) -> bool:
26 return self._eof_reached
27
28 def initialize(self, source: io.RawIOBase | io.BufferedIOBase) -> None:
29 """Start with a new source stream. Reads lookahead_size bytes."""
30 self._source = source
31 self._eof_reached = False
32 self._index = 0
33 # Read initial footer
34 data = _read_fully(source, self._size)
35 if len(data) < self._size:
36 raise IOError(f"Could not read {self._size} lookahead bytes, got {len(data)}")
37 self._footer[:] = data
38
39 def read(self, n: int = 1) -> bytes:
40 """Read up to n bytes, keeping footer reserved."""
41 if self._eof_reached or self._source is None:
42 return b""
43 result = bytearray()
44 for _ in range(n):
45 b = self._source.read(1)
46 if not b:
47 self._eof_reached = True
48 break
49 # Rotate: output oldest footer byte, store new byte
50 result.append(self._footer[self._index])
51 self._footer[self._index] = b[0]
52 self._index = (self._index + 1) % self._size
53 return bytes(result)
54
55 def get_footer(self) -> bytes:
56 """Get the footer bytes (last lookahead_size bytes of stream).
57
58 Only valid after EOF has been reached.
59 Returns bytes in correct order (reordered from circular buffer).
60 """
61 # Reorder from circular buffer position
62 out = bytearray(self._size)
63 for i in range(self._size):
64 out[i] = self._footer[(self._index + i) % self._size]
65 return bytes(out)
66
67
class ByteArrayStream(io.BytesIO):
    """In-memory byte sink, the counterpart of Java's ByteArrayOutputStream.

    Extends BytesIO with as_input_stream(), which exposes the bytes written
    so far as a separate stream that can be read from the start.
    """

    def as_input_stream(self) -> io.BytesIO:
        """Return a new BytesIO, positioned at 0, over the data written so far.

        The result is an independent snapshot built from getvalue(): later
        writes to this stream are not visible through it, and because it is
        an ordinary (writable) BytesIO, writes to it do not affect this
        stream. Whether the snapshot shares memory with this buffer is an
        interpreter implementation detail, so do not rely on it being
        zero-copy.
        """
        return io.BytesIO(self.getvalue())
78
79
def _read_fully(stream, length: int) -> bytes:
    """Collect up to `length` bytes from `stream`, stopping early at EOF.

    Keeps calling ``stream.read`` for the outstanding amount until enough
    bytes have arrived or a read comes back empty (or otherwise falsy).
    """
    buf = bytearray()
    remaining = length
    while remaining > 0:
        piece = stream.read(remaining)
        if not piece:
            break
        buf.extend(piece)
        remaining -= len(piece)
    return bytes(buf)
89
90
def read_fully(stream, length: int) -> bytes:
    """Read exactly `length` bytes from `stream`.

    Raises:
        IOError: if the stream runs dry before `length` bytes arrive.
    """
    data = _read_fully(stream, length)
    got = len(data)
    if got >= length:
        return data
    raise IOError(f"Expected {length} bytes, got {got}")
97
98
def copy_stream(source, dest, buf_size: int = 8192) -> int:
    """Copy all data from source to dest.

    Args:
        source: Object with a ``read(size)`` method; an empty (falsy) result
            marks end of stream.
        dest: Object with a ``write(data)`` method. Short writes reported by
            an int return value (as ``io.RawIOBase.write`` may do) are
            resumed until the whole chunk is written.
        buf_size: Amount requested per ``source.read`` call. Must be
            non-zero, since ``read(0)`` returns an empty result and would end
            the copy immediately.

    Returns:
        Total number of bytes (or characters, for text streams) copied.

    Raises:
        ValueError: if buf_size is 0.
    """
    if buf_size == 0:
        raise ValueError("buf_size must be non-zero")
    total = 0
    while chunk := source.read(buf_size):
        _write_all(dest, chunk)
        total += len(chunk)
    return total


def _write_all(dest, data) -> None:
    """Hand every element of ``data`` to ``dest``, resuming after short writes."""
    written = dest.write(data)
    # Buffered/text streams consume everything; duck-typed sinks may return
    # None. Only an int count shorter than the chunk needs a retry.
    if not isinstance(written, int) or written >= len(data):
        return
    view = memoryview(data)  # zero-copy slicing of the unwritten tail
    while written < len(view):
        n = dest.write(view[written:])
        if not isinstance(n, int):
            return
        if n <= 0:
            raise IOError(f"dest.write made no progress ({len(view) - written} bytes pending)")
        written += n
108 return total