Fast and robust atproto CAR file processing in Rust

Implement Processable for no-op processing (u8 and Vec)

Changed files
+21 -2
examples
disk-read-file
src
+6 -2
examples/disk-read-file/main.rs
··· 25 25 26 26 let limit_mb = 32; 27 27 28 - let driver = match repo_stream::drive::load_car(reader, |block| block.len(), 10 * mb).await? { 28 + let driver = match repo_stream::drive::load_car(reader, |block| block, 10 * mb).await? { 29 29 repo_stream::drive::Vehicle::Lil(_, _) => panic!("try this on a bigger car"), 30 30 repo_stream::drive::Vehicle::Big(big_stuff) => { 31 31 let disk_store = repo_stream::disk::SqliteStore::new(tmpfile.clone(), limit_mb).await?; ··· 36 36 }; 37 37 38 38 let mut n = 0; 39 + let mut zeros = 0; 39 40 let (mut rx, worker) = driver.rx(512).await?; 40 41 41 42 log::debug!("walking..."); 42 43 while let Some(pairs) = rx.recv().await { 43 44 n += pairs.len(); 45 + for (_, block) in pairs { 46 + zeros += block.into_iter().filter(|&b| b == b'0').count() 47 + } 44 48 } 45 49 log::debug!("done walking! joining..."); 46 50 ··· 50 54 51 55 // log::info!("now is the time to check mem..."); 52 56 // tokio::time::sleep(std::time::Duration::from_secs(22)).await; 53 - log::info!("bye! {n}"); 57 + log::info!("bye! n={n} zeros={zeros}"); 54 58 55 59 std::fs::remove_file(tmpfile).unwrap(); // need to also remove -shm -wal 56 60
+15
src/process.rs
··· 5 5 fn get_size(&self) -> usize; 6 6 } 7 7 8 + impl Processable for u8 { 9 + fn get_size(&self) -> usize { 10 + 0 11 + } 12 + } 13 + 8 14 impl Processable for usize { 9 15 fn get_size(&self) -> usize { 10 16 0 // no additional space taken, just its stack size (newtype is free) 11 17 } 12 18 } 19 + 20 + impl<Item: Sized + Processable> Processable for Vec<Item> { 21 + fn get_size(&self) -> usize { 22 + let slot_size = std::mem::size_of::<Item>(); 23 + let direct_size = slot_size * self.capacity(); 24 + let items_referenced_size: usize = self.iter().map(|item| item.get_size()).sum(); 25 + direct_size + items_referenced_size 26 + } 27 + }