Streaming Tree ARchive format
at rust-impl 84 lines 2.9 kB view raw
1use crate::error::Result; 2use crate::parser::StarParser; 3use crate::types::StarItem; 4use cid::Cid; 5use std::collections::VecDeque; 6use std::io::Read; 7 8pub struct StarIterator<R> { 9 reader: R, 10 parser: StarParser, 11 buffer: VecDeque<u8>, 12} 13 14impl<R: Read> StarIterator<R> { 15 pub fn new(reader: R) -> Self { 16 Self { 17 reader, 18 parser: StarParser::new(), 19 buffer: VecDeque::new(), 20 } 21 } 22 23 /// Returns an iterator over all items (Nodes, Records, Header) in the tree (order preserved). 24 /// This is the identity iterator. 25 pub fn iter_tree(self) -> Self { 26 self 27 } 28 29 /// Returns an iterator over triples of (key, cid, Option<record>). 30 /// Skips Commit header and MST Nodes. 31 pub fn iter(self) -> impl Iterator<Item = Result<(Vec<u8>, Cid, Option<Vec<u8>>)>> { 32 self.filter_map(|res| match res { 33 Ok(StarItem::Record { key, cid, content }) => Some(Ok((key, cid, content))), 34 Ok(_) => None, 35 Err(e) => Some(Err(e)), 36 }) 37 } 38 39 /// Returns an iterator over (key, cid) pairs. 40 pub fn iter_keys(self) -> impl Iterator<Item = Result<(Vec<u8>, Cid)>> { 41 self.filter_map(|res| match res { 42 Ok(StarItem::Record { key, cid, .. }) => Some(Ok((key, cid))), 43 Ok(_) => None, 44 Err(e) => Some(Err(e)), 45 }) 46 } 47} 48 49impl<R: Read> Iterator for StarIterator<R> { 50 type Item = Result<StarItem>; 51 52 fn next(&mut self) -> Option<Self::Item> { 53 loop { 54 // VecDeque must be contiguous to be parsed as a slice. 55 // make_contiguous() allows us to get a single slice, but might involve a memory copy/move 56 // if the buffer is wrapped. This is amortized O(1) in many cases but can be O(N). 57 // However, it solves the "head removal" problem perfectly (pop_front is O(1)). 58 59 self.buffer.make_contiguous(); 60 let (slice, _) = self.buffer.as_slices(); 61 62 match self.parser.parse(slice) { 63 Ok((consumed, Some(item))) => { 64 self.buffer.drain(..consumed); 65 return Some(Ok(item)); 66 } 67 Ok((consumed, None)) => { 68 self.buffer.drain(..consumed); 69 70 // Read more 71 // We can't read directly into VecDeque easily without temporary buffer 72 // because it doesn't expose a mutable slice to uninitialized memory. 73 let mut temp = [0u8; 8192]; 74 match self.reader.read(&mut temp) { 75 Ok(0) => return None, // EOF 76 Ok(n) => self.buffer.extend(&temp[..n]), 77 Err(e) => return Some(Err(e.into())), 78 } 79 } 80 Err(e) => return Some(Err(e)), 81 } 82 } 83 } 84}