Fast and robust atproto CAR file processing in rust
15
fork

Configure Feed

Select the types of activity you want to include in your feed.

at candystore 159 lines 4.7 kB view raw
1/*! 2Disk storage for blocks on disk 3 4Currently this uses sqlite. In testing sqlite wasn't the fastest, but it seemed 5to be the best behaved in terms of both on-disk space usage and memory usage. 6 7```no_run 8# use repo_stream::{DiskBuilder, DiskError}; 9# #[tokio::main] 10# async fn main() -> Result<(), DiskError> { 11let store = DiskBuilder::new() 12 .with_max_stored_mb(1024) // errors when >1GiB of processed blocks are inserted 13 .open("/some/path.db".into()).await?; 14# Ok(()) 15# } 16``` 17*/ 18 19use crate::drive::DriveError; 20use candystore::{CandyError, CandyStore, Config}; 21use std::path::PathBuf; 22 23#[derive(Debug, thiserror::Error)] 24pub enum DiskError { 25 /// A wrapped database error 26 /// 27 /// (The wrapped err should probably be obscured to remove public-facing 28 /// sqlite bits) 29 #[error(transparent)] 30 DbError(#[from] CandyError), 31 /// Unfortunately candystore uses anyhow::Result for it's open call 32 #[error("Failed on a db call, see logs")] 33 DbGarbageError, 34 /// A tokio blocking task failed to join 35 #[error("Failed to join a tokio blocking task: {0}")] 36 JoinError(#[from] tokio::task::JoinError), 37 /// The total size of stored blocks exceeded the allowed size 38 /// 39 /// If you need to process *really* big CARs, you can configure a higher 40 /// limit. 41 #[error("Maximum disk size reached")] 42 MaxSizeExceeded, 43} 44 45/// Builder-style disk store setup 46#[derive(Debug, Clone)] 47pub struct DiskBuilder { 48 /// Database stored block size limit 49 /// 50 /// Default: 10 GiB 51 /// 52 /// Note: actual size on disk may be more, but should approximately scale 53 /// with this limit 54 pub max_stored_mb: usize, 55} 56 57impl Default for DiskBuilder { 58 fn default() -> Self { 59 Self { 60 max_stored_mb: 10 * 1024, // 10 GiB 61 } 62 } 63} 64 65impl DiskBuilder { 66 /// Begin configuring the storage with defaults 67 pub fn new() -> Self { 68 Default::default() 69 } 70 /// Set the approximate stored block size limit 71 /// 72 /// Default: 10 GiB 73 pub fn with_max_stored_mb(mut self, max: usize) -> Self { 74 self.max_stored_mb = max; 75 self 76 } 77 /// Open and initialize the actual disk storage 78 pub async fn open(&self, path: PathBuf, keys_hint: Option<usize>) -> Result<DiskStore, DiskError> { 79 DiskStore::new(path, self.max_stored_mb, keys_hint).await 80 } 81} 82 83/// On-disk block storage 84pub struct DiskStore { 85 db: CandyStore, 86 max_stored: usize, 87 stored: usize, 88} 89 90impl DiskStore { 91 /// Initialize a new disk store 92 pub async fn new(path: PathBuf, max_stored_mb: usize, keys_hint: Option<usize>) -> Result<Self, DiskError> { 93 let max_stored = max_stored_mb * 2_usize.pow(20); 94 let db = tokio::task::spawn_blocking(move || { 95 let mut conf = Config::default(); 96 // conf.max_shard_size = 256 * 1024 * 1024; 97 // conf.min_compaction_threashold = 32 * 1024 * 1024; 98 // conf.expected_number_of_keys = 1_200_000; 99 if let Some(hint) = keys_hint { 100 conf.expected_number_of_keys = hint; 101 } 102 conf.num_compaction_threads = 1; 103 let db = CandyStore::open(path, conf).map_err(|e| { 104 log::error!("{e:?}"); 105 DiskError::DbGarbageError 106 })?; 107 108 Ok::<_, DiskError>(db) 109 }) 110 .await??; 111 112 Ok(Self { 113 db, 114 max_stored, 115 stored: 0, 116 }) 117 } 118 119 /// Drop and recreate the kv table 120 pub async fn reset(self) -> Result<Self, DiskError> { 121 tokio::task::spawn_blocking(move || { 122 Self::reset_tables(&self.db)?; 123 Ok(self) 124 }) 125 .await? 126 } 127 fn reset_tables(db: &CandyStore) -> Result<(), DiskError> { 128 db.clear().map_err(|e| { 129 log::error!("{e:?}"); 130 DiskError::DbGarbageError 131 })?; 132 133 Ok(()) 134 } 135 136 pub(crate) fn put_many( 137 &mut self, 138 kv: impl Iterator<Item = Result<(Vec<u8>, Vec<u8>), DriveError>>, 139 ) -> Result<(), DriveError> { 140 for pair in kv { 141 let (k, v) = pair?; 142 self.stored += v.len(); 143 if self.stored > self.max_stored { 144 return Err(DiskError::MaxSizeExceeded.into()); 145 } 146 self.db.owned_set(k, &v).map_err(|e| { 147 log::error!("{e:?}"); 148 DiskError::DbGarbageError 149 })?; 150 } 151 Ok(()) 152 } 153 pub(crate) fn get(&mut self, key: Vec<u8>) -> Result<Option<Vec<u8>>, DiskError> { 154 self.db.owned_get(key).map_err(|e| { 155 log::error!("{e:?}"); 156 DiskError::DbGarbageError 157 }) 158 } 159}