Fast and robust atproto CAR file processing in Rust

back to eager processing

eh

Changed files
+75 -40
examples
disk-read-file
src
+3 -4
examples/disk-read-file/main.rs
··· 40 40 // let stream = Box::pin(reader.stream()); 41 41 let stream = std::pin::pin!(reader.stream()); 42 42 43 - let (commit, v) = repo_stream::disk_drive::Vehicle::init(root, stream, redb_store, |block| { 44 - Ok::<_, Infallible>(block.len()) 45 - }) 46 - .await?; 43 + let (commit, v) = 44 + repo_stream::disk_drive::Vehicle::init(root, stream, redb_store, |block| block.len()) 45 + .await?; 47 46 let mut record_stream = std::pin::pin!(v.stream()); 48 47 49 48 log::info!("got commit: {commit:?}");
+56 -29
src/disk_drive.rs
··· 3 3 use std::collections::VecDeque; 4 4 use std::error::Error; 5 5 6 - use crate::disk_walk::{Trip, Walker, RkeyError}; 7 - use crate::mst::Commit; 6 + use crate::disk_walk::{RkeyError, Trip, Walker}; 7 + use crate::mst::{Commit, Node}; 8 8 9 9 use ipld_core::cid::Cid; 10 - use serde::{Serialize, de::DeserializeOwned}; 10 + use serde::{Deserialize, Serialize, de::DeserializeOwned}; 11 11 12 12 /// Errors that can happen while consuming and emitting blocks and records 13 13 #[derive(Debug, thiserror::Error)] 14 - pub enum DriveError<E: Error> { 14 + pub enum DriveError { 15 15 #[error("Failed to initialize CarReader: {0}")] 16 16 CarReader(#[from] iroh_car::Error), 17 17 #[error("Car block stream error: {0}")] ··· 28 28 WalkingProblem(#[from] WalkError), 29 29 #[error("whatever: {0}")] 30 30 Boooooo(String), 31 - #[error("processing error: {0}")] 32 - ProcessingError(E), 31 + #[error("Error while encoding: {0}")] 32 + EncodingError(#[from] bincode::error::EncodeError), 33 + #[error("Error while decoding: {0}")] 34 + DecodingError(#[from] bincode::error::DecodeError), 33 35 } 34 36 35 37 /// Limited subset of errors that can happen while walking ··· 56 58 #[error("Could not find block: {0}")] 57 59 MissingBlock(Cid), 58 60 } 61 + 62 + #[derive(Serialize, Deserialize)] 63 + pub enum MaybeProcessedBlock<T: Serialize> { 64 + Raw(Vec<u8>), 65 + Processed(T), 66 + } 67 + 68 + pub type Records = Vec<(String, Vec<u8>)>; 59 69 60 70 /// Storage backend for caching large-repo blocks 61 71 /// 62 72 /// Since 63 - pub trait BlockStore<MPB: Serialize + DeserializeOwned> { 64 - fn put_batch(&self, blocks: Vec<(Cid, MPB)>) -> impl Future<Output = Result<(), BlockStoreError>>; // unwraps for now 73 + pub trait BlockStore { 74 + fn put_batch( 75 + &self, 76 + blocks: Vec<(Cid, Vec<u8>)>, 77 + ) -> impl Future<Output = Result<(), BlockStoreError>>; // unwraps for now 65 78 fn walk_batch( 66 79 &self, 67 80 walker: Walker, 68 81 n: usize, 69 - ) -> impl Future<Output = 
Result<(Walker, Vec<(String, MPB)>), BlockStoreError>>; // boo string error for now because 82 + ) -> impl Future<Output = Result<(Walker, Records), BlockStoreError>>; // boo string error for now because 70 83 } 71 84 72 85 type CarBlock<E> = Result<(Cid, Vec<u8>), E>; 73 86 74 87 /// The core driver between the block stream and MST walker 75 - pub struct Vehicle<SE, S, T, BS, P, PE> 88 + pub struct Vehicle<SE, S, T, BS, P> 76 89 where 77 90 SE: Error + 'static, 78 91 S: Stream<Item = CarBlock<SE>>, 79 92 T: Clone + Serialize + DeserializeOwned, 80 - BS: BlockStore<Vec<u8>>, 81 - P: Fn(&[u8]) -> Result<T, PE>, 82 - PE: Error, 93 + BS: BlockStore, 94 + P: Fn(&[u8]) -> T, 83 95 { 84 96 #[allow(dead_code)] 85 97 block_stream: Option<S>, ··· 89 101 out_cache: VecDeque<(String, T)>, 90 102 } 91 103 92 - impl<SE, S, T, BS, P, PE> Vehicle<SE, S, T, BS, P, PE> 104 + impl<SE, S, T, BS, P> Vehicle<SE, S, T, BS, P> 93 105 where 94 106 SE: Error + 'static, 95 107 S: Stream<Item = CarBlock<SE>> + Unpin + Send, 96 108 T: Clone + Serialize + DeserializeOwned + Send, 97 - BS: BlockStore<Vec<u8>> + Send, 98 - P: Fn(&[u8]) -> Result<T, PE> + Send, 99 - PE: Error, 109 + BS: BlockStore + Send, 110 + P: Fn(&[u8]) -> T, 100 111 { 101 112 /// Set up the stream 102 113 /// ··· 120 131 block_stream: S, 121 132 block_store: BS, 122 133 process: P, 123 - ) -> Result<(Commit, Self), DriveError<PE>> { 134 + ) -> Result<(Commit, Self), DriveError> { 124 135 let mut commit = None; 125 136 126 137 log::warn!("init: load blocks"); 127 138 128 - let mut chunked = block_stream.try_chunks(4096); 139 + let mut chunked = block_stream.try_chunks(256); 129 140 130 141 // go ahead and put all blocks in the block store 131 142 while let Some(chunk) = chunked ··· 140 151 .map_err(|e| DriveError::BadCommit(e.into()))?; 141 152 commit = Some(c); 142 153 } else { 143 - to_insert.push((cid, data)); 154 + let wrapped = if Node::could_be(&data) { 155 + MaybeProcessedBlock::Raw(data) 156 + } else { 157 + 
MaybeProcessedBlock::Processed(process(&data)) 158 + }; 159 + let bytes = 160 + bincode::serde::encode_to_vec(wrapped, bincode::config::standard())?; 161 + 162 + to_insert.push((cid, bytes)); 144 163 } 145 164 } 146 165 block_store ··· 168 187 Ok((commit, me)) 169 188 } 170 189 171 - async fn load_chunk(&mut self, n: usize) -> Result<(), DriveError<PE>> { 190 + async fn load_chunk(&mut self, n: usize) -> Result<(), DriveError> { 172 191 let walker = std::mem::take(&mut self.walker); 173 192 let (walker, batch) = self 174 193 .block_store 175 194 .walk_batch(walker, n) 176 195 .await 177 - .map_err(|e| DriveError::Boooooo(format!("booo! {e}")))?; // TODO 196 + .map_err(|e| DriveError::Boooooo(format!("booo! (here right?) {e}")))?; // TODO 178 197 self.walker = walker; 179 198 180 199 let processed = batch 181 200 .into_iter() 182 - .map(|(k, raw)| (self.process)(&raw).map(|t| (k, t))) 183 - .collect::<Result<Vec<_>, _>>() 184 - .map_err(DriveError::ProcessingError)?; 201 + .map(|(k, encoded)| { 202 + let (decoded, n): (MaybeProcessedBlock<T>, usize) = 203 + bincode::serde::decode_from_slice(&encoded, bincode::config::standard())?; 204 + assert_eq!(n, encoded.len()); 205 + let processed = match decoded { 206 + MaybeProcessedBlock::Processed(t) => t, 207 + MaybeProcessedBlock::Raw(block) => (self.process)(&block), 208 + }; 209 + Ok((k, processed)) 210 + }) 211 + .collect::<Result<Vec<_>, DriveError>>()?; 185 212 186 213 self.out_cache.extend(processed); 187 214 Ok(()) ··· 193 220 /// (but non-zero), even if it's not the last chunk. 194 221 /// 195 222 /// an empty vec will be returned to signal the end. 
196 - pub async fn next_chunk(&mut self, n: usize) -> Result<Vec<(String, T)>, DriveError<PE>> { 223 + pub async fn next_chunk(&mut self, n: usize) -> Result<Vec<(String, T)>, DriveError> { 197 224 if self.out_cache.is_empty() { 198 225 self.load_chunk(n).await?; 199 226 } ··· 201 228 } 202 229 203 230 /// Manually step through the record outputs 204 - pub async fn next_record(&mut self) -> Result<Option<(String, T)>, DriveError<PE>> { 231 + pub async fn next_record(&mut self) -> Result<Option<(String, T)>, DriveError> { 205 232 if self.out_cache.is_empty() { 206 - self.load_chunk(64).await?; // TODO 233 + self.load_chunk(128).await?; // TODO 207 234 } 208 235 Ok(self.out_cache.pop_front()) 209 236 } 210 237 211 238 /// Convert to a futures::stream of record outputs 212 - pub fn stream(self) -> impl Stream<Item = Result<(String, T), DriveError<PE>>> { 239 + pub fn stream(self) -> impl Stream<Item = Result<(String, T), DriveError>> { 213 240 futures::stream::try_unfold(self, |mut this| async move { 214 241 let maybe_record = this.next_record().await?; 215 242 Ok(maybe_record.map(|b| (b, this)))
+14 -5
src/disk_redb.rs
··· 1 - use crate::disk_drive::{BlockStore, BlockStoreError}; 1 + use crate::disk_drive::{BlockStore, BlockStoreError, MaybeProcessedBlock, Records}; 2 2 use crate::disk_walk::{Need, Walker}; 3 3 use ipld_core::cid::Cid; 4 4 use redb::{Database, Durability, Error, ReadableDatabase, TableDefinition}; ··· 40 40 } 41 41 } 42 42 43 - impl BlockStore<Vec<u8>> for RedbStore { 43 + impl BlockStore for RedbStore { 44 44 async fn put_batch(&self, blocks: Vec<(Cid, Vec<u8>)>) -> Result<(), BlockStoreError> { 45 45 let db = self.db.clone(); 46 46 tokio::task::spawn_blocking(move || -> Result<(), BlockStoreError> { ··· 65 65 &self, 66 66 mut walker: Walker, 67 67 n: usize, 68 - ) -> Result<(Walker, Vec<(String, Vec<u8>)>), BlockStoreError> { 68 + ) -> Result<(Walker, Records), BlockStoreError> { 69 69 let db = self.db.clone(); 70 70 tokio::task::spawn_blocking(move || -> Result<_, BlockStoreError> { 71 71 let tx = db.begin_read()?; ··· 83 83 let block = res.value(); 84 84 85 85 match need { 86 - Need::Node(_) => walker 87 - .handle_node(block)?, 86 + Need::Node(_) => { 87 + let (mpb, n) = 88 + bincode::serde::decode_from_slice(block, bincode::config::standard()) 89 + .unwrap(); 90 + assert_eq!(n, block.len()); 91 + // DANGER: we're throwing in unit () as a placeholder here and assuming bincode will still work since Raw is the first variant 92 + let MaybeProcessedBlock::Raw(bytes): MaybeProcessedBlock<()> = mpb else { 93 + panic!("should have not been processed"); // tODO 94 + }; 95 + walker.handle_node(&bytes)? 96 + } 88 97 Need::Record { rkey, .. } => { 89 98 out.push((rkey, block.to_vec())); 90 99 if out.len() >= n {
+2 -2
src/disk_walk.rs
··· 28 28 EntryRkeyNotUtf8(#[from] std::string::FromUtf8Error), 29 29 #[error("Encountered an rkey out of order while walking the MST")] 30 30 RkeyOutOfOrder, 31 - #[error("Failed to decode commit block: {0}")] 32 - BlockDecodeError(#[from] serde_ipld_dagcbor::DecodeError<Infallible>), 31 + #[error("Failed to decode node block: {0}")] 32 + NodeDecodeError(#[from] serde_ipld_dagcbor::DecodeError<Infallible>), 33 33 } 34 34 35 35 /// Walker outputs