Fast and robust atproto CAR file processing in Rust
at main 13 kB view raw
1//! Depth-first MST traversal 2 3use crate::disk::SqliteReader; 4use crate::drive::{DecodeError, MaybeProcessedBlock}; 5use crate::mst::Node; 6use crate::process::Processable; 7use ipld_core::cid::Cid; 8use sha2::{Digest, Sha256}; 9use std::collections::HashMap; 10use std::convert::Infallible; 11 12/// Errors that can happen while walking 13#[derive(Debug, thiserror::Error)] 14pub enum WalkError { 15 #[error("Failed to fingerprint commit block")] 16 BadCommitFingerprint, 17 #[error("Failed to decode commit block: {0}")] 18 BadCommit(#[from] serde_ipld_dagcbor::DecodeError<Infallible>), 19 #[error("Action node error: {0}")] 20 MstError(#[from] MstError), 21 #[error("storage error: {0}")] 22 StorageError(#[from] rusqlite::Error), 23 #[error("Decode error: {0}")] 24 DecodeError(#[from] DecodeError), 25} 26 27/// Errors from invalid Rkeys 28#[derive(Debug, PartialEq, thiserror::Error)] 29pub enum MstError { 30 #[error("Failed to compute an rkey due to invalid prefix_len")] 31 EntryPrefixOutOfbounds, 32 #[error("RKey was not utf-8")] 33 EntryRkeyNotUtf8(#[from] std::string::FromUtf8Error), 34 #[error("Nodes cannot be empty (except for an entirely empty MST)")] 35 EmptyNode, 36 #[error("Found an entry with rkey at the wrong depth")] 37 WrongDepth, 38 #[error("Lost track of our depth (possible bug?)")] 39 LostDepth, 40 #[error("MST depth underflow: depth-0 node with child trees")] 41 DepthUnderflow, 42 #[error("Encountered an rkey out of order while walking the MST")] 43 RkeyOutOfOrder, 44} 45 46/// Walker outputs 47#[derive(Debug)] 48pub enum Step<T> { 49 /// We needed this CID but it's not in the block store 50 Missing(Cid), 51 /// Reached the end of the MST! yay! 52 Finish, 53 /// A record was found! 
54 Found { rkey: String, data: T }, 55} 56 57#[derive(Debug, Clone, PartialEq)] 58enum Need { 59 Node { depth: Depth, cid: Cid }, 60 Record { rkey: String, cid: Cid }, 61} 62 63#[derive(Debug, Clone, Copy, PartialEq)] 64enum Depth { 65 Root, 66 Depth(u32), 67} 68 69impl Depth { 70 fn from_key(key: &[u8]) -> Self { 71 let mut zeros = 0; 72 for byte in Sha256::digest(key) { 73 let leading = byte.leading_zeros(); 74 zeros += leading; 75 if leading < 8 { 76 break; 77 } 78 } 79 Self::Depth(zeros / 2) // truncating divide (rounds down) 80 } 81 fn next_expected(&self) -> Result<Option<u32>, MstError> { 82 match self { 83 Self::Root => Ok(None), 84 Self::Depth(d) => d.checked_sub(1).ok_or(MstError::DepthUnderflow).map(Some), 85 } 86 } 87} 88 89fn push_from_node(stack: &mut Vec<Need>, node: &Node, parent_depth: Depth) -> Result<(), MstError> { 90 // empty nodes are not allowed in the MST except in an empty MST 91 if node.is_empty() { 92 if parent_depth == Depth::Root { 93 return Ok(()); // empty mst, nothing to push 94 } else { 95 return Err(MstError::EmptyNode); 96 } 97 } 98 99 let mut entries = Vec::with_capacity(node.entries.len()); 100 let mut prefix = vec![]; 101 let mut this_depth = parent_depth.next_expected()?; 102 103 for entry in &node.entries { 104 let mut rkey = vec![]; 105 let pre_checked = prefix 106 .get(..entry.prefix_len) 107 .ok_or(MstError::EntryPrefixOutOfbounds)?; 108 rkey.extend_from_slice(pre_checked); 109 rkey.extend_from_slice(&entry.keysuffix); 110 111 let Depth::Depth(key_depth) = Depth::from_key(&rkey) else { 112 return Err(MstError::WrongDepth); 113 }; 114 115 // this_depth is `none` if we are the deepest child (directly below root) 116 // in that case we accept whatever highest depth is claimed 117 let expected_depth = match this_depth { 118 Some(d) => d, 119 None => { 120 this_depth = Some(key_depth); 121 key_depth 122 } 123 }; 124 125 // all keys we find should be this depth 126 if key_depth != expected_depth { 127 return 
Err(MstError::DepthUnderflow); 128 } 129 130 prefix = rkey.clone(); 131 132 entries.push(Need::Record { 133 rkey: String::from_utf8(rkey)?, 134 cid: entry.value, 135 }); 136 if let Some(ref tree) = entry.tree { 137 entries.push(Need::Node { 138 depth: Depth::Depth(key_depth), 139 cid: *tree, 140 }); 141 } 142 } 143 144 entries.reverse(); 145 stack.append(&mut entries); 146 147 let d = this_depth.ok_or(MstError::LostDepth)?; 148 149 if let Some(tree) = node.left { 150 stack.push(Need::Node { 151 depth: Depth::Depth(d), 152 cid: tree, 153 }); 154 } 155 Ok(()) 156} 157 158/// Traverser of an atproto MST 159/// 160/// Walks the tree from left-to-right in depth-first order 161#[derive(Debug)] 162pub struct Walker { 163 stack: Vec<Need>, 164 prev: String, 165} 166 167impl Walker { 168 pub fn new(tree_root_cid: Cid) -> Self { 169 Self { 170 stack: vec![Need::Node { 171 depth: Depth::Root, 172 cid: tree_root_cid, 173 }], 174 prev: "".to_string(), 175 } 176 } 177 178 /// Advance through nodes until we find a record or can't go further 179 pub fn step<T: Processable>( 180 &mut self, 181 blocks: &mut HashMap<Cid, MaybeProcessedBlock<T>>, 182 process: impl Fn(Vec<u8>) -> T, 183 ) -> Result<Step<T>, WalkError> { 184 loop { 185 let Some(need) = self.stack.last_mut() else { 186 log::trace!("tried to walk but we're actually done."); 187 return Ok(Step::Finish); 188 }; 189 190 match need { 191 &mut Need::Node { depth, cid } => { 192 log::trace!("need node {cid:?}"); 193 let Some(block) = blocks.remove(&cid) else { 194 log::trace!("node not found, resting"); 195 return Ok(Step::Missing(cid)); 196 }; 197 198 let MaybeProcessedBlock::Raw(data) = block else { 199 return Err(WalkError::BadCommitFingerprint); 200 }; 201 let node = serde_ipld_dagcbor::from_slice::<Node>(&data) 202 .map_err(WalkError::BadCommit)?; 203 204 // found node, make sure we remember 205 self.stack.pop(); 206 207 // queue up work on the found node next 208 push_from_node(&mut self.stack, &node, depth)?; 209 } 210 
Need::Record { rkey, cid } => { 211 log::trace!("need record {cid:?}"); 212 // note that we cannot *remove* a record block, sadly, since 213 // there can be multiple rkeys pointing to the same cid. 214 let Some(data) = blocks.get_mut(cid) else { 215 return Ok(Step::Missing(*cid)); 216 }; 217 let rkey = rkey.clone(); 218 let data = match data { 219 MaybeProcessedBlock::Raw(data) => process(data.to_vec()), 220 MaybeProcessedBlock::Processed(t) => t.clone(), 221 }; 222 223 // found node, make sure we remember 224 self.stack.pop(); 225 226 // rkeys *must* be in order or else the tree is invalid (or 227 // we have a bug) 228 if rkey <= self.prev { 229 return Err(MstError::RkeyOutOfOrder)?; 230 } 231 self.prev = rkey.clone(); 232 233 return Ok(Step::Found { rkey, data }); 234 } 235 } 236 } 237 } 238 239 /// blocking!!!!!! 240 pub fn disk_step<T: Processable>( 241 &mut self, 242 reader: &mut SqliteReader, 243 process: impl Fn(Vec<u8>) -> T, 244 ) -> Result<Step<T>, WalkError> { 245 loop { 246 let Some(need) = self.stack.last_mut() else { 247 log::trace!("tried to walk but we're actually done."); 248 return Ok(Step::Finish); 249 }; 250 251 match need { 252 &mut Need::Node { depth, cid } => { 253 let cid_bytes = cid.to_bytes(); 254 log::trace!("need node {cid:?}"); 255 let Some(block_bytes) = reader.get(cid_bytes)? 
else { 256 log::trace!("node not found, resting"); 257 return Ok(Step::Missing(cid)); 258 }; 259 260 let block: MaybeProcessedBlock<T> = crate::drive::decode(&block_bytes)?; 261 262 let MaybeProcessedBlock::Raw(data) = block else { 263 return Err(WalkError::BadCommitFingerprint); 264 }; 265 let node = serde_ipld_dagcbor::from_slice::<Node>(&data) 266 .map_err(WalkError::BadCommit)?; 267 268 // found node, make sure we remember 269 self.stack.pop(); 270 271 // queue up work on the found node next 272 push_from_node(&mut self.stack, &node, depth).map_err(WalkError::MstError)?; 273 } 274 Need::Record { rkey, cid } => { 275 log::trace!("need record {cid:?}"); 276 let cid_bytes = cid.to_bytes(); 277 let Some(data_bytes) = reader.get(cid_bytes)? else { 278 log::trace!("record block not found, resting"); 279 return Ok(Step::Missing(*cid)); 280 }; 281 let data: MaybeProcessedBlock<T> = crate::drive::decode(&data_bytes)?; 282 let rkey = rkey.clone(); 283 let data = match data { 284 MaybeProcessedBlock::Raw(data) => process(data), 285 MaybeProcessedBlock::Processed(t) => t.clone(), 286 }; 287 288 // found node, make sure we remember 289 self.stack.pop(); 290 291 log::trace!("emitting a block as a step. 
depth={}", self.stack.len()); 292 293 // rkeys *must* be in order or else the tree is invalid (or 294 // we have a bug) 295 if rkey <= self.prev { 296 return Err(MstError::RkeyOutOfOrder)?; 297 } 298 self.prev = rkey.clone(); 299 300 return Ok(Step::Found { rkey, data }); 301 } 302 } 303 } 304 } 305} 306 307#[cfg(test)] 308mod test { 309 use super::*; 310 311 fn cid1() -> Cid { 312 "bafyreihixenvk3ahqbytas4hk4a26w43bh6eo3w6usjqtxkpzsvi655a3m" 313 .parse() 314 .unwrap() 315 } 316 317 #[test] 318 fn test_depth_spec_0() { 319 let d = Depth::from_key(b"2653ae71"); 320 assert_eq!(d, Depth::Depth(0)) 321 } 322 323 #[test] 324 fn test_depth_spec_1() { 325 let d = Depth::from_key(b"blue"); 326 assert_eq!(d, Depth::Depth(1)) 327 } 328 329 #[test] 330 fn test_depth_spec_4() { 331 let d = Depth::from_key(b"app.bsky.feed.post/454397e440ec"); 332 assert_eq!(d, Depth::Depth(4)) 333 } 334 335 #[test] 336 fn test_depth_spec_8() { 337 let d = Depth::from_key(b"app.bsky.feed.post/9adeb165882c"); 338 assert_eq!(d, Depth::Depth(8)) 339 } 340 341 #[test] 342 fn test_depth_ietf_draft_0() { 343 let d = Depth::from_key(b"key1"); 344 assert_eq!(d, Depth::Depth(0)) 345 } 346 347 #[test] 348 fn test_depth_ietf_draft_1() { 349 let d = Depth::from_key(b"key7"); 350 assert_eq!(d, Depth::Depth(1)) 351 } 352 353 #[test] 354 fn test_depth_ietf_draft_4() { 355 let d = Depth::from_key(b"key515"); 356 assert_eq!(d, Depth::Depth(4)) 357 } 358 359 #[test] 360 fn test_depth_interop() { 361 // examples from https://github.com/bluesky-social/atproto-interop-tests/blob/main/mst/key_heights.json 362 for (k, expected) in [ 363 ("", 0), 364 ("asdf", 0), 365 ("blue", 1), 366 ("2653ae71", 0), 367 ("88bfafc7", 2), 368 ("2a92d355", 4), 369 ("884976f5", 6), 370 ("app.bsky.feed.post/454397e440ec", 4), 371 ("app.bsky.feed.post/9adeb165882c", 8), 372 ] { 373 let d = Depth::from_key(k.as_bytes()); 374 assert_eq!(d, Depth::Depth(expected), "key: {}", k); 375 } 376 } 377 378 #[test] 379 fn test_push_empty_fails() { 380 
let empty_node = Node { 381 left: None, 382 entries: vec![], 383 }; 384 let mut stack = vec![]; 385 let err = push_from_node(&mut stack, &empty_node, Depth::Depth(4)); 386 assert_eq!(err, Err(MstError::EmptyNode)); 387 } 388 389 #[test] 390 fn test_push_one_node() { 391 let node = Node { 392 left: Some(cid1()), 393 entries: vec![], 394 }; 395 let mut stack = vec![]; 396 push_from_node(&mut stack, &node, Depth::Depth(4)).unwrap(); 397 assert_eq!( 398 stack.last(), 399 Some(Need::Node { 400 depth: Depth::Depth(3), 401 cid: cid1() 402 }) 403 .as_ref() 404 ); 405 } 406}