Fast and robust atproto CAR file processing in rust

Compare changes

Choose any two refs to compare.

Changed files
+319 -251
benches
car-samples
examples
disk-read-file
read-file
src
tests
+1 -1
Cargo.lock
··· 1024 1025 [[package]] 1026 name = "repo-stream" 1027 - version = "0.1.1" 1028 dependencies = [ 1029 "bincode", 1030 "clap",
··· 1024 1025 [[package]] 1026 name = "repo-stream" 1027 + version = "0.2.2" 1028 dependencies = [ 1029 "bincode", 1030 "clap",
+2 -2
Cargo.toml
··· 1 [package] 2 name = "repo-stream" 3 - version = "0.1.1" 4 edition = "2024" 5 license = "MIT OR Apache-2.0" 6 - description = "Fast and robust atproto CAR file processing in rust" 7 repository = "https://tangled.org/@microcosm.blue/repo-stream" 8 9 [dependencies]
··· 1 [package] 2 name = "repo-stream" 3 + version = "0.2.2" 4 edition = "2024" 5 license = "MIT OR Apache-2.0" 6 + description = "A robust CAR file -> MST walker for atproto" 7 repository = "https://tangled.org/@microcosm.blue/repo-stream" 8 9 [dependencies]
+4
benches/non-huge-cars.rs
··· 3 4 use criterion::{Criterion, criterion_group, criterion_main}; 5 6 const TINY_CAR: &'static [u8] = include_bytes!("../car-samples/tiny.car"); 7 const LITTLE_CAR: &'static [u8] = include_bytes!("../car-samples/little.car"); 8 const MIDSIZE_CAR: &'static [u8] = include_bytes!("../car-samples/midsize.car"); ··· 13 .build() 14 .expect("Creating runtime failed"); 15 16 c.bench_function("tiny-car", |b| { 17 b.to_async(&rt).iter(async || drive_car(TINY_CAR).await) 18 });
··· 3 4 use criterion::{Criterion, criterion_group, criterion_main}; 5 6 + const EMPTY_CAR: &'static [u8] = include_bytes!("../car-samples/empty.car"); 7 const TINY_CAR: &'static [u8] = include_bytes!("../car-samples/tiny.car"); 8 const LITTLE_CAR: &'static [u8] = include_bytes!("../car-samples/little.car"); 9 const MIDSIZE_CAR: &'static [u8] = include_bytes!("../car-samples/midsize.car"); ··· 14 .build() 15 .expect("Creating runtime failed"); 16 17 + c.bench_function("empty-car", |b| { 18 + b.to_async(&rt).iter(async || drive_car(EMPTY_CAR).await) 19 + }); 20 c.bench_function("tiny-car", |b| { 21 b.to_async(&rt).iter(async || drive_car(TINY_CAR).await) 22 });
car-samples/empty.car

This is a binary file and will not be displayed.

+11 -12
examples/disk-read-file/main.rs
··· 4 5 extern crate repo_stream; 6 use clap::Parser; 7 - use repo_stream::{DiskStore, Driver, process::noop}; 8 use std::path::PathBuf; 9 10 #[derive(Debug, Parser)] 11 struct Args { ··· 26 let reader = tokio::fs::File::open(car).await?; 27 let reader = tokio::io::BufReader::new(reader); 28 29 - // configure how much memory can be used before spilling to disk. 30 - // real memory usage may differ somewhat. 31 - let in_mem_limit = 10; // MiB 32 - 33 - // configure how much memory sqlite is allowed to use when dumping to disk 34 - let db_cache_mb = 32; // MiB 35 - 36 log::info!("hello! reading the car..."); 37 38 // in this example we only bother handling CARs that are too big for memory 39 // `noop` helper means: do no block processing, store the raw blocks 40 - let driver = match Driver::load_car(reader, noop, in_mem_limit).await? { 41 Driver::Memory(_, _) => panic!("try this on a bigger car"), 42 Driver::Disk(big_stuff) => { 43 // we reach here if the repo was too big and needs to be spilled to 44 // disk to continue 45 46 // set up a disk store we can spill to 47 - let disk_store = DiskStore::new(tmpfile.clone(), db_cache_mb).await?; 48 49 // do the spilling, get back a (similar) driver 50 let (commit, driver) = big_stuff.finish_loading(disk_store).await?; 51 52 // at this point you might want to fetch the account's signing key 53 // via the DID from the commit, and then verify the signature. 54 - log::warn!("big's comit: {:?}", commit); 55 56 // pop the driver back out to get some code indentation relief 57 driver ··· 81 } 82 } 83 84 - log::info!("arrived! joining rx..."); 85 86 // clean up the database. would be nice to do this in drop so it happens 87 // automatically, but some blocking work happens, so that's not allowed in
··· 4 5 extern crate repo_stream; 6 use clap::Parser; 7 + use repo_stream::{DiskBuilder, Driver, DriverBuilder}; 8 use std::path::PathBuf; 9 + use std::time::Instant; 10 11 #[derive(Debug, Parser)] 12 struct Args { ··· 27 let reader = tokio::fs::File::open(car).await?; 28 let reader = tokio::io::BufReader::new(reader); 29 30 log::info!("hello! reading the car..."); 31 + let t0 = Instant::now(); 32 33 // in this example we only bother handling CARs that are too big for memory 34 // `noop` helper means: do no block processing, store the raw blocks 35 + let driver = match DriverBuilder::new() 36 + .with_mem_limit_mb(10) // how much memory can be used before disk spill 37 + .load_car(reader) 38 + .await? 39 + { 40 Driver::Memory(_, _) => panic!("try this on a bigger car"), 41 Driver::Disk(big_stuff) => { 42 // we reach here if the repo was too big and needs to be spilled to 43 // disk to continue 44 45 // set up a disk store we can spill to 46 + let disk_store = DiskBuilder::new().open(tmpfile).await?; 47 48 // do the spilling, get back a (similar) driver 49 let (commit, driver) = big_stuff.finish_loading(disk_store).await?; 50 51 // at this point you might want to fetch the account's signing key 52 // via the DID from the commit, and then verify the signature. 53 + log::warn!("big's comit ({:?}): {:?}", t0.elapsed(), commit); 54 55 // pop the driver back out to get some code indentation relief 56 driver ··· 80 } 81 } 82 83 + log::info!("arrived! ({:?}) joining rx...", t0.elapsed()); 84 85 // clean up the database. would be nice to do this in drop so it happens 86 // automatically, but some blocking work happens, so that's not allowed in
+9 -6
examples/read-file/main.rs
··· 4 5 extern crate repo_stream; 6 use clap::Parser; 7 - use repo_stream::Driver; 8 use std::path::PathBuf; 9 10 type Result<T> = std::result::Result<T, Box<dyn std::error::Error>>; ··· 23 let reader = tokio::fs::File::open(file).await?; 24 let reader = tokio::io::BufReader::new(reader); 25 26 - let (commit, mut driver) = 27 - match Driver::load_car(reader, |block| block.len(), 16 /* MiB */).await? { 28 - Driver::Memory(commit, mem_driver) => (commit, mem_driver), 29 - Driver::Disk(_) => panic!("this example doesn't handle big CARs"), 30 - }; 31 32 log::info!("got commit: {commit:?}"); 33
··· 4 5 extern crate repo_stream; 6 use clap::Parser; 7 + use repo_stream::{Driver, DriverBuilder}; 8 use std::path::PathBuf; 9 10 type Result<T> = std::result::Result<T, Box<dyn std::error::Error>>; ··· 23 let reader = tokio::fs::File::open(file).await?; 24 let reader = tokio::io::BufReader::new(reader); 25 26 + let (commit, mut driver) = match DriverBuilder::new() 27 + .with_block_processor(|block| block.len()) 28 + .load_car(reader) 29 + .await? 30 + { 31 + Driver::Memory(commit, mem_driver) => (commit, mem_driver), 32 + Driver::Disk(_) => panic!("this example doesn't handle big CARs"), 33 + }; 34 35 log::info!("got commit: {commit:?}"); 36
+70 -2
readme.md
··· 1 # repo-stream 2 3 - Fast and (aspirationally) robust atproto CAR file processing in rust 4 5 6 current car processing times (records processed into their length usize, phil's dev machine): ··· 27 -> yeah the commit is returned from init 28 - [ ] spec compliance todos 29 - [x] assert that keys are ordered and fail if not 30 - - [ ] verify node mst depth from key (possibly pending [interop test fixes](https://github.com/bluesky-social/atproto-interop-tests/issues/5)) 31 - [ ] performance todos 32 - [x] consume the serialized nodes into a mutable efficient format 33 - [ ] maybe customize the deserialize impl to do that directly?
··· 1 # repo-stream 2 3 + A robust CAR file -> MST walker for atproto 4 + 5 + [![Crates.io][crates-badge]](https://crates.io/crates/repo-stream) 6 + [![Documentation][docs-badge]](https://docs.rs/repo-stream) 7 + [![Sponsor][sponsor-badge]](https://github.com/sponsors/uniphil) 8 + 9 + [crates-badge]: https://img.shields.io/crates/v/repo-stream.svg 10 + [docs-badge]: https://docs.rs/repo-stream/badge.svg 11 + [sponsor-badge]: https://img.shields.io/badge/at-microcosm-b820f9?labelColor=b820f9&logo=githubsponsors&logoColor=fff 12 + 13 + ```rust 14 + use repo_stream::{Driver, DriverBuilder, DriveError, DiskBuilder}; 15 + 16 + #[tokio::main] 17 + async fn main() -> Result<(), DriveError> { 18 + // repo-stream takes any AsyncRead as input, like a tokio::fs::File 19 + let reader = tokio::fs::File::open("repo.car".into()).await?; 20 + let reader = tokio::io::BufReader::new(reader); 21 + 22 + // example repo workload is simply counting the total record bytes 23 + let mut total_size = 0; 24 + 25 + match DriverBuilder::new() 26 + .with_mem_limit_mb(10) 27 + .with_block_processor(|rec| rec.len()) // block processing: just extract the raw record size 28 + .load_car(reader) 29 + .await? 30 + { 31 + 32 + // if all blocks fit within memory 33 + Driver::Memory(_commit, mut driver) => { 34 + while let Some(chunk) = driver.next_chunk(256).await? { 35 + for (_rkey, size) in chunk { 36 + total_size += size; 37 + } 38 + } 39 + }, 40 + 41 + // if the CAR was too big for in-memory processing 42 + Driver::Disk(paused) => { 43 + // set up a disk store we can spill to 44 + let store = DiskBuilder::new().open("some/path.db".into()).await?; 45 + // do the spilling, get back a (similar) driver 46 + let (_commit, mut driver) = paused.finish_loading(store).await?; 47 + 48 + while let Some(chunk) = driver.next_chunk(256).await? { 49 + for (_rkey, size) in chunk { 50 + total_size += size; 51 + } 52 + } 53 + 54 + // clean up the disk store (drop tables etc) 55 + driver.reset_store().await?; 56 + } 57 + }; 58 + println!("sum of size of all records: {total_size}"); 59 + Ok(()) 60 + } 61 + ``` 62 + 63 + more recent todo 64 + 65 + - [ ] get an *emtpy* car for the test suite 66 + - [x] implement a max size on disk limit 67 + 68 + 69 + ----- 70 + 71 + older stuff (to clean up): 72 73 74 current car processing times (records processed into their length usize, phil's dev machine): ··· 95 -> yeah the commit is returned from init 96 - [ ] spec compliance todos 97 - [x] assert that keys are ordered and fail if not 98 + - [x] verify node mst depth from key (possibly pending [interop test fixes](https://github.com/bluesky-social/atproto-interop-tests/issues/5)) 99 - [ ] performance todos 100 - [x] consume the serialized nodes into a mutable efficient format 101 - [ ] maybe customize the deserialize impl to do that directly?
+85 -6
src/disk.rs
··· 5 to be the best behaved in terms of both on-disk space usage and memory usage. 6 7 ```no_run 8 - # use repo_stream::{DiskStore, DiskError}; 9 # #[tokio::main] 10 # async fn main() -> Result<(), DiskError> { 11 - let db_cache_size = 32; // MiB 12 - let store = DiskStore::new("/some/path.db".into(), db_cache_size).await?; 13 # Ok(()) 14 # } 15 ``` ··· 30 /// A tokio blocking task failed to join 31 #[error("Failed to join a tokio blocking task: {0}")] 32 JoinError(#[from] tokio::task::JoinError), 33 #[error("this error was replaced, seeing this is a bug.")] 34 #[doc(hidden)] 35 Stolen, ··· 44 } 45 } 46 47 /// On-disk block storage 48 pub struct DiskStore { 49 conn: rusqlite::Connection, 50 } 51 52 impl DiskStore { 53 /// Initialize a new disk store 54 - pub async fn new(path: PathBuf, cache_mb: usize) -> Result<Self, DiskError> { 55 let conn = tokio::task::spawn_blocking(move || { 56 let conn = rusqlite::Connection::open(path)?; 57 ··· 73 }) 74 .await??; 75 76 - Ok(Self { conn }) 77 } 78 pub(crate) fn get_writer(&'_ mut self) -> Result<SqliteWriter<'_>, DiskError> { 79 let tx = self.conn.transaction()?; 80 - Ok(SqliteWriter { tx }) 81 } 82 pub(crate) fn get_reader<'conn>(&'conn self) -> Result<SqliteReader<'conn>, DiskError> { 83 let select_stmt = self.conn.prepare("SELECT val FROM blocks WHERE key = ?1")?; ··· 106 107 pub(crate) struct SqliteWriter<'conn> { 108 tx: rusqlite::Transaction<'conn>, 109 } 110 111 impl SqliteWriter<'_> { ··· 119 .map_err(DiskError::DbError)?; 120 for pair in kv { 121 let (k, v) = pair?; 122 insert_stmt.execute((k, v)).map_err(DiskError::DbError)?; 123 } 124 Ok(())
··· 5 to be the best behaved in terms of both on-disk space usage and memory usage. 6 7 ```no_run 8 + # use repo_stream::{DiskBuilder, DiskError}; 9 # #[tokio::main] 10 # async fn main() -> Result<(), DiskError> { 11 + let store = DiskBuilder::new() 12 + .with_cache_size_mb(32) 13 + .with_max_stored_mb(1024) // errors when >1GiB of processed blocks are inserted 14 + .open("/some/path.db".into()).await?; 15 # Ok(()) 16 # } 17 ``` ··· 32 /// A tokio blocking task failed to join 33 #[error("Failed to join a tokio blocking task: {0}")] 34 JoinError(#[from] tokio::task::JoinError), 35 + /// The total size of stored blocks exceeded the allowed size 36 + /// 37 + /// If you need to process *really* big CARs, you can configure a higher 38 + /// limit. 39 + #[error("Maximum disk size reached")] 40 + MaxSizeExceeded, 41 #[error("this error was replaced, seeing this is a bug.")] 42 #[doc(hidden)] 43 Stolen, ··· 52 } 53 } 54 55 + /// Builder-style disk store setup 56 + #[derive(Debug, Clone)] 57 + pub struct DiskBuilder { 58 + /// Database in-memory cache allowance 59 + /// 60 + /// Default: 32 MiB 61 + pub cache_size_mb: usize, 62 + /// Database stored block size limit 63 + /// 64 + /// Default: 10 GiB 65 + /// 66 + /// Note: actual size on disk may be more, but should approximately scale 67 + /// with this limit 68 + pub max_stored_mb: usize, 69 + } 70 + 71 + impl Default for DiskBuilder { 72 + fn default() -> Self { 73 + Self { 74 + cache_size_mb: 32, 75 + max_stored_mb: 10 * 1024, // 10 GiB 76 + } 77 + } 78 + } 79 + 80 + impl DiskBuilder { 81 + /// Begin configuring the storage with defaults 82 + pub fn new() -> Self { 83 + Default::default() 84 + } 85 + /// Set the in-memory cache allowance for the database 86 + /// 87 + /// Default: 32 MiB 88 + pub fn with_cache_size_mb(mut self, size: usize) -> Self { 89 + self.cache_size_mb = size; 90 + self 91 + } 92 + /// Set the approximate stored block size limit 93 + /// 94 + /// Default: 10 GiB 95 + pub fn with_max_stored_mb(mut self, max: usize) -> Self { 96 + self.max_stored_mb = max; 97 + self 98 + } 99 + /// Open and initialize the actual disk storage 100 + pub async fn open(&self, path: PathBuf) -> Result<DiskStore, DiskError> { 101 + DiskStore::new(path, self.cache_size_mb, self.max_stored_mb).await 102 + } 103 + } 104 + 105 /// On-disk block storage 106 pub struct DiskStore { 107 conn: rusqlite::Connection, 108 + max_stored: usize, 109 + stored: usize, 110 } 111 112 impl DiskStore { 113 /// Initialize a new disk store 114 + pub async fn new( 115 + path: PathBuf, 116 + cache_mb: usize, 117 + max_stored_mb: usize, 118 + ) -> Result<Self, DiskError> { 119 + let max_stored = max_stored_mb * 2_usize.pow(20); 120 let conn = tokio::task::spawn_blocking(move || { 121 let conn = rusqlite::Connection::open(path)?; 122 ··· 138 }) 139 .await??; 140 141 + Ok(Self { 142 + conn, 143 + max_stored, 144 + stored: 0, 145 + }) 146 } 147 pub(crate) fn get_writer(&'_ mut self) -> Result<SqliteWriter<'_>, DiskError> { 148 let tx = self.conn.transaction()?; 149 + Ok(SqliteWriter { 150 + tx, 151 + stored: &mut self.stored, 152 + max: self.max_stored, 153 + }) 154 } 155 pub(crate) fn get_reader<'conn>(&'conn self) -> Result<SqliteReader<'conn>, DiskError> { 156 let select_stmt = self.conn.prepare("SELECT val FROM blocks WHERE key = ?1")?; ··· 179 180 pub(crate) struct SqliteWriter<'conn> { 181 tx: rusqlite::Transaction<'conn>, 182 + stored: &'conn mut usize, 183 + max: usize, 184 } 185 186 impl SqliteWriter<'_> { ··· 194 .map_err(DiskError::DbError)?; 195 for pair in kv { 196 let (k, v) = pair?; 197 + *self.stored += v.len(); 198 + if *self.stored > self.max { 199 + return Err(DiskError::MaxSizeExceeded.into()); 200 + } 201 insert_stmt.execute((k, v)).map_err(DiskError::DbError)?; 202 } 203 Ok(())
+78 -6
src/drive.rs
··· 115 Disk(NeedDisk<R, T>), 116 } 117 118 impl<R: AsyncRead + Unpin, T: Processable> Driver<R, T> { 119 /// Begin processing an atproto MST from a CAR file 120 /// 121 /// Blocks will be loaded, processed, and buffered in memory. If the entire 122 - /// processed size is under the `max_size_mb` limit, a `Driver::Memory` will 123 - /// be returned along with a `Commit` ready for validation. 124 /// 125 - /// If the `max_size_mb` limit is reached before loading all blocks, the 126 /// partial state will be returned as `Driver::Disk(needed)`, which can be 127 /// resumed by providing a `SqliteStorage` for on-disk block storage. 128 pub async fn load_car( 129 reader: R, 130 process: fn(Vec<u8>) -> T, 131 - max_size_mb: usize, 132 ) -> Result<Driver<R, T>, DriveError> { 133 - let max_size = max_size_mb * 2_usize.pow(20); 134 let mut mem_blocks = HashMap::new(); 135 136 let mut car = CarReader::new(reader).await?; ··· 276 }) 277 .await??; 278 279 - let (tx, mut rx) = mpsc::channel::<Vec<(Cid, MaybeProcessedBlock<T>)>>(2); 280 281 let store_worker = tokio::task::spawn_blocking(move || { 282 let mut writer = store.get_writer()?;
··· 115 Disk(NeedDisk<R, T>), 116 } 117 118 + /// Builder-style driver setup 119 + #[derive(Debug, Clone)] 120 + pub struct DriverBuilder { 121 + pub mem_limit_mb: usize, 122 + } 123 + 124 + impl Default for DriverBuilder { 125 + fn default() -> Self { 126 + Self { mem_limit_mb: 16 } 127 + } 128 + } 129 + 130 + impl DriverBuilder { 131 + /// Begin configuring the driver with defaults 132 + pub fn new() -> Self { 133 + Default::default() 134 + } 135 + /// Set the in-memory size limit, in MiB 136 + /// 137 + /// Default: 16 MiB 138 + pub fn with_mem_limit_mb(self, new_limit: usize) -> Self { 139 + Self { 140 + mem_limit_mb: new_limit, 141 + } 142 + } 143 + /// Set the block processor 144 + /// 145 + /// Default: noop, raw blocks will be emitted 146 + pub fn with_block_processor<T: Processable>( 147 + self, 148 + p: fn(Vec<u8>) -> T, 149 + ) -> DriverBuilderWithProcessor<T> { 150 + DriverBuilderWithProcessor { 151 + mem_limit_mb: self.mem_limit_mb, 152 + block_processor: p, 153 + } 154 + } 155 + /// Begin processing an atproto MST from a CAR file 156 + pub async fn load_car<R: AsyncRead + Unpin>( 157 + &self, 158 + reader: R, 159 + ) -> Result<Driver<R, Vec<u8>>, DriveError> { 160 + Driver::load_car(reader, crate::process::noop, self.mem_limit_mb).await 161 + } 162 + } 163 + 164 + /// Builder-style driver intermediate step 165 + /// 166 + /// start from `DriverBuilder` 167 + #[derive(Debug, Clone)] 168 + pub struct DriverBuilderWithProcessor<T: Processable> { 169 + pub mem_limit_mb: usize, 170 + pub block_processor: fn(Vec<u8>) -> T, 171 + } 172 + 173 + impl<T: Processable> DriverBuilderWithProcessor<T> { 174 + /// Set the in-memory size limit, in MiB 175 + /// 176 + /// Default: 16 MiB 177 + pub fn with_mem_limit_mb(mut self, new_limit: usize) -> Self { 178 + self.mem_limit_mb = new_limit; 179 + self 180 + } 181 + /// Begin processing an atproto MST from a CAR file 182 + pub async fn load_car<R: AsyncRead + Unpin>( 183 + &self, 184 + reader: R, 185 + ) -> Result<Driver<R, T>, DriveError> { 186 + Driver::load_car(reader, self.block_processor, self.mem_limit_mb).await 187 + } 188 + } 189 + 190 impl<R: AsyncRead + Unpin, T: Processable> Driver<R, T> { 191 /// Begin processing an atproto MST from a CAR file 192 /// 193 /// Blocks will be loaded, processed, and buffered in memory. If the entire 194 + /// processed size is under the `mem_limit_mb` limit, a `Driver::Memory` 195 + /// will be returned along with a `Commit` ready for validation. 196 /// 197 + /// If the `mem_limit_mb` limit is reached before loading all blocks, the 198 /// partial state will be returned as `Driver::Disk(needed)`, which can be 199 /// resumed by providing a `SqliteStorage` for on-disk block storage. 200 pub async fn load_car( 201 reader: R, 202 process: fn(Vec<u8>) -> T, 203 + mem_limit_mb: usize, 204 ) -> Result<Driver<R, T>, DriveError> { 205 + let max_size = mem_limit_mb * 2_usize.pow(20); 206 let mut mem_blocks = HashMap::new(); 207 208 let mut car = CarReader::new(reader).await?; ··· 348 }) 349 .await??; 350 351 + let (tx, mut rx) = mpsc::channel::<Vec<(Cid, MaybeProcessedBlock<T>)>>(1); 352 353 let store_worker = tokio::task::spawn_blocking(move || { 354 let mut writer = store.get_writer()?;
+10 -8
src/lib.rs
··· 18 `iroh_car` additionally applies a block size limit of `2MiB`. 19 20 ``` 21 - use repo_stream::{Driver, DiskStore}; 22 23 # #[tokio::main] 24 # async fn main() -> Result<(), Box<dyn std::error::Error>> { 25 # let reader = include_bytes!("../car-samples/tiny.car").as_slice(); 26 let mut total_size = 0; 27 - let process = |rec: Vec<u8>| rec.len(); // block processing: just extract the size 28 - let in_mem_limit = 10; /* MiB */ 29 - let db_cache_size = 32; /* MiB */ 30 31 - match Driver::load_car(reader, process, in_mem_limit).await? { 32 33 // if all blocks fit within memory 34 Driver::Memory(_commit, mut driver) => { ··· 42 // if the CAR was too big for in-memory processing 43 Driver::Disk(paused) => { 44 // set up a disk store we can spill to 45 - let store = DiskStore::new("some/path.db".into(), db_cache_size).await?; 46 // do the spilling, get back a (similar) driver 47 let (_commit, mut driver) = paused.finish_loading(store).await?; 48 ··· 79 pub mod drive; 80 pub mod process; 81 82 - pub use disk::{DiskError, DiskStore}; 83 - pub use drive::{DriveError, Driver}; 84 pub use mst::Commit; 85 pub use process::Processable;
··· 18 `iroh_car` additionally applies a block size limit of `2MiB`. 19 20 ``` 21 + use repo_stream::{Driver, DriverBuilder, DiskBuilder}; 22 23 # #[tokio::main] 24 # async fn main() -> Result<(), Box<dyn std::error::Error>> { 25 # let reader = include_bytes!("../car-samples/tiny.car").as_slice(); 26 let mut total_size = 0; 27 28 + match DriverBuilder::new() 29 + .with_mem_limit_mb(10) 30 + .with_block_processor(|rec| rec.len()) // block processing: just extract the raw record size 31 + .load_car(reader) 32 + .await? 33 + { 34 35 // if all blocks fit within memory 36 Driver::Memory(_commit, mut driver) => { ··· 44 // if the CAR was too big for in-memory processing 45 Driver::Disk(paused) => { 46 // set up a disk store we can spill to 47 + let store = DiskBuilder::new().open("some/path.db".into()).await?; 48 // do the spilling, get back a (similar) driver 49 let (_commit, mut driver) = paused.finish_loading(store).await?; 50 ··· 81 pub mod drive; 82 pub mod process; 83 84 + pub use disk::{DiskBuilder, DiskError, DiskStore}; 85 + pub use drive::{DriveError, Driver, DriverBuilder, NeedDisk}; 86 pub use mst::Commit; 87 pub use process::Processable;
+27
src/process.rs
··· 11 approximate total off-stack size of the type. (the on-stack size will be added 12 automatically via `std::mem::get_size`). 13 14 Here's a silly processing function that just collects 'eyy's found in the raw 15 record bytes 16 ··· 71 } 72 } 73 74 impl<Item: Sized + Processable> Processable for Vec<Item> { 75 fn get_size(&self) -> usize { 76 let slot_size = std::mem::size_of::<Item>(); ··· 79 direct_size + items_referenced_size 80 } 81 }
··· 11 approximate total off-stack size of the type. (the on-stack size will be added 12 automatically via `std::mem::get_size`). 13 14 + Note that it is **not guaranteed** that the `process` function will run on a 15 + block before storing it in memory or on disk: it's not possible to know if a 16 + block is a record without actually walking the MST, so the best we can do is 17 + apply `process` to any block that we know *cannot* be an MST node, and otherwise 18 + store the raw block bytes. 19 + 20 Here's a silly processing function that just collects 'eyy's found in the raw 21 record bytes 22 ··· 77 } 78 } 79 80 + impl Processable for String { 81 + fn get_size(&self) -> usize { 82 + self.capacity() 83 + } 84 + } 85 + 86 impl<Item: Sized + Processable> Processable for Vec<Item> { 87 fn get_size(&self) -> usize { 88 let slot_size = std::mem::size_of::<Item>(); ··· 91 direct_size + items_referenced_size 92 } 93 } 94 + 95 + impl<Item: Processable> Processable for Option<Item> { 96 + fn get_size(&self) -> usize { 97 + self.as_ref().map(|item| item.get_size()).unwrap_or(0) 98 + } 99 + } 100 + 101 + impl<Item: Processable, Error: Processable> Processable for Result<Item, Error> { 102 + fn get_size(&self) -> usize { 103 + match self { 104 + Ok(item) => item.get_size(), 105 + Err(err) => err.get_size(), 106 + } 107 + } 108 + }
+6 -203
src/walk.rs
··· 87 } 88 89 fn push_from_node(stack: &mut Vec<Need>, node: &Node, parent_depth: Depth) -> Result<(), MstError> { 90 - // empty nodes are not allowed in the MST 91 - // ...except for a single one for empty MST, but we wouldn't be pushing that 92 if node.is_empty() { 93 - return Err(MstError::EmptyNode); 94 } 95 96 let mut entries = Vec::with_capacity(node.entries.len()); ··· 304 #[cfg(test)] 305 mod test { 306 use super::*; 307 - // use crate::mst::Entry; 308 309 fn cid1() -> Cid { 310 "bafyreihixenvk3ahqbytas4hk4a26w43bh6eo3w6usjqtxkpzsvi655a3m" 311 .parse() 312 .unwrap() 313 } 314 - // fn cid2() -> Cid { 315 - // "QmY7Yh4UquoXHLPFo2XbhXkhBvFoPwmQUSa92pxnxjQuPU" 316 - // .parse() 317 - // .unwrap() 318 - // } 319 - // fn cid3() -> Cid { 320 - // "bafybeigdyrzt5sfp7udm7hu76uh7y26nf3efuylqabf3oclgtqy55fbzdi" 321 - // .parse() 322 - // .unwrap() 323 - // } 324 - // fn cid4() -> Cid { 325 - // "QmbWqxBEKC3P8tqsKc98xmWNzrzDtRLMiMPL8wBuTGsMnR" 326 - // .parse() 327 - // .unwrap() 328 - // } 329 - // fn cid5() -> Cid { 330 - // "QmSnuWmxptJZdLJpKRarxBMS2Ju2oANVrgbr2xWbie9b2D" 331 - // .parse() 332 - // .unwrap() 333 - // } 334 - // fn cid6() -> Cid { 335 - // "QmdmQXB2mzChmMeKY47C43LxUdg1NDJ5MWcKMKxDu7RgQm" 336 - // .parse() 337 - // .unwrap() 338 - // } 339 - // fn cid7() -> Cid { 340 - // "bafybeiaysi4s6lnjev27ln5icwm6tueaw2vdykrtjkwiphwekaywqhcjze" 341 - // .parse() 342 - // .unwrap() 343 - // } 344 - // fn cid8() -> Cid { 345 - // "bafyreif3tfdpr5n4jdrbielmcapwvbpcthepfkwq2vwonmlhirbjmotedi" 346 - // .parse() 347 - // .unwrap() 348 - // } 349 - // fn cid9() -> Cid { 350 - // "bafyreicnokmhmrnlp2wjhyk2haep4tqxiptwfrp2rrs7rzq7uk766chqvq" 351 - // .parse() 352 - // .unwrap() 353 - // } 354 355 #[test] 356 fn test_depth_spec_0() { ··· 441 .as_ref() 442 ); 443 } 444 - 445 - // #[test] 446 - // fn test_needs_from_node_just_one_record() { 447 - // let node = Node { 448 - // left: None, 449 - // entries: vec![Entry { 450 - // keysuffix: "asdf".into(), 451 - // prefix_len: 0, 452 - // value: cid1(), 453 - // tree: None, 454 - // }], 455 - // }; 456 - // assert_eq!( 457 - // needs_from_node(node).unwrap(), 458 - // vec![Need::Record { 459 - // rkey: "asdf".into(), 460 - // cid: cid1(), 461 - // },] 462 - // ); 463 - // } 464 - 465 - // #[test] 466 - // fn test_needs_from_node_two_records() { 467 - // let node = Node { 468 - // left: None, 469 - // entries: vec![ 470 - // Entry { 471 - // keysuffix: "asdf".into(), 472 - // prefix_len: 0, 473 - // value: cid1(), 474 - // tree: None, 475 - // }, 476 - // Entry { 477 - // keysuffix: "gh".into(), 478 - // prefix_len: 2, 479 - // value: cid2(), 480 - // tree: None, 481 - // }, 482 - // ], 483 - // }; 484 - // assert_eq!( 485 - // needs_from_node(node).unwrap(), 486 - // vec![ 487 - // Need::Record { 488 - // rkey: "asdf".into(), 489 - // cid: cid1(), 490 - // }, 491 - // Need::Record { 492 - // rkey: "asgh".into(), 493 - // cid: cid2(), 494 - // }, 495 - // ] 496 - // ); 497 - // } 498 - 499 - // #[test] 500 - // fn test_needs_from_node_with_both() { 501 - // let node = Node { 502 - // left: None, 503 - // entries: vec![Entry { 504 - // keysuffix: "asdf".into(), 505 - // prefix_len: 0, 506 - // value: cid1(), 507 - // tree: Some(cid2()), 508 - // }], 509 - // }; 510 - // assert_eq!( 511 - // needs_from_node(node).unwrap(), 512 - // vec![ 513 - // Need::Record { 514 - // rkey: "asdf".into(), 515 - // cid: cid1(), 516 - // }, 517 - // Need::Node(cid2()), 518 - // ] 519 - // ); 520 - // } 521 - 522 - // #[test] 523 - // fn test_needs_from_node_left_and_record() { 524 - // let node = Node { 525 - // left: Some(cid1()), 526 - // entries: vec![Entry { 527 - // keysuffix: "asdf".into(), 528 - // prefix_len: 0, 529 - // value: cid2(), 530 - // tree: None, 531 - // }], 532 - // }; 533 - // assert_eq!( 534 - // needs_from_node(node).unwrap(), 535 - // vec![ 536 - // Need::Node(cid1()), 537 - // Need::Record { 538 - // rkey: "asdf".into(), 539 - // cid: cid2(), 540 - // }, 541 - // ] 542 - // ); 543 - // } 544 - 545 - // #[test] 546 - // fn test_needs_from_full_node() { 547 - // let node = Node { 548 - // left: Some(cid1()), 549 - // entries: vec![ 550 - // Entry { 551 - // keysuffix: "asdf".into(), 552 - // prefix_len: 0, 553 - // value: cid2(), 554 - // tree: Some(cid3()), 555 - // }, 556 - // Entry { 557 - // keysuffix: "ghi".into(), 558 - // prefix_len: 1, 559 - // value: cid4(), 560 - // tree: Some(cid5()), 561 - // }, 562 - // Entry { 563 - // keysuffix: "jkl".into(), 564 - // prefix_len: 2, 565 - // value: cid6(), 566 - // tree: Some(cid7()), 567 - // }, 568 - // Entry { 569 - // keysuffix: "mno".into(), 570 - // prefix_len: 4, 571 - // value: cid8(), 572 - // tree: Some(cid9()), 573 - // }, 574 - // ], 575 - // }; 576 - // assert_eq!( 577 - // needs_from_node(node).unwrap(), 578 - // vec![ 579 - // Need::Node(cid1()), 580 - // Need::Record { 581 - // rkey: "asdf".into(), 582 - // cid: cid2(), 583 - // }, 584 - // Need::Node(cid3()), 585 - // Need::Record { 586 - // rkey: "aghi".into(), 587 - // cid: cid4(), 588 - // }, 589 - // Need::Node(cid5()), 590 - // Need::Record { 591 - // rkey: "agjkl".into(), 592 - // cid: cid6(), 593 - // }, 594 - // Need::Node(cid7()), 595 - // Need::Record { 596 - // rkey: "agjkmno".into(), 597 - // cid: cid8(), 598 - // }, 599 - // Need::Node(cid9()), 600 - // ] 601 - // ); 602 - // } 603 }
··· 87 } 88 89 fn push_from_node(stack: &mut Vec<Need>, node: &Node, parent_depth: Depth) -> Result<(), MstError> { 90 + // empty nodes are not allowed in the MST except in an empty MST 91 if node.is_empty() { 92 + if parent_depth == Depth::Root { 93 + return Ok(()); // empty mst, nothing to push 94 + } else { 95 + return Err(MstError::EmptyNode); 96 + } 97 } 98 99 let mut entries = Vec::with_capacity(node.entries.len()); ··· 307 #[cfg(test)] 308 mod test { 309 use super::*; 310 311 fn cid1() -> Cid { 312 "bafyreihixenvk3ahqbytas4hk4a26w43bh6eo3w6usjqtxkpzsvi655a3m" 313 .parse() 314 .unwrap() 315 } 316 317 #[test] 318 fn test_depth_spec_0() { ··· 403 .as_ref() 404 ); 405 } 406 }
+16 -5
tests/non-huge-cars.rs
··· 1 extern crate repo_stream; 2 use repo_stream::Driver; 3 4 const TINY_CAR: &'static [u8] = include_bytes!("../car-samples/tiny.car"); 5 const LITTLE_CAR: &'static [u8] = include_bytes!("../car-samples/little.car"); 6 const MIDSIZE_CAR: &'static [u8] = include_bytes!("../car-samples/midsize.car"); 7 8 - async fn test_car(bytes: &[u8], expected_records: usize, expected_sum: usize) { 9 let mut driver = match Driver::load_car(bytes, |block| block.len(), 10 /* MiB */) 10 .await 11 .unwrap() ··· 33 34 assert_eq!(records, expected_records); 35 assert_eq!(sum, expected_sum); 36 - assert!(found_bsky_profile); 37 } 38 39 #[tokio::test] 40 async fn test_tiny_car() { 41 - test_car(TINY_CAR, 8, 2071).await 42 } 43 44 #[tokio::test] 45 async fn test_little_car() { 46 - test_car(LITTLE_CAR, 278, 246960).await 47 } 48 49 #[tokio::test] 50 async fn test_midsize_car() { 51 - test_car(MIDSIZE_CAR, 11585, 3741393).await 52 }
··· 1 extern crate repo_stream; 2 use repo_stream::Driver; 3 4 + const EMPTY_CAR: &'static [u8] = include_bytes!("../car-samples/empty.car"); 5 const TINY_CAR: &'static [u8] = include_bytes!("../car-samples/tiny.car"); 6 const LITTLE_CAR: &'static [u8] = include_bytes!("../car-samples/little.car"); 7 const MIDSIZE_CAR: &'static [u8] = include_bytes!("../car-samples/midsize.car"); 8 9 + async fn test_car( 10 + bytes: &[u8], 11 + expected_records: usize, 12 + expected_sum: usize, 13 + expect_profile: bool, 14 + ) { 15 let mut driver = match Driver::load_car(bytes, |block| block.len(), 10 /* MiB */) 16 .await 17 .unwrap() ··· 39 40 assert_eq!(records, expected_records); 41 assert_eq!(sum, expected_sum); 42 + assert_eq!(found_bsky_profile, expect_profile); 43 + } 44 + 45 + #[tokio::test] 46 + async fn test_empty_car() { 47 + test_car(EMPTY_CAR, 0, 0, false).await 48 } 49 50 #[tokio::test] 51 async fn test_tiny_car() { 52 + test_car(TINY_CAR, 8, 2071, true).await 53 } 54 55 #[tokio::test] 56 async fn test_little_car() { 57 + test_car(LITTLE_CAR, 278, 246960, true).await 58 } 59 60 #[tokio::test] 61 async fn test_midsize_car() { 62 + test_car(MIDSIZE_CAR, 11585, 3741393, true).await 63 }