Fast and robust atproto CAR file processing in rust

basic integration tests

Changed files
+60
tests
+60
tests/non-huge-cars.rs
···
··· 1 + extern crate repo_stream; 2 + use futures::TryStreamExt; 3 + use iroh_car::CarReader; 4 + use std::convert::Infallible; 5 + 6 + const TINY_CAR: &'static [u8] = include_bytes!("../car-samples/tiny.car"); 7 + const LITTLE_CAR: &'static [u8] = include_bytes!("../car-samples/little.car"); 8 + const MIDSIZE_CAR: &'static [u8] = include_bytes!("../car-samples/midsize.car"); 9 + 10 + 11 + async fn test_car(bytes: &[u8], expected_records: usize, expected_sum: usize) { 12 + let reader = CarReader::new(bytes).await.unwrap(); 13 + 14 + let root = reader 15 + .header() 16 + .roots() 17 + .first() 18 + .ok_or("missing root").unwrap() 19 + .clone(); 20 + 21 + let stream = std::pin::pin!(reader.stream()); 22 + 23 + let (_commit, v) = 24 + repo_stream::drive::Vehicle::init(root, stream, |block| Ok::<_, Infallible>(block.len())) 25 + .await 26 + .unwrap(); 27 + let mut record_stream = std::pin::pin!(v.stream()); 28 + 29 + let mut records = 0; 30 + let mut sum = 0; 31 + let mut found_bsky_profile = false; 32 + let mut prev_rkey = "".to_string(); 33 + while let Some((rkey, size)) = record_stream.try_next().await.unwrap() { 34 + records += 1; 35 + sum += size; 36 + if rkey.0 == "app.bsky.actor.profile/self" { 37 + found_bsky_profile = true; 38 + } 39 + assert!(rkey.0 > prev_rkey, "rkeys are streamed in order"); 40 + prev_rkey = rkey.0; 41 + } 42 + assert_eq!(records, expected_records); 43 + assert_eq!(sum, expected_sum); 44 + assert!(found_bsky_profile); 45 + } 46 + 47 + #[tokio::test] 48 + async fn test_tiny_car() { 49 + test_car(TINY_CAR, 8, 2071).await 50 + } 51 + 52 + #[tokio::test] 53 + async fn test_little_car() { 54 + test_car(LITTLE_CAR, 278, 246960).await 55 + } 56 + 57 + #[tokio::test] 58 + async fn test_midsize_car() { 59 + test_car(MIDSIZE_CAR, 11585, 3741393).await 60 + }