Fast and robust atproto CAR file processing in Rust
/*!
A robust CAR file -> MST walker for atproto

Small CARs have their blocks buffered in memory. If a configurable memory limit
is reached while reading blocks, CAR reading is suspended, and can be continued
by providing disk storage to buffer the CAR blocks instead.

A `process` function can be provided for tasks where records are transformed
into a smaller representation, to save memory (and disk) during block reading.

Once blocks are loaded, the MST is walked and emitted as chunks of
`(rkey, processed_block)` pairs, in order (depth first, left-to-right).

Some MST validations are applied:
- Keys must appear in order
- Keys must be at the correct MST tree depth

`iroh_car` additionally applies a block size limit of `2MiB`.

```
use repo_stream::{Driver, DriverBuilder, DiskBuilder};

# #[tokio::main]
# async fn main() -> Result<(), Box<dyn std::error::Error>> {
# let reader = include_bytes!("../car-samples/tiny.car").as_slice();
let mut total_size = 0;

match DriverBuilder::new()
    .with_mem_limit_mb(10)
    .with_block_processor(|rec| rec.len()) // block processing: just extract the raw record size
    .load_car(reader)
    .await?
{

    // if all blocks fit within memory
    Driver::Memory(_commit, mut driver) => {
        while let Some(chunk) = driver.next_chunk(256).await? {
            for (_rkey, size) in chunk {
                total_size += size;
            }
        }
    },

    // if the CAR was too big for in-memory processing
    Driver::Disk(paused) => {
        // set up a disk store we can spill to
        let store = DiskBuilder::new().open("some/path.db".into()).await?;
        // do the spilling, get back a (similar) driver
        let (_commit, mut driver) = paused.finish_loading(store).await?;

        while let Some(chunk) = driver.next_chunk(256).await? {
            for (_rkey, size) in chunk {
                total_size += size;
            }
        }
    }
};
println!("sum of size of all records: {total_size}");
# Ok(())
# }
```

Disk spilling suspends and returns a `Driver::Disk(paused)` instead of going
ahead and eagerly using disk I/O. This means you have to write a bit more code
to handle both cases, but it allows you to have finer control over resource
usage. For example, you can drive a number of parallel memory CAR workers, and
separately have a different number of disk workers picking up suspended disk
tasks from a queue.

Find more [examples in the repo](https://tangled.org/@microcosm.blue/repo-stream/tree/main/examples).

*/

pub mod mst;
mod walk;

pub mod disk;
pub mod drive;
pub mod process;

pub use disk::{DiskBuilder, DiskError, DiskStore};
pub use drive::{DriveError, Driver, DriverBuilder, NeedDisk};
pub use mst::Commit;
pub use process::Processable;