Streaming Tree ARchive format
1use crate::error::Result;
2use crate::parser::StarParser;
3use crate::types::StarItem;
4use cid::Cid;
5use std::collections::VecDeque;
6use std::io::Read;
7
8pub struct StarIterator<R> {
9 reader: R,
10 parser: StarParser,
11 buffer: VecDeque<u8>,
12}
13
14impl<R: Read> StarIterator<R> {
15 pub fn new(reader: R) -> Self {
16 Self {
17 reader,
18 parser: StarParser::new(),
19 buffer: VecDeque::new(),
20 }
21 }
22
23 /// Returns an iterator over all items (Nodes, Records, Header) in the tree (order preserved).
24 /// This is the identity iterator.
25 pub fn iter_tree(self) -> Self {
26 self
27 }
28
29 /// Returns an iterator over triples of (key, cid, Option<record>).
30 /// Skips Commit header and MST Nodes.
31 pub fn iter(self) -> impl Iterator<Item = Result<(Vec<u8>, Cid, Option<Vec<u8>>)>> {
32 self.filter_map(|res| match res {
33 Ok(StarItem::Record { key, cid, content }) => Some(Ok((key, cid, content))),
34 Ok(_) => None,
35 Err(e) => Some(Err(e)),
36 })
37 }
38
39 /// Returns an iterator over (key, cid) pairs.
40 pub fn iter_keys(self) -> impl Iterator<Item = Result<(Vec<u8>, Cid)>> {
41 self.filter_map(|res| match res {
42 Ok(StarItem::Record { key, cid, .. }) => Some(Ok((key, cid))),
43 Ok(_) => None,
44 Err(e) => Some(Err(e)),
45 })
46 }
47}
48
49impl<R: Read> Iterator for StarIterator<R> {
50 type Item = Result<StarItem>;
51
52 fn next(&mut self) -> Option<Self::Item> {
53 loop {
54 // VecDeque must be contiguous to be parsed as a slice.
55 // make_contiguous() allows us to get a single slice, but might involve a memory copy/move
56 // if the buffer is wrapped. This is amortized O(1) in many cases but can be O(N).
57 // However, it solves the "head removal" problem perfectly (pop_front is O(1)).
58
59 self.buffer.make_contiguous();
60 let (slice, _) = self.buffer.as_slices();
61
62 match self.parser.parse(slice) {
63 Ok((consumed, Some(item))) => {
64 self.buffer.drain(..consumed);
65 return Some(Ok(item));
66 }
67 Ok((consumed, None)) => {
68 self.buffer.drain(..consumed);
69
70 // Read more
71 // We can't read directly into VecDeque easily without temporary buffer
72 // because it doesn't expose a mutable slice to uninitialized memory.
73 let mut temp = [0u8; 8192];
74 match self.reader.read(&mut temp) {
75 Ok(0) => return None, // EOF
76 Ok(n) => self.buffer.extend(&temp[..n]),
77 Err(e) => return Some(Err(e.into())),
78 }
79 }
80 Err(e) => return Some(Err(e)),
81 }
82 }
83 }
84}