Fast and robust atproto CAR file processing in rust

public api cleanup

Changed files
+90 -75
benches
examples
disk-read-file
read-file
src
tests
+4 -3
benches/huge-car.rs
··· 1 1 extern crate repo_stream; 2 + use repo_stream::Driver; 2 3 use std::path::{Path, PathBuf}; 3 4 4 5 use criterion::{Criterion, criterion_group, criterion_main}; ··· 23 24 24 25 let mb = 2_usize.pow(20); 25 26 26 - let mut driver = match repo_stream::drive::load_car(reader, |block| block.len(), 1024 * mb) 27 + let mut driver = match Driver::load_car(reader, |block| block.len(), 1024 * mb) 27 28 .await 28 29 .unwrap() 29 30 { 30 - repo_stream::drive::Vehicle::Lil(_, mem_driver) => mem_driver, 31 - repo_stream::drive::Vehicle::Big(_) => panic!("not doing disk for benchmark"), 31 + Driver::Lil(_, mem_driver) => mem_driver, 32 + Driver::Big(_) => panic!("not doing disk for benchmark"), 32 33 }; 33 34 34 35 let mut n = 0;
+8 -8
benches/non-huge-cars.rs
··· 1 1 extern crate repo_stream; 2 + use repo_stream::Driver; 2 3 3 4 use criterion::{Criterion, criterion_group, criterion_main}; 4 5 ··· 24 25 } 25 26 26 27 async fn drive_car(bytes: &[u8]) -> usize { 27 - let mut driver = 28 - match repo_stream::drive::load_car(bytes, |block| block.len(), 32 * 2_usize.pow(20)) 29 - .await 30 - .unwrap() 31 - { 32 - repo_stream::drive::Vehicle::Lil(_, mem_driver) => mem_driver, 33 - repo_stream::drive::Vehicle::Big(_) => panic!("not benching big cars here"), 34 - }; 28 + let mut driver = match Driver::load_car(bytes, |block| block.len(), 32 * 2_usize.pow(20)) 29 + .await 30 + .unwrap() 31 + { 32 + Driver::Lil(_, mem_driver) => mem_driver, 33 + Driver::Big(_) => panic!("not benching big cars here"), 34 + }; 35 35 36 36 let mut n = 0; 37 37 while let Some(pairs) = driver.next_chunk(256).await.unwrap() {
+4 -3
examples/disk-read-file/main.rs
··· 1 1 extern crate repo_stream; 2 2 use clap::Parser; 3 + use repo_stream::{Driver, noop}; 3 4 use std::path::PathBuf; 4 5 5 6 type Result<T> = std::result::Result<T, Box<dyn std::error::Error>>; ··· 25 26 26 27 let limit_mb = 32; 27 28 28 - let driver = match repo_stream::drive::load_car(reader, |block| block, 10 * mb).await? { 29 - repo_stream::drive::Vehicle::Lil(_, _) => panic!("try this on a bigger car"), 30 - repo_stream::drive::Vehicle::Big(big_stuff) => { 29 + let driver = match Driver::load_car(reader, noop, 10 * mb).await? { 30 + Driver::Lil(_, _) => panic!("try this on a bigger car"), 31 + Driver::Big(big_stuff) => { 31 32 let disk_store = repo_stream::disk::SqliteStore::new(tmpfile.clone(), limit_mb).await?; 32 33 let (commit, driver) = big_stuff.finish_loading(disk_store).await?; 33 34 log::warn!("big: {:?}", commit);
+4 -3
examples/read-file/main.rs
··· 1 1 extern crate repo_stream; 2 2 use clap::Parser; 3 + use repo_stream::Driver; 3 4 use std::path::PathBuf; 4 5 5 6 type Result<T> = std::result::Result<T, Box<dyn std::error::Error>>; ··· 19 20 let reader = tokio::io::BufReader::new(reader); 20 21 21 22 let (commit, mut driver) = 22 - match repo_stream::drive::load_car(reader, |block| block.len(), 1024 * 1024).await? { 23 - repo_stream::drive::Vehicle::Lil(commit, mem_driver) => (commit, mem_driver), 24 - repo_stream::drive::Vehicle::Big(_) => panic!("can't handle big cars yet"), 23 + match Driver::load_car(reader, |block| block.len(), 16 * 1024 * 1024).await? { 24 + Driver::Lil(commit, mem_driver) => (commit, mem_driver), 25 + Driver::Big(_) => panic!("can't handle big cars yet"), 25 26 }; 26 27 27 28 log::info!("got commit: {commit:?}");
+57 -55
src/drive.rs
··· 88 88 } 89 89 } 90 90 91 - pub enum Vehicle<R: AsyncRead + Unpin, T: Processable> { 91 + pub enum Driver<R: AsyncRead + Unpin, T: Processable> { 92 92 Lil(Commit, MemDriver<T>), 93 93 Big(BigCar<R, T>), 94 94 } 95 95 96 - pub async fn load_car<R: AsyncRead + Unpin, T: Processable>( 97 - reader: R, 98 - process: fn(Vec<u8>) -> T, 99 - max_size: usize, 100 - ) -> Result<Vehicle<R, T>, DriveError> { 101 - let mut mem_blocks = HashMap::new(); 96 + impl<R: AsyncRead + Unpin, T: Processable> Driver<R, T> { 97 + pub async fn load_car( 98 + reader: R, 99 + process: fn(Vec<u8>) -> T, 100 + max_size: usize, 101 + ) -> Result<Driver<R, T>, DriveError> { 102 + let mut mem_blocks = HashMap::new(); 102 103 103 - let mut car = CarReader::new(reader).await?; 104 + let mut car = CarReader::new(reader).await?; 104 105 105 - let root = *car 106 - .header() 107 - .roots() 108 - .first() 109 - .ok_or(DriveError::MissingRoot)?; 110 - log::debug!("root: {root:?}"); 106 + let root = *car 107 + .header() 108 + .roots() 109 + .first() 110 + .ok_or(DriveError::MissingRoot)?; 111 + log::debug!("root: {root:?}"); 111 112 112 - let mut commit = None; 113 + let mut commit = None; 113 114 114 - // try to load all the blocks into memory 115 - let mut mem_size = 0; 116 - while let Some((cid, data)) = car.next_block().await? { 117 - // the root commit is a Special Third Kind of block that we need to make 118 - // sure not to optimistically send to the processing function 119 - if cid == root { 120 - let c: Commit = serde_ipld_dagcbor::from_slice(&data)?; 121 - commit = Some(c); 122 - continue; 123 - } 115 + // try to load all the blocks into memory 116 + let mut mem_size = 0; 117 + while let Some((cid, data)) = car.next_block().await? { 118 + // the root commit is a Special Third Kind of block that we need to make 119 + // sure not to optimistically send to the processing function 120 + if cid == root { 121 + let c: Commit = serde_ipld_dagcbor::from_slice(&data)?; 122 + commit = Some(c); 123 + continue; 124 + } 124 125 125 - // remaining possible types: node, record, other. optimistically process 126 - let maybe_processed = if Node::could_be(&data) { 127 - MaybeProcessedBlock::Raw(data) 128 - } else { 129 - MaybeProcessedBlock::Processed(process(data)) 130 - }; 126 + // remaining possible types: node, record, other. optimistically process 127 + let maybe_processed = if Node::could_be(&data) { 128 + MaybeProcessedBlock::Raw(data) 129 + } else { 130 + MaybeProcessedBlock::Processed(process(data)) 131 + }; 131 132 132 - // stash (maybe processed) blocks in memory as long as we have room 133 - mem_size += std::mem::size_of::<Cid>() + maybe_processed.get_size(); 134 - mem_blocks.insert(cid, maybe_processed); 135 - if mem_size >= max_size { 136 - return Ok(Vehicle::Big(BigCar { 137 - car, 138 - root, 139 - process, 140 - max_size, 141 - mem_blocks, 142 - commit, 143 - })); 133 + // stash (maybe processed) blocks in memory as long as we have room 134 + mem_size += std::mem::size_of::<Cid>() + maybe_processed.get_size(); 135 + mem_blocks.insert(cid, maybe_processed); 136 + if mem_size >= max_size { 137 + return Ok(Driver::Big(BigCar { 138 + car, 139 + root, 140 + process, 141 + max_size, 142 + mem_blocks, 143 + commit, 144 + })); 145 + } 144 146 } 145 - } 146 147 147 - // all blocks loaded and we fit in memory! hopefully we found the commit... 148 - let commit = commit.ok_or(DriveError::MissingCommit)?; 148 + // all blocks loaded and we fit in memory! hopefully we found the commit... 149 + let commit = commit.ok_or(DriveError::MissingCommit)?; 149 150 150 - let walker = Walker::new(commit.data); 151 + let walker = Walker::new(commit.data); 151 152 152 - Ok(Vehicle::Lil( 153 - commit, 154 - MemDriver { 155 - blocks: mem_blocks, 156 - walker, 157 - process, 158 - }, 159 - )) 153 + Ok(Driver::Lil( 154 + commit, 155 + MemDriver { 156 + blocks: mem_blocks, 157 + walker, 158 + process, 159 + }, 160 + )) 161 + } 160 162 } 161 163 162 164 /// a paritally memory-loaded car file that needs disk spillover to continue
+4
src/lib.rs
··· 7 7 pub mod mst; 8 8 pub mod process; 9 9 pub mod walk; 10 + 11 + pub use disk::SqliteStore; 12 + pub use drive::{DriveError, Driver}; 13 + pub use process::{Processable, noop};
+5
src/process.rs
··· 5 5 fn get_size(&self) -> usize; 6 6 } 7 7 8 + #[inline] 9 + pub fn noop(block: Vec<u8>) -> Vec<u8> { 10 + block 11 + } 12 + 8 13 impl Processable for u8 { 9 14 fn get_size(&self) -> usize { 10 15 0
+4 -3
tests/non-huge-cars.rs
··· 1 1 extern crate repo_stream; 2 + use repo_stream::Driver; 2 3 3 4 const TINY_CAR: &'static [u8] = include_bytes!("../car-samples/tiny.car"); 4 5 const LITTLE_CAR: &'static [u8] = include_bytes!("../car-samples/little.car"); ··· 7 8 async fn test_car(bytes: &[u8], expected_records: usize, expected_sum: usize) { 8 9 let mb = 2_usize.pow(20); 9 10 10 - let mut driver = match repo_stream::drive::load_car(bytes, |block| block.len(), 10 * mb) 11 + let mut driver = match Driver::load_car(bytes, |block| block.len(), 10 * mb) 11 12 .await 12 13 .unwrap() 13 14 { 14 - repo_stream::drive::Vehicle::Lil(_commit, mem_driver) => mem_driver, 15 - repo_stream::drive::Vehicle::Big(_) => panic!("too big"), 15 + Driver::Lil(_commit, mem_driver) => mem_driver, 16 + Driver::Big(_) => panic!("too big"), 16 17 }; 17 18 18 19 let mut records = 0;