Fast and robust atproto CAR file processing in Rust

chunked walking

Changed files
+36 -6
src
+1 -1
Cargo.toml
··· 20 20 serde_bytes = "0.11.19" 21 21 serde_ipld_dagcbor = "0.6.4" 22 22 thiserror = "2.0.17" 23 - tokio = "1.47.1" 23 + tokio = { version = "1.47.1", features = ["rt"] } 24 24 25 25 [dev-dependencies] 26 26 clap = { version = "4.5.48", features = ["derive"] }
+32 -4
src/disk_drive.rs
··· 1 1 use futures::Stream; 2 2 use futures::TryStreamExt; 3 + use std::collections::VecDeque; 3 4 use std::error::Error; 4 5 5 6 use crate::disk_walk::{Step, Trip, Walker}; ··· 47 48 block_store: BS, 48 49 walker: Walker, 49 50 process: P, 51 + out_cache: VecDeque<(String, T)>, 50 52 } 51 53 52 54 impl<SE, S, T, BS, P, PE> Vehicle<SE, S, T, BS, P, PE> ··· 120 122 block_store, 121 123 walker, 122 124 process, 125 + out_cache: VecDeque::new(), 123 126 }; 124 127 Ok((commit, me)) 125 128 } 126 129 130 + async fn load_chunk(&mut self, n: usize) -> Result<(), DriveError> { 131 + self.out_cache.reserve(n); 132 + for _ in 0..n { 133 + let item = match self.walker.step(&mut self.block_store, &self.process)? { 134 + Step::Step { rkey, data } => (rkey, data), 135 + Step::Finish => break, 136 + Step::Rest(cid) => return Err(DriveError::MissingBlock(cid)), 137 + }; 138 + self.out_cache.push_back(item); 139 + } 140 + Ok(()) 141 + } 142 + 143 + /// Get a chunk of records at a time 144 + /// 145 + /// the number of returned records may be smaller or larger than requested 146 + /// (but non-zero), even if it's not the last chunk. 147 + /// 148 + /// an empty vec will be returned to signal the end. 149 + pub async fn next_chunk(&mut self, n: usize) -> Result<Vec<(String, T)>, DriveError> { 150 + if self.out_cache.is_empty() { 151 + self.load_chunk(n).await?; 152 + } 153 + Ok(std::mem::take(&mut self.out_cache).into()) 154 + } 155 + 127 156 /// Manually step through the record outputs 128 157 pub async fn next_record(&mut self) -> Result<Option<(String, T)>, DriveError> { 129 - match self.walker.step(&mut self.block_store, &self.process)? 
{ 130 - Step::Rest(cid) => Err(DriveError::MissingBlock(cid)), 131 - Step::Finish => Ok(None), 132 - Step::Step { rkey, data } => Ok(Some((rkey, data))), 158 + if self.out_cache.is_empty() { 159 + self.load_chunk(64).await?; // TODO 133 160 } 161 + Ok(self.out_cache.pop_front()) 134 162 } 135 163 136 164 /// Convert to a futures::stream of record outputs
+3 -1
src/disk_redb.rs
··· 45 45 } 46 46 47 47 tx.commit().unwrap(); 48 - }).await.unwrap(); 48 + }) 49 + .await 50 + .unwrap(); 49 51 } 50 52 51 53 fn get(&self, c: Cid) -> Option<Vec<u8>> {