Fast and robust atproto CAR file processing in rust

take owned vec in data

Changed files
+10 -10
src
+6 -6
src/drive.rs
··· 112 112 113 113 pub async fn load_car<R: AsyncRead + Unpin, T: Processable>( 114 114 reader: R, 115 - process: fn(&[u8]) -> T, 115 + process: fn(Vec<u8>) -> T, 116 116 max_size: usize, 117 117 ) -> Result<Vehicle<R, T>, DriveError> { 118 118 let mut mem_blocks = HashMap::new(); ··· 144 144 let maybe_processed = if Node::could_be(&data) { 145 145 MaybeProcessedBlock::Raw(data) 146 146 } else { 147 - MaybeProcessedBlock::Processed(process(&data)) 147 + MaybeProcessedBlock::Processed(process(data)) 148 148 }; 149 149 150 150 // stash (maybe processed) blocks in memory as long as we have room ··· 181 181 pub struct BigCar<R: AsyncRead + Unpin, T: Processable> { 182 182 car: CarReader<R>, 183 183 root: Cid, 184 - process: fn(&[u8]) -> T, 184 + process: fn(Vec<u8>) -> T, 185 185 max_size: usize, 186 186 mem_blocks: HashMap<Cid, MaybeProcessedBlock<T>>, 187 187 pub commit: Option<Commit>, ··· 243 243 let maybe_processed = if Node::could_be(&data) { 244 244 MaybeProcessedBlock::Raw(data) 245 245 } else { 246 - MaybeProcessedBlock::Processed((self.process)(&data)) 246 + MaybeProcessedBlock::Processed((self.process)(data)) 247 247 }; 248 248 mem_size += std::mem::size_of::<Cid>() + maybe_processed.get_size(); 249 249 chunk.push((cid, maybe_processed)); ··· 287 287 } 288 288 289 289 pub struct BigCarReady<T: Clone, A: DiskAccess> { 290 - process: fn(&[u8]) -> T, 290 + process: fn(Vec<u8>) -> T, 291 291 access: A, 292 292 walker: Walker, 293 293 } ··· 349 349 pub struct MemDriver<T: Processable> { 350 350 blocks: HashMap<Cid, MaybeProcessedBlock<T>>, 351 351 walker: Walker, 352 - process: fn(&[u8]) -> T, 352 + process: fn(Vec<u8>) -> T, 353 353 } 354 354 355 355 impl<T: Processable> MemDriver<T> {
+4 -4
src/walk.rs
··· 111 111 pub fn step<T: Processable>( 112 112 &mut self, 113 113 blocks: &mut HashMap<Cid, MaybeProcessedBlock<T>>, 114 - process: impl Fn(&[u8]) -> T, 114 + process: impl Fn(Vec<u8>) -> T, 115 115 ) -> Result<Step<T>, Trip> { 116 116 loop { 117 117 let Some(mut need) = self.stack.last() else { ··· 147 147 }; 148 148 let rkey = rkey.clone(); 149 149 let data = match data { 150 - MaybeProcessedBlock::Raw(data) => process(data), 150 + MaybeProcessedBlock::Raw(data) => process(data.to_vec()), 151 151 MaybeProcessedBlock::Processed(t) => t.clone(), 152 152 }; 153 153 ··· 173 173 pub fn disk_step<T: Processable, R: DiskReader>( 174 174 &mut self, 175 175 reader: &mut R, 176 - process: impl Fn(&[u8]) -> T, 176 + process: impl Fn(Vec<u8>) -> T, 177 177 ) -> Result<Step<T>, DiskTrip<R::StorageError>> { 178 178 loop { 179 179 let Some(mut need) = self.stack.last() else { ··· 214 214 let data: MaybeProcessedBlock<T> = crate::drive::decode(&data_bytes)?; 215 215 let rkey = rkey.clone(); 216 216 let data = match data { 217 - MaybeProcessedBlock::Raw(data) => process(&data), 217 + MaybeProcessedBlock::Raw(data) => process(data), 218 218 MaybeProcessedBlock::Processed(t) => t.clone(), 219 219 }; 220 220