Fast and robust atproto CAR file processing in rust

Merge branch 'disk'

Changed files
+1648 -479
benches
examples
disk-read-file
read-file
src
tests
+270 -1
Cargo.lock
··· 126 126 ] 127 127 128 128 [[package]] 129 + name = "bincode" 130 + version = "2.0.1" 131 + source = "registry+https://github.com/rust-lang/crates.io-index" 132 + checksum = "36eaf5d7b090263e8150820482d5d93cd964a81e4019913c972f4edcc6edb740" 133 + dependencies = [ 134 + "bincode_derive", 135 + "serde", 136 + "unty", 137 + ] 138 + 139 + [[package]] 140 + name = "bincode_derive" 141 + version = "2.0.1" 142 + source = "registry+https://github.com/rust-lang/crates.io-index" 143 + checksum = "bf95709a440f45e986983918d0e8a1f30a9b1df04918fc828670606804ac3c09" 144 + dependencies = [ 145 + "virtue", 146 + ] 147 + 148 + [[package]] 129 149 name = "bitflags" 130 150 version = "2.9.4" 131 151 source = "registry+https://github.com/rust-lang/crates.io-index" 132 152 checksum = "2261d10cca569e4643e526d8dc2e62e433cc8aba21ab764233731f8d369bf394" 153 + 154 + [[package]] 155 + name = "block-buffer" 156 + version = "0.10.4" 157 + source = "registry+https://github.com/rust-lang/crates.io-index" 158 + checksum = "3078c7629b62d3f0439517fa394996acacc5cbc91c5a20d8c658e77abd503a71" 159 + dependencies = [ 160 + "generic-array", 161 + ] 133 162 134 163 [[package]] 135 164 name = "bumpalo" ··· 267 296 ] 268 297 269 298 [[package]] 299 + name = "cpufeatures" 300 + version = "0.2.17" 301 + source = "registry+https://github.com/rust-lang/crates.io-index" 302 + checksum = "59ed5838eebb26a2bb2e58f6d5b5316989ae9d08bab10e0e6d103e656d1b0280" 303 + dependencies = [ 304 + "libc", 305 + ] 306 + 307 + [[package]] 270 308 name = "criterion" 271 309 version = "0.7.0" 272 310 source = "registry+https://github.com/rust-lang/crates.io-index" ··· 332 370 checksum = "460fbee9c2c2f33933d720630a6a0bac33ba7053db5344fac858d4b8952d77d5" 333 371 334 372 [[package]] 373 + name = "crypto-common" 374 + version = "0.1.6" 375 + source = "registry+https://github.com/rust-lang/crates.io-index" 376 + checksum = "1bfb12502f3fc46cca1bb51ac28df9d618d813cdc3d2f25b9fe775a34af26bb3" 377 + dependencies = [ 378 + "generic-array", 379 + "typenum", 380 + ] 381 + 382 + [[package]] 335 383 name = "data-encoding" 336 384 version = "2.9.0" 337 385 source = "registry+https://github.com/rust-lang/crates.io-index" ··· 358 406 ] 359 407 360 408 [[package]] 409 + name = "digest" 410 + version = "0.10.7" 411 + source = "registry+https://github.com/rust-lang/crates.io-index" 412 + checksum = "9ed9a281f7bc9b7576e61468ba615a66a5c8cfdff42420a70aa82701a3b1e292" 413 + dependencies = [ 414 + "block-buffer", 415 + "crypto-common", 416 + ] 417 + 418 + [[package]] 361 419 name = "either" 362 420 version = "1.15.0" 363 421 source = "registry+https://github.com/rust-lang/crates.io-index" ··· 387 445 ] 388 446 389 447 [[package]] 448 + name = "errno" 449 + version = "0.3.14" 450 + source = "registry+https://github.com/rust-lang/crates.io-index" 451 + checksum = "39cab71617ae0d63f51a36d69f866391735b51691dbda63cf6f96d042b63efeb" 452 + dependencies = [ 453 + "libc", 454 + "windows-sys 0.60.2", 455 + ] 456 + 457 + [[package]] 458 + name = "fallible-iterator" 459 + version = "0.3.0" 460 + source = "registry+https://github.com/rust-lang/crates.io-index" 461 + checksum = "2acce4a10f12dc2fb14a218589d4f1f62ef011b2d0cc4b3cb1bba8e94da14649" 462 + 463 + [[package]] 464 + name = "fallible-streaming-iterator" 465 + version = "0.1.9" 466 + source = "registry+https://github.com/rust-lang/crates.io-index" 467 + checksum = "7360491ce676a36bf9bb3c56c1aa791658183a54d2744120f27285738d90465a" 468 + 469 + [[package]] 470 + name = "fastrand" 471 + version = "2.3.0" 472 + source = 
"registry+https://github.com/rust-lang/crates.io-index" 473 + checksum = "37909eebbb50d72f9059c3b6d82c0463f2ff062c9e95845c43a6c9c0355411be" 474 + 475 + [[package]] 476 + name = "foldhash" 477 + version = "0.1.5" 478 + source = "registry+https://github.com/rust-lang/crates.io-index" 479 + checksum = "d9c4f5dac5e15c24eb999c26181a6ca40b39fe946cbe4c263c7209467bc83af2" 480 + 481 + [[package]] 390 482 name = "futures" 391 483 version = "0.3.31" 392 484 source = "registry+https://github.com/rust-lang/crates.io-index" ··· 476 568 ] 477 569 478 570 [[package]] 571 + name = "generic-array" 572 + version = "0.14.9" 573 + source = "registry+https://github.com/rust-lang/crates.io-index" 574 + checksum = "4bb6743198531e02858aeaea5398fcc883e71851fcbcb5a2f773e2fb6cb1edf2" 575 + dependencies = [ 576 + "typenum", 577 + "version_check", 578 + ] 579 + 580 + [[package]] 581 + name = "getrandom" 582 + version = "0.3.3" 583 + source = "registry+https://github.com/rust-lang/crates.io-index" 584 + checksum = "26145e563e54f2cadc477553f1ec5ee650b00862f0a58bcd12cbdc5f0ea2d2f4" 585 + dependencies = [ 586 + "cfg-if", 587 + "libc", 588 + "r-efi", 589 + "wasi 0.14.7+wasi-0.2.4", 590 + ] 591 + 592 + [[package]] 479 593 name = "gimli" 480 594 version = "0.32.3" 481 595 source = "registry+https://github.com/rust-lang/crates.io-index" ··· 490 604 "cfg-if", 491 605 "crunchy", 492 606 "zerocopy", 607 + ] 608 + 609 + [[package]] 610 + name = "hashbrown" 611 + version = "0.15.5" 612 + source = "registry+https://github.com/rust-lang/crates.io-index" 613 + checksum = "9229cfe53dfd69f0609a49f65461bd93001ea1ef889cd5529dd176593f5338a1" 614 + dependencies = [ 615 + "foldhash", 616 + ] 617 + 618 + [[package]] 619 + name = "hashlink" 620 + version = "0.10.0" 621 + source = "registry+https://github.com/rust-lang/crates.io-index" 622 + checksum = "7382cf6263419f2d8df38c55d7da83da5c18aef87fc7a7fc1fb1e344edfe14c1" 623 + dependencies = [ 624 + "hashbrown", 493 625 ] 494 626 495 627 [[package]] ··· 598 730 checksum = "58f929b4d672ea937a23a1ab494143d968337a5f47e56d0815df1e0890ddf174" 599 731 600 732 [[package]] 733 + name = "libsqlite3-sys" 734 + version = "0.35.0" 735 + source = "registry+https://github.com/rust-lang/crates.io-index" 736 + checksum = "133c182a6a2c87864fe97778797e46c7e999672690dc9fa3ee8e241aa4a9c13f" 737 + dependencies = [ 738 + "pkg-config", 739 + "vcpkg", 740 + ] 741 + 742 + [[package]] 743 + name = "linux-raw-sys" 744 + version = "0.11.0" 745 + source = "registry+https://github.com/rust-lang/crates.io-index" 746 + checksum = "df1d3c3b53da64cf5760482273a98e575c651a67eec7f77df96b5b642de8f039" 747 + 748 + [[package]] 601 749 name = "lock_api" 602 750 version = "0.4.14" 603 751 source = "registry+https://github.com/rust-lang/crates.io-index" ··· 645 793 checksum = "78bed444cc8a2160f01cbcf811ef18cac863ad68ae8ca62092e8db51d51c761c" 646 794 dependencies = [ 647 795 "libc", 648 - "wasi", 796 + "wasi 0.11.1+wasi-snapshot-preview1", 649 797 "windows-sys 0.59.0", 650 798 ] 651 799 ··· 744 892 checksum = "8b870d8c151b6f2fb93e84a13146138f05d02ed11c7e7c54f8826aaaf7c9f184" 745 893 746 894 [[package]] 895 + name = "pkg-config" 896 + version = "0.3.32" 897 + source = "registry+https://github.com/rust-lang/crates.io-index" 898 + checksum = "7edddbd0b52d732b21ad9a5fab5c704c14cd949e5e9a1ec5929a24fded1b904c" 899 + 900 + [[package]] 747 901 name = "plotters" 748 902 version = "0.3.7" 749 903 source = "registry+https://github.com/rust-lang/crates.io-index" ··· 803 957 dependencies = [ 804 958 "proc-macro2", 805 959 ] 960 + 961 + [[package]] 962 + 
name = "r-efi" 963 + version = "5.3.0" 964 + source = "registry+https://github.com/rust-lang/crates.io-index" 965 + checksum = "69cdb34c158ceb288df11e18b4bd39de994f6657d83847bdffdbd7f346754b0f" 806 966 807 967 [[package]] 808 968 name = "rayon" ··· 866 1026 name = "repo-stream" 867 1027 version = "0.1.1" 868 1028 dependencies = [ 1029 + "bincode", 869 1030 "clap", 870 1031 "criterion", 871 1032 "env_logger", ··· 875 1036 "iroh-car", 876 1037 "log", 877 1038 "multibase", 1039 + "rusqlite", 878 1040 "serde", 879 1041 "serde_bytes", 880 1042 "serde_ipld_dagcbor", 1043 + "sha2", 1044 + "tempfile", 881 1045 "thiserror 2.0.17", 882 1046 "tokio", 883 1047 ] 884 1048 885 1049 [[package]] 1050 + name = "rusqlite" 1051 + version = "0.37.0" 1052 + source = "registry+https://github.com/rust-lang/crates.io-index" 1053 + checksum = "165ca6e57b20e1351573e3729b958bc62f0e48025386970b6e4d29e7a7e71f3f" 1054 + dependencies = [ 1055 + "bitflags", 1056 + "fallible-iterator", 1057 + "fallible-streaming-iterator", 1058 + "hashlink", 1059 + "libsqlite3-sys", 1060 + "smallvec", 1061 + ] 1062 + 1063 + [[package]] 886 1064 name = "rustc-demangle" 887 1065 version = "0.1.26" 888 1066 source = "registry+https://github.com/rust-lang/crates.io-index" 889 1067 checksum = "56f7d92ca342cea22a06f2121d944b4fd82af56988c270852495420f961d4ace" 1068 + 1069 + [[package]] 1070 + name = "rustix" 1071 + version = "1.1.2" 1072 + source = "registry+https://github.com/rust-lang/crates.io-index" 1073 + checksum = "cd15f8a2c5551a84d56efdc1cd049089e409ac19a3072d5037a17fd70719ff3e" 1074 + dependencies = [ 1075 + "bitflags", 1076 + "errno", 1077 + "libc", 1078 + "linux-raw-sys", 1079 + "windows-sys 0.60.2", 1080 + ] 890 1081 891 1082 [[package]] 892 1083 name = "rustversion" ··· 978 1169 "ryu", 979 1170 "serde", 980 1171 "serde_core", 1172 + ] 1173 + 1174 + [[package]] 1175 + name = "sha2" 1176 + version = "0.10.9" 1177 + source = "registry+https://github.com/rust-lang/crates.io-index" 1178 + checksum = "a7507d819769d01a365ab707794a4084392c824f54a7a6a7862f8c3d0892b283" 1179 + dependencies = [ 1180 + "cfg-if", 1181 + "cpufeatures", 1182 + "digest", 981 1183 ] 982 1184 983 1185 [[package]] ··· 1040 1242 ] 1041 1243 1042 1244 [[package]] 1245 + name = "tempfile" 1246 + version = "3.23.0" 1247 + source = "registry+https://github.com/rust-lang/crates.io-index" 1248 + checksum = "2d31c77bdf42a745371d260a26ca7163f1e0924b64afa0b688e61b5a9fa02f16" 1249 + dependencies = [ 1250 + "fastrand", 1251 + "getrandom", 1252 + "once_cell", 1253 + "rustix", 1254 + "windows-sys 0.60.2", 1255 + ] 1256 + 1257 + [[package]] 1043 1258 name = "thiserror" 1044 1259 version = "1.0.69" 1045 1260 source = "registry+https://github.com/rust-lang/crates.io-index" ··· 1121 1336 ] 1122 1337 1123 1338 [[package]] 1339 + name = "typenum" 1340 + version = "1.19.0" 1341 + source = "registry+https://github.com/rust-lang/crates.io-index" 1342 + checksum = "562d481066bde0658276a35467c4af00bdc6ee726305698a55b86e61d7ad82bb" 1343 + 1344 + [[package]] 1124 1345 name = "unicode-ident" 1125 1346 version = "1.0.19" 1126 1347 source = "registry+https://github.com/rust-lang/crates.io-index" ··· 1139 1360 checksum = "eb066959b24b5196ae73cb057f45598450d2c5f71460e98c49b738086eff9c06" 1140 1361 1141 1362 [[package]] 1363 + name = "unty" 1364 + version = "0.0.4" 1365 + source = "registry+https://github.com/rust-lang/crates.io-index" 1366 + checksum = "6d49784317cd0d1ee7ec5c716dd598ec5b4483ea832a2dced265471cc0f690ae" 1367 + 1368 + [[package]] 1142 1369 name = "utf8parse" 1143 1370 version = "0.2.2" 
1144 1371 source = "registry+https://github.com/rust-lang/crates.io-index" 1145 1372 checksum = "06abde3611657adf66d383f00b093d7faecc7fa57071cce2578660c9f1010821" 1146 1373 1147 1374 [[package]] 1375 + name = "vcpkg" 1376 + version = "0.2.15" 1377 + source = "registry+https://github.com/rust-lang/crates.io-index" 1378 + checksum = "accd4ea62f7bb7a82fe23066fb0957d48ef677f6eeb8215f372f52e48bb32426" 1379 + 1380 + [[package]] 1381 + name = "version_check" 1382 + version = "0.9.5" 1383 + source = "registry+https://github.com/rust-lang/crates.io-index" 1384 + checksum = "0b928f33d975fc6ad9f86c8f283853ad26bdd5b10b7f1542aa2fa15e2289105a" 1385 + 1386 + [[package]] 1387 + name = "virtue" 1388 + version = "0.0.18" 1389 + source = "registry+https://github.com/rust-lang/crates.io-index" 1390 + checksum = "051eb1abcf10076295e815102942cc58f9d5e3b4560e46e53c21e8ff6f3af7b1" 1391 + 1392 + [[package]] 1148 1393 name = "walkdir" 1149 1394 version = "2.5.0" 1150 1395 source = "registry+https://github.com/rust-lang/crates.io-index" ··· 1159 1404 version = "0.11.1+wasi-snapshot-preview1" 1160 1405 source = "registry+https://github.com/rust-lang/crates.io-index" 1161 1406 checksum = "ccf3ec651a847eb01de73ccad15eb7d99f80485de043efb2f370cd654f4ea44b" 1407 + 1408 + [[package]] 1409 + name = "wasi" 1410 + version = "0.14.7+wasi-0.2.4" 1411 + source = "registry+https://github.com/rust-lang/crates.io-index" 1412 + checksum = "883478de20367e224c0090af9cf5f9fa85bed63a95c1abf3afc5c083ebc06e8c" 1413 + dependencies = [ 1414 + "wasip2", 1415 + ] 1416 + 1417 + [[package]] 1418 + name = "wasip2" 1419 + version = "1.0.1+wasi-0.2.4" 1420 + source = "registry+https://github.com/rust-lang/crates.io-index" 1421 + checksum = "0562428422c63773dad2c345a1882263bbf4d65cf3f42e90921f787ef5ad58e7" 1422 + dependencies = [ 1423 + "wit-bindgen", 1424 + ] 1162 1425 1163 1426 [[package]] 1164 1427 name = "wasm-bindgen" ··· 1390 1653 version = "0.53.1" 1391 1654 source = "registry+https://github.com/rust-lang/crates.io-index" 1392 1655 checksum = "d6bbff5f0aada427a1e5a6da5f1f98158182f26556f345ac9e04d36d0ebed650" 1656 + 1657 + [[package]] 1658 + name = "wit-bindgen" 1659 + version = "0.46.0" 1660 + source = "registry+https://github.com/rust-lang/crates.io-index" 1661 + checksum = "f17a85883d4e6d00e8a97c586de764dabcc06133f7f1d55dce5cdc070ad7fe59" 1393 1662 1394 1663 [[package]] 1395 1664 name = "zerocopy"
+8 -1
Cargo.toml
··· 7 7 repository = "https://tangled.org/@microcosm.blue/repo-stream" 8 8 9 9 [dependencies] 10 + bincode = { version = "2.0.1", features = ["serde"] } 10 11 futures = "0.3.31" 11 12 futures-core = "0.3.31" 12 13 ipld-core = { version = "0.4.2", features = ["serde"] } 13 14 iroh-car = "0.5.1" 14 15 log = "0.4.28" 15 16 multibase = "0.9.2" 17 + rusqlite = "0.37.0" 16 18 serde = { version = "1.0.228", features = ["derive"] } 17 19 serde_bytes = "0.11.19" 18 20 serde_ipld_dagcbor = "0.6.4" 21 + sha2 = "0.10.9" 19 22 thiserror = "2.0.17" 20 - tokio = "1.47.1" 23 + tokio = { version = "1.47.1", features = ["rt", "sync"] } 21 24 22 25 [dev-dependencies] 23 26 clap = { version = "4.5.48", features = ["derive"] } 24 27 criterion = { version = "0.7.0", features = ["async_tokio"] } 25 28 env_logger = "0.11.8" 26 29 multibase = "0.9.2" 30 + tempfile = "3.23.0" 27 31 tokio = { version = "1.47.1", features = ["full"] } 28 32 29 33 [profile.profiling] 30 34 inherits = "release" 31 35 debug = true 36 + 37 + # [profile.release] 38 + # debug = true 32 39 33 40 [[bench]] 34 41 name = "non-huge-cars"
+12 -21
benches/huge-car.rs
··· 1 1 extern crate repo_stream; 2 - use futures::TryStreamExt; 3 - use iroh_car::CarReader; 4 - use std::convert::Infallible; 2 + use repo_stream::Driver; 5 3 use std::path::{Path, PathBuf}; 6 4 7 5 use criterion::{Criterion, criterion_group, criterion_main}; ··· 20 18 }); 21 19 } 22 20 23 - async fn drive_car(filename: impl AsRef<Path>) { 21 + async fn drive_car(filename: impl AsRef<Path>) -> usize { 24 22 let reader = tokio::fs::File::open(filename).await.unwrap(); 25 23 let reader = tokio::io::BufReader::new(reader); 26 - let reader = CarReader::new(reader).await.unwrap(); 27 24 28 - let root = reader 29 - .header() 30 - .roots() 31 - .first() 32 - .ok_or("missing root") 25 + let mut driver = match Driver::load_car(reader, |block| block.len(), 1024) 26 + .await 33 27 .unwrap() 34 - .clone(); 35 - 36 - let stream = std::pin::pin!(reader.stream()); 37 - 38 - let (_commit, v) = 39 - repo_stream::drive::Vehicle::init(root, stream, |block| Ok::<_, Infallible>(block.len())) 40 - .await 41 - .unwrap(); 42 - let mut record_stream = std::pin::pin!(v.stream()); 28 + { 29 + Driver::Memory(_, mem_driver) => mem_driver, 30 + Driver::Disk(_) => panic!("not doing disk for benchmark"), 31 + }; 43 32 44 - while let Some(_) = record_stream.try_next().await.unwrap() { 45 - // just here for the drive 33 + let mut n = 0; 34 + while let Some(pairs) = driver.next_chunk(256).await.unwrap() { 35 + n += pairs.len(); 46 36 } 37 + n 47 38 } 48 39 49 40 criterion_group!(benches, criterion_benchmark);
+12 -22
benches/non-huge-cars.rs
··· 1 1 extern crate repo_stream; 2 - use futures::TryStreamExt; 3 - use iroh_car::CarReader; 4 - use std::convert::Infallible; 2 + use repo_stream::Driver; 5 3 6 4 use criterion::{Criterion, criterion_group, criterion_main}; 7 5 ··· 26 24 }); 27 25 } 28 26 29 - async fn drive_car(bytes: &[u8]) { 30 - let reader = CarReader::new(bytes).await.unwrap(); 31 - 32 - let root = reader 33 - .header() 34 - .roots() 35 - .first() 36 - .ok_or("missing root") 27 + async fn drive_car(bytes: &[u8]) -> usize { 28 + let mut driver = match Driver::load_car(bytes, |block| block.len(), 32) 29 + .await 37 30 .unwrap() 38 - .clone(); 39 - 40 - let stream = std::pin::pin!(reader.stream()); 31 + { 32 + Driver::Memory(_, mem_driver) => mem_driver, 33 + Driver::Disk(_) => panic!("not benching big cars here"), 34 + }; 41 35 42 - let (_commit, v) = 43 - repo_stream::drive::Vehicle::init(root, stream, |block| Ok::<_, Infallible>(block.len())) 44 - .await 45 - .unwrap(); 46 - let mut record_stream = std::pin::pin!(v.stream()); 47 - 48 - while let Some(_) = record_stream.try_next().await.unwrap() { 49 - // just here for the drive 36 + let mut n = 0; 37 + while let Some(pairs) = driver.next_chunk(256).await.unwrap() { 38 + n += pairs.len(); 50 39 } 40 + n 51 41 } 52 42 53 43 criterion_group!(benches, criterion_benchmark);
+91
examples/disk-read-file/main.rs
··· 1 + /*! 2 + Read a CAR file by spilling to disk 3 + */ 4 + 5 + extern crate repo_stream; 6 + use clap::Parser; 7 + use repo_stream::{DiskBuilder, Driver, DriverBuilder}; 8 + use std::path::PathBuf; 9 + 10 + #[derive(Debug, Parser)] 11 + struct Args { 12 + #[arg()] 13 + car: PathBuf, 14 + #[arg()] 15 + tmpfile: PathBuf, 16 + } 17 + 18 + #[tokio::main] 19 + async fn main() -> Result<(), Box<dyn std::error::Error>> { 20 + env_logger::init(); 21 + 22 + let Args { car, tmpfile } = Args::parse(); 23 + 24 + // repo-stream takes an AsyncRead as input. wrapping a filesystem read in 25 + // BufReader can provide a really significant performance win. 26 + let reader = tokio::fs::File::open(car).await?; 27 + let reader = tokio::io::BufReader::new(reader); 28 + 29 + log::info!("hello! reading the car..."); 30 + 31 + // in this example we only bother handling CARs that are too big for memory 32 + // `noop` helper means: do no block processing, store the raw blocks 33 + let driver = match DriverBuilder::new() 34 + .with_mem_limit_mb(10) // how much memory can be used before disk spill 35 + .load_car(reader) 36 + .await? 37 + { 38 + Driver::Memory(_, _) => panic!("try this on a bigger car"), 39 + Driver::Disk(big_stuff) => { 40 + // we reach here if the repo was too big and needs to be spilled to 41 + // disk to continue 42 + 43 + // set up a disk store we can spill to 44 + let disk_store = DiskBuilder::new().open(tmpfile).await?; 45 + 46 + // do the spilling, get back a (similar) driver 47 + let (commit, driver) = big_stuff.finish_loading(disk_store).await?; 48 + 49 + // at this point you might want to fetch the account's signing key 50 + // via the DID from the commit, and then verify the signature. 51 + log::warn!("big's commit: {:?}", commit); 52 + 53 + // pop the driver back out to get some code indentation relief 54 + driver 55 + } 56 + }; 57 + 58 + // collect some random stats about the blocks 59 + let mut n = 0; 60 + let mut zeros = 0; 61 + 62 + log::info!("walking..."); 63 + 64 + // this example uses the disk driver's channel mode: the tree walking is 65 + // spawned onto a blocking thread, and we get chunks of rkey+blocks back 66 + let (mut rx, join) = driver.to_channel(512); 67 + while let Some(r) = rx.recv().await { 68 + let pairs = r?; 69 + 70 + // keep a count of the total number of blocks seen 71 + n += pairs.len(); 72 + 73 + for (_, block) in pairs { 74 + // for each block, count how many bytes are equal to '0' 75 + // (this is just an example, you probably want to do something more 76 + // interesting) 77 + zeros += block.into_iter().filter(|&b| b == b'0').count() 78 + } 79 + } 80 + 81 + log::info!("arrived! joining rx..."); 82 + 83 + // clean up the database. would be nice to do this in drop so it happens 84 + // automatically, but some blocking work happens, so that's not allowed in 85 + // async rust. 🤷‍♀️ 86 + join.await?.reset_store().await?; 87 + 88 + log::info!("done. n={n} zeros={zeros}"); 89 + 90 + Ok(()) 91 + }
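The example above uses channel mode end-to-end. A minimal alternative sketch (same `driver` value, using `DiskDriver::next_chunk` from src/drive.rs) pulls chunks directly instead of spawning the channel worker:

```rust
// pull chunks of (rkey, block) pairs straight from the DiskDriver;
// each call does one spawn_blocking round trip internally
let mut n = 0;
while let Some(pairs) = driver.next_chunk(512).await? {
    n += pairs.len();
}
// the store still needs the explicit cleanup step afterwards
driver.reset_store().await?;
```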
+18 -25
examples/read-file/main.rs
··· 1 + /*! 2 + Read a CAR file with in-memory processing 3 + */ 4 + 1 5 extern crate repo_stream; 2 6 use clap::Parser; 3 - use futures::TryStreamExt; 4 - use iroh_car::CarReader; 5 - use std::convert::Infallible; 7 + use repo_stream::{Driver, DriverBuilder}; 6 8 use std::path::PathBuf; 7 9 8 10 type Result<T> = std::result::Result<T, Box<dyn std::error::Error>>; ··· 21 23 let reader = tokio::fs::File::open(file).await?; 22 24 let reader = tokio::io::BufReader::new(reader); 23 25 24 - println!("hello!"); 25 - 26 - let reader = CarReader::new(reader).await?; 27 - 28 - let root = reader 29 - .header() 30 - .roots() 31 - .first() 32 - .ok_or("missing root")? 33 - .clone(); 34 - log::debug!("root: {root:?}"); 35 - 36 - // let stream = Box::pin(reader.stream()); 37 - let stream = std::pin::pin!(reader.stream()); 38 - 39 - let (commit, v) = 40 - repo_stream::drive::Vehicle::init(root, stream, |block| Ok::<_, Infallible>(block.len())) 41 - .await?; 42 - let mut record_stream = std::pin::pin!(v.stream()); 26 + let (commit, mut driver) = match DriverBuilder::new() 27 + .with_block_processor(|block| block.len()) 28 + .load_car(reader) 29 + .await? 30 + { 31 + Driver::Memory(commit, mem_driver) => (commit, mem_driver), 32 + Driver::Disk(_) => panic!("this example doesn't handle big CARs"), 33 + }; 43 34 44 35 log::info!("got commit: {commit:?}"); 45 36 46 - while let Some((rkey, _rec)) = record_stream.try_next().await? { 47 - log::info!("got {rkey:?}"); 37 + let mut n = 0; 38 + while let Some(pairs) = driver.next_chunk(256).await? { 39 + n += pairs.len(); 40 + // log::info!("got {rkey:?}"); 48 41 } 49 - log::info!("bye!"); 42 + log::info!("bye! total records={n}"); 50 43 51 44 Ok(()) 52 45 }
+12 -2
readme.md
··· 1 1 # repo-stream 2 2 3 - Fast and (aspirationally) robust atproto CAR file processing in rust 3 + Efficient and robust atproto CAR file processing in rust 4 + 5 + todo 6 + 7 + - [ ] get an *empty* car for the test suite 8 + - [ ] implement a max size-on-disk limit 9 + 10 + 11 + ----- 12 + 13 + older stuff (to clean up): 4 14 5 15 6 16 current car processing times (records processed into their length usize, phil's dev machine): ··· 27 37 -> yeah the commit is returned from init 28 38 - [ ] spec compliance todos 29 39 - [x] assert that keys are ordered and fail if not 30 - - [ ] verify node mst depth from key (possibly pending [interop test fixes](https://github.com/bluesky-social/atproto-interop-tests/issues/5)) 40 + - [x] verify node mst depth from key (possibly pending [interop test fixes](https://github.com/bluesky-social/atproto-interop-tests/issues/5)) 31 41 - [ ] performance todos 32 42 - [x] consume the serialized nodes into a mutable efficient format 33 43 - [ ] maybe customize the deserialize impl to do that directly?
+220
src/disk.rs
··· 1 + /*! 2 + On-disk storage for spilled blocks 3 + 4 + Currently this uses sqlite. In testing sqlite wasn't the fastest, but it seemed 5 + to be the best behaved in terms of both on-disk space usage and memory usage. 6 + 7 + ```no_run 8 + # use repo_stream::{DiskBuilder, DiskError}; 9 + # #[tokio::main] 10 + # async fn main() -> Result<(), DiskError> { 11 + let store = DiskBuilder::new() 12 + .with_cache_size_mb(32) 13 + .with_max_stored_mb(1024) // errors when >1GiB of processed blocks are inserted 14 + .open("/some/path.db".into()).await?; 15 + # Ok(()) 16 + # } 17 + ``` 18 + */ 19 + 20 + use crate::drive::DriveError; 21 + use rusqlite::OptionalExtension; 22 + use std::path::PathBuf; 23 + 24 + #[derive(Debug, thiserror::Error)] 25 + pub enum DiskError { 26 + /// A wrapped database error 27 + /// 28 + /// (The wrapped err should probably be obscured to remove public-facing 29 + /// sqlite bits) 30 + #[error(transparent)] 31 + DbError(#[from] rusqlite::Error), 32 + /// A tokio blocking task failed to join 33 + #[error("Failed to join a tokio blocking task: {0}")] 34 + JoinError(#[from] tokio::task::JoinError), 35 + /// The total size of stored blocks exceeded the allowed size 36 + /// 37 + /// If you need to process *really* big CARs, you can configure a higher 38 + /// limit. 39 + #[error("Maximum disk size reached")] 40 + MaxSizeExceeded, 41 + #[error("this error was replaced, seeing this is a bug.")] 42 + #[doc(hidden)] 43 + Stolen, 44 + } 45 + 46 + impl DiskError { 47 + /// hack for ownership challenges with the disk driver 48 + pub(crate) fn steal(&mut self) -> Self { 49 + let mut swapped = DiskError::Stolen; 50 + std::mem::swap(self, &mut swapped); 51 + swapped 52 + } 53 + } 54 + 55 + /// Builder-style disk store setup 56 + pub struct DiskBuilder { 57 + /// Database in-memory cache allowance 58 + /// 59 + /// Default: 32 MiB 60 + pub cache_size_mb: usize, 61 + /// Database stored block size limit 62 + /// 63 + /// Default: 10 GiB 64 + /// 65 + /// Note: actual size on disk may be more, but should approximately scale 66 + /// with this limit 67 + pub max_stored_mb: usize, 68 + } 69 + 70 + impl Default for DiskBuilder { 71 + fn default() -> Self { 72 + Self { 73 + cache_size_mb: 32, 74 + max_stored_mb: 10 * 1024, // 10 GiB 75 + } 76 + } 77 + } 78 + 79 + impl DiskBuilder { 80 + /// Begin configuring the storage with defaults 81 + pub fn new() -> Self { 82 + Default::default() 83 + } 84 + /// Set the in-memory cache allowance for the database 85 + /// 86 + /// Default: 32 MiB 87 + pub fn with_cache_size_mb(mut self, size: usize) -> Self { 88 + self.cache_size_mb = size; 89 + self 90 + } 91 + /// Set the approximate stored block size limit 92 + /// 93 + /// Default: 10 GiB 94 + pub fn with_max_stored_mb(mut self, max: usize) -> Self { 95 + self.max_stored_mb = max; 96 + self 97 + } 98 + /// Open and initialize the actual disk storage 99 + pub async fn open(self, path: PathBuf) -> Result<DiskStore, DiskError> { 100 + DiskStore::new(path, self.cache_size_mb, self.max_stored_mb).await 101 + } 102 + } 103 + 104 + /// On-disk block storage 105 + pub struct DiskStore { 106 + conn: rusqlite::Connection, 107 + max_stored: usize, 108 + stored: usize, 109 + } 110 + 111 + impl DiskStore { 112 + /// Initialize a new disk store 113 + pub async fn new( 114 + path: PathBuf, 115 + cache_mb: usize, 116 + max_stored_mb: usize, 117 + ) -> Result<Self, DiskError> { 118 + let max_stored = max_stored_mb * 2_usize.pow(20); 119 + let conn = tokio::task::spawn_blocking(move || { 120 + let conn = 
rusqlite::Connection::open(path)?; 121 + 122 + let sqlite_one_mb = -(2_i64.pow(10)); // negative is kibibytes for sqlite cache_size 123 + 124 + // conn.pragma_update(None, "journal_mode", "OFF")?; 125 + // conn.pragma_update(None, "journal_mode", "MEMORY")?; 126 + conn.pragma_update(None, "journal_mode", "WAL")?; 127 + // conn.pragma_update(None, "wal_autocheckpoint", "0")?; // this lets things get a bit big on disk 128 + conn.pragma_update(None, "synchronous", "OFF")?; 129 + conn.pragma_update( 130 + None, 131 + "cache_size", 132 + (cache_mb as i64 * sqlite_one_mb).to_string(), 133 + )?; 134 + Self::reset_tables(&conn)?; 135 + 136 + Ok::<_, DiskError>(conn) 137 + }) 138 + .await??; 139 + 140 + Ok(Self { 141 + conn, 142 + max_stored, 143 + stored: 0, 144 + }) 145 + } 146 + pub(crate) fn get_writer(&'_ mut self) -> Result<SqliteWriter<'_>, DiskError> { 147 + let tx = self.conn.transaction()?; 148 + Ok(SqliteWriter { 149 + tx, 150 + stored: &mut self.stored, 151 + max: self.max_stored, 152 + }) 153 + } 154 + pub(crate) fn get_reader<'conn>(&'conn self) -> Result<SqliteReader<'conn>, DiskError> { 155 + let select_stmt = self.conn.prepare("SELECT val FROM blocks WHERE key = ?1")?; 156 + Ok(SqliteReader { select_stmt }) 157 + } 158 + /// Drop and recreate the kv table 159 + pub async fn reset(self) -> Result<Self, DiskError> { 160 + tokio::task::spawn_blocking(move || { 161 + Self::reset_tables(&self.conn)?; 162 + Ok(self) 163 + }) 164 + .await? 165 + } 166 + fn reset_tables(conn: &rusqlite::Connection) -> Result<(), DiskError> { 167 + conn.execute("DROP TABLE IF EXISTS blocks", ())?; 168 + conn.execute( 169 + "CREATE TABLE blocks ( 170 + key BLOB PRIMARY KEY NOT NULL, 171 + val BLOB NOT NULL 172 + ) WITHOUT ROWID", 173 + (), 174 + )?; 175 + Ok(()) 176 + } 177 + } 178 + 179 + pub(crate) struct SqliteWriter<'conn> { 180 + tx: rusqlite::Transaction<'conn>, 181 + stored: &'conn mut usize, 182 + max: usize, 183 + } 184 + 185 + impl SqliteWriter<'_> { 186 + pub(crate) fn put_many( 187 + &mut self, 188 + kv: impl Iterator<Item = Result<(Vec<u8>, Vec<u8>), DriveError>>, 189 + ) -> Result<(), DriveError> { 190 + let mut insert_stmt = self 191 + .tx 192 + .prepare_cached("INSERT INTO blocks (key, val) VALUES (?1, ?2)") 193 + .map_err(DiskError::DbError)?; 194 + for pair in kv { 195 + let (k, v) = pair?; 196 + *self.stored += v.len(); 197 + if *self.stored > self.max { 198 + return Err(DiskError::MaxSizeExceeded.into()); 199 + } 200 + insert_stmt.execute((k, v)).map_err(DiskError::DbError)?; 201 + } 202 + Ok(()) 203 + } 204 + pub fn commit(self) -> Result<(), DiskError> { 205 + self.tx.commit()?; 206 + Ok(()) 207 + } 208 + } 209 + 210 + pub(crate) struct SqliteReader<'conn> { 211 + select_stmt: rusqlite::Statement<'conn>, 212 + } 213 + 214 + impl SqliteReader<'_> { 215 + pub(crate) fn get(&mut self, key: Vec<u8>) -> rusqlite::Result<Option<Vec<u8>>> { 216 + self.select_stmt 217 + .query_one((&key,), |row| row.get(0)) 218 + .optional() 219 + } 220 + }
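Since this merge adds `tempfile` as a dev-dependency, tests and examples can point the store at a throwaway path; a small sketch (the `blocks.db` filename is an arbitrary choice):

```rust
// open a DiskStore inside a temp directory that is deleted on drop
let dir = tempfile::tempdir()?;
let store = repo_stream::DiskBuilder::new()
    .with_cache_size_mb(32)
    .open(dir.path().join("blocks.db"))
    .await?;
```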
+554 -109
src/drive.rs
··· 1 - //! Consume an MST block stream, producing an ordered stream of records 1 + //! Consume a CAR from an AsyncRead, producing an ordered stream of records 2 2 3 - use futures::{Stream, TryStreamExt}; 3 + use crate::disk::{DiskError, DiskStore}; 4 + use crate::process::Processable; 4 5 use ipld_core::cid::Cid; 6 + use iroh_car::CarReader; 7 + use serde::{Deserialize, Serialize}; 5 8 use std::collections::HashMap; 6 - use std::error::Error; 9 + use std::convert::Infallible; 10 + use tokio::{io::AsyncRead, sync::mpsc}; 7 11 8 12 use crate::mst::{Commit, Node}; 9 - use crate::walk::{Step, Trip, Walker}; 13 + use crate::walk::{Step, WalkError, Walker}; 10 14 11 15 /// Errors that can happen while consuming and emitting blocks and records 12 16 #[derive(Debug, thiserror::Error)] 13 - pub enum DriveError<E: Error> { 14 - #[error("Failed to initialize CarReader: {0}")] 17 + pub enum DriveError { 18 + #[error("Error from iroh_car: {0}")] 15 19 CarReader(#[from] iroh_car::Error), 16 - #[error("Car block stream error: {0}")] 17 - CarBlockError(Box<dyn Error>), 18 20 #[error("Failed to decode commit block: {0}")] 19 - BadCommit(Box<dyn Error>), 21 + BadBlock(#[from] serde_ipld_dagcbor::DecodeError<Infallible>), 20 22 #[error("The Commit block referenced by the root was not found")] 21 23 MissingCommit, 22 24 #[error("The MST block {0} could not be found")] 23 25 MissingBlock(Cid), 24 26 #[error("Failed to walk the mst tree: {0}")] 25 - Tripped(#[from] Trip<E>), 27 + WalkError(#[from] WalkError), 28 + #[error("CAR file had no roots")] 29 + MissingRoot, 30 + #[error("Storage error")] 31 + StorageError(#[from] DiskError), 32 + #[error("Encode error: {0}")] 33 + BincodeEncodeError(#[from] bincode::error::EncodeError), 34 + #[error("Tried to send on a closed channel")] 35 + ChannelSendError, // SendError takes <T> which we don't need 36 + #[error("Failed to join a task: {0}")] 37 + JoinError(#[from] tokio::task::JoinError), 26 38 } 27 39 28 - type CarBlock<E> = Result<(Cid, Vec<u8>), E>; 40 + #[derive(Debug, thiserror::Error)] 41 + pub enum DecodeError { 42 + #[error(transparent)] 43 + BincodeDecodeError(#[from] bincode::error::DecodeError), 44 + #[error("extra bytes remained after decoding")] 45 + ExtraGarbage, 46 + } 47 + 48 + /// An in-order chunk of Rkey + (processed) Block pairs 49 + pub type BlockChunk<T> = Vec<(String, T)>; 29 50 30 - #[derive(Debug)] 31 - pub enum MaybeProcessedBlock<T, E> { 51 + #[derive(Debug, Clone, Serialize, Deserialize)] 52 + pub(crate) enum MaybeProcessedBlock<T> { 32 53 /// A block that's *probably* a Node (but we can't know yet) 33 54 /// 34 55 /// It *can be* a record that suspiciously looks a lot like a node, so we ··· 50 71 /// There's an alternative here, which would be to kick unprocessable blocks 51 72 /// back to Raw, or maybe even a new RawUnprocessable variant. Then we could 52 73 /// surface the typed error later if needed by trying to reprocess.
53 - Processed(Result<T, E>), 74 + Processed(T), 54 75 } 55 76 56 - /// The core driver between the block stream and MST walker 57 - pub struct Vehicle<SE, S, T, P, PE> 58 - where 59 - S: Stream<Item = CarBlock<SE>>, 60 - P: Fn(&[u8]) -> Result<T, PE>, 61 - PE: Error, 62 - { 63 - block_stream: S, 64 - blocks: HashMap<Cid, MaybeProcessedBlock<T, PE>>, 65 - walker: Walker, 66 - process: P, 77 + impl<T: Processable> Processable for MaybeProcessedBlock<T> { 78 + /// TODO this is probably a little broken 79 + fn get_size(&self) -> usize { 80 + use std::{cmp::max, mem::size_of}; 81 + 82 + // enum is always as big as its biggest member? 83 + let base_size = max(size_of::<Vec<u8>>(), size_of::<T>()); 84 + 85 + let extra = match self { 86 + Self::Raw(bytes) => bytes.len(), 87 + Self::Processed(t) => t.get_size(), 88 + }; 89 + 90 + base_size + extra 91 + } 67 92 } 68 93 69 - impl<SE, S, T: Clone, P, PE> Vehicle<SE, S, T, P, PE> 70 - where 71 - SE: Error + 'static, 72 - S: Stream<Item = CarBlock<SE>> + Unpin, 73 - P: Fn(&[u8]) -> Result<T, PE>, 74 - PE: Error, 75 - { 76 - /// Set up the stream 94 + impl<T> MaybeProcessedBlock<T> { 95 + fn maybe(process: fn(Vec<u8>) -> T, data: Vec<u8>) -> Self { 96 + if Node::could_be(&data) { 97 + MaybeProcessedBlock::Raw(data) 98 + } else { 99 + MaybeProcessedBlock::Processed(process(data)) 100 + } 101 + } 102 + } 103 + 104 + /// Read a CAR file, buffering blocks in memory or to disk 105 + pub enum Driver<R: AsyncRead + Unpin, T: Processable> { 106 + /// All blocks fit within the memory limit 77 107 /// 78 - /// This will eagerly consume blocks until the `Commit` object is found. 79 - /// *Usually* the it's the first block, but there is no guarantee. 108 + /// You probably want to check the commit's signature. You can go ahead and 109 + /// walk the MST right away. 110 + Memory(Commit, MemDriver<T>), 111 + /// Blocks exceed the memory limit 80 112 /// 81 - /// ### Parameters 113 + /// You'll need to provide disk storage to continue. The commit will be 114 + /// returned and can be validated only once all blocks are loaded.
115 + Disk(NeedDisk<R, T>), 116 + } 117 + 118 + /// Builder-style driver setup 119 + pub struct DriverBuilder { 120 + pub mem_limit_mb: usize, 121 + } 122 + 123 + impl Default for DriverBuilder { 124 + fn default() -> Self { 125 + Self { mem_limit_mb: 16 } 126 + } 127 + } 128 + 129 + impl DriverBuilder { 130 + /// Begin configuring the driver with defaults 131 + pub fn new() -> Self { 132 + Default::default() 133 + } 134 + /// Set the in-memory size limit, in MiB 82 135 /// 83 - /// `root`: CID of the commit object that is the root of the MST 136 + /// Default: 16 MiB 137 + pub fn with_mem_limit_mb(self, new_limit: usize) -> Self { 138 + Self { 139 + mem_limit_mb: new_limit, 140 + } 141 + } 142 + /// Set the block processor 84 143 /// 85 - /// `block_stream`: Input stream of raw CAR blocks 144 + /// Default: noop, raw blocks will be emitted 145 + pub fn with_block_processor<T: Processable>( 146 + self, 147 + p: fn(Vec<u8>) -> T, 148 + ) -> DriverBuilderWithProcessor<T> { 149 + DriverBuilderWithProcessor { 150 + mem_limit_mb: self.mem_limit_mb, 151 + block_processor: p, 152 + } 153 + } 154 + /// Begin processing an atproto MST from a CAR file 86 155 pub async fn load_car<R: AsyncRead + Unpin>( 156 + self, 157 + reader: R, 158 + ) -> Result<Driver<R, Vec<u8>>, DriveError> { 159 + Driver::load_car(reader, crate::process::noop, self.mem_limit_mb).await 160 + } 161 + } 162 + 163 + /// Builder-style driver intermediate step 164 + /// 165 + /// start from `DriverBuilder` 166 + pub struct DriverBuilderWithProcessor<T: Processable> { 167 + pub mem_limit_mb: usize, 168 + pub block_processor: fn(Vec<u8>) -> T, 169 + } 170 + 171 + impl<T: Processable> DriverBuilderWithProcessor<T> { 172 + /// Set the in-memory size limit, in MiB 173 + /// 174 + /// Default: 16 MiB 175 + pub fn with_mem_limit_mb(mut self, new_limit: usize) -> Self { 176 + self.mem_limit_mb = new_limit; 177 + self 178 + } 179 + /// Begin processing an atproto MST from a CAR file 180 + pub async fn load_car<R: AsyncRead + Unpin>( 181 + self, 182 + reader: R, 183 + ) -> Result<Driver<R, T>, DriveError> { 184 + Driver::load_car(reader, self.block_processor, self.mem_limit_mb).await 185 + } 186 + } 187 + 188 + impl<R: AsyncRead + Unpin, T: Processable> Driver<R, T> { 189 + /// Begin processing an atproto MST from a CAR file 88 190 /// 89 - /// For tasks where records can be quickly processed into a *smaller* 90 - /// useful representation, you can do that eagerly as blocks come in by 91 - /// passing the processor as a callback here. This can reduce overall 92 - /// memory usage. 93 - pub async fn init( 94 - root: Cid, 95 - mut block_stream: S, 96 - process: P, 97 - ) -> Result<(Commit, Self), DriveError<PE>> { 98 - let mut blocks = HashMap::new(); 191 + /// Blocks will be loaded, processed, and buffered in memory. If the entire 192 + /// processed size is under the `mem_limit_mb` limit, a `Driver::Memory` 193 + /// will be returned along with a `Commit` ready for validation. 194 + /// 195 + /// If the `mem_limit_mb` limit is reached before loading all blocks, the 196 + /// partial state will be returned as `Driver::Disk(needed)`, which can be 197 + /// resumed by providing a `DiskStore` for on-disk block storage.
198 + pub async fn load_car( 199 + reader: R, 200 + process: fn(Vec<u8>) -> T, 201 + mem_limit_mb: usize, 202 + ) -> Result<Driver<R, T>, DriveError> { 203 + let max_size = mem_limit_mb * 2_usize.pow(20); 204 + let mut mem_blocks = HashMap::new(); 205 + 206 + let mut car = CarReader::new(reader).await?; 207 + 208 + let root = *car 209 + .header() 210 + .roots() 211 + .first() 212 + .ok_or(DriveError::MissingRoot)?; 213 + log::debug!("root: {root:?}"); 99 214 100 215 let mut commit = None; 101 216 102 - while let Some((cid, data)) = block_stream 103 - .try_next() 104 - .await 105 - .map_err(|e| DriveError::CarBlockError(e.into()))? 106 - { 217 + // try to load all the blocks into memory 218 + let mut mem_size = 0; 219 + while let Some((cid, data)) = car.next_block().await? { 220 + // the root commit is a Special Third Kind of block that we need to make 221 + // sure not to optimistically send to the processing function 107 222 if cid == root { 108 - let c: Commit = serde_ipld_dagcbor::from_slice(&data) 109 - .map_err(|e| DriveError::BadCommit(e.into()))?; 223 + let c: Commit = serde_ipld_dagcbor::from_slice(&data)?; 110 224 commit = Some(c); 111 - break; 112 - } else { 113 - blocks.insert( 114 - cid, 115 - if Node::could_be(&data) { 116 - MaybeProcessedBlock::Raw(data) 117 - } else { 118 - MaybeProcessedBlock::Processed(process(&data)) 119 - }, 120 - ); 225 + continue; 226 + } 227 + 228 + // remaining possible types: node, record, other. optimistically process 229 + let maybe_processed = MaybeProcessedBlock::maybe(process, data); 230 + 231 + // stash (maybe processed) blocks in memory as long as we have room 232 + mem_size += std::mem::size_of::<Cid>() + maybe_processed.get_size(); 233 + mem_blocks.insert(cid, maybe_processed); 234 + if mem_size >= max_size { 235 + return Ok(Driver::Disk(NeedDisk { 236 + car, 237 + root, 238 + process, 239 + max_size, 240 + mem_blocks, 241 + commit, 242 + })); 121 243 } 122 244 } 123 245 124 - // we either broke out or read all the blocks without finding the commit... 246 + // all blocks loaded and we fit in memory! hopefully we found the commit... 125 247 let commit = commit.ok_or(DriveError::MissingCommit)?; 126 248 127 249 let walker = Walker::new(commit.data); 128 250 129 - let me = Self { 130 - block_stream, 131 - blocks, 132 - walker, 133 - process, 134 - }; 135 - Ok((commit, me)) 251 + Ok(Driver::Memory( 252 + commit, 253 + MemDriver { 254 + blocks: mem_blocks, 255 + walker, 256 + process, 257 + }, 258 + )) 259 + } 260 + } 261 + 262 + /// The core driver between the block stream and MST walker 263 + /// 264 + /// In the future, PDSs will export CARs in a stream-friendly order that will 265 + /// enable processing them with tiny memory overhead. But that future is not 266 + /// here yet. 267 + /// 268 + /// CARs are almost always in a stream-unfriendly order, so I'm reverting the 269 + /// optimistic stream features: we load all blocks first, then walk the MST. 270 + /// 271 + /// This makes things much simpler: we only need to worry about spilling to disk 272 + /// in one place, and we always have a reasonable expectation about how much 273 + /// work the init function will do. We can drop the CAR reader before walking, 274 + /// so the sync/async boundaries become a little easier to work around. 
275 + #[derive(Debug)] 276 + pub struct MemDriver<T: Processable> { 277 + blocks: HashMap<Cid, MaybeProcessedBlock<T>>, 278 + walker: Walker, 279 + process: fn(Vec<u8>) -> T, 280 + } 281 + 282 + impl<T: Processable> MemDriver<T> { 283 + /// Step through the record outputs, in rkey order 284 + pub async fn next_chunk(&mut self, n: usize) -> Result<Option<BlockChunk<T>>, DriveError> { 285 + let mut out = Vec::with_capacity(n); 286 + for _ in 0..n { 287 + // walk as far as we can until we run out of blocks or find a record 288 + match self.walker.step(&mut self.blocks, self.process)? { 289 + Step::Missing(cid) => return Err(DriveError::MissingBlock(cid)), 290 + Step::Finish => break, 291 + Step::Found { rkey, data } => { 292 + out.push((rkey, data)); 293 + continue; 294 + } 295 + }; 296 + } 297 + 298 + if out.is_empty() { 299 + Ok(None) 300 + } else { 301 + Ok(Some(out)) 302 + } 303 + } 304 + } 305 + 306 + /// A partially memory-loaded car file that needs disk spillover to continue 307 + pub struct NeedDisk<R: AsyncRead + Unpin, T: Processable> { 308 + car: CarReader<R>, 309 + root: Cid, 310 + process: fn(Vec<u8>) -> T, 311 + max_size: usize, 312 + mem_blocks: HashMap<Cid, MaybeProcessedBlock<T>>, 313 + pub commit: Option<Commit>, 314 + } 315 + 316 + fn encode(v: impl Serialize) -> Result<Vec<u8>, bincode::error::EncodeError> { 317 + bincode::serde::encode_to_vec(v, bincode::config::standard()) 318 + } 319 + 320 + pub(crate) fn decode<T: Processable>(bytes: &[u8]) -> Result<T, DecodeError> { 321 + let (t, n) = bincode::serde::decode_from_slice(bytes, bincode::config::standard())?; 322 + if n != bytes.len() { 323 + return Err(DecodeError::ExtraGarbage); 136 324 } 325 + Ok(t) 326 + } 137 327 138 - async fn drive_until(&mut self, cid_needed: Cid) -> Result<(), DriveError<PE>> { 139 - while let Some((cid, data)) = self 140 - .block_stream 141 - .try_next() 142 - .await 143 - .map_err(|e| DriveError::CarBlockError(e.into()))? 
144 - { 145 - self.blocks.insert( 146 - cid, 147 - if Node::could_be(&data) { 148 - MaybeProcessedBlock::Raw(data) 149 - } else { 150 - MaybeProcessedBlock::Processed((self.process)(&data)) 151 - }, 152 - ); 153 - if cid == cid_needed { 154 - return Ok(()); 328 + impl<R: AsyncRead + Unpin, T: Processable + Send + 'static> NeedDisk<R, T> { 329 + pub async fn finish_loading( 330 + mut self, 331 + mut store: DiskStore, 332 + ) -> Result<(Commit, DiskDriver<T>), DriveError> { 333 + // move store in and back out so we can manage lifetimes 334 + // dump mem blocks into the store 335 + store = tokio::task::spawn(async move { 336 + let mut writer = store.get_writer()?; 337 + 338 + let kvs = self 339 + .mem_blocks 340 + .into_iter() 341 + .map(|(k, v)| Ok(encode(v).map(|v| (k.to_bytes(), v))?)); 342 + 343 + writer.put_many(kvs)?; 344 + writer.commit()?; 345 + Ok::<_, DriveError>(store) 346 + }) 347 + .await??; 348 + 349 + let (tx, mut rx) = mpsc::channel::<Vec<(Cid, MaybeProcessedBlock<T>)>>(2); 350 + 351 + let store_worker = tokio::task::spawn_blocking(move || { 352 + let mut writer = store.get_writer()?; 353 + 354 + while let Some(chunk) = rx.blocking_recv() { 355 + let kvs = chunk 356 + .into_iter() 357 + .map(|(k, v)| Ok(encode(v).map(|v| (k.to_bytes(), v))?)); 358 + writer.put_many(kvs)?; 155 359 } 360 + 361 + writer.commit()?; 362 + Ok::<_, DriveError>(store) 363 + }); // await later 364 + 365 + // dump the rest to disk (in chunks) 366 + log::debug!("dumping the rest of the stream..."); 367 + loop { 368 + let mut mem_size = 0; 369 + let mut chunk = vec![]; 370 + loop { 371 + let Some((cid, data)) = self.car.next_block().await? else { 372 + break; 373 + }; 374 + // we still gotta keep checking for the root since we might not have it 375 + if cid == self.root { 376 + let c: Commit = serde_ipld_dagcbor::from_slice(&data)?; 377 + self.commit = Some(c); 378 + continue; 379 + } 380 + // remaining possible types: node, record, other. optimistically process 381 + // TODO: get the actual in-memory size to compute disk spill 382 + let maybe_processed = MaybeProcessedBlock::maybe(self.process, data); 383 + mem_size += std::mem::size_of::<Cid>() + maybe_processed.get_size(); 384 + chunk.push((cid, maybe_processed)); 385 + if mem_size >= self.max_size { 386 + // soooooo if we're setting the db cache to max_size and then letting 387 + // multiple chunks in the queue that are >= max_size, then at any time 388 + // we might be using some multiple of max_size? 389 + break; 390 + } 391 + } 392 + if chunk.is_empty() { 393 + break; 394 + } 395 + tx.send(chunk) 396 + .await 397 + .map_err(|_| DriveError::ChannelSendError)?; 156 398 } 399 + drop(tx); 400 + log::debug!("done. 
waiting for worker to finish..."); 157 401 158 - // if we never found the block 159 - Err(DriveError::MissingBlock(cid_needed)) 402 + store = store_worker.await??; 403 + 404 + log::debug!("worker finished."); 405 + 406 + let commit = self.commit.ok_or(DriveError::MissingCommit)?; 407 + 408 + let walker = Walker::new(commit.data); 409 + 410 + Ok(( 411 + commit, 412 + DiskDriver { 413 + process: self.process, 414 + state: Some(BigState { store, walker }), 415 + }, 416 + )) 417 + } 418 + } 419 + 420 + struct BigState { 421 + store: DiskStore, 422 + walker: Walker, 423 + } 424 + 425 + /// MST walker that reads from disk instead of an in-memory hashmap 426 + pub struct DiskDriver<T: Clone> { 427 + process: fn(Vec<u8>) -> T, 428 + state: Option<BigState>, 429 + } 430 + 431 + // for doctests only 432 + #[doc(hidden)] 433 + pub fn _get_fake_disk_driver() -> DiskDriver<Vec<u8>> { 434 + use crate::process::noop; 435 + DiskDriver { 436 + process: noop, 437 + state: None, 438 + } 439 + } 440 + 441 + impl<T: Processable + Send + 'static> DiskDriver<T> { 442 + /// Walk the MST returning up to `n` rkey + record pairs 443 + /// 444 + /// ```no_run 445 + /// # use repo_stream::{drive::{DiskDriver, DriveError, _get_fake_disk_driver}, process::noop}; 446 + /// # #[tokio::main] 447 + /// # async fn main() -> Result<(), DriveError> { 448 + /// # let mut disk_driver = _get_fake_disk_driver(); 449 + /// while let Some(pairs) = disk_driver.next_chunk(256).await? { 450 + /// for (rkey, record) in pairs { 451 + /// println!("{rkey}: size={}", record.len()); 452 + /// } 453 + /// } 454 + /// let store = disk_driver.reset_store().await?; 455 + /// # Ok(()) 456 + /// # } 457 + /// ``` 458 + pub async fn next_chunk(&mut self, n: usize) -> Result<Option<BlockChunk<T>>, DriveError> { 459 + let process = self.process; 460 + 461 + // state should only *ever* be None transiently while inside here 462 + let mut state = self.state.take().expect("DiskDriver must have Some(state)"); 463 + 464 + // the big pain here is that we don't want to leave self.state in an 465 + // invalid state (None), so all the error paths have to make sure it 466 + // comes out again. 467 + let (state, res) = tokio::task::spawn_blocking( 468 + move || -> (BigState, Result<BlockChunk<T>, DriveError>) { 469 + let mut reader_res = state.store.get_reader(); 470 + let reader: &mut _ = match reader_res { 471 + Ok(ref mut r) => r, 472 + Err(ref mut e) => { 473 + // unfortunately we can't return the error directly because 474 + // (for some reason) it's attached to the lifetime of the 475 + // reader? 
476 + // hack a mem::swap so we can get it out :/ 477 + let e_swapped = e.steal(); 478 + // the pain: `state` *has to* outlive the reader 479 + drop(reader_res); 480 + return (state, Err(e_swapped.into())); 481 + } 482 + }; 483 + 484 + let mut out = Vec::with_capacity(n); 485 + 486 + for _ in 0..n { 487 + // walk as far as we can until we run out of blocks or find a record 488 + let step = match state.walker.disk_step(reader, process) { 489 + Ok(s) => s, 490 + Err(e) => { 491 + // the pain: `state` *has to* outlive the reader 492 + drop(reader_res); 493 + return (state, Err(e.into())); 494 + } 495 + }; 496 + match step { 497 + Step::Missing(cid) => { 498 + // the pain: `state` *has to* outlive the reader 499 + drop(reader_res); 500 + return (state, Err(DriveError::MissingBlock(cid))); 501 + } 502 + Step::Finish => break, 503 + Step::Found { rkey, data } => out.push((rkey, data)), 504 + }; 505 + } 506 + 507 + // `state` *has to* outlive the reader 508 + drop(reader_res); 509 + 510 + (state, Ok::<_, DriveError>(out)) 511 + }, 512 + ) 513 + .await?; // on tokio JoinError, we'll be left with invalid state :( 514 + 515 + // *must* restore state before dealing with the actual result 516 + self.state = Some(state); 517 + 518 + let out = res?; 519 + 520 + if out.is_empty() { 521 + Ok(None) 522 + } else { 523 + Ok(Some(out)) 524 + } 160 525 } 161 526 162 - /// Manually step through the record outputs 163 - pub async fn next_record(&mut self) -> Result<Option<(String, T)>, DriveError<PE>> { 527 + fn read_tx_blocking( 528 + &mut self, 529 + n: usize, 530 + tx: mpsc::Sender<Result<BlockChunk<T>, DriveError>>, 531 + ) -> Result<(), mpsc::error::SendError<Result<BlockChunk<T>, DriveError>>> { 532 + let BigState { store, walker } = self.state.as_mut().expect("valid state"); 533 + let mut reader = match store.get_reader() { 534 + Ok(r) => r, 535 + Err(e) => return tx.blocking_send(Err(e.into())), 536 + }; 537 + 164 538 loop { 165 - // walk as far as we can until we run out of blocks or find a record 166 - let cid_needed = match self.walker.step(&mut self.blocks, &self.process)? 
{ 167 - Step::Rest(cid) => cid, 168 - Step::Finish => return Ok(None), 169 - Step::Step { rkey, data } => return Ok(Some((rkey, data))), 170 - }; 539 + let mut out: BlockChunk<T> = Vec::with_capacity(n); 540 + 541 + for _ in 0..n { 542 + // walk as far as we can until we run out of blocks or find a record 543 + 544 + let step = match walker.disk_step(&mut reader, self.process) { 545 + Ok(s) => s, 546 + Err(e) => return tx.blocking_send(Err(e.into())), 547 + }; 548 + 549 + match step { 550 + Step::Missing(cid) => { 551 + return tx.blocking_send(Err(DriveError::MissingBlock(cid))); 552 + } 553 + Step::Finish => return if out.is_empty() { Ok(()) } else { tx.blocking_send(Ok(out)) }, // flush the final partial chunk before finishing 554 + Step::Found { rkey, data } => { 555 + out.push((rkey, data)); 556 + continue; 557 + } 558 + }; 559 + } 171 - 172 - // load blocks until we reach that cid 173 - self.drive_until(cid_needed).await?; 561 + if out.is_empty() { 562 + break; 563 + } 564 + tx.blocking_send(Ok(out))?; 175 - } 565 + } 566 + 567 + Ok(()) 568 + } 176 569 177 - /// Convert to a futures::stream of record outputs 178 - pub fn stream(self) -> impl Stream<Item = Result<(String, T), DriveError<PE>>> { 179 - futures::stream::try_unfold(self, |mut this| async move { 180 - let maybe_record = this.next_record().await?; 181 - Ok(maybe_record.map(|b| (b, this))) 182 - }) 570 + /// Spawn the disk reading task into a tokio blocking thread 571 + /// 572 + /// The idea is to avoid so much sending back and forth to the blocking 573 + /// thread, letting a blocking task do all the disk reading work and sending 574 + /// records and rkeys back through an `mpsc` channel instead. 575 + /// 576 + /// This might also allow the disk work to continue while processing the 577 + /// records. It's still not yet clear if this method actually has much 578 + /// benefit over just using `.next_chunk(n)`. 579 + /// 580 + /// ```no_run 581 + /// # use repo_stream::{drive::{DiskDriver, DriveError, _get_fake_disk_driver}, process::noop}; 582 + /// # #[tokio::main] 583 + /// # async fn main() -> Result<(), DriveError> { 584 + /// # let mut disk_driver = _get_fake_disk_driver(); 585 + /// let (mut rx, join) = disk_driver.to_channel(512); 586 + /// while let Some(recvd) = rx.recv().await { 587 + /// let pairs = recvd?; 588 + /// for (rkey, record) in pairs { 589 + /// println!("{rkey}: size={}", record.len()); 590 + /// } 591 + /// 592 + /// } 593 + /// let store = join.await?.reset_store().await?; 594 + /// # Ok(()) 595 + /// # } 596 + /// ``` 597 + pub fn to_channel( 598 + mut self, 599 + n: usize, 600 + ) -> ( 601 + mpsc::Receiver<Result<BlockChunk<T>, DriveError>>, 602 + tokio::task::JoinHandle<Self>, 603 + ) { 604 + let (tx, rx) = mpsc::channel::<Result<BlockChunk<T>, DriveError>>(1); 605 + 606 + // the worker runs on a blocking thread; its JoinHandle is returned so the caller can recover `self` 607 + let chan_task = tokio::task::spawn_blocking(move || { 608 + if let Err(mpsc::error::SendError(_)) = self.read_tx_blocking(n, tx) { 609 + log::debug!("big car reader exited early due to dropped receiver channel"); 610 + } 611 + self 612 + }); 613 + 614 + (rx, chan_task) 615 + } 616 + 617 + /// Reset the disk storage so it can be reused. You must call this. 618 + /// 619 + /// Ideally we'd put this in an `impl Drop`, but since it makes blocking 620 + /// calls, that would be risky in an async context. For now you just have to 621 + /// carefully make sure you call it. 622 + /// 623 + /// The sqlite store is returned, so it can be reused for another 624 + /// `DiskDriver`. 
625 + pub async fn reset_store(mut self) -> Result<DiskStore, DriveError> { 626 + let BigState { store, .. } = self.state.take().expect("valid state"); 627 + Ok(store.reset().await?) 183 628 } 184 629 }
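Because `reset_store` hands the `DiskStore` back after dropping the tables, one database file can serve a whole queue of spilled CARs, matching the worker pattern described in src/lib.rs. A sketch of that shape, where `queue` is a hypothetical `mpsc::Receiver` of suspended `NeedDisk` loads:

```rust
// one disk worker draining suspended loads, reusing a single DiskStore
let mut store = DiskBuilder::new().open("/tmp/spill.db".into()).await?;
while let Some(paused) = queue.recv().await {
    let (_commit, mut driver) = paused.finish_loading(store).await?;
    while let Some(_pairs) = driver.next_chunk(256).await? {
        // handle records here
    }
    store = driver.reset_store().await?; // tables reset, ready for the next CAR
}
```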
+85 -5
src/lib.rs
··· 1 - //! Fast and robust atproto CAR file processing in rust 2 - //! 3 - //! For now see the [examples](https://tangled.org/@microcosm.blue/repo-stream/tree/main/examples) 1 + /*! 2 + A robust CAR file -> MST walker for atproto 3 + 4 + Small CARs have their blocks buffered in memory. If a configurable memory limit 5 + is reached while reading blocks, CAR reading is suspended, and can be continued 6 + by providing disk storage to buffer the CAR blocks instead. 7 + 8 + A `process` function can be provided for tasks where records are transformed 9 + into a smaller representation, to save memory (and disk) during block reading. 10 + 11 + Once blocks are loaded, the MST is walked and emitted as chunks of 12 + `(rkey, processed_block)` pairs, in order (depth first, left-to-right). 13 + 14 + Some MST validations are applied: 15 + - Keys must appear in order 16 + - Keys must be at the correct MST tree depth 17 + 18 + `iroh_car` additionally applies a block size limit of `2MiB`. 19 + 20 + ``` 21 + use repo_stream::{Driver, DriverBuilder, DiskBuilder}; 22 + 23 + # #[tokio::main] 24 + # async fn main() -> Result<(), Box<dyn std::error::Error>> { 25 + # let reader = include_bytes!("../car-samples/tiny.car").as_slice(); 26 + let mut total_size = 0; 4 27 5 - pub mod drive; 28 + match DriverBuilder::new() 29 + .with_mem_limit_mb(10) 30 + .with_block_processor(|rec| rec.len()) // block processing: just extract the raw record size 31 + .load_car(reader) 32 + .await? 33 + { 34 + 35 + // if all blocks fit within memory 36 + Driver::Memory(_commit, mut driver) => { 37 + while let Some(chunk) = driver.next_chunk(256).await? { 38 + for (_rkey, size) in chunk { 39 + total_size += size; 40 + } 41 + } 42 + }, 43 + 44 + // if the CAR was too big for in-memory processing 45 + Driver::Disk(paused) => { 46 + // set up a disk store we can spill to 47 + let store = DiskBuilder::new().open("some/path.db".into()).await?; 48 + // do the spilling, get back a (similar) driver 49 + let (_commit, mut driver) = paused.finish_loading(store).await?; 50 + 51 + while let Some(chunk) = driver.next_chunk(256).await? { 52 + for (_rkey, size) in chunk { 53 + total_size += size; 54 + } 55 + } 56 + 57 + // clean up the disk store (drop tables etc) 58 + driver.reset_store().await?; 59 + } 60 + }; 61 + println!("sum of size of all records: {total_size}"); 62 + # Ok(()) 63 + # } 64 + ``` 65 + 66 + Disk spilling suspends and returns a `Driver::Disk(paused)` instead of going 67 + ahead and eagerly using disk I/O. This means you have to write a bit more code 68 + to handle both cases, but it allows you to have finer control over resource 69 + usage. For example, you can drive a number of parallel memory CAR workers, and 70 + separately have a different number of disk workers picking up suspended disk 71 + tasks from a queue. 72 + 73 + Find more [examples in the repo](https://tangled.org/@microcosm.blue/repo-stream/tree/main/examples). 74 + 75 + */ 76 + 6 77 pub mod mst; 7 - pub mod walk; 78 + mod walk; 79 + 80 + pub mod disk; 81 + pub mod drive; 82 + pub mod process; 83 + 84 + pub use disk::{DiskBuilder, DiskError, DiskStore}; 85 + pub use drive::{DriveError, Driver, DriverBuilder}; 86 + pub use mst::Commit; 87 + pub use process::Processable;
+4 -8
src/mst.rs
··· 39 39 /// MST node data schema
40 40 #[derive(Debug, Deserialize, PartialEq)]
41 41 #[serde(deny_unknown_fields)]
42 - pub struct Node {
42 + pub(crate) struct Node {
43 43     /// link to sub-tree Node on a lower level and with all keys sorting before
44 44     /// keys at this node
45 45     #[serde(rename = "l")]
··· 62 62 /// so if a block *could be* a node, any record converter must postpone
63 63 /// processing. If it turns out the block is a very node-looking record, well,
64 64 /// sorry: it can only be processed later, once its role is known.
65 - pub fn could_be(bytes: impl AsRef<[u8]>) -> bool {
65 + pub(crate) fn could_be(bytes: impl AsRef<[u8]>) -> bool {
66 66     const NODE_FINGERPRINT: [u8; 3] = [
67 67         0xA2, // map length 2 (for "l" and "e" keys)
68 68         0x61, // text length 1
··· 83 83     /// with an empty array of entries. This is the only situation in which a
84 84     /// tree may contain an empty leaf node which does not either contain keys
85 85     /// ("entries") or point to a sub-tree containing entries.
86 -     ///
87 -     /// TODO: to me this is slightly unclear with respect to `l` (ask someone).
88 -     /// ...is that what "The top of the tree must not be a an empty node which
89 -     /// only points to a sub-tree." is referring to?
90 86     pub(crate) fn is_empty(&self) -> bool {
91 87         self.left.is_none() && self.entries.is_empty()
92 88     }
93 89 }
··· 95 91 /// TreeEntry object
96 92 #[derive(Debug, Deserialize, PartialEq)]
97 93 #[serde(deny_unknown_fields)]
98 - pub struct Entry {
94 + pub(crate) struct Entry {
99 95     /// count of bytes shared with previous TreeEntry in this Node (if any)
100 96     #[serde(rename = "p")]
101 97     pub prefix_len: usize,
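To make the fingerprint concrete: in dag-cbor, a node serializes as a two-entry map with one-byte text keys, so the first bytes of every node block are fixed. A sketch of the idea (only the first two bytes shown; the real `NODE_FINGERPRINT` also pins the key byte, and byte values here come from the CBOR spec):

```rust
// CBOR: 0xA2 = map with 2 entries; 0x61 = text string of length 1.
// Any block that does not start this way cannot be an MST node, so it
// can safely be handed to `process` immediately.
fn might_be_node(block: &[u8]) -> bool {
    block.starts_with(&[0xA2, 0x61])
}

fn main() {
    assert!(might_be_node(&[0xA2, 0x61, b'e']));  // node-shaped prefix
    assert!(!might_be_node(&[0xA4, 0x64, b'$'])); // a 4-key map: some record
}
```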
+87
src/process.rs
··· 1 + /*!
2 + Record processor function output trait
3 +
4 + The return type must satisfy the `Processable` trait, which requires:
5 +
6 + - `Clone` because two rkeys can refer to the same record by CID, which may
7 +   only appear once in the CAR file.
8 + - `Serialize + DeserializeOwned` so it can be spilled to disk.
9 +
10 + One required function must be implemented, `get_size()`: it should return the
11 + approximate total off-stack size of the type. (The on-stack size is added
12 + automatically via `std::mem::size_of`.)
13 +
14 + Note that it is **not guaranteed** that the `process` function will run on a
15 + block before storing it in memory or on disk: it's not possible to know if a
16 + block is a record without actually walking the MST, so the best we can do is
17 + apply `process` to any block that we know *cannot* be an MST node, and otherwise
18 + store the raw block bytes.
19 +
20 + Here's a silly processing function that just collects the 'eyy's found in the
21 + raw record bytes:
22 +
23 + ```
24 + # use repo_stream::Processable;
25 + # use serde::{Serialize, Deserialize};
26 + #[derive(Debug, Clone, Serialize, Deserialize)]
27 + struct Eyy(usize, String);
28 +
29 + impl Processable for Eyy {
30 +     fn get_size(&self) -> usize {
31 +         // no need to count the usize, it's on the stack
32 +         self.1.capacity() // in-mem size from the string's capacity, in bytes
33 +     }
34 + }
35 +
36 + fn process(raw: Vec<u8>) -> Vec<Eyy> {
37 +     let mut out = Vec::new();
38 +     let to_find: &[u8] = b"eyy"; // windows(3) below avoids underflow on short inputs
39 +     for (i, window) in raw.windows(3).enumerate() {
40 +         if window == to_find {
41 +             out.push(Eyy(i, "eyy".to_string()));
42 +         }
43 +     }
44 +     out
45 + }
46 + ```
47 +
48 + The memory sizing stuff is a little sketchy, but it probably at least
49 + approximately works.
50 + */
51 +
52 + use serde::{Serialize, de::DeserializeOwned};
53 +
54 + /// Output trait for record processing
55 + pub trait Processable: Clone + Serialize + DeserializeOwned {
56 +     /// Any additional in-memory size taken by the processed type
57 +     ///
58 +     /// Do not include stack size (`std::mem::size_of`)
59 +     fn get_size(&self) -> usize;
60 + }
61 +
62 + /// Processor that just returns the raw blocks
63 + #[inline]
64 + pub fn noop(block: Vec<u8>) -> Vec<u8> {
65 +     block
66 + }
67 +
68 + impl Processable for u8 {
69 +     fn get_size(&self) -> usize {
70 +         0
71 +     }
72 + }
73 +
74 + impl Processable for usize {
75 +     fn get_size(&self) -> usize {
76 +         0 // no additional heap space taken, just its stack size
77 +     }
78 + }
79 +
80 + impl<Item: Sized + Processable> Processable for Vec<Item> {
81 +     fn get_size(&self) -> usize {
82 +         let slot_size = std::mem::size_of::<Item>();
83 +         let direct_size = slot_size * self.capacity();
84 +         let items_referenced_size: usize = self.iter().map(|item| item.get_size()).sum();
85 +         direct_size + items_referenced_size
86 +     }
87 + }
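A quick sanity check of how these impls compose (a sketch; the `>=` allows for allocator rounding in `Vec::with_capacity`):

```rust
use repo_stream::Processable;

fn main() {
    // integers carry no heap data
    assert_eq!(42usize.get_size(), 0);

    // a Vec<u8>'s off-stack size is just its capacity in bytes
    let v: Vec<u8> = Vec::with_capacity(1024);
    assert!(v.get_size() >= 1024);

    // nesting composes: outer slots counted via size_of, inner heap via get_size
    let nested: Vec<Vec<u8>> = vec![vec![0u8; 16]; 4];
    assert!(nested.get_size() >= 4 * std::mem::size_of::<Vec<u8>>() + 4 * 16);
}
```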
+257 -259
src/walk.rs
··· 1 1 //! Depth-first MST traversal
2 2
3 - use crate::drive::MaybeProcessedBlock;
3 + use crate::disk::SqliteReader;
4 + use crate::drive::{DecodeError, MaybeProcessedBlock};
4 5 use crate::mst::Node;
6 + use crate::process::Processable;
5 7 use ipld_core::cid::Cid;
8 + use sha2::{Digest, Sha256};
6 9 use std::collections::HashMap;
7 - use std::error::Error;
10 + use std::convert::Infallible;
8 11
9 12 /// Errors that can happen while walking
10 13 #[derive(Debug, thiserror::Error)]
11 - pub enum Trip<E: Error> {
12 -     #[error("empty mst nodes are not allowed")]
13 -     NodeEmpty,
14 + pub enum WalkError {
15 +     #[error("Failed to fingerprint commit block")]
16 +     BadCommitFingerprint,
14 17     #[error("Failed to decode commit block: {0}")]
15 -     BadCommit(Box<dyn std::error::Error>),
18 +     BadCommit(#[from] serde_ipld_dagcbor::DecodeError<Infallible>),
16 19     #[error("MST node error: {0}")]
17 -     RkeyError(#[from] RkeyError),
18 -     #[error("Process failed: {0}")]
19 -     ProcessFailed(E),
20 -     #[error("Encountered an rkey out of order while walking the MST")]
21 -     RkeyOutOfOrder,
20 +     MstError(#[from] MstError),
21 +     #[error("Storage error: {0}")]
22 +     StorageError(#[from] rusqlite::Error),
23 +     #[error("Decode error: {0}")]
24 +     DecodeError(#[from] DecodeError),
22 25 }
23 26
24 27 /// Errors from invalid MST structure
25 - #[derive(Debug, thiserror::Error)]
26 - pub enum RkeyError {
28 + #[derive(Debug, PartialEq, thiserror::Error)]
29 + pub enum MstError {
27 30     #[error("Failed to compute an rkey due to invalid prefix_len")]
28 31     EntryPrefixOutOfbounds,
29 32     #[error("RKey was not utf-8")]
30 33     EntryRkeyNotUtf8(#[from] std::string::FromUtf8Error),
34 +     #[error("Nodes cannot be empty (except for an entirely empty MST)")]
35 +     EmptyNode,
36 +     #[error("Found an entry with rkey at the wrong depth")]
37 +     WrongDepth,
38 +     #[error("Lost track of our depth (possible bug?)")]
39 +     LostDepth,
40 +     #[error("MST depth underflow: depth-0 node with child trees")]
41 +     DepthUnderflow,
42 +     #[error("Encountered an rkey out of order while walking the MST")]
43 +     RkeyOutOfOrder,
31 44 }
32 45
33 46 /// Walker outputs
34 47 #[derive(Debug)]
35 48 pub enum Step<T> {
36 -     /// We need a CID but it's not in the block store
37 -     ///
38 -     /// Give the needed CID to the driver so it can load blocks until it's found
39 -     Rest(Cid),
49 +     /// We needed this CID but it's not in the block store
50 +     Missing(Cid),
40 51     /// Reached the end of the MST! yay!
41 52     Finish,
42 53     /// A record was found!
43 -     Step { rkey: String, data: T },
54 +     Found { rkey: String, data: T },
44 55 }
45 56
46 57 #[derive(Debug, Clone, PartialEq)]
47 58 enum Need {
48 -     Node(Cid),
59 +     Node { depth: Depth, cid: Cid },
49 60     Record { rkey: String, cid: Cid },
50 61 }
51 62
52 - fn push_from_node(stack: &mut Vec<Need>, node: &Node) -> Result<(), RkeyError> {
53 -     let mut entries = Vec::with_capacity(node.entries.len());
63 + #[derive(Debug, Clone, Copy, PartialEq)]
64 + enum Depth {
65 +     Root,
66 +     Depth(u32),
67 + }
68 +
69 + impl Depth {
70 +     fn from_key(key: &[u8]) -> Self {
71 +         let mut zeros = 0;
72 +         for byte in Sha256::digest(key) {
73 +             let leading = byte.leading_zeros();
74 +             zeros += leading;
75 +             if leading < 8 {
76 +                 break;
77 +             }
78 +         }
79 +         Self::Depth(zeros / 2) // truncating divide (rounds down)
80 +     }
81 +     fn next_expected(&self) -> Result<Option<u32>, MstError> {
82 +         match self {
83 +             Self::Root => Ok(None),
84 +             Self::Depth(d) => d.checked_sub(1).ok_or(MstError::DepthUnderflow).map(Some),
85 +         }
86 +     }
87 + }
54 88
89 + fn push_from_node(stack: &mut Vec<Need>, node: &Node, parent_depth: Depth) -> Result<(), MstError> {
90 +     // empty nodes are not allowed in the MST
91 +     // ...except a single one for an entirely empty MST, which we'd never push
92 +     if node.is_empty() {
93 +         return Err(MstError::EmptyNode);
94 +     }
95 +
96 +     let mut entries = Vec::with_capacity(node.entries.len());
55 97     let mut prefix = vec![];
98 +     let mut this_depth = parent_depth.next_expected()?;
99 +
56 100     for entry in &node.entries {
57 101         let mut rkey = vec![];
58 102         let pre_checked = prefix
59 103             .get(..entry.prefix_len)
60 -             .ok_or(RkeyError::EntryPrefixOutOfbounds)?;
104 +             .ok_or(MstError::EntryPrefixOutOfbounds)?;
61 105         rkey.extend_from_slice(pre_checked);
62 106         rkey.extend_from_slice(&entry.keysuffix);
107 +
108 +         let Depth::Depth(key_depth) = Depth::from_key(&rkey) else {
109 +             return Err(MstError::WrongDepth);
110 +         };
111 +
112 +         // this_depth is `None` if this is the top node of the tree (its depth
113 +         // isn't known yet); in that case, accept the first key's depth
114 +         let expected_depth = match this_depth {
115 +             Some(d) => d,
116 +             None => {
117 +                 this_depth = Some(key_depth);
118 +                 key_depth
119 +             }
120 +         };
121 +
122 +         // every key in this node must be at this depth
123 +         if key_depth != expected_depth {
124 +             return Err(MstError::WrongDepth);
125 +         }
126 +
63 127         prefix = rkey.clone();
64 128
65 129         entries.push(Need::Record {
··· 67 131             cid: entry.value,
68 132         });
69 133         if let Some(ref tree) = entry.tree {
70 -             entries.push(Need::Node(*tree));
134 +             entries.push(Need::Node {
135 +                 depth: Depth::Depth(key_depth),
136 +                 cid: *tree,
137 +             });
71 138         }
72 139     }
73 140
74 141     entries.reverse();
75 142     stack.append(&mut entries);
76 143
144 +     let d = this_depth.ok_or(MstError::LostDepth)?;
145 +
77 146     if let Some(tree) = node.left {
78 -         stack.push(Need::Node(tree));
147 +         stack.push(Need::Node {
148 +             depth: Depth::Depth(d),
149 +             cid: tree,
150 +         });
79 151     }
80 152     Ok(())
81 153 }
··· 92 164 impl Walker {
93 165     pub fn new(tree_root_cid: Cid) -> Self {
94 166         Self {
95 -             stack: vec![Need::Node(tree_root_cid)],
167 +             stack: vec![Need::Node {
168 +                 depth: Depth::Root,
169 +                 cid: tree_root_cid,
170 +             }],
96 171             prev: "".to_string(),
97 172         }
98 173     }
99 174
100 175     /// Advance through nodes until we find a record or can't go further
101 -     pub fn step<T: Clone, E: Error>(
176 +     pub fn step<T: Processable>(
102 177         &mut self,
103 -         blocks: &mut HashMap<Cid, MaybeProcessedBlock<T, E>>,
104 -         process: impl Fn(&[u8]) -> Result<T, E>,
105 -     ) -> Result<Step<T>, Trip<E>> {
178 +         blocks: &mut HashMap<Cid, MaybeProcessedBlock<T>>,
179 +         process: impl Fn(Vec<u8>) -> T,
180 +     ) -> Result<Step<T>, WalkError> {
106 181         loop {
107 -             let Some(mut need) = self.stack.last() else {
182 +             let Some(need) = self.stack.last_mut() else {
108 183                 log::trace!("tried to walk but we're actually done.");
109 184                 return Ok(Step::Finish);
110 185             };
111 186
112 -             match &mut need {
113 -                 Need::Node(cid) => {
187 +             match need {
188 +                 &mut Need::Node { depth, cid } => {
114 189                     log::trace!("need node {cid:?}");
115 -                     let Some(block) = blocks.remove(cid) else {
190 +                     let Some(block) = blocks.remove(&cid) else {
116 191                         log::trace!("node not found, resting");
117 -                         return Ok(Step::Rest(*cid));
192 +                         return Ok(Step::Missing(cid));
118 193                     };
119 194
120 195                     let MaybeProcessedBlock::Raw(data) = block else {
121 -                         return Err(Trip::BadCommit("failed commit fingerprint".into()));
196 +                         return Err(WalkError::BadCommitFingerprint);
122 197                     };
123 198                     let node = serde_ipld_dagcbor::from_slice::<Node>(&data)
124 -                         .map_err(|e| Trip::BadCommit(e.into()))?;
199 +                         .map_err(WalkError::BadCommit)?;
125 200
126 201                     // found node, make sure we remember
127 202                     self.stack.pop();
128 203
129 204                     // queue up work on the found node next
130 -                     push_from_node(&mut self.stack, &node)?;
205 +                     push_from_node(&mut self.stack, &node, depth)?;
131 206                 }
132 207                 Need::Record { rkey, cid } => {
133 208                     log::trace!("need record {cid:?}");
209 +                     // note that we cannot *remove* a record block, sadly, since
210 +                     // there can be multiple rkeys pointing to the same cid.
134 211                     let Some(data) = blocks.get_mut(cid) else {
212 +                         return Ok(Step::Missing(*cid));
213 +                     };
214 +                     let rkey = rkey.clone();
215 +                     let data = match data {
216 +                         MaybeProcessedBlock::Raw(data) => process(data.to_vec()),
217 +                         MaybeProcessedBlock::Processed(t) => t.clone(),
218 +                     };
219 +
220 +                     // found the record, make sure we remember
221 +                     self.stack.pop();
222 +
223 +                     // rkeys *must* be in order or else the tree is invalid (or
224 +                     // we have a bug)
225 +                     if rkey <= self.prev {
226 +                         return Err(MstError::RkeyOutOfOrder.into());
227 +                     }
228 +                     self.prev = rkey.clone();
229 +
230 +                     return Ok(Step::Found { rkey, data });
231 +                 }
232 +             }
233 +         }
234 +     }
235 +
236 +     /// Like `step`, but reads blocks from sqlite; blocking, so only call it from a blocking context
237 +     pub fn disk_step<T: Processable>(
238 +         &mut self,
239 +         reader: &mut SqliteReader,
240 +         process: impl Fn(Vec<u8>) -> T,
241 +     ) -> Result<Step<T>, WalkError> {
242 +         loop {
243 +             let Some(need) = self.stack.last_mut() else {
244 +                 log::trace!("tried to walk but we're actually done.");
245 +                 return Ok(Step::Finish);
246 +             };
247 +
248 +             match need {
249 +                 &mut Need::Node { depth, cid } => {
250 +                     let cid_bytes = cid.to_bytes();
251 +                     log::trace!("need node {cid:?}");
252 +                     let Some(block_bytes) = reader.get(cid_bytes)? else {
253 +                         log::trace!("node not found, resting");
254 +                         return Ok(Step::Missing(cid));
255 +                     };
256 +
257 +                     let block: MaybeProcessedBlock<T> = crate::drive::decode(&block_bytes)?;
258 +
259 +                     let MaybeProcessedBlock::Raw(data) = block else {
260 +                         return Err(WalkError::BadCommitFingerprint);
261 +                     };
262 +                     let node = serde_ipld_dagcbor::from_slice::<Node>(&data)
263 +                         .map_err(WalkError::BadCommit)?;
264 +
265 +                     // found node, make sure we remember
266 +                     self.stack.pop();
267 +
268 +                     // queue up work on the found node next
269 +                     push_from_node(&mut self.stack, &node, depth).map_err(WalkError::MstError)?;
270 +                 }
271 +                 Need::Record { rkey, cid } => {
272 +                     log::trace!("need record {cid:?}");
273 +                     let cid_bytes = cid.to_bytes();
274 +                     let Some(data_bytes) = reader.get(cid_bytes)? else {
135 275                         log::trace!("record block not found, resting");
136 -                         return Ok(Step::Rest(*cid));
276 +                         return Ok(Step::Missing(*cid));
137 277                     };
278 +                     let data: MaybeProcessedBlock<T> = crate::drive::decode(&data_bytes)?;
138 279                     let rkey = rkey.clone();
139 280                     let data = match data {
140 281                         MaybeProcessedBlock::Raw(data) => process(data),
141 -                         MaybeProcessedBlock::Processed(Ok(t)) => Ok(t.clone()),
142 -                         bad => {
143 -                             // big hack to pull the error out -- this corrupts
144 -                             // a block, so we should not continue trying to work
145 -                             let mut steal = MaybeProcessedBlock::Raw(vec![]);
146 -                             std::mem::swap(&mut steal, bad);
147 -                             let MaybeProcessedBlock::Processed(Err(e)) = steal else {
148 -                                 unreachable!();
149 -                             };
150 -                             return Err(Trip::ProcessFailed(e));
151 -                         }
282 +                         MaybeProcessedBlock::Processed(t) => t.clone(),
152 283                     };
153 284
154 285                     // found the record, make sure we remember
155 286                     self.stack.pop();
156 287
157 288                     log::trace!("emitting a block as a step. depth={}", self.stack.len());
158 -                     let data = data.map_err(Trip::ProcessFailed)?;
159 289
160 290                     // rkeys *must* be in order or else the tree is invalid (or
161 291                     // we have a bug)
162 292                     if rkey <= self.prev {
163 -                         return Err(Trip::RkeyOutOfOrder);
293 +                         return Err(MstError::RkeyOutOfOrder.into());
164 294                     }
165 295                     self.prev = rkey.clone();
166 296
167 -                     return Ok(Step::Step { rkey, data });
297 +                     return Ok(Step::Found { rkey, data });
168 298                 }
169 299             }
170 300         }
··· 174 304 #[cfg(test)]
175 305 mod test {
176 306     use super::*;
177 -     // use crate::mst::Entry;
178 307
179 308     fn cid1() -> Cid {
180 309         "bafyreihixenvk3ahqbytas4hk4a26w43bh6eo3w6usjqtxkpzsvi655a3m"
181 310             .parse()
182 311             .unwrap()
183 312     }
184 -     // fn cid2() -> Cid {
185 -     //     "QmY7Yh4UquoXHLPFo2XbhXkhBvFoPwmQUSa92pxnxjQuPU"
186 -     //         .parse()
187 -     //         .unwrap()
188 -     // }
189 -     // fn cid3() -> Cid {
190 -     //     "bafybeigdyrzt5sfp7udm7hu76uh7y26nf3efuylqabf3oclgtqy55fbzdi"
191 -     //         .parse()
192 -     //         .unwrap()
193 -     // }
194 -     // fn cid4() -> Cid {
195 -     //     "QmbWqxBEKC3P8tqsKc98xmWNzrzDtRLMiMPL8wBuTGsMnR"
196 -     //         .parse()
197 -     //         .unwrap()
198 -     // }
199 -     // fn cid5() -> Cid {
200 -     //     "QmSnuWmxptJZdLJpKRarxBMS2Ju2oANVrgbr2xWbie9b2D"
201 -     //         .parse()
202 -     //         .unwrap()
203 -     // }
204 -     // fn cid6() -> Cid {
205 -     //     "QmdmQXB2mzChmMeKY47C43LxUdg1NDJ5MWcKMKxDu7RgQm"
206 -     //         .parse()
207 -     //         .unwrap()
208 -     // }
209 -     // fn cid7() -> Cid {
210 -     //     "bafybeiaysi4s6lnjev27ln5icwm6tueaw2vdykrtjkwiphwekaywqhcjze"
211 -     //         .parse()
212 -     //         .unwrap()
213 -     // }
214 -     // fn cid8() -> Cid {
215 -     //     "bafyreif3tfdpr5n4jdrbielmcapwvbpcthepfkwq2vwonmlhirbjmotedi"
216 -     //         .parse()
217 -     //         .unwrap()
218 -     // }
219 -     // fn cid9() -> Cid {
220 -     //     "bafyreicnokmhmrnlp2wjhyk2haep4tqxiptwfrp2rrs7rzq7uk766chqvq"
221 -     //         .parse()
222 -     //         .unwrap()
223 -     // }
313 +
314 +     #[test]
315 +     fn test_depth_spec_0() {
316 +         let d = Depth::from_key(b"2653ae71");
317 +         assert_eq!(d, Depth::Depth(0))
318 +     }
319 +
320 +     #[test]
321 +     fn test_depth_spec_1() {
322 +         let d = Depth::from_key(b"blue");
323 +         assert_eq!(d, Depth::Depth(1))
324 +     }
325 +
326 +     #[test]
327 +     fn test_depth_spec_4() {
328 +         let d = Depth::from_key(b"app.bsky.feed.post/454397e440ec");
329 +         assert_eq!(d, Depth::Depth(4))
330 +     }
331 +
332 +     #[test]
333 +     fn test_depth_spec_8() {
334 +         let d = Depth::from_key(b"app.bsky.feed.post/9adeb165882c");
335 +         assert_eq!(d, Depth::Depth(8))
336 +     }
337 +
338 +     #[test]
339 +     fn test_depth_ietf_draft_0() {
340 +         let d = Depth::from_key(b"key1");
341 +         assert_eq!(d, Depth::Depth(0))
342 +     }
343 +
344 +     #[test]
345 +     fn test_depth_ietf_draft_1() {
346 +         let d = Depth::from_key(b"key7");
347 +         assert_eq!(d, Depth::Depth(1))
348 +     }
349 +
350 +     #[test]
351 +     fn test_depth_ietf_draft_4() {
352 +         let d = Depth::from_key(b"key515");
353 +         assert_eq!(d, Depth::Depth(4))
354 +     }
355 +
356 +     #[test]
357 +     fn test_depth_interop() {
358 +         // examples from https://github.com/bluesky-social/atproto-interop-tests/blob/main/mst/key_heights.json
359 +         for (k, expected) in [
360 +             ("", 0),
361 +             ("asdf", 0),
362 +             ("blue", 1),
363 +             ("2653ae71", 0),
364 +             ("88bfafc7", 2),
365 +             ("2a92d355", 4),
366 +             ("884976f5", 6),
367 +             ("app.bsky.feed.post/454397e440ec", 4),
368 +             ("app.bsky.feed.post/9adeb165882c", 8),
369 +         ] {
370 +             let d = Depth::from_key(k.as_bytes());
371 +             assert_eq!(d, Depth::Depth(expected), "key: {}", k);
372 +         }
373 +     }
224 374
225 375     #[test]
226 -     fn test_next_from_node_empty() {
376 +     fn test_push_empty_fails() {
227 -         let node = Node {
377 +         let empty_node = Node {
228 378             left: None,
229 379             entries: vec![],
230 380         };
231 381         let mut stack = vec![];
232 -         push_from_node(&mut stack, &node).unwrap();
233 -         assert_eq!(stack.last(), None);
382 +         let err = push_from_node(&mut stack, &empty_node, Depth::Depth(4));
383 +         assert_eq!(err, Err(MstError::EmptyNode));
234 384     }
235 385
236 386     #[test]
237 -     fn test_needs_from_node_just_left() {
387 +     fn test_push_one_node() {
238 388         let node = Node {
239 389             left: Some(cid1()),
240 390             entries: vec![],
241 391         };
242 392         let mut stack = vec![];
243 -         push_from_node(&mut stack, &node).unwrap();
244 -         assert_eq!(stack.last(), Some(Need::Node(cid1())).as_ref());
393 +         push_from_node(&mut stack, &node, Depth::Depth(4)).unwrap();
394 +         assert_eq!(
395 +             stack.last(),
396 +             Some(Need::Node {
397 +                 depth: Depth::Depth(3),
398 +                 cid: cid1()
399 +             })
400 +             .as_ref()
401 +         );
245 402     }
246 -
247 -     // #[test]
248 -     // fn test_needs_from_node_just_one_record() {
249 -     //     let node = Node {
250 -     //         left: None,
251 -     //         entries: vec![Entry {
252 -     //             keysuffix: "asdf".into(),
253 -     //             prefix_len: 0,
254 -     //             value: cid1(),
255 -     //             tree: None,
256 -     //         }],
257 -     //     };
258 -     //     assert_eq!(
259 -     //         needs_from_node(node).unwrap(),
260 -     //         vec![Need::Record {
261 -     //             rkey: "asdf".into(),
262 -     //             cid: cid1(),
263 -     //         },]
264 -     //     );
265 -     // }
266 -
267 -     // #[test]
268 -     // fn test_needs_from_node_two_records() {
269 -     //     let node = Node {
270 -     //         left: None,
271 -     //         entries: vec![
272 -     //             Entry {
273 -     //                 keysuffix: "asdf".into(),
274 -     //                 prefix_len: 0,
275 -     //                 value: cid1(),
276 -     //                 tree: None,
277 -     //             },
278 -     //             Entry {
279 -     //                 keysuffix: "gh".into(),
280 -     //                 prefix_len: 2,
281 -     //                 value: cid2(),
282 -     //                 tree: None,
283 -     //             },
284 -     //         ],
285 -     //     };
286 -     //     assert_eq!(
287 -     //         needs_from_node(node).unwrap(),
288 -     //         vec![
289 -     //             Need::Record {
290 -     //                 rkey: "asdf".into(),
291 -     //                 cid: cid1(),
292 -     //             },
293 -     //             Need::Record {
294 -     //                 rkey: "asgh".into(),
295 -     //                 cid: cid2(),
296 -     //             },
297 -     //         ]
298 -     //     );
299 -     // }
300 -
301 -     // #[test]
302 -     // fn test_needs_from_node_with_both() {
303 -     //     let node = Node {
304 -     //         left: None,
305 -     //         entries: vec![Entry {
306 -     //             keysuffix: "asdf".into(),
307 -     //             prefix_len: 0,
308 -     //             value: cid1(),
309 -     //             tree: Some(cid2()),
310 -     //         }],
311 -     //     };
312 -     //     assert_eq!(
313 -     //         needs_from_node(node).unwrap(),
314 -     //         vec![
315 -     //             Need::Record {
316 -     //                 rkey: "asdf".into(),
317 -     //                 cid: cid1(),
318 -     //             },
319 -     //             Need::Node(cid2()),
320 -     //         ]
321 -     //     );
322 -     // }
323 -
324 -     // #[test]
325 -     // fn test_needs_from_node_left_and_record() {
326 -     //     let node = Node {
327 -     //         left: Some(cid1()),
328 -     //         entries: vec![Entry {
329 -     //             keysuffix: "asdf".into(),
330 -     //             prefix_len: 0,
331 -     //             value: cid2(),
332 -     //             tree: None,
333 -     //         }],
334 -     //     };
335 -     //     assert_eq!(
336 -     //         needs_from_node(node).unwrap(),
337 -     //         vec![
338 -     //             Need::Node(cid1()),
339 -     //             Need::Record {
340 -     //                 rkey: "asdf".into(),
341 -     //                 cid: cid2(),
342 -     //             },
343 -     //         ]
344 -     //     );
345 -     // }
346 -
347 -     // #[test]
348 -     // fn test_needs_from_full_node() {
349 -     //     let node = Node {
350 -     //         left: Some(cid1()),
351 -     //         entries: vec![
352 -     //             Entry {
353 -     //                 keysuffix: "asdf".into(),
354 -     //                 prefix_len: 0,
355 -     //                 value: cid2(),
356 -     //                 tree: Some(cid3()),
357 -     //             },
358 -     //             Entry {
359 -     //                 keysuffix: "ghi".into(),
360 -     //                 prefix_len: 1,
361 -     //                 value: cid4(),
362 -     //                 tree: Some(cid5()),
363 -     //             },
364 -     //             Entry {
365 -     //                 keysuffix: "jkl".into(),
366 -     //                 prefix_len: 2,
367 -     //                 value: cid6(),
368 -     //                 tree: Some(cid7()),
369 -     //             },
370 -     //             Entry {
371 -     //                 keysuffix: "mno".into(),
372 -     //                 prefix_len: 4,
373 -     //                 value: cid8(),
374 -     //                 tree: Some(cid9()),
375 -     //             },
376 -     //         ],
377 -     //     };
378 -     //     assert_eq!(
379 -     //         needs_from_node(node).unwrap(),
380 -     //         vec![
381 -     //             Need::Node(cid1()),
382 -     //             Need::Record {
383 -     //                 rkey: "asdf".into(),
384 -     //                 cid: cid2(),
385 -     //             },
386 -     //             Need::Node(cid3()),
387 -     //             Need::Record {
388 -     //                 rkey: "aghi".into(),
389 -     //                 cid: cid4(),
390 -     //             },
391 -     //             Need::Node(cid5()),
392 -     //             Need::Record {
393 -     //                 rkey: "agjkl".into(),
394 -     //                 cid: cid6(),
395 -     //             },
396 -     //             Need::Node(cid7()),
397 -     //             Need::Record {
398 -     //                 rkey: "agjkmno".into(),
399 -     //                 cid: cid8(),
400 -     //             },
401 -     //             Need::Node(cid9()),
402 -     //         ]
403 -     //     );
404 -     // }
405 403 }
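The depth rule these tests pin down is small enough to state standalone: hash the key with sha-256 and count leading zero bits, two bits per level. A sketch mirroring `Depth::from_key` above, checked against the same interop vectors:

```rust
use sha2::{Digest, Sha256};

/// MST depth of a key: leading zero bits of sha256(key), in 2-bit units.
fn key_depth(key: &[u8]) -> u32 {
    let mut zeros = 0;
    for byte in Sha256::digest(key) {
        zeros += byte.leading_zeros();
        if byte != 0 {
            break; // this byte had a one-bit; no more leading zeros follow
        }
    }
    zeros / 2 // two zero bits per level, rounding down
}

fn main() {
    assert_eq!(key_depth(b"2653ae71"), 0);
    assert_eq!(key_depth(b"blue"), 1);
    assert_eq!(key_depth(b"app.bsky.feed.post/454397e440ec"), 4);
    assert_eq!(key_depth(b"app.bsky.feed.post/9adeb165882c"), 8);
}
```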
+18 -26
tests/non-huge-cars.rs
··· 1 1 extern crate repo_stream;
2 - use futures::TryStreamExt;
3 - use iroh_car::CarReader;
4 - use std::convert::Infallible;
2 + use repo_stream::Driver;
5 3
6 4 const TINY_CAR: &'static [u8] = include_bytes!("../car-samples/tiny.car");
7 5 const LITTLE_CAR: &'static [u8] = include_bytes!("../car-samples/little.car");
8 6 const MIDSIZE_CAR: &'static [u8] = include_bytes!("../car-samples/midsize.car");
9 7
10 8 async fn test_car(bytes: &[u8], expected_records: usize, expected_sum: usize) {
11 -     let reader = CarReader::new(bytes).await.unwrap();
12 -
13 -     let root = reader
14 -         .header()
15 -         .roots()
16 -         .first()
17 -         .ok_or("missing root")
9 +     let mut driver = match Driver::load_car(bytes, |block| block.len(), 10 /* MiB */)
10 +         .await
18 11         .unwrap()
19 -         .clone();
20 -
21 -     let stream = std::pin::pin!(reader.stream());
22 -
23 -     let (_commit, v) =
24 -         repo_stream::drive::Vehicle::init(root, stream, |block| Ok::<_, Infallible>(block.len()))
25 -             .await
26 -             .unwrap();
27 -     let mut record_stream = std::pin::pin!(v.stream());
12 +     {
13 +         Driver::Memory(_commit, mem_driver) => mem_driver,
14 +         Driver::Disk(_) => panic!("too big"),
15 +     };
28 16
29 17     let mut records = 0;
30 18     let mut sum = 0;
31 19     let mut found_bsky_profile = false;
32 20     let mut prev_rkey = "".to_string();
33 -     while let Some((rkey, size)) = record_stream.try_next().await.unwrap() {
34 -         records += 1;
35 -         sum += size;
36 -         if rkey == "app.bsky.actor.profile/self" {
37 -             found_bsky_profile = true;
21 +
22 +     while let Some(pairs) = driver.next_chunk(256).await.unwrap() {
23 +         for (rkey, size) in pairs {
24 +             records += 1;
25 +             sum += size;
26 +             if rkey == "app.bsky.actor.profile/self" {
27 +                 found_bsky_profile = true;
28 +             }
29 +             assert!(rkey > prev_rkey, "rkeys are streamed in order");
30 +             prev_rkey = rkey;
38 31         }
39 -         assert!(rkey > prev_rkey, "rkeys are streamed in order");
40 -         prev_rkey = rkey;
41 32     }
42 -
43 33
34 +
42 34     assert_eq!(records, expected_records);
43 35     assert_eq!(sum, expected_sum);
44 36     assert!(found_bsky_profile);
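A disk-path counterpart could exercise the same ordering invariant through spilling. A sketch, not wired into the suite (it assumes `midsize.car` exceeds a 1 MiB limit, that the disk driver exposes the same `next_chunk`, and the db path is illustrative):

```rust
#[tokio::test]
async fn midsize_car_via_disk() {
    // force a spill with a tiny memory limit
    let paused = match repo_stream::DriverBuilder::new()
        .with_mem_limit_mb(1)
        .with_block_processor(|block| block.len())
        .load_car(MIDSIZE_CAR)
        .await
        .unwrap()
    {
        repo_stream::Driver::Disk(paused) => paused,
        repo_stream::Driver::Memory(..) => panic!("expected the CAR to spill"),
    };

    let store = repo_stream::DiskBuilder::new()
        .open("test-spill.db".into())
        .await
        .unwrap();
    let (_commit, mut driver) = paused.finish_loading(store).await.unwrap();

    let mut prev_rkey = "".to_string();
    while let Some(pairs) = driver.next_chunk(256).await.unwrap() {
        for (rkey, _size) in pairs {
            assert!(rkey > prev_rkey, "rkeys stay ordered on the disk path");
            prev_rkey = rkey;
        }
    }
    driver.reset_store().await.unwrap();
}
```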