Fast and robust atproto CAR file processing in rust
/*!
Record processor function output trait

The return type must satisfy the `Processable` trait, which requires:

- `Clone` because two rkeys can refer to the same record by CID, which may
  only appear once in the CAR file.
- `Serialize + DeserializeOwned` so it can be spilled to disk.

One required function must be implemented, `get_size()`: this should return the
approximate total off-stack size of the type. (The on-stack size will be added
automatically via `std::mem::size_of`.)

Note that it is **not guaranteed** that the `process` function will run on a
block before storing it in memory or on disk: it's not possible to know if a
block is a record without actually walking the MST, so the best we can do is
apply `process` to any block that we know *cannot* be an MST node, and otherwise
store the raw block bytes.

Here's a silly processing function that just collects 'eyy's found in the raw
record bytes:

```
# use repo_stream::Processable;
# use serde::{Serialize, Deserialize};
#[derive(Debug, Clone, Serialize, Deserialize)]
struct Eyy(usize, String);

impl Processable for Eyy {
    fn get_size(&self) -> usize {
        // don't need to compute the usize, it's on the stack
        self.1.capacity() // in-mem size from the string's capacity, in bytes
    }
}

fn process(raw: Vec<u8>) -> Vec<Eyy> {
    let mut out = Vec::new();
    let to_find = "eyy".as_bytes();
    // `windows` yields nothing for inputs shorter than the needle, so there is
    // no `raw.len() - 3` underflow, and the final window is not skipped
    for (i, window) in raw.windows(to_find.len()).enumerate() {
        if window == to_find {
            out.push(Eyy(i, "eyy".to_string()));
        }
    }
    out
}
```

The memory sizing stuff is a little sketchy but probably at least approximately
works.
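
To make the sizing rule concrete, here is a minimal sketch of how `get_size()`
might compose for a nested type. It is an illustration under assumptions, not
this crate's code: `OffStackSize` is a hypothetical local stand-in for
`Processable` (without the serde bounds, so the snippet has no dependencies),
`Post` and `total_size` are made-up names, and the `String` and `Vec` impls
simply mirror the ones in this module.

```rust
use std::mem::size_of;

/// Hypothetical stand-in for `Processable`, minus the serde bounds.
trait OffStackSize {
    /// Heap bytes owned by the value, excluding its own `size_of`.
    fn get_size(&self) -> usize;
}

impl OffStackSize for String {
    fn get_size(&self) -> usize {
        self.capacity() // bytes allocated for the string's buffer
    }
}

impl<T: OffStackSize> OffStackSize for Vec<T> {
    fn get_size(&self) -> usize {
        // the buffer holds `capacity` inline slots of `T`...
        let slots = size_of::<T>() * self.capacity();
        // ...plus whatever each live element owns on the heap
        let referenced: usize = self.iter().map(|item| item.get_size()).sum();
        slots + referenced
    }
}

/// Made-up record type, for illustration only.
struct Post {
    text: String,
    tags: Vec<String>,
}

impl OffStackSize for Post {
    fn get_size(&self) -> usize {
        // the fields' inline parts are already covered by `size_of::<Post>()`,
        // so only their heap usage is summed here
        self.text.get_size() + self.tags.get_size()
    }
}

/// Assumed shape of the final accounting: on-stack size plus off-stack size.
fn total_size<T: OffStackSize>(value: &T) -> usize {
    size_of::<T>() + value.get_size()
}
```

Each level adds only what it owns on the heap. The `Vec` impl already accounts
for its elements' inline slots, so `Post` does not add `size_of::<String>()`
for its tags itself.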
*/

use serde::{Serialize, de::DeserializeOwned};

/// Output trait for record processing
pub trait Processable: Clone + Serialize + DeserializeOwned {
    /// Any additional in-memory size taken by the processed type
    ///
    /// Do not include stack size (`std::mem::size_of`)
    fn get_size(&self) -> usize;
}

/// Processor that just returns the raw blocks
#[inline]
pub fn noop(block: Vec<u8>) -> Vec<u8> {
    block
}

impl Processable for u8 {
    fn get_size(&self) -> usize {
        0
    }
}

impl Processable for usize {
    fn get_size(&self) -> usize {
        0 // no additional space taken, just its stack size (newtype is free)
    }
}

impl Processable for String {
    fn get_size(&self) -> usize {
        self.capacity()
    }
}

impl<Item: Sized + Processable> Processable for Vec<Item> {
    fn get_size(&self) -> usize {
        let slot_size = std::mem::size_of::<Item>();
        let direct_size = slot_size * self.capacity();
        let items_referenced_size: usize = self.iter().map(|item| item.get_size()).sum();
        direct_size + items_referenced_size
    }
}

impl<Item: Processable> Processable for Option<Item> {
    fn get_size(&self) -> usize {
        self.as_ref().map(|item| item.get_size()).unwrap_or(0)
    }
}

impl<Item: Processable, Error: Processable> Processable for Result<Item, Error> {
    fn get_size(&self) -> usize {
        match self {
            Ok(item) => item.get_size(),
            Err(err) => err.get_size(),
        }
    }
}