Fast and robust atproto CAR file processing in Rust
1//! Depth-first MST traversal
2
3use crate::disk::SqliteReader;
4use crate::drive::{DecodeError, MaybeProcessedBlock};
5use crate::mst::Node;
6use crate::process::Processable;
7use ipld_core::cid::Cid;
8use sha2::{Digest, Sha256};
9use std::collections::HashMap;
10use std::convert::Infallible;
11
/// Errors that can happen while walking
///
/// Returned by [`Walker::step`] and [`Walker::disk_step`].
#[derive(Debug, thiserror::Error)]
pub enum WalkError {
    /// A block needed as an MST node was found, but it was not in raw form
    /// (`MaybeProcessedBlock::Processed`), so it cannot be decoded as a node.
    /// NOTE(review): the message mentions a "commit block" although this is
    /// raised for MST node blocks.
    #[error("Failed to fingerprint commit block")]
    BadCommitFingerprint,
    /// A block needed as an MST node could not be decoded as a DAG-CBOR `Node`.
    #[error("Failed to decode commit block: {0}")]
    BadCommit(#[from] serde_ipld_dagcbor::DecodeError<Infallible>),
    /// The tree is structurally invalid (see [`MstError`]).
    #[error("Action node error: {0}")]
    MstError(#[from] MstError),
    /// The SQLite-backed block store returned an error.
    #[error("storage error: {0}")]
    StorageError(#[from] rusqlite::Error),
    /// A block read from disk could not be decoded into a `MaybeProcessedBlock`.
    #[error("Decode error: {0}")]
    DecodeError(#[from] DecodeError),
}
26
/// Errors from structurally invalid MSTs
///
/// Covers bad entries inside a node (key prefixes, key encoding, key depth),
/// empty nodes, depth bookkeeping, and keys emitted out of order.
#[derive(Debug, PartialEq, thiserror::Error)]
pub enum MstError {
    /// An entry's `prefix_len` is longer than the previous key in its node.
    #[error("Failed to compute an rkey due to invalid prefix_len")]
    EntryPrefixOutOfbounds,
    /// A reconstructed key was not valid UTF-8.
    #[error("RKey was not utf-8")]
    EntryRkeyNotUtf8(#[from] std::string::FromUtf8Error),
    /// A non-root node was empty (presumably: no entries and no left subtree,
    /// per `Node::is_empty` — confirm against `crate::mst`).
    #[error("Nodes cannot be empty (except for an entirely empty MST)")]
    EmptyNode,
    /// Meant for a key found in a node whose depth differs from the depth
    /// the key's hash dictates.
    #[error("Found an entry with rkey at the wrong depth")]
    WrongDepth,
    /// The walker could not determine a node's depth (e.g. a root node with
    /// no entries to read the depth from).
    #[error("Lost track of our depth (possible bug?)")]
    LostDepth,
    /// Raised when asking for the child depth of a depth-0 node, i.e. a
    /// depth-0 node points at a child subtree.
    #[error("MST depth underflow: depth-0 node with child trees")]
    DepthUnderflow,
    /// Keys were not emitted in strictly increasing order.
    #[error("Encountered an rkey out of order while walking the MST")]
    RkeyOutOfOrder,
}
45
/// Walker outputs
///
/// The result of one call to advance a [`Walker`].
#[derive(Debug)]
pub enum Step<T> {
    /// We needed this CID but it's not in the block store. The walker did not
    /// advance, so the same block is needed again on the next call.
    Missing(Cid),
    /// Reached the end of the MST! yay!
    Finish,
    /// A record was found! `rkey` is the full MST key; `data` is the record
    /// block after processing.
    Found { rkey: String, data: T },
}
56
/// A pending unit of work on the walker's stack.
#[derive(Debug, Clone, PartialEq)]
enum Need {
    /// An MST node block to fetch. `depth` is the depth of the keys in the
    /// node that pointed at it, or `Root` for the tree root.
    Node { depth: Depth, cid: Cid },
    /// A record block to fetch and emit under the full key `rkey`.
    Record { rkey: String, cid: Cid },
}
62
/// Depth (layer) of keys in the MST.
///
/// `Root` marks the tree root. Its depth is not known up front and is taken
/// from the first key found in it.
#[derive(Debug, Clone, Copy, PartialEq)]
enum Depth {
    Root,
    Depth(u32),
}
68
69impl Depth {
70 fn from_key(key: &[u8]) -> Self {
71 let mut zeros = 0;
72 for byte in Sha256::digest(key) {
73 let leading = byte.leading_zeros();
74 zeros += leading;
75 if leading < 8 {
76 break;
77 }
78 }
79 Self::Depth(zeros / 2) // truncating divide (rounds down)
80 }
81 fn next_expected(&self) -> Result<Option<u32>, MstError> {
82 match self {
83 Self::Root => Ok(None),
84 Self::Depth(d) => d.checked_sub(1).ok_or(MstError::DepthUnderflow).map(Some),
85 }
86 }
87}
88
89fn push_from_node(stack: &mut Vec<Need>, node: &Node, parent_depth: Depth) -> Result<(), MstError> {
90 // empty nodes are not allowed in the MST except in an empty MST
91 if node.is_empty() {
92 if parent_depth == Depth::Root {
93 return Ok(()); // empty mst, nothing to push
94 } else {
95 return Err(MstError::EmptyNode);
96 }
97 }
98
99 let mut entries = Vec::with_capacity(node.entries.len());
100 let mut prefix = vec![];
101 let mut this_depth = parent_depth.next_expected()?;
102
103 for entry in &node.entries {
104 let mut rkey = vec![];
105 let pre_checked = prefix
106 .get(..entry.prefix_len)
107 .ok_or(MstError::EntryPrefixOutOfbounds)?;
108 rkey.extend_from_slice(pre_checked);
109 rkey.extend_from_slice(&entry.keysuffix);
110
111 let Depth::Depth(key_depth) = Depth::from_key(&rkey) else {
112 return Err(MstError::WrongDepth);
113 };
114
115 // this_depth is `none` if we are the deepest child (directly below root)
116 // in that case we accept whatever highest depth is claimed
117 let expected_depth = match this_depth {
118 Some(d) => d,
119 None => {
120 this_depth = Some(key_depth);
121 key_depth
122 }
123 };
124
125 // all keys we find should be this depth
126 if key_depth != expected_depth {
127 return Err(MstError::DepthUnderflow);
128 }
129
130 prefix = rkey.clone();
131
132 entries.push(Need::Record {
133 rkey: String::from_utf8(rkey)?,
134 cid: entry.value,
135 });
136 if let Some(ref tree) = entry.tree {
137 entries.push(Need::Node {
138 depth: Depth::Depth(key_depth),
139 cid: *tree,
140 });
141 }
142 }
143
144 entries.reverse();
145 stack.append(&mut entries);
146
147 let d = this_depth.ok_or(MstError::LostDepth)?;
148
149 if let Some(tree) = node.left {
150 stack.push(Need::Node {
151 depth: Depth::Depth(d),
152 cid: tree,
153 });
154 }
155 Ok(())
156}
157
/// Traverser of an atproto MST
///
/// Walks the tree from left-to-right in depth-first order
#[derive(Debug)]
pub struct Walker {
    /// Pending work. The last element is the next thing to be handled.
    stack: Vec<Need>,
    /// The most recently emitted rkey (initially empty). Each new rkey must
    /// compare strictly greater than this one.
    prev: String,
}
166
167impl Walker {
168 pub fn new(tree_root_cid: Cid) -> Self {
169 Self {
170 stack: vec![Need::Node {
171 depth: Depth::Root,
172 cid: tree_root_cid,
173 }],
174 prev: "".to_string(),
175 }
176 }
177
178 /// Advance through nodes until we find a record or can't go further
179 pub fn step<T: Processable>(
180 &mut self,
181 blocks: &mut HashMap<Cid, MaybeProcessedBlock<T>>,
182 process: impl Fn(Vec<u8>) -> T,
183 ) -> Result<Step<T>, WalkError> {
184 loop {
185 let Some(need) = self.stack.last_mut() else {
186 log::trace!("tried to walk but we're actually done.");
187 return Ok(Step::Finish);
188 };
189
190 match need {
191 &mut Need::Node { depth, cid } => {
192 log::trace!("need node {cid:?}");
193 let Some(block) = blocks.remove(&cid) else {
194 log::trace!("node not found, resting");
195 return Ok(Step::Missing(cid));
196 };
197
198 let MaybeProcessedBlock::Raw(data) = block else {
199 return Err(WalkError::BadCommitFingerprint);
200 };
201 let node = serde_ipld_dagcbor::from_slice::<Node>(&data)
202 .map_err(WalkError::BadCommit)?;
203
204 // found node, make sure we remember
205 self.stack.pop();
206
207 // queue up work on the found node next
208 push_from_node(&mut self.stack, &node, depth)?;
209 }
210 Need::Record { rkey, cid } => {
211 log::trace!("need record {cid:?}");
212 // note that we cannot *remove* a record block, sadly, since
213 // there can be multiple rkeys pointing to the same cid.
214 let Some(data) = blocks.get_mut(cid) else {
215 return Ok(Step::Missing(*cid));
216 };
217 let rkey = rkey.clone();
218 let data = match data {
219 MaybeProcessedBlock::Raw(data) => process(data.to_vec()),
220 MaybeProcessedBlock::Processed(t) => t.clone(),
221 };
222
223 // found node, make sure we remember
224 self.stack.pop();
225
226 // rkeys *must* be in order or else the tree is invalid (or
227 // we have a bug)
228 if rkey <= self.prev {
229 return Err(MstError::RkeyOutOfOrder)?;
230 }
231 self.prev = rkey.clone();
232
233 return Ok(Step::Found { rkey, data });
234 }
235 }
236 }
237 }
238
239 /// blocking!!!!!!
240 pub fn disk_step<T: Processable>(
241 &mut self,
242 reader: &mut SqliteReader,
243 process: impl Fn(Vec<u8>) -> T,
244 ) -> Result<Step<T>, WalkError> {
245 loop {
246 let Some(need) = self.stack.last_mut() else {
247 log::trace!("tried to walk but we're actually done.");
248 return Ok(Step::Finish);
249 };
250
251 match need {
252 &mut Need::Node { depth, cid } => {
253 let cid_bytes = cid.to_bytes();
254 log::trace!("need node {cid:?}");
255 let Some(block_bytes) = reader.get(cid_bytes)? else {
256 log::trace!("node not found, resting");
257 return Ok(Step::Missing(cid));
258 };
259
260 let block: MaybeProcessedBlock<T> = crate::drive::decode(&block_bytes)?;
261
262 let MaybeProcessedBlock::Raw(data) = block else {
263 return Err(WalkError::BadCommitFingerprint);
264 };
265 let node = serde_ipld_dagcbor::from_slice::<Node>(&data)
266 .map_err(WalkError::BadCommit)?;
267
268 // found node, make sure we remember
269 self.stack.pop();
270
271 // queue up work on the found node next
272 push_from_node(&mut self.stack, &node, depth).map_err(WalkError::MstError)?;
273 }
274 Need::Record { rkey, cid } => {
275 log::trace!("need record {cid:?}");
276 let cid_bytes = cid.to_bytes();
277 let Some(data_bytes) = reader.get(cid_bytes)? else {
278 log::trace!("record block not found, resting");
279 return Ok(Step::Missing(*cid));
280 };
281 let data: MaybeProcessedBlock<T> = crate::drive::decode(&data_bytes)?;
282 let rkey = rkey.clone();
283 let data = match data {
284 MaybeProcessedBlock::Raw(data) => process(data),
285 MaybeProcessedBlock::Processed(t) => t.clone(),
286 };
287
288 // found node, make sure we remember
289 self.stack.pop();
290
291 log::trace!("emitting a block as a step. depth={}", self.stack.len());
292
293 // rkeys *must* be in order or else the tree is invalid (or
294 // we have a bug)
295 if rkey <= self.prev {
296 return Err(MstError::RkeyOutOfOrder)?;
297 }
298 self.prev = rkey.clone();
299
300 return Ok(Step::Found { rkey, data });
301 }
302 }
303 }
304 }
305}
306
#[cfg(test)]
mod test {
    use super::*;

    /// A fixed, valid CID used as a placeholder subtree pointer.
    fn cid1() -> Cid {
        "bafyreihixenvk3ahqbytas4hk4a26w43bh6eo3w6usjqtxkpzsvi655a3m"
            .parse()
            .unwrap()
    }

    /// Assert that `key` hashes to MST depth `expected`.
    fn assert_depth(key: &[u8], expected: u32) {
        assert_eq!(Depth::from_key(key), Depth::Depth(expected));
    }

    #[test]
    fn test_depth_spec_0() {
        assert_depth(b"2653ae71", 0);
    }

    #[test]
    fn test_depth_spec_1() {
        assert_depth(b"blue", 1);
    }

    #[test]
    fn test_depth_spec_4() {
        assert_depth(b"app.bsky.feed.post/454397e440ec", 4);
    }

    #[test]
    fn test_depth_spec_8() {
        assert_depth(b"app.bsky.feed.post/9adeb165882c", 8);
    }

    #[test]
    fn test_depth_ietf_draft_0() {
        assert_depth(b"key1", 0);
    }

    #[test]
    fn test_depth_ietf_draft_1() {
        assert_depth(b"key7", 1);
    }

    #[test]
    fn test_depth_ietf_draft_4() {
        assert_depth(b"key515", 4);
    }

    #[test]
    fn test_depth_interop() {
        // examples from https://github.com/bluesky-social/atproto-interop-tests/blob/main/mst/key_heights.json
        const KEY_HEIGHTS: [(&str, u32); 9] = [
            ("", 0),
            ("asdf", 0),
            ("blue", 1),
            ("2653ae71", 0),
            ("88bfafc7", 2),
            ("2a92d355", 4),
            ("884976f5", 6),
            ("app.bsky.feed.post/454397e440ec", 4),
            ("app.bsky.feed.post/9adeb165882c", 8),
        ];
        for &(key, height) in KEY_HEIGHTS.iter() {
            let got = Depth::from_key(key.as_bytes());
            assert_eq!(got, Depth::Depth(height), "key: {}", key);
        }
    }

    #[test]
    fn test_push_empty_fails() {
        let node = Node {
            left: None,
            entries: vec![],
        };
        let mut stack = Vec::new();
        let result = push_from_node(&mut stack, &node, Depth::Depth(4));
        assert_eq!(result, Err(MstError::EmptyNode));
    }

    #[test]
    fn test_push_one_node() {
        let node = Node {
            left: Some(cid1()),
            entries: vec![],
        };
        let mut stack = Vec::new();
        push_from_node(&mut stack, &node, Depth::Depth(4)).unwrap();

        let expected = Need::Node {
            depth: Depth::Depth(3),
            cid: cid1(),
        };
        assert_eq!(stack.last(), Some(&expected));
    }
}