Fast and robust atproto CAR file processing in rust

throw in sqlite

just slightly slower than redb atm

+92 -1
Cargo.lock
··· 407 407 ] 408 408 409 409 [[package]] 410 + name = "errno" 411 + version = "0.3.14" 412 + source = "registry+https://github.com/rust-lang/crates.io-index" 413 + checksum = "39cab71617ae0d63f51a36d69f866391735b51691dbda63cf6f96d042b63efeb" 414 + dependencies = [ 415 + "libc", 416 + "windows-sys 0.60.2", 417 + ] 418 + 419 + [[package]] 410 420 name = "fallible-iterator" 411 421 version = "0.3.0" 412 422 source = "registry+https://github.com/rust-lang/crates.io-index" ··· 417 427 version = "0.1.9" 418 428 source = "registry+https://github.com/rust-lang/crates.io-index" 419 429 checksum = "7360491ce676a36bf9bb3c56c1aa791658183a54d2744120f27285738d90465a" 430 + 431 + [[package]] 432 + name = "fastrand" 433 + version = "2.3.0" 434 + source = "registry+https://github.com/rust-lang/crates.io-index" 435 + checksum = "37909eebbb50d72f9059c3b6d82c0463f2ff062c9e95845c43a6c9c0355411be" 420 436 421 437 [[package]] 422 438 name = "foldhash" ··· 514 530 ] 515 531 516 532 [[package]] 533 + name = "getrandom" 534 + version = "0.3.3" 535 + source = "registry+https://github.com/rust-lang/crates.io-index" 536 + checksum = "26145e563e54f2cadc477553f1ec5ee650b00862f0a58bcd12cbdc5f0ea2d2f4" 537 + dependencies = [ 538 + "cfg-if", 539 + "libc", 540 + "r-efi", 541 + "wasi 0.14.7+wasi-0.2.4", 542 + ] 543 + 544 + [[package]] 517 545 name = "gimli" 518 546 version = "0.32.3" 519 547 source = "registry+https://github.com/rust-lang/crates.io-index" ··· 664 692 ] 665 693 666 694 [[package]] 695 + name = "linux-raw-sys" 696 + version = "0.11.0" 697 + source = "registry+https://github.com/rust-lang/crates.io-index" 698 + checksum = "df1d3c3b53da64cf5760482273a98e575c651a67eec7f77df96b5b642de8f039" 699 + 700 + [[package]] 667 701 name = "lock_api" 668 702 version = "0.4.14" 669 703 source = "registry+https://github.com/rust-lang/crates.io-index" ··· 711 745 checksum = "78bed444cc8a2160f01cbcf811ef18cac863ad68ae8ca62092e8db51d51c761c" 712 746 dependencies = [ 713 747 "libc", 714 - "wasi", 748 + "wasi 0.11.1+wasi-snapshot-preview1", 715 749 "windows-sys 0.59.0", 716 750 ] 717 751 ··· 877 911 ] 878 912 879 913 [[package]] 914 + name = "r-efi" 915 + version = "5.3.0" 916 + source = "registry+https://github.com/rust-lang/crates.io-index" 917 + checksum = "69cdb34c158ceb288df11e18b4bd39de994f6657d83847bdffdbd7f346754b0f" 918 + 919 + [[package]] 880 920 name = "rayon" 881 921 version = "1.11.0" 882 922 source = "registry+https://github.com/rust-lang/crates.io-index" ··· 962 1002 "serde", 963 1003 "serde_bytes", 964 1004 "serde_ipld_dagcbor", 1005 + "tempfile", 965 1006 "thiserror 2.0.17", 966 1007 "tokio", 967 1008 ] ··· 987 1028 checksum = "56f7d92ca342cea22a06f2121d944b4fd82af56988c270852495420f961d4ace" 988 1029 989 1030 [[package]] 1031 + name = "rustix" 1032 + version = "1.1.2" 1033 + source = "registry+https://github.com/rust-lang/crates.io-index" 1034 + checksum = "cd15f8a2c5551a84d56efdc1cd049089e409ac19a3072d5037a17fd70719ff3e" 1035 + dependencies = [ 1036 + "bitflags", 1037 + "errno", 1038 + "libc", 1039 + "linux-raw-sys", 1040 + "windows-sys 0.60.2", 1041 + ] 1042 + 1043 + [[package]] 990 1044 name = "rustversion" 991 1045 version = "1.0.22" 992 1046 source = "registry+https://github.com/rust-lang/crates.io-index" ··· 1138 1192 ] 1139 1193 1140 1194 [[package]] 1195 + name = "tempfile" 1196 + version = "3.23.0" 1197 + source = "registry+https://github.com/rust-lang/crates.io-index" 1198 + checksum = "2d31c77bdf42a745371d260a26ca7163f1e0924b64afa0b688e61b5a9fa02f16" 1199 + dependencies = [ 1200 + "fastrand", 1201 + "getrandom", 1202 + "once_cell", 1203 + "rustix", 1204 + "windows-sys 0.60.2", 1205 + ] 1206 + 1207 + [[package]] 1141 1208 name = "thiserror" 1142 1209 version = "1.0.69" 1143 1210 source = "registry+https://github.com/rust-lang/crates.io-index" ··· 1277 1344 checksum = "ccf3ec651a847eb01de73ccad15eb7d99f80485de043efb2f370cd654f4ea44b" 1278 1345 1279 1346 [[package]] 1347 + name = "wasi" 1348 + version = "0.14.7+wasi-0.2.4" 1349 + source = "registry+https://github.com/rust-lang/crates.io-index" 1350 + checksum = "883478de20367e224c0090af9cf5f9fa85bed63a95c1abf3afc5c083ebc06e8c" 1351 + dependencies = [ 1352 + "wasip2", 1353 + ] 1354 + 1355 + [[package]] 1356 + name = "wasip2" 1357 + version = "1.0.1+wasi-0.2.4" 1358 + source = "registry+https://github.com/rust-lang/crates.io-index" 1359 + checksum = "0562428422c63773dad2c345a1882263bbf4d65cf3f42e90921f787ef5ad58e7" 1360 + dependencies = [ 1361 + "wit-bindgen", 1362 + ] 1363 + 1364 + [[package]] 1280 1365 name = "wasm-bindgen" 1281 1366 version = "0.2.104" 1282 1367 source = "registry+https://github.com/rust-lang/crates.io-index" ··· 1506 1591 version = "0.53.1" 1507 1592 source = "registry+https://github.com/rust-lang/crates.io-index" 1508 1593 checksum = "d6bbff5f0aada427a1e5a6da5f1f98158182f26556f345ac9e04d36d0ebed650" 1594 + 1595 + [[package]] 1596 + name = "wit-bindgen" 1597 + version = "0.46.0" 1598 + source = "registry+https://github.com/rust-lang/crates.io-index" 1599 + checksum = "f17a85883d4e6d00e8a97c586de764dabcc06133f7f1d55dce5cdc070ad7fe59" 1509 1600 1510 1601 [[package]] 1511 1602 name = "zerocopy"
+1
Cargo.toml
··· 27 27 criterion = { version = "0.7.0", features = ["async_tokio"] } 28 28 env_logger = "0.11.8" 29 29 multibase = "0.9.2" 30 + tempfile = "3.23.0" 30 31 tokio = { version = "1.47.1", features = ["full"] } 31 32 32 33 [profile.profiling]
+12 -38
src/disk_drive.rs
··· 70 70 P: Fn(&[u8]) -> Result<T, PE>, 71 71 PE: Error, 72 72 { 73 + #[allow(dead_code)] 73 74 block_stream: S, 74 75 block_store: BS, 75 76 walker: Walker, ··· 110 111 ) -> Result<(Commit, Self), DriveError> { 111 112 let mut commit = None; 112 113 114 + log::warn!("init: load blocks"); 115 + 116 + // go ahead and put all blocks in the block store 113 117 while let Some((cid, data)) = block_stream 114 118 .try_next() 115 119 .await ··· 119 123 let c: Commit = serde_ipld_dagcbor::from_slice(&data) 120 124 .map_err(|e| DriveError::BadCommit(e.into()))?; 121 125 commit = Some(c); 122 - break; 123 126 } else { 124 127 block_store.put( 125 128 cid, ··· 134 137 ); 135 138 } 136 139 } 140 + 141 + log::warn!("init: got commit?"); 137 142 138 143 // we either broke out or read all the blocks without finding the commit... 139 144 let commit = commit.ok_or(DriveError::MissingCommit)?; 140 145 141 146 let walker = Walker::new(commit.data); 142 147 148 + log::warn!("init: wrapping up"); 149 + 143 150 let me = Self { 144 151 block_stream, 145 152 block_store, ··· 149 156 Ok((commit, me)) 150 157 } 151 158 152 - async fn drive_until(&mut self, cid_needed: Cid) -> Result<(), DriveError> { 153 - while let Some((cid, data)) = self 154 - .block_stream 155 - .try_next() 156 - .await 157 - .map_err(|e| DriveError::CarBlockError(e.into()))? 158 - { 159 - self.block_store.put( 160 - cid, 161 - if Node::could_be(&data) { 162 - MaybeProcessedBlock::Raw(data) 163 - } else { 164 - match (self.process)(&data) { 165 - Ok(t) => MaybeProcessedBlock::ProcessedOk(t), 166 - Err(e) => MaybeProcessedBlock::Unprocessable(e.to_string()), 167 - } 168 - }, 169 - ); 170 - if cid == cid_needed { 171 - return Ok(()); 172 - } 173 - } 174 - 175 - // if we never found the block 176 - Err(DriveError::MissingBlock(cid_needed)) 177 - } 178 - 179 159 /// Manually step through the record outputs 180 160 pub async fn next_record(&mut self) -> Result<Option<(String, T)>, DriveError> { 181 - loop { 182 - // walk as far as we can until we run out of blocks or find a record 183 - let cid_needed = match self.walker.step(&mut self.block_store, &self.process)? { 184 - Step::Rest(cid) => cid, 185 - Step::Finish => return Ok(None), 186 - Step::Step { rkey, data } => return Ok(Some((rkey, data))), 187 - }; 188 - 189 - // load blocks until we reach that cid 190 - self.drive_until(cid_needed).await?; 161 + match self.walker.step(&mut self.block_store, &self.process)? { 162 + Step::Rest(cid) => Err(DriveError::MissingBlock(cid)), 163 + Step::Finish => Ok(None), 164 + Step::Step { rkey, data } => Ok(Some((rkey, data))), 191 165 } 192 166 } 193 167
+3 -5
src/disk_redb.rs
··· 12 12 13 13 impl RedbStore { 14 14 pub fn new(path: impl AsRef<Path>) -> Result<Self, Error> { 15 + log::warn!("redb new"); 15 16 let db = Database::create(path)?; 17 + log::warn!("db created"); 16 18 Ok(Self { db }) 17 19 } 18 20 } ··· 35 37 fn get(&self, c: Cid) -> Option<MPB> { 36 38 let key_bytes = c.to_bytes(); 37 39 let tx = self.db.begin_read().unwrap(); 38 - let table = match tx.open_table(TABLE) { 39 - Ok(t) => t, 40 - Err(redb::TableError::TableDoesNotExist(_)) => return None, 41 - e => e.unwrap(), 42 - }; 40 + let table = tx.open_table(TABLE).unwrap(); 43 41 let maybe_val_bytes = table.get(&*key_bytes).unwrap()?; 44 42 let (t, n): (MPB, usize) = 45 43 bincode::serde::decode_from_slice(maybe_val_bytes.value(), bincode::config::standard())
+65
src/disk_sqlite.rs
··· 1 + use crate::disk_drive::BlockStore; 2 + use ipld_core::cid::Cid; 3 + use rusqlite::{Connection, OptionalExtension, Result}; 4 + use serde::{Serialize, de::DeserializeOwned}; 5 + use std::path::Path; 6 + 7 + pub struct SqliteStore { 8 + conn: Connection, 9 + } 10 + 11 + impl SqliteStore { 12 + pub fn new(path: impl AsRef<Path>) -> Result<Self> { 13 + let conn = Connection::open(path)?; 14 + conn.pragma_update(None, "journal_mode", "WAL")?; 15 + conn.pragma_update(None, "synchronous", "OFF")?; 16 + conn.pragma_update(None, "cache_size", (-32 * 2_i64.pow(10)).to_string())?; 17 + conn.execute( 18 + "CREATE TABLE blocks ( 19 + key BLOB PRIMARY KEY NOT NULL, 20 + val BLOB NOT NULL 21 + ) WITHOUT ROWID", 22 + (), 23 + )?; 24 + 25 + Ok(Self { conn }) 26 + } 27 + } 28 + 29 + impl Drop for SqliteStore { 30 + fn drop(&mut self) { 31 + self.conn.execute("DROP TABLE blocks", ()).unwrap(); 32 + } 33 + } 34 + 35 + impl<MPB: Serialize + DeserializeOwned> BlockStore<MPB> for SqliteStore { 36 + fn put(&self, c: Cid, t: MPB) { 37 + let key_bytes = c.to_bytes(); 38 + let val_bytes = bincode::serde::encode_to_vec(t, bincode::config::standard()).unwrap(); 39 + 40 + self.conn 41 + .execute( 42 + "INSERT INTO blocks (key, val) VALUES (?1, ?2)", 43 + (&key_bytes, &val_bytes), 44 + ) 45 + .unwrap(); 46 + } 47 + fn get(&self, c: Cid) -> Option<MPB> { 48 + let key_bytes = c.to_bytes(); 49 + 50 + let val_bytes: Vec<u8> = self 51 + .conn 52 + .query_one( 53 + "SELECT val FROM blocks WHERE key = ?1", 54 + (&key_bytes,), 55 + |row| row.get(0), 56 + ) 57 + .optional() 58 + .unwrap()?; 59 + 60 + let (t, n): (MPB, usize) = 61 + bincode::serde::decode_from_slice(&val_bytes, bincode::config::standard()).unwrap(); 62 + assert_eq!(val_bytes.len(), n); 63 + Some(t) 64 + } 65 + }
+1
src/lib.rs
··· 4 4 5 5 pub mod disk_drive; 6 6 pub mod disk_redb; 7 + pub mod disk_sqlite; 7 8 pub mod disk_walk; 8 9 pub mod drive; 9 10 pub mod mst;