Fast and robust atproto CAR file processing in rust

channels

seems like it's good on the write side

unclear if there's a win on reads

Changed files
+66 -20
examples
disk-read-file
src
+20 -20
examples/disk-read-file/main.rs
··· 36 36 37 37 let limit_mb = 32; 38 38 39 - let mut driver = 40 - match repo_stream::drive::load_car(reader, |block| S(block.len()), 10 * mb).await? { 41 - repo_stream::drive::Vehicle::Lil(_, _) => panic!("try this on a bigger car"), 42 - repo_stream::drive::Vehicle::Big(big_stuff) => { 43 - let disk_store = repo_stream::disk::SqliteStore::new(tmpfile.clone(), limit_mb); 44 - let (commit, driver) = big_stuff.finish_loading(disk_store).await?; 45 - log::warn!("big: {:?}", commit); 46 - driver 47 - } 48 - }; 49 - 50 - println!("hello!"); 39 + let driver = match repo_stream::drive::load_car(reader, |block| S(block.len()), 10 * mb).await? 40 + { 41 + repo_stream::drive::Vehicle::Lil(_, _) => panic!("try this on a bigger car"), 42 + repo_stream::drive::Vehicle::Big(big_stuff) => { 43 + let disk_store = repo_stream::disk::SqliteStore::new(tmpfile.clone(), limit_mb); 44 + let (commit, driver) = big_stuff.finish_loading(disk_store).await?; 45 + log::warn!("big: {:?}", commit); 46 + driver 47 + } 48 + }; 51 49 52 50 let mut n = 0; 53 - loop { 54 - let (d, p) = driver.next_chunk(1024).await?; 55 - driver = d; 56 - let Some(pairs) = p else { 57 - break; 58 - }; 51 + let (mut rx, worker) = driver.rx(512).await?; 52 + 53 + log::debug!("walking..."); 54 + while let Some(pairs) = rx.recv().await { 59 55 n += pairs.len(); 60 - // log::info!("got {rkey:?}"); 61 56 } 57 + log::debug!("done walking! joining..."); 58 + 59 + worker.await.unwrap().unwrap(); 60 + 61 + log::debug!("joined."); 62 + 62 63 // log::info!("now is the time to check mem..."); 63 64 // tokio::time::sleep(std::time::Duration::from_secs(22)).await; 64 - drop(driver); 65 65 log::info!("bye! {n}"); 66 66 67 67 std::fs::remove_file(tmpfile).unwrap(); // need to also remove -shm -wal
+46
src/drive.rs
··· 330 330 Ok((self, Some(out))) 331 331 } 332 332 } 333 + 334 + pub async fn rx( 335 + mut self, 336 + n: usize, 337 + ) -> Result< 338 + ( 339 + tokio::sync::mpsc::Receiver<Vec<(String, T)>>, 340 + tokio::task::JoinHandle<Result<(), DiskDriveError>>, 341 + ), 342 + DiskDriveError, 343 + > { 344 + let (tx, rx) = tokio::sync::mpsc::channel::<Vec<(String, T)>>(1); 345 + 346 + // sketch: this worker is going to be allowed to execute without a join handle 347 + // ...should we return the join handle here so the caller at least knows about it? 348 + // yes probably for error handling?? (orrr put errors in the channel) 349 + let worker = tokio::task::spawn_blocking(move || { 350 + let mut reader = self.access.get_reader()?; 351 + 352 + loop { 353 + let mut out = Vec::with_capacity(n); 354 + 355 + for _ in 0..n { 356 + // walk as far as we can until we run out of blocks or find a record 357 + match self.walker.disk_step(&mut reader, self.process)? { 358 + Step::Missing(cid) => return Err(DiskDriveError::MissingBlock(cid)), 359 + Step::Finish => break, 360 + Step::Step { rkey, data } => { 361 + out.push((rkey, data)); 362 + continue; 363 + } 364 + }; 365 + } 366 + 367 + if out.is_empty() { 368 + break; 369 + } 370 + tx.blocking_send(out).unwrap(); 371 + } 372 + 373 + drop(reader); // cannot outlive access 374 + Ok(()) 375 + }); // await later 376 + 377 + Ok((rx, worker)) 378 + } 333 379 } 334 380 335 381 /// The core driver between the block stream and MST walker