Fast and robust atproto CAR file processing in Rust

files cleanup

-245
src/disk_drive.rs
··· 1 - use futures::Stream; 2 - use futures::TryStreamExt; 3 - use std::collections::VecDeque; 4 - use std::error::Error; 5 - 6 - use crate::disk_walk::{RkeyError, Trip, Walker}; 7 - use crate::mst::{Commit, Node}; 8 - 9 - use ipld_core::cid::Cid; 10 - use serde::{Deserialize, Serialize, de::DeserializeOwned}; 11 - 12 - /// Errors that can happen while consuming and emitting blocks and records 13 - #[derive(Debug, thiserror::Error)] 14 - pub enum DriveError { 15 - #[error("Failed to initialize CarReader: {0}")] 16 - CarReader(#[from] iroh_car::Error), 17 - #[error("Car block stream error: {0}")] 18 - CarBlockError(Box<dyn Error>), 19 - #[error("Failed to decode commit block: {0}")] 20 - BadCommit(Box<dyn Error>), 21 - #[error("The Commit block reference by the root was not found")] 22 - MissingCommit, 23 - #[error("The MST block {0} could not be found")] 24 - MissingBlock(Cid), 25 - #[error("Failed to walk the mst tree: {0}")] 26 - Tripped(#[from] Trip), 27 - #[error("whatever: {0}")] 28 - WalkingProblem(#[from] WalkError), 29 - #[error("whatever: {0}")] 30 - Boooooo(String), 31 - #[error("Error while encoding: {0}")] 32 - EncodingError(#[from] bincode::error::EncodeError), 33 - #[error("Error while decoding: {0}")] 34 - DecodingError(#[from] bincode::error::DecodeError), 35 - } 36 - 37 - /// Limited subset of errors that can happen while walking 38 - #[derive(Debug, thiserror::Error)] 39 - pub enum WalkError { 40 - #[error("The MST block {0} could not be found")] 41 - MissingBlock(Cid), 42 - #[error("Failed to walk the mst tree: {0}")] 43 - Tripped(#[from] Trip), 44 - } 45 - 46 - #[derive(Debug, thiserror::Error)] 47 - pub enum BlockStoreError { 48 - #[error("Error from the storage backend: {0}")] 49 - StorageBackend(Box<dyn Error + Send>), 50 - 51 - #[error(transparent)] 52 - RkeyError(#[from] RkeyError), 53 - 54 - // this should probably not be up here 55 - #[error("Failed to join tokio task: {0}")] 56 - JoinError(tokio::task::JoinError), 57 - 58 - 
#[error("Could not find block: {0}")] 59 - MissingBlock(Cid), 60 - } 61 - 62 - #[derive(Serialize, Deserialize)] 63 - pub enum MaybeProcessedBlock<T: Serialize> { 64 - Raw(Vec<u8>), 65 - Processed(T), 66 - } 67 - 68 - pub type Records = Vec<(String, Vec<u8>)>; 69 - 70 - /// Storage backend for caching large-repo blocks 71 - /// 72 - /// Since 73 - pub trait BlockStore { 74 - fn put_batch( 75 - &self, 76 - blocks: Vec<(Cid, Vec<u8>)>, 77 - ) -> impl Future<Output = Result<(), BlockStoreError>>; // unwraps for now 78 - fn walk_batch( 79 - &self, 80 - walker: Walker, 81 - n: usize, 82 - ) -> impl Future<Output = Result<(Walker, Records), BlockStoreError>>; // boo string error for now because 83 - } 84 - 85 - type CarBlock<E> = Result<(Cid, Vec<u8>), E>; 86 - 87 - /// The core driver between the block stream and MST walker 88 - pub struct Vehicle<SE, S, T, BS, P> 89 - where 90 - SE: Error + 'static, 91 - S: Stream<Item = CarBlock<SE>>, 92 - T: Clone + Serialize + DeserializeOwned, 93 - BS: BlockStore, 94 - P: Fn(&[u8]) -> T, 95 - { 96 - #[allow(dead_code)] 97 - block_stream: Option<S>, 98 - block_store: BS, 99 - walker: Walker, 100 - process: P, 101 - out_cache: VecDeque<(String, T)>, 102 - } 103 - 104 - impl<SE, S, T, BS, P> Vehicle<SE, S, T, BS, P> 105 - where 106 - SE: Error + 'static, 107 - S: Stream<Item = CarBlock<SE>> + Unpin + Send, 108 - T: Clone + Serialize + DeserializeOwned + Send, 109 - BS: BlockStore + Send, 110 - P: Fn(&[u8]) -> T, 111 - { 112 - /// Set up the stream 113 - /// 114 - /// This will eagerly consume blocks until the `Commit` object is found. 115 - /// *Usually* the it's the first block, but there is no guarantee. 
116 - /// 117 - /// ### Parameters 118 - /// 119 - /// `root`: CID of the commit object that is the root of the MST 120 - /// 121 - /// `block_stream`: Input stream of raw CAR blocks 122 - /// 123 - /// `process`: record-transforming callback: 124 - /// 125 - /// For tasks where records can be quickly processed into a *smaller* 126 - /// useful representation, you can do that eagerly as blocks come in by 127 - /// passing the processor as a callback here. This can reduce overall 128 - /// memory usage. 129 - pub async fn init( 130 - root: Cid, 131 - block_stream: S, 132 - block_store: BS, 133 - process: P, 134 - ) -> Result<(Commit, Self), DriveError> { 135 - let mut commit = None; 136 - 137 - log::warn!("init: load blocks"); 138 - 139 - let mut chunked = block_stream.try_chunks(256); 140 - 141 - // go ahead and put all blocks in the block store 142 - while let Some(chunk) = chunked 143 - .try_next() 144 - .await 145 - .map_err(|e| DriveError::CarBlockError(e.into()))? 146 - { 147 - let mut to_insert = Vec::with_capacity(chunk.len()); 148 - for (cid, data) in chunk { 149 - if cid == root { 150 - let c: Commit = serde_ipld_dagcbor::from_slice(&data) 151 - .map_err(|e| DriveError::BadCommit(e.into()))?; 152 - commit = Some(c); 153 - } else { 154 - let wrapped = if Node::could_be(&data) { 155 - MaybeProcessedBlock::Raw(data) 156 - } else { 157 - MaybeProcessedBlock::Processed(process(&data)) 158 - }; 159 - let bytes = 160 - bincode::serde::encode_to_vec(wrapped, bincode::config::standard())?; 161 - 162 - to_insert.push((cid, bytes)); 163 - } 164 - } 165 - block_store 166 - .put_batch(to_insert) 167 - .await 168 - .map_err(|e| DriveError::Boooooo(format!("boooOOOOO! {e}")))?; // TODO 169 - } 170 - 171 - log::warn!("init: got commit?"); 172 - 173 - // we either broke out or read all the blocks without finding the commit... 
174 - let commit = commit.ok_or(DriveError::MissingCommit)?; 175 - 176 - let walker = Walker::new(commit.data); 177 - 178 - log::warn!("init: wrapping up"); 179 - 180 - let me = Self { 181 - block_stream: None, 182 - block_store, 183 - walker, 184 - process, 185 - out_cache: VecDeque::new(), 186 - }; 187 - Ok((commit, me)) 188 - } 189 - 190 - async fn load_chunk(&mut self, n: usize) -> Result<(), DriveError> { 191 - let walker = std::mem::take(&mut self.walker); 192 - let (walker, batch) = self 193 - .block_store 194 - .walk_batch(walker, n) 195 - .await 196 - .map_err(|e| DriveError::Boooooo(format!("booo! (here right?) {e}")))?; // TODO 197 - self.walker = walker; 198 - 199 - let processed = batch 200 - .into_iter() 201 - .map(|(k, encoded)| { 202 - let (decoded, n): (MaybeProcessedBlock<T>, usize) = 203 - bincode::serde::decode_from_slice(&encoded, bincode::config::standard())?; 204 - assert_eq!(n, encoded.len()); 205 - let processed = match decoded { 206 - MaybeProcessedBlock::Processed(t) => t, 207 - MaybeProcessedBlock::Raw(block) => (self.process)(&block), 208 - }; 209 - Ok((k, processed)) 210 - }) 211 - .collect::<Result<Vec<_>, DriveError>>()?; 212 - 213 - self.out_cache.extend(processed); 214 - Ok(()) 215 - } 216 - 217 - /// Get a chunk of records at a time 218 - /// 219 - /// the number of returned records may be smaller or larger than requested 220 - /// (but non-zero), even if it's not the last chunk. 221 - /// 222 - /// an empty vec will be returned to signal the end. 
223 - pub async fn next_chunk(&mut self, n: usize) -> Result<Vec<(String, T)>, DriveError> { 224 - if self.out_cache.is_empty() { 225 - self.load_chunk(n).await?; 226 - } 227 - Ok(std::mem::take(&mut self.out_cache).into()) 228 - } 229 - 230 - /// Manually step through the record outputs 231 - pub async fn next_record(&mut self) -> Result<Option<(String, T)>, DriveError> { 232 - if self.out_cache.is_empty() { 233 - self.load_chunk(128).await?; // TODO 234 - } 235 - Ok(self.out_cache.pop_front()) 236 - } 237 - 238 - /// Convert to a futures::stream of record outputs 239 - pub fn stream(self) -> impl Stream<Item = Result<(String, T), DriveError>> { 240 - futures::stream::try_unfold(self, |mut this| async move { 241 - let maybe_record = this.next_record().await?; 242 - Ok(maybe_record.map(|b| (b, this))) 243 - }) 244 - } 245 - }
-110
src/disk_redb.rs
··· 1 - use crate::disk_drive::{BlockStore, BlockStoreError, MaybeProcessedBlock, Records}; 2 - use crate::disk_walk::{Need, Walker}; 3 - use ipld_core::cid::Cid; 4 - use redb::{Database, Durability, Error, ReadableDatabase, TableDefinition}; 5 - use std::path::Path; 6 - use std::sync::Arc; 7 - 8 - const TABLE: TableDefinition<&[u8], &[u8]> = TableDefinition::new("blocks"); 9 - 10 - pub struct RedbStore { 11 - #[allow(dead_code)] 12 - db: Arc<Database>, 13 - } 14 - 15 - impl RedbStore { 16 - pub async fn new(path: impl AsRef<Path> + 'static + Send) -> Result<Self, Error> { 17 - log::warn!("redb new"); 18 - let db = tokio::task::spawn_blocking(|| Database::create(path)) 19 - .await 20 - .unwrap()?; 21 - log::warn!("db created"); 22 - Ok(Self { db: db.into() }) 23 - } 24 - } 25 - 26 - // TODO: ship off to a blocking thread 27 - impl Drop for RedbStore { 28 - fn drop(&mut self) { 29 - let mut tx = self.db.begin_write().unwrap(); 30 - tx.set_durability(Durability::None).unwrap(); 31 - tx.delete_table(TABLE).unwrap(); 32 - tx.commit().unwrap(); 33 - } 34 - } 35 - 36 - impl<E: Into<Error>> From<E> for BlockStoreError { 37 - fn from(e: E) -> BlockStoreError { 38 - let e = Into::<Error>::into(e); 39 - BlockStoreError::StorageBackend(Box::new(e)) 40 - } 41 - } 42 - 43 - impl BlockStore for RedbStore { 44 - async fn put_batch(&self, blocks: Vec<(Cid, Vec<u8>)>) -> Result<(), BlockStoreError> { 45 - let db = self.db.clone(); 46 - tokio::task::spawn_blocking(move || -> Result<(), BlockStoreError> { 47 - let mut tx = db.begin_write()?; 48 - tx.set_durability(Durability::None)?; 49 - 50 - { 51 - let mut table = tx.open_table(TABLE)?; 52 - for (cid, t) in blocks { 53 - let key_bytes = cid.to_bytes(); 54 - table.insert(&*key_bytes, &*t)?; 55 - } 56 - } 57 - 58 - Ok(tx.commit()?) 59 - }) 60 - .await 61 - .map_err(BlockStoreError::JoinError)? 
62 - } 63 - 64 - async fn walk_batch( 65 - &self, 66 - mut walker: Walker, 67 - n: usize, 68 - ) -> Result<(Walker, Records), BlockStoreError> { 69 - let db = self.db.clone(); 70 - tokio::task::spawn_blocking(move || -> Result<_, BlockStoreError> { 71 - let tx = db.begin_read()?; 72 - let table = tx.open_table(TABLE)?; 73 - 74 - let mut out = Vec::with_capacity(n); 75 - loop { 76 - let Some(need) = walker.next_needed()? else { 77 - break; 78 - }; 79 - let cid = need.cid(); 80 - let Some(res) = table.get(&*cid.to_bytes())? else { 81 - return Err(BlockStoreError::MissingBlock(cid)); 82 - }; 83 - let block = res.value(); 84 - 85 - match need { 86 - Need::Node(_) => { 87 - let (mpb, n) = 88 - bincode::serde::decode_from_slice(block, bincode::config::standard()) 89 - .unwrap(); 90 - assert_eq!(n, block.len()); 91 - // DANGER: we're throwing in unit () as a placeholder here and assuming bincode will still work since Raw is the first variant 92 - let MaybeProcessedBlock::Raw(bytes): MaybeProcessedBlock<()> = mpb else { 93 - panic!("should have not been processed"); // tODO 94 - }; 95 - walker.handle_node(&bytes)? 96 - } 97 - Need::Record { rkey, .. } => { 98 - out.push((rkey, block.to_vec())); 99 - if out.len() >= n { 100 - break; 101 - } 102 - } 103 - } 104 - } 105 - Ok((walker, out)) 106 - }) 107 - .await 108 - .map_err(BlockStoreError::JoinError)? 109 - } 110 - }
-65
src/disk_sqlite.rs
··· 1 - // use crate::disk_drive::BlockStore; 2 - // use ipld_core::cid::Cid; 3 - // use rusqlite::{Connection, OptionalExtension, Result}; 4 - // use serde::{Serialize, de::DeserializeOwned}; 5 - // use std::path::Path; 6 - 7 - // pub struct SqliteStore { 8 - // conn: Connection, 9 - // } 10 - 11 - // impl SqliteStore { 12 - // pub fn new(path: impl AsRef<Path>) -> Result<Self> { 13 - // let conn = Connection::open(path)?; 14 - // conn.pragma_update(None, "journal_mode", "WAL")?; 15 - // conn.pragma_update(None, "synchronous", "OFF")?; 16 - // conn.pragma_update(None, "cache_size", (-32 * 2_i64.pow(10)).to_string())?; 17 - // conn.execute( 18 - // "CREATE TABLE blocks ( 19 - // key BLOB PRIMARY KEY NOT NULL, 20 - // val BLOB NOT NULL 21 - // ) WITHOUT ROWID", 22 - // (), 23 - // )?; 24 - 25 - // Ok(Self { conn }) 26 - // } 27 - // } 28 - 29 - // impl Drop for SqliteStore { 30 - // fn drop(&mut self) { 31 - // self.conn.execute("DROP TABLE blocks", ()).unwrap(); 32 - // } 33 - // } 34 - 35 - // impl<MPB: Serialize + DeserializeOwned> BlockStore<MPB> for SqliteStore { 36 - // fn put(&self, c: Cid, t: MPB) { 37 - // let key_bytes = c.to_bytes(); 38 - // let val_bytes = bincode::serde::encode_to_vec(t, bincode::config::standard()).unwrap(); 39 - 40 - // self.conn 41 - // .execute( 42 - // "INSERT INTO blocks (key, val) VALUES (?1, ?2)", 43 - // (&key_bytes, &val_bytes), 44 - // ) 45 - // .unwrap(); 46 - // } 47 - // fn get(&self, c: Cid) -> Option<MPB> { 48 - // let key_bytes = c.to_bytes(); 49 - 50 - // let val_bytes: Vec<u8> = self 51 - // .conn 52 - // .query_one( 53 - // "SELECT val FROM blocks WHERE key = ?1", 54 - // (&key_bytes,), 55 - // |row| row.get(0), 56 - // ) 57 - // .optional() 58 - // .unwrap()?; 59 - 60 - // let (t, n): (MPB, usize) = 61 - // bincode::serde::decode_from_slice(&val_bytes, bincode::config::standard()).unwrap(); 62 - // assert_eq!(val_bytes.len(), n); 63 - // Some(t) 64 - // } 65 - // }
-369
src/disk_walk.rs
··· 1 - //! Depth-first MST traversal 2 - 3 - use crate::mst::Node; 4 - use std::convert::Infallible; 5 - 6 - use ipld_core::cid::Cid; 7 - use serde::{Serialize, de::DeserializeOwned}; 8 - 9 - /// Errors that can happen while walking 10 - #[derive(Debug, thiserror::Error)] 11 - pub enum Trip { 12 - #[error("empty mst nodes are not allowed")] 13 - NodeEmpty, 14 - #[error("Failed to decode commit block: {0}")] 15 - BadCommit(serde_ipld_dagcbor::DecodeError<Infallible>), 16 - #[error("Action node error: {0}")] 17 - RkeyError(#[from] RkeyError), 18 - #[error("Process failed: {0}")] 19 - ProcessFailed(String), 20 - } 21 - 22 - /// Errors from invalid Rkeys 23 - #[derive(Debug, thiserror::Error)] 24 - pub enum RkeyError { 25 - #[error("Failed to compute an rkey due to invalid prefix_len")] 26 - EntryPrefixOutOfbounds, 27 - #[error("RKey was not utf-8")] 28 - EntryRkeyNotUtf8(#[from] std::string::FromUtf8Error), 29 - #[error("Encountered an rkey out of order while walking the MST")] 30 - RkeyOutOfOrder, 31 - #[error("Failed to decode node block: {0}")] 32 - NodeDecodeError(#[from] serde_ipld_dagcbor::DecodeError<Infallible>), 33 - } 34 - 35 - /// Walker outputs 36 - #[derive(Debug)] 37 - pub enum Step<T: Serialize + DeserializeOwned> { 38 - /// We need a CID but it's not in the block store 39 - /// 40 - /// Give the needed CID to the driver so it can load blocks until it's found 41 - Rest(Cid), 42 - /// Reached the end of the MST! yay! 43 - Finish, 44 - /// A record was found! 45 - Step { rkey: String, data: T }, 46 - } 47 - 48 - #[derive(Debug, Clone, PartialEq)] 49 - pub enum Need { 50 - Node(Cid), 51 - Record { rkey: String, cid: Cid }, 52 - } 53 - 54 - impl Need { 55 - pub fn cid(&self) -> Cid { 56 - match self { 57 - Need::Node(cid) => *cid, 58 - Need::Record { cid, .. 
} => *cid, 59 - } 60 - } 61 - } 62 - 63 - fn push_from_node(stack: &mut Vec<Need>, node: &Node) -> Result<(), RkeyError> { 64 - let mut entries = Vec::with_capacity(node.entries.len()); 65 - 66 - let mut prefix = vec![]; 67 - for entry in &node.entries { 68 - let mut rkey = vec![]; 69 - let pre_checked = prefix 70 - .get(..entry.prefix_len) 71 - .ok_or(RkeyError::EntryPrefixOutOfbounds)?; 72 - rkey.extend_from_slice(pre_checked); 73 - rkey.extend_from_slice(&entry.keysuffix); 74 - prefix = rkey.clone(); 75 - 76 - entries.push(Need::Record { 77 - rkey: String::from_utf8(rkey)?, 78 - cid: entry.value, 79 - }); 80 - if let Some(ref tree) = entry.tree { 81 - entries.push(Need::Node(*tree)); 82 - } 83 - } 84 - 85 - entries.reverse(); 86 - stack.append(&mut entries); 87 - 88 - if let Some(tree) = node.left { 89 - stack.push(Need::Node(tree)); 90 - } 91 - Ok(()) 92 - } 93 - 94 - /// Traverser of an atproto MST 95 - /// 96 - /// Walks the tree from left-to-right in depth-first order 97 - /// 98 - /// (turning into more of a navigator) 99 - /// it doesn't quite feel like the divisions of responsibility are right around 100 - /// here yet. 101 - #[derive(Debug, Default)] 102 - pub struct Walker { 103 - stack: Vec<Need>, 104 - prev: String, 105 - } 106 - 107 - impl Walker { 108 - pub fn new(tree_root_cid: Cid) -> Self { 109 - Self { 110 - stack: vec![Need::Node(tree_root_cid)], 111 - prev: "".to_string(), 112 - } 113 - } 114 - 115 - pub fn next_needed(&mut self) -> Result<Option<Need>, RkeyError> { 116 - let Some(need) = self.stack.pop() else { 117 - return Ok(None); 118 - }; 119 - if let Need::Record { ref rkey, .. 
} = need { 120 - // rkeys *must* be in order or else the tree is invalid (or 121 - // we have a bug) 122 - if *rkey <= self.prev { 123 - return Err(RkeyError::RkeyOutOfOrder); 124 - } 125 - self.prev = rkey.clone(); 126 - } 127 - Ok(Some(need)) 128 - } 129 - 130 - /// hacky: this must be called after next_needed if it was a node 131 - pub fn handle_node(&mut self, block: &[u8]) -> Result<(), RkeyError> { 132 - let node = serde_ipld_dagcbor::from_slice::<Node>(block)?; 133 - push_from_node(&mut self.stack, &node)?; 134 - Ok(()) 135 - } 136 - } 137 - 138 - #[cfg(test)] 139 - mod test { 140 - use super::*; 141 - // use crate::mst::Entry; 142 - 143 - fn cid1() -> Cid { 144 - "bafyreihixenvk3ahqbytas4hk4a26w43bh6eo3w6usjqtxkpzsvi655a3m" 145 - .parse() 146 - .unwrap() 147 - } 148 - // fn cid2() -> Cid { 149 - // "QmY7Yh4UquoXHLPFo2XbhXkhBvFoPwmQUSa92pxnxjQuPU" 150 - // .parse() 151 - // .unwrap() 152 - // } 153 - // fn cid3() -> Cid { 154 - // "bafybeigdyrzt5sfp7udm7hu76uh7y26nf3efuylqabf3oclgtqy55fbzdi" 155 - // .parse() 156 - // .unwrap() 157 - // } 158 - // fn cid4() -> Cid { 159 - // "QmbWqxBEKC3P8tqsKc98xmWNzrzDtRLMiMPL8wBuTGsMnR" 160 - // .parse() 161 - // .unwrap() 162 - // } 163 - // fn cid5() -> Cid { 164 - // "QmSnuWmxptJZdLJpKRarxBMS2Ju2oANVrgbr2xWbie9b2D" 165 - // .parse() 166 - // .unwrap() 167 - // } 168 - // fn cid6() -> Cid { 169 - // "QmdmQXB2mzChmMeKY47C43LxUdg1NDJ5MWcKMKxDu7RgQm" 170 - // .parse() 171 - // .unwrap() 172 - // } 173 - // fn cid7() -> Cid { 174 - // "bafybeiaysi4s6lnjev27ln5icwm6tueaw2vdykrtjkwiphwekaywqhcjze" 175 - // .parse() 176 - // .unwrap() 177 - // } 178 - // fn cid8() -> Cid { 179 - // "bafyreif3tfdpr5n4jdrbielmcapwvbpcthepfkwq2vwonmlhirbjmotedi" 180 - // .parse() 181 - // .unwrap() 182 - // } 183 - // fn cid9() -> Cid { 184 - // "bafyreicnokmhmrnlp2wjhyk2haep4tqxiptwfrp2rrs7rzq7uk766chqvq" 185 - // .parse() 186 - // .unwrap() 187 - // } 188 - 189 - #[test] 190 - fn test_next_from_node_empty() { 191 - let node = Node { 192 - left: 
None, 193 - entries: vec![], 194 - }; 195 - let mut stack = vec![]; 196 - push_from_node(&mut stack, &node).unwrap(); 197 - assert_eq!(stack.last(), None); 198 - } 199 - 200 - #[test] 201 - fn test_needs_from_node_just_left() { 202 - let node = Node { 203 - left: Some(cid1()), 204 - entries: vec![], 205 - }; 206 - let mut stack = vec![]; 207 - push_from_node(&mut stack, &node).unwrap(); 208 - assert_eq!(stack.last(), Some(Need::Node(cid1())).as_ref()); 209 - } 210 - 211 - // #[test] 212 - // fn test_needs_from_node_just_one_record() { 213 - // let node = Node { 214 - // left: None, 215 - // entries: vec![Entry { 216 - // keysuffix: "asdf".into(), 217 - // prefix_len: 0, 218 - // value: cid1(), 219 - // tree: None, 220 - // }], 221 - // }; 222 - // assert_eq!( 223 - // needs_from_node(node).unwrap(), 224 - // vec![Need::Record { 225 - // rkey: "asdf".into(), 226 - // cid: cid1(), 227 - // },] 228 - // ); 229 - // } 230 - 231 - // #[test] 232 - // fn test_needs_from_node_two_records() { 233 - // let node = Node { 234 - // left: None, 235 - // entries: vec![ 236 - // Entry { 237 - // keysuffix: "asdf".into(), 238 - // prefix_len: 0, 239 - // value: cid1(), 240 - // tree: None, 241 - // }, 242 - // Entry { 243 - // keysuffix: "gh".into(), 244 - // prefix_len: 2, 245 - // value: cid2(), 246 - // tree: None, 247 - // }, 248 - // ], 249 - // }; 250 - // assert_eq!( 251 - // needs_from_node(node).unwrap(), 252 - // vec![ 253 - // Need::Record { 254 - // rkey: "asdf".into(), 255 - // cid: cid1(), 256 - // }, 257 - // Need::Record { 258 - // rkey: "asgh".into(), 259 - // cid: cid2(), 260 - // }, 261 - // ] 262 - // ); 263 - // } 264 - 265 - // #[test] 266 - // fn test_needs_from_node_with_both() { 267 - // let node = Node { 268 - // left: None, 269 - // entries: vec![Entry { 270 - // keysuffix: "asdf".into(), 271 - // prefix_len: 0, 272 - // value: cid1(), 273 - // tree: Some(cid2()), 274 - // }], 275 - // }; 276 - // assert_eq!( 277 - // needs_from_node(node).unwrap(), 278 
- // vec![ 279 - // Need::Record { 280 - // rkey: "asdf".into(), 281 - // cid: cid1(), 282 - // }, 283 - // Need::Node(cid2()), 284 - // ] 285 - // ); 286 - // } 287 - 288 - // #[test] 289 - // fn test_needs_from_node_left_and_record() { 290 - // let node = Node { 291 - // left: Some(cid1()), 292 - // entries: vec![Entry { 293 - // keysuffix: "asdf".into(), 294 - // prefix_len: 0, 295 - // value: cid2(), 296 - // tree: None, 297 - // }], 298 - // }; 299 - // assert_eq!( 300 - // needs_from_node(node).unwrap(), 301 - // vec![ 302 - // Need::Node(cid1()), 303 - // Need::Record { 304 - // rkey: "asdf".into(), 305 - // cid: cid2(), 306 - // }, 307 - // ] 308 - // ); 309 - // } 310 - 311 - // #[test] 312 - // fn test_needs_from_full_node() { 313 - // let node = Node { 314 - // left: Some(cid1()), 315 - // entries: vec![ 316 - // Entry { 317 - // keysuffix: "asdf".into(), 318 - // prefix_len: 0, 319 - // value: cid2(), 320 - // tree: Some(cid3()), 321 - // }, 322 - // Entry { 323 - // keysuffix: "ghi".into(), 324 - // prefix_len: 1, 325 - // value: cid4(), 326 - // tree: Some(cid5()), 327 - // }, 328 - // Entry { 329 - // keysuffix: "jkl".into(), 330 - // prefix_len: 2, 331 - // value: cid6(), 332 - // tree: Some(cid7()), 333 - // }, 334 - // Entry { 335 - // keysuffix: "mno".into(), 336 - // prefix_len: 4, 337 - // value: cid8(), 338 - // tree: Some(cid9()), 339 - // }, 340 - // ], 341 - // }; 342 - // assert_eq!( 343 - // needs_from_node(node).unwrap(), 344 - // vec![ 345 - // Need::Node(cid1()), 346 - // Need::Record { 347 - // rkey: "asdf".into(), 348 - // cid: cid2(), 349 - // }, 350 - // Need::Node(cid3()), 351 - // Need::Record { 352 - // rkey: "aghi".into(), 353 - // cid: cid4(), 354 - // }, 355 - // Need::Node(cid5()), 356 - // Need::Record { 357 - // rkey: "agjkl".into(), 358 - // cid: cid6(), 359 - // }, 360 - // Need::Node(cid7()), 361 - // Need::Record { 362 - // rkey: "agjkmno".into(), 363 - // cid: cid8(), 364 - // }, 365 - // Need::Node(cid9()), 366 - // 
] 367 - // ); 368 - // } 369 - }
-4
src/lib.rs
··· 3 3 //! For now see the [examples](https://tangled.org/@microcosm.blue/repo-stream/tree/main/examples) 4 4 5 5 pub mod disk; 6 - pub mod disk_drive; 7 - pub mod disk_redb; 8 - pub mod disk_sqlite; 9 - pub mod disk_walk; 10 6 pub mod drive; 11 7 pub mod mst; 12 8 pub mod walk;