Fast and robust atproto CAR file processing in rust

+redb

Changed files
+79 -34
examples
disk-read-file
src
+2 -2
examples/disk-read-file/main.rs
··· 1 1 extern crate repo_stream; 2 2 use clap::Parser; 3 - use repo_stream::disk::SqliteStore; 3 + use repo_stream::disk::RedbStore; 4 4 use repo_stream::drive::Processable; 5 5 use serde::{Deserialize, Serialize}; 6 6 use std::path::PathBuf; ··· 32 32 match repo_stream::drive::load_car(reader, |block| S(block.len()), 1024).await? { 33 33 repo_stream::drive::Vehicle::Lil(_, _) => panic!("try this on a bigger car"), 34 34 repo_stream::drive::Vehicle::Big(big_stuff) => { 35 - let disk_store = SqliteStore::new(tmpfile); 35 + let disk_store = RedbStore::new(tmpfile); 36 36 let (commit, driver) = big_stuff.finish_loading(disk_store).await?; 37 37 log::warn!("big: {:?}", commit); 38 38 driver
+77 -32
src/disk.rs
··· 1 + use redb::ReadableDatabase; 1 2 use rusqlite::OptionalExtension; 2 3 use std::error::Error; 3 4 use std::path::PathBuf; ··· 36 37 fn get(&mut self, key: Vec<u8>) -> Result<Option<Vec<u8>>, Self::StorageError>; 37 38 } 38 39 39 - ///////////////// 40 + ///////////////// sqlite 40 41 41 42 pub struct SqliteStore { 42 43 path: PathBuf, ··· 122 123 } 123 124 } 124 125 125 - // /// The main storage interface for MST blocks 126 - // /// 127 - // /// **Note**: `get` and `put` are **synchronous methods that may block** 128 - // pub trait BlockStore<T: Clone> { 129 - // fn get(&self, cid: Cid) -> Option<MaybeProcessedBlock<T>>; 130 - // fn put(&mut self, cid: Cid, mpb: MaybeProcessedBlock<T>); 131 - // } 126 + //////////// redb why not 127 + 128 + const REDB_TABLE: redb::TableDefinition<&[u8], &[u8]> = redb::TableDefinition::new("blocks"); 129 + 130 + pub struct RedbStore { 131 + path: PathBuf, 132 + } 133 + 134 + impl RedbStore { 135 + pub fn new(path: PathBuf) -> Self { 136 + Self { path } 137 + } 138 + } 139 + 140 + impl StorageErrorBase for redb::Error {} 141 + 142 + impl DiskStore for RedbStore { 143 + type StorageError = redb::Error; 144 + type Access = RedbAccess; 145 + async fn get_access(&mut self) -> Result<RedbAccess, redb::Error> { 146 + let path = self.path.clone(); 147 + let db = tokio::task::spawn_blocking(move || { 148 + let db = redb::Database::create(path)?; 149 + Ok::<_, Self::StorageError>(db) 150 + }) 151 + .await 152 + .expect("join error")?; 153 + 154 + Ok(RedbAccess { db }) 155 + } 156 + } 157 + 158 + pub struct RedbAccess { 159 + db: redb::Database, 160 + } 132 161 133 - // ///// wheee 162 + impl DiskAccess for RedbAccess { 163 + type StorageError = redb::Error; 164 + fn get_writer(&mut self) -> Result<impl DiskWriter<redb::Error>, redb::Error> { 165 + let mut tx = self.db.begin_write()?; 166 + tx.set_durability(redb::Durability::None)?; 167 + Ok(RedbWriter { tx: Some(tx) }) 168 + } 169 + fn get_reader(&self) -> Result<impl DiskReader<StorageError = redb::Error>, redb::Error> { 170 + let tx = self.db.begin_read()?; 171 + Ok(RedbReader { tx }) 172 + } 173 + } 134 174 135 - // /// In-memory MST block storage 136 - // /// 137 - // /// a thin wrapper around a hashmap 138 - // pub struct MemoryStore<T: Clone> { 139 - // map: HashMap<Cid, MaybeProcessedBlock<T>>, 140 - // } 175 + pub struct RedbWriter { 176 + tx: Option<redb::WriteTransaction>, 177 + } 141 178 142 - // impl<T: Clone> BlockStore<T> for MemoryStore<T> { 143 - // fn get(&self, cid: Cid) -> Option<MaybeProcessedBlock<T>> { 144 - // self.map.get(&cid).map(|t| t.clone()) 145 - // } 146 - // fn put(&mut self, cid: Cid, mpb: MaybeProcessedBlock<T>) { 147 - // self.map.insert(cid, mpb); 148 - // } 149 - // } 179 + impl DiskWriter<redb::Error> for RedbWriter { 180 + fn put(&mut self, key: Vec<u8>, val: Vec<u8>) -> Result<(), redb::Error> { 181 + let mut table = self.tx.as_ref().unwrap().open_table(REDB_TABLE)?; 182 + table.insert(&*key, &*val)?; 183 + Ok(()) 184 + } 185 + } 150 186 151 - // //// the fun bits 187 + /// oops careful in async 188 + impl Drop for RedbWriter { 189 + fn drop(&mut self) { 190 + let tx = self.tx.take(); 191 + tx.unwrap().commit().unwrap(); 192 + } 193 + } 152 194 153 - // pub struct HybridStore<T: Clone, D: DiskStore> { 154 - // mem: MemoryStore<T>, 155 - // disk: D, 156 - // } 195 + pub struct RedbReader { 196 + tx: redb::ReadTransaction, 197 + } 157 198 158 - // impl<T: Clone, D: DiskStore> BlockStore<T> for HybridStore<T, D> { 159 - // fn get(&self, _cid: Cid) -> Option<MaybeProcessedBlock<T>> { todo!() } 160 - // fn put(&mut self, _cid: Cid, _mpb: MaybeProcessedBlock<T>) { todo!() } 161 - // } 199 + impl DiskReader for RedbReader { 200 + type StorageError = redb::Error; 201 + fn get(&mut self, key: Vec<u8>) -> Result<Option<Vec<u8>>, redb::Error> { 202 + let table = self.tx.open_table(REDB_TABLE)?; 203 + let rv = table.get(&*key)?.map(|guard| guard.value().to_vec()); 204 + Ok(rv) 205 + } 206 + }