Fast and robust atproto CAR file processing in rust
15
fork

Configure Feed

Select the types of activity you want to include in your feed.

at disk 403 lines 13 kB view raw
1//! Depth-first MST traversal 2 3use crate::disk::SqliteReader; 4use crate::drive::{DecodeError, MaybeProcessedBlock}; 5use crate::mst::Node; 6use crate::process::Processable; 7use ipld_core::cid::Cid; 8use sha2::{Digest, Sha256}; 9use std::collections::HashMap; 10use std::convert::Infallible; 11 12/// Errors that can happen while walking 13#[derive(Debug, thiserror::Error)] 14pub enum WalkError { 15 #[error("Failed to fingerprint commit block")] 16 BadCommitFingerprint, 17 #[error("Failed to decode commit block: {0}")] 18 BadCommit(#[from] serde_ipld_dagcbor::DecodeError<Infallible>), 19 #[error("Action node error: {0}")] 20 MstError(#[from] MstError), 21 #[error("storage error: {0}")] 22 StorageError(#[from] rusqlite::Error), 23 #[error("Decode error: {0}")] 24 DecodeError(#[from] DecodeError), 25} 26 27/// Errors from invalid Rkeys 28#[derive(Debug, PartialEq, thiserror::Error)] 29pub enum MstError { 30 #[error("Failed to compute an rkey due to invalid prefix_len")] 31 EntryPrefixOutOfbounds, 32 #[error("RKey was not utf-8")] 33 EntryRkeyNotUtf8(#[from] std::string::FromUtf8Error), 34 #[error("Nodes cannot be empty (except for an entirely empty MST)")] 35 EmptyNode, 36 #[error("Found an entry with rkey at the wrong depth")] 37 WrongDepth, 38 #[error("Lost track of our depth (possible bug?)")] 39 LostDepth, 40 #[error("MST depth underflow: depth-0 node with child trees")] 41 DepthUnderflow, 42 #[error("Encountered an rkey out of order while walking the MST")] 43 RkeyOutOfOrder, 44} 45 46/// Walker outputs 47#[derive(Debug)] 48pub enum Step<T> { 49 /// We needed this CID but it's not in the block store 50 Missing(Cid), 51 /// Reached the end of the MST! yay! 52 Finish, 53 /// A record was found! 54 Found { rkey: String, data: T }, 55} 56 57#[derive(Debug, Clone, PartialEq)] 58enum Need { 59 Node { depth: Depth, cid: Cid }, 60 Record { rkey: String, cid: Cid }, 61} 62 63#[derive(Debug, Clone, Copy, PartialEq)] 64enum Depth { 65 Root, 66 Depth(u32), 67} 68 69impl Depth { 70 fn from_key(key: &[u8]) -> Self { 71 let mut zeros = 0; 72 for byte in Sha256::digest(key) { 73 let leading = byte.leading_zeros(); 74 zeros += leading; 75 if leading < 8 { 76 break; 77 } 78 } 79 Self::Depth(zeros / 2) // truncating divide (rounds down) 80 } 81 fn next_expected(&self) -> Result<Option<u32>, MstError> { 82 match self { 83 Self::Root => Ok(None), 84 Self::Depth(d) => d.checked_sub(1).ok_or(MstError::DepthUnderflow).map(Some), 85 } 86 } 87} 88 89fn push_from_node(stack: &mut Vec<Need>, node: &Node, parent_depth: Depth) -> Result<(), MstError> { 90 // empty nodes are not allowed in the MST 91 // ...except for a single one for empty MST, but we wouldn't be pushing that 92 if node.is_empty() { 93 return Err(MstError::EmptyNode); 94 } 95 96 let mut entries = Vec::with_capacity(node.entries.len()); 97 let mut prefix = vec![]; 98 let mut this_depth = parent_depth.next_expected()?; 99 100 for entry in &node.entries { 101 let mut rkey = vec![]; 102 let pre_checked = prefix 103 .get(..entry.prefix_len) 104 .ok_or(MstError::EntryPrefixOutOfbounds)?; 105 rkey.extend_from_slice(pre_checked); 106 rkey.extend_from_slice(&entry.keysuffix); 107 108 let Depth::Depth(key_depth) = Depth::from_key(&rkey) else { 109 return Err(MstError::WrongDepth); 110 }; 111 112 // this_depth is `none` if we are the deepest child (directly below root) 113 // in that case we accept whatever highest depth is claimed 114 let expected_depth = match this_depth { 115 Some(d) => d, 116 None => { 117 this_depth = Some(key_depth); 118 key_depth 119 } 120 }; 121 122 // all keys we find should be this depth 123 if key_depth != expected_depth { 124 return Err(MstError::DepthUnderflow); 125 } 126 127 prefix = rkey.clone(); 128 129 entries.push(Need::Record { 130 rkey: String::from_utf8(rkey)?, 131 cid: entry.value, 132 }); 133 if let Some(ref tree) = entry.tree { 134 entries.push(Need::Node { 135 depth: Depth::Depth(key_depth), 136 cid: *tree, 137 }); 138 } 139 } 140 141 entries.reverse(); 142 stack.append(&mut entries); 143 144 let d = this_depth.ok_or(MstError::LostDepth)?; 145 146 if let Some(tree) = node.left { 147 stack.push(Need::Node { 148 depth: Depth::Depth(d), 149 cid: tree, 150 }); 151 } 152 Ok(()) 153} 154 155/// Traverser of an atproto MST 156/// 157/// Walks the tree from left-to-right in depth-first order 158#[derive(Debug)] 159pub struct Walker { 160 stack: Vec<Need>, 161 prev: String, 162} 163 164impl Walker { 165 pub fn new(tree_root_cid: Cid) -> Self { 166 Self { 167 stack: vec![Need::Node { 168 depth: Depth::Root, 169 cid: tree_root_cid, 170 }], 171 prev: "".to_string(), 172 } 173 } 174 175 /// Advance through nodes until we find a record or can't go further 176 pub fn step<T: Processable>( 177 &mut self, 178 blocks: &mut HashMap<Cid, MaybeProcessedBlock<T>>, 179 process: impl Fn(Vec<u8>) -> T, 180 ) -> Result<Step<T>, WalkError> { 181 loop { 182 let Some(need) = self.stack.last_mut() else { 183 log::trace!("tried to walk but we're actually done."); 184 return Ok(Step::Finish); 185 }; 186 187 match need { 188 &mut Need::Node { depth, cid } => { 189 log::trace!("need node {cid:?}"); 190 let Some(block) = blocks.remove(&cid) else { 191 log::trace!("node not found, resting"); 192 return Ok(Step::Missing(cid)); 193 }; 194 195 let MaybeProcessedBlock::Raw(data) = block else { 196 return Err(WalkError::BadCommitFingerprint); 197 }; 198 let node = serde_ipld_dagcbor::from_slice::<Node>(&data) 199 .map_err(WalkError::BadCommit)?; 200 201 // found node, make sure we remember 202 self.stack.pop(); 203 204 // queue up work on the found node next 205 push_from_node(&mut self.stack, &node, depth)?; 206 } 207 Need::Record { rkey, cid } => { 208 log::trace!("need record {cid:?}"); 209 // note that we cannot *remove* a record block, sadly, since 210 // there can be multiple rkeys pointing to the same cid. 211 let Some(data) = blocks.get_mut(cid) else { 212 return Ok(Step::Missing(*cid)); 213 }; 214 let rkey = rkey.clone(); 215 let data = match data { 216 MaybeProcessedBlock::Raw(data) => process(data.to_vec()), 217 MaybeProcessedBlock::Processed(t) => t.clone(), 218 }; 219 220 // found node, make sure we remember 221 self.stack.pop(); 222 223 // rkeys *must* be in order or else the tree is invalid (or 224 // we have a bug) 225 if rkey <= self.prev { 226 return Err(MstError::RkeyOutOfOrder)?; 227 } 228 self.prev = rkey.clone(); 229 230 return Ok(Step::Found { rkey, data }); 231 } 232 } 233 } 234 } 235 236 /// blocking!!!!!! 237 pub fn disk_step<T: Processable>( 238 &mut self, 239 reader: &mut SqliteReader, 240 process: impl Fn(Vec<u8>) -> T, 241 ) -> Result<Step<T>, WalkError> { 242 loop { 243 let Some(need) = self.stack.last_mut() else { 244 log::trace!("tried to walk but we're actually done."); 245 return Ok(Step::Finish); 246 }; 247 248 match need { 249 &mut Need::Node { depth, cid } => { 250 let cid_bytes = cid.to_bytes(); 251 log::trace!("need node {cid:?}"); 252 let Some(block_bytes) = reader.get(cid_bytes)? else { 253 log::trace!("node not found, resting"); 254 return Ok(Step::Missing(cid)); 255 }; 256 257 let block: MaybeProcessedBlock<T> = crate::drive::decode(&block_bytes)?; 258 259 let MaybeProcessedBlock::Raw(data) = block else { 260 return Err(WalkError::BadCommitFingerprint); 261 }; 262 let node = serde_ipld_dagcbor::from_slice::<Node>(&data) 263 .map_err(WalkError::BadCommit)?; 264 265 // found node, make sure we remember 266 self.stack.pop(); 267 268 // queue up work on the found node next 269 push_from_node(&mut self.stack, &node, depth).map_err(WalkError::MstError)?; 270 } 271 Need::Record { rkey, cid } => { 272 log::trace!("need record {cid:?}"); 273 let cid_bytes = cid.to_bytes(); 274 let Some(data_bytes) = reader.get(cid_bytes)? else { 275 log::trace!("record block not found, resting"); 276 return Ok(Step::Missing(*cid)); 277 }; 278 let data: MaybeProcessedBlock<T> = crate::drive::decode(&data_bytes)?; 279 let rkey = rkey.clone(); 280 let data = match data { 281 MaybeProcessedBlock::Raw(data) => process(data), 282 MaybeProcessedBlock::Processed(t) => t.clone(), 283 }; 284 285 // found node, make sure we remember 286 self.stack.pop(); 287 288 log::trace!("emitting a block as a step. depth={}", self.stack.len()); 289 290 // rkeys *must* be in order or else the tree is invalid (or 291 // we have a bug) 292 if rkey <= self.prev { 293 return Err(MstError::RkeyOutOfOrder)?; 294 } 295 self.prev = rkey.clone(); 296 297 return Ok(Step::Found { rkey, data }); 298 } 299 } 300 } 301 } 302} 303 304#[cfg(test)] 305mod test { 306 use super::*; 307 308 fn cid1() -> Cid { 309 "bafyreihixenvk3ahqbytas4hk4a26w43bh6eo3w6usjqtxkpzsvi655a3m" 310 .parse() 311 .unwrap() 312 } 313 314 #[test] 315 fn test_depth_spec_0() { 316 let d = Depth::from_key(b"2653ae71"); 317 assert_eq!(d, Depth::Depth(0)) 318 } 319 320 #[test] 321 fn test_depth_spec_1() { 322 let d = Depth::from_key(b"blue"); 323 assert_eq!(d, Depth::Depth(1)) 324 } 325 326 #[test] 327 fn test_depth_spec_4() { 328 let d = Depth::from_key(b"app.bsky.feed.post/454397e440ec"); 329 assert_eq!(d, Depth::Depth(4)) 330 } 331 332 #[test] 333 fn test_depth_spec_8() { 334 let d = Depth::from_key(b"app.bsky.feed.post/9adeb165882c"); 335 assert_eq!(d, Depth::Depth(8)) 336 } 337 338 #[test] 339 fn test_depth_ietf_draft_0() { 340 let d = Depth::from_key(b"key1"); 341 assert_eq!(d, Depth::Depth(0)) 342 } 343 344 #[test] 345 fn test_depth_ietf_draft_1() { 346 let d = Depth::from_key(b"key7"); 347 assert_eq!(d, Depth::Depth(1)) 348 } 349 350 #[test] 351 fn test_depth_ietf_draft_4() { 352 let d = Depth::from_key(b"key515"); 353 assert_eq!(d, Depth::Depth(4)) 354 } 355 356 #[test] 357 fn test_depth_interop() { 358 // examples from https://github.com/bluesky-social/atproto-interop-tests/blob/main/mst/key_heights.json 359 for (k, expected) in [ 360 ("", 0), 361 ("asdf", 0), 362 ("blue", 1), 363 ("2653ae71", 0), 364 ("88bfafc7", 2), 365 ("2a92d355", 4), 366 ("884976f5", 6), 367 ("app.bsky.feed.post/454397e440ec", 4), 368 ("app.bsky.feed.post/9adeb165882c", 8), 369 ] { 370 let d = Depth::from_key(k.as_bytes()); 371 assert_eq!(d, Depth::Depth(expected), "key: {}", k); 372 } 373 } 374 375 #[test] 376 fn test_push_empty_fails() { 377 let empty_node = Node { 378 left: None, 379 entries: vec![], 380 }; 381 let mut stack = vec![]; 382 let err = push_from_node(&mut stack, &empty_node, Depth::Depth(4)); 383 assert_eq!(err, Err(MstError::EmptyNode)); 384 } 385 386 #[test] 387 fn test_push_one_node() { 388 let node = Node { 389 left: Some(cid1()), 390 entries: vec![], 391 }; 392 let mut stack = vec![]; 393 push_from_node(&mut stack, &node, Depth::Depth(4)).unwrap(); 394 assert_eq!( 395 stack.last(), 396 Some(Need::Node { 397 depth: Depth::Depth(3), 398 cid: cid1() 399 }) 400 .as_ref() 401 ); 402 } 403}