Fast and robust atproto CAR file processing in rust

clean up error handling

Changed files
+33 -28
src
+5 -3
src/disk.rs
··· 1 use rusqlite::OptionalExtension; 2 use std::path::PathBuf; 3 ··· 81 } 82 pub fn put_many( 83 &mut self, 84 - kv: impl Iterator<Item = (Vec<u8>, Vec<u8>)>, 85 - ) -> rusqlite::Result<()> { 86 let tx = self.tx.as_ref().unwrap(); 87 let mut insert_stmt = tx.prepare_cached("INSERT INTO blocks (key, val) VALUES (?1, ?2)")?; 88 - for (k, v) in kv { 89 insert_stmt.execute((k, v))?; 90 } 91 Ok(())
··· 1 + use crate::drive::DriveError; 2 use rusqlite::OptionalExtension; 3 use std::path::PathBuf; 4 ··· 82 } 83 pub fn put_many( 84 &mut self, 85 + kv: impl Iterator<Item = Result<(Vec<u8>, Vec<u8>), DriveError>>, 86 + ) -> Result<(), DriveError> { 87 let tx = self.tx.as_ref().unwrap(); 88 let mut insert_stmt = tx.prepare_cached("INSERT INTO blocks (key, val) VALUES (?1, ?2)")?; 89 + for pair in kv { 90 + let (k, v) = pair?; 91 insert_stmt.execute((k, v))?; 92 } 93 Ok(())
+16 -12
src/drive.rs
··· 10 use tokio::io::AsyncRead; 11 12 use crate::mst::{Commit, Node}; 13 - use crate::walk::{Step, Trip, Walker}; 14 15 /// Errors that can happen while consuming and emitting blocks and records 16 #[derive(Debug, thiserror::Error)] ··· 24 #[error("The MST block {0} could not be found")] 25 MissingBlock(Cid), 26 #[error("Failed to walk the mst tree: {0}")] 27 - Tripped(#[from] Trip), 28 #[error("CAR file had no roots")] 29 MissingRoot, 30 #[error("Storage error")] ··· 33 BincodeEncodeError(#[from] bincode::error::EncodeError), 34 #[error("Decode error: {0}")] 35 BincodeDecodeError(#[from] bincode::error::DecodeError), 36 } 37 38 pub trait Processable: Clone + Serialize + DeserializeOwned { ··· 118 } 119 120 // remaining possible types: node, record, other. optimistically process 121 - // TODO: get the actual in-memory size to compute disk spill 122 let maybe_processed = if Node::could_be(&data) { 123 MaybeProcessedBlock::Raw(data) 124 } else { ··· 191 let kvs = self 192 .mem_blocks 193 .into_iter() 194 - .map(|(k, v)| (k.to_bytes(), encode(v).unwrap())); 195 196 writer.put_many(kvs)?; 197 198 drop(writer); // cannot outlive access 199 Ok::<_, DriveError>(access) 200 }) 201 - .await 202 - .unwrap()?; 203 204 let (tx, mut rx) = tokio::sync::mpsc::channel::<Vec<(Cid, MaybeProcessedBlock<T>)>>(2); 205 ··· 209 while let Some(chunk) = rx.blocking_recv() { 210 let kvs = chunk 211 .into_iter() 212 - .map(|(k, v)| (k.to_bytes(), encode(v).unwrap())); 213 writer.put_many(kvs)?; 214 } 215 ··· 251 if chunk.is_empty() { 252 break; 253 } 254 - tx.send(chunk).await.unwrap(); 255 } 256 drop(tx); 257 log::debug!("done. waiting for worker to finish..."); 258 259 - access = access_worker.await.unwrap()?; 260 261 log::debug!("worker finished."); 262 ··· 307 self.access = access; 308 Ok::<_, DriveError>((self, out)) 309 }) 310 - .await 311 - .unwrap()?; // TODO 312 313 if out.is_empty() { 314 Ok((self, None)) ··· 353 if out.is_empty() { 354 break; 355 } 356 - tx.blocking_send(out).unwrap(); 357 } 358 359 drop(reader); // cannot outlive access
··· 10 use tokio::io::AsyncRead; 11 12 use crate::mst::{Commit, Node}; 13 + use crate::walk::{Step, WalkError, Walker}; 14 15 /// Errors that can happen while consuming and emitting blocks and records 16 #[derive(Debug, thiserror::Error)] ··· 24 #[error("The MST block {0} could not be found")] 25 MissingBlock(Cid), 26 #[error("Failed to walk the mst tree: {0}")] 27 + WalkError(#[from] WalkError), 28 #[error("CAR file had no roots")] 29 MissingRoot, 30 #[error("Storage error")] ··· 33 BincodeEncodeError(#[from] bincode::error::EncodeError), 34 #[error("Decode error: {0}")] 35 BincodeDecodeError(#[from] bincode::error::DecodeError), 36 + #[error("Tried to send on a closed channel")] 37 + ChannelSendError, // SendError takes <T> which we don't need 38 + #[error("Failed to join a task: {0}")] 39 + JoinError(#[from] tokio::task::JoinError), 40 } 41 42 pub trait Processable: Clone + Serialize + DeserializeOwned { ··· 122 } 123 124 // remaining possible types: node, record, other. optimistically process 125 let maybe_processed = if Node::could_be(&data) { 126 MaybeProcessedBlock::Raw(data) 127 } else { ··· 194 let kvs = self 195 .mem_blocks 196 .into_iter() 197 + .map(|(k, v)| Ok(encode(v).map(|v| (k.to_bytes(), v))?)); 198 199 writer.put_many(kvs)?; 200 201 drop(writer); // cannot outlive access 202 Ok::<_, DriveError>(access) 203 }) 204 + .await??; 205 206 let (tx, mut rx) = tokio::sync::mpsc::channel::<Vec<(Cid, MaybeProcessedBlock<T>)>>(2); 207 ··· 211 while let Some(chunk) = rx.blocking_recv() { 212 let kvs = chunk 213 .into_iter() 214 + .map(|(k, v)| Ok(encode(v).map(|v| (k.to_bytes(), v))?)); 215 writer.put_many(kvs)?; 216 } 217 ··· 253 if chunk.is_empty() { 254 break; 255 } 256 + tx.send(chunk) 257 + .await 258 + .map_err(|_| DriveError::ChannelSendError)?; 259 } 260 drop(tx); 261 log::debug!("done. waiting for worker to finish..."); 262 263 + access = access_worker.await??; 264 265 log::debug!("worker finished."); 266 ··· 311 self.access = access; 312 Ok::<_, DriveError>((self, out)) 313 }) 314 + .await??; 315 316 if out.is_empty() { 317 Ok((self, None)) ··· 356 if out.is_empty() { 357 break; 358 } 359 + tx.blocking_send(out) 360 + .map_err(|_| DriveError::ChannelSendError)?; 361 } 362 363 drop(reader); // cannot outlive access
+12 -13
src/walk.rs
··· 10 11 /// Errors that can happen while walking 12 #[derive(Debug, thiserror::Error)] 13 - pub enum Trip { 14 #[error("Failed to fingerprint commit block")] 15 BadCommitFingerprint, 16 #[error("Failed to decode commit block: {0}")] ··· 176 &mut self, 177 blocks: &mut HashMap<Cid, MaybeProcessedBlock<T>>, 178 process: impl Fn(Vec<u8>) -> T, 179 - ) -> Result<Step<T>, Trip> { 180 loop { 181 let Some(need) = self.stack.last_mut() else { 182 log::trace!("tried to walk but we're actually done."); ··· 192 }; 193 194 let MaybeProcessedBlock::Raw(data) = block else { 195 - return Err(Trip::BadCommitFingerprint); 196 }; 197 - let node = 198 - serde_ipld_dagcbor::from_slice::<Node>(&data).map_err(Trip::BadCommit)?; 199 200 // found node, make sure we remember 201 self.stack.pop(); ··· 205 } 206 Need::Record { rkey, cid } => { 207 log::trace!("need record {cid:?}"); 208 let Some(data) = blocks.get_mut(cid) else { 209 - log::trace!("record block not found, resting"); 210 return Ok(Step::Missing(*cid)); 211 }; 212 let rkey = rkey.clone(); ··· 218 // found node, make sure we remember 219 self.stack.pop(); 220 221 - log::trace!("emitting a block as a step. depth={}", self.stack.len()); 222 - 223 // rkeys *must* be in order or else the tree is invalid (or 224 // we have a bug) 225 if rkey <= self.prev { ··· 238 &mut self, 239 reader: &mut SqliteReader, 240 process: impl Fn(Vec<u8>) -> T, 241 - ) -> Result<Step<T>, Trip> { 242 loop { 243 let Some(need) = self.stack.last_mut() else { 244 log::trace!("tried to walk but we're actually done."); ··· 257 let block: MaybeProcessedBlock<T> = crate::drive::decode(&block_bytes)?; 258 259 let MaybeProcessedBlock::Raw(data) = block else { 260 - return Err(Trip::BadCommitFingerprint); 261 }; 262 - let node = 263 - serde_ipld_dagcbor::from_slice::<Node>(&data).map_err(Trip::BadCommit)?; 264 265 // found node, make sure we remember 266 self.stack.pop(); 267 268 // queue up work on the found node next 269 - push_from_node(&mut self.stack, &node, depth).map_err(Trip::MstError)?; 270 } 271 Need::Record { rkey, cid } => { 272 log::trace!("need record {cid:?}");
··· 10 11 /// Errors that can happen while walking 12 #[derive(Debug, thiserror::Error)] 13 + pub enum WalkError { 14 #[error("Failed to fingerprint commit block")] 15 BadCommitFingerprint, 16 #[error("Failed to decode commit block: {0}")] ··· 176 &mut self, 177 blocks: &mut HashMap<Cid, MaybeProcessedBlock<T>>, 178 process: impl Fn(Vec<u8>) -> T, 179 + ) -> Result<Step<T>, WalkError> { 180 loop { 181 let Some(need) = self.stack.last_mut() else { 182 log::trace!("tried to walk but we're actually done."); ··· 192 }; 193 194 let MaybeProcessedBlock::Raw(data) = block else { 195 + return Err(WalkError::BadCommitFingerprint); 196 }; 197 + let node = serde_ipld_dagcbor::from_slice::<Node>(&data) 198 + .map_err(WalkError::BadCommit)?; 199 200 // found node, make sure we remember 201 self.stack.pop(); ··· 205 } 206 Need::Record { rkey, cid } => { 207 log::trace!("need record {cid:?}"); 208 + // note that we cannot *remove* a record block, sadly, since 209 + // there can be multiple rkeys pointing to the same cid. 210 let Some(data) = blocks.get_mut(cid) else { 211 return Ok(Step::Missing(*cid)); 212 }; 213 let rkey = rkey.clone(); ··· 219 // found node, make sure we remember 220 self.stack.pop(); 221 222 // rkeys *must* be in order or else the tree is invalid (or 223 // we have a bug) 224 if rkey <= self.prev { ··· 237 &mut self, 238 reader: &mut SqliteReader, 239 process: impl Fn(Vec<u8>) -> T, 240 + ) -> Result<Step<T>, WalkError> { 241 loop { 242 let Some(need) = self.stack.last_mut() else { 243 log::trace!("tried to walk but we're actually done."); ··· 256 let block: MaybeProcessedBlock<T> = crate::drive::decode(&block_bytes)?; 257 258 let MaybeProcessedBlock::Raw(data) = block else { 259 + return Err(WalkError::BadCommitFingerprint); 260 }; 261 + let node = serde_ipld_dagcbor::from_slice::<Node>(&data) 262 + .map_err(WalkError::BadCommit)?; 263 264 // found node, make sure we remember 265 self.stack.pop(); 266 267 // queue up work on the found node next 268 + push_from_node(&mut self.stack, &node, depth).map_err(WalkError::MstError)?; 269 } 270 Need::Record { rkey, cid } => { 271 log::trace!("need record {cid:?}");