Fast and robust atproto CAR file processing in Rust
1//! Depth-first MST traversal
2
3use crate::disk::SqliteReader;
4use crate::drive::{DecodeError, MaybeProcessedBlock};
5use crate::mst::Node;
6use crate::process::Processable;
7use ipld_core::cid::Cid;
8use sha2::{Digest, Sha256};
9use std::collections::HashMap;
10use std::convert::Infallible;
11
12/// Errors that can happen while walking
13#[derive(Debug, thiserror::Error)]
14pub enum WalkError {
15 #[error("Failed to fingerprint commit block")]
16 BadCommitFingerprint,
17 #[error("Failed to decode commit block: {0}")]
18 BadCommit(#[from] serde_ipld_dagcbor::DecodeError<Infallible>),
19 #[error("Action node error: {0}")]
20 MstError(#[from] MstError),
21 #[error("storage error: {0}")]
22 StorageError(#[from] rusqlite::Error),
23 #[error("Decode error: {0}")]
24 DecodeError(#[from] DecodeError),
25}
26
27/// Errors from invalid Rkeys
28#[derive(Debug, PartialEq, thiserror::Error)]
29pub enum MstError {
30 #[error("Failed to compute an rkey due to invalid prefix_len")]
31 EntryPrefixOutOfbounds,
32 #[error("RKey was not utf-8")]
33 EntryRkeyNotUtf8(#[from] std::string::FromUtf8Error),
34 #[error("Nodes cannot be empty (except for an entirely empty MST)")]
35 EmptyNode,
36 #[error("Found an entry with rkey at the wrong depth")]
37 WrongDepth,
38 #[error("Lost track of our depth (possible bug?)")]
39 LostDepth,
40 #[error("MST depth underflow: depth-0 node with child trees")]
41 DepthUnderflow,
42 #[error("Encountered an rkey out of order while walking the MST")]
43 RkeyOutOfOrder,
44}
45
46/// Walker outputs
47#[derive(Debug)]
48pub enum Step<T> {
49 /// We needed this CID but it's not in the block store
50 Missing(Cid),
51 /// Reached the end of the MST! yay!
52 Finish,
53 /// A record was found!
54 Found { rkey: String, data: T },
55}
56
57#[derive(Debug, Clone, PartialEq)]
58enum Need {
59 Node { depth: Depth, cid: Cid },
60 Record { rkey: String, cid: Cid },
61}
62
63#[derive(Debug, Clone, Copy, PartialEq)]
64enum Depth {
65 Root,
66 Depth(u32),
67}
68
69impl Depth {
70 fn from_key(key: &[u8]) -> Self {
71 let mut zeros = 0;
72 for byte in Sha256::digest(key) {
73 let leading = byte.leading_zeros();
74 zeros += leading;
75 if leading < 8 {
76 break;
77 }
78 }
79 Self::Depth(zeros / 2) // truncating divide (rounds down)
80 }
81 fn next_expected(&self) -> Result<Option<u32>, MstError> {
82 match self {
83 Self::Root => Ok(None),
84 Self::Depth(d) => d.checked_sub(1).ok_or(MstError::DepthUnderflow).map(Some),
85 }
86 }
87}
88
89fn push_from_node(stack: &mut Vec<Need>, node: &Node, parent_depth: Depth) -> Result<(), MstError> {
90 // empty nodes are not allowed in the MST
91 // ...except for a single one for empty MST, but we wouldn't be pushing that
92 if node.is_empty() {
93 return Err(MstError::EmptyNode);
94 }
95
96 let mut entries = Vec::with_capacity(node.entries.len());
97 let mut prefix = vec![];
98 let mut this_depth = parent_depth.next_expected()?;
99
100 for entry in &node.entries {
101 let mut rkey = vec![];
102 let pre_checked = prefix
103 .get(..entry.prefix_len)
104 .ok_or(MstError::EntryPrefixOutOfbounds)?;
105 rkey.extend_from_slice(pre_checked);
106 rkey.extend_from_slice(&entry.keysuffix);
107
108 let Depth::Depth(key_depth) = Depth::from_key(&rkey) else {
109 return Err(MstError::WrongDepth);
110 };
111
112 // this_depth is `none` if we are the deepest child (directly below root)
113 // in that case we accept whatever highest depth is claimed
114 let expected_depth = match this_depth {
115 Some(d) => d,
116 None => {
117 this_depth = Some(key_depth);
118 key_depth
119 }
120 };
121
122 // all keys we find should be this depth
123 if key_depth != expected_depth {
124 return Err(MstError::DepthUnderflow);
125 }
126
127 prefix = rkey.clone();
128
129 entries.push(Need::Record {
130 rkey: String::from_utf8(rkey)?,
131 cid: entry.value,
132 });
133 if let Some(ref tree) = entry.tree {
134 entries.push(Need::Node {
135 depth: Depth::Depth(key_depth),
136 cid: *tree,
137 });
138 }
139 }
140
141 entries.reverse();
142 stack.append(&mut entries);
143
144 let d = this_depth.ok_or(MstError::LostDepth)?;
145
146 if let Some(tree) = node.left {
147 stack.push(Need::Node {
148 depth: Depth::Depth(d),
149 cid: tree,
150 });
151 }
152 Ok(())
153}
154
155/// Traverser of an atproto MST
156///
157/// Walks the tree from left-to-right in depth-first order
158#[derive(Debug)]
159pub struct Walker {
160 stack: Vec<Need>,
161 prev: String,
162}
163
164impl Walker {
165 pub fn new(tree_root_cid: Cid) -> Self {
166 Self {
167 stack: vec![Need::Node {
168 depth: Depth::Root,
169 cid: tree_root_cid,
170 }],
171 prev: "".to_string(),
172 }
173 }
174
175 /// Advance through nodes until we find a record or can't go further
176 pub fn step<T: Processable>(
177 &mut self,
178 blocks: &mut HashMap<Cid, MaybeProcessedBlock<T>>,
179 process: impl Fn(Vec<u8>) -> T,
180 ) -> Result<Step<T>, WalkError> {
181 loop {
182 let Some(need) = self.stack.last_mut() else {
183 log::trace!("tried to walk but we're actually done.");
184 return Ok(Step::Finish);
185 };
186
187 match need {
188 &mut Need::Node { depth, cid } => {
189 log::trace!("need node {cid:?}");
190 let Some(block) = blocks.remove(&cid) else {
191 log::trace!("node not found, resting");
192 return Ok(Step::Missing(cid));
193 };
194
195 let MaybeProcessedBlock::Raw(data) = block else {
196 return Err(WalkError::BadCommitFingerprint);
197 };
198 let node = serde_ipld_dagcbor::from_slice::<Node>(&data)
199 .map_err(WalkError::BadCommit)?;
200
201 // found node, make sure we remember
202 self.stack.pop();
203
204 // queue up work on the found node next
205 push_from_node(&mut self.stack, &node, depth)?;
206 }
207 Need::Record { rkey, cid } => {
208 log::trace!("need record {cid:?}");
209 // note that we cannot *remove* a record block, sadly, since
210 // there can be multiple rkeys pointing to the same cid.
211 let Some(data) = blocks.get_mut(cid) else {
212 return Ok(Step::Missing(*cid));
213 };
214 let rkey = rkey.clone();
215 let data = match data {
216 MaybeProcessedBlock::Raw(data) => process(data.to_vec()),
217 MaybeProcessedBlock::Processed(t) => t.clone(),
218 };
219
220 // found node, make sure we remember
221 self.stack.pop();
222
223 // rkeys *must* be in order or else the tree is invalid (or
224 // we have a bug)
225 if rkey <= self.prev {
226 return Err(MstError::RkeyOutOfOrder)?;
227 }
228 self.prev = rkey.clone();
229
230 return Ok(Step::Found { rkey, data });
231 }
232 }
233 }
234 }
235
236 /// blocking!!!!!!
237 pub fn disk_step<T: Processable>(
238 &mut self,
239 reader: &mut SqliteReader,
240 process: impl Fn(Vec<u8>) -> T,
241 ) -> Result<Step<T>, WalkError> {
242 loop {
243 let Some(need) = self.stack.last_mut() else {
244 log::trace!("tried to walk but we're actually done.");
245 return Ok(Step::Finish);
246 };
247
248 match need {
249 &mut Need::Node { depth, cid } => {
250 let cid_bytes = cid.to_bytes();
251 log::trace!("need node {cid:?}");
252 let Some(block_bytes) = reader.get(cid_bytes)? else {
253 log::trace!("node not found, resting");
254 return Ok(Step::Missing(cid));
255 };
256
257 let block: MaybeProcessedBlock<T> = crate::drive::decode(&block_bytes)?;
258
259 let MaybeProcessedBlock::Raw(data) = block else {
260 return Err(WalkError::BadCommitFingerprint);
261 };
262 let node = serde_ipld_dagcbor::from_slice::<Node>(&data)
263 .map_err(WalkError::BadCommit)?;
264
265 // found node, make sure we remember
266 self.stack.pop();
267
268 // queue up work on the found node next
269 push_from_node(&mut self.stack, &node, depth).map_err(WalkError::MstError)?;
270 }
271 Need::Record { rkey, cid } => {
272 log::trace!("need record {cid:?}");
273 let cid_bytes = cid.to_bytes();
274 let Some(data_bytes) = reader.get(cid_bytes)? else {
275 log::trace!("record block not found, resting");
276 return Ok(Step::Missing(*cid));
277 };
278 let data: MaybeProcessedBlock<T> = crate::drive::decode(&data_bytes)?;
279 let rkey = rkey.clone();
280 let data = match data {
281 MaybeProcessedBlock::Raw(data) => process(data),
282 MaybeProcessedBlock::Processed(t) => t.clone(),
283 };
284
285 // found node, make sure we remember
286 self.stack.pop();
287
288 log::trace!("emitting a block as a step. depth={}", self.stack.len());
289
290 // rkeys *must* be in order or else the tree is invalid (or
291 // we have a bug)
292 if rkey <= self.prev {
293 return Err(MstError::RkeyOutOfOrder)?;
294 }
295 self.prev = rkey.clone();
296
297 return Ok(Step::Found { rkey, data });
298 }
299 }
300 }
301 }
302}
303
304#[cfg(test)]
305mod test {
306 use super::*;
307
308 fn cid1() -> Cid {
309 "bafyreihixenvk3ahqbytas4hk4a26w43bh6eo3w6usjqtxkpzsvi655a3m"
310 .parse()
311 .unwrap()
312 }
313
314 #[test]
315 fn test_depth_spec_0() {
316 let d = Depth::from_key(b"2653ae71");
317 assert_eq!(d, Depth::Depth(0))
318 }
319
320 #[test]
321 fn test_depth_spec_1() {
322 let d = Depth::from_key(b"blue");
323 assert_eq!(d, Depth::Depth(1))
324 }
325
326 #[test]
327 fn test_depth_spec_4() {
328 let d = Depth::from_key(b"app.bsky.feed.post/454397e440ec");
329 assert_eq!(d, Depth::Depth(4))
330 }
331
332 #[test]
333 fn test_depth_spec_8() {
334 let d = Depth::from_key(b"app.bsky.feed.post/9adeb165882c");
335 assert_eq!(d, Depth::Depth(8))
336 }
337
338 #[test]
339 fn test_depth_ietf_draft_0() {
340 let d = Depth::from_key(b"key1");
341 assert_eq!(d, Depth::Depth(0))
342 }
343
344 #[test]
345 fn test_depth_ietf_draft_1() {
346 let d = Depth::from_key(b"key7");
347 assert_eq!(d, Depth::Depth(1))
348 }
349
350 #[test]
351 fn test_depth_ietf_draft_4() {
352 let d = Depth::from_key(b"key515");
353 assert_eq!(d, Depth::Depth(4))
354 }
355
356 #[test]
357 fn test_depth_interop() {
358 // examples from https://github.com/bluesky-social/atproto-interop-tests/blob/main/mst/key_heights.json
359 for (k, expected) in [
360 ("", 0),
361 ("asdf", 0),
362 ("blue", 1),
363 ("2653ae71", 0),
364 ("88bfafc7", 2),
365 ("2a92d355", 4),
366 ("884976f5", 6),
367 ("app.bsky.feed.post/454397e440ec", 4),
368 ("app.bsky.feed.post/9adeb165882c", 8),
369 ] {
370 let d = Depth::from_key(k.as_bytes());
371 assert_eq!(d, Depth::Depth(expected), "key: {}", k);
372 }
373 }
374
375 #[test]
376 fn test_push_empty_fails() {
377 let empty_node = Node {
378 left: None,
379 entries: vec![],
380 };
381 let mut stack = vec![];
382 let err = push_from_node(&mut stack, &empty_node, Depth::Depth(4));
383 assert_eq!(err, Err(MstError::EmptyNode));
384 }
385
386 #[test]
387 fn test_push_one_node() {
388 let node = Node {
389 left: Some(cid1()),
390 entries: vec![],
391 };
392 let mut stack = vec![];
393 push_from_node(&mut stack, &node, Depth::Depth(4)).unwrap();
394 assert_eq!(
395 stack.last(),
396 Some(Need::Node {
397 depth: Depth::Depth(3),
398 cid: cid1()
399 })
400 .as_ref()
401 );
402 }
403}