Fast and robust atproto CAR file processing in Rust

more documentation

Changed files (+115 -20)
src/disk.rs (+22 -4)
···
+ /*!
+ Sqlite storage for blocks on disk
+
+ In testing sqlite wasn't the fastest, but it seemed to be the best behaved in
+ terms of both on-disk space usage and memory usage.
+
+ ```no_run
+ # use repo_stream::SqliteStore;
+ # #[tokio::main]
+ # async fn main() -> Result<(), rusqlite::Error> {
+ let db_cache_size = 32; // MiB
+ let store = SqliteStore::new("/some/path.sqlite".into(), db_cache_size).await?;
+ # Ok(())
+ # }
+ ```
+ */
+
  use crate::drive::DriveError;
  use rusqlite::OptionalExtension;
  use std::path::PathBuf;

+ /// On-disk block storage
  pub struct SqliteStore {
      conn: rusqlite::Connection,
  }
···

          Ok(Self { conn })
      }
-     pub fn get_writer(&'_ mut self) -> Result<SqliteWriter<'_>, rusqlite::Error> {
+     pub(crate) fn get_writer(&'_ mut self) -> Result<SqliteWriter<'_>, rusqlite::Error> {
          let tx = self.conn.transaction()?;
          // let insert_stmt = tx.prepare("INSERT INTO blocks (key, val) VALUES (?1, ?2)")?;
          Ok(SqliteWriter { tx })
      }
-     pub fn get_reader<'conn>(&'conn self) -> Result<SqliteReader<'conn>, rusqlite::Error> {
+     pub(crate) fn get_reader<'conn>(&'conn self) -> Result<SqliteReader<'conn>, rusqlite::Error> {
          let select_stmt = self.conn.prepare("SELECT val FROM blocks WHERE key = ?1")?;
          Ok(SqliteReader { select_stmt })
      }
···
      }
  }

- pub struct SqliteWriter<'conn> {
+ pub(crate) struct SqliteWriter<'conn> {
      tx: rusqlite::Transaction<'conn>,
  }
···
      }
  }

- pub struct SqliteReader<'conn> {
+ pub(crate) struct SqliteReader<'conn> {
      select_stmt: rusqlite::Statement<'conn>,
  }
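Together with `reset_store` in `src/drive.rs` below, the intent is that one store is opened up front and recycled across repos rather than re-created per CAR. A minimal sketch of that lifecycle; the path is illustrative, and the spillover steps are shown as comments since they belong to `src/drive.rs`:

```rust
use repo_stream::SqliteStore;

#[tokio::main]
async fn main() -> Result<(), rusqlite::Error> {
    // open once; 32 MiB sqlite page cache, matching the doctest above
    let store = SqliteStore::new("/tmp/blocks.sqlite".into(), 32).await?;
    // then, for each repo that spills to disk (see src/drive.rs):
    //   let (commit, mut driver) = need_disk.finish_loading(store).await?;
    //   ...drain the driver...
    //   let store = driver.reset_store().await?; // hands the store back for reuse
    let _ = store;
    Ok(())
}
```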
src/drive.rs (+82 -14)
···
- //! Consume an MST block stream, producing an ordered stream of records
+ //! Consume a CAR from an AsyncRead, producing an ordered stream of records

  use crate::disk::SqliteStore;
  use crate::process::Processable;
···
      }
  }

+ /// Read a CAR file, buffering blocks in memory or to disk
  pub enum Driver<R: AsyncRead + Unpin, T: Processable> {
+     /// All blocks fit within the memory limit
+     ///
+     /// You probably want to check the commit's signature. You can go ahead and
+     /// walk the MST right away.
      Memory(Commit, MemDriver<T>),
-     Disk(BigCar<R, T>),
+     /// Blocks exceed the memory limit
+     ///
+     /// You'll need to provide disk storage to continue. The commit will be
+     /// returned and can be validated only once all blocks are loaded.
+     Disk(NeedDisk<R, T>),
  }

  impl<R: AsyncRead + Unpin, T: Processable> Driver<R, T> {
···
              mem_size += std::mem::size_of::<Cid>() + maybe_processed.get_size();
              mem_blocks.insert(cid, maybe_processed);
              if mem_size >= max_size {
-                 return Ok(Driver::Disk(BigCar {
+                 return Ok(Driver::Disk(NeedDisk {
                      car,
                      root,
                      process,
···
      }
  }

- /// a paritally memory-loaded car file that needs disk spillover to continue
- pub struct BigCar<R: AsyncRead + Unpin, T: Processable> {
+ /// A partially memory-loaded CAR file that needs disk spillover to continue
+ pub struct NeedDisk<R: AsyncRead + Unpin, T: Processable> {
      car: CarReader<R>,
      root: Cid,
      process: fn(Vec<u8>) -> T,
···
      Ok(t)
  }

- impl<R: AsyncRead + Unpin, T: Processable + Send + 'static> BigCar<R, T> {
+ impl<R: AsyncRead + Unpin, T: Processable + Send + 'static> NeedDisk<R, T> {
      pub async fn finish_loading(
          mut self,
          mut store: SqliteStore,
-     ) -> Result<(Commit, BigCarReady<T>), DriveError> {
+     ) -> Result<(Commit, DiskDriver<T>), DriveError> {
          // move store in and back out so we can manage lifetimes
          // dump mem blocks into the store
          store = tokio::task::spawn(async move {
···

          Ok((
              commit,
-             BigCarReady {
+             DiskDriver {
                  process: self.process,
                  state: Some(BigState { store, walker }),
              },
···
      walker: Walker,
  }

- pub struct BigCarReady<T: Clone> {
+ /// MST walker that reads from disk instead of an in-memory hashmap
+ pub struct DiskDriver<T: Clone> {
      process: fn(Vec<u8>) -> T,
      state: Option<BigState>,
  }

- impl<T: Processable + Send + 'static> BigCarReady<T> {
+ // for doctests only
+ #[doc(hidden)]
+ pub fn _get_fake_disk_driver() -> DiskDriver<Vec<u8>> {
+     use crate::process::noop;
+     DiskDriver {
+         process: noop,
+         state: None,
+     }
+ }
+
+ impl<T: Processable + Send + 'static> DiskDriver<T> {
+     /// Walk the MST, returning up to `n` rkey + record pairs
+     ///
+     /// ```no_run
+     /// # use repo_stream::{drive::{DiskDriver, DriveError, _get_fake_disk_driver}, process::noop};
+     /// # #[tokio::main]
+     /// # async fn main() -> Result<(), DriveError> {
+     /// # let mut disk_driver = _get_fake_disk_driver();
+     /// while let Some(pairs) = disk_driver.next_chunk(256).await? {
+     ///     for (rkey, record) in pairs {
+     ///         println!("{rkey}: size={}", record.len());
+     ///     }
+     /// }
+     /// let store = disk_driver.reset_store().await?;
+     /// # Ok(())
+     /// # }
+     /// ```
      pub async fn next_chunk(&mut self, n: usize) -> Result<Option<BlockChunk<T>>, DriveError> {
          let process = self.process;

          // state should only *ever* be None transiently while inside here
-         let mut state = self
-             .state
-             .take()
-             .expect("BigCarReady must have Some(state)");
+         let mut state = self.state.take().expect("DiskDriver must have Some(state)");

          // the big pain here is that we don't want to leave self.state in an
          // invalid state (None), so all the error paths have to make sure it
···
          Ok(())
      }

+     /// Spawn the disk reading task onto a tokio blocking thread
+     ///
+     /// The idea is to avoid lots of sending back and forth to the blocking
+     /// thread: a single blocking task does all the disk reading work and
+     /// sends rkeys and records back through an `mpsc` channel instead.
+     ///
+     /// This might also allow the disk work to continue while records are
+     /// being processed. It's not yet clear whether this method has much
+     /// benefit over just using `.next_chunk(n)`.
+     ///
+     /// ```no_run
+     /// # use repo_stream::{drive::{DiskDriver, DriveError, _get_fake_disk_driver}, process::noop};
+     /// # #[tokio::main]
+     /// # async fn main() -> Result<(), DriveError> {
+     /// # let mut disk_driver = _get_fake_disk_driver();
+     /// let (mut rx, join) = disk_driver.to_channel(512);
+     /// while let Some(recvd) = rx.recv().await {
+     ///     let pairs = recvd?;
+     ///     for (rkey, record) in pairs {
+     ///         println!("{rkey}: size={}", record.len());
+     ///     }
+     /// }
+     /// let store = join.await?.reset_store().await?;
+     /// # Ok(())
+     /// # }
+     /// ```
      pub fn to_channel(
          mut self,
          n: usize,
···
          (rx, chan_task)
      }

+     /// Reset the disk storage so it can be reused. You must call this.
+     ///
+     /// Ideally this would live in an `impl Drop`, but since it makes blocking
+     /// calls, that would be risky in an async context. For now you just have
+     /// to carefully make sure you call it.
+     ///
+     /// The sqlite store is returned so it can be reused for another
+     /// `DiskDriver`.
      pub async fn reset_store(mut self) -> Result<SqliteStore, DriveError> {
          tokio::task::spawn_blocking(move || {
              let BigState { mut store, .. } = self.state.take().expect("valid state");
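Since the `Driver` enum makes spillover explicit, a caller has to handle both variants. A minimal sketch of that match, assuming the `AsyncRead` bound is tokio's (the crate is tokio-based throughout); how the `Driver` value itself is constructed is outside this hunk, and the store path and chunk size are illustrative:

```rust
use repo_stream::{DriveError, Driver, SqliteStore};
use tokio::io::AsyncRead;

async fn handle<R: AsyncRead + Unpin>(driver: Driver<R, Vec<u8>>) -> Result<(), DriveError> {
    match driver {
        Driver::Memory(commit, mem_driver) => {
            // everything fit in memory: check the commit's signature and walk
            // the MST right away
            let _ = (commit, mem_driver);
        }
        Driver::Disk(need_disk) => {
            // too big: provide disk storage, finish loading, then walk
            let store = SqliteStore::new("/tmp/spill.sqlite".into(), 32)
                .await
                .expect("open spill store");
            let (_commit, mut disk_driver) = need_disk.finish_loading(store).await?;
            while let Some(pairs) = disk_driver.next_chunk(256).await? {
                for (rkey, record) in pairs {
                    println!("{rkey}: {} bytes", record.len());
                }
            }
            // required cleanup; returns the store so it can be reused
            let _store = disk_driver.reset_store().await?;
        }
    }
    Ok(())
}
```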
src/lib.rs (+2 -1)
···

  */

- mod mst;
+ pub mod mst;
  mod walk;

  pub mod disk;
···

  pub use disk::SqliteStore;
  pub use drive::{DriveError, Driver};
+ pub use mst::Commit;
  pub use process::Processable;
src/process.rs (+9 -1)
···
+ /*!
+ Record processor function output trait
+ */
+
  use serde::{Serialize, de::DeserializeOwned};

+ /// Output trait for record processing
  pub trait Processable: Clone + Serialize + DeserializeOwned {
-     /// the additional size taken up (not including its mem::size_of)
+     /// Any additional in-memory size taken by the processed type
+     ///
+     /// Do not include stack size (`std::mem::size_of`)
      fn get_size(&self) -> usize;
  }

+ /// Processor that just returns the raw blocks
  #[inline]
  pub fn noop(block: Vec<u8>) -> Vec<u8> {
      block
  }
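The trait plus the `fn(Vec<u8>) -> T` processor shape is the crate's extension point, so a hypothetical implementation makes the `get_size` contract concrete. `SizeOnly` and `size_only` below are illustrative, not part of the crate:

```rust
use repo_stream::Processable;
use serde::{Deserialize, Serialize};

/// Hypothetical processed form: keep a small tag and the original length,
/// dropping the raw bytes to save memory.
#[derive(Clone, Serialize, Deserialize)]
struct SizeOnly {
    tag: String,
    raw_len: usize,
}

impl Processable for SizeOnly {
    fn get_size(&self) -> usize {
        // count only heap bytes beyond mem::size_of::<SizeOnly>():
        // here, the String's buffer
        self.tag.capacity()
    }
}

/// Matches the `fn(Vec<u8>) -> T` processor shape the drivers accept.
fn size_only(block: Vec<u8>) -> SizeOnly {
    SizeOnly { tag: String::new(), raw_len: block.len() }
}
```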