Fast and robust atproto CAR file processing in rust

we are async

Changed files
+21 -24
src
+2 -2
src/disk_drive.rs
··· 26 26 } 27 27 28 28 pub trait BlockStore<MPB: Serialize + DeserializeOwned> { 29 - fn put_batch(&self, blocks: Vec<(Cid, MPB)>); // unwraps for now 29 + fn put_batch(&self, blocks: Vec<(Cid, MPB)>) -> impl std::future::Future<Output = ()> + Send; // unwraps for now 30 30 fn get(&self, key: Cid) -> Option<MPB>; 31 31 } 32 32 ··· 103 103 to_insert.push((cid, data)); 104 104 } 105 105 } 106 - block_store.put_batch(to_insert) 106 + block_store.put_batch(to_insert).await; 107 107 } 108 108 109 109 log::warn!("init: got commit?");
+19 -22
src/disk_redb.rs
··· 1 1 use crate::disk_drive::BlockStore; 2 2 use ipld_core::cid::Cid; 3 3 use redb::{Database, Durability, Error, ReadableDatabase, TableDefinition}; 4 - use serde::{Serialize, de::DeserializeOwned}; 5 4 use std::path::Path; 5 + use std::sync::Arc; 6 6 7 7 const TABLE: TableDefinition<&[u8], &[u8]> = TableDefinition::new("blocks"); 8 8 9 9 pub struct RedbStore { 10 10 #[allow(dead_code)] 11 - db: Database, 11 + db: Arc<Database>, 12 12 } 13 13 14 14 impl RedbStore { ··· 16 16 log::warn!("redb new"); 17 17 let db = Database::create(path)?; 18 18 log::warn!("db created"); 19 - Ok(Self { db }) 19 + Ok(Self { db: db.into() }) 20 20 } 21 21 } 22 22 ··· 29 29 } 30 30 } 31 31 32 - impl<MPB: Serialize + DeserializeOwned> BlockStore<MPB> for RedbStore { 33 - fn put_batch(&self, blocks: Vec<(Cid, MPB)>) { 34 - let mut tx = self.db.begin_write().unwrap(); 35 - tx.set_durability(Durability::None).unwrap(); 32 + impl BlockStore<Vec<u8>> for RedbStore { 33 + async fn put_batch(&self, blocks: Vec<(Cid, Vec<u8>)>) { 34 + let db = self.db.clone(); 35 + tokio::task::spawn_blocking(move || { 36 + let mut tx = db.begin_write().unwrap(); 37 + tx.set_durability(Durability::None).unwrap(); 36 38 37 - { 38 - let mut table = tx.open_table(TABLE).unwrap(); 39 - for (cid, t) in blocks { 40 - let key_bytes = cid.to_bytes(); 41 - let val_bytes = 42 - bincode::serde::encode_to_vec(t, bincode::config::standard()).unwrap(); 43 - table.insert(&*key_bytes, &*val_bytes).unwrap(); 39 + { 40 + let mut table = tx.open_table(TABLE).unwrap(); 41 + for (cid, t) in blocks { 42 + let key_bytes = cid.to_bytes(); 43 + table.insert(&*key_bytes, &*t).unwrap(); 44 + } 44 45 } 45 - } 46 46 47 - tx.commit().unwrap(); 47 + tx.commit().unwrap(); 48 + }).await.unwrap(); 48 49 } 49 50 50 - fn get(&self, c: Cid) -> Option<MPB> { 51 + fn get(&self, c: Cid) -> Option<Vec<u8>> { 51 52 let key_bytes = c.to_bytes(); 52 53 let tx = self.db.begin_read().unwrap(); 53 54 let table = tx.open_table(TABLE).unwrap(); 54 - let maybe_val_bytes = table.get(&*key_bytes).unwrap()?; 55 - let (t, n): (MPB, usize) = 56 - bincode::serde::decode_from_slice(maybe_val_bytes.value(), bincode::config::standard()) 57 - .unwrap(); 58 - assert_eq!(maybe_val_bytes.value().len(), n); 55 + let t = table.get(&*key_bytes).unwrap()?.value().to_vec(); 59 56 Some(t) 60 57 } 61 58 }