Fast and robust atproto CAR file processing in rust

yep repros

Changed files
+34 -153
examples
disk-read-file
src
+5 -8
examples/disk-read-file/main.rs
··· 4 4 5 5 extern crate repo_stream; 6 6 use clap::Parser; 7 - use repo_stream::{DiskBuilder, Driver}; 7 + use repo_stream::{DiskStore, load_car}; 8 8 use std::path::PathBuf; 9 9 use std::time::Instant; 10 10 ··· 30 30 log::info!("hello! reading the car..."); 31 31 let t0 = Instant::now(); 32 32 33 + // set up a disk store we can spill to 34 + let disk_store = DiskStore::new(tmpfile).await?; 35 + 33 36 // in this example we only bother handling CARs that are too big for memory 34 37 // `noop` helper means: do no block processing, store the raw blocks 35 - let paused = Driver::load_car(reader).await?; 36 - 37 - // set up a disk store we can spill to 38 - let disk_store = DiskBuilder::new().open(tmpfile).await?; 39 - 40 - // do the spilling, get back a driver 41 - let mut store = paused.finish_loading(disk_store).await?; 38 + let mut store = load_car(reader, disk_store).await?; 42 39 43 40 // at this point you might want to fetch the account's signing key 44 41 // via the DID from the commit, and then verify the signature.
+2 -56
src/disk.rs
··· 1 - /*! 2 - Disk storage for blocks on disk 3 - 4 - Currently this uses sqlite. In testing sqlite wasn't the fastest, but it seemed 5 - to be the best behaved in terms of both on-disk space usage and memory usage. 6 - 7 - ```no_run 8 - # use repo_stream::{DiskBuilder, DiskError}; 9 - # #[tokio::main] 10 - # async fn main() -> Result<(), DiskError> { 11 - let store = DiskBuilder::new() 12 - .with_cache_size_mb(32) 13 - .with_max_stored_mb(1024) // errors when >1GiB of processed blocks are inserted 14 - .open("/some/path.db".into()).await?; 15 - # Ok(()) 16 - # } 17 - ``` 18 - */ 19 - 20 1 use crate::drive::DriveError; 21 2 use fjall::{Database, Keyspace, KeyspaceCreateOptions, Error as FjallError}; 22 3 use std::path::PathBuf; ··· 30 11 /// sqlite bits) 31 12 #[error(transparent)] 32 13 DbError(#[from] FjallError), 33 - /// A tokio blocking task failed to join 34 - #[error("Failed to join a tokio blocking task: {0}")] 35 - JoinError(#[from] tokio::task::JoinError), 36 - /// The total size of stored blocks exceeded the allowed size 37 - /// 38 - /// If you need to process *really* big CARs, you can configure a higher 39 - /// limit. 40 - #[error("Maximum disk size reached")] 41 - MaxSizeExceeded, 42 - } 43 - 44 - /// Builder-style disk store setup 45 - #[derive(Debug, Clone)] 46 - pub struct DiskBuilder {} 47 - 48 - impl Default for DiskBuilder { 49 - fn default() -> Self { 50 - Self {} 51 - } 52 - } 53 - 54 - impl DiskBuilder { 55 - /// Begin configuring the storage with defaults 56 - pub fn new() -> Self { 57 - Default::default() 58 - } 59 - /// Open and initialize the actual disk storage 60 - pub async fn open(&self, path: PathBuf) -> Result<DiskStore, DiskError> { 61 - DiskStore::new(path).await 62 - } 63 14 } 64 15 65 16 /// On-disk block storage ··· 76 27 pub async fn new( 77 28 path: PathBuf, 78 29 ) -> Result<Self, DiskError> { 79 - let (db, ks) = tokio::task::spawn_blocking(move || { 80 - let db = Database::builder(path).open()?; 81 - let ks = db.keyspace("z", KeyspaceCreateOptions::default)?; 82 - 83 - Ok::<_, DiskError>((db, ks)) 84 - }) 85 - .await??; 30 + let db = Database::builder(path).open()?; 31 + let ks = db.keyspace("z", KeyspaceCreateOptions::default)?; 86 32 87 33 Ok(Self { 88 34 db,
+25 -87
src/drive.rs
··· 1 1 //! Consume a CAR from an AsyncRead, producing an ordered stream of records 2 2 3 3 use crate::disk::{DiskError, DiskStore}; 4 - use ipld_core::cid::Cid; 5 4 use iroh_car::CarReader; 6 5 use std::collections::HashMap; 7 - use std::convert::Infallible; 8 6 use tokio::io::AsyncRead; 9 7 10 8 ··· 13 11 pub enum DriveError { 14 12 #[error("Error from iroh_car: {0}")] 15 13 CarReader(#[from] iroh_car::Error), 16 - #[error("Failed to decode commit block: {0}")] 17 - BadBlock(#[from] serde_ipld_dagcbor::DecodeError<Infallible>), 18 - #[error("The Commit block reference by the root was not found")] 19 - MissingCommit, 20 - #[error("The MST block {0} could not be found")] 21 - MissingBlock(Cid), 22 - #[error("CAR file had no roots")] 23 - MissingRoot, 24 14 #[error("Storage error")] 25 15 StorageError(#[from] DiskError), 26 - #[error("Encode error: {0}")] 27 - BincodeEncodeError(#[from] bincode::error::EncodeError), 28 - #[error("Tried to send on a closed channel")] 29 - ChannelSendError, // SendError takes <T> which we don't need 30 - #[error("Failed to join a task: {0}")] 31 - JoinError(#[from] tokio::task::JoinError), 32 16 } 33 17 34 - #[derive(Debug, thiserror::Error)] 35 - pub enum DecodeError { 36 - #[error(transparent)] 37 - BincodeDecodeError(#[from] bincode::error::DecodeError), 38 - #[error("extra bytes remained after decoding")] 39 - ExtraGarbage, 40 - } 41 18 42 - /// An in-order chunk of Rkey + (processed) Block pairs 43 - pub type BlockChunk<T> = Vec<(String, T)>; 44 - 45 - /// Read a CAR file, buffering blocks in memory or to disk 46 - pub enum Driver<R: AsyncRead + Unpin> { 47 - /// Blocks exceed the memory limit 48 - /// 49 - /// You'll need to provide a disk storage to continue. The commit will be 50 - /// returned and can be validated only once all blocks are loaded. 51 - Disk(NeedDisk<R>), 52 - } 53 - 54 - impl<R: AsyncRead + Unpin> Driver<R> { 55 - /// Begin processing an atproto MST from a CAR file 56 - /// 57 - /// Blocks will be loaded, processed, and buffered in memory. If the entire 58 - /// processed size is under the `mem_limit_mb` limit, a `Driver::Memory` 59 - /// will be returned along with a `Commit` ready for validation. 60 - /// 61 - /// If the `mem_limit_mb` limit is reached before loading all blocks, the 62 - /// partial state will be returned as `Driver::Disk(needed)`, which can be 63 - /// resumed by providing a `SqliteStorage` for on-disk block storage. 64 - pub async fn load_car( 65 - reader: R, 66 - ) -> Result<NeedDisk<R>, DriveError> { 67 - let mut mem_blocks = HashMap::new(); 19 + pub async fn load_car<R: AsyncRead + Unpin>( 20 + reader: R, 21 + mut store: DiskStore, 22 + ) -> Result<DiskStore, DriveError> { 23 + let mut mem_blocks = HashMap::new(); 68 24 69 - let mut car = CarReader::new(reader).await?; 25 + let mut car = CarReader::new(reader).await?; 70 26 71 - // try to load all the blocks into memory 72 - let mut n = 0; 27 + // try to load all the blocks into memory 28 + let mut n = 0; 73 29 74 - while let Some((cid, data)) = car.next_block().await? { 75 - mem_blocks.insert(cid, data); 76 - n += 1; 77 - if n > 20_000 { 78 - log::info!("stopping for disk dump at {n}"); 79 - return Ok(NeedDisk { 80 - car, 81 - mem_blocks, 82 - }); 83 - } 30 + while let Some((cid, data)) = car.next_block().await? { 31 + mem_blocks.insert(cid, data); 32 + n += 1; 33 + if n >= 20_000 { 34 + log::info!("stopping for disk dump at {n}"); 35 + break; 84 36 } 85 - panic!("car too small"); 86 37 } 87 - } 88 38 89 - 90 - /// A partially memory-loaded car file that needs disk spillover to continue 91 - pub struct NeedDisk<R: AsyncRead + Unpin> { 92 - car: CarReader<R>, 93 - mem_blocks: HashMap<Cid, Vec<u8>>, 94 - } 95 - 96 - impl<R: AsyncRead + Unpin> NeedDisk<R> { 97 - pub async fn finish_loading( 98 - mut self, 99 - mut store: DiskStore, 100 - ) -> Result<DiskStore, DriveError> { 101 - // dump mem blocks into the store 102 - for (k, v) in self.mem_blocks { 103 - store.put(k.to_bytes(), v)?; 104 - } 39 + // dump mem blocks into the store 40 + for (k, v) in mem_blocks { 41 + store.put(k.to_bytes(), v)?; 42 + } 105 43 106 - // dump the rest to disk (in chunks) 107 - log::debug!("dumping the rest of the stream..."); 108 - while let Some((cid, data)) = self.car.next_block().await? { 109 - store.put(cid.to_bytes(), data)?; 110 - } 111 - log::debug!("done."); 112 - Ok(store) 44 + // dump the rest to disk (in chunks) 45 + log::debug!("dumping the rest of the stream..."); 46 + while let Some((cid, data)) = car.next_block().await? { 47 + store.put(cid.to_bytes(), data)?; 113 48 } 49 + log::debug!("done."); 50 + Ok(store) 51 + 114 52 }
+2 -2
src/lib.rs
··· 77 77 pub mod disk; 78 78 pub mod drive; 79 79 80 - pub use disk::{DiskBuilder, DiskError, DiskStore}; 81 - pub use drive::{DriveError, Driver, NeedDisk}; 80 + pub use disk::{DiskError, DiskStore}; 81 + pub use drive::{DriveError, load_car};