Fast and robust atproto CAR file processing in Rust

add builder-style constructors for driver + store

Changed files
+185 -35
examples
disk-read-file
read-file
src
+7 -10
examples/disk-read-file/main.rs
··· 4 5 extern crate repo_stream; 6 use clap::Parser; 7 - use repo_stream::{DiskStore, Driver, process::noop}; 8 use std::path::PathBuf; 9 10 #[derive(Debug, Parser)] ··· 26 let reader = tokio::fs::File::open(car).await?; 27 let reader = tokio::io::BufReader::new(reader); 28 29 - // configure how much memory can be used before spilling to disk. 30 - // real memory usage may differ somewhat. 31 - let in_mem_limit = 10; // MiB 32 - 33 - // configure how much memory sqlite is allowed to use when dumping to disk 34 - let db_cache_mb = 32; // MiB 35 - 36 log::info!("hello! reading the car..."); 37 38 // in this example we only bother handling CARs that are too big for memory 39 // `noop` helper means: do no block processing, store the raw blocks 40 - let driver = match Driver::load_car(reader, noop, in_mem_limit).await? { 41 Driver::Memory(_, _) => panic!("try this on a bigger car"), 42 Driver::Disk(big_stuff) => { 43 // we reach here if the repo was too big and needs to be spilled to 44 // disk to continue 45 46 // set up a disk store we can spill to 47 - let disk_store = DiskStore::new(tmpfile.clone(), db_cache_mb).await?; 48 49 // do the spilling, get back a (similar) driver 50 let (commit, driver) = big_stuff.finish_loading(disk_store).await?;
··· 4 5 extern crate repo_stream; 6 use clap::Parser; 7 + use repo_stream::{DiskBuilder, Driver, DriverBuilder}; 8 use std::path::PathBuf; 9 10 #[derive(Debug, Parser)] ··· 26 let reader = tokio::fs::File::open(car).await?; 27 let reader = tokio::io::BufReader::new(reader); 28 29 log::info!("hello! reading the car..."); 30 31 // in this example we only bother handling CARs that are too big for memory 32 // `noop` helper means: do no block processing, store the raw blocks 33 + let driver = match DriverBuilder::new() 34 + .with_mem_limit_mb(10) // how much memory can be used before disk spill 35 + .load_car(reader) 36 + .await? 37 + { 38 Driver::Memory(_, _) => panic!("try this on a bigger car"), 39 Driver::Disk(big_stuff) => { 40 // we reach here if the repo was too big and needs to be spilled to 41 // disk to continue 42 43 // set up a disk store we can spill to 44 + let disk_store = DiskBuilder::new().open(tmpfile).await?; 45 46 // do the spilling, get back a (similar) driver 47 let (commit, driver) = big_stuff.finish_loading(disk_store).await?;
+9 -6
examples/read-file/main.rs
··· 4 5 extern crate repo_stream; 6 use clap::Parser; 7 - use repo_stream::Driver; 8 use std::path::PathBuf; 9 10 type Result<T> = std::result::Result<T, Box<dyn std::error::Error>>; ··· 23 let reader = tokio::fs::File::open(file).await?; 24 let reader = tokio::io::BufReader::new(reader); 25 26 - let (commit, mut driver) = 27 - match Driver::load_car(reader, |block| block.len(), 16 /* MiB */).await? { 28 - Driver::Memory(commit, mem_driver) => (commit, mem_driver), 29 - Driver::Disk(_) => panic!("this example doesn't handle big CARs"), 30 - }; 31 32 log::info!("got commit: {commit:?}"); 33
··· 4 5 extern crate repo_stream; 6 use clap::Parser; 7 + use repo_stream::{Driver, DriverBuilder}; 8 use std::path::PathBuf; 9 10 type Result<T> = std::result::Result<T, Box<dyn std::error::Error>>; ··· 23 let reader = tokio::fs::File::open(file).await?; 24 let reader = tokio::io::BufReader::new(reader); 25 26 + let (commit, mut driver) = match DriverBuilder::new() 27 + .with_block_processor(|block| block.len()) 28 + .load_car(reader) 29 + .await? 30 + { 31 + Driver::Memory(commit, mem_driver) => (commit, mem_driver), 32 + Driver::Disk(_) => panic!("this example doesn't handle big CARs"), 33 + }; 34 35 log::info!("got commit: {commit:?}"); 36
+84 -6
src/disk.rs
··· 5 to be the best behaved in terms of both on-disk space usage and memory usage. 6 7 ```no_run 8 - # use repo_stream::{DiskStore, DiskError}; 9 # #[tokio::main] 10 # async fn main() -> Result<(), DiskError> { 11 - let db_cache_size = 32; // MiB 12 - let store = DiskStore::new("/some/path.db".into(), db_cache_size).await?; 13 # Ok(()) 14 # } 15 ``` ··· 30 /// A tokio blocking task failed to join 31 #[error("Failed to join a tokio blocking task: {0}")] 32 JoinError(#[from] tokio::task::JoinError), 33 #[error("this error was replaced, seeing this is a bug.")] 34 #[doc(hidden)] 35 Stolen, ··· 44 } 45 } 46 47 /// On-disk block storage 48 pub struct DiskStore { 49 conn: rusqlite::Connection, 50 } 51 52 impl DiskStore { 53 /// Initialize a new disk store 54 - pub async fn new(path: PathBuf, cache_mb: usize) -> Result<Self, DiskError> { 55 let conn = tokio::task::spawn_blocking(move || { 56 let conn = rusqlite::Connection::open(path)?; 57 ··· 73 }) 74 .await??; 75 76 - Ok(Self { conn }) 77 } 78 pub(crate) fn get_writer(&'_ mut self) -> Result<SqliteWriter<'_>, DiskError> { 79 let tx = self.conn.transaction()?; 80 - Ok(SqliteWriter { tx }) 81 } 82 pub(crate) fn get_reader<'conn>(&'conn self) -> Result<SqliteReader<'conn>, DiskError> { 83 let select_stmt = self.conn.prepare("SELECT val FROM blocks WHERE key = ?1")?; ··· 106 107 pub(crate) struct SqliteWriter<'conn> { 108 tx: rusqlite::Transaction<'conn>, 109 } 110 111 impl SqliteWriter<'_> { ··· 119 .map_err(DiskError::DbError)?; 120 for pair in kv { 121 let (k, v) = pair?; 122 insert_stmt.execute((k, v)).map_err(DiskError::DbError)?; 123 } 124 Ok(())
··· 5 to be the best behaved in terms of both on-disk space usage and memory usage. 6 7 ```no_run 8 + # use repo_stream::{DiskBuilder, DiskError}; 9 # #[tokio::main] 10 # async fn main() -> Result<(), DiskError> { 11 + let store = DiskBuilder::new() 12 + .with_cache_size_mb(32) 13 + .with_max_stored_mb(1024) // errors when >1GiB of processed blocks are inserted 14 + .open("/some/path.db".into()).await?; 15 # Ok(()) 16 # } 17 ``` ··· 32 /// A tokio blocking task failed to join 33 #[error("Failed to join a tokio blocking task: {0}")] 34 JoinError(#[from] tokio::task::JoinError), 35 + /// The total size of stored blocks exceeded the allowed size 36 + /// 37 + /// If you need to process *really* big CARs, you can configure a higher 38 + /// limit. 39 + #[error("Maximum disk size reached")] 40 + MaxSizeExceeded, 41 #[error("this error was replaced, seeing this is a bug.")] 42 #[doc(hidden)] 43 Stolen, ··· 52 } 53 } 54 55 + /// Builder-style disk store setup 56 + pub struct DiskBuilder { 57 + /// Database in-memory cache allowance 58 + /// 59 + /// Default: 32 MiB 60 + pub cache_size_mb: usize, 61 + /// Database stored block size limit 62 + /// 63 + /// Default: 10 GiB 64 + /// 65 + /// Note: actual size on disk may be more, but should approximately scale 66 + /// with this limit 67 + pub max_stored_mb: usize, 68 + } 69 + 70 + impl Default for DiskBuilder { 71 + fn default() -> Self { 72 + Self { 73 + cache_size_mb: 32, 74 + max_stored_mb: 10 * 1024, // 10 GiB 75 + } 76 + } 77 + } 78 + 79 + impl DiskBuilder { 80 + /// Begin configuring the storage with defaults 81 + pub fn new() -> Self { 82 + Default::default() 83 + } 84 + /// Set the in-memory cache allowance for the database 85 + /// 86 + /// Default: 32 MiB 87 + pub fn with_cache_size_mb(mut self, size: usize) -> Self { 88 + self.cache_size_mb = size; 89 + self 90 + } 91 + /// Set the approximate stored block size limit 92 + /// 93 + /// Default: 10 GiB 94 + pub fn with_max_stored_mb(mut self, max: usize) -> Self { 
95 + self.max_stored_mb = max; 96 + self 97 + } 98 + /// Open and initialize the actual disk storage 99 + pub async fn open(self, path: PathBuf) -> Result<DiskStore, DiskError> { 100 + DiskStore::new(path, self.cache_size_mb, self.max_stored_mb).await 101 + } 102 + } 103 + 104 /// On-disk block storage 105 pub struct DiskStore { 106 conn: rusqlite::Connection, 107 + max_stored: usize, 108 + stored: usize, 109 } 110 111 impl DiskStore { 112 /// Initialize a new disk store 113 + pub async fn new( 114 + path: PathBuf, 115 + cache_mb: usize, 116 + max_stored_mb: usize, 117 + ) -> Result<Self, DiskError> { 118 + let max_stored = max_stored_mb * 2_usize.pow(20); 119 let conn = tokio::task::spawn_blocking(move || { 120 let conn = rusqlite::Connection::open(path)?; 121 ··· 137 }) 138 .await??; 139 140 + Ok(Self { 141 + conn, 142 + max_stored, 143 + stored: 0, 144 + }) 145 } 146 pub(crate) fn get_writer(&'_ mut self) -> Result<SqliteWriter<'_>, DiskError> { 147 let tx = self.conn.transaction()?; 148 + Ok(SqliteWriter { 149 + tx, 150 + stored: &mut self.stored, 151 + max: self.max_stored, 152 + }) 153 } 154 pub(crate) fn get_reader<'conn>(&'conn self) -> Result<SqliteReader<'conn>, DiskError> { 155 let select_stmt = self.conn.prepare("SELECT val FROM blocks WHERE key = ?1")?; ··· 178 179 pub(crate) struct SqliteWriter<'conn> { 180 tx: rusqlite::Transaction<'conn>, 181 + stored: &'conn mut usize, 182 + max: usize, 183 } 184 185 impl SqliteWriter<'_> { ··· 193 .map_err(DiskError::DbError)?; 194 for pair in kv { 195 let (k, v) = pair?; 196 + *self.stored += v.len(); 197 + if *self.stored > self.max { 198 + return Err(DiskError::MaxSizeExceeded.into()); 199 + } 200 insert_stmt.execute((k, v)).map_err(DiskError::DbError)?; 201 } 202 Ok(())
+75 -5
src/drive.rs
··· 115 Disk(NeedDisk<R, T>), 116 } 117 118 impl<R: AsyncRead + Unpin, T: Processable> Driver<R, T> { 119 /// Begin processing an atproto MST from a CAR file 120 /// 121 /// Blocks will be loaded, processed, and buffered in memory. If the entire 122 - /// processed size is under the `max_size_mb` limit, a `Driver::Memory` will 123 - /// be returned along with a `Commit` ready for validation. 124 /// 125 - /// If the `max_size_mb` limit is reached before loading all blocks, the 126 /// partial state will be returned as `Driver::Disk(needed)`, which can be 127 /// resumed by providing a `SqliteStorage` for on-disk block storage. 128 pub async fn load_car( 129 reader: R, 130 process: fn(Vec<u8>) -> T, 131 - max_size_mb: usize, 132 ) -> Result<Driver<R, T>, DriveError> { 133 - let max_size = max_size_mb * 2_usize.pow(20); 134 let mut mem_blocks = HashMap::new(); 135 136 let mut car = CarReader::new(reader).await?;
··· 115 Disk(NeedDisk<R, T>), 116 } 117 118 + /// Builder-style driver setup 119 + pub struct DriverBuilder { 120 + pub mem_limit_mb: usize, 121 + } 122 + 123 + impl Default for DriverBuilder { 124 + fn default() -> Self { 125 + Self { mem_limit_mb: 16 } 126 + } 127 + } 128 + 129 + impl DriverBuilder { 130 + /// Begin configuring the driver with defaults 131 + pub fn new() -> Self { 132 + Default::default() 133 + } 134 + /// Set the in-memory size limit, in MiB 135 + /// 136 + /// Default: 16 MiB 137 + pub fn with_mem_limit_mb(self, new_limit: usize) -> Self { 138 + Self { 139 + mem_limit_mb: new_limit, 140 + } 141 + } 142 + /// Set the block processor 143 + /// 144 + /// Default: noop, raw blocks will be emitted 145 + pub fn with_block_processor<T: Processable>( 146 + self, 147 + p: fn(Vec<u8>) -> T, 148 + ) -> DriverBuilderWithProcessor<T> { 149 + DriverBuilderWithProcessor { 150 + mem_limit_mb: self.mem_limit_mb, 151 + block_processor: p, 152 + } 153 + } 154 + /// Begin processing an atproto MST from a CAR file 155 + pub async fn load_car<R: AsyncRead + Unpin>( 156 + self, 157 + reader: R, 158 + ) -> Result<Driver<R, Vec<u8>>, DriveError> { 159 + Driver::load_car(reader, crate::process::noop, self.mem_limit_mb).await 160 + } 161 + } 162 + 163 + /// Builder-style driver intermediate step 164 + /// 165 + /// start from `DriverBuilder` 166 + pub struct DriverBuilderWithProcessor<T: Processable> { 167 + pub mem_limit_mb: usize, 168 + pub block_processor: fn(Vec<u8>) -> T, 169 + } 170 + 171 + impl<T: Processable> DriverBuilderWithProcessor<T> { 172 + /// Set the in-memory size limit, in MiB 173 + /// 174 + /// Default: 16 MiB 175 + pub fn with_mem_limit_mb(mut self, new_limit: usize) -> Self { 176 + self.mem_limit_mb = new_limit; 177 + self 178 + } 179 + /// Begin processing an atproto MST from a CAR file 180 + pub async fn load_car<R: AsyncRead + Unpin>( 181 + self, 182 + reader: R, 183 + ) -> Result<Driver<R, T>, DriveError> { 184 + Driver::load_car(reader, 
self.block_processor, self.mem_limit_mb).await 185 + } 186 + } 187 + 188 impl<R: AsyncRead + Unpin, T: Processable> Driver<R, T> { 189 /// Begin processing an atproto MST from a CAR file 190 /// 191 /// Blocks will be loaded, processed, and buffered in memory. If the entire 192 + /// processed size is under the `mem_limit_mb` limit, a `Driver::Memory` 193 + /// will be returned along with a `Commit` ready for validation. 194 /// 195 + /// If the `mem_limit_mb` limit is reached before loading all blocks, the 196 /// partial state will be returned as `Driver::Disk(needed)`, which can be 197 /// resumed by providing a `SqliteStorage` for on-disk block storage. 198 pub async fn load_car( 199 reader: R, 200 process: fn(Vec<u8>) -> T, 201 + mem_limit_mb: usize, 202 ) -> Result<Driver<R, T>, DriveError> { 203 + let max_size = mem_limit_mb * 2_usize.pow(20); 204 let mut mem_blocks = HashMap::new(); 205 206 let mut car = CarReader::new(reader).await?;
+10 -8
src/lib.rs
··· 18 `iroh_car` additionally applies a block size limit of `2MiB`. 19 20 ``` 21 - use repo_stream::{Driver, DiskStore}; 22 23 # #[tokio::main] 24 # async fn main() -> Result<(), Box<dyn std::error::Error>> { 25 # let reader = include_bytes!("../car-samples/tiny.car").as_slice(); 26 let mut total_size = 0; 27 - let process = |rec: Vec<u8>| rec.len(); // block processing: just extract the size 28 - let in_mem_limit = 10; /* MiB */ 29 - let db_cache_size = 32; /* MiB */ 30 31 - match Driver::load_car(reader, process, in_mem_limit).await? { 32 33 // if all blocks fit within memory 34 Driver::Memory(_commit, mut driver) => { ··· 42 // if the CAR was too big for in-memory processing 43 Driver::Disk(paused) => { 44 // set up a disk store we can spill to 45 - let store = DiskStore::new("some/path.db".into(), db_cache_size).await?; 46 // do the spilling, get back a (similar) driver 47 let (_commit, mut driver) = paused.finish_loading(store).await?; 48 ··· 79 pub mod drive; 80 pub mod process; 81 82 - pub use disk::{DiskError, DiskStore}; 83 - pub use drive::{DriveError, Driver}; 84 pub use mst::Commit; 85 pub use process::Processable;
··· 18 `iroh_car` additionally applies a block size limit of `2MiB`. 19 20 ``` 21 + use repo_stream::{Driver, DriverBuilder, DiskBuilder}; 22 23 # #[tokio::main] 24 # async fn main() -> Result<(), Box<dyn std::error::Error>> { 25 # let reader = include_bytes!("../car-samples/tiny.car").as_slice(); 26 let mut total_size = 0; 27 28 + match DriverBuilder::new() 29 + .with_mem_limit_mb(10) 30 + .with_block_processor(|rec| rec.len()) // block processing: just extract the raw record size 31 + .load_car(reader) 32 + .await? 33 + { 34 35 // if all blocks fit within memory 36 Driver::Memory(_commit, mut driver) => { ··· 44 // if the CAR was too big for in-memory processing 45 Driver::Disk(paused) => { 46 // set up a disk store we can spill to 47 + let store = DiskBuilder::new().open("some/path.db".into()).await?; 48 // do the spilling, get back a (similar) driver 49 let (_commit, mut driver) = paused.finish_loading(store).await?; 50 ··· 81 pub mod drive; 82 pub mod process; 83 84 + pub use disk::{DiskBuilder, DiskError, DiskStore}; 85 + pub use drive::{DriveError, Driver, DriverBuilder}; 86 pub use mst::Commit; 87 pub use process::Processable;