Fast and robust atproto CAR file processing in Rust

per-node task queue cleanup

Changed files
+22 -96
src
+22 -96
src/walk.rs
··· 5 5 use ipld_core::cid::Cid; 6 6 use std::collections::HashMap; 7 7 use std::error::Error; 8 - use std::fmt; 9 8 10 9 #[derive(Debug, thiserror::Error)] 11 10 pub enum Trip<E: Error> { ··· 36 35 Step { rkey: String, data: T }, 37 36 } 38 37 39 - /// some block we need (or have found) 40 - #[derive(Clone, PartialEq)] 41 - struct FindableLink { 42 - cid: Cid, 43 - found: bool, 44 - } 45 - 46 - impl From<&Cid> for FindableLink { 47 - fn from(cid: &Cid) -> Self { 48 - let cid = *cid; 49 - let found = false; 50 - FindableLink { cid, found } 51 - } 52 - } 53 - 54 - impl fmt::Debug for FindableLink { 55 - fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { 56 - let status = if self.found { "??" } else { "ok" }; 57 - let cid_s = self.cid.to_string_of_base(multibase::Base::Base64).unwrap(); 58 - let split_at = cid_s.char_indices().nth_back(4).unwrap().0; 59 - let cid_s = &cid_s[split_at..]; 60 - write!(f, "({status} [{cid_s}])") 61 - } 62 - } 63 - 64 - /// a transformed mst::Entry with decompressed rkeys and which we can mutate 65 - /// 66 - /// contains exactly one record link, and one optional subtree whose contents 67 - /// are all to the right of this record's rkey 68 - #[derive(Debug, PartialEq)] 69 - struct ActionEntry { 70 - rkey: String, 71 - record: FindableLink, 72 - tree: Option<FindableLink>, 73 - } 74 - 75 38 /// a transformed mst::Node which we can mutate and track progress on 76 39 /// 77 40 /// contains an optional left subtree, whose contents are all to the left of 78 41 /// every entry in the entries array. 
79 42 #[derive(Debug, PartialEq)] 80 43 struct ActionNode { 81 - left_tree: Option<FindableLink>, 82 - entries: Vec<ActionEntry>, 44 + entries: Vec<Need>, 83 45 } 84 46 85 47 impl ActionNode { 86 48 fn from_root(tree_root_cid: Cid) -> Self { 87 49 ActionNode { 88 - left_tree: Some((&tree_root_cid).into()), 89 - entries: vec![], 50 + entries: vec![Need::Node(tree_root_cid)], 90 51 } 91 52 } 92 53 fn next(&self) -> Option<Need> { 93 - if let Some(findable) = &self.left_tree 94 - && !findable.found 95 - { 96 - return Some(Need::Node(findable.cid)); 97 - } 98 - for ActionEntry { rkey, record, tree } in &self.entries { 99 - if !record.found { 100 - return Some(Need::Record { 101 - rkey: rkey.to_string(), 102 - cid: record.cid, 103 - }); 104 - } 105 - if let Some(findable) = tree 106 - && !findable.found 107 - { 108 - return Some(Need::Node(findable.cid)); 109 - } 110 - } 111 - None 54 + self.entries.last().cloned() 112 55 } 113 - fn found(&mut self, cid: &Cid) { 114 - // just be horrible for now, whatever 115 - if let Some(findable) = &mut self.left_tree 116 - && !findable.found 117 - { 118 - assert_eq!(*cid, findable.cid, "wrong found for next"); 119 - findable.found = true; 120 - return; 121 - } 122 - for ActionEntry { record, tree, .. 
} in &mut self.entries { 123 - if !record.found { 124 - assert_eq!(*cid, record.cid, "wrong found for next, expected record"); 125 - record.found = true; 126 - return; 127 - } 128 - if let Some(findable) = tree 129 - && !findable.found 130 - { 131 - assert_eq!( 132 - *cid, findable.cid, 133 - "wrong found for next, expected entry tree" 134 - ); 135 - findable.found = true; 136 - return; 137 - } 138 - } 56 + fn found(&mut self) { 57 + self.entries.pop(); 139 58 } 140 59 } 141 60 142 - #[derive(Debug, PartialEq)] 61 + #[derive(Debug, Clone, PartialEq)] 143 62 enum Need { 144 63 Node(Cid), 145 64 Record { rkey: String, cid: Cid }, ··· 149 68 type Error = ActionNodeError; 150 69 151 70 fn try_from(node: &Node) -> Result<Self, Self::Error> { 152 - let left_tree = node.left.as_ref().map(Into::into); 71 + let mut entries = vec![]; 72 + 73 + if let Some(tree) = node.left { 74 + entries.push(Need::Node(tree)); 75 + } 153 76 154 - let mut entries = vec![]; 155 77 let mut prefix = vec![]; 156 78 for entry in &node.entries { 157 79 let mut rkey = vec![]; ··· 162 84 rkey.extend_from_slice(&entry.keysuffix); 163 85 prefix = rkey.clone(); 164 86 165 - entries.push(ActionEntry { 166 - rkey: String::from_utf8(rkey)?, // TODO this has to be try_from 167 - record: (&entry.value).into(), 168 - tree: entry.tree.as_ref().map(Into::into), 87 + entries.push(Need::Record { 88 + rkey: String::from_utf8(rkey)?, 89 + cid: entry.value, 169 90 }); 91 + if let Some(ref tree) = entry.tree { 92 + entries.push(Need::Node(*tree)); 93 + } 170 94 } 171 95 172 - Ok(ActionNode { left_tree, entries }) 96 + entries.reverse(); 97 + 98 + Ok(ActionNode { entries }) 173 99 } 174 100 } 175 101 ··· 215 141 .map_err(|e| Trip::BadCommit(e.into()))?; 216 142 217 143 // found node, make sure we remember 218 - current_node.found(cid); 144 + current_node.found(); 219 145 220 146 // queue up work on the found node next 221 147 self.stack.push((&node).try_into()?); ··· 236 162 }; 237 163 238 164 // found node, make 
sure we remember 239 - current_node.found(cid); 165 + current_node.found(); 240 166 241 167 log::trace!("emitting a block as a step. depth={}", self.stack.len()); 242 168 let data = data.map_err(Trip::ProcessFailed)?;