Fast and robust atproto CAR file processing in Rust

cleanup: store and processable for usize
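
With `Processable` now implemented for `usize` (the new `src/process.rs`), callers that only need per-block sizes can pass `|block| block.len()` straight to `load_car` instead of defining a throwaway newtype. A minimal sketch of the in-memory path, borrowing the sample file and buffer sizes from the tests below (the sketch itself is not part of this diff):

    extern crate repo_stream;

    #[tokio::main]
    async fn main() -> Result<(), Box<dyn std::error::Error>> {
        let bytes: &'static [u8] = include_bytes!("car-samples/tiny.car");

        // usize is Processable now, so no `struct S(usize)` wrapper is needed
        let (commit, mut driver) =
            match repo_stream::drive::load_car(bytes, |block| block.len(), 1024 * 1024).await? {
                repo_stream::drive::Vehicle::Lil(commit, mem_driver) => (commit, mem_driver),
                repo_stream::drive::Vehicle::Big(_) => panic!("sketch covers the in-memory path only"),
            };

        let mut total = 0;
        while let Some(pairs) = driver.next_chunk(256).await? {
            for (_rkey, size) in pairs {
                total += size;
            }
        }
        println!("commit {:?}: {} total block bytes", commit, total);
        Ok(())
    }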

Changed files (+49 -111)

  benches/huge-car.rs              +1  -12
  benches/non-huge-cars.rs         +1  -12
  examples/disk-read-file/main.rs  +2  -14
  examples/read-file/main.rs       +1  -12
  src/disk.rs                      +9  -21
  src/drive.rs                     +18 -26
  src/lib.rs                       +1
  src/process.rs                   +12
  src/walk.rs                      +2  -1
  tests/non-huge-cars.rs           +2  -13
benches/huge-car.rs (+1 -12)

···
 extern crate repo_stream;
-use repo_stream::drive::Processable;
-use serde::{Deserialize, Serialize};
 use std::path::{Path, PathBuf};

 use criterion::{Criterion, criterion_group, criterion_main};
-
-#[derive(Clone, Serialize, Deserialize)]
-struct S(usize);
-
-impl Processable for S {
-    fn get_size(&self) -> usize {
-        0 // no additional space taken, just its stack size (newtype is free)
-    }
-}

 pub fn criterion_benchmark(c: &mut Criterion) {
     let rt = tokio::runtime::Builder::new_multi_thread()
···

     let mb = 2_usize.pow(20);

-    let mut driver = match repo_stream::drive::load_car(reader, |block| S(block.len()), 1024 * mb)
+    let mut driver = match repo_stream::drive::load_car(reader, |block| block.len(), 1024 * mb)
         .await
         .unwrap()
     {
benches/non-huge-cars.rs (+1 -12)

···
 extern crate repo_stream;

 use criterion::{Criterion, criterion_group, criterion_main};
-use repo_stream::drive::Processable;
-use serde::{Deserialize, Serialize};

 const TINY_CAR: &'static [u8] = include_bytes!("../car-samples/tiny.car");
 const LITTLE_CAR: &'static [u8] = include_bytes!("../car-samples/little.car");
 const MIDSIZE_CAR: &'static [u8] = include_bytes!("../car-samples/midsize.car");
-
-#[derive(Clone, Serialize, Deserialize)]
-struct S(usize);
-
-impl Processable for S {
-    fn get_size(&self) -> usize {
-        0 // no additional space taken, just its stack size (newtype is free)
-    }
-}

 pub fn criterion_benchmark(c: &mut Criterion) {
     let rt = tokio::runtime::Builder::new_multi_thread()
···

 async fn drive_car(bytes: &[u8]) -> usize {
     let mut driver =
-        match repo_stream::drive::load_car(bytes, |block| S(block.len()), 32 * 2_usize.pow(20))
+        match repo_stream::drive::load_car(bytes, |block| block.len(), 32 * 2_usize.pow(20))
            .await
            .unwrap()
        {
examples/disk-read-file/main.rs (+2 -14)

···
 extern crate repo_stream;
 use clap::Parser;
-use repo_stream::drive::Processable;
-use serde::{Deserialize, Serialize};
 use std::path::PathBuf;

 type Result<T> = std::result::Result<T, Box<dyn std::error::Error>>;
···
     tmpfile: PathBuf,
 }

-#[derive(Clone, Serialize, Deserialize)]
-struct S(usize);
-
-impl Processable for S {
-    fn get_size(&self) -> usize {
-        0 // no additional space taken, just its stack size (newtype is free)
-    }
-}
-
 #[tokio::main]
 async fn main() -> Result<()> {
     env_logger::init();
···

     let limit_mb = 32;

-    let driver = match repo_stream::drive::load_car(reader, |block| S(block.len()), 10 * mb).await?
-    {
+    let driver = match repo_stream::drive::load_car(reader, |block| block.len(), 10 * mb).await? {
         repo_stream::drive::Vehicle::Lil(_, _) => panic!("try this on a bigger car"),
         repo_stream::drive::Vehicle::Big(big_stuff) => {
-            let disk_store = repo_stream::disk::SqliteStore::new(tmpfile.clone(), limit_mb);
+            let disk_store = repo_stream::disk::SqliteStore::new(tmpfile.clone(), limit_mb).await?;
             let (commit, driver) = big_stuff.finish_loading(disk_store).await?;
             log::warn!("big: {:?}", commit);
             driver
examples/read-file/main.rs (+1 -12)

···
 extern crate repo_stream;
 use clap::Parser;
-use repo_stream::drive::Processable;
-use serde::{Deserialize, Serialize};
 use std::path::PathBuf;

 type Result<T> = std::result::Result<T, Box<dyn std::error::Error>>;
···
     file: PathBuf,
 }

-#[derive(Clone, Serialize, Deserialize)]
-struct S(usize);
-
-impl Processable for S {
-    fn get_size(&self) -> usize {
-        0 // no additional space taken, just its stack size (newtype is free)
-    }
-}
-
 #[tokio::main]
 async fn main() -> Result<()> {
     env_logger::init();
···
     let reader = tokio::io::BufReader::new(reader);

     let (commit, mut driver) =
-        match repo_stream::drive::load_car(reader, |block| S(block.len()), 1024 * 1024).await? {
+        match repo_stream::drive::load_car(reader, |block| block.len(), 1024 * 1024).await? {
             repo_stream::drive::Vehicle::Lil(commit, mem_driver) => (commit, mem_driver),
             repo_stream::drive::Vehicle::Big(_) => panic!("can't handle big cars yet"),
         };
src/disk.rs (+9 -21)

···
 use std::path::PathBuf;

 pub struct SqliteStore {
-    path: PathBuf,
-    limit_mb: usize,
+    conn: rusqlite::Connection,
 }

 impl SqliteStore {
-    pub fn new(path: PathBuf, limit_mb: usize) -> Self {
-        Self { path, limit_mb }
-    }
-}
-
-impl SqliteStore {
-    pub async fn get_access(&mut self) -> Result<SqliteAccess, rusqlite::Error> {
-        let path = self.path.clone();
-        let limit_mb = self.limit_mb;
+    pub async fn new(path: PathBuf, cache_mb: usize) -> Result<Self, rusqlite::Error> {
         let conn = tokio::task::spawn_blocking(move || {
             let conn = rusqlite::Connection::open(path)?;

-            let sq_mb = -(2_i64.pow(10)); // negative is kibibytes for sqlite cache_size
+            let sqlite_one_mb = -(2_i64.pow(10)); // negative is kibibytes for sqlite cache_size

             // conn.pragma_update(None, "journal_mode", "OFF")?;
             // conn.pragma_update(None, "journal_mode", "MEMORY")?;
             conn.pragma_update(None, "journal_mode", "WAL")?;
             // conn.pragma_update(None, "wal_autocheckpoint", "0")?; // this lets things get a bit big on disk
             conn.pragma_update(None, "synchronous", "OFF")?;
-            conn.pragma_update(None, "cache_size", (limit_mb as i64 * sq_mb).to_string())?;
+            conn.pragma_update(
+                None,
+                "cache_size",
+                (cache_mb as i64 * sqlite_one_mb).to_string(),
+            )?;
             conn.execute(
                 "CREATE TABLE blocks (
                     key BLOB PRIMARY KEY NOT NULL,
···
         .await
         .expect("join error")?;

-        Ok(SqliteAccess { conn })
+        Ok(Self { conn })
     }
-}
-
-pub struct SqliteAccess {
-    conn: rusqlite::Connection,
-}
-
-impl SqliteAccess {
     pub fn get_writer(&'_ mut self) -> Result<SqliteWriter<'_>, rusqlite::Error> {
         let tx = self.conn.transaction()?;
         // let insert_stmt = tx.prepare("INSERT INTO blocks (key, val) VALUES (?1, ?2)")?;
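
With `SqliteAccess` folded into `SqliteStore`, construction and access are one step: `new()` opens the connection, applies the pragmas, and creates the blocks table, so it is now async and fallible. A minimal standalone sketch (the path is a placeholder and assumed fresh, since `new()` creates the table; `drive.rs` runs the writer calls inside `spawn_blocking`, skipped here for brevity):

    extern crate repo_stream;
    use std::path::PathBuf;

    #[tokio::main]
    async fn main() -> Result<(), Box<dyn std::error::Error>> {
        // placeholder scratch path; must not already contain a blocks table
        let store_path = PathBuf::from("/tmp/repo-stream-scratch.sqlite");
        let cache_mb = 32;

        // one call replaces the old SqliteStore::new() + get_access() pair
        let mut store = repo_stream::disk::SqliteStore::new(store_path, cache_mb).await?;

        // writers hang directly off the store now
        let mut writer = store.get_writer()?;
        writer.commit()?; // empty transaction, just to show the shape
        Ok(())
    }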
src/drive.rs (+18 -26)

···
 //! Consume an MST block stream, producing an ordered stream of records

-use crate::disk::{SqliteAccess, SqliteStore};
+use crate::disk::SqliteStore;
+use crate::process::Processable;
 use ipld_core::cid::Cid;
 use iroh_car::CarReader;
-use serde::de::DeserializeOwned;
 use serde::{Deserialize, Serialize};
 use std::collections::HashMap;
 use std::convert::Infallible;
···
     BincodeDecodeError(#[from] bincode::error::DecodeError),
     #[error("extra bytes remained after decoding")]
     ExtraGarbage,
-}
-
-pub trait Processable: Clone + Serialize + DeserializeOwned {
-    /// the additional size taken up (not including its mem::size_of)
-    fn get_size(&self) -> usize;
 }

 #[derive(Debug, Clone, Serialize, Deserialize)]
···
         mut self,
         mut store: SqliteStore,
     ) -> Result<(Commit, BigCarReady<T>), DriveError> {
-        // set up access for real
-        let mut access = store.get_access().await?;
-
-        // move access in and back out so we can manage lifetimes
+        // move store in and back out so we can manage lifetimes
         // dump mem blocks into the store
-        access = tokio::task::spawn(async move {
-            let mut writer = access.get_writer()?;
+        store = tokio::task::spawn(async move {
+            let mut writer = store.get_writer()?;

             let kvs = self
                 .mem_blocks
···

             writer.put_many(kvs)?;
             writer.commit()?;
-            Ok::<_, DriveError>(access)
+            Ok::<_, DriveError>(store)
         })
         .await??;

         let (tx, mut rx) = tokio::sync::mpsc::channel::<Vec<(Cid, MaybeProcessedBlock<T>)>>(2);

-        let access_worker = tokio::task::spawn_blocking(move || {
-            let mut writer = access.get_writer()?;
+        let store_worker = tokio::task::spawn_blocking(move || {
+            let mut writer = store.get_writer()?;

             while let Some(chunk) = rx.blocking_recv() {
                 let kvs = chunk
···
             }

             writer.commit()?;
-            Ok::<_, DriveError>(access)
+            Ok::<_, DriveError>(store)
         }); // await later

         // dump the rest to disk (in chunks)
···
         drop(tx);
         log::debug!("done. waiting for worker to finish...");

-        access = access_worker.await??;
+        store = store_worker.await??;

         log::debug!("worker finished.");
···
             commit,
             BigCarReady {
                 process: self.process,
-                access,
+                store,
                 walker,
             },
         ))
···

 pub struct BigCarReady<T: Clone> {
     process: fn(Vec<u8>) -> T,
-    access: SqliteAccess,
+    store: SqliteStore,
     walker: Walker,
 }
···
     ) -> Result<(Self, Option<Vec<(String, T)>>), DriveError> {
         let mut out = Vec::with_capacity(n);
         (self, out) = tokio::task::spawn_blocking(move || {
-            let access = self.access;
-            let mut reader = access.get_reader()?;
+            let store = self.store;
+            let mut reader = store.get_reader()?;

             for _ in 0..n {
                 // walk as far as we can until we run out of blocks or find a record
···
                 };
             }

-            drop(reader); // cannot outlive access
-            self.access = access;
+            drop(reader); // cannot outlive store
+            self.store = store;
             Ok::<_, DriveError>((self, out))
         })
         .await??;
···
         // ...should we return the join handle here so the caller at least knows about it?
         // yes probably for error handling?? (orrr put errors in the channel)
         let worker = tokio::task::spawn_blocking(move || {
-            let mut reader = self.access.get_reader()?;
+            let mut reader = self.store.get_reader()?;

             loop {
                 let mut out = Vec::with_capacity(n);
···
                     .map_err(|_| DriveError::ChannelSendError)?;
             }

-            drop(reader); // cannot outlive access
+            drop(reader); // cannot outlive store
             Ok(())
         }); // await later
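
The `finish_loading` rename keeps the same ownership dance, now with a single type: `get_writer()` borrows the store mutably and the rusqlite work has to happen on a blocking thread, so the store is moved into each task and handed back when it finishes. A self-contained illustration of that move-in, move-out pattern using a hypothetical `Store` type (not the crate's; rusqlite omitted):

    use tokio::task;

    // hypothetical stand-in for SqliteStore: owns its "connection"
    // (here just a Vec) behind &mut methods, like get_writer() does
    struct Store(Vec<(Vec<u8>, Vec<u8>)>);

    impl Store {
        fn put_many(&mut self, kvs: Vec<(Vec<u8>, Vec<u8>)>) {
            self.0.extend(kvs);
        }
    }

    // the blocking task takes ownership of the store, does its
    // synchronous work, and returns it, so no &mut borrow has to
    // live across an .await point
    async fn write_all(mut store: Store, kvs: Vec<(Vec<u8>, Vec<u8>)>) -> Store {
        store = task::spawn_blocking(move || {
            store.put_many(kvs); // synchronous, possibly slow
            store // hand ownership back to the async caller
        })
        .await
        .expect("join error");
        store
    }

    #[tokio::main]
    async fn main() {
        let store = write_all(Store(Vec::new()), vec![(b"k".to_vec(), b"v".to_vec())]).await;
        println!("{} blocks stored", store.0.len());
    }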
src/lib.rs (+1)

···
 pub mod disk;
 pub mod drive;
 pub mod mst;
+pub mod process;
 pub mod walk;
src/process.rs (+12)

+use serde::{Serialize, de::DeserializeOwned};
+
+pub trait Processable: Clone + Serialize + DeserializeOwned {
+    /// the additional size taken up (not including its mem::size_of)
+    fn get_size(&self) -> usize;
+}
+
+impl Processable for usize {
+    fn get_size(&self) -> usize {
+        0 // no additional space taken, just its stack size (newtype is free)
+    }
+}
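
`usize` is just the zero-overhead case of the trait; custom processed types work the same way. A hypothetical impl for a type that keeps heap data (the name is illustrative, not from the crate), where `get_size` reports the bytes beyond `mem::size_of`, presumably so the driver can count them against the memory limit:

    use repo_stream::process::Processable;
    use serde::{Deserialize, Serialize};

    // hypothetical processed value that keeps the raw block bytes around
    #[derive(Clone, Serialize, Deserialize)]
    struct RawBlock {
        bytes: Vec<u8>,
    }

    impl Processable for RawBlock {
        fn get_size(&self) -> usize {
            // the heap allocation behind the Vec; the struct's own
            // stack size is excluded per the trait docs
            self.bytes.capacity()
        }
    }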
src/walk.rs (+2 -1)

···
 //! Depth-first MST traversal

 use crate::disk::SqliteReader;
-use crate::drive::{DecodeError, MaybeProcessedBlock, Processable};
+use crate::drive::{DecodeError, MaybeProcessedBlock};
 use crate::mst::Node;
+use crate::process::Processable;
 use ipld_core::cid::Cid;
 use sha2::{Digest, Sha256};
 use std::collections::HashMap;
tests/non-huge-cars.rs (+2 -13)

···
 extern crate repo_stream;
-use repo_stream::drive::Processable;
-use serde::{Deserialize, Serialize};

 const TINY_CAR: &'static [u8] = include_bytes!("../car-samples/tiny.car");
 const LITTLE_CAR: &'static [u8] = include_bytes!("../car-samples/little.car");
 const MIDSIZE_CAR: &'static [u8] = include_bytes!("../car-samples/midsize.car");
-
-#[derive(Clone, Serialize, Deserialize)]
-struct S(usize);
-
-impl Processable for S {
-    fn get_size(&self) -> usize {
-        0 // no additional space taken, just its stack size (newtype is free)
-    }
-}

 async fn test_car(bytes: &[u8], expected_records: usize, expected_sum: usize) {
     let mb = 2_usize.pow(20);

-    let mut driver = match repo_stream::drive::load_car(bytes, |block| S(block.len()), 10 * mb)
+    let mut driver = match repo_stream::drive::load_car(bytes, |block| block.len(), 10 * mb)
         .await
         .unwrap()
     {
···
     let mut prev_rkey = "".to_string();

     while let Some(pairs) = driver.next_chunk(256).await.unwrap() {
-        for (rkey, S(size)) in pairs {
+        for (rkey, size) in pairs {
             records += 1;
             sum += size;
             if rkey == "app.bsky.actor.profile/self" {