Fast and robust atproto CAR file processing in Rust

disk kinda sorta works?

Changed files
+585 -200
examples/
  disk-read-file/
  read-file/
src/
+28 -28
examples/disk-read-file/main.rs
···
  extern crate repo_stream;
  use clap::Parser;
- use futures::TryStreamExt;
- use iroh_car::CarReader;
- use std::convert::Infallible;
+ use repo_stream::disk::SqliteStore;
+ use repo_stream::drive::Processable;
+ use serde::{Deserialize, Serialize};
  use std::path::PathBuf;

  type Result<T> = std::result::Result<T, Box<dyn std::error::Error>>;
···
      tmpfile: PathBuf,
  }

+ #[derive(Clone, Serialize, Deserialize)]
+ struct S(usize);
+
+ impl Processable for S {}
+
  #[tokio::main]
  async fn main() -> Result<()> {
      env_logger::init();
···
      let reader = tokio::fs::File::open(car).await?;
      let reader = tokio::io::BufReader::new(reader);

-     println!("hello!");
-
-     let reader = CarReader::new(reader).await?;
-
-     let redb_store = repo_stream::disk_redb::RedbStore::new(tmpfile).await?;
-
-     let root = reader
-         .header()
-         .roots()
-         .first()
-         .ok_or("missing root")?
-         .clone();
-     log::debug!("root: {root:?}");
-
-     // let stream = Box::pin(reader.stream());
-     let stream = std::pin::pin!(reader.stream());
-
-     let (commit, v) =
-         repo_stream::disk_drive::Vehicle::init(root, stream, redb_store, |block| block.len())
-             .await?;
-     let mut record_stream = std::pin::pin!(v.stream());
+     let mut driver =
+         match repo_stream::drive::load_car(reader, |block| S(block.len()), 1024).await? {
+             repo_stream::drive::Vehicle::Lil(_, _) => panic!("try this on a bigger car"),
+             repo_stream::drive::Vehicle::Big(big_stuff) => {
+                 let disk_store = SqliteStore::new(tmpfile);
+                 let (commit, driver) = big_stuff.finish_loading(disk_store).await?;
+                 log::warn!("big: {:?}", commit);
+                 driver
+             }
+         };

-     log::info!("got commit: {commit:?}");
+     println!("hello!");

-     while let Some((rkey, _rec)) = record_stream.try_next().await? {
-         log::info!("got {rkey:?}");
+     let mut n = 0;
+     loop {
+         let (d, Some(pairs)) = driver.next_chunk(256).await? else {
+             break;
+         };
+         driver = d;
+         n += pairs.len();
+         // log::info!("got {rkey:?}");
      }
-     log::info!("bye!");
+     log::info!("bye! {n}");

      Ok(())
  }
+10 -25
examples/read-file/main.rs
···
  extern crate repo_stream;
  use clap::Parser;
- use futures::TryStreamExt;
- use iroh_car::CarReader;
- use std::convert::Infallible;
  use std::path::PathBuf;

  type Result<T> = std::result::Result<T, Box<dyn std::error::Error>>;
···
      let reader = tokio::fs::File::open(file).await?;
      let reader = tokio::io::BufReader::new(reader);

-     println!("hello!");
-
-     let reader = CarReader::new(reader).await?;
-
-     let root = reader
-         .header()
-         .roots()
-         .first()
-         .ok_or("missing root")?
-         .clone();
-     log::debug!("root: {root:?}");
-
-     // let stream = Box::pin(reader.stream());
-     let stream = std::pin::pin!(reader.stream());
-
-     let (commit, v) =
-         repo_stream::drive::Vehicle::init(root, stream, |block| Ok::<_, Infallible>(block.len()))
-             .await?;
-     let mut record_stream = std::pin::pin!(v.stream());
+     let (commit, mut driver) =
+         match repo_stream::drive::load_car(reader, |block| block.len(), 1024 * 1024).await? {
+             repo_stream::drive::Vehicle::Lil(commit, mem_driver) => (commit, mem_driver),
+             repo_stream::drive::Vehicle::Big(_) => panic!("can't handle big cars yet"),
+         };

      log::info!("got commit: {commit:?}");

-     while let Some((rkey, _rec)) = record_stream.try_next().await? {
-         log::info!("got {rkey:?}");
+     let mut n = 0;
+     while let Some(pairs) = driver.next_chunk(256).await? {
+         n += pairs.len();
+         // log::info!("got {rkey:?}");
      }
-     log::info!("bye!");
+     log::info!("bye! {n}");

      Ok(())
  }
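Read together, the two examples show the intended split: load_car buffers up to max_size blocks in memory and yields Vehicle::Lil when the whole repo fit, or Vehicle::Big when it needs a spillover store. Each example panics on the variant it doesn't handle; a consumer handling both would look roughly like this sketch (the Size type and count_records wrapper are illustrative, not part of this change):

    extern crate repo_stream;
    use repo_stream::disk::SqliteStore;
    use repo_stream::drive::{load_car, Processable, Vehicle};
    use serde::{Deserialize, Serialize};

    #[derive(Clone, Serialize, Deserialize)]
    struct Size(usize);
    impl Processable for Size {}

    async fn count_records(
        reader: impl tokio::io::AsyncRead + Unpin,
        tmpfile: std::path::PathBuf,
    ) -> Result<usize, Box<dyn std::error::Error>> {
        let mut n = 0;
        match load_car(reader, |block| Size(block.len()), 1024).await? {
            // everything fit in memory: drive the in-memory walker directly
            Vehicle::Lil(_commit, mut driver) => {
                while let Some(pairs) = driver.next_chunk(256).await? {
                    n += pairs.len();
                }
            }
            // too big: spill blocks to sqlite, then walk from disk
            Vehicle::Big(big) => {
                let (_commit, mut driver) =
                    big.finish_loading(SqliteStore::new(tmpfile)).await?;
                // next_chunk takes and returns the driver so its blocking
                // half can move into spawn_blocking
                loop {
                    let (d, Some(pairs)) = driver.next_chunk(256).await? else {
                        break;
                    };
                    driver = d;
                    n += pairs.len();
                }
            }
        }
        Ok(n)
    }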
+161
src/disk.rs
···
+ use rusqlite::OptionalExtension;
+ use std::error::Error;
+ use std::path::PathBuf;
+
+ pub trait StorageErrorBase: Error + Send + 'static {}
+
+ /// high level potential storage resource
+ ///
+ /// separating this allows (hopefully) implementing a storage pool that can
+ /// async-block until a member is available to use
+ pub trait DiskStore {
+     type StorageError: StorageErrorBase + Send;
+     type Access: DiskAccess<StorageError = Self::StorageError>;
+     fn get_access(&mut self) -> impl Future<Output = Result<Self::Access, Self::StorageError>>;
+ }
+
+ /// actual concrete access to disk storage
+ pub trait DiskAccess: Send {
+     type StorageError: StorageErrorBase;
+
+     fn get_writer(&mut self) -> Result<impl DiskWriter<Self::StorageError>, Self::StorageError>;
+
+     fn get_reader(
+         &self,
+     ) -> Result<impl DiskReader<StorageError = Self::StorageError>, Self::StorageError>;
+
+     // TODO: force a cleanup implementation?
+ }
+
+ pub trait DiskWriter<E: StorageErrorBase> {
+     fn put(&mut self, key: Vec<u8>, val: Vec<u8>) -> Result<(), E>;
+ }
+
+ pub trait DiskReader {
+     type StorageError: StorageErrorBase;
+     fn get(&mut self, key: Vec<u8>) -> Result<Option<Vec<u8>>, Self::StorageError>;
+ }
+
+ /////////////////
+
+ pub struct SqliteStore {
+     path: PathBuf,
+ }
+
+ impl SqliteStore {
+     pub fn new(path: PathBuf) -> Self {
+         Self { path }
+     }
+ }
+
+ impl StorageErrorBase for rusqlite::Error {}
+
+ impl DiskStore for SqliteStore {
+     type StorageError = rusqlite::Error;
+     type Access = SqliteAccess;
+     async fn get_access(&mut self) -> Result<SqliteAccess, rusqlite::Error> {
+         let path = self.path.clone();
+         let conn = tokio::task::spawn_blocking(move || {
+             let conn = rusqlite::Connection::open(path)?;
+
+             conn.pragma_update(None, "journal_mode", "WAL")?;
+             conn.pragma_update(None, "synchronous", "OFF")?;
+             conn.pragma_update(None, "cache_size", (-32 * 2_i64.pow(10)).to_string())?;
+             conn.execute(
+                 "CREATE TABLE blocks (
+                     key BLOB PRIMARY KEY NOT NULL,
+                     val BLOB NOT NULL
+                 ) WITHOUT ROWID",
+                 (),
+             )?;
+
+             Ok::<_, Self::StorageError>(conn)
+         })
+         .await
+         .expect("join error")?;
+
+         Ok(SqliteAccess { conn })
+     }
+ }
+
+ pub struct SqliteAccess {
+     conn: rusqlite::Connection,
+ }
+
+ impl DiskAccess for SqliteAccess {
+     type StorageError = rusqlite::Error;
+     fn get_writer(&mut self) -> Result<impl DiskWriter<rusqlite::Error>, rusqlite::Error> {
+         let insert_stmt = self
+             .conn
+             .prepare("INSERT INTO blocks (key, val) VALUES (?1, ?2)")?;
+         Ok(SqliteWriter { insert_stmt })
+     }
+     fn get_reader(
+         &self,
+     ) -> Result<impl DiskReader<StorageError = rusqlite::Error>, rusqlite::Error> {
+         let select_stmt = self.conn.prepare("SELECT val FROM blocks WHERE key = ?1")?;
+         Ok(SqliteReader { select_stmt })
+     }
+ }
+
+ pub struct SqliteWriter<'conn> {
+     insert_stmt: rusqlite::Statement<'conn>,
+ }
+
+ impl DiskWriter<rusqlite::Error> for SqliteWriter<'_> {
+     fn put(&mut self, key: Vec<u8>, val: Vec<u8>) -> rusqlite::Result<()> {
+         self.insert_stmt.execute((key, val))?;
+         Ok(())
+     }
+ }
+
+ pub struct SqliteReader<'conn> {
+     select_stmt: rusqlite::Statement<'conn>,
+ }
+
+ impl DiskReader for SqliteReader<'_> {
+     type StorageError = rusqlite::Error;
+     fn get(&mut self, key: Vec<u8>) -> rusqlite::Result<Option<Vec<u8>>> {
+         self.select_stmt
+             .query_one((&key,), |row| row.get(0))
+             .optional()
+     }
+ }
+
+ // /// The main storage interface for MST blocks
+ // ///
+ // /// **Note**: `get` and `put` are **synchronous methods that may block**
+ // pub trait BlockStore<T: Clone> {
+ //     fn get(&self, cid: Cid) -> Option<MaybeProcessedBlock<T>>;
+ //     fn put(&mut self, cid: Cid, mpb: MaybeProcessedBlock<T>);
+ // }
+
+ // ///// wheee
+
+ // /// In-memory MST block storage
+ // ///
+ // /// a thin wrapper around a hashmap
+ // pub struct MemoryStore<T: Clone> {
+ //     map: HashMap<Cid, MaybeProcessedBlock<T>>,
+ // }
+
+ // impl<T: Clone> BlockStore<T> for MemoryStore<T> {
+ //     fn get(&self, cid: Cid) -> Option<MaybeProcessedBlock<T>> {
+ //         self.map.get(&cid).map(|t| t.clone())
+ //     }
+ //     fn put(&mut self, cid: Cid, mpb: MaybeProcessedBlock<T>) {
+ //         self.map.insert(cid, mpb);
+ //     }
+ // }
+
+ // //// the fun bits
+
+ // pub struct HybridStore<T: Clone, D: DiskStore> {
+ //     mem: MemoryStore<T>,
+ //     disk: D,
+ // }
+
+ // impl<T: Clone, D: DiskStore> BlockStore<T> for HybridStore<T, D> {
+ //     fn get(&self, _cid: Cid) -> Option<MaybeProcessedBlock<T>> { todo!() }
+ //     fn put(&mut self, _cid: Cid, _mpb: MaybeProcessedBlock<T>) { todo!() }
+ // }
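The trait stack separates a store you can hold (DiskStore) from a live handle (DiskAccess) from the short-lived borrowing writer/reader, which is what lets SqliteWriter/SqliteReader hang on to prepared statements borrowed from the connection. To get a feel for the surface a different backend would implement, here's a minimal sketch over a plain HashMap (hypothetical, not in this change; MemError exists only to satisfy StorageErrorBase):

    use repo_stream::disk::{DiskAccess, DiskReader, DiskStore, DiskWriter, StorageErrorBase};
    use std::collections::HashMap;

    // hypothetical error type: anything Error + Send + 'static can be the
    // backend's StorageError (the sqlite backend uses rusqlite::Error)
    #[derive(Debug, thiserror::Error)]
    #[error("mem store error")]
    pub struct MemError;

    impl StorageErrorBase for MemError {}

    pub struct MemStore;

    impl DiskStore for MemStore {
        type StorageError = MemError;
        type Access = MemAccess;
        // nothing to acquire here; a pooled impl would await a free member
        async fn get_access(&mut self) -> Result<MemAccess, MemError> {
            Ok(MemAccess { map: HashMap::new() })
        }
    }

    pub struct MemAccess {
        map: HashMap<Vec<u8>, Vec<u8>>,
    }

    impl DiskAccess for MemAccess {
        type StorageError = MemError;
        // writer and reader borrow from the access, the same way the
        // prepared statements borrow from the sqlite connection
        fn get_writer(&mut self) -> Result<impl DiskWriter<MemError>, MemError> {
            Ok(MemWriter { map: &mut self.map })
        }
        fn get_reader(&self) -> Result<impl DiskReader<StorageError = MemError>, MemError> {
            Ok(MemReader { map: &self.map })
        }
    }

    pub struct MemWriter<'a> {
        map: &'a mut HashMap<Vec<u8>, Vec<u8>>,
    }

    impl DiskWriter<MemError> for MemWriter<'_> {
        fn put(&mut self, key: Vec<u8>, val: Vec<u8>) -> Result<(), MemError> {
            self.map.insert(key, val);
            Ok(())
        }
    }

    pub struct MemReader<'a> {
        map: &'a HashMap<Vec<u8>, Vec<u8>>,
    }

    impl DiskReader for MemReader<'_> {
        type StorageError = MemError;
        fn get(&mut self, key: Vec<u8>) -> Result<Option<Vec<u8>>, MemError> {
            Ok(self.map.get(&key).cloned())
        }
    }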
+288 -116
src/drive.rs
···
  //! Consume an MST block stream, producing an ordered stream of records

- use futures::{Stream, TryStreamExt};
+ use crate::disk::{DiskAccess, DiskStore, DiskWriter, StorageErrorBase};
  use ipld_core::cid::Cid;
+ use iroh_car::CarReader;
+ use serde::de::DeserializeOwned;
+ use serde::{Deserialize, Serialize};
  use std::collections::HashMap;
- use std::error::Error;
+ use std::convert::Infallible;
+ use tokio::io::AsyncRead;

  use crate::mst::{Commit, Node};
- use crate::walk::{Step, Trip, Walker};
+ use crate::walk::{DiskTrip, Step, Trip, Walker};

  /// Errors that can happen while consuming and emitting blocks and records
  #[derive(Debug, thiserror::Error)]
- pub enum DriveError<E: Error> {
-     #[error("Failed to initialize CarReader: {0}")]
+ pub enum DriveError {
+     #[error("Error from iroh_car: {0}")]
      CarReader(#[from] iroh_car::Error),
-     #[error("Car block stream error: {0}")]
-     CarBlockError(Box<dyn Error>),
      #[error("Failed to decode commit block: {0}")]
-     BadCommit(Box<dyn Error>),
+     BadBlock(#[from] serde_ipld_dagcbor::DecodeError<Infallible>),
      #[error("The Commit block referenced by the root was not found")]
      MissingCommit,
      #[error("The MST block {0} could not be found")]
      MissingBlock(Cid),
      #[error("Failed to walk the mst tree: {0}")]
-     Tripped(#[from] Trip<E>),
+     Tripped(#[from] Trip),
+     #[error("CAR file had no roots")]
+     MissingRoot,
+ }
+
+ #[derive(Debug, thiserror::Error)]
+ pub enum DiskDriveError<E: StorageErrorBase> {
+     #[error("Error from iroh_car: {0}")]
+     CarReader(#[from] iroh_car::Error),
+     #[error("Failed to decode commit block: {0}")]
+     BadBlock(#[from] serde_ipld_dagcbor::DecodeError<Infallible>),
+     #[error("Storage error")]
+     StorageError(#[from] E),
+     #[error("The Commit block referenced by the root was not found")]
+     MissingCommit,
+     #[error("The MST block {0} could not be found")]
+     MissingBlock(Cid),
+     #[error("Encode error: {0}")]
+     BincodeEncodeError(#[from] bincode::error::EncodeError),
+     #[error("Decode error: {0}")]
+     BincodeDecodeError(#[from] bincode::error::DecodeError),
+     #[error("disk tripped: {0}")]
+     DiskTripped(#[from] DiskTrip<E>),
  }

- type CarBlock<E> = Result<(Cid, Vec<u8>), E>;
+ // #[derive(Debug, thiserror::Error)]
+ // pub enum Boooooo<E: StorageErrorBase> {
+ //     #[error("disk tripped: {0}")]
+ //     DiskTripped(#[from] DiskTrip<E>),
+ //     #[error("dde whatever: {0}")]
+ //     DiskDriveError(#[from] DiskDriveError<E>),
+ // }
+
+ pub trait Processable: Clone + Serialize + DeserializeOwned {}

- #[derive(Debug)]
- pub enum MaybeProcessedBlock<T, E> {
+ #[derive(Debug, Clone, Serialize, Deserialize)]
+ pub enum MaybeProcessedBlock<T> {
      /// A block that's *probably* a Node (but we can't know yet)
      ///
      /// It *can be* a record that suspiciously looks a lot like a node, so we
···
      /// There's an alternative here, which would be to kick unprocessable blocks
      /// back to Raw, or maybe even a new RawUnprocessable variant. Then we could
      /// surface the typed error later if needed by trying to reprocess.
-     Processed(Result<T, E>),
+     Processed(T),
  }

- /// The core driver between the block stream and MST walker
- pub struct Vehicle<SE, S, T, P, PE>
- where
-     S: Stream<Item = CarBlock<SE>>,
-     P: Fn(&[u8]) -> Result<T, PE>,
-     PE: Error,
- {
-     block_stream: S,
-     blocks: HashMap<Cid, MaybeProcessedBlock<T, PE>>,
-     walker: Walker,
-     process: P,
+ impl<T: Processable> Processable for MaybeProcessedBlock<T> {}
+
+ pub enum Vehicle<R: AsyncRead + Unpin, T: Processable> {
+     Lil(Commit, MemDriver<T>),
+     Big(BigCar<R, T>),
  }

- impl<SE, S, T: Clone, P, PE> Vehicle<SE, S, T, P, PE>
- where
-     SE: Error + 'static,
-     S: Stream<Item = CarBlock<SE>> + Unpin,
-     P: Fn(&[u8]) -> Result<T, PE>,
-     PE: Error,
- {
-     /// Set up the stream
-     ///
-     /// This will eagerly consume blocks until the `Commit` object is found.
-     /// *Usually* it's the first block, but there is no guarantee.
-     ///
-     /// ### Parameters
-     ///
-     /// `root`: CID of the commit object that is the root of the MST
-     ///
-     /// `block_stream`: Input stream of raw CAR blocks
-     ///
-     /// `process`: record-transforming callback:
-     ///
-     /// For tasks where records can be quickly processed into a *smaller*
-     /// useful representation, you can do that eagerly as blocks come in by
-     /// passing the processor as a callback here. This can reduce overall
-     /// memory usage.
-     pub async fn init(
-         root: Cid,
-         mut block_stream: S,
-         process: P,
-     ) -> Result<(Commit, Self), DriveError<PE>> {
-         let mut blocks = HashMap::new();
+ pub async fn load_car<R: AsyncRead + Unpin, T: Processable>(
+     reader: R,
+     process: fn(&[u8]) -> T,
+     max_size: usize,
+ ) -> Result<Vehicle<R, T>, DriveError> {
+     let mut mem_blocks = HashMap::new();
+
+     let mut car = CarReader::new(reader).await?;
+
+     let root = *car
+         .header()
+         .roots()
+         .first()
+         .ok_or(DriveError::MissingRoot)?;
+     log::debug!("root: {root:?}");
+
+     let mut commit = None;
+
+     // try to load all the blocks into memory
+     while let Some((cid, data)) = car.next_block().await? {
+         // the root commit is a Special Third Kind of block that we need to make
+         // sure not to optimistically send to the processing function
+         if cid == root {
+             let c: Commit = serde_ipld_dagcbor::from_slice(&data)?;
+             commit = Some(c);
+             continue;
+         }

-         let mut commit = None;
+         // remaining possible types: node, record, other. optimistically process
+         // TODO: get the actual in-memory size to compute disk spill
+         let maybe_processed = if Node::could_be(&data) {
+             MaybeProcessedBlock::Raw(data)
+         } else {
+             MaybeProcessedBlock::Processed(process(&data))
+         };

-         while let Some((cid, data)) = block_stream
-             .try_next()
-             .await
-             .map_err(|e| DriveError::CarBlockError(e.into()))?
-         {
-             if cid == root {
-                 let c: Commit = serde_ipld_dagcbor::from_slice(&data)
-                     .map_err(|e| DriveError::BadCommit(e.into()))?;
-                 commit = Some(c);
-                 break;
-             } else {
-                 blocks.insert(
-                     cid,
-                     if Node::could_be(&data) {
-                         MaybeProcessedBlock::Raw(data)
-                     } else {
-                         MaybeProcessedBlock::Processed(process(&data))
-                     },
-                 );
-             }
+         // stash (maybe processed) blocks in memory as long as we have room
+         mem_blocks.insert(cid, maybe_processed);
+         if mem_blocks.len() >= max_size {
+             return Ok(Vehicle::Big(BigCar {
+                 car,
+                 root,
+                 process,
+                 max_size,
+                 mem_blocks,
+                 commit,
+             }));
          }
+     }

-         // we either broke out or read all the blocks without finding the commit...
-         let commit = commit.ok_or(DriveError::MissingCommit)?;
+     // all blocks loaded and we fit in memory! hopefully we found the commit...
+     let commit = commit.ok_or(DriveError::MissingCommit)?;

-         let walker = Walker::new(commit.data);
+     let walker = Walker::new(commit.data);

-         let me = Self {
-             block_stream,
-             blocks,
+     Ok(Vehicle::Lil(
+         commit,
+         MemDriver {
+             blocks: mem_blocks,
              walker,
              process,
-         };
-         Ok((commit, me))
-     }
+         },
+     ))
+ }

-     async fn drive_until(&mut self, cid_needed: Cid) -> Result<(), DriveError<PE>> {
-         while let Some((cid, data)) = self
-             .block_stream
-             .try_next()
-             .await
-             .map_err(|e| DriveError::CarBlockError(e.into()))?
-         {
-             self.blocks.insert(
-                 cid,
-                 if Node::could_be(&data) {
+ /// a partially memory-loaded car file that needs disk spillover to continue
+ pub struct BigCar<R: AsyncRead + Unpin, T: Processable> {
+     car: CarReader<R>,
+     root: Cid,
+     process: fn(&[u8]) -> T,
+     max_size: usize,
+     mem_blocks: HashMap<Cid, MaybeProcessedBlock<T>>,
+     pub commit: Option<Commit>,
+ }
+
+ fn encode(v: impl Serialize) -> Result<Vec<u8>, bincode::error::EncodeError> {
+     bincode::serde::encode_to_vec(v, bincode::config::standard())
+ }
+
+ pub fn decode<T: Processable>(bytes: &[u8]) -> Result<T, bincode::error::DecodeError> {
+     let (t, n) = bincode::serde::decode_from_slice(bytes, bincode::config::standard())?;
+     assert_eq!(n, bytes.len(), "expected to decode all bytes"); // TODO
+     Ok(t)
+ }
+
+ impl<R: AsyncRead + Unpin, T: Processable + Send + 'static> BigCar<R, T> {
+     pub async fn finish_loading<S: DiskStore>(
+         mut self,
+         mut store: S,
+     ) -> Result<(Commit, BigCarReady<T, S::Access>), DiskDriveError<S::StorageError>>
+     where
+         S::Access: Send + 'static,
+         S::StorageError: 'static,
+     {
+         // set up access for real
+         let mut access = store.get_access().await?;
+
+         // move access in and back out so we can manage lifetimes
+         // dump mem blocks into the store
+         access = tokio::task::spawn(async move {
+             let mut writer = access.get_writer()?;
+             for (k, v) in self.mem_blocks {
+                 let key_bytes = k.to_bytes();
+                 let val_bytes = encode(v)?; // TODO
+                 writer.put(key_bytes, val_bytes)?;
+             }
+             drop(writer); // cannot outlive access
+             Ok::<_, DiskDriveError<S::StorageError>>(access)
+         })
+         .await
+         .unwrap()?;
+
+         // dump the rest to disk (in chunks)
+         loop {
+             let mut chunk = vec![];
+             loop {
+                 let Some((cid, data)) = self.car.next_block().await? else {
+                     break;
+                 };
+                 // we still gotta keep checking for the root since we might not have it
+                 if cid == self.root {
+                     let c: Commit = serde_ipld_dagcbor::from_slice(&data)?;
+                     self.commit = Some(c);
+                     continue;
+                 }
+                 // remaining possible types: node, record, other. optimistically process
+                 // TODO: get the actual in-memory size to compute disk spill
+                 let maybe_processed = if Node::could_be(&data) {
                      MaybeProcessedBlock::Raw(data)
                  } else {
                      MaybeProcessedBlock::Processed((self.process)(&data))
-                 },
-             );
-             if cid == cid_needed {
-                 return Ok(());
+                 };
+                 chunk.push((cid, maybe_processed));
+                 if chunk.len() >= self.max_size {
+                     // eventually this won't be .len()
+                     break;
+                 }
              }
+             if chunk.is_empty() {
+                 break;
+             }
+
+             // move access in and back out so we can manage lifetimes
+             // dump mem blocks into the store
+             access = tokio::task::spawn_blocking(move || {
+                 let mut writer = access.get_writer()?;
+                 for (k, v) in chunk {
+                     let key_bytes = k.to_bytes();
+                     let val_bytes = encode(v)?; // TODO
+                     writer.put(key_bytes, val_bytes)?;
+                 }
+                 drop(writer); // cannot outlive access
+                 Ok::<_, DiskDriveError<S::StorageError>>(access)
+             })
+             .await
+             .unwrap()?; // TODO
          }

-         // if we never found the block
-         Err(DriveError::MissingBlock(cid_needed))
+         let commit = self.commit.ok_or(DiskDriveError::MissingCommit)?;
+
+         let walker = Walker::new(commit.data);
+
+         Ok((
+             commit,
+             BigCarReady {
+                 process: self.process,
+                 access,
+                 walker,
+             },
+         ))
      }
+ }

+ pub struct BigCarReady<T: Clone, A: DiskAccess> {
+     process: fn(&[u8]) -> T,
+     access: A,
+     walker: Walker,
+ }
+
+ impl<T: Processable + Send + 'static, A: DiskAccess + Send + 'static> BigCarReady<T, A> {
+     pub async fn next_chunk(
+         mut self,
+         n: usize,
+     ) -> Result<(Self, Option<Vec<(String, T)>>), DiskDriveError<A::StorageError>>
+     where
+         A::StorageError: Send,
+     {
+         let mut out = Vec::with_capacity(n);
+         (self, out) = tokio::task::spawn_blocking(move || {
+             let access = self.access;
+             let mut reader = access.get_reader()?;
+
+             for _ in 0..n {
+                 // walk as far as we can until we run out of blocks or find a record
+                 match self.walker.disk_step(&mut reader, self.process)? {
+                     Step::Missing(cid) => return Err(DiskDriveError::MissingBlock(cid)),
+                     Step::Finish => break,
+                     Step::Step { rkey, data } => {
+                         out.push((rkey, data));
+                         continue;
+                     }
+                 };
+             }
+
+             drop(reader); // cannot outlive access
+             self.access = access;
+             Ok::<_, DiskDriveError<A::StorageError>>((self, out))
+         })
+         .await
+         .unwrap()?; // TODO
+
+         if out.is_empty() {
+             Ok((self, None))
+         } else {
+             Ok((self, Some(out)))
+         }
+     }
+ }
+
+ /// The core driver between the block stream and MST walker
+ ///
+ /// In the future, PDSs will export CARs in a stream-friendly order that will
+ /// enable processing them with tiny memory overhead. But that future is not
+ /// here yet.
+ ///
+ /// CARs are almost always in a stream-unfriendly order, so I'm reverting the
+ /// optimistic stream features: we load all blocks first, then walk the MST.
+ ///
+ /// This makes things much simpler: we only need to worry about spilling to disk
+ /// in one place, and we always have a reasonable expectation about how much
+ /// work the init function will do. We can drop the CAR reader before walking,
+ /// so the sync/async boundaries become a little easier to work around.
+ #[derive(Debug)]
+ pub struct MemDriver<T: Processable> {
+     blocks: HashMap<Cid, MaybeProcessedBlock<T>>,
+     walker: Walker,
+     process: fn(&[u8]) -> T,
+ }
+
+ impl<T: Processable> MemDriver<T> {
      /// Manually step through the record outputs
-     pub async fn next_record(&mut self) -> Result<Option<(String, T)>, DriveError<PE>> {
-         loop {
+     pub async fn next_chunk(&mut self, n: usize) -> Result<Option<Vec<(String, T)>>, DriveError> {
+         let mut out = Vec::with_capacity(n);
+         for _ in 0..n {
              // walk as far as we can until we run out of blocks or find a record
-             let cid_needed = match self.walker.step(&mut self.blocks, &self.process)? {
-                 Step::Rest(cid) => cid,
-                 Step::Finish => return Ok(None),
-                 Step::Step { rkey, data } => return Ok(Some((rkey, data))),
+             match self.walker.step(&mut self.blocks, self.process)? {
+                 Step::Missing(cid) => return Err(DriveError::MissingBlock(cid)),
+                 Step::Finish => break,
+                 Step::Step { rkey, data } => {
+                     out.push((rkey, data));
+                     continue;
+                 }
              };
+         }

-             // load blocks until we reach that cid
-             self.drive_until(cid_needed).await?;
+         if out.is_empty() {
+             Ok(None)
+         } else {
+             Ok(Some(out))
          }
-     }
-
-     /// Convert to a futures::stream of record outputs
-     pub fn stream(self) -> impl Stream<Item = Result<(String, T), DriveError<PE>>> {
-         futures::stream::try_unfold(self, |mut this| async move {
-             let maybe_record = this.next_record().await?;
-             Ok(maybe_record.map(|b| (b, this)))
-         })
      }
  }
+1
src/lib.rs
···
  //!
  //! For now see the [examples](https://tangled.org/@microcosm.blue/repo-stream/tree/main/examples)

+ pub mod disk;
  pub mod disk_drive;
  pub mod disk_redb;
  pub mod disk_sqlite;
+97 -31
src/walk.rs
···
  //! Depth-first MST traversal

- use crate::drive::MaybeProcessedBlock;
+ use crate::disk::{DiskReader, StorageErrorBase};
+ use crate::drive::{MaybeProcessedBlock, Processable};
  use crate::mst::Node;
  use ipld_core::cid::Cid;
  use std::collections::HashMap;
- use std::error::Error;
+ use std::convert::Infallible;

  /// Errors that can happen while walking
  #[derive(Debug, thiserror::Error)]
- pub enum Trip<E: Error> {
+ pub enum Trip {
      #[error("empty mst nodes are not allowed")]
      NodeEmpty,
+     #[error("Failed to fingerprint commit block")]
+     BadCommitFingerprint,
      #[error("Failed to decode commit block: {0}")]
-     BadCommit(Box<dyn std::error::Error>),
+     BadCommit(#[from] serde_ipld_dagcbor::DecodeError<Infallible>),
      #[error("Action node error: {0}")]
      RkeyError(#[from] RkeyError),
-     #[error("Process failed: {0}")]
-     ProcessFailed(E),
      #[error("Encountered an rkey out of order while walking the MST")]
      RkeyOutOfOrder,
  }

+ /// Errors that can happen while walking
+ #[derive(Debug, thiserror::Error)]
+ pub enum DiskTrip<E: StorageErrorBase> {
+     #[error("tripped: {0}")]
+     Trip(#[from] Trip),
+     #[error("storage error: {0}")]
+     StorageError(#[from] E),
+     #[error("Decode error: {0}")]
+     BincodeDecodeError(#[from] bincode::error::DecodeError),
+ }
+
  /// Errors from invalid Rkeys
  #[derive(Debug, thiserror::Error)]
  pub enum RkeyError {
···
  /// Walker outputs
  #[derive(Debug)]
  pub enum Step<T> {
-     /// We need a CID but it's not in the block store
-     ///
-     /// Give the needed CID to the driver so it can load blocks until it's found
-     Rest(Cid),
+     /// We needed this CID but it's not in the block store
+     Missing(Cid),
      /// Reached the end of the MST! yay!
      Finish,
      /// A record was found!
···
      }

      /// Advance through nodes until we find a record or can't go further
-     pub fn step<T: Clone, E: Error>(
+     pub fn step<T: Processable>(
          &mut self,
-         blocks: &mut HashMap<Cid, MaybeProcessedBlock<T, E>>,
-         process: impl Fn(&[u8]) -> Result<T, E>,
-     ) -> Result<Step<T>, Trip<E>> {
+         blocks: &mut HashMap<Cid, MaybeProcessedBlock<T>>,
+         process: impl Fn(&[u8]) -> T,
+     ) -> Result<Step<T>, Trip> {
          loop {
              let Some(mut need) = self.stack.last() else {
                  log::trace!("tried to walk but we're actually done.");
···
                      log::trace!("need node {cid:?}");
                      let Some(block) = blocks.remove(cid) else {
                          log::trace!("node not found, resting");
-                         return Ok(Step::Rest(*cid));
+                         return Ok(Step::Missing(*cid));
                      };

                      let MaybeProcessedBlock::Raw(data) = block else {
-                         return Err(Trip::BadCommit("failed commit fingerprint".into()));
+                         return Err(Trip::BadCommitFingerprint);
                      };
-                     let node = serde_ipld_dagcbor::from_slice::<Node>(&data)
-                         .map_err(|e| Trip::BadCommit(e.into()))?;
+                     let node =
+                         serde_ipld_dagcbor::from_slice::<Node>(&data).map_err(Trip::BadCommit)?;

                      // found node, make sure we remember
                      self.stack.pop();
···
                      log::trace!("need record {cid:?}");
                      let Some(data) = blocks.get_mut(cid) else {
                          log::trace!("record block not found, resting");
-                         return Ok(Step::Rest(*cid));
+                         return Ok(Step::Missing(*cid));
                      };
                      let rkey = rkey.clone();
                      let data = match data {
                          MaybeProcessedBlock::Raw(data) => process(data),
-                         MaybeProcessedBlock::Processed(Ok(t)) => Ok(t.clone()),
-                         bad => {
-                             // big hack to pull the error out -- this corrupts
-                             // a block, so we should not continue trying to work
-                             let mut steal = MaybeProcessedBlock::Raw(vec![]);
-                             std::mem::swap(&mut steal, bad);
-                             let MaybeProcessedBlock::Processed(Err(e)) = steal else {
-                                 unreachable!();
-                             };
-                             return Err(Trip::ProcessFailed(e));
-                         }
+                         MaybeProcessedBlock::Processed(t) => t.clone(),
                      };

                      // found node, make sure we remember
                      self.stack.pop();

                      log::trace!("emitting a block as a step. depth={}", self.stack.len());
-                     let data = data.map_err(Trip::ProcessFailed)?;

                      // rkeys *must* be in order or else the tree is invalid (or
                      // we have a bug)
                      if rkey <= self.prev {
                          return Err(Trip::RkeyOutOfOrder);
+                     }
+                     self.prev = rkey.clone();
+
+                     return Ok(Step::Step { rkey, data });
+                 }
+             }
+         }
+     }
+
+     /// blocking!!!!!!
+     pub fn disk_step<T: Processable, R: DiskReader>(
+         &mut self,
+         reader: &mut R,
+         process: impl Fn(&[u8]) -> T,
+     ) -> Result<Step<T>, DiskTrip<R::StorageError>> {
+         loop {
+             let Some(mut need) = self.stack.last() else {
+                 log::trace!("tried to walk but we're actually done.");
+                 return Ok(Step::Finish);
+             };
+
+             match &mut need {
+                 Need::Node(cid) => {
+                     let cid_bytes = cid.to_bytes();
+                     log::trace!("need node {cid:?}");
+                     let Some(block_bytes) = reader.get(cid_bytes)? else {
+                         log::trace!("node not found, resting");
+                         return Ok(Step::Missing(*cid));
+                     };
+
+                     let block: MaybeProcessedBlock<T> = crate::drive::decode(&block_bytes)?;
+
+                     let MaybeProcessedBlock::Raw(data) = block else {
+                         return Err(Trip::BadCommitFingerprint.into());
+                     };
+                     let node =
+                         serde_ipld_dagcbor::from_slice::<Node>(&data).map_err(Trip::BadCommit)?;
+
+                     // found node, make sure we remember
+                     self.stack.pop();
+
+                     // queue up work on the found node next
+                     push_from_node(&mut self.stack, &node).map_err(Trip::RkeyError)?;
+                 }
+                 Need::Record { rkey, cid } => {
+                     log::trace!("need record {cid:?}");
+                     let cid_bytes = cid.to_bytes();
+                     let Some(data_bytes) = reader.get(cid_bytes)? else {
+                         log::trace!("record block not found, resting");
+                         return Ok(Step::Missing(*cid));
+                     };
+                     let data: MaybeProcessedBlock<T> = crate::drive::decode(&data_bytes)?;
+                     let rkey = rkey.clone();
+                     let data = match data {
+                         MaybeProcessedBlock::Raw(data) => process(&data),
+                         MaybeProcessedBlock::Processed(t) => t.clone(),
+                     };
+
+                     // found node, make sure we remember
+                     self.stack.pop();
+
+                     log::trace!("emitting a block as a step. depth={}", self.stack.len());
+
+                     // rkeys *must* be in order or else the tree is invalid (or
+                     // we have a bug)
+                     if rkey <= self.prev {
+                         return Err(DiskTrip::Trip(Trip::RkeyOutOfOrder));
                      }
                      self.prev = rkey.clone();
