Fast and robust atproto CAR file processing in rust

batched walking

Changed files
+129 -92
examples
disk-read-file
src
+1 -1
examples/disk-read-file/main.rs
··· 27 27 28 28 let reader = CarReader::new(reader).await?; 29 29 30 - let redb_store = repo_stream::disk_redb::RedbStore::new(tmpfile)?; 30 + let redb_store = repo_stream::disk_redb::RedbStore::new(tmpfile).await?; 31 31 32 32 let root = reader 33 33 .header()
+50 -22
src/disk_drive.rs
··· 3 3 use std::collections::VecDeque; 4 4 use std::error::Error; 5 5 6 - use crate::disk_walk::{Step, Trip, Walker}; 6 + use crate::disk_walk::{Trip, Walker}; 7 7 use crate::mst::Commit; 8 8 9 9 use ipld_core::cid::Cid; ··· 11 11 12 12 /// Errors that can happen while consuming and emitting blocks and records 13 13 #[derive(Debug, thiserror::Error)] 14 - pub enum DriveError { 14 + pub enum DriveError<E: Error> { 15 15 #[error("Failed to initialize CarReader: {0}")] 16 16 CarReader(#[from] iroh_car::Error), 17 17 #[error("Car block stream error: {0}")] ··· 24 24 MissingBlock(Cid), 25 25 #[error("Failed to walk the mst tree: {0}")] 26 26 Tripped(#[from] Trip), 27 + #[error("whatever: {0}")] 28 + WalkingProblem(#[from] WalkError), 29 + #[error("whatever: {0}")] 30 + Boooooo(String), 31 + #[error("processing error: {0}")] 32 + ProcessingError(E), 27 33 } 28 34 35 + /// Limited subset of errors that can happen while walking 36 + #[derive(Debug, thiserror::Error)] 37 + pub enum WalkError { 38 + #[error("The MST block {0} could not be found")] 39 + MissingBlock(Cid), 40 + #[error("Failed to walk the mst tree: {0}")] 41 + Tripped(#[from] Trip), 42 + } 43 + 44 + /// Storage backend for caching large-repo blocks 45 + /// 46 + /// Since 29 47 pub trait BlockStore<MPB: Serialize + DeserializeOwned> { 30 - fn put_batch(&self, blocks: Vec<(Cid, MPB)>) -> impl std::future::Future<Output = ()> + Send; // unwraps for now 31 - fn get(&self, key: Cid) -> Option<MPB>; 48 + fn put_batch(&self, blocks: Vec<(Cid, MPB)>) -> impl Future<Output = ()> + Send; // unwraps for now 49 + fn walk_batch( 50 + &self, 51 + walker: Walker, 52 + n: usize, 53 + ) -> impl Future<Output = Result<(Walker, Vec<(String, MPB)>), String>>; // boo string error for now because 32 54 } 33 55 34 56 type CarBlock<E> = Result<(Cid, Vec<u8>), E>; ··· 54 76 impl<SE, S, T, BS, P, PE> Vehicle<SE, S, T, BS, P, PE> 55 77 where 56 78 SE: Error + 'static, 57 - S: Stream<Item = CarBlock<SE>> + Unpin, 58 - T: Clone + Serialize + DeserializeOwned, 59 - BS: BlockStore<Vec<u8>>, 60 - P: Fn(&[u8]) -> Result<T, PE>, 79 + S: Stream<Item = CarBlock<SE>> + Unpin + Send, 80 + T: Clone + Serialize + DeserializeOwned + Send, 81 + BS: BlockStore<Vec<u8>> + Send, 82 + P: Fn(&[u8]) -> Result<T, PE> + Send, 61 83 PE: Error, 62 84 { 63 85 /// Set up the stream ··· 82 104 block_stream: S, 83 105 block_store: BS, 84 106 process: P, 85 - ) -> Result<(Commit, Self), DriveError> { 107 + ) -> Result<(Commit, Self), DriveError<PE>> { 86 108 let mut commit = None; 87 109 88 110 log::warn!("init: load blocks"); ··· 127 149 Ok((commit, me)) 128 150 } 129 151 130 - async fn load_chunk(&mut self, n: usize) -> Result<(), DriveError> { 131 - self.out_cache.reserve(n); 132 - for _ in 0..n { 133 - let item = match self.walker.step(&mut self.block_store, &self.process)? { 134 - Step::Step { rkey, data } => (rkey, data), 135 - Step::Finish => break, 136 - Step::Rest(cid) => return Err(DriveError::MissingBlock(cid)), 137 - }; 138 - self.out_cache.push_back(item); 139 - } 152 + async fn load_chunk(&mut self, n: usize) -> Result<(), DriveError<PE>> { 153 + let walker = std::mem::take(&mut self.walker); 154 + let (walker, batch) = self 155 + .block_store 156 + .walk_batch(walker, n) 157 + .await 158 + .map_err(DriveError::Boooooo)?; 159 + self.walker = walker; 160 + 161 + let processed = batch 162 + .into_iter() 163 + .map(|(k, raw)| (self.process)(&raw).map(|t| (k, t))) 164 + .collect::<Result<Vec<_>, _>>() 165 + .map_err(DriveError::ProcessingError)?; 166 + 167 + self.out_cache.extend(processed); 140 168 Ok(()) 141 169 } 142 170 ··· 146 174 /// (but non-zero), even if it's not the last chunk. 147 175 /// 148 176 /// an empty vec will be returned to signal the end. 149 - pub async fn next_chunk(&mut self, n: usize) -> Result<Vec<(String, T)>, DriveError> { 177 + pub async fn next_chunk(&mut self, n: usize) -> Result<Vec<(String, T)>, DriveError<PE>> { 150 178 if self.out_cache.is_empty() { 151 179 self.load_chunk(n).await?; 152 180 } ··· 154 182 } 155 183 156 184 /// Manually step through the record outputs 157 - pub async fn next_record(&mut self) -> Result<Option<(String, T)>, DriveError> { 185 + pub async fn next_record(&mut self) -> Result<Option<(String, T)>, DriveError<PE>> { 158 186 if self.out_cache.is_empty() { 159 187 self.load_chunk(64).await?; // TODO 160 188 } ··· 162 190 } 163 191 164 192 /// Convert to a futures::stream of record outputs 165 - pub fn stream(self) -> impl Stream<Item = Result<(String, T), DriveError>> { 193 + pub fn stream(self) -> impl Stream<Item = Result<(String, T), DriveError<PE>>> { 166 194 futures::stream::try_unfold(self, |mut this| async move { 167 195 let maybe_record = this.next_record().await?; 168 196 Ok(maybe_record.map(|b| (b, this)))
+42 -8
src/disk_redb.rs
··· 1 1 use crate::disk_drive::BlockStore; 2 + use crate::disk_walk::{Need, Walker}; 2 3 use ipld_core::cid::Cid; 3 4 use redb::{Database, Durability, Error, ReadableDatabase, TableDefinition}; 4 5 use std::path::Path; ··· 12 13 } 13 14 14 15 impl RedbStore { 15 - pub fn new(path: impl AsRef<Path>) -> Result<Self, Error> { 16 + pub async fn new(path: impl AsRef<Path> + 'static + Send) -> Result<Self, Error> { 16 17 log::warn!("redb new"); 17 - let db = Database::create(path)?; 18 + let db = tokio::task::spawn_blocking(|| Database::create(path)) 19 + .await 20 + .unwrap()?; 18 21 log::warn!("db created"); 19 22 Ok(Self { db: db.into() }) 20 23 } ··· 50 53 .unwrap(); 51 54 } 52 55 53 - fn get(&self, c: Cid) -> Option<Vec<u8>> { 54 - let key_bytes = c.to_bytes(); 55 - let tx = self.db.begin_read().unwrap(); 56 - let table = tx.open_table(TABLE).unwrap(); 57 - let t = table.get(&*key_bytes).unwrap()?.value().to_vec(); 58 - Some(t) 56 + async fn walk_batch( 57 + &self, 58 + mut walker: Walker, 59 + n: usize, 60 + ) -> Result<(Walker, Vec<(String, Vec<u8>)>), String> { 61 + let db = self.db.clone(); 62 + tokio::task::spawn_blocking(move || { 63 + let tx = db.begin_read().unwrap(); 64 + let table = tx.open_table(TABLE).unwrap(); 65 + 66 + let mut out = Vec::with_capacity(n); 67 + loop { 68 + let Some(need) = walker.next_needed() else { 69 + break; 70 + }; 71 + let cid = need.cid(); 72 + let Some(res) = table.get(&*cid.to_bytes()).unwrap() else { 73 + return Err(format!("missing block: {cid:?}")); 74 + }; 75 + let block = res.value(); 76 + 77 + match need { 78 + Need::Node(_) => walker 79 + .handle_node(block) 80 + .map_err(|e| format!("failed to handle mst node: {e}"))?, 81 + Need::Record { rkey, .. } => { 82 + out.push((rkey, block.to_vec())); 83 + if out.len() >= n { 84 + break; 85 + } 86 + } 87 + } 88 + } 89 + Ok((walker, out)) 90 + }) 91 + .await 92 + .unwrap() // tokio join 59 93 } 60 94 }
+36 -61
src/disk_walk.rs
··· 1 1 //! Depth-first MST traversal 2 2 3 - use crate::disk_drive::BlockStore; 4 3 use crate::mst::Node; 4 + use std::convert::Infallible; 5 5 6 6 use ipld_core::cid::Cid; 7 7 use serde::{Serialize, de::DeserializeOwned}; 8 - use std::error::Error; 9 8 10 9 /// Errors that can happen while walking 11 10 #[derive(Debug, thiserror::Error)] ··· 13 12 #[error("empty mst nodes are not allowed")] 14 13 NodeEmpty, 15 14 #[error("Failed to decode commit block: {0}")] 16 - BadCommit(Box<dyn std::error::Error>), 15 + BadCommit(serde_ipld_dagcbor::DecodeError<Infallible>), 17 16 #[error("Action node error: {0}")] 18 17 RkeyError(#[from] RkeyError), 19 18 #[error("Process failed: {0}")] ··· 45 44 } 46 45 47 46 #[derive(Debug, Clone, PartialEq)] 48 - enum Need { 47 + pub enum Need { 49 48 Node(Cid), 50 49 Record { rkey: String, cid: Cid }, 51 50 } 52 51 52 + impl Need { 53 + pub fn cid(&self) -> Cid { 54 + match self { 55 + Need::Node(cid) => *cid, 56 + Need::Record { cid, .. } => *cid, 57 + } 58 + } 59 + } 60 + 53 61 fn push_from_node(stack: &mut Vec<Need>, node: &Node) -> Result<(), RkeyError> { 54 62 let mut entries = Vec::with_capacity(node.entries.len()); 55 63 ··· 84 92 /// Traverser of an atproto MST 85 93 /// 86 94 /// Walks the tree from left-to-right in depth-first order 87 - #[derive(Debug)] 95 + /// 96 + /// (turning into more of a navigator) 97 + /// it doesn't quite feel like the divisions of responsibility are right around 98 + /// here yet. 99 + #[derive(Debug, Default)] 88 100 pub struct Walker { 89 101 stack: Vec<Need>, 90 102 prev: String, ··· 98 110 } 99 111 } 100 112 101 - /// Advance through nodes until we find a record or can't go further 102 - pub fn step<T: Clone + Serialize + DeserializeOwned, E: Error>( 103 - &mut self, 104 - block_store: &mut impl BlockStore<Vec<u8>>, 105 - process: impl Fn(&[u8]) -> Result<T, E>, 106 - ) -> Result<Step<T>, Trip> { 107 - loop { 108 - let Some(mut need) = self.stack.last() else { 109 - log::trace!("tried to walk but we're actually done."); 110 - return Ok(Step::Finish); 111 - }; 113 + pub fn next_needed(&mut self) -> Option<Need> { 114 + self.stack.pop() 115 + // TODO: 116 + // let need = self.stack.pop()?; 117 + // if let Need::Record { ref rkey, .. } = need { 118 + // // rkeys *must* be in order or else the tree is invalid (or 119 + // // we have a bug) 120 + // if *rkey <= self.prev { 121 + // return Err(Trip::RkeyOutOfOrder); 122 + // } 123 + // self.prev = rkey.clone(); 124 + // } 125 + // Some(need) 126 + } 112 127 113 - match &mut need { 114 - Need::Node(cid) => { 115 - log::trace!("need node {cid:?}"); 116 - let Some(block) = block_store.get(*cid) else { 117 - log::trace!("node not found, resting"); 118 - return Ok(Step::Rest(*cid)); 119 - }; 120 - 121 - let node = serde_ipld_dagcbor::from_slice::<Node>(&block) 122 - .map_err(|e| Trip::BadCommit(e.into()))?; 123 - 124 - // found node, make sure we remember 125 - self.stack.pop(); 126 - 127 - // queue up work on the found node next 128 - push_from_node(&mut self.stack, &node)?; 129 - } 130 - Need::Record { rkey, cid } => { 131 - log::trace!("need record {cid:?}"); 132 - let Some(block) = block_store.get(*cid) else { 133 - log::trace!("record block not found, resting"); 134 - return Ok(Step::Rest(*cid)); 135 - }; 136 - let rkey = rkey.clone(); 137 - 138 - let data = process(&block).map_err(|e| Trip::ProcessFailed(e.to_string())); 139 - 140 - // found node, make sure we remember 141 - self.stack.pop(); 142 - 143 - log::trace!("emitting a block as a step. depth={}", self.stack.len()); 144 - 145 - let data = data.map_err(|e| Trip::ProcessFailed(e.to_string()))?; 146 - 147 - // rkeys *must* be in order or else the tree is invalid (or 148 - // we have a bug) 149 - if rkey <= self.prev { 150 - return Err(Trip::RkeyOutOfOrder); 151 - } 152 - self.prev = rkey.clone(); 153 - 154 - return Ok(Step::Step { rkey, data }); 155 - } 156 - } 157 - } 128 + /// hacky: this must be called after next_needed if it was a node 129 + pub fn handle_node(&mut self, block: &[u8]) -> Result<(), Trip> { 130 + let node = serde_ipld_dagcbor::from_slice::<Node>(block).map_err(Trip::BadCommit)?; 131 + push_from_node(&mut self.stack, &node)?; 132 + Ok(()) 158 133 } 159 134 } 160 135