Fast and robust atproto CAR file processing in Rust

down to one file (still repros)

Changed files
+84 -123
examples
disk-read-file
src
+84 -5
examples/disk-read-file/main.rs
··· 4 4 5 5 extern crate repo_stream; 6 6 use clap::Parser; 7 - use repo_stream::{DiskStore, load_car}; 8 7 use std::path::PathBuf; 9 8 use std::time::Instant; 9 + use fjall::{Database, Keyspace, KeyspaceCreateOptions, Error as FjallError}; 10 + use std::collections::HashMap; 11 + use iroh_car::CarReader; 12 + 13 + #[derive(Debug, thiserror::Error)] 14 + pub enum DriveError { 15 + #[error("Error from iroh_car: {0}")] 16 + CarReader(#[from] iroh_car::Error), 17 + #[error("Storage error")] 18 + StorageError(#[from] FjallError), 19 + } 10 20 11 21 #[derive(Debug, Parser)] 12 22 struct Args { ··· 31 41 let t0 = Instant::now(); 32 42 33 43 // set up a disk store we can spill to 34 - let disk_store = DiskStore::new(tmpfile).await?; 44 + let mut store = DiskStore::new(tmpfile).await?; 35 45 36 - // in this example we only bother handling CARs that are too big for memory 37 - // `noop` helper means: do no block processing, store the raw blocks 38 - let mut store = load_car(reader, disk_store).await?; 46 + let mut mem_blocks = HashMap::new(); 47 + let mut car = CarReader::new(reader).await?; 48 + // load some blocks into memory (idk why doing this helps repro) 49 + let mut n = 0; 50 + while let Some((cid, data)) = car.next_block().await? { 51 + mem_blocks.insert(cid, data); 52 + n += 1; 53 + if n >= 20_000 { 54 + log::info!("stopping for disk dump at {n}"); 55 + break; 56 + } 57 + } 58 + // dump mem blocks into the store 59 + for (k, v) in mem_blocks { 60 + store.put(k.to_bytes(), v)?; 61 + } 62 + // dump the rest to disk (in chunks) 63 + log::debug!("dumping the rest of the stream..."); 64 + while let Some((cid, data)) = car.next_block().await? { 65 + store.put(cid.to_bytes(), data)?; 66 + } 39 67 40 68 // at this point you might want to fetch the account's signing key 41 69 // via the DID from the commit, and then verify the signature. 
··· 47 75 48 76 Ok(()) 49 77 } 78 + 79 + 80 + /// On-disk block storage 81 + pub struct DiskStore { 82 + #[allow(unused)] 83 + db: Database, 84 + ks: Keyspace, 85 + current: usize, 86 + all_keys: HashMap<Vec<u8>, usize>, 87 + } 88 + 89 + impl DiskStore { 90 + /// Initialize a new disk store 91 + pub async fn new( 92 + path: PathBuf, 93 + ) -> Result<Self, FjallError> { 94 + let db = Database::builder(path).open()?; 95 + let ks = db.keyspace("z", KeyspaceCreateOptions::default)?; 96 + 97 + Ok(Self { 98 + db, 99 + ks, 100 + current: 0, 101 + all_keys: Default::default(), 102 + }) 103 + } 104 + pub(crate) fn put(&mut self, k: Vec<u8>, v: Vec<u8>) -> Result<(), DriveError> { 105 + self.ks.insert(k.clone(), v)?; 106 + self.all_keys.insert(k.clone(), self.current); 107 + self.current += 1; 108 + Ok(()) 109 + } 110 + 111 + pub fn check_keys(&mut self) { 112 + log::info!("checking keys..."); 113 + for guard in self.ks.iter() { 114 + let key = guard.key().unwrap(); 115 + self.all_keys.remove(key.as_ref()); 116 + } 117 + if self.all_keys.len() == 0 { 118 + log::info!("all keys found."); 119 + } 120 + for (leftover, i) in &self.all_keys { 121 + log::warn!("fjall key missing: {i} {leftover:?}"); 122 + } 123 + } 124 + } 125 + 126 + 127 + 128 +
-60
src/disk.rs
··· 1 - use crate::drive::DriveError; 2 - use fjall::{Database, Keyspace, KeyspaceCreateOptions, Error as FjallError}; 3 - use std::path::PathBuf; 4 - use std::collections::HashMap; 5 - 6 - #[derive(Debug, thiserror::Error)] 7 - pub enum DiskError { 8 - /// A wrapped database error 9 - /// 10 - /// (The wrapped err should probably be obscured to remove public-facing 11 - /// sqlite bits) 12 - #[error(transparent)] 13 - DbError(#[from] FjallError), 14 - } 15 - 16 - /// On-disk block storage 17 - pub struct DiskStore { 18 - #[allow(unused)] 19 - db: Database, 20 - ks: Keyspace, 21 - current: usize, 22 - all_keys: HashMap<Vec<u8>, usize>, 23 - } 24 - 25 - impl DiskStore { 26 - /// Initialize a new disk store 27 - pub async fn new( 28 - path: PathBuf, 29 - ) -> Result<Self, DiskError> { 30 - let db = Database::builder(path).open()?; 31 - let ks = db.keyspace("z", KeyspaceCreateOptions::default)?; 32 - 33 - Ok(Self { 34 - db, 35 - ks, 36 - current: 0, 37 - all_keys: Default::default(), 38 - }) 39 - } 40 - pub(crate) fn put(&mut self, k: Vec<u8>, v: Vec<u8>) -> Result<(), DriveError> { 41 - self.ks.insert(k.clone(), v).map_err(DiskError::DbError)?; 42 - self.all_keys.insert(k.clone(), self.current); 43 - self.current += 1; 44 - Ok(()) 45 - } 46 - 47 - pub fn check_keys(&mut self) { 48 - log::info!("checking keys..."); 49 - for guard in self.ks.iter() { 50 - let key = guard.key().unwrap(); 51 - self.all_keys.remove(key.as_ref()); 52 - } 53 - if self.all_keys.len() == 0 { 54 - log::info!("all keys found."); 55 - } 56 - for (leftover, i) in &self.all_keys { 57 - log::warn!("fjall key missing: {i} {leftover:?}"); 58 - } 59 - } 60 - }
-52
src/drive.rs
··· 1 - //! Consume a CAR from an AsyncRead, producing an ordered stream of records 2 - 3 - use crate::disk::{DiskError, DiskStore}; 4 - use iroh_car::CarReader; 5 - use std::collections::HashMap; 6 - use tokio::io::AsyncRead; 7 - 8 - 9 - /// Errors that can happen while consuming and emitting blocks and records 10 - #[derive(Debug, thiserror::Error)] 11 - pub enum DriveError { 12 - #[error("Error from iroh_car: {0}")] 13 - CarReader(#[from] iroh_car::Error), 14 - #[error("Storage error")] 15 - StorageError(#[from] DiskError), 16 - } 17 - 18 - 19 - pub async fn load_car<R: AsyncRead + Unpin>( 20 - reader: R, 21 - mut store: DiskStore, 22 - ) -> Result<DiskStore, DriveError> { 23 - let mut mem_blocks = HashMap::new(); 24 - 25 - let mut car = CarReader::new(reader).await?; 26 - 27 - // try to load all the blocks into memory 28 - let mut n = 0; 29 - 30 - while let Some((cid, data)) = car.next_block().await? { 31 - mem_blocks.insert(cid, data); 32 - n += 1; 33 - if n >= 20_000 { 34 - log::info!("stopping for disk dump at {n}"); 35 - break; 36 - } 37 - } 38 - 39 - // dump mem blocks into the store 40 - for (k, v) in mem_blocks { 41 - store.put(k.to_bytes(), v)?; 42 - } 43 - 44 - // dump the rest to disk (in chunks) 45 - log::debug!("dumping the rest of the stream..."); 46 - while let Some((cid, data)) = car.next_block().await? { 47 - store.put(cid.to_bytes(), data)?; 48 - } 49 - log::debug!("done."); 50 - Ok(store) 51 - 52 - }
-6
src/lib.rs
··· 73 73 Find more [examples in the repo](https://tangled.org/@microcosm.blue/repo-stream/tree/main/examples). 74 74 75 75 */ 76 - 77 - pub mod disk; 78 - pub mod drive; 79 - 80 - pub use disk::{DiskError, DiskStore}; 81 - pub use drive::{DriveError, load_car};