Fast and robust atproto CAR file processing in Rust
disk.rs — 6.5 kB
1/*! 2Disk storage for blocks on disk 3 4Currently this uses sqlite. In testing sqlite wasn't the fastest, but it seemed 5to be the best behaved in terms of both on-disk space usage and memory usage. 6 7```no_run 8# use repo_stream::{DiskBuilder, DiskError}; 9# #[tokio::main] 10# async fn main() -> Result<(), DiskError> { 11let store = DiskBuilder::new() 12 .with_cache_size_mb(32) 13 .with_max_stored_mb(1024) // errors when >1GiB of processed blocks are inserted 14 .open("/some/path.db".into()).await?; 15# Ok(()) 16# } 17``` 18*/ 19 20use crate::drive::DriveError; 21use rusqlite::OptionalExtension; 22use std::path::PathBuf; 23 24#[derive(Debug, thiserror::Error)] 25pub enum DiskError { 26 /// A wrapped database error 27 /// 28 /// (The wrapped err should probably be obscured to remove public-facing 29 /// sqlite bits) 30 #[error(transparent)] 31 DbError(#[from] rusqlite::Error), 32 /// A tokio blocking task failed to join 33 #[error("Failed to join a tokio blocking task: {0}")] 34 JoinError(#[from] tokio::task::JoinError), 35 /// The total size of stored blocks exceeded the allowed size 36 /// 37 /// If you need to process *really* big CARs, you can configure a higher 38 /// limit. 
39 #[error("Maximum disk size reached")] 40 MaxSizeExceeded, 41 #[error("this error was replaced, seeing this is a bug.")] 42 #[doc(hidden)] 43 Stolen, 44} 45 46impl DiskError { 47 /// hack for ownership challenges with the disk driver 48 pub(crate) fn steal(&mut self) -> Self { 49 let mut swapped = DiskError::Stolen; 50 std::mem::swap(self, &mut swapped); 51 swapped 52 } 53} 54 55/// Builder-style disk store setup 56pub struct DiskBuilder { 57 /// Database in-memory cache allowance 58 /// 59 /// Default: 32 MiB 60 pub cache_size_mb: usize, 61 /// Database stored block size limit 62 /// 63 /// Default: 10 GiB 64 /// 65 /// Note: actual size on disk may be more, but should approximately scale 66 /// with this limit 67 pub max_stored_mb: usize, 68} 69 70impl Default for DiskBuilder { 71 fn default() -> Self { 72 Self { 73 cache_size_mb: 32, 74 max_stored_mb: 10 * 1024, // 10 GiB 75 } 76 } 77} 78 79impl DiskBuilder { 80 /// Begin configuring the storage with defaults 81 pub fn new() -> Self { 82 Default::default() 83 } 84 /// Set the in-memory cache allowance for the database 85 /// 86 /// Default: 32 MiB 87 pub fn with_cache_size_mb(mut self, size: usize) -> Self { 88 self.cache_size_mb = size; 89 self 90 } 91 /// Set the approximate stored block size limit 92 /// 93 /// Default: 10 GiB 94 pub fn with_max_stored_mb(mut self, max: usize) -> Self { 95 self.max_stored_mb = max; 96 self 97 } 98 /// Open and initialize the actual disk storage 99 pub async fn open(self, path: PathBuf) -> Result<DiskStore, DiskError> { 100 DiskStore::new(path, self.cache_size_mb, self.max_stored_mb).await 101 } 102} 103 104/// On-disk block storage 105pub struct DiskStore { 106 conn: rusqlite::Connection, 107 max_stored: usize, 108 stored: usize, 109} 110 111impl DiskStore { 112 /// Initialize a new disk store 113 pub async fn new( 114 path: PathBuf, 115 cache_mb: usize, 116 max_stored_mb: usize, 117 ) -> Result<Self, DiskError> { 118 let max_stored = max_stored_mb * 2_usize.pow(20); 119 
let conn = tokio::task::spawn_blocking(move || { 120 let conn = rusqlite::Connection::open(path)?; 121 122 let sqlite_one_mb = -(2_i64.pow(10)); // negative is kibibytes for sqlite cache_size 123 124 // conn.pragma_update(None, "journal_mode", "OFF")?; 125 // conn.pragma_update(None, "journal_mode", "MEMORY")?; 126 conn.pragma_update(None, "journal_mode", "WAL")?; 127 // conn.pragma_update(None, "wal_autocheckpoint", "0")?; // this lets things get a bit big on disk 128 conn.pragma_update(None, "synchronous", "OFF")?; 129 conn.pragma_update( 130 None, 131 "cache_size", 132 (cache_mb as i64 * sqlite_one_mb).to_string(), 133 )?; 134 Self::reset_tables(&conn)?; 135 136 Ok::<_, DiskError>(conn) 137 }) 138 .await??; 139 140 Ok(Self { 141 conn, 142 max_stored, 143 stored: 0, 144 }) 145 } 146 pub(crate) fn get_writer(&'_ mut self) -> Result<SqliteWriter<'_>, DiskError> { 147 let tx = self.conn.transaction()?; 148 Ok(SqliteWriter { 149 tx, 150 stored: &mut self.stored, 151 max: self.max_stored, 152 }) 153 } 154 pub(crate) fn get_reader<'conn>(&'conn self) -> Result<SqliteReader<'conn>, DiskError> { 155 let select_stmt = self.conn.prepare("SELECT val FROM blocks WHERE key = ?1")?; 156 Ok(SqliteReader { select_stmt }) 157 } 158 /// Drop and recreate the kv table 159 pub async fn reset(self) -> Result<Self, DiskError> { 160 tokio::task::spawn_blocking(move || { 161 Self::reset_tables(&self.conn)?; 162 Ok(self) 163 }) 164 .await? 
165 } 166 fn reset_tables(conn: &rusqlite::Connection) -> Result<(), DiskError> { 167 conn.execute("DROP TABLE IF EXISTS blocks", ())?; 168 conn.execute( 169 "CREATE TABLE blocks ( 170 key BLOB PRIMARY KEY NOT NULL, 171 val BLOB NOT NULL 172 ) WITHOUT ROWID", 173 (), 174 )?; 175 Ok(()) 176 } 177} 178 179pub(crate) struct SqliteWriter<'conn> { 180 tx: rusqlite::Transaction<'conn>, 181 stored: &'conn mut usize, 182 max: usize, 183} 184 185impl SqliteWriter<'_> { 186 pub(crate) fn put_many( 187 &mut self, 188 kv: impl Iterator<Item = Result<(Vec<u8>, Vec<u8>), DriveError>>, 189 ) -> Result<(), DriveError> { 190 let mut insert_stmt = self 191 .tx 192 .prepare_cached("INSERT INTO blocks (key, val) VALUES (?1, ?2)") 193 .map_err(DiskError::DbError)?; 194 for pair in kv { 195 let (k, v) = pair?; 196 *self.stored += v.len(); 197 if *self.stored > self.max { 198 return Err(DiskError::MaxSizeExceeded.into()); 199 } 200 insert_stmt.execute((k, v)).map_err(DiskError::DbError)?; 201 } 202 Ok(()) 203 } 204 pub fn commit(self) -> Result<(), DiskError> { 205 self.tx.commit()?; 206 Ok(()) 207 } 208} 209 210pub(crate) struct SqliteReader<'conn> { 211 select_stmt: rusqlite::Statement<'conn>, 212} 213 214impl SqliteReader<'_> { 215 pub(crate) fn get(&mut self, key: Vec<u8>) -> rusqlite::Result<Option<Vec<u8>>> { 216 self.select_stmt 217 .query_one((&key,), |row| row.get(0)) 218 .optional() 219 } 220}