Fast and robust atproto CAR file processing in Rust

SQLite -> fjall 2 for ~2x speedup

Changed files
+114 -241
examples
disk-read-file
src
+67 -130
Cargo.lock
··· 167 167 checksum = "46c5e41b57b8bba42a04676d81cb89e9ee8e859a1a66f80a5a72e1cb76b34d43" 168 168 169 169 [[package]] 170 - name = "byteorder-lite" 171 - version = "0.1.0" 170 + name = "byteorder" 171 + version = "1.5.0" 172 172 source = "registry+https://github.com/rust-lang/crates.io-index" 173 - checksum = "8f1fe948ff07f4bd06c30984e69f5b4899c516a3ef74f34df92a2df2ab535495" 173 + checksum = "1fd0f2584146f6f2ef48085050886acf353beff7305ebd1ae69500e27c67f64b" 174 174 175 175 [[package]] 176 176 name = "bytes" ··· 180 180 181 181 [[package]] 182 182 name = "byteview" 183 - version = "0.10.0" 183 + version = "0.6.1" 184 184 source = "registry+https://github.com/rust-lang/crates.io-index" 185 - checksum = "dda4398f387cc6395a3e93b3867cd9abda914c97a0b344d1eefb2e5c51785fca" 185 + checksum = "6236364b88b9b6d0bc181ba374cf1ab55ba3ef97a1cb6f8cddad48a273767fb5" 186 186 187 187 [[package]] 188 188 name = "cast" ··· 458 458 ] 459 459 460 460 [[package]] 461 + name = "double-ended-peekable" 462 + version = "0.1.0" 463 + source = "registry+https://github.com/rust-lang/crates.io-index" 464 + checksum = "c0d05e1c0dbad51b52c38bda7adceef61b9efc2baf04acfe8726a8c4630a6f57" 465 + 466 + [[package]] 461 467 name = "either" 462 468 version = "1.15.0" 463 469 source = "registry+https://github.com/rust-lang/crates.io-index" ··· 515 521 ] 516 522 517 523 [[package]] 518 - name = "fallible-iterator" 519 - version = "0.3.0" 520 - source = "registry+https://github.com/rust-lang/crates.io-index" 521 - checksum = "2acce4a10f12dc2fb14a218589d4f1f62ef011b2d0cc4b3cb1bba8e94da14649" 522 - 523 - [[package]] 524 - name = "fallible-streaming-iterator" 525 - version = "0.1.9" 526 - source = "registry+https://github.com/rust-lang/crates.io-index" 527 - checksum = "7360491ce676a36bf9bb3c56c1aa791658183a54d2744120f27285738d90465a" 528 - 529 - [[package]] 530 524 name = "fastrand" 531 525 version = "2.3.0" 532 526 source = "registry+https://github.com/rust-lang/crates.io-index" ··· 534 528 535 529 [[package]] 
536 530 name = "fjall" 537 - version = "3.0.0" 531 + version = "2.11.2" 538 532 source = "registry+https://github.com/rust-lang/crates.io-index" 539 - checksum = "4986f550347ed1666561f36e8bf1be3c97df72850ecef0140129da6e2d0aa911" 533 + checksum = "0b25ad44cd4360a0448a9b5a0a6f1c7a621101cca4578706d43c9a821418aebc" 540 534 dependencies = [ 541 - "byteorder-lite", 535 + "byteorder", 542 536 "byteview", 543 537 "dashmap", 544 - "flume", 545 538 "log", 546 539 "lsm-tree", 547 - "lz4_flex", 540 + "path-absolutize", 541 + "std-semaphore", 548 542 "tempfile", 549 543 "xxhash-rust", 550 544 ] 551 545 552 546 [[package]] 553 - name = "flume" 554 - version = "0.12.0" 555 - source = "registry+https://github.com/rust-lang/crates.io-index" 556 - checksum = "5e139bc46ca777eb5efaf62df0ab8cc5fd400866427e56c68b22e414e53bd3be" 557 - dependencies = [ 558 - "spin", 559 - ] 560 - 561 - [[package]] 562 - name = "foldhash" 563 - version = "0.1.5" 564 - source = "registry+https://github.com/rust-lang/crates.io-index" 565 - checksum = "d9c4f5dac5e15c24eb999c26181a6ca40b39fe946cbe4c263c7209467bc83af2" 566 - 567 - [[package]] 568 547 name = "futures" 569 548 version = "0.3.31" 570 549 source = "registry+https://github.com/rust-lang/crates.io-index" ··· 682 661 checksum = "e629b9b98ef3dd8afe6ca2bd0f89306cec16d43d907889945bc5d6687f2f13c7" 683 662 684 663 [[package]] 664 + name = "guardian" 665 + version = "1.3.0" 666 + source = "registry+https://github.com/rust-lang/crates.io-index" 667 + checksum = "17e2ac29387b1aa07a1e448f7bb4f35b500787971e965b02842b900afa5c8f6f" 668 + 669 + [[package]] 685 670 name = "half" 686 671 version = "2.7.0" 687 672 source = "registry+https://github.com/rust-lang/crates.io-index" ··· 700 685 701 686 [[package]] 702 687 name = "hashbrown" 703 - version = "0.15.5" 704 - source = "registry+https://github.com/rust-lang/crates.io-index" 705 - checksum = "9229cfe53dfd69f0609a49f65461bd93001ea1ef889cd5529dd176593f5338a1" 706 - dependencies = [ 707 - "foldhash", 708 - ] 709 - 
710 - [[package]] 711 - name = "hashbrown" 712 688 version = "0.16.1" 713 689 source = "registry+https://github.com/rust-lang/crates.io-index" 714 690 checksum = "841d1cc9bed7f9236f321df977030373f4a4163ae1a7dbfe1a51a2c1a51d9100" 715 691 716 692 [[package]] 717 - name = "hashlink" 718 - version = "0.10.0" 719 - source = "registry+https://github.com/rust-lang/crates.io-index" 720 - checksum = "7382cf6263419f2d8df38c55d7da83da5c18aef87fc7a7fc1fb1e344edfe14c1" 721 - dependencies = [ 722 - "hashbrown 0.15.5", 723 - ] 724 - 725 - [[package]] 726 693 name = "heck" 727 694 version = "0.5.0" 728 695 source = "registry+https://github.com/rust-lang/crates.io-index" ··· 837 804 checksum = "58f929b4d672ea937a23a1ab494143d968337a5f47e56d0815df1e0890ddf174" 838 805 839 806 [[package]] 840 - name = "libsqlite3-sys" 841 - version = "0.35.0" 842 - source = "registry+https://github.com/rust-lang/crates.io-index" 843 - checksum = "133c182a6a2c87864fe97778797e46c7e999672690dc9fa3ee8e241aa4a9c13f" 844 - dependencies = [ 845 - "pkg-config", 846 - "vcpkg", 847 - ] 848 - 849 - [[package]] 850 807 name = "linux-raw-sys" 851 808 version = "0.11.0" 852 809 source = "registry+https://github.com/rust-lang/crates.io-index" ··· 869 826 870 827 [[package]] 871 828 name = "lsm-tree" 872 - version = "3.0.0" 829 + version = "2.10.4" 873 830 source = "registry+https://github.com/rust-lang/crates.io-index" 874 - checksum = "3a206e87e8bc38114045060ec1fc6bc4e4559748a37e9622b910d80e48863e87" 831 + checksum = "799399117a2bfb37660e08be33f470958babb98386b04185288d829df362ea15" 875 832 dependencies = [ 876 - "byteorder-lite", 877 - "byteview", 833 + "byteorder", 878 834 "crossbeam-skiplist", 835 + "double-ended-peekable", 879 836 "enum_dispatch", 837 + "guardian", 880 838 "interval-heap", 881 839 "log", 882 - "lz4_flex", 840 + "path-absolutize", 883 841 "quick_cache", 884 842 "rustc-hash", 885 843 "self_cell", 886 - "sfa", 887 844 "tempfile", 845 + "value-log", 888 846 "varint-rs", 889 847 "xxhash-rust", 890 
- ] 891 - 892 - [[package]] 893 - name = "lz4_flex" 894 - version = "0.11.5" 895 - source = "registry+https://github.com/rust-lang/crates.io-index" 896 - checksum = "08ab2867e3eeeca90e844d1940eab391c9dc5228783db2ed999acbc0a9ed375a" 897 - dependencies = [ 898 - "twox-hash", 899 848 ] 900 849 901 850 [[package]] ··· 1018 967 ] 1019 968 1020 969 [[package]] 970 + name = "path-absolutize" 971 + version = "3.1.1" 972 + source = "registry+https://github.com/rust-lang/crates.io-index" 973 + checksum = "e4af381fe79fa195b4909485d99f73a80792331df0625188e707854f0b3383f5" 974 + dependencies = [ 975 + "path-dedot", 976 + ] 977 + 978 + [[package]] 979 + name = "path-dedot" 980 + version = "3.1.1" 981 + source = "registry+https://github.com/rust-lang/crates.io-index" 982 + checksum = "07ba0ad7e047712414213ff67533e6dd477af0a4e1d14fb52343e53d30ea9397" 983 + dependencies = [ 984 + "once_cell", 985 + ] 986 + 987 + [[package]] 1021 988 name = "pin-project-lite" 1022 989 version = "0.2.16" 1023 990 source = "registry+https://github.com/rust-lang/crates.io-index" ··· 1028 995 version = "0.1.0" 1029 996 source = "registry+https://github.com/rust-lang/crates.io-index" 1030 997 checksum = "8b870d8c151b6f2fb93e84a13146138f05d02ed11c7e7c54f8826aaaf7c9f184" 1031 - 1032 - [[package]] 1033 - name = "pkg-config" 1034 - version = "0.3.32" 1035 - source = "registry+https://github.com/rust-lang/crates.io-index" 1036 - checksum = "7edddbd0b52d732b21ad9a5fab5c704c14cd949e5e9a1ec5929a24fded1b904c" 1037 998 1038 999 [[package]] 1039 1000 name = "plotters" ··· 1185 1146 "iroh-car", 1186 1147 "log", 1187 1148 "multibase", 1188 - "rusqlite", 1189 1149 "serde", 1190 1150 "serde_bytes", 1191 1151 "serde_ipld_dagcbor", ··· 1193 1153 "tempfile", 1194 1154 "thiserror 2.0.17", 1195 1155 "tokio", 1196 - ] 1197 - 1198 - [[package]] 1199 - name = "rusqlite" 1200 - version = "0.37.0" 1201 - source = "registry+https://github.com/rust-lang/crates.io-index" 1202 - checksum = 
"165ca6e57b20e1351573e3729b958bc62f0e48025386970b6e4d29e7a7e71f3f" 1203 - dependencies = [ 1204 - "bitflags", 1205 - "fallible-iterator", 1206 - "fallible-streaming-iterator", 1207 - "hashlink", 1208 - "libsqlite3-sys", 1209 - "smallvec", 1210 1156 ] 1211 1157 1212 1158 [[package]] ··· 1333 1279 ] 1334 1280 1335 1281 [[package]] 1336 - name = "sfa" 1337 - version = "1.0.0" 1338 - source = "registry+https://github.com/rust-lang/crates.io-index" 1339 - checksum = "a1296838937cab56cd6c4eeeb8718ec777383700c33f060e2869867bd01d1175" 1340 - dependencies = [ 1341 - "byteorder-lite", 1342 - "log", 1343 - "xxhash-rust", 1344 - ] 1345 - 1346 - [[package]] 1347 1282 name = "sha2" 1348 1283 version = "0.10.9" 1349 1284 source = "registry+https://github.com/rust-lang/crates.io-index" ··· 1386 1321 ] 1387 1322 1388 1323 [[package]] 1389 - name = "spin" 1390 - version = "0.9.8" 1324 + name = "std-semaphore" 1325 + version = "0.1.0" 1391 1326 source = "registry+https://github.com/rust-lang/crates.io-index" 1392 - checksum = "6980e8d7511241f8acf4aebddbb1ff938df5eebe98691418c4468d0b72a96a67" 1393 - dependencies = [ 1394 - "lock_api", 1395 - ] 1327 + checksum = "33ae9eec00137a8eed469fb4148acd9fc6ac8c3f9b110f52cd34698c8b5bfa0e" 1396 1328 1397 1329 [[package]] 1398 1330 name = "strsim" ··· 1517 1449 ] 1518 1450 1519 1451 [[package]] 1520 - name = "twox-hash" 1521 - version = "2.1.2" 1522 - source = "registry+https://github.com/rust-lang/crates.io-index" 1523 - checksum = "9ea3136b675547379c4bd395ca6b938e5ad3c3d20fad76e7fe85f9e0d011419c" 1524 - 1525 - [[package]] 1526 1452 name = "typenum" 1527 1453 version = "1.19.0" 1528 1454 source = "registry+https://github.com/rust-lang/crates.io-index" ··· 1559 1485 checksum = "06abde3611657adf66d383f00b093d7faecc7fa57071cce2578660c9f1010821" 1560 1486 1561 1487 [[package]] 1488 + name = "value-log" 1489 + version = "1.9.0" 1490 + source = "registry+https://github.com/rust-lang/crates.io-index" 1491 + checksum = 
"62fc7c4ce161f049607ecea654dca3f2d727da5371ae85e2e4f14ce2b98ed67c" 1492 + dependencies = [ 1493 + "byteorder", 1494 + "byteview", 1495 + "interval-heap", 1496 + "log", 1497 + "path-absolutize", 1498 + "rustc-hash", 1499 + "tempfile", 1500 + "varint-rs", 1501 + "xxhash-rust", 1502 + ] 1503 + 1504 + [[package]] 1562 1505 name = "varint-rs" 1563 1506 version = "2.2.0" 1564 1507 source = "registry+https://github.com/rust-lang/crates.io-index" 1565 1508 checksum = "8f54a172d0620933a27a4360d3db3e2ae0dd6cceae9730751a036bbf182c4b23" 1566 - 1567 - [[package]] 1568 - name = "vcpkg" 1569 - version = "0.2.15" 1570 - source = "registry+https://github.com/rust-lang/crates.io-index" 1571 - checksum = "accd4ea62f7bb7a82fe23066fb0957d48ef677f6eeb8215f372f52e48bb32426" 1572 1509 1573 1510 [[package]] 1574 1511 name = "version_check"
+1 -2
Cargo.toml
··· 8 8 9 9 [dependencies] 10 10 bincode = { version = "2.0.1", features = ["serde"] } 11 - fjall = "3.0.0" 11 + fjall = { version = "2.11.2", default-features = false } 12 12 futures = "0.3.31" 13 13 futures-core = "0.3.31" 14 14 ipld-core = { version = "0.4.2", features = ["serde"] } 15 15 iroh-car = "0.5.1" 16 16 log = "0.4.28" 17 17 multibase = "0.9.2" 18 - rusqlite = "0.37.0" 19 18 serde = { version = "1.0.228", features = ["derive"] } 20 19 serde_bytes = "0.11.19" 21 20 serde_ipld_dagcbor = "0.6.4"
+5 -1
examples/disk-read-file/main.rs
··· 33 33 // in this example we only bother handling CARs that are too big for memory 34 34 // `noop` helper means: do no block processing, store the raw blocks 35 35 let driver = match DriverBuilder::new() 36 - .with_mem_limit_mb(10) // how much memory can be used before disk spill 36 + .with_mem_limit_mb(32) // how much memory can be used before disk spill 37 37 .load_car(reader) 38 38 .await? 39 39 { ··· 81 81 } 82 82 83 83 log::info!("arrived! ({:?}) joining rx...", t0.elapsed()); 84 + 85 + let driver = join.await?; 86 + 87 + driver.reset_store().await?; 84 88 85 89 log::info!("done. n={n} zeros={zeros}"); 86 90
+33 -70
src/disk.rs
··· 18 18 */ 19 19 20 20 use crate::drive::DriveError; 21 - use fjall::{Database, Keyspace, KeyspaceCreateOptions, Error as FjallError}; 21 + use fjall::{Config, Error as FjallError, Keyspace, Partition, PartitionCreateOptions}; 22 22 use std::path::PathBuf; 23 23 24 24 #[derive(Debug, thiserror::Error)] ··· 38 38 /// limit. 39 39 #[error("Maximum disk size reached")] 40 40 MaxSizeExceeded, 41 - #[error("this error was replaced, seeing this is a bug.")] 42 - #[doc(hidden)] 43 - Stolen, 44 - } 45 - 46 - impl DiskError { 47 - /// hack for ownership challenges with the disk driver 48 - pub(crate) fn steal(&mut self) -> Self { 49 - let mut swapped = DiskError::Stolen; 50 - std::mem::swap(self, &mut swapped); 51 - swapped 52 - } 53 41 } 54 42 55 43 /// Builder-style disk store setup ··· 105 93 /// On-disk block storage 106 94 pub struct DiskStore { 107 95 #[allow(unused)] 108 - db: Database, 109 - ks: Keyspace, 96 + db: Keyspace, 97 + partition: Partition, 110 98 max_stored: usize, 111 99 stored: usize, 112 100 } ··· 119 107 max_stored_mb: usize, 120 108 ) -> Result<Self, DiskError> { 121 109 let max_stored = max_stored_mb * 2_usize.pow(20); 122 - let (db, ks) = tokio::task::spawn_blocking(move || { 123 - let db = Database::builder(path) 110 + let (db, partition) = tokio::task::spawn_blocking(move || { 111 + let db = Config::new(path) 124 112 // .manual_journal_persist(true) 125 - // .worker_threads(1) 126 - // .cache_size(cache_mb as u64 * 2_u64.pow(20)) 127 - // .temporary(true) 113 + // .flush_workers(1) 114 + // .compaction_workers(1) 115 + .cache_size(cache_mb as u64 * 2_u64.pow(20)) 116 + .temporary(true) 128 117 .open()?; 129 - let ks = db.keyspace("z", || 130 - KeyspaceCreateOptions::default() 131 - // .expect_point_read_hits(true) 132 - // .manual_journal_persist(true) 133 - )?; 118 + let partition = Self::get_partition(&db)?; 134 119 135 - // Self::reset_tables(&ks)?; 136 - 137 - Ok::<_, DiskError>((db, ks)) 120 + Ok::<_, DiskError>((db, partition)) 138 121 }) 
139 122 .await??; 140 123 141 124 Ok(Self { 142 125 db, 143 - ks, 126 + partition, 144 127 max_stored, 145 128 stored: 0, 146 129 }) 147 130 } 148 - pub(crate) fn get_writer(&'_ mut self) -> Result<SqliteWriter<'_>, DiskError> { 149 - Ok(SqliteWriter { 150 - ks: self.ks.clone(), 151 - stored: &mut self.stored, 152 - max: self.max_stored, 153 - }) 154 - } 155 - pub(crate) fn get_reader(&self) -> Result<SqliteReader, DiskError> { 156 - Ok(SqliteReader { 157 - ks: self.ks.clone(), 158 - }) 159 - } 131 + 160 132 /// Drop and recreate the kv table 161 - pub async fn reset(self) -> Result<Self, DiskError> { 133 + pub async fn reset(mut self) -> Result<Self, DiskError> { 162 134 tokio::task::spawn_blocking(move || { 163 - Self::reset_tables(&self.ks)?; 135 + let partition = self.partition; 136 + Self::reset_partition(&self.db, partition)?; 137 + self.partition = Self::get_partition(&self.db)?; 164 138 Ok(self) 165 139 }) 166 140 .await? 167 141 } 168 - fn reset_tables(ks: &Keyspace) -> Result<(), DiskError> { 169 - ks.clear()?; 170 - Ok(()) 171 - } 172 - } 173 142 174 - pub(crate) struct SqliteWriter<'a> { 175 - ks: Keyspace, 176 - stored: &'a mut usize, 177 - max: usize, 178 - } 179 - 180 - impl SqliteWriter<'_> { 143 + fn get_partition(db: &Keyspace) -> Result<Partition, FjallError> { 144 + db.open_partition("z", PartitionCreateOptions::default()) 145 + } 146 + fn reset_partition(keyspace: &Keyspace, partition: Partition) -> Result<Partition, DiskError> { 147 + keyspace.delete_partition(partition)?; 148 + let partition = Self::get_partition(keyspace)?; 149 + Ok(partition) 150 + } 181 151 pub(crate) fn put_many( 182 152 &mut self, 183 153 kv: impl Iterator<Item = Result<(Vec<u8>, Vec<u8>), DriveError>>, 184 154 ) -> Result<(), DriveError> { 155 + let mut batch = self.db.batch(); 185 156 for pair in kv { 186 157 let (k, v) = pair?; 187 - *self.stored += v.len(); 188 - if *self.stored > self.max { 158 + self.stored += v.len(); 159 + if self.stored > self.max_stored { 189 
160 return Err(DiskError::MaxSizeExceeded.into()); 190 161 } 191 - self.ks.insert(k, v).map_err(DiskError::DbError)?; 162 + batch.insert(&self.partition, k, v); 192 163 } 164 + batch.commit().map_err(DiskError::DbError)?; 193 165 Ok(()) 194 166 } 195 - } 196 167 197 - pub(crate) struct SqliteReader { 198 - ks: Keyspace, 199 - } 200 - 201 - impl SqliteReader { 202 - pub(crate) fn get(&mut self, key: Vec<u8>) -> Result<Option<Vec<u8>>, FjallError> { 203 - let rv = self 204 - .ks 205 - .get(&key)? 206 - .map(|v| v.as_ref().into()); 207 - Ok(rv) 168 + #[inline] 169 + pub(crate) fn get(&mut self, key: &[u8]) -> Result<Option<fjall::Slice>, FjallError> { 170 + self.partition.get(key) 208 171 } 209 172 }
+4 -34
src/drive.rs
··· 335 335 // move store in and back out so we can manage lifetimes 336 336 // dump mem blocks into the store 337 337 store = tokio::task::spawn(async move { 338 - let mut writer = store.get_writer()?; 339 - 340 338 let kvs = self 341 339 .mem_blocks 342 340 .into_iter() 343 341 .map(|(k, v)| Ok(encode(v).map(|v| (k.to_bytes(), v))?)); 344 342 345 - writer.put_many(kvs)?; 343 + store.put_many(kvs)?; 346 344 Ok::<_, DriveError>(store) 347 345 }) 348 346 .await??; ··· 350 348 let (tx, mut rx) = mpsc::channel::<Vec<(Cid, MaybeProcessedBlock<T>)>>(1); 351 349 352 350 let store_worker = tokio::task::spawn_blocking(move || { 353 - let mut writer = store.get_writer()?; 354 - 355 351 while let Some(chunk) = rx.blocking_recv() { 356 352 let kvs = chunk 357 353 .into_iter() 358 354 .map(|(k, v)| Ok(encode(v).map(|v| (k.to_bytes(), v))?)); 359 - writer.put_many(kvs)?; 355 + store.put_many(kvs)?; 360 356 } 361 357 Ok::<_, DriveError>(store) 362 358 }); // await later ··· 465 461 // comes out again. 466 462 let (state, res) = tokio::task::spawn_blocking( 467 463 move || -> (BigState, Result<BlockChunk<T>, DriveError>) { 468 - let mut reader_res = state.store.get_reader(); 469 - let reader: &mut _ = match reader_res { 470 - Ok(ref mut r) => r, 471 - Err(ref mut e) => { 472 - // unfortunately we can't return the error directly because 473 - // (for some reason) it's attached to the lifetime of the 474 - // reader? 
475 - // hack a mem::swap so we can get it out :/ 476 - let e_swapped = e.steal(); 477 - // the pain: `state` *has to* outlive the reader 478 - drop(reader_res); 479 - return (state, Err(e_swapped.into())); 480 - } 481 - }; 482 - 483 464 let mut out = Vec::with_capacity(n); 484 465 485 466 for _ in 0..n { 486 467 // walk as far as we can until we run out of blocks or find a record 487 - let step = match state.walker.disk_step(reader, process) { 468 + let step = match state.walker.disk_step(&mut state.store, process) { 488 469 Ok(s) => s, 489 470 Err(e) => { 490 - // the pain: `state` *has to* outlive the reader 491 - drop(reader_res); 492 471 return (state, Err(e.into())); 493 472 } 494 473 }; 495 474 match step { 496 475 Step::Missing(cid) => { 497 - // the pain: `state` *has to* outlive the reader 498 - drop(reader_res); 499 476 return (state, Err(DriveError::MissingBlock(cid))); 500 477 } 501 478 Step::Finish => break, ··· 503 480 }; 504 481 } 505 482 506 - // `state` *has to* outlive the reader 507 - drop(reader_res); 508 - 509 483 (state, Ok::<_, DriveError>(out)) 510 484 }, 511 485 ) ··· 529 503 tx: mpsc::Sender<Result<BlockChunk<T>, DriveError>>, 530 504 ) -> Result<(), mpsc::error::SendError<Result<BlockChunk<T>, DriveError>>> { 531 505 let BigState { store, walker } = self.state.as_mut().expect("valid state"); 532 - let mut reader = match store.get_reader() { 533 - Ok(r) => r, 534 - Err(e) => return tx.blocking_send(Err(e.into())), 535 - }; 536 506 537 507 loop { 538 508 let mut out: BlockChunk<T> = Vec::with_capacity(n); ··· 540 510 for _ in 0..n { 541 511 // walk as far as we can until we run out of blocks or find a record 542 512 543 - let step = match walker.disk_step(&mut reader, self.process) { 513 + let step = match walker.disk_step(store, self.process) { 544 514 Ok(s) => s, 545 515 Err(e) => return tx.blocking_send(Err(e.into())), 546 516 };
+4 -4
src/walk.rs
··· 1 1 //! Depth-first MST traversal 2 2 3 - use crate::disk::SqliteReader; 3 + use crate::disk::DiskStore; 4 4 use crate::drive::{DecodeError, MaybeProcessedBlock}; 5 5 use crate::mst::Node; 6 6 use crate::process::Processable; ··· 239 239 /// blocking!!!!!! 240 240 pub fn disk_step<T: Processable>( 241 241 &mut self, 242 - reader: &mut SqliteReader, 242 + reader: &mut DiskStore, 243 243 process: impl Fn(Vec<u8>) -> T, 244 244 ) -> Result<Step<T>, WalkError> { 245 245 loop { ··· 252 252 &mut Need::Node { depth, cid } => { 253 253 let cid_bytes = cid.to_bytes(); 254 254 log::trace!("need node {cid:?}"); 255 - let Some(block_bytes) = reader.get(cid_bytes)? else { 255 + let Some(block_bytes) = reader.get(&cid_bytes)? else { 256 256 log::trace!("node not found, resting"); 257 257 return Ok(Step::Missing(cid)); 258 258 }; ··· 274 274 Need::Record { rkey, cid } => { 275 275 log::trace!("need record {cid:?}"); 276 276 let cid_bytes = cid.to_bytes(); 277 - let Some(data_bytes) = reader.get(cid_bytes)? else { 277 + let Some(data_bytes) = reader.get(&cid_bytes)? else { 278 278 log::trace!("record block not found, resting"); 279 279 return Ok(Step::Missing(*cid)); 280 280 };