Fast and robust atproto CAR file processing in Rust

try rustcask

it works? it's fast?

Changed files
+162 -13
examples
disk-read-file
src
+78 -2
Cargo.lock
··· 127 127 128 128 [[package]] 129 129 name = "bincode" 130 + version = "1.3.3" 131 + source = "registry+https://github.com/rust-lang/crates.io-index" 132 + checksum = "b1f45e9417d87227c7a56d22e471c6206462cba514c7590c09aff4cf6d1ddcad" 133 + dependencies = [ 134 + "serde", 135 + ] 136 + 137 + [[package]] 138 + name = "bincode" 130 139 version = "2.0.1" 131 140 source = "registry+https://github.com/rust-lang/crates.io-index" 132 141 checksum = "36eaf5d7b090263e8150820482d5d93cd964a81e4019913c972f4edcc6edb740" ··· 527 536 "pin-project-lite", 528 537 "pin-utils", 529 538 "slab", 539 + ] 540 + 541 + [[package]] 542 + name = "getrandom" 543 + version = "0.2.16" 544 + source = "registry+https://github.com/rust-lang/crates.io-index" 545 + checksum = "335ff9f135e4384c8150d6f27c6daed433577f86b4750418338c01a1a2528592" 546 + dependencies = [ 547 + "cfg-if", 548 + "libc", 549 + "wasi 0.11.1+wasi-snapshot-preview1", 530 550 ] 531 551 532 552 [[package]] ··· 893 913 ] 894 914 895 915 [[package]] 916 + name = "ppv-lite86" 917 + version = "0.2.21" 918 + source = "registry+https://github.com/rust-lang/crates.io-index" 919 + checksum = "85eae3c4ed2f50dcfe72643da4befc30deadb458a9b590d720cde2f2b1e97da9" 920 + dependencies = [ 921 + "zerocopy", 922 + ] 923 + 924 + [[package]] 896 925 name = "proc-macro2" 897 926 version = "1.0.101" 898 927 source = "registry+https://github.com/rust-lang/crates.io-index" ··· 917 946 checksum = "69cdb34c158ceb288df11e18b4bd39de994f6657d83847bdffdbd7f346754b0f" 918 947 919 948 [[package]] 949 + name = "rand" 950 + version = "0.8.5" 951 + source = "registry+https://github.com/rust-lang/crates.io-index" 952 + checksum = "34af8d1a0e25924bc5b7c43c079c942339d8f0a8b57c39049bef581b46327404" 953 + dependencies = [ 954 + "libc", 955 + "rand_chacha", 956 + "rand_core", 957 + ] 958 + 959 + [[package]] 960 + name = "rand_chacha" 961 + version = "0.3.1" 962 + source = "registry+https://github.com/rust-lang/crates.io-index" 963 + checksum = 
"e6c10a63a0fa32252be49d21e7709d4d4baf8d231c2dbce1eaa8141b9b127d88" 964 + dependencies = [ 965 + "ppv-lite86", 966 + "rand_core", 967 + ] 968 + 969 + [[package]] 970 + name = "rand_core" 971 + version = "0.6.4" 972 + source = "registry+https://github.com/rust-lang/crates.io-index" 973 + checksum = "ec0be4795e2f6a28069bec0b5ff3e2ac9bafc99e6a9a7dc3547996c5c816922c" 974 + dependencies = [ 975 + "getrandom 0.2.16", 976 + ] 977 + 978 + [[package]] 920 979 name = "rayon" 921 980 version = "1.11.0" 922 981 source = "registry+https://github.com/rust-lang/crates.io-index" ··· 987 1046 name = "repo-stream" 988 1047 version = "0.1.1" 989 1048 dependencies = [ 990 - "bincode", 1049 + "bincode 2.0.1", 991 1050 "clap", 992 1051 "criterion", 993 1052 "env_logger", ··· 999 1058 "multibase", 1000 1059 "redb", 1001 1060 "rusqlite", 1061 + "rustcask", 1002 1062 "serde", 1003 1063 "serde_bytes", 1004 1064 "serde_ipld_dagcbor", ··· 1026 1086 version = "0.1.26" 1027 1087 source = "registry+https://github.com/rust-lang/crates.io-index" 1028 1088 checksum = "56f7d92ca342cea22a06f2121d944b4fd82af56988c270852495420f961d4ace" 1089 + 1090 + [[package]] 1091 + name = "rustcask" 1092 + version = "0.1.0" 1093 + source = "registry+https://github.com/rust-lang/crates.io-index" 1094 + checksum = "e17ed1a2733a60fea8495ddcb42c22cabd17afec7ffa7b024b161dd662da4003" 1095 + dependencies = [ 1096 + "bincode 1.3.3", 1097 + "bytes", 1098 + "clap", 1099 + "log", 1100 + "rand", 1101 + "regex", 1102 + "serde", 1103 + "tokio", 1104 + ] 1029 1105 1030 1106 [[package]] 1031 1107 name = "rustix" ··· 1198 1274 checksum = "2d31c77bdf42a745371d260a26ca7163f1e0924b64afa0b688e61b5a9fa02f16" 1199 1275 dependencies = [ 1200 1276 "fastrand", 1201 - "getrandom", 1277 + "getrandom 0.3.3", 1202 1278 "once_cell", 1203 1279 "rustix", 1204 1280 "windows-sys 0.60.2",
+1
Cargo.toml
··· 16 16 multibase = "0.9.2" 17 17 redb = "3.1.0" 18 18 rusqlite = "0.37.0" 19 + rustcask = "0.1.0" 19 20 serde = { version = "1.0.228", features = ["derive"] } 20 21 serde_bytes = "0.11.19" 21 22 serde_ipld_dagcbor = "0.6.4"
+3 -2
examples/disk-read-file/main.rs
··· 35 35 let mb = 2_usize.pow(20); 36 36 37 37 let mut driver = 38 - match repo_stream::drive::load_car(reader, |block| S(block.len()), 16 * mb).await? { 38 + match repo_stream::drive::load_car(reader, |block| S(block.len()), 1 * mb).await? { 39 39 repo_stream::drive::Vehicle::Lil(_, _) => panic!("try this on a bigger car"), 40 40 repo_stream::drive::Vehicle::Big(big_stuff) => { 41 41 // let disk_store = repo_stream::disk::SqliteStore::new(tmpfile); 42 - let disk_store = repo_stream::disk::RedbStore::new(tmpfile); 42 + // let disk_store = repo_stream::disk::RedbStore::new(tmpfile); 43 + let disk_store = repo_stream::disk::RustcaskStore::new(tmpfile); 43 44 let (commit, driver) = big_stuff.finish_loading(disk_store).await?; 44 45 log::warn!("big: {:?}", commit); 45 46 driver
+80 -1
src/disk.rs
··· 208 208 } 209 209 } 210 210 211 - ///// TODO: that other single file db thing to try 211 + ///// rustcask?? 212 + 213 + pub struct RustcaskStore { 214 + path: PathBuf, 215 + } 216 + 217 + impl RustcaskStore { 218 + pub fn new(path: PathBuf) -> Self { 219 + Self { path } 220 + } 221 + } 222 + 223 + #[derive(Debug, thiserror::Error)] 224 + pub enum CaskError { 225 + #[error(transparent)] 226 + OpenError(#[from] rustcask::error::OpenError), 227 + #[error(transparent)] 228 + SetError(#[from] rustcask::error::SetError), 229 + #[error("failed to get key: {0}")] 230 + GetError(String), 231 + #[error("failed to ensure directory: {0}")] 232 + EnsureDirError(std::io::Error), 233 + } 234 + 235 + impl StorageErrorBase for CaskError {} 236 + 237 + impl DiskStore for RustcaskStore { 238 + type StorageError = CaskError; 239 + type Access = RustcaskAccess; 240 + async fn get_access(&mut self) -> Result<RustcaskAccess, CaskError> { 241 + let path = self.path.clone(); 242 + let db = tokio::task::spawn_blocking(move || { 243 + std::fs::create_dir_all(&path).map_err(CaskError::EnsureDirError)?; 244 + let db = rustcask::Rustcask::builder().open(&path)?; 245 + Ok::<_, Self::StorageError>(db) 246 + }) 247 + .await 248 + .expect("join error")?; 249 + 250 + Ok(RustcaskAccess { db }) 251 + } 252 + } 253 + 254 + pub struct RustcaskAccess { 255 + db: rustcask::Rustcask, 256 + } 257 + 258 + impl DiskAccess for RustcaskAccess { 259 + type StorageError = CaskError; 260 + fn get_writer(&mut self) -> Result<impl DiskWriter<CaskError>, CaskError> { 261 + Ok(RustcaskWriter { db: self.db.clone() }) 262 + } 263 + fn get_reader(&self) -> Result<impl DiskReader<StorageError = CaskError>, CaskError> { 264 + Ok(RustcaskReader { db: self.db.clone() }) 265 + } 266 + } 267 + 268 + pub struct RustcaskWriter { 269 + db: rustcask::Rustcask, 270 + } 271 + 272 + impl DiskWriter<CaskError> for RustcaskWriter { 273 + fn put(&mut self, key: Vec<u8>, val: Vec<u8>) -> Result<(), CaskError> { 274 + self.db.set(key, 
val)?; 275 + Ok(()) 276 + } 277 + } 278 + 279 + pub struct RustcaskReader { 280 + db: rustcask::Rustcask, 281 + } 282 + 283 + impl DiskReader for RustcaskReader { 284 + type StorageError = CaskError; 285 + fn get(&mut self, key: Vec<u8>) -> Result<Option<Vec<u8>>, CaskError> { 286 + self.db 287 + .get(&key) 288 + .map_err(|e| CaskError::GetError(e.to_string())) 289 + } 290 + }
-8
src/drive.rs
··· 49 49 DiskTripped(#[from] DiskTrip<E>), 50 50 } 51 51 52 - // #[derive(Debug, thiserror::Error)] 53 - // pub enum Boooooo<E: StorageErrorBase> { 54 - // #[error("disk tripped: {0}")] 55 - // DiskTripped(#[from] DiskTrip<E>), 56 - // #[error("dde whatever: {0}")] 57 - // DiskDriveError(#[from] DiskDriveError<E>), 58 - // } 59 - 60 52 pub trait Processable: Clone + Serialize + DeserializeOwned { 61 53 /// the additional size taken up (not including its mem::size_of) 62 54 fn get_size(&self) -> usize;