Fast and robust atproto CAR file processing in Rust

Add a `MaybeProcessedBlock::maybe` helper, rename `Step::Step` to `Step::Found`, return a join handle from `to_channel`, add `SqliteStore::reset` for cleanup, and expand the example comments

Changed files
+88 -38
examples
disk-read-file
src
+44 -16
examples/disk-read-file/main.rs
··· 3 3 use repo_stream::{Driver, noop}; 4 4 use std::path::PathBuf; 5 5 6 - type Result<T> = std::result::Result<T, Box<dyn std::error::Error>>; 7 - 8 6 #[derive(Debug, Parser)] 9 7 struct Args { 10 8 #[arg()] ··· 14 12 } 15 13 16 14 #[tokio::main] 17 - async fn main() -> Result<()> { 15 + async fn main() -> Result<(), Box<dyn std::error::Error>> { 18 16 env_logger::init(); 19 17 20 18 let Args { car, tmpfile } = Args::parse(); 19 + 20 + // repo-stream takes an AsyncRead as input. wrapping a filesystem read in 21 + // BufReader can provide a really significant performance win. 21 22 let reader = tokio::fs::File::open(car).await?; 22 23 let reader = tokio::io::BufReader::new(reader); 23 24 24 - // let kb = 2_usize.pow(10); 25 - let mb = 2_usize.pow(20); 25 + // configure how much memory can be used before spilling to disk. 26 + // real memory usage may differ somewhat. 27 + let in_mem_limit = 10 * 2_usize.pow(20); 26 28 27 - let limit_mb = 32; 29 + // configure how much memory sqlite is allowed to use when dumping to disk 30 + let db_cache_mb = 32; 28 31 29 - let driver = match Driver::load_car(reader, noop, 10 * mb).await? { 32 + log::info!("hello! reading the car..."); 33 + 34 + // in this example we only bother handling CARs that are too big for memory 35 + // `noop` helper means: do no block processing, store the raw blocks 36 + let driver = match Driver::load_car(reader, noop, in_mem_limit).await? 
{ 30 37 Driver::Lil(_, _) => panic!("try this on a bigger car"), 31 38 Driver::Big(big_stuff) => { 32 - let disk_store = repo_stream::disk::SqliteStore::new(tmpfile.clone(), limit_mb).await?; 39 + // we reach here if the repo was too big and needs to be spilled to 40 + // disk to continue 41 + 42 + // set up a disk store we can spill to 43 + let disk_store = 44 + repo_stream::disk::SqliteStore::new(tmpfile.clone(), db_cache_mb).await?; 45 + 46 + // do the spilling, get back a (similar) driver 33 47 let (commit, driver) = big_stuff.finish_loading(disk_store).await?; 34 - log::warn!("big: {:?}", commit); 48 + 49 + // at this point you might want to fetch the account's signing key 50 + // via the DID from the commit, and then verify the signature. 51 + log::warn!("big's comit: {:?}", commit); 52 + 53 + // pop the driver back out to get some code indentation relief 35 54 driver 36 55 } 37 56 }; 38 57 58 + // collect some random stats about the blocks 39 59 let mut n = 0; 40 60 let mut zeros = 0; 41 - let mut rx = driver.to_channel(512); 42 61 43 - log::debug!("walking..."); 62 + log::info!("walking..."); 63 + 64 + // this example uses the disk driver's channel mode: the tree walking is 65 + // spawned onto a blocking thread, and we get chunks of rkey+blocks back 66 + let (mut rx, join) = driver.to_channel(512); 44 67 while let Some(r) = rx.recv().await { 45 68 let pairs = r?; 69 + 70 + // keep a count of the total number of blocks seen 46 71 n += pairs.len(); 72 + 47 73 for (_, block) in pairs { 74 + // for each block, count how many bytes are equal to '0' 75 + // (this is just an example, you probably want to do something more 76 + // interesting) 48 77 zeros += block.into_iter().filter(|&b| b == b'0').count() 49 78 } 50 79 } 51 - log::debug!("done walking!"); 80 + 81 + log::info!("arrived! joining rx..."); 52 82 53 - // log::info!("now is the time to check mem..."); 54 - // tokio::time::sleep(std::time::Duration::from_secs(22)).await; 55 - log::info!("bye! 
n={n} zeros={zeros}"); 83 + join.await?.reset_store().await?; 56 84 57 - std::fs::remove_file(tmpfile).unwrap(); // need to also remove -shm -wal 85 + log::info!("done. n={n} zeros={zeros}"); 58 86 59 87 Ok(()) 60 88 }
+4
src/disk.rs
··· 47 47 let select_stmt = self.conn.prepare("SELECT val FROM blocks WHERE key = ?1")?; 48 48 Ok(SqliteReader { select_stmt }) 49 49 } 50 + pub fn reset(&mut self) -> Result<(), rusqlite::Error> { 51 + self.conn.execute("DROP TABLE blocks", ())?; 52 + Ok(()) 53 + } 50 54 } 51 55 52 56 pub struct SqliteWriter<'conn> {
+34 -17
src/drive.rs
··· 90 90 } 91 91 } 92 92 93 + impl<T> MaybeProcessedBlock<T> { 94 + fn maybe(process: fn(Vec<u8>) -> T, data: Vec<u8>) -> Self { 95 + if Node::could_be(&data) { 96 + MaybeProcessedBlock::Raw(data) 97 + } else { 98 + MaybeProcessedBlock::Processed(process(data)) 99 + } 100 + } 101 + } 102 + 93 103 pub enum Driver<R: AsyncRead + Unpin, T: Processable> { 94 104 Lil(Commit, MemDriver<T>), 95 105 Big(BigCar<R, T>), ··· 126 136 } 127 137 128 138 // remaining possible types: node, record, other. optimistically process 129 - let maybe_processed = if Node::could_be(&data) { 130 - MaybeProcessedBlock::Raw(data) 131 - } else { 132 - MaybeProcessedBlock::Processed(process(data)) 133 - }; 139 + let maybe_processed = MaybeProcessedBlock::maybe(process, data); 134 140 135 141 // stash (maybe processed) blocks in memory as long as we have room 136 142 mem_size += std::mem::size_of::<Cid>() + maybe_processed.get_size(); ··· 192 198 match self.walker.step(&mut self.blocks, self.process)? { 193 199 Step::Missing(cid) => return Err(DriveError::MissingBlock(cid)), 194 200 Step::Finish => break, 195 - Step::Step { rkey, data } => { 201 + Step::Found { rkey, data } => { 196 202 out.push((rkey, data)); 197 203 continue; 198 204 } ··· 283 289 } 284 290 // remaining possible types: node, record, other. optimistically process 285 291 // TODO: get the actual in-memory size to compute disk spill 286 - let maybe_processed = if Node::could_be(&data) { 287 - MaybeProcessedBlock::Raw(data) 288 - } else { 289 - MaybeProcessedBlock::Processed((self.process)(data)) 290 - }; 292 + let maybe_processed = MaybeProcessedBlock::maybe(self.process, data); 291 293 mem_size += std::mem::size_of::<Cid>() + maybe_processed.get_size(); 292 294 chunk.push((cid, maybe_processed)); 293 295 if mem_size >= self.max_size { ··· 347 349 match self.walker.disk_step(&mut reader, self.process)? 
{ 348 350 Step::Missing(cid) => return Err(DriveError::MissingBlock(cid)), 349 351 Step::Finish => break, 350 - Step::Step { rkey, data } => { 352 + Step::Found { rkey, data } => { 351 353 out.push((rkey, data)); 352 354 continue; 353 355 } ··· 368 370 } 369 371 370 372 fn read_tx_blocking( 371 - mut self, 373 + &mut self, 372 374 n: usize, 373 375 tx: mpsc::Sender<Result<BlockChunk<T>, DriveError>>, 374 376 ) -> Result<(), mpsc::error::SendError<Result<BlockChunk<T>, DriveError>>> { ··· 393 395 return tx.blocking_send(Err(DriveError::MissingBlock(cid))); 394 396 } 395 397 Step::Finish => return Ok(()), 396 - Step::Step { rkey, data } => { 398 + Step::Found { rkey, data } => { 397 399 out.push((rkey, data)); 398 400 continue; 399 401 } ··· 409 411 Ok(()) 410 412 } 411 413 412 - pub fn to_channel(self, n: usize) -> mpsc::Receiver<Result<BlockChunk<T>, DriveError>> { 414 + pub fn to_channel( 415 + mut self, 416 + n: usize, 417 + ) -> ( 418 + mpsc::Receiver<Result<BlockChunk<T>, DriveError>>, 419 + tokio::task::JoinHandle<Self>, 420 + ) { 413 421 let (tx, rx) = mpsc::channel::<Result<BlockChunk<T>, DriveError>>(1); 414 422 415 423 // sketch: this worker is going to be allowed to execute without a join handle 416 - tokio::task::spawn_blocking(move || { 424 + let chan_task = tokio::task::spawn_blocking(move || { 417 425 if let Err(mpsc::error::SendError(_)) = self.read_tx_blocking(n, tx) { 418 426 log::debug!("big car reader exited early due to dropped receiver channel"); 419 427 } 428 + self 420 429 }); 421 430 422 - rx 431 + (rx, chan_task) 432 + } 433 + 434 + pub async fn reset_store(mut self) -> Result<SqliteStore, DriveError> { 435 + tokio::task::spawn_blocking(move || { 436 + self.store.reset()?; 437 + Ok(self.store) 438 + }) 439 + .await? 423 440 } 424 441 }
+3 -2
src/lib.rs
··· 2 2 //! 3 3 //! For now see the [examples](https://tangled.org/@microcosm.blue/repo-stream/tree/main/examples) 4 4 5 + mod mst; 6 + mod walk; 7 + 5 8 pub mod disk; 6 9 pub mod drive; 7 - pub mod mst; 8 10 pub mod process; 9 - pub mod walk; 10 11 11 12 pub use disk::SqliteStore; 12 13 pub use drive::{DriveError, Driver};
+3 -3
src/walk.rs
··· 51 51 /// Reached the end of the MST! yay! 52 52 Finish, 53 53 /// A record was found! 54 - Step { rkey: String, data: T }, 54 + Found { rkey: String, data: T }, 55 55 } 56 56 57 57 #[derive(Debug, Clone, PartialEq)] ··· 227 227 } 228 228 self.prev = rkey.clone(); 229 229 230 - return Ok(Step::Step { rkey, data }); 230 + return Ok(Step::Found { rkey, data }); 231 231 } 232 232 } 233 233 } ··· 294 294 } 295 295 self.prev = rkey.clone(); 296 296 297 - return Ok(Step::Step { rkey, data }); 297 + return Ok(Step::Found { rkey, data }); 298 298 } 299 299 } 300 300 }