Fast and robust atproto CAR file processing in Rust
/*!
Disk-backed storage for blocks

Currently this uses fjall; earlier iterations used sqlite, which in testing
wasn't the fastest, but seemed to be the best behaved in terms of both on-disk
space usage and memory usage.

```no_run
# use repo_stream::{DiskBuilder, DiskError};
# #[tokio::main]
# async fn main() -> Result<(), DiskError> {
let store = DiskBuilder::new()
    .with_cache_size_mb(32)
    .with_max_stored_mb(1024) // errors when >1GiB of processed blocks are inserted
    .open("/some/path.db".into()).await?;
# Ok(())
# }
```
*/

use crate::drive::DriveError;
use fjall::config::{CompressionPolicy, PinningPolicy, RestartIntervalPolicy};
use fjall::{CompressionType, Database, Error as FjallError, Keyspace, KeyspaceCreateOptions};
use std::path::PathBuf;

#[derive(Debug, thiserror::Error)]
pub enum DiskError {
    /// A wrapped database error
    ///
    /// (The wrapped err should probably be obscured to remove public-facing
    /// fjall bits)
    #[error(transparent)]
    DbError(#[from] FjallError),
    /// A tokio blocking task failed to join
    #[error("Failed to join a tokio blocking task: {0}")]
    JoinError(#[from] tokio::task::JoinError),
    /// The total size of stored blocks exceeded the allowed size
    ///
    /// If you need to process *really* big CARs, you can configure a higher
    /// limit.
    #[error("Maximum disk size reached")]
    MaxSizeExceeded,
}

/// Builder-style disk store setup
#[derive(Debug, Clone)]
pub struct DiskBuilder {
    /// Database in-memory cache allowance
    ///
    /// Default: 64 MiB
    pub cache_size_mb: usize,
    /// Database stored block size limit
    ///
    /// Default: 10 GiB
    ///
    /// Note: actual size on disk may be more, but should approximately scale
    /// with this limit
    pub max_stored_mb: usize,
}

impl Default for DiskBuilder {
    fn default() -> Self {
        Self {
            cache_size_mb: 64,
            max_stored_mb: 10 * 1024, // 10 GiB
        }
    }
}

impl DiskBuilder {
    /// Begin configuring the storage with defaults
    pub fn new() -> Self {
        Default::default()
    }
    /// Set the in-memory cache allowance for the database
    ///
    /// Default: 64 MiB
    pub fn with_cache_size_mb(mut self, size: usize) -> Self {
        self.cache_size_mb = size;
        self
    }
    /// Set the approximate stored block size limit
    ///
    /// Default: 10 GiB
    pub fn with_max_stored_mb(mut self, max: usize) -> Self {
        self.max_stored_mb = max;
        self
    }
    /// Open and initialize the actual disk storage
    pub async fn open(&self, path: PathBuf) -> Result<DiskStore, DiskError> {
        DiskStore::new(path, self.cache_size_mb, self.max_stored_mb).await
    }
}

/// On-disk block storage
pub struct DiskStore {
    #[allow(unused)]
    db: Database,
    partition: Keyspace,
    max_stored: usize,
    stored: usize,
}

impl DiskStore {
    /// Initialize a new disk store
    pub async fn new(
        path: PathBuf,
        cache_mb: usize,
        max_stored_mb: usize,
    ) -> Result<Self, DiskError> {
        let max_stored = max_stored_mb * 2_usize.pow(20);
        let (db, partition) = tokio::task::spawn_blocking(move || {
            let db = Database::builder(path)
                // .manual_journal_persist(true)
                // .flush_workers(1)
                // .compaction_workers(1)
                .journal_compression(CompressionType::None)
                .cache_size(cache_mb as u64 * 2_u64.pow(20))
                .temporary(true)
                .open()?;
            let opts = KeyspaceCreateOptions::default()
                .data_block_restart_interval_policy(RestartIntervalPolicy::all(8))
                .filter_block_pinning_policy(PinningPolicy::disabled())
                .expect_point_read_hits(true)
                .data_block_compression_policy(CompressionPolicy::disabled())
                .manual_journal_persist(true)
                .max_memtable_size(32 * 2_u64.pow(20));
            let partition = db.keyspace("z", || opts)?;

            Ok::<_, DiskError>((db, partition))
        })
        .await??;

        Ok(Self {
            db,
            partition,
            max_stored,
            stored: 0,
        })
    }

    /// Insert a batch of (key, value) pairs, tracking the running total of
    /// stored value bytes and failing with `DiskError::MaxSizeExceeded` once
    /// that total passes the configured limit
    pub(crate) fn put_many(
        &mut self,
        kv: impl Iterator<Item = Result<(Vec<u8>, Vec<u8>), DriveError>>,
    ) -> Result<(), DriveError> {
        let mut batch = self.db.batch();
        for pair in kv {
            let (k, v) = pair?;
            self.stored += v.len();
            if self.stored > self.max_stored {
                return Err(DiskError::MaxSizeExceeded.into());
            }
            batch.insert(&self.partition, k, v);
        }
        batch.commit().map_err(DiskError::DbError)?;
        Ok(())
    }

    /// Fetch a stored block by key
    #[inline]
    pub(crate) fn get(&mut self, key: &[u8]) -> Result<Option<fjall::Slice>, FjallError> {
        self.partition.get(key)
    }
}
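
For context, here is a minimal sketch of how the size limit surfaces in practice. It is not part of the module above: it assumes it lives somewhere inside the crate (since `put_many` is `pub(crate)`), and it assumes `DriveError` implements `Display` and `From<DiskError>`, as the `.into()` inside `put_many` implies. The function name, path, and block sizes are made up for illustration.

```rust
// Hypothetical, crate-internal sketch -- not part of the module above.
// Assumes `DiskBuilder`, `DiskStore`, and `DiskError` from this module, plus a
// `DriveError` elsewhere in the crate with a `From<DiskError>` impl.
async fn store_blocks_example() -> Result<(), DriveError> {
    // A deliberately tiny limit (1 MiB) so the error path is easy to hit.
    let mut store = DiskBuilder::new()
        .with_max_stored_mb(1)
        .open("/tmp/example-blocks.db".into())
        .await?;

    // Each value's length counts toward the running `stored` total; the batch
    // is abandoned as soon as that total passes `max_stored`.
    let blocks = (0u32..1024).map(|i| Ok((i.to_be_bytes().to_vec(), vec![0u8; 4096])));

    match store.put_many(blocks) {
        // ~4 MiB of values against a 1 MiB limit: expect MaxSizeExceeded,
        // wrapped in the crate's DriveError.
        Err(err) => eprintln!("stopped early: {err}"),
        Ok(()) => println!("all blocks stored"),
    }
    Ok(())
}
```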