Fast and robust atproto CAR file processing in rust

Compare changes

Choose any two refs to compare.

Changed files
+114 -19
benches
car-samples
examples
disk-read-file
src
tests
+1 -1
Cargo.lock
··· 1024 1024 1025 1025 [[package]] 1026 1026 name = "repo-stream" 1027 - version = "0.2.0" 1027 + version = "0.2.2" 1028 1028 dependencies = [ 1029 1029 "bincode", 1030 1030 "clap",
+1 -1
Cargo.toml
··· 1 1 [package] 2 2 name = "repo-stream" 3 - version = "0.2.0" 3 + version = "0.2.2" 4 4 edition = "2024" 5 5 license = "MIT OR Apache-2.0" 6 6 description = "A robust CAR file -> MST walker for atproto"
+4
benches/non-huge-cars.rs
··· 3 3 4 4 use criterion::{Criterion, criterion_group, criterion_main}; 5 5 6 + const EMPTY_CAR: &'static [u8] = include_bytes!("../car-samples/empty.car"); 6 7 const TINY_CAR: &'static [u8] = include_bytes!("../car-samples/tiny.car"); 7 8 const LITTLE_CAR: &'static [u8] = include_bytes!("../car-samples/little.car"); 8 9 const MIDSIZE_CAR: &'static [u8] = include_bytes!("../car-samples/midsize.car"); ··· 13 14 .build() 14 15 .expect("Creating runtime failed"); 15 16 17 + c.bench_function("empty-car", |b| { 18 + b.to_async(&rt).iter(async || drive_car(EMPTY_CAR).await) 19 + }); 16 20 c.bench_function("tiny-car", |b| { 17 21 b.to_async(&rt).iter(async || drive_car(TINY_CAR).await) 18 22 });
car-samples/empty.car

This is a binary file and will not be displayed.

+4 -2
examples/disk-read-file/main.rs
··· 6 6 use clap::Parser; 7 7 use repo_stream::{DiskBuilder, Driver, DriverBuilder}; 8 8 use std::path::PathBuf; 9 + use std::time::Instant; 9 10 10 11 #[derive(Debug, Parser)] 11 12 struct Args { ··· 27 28 let reader = tokio::io::BufReader::new(reader); 28 29 29 30 log::info!("hello! reading the car..."); 31 + let t0 = Instant::now(); 30 32 31 33 // in this example we only bother handling CARs that are too big for memory 32 34 // `noop` helper means: do no block processing, store the raw blocks ··· 48 50 49 51 // at this point you might want to fetch the account's signing key 50 52 // via the DID from the commit, and then verify the signature. 51 - log::warn!("big's comit: {:?}", commit); 53 + log::warn!("big's comit ({:?}): {:?}", t0.elapsed(), commit); 52 54 53 55 // pop the driver back out to get some code indentation relief 54 56 driver ··· 78 80 } 79 81 } 80 82 81 - log::info!("arrived! joining rx..."); 83 + log::info!("arrived! ({:?}) joining rx...", t0.elapsed()); 82 84 83 85 // clean up the database. would be nice to do this in drop so it happens 84 86 // automatically, but some blocking work happens, so that's not allowed in
+53 -2
readme.md
··· 4 4 5 5 [![Crates.io][crates-badge]](https://crates.io/crates/repo-stream) 6 6 [![Documentation][docs-badge]](https://docs.rs/repo-stream) 7 + [![Sponsor][sponsor-badge]](https://github.com/sponsors/uniphil) 7 8 8 9 [crates-badge]: https://img.shields.io/crates/v/repo-stream.svg 9 10 [docs-badge]: https://docs.rs/repo-stream/badge.svg 11 + [sponsor-badge]: https://img.shields.io/badge/at-microcosm-b820f9?labelColor=b820f9&logo=githubsponsors&logoColor=fff 10 12 13 + ```rust 14 + use repo_stream::{Driver, DriverBuilder, DriveError, DiskBuilder}; 11 15 12 - todo 16 + #[tokio::main] 17 + async fn main() -> Result<(), DriveError> { 18 + // repo-stream takes any AsyncRead as input, like a tokio::fs::File 19 + let reader = tokio::fs::File::open("repo.car".into()).await?; 20 + let reader = tokio::io::BufReader::new(reader); 21 + 22 + // example repo workload is simply counting the total record bytes 23 + let mut total_size = 0; 24 + 25 + match DriverBuilder::new() 26 + .with_mem_limit_mb(10) 27 + .with_block_processor(|rec| rec.len()) // block processing: just extract the raw record size 28 + .load_car(reader) 29 + .await? 30 + { 31 + 32 + // if all blocks fit within memory 33 + Driver::Memory(_commit, mut driver) => { 34 + while let Some(chunk) = driver.next_chunk(256).await? { 35 + for (_rkey, size) in chunk { 36 + total_size += size; 37 + } 38 + } 39 + }, 40 + 41 + // if the CAR was too big for in-memory processing 42 + Driver::Disk(paused) => { 43 + // set up a disk store we can spill to 44 + let store = DiskBuilder::new().open("some/path.db".into()).await?; 45 + // do the spilling, get back a (similar) driver 46 + let (_commit, mut driver) = paused.finish_loading(store).await?; 47 + 48 + while let Some(chunk) = driver.next_chunk(256).await? { 49 + for (_rkey, size) in chunk { 50 + total_size += size; 51 + } 52 + } 53 + 54 + // clean up the disk store (drop tables etc) 55 + driver.reset_store().await?; 56 + } 57 + }; 58 + println!("sum of size of all records: {total_size}"); 59 + Ok(()) 60 + } 61 + ``` 62 + 63 + more recent todo 13 64 14 65 - [ ] get an *emtpy* car for the test suite 15 - - [ ] implement a max size on disk limit 66 + - [x] implement a max size on disk limit 16 67 17 68 18 69 -----
+2 -1
src/disk.rs
··· 53 53 } 54 54 55 55 /// Builder-style disk store setup 56 + #[derive(Debug, Clone)] 56 57 pub struct DiskBuilder { 57 58 /// Database in-memory cache allowance 58 59 /// ··· 96 97 self 97 98 } 98 99 /// Open and initialize the actual disk storage 99 - pub async fn open(self, path: PathBuf) -> Result<DiskStore, DiskError> { 100 + pub async fn open(&self, path: PathBuf) -> Result<DiskStore, DiskError> { 100 101 DiskStore::new(path, self.cache_size_mb, self.max_stored_mb).await 101 102 } 102 103 }
+5 -3
src/drive.rs
··· 116 116 } 117 117 118 118 /// Builder-style driver setup 119 + #[derive(Debug, Clone)] 119 120 pub struct DriverBuilder { 120 121 pub mem_limit_mb: usize, 121 122 } ··· 153 154 } 154 155 /// Begin processing an atproto MST from a CAR file 155 156 pub async fn load_car<R: AsyncRead + Unpin>( 156 - self, 157 + &self, 157 158 reader: R, 158 159 ) -> Result<Driver<R, Vec<u8>>, DriveError> { 159 160 Driver::load_car(reader, crate::process::noop, self.mem_limit_mb).await ··· 163 164 /// Builder-style driver intermediate step 164 165 /// 165 166 /// start from `DriverBuilder` 167 + #[derive(Debug, Clone)] 166 168 pub struct DriverBuilderWithProcessor<T: Processable> { 167 169 pub mem_limit_mb: usize, 168 170 pub block_processor: fn(Vec<u8>) -> T, ··· 178 180 } 179 181 /// Begin processing an atproto MST from a CAR file 180 182 pub async fn load_car<R: AsyncRead + Unpin>( 181 - self, 183 + &self, 182 184 reader: R, 183 185 ) -> Result<Driver<R, T>, DriveError> { 184 186 Driver::load_car(reader, self.block_processor, self.mem_limit_mb).await ··· 346 348 }) 347 349 .await??; 348 350 349 - let (tx, mut rx) = mpsc::channel::<Vec<(Cid, MaybeProcessedBlock<T>)>>(2); 351 + let (tx, mut rx) = mpsc::channel::<Vec<(Cid, MaybeProcessedBlock<T>)>>(1); 350 352 351 353 let store_worker = tokio::task::spawn_blocking(move || { 352 354 let mut writer = store.get_writer()?;
+1 -1
src/lib.rs
··· 82 82 pub mod process; 83 83 84 84 pub use disk::{DiskBuilder, DiskError, DiskStore}; 85 - pub use drive::{DriveError, Driver, DriverBuilder}; 85 + pub use drive::{DriveError, Driver, DriverBuilder, NeedDisk}; 86 86 pub use mst::Commit; 87 87 pub use process::Processable;
+21
src/process.rs
··· 77 77 } 78 78 } 79 79 80 + impl Processable for String { 81 + fn get_size(&self) -> usize { 82 + self.capacity() 83 + } 84 + } 85 + 80 86 impl<Item: Sized + Processable> Processable for Vec<Item> { 81 87 fn get_size(&self) -> usize { 82 88 let slot_size = std::mem::size_of::<Item>(); ··· 85 91 direct_size + items_referenced_size 86 92 } 87 93 } 94 + 95 + impl<Item: Processable> Processable for Option<Item> { 96 + fn get_size(&self) -> usize { 97 + self.as_ref().map(|item| item.get_size()).unwrap_or(0) 98 + } 99 + } 100 + 101 + impl<Item: Processable, Error: Processable> Processable for Result<Item, Error> { 102 + fn get_size(&self) -> usize { 103 + match self { 104 + Ok(item) => item.get_size(), 105 + Err(err) => err.get_size(), 106 + } 107 + } 108 + }
+6 -3
src/walk.rs
··· 87 87 } 88 88 89 89 fn push_from_node(stack: &mut Vec<Need>, node: &Node, parent_depth: Depth) -> Result<(), MstError> { 90 - // empty nodes are not allowed in the MST 91 - // ...except for a single one for empty MST, but we wouldn't be pushing that 90 + // empty nodes are not allowed in the MST except in an empty MST 92 91 if node.is_empty() { 93 - return Err(MstError::EmptyNode); 92 + if parent_depth == Depth::Root { 93 + return Ok(()); // empty mst, nothing to push 94 + } else { 95 + return Err(MstError::EmptyNode); 96 + } 94 97 } 95 98 96 99 let mut entries = Vec::with_capacity(node.entries.len());
+16 -5
tests/non-huge-cars.rs
··· 1 1 extern crate repo_stream; 2 2 use repo_stream::Driver; 3 3 4 + const EMPTY_CAR: &'static [u8] = include_bytes!("../car-samples/empty.car"); 4 5 const TINY_CAR: &'static [u8] = include_bytes!("../car-samples/tiny.car"); 5 6 const LITTLE_CAR: &'static [u8] = include_bytes!("../car-samples/little.car"); 6 7 const MIDSIZE_CAR: &'static [u8] = include_bytes!("../car-samples/midsize.car"); 7 8 8 - async fn test_car(bytes: &[u8], expected_records: usize, expected_sum: usize) { 9 + async fn test_car( 10 + bytes: &[u8], 11 + expected_records: usize, 12 + expected_sum: usize, 13 + expect_profile: bool, 14 + ) { 9 15 let mut driver = match Driver::load_car(bytes, |block| block.len(), 10 /* MiB */) 10 16 .await 11 17 .unwrap() ··· 33 39 34 40 assert_eq!(records, expected_records); 35 41 assert_eq!(sum, expected_sum); 36 - assert!(found_bsky_profile); 42 + assert_eq!(found_bsky_profile, expect_profile); 43 + } 44 + 45 + #[tokio::test] 46 + async fn test_empty_car() { 47 + test_car(EMPTY_CAR, 0, 0, false).await 37 48 } 38 49 39 50 #[tokio::test] 40 51 async fn test_tiny_car() { 41 - test_car(TINY_CAR, 8, 2071).await 52 + test_car(TINY_CAR, 8, 2071, true).await 42 53 } 43 54 44 55 #[tokio::test] 45 56 async fn test_little_car() { 46 - test_car(LITTLE_CAR, 278, 246960).await 57 + test_car(LITTLE_CAR, 278, 246960, true).await 47 58 } 48 59 49 60 #[tokio::test] 50 61 async fn test_midsize_car() { 51 - test_car(MIDSIZE_CAR, 11585, 3741393).await 62 + test_car(MIDSIZE_CAR, 11585, 3741393, true).await 52 63 }