Fast and robust atproto CAR file processing in Rust
/*!
On-disk storage for blocks

Currently this uses SQLite. In testing SQLite wasn't the fastest, but it seemed
to be the best behaved in terms of both on-disk space usage and memory usage.

```no_run
# use repo_stream::{DiskBuilder, DiskError};
# #[tokio::main]
# async fn main() -> Result<(), DiskError> {
let store = DiskBuilder::new()
    .with_cache_size_mb(32)
    .with_max_stored_mb(1024) // errors when >1GiB of processed blocks are inserted
    .open("/some/path.db".into()).await?;
# Ok(())
# }
```
*/

use crate::drive::DriveError;
use rusqlite::OptionalExtension;
use std::path::PathBuf;

#[derive(Debug, thiserror::Error)]
pub enum DiskError {
    /// A wrapped database error
    ///
    /// (The wrapped err should probably be obscured to remove public-facing
    /// sqlite bits)
    #[error(transparent)]
    DbError(#[from] rusqlite::Error),
    /// A tokio blocking task failed to join
    #[error("Failed to join a tokio blocking task: {0}")]
    JoinError(#[from] tokio::task::JoinError),
    /// The total size of stored blocks exceeded the allowed size
    ///
    /// If you need to process *really* big CARs, you can configure a higher
    /// limit.
    #[error("Maximum disk size reached")]
    MaxSizeExceeded,
    #[error("this error was replaced; seeing this is a bug")]
    #[doc(hidden)]
    Stolen,
}

impl DiskError {
    /// Hack to work around ownership challenges with the disk driver
    pub(crate) fn steal(&mut self) -> Self {
        let mut swapped = DiskError::Stolen;
        std::mem::swap(self, &mut swapped);
        swapped
    }
}

/// Builder-style disk store setup
#[derive(Debug, Clone)]
pub struct DiskBuilder {
    /// Database in-memory cache allowance
    ///
    /// Default: 32 MiB
    pub cache_size_mb: usize,
    /// Database stored block size limit
    ///
    /// Default: 10 GiB
    ///
    /// Note: actual size on disk may be more, but should approximately scale
    /// with this limit
    pub max_stored_mb: usize,
}

impl Default for DiskBuilder {
    fn default() -> Self {
        Self {
            cache_size_mb: 32,
            max_stored_mb: 10 * 1024, // 10 GiB
        }
    }
}

impl DiskBuilder {
    /// Begin configuring the storage with defaults
    pub fn new() -> Self {
        Default::default()
    }
    /// Set the in-memory cache allowance for the database
    ///
    /// Default: 32 MiB
    pub fn with_cache_size_mb(mut self, size: usize) -> Self {
        self.cache_size_mb = size;
        self
    }
    /// Set the approximate stored block size limit
    ///
    /// Default: 10 GiB
    pub fn with_max_stored_mb(mut self, max: usize) -> Self {
        self.max_stored_mb = max;
        self
    }
    /// Open and initialize the actual disk storage
    pub async fn open(&self, path: PathBuf) -> Result<DiskStore, DiskError> {
        DiskStore::new(path, self.cache_size_mb, self.max_stored_mb).await
    }
}

/// On-disk block storage
pub struct DiskStore {
    conn: rusqlite::Connection,
    max_stored: usize,
    stored: usize,
}

impl DiskStore {
    /// Initialize a new disk store
    pub async fn new(
        path: PathBuf,
        cache_mb: usize,
        max_stored_mb: usize,
    ) -> Result<Self, DiskError> {
        let max_stored = max_stored_mb * 2_usize.pow(20);
        let conn = tokio::task::spawn_blocking(move || {
            let conn = rusqlite::Connection::open(path)?;

            let sqlite_one_mb = -(2_i64.pow(10)); // negative means kibibytes for sqlite cache_size

            // conn.pragma_update(None, "journal_mode", "OFF")?;
            // conn.pragma_update(None, "journal_mode", "MEMORY")?;
            conn.pragma_update(None, "journal_mode", "WAL")?;
            // conn.pragma_update(None, "wal_autocheckpoint", "0")?; // this lets things get a bit big on disk
            conn.pragma_update(None, "synchronous", "OFF")?;
            conn.pragma_update(
                None,
                "cache_size",
                (cache_mb as i64 * sqlite_one_mb).to_string(),
            )?;
            Self::reset_tables(&conn)?;

            Ok::<_, DiskError>(conn)
        })
        .await??;

        Ok(Self {
            conn,
            max_stored,
            stored: 0,
        })
    }
    pub(crate) fn get_writer(&mut self) -> Result<SqliteWriter<'_>, DiskError> {
        let tx = self.conn.transaction()?;
        Ok(SqliteWriter {
            tx,
            stored: &mut self.stored,
            max: self.max_stored,
        })
    }
    pub(crate) fn get_reader<'conn>(&'conn self) -> Result<SqliteReader<'conn>, DiskError> {
        let select_stmt = self.conn.prepare("SELECT val FROM blocks WHERE key = ?1")?;
        Ok(SqliteReader { select_stmt })
    }
    /// Drop and recreate the kv table
    pub async fn reset(self) -> Result<Self, DiskError> {
        tokio::task::spawn_blocking(move || {
            Self::reset_tables(&self.conn)?;
            Ok(self)
        })
        .await?
    }
    fn reset_tables(conn: &rusqlite::Connection) -> Result<(), DiskError> {
        conn.execute("DROP TABLE IF EXISTS blocks", ())?;
        conn.execute(
            "CREATE TABLE blocks (
                key BLOB PRIMARY KEY NOT NULL,
                val BLOB NOT NULL
            ) WITHOUT ROWID",
            (),
        )?;
        Ok(())
    }
}

pub(crate) struct SqliteWriter<'conn> {
    tx: rusqlite::Transaction<'conn>,
    stored: &'conn mut usize,
    max: usize,
}

impl SqliteWriter<'_> {
    pub(crate) fn put_many(
        &mut self,
        kv: impl Iterator<Item = Result<(Vec<u8>, Vec<u8>), DriveError>>,
    ) -> Result<(), DriveError> {
        let mut insert_stmt = self
            .tx
            .prepare_cached("INSERT INTO blocks (key, val) VALUES (?1, ?2)")
            .map_err(DiskError::DbError)?;
        for pair in kv {
            let (k, v) = pair?;
            *self.stored += v.len();
            if *self.stored > self.max {
                return Err(DiskError::MaxSizeExceeded.into());
            }
            insert_stmt.execute((k, v)).map_err(DiskError::DbError)?;
        }
        Ok(())
    }
    pub fn commit(self) -> Result<(), DiskError> {
        self.tx.commit()?;
        Ok(())
    }
}

pub(crate) struct SqliteReader<'conn> {
    select_stmt: rusqlite::Statement<'conn>,
}

impl SqliteReader<'_> {
    pub(crate) fn get(&mut self, key: Vec<u8>) -> rusqlite::Result<Option<Vec<u8>>> {
        self.select_stmt
            .query_one((&key,), |row| row.get(0))
            .optional()
    }
}
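
// An illustrative sketch of how the pieces above fit together: open a store
// via DiskBuilder, insert blocks inside a SqliteWriter transaction, commit,
// then read them back with SqliteReader. The module/test names and temp-file
// paths here are hypothetical, and this assumes tokio's test macro ("rt" +
// "macros" features) is available as a dev-dependency; since get_writer and
// get_reader are pub(crate), a test like this has to live inside the crate.
#[cfg(test)]
mod disk_round_trip_sketch {
    use super::*;

    #[tokio::test]
    async fn writes_then_reads_a_block() -> Result<(), DriveError> {
        // Scratch database path; DiskStore::new drops and recreates the
        // `blocks` table on open, so re-runs against the same file are fine.
        let path = std::env::temp_dir().join("repo_stream_round_trip_sketch.db");

        let mut store = DiskBuilder::new()
            .with_cache_size_mb(8)
            .with_max_stored_mb(16)
            .open(path)
            .await?;

        let key = b"example-cid-bytes".to_vec();
        let val = b"example-block-bytes".to_vec();

        // The writer borrows the store mutably; scope it so the reader can
        // borrow afterwards.
        {
            let mut writer = store.get_writer()?;
            writer.put_many(std::iter::once(Ok((key.clone(), val.clone()))))?;
            writer.commit()?;
        }

        // Reads go through the prepared SELECT; a missing key yields Ok(None).
        let mut reader = store.get_reader()?;
        assert_eq!(reader.get(key).expect("select"), Some(val));
        assert_eq!(reader.get(b"absent".to_vec()).expect("select"), None);
        Ok(())
    }

    #[tokio::test]
    async fn rejects_blocks_past_the_size_cap() -> Result<(), DriveError> {
        let path = std::env::temp_dir().join("repo_stream_size_cap_sketch.db");

        // With a 1 MiB cap, a single 2 MiB block should trip MaxSizeExceeded.
        let mut store = DiskBuilder::new().with_max_stored_mb(1).open(path).await?;
        let mut writer = store.get_writer()?;
        let oversized = std::iter::once(Ok((b"key".to_vec(), vec![0u8; 2 * 1024 * 1024])));
        assert!(writer.put_many(oversized).is_err());
        Ok(())
    }
}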