Fast and robust atproto CAR file processing in rust

Compare changes


Changed files
+981 -576
benches
car-samples
examples/disk-read-file
examples/read-file
src
tests
+1 -1
Cargo.lock
··· 1024 1024 1025 1025 [[package]] 1026 1026 name = "repo-stream" 1027 - version = "0.1.1" 1027 + version = "0.2.2" 1028 1028 dependencies = [ 1029 1029 "bincode", 1030 1030 "clap",
+2 -2
Cargo.toml
··· 1 1 [package] 2 2 name = "repo-stream" 3 - version = "0.1.1" 3 + version = "0.2.2" 4 4 edition = "2024" 5 5 license = "MIT OR Apache-2.0" 6 - description = "Fast and robust atproto CAR file processing in rust" 6 + description = "A robust CAR file -> MST walker for atproto" 7 7 repository = "https://tangled.org/@microcosm.blue/repo-stream" 8 8 9 9 [dependencies]
+12 -21
benches/huge-car.rs
··· 1 1 extern crate repo_stream; 2 - use futures::TryStreamExt; 3 - use iroh_car::CarReader; 4 - use std::convert::Infallible; 2 + use repo_stream::Driver; 5 3 use std::path::{Path, PathBuf}; 6 4 7 5 use criterion::{Criterion, criterion_group, criterion_main}; ··· 20 18 }); 21 19 } 22 20 23 - async fn drive_car(filename: impl AsRef<Path>) { 21 + async fn drive_car(filename: impl AsRef<Path>) -> usize { 24 22 let reader = tokio::fs::File::open(filename).await.unwrap(); 25 23 let reader = tokio::io::BufReader::new(reader); 26 - let reader = CarReader::new(reader).await.unwrap(); 27 24 28 - let root = reader 29 - .header() 30 - .roots() 31 - .first() 32 - .ok_or("missing root") 25 + let mut driver = match Driver::load_car(reader, |block| block.len(), 1024) 26 + .await 33 27 .unwrap() 34 - .clone(); 35 - 36 - let stream = std::pin::pin!(reader.stream()); 37 - 38 - let (_commit, v) = 39 - repo_stream::drive::Vehicle::init(root, stream, |block| Ok::<_, Infallible>(block.len())) 40 - .await 41 - .unwrap(); 42 - let mut record_stream = std::pin::pin!(v.stream()); 28 + { 29 + Driver::Memory(_, mem_driver) => mem_driver, 30 + Driver::Disk(_) => panic!("not doing disk for benchmark"), 31 + }; 43 32 44 - while let Some(_) = record_stream.try_next().await.unwrap() { 45 - // just here for the drive 33 + let mut n = 0; 34 + while let Some(pairs) = driver.next_chunk(256).await.unwrap() { 35 + n += pairs.len(); 46 36 } 37 + n 47 38 } 48 39 49 40 criterion_group!(benches, criterion_benchmark);
+16 -22
benches/non-huge-cars.rs
··· 1 1 extern crate repo_stream; 2 - use futures::TryStreamExt; 3 - use iroh_car::CarReader; 4 - use std::convert::Infallible; 2 + use repo_stream::Driver; 5 3 6 4 use criterion::{Criterion, criterion_group, criterion_main}; 7 5 6 + const EMPTY_CAR: &'static [u8] = include_bytes!("../car-samples/empty.car"); 8 7 const TINY_CAR: &'static [u8] = include_bytes!("../car-samples/tiny.car"); 9 8 const LITTLE_CAR: &'static [u8] = include_bytes!("../car-samples/little.car"); 10 9 const MIDSIZE_CAR: &'static [u8] = include_bytes!("../car-samples/midsize.car"); ··· 15 14 .build() 16 15 .expect("Creating runtime failed"); 17 16 17 + c.bench_function("empty-car", |b| { 18 + b.to_async(&rt).iter(async || drive_car(EMPTY_CAR).await) 19 + }); 18 20 c.bench_function("tiny-car", |b| { 19 21 b.to_async(&rt).iter(async || drive_car(TINY_CAR).await) 20 22 }); ··· 26 28 }); 27 29 } 28 30 29 - async fn drive_car(bytes: &[u8]) { 30 - let reader = CarReader::new(bytes).await.unwrap(); 31 - 32 - let root = reader 33 - .header() 34 - .roots() 35 - .first() 36 - .ok_or("missing root") 31 + async fn drive_car(bytes: &[u8]) -> usize { 32 + let mut driver = match Driver::load_car(bytes, |block| block.len(), 32) 33 + .await 37 34 .unwrap() 38 - .clone(); 39 - 40 - let stream = std::pin::pin!(reader.stream()); 35 + { 36 + Driver::Memory(_, mem_driver) => mem_driver, 37 + Driver::Disk(_) => panic!("not benching big cars here"), 38 + }; 41 39 42 - let (_commit, v) = 43 - repo_stream::drive::Vehicle::init(root, stream, |block| Ok::<_, Infallible>(block.len())) 44 - .await 45 - .unwrap(); 46 - let mut record_stream = std::pin::pin!(v.stream()); 47 - 48 - while let Some(_) = record_stream.try_next().await.unwrap() { 49 - // just here for the drive 40 + let mut n = 0; 41 + while let Some(pairs) = driver.next_chunk(256).await.unwrap() { 42 + n += pairs.len(); 50 43 } 44 + n 51 45 } 52 46 53 47 criterion_group!(benches, criterion_benchmark);
car-samples/empty.car

This is a binary file and will not be displayed.

+57 -34
examples/disk-read-file/main.rs
··· 1 + /*! 2 + Read a CAR file by spilling to disk 3 + */ 4 + 1 5 extern crate repo_stream; 2 6 use clap::Parser; 3 - use repo_stream::drive::Processable; 4 - use serde::{Deserialize, Serialize}; 7 + use repo_stream::{DiskBuilder, Driver, DriverBuilder}; 5 8 use std::path::PathBuf; 6 - 7 - type Result<T> = std::result::Result<T, Box<dyn std::error::Error>>; 9 + use std::time::Instant; 8 10 9 11 #[derive(Debug, Parser)] 10 12 struct Args { ··· 14 16 tmpfile: PathBuf, 15 17 } 16 18 17 - #[derive(Clone, Serialize, Deserialize)] 18 - struct S(usize); 19 - 20 - impl Processable for S { 21 - fn get_size(&self) -> usize { 22 - 0 // no additional space taken, just its stack size (newtype is free) 23 - } 24 - } 25 - 26 19 #[tokio::main] 27 - async fn main() -> Result<()> { 20 + async fn main() -> Result<(), Box<dyn std::error::Error>> { 28 21 env_logger::init(); 29 22 30 23 let Args { car, tmpfile } = Args::parse(); 24 + 25 + // repo-stream takes an AsyncRead as input. wrapping a filesystem read in 26 + // BufReader can provide a really significant performance win. 31 27 let reader = tokio::fs::File::open(car).await?; 32 28 let reader = tokio::io::BufReader::new(reader); 33 29 34 - // let kb = 2_usize.pow(10); 35 - let mb = 2_usize.pow(20); 30 + log::info!("hello! reading the car..."); 31 + let t0 = Instant::now(); 32 + 33 + // in this example we only bother handling CARs that are too big for memory 34 + // `noop` helper means: do no block processing, store the raw blocks 35 + let driver = match DriverBuilder::new() 36 + .with_mem_limit_mb(10) // how much memory can be used before disk spill 37 + .load_car(reader) 38 + .await? 39 + { 40 + Driver::Memory(_, _) => panic!("try this on a bigger car"), 41 + Driver::Disk(big_stuff) => { 42 + // we reach here if the repo was too big and needs to be spilled to 43 + // disk to continue 36 44 37 - let limit_mb = 32; 45 + // set up a disk store we can spill to 46 + let disk_store = DiskBuilder::new().open(tmpfile).await?; 38 47 39 - let driver = match repo_stream::drive::load_car(reader, |block| S(block.len()), 10 * mb).await? 40 - { 41 - repo_stream::drive::Vehicle::Lil(_, _) => panic!("try this on a bigger car"), 42 - repo_stream::drive::Vehicle::Big(big_stuff) => { 43 - let disk_store = repo_stream::disk::SqliteStore::new(tmpfile.clone(), limit_mb); 48 + // do the spilling, get back a (similar) driver 44 49 let (commit, driver) = big_stuff.finish_loading(disk_store).await?; 45 - log::warn!("big: {:?}", commit); 50 + 51 + // at this point you might want to fetch the account's signing key 52 + // via the DID from the commit, and then verify the signature. 
53 + log::warn!("big's commit ({:?}): {:?}", t0.elapsed(), commit); 54 + 55 + // pop the driver back out to get some code indentation relief 46 56 driver 47 57 } 48 58 }; 49 59 60 + // collect some random stats about the blocks 50 61 let mut n = 0; 51 - let (mut rx, worker) = driver.rx(512).await?; 62 + let mut zeros = 0; 63 + 64 + log::info!("walking..."); 65 + 66 + // this example uses the disk driver's channel mode: the tree walking is 67 + // spawned onto a blocking thread, and we get chunks of rkey+blocks back 68 + let (mut rx, join) = driver.to_channel(512); 69 + while let Some(r) = rx.recv().await { 70 + let pairs = r?; 52 71 53 - log::debug!("walking..."); 54 - while let Some(pairs) = rx.recv().await { 72 + // keep a count of the total number of blocks seen 55 73 n += pairs.len(); 74 + 75 + for (_, block) in pairs { 76 + // for each block, count how many bytes are equal to '0' 77 + // (this is just an example, you probably want to do something more 78 + // interesting) 79 + zeros += block.into_iter().filter(|&b| b == b'0').count() 80 + } 56 81 } 57 - log::debug!("done walking! joining..."); 58 - 59 - worker.await.unwrap().unwrap(); 60 82 61 - log::debug!("joined."); 83 + log::info!("arrived! ({:?}) joining rx...", t0.elapsed()); 62 84 63 - // log::info!("now is the time to check mem..."); 64 - // tokio::time::sleep(std::time::Duration::from_secs(22)).await; 65 - log::info!("bye! {n}"); 85 + // clean up the database. would be nice to do this in drop so it happens 86 + // automatically, but some blocking work happens, so that's not allowed in 87 + // async rust. 🤷‍♀️ 88 + join.await?.reset_store().await?; 66 89 67 - std::fs::remove_file(tmpfile).unwrap(); // need to also remove -shm -wal 90 + log::info!("done. n={n} zeros={zeros}"); 68 91 69 92 Ok(()) 70 93 }
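The example above drains the disk driver through its channel mode. The `DiskDriver` added in this release also supports pull-style iteration via `next_chunk`; a minimal sketch of that alternative, applied to the same kind of driver that `finish_loading` returns above (no block processor, so blocks come back as raw `Vec<u8>`; the `walk_chunks` helper is hypothetical):

```rust
use repo_stream::{drive::DiskDriver, DriveError};

// sketch: pull-based walking instead of to_channel(); `driver` is a DiskDriver
// produced by finish_loading() with noop processing, so records are raw Vec<u8> blocks
async fn walk_chunks(mut driver: DiskDriver<Vec<u8>>) -> Result<(usize, usize), DriveError> {
    let (mut n, mut zeros) = (0, 0);
    while let Some(pairs) = driver.next_chunk(512).await? {
        n += pairs.len();
        for (_rkey, block) in pairs {
            zeros += block.into_iter().filter(|&b| b == b'0').count();
        }
    }
    // reset_store() drops the blocks table and hands the DiskStore back for reuse
    driver.reset_store().await?;
    Ok((n, zeros))
}
```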
+14 -17
examples/read-file/main.rs
··· 1 + /*! 2 + Read a CAR file with in-memory processing 3 + */ 4 + 1 5 extern crate repo_stream; 2 6 use clap::Parser; 3 - use repo_stream::drive::Processable; 4 - use serde::{Deserialize, Serialize}; 7 + use repo_stream::{Driver, DriverBuilder}; 5 8 use std::path::PathBuf; 6 9 7 10 type Result<T> = std::result::Result<T, Box<dyn std::error::Error>>; ··· 10 13 struct Args { 11 14 #[arg()] 12 15 file: PathBuf, 13 - } 14 - 15 - #[derive(Clone, Serialize, Deserialize)] 16 - struct S(usize); 17 - 18 - impl Processable for S { 19 - fn get_size(&self) -> usize { 20 - 0 // no additional space taken, just its stack size (newtype is free) 21 - } 22 16 } 23 17 24 18 #[tokio::main] ··· 29 23 let reader = tokio::fs::File::open(file).await?; 30 24 let reader = tokio::io::BufReader::new(reader); 31 25 32 - let (commit, mut driver) = 33 - match repo_stream::drive::load_car(reader, |block| S(block.len()), 1024 * 1024).await? { 34 - repo_stream::drive::Vehicle::Lil(commit, mem_driver) => (commit, mem_driver), 35 - repo_stream::drive::Vehicle::Big(_) => panic!("can't handle big cars yet"), 36 - }; 26 + let (commit, mut driver) = match DriverBuilder::new() 27 + .with_block_processor(|block| block.len()) 28 + .load_car(reader) 29 + .await? 30 + { 31 + Driver::Memory(commit, mem_driver) => (commit, mem_driver), 32 + Driver::Disk(_) => panic!("this example doesn't handle big CARs"), 33 + }; 37 34 38 35 log::info!("got commit: {commit:?}"); 39 36 ··· 42 39 n += pairs.len(); 43 40 // log::info!("got {rkey:?}"); 44 41 } 45 - log::info!("bye! {n}"); 42 + log::info!("bye! total records={n}"); 46 43 47 44 Ok(()) 48 45 }
+70 -2
readme.md
··· 1 1 # repo-stream 2 2 3 - Fast and (aspirationally) robust atproto CAR file processing in rust 3 + A robust CAR file -> MST walker for atproto 4 + 5 + [![Crates.io][crates-badge]](https://crates.io/crates/repo-stream) 6 + [![Documentation][docs-badge]](https://docs.rs/repo-stream) 7 + [![Sponsor][sponsor-badge]](https://github.com/sponsors/uniphil) 8 + 9 + [crates-badge]: https://img.shields.io/crates/v/repo-stream.svg 10 + [docs-badge]: https://docs.rs/repo-stream/badge.svg 11 + [sponsor-badge]: https://img.shields.io/badge/at-microcosm-b820f9?labelColor=b820f9&logo=githubsponsors&logoColor=fff 12 + 13 + ```rust 14 + use repo_stream::{Driver, DriverBuilder, DriveError, DiskBuilder}; 15 + 16 + #[tokio::main] 17 + async fn main() -> Result<(), DriveError> { 18 + // repo-stream takes any AsyncRead as input, like a tokio::fs::File 19 + let reader = tokio::fs::File::open("repo.car").await?; 20 + let reader = tokio::io::BufReader::new(reader); 21 + 22 + // example repo workload is simply counting the total record bytes 23 + let mut total_size = 0; 24 + 25 + match DriverBuilder::new() 26 + .with_mem_limit_mb(10) 27 + .with_block_processor(|rec| rec.len()) // block processing: just extract the raw record size 28 + .load_car(reader) 29 + .await? 30 + { 31 + 32 + // if all blocks fit within memory 33 + Driver::Memory(_commit, mut driver) => { 34 + while let Some(chunk) = driver.next_chunk(256).await? { 35 + for (_rkey, size) in chunk { 36 + total_size += size; 37 + } 38 + } 39 + }, 40 + 41 + // if the CAR was too big for in-memory processing 42 + Driver::Disk(paused) => { 43 + // set up a disk store we can spill to 44 + let store = DiskBuilder::new().open("some/path.db".into()).await?; 45 + // do the spilling, get back a (similar) driver 46 + let (_commit, mut driver) = paused.finish_loading(store).await?; 47 + 48 + while let Some(chunk) = driver.next_chunk(256).await? { 49 + for (_rkey, size) in chunk { 50 + total_size += size; 51 + } 52 + } 53 + 54 + // clean up the disk store (drop tables etc) 55 + driver.reset_store().await?; 56 + } 57 + }; 58 + println!("sum of size of all records: {total_size}"); 59 + Ok(()) 60 + } 61 + ``` 62 + 63 + more recent todo 64 + 65 + - [ ] get an *empty* car for the test suite 66 + - [x] implement a max size on disk limit 67 + 68 + 69 + ----- 70 + 71 + older stuff (to clean up): 4 72 5 73 6 74 current car processing times (records processed into their length usize, phil's dev machine): ··· 27 95 -> yeah the commit is returned from init 28 96 - [ ] spec compliance todos 29 97 - [x] assert that keys are ordered and fail if not 30 - - [ ] verify node mst depth from key (possibly pending [interop test fixes](https://github.com/bluesky-social/atproto-interop-tests/issues/5)) 98 + - [x] verify node mst depth from key (possibly pending [interop test fixes](https://github.com/bluesky-social/atproto-interop-tests/issues/5)) 31 99 - [ ] performance todos 32 100 - [x] consume the serialized nodes into a mutable efficient format 33 101 - [ ] maybe customize the deserialize impl to do that directly?
+163 -40
src/disk.rs
··· 1 + /*! 2 + Disk storage for blocks on disk 3 + 4 + Currently this uses sqlite. In testing sqlite wasn't the fastest, but it seemed 5 + to be the best behaved in terms of both on-disk space usage and memory usage. 6 + 7 + ```no_run 8 + # use repo_stream::{DiskBuilder, DiskError}; 9 + # #[tokio::main] 10 + # async fn main() -> Result<(), DiskError> { 11 + let store = DiskBuilder::new() 12 + .with_cache_size_mb(32) 13 + .with_max_stored_mb(1024) // errors when >1GiB of processed blocks are inserted 14 + .open("/some/path.db".into()).await?; 15 + # Ok(()) 16 + # } 17 + ``` 18 + */ 19 + 1 20 use crate::drive::DriveError; 2 21 use rusqlite::OptionalExtension; 3 22 use std::path::PathBuf; 4 23 5 - pub struct SqliteStore { 6 - path: PathBuf, 7 - limit_mb: usize, 24 + #[derive(Debug, thiserror::Error)] 25 + pub enum DiskError { 26 + /// A wrapped database error 27 + /// 28 + /// (The wrapped err should probably be obscured to remove public-facing 29 + /// sqlite bits) 30 + #[error(transparent)] 31 + DbError(#[from] rusqlite::Error), 32 + /// A tokio blocking task failed to join 33 + #[error("Failed to join a tokio blocking task: {0}")] 34 + JoinError(#[from] tokio::task::JoinError), 35 + /// The total size of stored blocks exceeded the allowed size 36 + /// 37 + /// If you need to process *really* big CARs, you can configure a higher 38 + /// limit. 39 + #[error("Maximum disk size reached")] 40 + MaxSizeExceeded, 41 + #[error("this error was replaced, seeing this is a bug.")] 42 + #[doc(hidden)] 43 + Stolen, 8 44 } 9 45 10 - impl SqliteStore { 11 - pub fn new(path: PathBuf, limit_mb: usize) -> Self { 12 - Self { path, limit_mb } 46 + impl DiskError { 47 + /// hack for ownership challenges with the disk driver 48 + pub(crate) fn steal(&mut self) -> Self { 49 + let mut swapped = DiskError::Stolen; 50 + std::mem::swap(self, &mut swapped); 51 + swapped 13 52 } 14 53 } 15 54 16 - impl SqliteStore { 17 - pub async fn get_access(&mut self) -> Result<SqliteAccess, rusqlite::Error> { 18 - let path = self.path.clone(); 19 - let limit_mb = self.limit_mb; 55 + /// Builder-style disk store setup 56 + #[derive(Debug, Clone)] 57 + pub struct DiskBuilder { 58 + /// Database in-memory cache allowance 59 + /// 60 + /// Default: 32 MiB 61 + pub cache_size_mb: usize, 62 + /// Database stored block size limit 63 + /// 64 + /// Default: 10 GiB 65 + /// 66 + /// Note: actual size on disk may be more, but should approximately scale 67 + /// with this limit 68 + pub max_stored_mb: usize, 69 + } 70 + 71 + impl Default for DiskBuilder { 72 + fn default() -> Self { 73 + Self { 74 + cache_size_mb: 32, 75 + max_stored_mb: 10 * 1024, // 10 GiB 76 + } 77 + } 78 + } 79 + 80 + impl DiskBuilder { 81 + /// Begin configuring the storage with defaults 82 + pub fn new() -> Self { 83 + Default::default() 84 + } 85 + /// Set the in-memory cache allowance for the database 86 + /// 87 + /// Default: 32 MiB 88 + pub fn with_cache_size_mb(mut self, size: usize) -> Self { 89 + self.cache_size_mb = size; 90 + self 91 + } 92 + /// Set the approximate stored block size limit 93 + /// 94 + /// Default: 10 GiB 95 + pub fn with_max_stored_mb(mut self, max: usize) -> Self { 96 + self.max_stored_mb = max; 97 + self 98 + } 99 + /// Open and initialize the actual disk storage 100 + pub async fn open(&self, path: PathBuf) -> Result<DiskStore, DiskError> { 101 + DiskStore::new(path, self.cache_size_mb, self.max_stored_mb).await 102 + } 103 + } 104 + 105 + /// On-disk block storage 106 + pub struct DiskStore { 107 + conn: rusqlite::Connection, 108 + 
max_stored: usize, 109 + stored: usize, 110 + } 111 + 112 + impl DiskStore { 113 + /// Initialize a new disk store 114 + pub async fn new( 115 + path: PathBuf, 116 + cache_mb: usize, 117 + max_stored_mb: usize, 118 + ) -> Result<Self, DiskError> { 119 + let max_stored = max_stored_mb * 2_usize.pow(20); 20 120 let conn = tokio::task::spawn_blocking(move || { 21 121 let conn = rusqlite::Connection::open(path)?; 22 122 23 - let sq_mb = -(2_i64.pow(10)); // negative is kibibytes for sqlite cache_size 123 + let sqlite_one_mb = -(2_i64.pow(10)); // negative is kibibytes for sqlite cache_size 24 124 25 125 // conn.pragma_update(None, "journal_mode", "OFF")?; 26 126 // conn.pragma_update(None, "journal_mode", "MEMORY")?; 27 127 conn.pragma_update(None, "journal_mode", "WAL")?; 28 128 // conn.pragma_update(None, "wal_autocheckpoint", "0")?; // this lets things get a bit big on disk 29 129 conn.pragma_update(None, "synchronous", "OFF")?; 30 - conn.pragma_update(None, "cache_size", (limit_mb as i64 * sq_mb).to_string())?; 31 - conn.execute( 32 - "CREATE TABLE blocks ( 33 - key BLOB PRIMARY KEY NOT NULL, 34 - val BLOB NOT NULL 35 - ) WITHOUT ROWID", 36 - (), 130 + conn.pragma_update( 131 + None, 132 + "cache_size", 133 + (cache_mb as i64 * sqlite_one_mb).to_string(), 37 134 )?; 135 + Self::reset_tables(&conn)?; 38 136 39 - Ok::<_, rusqlite::Error>(conn) 137 + Ok::<_, DiskError>(conn) 40 138 }) 41 - .await 42 - .expect("join error")?; 139 + .await??; 43 140 44 - Ok(SqliteAccess { conn }) 141 + Ok(Self { 142 + conn, 143 + max_stored, 144 + stored: 0, 145 + }) 45 146 } 46 - } 47 - 48 - pub struct SqliteAccess { 49 - conn: rusqlite::Connection, 50 - } 51 - 52 - impl SqliteAccess { 53 - pub fn get_writer(&'_ mut self) -> Result<SqliteWriter<'_>, rusqlite::Error> { 147 + pub(crate) fn get_writer(&'_ mut self) -> Result<SqliteWriter<'_>, DiskError> { 54 148 let tx = self.conn.transaction()?; 55 - // let insert_stmt = tx.prepare("INSERT INTO blocks (key, val) VALUES (?1, ?2)")?; 56 - Ok(SqliteWriter { tx }) 149 + Ok(SqliteWriter { 150 + tx, 151 + stored: &mut self.stored, 152 + max: self.max_stored, 153 + }) 57 154 } 58 - pub fn get_reader(&'_ self) -> Result<SqliteReader<'_>, rusqlite::Error> { 155 + pub(crate) fn get_reader<'conn>(&'conn self) -> Result<SqliteReader<'conn>, DiskError> { 59 156 let select_stmt = self.conn.prepare("SELECT val FROM blocks WHERE key = ?1")?; 60 157 Ok(SqliteReader { select_stmt }) 61 158 } 159 + /// Drop and recreate the kv table 160 + pub async fn reset(self) -> Result<Self, DiskError> { 161 + tokio::task::spawn_blocking(move || { 162 + Self::reset_tables(&self.conn)?; 163 + Ok(self) 164 + }) 165 + .await? 
166 + } 167 + fn reset_tables(conn: &rusqlite::Connection) -> Result<(), DiskError> { 168 + conn.execute("DROP TABLE IF EXISTS blocks", ())?; 169 + conn.execute( 170 + "CREATE TABLE blocks ( 171 + key BLOB PRIMARY KEY NOT NULL, 172 + val BLOB NOT NULL 173 + ) WITHOUT ROWID", 174 + (), 175 + )?; 176 + Ok(()) 177 + } 62 178 } 63 179 64 - pub struct SqliteWriter<'conn> { 180 + pub(crate) struct SqliteWriter<'conn> { 65 181 tx: rusqlite::Transaction<'conn>, 182 + stored: &'conn mut usize, 183 + max: usize, 66 184 } 67 185 68 186 impl SqliteWriter<'_> { 69 - pub fn put_many( 187 + pub(crate) fn put_many( 70 188 &mut self, 71 189 kv: impl Iterator<Item = Result<(Vec<u8>, Vec<u8>), DriveError>>, 72 190 ) -> Result<(), DriveError> { 73 191 let mut insert_stmt = self 74 192 .tx 75 - .prepare_cached("INSERT INTO blocks (key, val) VALUES (?1, ?2)")?; 193 + .prepare_cached("INSERT INTO blocks (key, val) VALUES (?1, ?2)") 194 + .map_err(DiskError::DbError)?; 76 195 for pair in kv { 77 196 let (k, v) = pair?; 78 - insert_stmt.execute((k, v))?; 197 + *self.stored += v.len(); 198 + if *self.stored > self.max { 199 + return Err(DiskError::MaxSizeExceeded.into()); 200 + } 201 + insert_stmt.execute((k, v)).map_err(DiskError::DbError)?; 79 202 } 80 203 Ok(()) 81 204 } 82 - pub fn commit(self) -> Result<(), rusqlite::Error> { 205 + pub fn commit(self) -> Result<(), DiskError> { 83 206 self.tx.commit()?; 84 207 Ok(()) 85 208 } 86 209 } 87 210 88 - pub struct SqliteReader<'conn> { 211 + pub(crate) struct SqliteReader<'conn> { 89 212 select_stmt: rusqlite::Statement<'conn>, 90 213 } 91 214 92 215 impl SqliteReader<'_> { 93 - pub fn get(&mut self, key: Vec<u8>) -> rusqlite::Result<Option<Vec<u8>>> { 216 + pub(crate) fn get(&mut self, key: Vec<u8>) -> rusqlite::Result<Option<Vec<u8>>> { 94 217 self.select_stmt 95 218 .query_one((&key,), |row| row.get(0)) 96 219 .optional()
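The new `max_stored_mb` limit surfaces during spilling as `DiskError::MaxSizeExceeded`, wrapped in `DriveError::StorageError`. A rough sketch of handling it; the `count_records` helper, file paths, and limits are placeholders, not part of the repo's examples:

```rust
use repo_stream::{DiskBuilder, DiskError, DriveError, Driver, DriverBuilder};

// sketch: count records, but give up gracefully on repos that blow past the
// on-disk budget. "repo.car" / "spill.db" and the limits are hypothetical.
async fn count_records() -> Result<Option<usize>, Box<dyn std::error::Error>> {
    let reader = tokio::io::BufReader::new(tokio::fs::File::open("repo.car").await?);
    let mut n = 0;
    match DriverBuilder::new()
        .with_mem_limit_mb(16)
        .with_block_processor(|block| block.len())
        .load_car(reader)
        .await?
    {
        Driver::Memory(_commit, mut driver) => {
            while let Some(chunk) = driver.next_chunk(256).await? {
                n += chunk.len();
            }
        }
        Driver::Disk(paused) => {
            let store = DiskBuilder::new()
                .with_max_stored_mb(512) // ~512 MiB of processed blocks, then MaxSizeExceeded
                .open("spill.db".into())
                .await?;
            let (_commit, mut driver) = match paused.finish_loading(store).await {
                Err(DriveError::StorageError(DiskError::MaxSizeExceeded)) => return Ok(None),
                other => other?,
            };
            while let Some(chunk) = driver.next_chunk(256).await? {
                n += chunk.len();
            }
            driver.reset_store().await?;
        }
    }
    Ok(Some(n))
}
```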
+417 -198
src/drive.rs
··· 1 - //! Consume an MST block stream, producing an ordered stream of records 1 + //! Consume a CAR from an AsyncRead, producing an ordered stream of records 2 2 3 - use crate::disk::{SqliteAccess, SqliteStore}; 3 + use crate::disk::{DiskError, DiskStore}; 4 + use crate::process::Processable; 4 5 use ipld_core::cid::Cid; 5 6 use iroh_car::CarReader; 6 - use serde::de::DeserializeOwned; 7 7 use serde::{Deserialize, Serialize}; 8 8 use std::collections::HashMap; 9 9 use std::convert::Infallible; 10 - use tokio::io::AsyncRead; 10 + use tokio::{io::AsyncRead, sync::mpsc}; 11 11 12 12 use crate::mst::{Commit, Node}; 13 13 use crate::walk::{Step, WalkError, Walker}; ··· 28 28 #[error("CAR file had no roots")] 29 29 MissingRoot, 30 30 #[error("Storage error")] 31 - StorageError(#[from] rusqlite::Error), 31 + StorageError(#[from] DiskError), 32 32 #[error("Encode error: {0}")] 33 33 BincodeEncodeError(#[from] bincode::error::EncodeError), 34 - #[error("Decode error: {0}")] 35 - BincodeDecodeError(#[from] bincode::error::DecodeError), 36 34 #[error("Tried to send on a closed channel")] 37 35 ChannelSendError, // SendError takes <T> which we don't need 38 36 #[error("Failed to join a task: {0}")] 39 37 JoinError(#[from] tokio::task::JoinError), 40 38 } 41 39 42 - pub trait Processable: Clone + Serialize + DeserializeOwned { 43 - /// the additional size taken up (not including its mem::size_of) 44 - fn get_size(&self) -> usize; 40 + #[derive(Debug, thiserror::Error)] 41 + pub enum DecodeError { 42 + #[error(transparent)] 43 + BincodeDecodeError(#[from] bincode::error::DecodeError), 44 + #[error("extra bytes remained after decoding")] 45 + ExtraGarbage, 45 46 } 46 47 48 + /// An in-order chunk of Rkey + (processed) Block pairs 49 + pub type BlockChunk<T> = Vec<(String, T)>; 50 + 47 51 #[derive(Debug, Clone, Serialize, Deserialize)] 48 - pub enum MaybeProcessedBlock<T> { 52 + pub(crate) enum MaybeProcessedBlock<T> { 49 53 /// A block that's *probably* a Node (but we can't know yet) 50 54 /// 51 55 /// It *can be* a record that suspiciously looks a lot like a node, so we ··· 87 91 } 88 92 } 89 93 90 - pub enum Vehicle<R: AsyncRead + Unpin, T: Processable> { 91 - Lil(Commit, MemDriver<T>), 92 - Big(BigCar<R, T>), 94 + impl<T> MaybeProcessedBlock<T> { 95 + fn maybe(process: fn(Vec<u8>) -> T, data: Vec<u8>) -> Self { 96 + if Node::could_be(&data) { 97 + MaybeProcessedBlock::Raw(data) 98 + } else { 99 + MaybeProcessedBlock::Processed(process(data)) 100 + } 101 + } 93 102 } 94 103 95 - pub async fn load_car<R: AsyncRead + Unpin, T: Processable>( 96 - reader: R, 97 - process: fn(Vec<u8>) -> T, 98 - max_size: usize, 99 - ) -> Result<Vehicle<R, T>, DriveError> { 100 - let mut mem_blocks = HashMap::new(); 104 + /// Read a CAR file, buffering blocks in memory or to disk 105 + pub enum Driver<R: AsyncRead + Unpin, T: Processable> { 106 + /// All blocks fit within the memory limit 107 + /// 108 + /// You probably want to check the commit's signature. You can go ahead and 109 + /// walk the MST right away. 110 + Memory(Commit, MemDriver<T>), 111 + /// Blocks exceed the memory limit 112 + /// 113 + /// You'll need to provide a disk storage to continue. The commit will be 114 + /// returned and can be validated only once all blocks are loaded. 
115 + Disk(NeedDisk<R, T>), 116 + } 101 117 102 - let mut car = CarReader::new(reader).await?; 118 + /// Builder-style driver setup 119 + #[derive(Debug, Clone)] 120 + pub struct DriverBuilder { 121 + pub mem_limit_mb: usize, 122 + } 103 123 104 - let root = *car 105 - .header() 106 - .roots() 107 - .first() 108 - .ok_or(DriveError::MissingRoot)?; 109 - log::debug!("root: {root:?}"); 124 + impl Default for DriverBuilder { 125 + fn default() -> Self { 126 + Self { mem_limit_mb: 16 } 127 + } 128 + } 110 129 111 - let mut commit = None; 130 + impl DriverBuilder { 131 + /// Begin configuring the driver with defaults 132 + pub fn new() -> Self { 133 + Default::default() 134 + } 135 + /// Set the in-memory size limit, in MiB 136 + /// 137 + /// Default: 16 MiB 138 + pub fn with_mem_limit_mb(self, new_limit: usize) -> Self { 139 + Self { 140 + mem_limit_mb: new_limit, 141 + } 142 + } 143 + /// Set the block processor 144 + /// 145 + /// Default: noop, raw blocks will be emitted 146 + pub fn with_block_processor<T: Processable>( 147 + self, 148 + p: fn(Vec<u8>) -> T, 149 + ) -> DriverBuilderWithProcessor<T> { 150 + DriverBuilderWithProcessor { 151 + mem_limit_mb: self.mem_limit_mb, 152 + block_processor: p, 153 + } 154 + } 155 + /// Begin processing an atproto MST from a CAR file 156 + pub async fn load_car<R: AsyncRead + Unpin>( 157 + &self, 158 + reader: R, 159 + ) -> Result<Driver<R, Vec<u8>>, DriveError> { 160 + Driver::load_car(reader, crate::process::noop, self.mem_limit_mb).await 161 + } 162 + } 163 + 164 + /// Builder-style driver intermediate step 165 + /// 166 + /// start from `DriverBuilder` 167 + #[derive(Debug, Clone)] 168 + pub struct DriverBuilderWithProcessor<T: Processable> { 169 + pub mem_limit_mb: usize, 170 + pub block_processor: fn(Vec<u8>) -> T, 171 + } 112 172 113 - // try to load all the blocks into memory 114 - let mut mem_size = 0; 115 - while let Some((cid, data)) = car.next_block().await? { 116 - // the root commit is a Special Third Kind of block that we need to make 117 - // sure not to optimistically send to the processing function 118 - if cid == root { 119 - let c: Commit = serde_ipld_dagcbor::from_slice(&data)?; 120 - commit = Some(c); 121 - continue; 173 + impl<T: Processable> DriverBuilderWithProcessor<T> { 174 + /// Set the in-memory size limit, in MiB 175 + /// 176 + /// Default: 16 MiB 177 + pub fn with_mem_limit_mb(mut self, new_limit: usize) -> Self { 178 + self.mem_limit_mb = new_limit; 179 + self 180 + } 181 + /// Begin processing an atproto MST from a CAR file 182 + pub async fn load_car<R: AsyncRead + Unpin>( 183 + &self, 184 + reader: R, 185 + ) -> Result<Driver<R, T>, DriveError> { 186 + Driver::load_car(reader, self.block_processor, self.mem_limit_mb).await 187 + } 188 + } 189 + 190 + impl<R: AsyncRead + Unpin, T: Processable> Driver<R, T> { 191 + /// Begin processing an atproto MST from a CAR file 192 + /// 193 + /// Blocks will be loaded, processed, and buffered in memory. If the entire 194 + /// processed size is under the `mem_limit_mb` limit, a `Driver::Memory` 195 + /// will be returned along with a `Commit` ready for validation. 196 + /// 197 + /// If the `mem_limit_mb` limit is reached before loading all blocks, the 198 + /// partial state will be returned as `Driver::Disk(needed)`, which can be 199 + /// resumed by providing a `SqliteStorage` for on-disk block storage. 
200 + pub async fn load_car( 201 + reader: R, 202 + process: fn(Vec<u8>) -> T, 203 + mem_limit_mb: usize, 204 + ) -> Result<Driver<R, T>, DriveError> { 205 + let max_size = mem_limit_mb * 2_usize.pow(20); 206 + let mut mem_blocks = HashMap::new(); 207 + 208 + let mut car = CarReader::new(reader).await?; 209 + 210 + let root = *car 211 + .header() 212 + .roots() 213 + .first() 214 + .ok_or(DriveError::MissingRoot)?; 215 + log::debug!("root: {root:?}"); 216 + 217 + let mut commit = None; 218 + 219 + // try to load all the blocks into memory 220 + let mut mem_size = 0; 221 + while let Some((cid, data)) = car.next_block().await? { 222 + // the root commit is a Special Third Kind of block that we need to make 223 + // sure not to optimistically send to the processing function 224 + if cid == root { 225 + let c: Commit = serde_ipld_dagcbor::from_slice(&data)?; 226 + commit = Some(c); 227 + continue; 228 + } 229 + 230 + // remaining possible types: node, record, other. optimistically process 231 + let maybe_processed = MaybeProcessedBlock::maybe(process, data); 232 + 233 + // stash (maybe processed) blocks in memory as long as we have room 234 + mem_size += std::mem::size_of::<Cid>() + maybe_processed.get_size(); 235 + mem_blocks.insert(cid, maybe_processed); 236 + if mem_size >= max_size { 237 + return Ok(Driver::Disk(NeedDisk { 238 + car, 239 + root, 240 + process, 241 + max_size, 242 + mem_blocks, 243 + commit, 244 + })); 245 + } 122 246 } 123 247 124 - // remaining possible types: node, record, other. optimistically process 125 - let maybe_processed = if Node::could_be(&data) { 126 - MaybeProcessedBlock::Raw(data) 127 - } else { 128 - MaybeProcessedBlock::Processed(process(data)) 129 - }; 248 + // all blocks loaded and we fit in memory! hopefully we found the commit... 249 + let commit = commit.ok_or(DriveError::MissingCommit)?; 130 250 131 - // stash (maybe processed) blocks in memory as long as we have room 132 - mem_size += std::mem::size_of::<Cid>() + maybe_processed.get_size(); 133 - mem_blocks.insert(cid, maybe_processed); 134 - if mem_size >= max_size { 135 - return Ok(Vehicle::Big(BigCar { 136 - car, 137 - root, 251 + let walker = Walker::new(commit.data); 252 + 253 + Ok(Driver::Memory( 254 + commit, 255 + MemDriver { 256 + blocks: mem_blocks, 257 + walker, 138 258 process, 139 - max_size, 140 - mem_blocks, 141 - commit, 142 - })); 143 - } 259 + }, 260 + )) 144 261 } 262 + } 145 263 146 - // all blocks loaded and we fit in memory! hopefully we found the commit... 147 - let commit = commit.ok_or(DriveError::MissingCommit)?; 264 + /// The core driver between the block stream and MST walker 265 + /// 266 + /// In the future, PDSs will export CARs in a stream-friendly order that will 267 + /// enable processing them with tiny memory overhead. But that future is not 268 + /// here yet. 269 + /// 270 + /// CARs are almost always in a stream-unfriendly order, so I'm reverting the 271 + /// optimistic stream features: we load all block first, then walk the MST. 272 + /// 273 + /// This makes things much simpler: we only need to worry about spilling to disk 274 + /// in one place, and we always have a reasonable expecatation about how much 275 + /// work the init function will do. We can drop the CAR reader before walking, 276 + /// so the sync/async boundaries become a little easier to work around. 
277 + #[derive(Debug)] 278 + pub struct MemDriver<T: Processable> { 279 + blocks: HashMap<Cid, MaybeProcessedBlock<T>>, 280 + walker: Walker, 281 + process: fn(Vec<u8>) -> T, 282 + } 148 283 149 - let walker = Walker::new(commit.data); 284 + impl<T: Processable> MemDriver<T> { 285 + /// Step through the record outputs, in rkey order 286 + pub async fn next_chunk(&mut self, n: usize) -> Result<Option<BlockChunk<T>>, DriveError> { 287 + let mut out = Vec::with_capacity(n); 288 + for _ in 0..n { 289 + // walk as far as we can until we run out of blocks or find a record 290 + match self.walker.step(&mut self.blocks, self.process)? { 291 + Step::Missing(cid) => return Err(DriveError::MissingBlock(cid)), 292 + Step::Finish => break, 293 + Step::Found { rkey, data } => { 294 + out.push((rkey, data)); 295 + continue; 296 + } 297 + }; 298 + } 150 299 151 - Ok(Vehicle::Lil( 152 - commit, 153 - MemDriver { 154 - blocks: mem_blocks, 155 - walker, 156 - process, 157 - }, 158 - )) 300 + if out.is_empty() { 301 + Ok(None) 302 + } else { 303 + Ok(Some(out)) 304 + } 305 + } 159 306 } 160 307 161 - /// a paritally memory-loaded car file that needs disk spillover to continue 162 - pub struct BigCar<R: AsyncRead + Unpin, T: Processable> { 308 + /// A partially memory-loaded car file that needs disk spillover to continue 309 + pub struct NeedDisk<R: AsyncRead + Unpin, T: Processable> { 163 310 car: CarReader<R>, 164 311 root: Cid, 165 312 process: fn(Vec<u8>) -> T, ··· 172 319 bincode::serde::encode_to_vec(v, bincode::config::standard()) 173 320 } 174 321 175 - pub fn decode<T: Processable>(bytes: &[u8]) -> Result<T, bincode::error::DecodeError> { 322 + pub(crate) fn decode<T: Processable>(bytes: &[u8]) -> Result<T, DecodeError> { 176 323 let (t, n) = bincode::serde::decode_from_slice(bytes, bincode::config::standard())?; 177 - assert_eq!(n, bytes.len(), "expected to decode all bytes"); // TODO 324 + if n != bytes.len() { 325 + return Err(DecodeError::ExtraGarbage); 326 + } 178 327 Ok(t) 179 328 } 180 329 181 - impl<R: AsyncRead + Unpin, T: Processable + Send + 'static> BigCar<R, T> { 330 + impl<R: AsyncRead + Unpin, T: Processable + Send + 'static> NeedDisk<R, T> { 182 331 pub async fn finish_loading( 183 332 mut self, 184 - mut store: SqliteStore, 185 - ) -> Result<(Commit, BigCarReady<T>), DriveError> { 186 - // set up access for real 187 - let mut access = store.get_access().await?; 188 - 189 - // move access in and back out so we can manage lifetimes 333 + mut store: DiskStore, 334 + ) -> Result<(Commit, DiskDriver<T>), DriveError> { 335 + // move store in and back out so we can manage lifetimes 190 336 // dump mem blocks into the store 191 - access = tokio::task::spawn(async move { 192 - let mut writer = access.get_writer()?; 337 + store = tokio::task::spawn(async move { 338 + let mut writer = store.get_writer()?; 193 339 194 340 let kvs = self 195 341 .mem_blocks ··· 198 344 199 345 writer.put_many(kvs)?; 200 346 writer.commit()?; 201 - Ok::<_, DriveError>(access) 347 + Ok::<_, DriveError>(store) 202 348 }) 203 349 .await??; 204 350 205 - let (tx, mut rx) = tokio::sync::mpsc::channel::<Vec<(Cid, MaybeProcessedBlock<T>)>>(2); 351 + let (tx, mut rx) = mpsc::channel::<Vec<(Cid, MaybeProcessedBlock<T>)>>(1); 206 352 207 - let access_worker = tokio::task::spawn_blocking(move || { 208 - let mut writer = access.get_writer()?; 353 + let store_worker = tokio::task::spawn_blocking(move || { 354 + let mut writer = store.get_writer()?; 209 355 210 356 while let Some(chunk) = rx.blocking_recv() { 211 357 let kvs = 
chunk ··· 215 361 } 216 362 217 363 writer.commit()?; 218 - Ok::<_, DriveError>(access) 364 + Ok::<_, DriveError>(store) 219 365 }); // await later 220 366 221 367 // dump the rest to disk (in chunks) ··· 235 381 } 236 382 // remaining possible types: node, record, other. optimistically process 237 383 // TODO: get the actual in-memory size to compute disk spill 238 - let maybe_processed = if Node::could_be(&data) { 239 - MaybeProcessedBlock::Raw(data) 240 - } else { 241 - MaybeProcessedBlock::Processed((self.process)(data)) 242 - }; 384 + let maybe_processed = MaybeProcessedBlock::maybe(self.process, data); 243 385 mem_size += std::mem::size_of::<Cid>() + maybe_processed.get_size(); 244 386 chunk.push((cid, maybe_processed)); 245 387 if mem_size >= self.max_size { ··· 259 401 drop(tx); 260 402 log::debug!("done. waiting for worker to finish..."); 261 403 262 - access = access_worker.await??; 404 + store = store_worker.await??; 263 405 264 406 log::debug!("worker finished."); 265 407 ··· 269 411 270 412 Ok(( 271 413 commit, 272 - BigCarReady { 414 + DiskDriver { 273 415 process: self.process, 274 - access, 275 - walker, 416 + state: Some(BigState { store, walker }), 276 417 }, 277 418 )) 278 419 } 279 420 } 280 421 281 - pub struct BigCarReady<T: Clone> { 422 + struct BigState { 423 + store: DiskStore, 424 + walker: Walker, 425 + } 426 + 427 + /// MST walker that reads from disk instead of an in-memory hashmap 428 + pub struct DiskDriver<T: Clone> { 282 429 process: fn(Vec<u8>) -> T, 283 - access: SqliteAccess, 284 - walker: Walker, 430 + state: Option<BigState>, 431 + } 432 + 433 + // for doctests only 434 + #[doc(hidden)] 435 + pub fn _get_fake_disk_driver() -> DiskDriver<Vec<u8>> { 436 + use crate::process::noop; 437 + DiskDriver { 438 + process: noop, 439 + state: None, 440 + } 285 441 } 286 442 287 - impl<T: Processable + Send + 'static> BigCarReady<T> { 288 - pub async fn next_chunk( 289 - mut self, 290 - n: usize, 291 - ) -> Result<(Self, Option<Vec<(String, T)>>), DriveError> { 292 - let mut out = Vec::with_capacity(n); 293 - (self, out) = tokio::task::spawn_blocking(move || { 294 - let access = self.access; 295 - let mut reader = access.get_reader()?; 443 + impl<T: Processable + Send + 'static> DiskDriver<T> { 444 + /// Walk the MST returning up to `n` rkey + record pairs 445 + /// 446 + /// ```no_run 447 + /// # use repo_stream::{drive::{DiskDriver, DriveError, _get_fake_disk_driver}, process::noop}; 448 + /// # #[tokio::main] 449 + /// # async fn main() -> Result<(), DriveError> { 450 + /// # let mut disk_driver = _get_fake_disk_driver(); 451 + /// while let Some(pairs) = disk_driver.next_chunk(256).await? { 452 + /// for (rkey, record) in pairs { 453 + /// println!("{rkey}: size={}", record.len()); 454 + /// } 455 + /// } 456 + /// let store = disk_driver.reset_store().await?; 457 + /// # Ok(()) 458 + /// # } 459 + /// ``` 460 + pub async fn next_chunk(&mut self, n: usize) -> Result<Option<BlockChunk<T>>, DriveError> { 461 + let process = self.process; 296 462 297 - for _ in 0..n { 298 - // walk as far as we can until we run out of blocks or find a record 299 - match self.walker.disk_step(&mut reader, self.process)? 
{ 300 - Step::Missing(cid) => return Err(DriveError::MissingBlock(cid)), 301 - Step::Finish => break, 302 - Step::Step { rkey, data } => { 303 - out.push((rkey, data)); 304 - continue; 463 + // state should only *ever* be None transiently while inside here 464 + let mut state = self.state.take().expect("DiskDriver must have Some(state)"); 465 + 466 + // the big pain here is that we don't want to leave self.state in an 467 + // invalid state (None), so all the error paths have to make sure it 468 + // comes out again. 469 + let (state, res) = tokio::task::spawn_blocking( 470 + move || -> (BigState, Result<BlockChunk<T>, DriveError>) { 471 + let mut reader_res = state.store.get_reader(); 472 + let reader: &mut _ = match reader_res { 473 + Ok(ref mut r) => r, 474 + Err(ref mut e) => { 475 + // unfortunately we can't return the error directly because 476 + // (for some reason) it's attached to the lifetime of the 477 + // reader? 478 + // hack a mem::swap so we can get it out :/ 479 + let e_swapped = e.steal(); 480 + // the pain: `state` *has to* outlive the reader 481 + drop(reader_res); 482 + return (state, Err(e_swapped.into())); 305 483 } 306 484 }; 307 - } 485 + 486 + let mut out = Vec::with_capacity(n); 487 + 488 + for _ in 0..n { 489 + // walk as far as we can until we run out of blocks or find a record 490 + let step = match state.walker.disk_step(reader, process) { 491 + Ok(s) => s, 492 + Err(e) => { 493 + // the pain: `state` *has to* outlive the reader 494 + drop(reader_res); 495 + return (state, Err(e.into())); 496 + } 497 + }; 498 + match step { 499 + Step::Missing(cid) => { 500 + // the pain: `state` *has to* outlive the reader 501 + drop(reader_res); 502 + return (state, Err(DriveError::MissingBlock(cid))); 503 + } 504 + Step::Finish => break, 505 + Step::Found { rkey, data } => out.push((rkey, data)), 506 + }; 507 + } 508 + 509 + // `state` *has to* outlive the reader 510 + drop(reader_res); 511 + 512 + (state, Ok::<_, DriveError>(out)) 513 + }, 514 + ) 515 + .await?; // on tokio JoinError, we'll be left with invalid state :( 308 516 309 - drop(reader); // cannot outlive access 310 - self.access = access; 311 - Ok::<_, DriveError>((self, out)) 312 - }) 313 - .await??; 517 + // *must* restore state before dealing with the actual result 518 + self.state = Some(state); 519 + 520 + let out = res?; 314 521 315 522 if out.is_empty() { 316 - Ok((self, None)) 523 + Ok(None) 317 524 } else { 318 - Ok((self, Some(out))) 525 + Ok(Some(out)) 319 526 } 320 527 } 321 528 322 - pub async fn rx( 323 - mut self, 529 + fn read_tx_blocking( 530 + &mut self, 324 531 n: usize, 325 - ) -> Result< 326 - ( 327 - tokio::sync::mpsc::Receiver<Vec<(String, T)>>, 328 - tokio::task::JoinHandle<Result<(), DriveError>>, 329 - ), 330 - DriveError, 331 - > { 332 - let (tx, rx) = tokio::sync::mpsc::channel::<Vec<(String, T)>>(1); 532 + tx: mpsc::Sender<Result<BlockChunk<T>, DriveError>>, 533 + ) -> Result<(), mpsc::error::SendError<Result<BlockChunk<T>, DriveError>>> { 534 + let BigState { store, walker } = self.state.as_mut().expect("valid state"); 535 + let mut reader = match store.get_reader() { 536 + Ok(r) => r, 537 + Err(e) => return tx.blocking_send(Err(e.into())), 538 + }; 333 539 334 - // sketch: this worker is going to be allowed to execute without a join handle 335 - // ...should we return the join handle here so the caller at least knows about it? 336 - // yes probably for error handling?? 
(orrr put errors in the channel) 337 - let worker = tokio::task::spawn_blocking(move || { 338 - let mut reader = self.access.get_reader()?; 540 + loop { 541 + let mut out: BlockChunk<T> = Vec::with_capacity(n); 339 542 340 - loop { 341 - let mut out = Vec::with_capacity(n); 543 + for _ in 0..n { 544 + // walk as far as we can until we run out of blocks or find a record 342 545 343 - for _ in 0..n { 344 - // walk as far as we can until we run out of blocks or find a record 345 - match self.walker.disk_step(&mut reader, self.process)? { 346 - Step::Missing(cid) => return Err(DriveError::MissingBlock(cid)), 347 - Step::Finish => break, 348 - Step::Step { rkey, data } => { 349 - out.push((rkey, data)); 350 - continue; 351 - } 352 - }; 353 - } 546 + let step = match walker.disk_step(&mut reader, self.process) { 547 + Ok(s) => s, 548 + Err(e) => return tx.blocking_send(Err(e.into())), 549 + }; 354 550 355 - if out.is_empty() { 356 - break; 357 - } 358 - tx.blocking_send(out) 359 - .map_err(|_| DriveError::ChannelSendError)?; 551 + match step { 552 + Step::Missing(cid) => { 553 + return tx.blocking_send(Err(DriveError::MissingBlock(cid))); 554 + } 555 + Step::Finish => return Ok(()), 556 + Step::Found { rkey, data } => { 557 + out.push((rkey, data)); 558 + continue; 559 + } 560 + }; 360 561 } 361 562 362 - drop(reader); // cannot outlive access 363 - Ok(()) 364 - }); // await later 563 + if out.is_empty() { 564 + break; 565 + } 566 + tx.blocking_send(Ok(out))?; 567 + } 365 568 366 - Ok((rx, worker)) 569 + Ok(()) 367 570 } 368 - } 571 + 572 + /// Spawn the disk reading task into a tokio blocking thread 573 + /// 574 + /// The idea is to avoid so much sending back and forth to the blocking 575 + /// thread, letting a blocking task do all the disk reading work and sending 576 + /// records and rkeys back through an `mpsc` channel instead. 577 + /// 578 + /// This might also allow the disk work to continue while processing the 579 + /// records. It's still not yet clear if this method actually has much 580 + /// benefit over just using `.next_chunk(n)`. 581 + /// 582 + /// ```no_run 583 + /// # use repo_stream::{drive::{DiskDriver, DriveError, _get_fake_disk_driver}, process::noop}; 584 + /// # #[tokio::main] 585 + /// # async fn main() -> Result<(), DriveError> { 586 + /// # let mut disk_driver = _get_fake_disk_driver(); 587 + /// let (mut rx, join) = disk_driver.to_channel(512); 588 + /// while let Some(recvd) = rx.recv().await { 589 + /// let pairs = recvd?; 590 + /// for (rkey, record) in pairs { 591 + /// println!("{rkey}: size={}", record.len()); 592 + /// } 593 + /// 594 + /// } 595 + /// let store = join.await?.reset_store().await?; 596 + /// # Ok(()) 597 + /// # } 598 + /// ``` 599 + pub fn to_channel( 600 + mut self, 601 + n: usize, 602 + ) -> ( 603 + mpsc::Receiver<Result<BlockChunk<T>, DriveError>>, 604 + tokio::task::JoinHandle<Self>, 605 + ) { 606 + let (tx, rx) = mpsc::channel::<Result<BlockChunk<T>, DriveError>>(1); 369 607 370 - /// The core driver between the block stream and MST walker 371 - /// 372 - /// In the future, PDSs will export CARs in a stream-friendly order that will 373 - /// enable processing them with tiny memory overhead. But that future is not 374 - /// here yet. 375 - /// 376 - /// CARs are almost always in a stream-unfriendly order, so I'm reverting the 377 - /// optimistic stream features: we load all block first, then walk the MST. 
378 - /// 379 - /// This makes things much simpler: we only need to worry about spilling to disk 380 - /// in one place, and we always have a reasonable expecatation about how much 381 - /// work the init function will do. We can drop the CAR reader before walking, 382 - /// so the sync/async boundaries become a little easier to work around. 383 - #[derive(Debug)] 384 - pub struct MemDriver<T: Processable> { 385 - blocks: HashMap<Cid, MaybeProcessedBlock<T>>, 386 - walker: Walker, 387 - process: fn(Vec<u8>) -> T, 388 - } 608 + // sketch: this worker is going to be allowed to execute without a join handle 609 + let chan_task = tokio::task::spawn_blocking(move || { 610 + if let Err(mpsc::error::SendError(_)) = self.read_tx_blocking(n, tx) { 611 + log::debug!("big car reader exited early due to dropped receiver channel"); 612 + } 613 + self 614 + }); 389 615 390 - impl<T: Processable> MemDriver<T> { 391 - /// Manually step through the record outputs 392 - pub async fn next_chunk(&mut self, n: usize) -> Result<Option<Vec<(String, T)>>, DriveError> { 393 - let mut out = Vec::with_capacity(n); 394 - for _ in 0..n { 395 - // walk as far as we can until we run out of blocks or find a record 396 - match self.walker.step(&mut self.blocks, self.process)? { 397 - Step::Missing(cid) => return Err(DriveError::MissingBlock(cid)), 398 - Step::Finish => break, 399 - Step::Step { rkey, data } => { 400 - out.push((rkey, data)); 401 - continue; 402 - } 403 - }; 404 - } 616 + (rx, chan_task) 617 + } 405 618 406 - if out.is_empty() { 407 - Ok(None) 408 - } else { 409 - Ok(Some(out)) 410 - } 619 + /// Reset the disk storage so it can be reused. You must call this. 620 + /// 621 + /// Ideally we'd put this in an `impl Drop`, but since it makes blocking 622 + /// calls, that would be risky in an async context. For now you just have to 623 + /// carefully make sure you call it. 624 + /// 625 + /// The sqlite store is returned, so it can be reused for another 626 + /// `DiskDriver`. 627 + pub async fn reset_store(mut self) -> Result<DiskStore, DriveError> { 628 + let BigState { store, .. } = self.state.take().expect("valid state"); 629 + Ok(store.reset().await?) 411 630 } 412 631 }
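`DiskDriver::reset_store` returns the `DiskStore`, so one scratch database can serve a whole series of repos. A rough sketch of that reuse pattern; the `drive_one` helper is hypothetical, not part of the crate:

```rust
use repo_stream::{DiskStore, DriveError, Driver, DriverBuilder};
use tokio::io::AsyncRead;

// sketch: walk one CAR, spilling into an existing store if needed, and hand the
// store back so the caller can feed it the next oversized repo
async fn drive_one<R: AsyncRead + Unpin>(
    reader: R,
    store: DiskStore,
) -> Result<(usize, DiskStore), DriveError> {
    let mut records = 0;
    match DriverBuilder::new().load_car(reader).await? {
        Driver::Memory(_commit, mut driver) => {
            // small repo: everything stayed in memory, pass the store through untouched
            while let Some(chunk) = driver.next_chunk(256).await? {
                records += chunk.len();
            }
            Ok((records, store))
        }
        Driver::Disk(paused) => {
            let (_commit, mut driver) = paused.finish_loading(store).await?;
            while let Some(chunk) = driver.next_chunk(256).await? {
                records += chunk.len();
            }
            // reset_store() drops and recreates the blocks table, returning the store
            Ok((records, driver.reset_store().await?))
        }
    }
}
```

A caller could then loop over a queue of readers, threading the returned store into each subsequent call.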
+84 -5
src/lib.rs
··· 1 - //! Fast and robust atproto CAR file processing in rust 2 - //! 3 - //! For now see the [examples](https://tangled.org/@microcosm.blue/repo-stream/tree/main/examples) 1 + /*! 2 + A robust CAR file -> MST walker for atproto 3 + 4 + Small CARs have their blocks buffered in memory. If a configurable memory limit 5 + is reached while reading blocks, CAR reading is suspended, and can be continued 6 + by providing disk storage to buffer the CAR blocks instead. 7 + 8 + A `process` function can be provided for tasks where records are transformed 9 + into a smaller representation, to save memory (and disk) during block reading. 10 + 11 + Once blocks are loaded, the MST is walked and emitted as chunks of pairs of 12 + `(rkey, processed_block)` pairs, in order (depth first, left-to-right). 13 + 14 + Some MST validations are applied 15 + - Keys must appear in order 16 + - Keys must be at the correct MST tree depth 17 + 18 + `iroh_car` additionally applies a block size limit of `2MiB`. 19 + 20 + ``` 21 + use repo_stream::{Driver, DriverBuilder, DiskBuilder}; 22 + 23 + # #[tokio::main] 24 + # async fn main() -> Result<(), Box<dyn std::error::Error>> { 25 + # let reader = include_bytes!("../car-samples/tiny.car").as_slice(); 26 + let mut total_size = 0; 27 + 28 + match DriverBuilder::new() 29 + .with_mem_limit_mb(10) 30 + .with_block_processor(|rec| rec.len()) // block processing: just extract the raw record size 31 + .load_car(reader) 32 + .await? 33 + { 34 + 35 + // if all blocks fit within memory 36 + Driver::Memory(_commit, mut driver) => { 37 + while let Some(chunk) = driver.next_chunk(256).await? { 38 + for (_rkey, size) in chunk { 39 + total_size += size; 40 + } 41 + } 42 + }, 43 + 44 + // if the CAR was too big for in-memory processing 45 + Driver::Disk(paused) => { 46 + // set up a disk store we can spill to 47 + let store = DiskBuilder::new().open("some/path.db".into()).await?; 48 + // do the spilling, get back a (similar) driver 49 + let (_commit, mut driver) = paused.finish_loading(store).await?; 50 + 51 + while let Some(chunk) = driver.next_chunk(256).await? { 52 + for (_rkey, size) in chunk { 53 + total_size += size; 54 + } 55 + } 56 + 57 + // clean up the disk store (drop tables etc) 58 + driver.reset_store().await?; 59 + } 60 + }; 61 + println!("sum of size of all records: {total_size}"); 62 + # Ok(()) 63 + # } 64 + ``` 65 + 66 + Disk spilling suspends and returns a `Driver::Disk(paused)` instead of going 67 + ahead and eagerly using disk I/O. This means you have to write a bit more code 68 + to handle both cases, but it allows you to have finer control over resource 69 + usage. For example, you can drive a number of parallel memory CAR workers, and 70 + separately have a different number of disk workers picking up suspended disk 71 + tasks from a queue. 72 + 73 + Find more [examples in the repo](https://tangled.org/@microcosm.blue/repo-stream/tree/main/examples). 74 + 75 + */ 76 + 77 + pub mod mst; 78 + mod walk; 4 79 5 80 pub mod disk; 6 81 pub mod drive; 7 - pub mod mst; 8 - pub mod walk; 82 + pub mod process; 83 + 84 + pub use disk::{DiskBuilder, DiskError, DiskStore}; 85 + pub use drive::{DriveError, Driver, DriverBuilder, NeedDisk}; 86 + pub use mst::Commit; 87 + pub use process::Processable;
+4 -4
src/mst.rs
··· 39 39 /// MST node data schema 40 40 #[derive(Debug, Deserialize, PartialEq)] 41 41 #[serde(deny_unknown_fields)] 42 - pub struct Node { 42 + pub(crate) struct Node { 43 43 /// link to sub-tree Node on a lower level and with all keys sorting before 44 44 /// keys at this node 45 45 #[serde(rename = "l")] ··· 62 62 /// so if a block *could be* a node, any record converter must postpone 63 63 /// processing. if it turns out it happens to be a very node-looking record, 64 64 /// well, sorry, it just has to only be processed later when that's known. 65 - pub fn could_be(bytes: impl AsRef<[u8]>) -> bool { 65 + pub(crate) fn could_be(bytes: impl AsRef<[u8]>) -> bool { 66 66 const NODE_FINGERPRINT: [u8; 3] = [ 67 67 0xA2, // map length 2 (for "l" and "e" keys) 68 68 0x61, // text length 1 ··· 83 83 /// with an empty array of entries. This is the only situation in which a 84 84 /// tree may contain an empty leaf node which does not either contain keys 85 85 /// ("entries") or point to a sub-tree containing entries. 86 - pub fn is_empty(&self) -> bool { 86 + pub(crate) fn is_empty(&self) -> bool { 87 87 self.left.is_none() && self.entries.is_empty() 88 88 } 89 89 } ··· 91 91 /// TreeEntry object 92 92 #[derive(Debug, Deserialize, PartialEq)] 93 93 #[serde(deny_unknown_fields)] 94 - pub struct Entry { 94 + pub(crate) struct Entry { 95 95 /// count of bytes shared with previous TreeEntry in this Node (if any) 96 96 #[serde(rename = "p")] 97 97 pub prefix_len: usize,
+108
src/process.rs
··· 1 + /*! 2 + Record processor function output trait 3 + 4 + The return type must satisfy the `Processable` trait, which requires: 5 + 6 + - `Clone` because two rkeys can refer to the same record by CID, which may 7 + only appear once in the CAR file. 8 + - `Serialize + DeserializeOwned` so it can be spilled to disk. 9 + 10 + One required function must be implemented, `get_size()`: this should return the 11 + approximate total off-stack size of the type. (the on-stack size will be added 12 + automatically via `std::mem::size_of`). 13 + 14 + Note that it is **not guaranteed** that the `process` function will run on a 15 + block before storing it in memory or on disk: it's not possible to know if a 16 + block is a record without actually walking the MST, so the best we can do is 17 + apply `process` to any block that we know *cannot* be an MST node, and otherwise 18 + store the raw block bytes. 19 + 20 + Here's a silly processing function that just collects 'eyy's found in the raw 21 + record bytes 22 + 23 + ``` 24 + # use repo_stream::Processable; 25 + # use serde::{Serialize, Deserialize}; 26 + #[derive(Debug, Clone, Serialize, Deserialize)] 27 + struct Eyy(usize, String); 28 + 29 + impl Processable for Eyy { 30 + fn get_size(&self) -> usize { 31 + // don't need to compute the usize, it's on the stack 32 + self.1.capacity() // in-mem size from the string's capacity, in bytes 33 + } 34 + } 35 + 36 + fn process(raw: Vec<u8>) -> Vec<Eyy> { 37 + let mut out = Vec::new(); 38 + let to_find = "eyy".as_bytes(); 39 + for i in 0..raw.len().saturating_sub(2) { 40 + if &raw[i..(i+3)] == to_find { 41 + out.push(Eyy(i, "eyy".to_string())); 42 + } 43 + } 44 + out 45 + } 46 + ``` 47 + 48 + The memory sizing stuff is a little sketchy but probably at least approximately 49 + works. 50 + */ 51 + 52 + use serde::{Serialize, de::DeserializeOwned}; 53 + 54 + /// Output trait for record processing 55 + pub trait Processable: Clone + Serialize + DeserializeOwned { 56 + /// Any additional in-memory size taken by the processed type 57 + /// 58 + /// Do not include stack size (`std::mem::size_of`) 59 + fn get_size(&self) -> usize; 60 + } 61 + 62 + /// Processor that just returns the raw blocks 63 + #[inline] 64 + pub fn noop(block: Vec<u8>) -> Vec<u8> { 65 + block 66 + } 67 + 68 + impl Processable for u8 { 69 + fn get_size(&self) -> usize { 70 + 0 71 + } 72 + } 73 + 74 + impl Processable for usize { 75 + fn get_size(&self) -> usize { 76 + 0 // no additional space taken, just its stack size (newtype is free) 77 + } 78 + } 79 + 80 + impl Processable for String { 81 + fn get_size(&self) -> usize { 82 + self.capacity() 83 + } 84 + } 85 + 86 + impl<Item: Sized + Processable> Processable for Vec<Item> { 87 + fn get_size(&self) -> usize { 88 + let slot_size = std::mem::size_of::<Item>(); 89 + let direct_size = slot_size * self.capacity(); 90 + let items_referenced_size: usize = self.iter().map(|item| item.get_size()).sum(); 91 + direct_size + items_referenced_size 92 + } 93 + } 94 + 95 + impl<Item: Processable> Processable for Option<Item> { 96 + fn get_size(&self) -> usize { 97 + self.as_ref().map(|item| item.get_size()).unwrap_or(0) 98 + } 99 + } 100 + 101 + impl<Item: Processable, Error: Processable> Processable for Result<Item, Error> { 102 + fn get_size(&self) -> usize { 103 + match self { 104 + Ok(item) => item.get_size(), 105 + Err(err) => err.get_size(), 106 + } 107 + } 108 + }
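Because `Processable` is already implemented for `usize`, `String`, `Vec`, `Option`, and `Result`, a processor often needs no custom type at all. A small sketch; the `utf8_preview` and `load` helpers are hypothetical, not part of the crate:

```rust
use repo_stream::{DriveError, Driver, DriverBuilder};
use tokio::io::AsyncRead;

// hypothetical processor: keep records that happen to be valid UTF-8, drop the rest.
// Option<String> already satisfies Processable through the impls above.
fn utf8_preview(block: Vec<u8>) -> Option<String> {
    String::from_utf8(block).ok()
}

// plugging it into the builder; the driver then yields (rkey, Option<String>) pairs
async fn load<R: AsyncRead + Unpin>(reader: R) -> Result<Driver<R, Option<String>>, DriveError> {
    DriverBuilder::new()
        .with_block_processor(utf8_preview)
        .load_car(reader)
        .await
}
```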
+12 -208
src/walk.rs
··· 1 1 //! Depth-first MST traversal 2 2 3 3 use crate::disk::SqliteReader; 4 - use crate::drive::{MaybeProcessedBlock, Processable}; 4 + use crate::drive::{DecodeError, MaybeProcessedBlock}; 5 5 use crate::mst::Node; 6 + use crate::process::Processable; 6 7 use ipld_core::cid::Cid; 7 8 use sha2::{Digest, Sha256}; 8 9 use std::collections::HashMap; ··· 20 21 #[error("storage error: {0}")] 21 22 StorageError(#[from] rusqlite::Error), 22 23 #[error("Decode error: {0}")] 23 - BincodeDecodeError(#[from] bincode::error::DecodeError), 24 + DecodeError(#[from] DecodeError), 24 25 } 25 26 26 27 /// Errors from invalid Rkeys ··· 50 51 /// Reached the end of the MST! yay! 51 52 Finish, 52 53 /// A record was found! 53 - Step { rkey: String, data: T }, 54 + Found { rkey: String, data: T }, 54 55 } 55 56 56 57 #[derive(Debug, Clone, PartialEq)] ··· 86 87 } 87 88 88 89 fn push_from_node(stack: &mut Vec<Need>, node: &Node, parent_depth: Depth) -> Result<(), MstError> { 89 - // empty nodes are not allowed in the MST 90 - // ...except for a single one for empty MST, but we wouldn't be pushing that 90 + // empty nodes are not allowed in the MST except in an empty MST 91 91 if node.is_empty() { 92 - return Err(MstError::EmptyNode); 92 + if parent_depth == Depth::Root { 93 + return Ok(()); // empty mst, nothing to push 94 + } else { 95 + return Err(MstError::EmptyNode); 96 + } 93 97 } 94 98 95 99 let mut entries = Vec::with_capacity(node.entries.len()); ··· 226 230 } 227 231 self.prev = rkey.clone(); 228 232 229 - return Ok(Step::Step { rkey, data }); 233 + return Ok(Step::Found { rkey, data }); 230 234 } 231 235 } 232 236 } ··· 293 297 } 294 298 self.prev = rkey.clone(); 295 299 296 - return Ok(Step::Step { rkey, data }); 300 + return Ok(Step::Found { rkey, data }); 297 301 } 298 302 } 299 303 } ··· 303 307 #[cfg(test)] 304 308 mod test { 305 309 use super::*; 306 - // use crate::mst::Entry; 307 310 308 311 fn cid1() -> Cid { 309 312 "bafyreihixenvk3ahqbytas4hk4a26w43bh6eo3w6usjqtxkpzsvi655a3m" 310 313 .parse() 311 314 .unwrap() 312 315 } 313 - // fn cid2() -> Cid { 314 - // "QmY7Yh4UquoXHLPFo2XbhXkhBvFoPwmQUSa92pxnxjQuPU" 315 - // .parse() 316 - // .unwrap() 317 - // } 318 - // fn cid3() -> Cid { 319 - // "bafybeigdyrzt5sfp7udm7hu76uh7y26nf3efuylqabf3oclgtqy55fbzdi" 320 - // .parse() 321 - // .unwrap() 322 - // } 323 - // fn cid4() -> Cid { 324 - // "QmbWqxBEKC3P8tqsKc98xmWNzrzDtRLMiMPL8wBuTGsMnR" 325 - // .parse() 326 - // .unwrap() 327 - // } 328 - // fn cid5() -> Cid { 329 - // "QmSnuWmxptJZdLJpKRarxBMS2Ju2oANVrgbr2xWbie9b2D" 330 - // .parse() 331 - // .unwrap() 332 - // } 333 - // fn cid6() -> Cid { 334 - // "QmdmQXB2mzChmMeKY47C43LxUdg1NDJ5MWcKMKxDu7RgQm" 335 - // .parse() 336 - // .unwrap() 337 - // } 338 - // fn cid7() -> Cid { 339 - // "bafybeiaysi4s6lnjev27ln5icwm6tueaw2vdykrtjkwiphwekaywqhcjze" 340 - // .parse() 341 - // .unwrap() 342 - // } 343 - // fn cid8() -> Cid { 344 - // "bafyreif3tfdpr5n4jdrbielmcapwvbpcthepfkwq2vwonmlhirbjmotedi" 345 - // .parse() 346 - // .unwrap() 347 - // } 348 - // fn cid9() -> Cid { 349 - // "bafyreicnokmhmrnlp2wjhyk2haep4tqxiptwfrp2rrs7rzq7uk766chqvq" 350 - // .parse() 351 - // .unwrap() 352 - // } 353 316 354 317 #[test] 355 318 fn test_depth_spec_0() { ··· 440 403 .as_ref() 441 404 ); 442 405 } 443 - 444 - // #[test] 445 - // fn test_needs_from_node_just_one_record() { 446 - // let node = Node { 447 - // left: None, 448 - // entries: vec![Entry { 449 - // keysuffix: "asdf".into(), 450 - // prefix_len: 0, 451 - // value: cid1(), 452 - // tree: None, 453 - // }], 
454 - // }; 455 - // assert_eq!( 456 - // needs_from_node(node).unwrap(), 457 - // vec![Need::Record { 458 - // rkey: "asdf".into(), 459 - // cid: cid1(), 460 - // },] 461 - // ); 462 - // } 463 - 464 - // #[test] 465 - // fn test_needs_from_node_two_records() { 466 - // let node = Node { 467 - // left: None, 468 - // entries: vec![ 469 - // Entry { 470 - // keysuffix: "asdf".into(), 471 - // prefix_len: 0, 472 - // value: cid1(), 473 - // tree: None, 474 - // }, 475 - // Entry { 476 - // keysuffix: "gh".into(), 477 - // prefix_len: 2, 478 - // value: cid2(), 479 - // tree: None, 480 - // }, 481 - // ], 482 - // }; 483 - // assert_eq!( 484 - // needs_from_node(node).unwrap(), 485 - // vec![ 486 - // Need::Record { 487 - // rkey: "asdf".into(), 488 - // cid: cid1(), 489 - // }, 490 - // Need::Record { 491 - // rkey: "asgh".into(), 492 - // cid: cid2(), 493 - // }, 494 - // ] 495 - // ); 496 - // } 497 - 498 - // #[test] 499 - // fn test_needs_from_node_with_both() { 500 - // let node = Node { 501 - // left: None, 502 - // entries: vec![Entry { 503 - // keysuffix: "asdf".into(), 504 - // prefix_len: 0, 505 - // value: cid1(), 506 - // tree: Some(cid2()), 507 - // }], 508 - // }; 509 - // assert_eq!( 510 - // needs_from_node(node).unwrap(), 511 - // vec![ 512 - // Need::Record { 513 - // rkey: "asdf".into(), 514 - // cid: cid1(), 515 - // }, 516 - // Need::Node(cid2()), 517 - // ] 518 - // ); 519 - // } 520 - 521 - // #[test] 522 - // fn test_needs_from_node_left_and_record() { 523 - // let node = Node { 524 - // left: Some(cid1()), 525 - // entries: vec![Entry { 526 - // keysuffix: "asdf".into(), 527 - // prefix_len: 0, 528 - // value: cid2(), 529 - // tree: None, 530 - // }], 531 - // }; 532 - // assert_eq!( 533 - // needs_from_node(node).unwrap(), 534 - // vec![ 535 - // Need::Node(cid1()), 536 - // Need::Record { 537 - // rkey: "asdf".into(), 538 - // cid: cid2(), 539 - // }, 540 - // ] 541 - // ); 542 - // } 543 - 544 - // #[test] 545 - // fn test_needs_from_full_node() { 546 - // let node = Node { 547 - // left: Some(cid1()), 548 - // entries: vec![ 549 - // Entry { 550 - // keysuffix: "asdf".into(), 551 - // prefix_len: 0, 552 - // value: cid2(), 553 - // tree: Some(cid3()), 554 - // }, 555 - // Entry { 556 - // keysuffix: "ghi".into(), 557 - // prefix_len: 1, 558 - // value: cid4(), 559 - // tree: Some(cid5()), 560 - // }, 561 - // Entry { 562 - // keysuffix: "jkl".into(), 563 - // prefix_len: 2, 564 - // value: cid6(), 565 - // tree: Some(cid7()), 566 - // }, 567 - // Entry { 568 - // keysuffix: "mno".into(), 569 - // prefix_len: 4, 570 - // value: cid8(), 571 - // tree: Some(cid9()), 572 - // }, 573 - // ], 574 - // }; 575 - // assert_eq!( 576 - // needs_from_node(node).unwrap(), 577 - // vec![ 578 - // Need::Node(cid1()), 579 - // Need::Record { 580 - // rkey: "asdf".into(), 581 - // cid: cid2(), 582 - // }, 583 - // Need::Node(cid3()), 584 - // Need::Record { 585 - // rkey: "aghi".into(), 586 - // cid: cid4(), 587 - // }, 588 - // Need::Node(cid5()), 589 - // Need::Record { 590 - // rkey: "agjkl".into(), 591 - // cid: cid6(), 592 - // }, 593 - // Need::Node(cid7()), 594 - // Need::Record { 595 - // rkey: "agjkmno".into(), 596 - // cid: cid8(), 597 - // }, 598 - // Need::Node(cid9()), 599 - // ] 600 - // ); 601 - // } 602 406 }
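Note on the `test_depth_spec_*` cases kept above: they check key depth against the MST definition in the atproto repository spec, where a key's layer is the number of leading zero bits of sha256(rkey), counted in 2-bit groups. A minimal standalone sketch of that calculation (the `key_depth` name and the example keys are illustrative, not part of this crate):

use sha2::{Digest, Sha256};

/// Layer ("depth") of an MST key per the atproto spec: the number of
/// leading 2-bit zero groups in the sha256 of the key bytes.
fn key_depth(rkey: &str) -> u32 {
    let hash = Sha256::digest(rkey.as_bytes());
    let mut depth = 0;
    for &byte in hash.iter() {
        // four 2-bit groups per byte, most significant first
        for shift in [6u8, 4, 2, 0] {
            if (byte >> shift) & 0b11 == 0 {
                depth += 1;
            } else {
                return depth;
            }
        }
    }
    depth
}

fn main() {
    // illustrative keys only; the spec's worked examples pin exact layers,
    // which is what the test_depth_spec_* cases above exercise
    for key in ["2653ae71", "app.bsky.feed.post/454397e440ec"] {
        println!("{key} -> layer {}", key_depth(key));
    }
}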
+21 -22
tests/non-huge-cars.rs
··· 1 1 extern crate repo_stream; 2 - use repo_stream::drive::Processable; 3 - use serde::{Deserialize, Serialize}; 2 + use repo_stream::Driver; 4 3 4 + const EMPTY_CAR: &'static [u8] = include_bytes!("../car-samples/empty.car"); 5 5 const TINY_CAR: &'static [u8] = include_bytes!("../car-samples/tiny.car"); 6 6 const LITTLE_CAR: &'static [u8] = include_bytes!("../car-samples/little.car"); 7 7 const MIDSIZE_CAR: &'static [u8] = include_bytes!("../car-samples/midsize.car"); 8 8 9 - #[derive(Clone, Serialize, Deserialize)] 10 - struct S(usize); 11 - 12 - impl Processable for S { 13 - fn get_size(&self) -> usize { 14 - 0 // no additional space taken, just its stack size (newtype is free) 15 - } 16 - } 17 - 18 - async fn test_car(bytes: &[u8], expected_records: usize, expected_sum: usize) { 19 - let mb = 2_usize.pow(20); 20 - 21 - let mut driver = match repo_stream::drive::load_car(bytes, |block| S(block.len()), 10 * mb) 9 + async fn test_car( 10 + bytes: &[u8], 11 + expected_records: usize, 12 + expected_sum: usize, 13 + expect_profile: bool, 14 + ) { 15 + let mut driver = match Driver::load_car(bytes, |block| block.len(), 10 /* MiB */) 22 16 .await 23 17 .unwrap() 24 18 { 25 - repo_stream::drive::Vehicle::Lil(_commit, mem_driver) => mem_driver, 26 - repo_stream::drive::Vehicle::Big(_) => panic!("too big"), 19 + Driver::Memory(_commit, mem_driver) => mem_driver, 20 + Driver::Disk(_) => panic!("too big"), 27 21 }; 28 22 29 23 let mut records = 0; ··· 32 26 let mut prev_rkey = "".to_string(); 33 27 34 28 while let Some(pairs) = driver.next_chunk(256).await.unwrap() { 35 - for (rkey, S(size)) in pairs { 29 + for (rkey, size) in pairs { 36 30 records += 1; 37 31 sum += size; 38 32 if rkey == "app.bsky.actor.profile/self" { ··· 45 39 46 40 assert_eq!(records, expected_records); 47 41 assert_eq!(sum, expected_sum); 48 - assert!(found_bsky_profile); 42 + assert_eq!(found_bsky_profile, expect_profile); 43 + } 44 + 45 + #[tokio::test] 46 + async fn test_empty_car() { 47 + test_car(EMPTY_CAR, 0, 0, false).await 49 48 } 50 49 51 50 #[tokio::test] 52 51 async fn test_tiny_car() { 53 - test_car(TINY_CAR, 8, 2071).await 52 + test_car(TINY_CAR, 8, 2071, true).await 54 53 } 55 54 56 55 #[tokio::test] 57 56 async fn test_little_car() { 58 - test_car(LITTLE_CAR, 278, 246960).await 57 + test_car(LITTLE_CAR, 278, 246960, true).await 59 58 } 60 59 61 60 #[tokio::test] 62 61 async fn test_midsize_car() { 63 - test_car(MIDSIZE_CAR, 11585, 3741393).await 62 + test_car(MIDSIZE_CAR, 11585, 3741393, true).await 64 63 }
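The `S(usize)` newtype deleted above was the example of a custom processed-block type; the driver still takes a processing closure whose output implements `Processable`, with `get_size` reporting extra heap bytes so the in-memory budget can be enforced. A rough sketch of the same idea against the new API, assuming `Processable` is reachable at `repo_stream::process::Processable` (mirroring the `use crate::process::Processable` change above), that its surface is still just `get_size`, and that the third `load_car` argument is the memory budget in MiB as the `/* MiB */` comment suggests:

use repo_stream::Driver;
use repo_stream::process::Processable; // path assumed from this diff
use serde::{Deserialize, Serialize};

/// Hypothetical processed form of a record block: just its raw length.
#[derive(Clone, Serialize, Deserialize)]
struct BlockSize(usize);

impl Processable for BlockSize {
    fn get_size(&self) -> usize {
        0 // no heap allocated beyond the newtype's stack size
    }
}

async fn sum_sizes(car: &[u8]) -> usize {
    let mut driver = match Driver::load_car(car, |block| BlockSize(block.len()), 10)
        .await
        .unwrap()
    {
        Driver::Memory(_commit, mem) => mem,
        // repos over the budget come back disk-backed; not handled in this sketch
        Driver::Disk(_) => unimplemented!("disk-backed walk not shown here"),
    };

    let mut total = 0;
    while let Some(pairs) = driver.next_chunk(256).await.unwrap() {
        for (_rkey, BlockSize(n)) in pairs {
            total += n;
        }
    }
    total
}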