Fast and robust atproto CAR file processing in rust
/*!
A robust CAR file -> MST walker for atproto

Small CARs have their blocks buffered in memory. If a configurable memory limit
is reached while reading blocks, CAR reading is suspended, and can be continued
by providing disk storage to buffer the CAR blocks instead.

A `process` function can be provided (via `with_block_processor`) to transform
records into a smaller representation, saving memory (and disk) during block
reading.

Once blocks are loaded, the MST is walked and emitted as chunks of
`(rkey, processed_block)` pairs, in order (depth-first, left-to-right).

Some MST validations are applied:
- Keys must appear in order
- Keys must be at the correct MST tree depth

`iroh_car` additionally applies a block size limit of `2MiB`.

```
use repo_stream::{Driver, DriverBuilder, DiskBuilder};

# #[tokio::main]
# async fn main() -> Result<(), Box<dyn std::error::Error>> {
# let reader = include_bytes!("../car-samples/tiny.car").as_slice();
let mut total_size = 0;

match DriverBuilder::new()
    .with_mem_limit_mb(10)
    .with_block_processor(|rec| rec.len()) // block processing: just extract the raw record size
    .load_car(reader)
    .await?
{
    // if all blocks fit within memory
    Driver::Memory(_commit, mut driver) => {
        while let Some(chunk) = driver.next_chunk(256).await? {
            for (_rkey, size) in chunk {
                total_size += size;
            }
        }
    },

    // if the CAR was too big for in-memory processing
    Driver::Disk(paused) => {
        // set up a disk store we can spill to
        let store = DiskBuilder::new().open("some/path.db".into()).await?;
        // do the spilling, get back a (similar) driver
        let (_commit, mut driver) = paused.finish_loading(store).await?;

        while let Some(chunk) = driver.next_chunk(256).await? {
            for (_rkey, size) in chunk {
                total_size += size;
            }
        }

        // clean up the disk store (drop tables etc.)
        driver.reset_store().await?;
    }
};
println!("sum of size of all records: {total_size}");
# Ok(())
# }
```

When the memory limit is reached, loading suspends and returns a
`Driver::Disk(paused)` instead of eagerly switching to disk I/O. This means you
have to write a bit more code to handle both cases, but it gives you finer
control over resource usage. For example, you can run a pool of parallel
in-memory CAR workers, and separately have a different number of disk workers
pick up suspended disk tasks from a queue.

Find more [examples in the repo](https://tangled.org/@microcosm.blue/repo-stream/tree/main/examples).

*/

pub mod mst;
mod walk;

pub mod disk;
pub mod drive;
pub mod process;

pub use disk::{DiskBuilder, DiskError, DiskStore};
pub use drive::{DriveError, Driver, DriverBuilder};
pub use mst::Commit;
pub use process::Processable;
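The crate docs above describe a two-phase design: blocks are buffered in memory until a limit is hit, then loading suspends and the caller decides when (and whether) to resume with disk storage. The std-only sketch below illustrates that control flow under stated assumptions. It is not `repo-stream`'s implementation: `Loaded`, `Paused`, `load`, `finish_loading` and `record_len` here are hypothetical stand-ins, a `HashMap` plays the role of the disk store, and the limit is counted in raw block bytes.

```rust
use std::collections::HashMap;

/// Hypothetical outcome of the first loading phase (not the crate's real `Driver`).
enum Loaded<T> {
    /// Every block fit under the memory limit.
    Memory(HashMap<String, T>),
    /// The limit was hit: loading is suspended until the caller supplies a store.
    Paused(Paused<T>),
}

/// Hypothetical suspended state: processed blocks so far plus the unread remainder.
struct Paused<T> {
    buffered: HashMap<String, T>,
    remaining: Vec<(String, Vec<u8>)>,
    process: fn(&[u8]) -> T,
}

impl<T> Paused<T> {
    /// Resume loading into a caller-provided store (a HashMap standing in for disk).
    fn finish_loading(self, mut store: HashMap<String, T>) -> HashMap<String, T> {
        store.extend(self.buffered);
        for (key, bytes) in self.remaining {
            store.insert(key, (self.process)(&bytes));
        }
        store
    }
}

/// Process blocks in memory, suspending (not spilling) once raw bytes exceed the limit.
fn load<T>(blocks: Vec<(String, Vec<u8>)>, mem_limit_bytes: usize, process: fn(&[u8]) -> T) -> Loaded<T> {
    let mut buffered = HashMap::new();
    let mut used = 0;
    let mut iter = blocks.into_iter();
    while let Some((key, bytes)) = iter.next() {
        if used + bytes.len() > mem_limit_bytes {
            // Hand back everything read so far plus the unread blocks (including this one).
            let mut remaining = vec![(key, bytes)];
            remaining.extend(iter);
            return Loaded::Paused(Paused { buffered, remaining, process });
        }
        used += bytes.len();
        buffered.insert(key, process(&bytes));
    }
    Loaded::Memory(buffered)
}

/// Example block processor: keep only the raw record size.
fn record_len(block: &[u8]) -> usize {
    block.len()
}

fn main() {
    let blocks: Vec<(String, Vec<u8>)> = vec![
        ("app.bsky.feed.post/a".to_string(), vec![0u8; 4]),
        ("app.bsky.feed.post/b".to_string(), vec![0u8; 8]),
        ("app.bsky.feed.post/c".to_string(), vec![0u8; 16]),
    ];
    let total: usize = match load(blocks, 10, record_len) {
        Loaded::Memory(map) => map.values().sum(),
        // The caller decides where suspended work resumes.
        Loaded::Paused(paused) => paused.finish_loading(HashMap::new()).values().sum(),
    };
    println!("sum of size of all records: {total}"); // prints "sum of size of all records: 28"
}
```

Because `load` returns instead of opening a store itself, suspended tasks can be handed to a separate pool of disk workers, which is the resource-control benefit the docs point out.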
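Of the MST validations listed in the crate docs, the key-order check is the simplest to picture: an in-order (depth-first, left-to-right) walk of a valid MST yields keys in strictly increasing byte order, so each key only needs comparing with the previous one. This is a hypothetical, std-only illustration of that idea, not the crate's code; `check_key_order` and `OrderError` are names invented for this example. Rust's `str` ordering is byte-wise, which matches comparing keys as raw bytes.

```rust
/// Hypothetical error type for this sketch (not the crate's `DriveError`).
#[derive(Debug, PartialEq)]
enum OrderError {
    /// `next` did not sort strictly after `prev`.
    OutOfOrder { prev: String, next: String },
}

/// Check that keys arrive in strictly increasing byte order (no duplicates),
/// returning how many keys were seen.
fn check_key_order<'a, I>(keys: I) -> Result<usize, OrderError>
where
    I: IntoIterator<Item = &'a str>,
{
    let mut prev: Option<&str> = None;
    let mut count = 0;
    for key in keys {
        if let Some(p) = prev {
            // `<=` also rejects a repeated key.
            if key <= p {
                return Err(OrderError::OutOfOrder { prev: p.to_string(), next: key.to_string() });
            }
        }
        prev = Some(key);
        count += 1;
    }
    Ok(count)
}

fn main() {
    let good = ["app.bsky.feed.like/3k", "app.bsky.feed.post/3a", "app.bsky.feed.post/3b"];
    println!("{:?}", check_key_order(good)); // prints "Ok(3)"

    let bad = ["app.bsky.feed.post/3b", "app.bsky.feed.post/3a"];
    println!("{:?}", check_key_order(bad));
}
```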