Fast and robust atproto CAR file processing in rust
15
fork

Configure Feed

Select the types of activity you want to include in your feed.

get the api closer to what it was befor

authored by bad-example.com and committed by tangled.org 8c2ff0aa ac7f7943

+27 -32
+3 -2
benches/huge-car.rs
··· 32 32 let reader = tokio::fs::File::open(filename).await.unwrap(); 33 33 let reader = tokio::io::BufReader::new(reader); 34 34 35 - let mut driver = match Driver::load_car(reader, ser, 1024).await.unwrap().unwrap() { 36 - Driver::Memory(_, mem_driver) => mem_driver, 35 + let mut driver = match Driver::load_car(reader, ser, 1024).await.unwrap() { 36 + Driver::Memory(_, Some(mem_driver)) => mem_driver, 37 + Driver::Memory(_, None) => panic!("car was empty"), 37 38 Driver::Disk(_) => panic!("not doing disk for benchmark"), 38 39 }; 39 40
+3 -3
benches/non-huge-cars.rs
··· 40 40 41 41 async fn drive_car(bytes: &[u8]) -> usize { 42 42 let mut driver = match Driver::load_car(bytes, ser, 32).await.unwrap() { 43 - None => return 0, 44 - Some(Driver::Memory(_, mem_driver)) => mem_driver, 45 - Some(Driver::Disk(_)) => panic!("not benching big cars here"), 43 + Driver::Memory(_, Some(mem_driver)) => mem_driver, 44 + Driver::Memory(_, None) => return 0, 45 + Driver::Disk(_) => panic!("not benching big cars here"), 46 46 }; 47 47 48 48 let mut n = 0;
+2 -3
examples/disk-read-file/main.rs
··· 42 42 .load_car(reader) 43 43 .await? 44 44 { 45 - None => panic!("empty mst! try a bigger car"), 46 - Some(Driver::Memory(_, _)) => panic!("try this on a bigger car"), 47 - Some(Driver::Disk(big_stuff)) => { 45 + Driver::Memory(_, _) => panic!("try this on a bigger car"), 46 + Driver::Disk(big_stuff) => { 48 47 // we reach here if the repo was too big and needs to be spilled to 49 48 // disk to continue 50 49
+8 -7
examples/read-file/main.rs
··· 23 23 let reader = tokio::fs::File::open(file).await?; 24 24 let reader = tokio::io::BufReader::new(reader); 25 25 26 - let (commit, mut driver) = match DriverBuilder::new() 26 + let (commit, driver) = match DriverBuilder::new() 27 27 .with_block_processor(|block| block.len().to_ne_bytes().to_vec()) 28 28 .load_car(reader) 29 29 .await? 30 30 { 31 - None => todo!(), 32 - Some(Driver::Memory(commit, mem_driver)) => (commit, mem_driver), 33 - Some(Driver::Disk(_)) => panic!("this example doesn't handle big CARs"), 31 + Driver::Memory(commit, mem_driver) => (commit, mem_driver), 32 + Driver::Disk(_) => panic!("this example doesn't handle big CARs"), 34 33 }; 35 34 36 35 log::info!("got commit: {commit:?}"); 37 36 38 37 let mut n = 0; 39 - while let Some(pairs) = driver.next_chunk(256).await? { 40 - n += pairs.len(); 41 - // log::info!("got {rkey:?}"); 38 + if let Some(mut driver) = driver { 39 + while let Some(pairs) = driver.next_chunk(256).await? { 40 + n += pairs.len(); 41 + // log::info!("got {rkey:?}"); 42 + } 42 43 } 43 44 log::info!("bye! total records={n}"); 44 45
+11 -17
src/drive.rs
··· 107 107 /// 108 108 /// You probably want to check the commit's signature. You can go ahead and 109 109 /// walk the MST right away. 110 - Memory(Commit, MemDriver), 110 + Memory(Commit, Option<MemDriver>), 111 111 /// Blocks exceed the memory limit 112 112 /// 113 113 /// You'll need to provide a disk storage to continue. The commit will be ··· 159 159 } 160 160 } 161 161 /// Begin processing an atproto MST from a CAR file 162 - pub async fn load_car<R: AsyncRead + Unpin>( 163 - &self, 164 - reader: R, 165 - ) -> Result<Option<Driver<R>>, DriveError> { 162 + pub async fn load_car<R: AsyncRead + Unpin>(&self, reader: R) -> Result<Driver<R>, DriveError> { 166 163 Driver::load_car(reader, noop, self.mem_limit_mb).await 167 164 } 168 165 } ··· 185 182 self 186 183 } 187 184 /// Begin processing an atproto MST from a CAR file 188 - pub async fn load_car<R: AsyncRead + Unpin>( 189 - &self, 190 - reader: R, 191 - ) -> Result<Option<Driver<R>>, DriveError> { 185 + pub async fn load_car<R: AsyncRead + Unpin>(&self, reader: R) -> Result<Driver<R>, DriveError> { 192 186 Driver::load_car(reader, self.block_processor, self.mem_limit_mb).await 193 187 } 194 188 } ··· 207 201 reader: R, 208 202 process: fn(Bytes) -> Bytes, 209 203 mem_limit_mb: usize, 210 - ) -> Result<Option<Driver<R>>, DriveError> { 204 + ) -> Result<Driver<R>, DriveError> { 211 205 let max_size = mem_limit_mb * 2_usize.pow(20); 212 206 let mut mem_blocks = HashMap::new(); 213 207 ··· 240 234 mem_size += maybe_processed.len(); 241 235 mem_blocks.insert(cid, maybe_processed); 242 236 if mem_size >= max_size { 243 - return Ok(Some(Driver::Disk(NeedDisk { 237 + return Ok(Driver::Disk(NeedDisk { 244 238 car, 245 239 root, 246 240 process, 247 241 max_size, 248 242 mem_blocks, 249 243 commit, 250 - }))); 244 + })); 251 245 } 252 246 } 253 247 ··· 264 258 }; 265 259 let Some(walker) = Walker::new(root_node) else { 266 260 // TODO: actually we still want the commit in this case 267 - return Ok(None); 261 + return Ok(Driver::Memory(commit, None)); 268 262 }; 269 263 270 - Ok(Some(Driver::Memory( 264 + Ok(Driver::Memory( 271 265 commit, 272 - MemDriver { 266 + Some(MemDriver { 273 267 blocks: mem_blocks, 274 268 walker, 275 269 process, 276 - }, 277 - ))) 270 + }), 271 + )) 278 272 } 279 273 } 280 274