Fast and robust atproto CAR file processing in Rust

fix benches

Changed files
+47 -43
benches
+24 -21
benches/huge-car.rs
··· 1 1 extern crate repo_stream; 2 - use futures::TryStreamExt; 3 - use iroh_car::CarReader; 4 - use std::convert::Infallible; 2 + use repo_stream::drive::Processable; 3 + use serde::{Deserialize, Serialize}; 5 4 use std::path::{Path, PathBuf}; 6 5 7 6 use criterion::{Criterion, criterion_group, criterion_main}; 8 7 8 + #[derive(Clone, Serialize, Deserialize)] 9 + struct S(usize); 10 + 11 + impl Processable for S { 12 + fn get_size(&self) -> usize { 13 + 0 // no additional space taken, just its stack size (newtype is free) 14 + } 15 + } 16 + 9 17 pub fn criterion_benchmark(c: &mut Criterion) { 10 18 let rt = tokio::runtime::Builder::new_multi_thread() 11 19 .enable_all() ··· 20 28 }); 21 29 } 22 30 23 - async fn drive_car(filename: impl AsRef<Path>) { 31 + async fn drive_car(filename: impl AsRef<Path>) -> usize { 24 32 let reader = tokio::fs::File::open(filename).await.unwrap(); 25 33 let reader = tokio::io::BufReader::new(reader); 26 - let reader = CarReader::new(reader).await.unwrap(); 34 + 35 + let mb = 2_usize.pow(20); 27 36 28 - let root = reader 29 - .header() 30 - .roots() 31 - .first() 32 - .ok_or("missing root") 37 + let mut driver = match repo_stream::drive::load_car(reader, |block| S(block.len()), 1024 * mb) 38 + .await 33 39 .unwrap() 34 - .clone(); 35 - 36 - let stream = std::pin::pin!(reader.stream()); 37 - 38 - let (_commit, v) = 39 - repo_stream::drive::Vehicle::init(root, stream, |block| Ok::<_, Infallible>(block.len())) 40 - .await 41 - .unwrap(); 42 - let mut record_stream = std::pin::pin!(v.stream()); 40 + { 41 + repo_stream::drive::Vehicle::Lil(_, mem_driver) => mem_driver, 42 + repo_stream::drive::Vehicle::Big(_) => panic!("not doing disk for benchmark"), 43 + }; 43 44 44 - while let Some(_) = record_stream.try_next().await.unwrap() { 45 - // just here for the drive 45 + let mut n = 0; 46 + while let Some(pairs) = driver.next_chunk(256).await.unwrap() { 47 + n += pairs.len(); 46 48 } 49 + n 47 50 } 48 51 49 52 criterion_group!(benches, 
criterion_benchmark);
+23 -22
benches/non-huge-cars.rs
··· 1 1 extern crate repo_stream; 2 - use futures::TryStreamExt; 3 - use iroh_car::CarReader; 4 - use std::convert::Infallible; 5 2 6 3 use criterion::{Criterion, criterion_group, criterion_main}; 4 + use repo_stream::drive::Processable; 5 + use serde::{Deserialize, Serialize}; 7 6 8 7 const TINY_CAR: &'static [u8] = include_bytes!("../car-samples/tiny.car"); 9 8 const LITTLE_CAR: &'static [u8] = include_bytes!("../car-samples/little.car"); 10 9 const MIDSIZE_CAR: &'static [u8] = include_bytes!("../car-samples/midsize.car"); 11 10 11 + #[derive(Clone, Serialize, Deserialize)] 12 + struct S(usize); 13 + 14 + impl Processable for S { 15 + fn get_size(&self) -> usize { 16 + 0 // no additional space taken, just its stack size (newtype is free) 17 + } 18 + } 19 + 12 20 pub fn criterion_benchmark(c: &mut Criterion) { 13 21 let rt = tokio::runtime::Builder::new_multi_thread() 14 22 .enable_all() ··· 26 34 }); 27 35 } 28 36 29 - async fn drive_car(bytes: &[u8]) { 30 - let reader = CarReader::new(bytes).await.unwrap(); 31 - 32 - let root = reader 33 - .header() 34 - .roots() 35 - .first() 36 - .ok_or("missing root") 37 - .unwrap() 38 - .clone(); 39 - 40 - let stream = std::pin::pin!(reader.stream()); 41 - 42 - let (_commit, v) = 43 - repo_stream::drive::Vehicle::init(root, stream, |block| Ok::<_, Infallible>(block.len())) 37 + async fn drive_car(bytes: &[u8]) -> usize { 38 + let mut driver = 39 + match repo_stream::drive::load_car(bytes, |block| S(block.len()), 32 * 2_usize.pow(20)) 44 40 .await 45 - .unwrap(); 46 - let mut record_stream = std::pin::pin!(v.stream()); 41 + .unwrap() 42 + { 43 + repo_stream::drive::Vehicle::Lil(_, mem_driver) => mem_driver, 44 + repo_stream::drive::Vehicle::Big(_) => panic!("not benching big cars here"), 45 + }; 47 46 48 - while let Some(_) = record_stream.try_next().await.unwrap() { 49 - // just here for the drive 47 + let mut n = 0; 48 + while let Some(pairs) = driver.next_chunk(256).await.unwrap() { 49 + n += pairs.len(); 50 50 } 51 + n 51 
52 } 52 53 53 54 criterion_group!(benches, criterion_benchmark);