Fast and robust atproto CAR file processing in Rust

Make disk-store cache size configurable: thread a `limit_mb` parameter through `SqliteStore::new` and `RedbStore::new` (used for SQLite `cache_size` and redb `set_cache_size`), raise the in-memory CAR threshold from 5 MB to 10 MB in the disk-read-file example, double heed's `map_size` to 2 GiB, and apply rustfmt cleanups (drop a redundant `db.clone()` and needless `mut` bindings).

Changed files
+36 -20
examples
disk-read-file
src
+5 -3
examples/disk-read-file/main.rs
··· 34 34 // let kb = 2_usize.pow(10); 35 35 let mb = 2_usize.pow(20); 36 36 37 + let limit_mb = 32; 38 + 37 39 let mut driver = 38 - match repo_stream::drive::load_car(reader, |block| S(block.len()), 5 * mb).await? { 40 + match repo_stream::drive::load_car(reader, |block| S(block.len()), 10 * mb).await? { 39 41 repo_stream::drive::Vehicle::Lil(_, _) => panic!("try this on a bigger car"), 40 42 repo_stream::drive::Vehicle::Big(big_stuff) => { 41 - let disk_store = repo_stream::disk::SqliteStore::new(tmpfile.clone()); 42 - // let disk_store = repo_stream::disk::RedbStore::new(tmpfile.clone()); 43 + let disk_store = repo_stream::disk::SqliteStore::new(tmpfile.clone(), limit_mb); 44 + // let disk_store = repo_stream::disk::RedbStore::new(tmpfile.clone(), limit_mb); 43 45 // let disk_store = repo_stream::disk::RustcaskStore::new(tmpfile.clone()); 44 46 // let disk_store = repo_stream::disk::HeedStore::new(tmpfile.clone()); 45 47 let (commit, driver) = big_stuff.finish_loading(disk_store).await?;
+31 -17
src/disk.rs
··· 42 42 43 43 pub struct SqliteStore { 44 44 path: PathBuf, 45 + limit_mb: usize, 45 46 } 46 47 47 48 impl SqliteStore { 48 - pub fn new(path: PathBuf) -> Self { 49 - Self { path } 49 + pub fn new(path: PathBuf, limit_mb: usize) -> Self { 50 + Self { path, limit_mb } 50 51 } 51 52 } 52 53 ··· 57 58 type Access = SqliteAccess; 58 59 async fn get_access(&mut self) -> Result<SqliteAccess, rusqlite::Error> { 59 60 let path = self.path.clone(); 61 + let limit_mb = self.limit_mb; 60 62 let conn = tokio::task::spawn_blocking(move || { 61 63 let conn = rusqlite::Connection::open(path)?; 62 64 ··· 65 67 // conn.pragma_update(None, "journal_mode", "OFF")?; 66 68 // conn.pragma_update(None, "journal_mode", "MEMORY")?; 67 69 conn.pragma_update(None, "journal_mode", "WAL")?; 70 + // conn.pragma_update(None, "wal_autocheckpoint", "0")?; // this lets things get a bit big on disk 68 71 conn.pragma_update(None, "synchronous", "OFF")?; 69 - conn.pragma_update(None, "cache_size", (5 * sq_mb).to_string())?; 72 + conn.pragma_update(None, "cache_size", (limit_mb as i64 * sq_mb).to_string())?; 70 73 conn.execute( 71 74 "CREATE TABLE blocks ( 72 75 key BLOB PRIMARY KEY NOT NULL, ··· 151 154 152 155 pub struct RedbStore { 153 156 path: PathBuf, 157 + limit_mb: usize, 154 158 } 155 159 156 160 impl RedbStore { 157 - pub fn new(path: PathBuf) -> Self { 158 - Self { path } 161 + pub fn new(path: PathBuf, limit_mb: usize) -> Self { 162 + Self { path, limit_mb } 159 163 } 160 164 } 161 165 ··· 166 170 type Access = RedbAccess; 167 171 async fn get_access(&mut self) -> Result<RedbAccess, redb::Error> { 168 172 let path = self.path.clone(); 173 + let limit_mb = self.limit_mb; 169 174 let mb = 2_usize.pow(20); 170 175 let db = tokio::task::spawn_blocking(move || { 171 176 let db = redb::Database::builder() 172 - .set_cache_size(5 * mb) 177 + .set_cache_size(limit_mb * mb) 173 178 .create(path)?; 174 179 Ok::<_, Self::StorageError>(db) 175 180 }) ··· 207 212 table.insert(&*key, &*val)?; 208 213 
Ok(()) 209 214 } 210 - fn put_many(&mut self, kv: impl Iterator<Item = (Vec<u8>, Vec<u8>)>) -> Result<(), redb::Error> { 215 + fn put_many( 216 + &mut self, 217 + kv: impl Iterator<Item = (Vec<u8>, Vec<u8>)>, 218 + ) -> Result<(), redb::Error> { 211 219 let mut table = self.tx.as_ref().unwrap().open_table(REDB_TABLE)?; 212 220 for (k, v) in kv { 213 221 table.insert(&*k, &*v)?; ··· 287 295 impl DiskAccess for RustcaskAccess { 288 296 type StorageError = CaskError; 289 297 fn get_writer(&mut self) -> Result<impl DiskWriter<CaskError>, CaskError> { 290 - Ok(RustcaskWriter { db: self.db.clone() }) 298 + Ok(RustcaskWriter { 299 + db: self.db.clone(), 300 + }) 291 301 } 292 302 fn get_reader(&self) -> Result<impl DiskReader<StorageError = CaskError>, CaskError> { 293 - Ok(RustcaskReader { db: self.db.clone() }) 303 + Ok(RustcaskReader { 304 + db: self.db.clone(), 305 + }) 294 306 } 295 307 } 296 308 ··· 324 336 } 325 337 } 326 338 327 - 328 339 ///////// heeeeeeeeeeeeed 329 340 330 341 type HeedBytes = heed::types::SerdeBincode<Vec<u8>>; ··· 352 363 std::fs::create_dir_all(&path).unwrap(); 353 364 let env = unsafe { 354 365 heed::EnvOpenOptions::new() 355 - .map_size(1 * 2_usize.pow(30)) 366 + .map_size(2 * 2_usize.pow(30)) 356 367 .open(path)? 
357 368 }; 358 369 Ok::<_, Self::StorageError>(env) ··· 374 385 fn get_writer(&mut self) -> Result<impl DiskWriter<heed::Error>, heed::Error> { 375 386 let mut tx = self.env.write_txn()?; 376 387 let db = self.env.create_database(&mut tx, None)?; 377 - self.db = Some(db.clone()); 388 + self.db = Some(db); 378 389 Ok(HeedWriter { tx: Some(tx), db }) 379 390 } 380 391 fn get_reader(&self) -> Result<impl DiskReader<StorageError = heed::Error>, heed::Error> { ··· 391 402 392 403 impl DiskWriter<heed::Error> for HeedWriter<'_> { 393 404 fn put(&mut self, key: Vec<u8>, val: Vec<u8>) -> Result<(), heed::Error> { 394 - let mut tx = self.tx.as_mut().unwrap(); 395 - self.db.put(&mut tx, &key, &val)?; 405 + let tx = self.tx.as_mut().unwrap(); 406 + self.db.put(tx, &key, &val)?; 396 407 Ok(()) 397 408 } 398 - fn put_many(&mut self, kv: impl Iterator<Item = (Vec<u8>, Vec<u8>)>) -> Result<(), heed::Error> { 399 - let mut tx = self.tx.as_mut().unwrap(); 409 + fn put_many( 410 + &mut self, 411 + kv: impl Iterator<Item = (Vec<u8>, Vec<u8>)>, 412 + ) -> Result<(), heed::Error> { 413 + let tx = self.tx.as_mut().unwrap(); 400 414 for (k, v) in kv { 401 - self.db.put(&mut tx, &k, &v)?; 415 + self.db.put(tx, &k, &v)?; 402 416 } 403 417 Ok(()) 404 418 }