Fast and robust atproto CAR file processing in Rust

Fix: don't break on an empty CAR file

Includes a regression test and a benchmark for the empty-CAR case.

Changed files
+28 -10
benches
car-samples
src
tests
+1 -1
Cargo.lock
··· 1024 1025 [[package]] 1026 name = "repo-stream" 1027 - version = "0.2.0" 1028 dependencies = [ 1029 "bincode", 1030 "clap",
··· 1024 1025 [[package]] 1026 name = "repo-stream" 1027 + version = "0.2.1" 1028 dependencies = [ 1029 "bincode", 1030 "clap",
+1 -1
Cargo.toml
··· 1 [package] 2 name = "repo-stream" 3 - version = "0.2.0" 4 edition = "2024" 5 license = "MIT OR Apache-2.0" 6 description = "A robust CAR file -> MST walker for atproto"
··· 1 [package] 2 name = "repo-stream" 3 + version = "0.2.1" 4 edition = "2024" 5 license = "MIT OR Apache-2.0" 6 description = "A robust CAR file -> MST walker for atproto"
+4
benches/non-huge-cars.rs
··· 3 4 use criterion::{Criterion, criterion_group, criterion_main}; 5 6 const TINY_CAR: &'static [u8] = include_bytes!("../car-samples/tiny.car"); 7 const LITTLE_CAR: &'static [u8] = include_bytes!("../car-samples/little.car"); 8 const MIDSIZE_CAR: &'static [u8] = include_bytes!("../car-samples/midsize.car"); ··· 13 .build() 14 .expect("Creating runtime failed"); 15 16 c.bench_function("tiny-car", |b| { 17 b.to_async(&rt).iter(async || drive_car(TINY_CAR).await) 18 });
··· 3 4 use criterion::{Criterion, criterion_group, criterion_main}; 5 6 + const EMPTY_CAR: &'static [u8] = include_bytes!("../car-samples/empty.car"); 7 const TINY_CAR: &'static [u8] = include_bytes!("../car-samples/tiny.car"); 8 const LITTLE_CAR: &'static [u8] = include_bytes!("../car-samples/little.car"); 9 const MIDSIZE_CAR: &'static [u8] = include_bytes!("../car-samples/midsize.car"); ··· 14 .build() 15 .expect("Creating runtime failed"); 16 17 + c.bench_function("empty-car", |b| { 18 + b.to_async(&rt).iter(async || drive_car(EMPTY_CAR).await) 19 + }); 20 c.bench_function("tiny-car", |b| { 21 b.to_async(&rt).iter(async || drive_car(TINY_CAR).await) 22 });
car-samples/empty.car

This is a binary file and will not be displayed.

+6 -3
src/walk.rs
··· 87 } 88 89 fn push_from_node(stack: &mut Vec<Need>, node: &Node, parent_depth: Depth) -> Result<(), MstError> { 90 - // empty nodes are not allowed in the MST 91 - // ...except for a single one for empty MST, but we wouldn't be pushing that 92 if node.is_empty() { 93 - return Err(MstError::EmptyNode); 94 } 95 96 let mut entries = Vec::with_capacity(node.entries.len());
··· 87 } 88 89 fn push_from_node(stack: &mut Vec<Need>, node: &Node, parent_depth: Depth) -> Result<(), MstError> { 90 + // empty nodes are not allowed in the MST except in an empty MST 91 if node.is_empty() { 92 + if parent_depth == Depth::Root { 93 + return Ok(()); // empty mst, nothing to push 94 + } else { 95 + return Err(MstError::EmptyNode); 96 + } 97 } 98 99 let mut entries = Vec::with_capacity(node.entries.len());
+16 -5
tests/non-huge-cars.rs
··· 1 extern crate repo_stream; 2 use repo_stream::Driver; 3 4 const TINY_CAR: &'static [u8] = include_bytes!("../car-samples/tiny.car"); 5 const LITTLE_CAR: &'static [u8] = include_bytes!("../car-samples/little.car"); 6 const MIDSIZE_CAR: &'static [u8] = include_bytes!("../car-samples/midsize.car"); 7 8 - async fn test_car(bytes: &[u8], expected_records: usize, expected_sum: usize) { 9 let mut driver = match Driver::load_car(bytes, |block| block.len(), 10 /* MiB */) 10 .await 11 .unwrap() ··· 33 34 assert_eq!(records, expected_records); 35 assert_eq!(sum, expected_sum); 36 - assert!(found_bsky_profile); 37 } 38 39 #[tokio::test] 40 async fn test_tiny_car() { 41 - test_car(TINY_CAR, 8, 2071).await 42 } 43 44 #[tokio::test] 45 async fn test_little_car() { 46 - test_car(LITTLE_CAR, 278, 246960).await 47 } 48 49 #[tokio::test] 50 async fn test_midsize_car() { 51 - test_car(MIDSIZE_CAR, 11585, 3741393).await 52 }
··· 1 extern crate repo_stream; 2 use repo_stream::Driver; 3 4 + const EMPTY_CAR: &'static [u8] = include_bytes!("../car-samples/empty.car"); 5 const TINY_CAR: &'static [u8] = include_bytes!("../car-samples/tiny.car"); 6 const LITTLE_CAR: &'static [u8] = include_bytes!("../car-samples/little.car"); 7 const MIDSIZE_CAR: &'static [u8] = include_bytes!("../car-samples/midsize.car"); 8 9 + async fn test_car( 10 + bytes: &[u8], 11 + expected_records: usize, 12 + expected_sum: usize, 13 + expect_profile: bool, 14 + ) { 15 let mut driver = match Driver::load_car(bytes, |block| block.len(), 10 /* MiB */) 16 .await 17 .unwrap() ··· 39 40 assert_eq!(records, expected_records); 41 assert_eq!(sum, expected_sum); 42 + assert_eq!(found_bsky_profile, expect_profile); 43 + } 44 + 45 + #[tokio::test] 46 + async fn test_empty_car() { 47 + test_car(EMPTY_CAR, 0, 0, false).await 48 } 49 50 #[tokio::test] 51 async fn test_tiny_car() { 52 + test_car(TINY_CAR, 8, 2071, true).await 53 } 54 55 #[tokio::test] 56 async fn test_little_car() { 57 + test_car(LITTLE_CAR, 278, 246960, true).await 58 } 59 60 #[tokio::test] 61 async fn test_midsize_car() { 62 + test_car(MIDSIZE_CAR, 11585, 3741393, true).await 63 }