Fast and robust atproto CAR file processing in rust


+73 -11
Cargo.lock
··· 152 152 checksum = "2261d10cca569e4643e526d8dc2e62e433cc8aba21ab764233731f8d369bf394" 153 153 154 154 [[package]] 155 + name = "block-buffer" 156 + version = "0.10.4" 157 + source = "registry+https://github.com/rust-lang/crates.io-index" 158 + checksum = "3078c7629b62d3f0439517fa394996acacc5cbc91c5a20d8c658e77abd503a71" 159 + dependencies = [ 160 + "generic-array", 161 + ] 162 + 163 + [[package]] 155 164 name = "bumpalo" 156 165 version = "3.19.0" 157 166 source = "registry+https://github.com/rust-lang/crates.io-index" ··· 287 296 ] 288 297 289 298 [[package]] 299 + name = "cpufeatures" 300 + version = "0.2.17" 301 + source = "registry+https://github.com/rust-lang/crates.io-index" 302 + checksum = "59ed5838eebb26a2bb2e58f6d5b5316989ae9d08bab10e0e6d103e656d1b0280" 303 + dependencies = [ 304 + "libc", 305 + ] 306 + 307 + [[package]] 290 308 name = "criterion" 291 309 version = "0.7.0" 292 310 source = "registry+https://github.com/rust-lang/crates.io-index" ··· 352 370 checksum = "460fbee9c2c2f33933d720630a6a0bac33ba7053db5344fac858d4b8952d77d5" 353 371 354 372 [[package]] 373 + name = "crypto-common" 374 + version = "0.1.6" 375 + source = "registry+https://github.com/rust-lang/crates.io-index" 376 + checksum = "1bfb12502f3fc46cca1bb51ac28df9d618d813cdc3d2f25b9fe775a34af26bb3" 377 + dependencies = [ 378 + "generic-array", 379 + "typenum", 380 + ] 381 + 382 + [[package]] 355 383 name = "data-encoding" 356 384 version = "2.9.0" 357 385 source = "registry+https://github.com/rust-lang/crates.io-index" ··· 375 403 dependencies = [ 376 404 "data-encoding", 377 405 "syn 2.0.106", 406 + ] 407 + 408 + [[package]] 409 + name = "digest" 410 + version = "0.10.7" 411 + source = "registry+https://github.com/rust-lang/crates.io-index" 412 + checksum = "9ed9a281f7bc9b7576e61468ba615a66a5c8cfdff42420a70aa82701a3b1e292" 413 + dependencies = [ 414 + "block-buffer", 415 + "crypto-common", 378 416 ] 379 417 380 418 [[package]] ··· 530 568 ] 531 569 532 570 [[package]] 571 + name = "generic-array" 572 + version = "0.14.9" 573 + source = "registry+https://github.com/rust-lang/crates.io-index" 574 + checksum = "4bb6743198531e02858aeaea5398fcc883e71851fcbcb5a2f773e2fb6cb1edf2" 575 + dependencies = [ 576 + "typenum", 577 + "version_check", 578 + ] 579 + 580 + [[package]] 533 581 name = "getrandom" 534 582 version = "0.3.3" 535 583 source = "registry+https://github.com/rust-lang/crates.io-index" ··· 937 985 ] 938 986 939 987 [[package]] 940 - name = "redb" 941 - version = "3.1.0" 942 - source = "registry+https://github.com/rust-lang/crates.io-index" 943 - checksum = "ae323eb086579a3769daa2c753bb96deb95993c534711e0dbe881b5192906a06" 944 - dependencies = [ 945 - "libc", 946 - ] 947 - 948 - [[package]] 949 988 name = "redox_syscall" 950 989 version = "0.5.18" 951 990 source = "registry+https://github.com/rust-lang/crates.io-index" ··· 985 1024 986 1025 [[package]] 987 1026 name = "repo-stream" 988 - version = "0.1.1" 1027 + version = "0.2.2" 989 1028 dependencies = [ 990 1029 "bincode", 991 1030 "clap", ··· 997 1036 "iroh-car", 998 1037 "log", 999 1038 "multibase", 1000 - "redb", 1001 1039 "rusqlite", 1002 1040 "serde", 1003 1041 "serde_bytes", 1004 1042 "serde_ipld_dagcbor", 1043 + "sha2", 1005 1044 "tempfile", 1006 1045 "thiserror 2.0.17", 1007 1046 "tokio", ··· 1133 1172 ] 1134 1173 1135 1174 [[package]] 1175 + name = "sha2" 1176 + version = "0.10.9" 1177 + source = "registry+https://github.com/rust-lang/crates.io-index" 1178 + checksum = "a7507d819769d01a365ab707794a4084392c824f54a7a6a7862f8c3d0892b283" 1179 + 
dependencies = [ 1180 + "cfg-if", 1181 + "cpufeatures", 1182 + "digest", 1183 + ] 1184 + 1185 + [[package]] 1136 1186 name = "signal-hook-registry" 1137 1187 version = "1.4.6" 1138 1188 source = "registry+https://github.com/rust-lang/crates.io-index" ··· 1286 1336 ] 1287 1337 1288 1338 [[package]] 1339 + name = "typenum" 1340 + version = "1.19.0" 1341 + source = "registry+https://github.com/rust-lang/crates.io-index" 1342 + checksum = "562d481066bde0658276a35467c4af00bdc6ee726305698a55b86e61d7ad82bb" 1343 + 1344 + [[package]] 1289 1345 name = "unicode-ident" 1290 1346 version = "1.0.19" 1291 1347 source = "registry+https://github.com/rust-lang/crates.io-index" ··· 1320 1376 version = "0.2.15" 1321 1377 source = "registry+https://github.com/rust-lang/crates.io-index" 1322 1378 checksum = "accd4ea62f7bb7a82fe23066fb0957d48ef677f6eeb8215f372f52e48bb32426" 1379 + 1380 + [[package]] 1381 + name = "version_check" 1382 + version = "0.9.5" 1383 + source = "registry+https://github.com/rust-lang/crates.io-index" 1384 + checksum = "0b928f33d975fc6ad9f86c8f283853ad26bdd5b10b7f1542aa2fa15e2289105a" 1323 1385 1324 1386 [[package]] 1325 1387 name = "virtue"
+7 -4
Cargo.toml
···
  [package]
  name = "repo-stream"
- version = "0.1.1"
+ version = "0.2.2"
  edition = "2024"
  license = "MIT OR Apache-2.0"
- description = "Fast and robust atproto CAR file processing in rust"
+ description = "A robust CAR file -> MST walker for atproto"
  repository = "https://tangled.org/@microcosm.blue/repo-stream"

  [dependencies]
···
  iroh-car = "0.5.1"
  log = "0.4.28"
  multibase = "0.9.2"
- redb = "3.1.0"
  rusqlite = "0.37.0"
  serde = { version = "1.0.228", features = ["derive"] }
  serde_bytes = "0.11.19"
  serde_ipld_dagcbor = "0.6.4"
+ sha2 = "0.10.9"
  thiserror = "2.0.17"
- tokio = "1.47.1"
+ tokio = { version = "1.47.1", features = ["rt", "sync"] }

  [dev-dependencies]
  clap = { version = "4.5.48", features = ["derive"] }
···
  [profile.profiling]
  inherits = "release"
  debug = true
+
+ # [profile.release]
+ # debug = true

  [[bench]]
  name = "non-huge-cars"
+12 -21
benches/huge-car.rs
···
  extern crate repo_stream;
- use futures::TryStreamExt;
- use iroh_car::CarReader;
- use std::convert::Infallible;
+ use repo_stream::Driver;
  use std::path::{Path, PathBuf};

  use criterion::{Criterion, criterion_group, criterion_main};
···
      });
  }

- async fn drive_car(filename: impl AsRef<Path>) {
+ async fn drive_car(filename: impl AsRef<Path>) -> usize {
      let reader = tokio::fs::File::open(filename).await.unwrap();
      let reader = tokio::io::BufReader::new(reader);
-     let reader = CarReader::new(reader).await.unwrap();

-     let root = reader
-         .header()
-         .roots()
-         .first()
-         .ok_or("missing root")
+     let mut driver = match Driver::load_car(reader, |block| block.len(), 1024)
+         .await
          .unwrap()
-         .clone();
-
-     let stream = std::pin::pin!(reader.stream());
-
-     let (_commit, v) =
-         repo_stream::drive::Vehicle::init(root, stream, |block| Ok::<_, Infallible>(block.len()))
-             .await
-             .unwrap();
-     let mut record_stream = std::pin::pin!(v.stream());
+     {
+         Driver::Memory(_, mem_driver) => mem_driver,
+         Driver::Disk(_) => panic!("not doing disk for benchmark"),
+     };

-     while let Some(_) = record_stream.try_next().await.unwrap() {
-         // just here for the drive
+     let mut n = 0;
+     while let Some(pairs) = driver.next_chunk(256).await.unwrap() {
+         n += pairs.len();
      }
+     n
  }

  criterion_group!(benches, criterion_benchmark);
+16 -22
benches/non-huge-cars.rs
···
  extern crate repo_stream;
- use futures::TryStreamExt;
- use iroh_car::CarReader;
- use std::convert::Infallible;
+ use repo_stream::Driver;

  use criterion::{Criterion, criterion_group, criterion_main};

+ const EMPTY_CAR: &'static [u8] = include_bytes!("../car-samples/empty.car");
  const TINY_CAR: &'static [u8] = include_bytes!("../car-samples/tiny.car");
  const LITTLE_CAR: &'static [u8] = include_bytes!("../car-samples/little.car");
  const MIDSIZE_CAR: &'static [u8] = include_bytes!("../car-samples/midsize.car");
···
          .build()
          .expect("Creating runtime failed");

+     c.bench_function("empty-car", |b| {
+         b.to_async(&rt).iter(async || drive_car(EMPTY_CAR).await)
+     });
      c.bench_function("tiny-car", |b| {
          b.to_async(&rt).iter(async || drive_car(TINY_CAR).await)
      });
···
      });
  }

- async fn drive_car(bytes: &[u8]) {
-     let reader = CarReader::new(bytes).await.unwrap();
-
-     let root = reader
-         .header()
-         .roots()
-         .first()
-         .ok_or("missing root")
+ async fn drive_car(bytes: &[u8]) -> usize {
+     let mut driver = match Driver::load_car(bytes, |block| block.len(), 32)
+         .await
          .unwrap()
-         .clone();
-
-     let stream = std::pin::pin!(reader.stream());
+     {
+         Driver::Memory(_, mem_driver) => mem_driver,
+         Driver::Disk(_) => panic!("not benching big cars here"),
+     };

-     let (_commit, v) =
-         repo_stream::drive::Vehicle::init(root, stream, |block| Ok::<_, Infallible>(block.len()))
-             .await
-             .unwrap();
-     let mut record_stream = std::pin::pin!(v.stream());
-
-     while let Some(_) = record_stream.try_next().await.unwrap() {
-         // just here for the drive
+     let mut n = 0;
+     while let Some(pairs) = driver.next_chunk(256).await.unwrap() {
+         n += pairs.len();
      }
+     n
  }

  criterion_group!(benches, criterion_benchmark);
car-samples/empty.car

This is a binary file and will not be displayed.

+63 -27
examples/disk-read-file/main.rs
···
+ /*!
+ Read a CAR file by spilling to disk
+ */
+
  extern crate repo_stream;
  use clap::Parser;
- use futures::TryStreamExt;
- use iroh_car::CarReader;
- use std::convert::Infallible;
+ use repo_stream::{DiskBuilder, Driver, DriverBuilder};
  use std::path::PathBuf;
-
- type Result<T> = std::result::Result<T, Box<dyn std::error::Error>>;
+ use std::time::Instant;

  #[derive(Debug, Parser)]
  struct Args {
···
  }

  #[tokio::main]
- async fn main() -> Result<()> {
+ async fn main() -> Result<(), Box<dyn std::error::Error>> {
      env_logger::init();

      let Args { car, tmpfile } = Args::parse();
+
+     // repo-stream takes an AsyncRead as input. wrapping a filesystem read in
+     // BufReader can provide a really significant performance win.
      let reader = tokio::fs::File::open(car).await?;
      let reader = tokio::io::BufReader::new(reader);

-     println!("hello!");
+     log::info!("hello! reading the car...");
+     let t0 = Instant::now();

-     let reader = CarReader::new(reader).await?;
+     // in this example we only bother handling CARs that are too big for memory
+     // `noop` helper means: do no block processing, store the raw blocks
+     let driver = match DriverBuilder::new()
+         .with_mem_limit_mb(10) // how much memory can be used before disk spill
+         .load_car(reader)
+         .await?
+     {
+         Driver::Memory(_, _) => panic!("try this on a bigger car"),
+         Driver::Disk(big_stuff) => {
+             // we reach here if the repo was too big and needs to be spilled to
+             // disk to continue

-     let redb_store = repo_stream::disk_redb::RedbStore::new(tmpfile)?;
+             // set up a disk store we can spill to
+             let disk_store = DiskBuilder::new().open(tmpfile).await?;

-     let root = reader
-         .header()
-         .roots()
-         .first()
-         .ok_or("missing root")?
-         .clone();
-     log::debug!("root: {root:?}");
+             // do the spilling, get back a (similar) driver
+             let (commit, driver) = big_stuff.finish_loading(disk_store).await?;

-     // let stream = Box::pin(reader.stream());
-     let stream = std::pin::pin!(reader.stream());
+             // at this point you might want to fetch the account's signing key
+             // via the DID from the commit, and then verify the signature.
+             log::warn!("big's commit ({:?}): {:?}", t0.elapsed(), commit);
+
+             // pop the driver back out to get some code indentation relief
+             driver
+         }
+     };
+
+     // collect some random stats about the blocks
+     let mut n = 0;
+     let mut zeros = 0;

-     let (commit, v) = repo_stream::disk_drive::Vehicle::init(root, stream, redb_store, |block| {
-         Ok::<_, Infallible>(block.len())
-     })
-     .await?;
-     let mut record_stream = std::pin::pin!(v.stream());
+     log::info!("walking...");
+
+     // this example uses the disk driver's channel mode: the tree walking is
+     // spawned onto a blocking thread, and we get chunks of rkey+blocks back
+     let (mut rx, join) = driver.to_channel(512);
+     while let Some(r) = rx.recv().await {
+         let pairs = r?;

-     log::info!("got commit: {commit:?}");
+         // keep a count of the total number of blocks seen
+         n += pairs.len();

-     while let Some((rkey, _rec)) = record_stream.try_next().await? {
-         log::info!("got {rkey:?}");
+         for (_, block) in pairs {
+             // for each block, count how many bytes are equal to '0'
+             // (this is just an example, you probably want to do something more
+             // interesting)
+             zeros += block.into_iter().filter(|&b| b == b'0').count()
+         }
      }
-     log::info!("bye!");
+
+     log::info!("arrived! ({:?}) joining rx...", t0.elapsed());
+
+     // clean up the database. would be nice to do this in drop so it happens
+     // automatically, but some blocking work happens, so that's not allowed in
+     // async rust. 🤷‍♀️
+     join.await?.reset_store().await?;
+
+     log::info!("done. n={n} zeros={zeros}");

      Ok(())
  }
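The example above consumes the disk driver through its channel mode. For reference, here is a minimal sketch of driving the same walk by pulling chunks with `next_chunk` instead, as the readme example below does. Only API names that appear in this change are used; the function name and database path are made up.

```rust
use repo_stream::{DiskBuilder, DriveError, Driver, DriverBuilder};

// Hypothetical helper: count records in a CAR that needs disk spill.
async fn count_records(
    reader: impl tokio::io::AsyncRead + Unpin,
) -> Result<usize, DriveError> {
    let mut n = 0;
    if let Driver::Disk(paused) = DriverBuilder::new()
        .with_mem_limit_mb(10)
        .load_car(reader)
        .await?
    {
        // spill to disk, then walk the tree chunk by chunk
        let store = DiskBuilder::new().open("blocks.db".into()).await?;
        let (_commit, mut driver) = paused.finish_loading(store).await?;
        while let Some(pairs) = driver.next_chunk(256).await? {
            n += pairs.len();
        }
        // drop the temporary tables when done
        driver.reset_store().await?;
    } // a real caller would also handle Driver::Memory
    Ok(n)
}
```

Channel mode keeps the walk on one spawned blocking worker, while `next_chunk` requests one chunk at a time, which can be simpler when the consumer is itself async.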
+18 -25
examples/read-file/main.rs
···
+ /*!
+ Read a CAR file with in-memory processing
+ */
+
  extern crate repo_stream;
  use clap::Parser;
- use futures::TryStreamExt;
- use iroh_car::CarReader;
- use std::convert::Infallible;
+ use repo_stream::{Driver, DriverBuilder};
  use std::path::PathBuf;

  type Result<T> = std::result::Result<T, Box<dyn std::error::Error>>;
···
      let reader = tokio::fs::File::open(file).await?;
      let reader = tokio::io::BufReader::new(reader);

-     println!("hello!");
-
-     let reader = CarReader::new(reader).await?;
-
-     let root = reader
-         .header()
-         .roots()
-         .first()
-         .ok_or("missing root")?
-         .clone();
-     log::debug!("root: {root:?}");
-
-     // let stream = Box::pin(reader.stream());
-     let stream = std::pin::pin!(reader.stream());
-
-     let (commit, v) =
-         repo_stream::drive::Vehicle::init(root, stream, |block| Ok::<_, Infallible>(block.len()))
-             .await?;
-     let mut record_stream = std::pin::pin!(v.stream());
+     let (commit, mut driver) = match DriverBuilder::new()
+         .with_block_processor(|block| block.len())
+         .load_car(reader)
+         .await?
+     {
+         Driver::Memory(commit, mem_driver) => (commit, mem_driver),
+         Driver::Disk(_) => panic!("this example doesn't handle big CARs"),
+     };

      log::info!("got commit: {commit:?}");

-     while let Some((rkey, _rec)) = record_stream.try_next().await? {
-         log::info!("got {rkey:?}");
+     let mut n = 0;
+     while let Some(pairs) = driver.next_chunk(256).await? {
+         n += pairs.len();
+         // log::info!("got {rkey:?}");
      }
-     log::info!("bye!");
+     log::info!("bye! total records={n}");

      Ok(())
  }
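`with_block_processor` is where per-record work can be pushed into the load phase; the example above simply maps each record to its length. Below is a sketch of a heavier processor that hashes every record with the `sha2` dependency this change adds. The names are illustrative, and `Vec<u8>` is used as the output type because the crate's default no-op processor already produces `Vec<u8>`.

```rust
use repo_stream::{DriveError, Driver, DriverBuilder};
use sha2::{Digest, Sha256};

// Illustrative processor: keep only a 32-byte digest per record instead of
// the full record bytes.
fn digest_block(block: Vec<u8>) -> Vec<u8> {
    Sha256::digest(&block).to_vec()
}

// Hypothetical helper: collect (rkey, digest) pairs for a small CAR.
async fn record_digests(
    reader: impl tokio::io::AsyncRead + Unpin,
) -> Result<Vec<(String, Vec<u8>)>, DriveError> {
    let mut out = Vec::new();
    if let Driver::Memory(_commit, mut driver) = DriverBuilder::new()
        .with_block_processor(digest_block)
        .load_car(reader)
        .await?
    {
        while let Some(pairs) = driver.next_chunk(256).await? {
            out.extend(pairs);
        }
    } // a real caller would also handle Driver::Disk
    Ok(out)
}
```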
+70 -2
readme.md
···
  # repo-stream

- Fast and (aspirationally) robust atproto CAR file processing in rust
+ A robust CAR file -> MST walker for atproto
+
+ [![Crates.io][crates-badge]](https://crates.io/crates/repo-stream)
+ [![Documentation][docs-badge]](https://docs.rs/repo-stream)
+ [![Sponsor][sponsor-badge]](https://github.com/sponsors/uniphil)
+
+ [crates-badge]: https://img.shields.io/crates/v/repo-stream.svg
+ [docs-badge]: https://docs.rs/repo-stream/badge.svg
+ [sponsor-badge]: https://img.shields.io/badge/at-microcosm-b820f9?labelColor=b820f9&logo=githubsponsors&logoColor=fff
+
+ ```rust
+ use repo_stream::{Driver, DriverBuilder, DriveError, DiskBuilder};
+
+ #[tokio::main]
+ async fn main() -> Result<(), DriveError> {
+     // repo-stream takes any AsyncRead as input, like a tokio::fs::File
+     let reader = tokio::fs::File::open("repo.car".into()).await?;
+     let reader = tokio::io::BufReader::new(reader);
+
+     // example repo workload is simply counting the total record bytes
+     let mut total_size = 0;
+
+     match DriverBuilder::new()
+         .with_mem_limit_mb(10)
+         .with_block_processor(|rec| rec.len()) // block processing: just extract the raw record size
+         .load_car(reader)
+         .await?
+     {
+
+         // if all blocks fit within memory
+         Driver::Memory(_commit, mut driver) => {
+             while let Some(chunk) = driver.next_chunk(256).await? {
+                 for (_rkey, size) in chunk {
+                     total_size += size;
+                 }
+             }
+         },
+
+         // if the CAR was too big for in-memory processing
+         Driver::Disk(paused) => {
+             // set up a disk store we can spill to
+             let store = DiskBuilder::new().open("some/path.db".into()).await?;
+             // do the spilling, get back a (similar) driver
+             let (_commit, mut driver) = paused.finish_loading(store).await?;
+
+             while let Some(chunk) = driver.next_chunk(256).await? {
+                 for (_rkey, size) in chunk {
+                     total_size += size;
+                 }
+             }
+
+             // clean up the disk store (drop tables etc)
+             driver.reset_store().await?;
+         }
+     };
+     println!("sum of size of all records: {total_size}");
+     Ok(())
+ }
+ ```
+
+ more recent todo
+
+ - [ ] get an *empty* car for the test suite
+ - [x] implement a max size on disk limit
+
+
+ -----
+
+ older stuff (to clean up):


  current car processing times (records processed into their length usize, phil's dev machine):
···
    -> yeah the commit is returned from init
  - [ ] spec compliance todos
    - [x] assert that keys are ordered and fail if not
-   - [ ] verify node mst depth from key (possibly pending [interop test fixes](https://github.com/bluesky-social/atproto-interop-tests/issues/5))
+   - [x] verify node mst depth from key (possibly pending [interop test fixes](https://github.com/bluesky-social/atproto-interop-tests/issues/5))
  - [ ] performance todos
    - [x] consume the serialized nodes into a mutable efficient format
    - [ ] maybe customize the deserialize impl to do that directly?
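The checked-off "max size on disk limit" item above maps to the new `DiskBuilder` options in `src/disk.rs` below. A small sketch of wiring them up (the values and path are only examples):

```rust
use repo_stream::{DiskBuilder, DiskError};

// Hypothetical setup: a spill store with an explicit cache and size budget.
async fn open_spill_store() -> Result<(), DiskError> {
    let _store = DiskBuilder::new()
        .with_cache_size_mb(64) // sqlite page-cache allowance
        .with_max_stored_mb(2048) // error once ~2 GiB of processed blocks are inserted
        .open("/tmp/repo-stream-spill.db".into())
        .await?;
    // hand the store to NeedDisk::finish_loading as in the examples above
    Ok(())
}
```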
+221
src/disk.rs
··· 1 + /*! 2 + Disk storage for blocks on disk 3 + 4 + Currently this uses sqlite. In testing sqlite wasn't the fastest, but it seemed 5 + to be the best behaved in terms of both on-disk space usage and memory usage. 6 + 7 + ```no_run 8 + # use repo_stream::{DiskBuilder, DiskError}; 9 + # #[tokio::main] 10 + # async fn main() -> Result<(), DiskError> { 11 + let store = DiskBuilder::new() 12 + .with_cache_size_mb(32) 13 + .with_max_stored_mb(1024) // errors when >1GiB of processed blocks are inserted 14 + .open("/some/path.db".into()).await?; 15 + # Ok(()) 16 + # } 17 + ``` 18 + */ 19 + 20 + use crate::drive::DriveError; 21 + use rusqlite::OptionalExtension; 22 + use std::path::PathBuf; 23 + 24 + #[derive(Debug, thiserror::Error)] 25 + pub enum DiskError { 26 + /// A wrapped database error 27 + /// 28 + /// (The wrapped err should probably be obscured to remove public-facing 29 + /// sqlite bits) 30 + #[error(transparent)] 31 + DbError(#[from] rusqlite::Error), 32 + /// A tokio blocking task failed to join 33 + #[error("Failed to join a tokio blocking task: {0}")] 34 + JoinError(#[from] tokio::task::JoinError), 35 + /// The total size of stored blocks exceeded the allowed size 36 + /// 37 + /// If you need to process *really* big CARs, you can configure a higher 38 + /// limit. 39 + #[error("Maximum disk size reached")] 40 + MaxSizeExceeded, 41 + #[error("this error was replaced, seeing this is a bug.")] 42 + #[doc(hidden)] 43 + Stolen, 44 + } 45 + 46 + impl DiskError { 47 + /// hack for ownership challenges with the disk driver 48 + pub(crate) fn steal(&mut self) -> Self { 49 + let mut swapped = DiskError::Stolen; 50 + std::mem::swap(self, &mut swapped); 51 + swapped 52 + } 53 + } 54 + 55 + /// Builder-style disk store setup 56 + #[derive(Debug, Clone)] 57 + pub struct DiskBuilder { 58 + /// Database in-memory cache allowance 59 + /// 60 + /// Default: 32 MiB 61 + pub cache_size_mb: usize, 62 + /// Database stored block size limit 63 + /// 64 + /// Default: 10 GiB 65 + /// 66 + /// Note: actual size on disk may be more, but should approximately scale 67 + /// with this limit 68 + pub max_stored_mb: usize, 69 + } 70 + 71 + impl Default for DiskBuilder { 72 + fn default() -> Self { 73 + Self { 74 + cache_size_mb: 32, 75 + max_stored_mb: 10 * 1024, // 10 GiB 76 + } 77 + } 78 + } 79 + 80 + impl DiskBuilder { 81 + /// Begin configuring the storage with defaults 82 + pub fn new() -> Self { 83 + Default::default() 84 + } 85 + /// Set the in-memory cache allowance for the database 86 + /// 87 + /// Default: 32 MiB 88 + pub fn with_cache_size_mb(mut self, size: usize) -> Self { 89 + self.cache_size_mb = size; 90 + self 91 + } 92 + /// Set the approximate stored block size limit 93 + /// 94 + /// Default: 10 GiB 95 + pub fn with_max_stored_mb(mut self, max: usize) -> Self { 96 + self.max_stored_mb = max; 97 + self 98 + } 99 + /// Open and initialize the actual disk storage 100 + pub async fn open(&self, path: PathBuf) -> Result<DiskStore, DiskError> { 101 + DiskStore::new(path, self.cache_size_mb, self.max_stored_mb).await 102 + } 103 + } 104 + 105 + /// On-disk block storage 106 + pub struct DiskStore { 107 + conn: rusqlite::Connection, 108 + max_stored: usize, 109 + stored: usize, 110 + } 111 + 112 + impl DiskStore { 113 + /// Initialize a new disk store 114 + pub async fn new( 115 + path: PathBuf, 116 + cache_mb: usize, 117 + max_stored_mb: usize, 118 + ) -> Result<Self, DiskError> { 119 + let max_stored = max_stored_mb * 2_usize.pow(20); 120 + let conn = tokio::task::spawn_blocking(move || { 
121 + let conn = rusqlite::Connection::open(path)?; 122 + 123 + let sqlite_one_mb = -(2_i64.pow(10)); // negative is kibibytes for sqlite cache_size 124 + 125 + // conn.pragma_update(None, "journal_mode", "OFF")?; 126 + // conn.pragma_update(None, "journal_mode", "MEMORY")?; 127 + conn.pragma_update(None, "journal_mode", "WAL")?; 128 + // conn.pragma_update(None, "wal_autocheckpoint", "0")?; // this lets things get a bit big on disk 129 + conn.pragma_update(None, "synchronous", "OFF")?; 130 + conn.pragma_update( 131 + None, 132 + "cache_size", 133 + (cache_mb as i64 * sqlite_one_mb).to_string(), 134 + )?; 135 + Self::reset_tables(&conn)?; 136 + 137 + Ok::<_, DiskError>(conn) 138 + }) 139 + .await??; 140 + 141 + Ok(Self { 142 + conn, 143 + max_stored, 144 + stored: 0, 145 + }) 146 + } 147 + pub(crate) fn get_writer(&'_ mut self) -> Result<SqliteWriter<'_>, DiskError> { 148 + let tx = self.conn.transaction()?; 149 + Ok(SqliteWriter { 150 + tx, 151 + stored: &mut self.stored, 152 + max: self.max_stored, 153 + }) 154 + } 155 + pub(crate) fn get_reader<'conn>(&'conn self) -> Result<SqliteReader<'conn>, DiskError> { 156 + let select_stmt = self.conn.prepare("SELECT val FROM blocks WHERE key = ?1")?; 157 + Ok(SqliteReader { select_stmt }) 158 + } 159 + /// Drop and recreate the kv table 160 + pub async fn reset(self) -> Result<Self, DiskError> { 161 + tokio::task::spawn_blocking(move || { 162 + Self::reset_tables(&self.conn)?; 163 + Ok(self) 164 + }) 165 + .await? 166 + } 167 + fn reset_tables(conn: &rusqlite::Connection) -> Result<(), DiskError> { 168 + conn.execute("DROP TABLE IF EXISTS blocks", ())?; 169 + conn.execute( 170 + "CREATE TABLE blocks ( 171 + key BLOB PRIMARY KEY NOT NULL, 172 + val BLOB NOT NULL 173 + ) WITHOUT ROWID", 174 + (), 175 + )?; 176 + Ok(()) 177 + } 178 + } 179 + 180 + pub(crate) struct SqliteWriter<'conn> { 181 + tx: rusqlite::Transaction<'conn>, 182 + stored: &'conn mut usize, 183 + max: usize, 184 + } 185 + 186 + impl SqliteWriter<'_> { 187 + pub(crate) fn put_many( 188 + &mut self, 189 + kv: impl Iterator<Item = Result<(Vec<u8>, Vec<u8>), DriveError>>, 190 + ) -> Result<(), DriveError> { 191 + let mut insert_stmt = self 192 + .tx 193 + .prepare_cached("INSERT INTO blocks (key, val) VALUES (?1, ?2)") 194 + .map_err(DiskError::DbError)?; 195 + for pair in kv { 196 + let (k, v) = pair?; 197 + *self.stored += v.len(); 198 + if *self.stored > self.max { 199 + return Err(DiskError::MaxSizeExceeded.into()); 200 + } 201 + insert_stmt.execute((k, v)).map_err(DiskError::DbError)?; 202 + } 203 + Ok(()) 204 + } 205 + pub fn commit(self) -> Result<(), DiskError> { 206 + self.tx.commit()?; 207 + Ok(()) 208 + } 209 + } 210 + 211 + pub(crate) struct SqliteReader<'conn> { 212 + select_stmt: rusqlite::Statement<'conn>, 213 + } 214 + 215 + impl SqliteReader<'_> { 216 + pub(crate) fn get(&mut self, key: Vec<u8>) -> rusqlite::Result<Option<Vec<u8>>> { 217 + self.select_stmt 218 + .query_one((&key,), |row| row.get(0)) 219 + .optional() 220 + } 221 + }
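When that stored-size budget is exceeded, `DiskError::MaxSizeExceeded` comes back from the driver wrapped as `DriveError::StorageError`. A sketch of detecting it so oversized repos can be skipped rather than treated as fatal (the helper name is hypothetical; the error variants are from this change):

```rust
use repo_stream::{DiskError, DriveError};

// Hypothetical helper: true when a drive failed only because the CAR blew
// past the configured on-disk budget.
fn exceeded_disk_budget(err: &DriveError) -> bool {
    matches!(err, DriveError::StorageError(DiskError::MaxSizeExceeded))
}
```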
-175
src/disk_drive.rs
··· 1 - use futures::Stream; 2 - use futures::TryStreamExt; 3 - use std::error::Error; 4 - 5 - use crate::disk_walk::{Step, Trip, Walker}; 6 - use crate::mst::Commit; 7 - use crate::mst::Node; 8 - 9 - use ipld_core::cid::Cid; 10 - use serde::{Deserialize, Serialize, de::DeserializeOwned}; 11 - 12 - /// Errors that can happen while consuming and emitting blocks and records 13 - #[derive(Debug, thiserror::Error)] 14 - pub enum DriveError { 15 - #[error("Failed to initialize CarReader: {0}")] 16 - CarReader(#[from] iroh_car::Error), 17 - #[error("Car block stream error: {0}")] 18 - CarBlockError(Box<dyn Error>), 19 - #[error("Failed to decode commit block: {0}")] 20 - BadCommit(Box<dyn Error>), 21 - #[error("The Commit block reference by the root was not found")] 22 - MissingCommit, 23 - #[error("The MST block {0} could not be found")] 24 - MissingBlock(Cid), 25 - #[error("Failed to walk the mst tree: {0}")] 26 - Tripped(#[from] Trip), 27 - } 28 - 29 - #[derive(Debug, Clone, Serialize, Deserialize)] 30 - pub enum MaybeProcessedBlock<T: Clone + Serialize> { 31 - /// A block that's *probably* a Node (but we can't know yet) 32 - /// 33 - /// It *can be* a record that suspiciously looks a lot like a node, so we 34 - /// cannot eagerly turn it into a Node. We only know for sure what it is 35 - /// when we actually walk down the MST 36 - Raw(Vec<u8>), 37 - /// A processed record from a block that was definitely not a Node 38 - /// 39 - /// If we _never_ needed this block, then we may have wasted a bit of effort 40 - /// trying to process it. Oh well. 41 - /// 42 - /// Processing has to be fallible because the CAR can have totally-unused 43 - /// blocks, which can just be garbage. since we're eagerly trying to process 44 - /// record blocks without knowing for sure that they *are* records, we 45 - /// discard any definitely-not-nodes that fail processing and keep their 46 - /// error in the buffer for them. if we later try to retreive them as a 47 - /// record, then we can surface the error. 48 - /// 49 - /// The error type is `String` because we don't really want to put 50 - /// any constraints like `Serialize` on the error type, and `Error` 51 - /// at least requires `Display`. It's a compromise. 52 - ProcessedOk(T), 53 - Unprocessable(String), 54 - } 55 - 56 - pub trait BlockStore<MPB: Serialize + DeserializeOwned> { 57 - fn put(&self, key: Cid, value: MPB); // unwraps for now 58 - fn get(&self, key: Cid) -> Option<MPB>; 59 - } 60 - 61 - type CarBlock<E> = Result<(Cid, Vec<u8>), E>; 62 - 63 - /// The core driver between the block stream and MST walker 64 - pub struct Vehicle<SE, S, T, BS, P, PE> 65 - where 66 - SE: Error + 'static, 67 - S: Stream<Item = CarBlock<SE>>, 68 - T: Clone + Serialize + DeserializeOwned, 69 - BS: BlockStore<MaybeProcessedBlock<T>>, 70 - P: Fn(&[u8]) -> Result<T, PE>, 71 - PE: Error, 72 - { 73 - #[allow(dead_code)] 74 - block_stream: S, 75 - block_store: BS, 76 - walker: Walker, 77 - process: P, 78 - } 79 - 80 - impl<SE, S, T, BS, P, PE> Vehicle<SE, S, T, BS, P, PE> 81 - where 82 - SE: Error + 'static, 83 - S: Stream<Item = CarBlock<SE>> + Unpin, 84 - T: Clone + Serialize + DeserializeOwned, 85 - BS: BlockStore<MaybeProcessedBlock<T>>, 86 - P: Fn(&[u8]) -> Result<T, PE>, 87 - PE: Error, 88 - { 89 - /// Set up the stream 90 - /// 91 - /// This will eagerly consume blocks until the `Commit` object is found. 92 - /// *Usually* the it's the first block, but there is no guarantee. 
93 - /// 94 - /// ### Parameters 95 - /// 96 - /// `root`: CID of the commit object that is the root of the MST 97 - /// 98 - /// `block_stream`: Input stream of raw CAR blocks 99 - /// 100 - /// `process`: record-transforming callback: 101 - /// 102 - /// For tasks where records can be quickly processed into a *smaller* 103 - /// useful representation, you can do that eagerly as blocks come in by 104 - /// passing the processor as a callback here. This can reduce overall 105 - /// memory usage. 106 - pub async fn init( 107 - root: Cid, 108 - mut block_stream: S, 109 - block_store: BS, 110 - process: P, 111 - ) -> Result<(Commit, Self), DriveError> { 112 - let mut commit = None; 113 - 114 - log::warn!("init: load blocks"); 115 - 116 - // go ahead and put all blocks in the block store 117 - while let Some((cid, data)) = block_stream 118 - .try_next() 119 - .await 120 - .map_err(|e| DriveError::CarBlockError(e.into()))? 121 - { 122 - if cid == root { 123 - let c: Commit = serde_ipld_dagcbor::from_slice(&data) 124 - .map_err(|e| DriveError::BadCommit(e.into()))?; 125 - commit = Some(c); 126 - } else { 127 - block_store.put( 128 - cid, 129 - if Node::could_be(&data) { 130 - MaybeProcessedBlock::Raw(data) 131 - } else { 132 - match process(&data) { 133 - Ok(t) => MaybeProcessedBlock::ProcessedOk(t), 134 - Err(e) => MaybeProcessedBlock::Unprocessable(e.to_string()), 135 - } 136 - }, 137 - ); 138 - } 139 - } 140 - 141 - log::warn!("init: got commit?"); 142 - 143 - // we either broke out or read all the blocks without finding the commit... 144 - let commit = commit.ok_or(DriveError::MissingCommit)?; 145 - 146 - let walker = Walker::new(commit.data); 147 - 148 - log::warn!("init: wrapping up"); 149 - 150 - let me = Self { 151 - block_stream, 152 - block_store, 153 - walker, 154 - process, 155 - }; 156 - Ok((commit, me)) 157 - } 158 - 159 - /// Manually step through the record outputs 160 - pub async fn next_record(&mut self) -> Result<Option<(String, T)>, DriveError> { 161 - match self.walker.step(&mut self.block_store, &self.process)? { 162 - Step::Rest(cid) => Err(DriveError::MissingBlock(cid)), 163 - Step::Finish => Ok(None), 164 - Step::Step { rkey, data } => Ok(Some((rkey, data))), 165 - } 166 - } 167 - 168 - /// Convert to a futures::stream of record outputs 169 - pub fn stream(self) -> impl Stream<Item = Result<(String, T), DriveError>> { 170 - futures::stream::try_unfold(self, |mut this| async move { 171 - let maybe_record = this.next_record().await?; 172 - Ok(maybe_record.map(|b| (b, this))) 173 - }) 174 - } 175 - }
-53
src/disk_redb.rs
··· 1 - use crate::disk_drive::BlockStore; 2 - use ipld_core::cid::Cid; 3 - use redb::{Database, Error, ReadableTable, TableDefinition, WriteTransaction}; 4 - use serde::{Serialize, de::DeserializeOwned}; 5 - use std::path::Path; 6 - 7 - const TABLE: TableDefinition<&[u8], &[u8]> = TableDefinition::new("blocks"); 8 - 9 - pub struct RedbStore { 10 - #[allow(dead_code)] 11 - db: Database, 12 - tx: Option<WriteTransaction>, 13 - } 14 - 15 - impl RedbStore { 16 - pub fn new(path: impl AsRef<Path>) -> Result<Self, Error> { 17 - log::warn!("redb new"); 18 - let db = Database::create(path)?; 19 - log::warn!("db created"); 20 - let mut tx = db.begin_write()?; 21 - tx.set_durability(redb::Durability::None).unwrap(); 22 - log::warn!("transaction begun"); 23 - Ok(Self { db, tx: Some(tx) }) 24 - } 25 - } 26 - 27 - impl Drop for RedbStore { 28 - fn drop(&mut self) { 29 - let tx = self.tx.take(); 30 - tx.unwrap().abort().unwrap(); 31 - } 32 - } 33 - 34 - impl<MPB: Serialize + DeserializeOwned> BlockStore<MPB> for RedbStore { 35 - fn put(&self, c: Cid, t: MPB) { 36 - let key_bytes = c.to_bytes(); 37 - let val_bytes = bincode::serde::encode_to_vec(t, bincode::config::standard()).unwrap(); 38 - { 39 - let mut table = self.tx.as_ref().unwrap().open_table(TABLE).unwrap(); 40 - table.insert(&*key_bytes, &*val_bytes).unwrap(); 41 - } 42 - } 43 - fn get(&self, c: Cid) -> Option<MPB> { 44 - let key_bytes = c.to_bytes(); 45 - let table = self.tx.as_ref().unwrap().open_table(TABLE).unwrap(); 46 - let maybe_val_bytes = table.get(&*key_bytes).unwrap()?; 47 - let (t, n): (MPB, usize) = 48 - bincode::serde::decode_from_slice(maybe_val_bytes.value(), bincode::config::standard()) 49 - .unwrap(); 50 - assert_eq!(maybe_val_bytes.value().len(), n); 51 - Some(t) 52 - } 53 - }
-65
src/disk_sqlite.rs
··· 1 - use crate::disk_drive::BlockStore; 2 - use ipld_core::cid::Cid; 3 - use rusqlite::{Connection, OptionalExtension, Result}; 4 - use serde::{Serialize, de::DeserializeOwned}; 5 - use std::path::Path; 6 - 7 - pub struct SqliteStore { 8 - conn: Connection, 9 - } 10 - 11 - impl SqliteStore { 12 - pub fn new(path: impl AsRef<Path>) -> Result<Self> { 13 - let conn = Connection::open(path)?; 14 - conn.pragma_update(None, "journal_mode", "WAL")?; 15 - conn.pragma_update(None, "synchronous", "OFF")?; 16 - conn.pragma_update(None, "cache_size", (-32 * 2_i64.pow(10)).to_string())?; 17 - conn.execute( 18 - "CREATE TABLE blocks ( 19 - key BLOB PRIMARY KEY NOT NULL, 20 - val BLOB NOT NULL 21 - ) WITHOUT ROWID", 22 - (), 23 - )?; 24 - 25 - Ok(Self { conn }) 26 - } 27 - } 28 - 29 - impl Drop for SqliteStore { 30 - fn drop(&mut self) { 31 - self.conn.execute("DROP TABLE blocks", ()).unwrap(); 32 - } 33 - } 34 - 35 - impl<MPB: Serialize + DeserializeOwned> BlockStore<MPB> for SqliteStore { 36 - fn put(&self, c: Cid, t: MPB) { 37 - let key_bytes = c.to_bytes(); 38 - let val_bytes = bincode::serde::encode_to_vec(t, bincode::config::standard()).unwrap(); 39 - 40 - self.conn 41 - .execute( 42 - "INSERT INTO blocks (key, val) VALUES (?1, ?2)", 43 - (&key_bytes, &val_bytes), 44 - ) 45 - .unwrap(); 46 - } 47 - fn get(&self, c: Cid) -> Option<MPB> { 48 - let key_bytes = c.to_bytes(); 49 - 50 - let val_bytes: Vec<u8> = self 51 - .conn 52 - .query_one( 53 - "SELECT val FROM blocks WHERE key = ?1", 54 - (&key_bytes,), 55 - |row| row.get(0), 56 - ) 57 - .optional() 58 - .unwrap()?; 59 - 60 - let (t, n): (MPB, usize) = 61 - bincode::serde::decode_from_slice(&val_bytes, bincode::config::standard()).unwrap(); 62 - assert_eq!(val_bytes.len(), n); 63 - Some(t) 64 - } 65 - }
-403
src/disk_walk.rs
··· 1 - //! Depth-first MST traversal 2 - 3 - use crate::disk_drive::{BlockStore, MaybeProcessedBlock}; 4 - use crate::mst::Node; 5 - 6 - use ipld_core::cid::Cid; 7 - use serde::{Serialize, de::DeserializeOwned}; 8 - use std::error::Error; 9 - 10 - /// Errors that can happen while walking 11 - #[derive(Debug, thiserror::Error)] 12 - pub enum Trip { 13 - #[error("empty mst nodes are not allowed")] 14 - NodeEmpty, 15 - #[error("Failed to decode commit block: {0}")] 16 - BadCommit(Box<dyn std::error::Error>), 17 - #[error("Action node error: {0}")] 18 - RkeyError(#[from] RkeyError), 19 - #[error("Process failed: {0}")] 20 - ProcessFailed(String), 21 - #[error("Encountered an rkey out of order while walking the MST")] 22 - RkeyOutOfOrder, 23 - } 24 - 25 - /// Errors from invalid Rkeys 26 - #[derive(Debug, thiserror::Error)] 27 - pub enum RkeyError { 28 - #[error("Failed to compute an rkey due to invalid prefix_len")] 29 - EntryPrefixOutOfbounds, 30 - #[error("RKey was not utf-8")] 31 - EntryRkeyNotUtf8(#[from] std::string::FromUtf8Error), 32 - } 33 - 34 - /// Walker outputs 35 - #[derive(Debug)] 36 - pub enum Step<T: Serialize + DeserializeOwned> { 37 - /// We need a CID but it's not in the block store 38 - /// 39 - /// Give the needed CID to the driver so it can load blocks until it's found 40 - Rest(Cid), 41 - /// Reached the end of the MST! yay! 42 - Finish, 43 - /// A record was found! 44 - Step { rkey: String, data: T }, 45 - } 46 - 47 - #[derive(Debug, Clone, PartialEq)] 48 - enum Need { 49 - Node(Cid), 50 - Record { rkey: String, cid: Cid }, 51 - } 52 - 53 - fn push_from_node(stack: &mut Vec<Need>, node: &Node) -> Result<(), RkeyError> { 54 - let mut entries = Vec::with_capacity(node.entries.len()); 55 - 56 - let mut prefix = vec![]; 57 - for entry in &node.entries { 58 - let mut rkey = vec![]; 59 - let pre_checked = prefix 60 - .get(..entry.prefix_len) 61 - .ok_or(RkeyError::EntryPrefixOutOfbounds)?; 62 - rkey.extend_from_slice(pre_checked); 63 - rkey.extend_from_slice(&entry.keysuffix); 64 - prefix = rkey.clone(); 65 - 66 - entries.push(Need::Record { 67 - rkey: String::from_utf8(rkey)?, 68 - cid: entry.value, 69 - }); 70 - if let Some(ref tree) = entry.tree { 71 - entries.push(Need::Node(*tree)); 72 - } 73 - } 74 - 75 - entries.reverse(); 76 - stack.append(&mut entries); 77 - 78 - if let Some(tree) = node.left { 79 - stack.push(Need::Node(tree)); 80 - } 81 - Ok(()) 82 - } 83 - 84 - /// Traverser of an atproto MST 85 - /// 86 - /// Walks the tree from left-to-right in depth-first order 87 - #[derive(Debug)] 88 - pub struct Walker { 89 - stack: Vec<Need>, 90 - prev: String, 91 - } 92 - 93 - impl Walker { 94 - pub fn new(tree_root_cid: Cid) -> Self { 95 - Self { 96 - stack: vec![Need::Node(tree_root_cid)], 97 - prev: "".to_string(), 98 - } 99 - } 100 - 101 - /// Advance through nodes until we find a record or can't go further 102 - pub fn step<T: Clone + Serialize + DeserializeOwned, E: Error>( 103 - &mut self, 104 - block_store: &mut impl BlockStore<MaybeProcessedBlock<T>>, 105 - process: impl Fn(&[u8]) -> Result<T, E>, 106 - ) -> Result<Step<T>, Trip> { 107 - loop { 108 - let Some(mut need) = self.stack.last() else { 109 - log::trace!("tried to walk but we're actually done."); 110 - return Ok(Step::Finish); 111 - }; 112 - 113 - match &mut need { 114 - Need::Node(cid) => { 115 - log::trace!("need node {cid:?}"); 116 - let Some(mpb) = block_store.get(*cid) else { 117 - log::trace!("node not found, resting"); 118 - return Ok(Step::Rest(*cid)); 119 - }; 120 - 121 - let 
MaybeProcessedBlock::<T>::Raw(block) = mpb else { 122 - return Err(Trip::BadCommit("failed commit fingerprint".into())); 123 - }; 124 - let node = serde_ipld_dagcbor::from_slice::<Node>(&block) 125 - .map_err(|e| Trip::BadCommit(e.into()))?; 126 - 127 - // found node, make sure we remember 128 - self.stack.pop(); 129 - 130 - // queue up work on the found node next 131 - push_from_node(&mut self.stack, &node)?; 132 - } 133 - Need::Record { rkey, cid } => { 134 - log::trace!("need record {cid:?}"); 135 - let Some(mpb) = block_store.get(*cid) else { 136 - log::trace!("record block not found, resting"); 137 - return Ok(Step::Rest(*cid)); 138 - }; 139 - let rkey = rkey.clone(); 140 - let data = match mpb { 141 - MaybeProcessedBlock::Raw(data) => match process(&data) { 142 - Ok(t) => Ok(t), 143 - Err(e) => Err(Trip::ProcessFailed(e.to_string())), 144 - }, 145 - MaybeProcessedBlock::ProcessedOk(t) => Ok(t.clone()), 146 - MaybeProcessedBlock::Unprocessable(s) => { 147 - return Err(Trip::ProcessFailed(s.clone())); 148 - } 149 - }; 150 - 151 - // found node, make sure we remember 152 - self.stack.pop(); 153 - 154 - log::trace!("emitting a block as a step. depth={}", self.stack.len()); 155 - 156 - let data = data.map_err(|e| Trip::ProcessFailed(e.to_string()))?; 157 - 158 - // rkeys *must* be in order or else the tree is invalid (or 159 - // we have a bug) 160 - if rkey <= self.prev { 161 - return Err(Trip::RkeyOutOfOrder); 162 - } 163 - self.prev = rkey.clone(); 164 - 165 - return Ok(Step::Step { rkey, data }); 166 - } 167 - } 168 - } 169 - } 170 - } 171 - 172 - #[cfg(test)] 173 - mod test { 174 - use super::*; 175 - // use crate::mst::Entry; 176 - 177 - fn cid1() -> Cid { 178 - "bafyreihixenvk3ahqbytas4hk4a26w43bh6eo3w6usjqtxkpzsvi655a3m" 179 - .parse() 180 - .unwrap() 181 - } 182 - // fn cid2() -> Cid { 183 - // "QmY7Yh4UquoXHLPFo2XbhXkhBvFoPwmQUSa92pxnxjQuPU" 184 - // .parse() 185 - // .unwrap() 186 - // } 187 - // fn cid3() -> Cid { 188 - // "bafybeigdyrzt5sfp7udm7hu76uh7y26nf3efuylqabf3oclgtqy55fbzdi" 189 - // .parse() 190 - // .unwrap() 191 - // } 192 - // fn cid4() -> Cid { 193 - // "QmbWqxBEKC3P8tqsKc98xmWNzrzDtRLMiMPL8wBuTGsMnR" 194 - // .parse() 195 - // .unwrap() 196 - // } 197 - // fn cid5() -> Cid { 198 - // "QmSnuWmxptJZdLJpKRarxBMS2Ju2oANVrgbr2xWbie9b2D" 199 - // .parse() 200 - // .unwrap() 201 - // } 202 - // fn cid6() -> Cid { 203 - // "QmdmQXB2mzChmMeKY47C43LxUdg1NDJ5MWcKMKxDu7RgQm" 204 - // .parse() 205 - // .unwrap() 206 - // } 207 - // fn cid7() -> Cid { 208 - // "bafybeiaysi4s6lnjev27ln5icwm6tueaw2vdykrtjkwiphwekaywqhcjze" 209 - // .parse() 210 - // .unwrap() 211 - // } 212 - // fn cid8() -> Cid { 213 - // "bafyreif3tfdpr5n4jdrbielmcapwvbpcthepfkwq2vwonmlhirbjmotedi" 214 - // .parse() 215 - // .unwrap() 216 - // } 217 - // fn cid9() -> Cid { 218 - // "bafyreicnokmhmrnlp2wjhyk2haep4tqxiptwfrp2rrs7rzq7uk766chqvq" 219 - // .parse() 220 - // .unwrap() 221 - // } 222 - 223 - #[test] 224 - fn test_next_from_node_empty() { 225 - let node = Node { 226 - left: None, 227 - entries: vec![], 228 - }; 229 - let mut stack = vec![]; 230 - push_from_node(&mut stack, &node).unwrap(); 231 - assert_eq!(stack.last(), None); 232 - } 233 - 234 - #[test] 235 - fn test_needs_from_node_just_left() { 236 - let node = Node { 237 - left: Some(cid1()), 238 - entries: vec![], 239 - }; 240 - let mut stack = vec![]; 241 - push_from_node(&mut stack, &node).unwrap(); 242 - assert_eq!(stack.last(), Some(Need::Node(cid1())).as_ref()); 243 - } 244 - 245 - // #[test] 246 - // fn 
test_needs_from_node_just_one_record() { 247 - // let node = Node { 248 - // left: None, 249 - // entries: vec![Entry { 250 - // keysuffix: "asdf".into(), 251 - // prefix_len: 0, 252 - // value: cid1(), 253 - // tree: None, 254 - // }], 255 - // }; 256 - // assert_eq!( 257 - // needs_from_node(node).unwrap(), 258 - // vec![Need::Record { 259 - // rkey: "asdf".into(), 260 - // cid: cid1(), 261 - // },] 262 - // ); 263 - // } 264 - 265 - // #[test] 266 - // fn test_needs_from_node_two_records() { 267 - // let node = Node { 268 - // left: None, 269 - // entries: vec![ 270 - // Entry { 271 - // keysuffix: "asdf".into(), 272 - // prefix_len: 0, 273 - // value: cid1(), 274 - // tree: None, 275 - // }, 276 - // Entry { 277 - // keysuffix: "gh".into(), 278 - // prefix_len: 2, 279 - // value: cid2(), 280 - // tree: None, 281 - // }, 282 - // ], 283 - // }; 284 - // assert_eq!( 285 - // needs_from_node(node).unwrap(), 286 - // vec![ 287 - // Need::Record { 288 - // rkey: "asdf".into(), 289 - // cid: cid1(), 290 - // }, 291 - // Need::Record { 292 - // rkey: "asgh".into(), 293 - // cid: cid2(), 294 - // }, 295 - // ] 296 - // ); 297 - // } 298 - 299 - // #[test] 300 - // fn test_needs_from_node_with_both() { 301 - // let node = Node { 302 - // left: None, 303 - // entries: vec![Entry { 304 - // keysuffix: "asdf".into(), 305 - // prefix_len: 0, 306 - // value: cid1(), 307 - // tree: Some(cid2()), 308 - // }], 309 - // }; 310 - // assert_eq!( 311 - // needs_from_node(node).unwrap(), 312 - // vec![ 313 - // Need::Record { 314 - // rkey: "asdf".into(), 315 - // cid: cid1(), 316 - // }, 317 - // Need::Node(cid2()), 318 - // ] 319 - // ); 320 - // } 321 - 322 - // #[test] 323 - // fn test_needs_from_node_left_and_record() { 324 - // let node = Node { 325 - // left: Some(cid1()), 326 - // entries: vec![Entry { 327 - // keysuffix: "asdf".into(), 328 - // prefix_len: 0, 329 - // value: cid2(), 330 - // tree: None, 331 - // }], 332 - // }; 333 - // assert_eq!( 334 - // needs_from_node(node).unwrap(), 335 - // vec![ 336 - // Need::Node(cid1()), 337 - // Need::Record { 338 - // rkey: "asdf".into(), 339 - // cid: cid2(), 340 - // }, 341 - // ] 342 - // ); 343 - // } 344 - 345 - // #[test] 346 - // fn test_needs_from_full_node() { 347 - // let node = Node { 348 - // left: Some(cid1()), 349 - // entries: vec![ 350 - // Entry { 351 - // keysuffix: "asdf".into(), 352 - // prefix_len: 0, 353 - // value: cid2(), 354 - // tree: Some(cid3()), 355 - // }, 356 - // Entry { 357 - // keysuffix: "ghi".into(), 358 - // prefix_len: 1, 359 - // value: cid4(), 360 - // tree: Some(cid5()), 361 - // }, 362 - // Entry { 363 - // keysuffix: "jkl".into(), 364 - // prefix_len: 2, 365 - // value: cid6(), 366 - // tree: Some(cid7()), 367 - // }, 368 - // Entry { 369 - // keysuffix: "mno".into(), 370 - // prefix_len: 4, 371 - // value: cid8(), 372 - // tree: Some(cid9()), 373 - // }, 374 - // ], 375 - // }; 376 - // assert_eq!( 377 - // needs_from_node(node).unwrap(), 378 - // vec![ 379 - // Need::Node(cid1()), 380 - // Need::Record { 381 - // rkey: "asdf".into(), 382 - // cid: cid2(), 383 - // }, 384 - // Need::Node(cid3()), 385 - // Need::Record { 386 - // rkey: "aghi".into(), 387 - // cid: cid4(), 388 - // }, 389 - // Need::Node(cid5()), 390 - // Need::Record { 391 - // rkey: "agjkl".into(), 392 - // cid: cid6(), 393 - // }, 394 - // Need::Node(cid7()), 395 - // Need::Record { 396 - // rkey: "agjkmno".into(), 397 - // cid: cid8(), 398 - // }, 399 - // Need::Node(cid9()), 400 - // ] 401 - // ); 402 - // } 403 - }
+556 -109
src/drive.rs
··· 1 - //! Consume an MST block stream, producing an ordered stream of records 1 + //! Consume a CAR from an AsyncRead, producing an ordered stream of records 2 2 3 - use futures::{Stream, TryStreamExt}; 3 + use crate::disk::{DiskError, DiskStore}; 4 + use crate::process::Processable; 4 5 use ipld_core::cid::Cid; 6 + use iroh_car::CarReader; 7 + use serde::{Deserialize, Serialize}; 5 8 use std::collections::HashMap; 6 - use std::error::Error; 9 + use std::convert::Infallible; 10 + use tokio::{io::AsyncRead, sync::mpsc}; 7 11 8 12 use crate::mst::{Commit, Node}; 9 - use crate::walk::{Step, Trip, Walker}; 13 + use crate::walk::{Step, WalkError, Walker}; 10 14 11 15 /// Errors that can happen while consuming and emitting blocks and records 12 16 #[derive(Debug, thiserror::Error)] 13 - pub enum DriveError<E: Error> { 14 - #[error("Failed to initialize CarReader: {0}")] 17 + pub enum DriveError { 18 + #[error("Error from iroh_car: {0}")] 15 19 CarReader(#[from] iroh_car::Error), 16 - #[error("Car block stream error: {0}")] 17 - CarBlockError(Box<dyn Error>), 18 20 #[error("Failed to decode commit block: {0}")] 19 - BadCommit(Box<dyn Error>), 21 + BadBlock(#[from] serde_ipld_dagcbor::DecodeError<Infallible>), 20 22 #[error("The Commit block reference by the root was not found")] 21 23 MissingCommit, 22 24 #[error("The MST block {0} could not be found")] 23 25 MissingBlock(Cid), 24 26 #[error("Failed to walk the mst tree: {0}")] 25 - Tripped(#[from] Trip<E>), 27 + WalkError(#[from] WalkError), 28 + #[error("CAR file had no roots")] 29 + MissingRoot, 30 + #[error("Storage error")] 31 + StorageError(#[from] DiskError), 32 + #[error("Encode error: {0}")] 33 + BincodeEncodeError(#[from] bincode::error::EncodeError), 34 + #[error("Tried to send on a closed channel")] 35 + ChannelSendError, // SendError takes <T> which we don't need 36 + #[error("Failed to join a task: {0}")] 37 + JoinError(#[from] tokio::task::JoinError), 26 38 } 27 39 28 - type CarBlock<E> = Result<(Cid, Vec<u8>), E>; 40 + #[derive(Debug, thiserror::Error)] 41 + pub enum DecodeError { 42 + #[error(transparent)] 43 + BincodeDecodeError(#[from] bincode::error::DecodeError), 44 + #[error("extra bytes remained after decoding")] 45 + ExtraGarbage, 46 + } 29 47 30 - #[derive(Debug)] 31 - pub enum MaybeProcessedBlock<T, E> { 48 + /// An in-order chunk of Rkey + (processed) Block pairs 49 + pub type BlockChunk<T> = Vec<(String, T)>; 50 + 51 + #[derive(Debug, Clone, Serialize, Deserialize)] 52 + pub(crate) enum MaybeProcessedBlock<T> { 32 53 /// A block that's *probably* a Node (but we can't know yet) 33 54 /// 34 55 /// It *can be* a record that suspiciously looks a lot like a node, so we ··· 50 71 /// There's an alternative here, which would be to kick unprocessable blocks 51 72 /// back to Raw, or maybe even a new RawUnprocessable variant. Then we could 52 73 /// surface the typed error later if needed by trying to reprocess. 
53 - Processed(Result<T, E>), 74 + Processed(T), 54 75 } 55 76 56 - /// The core driver between the block stream and MST walker 57 - pub struct Vehicle<SE, S, T, P, PE> 58 - where 59 - S: Stream<Item = CarBlock<SE>>, 60 - P: Fn(&[u8]) -> Result<T, PE>, 61 - PE: Error, 62 - { 63 - block_stream: S, 64 - blocks: HashMap<Cid, MaybeProcessedBlock<T, PE>>, 65 - walker: Walker, 66 - process: P, 77 + impl<T: Processable> Processable for MaybeProcessedBlock<T> { 78 + /// TODO this is probably a little broken 79 + fn get_size(&self) -> usize { 80 + use std::{cmp::max, mem::size_of}; 81 + 82 + // enum is always as big as its biggest member? 83 + let base_size = max(size_of::<Vec<u8>>(), size_of::<T>()); 84 + 85 + let extra = match self { 86 + Self::Raw(bytes) => bytes.len(), 87 + Self::Processed(t) => t.get_size(), 88 + }; 89 + 90 + base_size + extra 91 + } 92 + } 93 + 94 + impl<T> MaybeProcessedBlock<T> { 95 + fn maybe(process: fn(Vec<u8>) -> T, data: Vec<u8>) -> Self { 96 + if Node::could_be(&data) { 97 + MaybeProcessedBlock::Raw(data) 98 + } else { 99 + MaybeProcessedBlock::Processed(process(data)) 100 + } 101 + } 67 102 } 68 103 69 - impl<SE, S, T: Clone, P, PE> Vehicle<SE, S, T, P, PE> 70 - where 71 - SE: Error + 'static, 72 - S: Stream<Item = CarBlock<SE>> + Unpin, 73 - P: Fn(&[u8]) -> Result<T, PE>, 74 - PE: Error, 75 - { 76 - /// Set up the stream 104 + /// Read a CAR file, buffering blocks in memory or to disk 105 + pub enum Driver<R: AsyncRead + Unpin, T: Processable> { 106 + /// All blocks fit within the memory limit 107 + /// 108 + /// You probably want to check the commit's signature. You can go ahead and 109 + /// walk the MST right away. 110 + Memory(Commit, MemDriver<T>), 111 + /// Blocks exceed the memory limit 77 112 /// 78 - /// This will eagerly consume blocks until the `Commit` object is found. 79 - /// *Usually* the it's the first block, but there is no guarantee. 113 + /// You'll need to provide a disk storage to continue. The commit will be 114 + /// returned and can be validated only once all blocks are loaded. 
115 + Disk(NeedDisk<R, T>), 116 + } 117 + 118 + /// Builder-style driver setup 119 + #[derive(Debug, Clone)] 120 + pub struct DriverBuilder { 121 + pub mem_limit_mb: usize, 122 + } 123 + 124 + impl Default for DriverBuilder { 125 + fn default() -> Self { 126 + Self { mem_limit_mb: 16 } 127 + } 128 + } 129 + 130 + impl DriverBuilder { 131 + /// Begin configuring the driver with defaults 132 + pub fn new() -> Self { 133 + Default::default() 134 + } 135 + /// Set the in-memory size limit, in MiB 80 136 /// 81 - /// ### Parameters 137 + /// Default: 16 MiB 138 + pub fn with_mem_limit_mb(self, new_limit: usize) -> Self { 139 + Self { 140 + mem_limit_mb: new_limit, 141 + } 142 + } 143 + /// Set the block processor 82 144 /// 83 - /// `root`: CID of the commit object that is the root of the MST 145 + /// Default: noop, raw blocks will be emitted 146 + pub fn with_block_processor<T: Processable>( 147 + self, 148 + p: fn(Vec<u8>) -> T, 149 + ) -> DriverBuilderWithProcessor<T> { 150 + DriverBuilderWithProcessor { 151 + mem_limit_mb: self.mem_limit_mb, 152 + block_processor: p, 153 + } 154 + } 155 + /// Begin processing an atproto MST from a CAR file 156 + pub async fn load_car<R: AsyncRead + Unpin>( 157 + &self, 158 + reader: R, 159 + ) -> Result<Driver<R, Vec<u8>>, DriveError> { 160 + Driver::load_car(reader, crate::process::noop, self.mem_limit_mb).await 161 + } 162 + } 163 + 164 + /// Builder-style driver intermediate step 165 + /// 166 + /// start from `DriverBuilder` 167 + #[derive(Debug, Clone)] 168 + pub struct DriverBuilderWithProcessor<T: Processable> { 169 + pub mem_limit_mb: usize, 170 + pub block_processor: fn(Vec<u8>) -> T, 171 + } 172 + 173 + impl<T: Processable> DriverBuilderWithProcessor<T> { 174 + /// Set the in-memory size limit, in MiB 84 175 /// 85 - /// `block_stream`: Input stream of raw CAR blocks 176 + /// Default: 16 MiB 177 + pub fn with_mem_limit_mb(mut self, new_limit: usize) -> Self { 178 + self.mem_limit_mb = new_limit; 179 + self 180 + } 181 + /// Begin processing an atproto MST from a CAR file 182 + pub async fn load_car<R: AsyncRead + Unpin>( 183 + &self, 184 + reader: R, 185 + ) -> Result<Driver<R, T>, DriveError> { 186 + Driver::load_car(reader, self.block_processor, self.mem_limit_mb).await 187 + } 188 + } 189 + 190 + impl<R: AsyncRead + Unpin, T: Processable> Driver<R, T> { 191 + /// Begin processing an atproto MST from a CAR file 86 192 /// 87 - /// `process`: record-transforming callback: 193 + /// Blocks will be loaded, processed, and buffered in memory. If the entire 194 + /// processed size is under the `mem_limit_mb` limit, a `Driver::Memory` 195 + /// will be returned along with a `Commit` ready for validation. 88 196 /// 89 - /// For tasks where records can be quickly processed into a *smaller* 90 - /// useful representation, you can do that eagerly as blocks come in by 91 - /// passing the processor as a callback here. This can reduce overall 92 - /// memory usage. 93 - pub async fn init( 94 - root: Cid, 95 - mut block_stream: S, 96 - process: P, 97 - ) -> Result<(Commit, Self), DriveError<PE>> { 98 - let mut blocks = HashMap::new(); 197 + /// If the `mem_limit_mb` limit is reached before loading all blocks, the 198 + /// partial state will be returned as `Driver::Disk(needed)`, which can be 199 + /// resumed by providing a `SqliteStorage` for on-disk block storage. 
200 + pub async fn load_car( 201 + reader: R, 202 + process: fn(Vec<u8>) -> T, 203 + mem_limit_mb: usize, 204 + ) -> Result<Driver<R, T>, DriveError> { 205 + let max_size = mem_limit_mb * 2_usize.pow(20); 206 + let mut mem_blocks = HashMap::new(); 207 + 208 + let mut car = CarReader::new(reader).await?; 209 + 210 + let root = *car 211 + .header() 212 + .roots() 213 + .first() 214 + .ok_or(DriveError::MissingRoot)?; 215 + log::debug!("root: {root:?}"); 99 216 100 217 let mut commit = None; 101 218 102 - while let Some((cid, data)) = block_stream 103 - .try_next() 104 - .await 105 - .map_err(|e| DriveError::CarBlockError(e.into()))? 106 - { 219 + // try to load all the blocks into memory 220 + let mut mem_size = 0; 221 + while let Some((cid, data)) = car.next_block().await? { 222 + // the root commit is a Special Third Kind of block that we need to make 223 + // sure not to optimistically send to the processing function 107 224 if cid == root { 108 - let c: Commit = serde_ipld_dagcbor::from_slice(&data) 109 - .map_err(|e| DriveError::BadCommit(e.into()))?; 225 + let c: Commit = serde_ipld_dagcbor::from_slice(&data)?; 110 226 commit = Some(c); 111 - break; 112 - } else { 113 - blocks.insert( 114 - cid, 115 - if Node::could_be(&data) { 116 - MaybeProcessedBlock::Raw(data) 117 - } else { 118 - MaybeProcessedBlock::Processed(process(&data)) 119 - }, 120 - ); 227 + continue; 228 + } 229 + 230 + // remaining possible types: node, record, other. optimistically process 231 + let maybe_processed = MaybeProcessedBlock::maybe(process, data); 232 + 233 + // stash (maybe processed) blocks in memory as long as we have room 234 + mem_size += std::mem::size_of::<Cid>() + maybe_processed.get_size(); 235 + mem_blocks.insert(cid, maybe_processed); 236 + if mem_size >= max_size { 237 + return Ok(Driver::Disk(NeedDisk { 238 + car, 239 + root, 240 + process, 241 + max_size, 242 + mem_blocks, 243 + commit, 244 + })); 121 245 } 122 246 } 123 247 124 - // we either broke out or read all the blocks without finding the commit... 248 + // all blocks loaded and we fit in memory! hopefully we found the commit... 125 249 let commit = commit.ok_or(DriveError::MissingCommit)?; 126 250 127 251 let walker = Walker::new(commit.data); 128 252 129 - let me = Self { 130 - block_stream, 131 - blocks, 132 - walker, 133 - process, 134 - }; 135 - Ok((commit, me)) 253 + Ok(Driver::Memory( 254 + commit, 255 + MemDriver { 256 + blocks: mem_blocks, 257 + walker, 258 + process, 259 + }, 260 + )) 261 + } 262 + } 263 + 264 + /// The core driver between the block stream and MST walker 265 + /// 266 + /// In the future, PDSs will export CARs in a stream-friendly order that will 267 + /// enable processing them with tiny memory overhead. But that future is not 268 + /// here yet. 269 + /// 270 + /// CARs are almost always in a stream-unfriendly order, so I'm reverting the 271 + /// optimistic stream features: we load all block first, then walk the MST. 272 + /// 273 + /// This makes things much simpler: we only need to worry about spilling to disk 274 + /// in one place, and we always have a reasonable expecatation about how much 275 + /// work the init function will do. We can drop the CAR reader before walking, 276 + /// so the sync/async boundaries become a little easier to work around. 
277 + #[derive(Debug)] 278 + pub struct MemDriver<T: Processable> { 279 + blocks: HashMap<Cid, MaybeProcessedBlock<T>>, 280 + walker: Walker, 281 + process: fn(Vec<u8>) -> T, 282 + } 283 + 284 + impl<T: Processable> MemDriver<T> { 285 + /// Step through the record outputs, in rkey order 286 + pub async fn next_chunk(&mut self, n: usize) -> Result<Option<BlockChunk<T>>, DriveError> { 287 + let mut out = Vec::with_capacity(n); 288 + for _ in 0..n { 289 + // walk as far as we can until we run out of blocks or find a record 290 + match self.walker.step(&mut self.blocks, self.process)? { 291 + Step::Missing(cid) => return Err(DriveError::MissingBlock(cid)), 292 + Step::Finish => break, 293 + Step::Found { rkey, data } => { 294 + out.push((rkey, data)); 295 + continue; 296 + } 297 + }; 298 + } 299 + 300 + if out.is_empty() { 301 + Ok(None) 302 + } else { 303 + Ok(Some(out)) 304 + } 305 + } 306 + } 307 + 308 + /// A partially memory-loaded car file that needs disk spillover to continue 309 + pub struct NeedDisk<R: AsyncRead + Unpin, T: Processable> { 310 + car: CarReader<R>, 311 + root: Cid, 312 + process: fn(Vec<u8>) -> T, 313 + max_size: usize, 314 + mem_blocks: HashMap<Cid, MaybeProcessedBlock<T>>, 315 + pub commit: Option<Commit>, 316 + } 317 + 318 + fn encode(v: impl Serialize) -> Result<Vec<u8>, bincode::error::EncodeError> { 319 + bincode::serde::encode_to_vec(v, bincode::config::standard()) 320 + } 321 + 322 + pub(crate) fn decode<T: Processable>(bytes: &[u8]) -> Result<T, DecodeError> { 323 + let (t, n) = bincode::serde::decode_from_slice(bytes, bincode::config::standard())?; 324 + if n != bytes.len() { 325 + return Err(DecodeError::ExtraGarbage); 136 326 } 327 + Ok(t) 328 + } 137 329 138 - async fn drive_until(&mut self, cid_needed: Cid) -> Result<(), DriveError<PE>> { 139 - while let Some((cid, data)) = self 140 - .block_stream 141 - .try_next() 142 - .await 143 - .map_err(|e| DriveError::CarBlockError(e.into()))? 
144 - { 145 - self.blocks.insert( 146 - cid, 147 - if Node::could_be(&data) { 148 - MaybeProcessedBlock::Raw(data) 149 - } else { 150 - MaybeProcessedBlock::Processed((self.process)(&data)) 151 - }, 152 - ); 153 - if cid == cid_needed { 154 - return Ok(()); 330 + impl<R: AsyncRead + Unpin, T: Processable + Send + 'static> NeedDisk<R, T> { 331 + pub async fn finish_loading( 332 + mut self, 333 + mut store: DiskStore, 334 + ) -> Result<(Commit, DiskDriver<T>), DriveError> { 335 + // move store in and back out so we can manage lifetimes 336 + // dump mem blocks into the store 337 + store = tokio::task::spawn(async move { 338 + let mut writer = store.get_writer()?; 339 + 340 + let kvs = self 341 + .mem_blocks 342 + .into_iter() 343 + .map(|(k, v)| Ok(encode(v).map(|v| (k.to_bytes(), v))?)); 344 + 345 + writer.put_many(kvs)?; 346 + writer.commit()?; 347 + Ok::<_, DriveError>(store) 348 + }) 349 + .await??; 350 + 351 + let (tx, mut rx) = mpsc::channel::<Vec<(Cid, MaybeProcessedBlock<T>)>>(1); 352 + 353 + let store_worker = tokio::task::spawn_blocking(move || { 354 + let mut writer = store.get_writer()?; 355 + 356 + while let Some(chunk) = rx.blocking_recv() { 357 + let kvs = chunk 358 + .into_iter() 359 + .map(|(k, v)| Ok(encode(v).map(|v| (k.to_bytes(), v))?)); 360 + writer.put_many(kvs)?; 155 361 } 362 + 363 + writer.commit()?; 364 + Ok::<_, DriveError>(store) 365 + }); // await later 366 + 367 + // dump the rest to disk (in chunks) 368 + log::debug!("dumping the rest of the stream..."); 369 + loop { 370 + let mut mem_size = 0; 371 + let mut chunk = vec![]; 372 + loop { 373 + let Some((cid, data)) = self.car.next_block().await? else { 374 + break; 375 + }; 376 + // we still gotta keep checking for the root since we might not have it 377 + if cid == self.root { 378 + let c: Commit = serde_ipld_dagcbor::from_slice(&data)?; 379 + self.commit = Some(c); 380 + continue; 381 + } 382 + // remaining possible types: node, record, other. optimistically process 383 + // TODO: get the actual in-memory size to compute disk spill 384 + let maybe_processed = MaybeProcessedBlock::maybe(self.process, data); 385 + mem_size += std::mem::size_of::<Cid>() + maybe_processed.get_size(); 386 + chunk.push((cid, maybe_processed)); 387 + if mem_size >= self.max_size { 388 + // soooooo if we're setting the db cache to max_size and then letting 389 + // multiple chunks in the queue that are >= max_size, then at any time 390 + // we might be using some multiple of max_size? 391 + break; 392 + } 393 + } 394 + if chunk.is_empty() { 395 + break; 396 + } 397 + tx.send(chunk) 398 + .await 399 + .map_err(|_| DriveError::ChannelSendError)?; 156 400 } 401 + drop(tx); 402 + log::debug!("done. 
waiting for worker to finish..."); 157 403 158 - // if we never found the block 159 - Err(DriveError::MissingBlock(cid_needed)) 404 + store = store_worker.await??; 405 + 406 + log::debug!("worker finished."); 407 + 408 + let commit = self.commit.ok_or(DriveError::MissingCommit)?; 409 + 410 + let walker = Walker::new(commit.data); 411 + 412 + Ok(( 413 + commit, 414 + DiskDriver { 415 + process: self.process, 416 + state: Some(BigState { store, walker }), 417 + }, 418 + )) 419 + } 420 + } 421 + 422 + struct BigState { 423 + store: DiskStore, 424 + walker: Walker, 425 + } 426 + 427 + /// MST walker that reads from disk instead of an in-memory hashmap 428 + pub struct DiskDriver<T: Clone> { 429 + process: fn(Vec<u8>) -> T, 430 + state: Option<BigState>, 431 + } 432 + 433 + // for doctests only 434 + #[doc(hidden)] 435 + pub fn _get_fake_disk_driver() -> DiskDriver<Vec<u8>> { 436 + use crate::process::noop; 437 + DiskDriver { 438 + process: noop, 439 + state: None, 160 440 } 441 + } 161 442 162 - /// Manually step through the record outputs 163 - pub async fn next_record(&mut self) -> Result<Option<(String, T)>, DriveError<PE>> { 443 + impl<T: Processable + Send + 'static> DiskDriver<T> { 444 + /// Walk the MST returning up to `n` rkey + record pairs 445 + /// 446 + /// ```no_run 447 + /// # use repo_stream::{drive::{DiskDriver, DriveError, _get_fake_disk_driver}, process::noop}; 448 + /// # #[tokio::main] 449 + /// # async fn main() -> Result<(), DriveError> { 450 + /// # let mut disk_driver = _get_fake_disk_driver(); 451 + /// while let Some(pairs) = disk_driver.next_chunk(256).await? { 452 + /// for (rkey, record) in pairs { 453 + /// println!("{rkey}: size={}", record.len()); 454 + /// } 455 + /// } 456 + /// let store = disk_driver.reset_store().await?; 457 + /// # Ok(()) 458 + /// # } 459 + /// ``` 460 + pub async fn next_chunk(&mut self, n: usize) -> Result<Option<BlockChunk<T>>, DriveError> { 461 + let process = self.process; 462 + 463 + // state should only *ever* be None transiently while inside here 464 + let mut state = self.state.take().expect("DiskDriver must have Some(state)"); 465 + 466 + // the big pain here is that we don't want to leave self.state in an 467 + // invalid state (None), so all the error paths have to make sure it 468 + // comes out again. 469 + let (state, res) = tokio::task::spawn_blocking( 470 + move || -> (BigState, Result<BlockChunk<T>, DriveError>) { 471 + let mut reader_res = state.store.get_reader(); 472 + let reader: &mut _ = match reader_res { 473 + Ok(ref mut r) => r, 474 + Err(ref mut e) => { 475 + // unfortunately we can't return the error directly because 476 + // (for some reason) it's attached to the lifetime of the 477 + // reader? 
478 + // hack a mem::swap so we can get it out :/ 479 + let e_swapped = e.steal(); 480 + // the pain: `state` *has to* outlive the reader 481 + drop(reader_res); 482 + return (state, Err(e_swapped.into())); 483 + } 484 + }; 485 + 486 + let mut out = Vec::with_capacity(n); 487 + 488 + for _ in 0..n { 489 + // walk as far as we can until we run out of blocks or find a record 490 + let step = match state.walker.disk_step(reader, process) { 491 + Ok(s) => s, 492 + Err(e) => { 493 + // the pain: `state` *has to* outlive the reader 494 + drop(reader_res); 495 + return (state, Err(e.into())); 496 + } 497 + }; 498 + match step { 499 + Step::Missing(cid) => { 500 + // the pain: `state` *has to* outlive the reader 501 + drop(reader_res); 502 + return (state, Err(DriveError::MissingBlock(cid))); 503 + } 504 + Step::Finish => break, 505 + Step::Found { rkey, data } => out.push((rkey, data)), 506 + }; 507 + } 508 + 509 + // `state` *has to* outlive the reader 510 + drop(reader_res); 511 + 512 + (state, Ok::<_, DriveError>(out)) 513 + }, 514 + ) 515 + .await?; // on tokio JoinError, we'll be left with invalid state :( 516 + 517 + // *must* restore state before dealing with the actual result 518 + self.state = Some(state); 519 + 520 + let out = res?; 521 + 522 + if out.is_empty() { 523 + Ok(None) 524 + } else { 525 + Ok(Some(out)) 526 + } 527 + } 528 + 529 + fn read_tx_blocking( 530 + &mut self, 531 + n: usize, 532 + tx: mpsc::Sender<Result<BlockChunk<T>, DriveError>>, 533 + ) -> Result<(), mpsc::error::SendError<Result<BlockChunk<T>, DriveError>>> { 534 + let BigState { store, walker } = self.state.as_mut().expect("valid state"); 535 + let mut reader = match store.get_reader() { 536 + Ok(r) => r, 537 + Err(e) => return tx.blocking_send(Err(e.into())), 538 + }; 539 + 164 540 loop { 165 - // walk as far as we can until we run out of blocks or find a record 166 - let cid_needed = match self.walker.step(&mut self.blocks, &self.process)? { 167 - Step::Rest(cid) => cid, 168 - Step::Finish => return Ok(None), 169 - Step::Step { rkey, data } => return Ok(Some((rkey, data))), 170 - }; 541 + let mut out: BlockChunk<T> = Vec::with_capacity(n); 171 542 172 - // load blocks until we reach that cid 173 - self.drive_until(cid_needed).await?; 543 + for _ in 0..n { 544 + // walk as far as we can until we run out of blocks or find a record 545 + 546 + let step = match walker.disk_step(&mut reader, self.process) { 547 + Ok(s) => s, 548 + Err(e) => return tx.blocking_send(Err(e.into())), 549 + }; 550 + 551 + match step { 552 + Step::Missing(cid) => { 553 + return tx.blocking_send(Err(DriveError::MissingBlock(cid))); 554 + } 555 + Step::Finish => return Ok(()), 556 + Step::Found { rkey, data } => { 557 + out.push((rkey, data)); 558 + continue; 559 + } 560 + }; 561 + } 562 + 563 + if out.is_empty() { 564 + break; 565 + } 566 + tx.blocking_send(Ok(out))?; 174 567 } 568 + 569 + Ok(()) 175 570 } 176 571 177 - /// Convert to a futures::stream of record outputs 178 - pub fn stream(self) -> impl Stream<Item = Result<(String, T), DriveError<PE>>> { 179 - futures::stream::try_unfold(self, |mut this| async move { 180 - let maybe_record = this.next_record().await?; 181 - Ok(maybe_record.map(|b| (b, this))) 182 - }) 572 + /// Spawn the disk reading task into a tokio blocking thread 573 + /// 574 + /// The idea is to avoid so much sending back and forth to the blocking 575 + /// thread, letting a blocking task do all the disk reading work and sending 576 + /// records and rkeys back through an `mpsc` channel instead. 
577 + /// 578 + /// This might also allow the disk work to continue while processing the 579 + /// records. It's still not yet clear if this method actually has much 580 + /// benefit over just using `.next_chunk(n)`. 581 + /// 582 + /// ```no_run 583 + /// # use repo_stream::{drive::{DiskDriver, DriveError, _get_fake_disk_driver}, process::noop}; 584 + /// # #[tokio::main] 585 + /// # async fn main() -> Result<(), DriveError> { 586 + /// # let mut disk_driver = _get_fake_disk_driver(); 587 + /// let (mut rx, join) = disk_driver.to_channel(512); 588 + /// while let Some(recvd) = rx.recv().await { 589 + /// let pairs = recvd?; 590 + /// for (rkey, record) in pairs { 591 + /// println!("{rkey}: size={}", record.len()); 592 + /// } 593 + /// 594 + /// } 595 + /// let store = join.await?.reset_store().await?; 596 + /// # Ok(()) 597 + /// # } 598 + /// ``` 599 + pub fn to_channel( 600 + mut self, 601 + n: usize, 602 + ) -> ( 603 + mpsc::Receiver<Result<BlockChunk<T>, DriveError>>, 604 + tokio::task::JoinHandle<Self>, 605 + ) { 606 + let (tx, rx) = mpsc::channel::<Result<BlockChunk<T>, DriveError>>(1); 607 + 608 + // sketch: this worker is going to be allowed to execute without a join handle 609 + let chan_task = tokio::task::spawn_blocking(move || { 610 + if let Err(mpsc::error::SendError(_)) = self.read_tx_blocking(n, tx) { 611 + log::debug!("big car reader exited early due to dropped receiver channel"); 612 + } 613 + self 614 + }); 615 + 616 + (rx, chan_task) 617 + } 618 + 619 + /// Reset the disk storage so it can be reused. You must call this. 620 + /// 621 + /// Ideally we'd put this in an `impl Drop`, but since it makes blocking 622 + /// calls, that would be risky in an async context. For now you just have to 623 + /// carefully make sure you call it. 624 + /// 625 + /// The sqlite store is returned, so it can be reused for another 626 + /// `DiskDriver`. 627 + pub async fn reset_store(mut self) -> Result<DiskStore, DriveError> { 628 + let BigState { store, .. } = self.state.take().expect("valid state"); 629 + Ok(store.reset().await?) 183 630 } 184 631 }
+85 -9
src/lib.rs
··· 1 - //! Fast and robust atproto CAR file processing in rust 2 - //! 3 - //! For now see the [examples](https://tangled.org/@microcosm.blue/repo-stream/tree/main/examples) 1 + /*! 2 + A robust CAR file -> MST walker for atproto 3 + 4 + Small CARs have their blocks buffered in memory. If a configurable memory limit 5 + is reached while reading blocks, CAR reading is suspended, and can be continued 6 + by providing disk storage to buffer the CAR blocks instead. 7 + 8 + A `process` function can be provided for tasks where records are transformed 9 + into a smaller representation, to save memory (and disk) during block reading. 10 + 11 + Once blocks are loaded, the MST is walked and emitted as chunks of 12 + `(rkey, processed_block)` pairs, in order (depth first, left-to-right). 13 + 14 + Some MST validations are applied: 15 + - Keys must appear in order 16 + - Keys must be at the correct MST tree depth 17 + 18 + `iroh_car` additionally applies a block size limit of `2MiB`. 19 + 20 + ``` 21 + use repo_stream::{Driver, DriverBuilder, DiskBuilder}; 22 + 23 + # #[tokio::main] 24 + # async fn main() -> Result<(), Box<dyn std::error::Error>> { 25 + # let reader = include_bytes!("../car-samples/tiny.car").as_slice(); 26 + let mut total_size = 0; 27 + 28 + match DriverBuilder::new() 29 + .with_mem_limit_mb(10) 30 + .with_block_processor(|rec| rec.len()) // block processing: just extract the raw record size 31 + .load_car(reader) 32 + .await? 33 + { 4 34 

 5 - pub mod disk_drive; 6 - pub mod disk_redb; 7 - pub mod disk_sqlite; 8 - pub mod disk_walk; 9 - pub mod drive; 35 + // if all blocks fit within memory 36 + Driver::Memory(_commit, mut driver) => { 37 + while let Some(chunk) = driver.next_chunk(256).await? { 38 + for (_rkey, size) in chunk { 39 + total_size += size; 40 + } 41 + } 42 + }, 43 + 44 + // if the CAR was too big for in-memory processing 45 + Driver::Disk(paused) => { 46 + // set up a disk store we can spill to 47 + let store = DiskBuilder::new().open("some/path.db".into()).await?; 48 + // do the spilling, get back a (similar) driver 49 + let (_commit, mut driver) = paused.finish_loading(store).await?; 50 + 51 + while let Some(chunk) = driver.next_chunk(256).await? { 52 + for (_rkey, size) in chunk { 53 + total_size += size; 54 + } 55 + } 56 + 57 + // clean up the disk store (drop tables etc) 58 + driver.reset_store().await?; 59 + } 60 + }; 61 + println!("sum of size of all records: {total_size}"); 62 + # Ok(()) 63 + # } 64 + ``` 65 + 66 + Disk spilling suspends and returns a `Driver::Disk(paused)` instead of going 67 + ahead and eagerly using disk I/O. This means you have to write a bit more code 68 + to handle both cases, but it allows you to have finer control over resource 69 + usage. For example, you can drive a number of parallel memory CAR workers, and 70 + separately have a different number of disk workers picking up suspended disk 71 + tasks from a queue. 72 + 73 + Find more [examples in the repo](https://tangled.org/@microcosm.blue/repo-stream/tree/main/examples). 74 + 75 + */ 76 + 10 77 pub mod mst; 11 - pub mod walk; 78 + mod walk; 79 + 80 + pub mod disk; 81 + pub mod drive; 82 + pub mod process; 83 + 84 + pub use disk::{DiskBuilder, DiskError, DiskStore}; 85 + pub use drive::{DriveError, Driver, DriverBuilder, NeedDisk}; 86 + pub use mst::Commit; 87 + pub use process::Processable;
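The module docs above suggest running a pool of in-memory CAR workers alongside a separate pool of disk workers that pick up suspended spills. A rough sketch of that wiring using the public API from this change (`DriverBuilder`, `Driver`, `NeedDisk`, `DiskBuilder`); the channel setup, buffer sizes, and error handling here are illustrative assumptions, not part of the crate:

```rust
use repo_stream::{DiskBuilder, Driver, DriverBuilder, NeedDisk};
use tokio::sync::mpsc;

// In-memory worker: small CARs are drained right here; big ones are handed
// off (still suspended, no disk I/O yet) to a dedicated disk worker.
async fn handle_car(
    car_bytes: &'static [u8],
    spill_tx: mpsc::Sender<NeedDisk<&'static [u8], usize>>,
) -> Result<usize, Box<dyn std::error::Error>> {
    let mut total_size = 0;
    match DriverBuilder::new()
        .with_mem_limit_mb(10)
        .with_block_processor(|rec| rec.len())
        .load_car(car_bytes)
        .await?
    {
        Driver::Memory(_commit, mut driver) => {
            while let Some(chunk) = driver.next_chunk(256).await? {
                for (_rkey, size) in chunk {
                    total_size += size;
                }
            }
        }
        // too big for the memory budget: queue it for the disk pool
        Driver::Disk(paused) => {
            let _ = spill_tx.send(paused).await;
        }
    }
    Ok(total_size)
}

// Disk worker: one disk store, reused across spilled CARs via reset_store().
async fn disk_worker(
    mut rx: mpsc::Receiver<NeedDisk<&'static [u8], usize>>,
) -> Result<(), Box<dyn std::error::Error>> {
    let mut store = DiskBuilder::new().open("spill.db".into()).await?;
    while let Some(paused) = rx.recv().await {
        let (_commit, mut driver) = paused.finish_loading(store).await?;
        while let Some(chunk) = driver.next_chunk(256).await? {
            for (_rkey, _size) in chunk { /* same handling as the memory path */ }
        }
        store = driver.reset_store().await?;
    }
    Ok(())
}
```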
+4 -8
src/mst.rs
··· 39 39 /// MST node data schema 40 40 #[derive(Debug, Deserialize, PartialEq)] 41 41 #[serde(deny_unknown_fields)] 42 - pub struct Node { 42 + pub(crate) struct Node { 43 43 /// link to sub-tree Node on a lower level and with all keys sorting before 44 44 /// keys at this node 45 45 #[serde(rename = "l")] ··· 62 62 /// so if a block *could be* a node, any record converter must postpone 63 63 /// processing. if it turns out it happens to be a very node-looking record, 64 64 /// well, sorry, it just has to only be processed later when that's known. 65 - pub fn could_be(bytes: impl AsRef<[u8]>) -> bool { 65 + pub(crate) fn could_be(bytes: impl AsRef<[u8]>) -> bool { 66 66 const NODE_FINGERPRINT: [u8; 3] = [ 67 67 0xA2, // map length 2 (for "l" and "e" keys) 68 68 0x61, // text length 1 ··· 83 83 /// with an empty array of entries. This is the only situation in which a 84 84 /// tree may contain an empty leaf node which does not either contain keys 85 85 /// ("entries") or point to a sub-tree containing entries. 86 - /// 87 - /// TODO: to me this is slightly unclear with respect to `l` (ask someone). 88 - /// ...is that what "The top of the tree must not be a an empty node which 89 - /// only points to a sub-tree." is referring to? 90 - pub fn is_empty(&self) -> bool { 86 + pub(crate) fn is_empty(&self) -> bool { 91 87 self.left.is_none() && self.entries.is_empty() 92 88 } 93 89 } ··· 95 91 /// TreeEntry object 96 92 #[derive(Debug, Deserialize, PartialEq)] 97 93 #[serde(deny_unknown_fields)] 98 - pub struct Entry { 94 + pub(crate) struct Entry { 99 95 /// count of bytes shared with previous TreeEntry in this Node (if any) 100 96 #[serde(rename = "p")] 101 97 pub prefix_len: usize,
+108
src/process.rs
··· 1 + /*! 2 + Record processor function output trait 3 + 4 + The return type must satisfy the `Processable` trait, which requires: 5 + 6 + - `Clone` because two rkeys can refer to the same record by CID, which may 7 + only appear once in the CAR file. 8 + - `Serialize + DeserializeOwned` so it can be spilled to disk. 9 + 10 + One required function must be implemented, `get_size()`: this should return the 11 + approximate total off-stack size of the type (the on-stack size will be added 12 + automatically via `std::mem::size_of`). 13 + 14 + Note that it is **not guaranteed** that the `process` function will run on a 15 + block before storing it in memory or on disk: it's not possible to know if a 16 + block is a record without actually walking the MST, so the best we can do is 17 + apply `process` to any block that we know *cannot* be an MST node, and otherwise 18 + store the raw block bytes. 19 + 20 + Here's a silly processing function that just collects 'eyy's found in the raw 21 + record bytes: 22 + 23 + ``` 24 + # use repo_stream::Processable; 25 + # use serde::{Serialize, Deserialize}; 26 + #[derive(Debug, Clone, Serialize, Deserialize)] 27 + struct Eyy(usize, String); 28 + 29 + impl Processable for Eyy { 30 + fn get_size(&self) -> usize { 31 + // don't need to compute the usize, it's on the stack 32 + self.1.capacity() // in-mem size from the string's capacity, in bytes 33 + } 34 + } 35 + 36 + fn process(raw: Vec<u8>) -> Vec<Eyy> { 37 + let mut out = Vec::new(); 38 + let to_find = "eyy".as_bytes(); 39 + for (i, window) in raw.windows(3).enumerate() { 40 + if window == to_find { 41 + out.push(Eyy(i, "eyy".to_string())); 42 + } 43 + } 44 + out 45 + } 46 + ``` 47 + 48 + The memory sizing stuff is a little sketch but probably at least approximately 49 + works. 50 + */ 51 + 52 + use serde::{Serialize, de::DeserializeOwned}; 53 + 54 + /// Output trait for record processing 55 + pub trait Processable: Clone + Serialize + DeserializeOwned { 56 + /// Any additional in-memory size taken by the processed type 57 + /// 58 + /// Do not include stack size (`std::mem::size_of`) 59 + fn get_size(&self) -> usize; 60 + } 61 + 62 + /// Processor that just returns the raw blocks 63 + #[inline] 64 + pub fn noop(block: Vec<u8>) -> Vec<u8> { 65 + block 66 + } 67 + 68 + impl Processable for u8 { 69 + fn get_size(&self) -> usize { 70 + 0 71 + } 72 + } 73 + 74 + impl Processable for usize { 75 + fn get_size(&self) -> usize { 76 + 0 // no additional space taken, just its stack size (newtype is free) 77 + } 78 + } 79 + 80 + impl Processable for String { 81 + fn get_size(&self) -> usize { 82 + self.capacity() 83 + } 84 + } 85 + 86 + impl<Item: Sized + Processable> Processable for Vec<Item> { 87 + fn get_size(&self) -> usize { 88 + let slot_size = std::mem::size_of::<Item>(); 89 + let direct_size = slot_size * self.capacity(); 90 + let items_referenced_size: usize = self.iter().map(|item| item.get_size()).sum(); 91 + direct_size + items_referenced_size 92 + } 93 + } 94 + 95 + impl<Item: Processable> Processable for Option<Item> { 96 + fn get_size(&self) -> usize { 97 + self.as_ref().map(|item| item.get_size()).unwrap_or(0) 98 + } 99 + } 100 + 101 + impl<Item: Processable, Error: Processable> Processable for Result<Item, Error> { 102 + fn get_size(&self) -> usize { 103 + match self { 104 + Ok(item) => item.get_size(), 105 + Err(err) => err.get_size(), 106 + } 107 + } 108 + }
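To complement the `Eyy` doctest above, here is a hedged sketch of a more structured processed type; the fields are made up for illustration, but the `get_size` accounting follows the same pattern as the `String` and `Vec` impls in this file:

```rust
use repo_stream::Processable;
use serde::{Deserialize, Serialize};

// A small per-record summary kept instead of the raw block bytes.
#[derive(Debug, Clone, Serialize, Deserialize)]
struct RecordSummary {
    record_type: String, // e.g. the record's "$type" field
    raw_len: usize,
    langs: Vec<String>,
}

impl Processable for RecordSummary {
    fn get_size(&self) -> usize {
        // off-stack bytes only: the buffers this value owns on the heap;
        // the stack size is accounted for separately, per the docs above
        self.record_type.capacity()
            + self.langs.capacity() * std::mem::size_of::<String>()
            + self.langs.iter().map(|s| s.capacity()).sum::<usize>()
    }
}
```

Since `Option<Item: Processable>` is also `Processable`, a block processor could return `Option<RecordSummary>` and keep only the blocks it cares about, paying almost nothing to store the rest.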
+260 -259
src/walk.rs
··· 1 1 //! Depth-first MST traversal 2 2 3 - use crate::drive::MaybeProcessedBlock; 3 + use crate::disk::SqliteReader; 4 + use crate::drive::{DecodeError, MaybeProcessedBlock}; 4 5 use crate::mst::Node; 6 + use crate::process::Processable; 5 7 use ipld_core::cid::Cid; 8 + use sha2::{Digest, Sha256}; 6 9 use std::collections::HashMap; 7 - use std::error::Error; 10 + use std::convert::Infallible; 8 11 9 12 /// Errors that can happen while walking 10 13 #[derive(Debug, thiserror::Error)] 11 - pub enum Trip<E: Error> { 12 - #[error("empty mst nodes are not allowed")] 13 - NodeEmpty, 14 + pub enum WalkError { 15 + #[error("Failed to fingerprint commit block")] 16 + BadCommitFingerprint, 14 17 #[error("Failed to decode commit block: {0}")] 15 - BadCommit(Box<dyn std::error::Error>), 18 + BadCommit(#[from] serde_ipld_dagcbor::DecodeError<Infallible>), 16 19 #[error("Action node error: {0}")] 17 - RkeyError(#[from] RkeyError), 18 - #[error("Process failed: {0}")] 19 - ProcessFailed(E), 20 - #[error("Encountered an rkey out of order while walking the MST")] 21 - RkeyOutOfOrder, 20 + MstError(#[from] MstError), 21 + #[error("storage error: {0}")] 22 + StorageError(#[from] rusqlite::Error), 23 + #[error("Decode error: {0}")] 24 + DecodeError(#[from] DecodeError), 22 25 } 23 26 24 27 /// Errors from invalid Rkeys 25 - #[derive(Debug, thiserror::Error)] 26 - pub enum RkeyError { 28 + #[derive(Debug, PartialEq, thiserror::Error)] 29 + pub enum MstError { 27 30 #[error("Failed to compute an rkey due to invalid prefix_len")] 28 31 EntryPrefixOutOfbounds, 29 32 #[error("RKey was not utf-8")] 30 33 EntryRkeyNotUtf8(#[from] std::string::FromUtf8Error), 34 + #[error("Nodes cannot be empty (except for an entirely empty MST)")] 35 + EmptyNode, 36 + #[error("Found an entry with rkey at the wrong depth")] 37 + WrongDepth, 38 + #[error("Lost track of our depth (possible bug?)")] 39 + LostDepth, 40 + #[error("MST depth underflow: depth-0 node with child trees")] 41 + DepthUnderflow, 42 + #[error("Encountered an rkey out of order while walking the MST")] 43 + RkeyOutOfOrder, 31 44 } 32 45 33 46 /// Walker outputs 34 47 #[derive(Debug)] 35 48 pub enum Step<T> { 36 - /// We need a CID but it's not in the block store 37 - /// 38 - /// Give the needed CID to the driver so it can load blocks until it's found 39 - Rest(Cid), 49 + /// We needed this CID but it's not in the block store 50 + Missing(Cid), 40 51 /// Reached the end of the MST! yay! 41 52 Finish, 42 53 /// A record was found! 
43 - Step { rkey: String, data: T }, 54 + Found { rkey: String, data: T }, 44 55 } 45 56 46 57 #[derive(Debug, Clone, PartialEq)] 47 58 enum Need { 48 - Node(Cid), 59 + Node { depth: Depth, cid: Cid }, 49 60 Record { rkey: String, cid: Cid }, 50 61 } 51 62 52 - fn push_from_node(stack: &mut Vec<Need>, node: &Node) -> Result<(), RkeyError> { 53 - let mut entries = Vec::with_capacity(node.entries.len()); 63 + #[derive(Debug, Clone, Copy, PartialEq)] 64 + enum Depth { 65 + Root, 66 + Depth(u32), 67 + } 54 68 69 + impl Depth { 70 + fn from_key(key: &[u8]) -> Self { 71 + let mut zeros = 0; 72 + for byte in Sha256::digest(key) { 73 + let leading = byte.leading_zeros(); 74 + zeros += leading; 75 + if leading < 8 { 76 + break; 77 + } 78 + } 79 + Self::Depth(zeros / 2) // truncating divide (rounds down) 80 + } 81 + fn next_expected(&self) -> Result<Option<u32>, MstError> { 82 + match self { 83 + Self::Root => Ok(None), 84 + Self::Depth(d) => d.checked_sub(1).ok_or(MstError::DepthUnderflow).map(Some), 85 + } 86 + } 87 + } 88 + 89 + fn push_from_node(stack: &mut Vec<Need>, node: &Node, parent_depth: Depth) -> Result<(), MstError> { 90 + // empty nodes are not allowed in the MST except in an empty MST 91 + if node.is_empty() { 92 + if parent_depth == Depth::Root { 93 + return Ok(()); // empty mst, nothing to push 94 + } else { 95 + return Err(MstError::EmptyNode); 96 + } 97 + } 98 + 99 + let mut entries = Vec::with_capacity(node.entries.len()); 55 100 let mut prefix = vec![]; 101 + let mut this_depth = parent_depth.next_expected()?; 102 + 56 103 for entry in &node.entries { 57 104 let mut rkey = vec![]; 58 105 let pre_checked = prefix 59 106 .get(..entry.prefix_len) 60 - .ok_or(RkeyError::EntryPrefixOutOfbounds)?; 107 + .ok_or(MstError::EntryPrefixOutOfbounds)?; 61 108 rkey.extend_from_slice(pre_checked); 62 109 rkey.extend_from_slice(&entry.keysuffix); 110 + 111 + let Depth::Depth(key_depth) = Depth::from_key(&rkey) else { 112 + return Err(MstError::WrongDepth); 113 + }; 114 + 115 + // this_depth is `none` if we are the deepest child (directly below root) 116 + // in that case we accept whatever highest depth is claimed 117 + let expected_depth = match this_depth { 118 + Some(d) => d, 119 + None => { 120 + this_depth = Some(key_depth); 121 + key_depth 122 + } 123 + }; 124 + 125 + // all keys we find should be this depth 126 + if key_depth != expected_depth { 127 + return Err(MstError::DepthUnderflow); 128 + } 129 + 63 130 prefix = rkey.clone(); 64 131 65 132 entries.push(Need::Record { ··· 67 134 cid: entry.value, 68 135 }); 69 136 if let Some(ref tree) = entry.tree { 70 - entries.push(Need::Node(*tree)); 137 + entries.push(Need::Node { 138 + depth: Depth::Depth(key_depth), 139 + cid: *tree, 140 + }); 71 141 } 72 142 } 73 143 74 144 entries.reverse(); 75 145 stack.append(&mut entries); 146 + 147 + let d = this_depth.ok_or(MstError::LostDepth)?; 76 148 77 149 if let Some(tree) = node.left { 78 - stack.push(Need::Node(tree)); 150 + stack.push(Need::Node { 151 + depth: Depth::Depth(d), 152 + cid: tree, 153 + }); 79 154 } 80 155 Ok(()) 81 156 } ··· 92 167 impl Walker { 93 168 pub fn new(tree_root_cid: Cid) -> Self { 94 169 Self { 95 - stack: vec![Need::Node(tree_root_cid)], 170 + stack: vec![Need::Node { 171 + depth: Depth::Root, 172 + cid: tree_root_cid, 173 + }], 96 174 prev: "".to_string(), 97 175 } 98 176 } 99 177 100 178 /// Advance through nodes until we find a record or can't go further 101 - pub fn step<T: Clone, E: Error>( 179 + pub fn step<T: Processable>( 102 180 &mut self, 103 - blocks: &mut 
HashMap<Cid, MaybeProcessedBlock<T, E>>, 104 - process: impl Fn(&[u8]) -> Result<T, E>, 105 - ) -> Result<Step<T>, Trip<E>> { 181 + blocks: &mut HashMap<Cid, MaybeProcessedBlock<T>>, 182 + process: impl Fn(Vec<u8>) -> T, 183 + ) -> Result<Step<T>, WalkError> { 106 184 loop { 107 - let Some(mut need) = self.stack.last() else { 185 + let Some(need) = self.stack.last_mut() else { 108 186 log::trace!("tried to walk but we're actually done."); 109 187 return Ok(Step::Finish); 110 188 }; 111 189 112 - match &mut need { 113 - Need::Node(cid) => { 190 + match need { 191 + &mut Need::Node { depth, cid } => { 114 192 log::trace!("need node {cid:?}"); 115 - let Some(block) = blocks.remove(cid) else { 193 + let Some(block) = blocks.remove(&cid) else { 116 194 log::trace!("node not found, resting"); 117 - return Ok(Step::Rest(*cid)); 195 + return Ok(Step::Missing(cid)); 118 196 }; 119 197 120 198 let MaybeProcessedBlock::Raw(data) = block else { 121 - return Err(Trip::BadCommit("failed commit fingerprint".into())); 199 + return Err(WalkError::BadCommitFingerprint); 122 200 }; 123 201 let node = serde_ipld_dagcbor::from_slice::<Node>(&data) 124 - .map_err(|e| Trip::BadCommit(e.into()))?; 202 + .map_err(WalkError::BadCommit)?; 125 203 126 204 // found node, make sure we remember 127 205 self.stack.pop(); 128 206 129 207 // queue up work on the found node next 130 - push_from_node(&mut self.stack, &node)?; 208 + push_from_node(&mut self.stack, &node, depth)?; 131 209 } 132 210 Need::Record { rkey, cid } => { 133 211 log::trace!("need record {cid:?}"); 212 + // note that we cannot *remove* a record block, sadly, since 213 + // there can be multiple rkeys pointing to the same cid. 134 214 let Some(data) = blocks.get_mut(cid) else { 215 + return Ok(Step::Missing(*cid)); 216 + }; 217 + let rkey = rkey.clone(); 218 + let data = match data { 219 + MaybeProcessedBlock::Raw(data) => process(data.to_vec()), 220 + MaybeProcessedBlock::Processed(t) => t.clone(), 221 + }; 222 + 223 + // found node, make sure we remember 224 + self.stack.pop(); 225 + 226 + // rkeys *must* be in order or else the tree is invalid (or 227 + // we have a bug) 228 + if rkey <= self.prev { 229 + return Err(MstError::RkeyOutOfOrder)?; 230 + } 231 + self.prev = rkey.clone(); 232 + 233 + return Ok(Step::Found { rkey, data }); 234 + } 235 + } 236 + } 237 + } 238 + 239 + /// blocking!!!!!! 240 + pub fn disk_step<T: Processable>( 241 + &mut self, 242 + reader: &mut SqliteReader, 243 + process: impl Fn(Vec<u8>) -> T, 244 + ) -> Result<Step<T>, WalkError> { 245 + loop { 246 + let Some(need) = self.stack.last_mut() else { 247 + log::trace!("tried to walk but we're actually done."); 248 + return Ok(Step::Finish); 249 + }; 250 + 251 + match need { 252 + &mut Need::Node { depth, cid } => { 253 + let cid_bytes = cid.to_bytes(); 254 + log::trace!("need node {cid:?}"); 255 + let Some(block_bytes) = reader.get(cid_bytes)? 
else { 256 + log::trace!("node not found, resting"); 257 + return Ok(Step::Missing(cid)); 258 + }; 259 + 260 + let block: MaybeProcessedBlock<T> = crate::drive::decode(&block_bytes)?; 261 + 262 + let MaybeProcessedBlock::Raw(data) = block else { 263 + return Err(WalkError::BadCommitFingerprint); 264 + }; 265 + let node = serde_ipld_dagcbor::from_slice::<Node>(&data) 266 + .map_err(WalkError::BadCommit)?; 267 + 268 + // found node, make sure we remember 269 + self.stack.pop(); 270 + 271 + // queue up work on the found node next 272 + push_from_node(&mut self.stack, &node, depth).map_err(WalkError::MstError)?; 273 + } 274 + Need::Record { rkey, cid } => { 275 + log::trace!("need record {cid:?}"); 276 + let cid_bytes = cid.to_bytes(); 277 + let Some(data_bytes) = reader.get(cid_bytes)? else { 135 278 log::trace!("record block not found, resting"); 136 - return Ok(Step::Rest(*cid)); 279 + return Ok(Step::Missing(*cid)); 137 280 }; 281 + let data: MaybeProcessedBlock<T> = crate::drive::decode(&data_bytes)?; 138 282 let rkey = rkey.clone(); 139 283 let data = match data { 140 284 MaybeProcessedBlock::Raw(data) => process(data), 141 - MaybeProcessedBlock::Processed(Ok(t)) => Ok(t.clone()), 142 - bad => { 143 - // big hack to pull the error out -- this corrupts 144 - // a block, so we should not continue trying to work 145 - let mut steal = MaybeProcessedBlock::Raw(vec![]); 146 - std::mem::swap(&mut steal, bad); 147 - let MaybeProcessedBlock::Processed(Err(e)) = steal else { 148 - unreachable!(); 149 - }; 150 - return Err(Trip::ProcessFailed(e)); 151 - } 285 + MaybeProcessedBlock::Processed(t) => t.clone(), 152 286 }; 153 287 154 288 // found node, make sure we remember 155 289 self.stack.pop(); 156 290 157 291 log::trace!("emitting a block as a step. depth={}", self.stack.len()); 158 - let data = data.map_err(Trip::ProcessFailed)?; 159 292 160 293 // rkeys *must* be in order or else the tree is invalid (or 161 294 // we have a bug) 162 295 if rkey <= self.prev { 163 - return Err(Trip::RkeyOutOfOrder); 296 + return Err(MstError::RkeyOutOfOrder)?; 164 297 } 165 298 self.prev = rkey.clone(); 166 299 167 - return Ok(Step::Step { rkey, data }); 300 + return Ok(Step::Found { rkey, data }); 168 301 } 169 302 } 170 303 } ··· 174 307 #[cfg(test)] 175 308 mod test { 176 309 use super::*; 177 - // use crate::mst::Entry; 178 310 179 311 fn cid1() -> Cid { 180 312 "bafyreihixenvk3ahqbytas4hk4a26w43bh6eo3w6usjqtxkpzsvi655a3m" 181 313 .parse() 182 314 .unwrap() 183 315 } 184 - // fn cid2() -> Cid { 185 - // "QmY7Yh4UquoXHLPFo2XbhXkhBvFoPwmQUSa92pxnxjQuPU" 186 - // .parse() 187 - // .unwrap() 188 - // } 189 - // fn cid3() -> Cid { 190 - // "bafybeigdyrzt5sfp7udm7hu76uh7y26nf3efuylqabf3oclgtqy55fbzdi" 191 - // .parse() 192 - // .unwrap() 193 - // } 194 - // fn cid4() -> Cid { 195 - // "QmbWqxBEKC3P8tqsKc98xmWNzrzDtRLMiMPL8wBuTGsMnR" 196 - // .parse() 197 - // .unwrap() 198 - // } 199 - // fn cid5() -> Cid { 200 - // "QmSnuWmxptJZdLJpKRarxBMS2Ju2oANVrgbr2xWbie9b2D" 201 - // .parse() 202 - // .unwrap() 203 - // } 204 - // fn cid6() -> Cid { 205 - // "QmdmQXB2mzChmMeKY47C43LxUdg1NDJ5MWcKMKxDu7RgQm" 206 - // .parse() 207 - // .unwrap() 208 - // } 209 - // fn cid7() -> Cid { 210 - // "bafybeiaysi4s6lnjev27ln5icwm6tueaw2vdykrtjkwiphwekaywqhcjze" 211 - // .parse() 212 - // .unwrap() 213 - // } 214 - // fn cid8() -> Cid { 215 - // "bafyreif3tfdpr5n4jdrbielmcapwvbpcthepfkwq2vwonmlhirbjmotedi" 216 - // .parse() 217 - // .unwrap() 218 - // } 219 - // fn cid9() -> Cid { 220 - // 
"bafyreicnokmhmrnlp2wjhyk2haep4tqxiptwfrp2rrs7rzq7uk766chqvq" 221 - // .parse() 222 - // .unwrap() 223 - // } 316 + 317 + #[test] 318 + fn test_depth_spec_0() { 319 + let d = Depth::from_key(b"2653ae71"); 320 + assert_eq!(d, Depth::Depth(0)) 321 + } 322 + 323 + #[test] 324 + fn test_depth_spec_1() { 325 + let d = Depth::from_key(b"blue"); 326 + assert_eq!(d, Depth::Depth(1)) 327 + } 328 + 329 + #[test] 330 + fn test_depth_spec_4() { 331 + let d = Depth::from_key(b"app.bsky.feed.post/454397e440ec"); 332 + assert_eq!(d, Depth::Depth(4)) 333 + } 334 + 335 + #[test] 336 + fn test_depth_spec_8() { 337 + let d = Depth::from_key(b"app.bsky.feed.post/9adeb165882c"); 338 + assert_eq!(d, Depth::Depth(8)) 339 + } 340 + 341 + #[test] 342 + fn test_depth_ietf_draft_0() { 343 + let d = Depth::from_key(b"key1"); 344 + assert_eq!(d, Depth::Depth(0)) 345 + } 346 + 347 + #[test] 348 + fn test_depth_ietf_draft_1() { 349 + let d = Depth::from_key(b"key7"); 350 + assert_eq!(d, Depth::Depth(1)) 351 + } 352 + 353 + #[test] 354 + fn test_depth_ietf_draft_4() { 355 + let d = Depth::from_key(b"key515"); 356 + assert_eq!(d, Depth::Depth(4)) 357 + } 224 358 225 359 #[test] 226 - fn test_next_from_node_empty() { 227 - let node = Node { 360 + fn test_depth_interop() { 361 + // examples from https://github.com/bluesky-social/atproto-interop-tests/blob/main/mst/key_heights.json 362 + for (k, expected) in [ 363 + ("", 0), 364 + ("asdf", 0), 365 + ("blue", 1), 366 + ("2653ae71", 0), 367 + ("88bfafc7", 2), 368 + ("2a92d355", 4), 369 + ("884976f5", 6), 370 + ("app.bsky.feed.post/454397e440ec", 4), 371 + ("app.bsky.feed.post/9adeb165882c", 8), 372 + ] { 373 + let d = Depth::from_key(k.as_bytes()); 374 + assert_eq!(d, Depth::Depth(expected), "key: {}", k); 375 + } 376 + } 377 + 378 + #[test] 379 + fn test_push_empty_fails() { 380 + let empty_node = Node { 228 381 left: None, 229 382 entries: vec![], 230 383 }; 231 384 let mut stack = vec![]; 232 - push_from_node(&mut stack, &node).unwrap(); 233 - assert_eq!(stack.last(), None); 385 + let err = push_from_node(&mut stack, &empty_node, Depth::Depth(4)); 386 + assert_eq!(err, Err(MstError::EmptyNode)); 234 387 } 235 388 236 389 #[test] 237 - fn test_needs_from_node_just_left() { 390 + fn test_push_one_node() { 238 391 let node = Node { 239 392 left: Some(cid1()), 240 393 entries: vec![], 241 394 }; 242 395 let mut stack = vec![]; 243 - push_from_node(&mut stack, &node).unwrap(); 244 - assert_eq!(stack.last(), Some(Need::Node(cid1())).as_ref()); 396 + push_from_node(&mut stack, &node, Depth::Depth(4)).unwrap(); 397 + assert_eq!( 398 + stack.last(), 399 + Some(Need::Node { 400 + depth: Depth::Depth(3), 401 + cid: cid1() 402 + }) 403 + .as_ref() 404 + ); 245 405 } 246 - 247 - // #[test] 248 - // fn test_needs_from_node_just_one_record() { 249 - // let node = Node { 250 - // left: None, 251 - // entries: vec![Entry { 252 - // keysuffix: "asdf".into(), 253 - // prefix_len: 0, 254 - // value: cid1(), 255 - // tree: None, 256 - // }], 257 - // }; 258 - // assert_eq!( 259 - // needs_from_node(node).unwrap(), 260 - // vec![Need::Record { 261 - // rkey: "asdf".into(), 262 - // cid: cid1(), 263 - // },] 264 - // ); 265 - // } 266 - 267 - // #[test] 268 - // fn test_needs_from_node_two_records() { 269 - // let node = Node { 270 - // left: None, 271 - // entries: vec![ 272 - // Entry { 273 - // keysuffix: "asdf".into(), 274 - // prefix_len: 0, 275 - // value: cid1(), 276 - // tree: None, 277 - // }, 278 - // Entry { 279 - // keysuffix: "gh".into(), 280 - // prefix_len: 2, 281 - // value: 
cid2(), 282 - // tree: None, 283 - // }, 284 - // ], 285 - // }; 286 - // assert_eq!( 287 - // needs_from_node(node).unwrap(), 288 - // vec![ 289 - // Need::Record { 290 - // rkey: "asdf".into(), 291 - // cid: cid1(), 292 - // }, 293 - // Need::Record { 294 - // rkey: "asgh".into(), 295 - // cid: cid2(), 296 - // }, 297 - // ] 298 - // ); 299 - // } 300 - 301 - // #[test] 302 - // fn test_needs_from_node_with_both() { 303 - // let node = Node { 304 - // left: None, 305 - // entries: vec![Entry { 306 - // keysuffix: "asdf".into(), 307 - // prefix_len: 0, 308 - // value: cid1(), 309 - // tree: Some(cid2()), 310 - // }], 311 - // }; 312 - // assert_eq!( 313 - // needs_from_node(node).unwrap(), 314 - // vec![ 315 - // Need::Record { 316 - // rkey: "asdf".into(), 317 - // cid: cid1(), 318 - // }, 319 - // Need::Node(cid2()), 320 - // ] 321 - // ); 322 - // } 323 - 324 - // #[test] 325 - // fn test_needs_from_node_left_and_record() { 326 - // let node = Node { 327 - // left: Some(cid1()), 328 - // entries: vec![Entry { 329 - // keysuffix: "asdf".into(), 330 - // prefix_len: 0, 331 - // value: cid2(), 332 - // tree: None, 333 - // }], 334 - // }; 335 - // assert_eq!( 336 - // needs_from_node(node).unwrap(), 337 - // vec![ 338 - // Need::Node(cid1()), 339 - // Need::Record { 340 - // rkey: "asdf".into(), 341 - // cid: cid2(), 342 - // }, 343 - // ] 344 - // ); 345 - // } 346 - 347 - // #[test] 348 - // fn test_needs_from_full_node() { 349 - // let node = Node { 350 - // left: Some(cid1()), 351 - // entries: vec![ 352 - // Entry { 353 - // keysuffix: "asdf".into(), 354 - // prefix_len: 0, 355 - // value: cid2(), 356 - // tree: Some(cid3()), 357 - // }, 358 - // Entry { 359 - // keysuffix: "ghi".into(), 360 - // prefix_len: 1, 361 - // value: cid4(), 362 - // tree: Some(cid5()), 363 - // }, 364 - // Entry { 365 - // keysuffix: "jkl".into(), 366 - // prefix_len: 2, 367 - // value: cid6(), 368 - // tree: Some(cid7()), 369 - // }, 370 - // Entry { 371 - // keysuffix: "mno".into(), 372 - // prefix_len: 4, 373 - // value: cid8(), 374 - // tree: Some(cid9()), 375 - // }, 376 - // ], 377 - // }; 378 - // assert_eq!( 379 - // needs_from_node(node).unwrap(), 380 - // vec![ 381 - // Need::Node(cid1()), 382 - // Need::Record { 383 - // rkey: "asdf".into(), 384 - // cid: cid2(), 385 - // }, 386 - // Need::Node(cid3()), 387 - // Need::Record { 388 - // rkey: "aghi".into(), 389 - // cid: cid4(), 390 - // }, 391 - // Need::Node(cid5()), 392 - // Need::Record { 393 - // rkey: "agjkl".into(), 394 - // cid: cid6(), 395 - // }, 396 - // Need::Node(cid7()), 397 - // Need::Record { 398 - // rkey: "agjkmno".into(), 399 - // cid: cid8(), 400 - // }, 401 - // Need::Node(cid9()), 402 - // ] 403 - // ); 404 - // } 405 406 }
+34 -31
tests/non-huge-cars.rs
··· 1 1 extern crate repo_stream; 2 - use futures::TryStreamExt; 3 - use iroh_car::CarReader; 4 - use std::convert::Infallible; 2 + use repo_stream::Driver; 5 3 4 + const EMPTY_CAR: &'static [u8] = include_bytes!("../car-samples/empty.car"); 6 5 const TINY_CAR: &'static [u8] = include_bytes!("../car-samples/tiny.car"); 7 6 const LITTLE_CAR: &'static [u8] = include_bytes!("../car-samples/little.car"); 8 7 const MIDSIZE_CAR: &'static [u8] = include_bytes!("../car-samples/midsize.car"); 9 8 10 - async fn test_car(bytes: &[u8], expected_records: usize, expected_sum: usize) { 11 - let reader = CarReader::new(bytes).await.unwrap(); 12 - 13 - let root = reader 14 - .header() 15 - .roots() 16 - .first() 17 - .ok_or("missing root") 9 + async fn test_car( 10 + bytes: &[u8], 11 + expected_records: usize, 12 + expected_sum: usize, 13 + expect_profile: bool, 14 + ) { 15 + let mut driver = match Driver::load_car(bytes, |block| block.len(), 10 /* MiB */) 16 + .await 18 17 .unwrap() 19 - .clone(); 20 - 21 - let stream = std::pin::pin!(reader.stream()); 22 - 23 - let (_commit, v) = 24 - repo_stream::drive::Vehicle::init(root, stream, |block| Ok::<_, Infallible>(block.len())) 25 - .await 26 - .unwrap(); 27 - let mut record_stream = std::pin::pin!(v.stream()); 18 + { 19 + Driver::Memory(_commit, mem_driver) => mem_driver, 20 + Driver::Disk(_) => panic!("too big"), 21 + }; 28 22 29 23 let mut records = 0; 30 24 let mut sum = 0; 31 25 let mut found_bsky_profile = false; 32 26 let mut prev_rkey = "".to_string(); 33 - while let Some((rkey, size)) = record_stream.try_next().await.unwrap() { 34 - records += 1; 35 - sum += size; 36 - if rkey == "app.bsky.actor.profile/self" { 37 - found_bsky_profile = true; 27 + 28 + while let Some(pairs) = driver.next_chunk(256).await.unwrap() { 29 + for (rkey, size) in pairs { 30 + records += 1; 31 + sum += size; 32 + if rkey == "app.bsky.actor.profile/self" { 33 + found_bsky_profile = true; 34 + } 35 + assert!(rkey > prev_rkey, "rkeys are streamed in order"); 36 + prev_rkey = rkey; 38 37 } 39 - assert!(rkey > prev_rkey, "rkeys are streamed in order"); 40 - prev_rkey = rkey; 41 38 } 39 + 42 40 assert_eq!(records, expected_records); 43 41 assert_eq!(sum, expected_sum); 44 - assert!(found_bsky_profile); 42 + assert_eq!(found_bsky_profile, expect_profile); 43 + } 44 + 45 + #[tokio::test] 46 + async fn test_empty_car() { 47 + test_car(EMPTY_CAR, 0, 0, false).await 45 48 } 46 49 47 50 #[tokio::test] 48 51 async fn test_tiny_car() { 49 - test_car(TINY_CAR, 8, 2071).await 52 + test_car(TINY_CAR, 8, 2071, true).await 50 53 } 51 54 52 55 #[tokio::test] 53 56 async fn test_little_car() { 54 - test_car(LITTLE_CAR, 278, 246960).await 57 + test_car(LITTLE_CAR, 278, 246960, true).await 55 58 } 56 59 57 60 #[tokio::test] 58 61 async fn test_midsize_car() { 59 - test_car(MIDSIZE_CAR, 11585, 3741393).await 62 + test_car(MIDSIZE_CAR, 11585, 3741393, true).await 60 63 }
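One path the updated tests don't exercise is the disk spill itself. A hypothetical follow-up test, assuming the same fixtures: a 0 MiB limit should force `Driver::Disk`, and the totals should match the in-memory run. The spill-file path handling and the exact `DiskBuilder` call are assumptions based on the crate docs:

```rust
// Hypothetical follow-up test (not part of this change).
#[tokio::test]
async fn test_little_car_spills_to_disk() {
    let paused = match Driver::load_car(LITTLE_CAR, |block| block.len(), 0)
        .await
        .unwrap()
    {
        Driver::Disk(paused) => paused,
        Driver::Memory(..) => panic!("expected the CAR to spill"),
    };

    let store = repo_stream::DiskBuilder::new()
        .open("target/test-spill.db".into()) // throwaway spill file (assumed path handling)
        .await
        .unwrap();
    let (_commit, mut driver) = paused.finish_loading(store).await.unwrap();

    let (mut records, mut sum) = (0, 0);
    while let Some(pairs) = driver.next_chunk(256).await.unwrap() {
        for (_rkey, size) in pairs {
            records += 1;
            sum += size;
        }
    }
    driver.reset_store().await.unwrap();

    // same totals as the in-memory little-car test above
    assert_eq!(records, 278);
    assert_eq!(sum, 246960);
}
```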