Fast and robust atproto CAR file processing in rust

fjall v3

pretty good speedup!

Changed files
+47 -116
examples
disk-read-file
src
+40 -67
Cargo.lock
··· 167 167 checksum = "46c5e41b57b8bba42a04676d81cb89e9ee8e859a1a66f80a5a72e1cb76b34d43" 168 168 169 169 [[package]] 170 - name = "byteorder" 171 - version = "1.5.0" 170 + name = "byteorder-lite" 171 + version = "0.1.0" 172 172 source = "registry+https://github.com/rust-lang/crates.io-index" 173 - checksum = "1fd0f2584146f6f2ef48085050886acf353beff7305ebd1ae69500e27c67f64b" 173 + checksum = "8f1fe948ff07f4bd06c30984e69f5b4899c516a3ef74f34df92a2df2ab535495" 174 174 175 175 [[package]] 176 176 name = "bytes" ··· 180 180 181 181 [[package]] 182 182 name = "byteview" 183 - version = "0.6.1" 183 + version = "0.10.0" 184 184 source = "registry+https://github.com/rust-lang/crates.io-index" 185 - checksum = "6236364b88b9b6d0bc181ba374cf1ab55ba3ef97a1cb6f8cddad48a273767fb5" 185 + checksum = "dda4398f387cc6395a3e93b3867cd9abda914c97a0b344d1eefb2e5c51785fca" 186 186 187 187 [[package]] 188 188 name = "cast" ··· 458 458 ] 459 459 460 460 [[package]] 461 - name = "double-ended-peekable" 462 - version = "0.1.0" 463 - source = "registry+https://github.com/rust-lang/crates.io-index" 464 - checksum = "c0d05e1c0dbad51b52c38bda7adceef61b9efc2baf04acfe8726a8c4630a6f57" 465 - 466 - [[package]] 467 461 name = "either" 468 462 version = "1.15.0" 469 463 source = "registry+https://github.com/rust-lang/crates.io-index" ··· 528 522 529 523 [[package]] 530 524 name = "fjall" 531 - version = "2.11.2" 525 + version = "3.0.0" 532 526 source = "registry+https://github.com/rust-lang/crates.io-index" 533 - checksum = "0b25ad44cd4360a0448a9b5a0a6f1c7a621101cca4578706d43c9a821418aebc" 527 + checksum = "4986f550347ed1666561f36e8bf1be3c97df72850ecef0140129da6e2d0aa911" 534 528 dependencies = [ 535 - "byteorder", 529 + "byteorder-lite", 536 530 "byteview", 537 531 "dashmap", 532 + "flume", 538 533 "log", 539 534 "lsm-tree", 540 - "path-absolutize", 541 - "std-semaphore", 542 535 "tempfile", 543 536 "xxhash-rust", 544 537 ] 545 538 546 539 [[package]] 540 + name = "flume" 541 + version = "0.12.0" 542 + source = "registry+https://github.com/rust-lang/crates.io-index" 543 + checksum = "5e139bc46ca777eb5efaf62df0ab8cc5fd400866427e56c68b22e414e53bd3be" 544 + dependencies = [ 545 + "spin", 546 + ] 547 + 548 + [[package]] 547 549 name = "futures" 548 550 version = "0.3.31" 549 551 source = "registry+https://github.com/rust-lang/crates.io-index" ··· 659 661 version = "0.32.3" 660 662 source = "registry+https://github.com/rust-lang/crates.io-index" 661 663 checksum = "e629b9b98ef3dd8afe6ca2bd0f89306cec16d43d907889945bc5d6687f2f13c7" 662 - 663 - [[package]] 664 - name = "guardian" 665 - version = "1.3.0" 666 - source = "registry+https://github.com/rust-lang/crates.io-index" 667 - checksum = "17e2ac29387b1aa07a1e448f7bb4f35b500787971e965b02842b900afa5c8f6f" 668 664 669 665 [[package]] 670 666 name = "half" ··· 826 822 827 823 [[package]] 828 824 name = "lsm-tree" 829 - version = "2.10.4" 825 + version = "3.0.0" 830 826 source = "registry+https://github.com/rust-lang/crates.io-index" 831 - checksum = "799399117a2bfb37660e08be33f470958babb98386b04185288d829df362ea15" 827 + checksum = "3a206e87e8bc38114045060ec1fc6bc4e4559748a37e9622b910d80e48863e87" 832 828 dependencies = [ 833 - "byteorder", 829 + "byteorder-lite", 830 + "byteview", 834 831 "crossbeam-skiplist", 835 - "double-ended-peekable", 836 832 "enum_dispatch", 837 - "guardian", 838 833 "interval-heap", 839 834 "log", 840 - "path-absolutize", 841 835 "quick_cache", 842 836 "rustc-hash", 843 837 "self_cell", 838 + "sfa", 844 839 "tempfile", 845 - "value-log", 846 840 "varint-rs", 847 841 "xxhash-rust", 848 842 ] ··· 967 961 ] 968 962 969 963 [[package]] 970 - name = "path-absolutize" 971 - version = "3.1.1" 972 - source = "registry+https://github.com/rust-lang/crates.io-index" 973 - checksum = "e4af381fe79fa195b4909485d99f73a80792331df0625188e707854f0b3383f5" 974 - dependencies = [ 975 - "path-dedot", 976 - ] 977 - 978 - [[package]] 979 - name = "path-dedot" 980 - version = "3.1.1" 981 - source = "registry+https://github.com/rust-lang/crates.io-index" 982 - checksum = "07ba0ad7e047712414213ff67533e6dd477af0a4e1d14fb52343e53d30ea9397" 983 - dependencies = [ 984 - "once_cell", 985 - ] 986 - 987 - [[package]] 988 964 name = "pin-project-lite" 989 965 version = "0.2.16" 990 966 source = "registry+https://github.com/rust-lang/crates.io-index" ··· 1279 1255 ] 1280 1256 1281 1257 [[package]] 1258 + name = "sfa" 1259 + version = "1.0.0" 1260 + source = "registry+https://github.com/rust-lang/crates.io-index" 1261 + checksum = "a1296838937cab56cd6c4eeeb8718ec777383700c33f060e2869867bd01d1175" 1262 + dependencies = [ 1263 + "byteorder-lite", 1264 + "log", 1265 + "xxhash-rust", 1266 + ] 1267 + 1268 + [[package]] 1282 1269 name = "sha2" 1283 1270 version = "0.10.9" 1284 1271 source = "registry+https://github.com/rust-lang/crates.io-index" ··· 1321 1308 ] 1322 1309 1323 1310 [[package]] 1324 - name = "std-semaphore" 1325 - version = "0.1.0" 1311 + name = "spin" 1312 + version = "0.9.8" 1326 1313 source = "registry+https://github.com/rust-lang/crates.io-index" 1327 - checksum = "33ae9eec00137a8eed469fb4148acd9fc6ac8c3f9b110f52cd34698c8b5bfa0e" 1314 + checksum = "6980e8d7511241f8acf4aebddbb1ff938df5eebe98691418c4468d0b72a96a67" 1315 + dependencies = [ 1316 + "lock_api", 1317 + ] 1328 1318 1329 1319 [[package]] 1330 1320 name = "strsim" ··· 1483 1473 version = "0.2.2" 1484 1474 source = "registry+https://github.com/rust-lang/crates.io-index" 1485 1475 checksum = "06abde3611657adf66d383f00b093d7faecc7fa57071cce2578660c9f1010821" 1486 - 1487 - [[package]] 1488 - name = "value-log" 1489 - version = "1.9.0" 1490 - source = "registry+https://github.com/rust-lang/crates.io-index" 1491 - checksum = "62fc7c4ce161f049607ecea654dca3f2d727da5371ae85e2e4f14ce2b98ed67c" 1492 - dependencies = [ 1493 - "byteorder", 1494 - "byteview", 1495 - "interval-heap", 1496 - "log", 1497 - "path-absolutize", 1498 - "rustc-hash", 1499 - "tempfile", 1500 - "varint-rs", 1501 - "xxhash-rust", 1502 - ] 1503 1476 1504 1477 [[package]] 1505 1478 name = "varint-rs"
+1 -1
Cargo.toml
··· 8 8 9 9 [dependencies] 10 10 bincode = { version = "2.0.1", features = ["serde"] } 11 - fjall = { version = "2.11.2", default-features = false } 11 + fjall = { version = "3.0.0", default-features = false } 12 12 futures = "0.3.31" 13 13 futures-core = "0.3.31" 14 14 ipld-core = { version = "0.4.2", features = ["serde"] }
+1 -3
examples/disk-read-file/main.rs
··· 82 82 83 83 log::info!("arrived! ({:?}) joining rx...", t0.elapsed()); 84 84 85 - let driver = join.await?; 86 - 87 - driver.reset_store().await?; 85 + join.await?; 88 86 89 87 log::info!("done. n={n} zeros={zeros}"); 90 88
-3
readme.md
··· 50 50 total_size += size; 51 51 } 52 52 } 53 - 54 - // clean up the disk store (drop tables etc) 55 - driver.reset_store().await?; 56 53 } 57 54 }; 58 55 println!("sum of size of all records: {total_size}");
+5 -24
src/disk.rs
··· 18 18 */ 19 19 20 20 use crate::drive::DriveError; 21 - use fjall::{Config, Error as FjallError, Keyspace, Partition, PartitionCreateOptions}; 21 + use fjall::{Database, Error as FjallError, Keyspace, KeyspaceCreateOptions}; 22 22 use std::path::PathBuf; 23 23 24 24 #[derive(Debug, thiserror::Error)] ··· 93 93 /// On-disk block storage 94 94 pub struct DiskStore { 95 95 #[allow(unused)] 96 - db: Keyspace, 97 - partition: Partition, 96 + db: Database, 97 + partition: Keyspace, 98 98 max_stored: usize, 99 99 stored: usize, 100 100 } ··· 108 108 ) -> Result<Self, DiskError> { 109 109 let max_stored = max_stored_mb * 2_usize.pow(20); 110 110 let (db, partition) = tokio::task::spawn_blocking(move || { 111 - let db = Config::new(path) 111 + let db = Database::builder(path) 112 112 // .manual_journal_persist(true) 113 113 // .flush_workers(1) 114 114 // .compaction_workers(1) 115 115 .cache_size(cache_mb as u64 * 2_u64.pow(20)) 116 116 .temporary(true) 117 117 .open()?; 118 - let partition = Self::get_partition(&db)?; 118 + let partition = db.keyspace("z", KeyspaceCreateOptions::default)?; 119 119 120 120 Ok::<_, DiskError>((db, partition)) 121 121 }) ··· 129 129 }) 130 130 } 131 131 132 - /// Drop and recreate the kv table 133 - pub async fn reset(mut self) -> Result<Self, DiskError> { 134 - tokio::task::spawn_blocking(move || { 135 - let partition = self.partition; 136 - Self::reset_partition(&self.db, partition)?; 137 - self.partition = Self::get_partition(&self.db)?; 138 - Ok(self) 139 - }) 140 - .await? 141 - } 142 - 143 - fn get_partition(db: &Keyspace) -> Result<Partition, FjallError> { 144 - db.open_partition("z", PartitionCreateOptions::default()) 145 - } 146 - fn reset_partition(keyspace: &Keyspace, partition: Partition) -> Result<Partition, DiskError> { 147 - keyspace.delete_partition(partition)?; 148 - let partition = Self::get_partition(keyspace)?; 149 - Ok(partition) 150 - } 151 132 pub(crate) fn put_many( 152 133 &mut self, 153 134 kv: impl Iterator<Item = Result<(Vec<u8>, Vec<u8>), DriveError>>,
-15
src/drive.rs
··· 446 446 /// println!("{rkey}: size={}", record.len()); 447 447 /// } 448 448 /// } 449 - /// let store = disk_driver.reset_store().await?; 450 449 /// # Ok(()) 451 450 /// # } 452 451 /// ``` ··· 559 558 /// } 560 559 /// 561 560 /// } 562 - /// let store = join.await?.reset_store().await?; 563 561 /// # Ok(()) 564 562 /// # } 565 563 /// ``` ··· 581 579 }); 582 580 583 581 (rx, chan_task) 584 - } 585 - 586 - /// Reset the disk storage so it can be reused. You must call this. 587 - /// 588 - /// Ideally we'd put this in an `impl Drop`, but since it makes blocking 589 - /// calls, that would be risky in an async context. For now you just have to 590 - /// carefully make sure you call it. 591 - /// 592 - /// The sqlite store is returned, so it can be reused for another 593 - /// `DiskDriver`. 594 - pub async fn reset_store(mut self) -> Result<DiskStore, DriveError> { 595 - let BigState { store, .. } = self.state.take().expect("valid state"); 596 - Ok(store.reset().await?) 597 582 } 598 583 }
-3
src/lib.rs
··· 53 53 total_size += size; 54 54 } 55 55 } 56 - 57 - // clean up the disk store (drop tables etc) 58 - driver.reset_store().await?; 59 56 } 60 57 }; 61 58 println!("sum of size of all records: {total_size}");