Fast and robust atproto CAR file processing in Rust

sizing and profile a lil

Changed files
+47 -11
+3
Cargo.toml
···
  34   34       inherits = "release"
  35   35       debug = true
  36   36
       37   +   [profile.release]
       38   +   debug = true
       39   +
  37   40       [[bench]]
  38   41       name = "non-huge-cars"
  39   42       harness = false
+11 -4
examples/disk-read-file/main.rs
···
   1    1       extern crate repo_stream;
   2    2       use clap::Parser;
   3        -   use repo_stream::disk::RedbStore;
   4    3       use repo_stream::drive::Processable;
   5    4       use serde::{Deserialize, Serialize};
   6    5       use std::path::PathBuf;
···
  18   17       #[derive(Clone, Serialize, Deserialize)]
  19   18       struct S(usize);
  20   19
  21        -   impl Processable for S {}
       20   +   impl Processable for S {
       21   +       fn get_size(&self) -> usize {
       22   +           0 // no additional space taken, just its stack size (newtype is free)
       23   +       }
       24   +   }
  22   25
  23   26       #[tokio::main]
  24   27       async fn main() -> Result<()> {
···
  28   31           let reader = tokio::fs::File::open(car).await?;
  29   32           let reader = tokio::io::BufReader::new(reader);
  30   33
       34   +       // let kb = 2_usize.pow(10);
       35   +       let mb = 2_usize.pow(20);
       36   +
  31   37           let mut driver =
  32        -           match repo_stream::drive::load_car(reader, |block| S(block.len()), 1024).await? {
       38   +           match repo_stream::drive::load_car(reader, |block| S(block.len()), 16 * mb).await? {
  33   39               repo_stream::drive::Vehicle::Lil(_, _) => panic!("try this on a bigger car"),
  34   40               repo_stream::drive::Vehicle::Big(big_stuff) => {
  35        -               let disk_store = RedbStore::new(tmpfile);
       41   +               // let disk_store = repo_stream::disk::SqliteStore::new(tmpfile);
       42   +               let disk_store = repo_stream::disk::RedbStore::new(tmpfile);
  36   43                   let (commit, driver) = big_stuff.finish_loading(disk_store).await?;
  37   44                   log::warn!("big: {:?}", commit);
  38   45                   driver
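The example's `S(usize)` newtype can return 0 from `get_size` because its whole payload is stack-sized. For a processed type that owns heap data, the impl should report those bytes so the in-memory budget stays honest. A minimal sketch, assuming only the `Processable` trait shown in this diff (the `ParsedBlock` type is hypothetical, not part of the repo):

    use repo_stream::drive::Processable;
    use serde::{Deserialize, Serialize};

    // Hypothetical processed form that keeps parsed record keys on the heap.
    #[derive(Clone, Serialize, Deserialize)]
    struct ParsedBlock {
        keys: Vec<String>,
    }

    impl Processable for ParsedBlock {
        fn get_size(&self) -> usize {
            // Heap bytes only: per the trait doc, this excludes the value's own
            // mem::size_of, which MaybeProcessedBlock::get_size adds as a base.
            self.keys
                .iter()
                .map(|k| std::mem::size_of::<String>() + k.capacity())
                .sum()
        }
    }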
+7 -2
src/disk.rs
···
  61   61
  62   62           conn.pragma_update(None, "journal_mode", "WAL")?;
  63   63           conn.pragma_update(None, "synchronous", "OFF")?;
  64        -       conn.pragma_update(None, "cache_size", (-32 * 2_i64.pow(10)).to_string())?;
       64   +       conn.pragma_update(None, "cache_size", (-4 * 2_i64.pow(10)).to_string())?;
  65   65           conn.execute(
  66   66               "CREATE TABLE blocks (
  67   67                   key BLOB PRIMARY KEY NOT NULL,
···
 144  144       type Access = RedbAccess;
 145  145       async fn get_access(&mut self) -> Result<RedbAccess, redb::Error> {
 146  146           let path = self.path.clone();
      147   +       let kb = 2_usize.pow(10);
 147  148           let db = tokio::task::spawn_blocking(move || {
 148        -           let db = redb::Database::create(path)?;
      149   +           let db = redb::Database::builder()
      150   +               .set_cache_size(16 * kb)
      151   +               .create(path)?;
 149  152               Ok::<_, Self::StorageError>(db)
 150  153           })
 151  154           .await
···
 204  207           Ok(rv)
 205  208       }
 206  209   }
      210   +
      211   + ///// TODO: that other single file db thing to try
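Note the unit mismatch between the two caches tuned here: SQLite's `cache_size` pragma reads a negative value as KiB (a positive one would mean pages), so `-4 * 2_i64.pow(10)` asks for roughly a 4 MiB page cache (down from 32 MiB), while redb's `Builder::set_cache_size` takes plain bytes, so `16 * kb` is a 16 KiB cache. A side-by-side sketch; the function name and paths are made up, the calls mirror the diff:

    use rusqlite::Connection;

    fn open_stores() -> anyhow::Result<(Connection, redb::Database)> {
        let conn = Connection::open("/tmp/blocks.sqlite")?;
        // negative => KiB: -4096 is ~4 MiB of SQLite page cache
        conn.pragma_update(None, "cache_size", (-4 * 2_i64.pow(10)).to_string())?;
        // plain bytes: 16 * 1024 is a 16 KiB redb cache
        let db = redb::Database::builder()
            .set_cache_size(16 * 2_usize.pow(10))
            .create("/tmp/blocks.redb")?;
        Ok((conn, db))
    }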
+26 -5
src/drive.rs
···
  57   57   //     DiskDriveError(#[from] DiskDriveError<E>),
  58   58   // }
  59   59
  60        -   pub trait Processable: Clone + Serialize + DeserializeOwned {}
       60   +   pub trait Processable: Clone + Serialize + DeserializeOwned {
       61   +       /// the additional size taken up (not including its mem::size_of)
       62   +       fn get_size(&self) -> usize;
       63   +   }
  61   64
  62   65   #[derive(Debug, Clone, Serialize, Deserialize)]
  63   66   pub enum MaybeProcessedBlock<T> {
···
  85   88       Processed(T),
  86   89   }
  87   90
  88        -   impl<T: Processable> Processable for MaybeProcessedBlock<T> {}
       91   +   impl<T: Processable> Processable for MaybeProcessedBlock<T> {
       92   +       /// TODO this is probably a little broken
       93   +       fn get_size(&self) -> usize {
       94   +           use std::{cmp::max, mem::size_of};
       95   +
       96   +           // enum is always as big as its biggest member?
       97   +           let base_size = max(size_of::<Vec<u8>>(), size_of::<T>());
       98   +
       99   +           let extra = match self {
      100   +               Self::Raw(bytes) => bytes.len(),
      101   +               Self::Processed(t) => t.get_size(),
      102   +           };
      103   +
      104   +           base_size + extra
      105   +       }
      106   +   }
  89  107
  90  108   pub enum Vehicle<R: AsyncRead + Unpin, T: Processable> {
  91  109       Lil(Commit, MemDriver<T>),
···
 111  129       let mut commit = None;
 112  130
 113  131       // try to load all the blocks into memory
      132   +   let mut mem_size = 0;
 114  133       while let Some((cid, data)) = car.next_block().await? {
 115  134           // the root commit is a Special Third Kind of block that we need to make
 116  135           // sure not to optimistically send to the processing function
···
 129  148           };
 130  149
 131  150           // stash (maybe processed) blocks in memory as long as we have room
      151   +       mem_size += std::mem::size_of::<Cid>() + maybe_processed.get_size();
 132  152           mem_blocks.insert(cid, maybe_processed);
 133        -       if mem_blocks.len() >= max_size {
      153   +       if mem_size >= max_size {
 134  154               return Ok(Vehicle::Big(BigCar {
 135  155                   car,
 136  156                   root,
···
 207  227       // dump the rest to disk (in chunks)
 208  228       loop {
 209  229           let mut chunk = vec![];
      230   +       let mut mem_size = 0;
 210  231           loop {
 211  232               let Some((cid, data)) = self.car.next_block().await? else {
 212  233                   break;
···
 224  245               } else {
 225  246                   MaybeProcessedBlock::Processed((self.process)(&data))
 226  247               };
      248   +           mem_size += std::mem::size_of::<Cid>() + maybe_processed.get_size();
 227  249               chunk.push((cid, maybe_processed));
 228        -           if chunk.len() >= self.max_size {
 229        -               // eventually this won't be .len()
      250   +           if mem_size >= self.max_size {
 230  251                   break;
 231  252               }
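On the "enum is always as big as its biggest member?" comment: roughly yes. A Rust enum is at least as large as its largest variant, plus a discriminant word unless the compiler can pack it into a niche, so the `max(size_of::<Vec<u8>>(), size_of::<T>())` base is at worst a slight undercount. A standalone probe one could run (the `Maybe` enum is a hypothetical stand-in for `MaybeProcessedBlock`):

    use std::mem::size_of;

    // Stand-in for MaybeProcessedBlock<T>, just to inspect layout.
    #[allow(dead_code)]
    enum Maybe<T> {
        Raw(Vec<u8>),
        Processed(T),
    }

    fn main() {
        // Vec<u8> is three usizes (ptr, cap, len): 24 bytes on 64-bit targets.
        println!("Vec<u8>:          {}", size_of::<Vec<u8>>());
        // At least as big as the largest variant; the discriminant may add a
        // word unless a niche (e.g. Vec's non-null pointer) absorbs it.
        println!("Maybe<u64>:       {}", size_of::<Maybe<u64>>());
        println!("Maybe<[u8; 100]>: {}", size_of::<Maybe<[u8; 100]>>());
    }

Either way the variable part (`bytes.len()` or `t.get_size()`) dominates the accounting, so a word or two of discriminant slack is noise against a 16 MiB budget.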