Fast and robust atproto CAR file processing in Rust

errors cleanup

not generic over storage anymore, yay

Changed files
+17 -40
src
+11 -25
src/drive.rs
··· 10 10 use tokio::io::AsyncRead; 11 11 12 12 use crate::mst::{Commit, Node}; 13 - use crate::walk::{DiskTrip, Step, Trip, Walker}; 13 + use crate::walk::{Step, Trip, Walker}; 14 14 15 15 /// Errors that can happen while consuming and emitting blocks and records 16 16 #[derive(Debug, thiserror::Error)] ··· 27 27 Tripped(#[from] Trip), 28 28 #[error("CAR file had no roots")] 29 29 MissingRoot, 30 - } 31 - 32 - #[derive(Debug, thiserror::Error)] 33 - pub enum DiskDriveError { 34 - #[error("Error from iroh_car: {0}")] 35 - CarReader(#[from] iroh_car::Error), 36 - #[error("Failed to decode commit block: {0}")] 37 - BadBlock(#[from] serde_ipld_dagcbor::DecodeError<Infallible>), 38 30 #[error("Storage error")] 39 31 StorageError(#[from] rusqlite::Error), 40 - #[error("The Commit block reference by the root was not found")] 41 - MissingCommit, 42 - #[error("The MST block {0} could not be found")] 43 - MissingBlock(Cid), 44 32 #[error("Encode error: {0}")] 45 33 BincodeEncodeError(#[from] bincode::error::EncodeError), 46 34 #[error("Decode error: {0}")] 47 35 BincodeDecodeError(#[from] bincode::error::DecodeError), 48 - #[error("disk tripped: {0}")] 49 - DiskTripped(#[from] DiskTrip), 50 36 } 51 37 52 38 pub trait Processable: Clone + Serialize + DeserializeOwned { ··· 193 179 pub async fn finish_loading( 194 180 mut self, 195 181 mut store: SqliteStore, 196 - ) -> Result<(Commit, BigCarReady<T>), DiskDriveError> { 182 + ) -> Result<(Commit, BigCarReady<T>), DriveError> { 197 183 // set up access for real 198 184 let mut access = store.get_access().await?; 199 185 ··· 210 196 writer.put_many(kvs)?; 211 197 212 198 drop(writer); // cannot outlive access 213 - Ok::<_, DiskDriveError>(access) 199 + Ok::<_, DriveError>(access) 214 200 }) 215 201 .await 216 202 .unwrap()?; ··· 228 214 } 229 215 230 216 drop(writer); // cannot outlive access 231 - Ok::<_, DiskDriveError>(access) 217 + Ok::<_, DriveError>(access) 232 218 }); // await later 233 219 234 220 // dump the rest to 
disk (in chunks) ··· 274 260 275 261 log::debug!("worker finished."); 276 262 277 - let commit = self.commit.ok_or(DiskDriveError::MissingCommit)?; 263 + let commit = self.commit.ok_or(DriveError::MissingCommit)?; 278 264 279 265 let walker = Walker::new(commit.data); 280 266 ··· 299 285 pub async fn next_chunk( 300 286 mut self, 301 287 n: usize, 302 - ) -> Result<(Self, Option<Vec<(String, T)>>), DiskDriveError> { 288 + ) -> Result<(Self, Option<Vec<(String, T)>>), DriveError> { 303 289 let mut out = Vec::with_capacity(n); 304 290 (self, out) = tokio::task::spawn_blocking(move || { 305 291 let access = self.access; ··· 308 294 for _ in 0..n { 309 295 // walk as far as we can until we run out of blocks or find a record 310 296 match self.walker.disk_step(&mut reader, self.process)? { 311 - Step::Missing(cid) => return Err(DiskDriveError::MissingBlock(cid)), 297 + Step::Missing(cid) => return Err(DriveError::MissingBlock(cid)), 312 298 Step::Finish => break, 313 299 Step::Step { rkey, data } => { 314 300 out.push((rkey, data)); ··· 319 305 320 306 drop(reader); // cannot outlive access 321 307 self.access = access; 322 - Ok::<_, DiskDriveError>((self, out)) 308 + Ok::<_, DriveError>((self, out)) 323 309 }) 324 310 .await 325 311 .unwrap()?; // TODO ··· 337 323 ) -> Result< 338 324 ( 339 325 tokio::sync::mpsc::Receiver<Vec<(String, T)>>, 340 - tokio::task::JoinHandle<Result<(), DiskDriveError>>, 326 + tokio::task::JoinHandle<Result<(), DriveError>>, 341 327 ), 342 - DiskDriveError, 328 + DriveError, 343 329 > { 344 330 let (tx, rx) = tokio::sync::mpsc::channel::<Vec<(String, T)>>(1); 345 331 ··· 355 341 for _ in 0..n { 356 342 // walk as far as we can until we run out of blocks or find a record 357 343 match self.walker.disk_step(&mut reader, self.process)? 
{ 358 - Step::Missing(cid) => return Err(DiskDriveError::MissingBlock(cid)), 344 + Step::Missing(cid) => return Err(DriveError::MissingBlock(cid)), 359 345 Step::Finish => break, 360 346 Step::Step { rkey, data } => { 361 347 out.push((rkey, data));
+6 -15
src/walk.rs
··· 11 11 /// Errors that can happen while walking 12 12 #[derive(Debug, thiserror::Error)] 13 13 pub enum Trip { 14 - #[error("empty mst nodes are not allowed")] 15 - NodeEmpty, 16 14 #[error("Failed to fingerprint commit block")] 17 15 BadCommitFingerprint, 18 16 #[error("Failed to decode commit block: {0}")] 19 17 BadCommit(#[from] serde_ipld_dagcbor::DecodeError<Infallible>), 20 18 #[error("Action node error: {0}")] 21 19 MstError(#[from] MstError), 22 - #[error("Encountered an rkey out of order while walking the MST")] 23 - RkeyOutOfOrder, 24 - } 25 - 26 - /// Errors that can happen while walking 27 - #[derive(Debug, thiserror::Error)] 28 - pub enum DiskTrip { 29 - #[error("tripped: {0}")] 30 - Trip(#[from] Trip), 31 20 #[error("storage error: {0}")] 32 21 StorageError(#[from] rusqlite::Error), 33 22 #[error("Decode error: {0}")] ··· 49 38 LostDepth, 50 39 #[error("MST depth underflow: depth-0 node with child trees")] 51 40 DepthUnderflow, 41 + #[error("Encountered an rkey out of order while walking the MST")] 42 + RkeyOutOfOrder, 52 43 } 53 44 54 45 /// Walker outputs ··· 232 223 // rkeys *must* be in order or else the tree is invalid (or 233 224 // we have a bug) 234 225 if rkey <= self.prev { 235 - return Err(Trip::RkeyOutOfOrder); 226 + return Err(MstError::RkeyOutOfOrder)?; 236 227 } 237 228 self.prev = rkey.clone(); 238 229 ··· 247 238 &mut self, 248 239 reader: &mut SqliteReader, 249 240 process: impl Fn(Vec<u8>) -> T, 250 - ) -> Result<Step<T>, DiskTrip> { 241 + ) -> Result<Step<T>, Trip> { 251 242 loop { 252 243 let Some(need) = self.stack.last_mut() else { 253 244 log::trace!("tried to walk but we're actually done."); ··· 266 257 let block: MaybeProcessedBlock<T> = crate::drive::decode(&block_bytes)?; 267 258 268 259 let MaybeProcessedBlock::Raw(data) = block else { 269 - return Err(Trip::BadCommitFingerprint.into()); 260 + return Err(Trip::BadCommitFingerprint); 270 261 }; 271 262 let node = 272 263 
serde_ipld_dagcbor::from_slice::<Node>(&data).map_err(Trip::BadCommit)?; ··· 299 290 // rkeys *must* be in order or else the tree is invalid (or 300 291 // we have a bug) 301 292 if rkey <= self.prev { 302 - return Err(DiskTrip::Trip(Trip::RkeyOutOfOrder)); 293 + return Err(MstError::RkeyOutOfOrder)?; 303 294 } 304 295 self.prev = rkey.clone(); 305 296