Fast and robust atproto CAR file processing in rust

Compare changes

Choose any two refs to compare.

Changed files
+1037 -589
benches
car-samples
examples
disk-read-file
read-file
src
tests
+170 -61
Cargo.lock
··· 167 167 checksum = "46c5e41b57b8bba42a04676d81cb89e9ee8e859a1a66f80a5a72e1cb76b34d43" 168 168 169 169 [[package]] 170 + name = "byteorder-lite" 171 + version = "0.1.0" 172 + source = "registry+https://github.com/rust-lang/crates.io-index" 173 + checksum = "8f1fe948ff07f4bd06c30984e69f5b4899c516a3ef74f34df92a2df2ab535495" 174 + 175 + [[package]] 170 176 name = "bytes" 171 177 version = "1.10.1" 172 178 source = "registry+https://github.com/rust-lang/crates.io-index" 173 179 checksum = "d71b6127be86fdcfddb610f7182ac57211d4b18a3e9c82eb2d17662f2227ad6a" 180 + 181 + [[package]] 182 + name = "byteview" 183 + version = "0.10.0" 184 + source = "registry+https://github.com/rust-lang/crates.io-index" 185 + checksum = "dda4398f387cc6395a3e93b3867cd9abda914c97a0b344d1eefb2e5c51785fca" 174 186 175 187 [[package]] 176 188 name = "cast" ··· 281 293 checksum = "b05b61dc5112cbb17e4b6cd61790d9845d13888356391624cbe7e41efeac1e75" 282 294 283 295 [[package]] 296 + name = "compare" 297 + version = "0.0.6" 298 + source = "registry+https://github.com/rust-lang/crates.io-index" 299 + checksum = "ea0095f6103c2a8b44acd6fd15960c801dafebf02e21940360833e0673f48ba7" 300 + 301 + [[package]] 284 302 name = "const-str" 285 303 version = "0.4.3" 286 304 source = "registry+https://github.com/rust-lang/crates.io-index" ··· 358 376 ] 359 377 360 378 [[package]] 379 + name = "crossbeam-skiplist" 380 + version = "0.1.3" 381 + source = "registry+https://github.com/rust-lang/crates.io-index" 382 + checksum = "df29de440c58ca2cc6e587ec3d22347551a32435fbde9d2bff64e78a9ffa151b" 383 + dependencies = [ 384 + "crossbeam-epoch", 385 + "crossbeam-utils", 386 + ] 387 + 388 + [[package]] 361 389 name = "crossbeam-utils" 362 390 version = "0.8.21" 363 391 source = "registry+https://github.com/rust-lang/crates.io-index" ··· 380 408 ] 381 409 382 410 [[package]] 411 + name = "dashmap" 412 + version = "6.1.0" 413 + source = "registry+https://github.com/rust-lang/crates.io-index" 414 + checksum = 
"5041cc499144891f3790297212f32a74fb938e5136a14943f338ef9e0ae276cf" 415 + dependencies = [ 416 + "cfg-if", 417 + "crossbeam-utils", 418 + "hashbrown 0.14.5", 419 + "lock_api", 420 + "once_cell", 421 + "parking_lot_core", 422 + ] 423 + 424 + [[package]] 383 425 name = "data-encoding" 384 426 version = "2.9.0" 385 427 source = "registry+https://github.com/rust-lang/crates.io-index" ··· 422 464 checksum = "48c757948c5ede0e46177b7add2e67155f70e33c07fea8284df6576da70b3719" 423 465 424 466 [[package]] 467 + name = "enum_dispatch" 468 + version = "0.3.13" 469 + source = "registry+https://github.com/rust-lang/crates.io-index" 470 + checksum = "aa18ce2bc66555b3218614519ac839ddb759a7d6720732f979ef8d13be147ecd" 471 + dependencies = [ 472 + "once_cell", 473 + "proc-macro2", 474 + "quote", 475 + "syn 2.0.106", 476 + ] 477 + 478 + [[package]] 425 479 name = "env_filter" 426 480 version = "0.1.3" 427 481 source = "registry+https://github.com/rust-lang/crates.io-index" ··· 445 499 ] 446 500 447 501 [[package]] 502 + name = "equivalent" 503 + version = "1.0.2" 504 + source = "registry+https://github.com/rust-lang/crates.io-index" 505 + checksum = "877a4ace8713b0bcf2a4e7eec82529c029f1d0619886d18145fea96c3ffe5c0f" 506 + 507 + [[package]] 448 508 name = "errno" 449 509 version = "0.3.14" 450 510 source = "registry+https://github.com/rust-lang/crates.io-index" ··· 455 515 ] 456 516 457 517 [[package]] 458 - name = "fallible-iterator" 459 - version = "0.3.0" 518 + name = "fastrand" 519 + version = "2.3.0" 460 520 source = "registry+https://github.com/rust-lang/crates.io-index" 461 - checksum = "2acce4a10f12dc2fb14a218589d4f1f62ef011b2d0cc4b3cb1bba8e94da14649" 521 + checksum = "37909eebbb50d72f9059c3b6d82c0463f2ff062c9e95845c43a6c9c0355411be" 462 522 463 523 [[package]] 464 - name = "fallible-streaming-iterator" 465 - version = "0.1.9" 524 + name = "fjall" 525 + version = "3.0.1" 466 526 source = "registry+https://github.com/rust-lang/crates.io-index" 467 - checksum = 
"7360491ce676a36bf9bb3c56c1aa791658183a54d2744120f27285738d90465a" 527 + checksum = "4f69637c02d38ad1b0f003101d0195a60368130aa17d9ef78b1557d265a22093" 528 + dependencies = [ 529 + "byteorder-lite", 530 + "byteview", 531 + "dashmap", 532 + "flume", 533 + "log", 534 + "lsm-tree", 535 + "tempfile", 536 + "xxhash-rust", 537 + ] 468 538 469 539 [[package]] 470 - name = "fastrand" 471 - version = "2.3.0" 540 + name = "flume" 541 + version = "0.12.0" 472 542 source = "registry+https://github.com/rust-lang/crates.io-index" 473 - checksum = "37909eebbb50d72f9059c3b6d82c0463f2ff062c9e95845c43a6c9c0355411be" 474 - 475 - [[package]] 476 - name = "foldhash" 477 - version = "0.1.5" 478 - source = "registry+https://github.com/rust-lang/crates.io-index" 479 - checksum = "d9c4f5dac5e15c24eb999c26181a6ca40b39fe946cbe4c263c7209467bc83af2" 543 + checksum = "5e139bc46ca777eb5efaf62df0ab8cc5fd400866427e56c68b22e414e53bd3be" 544 + dependencies = [ 545 + "spin", 546 + ] 480 547 481 548 [[package]] 482 549 name = "futures" ··· 608 675 609 676 [[package]] 610 677 name = "hashbrown" 611 - version = "0.15.5" 678 + version = "0.14.5" 612 679 source = "registry+https://github.com/rust-lang/crates.io-index" 613 - checksum = "9229cfe53dfd69f0609a49f65461bd93001ea1ef889cd5529dd176593f5338a1" 614 - dependencies = [ 615 - "foldhash", 616 - ] 680 + checksum = "e5274423e17b7c9fc20b6e7e208532f9b19825d82dfd615708b70edd83df41f1" 617 681 618 682 [[package]] 619 - name = "hashlink" 620 - version = "0.10.0" 683 + name = "hashbrown" 684 + version = "0.16.1" 621 685 source = "registry+https://github.com/rust-lang/crates.io-index" 622 - checksum = "7382cf6263419f2d8df38c55d7da83da5c18aef87fc7a7fc1fb1e344edfe14c1" 623 - dependencies = [ 624 - "hashbrown", 625 - ] 686 + checksum = "841d1cc9bed7f9236f321df977030373f4a4163ae1a7dbfe1a51a2c1a51d9100" 626 687 627 688 [[package]] 628 689 name = "heck" ··· 631 692 checksum = "2304e00983f87ffb38b55b444b5e3b60a884b5d30c0fca7d82fe33449bbe55ea" 632 693 633 694 [[package]] 
695 + name = "interval-heap" 696 + version = "0.0.5" 697 + source = "registry+https://github.com/rust-lang/crates.io-index" 698 + checksum = "11274e5e8e89b8607cfedc2910b6626e998779b48a019151c7604d0adcb86ac6" 699 + dependencies = [ 700 + "compare", 701 + ] 702 + 703 + [[package]] 634 704 name = "io-uring" 635 705 version = "0.7.10" 636 706 source = "registry+https://github.com/rust-lang/crates.io-index" ··· 730 800 checksum = "58f929b4d672ea937a23a1ab494143d968337a5f47e56d0815df1e0890ddf174" 731 801 732 802 [[package]] 733 - name = "libsqlite3-sys" 734 - version = "0.35.0" 735 - source = "registry+https://github.com/rust-lang/crates.io-index" 736 - checksum = "133c182a6a2c87864fe97778797e46c7e999672690dc9fa3ee8e241aa4a9c13f" 737 - dependencies = [ 738 - "pkg-config", 739 - "vcpkg", 740 - ] 741 - 742 - [[package]] 743 803 name = "linux-raw-sys" 744 804 version = "0.11.0" 745 805 source = "registry+https://github.com/rust-lang/crates.io-index" ··· 761 821 checksum = "34080505efa8e45a4b816c349525ebe327ceaa8559756f0356cba97ef3bf7432" 762 822 763 823 [[package]] 824 + name = "lsm-tree" 825 + version = "3.0.1" 826 + source = "registry+https://github.com/rust-lang/crates.io-index" 827 + checksum = "b875f1dfe14f557f805b167fb9b0fc54c5560c7a4bd6ae02535b2846f276a8cb" 828 + dependencies = [ 829 + "byteorder-lite", 830 + "byteview", 831 + "crossbeam-skiplist", 832 + "enum_dispatch", 833 + "interval-heap", 834 + "log", 835 + "quick_cache", 836 + "rustc-hash", 837 + "self_cell", 838 + "sfa", 839 + "tempfile", 840 + "varint-rs", 841 + "xxhash-rust", 842 + ] 843 + 844 + [[package]] 764 845 name = "match-lookup" 765 846 version = "0.1.1" 766 847 source = "registry+https://github.com/rust-lang/crates.io-index" ··· 892 973 checksum = "8b870d8c151b6f2fb93e84a13146138f05d02ed11c7e7c54f8826aaaf7c9f184" 893 974 894 975 [[package]] 895 - name = "pkg-config" 896 - version = "0.3.32" 897 - source = "registry+https://github.com/rust-lang/crates.io-index" 898 - checksum = 
"7edddbd0b52d732b21ad9a5fab5c704c14cd949e5e9a1ec5929a24fded1b904c" 899 - 900 - [[package]] 901 976 name = "plotters" 902 977 version = "0.3.7" 903 978 source = "registry+https://github.com/rust-lang/crates.io-index" ··· 950 1025 ] 951 1026 952 1027 [[package]] 1028 + name = "quick_cache" 1029 + version = "0.6.18" 1030 + source = "registry+https://github.com/rust-lang/crates.io-index" 1031 + checksum = "7ada44a88ef953a3294f6eb55d2007ba44646015e18613d2f213016379203ef3" 1032 + dependencies = [ 1033 + "equivalent", 1034 + "hashbrown 0.16.1", 1035 + ] 1036 + 1037 + [[package]] 953 1038 name = "quote" 954 1039 version = "1.0.41" 955 1040 source = "registry+https://github.com/rust-lang/crates.io-index" ··· 1024 1109 1025 1110 [[package]] 1026 1111 name = "repo-stream" 1027 - version = "0.1.1" 1112 + version = "0.2.2" 1028 1113 dependencies = [ 1029 1114 "bincode", 1030 1115 "clap", 1031 1116 "criterion", 1032 1117 "env_logger", 1118 + "fjall", 1033 1119 "futures", 1034 1120 "futures-core", 1035 1121 "ipld-core", 1036 1122 "iroh-car", 1037 1123 "log", 1038 1124 "multibase", 1039 - "rusqlite", 1040 1125 "serde", 1041 1126 "serde_bytes", 1042 1127 "serde_ipld_dagcbor", ··· 1047 1132 ] 1048 1133 1049 1134 [[package]] 1050 - name = "rusqlite" 1051 - version = "0.37.0" 1135 + name = "rustc-demangle" 1136 + version = "0.1.26" 1052 1137 source = "registry+https://github.com/rust-lang/crates.io-index" 1053 - checksum = "165ca6e57b20e1351573e3729b958bc62f0e48025386970b6e4d29e7a7e71f3f" 1054 - dependencies = [ 1055 - "bitflags", 1056 - "fallible-iterator", 1057 - "fallible-streaming-iterator", 1058 - "hashlink", 1059 - "libsqlite3-sys", 1060 - "smallvec", 1061 - ] 1138 + checksum = "56f7d92ca342cea22a06f2121d944b4fd82af56988c270852495420f961d4ace" 1062 1139 1063 1140 [[package]] 1064 - name = "rustc-demangle" 1065 - version = "0.1.26" 1141 + name = "rustc-hash" 1142 + version = "2.1.1" 1066 1143 source = "registry+https://github.com/rust-lang/crates.io-index" 1067 - checksum = 
"56f7d92ca342cea22a06f2121d944b4fd82af56988c270852495420f961d4ace" 1144 + checksum = "357703d41365b4b27c590e3ed91eabb1b663f07c4c084095e60cbed4362dff0d" 1068 1145 1069 1146 [[package]] 1070 1147 name = "rustix" ··· 1105 1182 version = "1.2.0" 1106 1183 source = "registry+https://github.com/rust-lang/crates.io-index" 1107 1184 checksum = "94143f37725109f92c262ed2cf5e59bce7498c01bcc1502d7b9afe439a4e9f49" 1185 + 1186 + [[package]] 1187 + name = "self_cell" 1188 + version = "1.2.2" 1189 + source = "registry+https://github.com/rust-lang/crates.io-index" 1190 + checksum = "b12e76d157a900eb52e81bc6e9f3069344290341720e9178cde2407113ac8d89" 1108 1191 1109 1192 [[package]] 1110 1193 name = "serde" ··· 1172 1255 ] 1173 1256 1174 1257 [[package]] 1258 + name = "sfa" 1259 + version = "1.0.0" 1260 + source = "registry+https://github.com/rust-lang/crates.io-index" 1261 + checksum = "a1296838937cab56cd6c4eeeb8718ec777383700c33f060e2869867bd01d1175" 1262 + dependencies = [ 1263 + "byteorder-lite", 1264 + "log", 1265 + "xxhash-rust", 1266 + ] 1267 + 1268 + [[package]] 1175 1269 name = "sha2" 1176 1270 version = "0.10.9" 1177 1271 source = "registry+https://github.com/rust-lang/crates.io-index" ··· 1211 1305 dependencies = [ 1212 1306 "libc", 1213 1307 "windows-sys 0.59.0", 1308 + ] 1309 + 1310 + [[package]] 1311 + name = "spin" 1312 + version = "0.9.8" 1313 + source = "registry+https://github.com/rust-lang/crates.io-index" 1314 + checksum = "6980e8d7511241f8acf4aebddbb1ff938df5eebe98691418c4468d0b72a96a67" 1315 + dependencies = [ 1316 + "lock_api", 1214 1317 ] 1215 1318 1216 1319 [[package]] ··· 1372 1475 checksum = "06abde3611657adf66d383f00b093d7faecc7fa57071cce2578660c9f1010821" 1373 1476 1374 1477 [[package]] 1375 - name = "vcpkg" 1376 - version = "0.2.15" 1478 + name = "varint-rs" 1479 + version = "2.2.0" 1377 1480 source = "registry+https://github.com/rust-lang/crates.io-index" 1378 - checksum = "accd4ea62f7bb7a82fe23066fb0957d48ef677f6eeb8215f372f52e48bb32426" 1481 + checksum 
= "8f54a172d0620933a27a4360d3db3e2ae0dd6cceae9730751a036bbf182c4b23" 1379 1482 1380 1483 [[package]] 1381 1484 name = "version_check" ··· 1659 1762 version = "0.46.0" 1660 1763 source = "registry+https://github.com/rust-lang/crates.io-index" 1661 1764 checksum = "f17a85883d4e6d00e8a97c586de764dabcc06133f7f1d55dce5cdc070ad7fe59" 1765 + 1766 + [[package]] 1767 + name = "xxhash-rust" 1768 + version = "0.8.15" 1769 + source = "registry+https://github.com/rust-lang/crates.io-index" 1770 + checksum = "fdd20c5420375476fbd4394763288da7eb0cc0b8c11deed431a91562af7335d3" 1662 1771 1663 1772 [[package]] 1664 1773 name = "zerocopy"
+3 -3
Cargo.toml
··· 1 1 [package] 2 2 name = "repo-stream" 3 - version = "0.1.1" 3 + version = "0.2.2" 4 4 edition = "2024" 5 5 license = "MIT OR Apache-2.0" 6 - description = "Fast and robust atproto CAR file processing in rust" 6 + description = "A robust CAR file -> MST walker for atproto" 7 7 repository = "https://tangled.org/@microcosm.blue/repo-stream" 8 8 9 9 [dependencies] 10 10 bincode = { version = "2.0.1", features = ["serde"] } 11 + fjall = { version = "3.0.1", default-features = false } 11 12 futures = "0.3.31" 12 13 futures-core = "0.3.31" 13 14 ipld-core = { version = "0.4.2", features = ["serde"] } 14 15 iroh-car = "0.5.1" 15 16 log = "0.4.28" 16 17 multibase = "0.9.2" 17 - rusqlite = "0.37.0" 18 18 serde = { version = "1.0.228", features = ["derive"] } 19 19 serde_bytes = "0.11.19" 20 20 serde_ipld_dagcbor = "0.6.4"
+4 -5
benches/huge-car.rs
··· 1 1 extern crate repo_stream; 2 + use repo_stream::Driver; 2 3 use std::path::{Path, PathBuf}; 3 4 4 5 use criterion::{Criterion, criterion_group, criterion_main}; ··· 21 22 let reader = tokio::fs::File::open(filename).await.unwrap(); 22 23 let reader = tokio::io::BufReader::new(reader); 23 24 24 - let mb = 2_usize.pow(20); 25 - 26 - let mut driver = match repo_stream::drive::load_car(reader, |block| block.len(), 1024 * mb) 25 + let mut driver = match Driver::load_car(reader, |block| block.len(), 1024) 27 26 .await 28 27 .unwrap() 29 28 { 30 - repo_stream::drive::Vehicle::Lil(_, mem_driver) => mem_driver, 31 - repo_stream::drive::Vehicle::Big(_) => panic!("not doing disk for benchmark"), 29 + Driver::Memory(_, mem_driver) => mem_driver, 30 + Driver::Disk(_) => panic!("not doing disk for benchmark"), 32 31 }; 33 32 34 33 let mut n = 0;
+12 -8
benches/non-huge-cars.rs
··· 1 1 extern crate repo_stream; 2 + use repo_stream::Driver; 2 3 3 4 use criterion::{Criterion, criterion_group, criterion_main}; 4 5 6 + const EMPTY_CAR: &'static [u8] = include_bytes!("../car-samples/empty.car"); 5 7 const TINY_CAR: &'static [u8] = include_bytes!("../car-samples/tiny.car"); 6 8 const LITTLE_CAR: &'static [u8] = include_bytes!("../car-samples/little.car"); 7 9 const MIDSIZE_CAR: &'static [u8] = include_bytes!("../car-samples/midsize.car"); ··· 12 14 .build() 13 15 .expect("Creating runtime failed"); 14 16 17 + c.bench_function("empty-car", |b| { 18 + b.to_async(&rt).iter(async || drive_car(EMPTY_CAR).await) 19 + }); 15 20 c.bench_function("tiny-car", |b| { 16 21 b.to_async(&rt).iter(async || drive_car(TINY_CAR).await) 17 22 }); ··· 24 29 } 25 30 26 31 async fn drive_car(bytes: &[u8]) -> usize { 27 - let mut driver = 28 - match repo_stream::drive::load_car(bytes, |block| block.len(), 32 * 2_usize.pow(20)) 29 - .await 30 - .unwrap() 31 - { 32 - repo_stream::drive::Vehicle::Lil(_, mem_driver) => mem_driver, 33 - repo_stream::drive::Vehicle::Big(_) => panic!("not benching big cars here"), 34 - }; 32 + let mut driver = match Driver::load_car(bytes, |block| block.len(), 32) 33 + .await 34 + .unwrap() 35 + { 36 + Driver::Memory(_, mem_driver) => mem_driver, 37 + Driver::Disk(_) => panic!("not benching big cars here"), 38 + }; 35 39 36 40 let mut n = 0; 37 41 while let Some(pairs) = driver.next_chunk(256).await.unwrap() {
car-samples/empty.car

This is a binary file and will not be displayed.

+54 -22
examples/disk-read-file/main.rs
··· 1 + /*! 2 + Read a CAR file by spilling to disk 3 + */ 4 + 1 5 extern crate repo_stream; 2 6 use clap::Parser; 7 + use repo_stream::{DiskBuilder, Driver, DriverBuilder}; 3 8 use std::path::PathBuf; 4 - 5 - type Result<T> = std::result::Result<T, Box<dyn std::error::Error>>; 9 + use std::time::Instant; 6 10 7 11 #[derive(Debug, Parser)] 8 12 struct Args { ··· 13 17 } 14 18 15 19 #[tokio::main] 16 - async fn main() -> Result<()> { 20 + async fn main() -> Result<(), Box<dyn std::error::Error>> { 17 21 env_logger::init(); 18 22 19 23 let Args { car, tmpfile } = Args::parse(); 24 + 25 + // repo-stream takes an AsyncRead as input. wrapping a filesystem read in 26 + // BufReader can provide a really significant performance win. 20 27 let reader = tokio::fs::File::open(car).await?; 21 28 let reader = tokio::io::BufReader::new(reader); 22 29 23 - // let kb = 2_usize.pow(10); 24 - let mb = 2_usize.pow(20); 30 + log::info!("hello! reading the car..."); 31 + let t0 = Instant::now(); 25 32 26 - let limit_mb = 32; 33 + // in this example we only bother handling CARs that are too big for memory 34 + // `noop` helper means: do no block processing, store the raw blocks 35 + let driver = match DriverBuilder::new() 36 + .with_mem_limit_mb(32) // how much memory can be used before disk spill 37 + .load_car(reader) 38 + .await? 39 + { 40 + Driver::Memory(_, _) => panic!("try this on a bigger car"), 41 + Driver::Disk(big_stuff) => { 42 + // we reach here if the repo was too big and needs to be spilled to 43 + // disk to continue 27 44 28 - let driver = match repo_stream::drive::load_car(reader, |block| block.len(), 10 * mb).await? 
{ 29 - repo_stream::drive::Vehicle::Lil(_, _) => panic!("try this on a bigger car"), 30 - repo_stream::drive::Vehicle::Big(big_stuff) => { 31 - let disk_store = repo_stream::disk::SqliteStore::new(tmpfile.clone(), limit_mb).await?; 45 + // set up a disk store we can spill to 46 + let disk_store = DiskBuilder::new().open(tmpfile).await?; 47 + 48 + // do the spilling, get back a (similar) driver 32 49 let (commit, driver) = big_stuff.finish_loading(disk_store).await?; 33 - log::warn!("big: {:?}", commit); 50 + 51 + // at this point you might want to fetch the account's signing key 52 + // via the DID from the commit, and then verify the signature. 53 + log::warn!("big's comit ({:?}): {:?}", t0.elapsed(), commit); 54 + 55 + // pop the driver back out to get some code indentation relief 34 56 driver 35 57 } 36 58 }; 37 59 60 + // collect some random stats about the blocks 38 61 let mut n = 0; 39 - let (mut rx, worker) = driver.rx(512).await?; 62 + let mut zeros = 0; 63 + 64 + log::info!("walking..."); 65 + 66 + // this example uses the disk driver's channel mode: the tree walking is 67 + // spawned onto a blocking thread, and we get chunks of rkey+blocks back 68 + let (mut rx, join) = driver.to_channel(512); 69 + while let Some(r) = rx.recv().await { 70 + let pairs = r?; 40 71 41 - log::debug!("walking..."); 42 - while let Some(pairs) = rx.recv().await { 72 + // keep a count of the total number of blocks seen 43 73 n += pairs.len(); 74 + 75 + for (_, block) in pairs { 76 + // for each block, count how many bytes are equal to '0' 77 + // (this is just an example, you probably want to do something more 78 + // interesting) 79 + zeros += block.into_iter().filter(|&b| b == b'0').count() 80 + } 44 81 } 45 - log::debug!("done walking! joining..."); 46 82 47 - worker.await.unwrap().unwrap(); 83 + log::info!("arrived! 
({:?}) joining rx...", t0.elapsed()); 48 84 49 - log::debug!("joined."); 85 + join.await?; 50 86 51 - // log::info!("now is the time to check mem..."); 52 - // tokio::time::sleep(std::time::Duration::from_secs(22)).await; 53 - log::info!("bye! {n}"); 54 - 55 - std::fs::remove_file(tmpfile).unwrap(); // need to also remove -shm -wal 87 + log::info!("done. n={n} zeros={zeros}"); 56 88 57 89 Ok(()) 58 90 }
+14 -6
examples/read-file/main.rs
··· 1 + /*! 2 + Read a CAR file with in-memory processing 3 + */ 4 + 1 5 extern crate repo_stream; 2 6 use clap::Parser; 7 + use repo_stream::{Driver, DriverBuilder}; 3 8 use std::path::PathBuf; 4 9 5 10 type Result<T> = std::result::Result<T, Box<dyn std::error::Error>>; ··· 18 23 let reader = tokio::fs::File::open(file).await?; 19 24 let reader = tokio::io::BufReader::new(reader); 20 25 21 - let (commit, mut driver) = 22 - match repo_stream::drive::load_car(reader, |block| block.len(), 1024 * 1024).await? { 23 - repo_stream::drive::Vehicle::Lil(commit, mem_driver) => (commit, mem_driver), 24 - repo_stream::drive::Vehicle::Big(_) => panic!("can't handle big cars yet"), 25 - }; 26 + let (commit, mut driver) = match DriverBuilder::new() 27 + .with_block_processor(|block| block.len()) 28 + .load_car(reader) 29 + .await? 30 + { 31 + Driver::Memory(commit, mem_driver) => (commit, mem_driver), 32 + Driver::Disk(_) => panic!("this example doesn't handle big CARs"), 33 + }; 26 34 27 35 log::info!("got commit: {commit:?}"); 28 36 ··· 31 39 n += pairs.len(); 32 40 // log::info!("got {rkey:?}"); 33 41 } 34 - log::info!("bye! {n}"); 42 + log::info!("bye! total records={n}"); 35 43 36 44 Ok(()) 37 45 }
+67 -2
readme.md
··· 1 1 # repo-stream 2 2 3 - Fast and (aspirationally) robust atproto CAR file processing in rust 3 + A robust CAR file -> MST walker for atproto 4 + 5 + [![Crates.io][crates-badge]](https://crates.io/crates/repo-stream) 6 + [![Documentation][docs-badge]](https://docs.rs/repo-stream) 7 + [![Sponsor][sponsor-badge]](https://github.com/sponsors/uniphil) 8 + 9 + [crates-badge]: https://img.shields.io/crates/v/repo-stream.svg 10 + [docs-badge]: https://docs.rs/repo-stream/badge.svg 11 + [sponsor-badge]: https://img.shields.io/badge/at-microcosm-b820f9?labelColor=b820f9&logo=githubsponsors&logoColor=fff 12 + 13 + ```rust 14 + use repo_stream::{Driver, DriverBuilder, DriveError, DiskBuilder}; 15 + 16 + #[tokio::main] 17 + async fn main() -> Result<(), DriveError> { 18 + // repo-stream takes any AsyncRead as input, like a tokio::fs::File 19 + let reader = tokio::fs::File::open("repo.car".into()).await?; 20 + let reader = tokio::io::BufReader::new(reader); 21 + 22 + // example repo workload is simply counting the total record bytes 23 + let mut total_size = 0; 24 + 25 + match DriverBuilder::new() 26 + .with_mem_limit_mb(10) 27 + .with_block_processor(|rec| rec.len()) // block processing: just extract the raw record size 28 + .load_car(reader) 29 + .await? 30 + { 31 + 32 + // if all blocks fit within memory 33 + Driver::Memory(_commit, mut driver) => { 34 + while let Some(chunk) = driver.next_chunk(256).await? { 35 + for (_rkey, size) in chunk { 36 + total_size += size; 37 + } 38 + } 39 + }, 40 + 41 + // if the CAR was too big for in-memory processing 42 + Driver::Disk(paused) => { 43 + // set up a disk store we can spill to 44 + let store = DiskBuilder::new().open("some/path.db".into()).await?; 45 + // do the spilling, get back a (similar) driver 46 + let (_commit, mut driver) = paused.finish_loading(store).await?; 47 + 48 + while let Some(chunk) = driver.next_chunk(256).await? 
{ 49 + for (_rkey, size) in chunk { 50 + total_size += size; 51 + } 52 + } 53 + } 54 + }; 55 + println!("sum of size of all records: {total_size}"); 56 + Ok(()) 57 + } 58 + ``` 59 + 60 + more recent todo 61 + 62 + - [ ] get an *empty* car for the test suite 63 + - [x] implement a max size on disk limit 64 + 65 + 66 + ----- 67 + 68 + older stuff (to clean up): 4 69 5 70 6 71 current car processing times (records processed into their length usize, phil's dev machine): ··· 27 92 -> yeah the commit is returned from init 28 93 - [ ] spec compliance todos 29 94 - [x] assert that keys are ordered and fail if not 30 - - [ ] verify node mst depth from key (possibly pending [interop test fixes](https://github.com/bluesky-social/atproto-interop-tests/issues/5)) 95 + - [x] verify node mst depth from key (possibly pending [interop test fixes](https://github.com/bluesky-social/atproto-interop-tests/issues/5)) 31 96 - [ ] performance todos 32 97 - [x] consume the serialized nodes into a mutable efficient format 33 98 - [ ] maybe customize the deserialize impl to do that directly?
+137 -61
src/disk.rs
··· 1 + /*! 2 + Disk storage for blocks on disk 3 + 4 + Currently this uses sqlite. In testing sqlite wasn't the fastest, but it seemed 5 + to be the best behaved in terms of both on-disk space usage and memory usage. 6 + 7 + ```no_run 8 + # use repo_stream::{DiskBuilder, DiskError}; 9 + # #[tokio::main] 10 + # async fn main() -> Result<(), DiskError> { 11 + let store = DiskBuilder::new() 12 + .with_cache_size_mb(32) 13 + .with_max_stored_mb(1024) // errors when >1GiB of processed blocks are inserted 14 + .open("/some/path.db".into()).await?; 15 + # Ok(()) 16 + # } 17 + ``` 18 + */ 19 + 1 20 use crate::drive::DriveError; 2 - use rusqlite::OptionalExtension; 21 + use fjall::config::{CompressionPolicy, PinningPolicy, RestartIntervalPolicy}; 22 + use fjall::{CompressionType, Database, Error as FjallError, Keyspace, KeyspaceCreateOptions}; 3 23 use std::path::PathBuf; 4 24 5 - pub struct SqliteStore { 6 - conn: rusqlite::Connection, 25 + #[derive(Debug, thiserror::Error)] 26 + pub enum DiskError { 27 + /// A wrapped database error 28 + /// 29 + /// (The wrapped err should probably be obscured to remove public-facing 30 + /// sqlite bits) 31 + #[error(transparent)] 32 + DbError(#[from] FjallError), 33 + /// A tokio blocking task failed to join 34 + #[error("Failed to join a tokio blocking task: {0}")] 35 + JoinError(#[from] tokio::task::JoinError), 36 + /// The total size of stored blocks exceeded the allowed size 37 + /// 38 + /// If you need to process *really* big CARs, you can configure a higher 39 + /// limit. 
40 + #[error("Maximum disk size reached")] 41 + MaxSizeExceeded, 7 42 } 8 43 9 - impl SqliteStore { 10 - pub async fn new(path: PathBuf, cache_mb: usize) -> Result<Self, rusqlite::Error> { 11 - let conn = tokio::task::spawn_blocking(move || { 12 - let conn = rusqlite::Connection::open(path)?; 44 + /// Builder-style disk store setup 45 + #[derive(Debug, Clone)] 46 + pub struct DiskBuilder { 47 + /// Database in-memory cache allowance 48 + /// 49 + /// Default: 64 MiB 50 + pub cache_size_mb: usize, 51 + /// Database stored block size limit 52 + /// 53 + /// Default: 10 GiB 54 + /// 55 + /// Note: actual size on disk may be more, but should approximately scale 56 + /// with this limit 57 + pub max_stored_mb: usize, 58 + } 13 59 14 - let sqlite_one_mb = -(2_i64.pow(10)); // negative is kibibytes for sqlite cache_size 60 + impl Default for DiskBuilder { 61 + fn default() -> Self { 62 + Self { 63 + cache_size_mb: 64, 64 + max_stored_mb: 10 * 1024, // 10 GiB 65 + } 66 + } 67 + } 15 68 16 - // conn.pragma_update(None, "journal_mode", "OFF")?; 17 - // conn.pragma_update(None, "journal_mode", "MEMORY")?; 18 - conn.pragma_update(None, "journal_mode", "WAL")?; 19 - // conn.pragma_update(None, "wal_autocheckpoint", "0")?; // this lets things get a bit big on disk 20 - conn.pragma_update(None, "synchronous", "OFF")?; 21 - conn.pragma_update( 22 - None, 23 - "cache_size", 24 - (cache_mb as i64 * sqlite_one_mb).to_string(), 25 - )?; 26 - conn.execute( 27 - "CREATE TABLE blocks ( 28 - key BLOB PRIMARY KEY NOT NULL, 29 - val BLOB NOT NULL 30 - ) WITHOUT ROWID", 31 - (), 32 - )?; 33 - 34 - Ok::<_, rusqlite::Error>(conn) 35 - }) 36 - .await 37 - .expect("join error")?; 38 - 39 - Ok(Self { conn }) 69 + impl DiskBuilder { 70 + /// Begin configuring the storage with defaults 71 + pub fn new() -> Self { 72 + Default::default() 40 73 } 41 - pub fn get_writer(&'_ mut self) -> Result<SqliteWriter<'_>, rusqlite::Error> { 42 - let tx = self.conn.transaction()?; 43 - // let insert_stmt = 
tx.prepare("INSERT INTO blocks (key, val) VALUES (?1, ?2)")?; 44 - Ok(SqliteWriter { tx }) 74 + /// Set the in-memory cache allowance for the database 75 + /// 76 + /// Default: 64 MiB 77 + pub fn with_cache_size_mb(mut self, size: usize) -> Self { 78 + self.cache_size_mb = size; 79 + self 80 + } 81 + /// Set the approximate stored block size limit 82 + /// 83 + /// Default: 10 GiB 84 + pub fn with_max_stored_mb(mut self, max: usize) -> Self { 85 + self.max_stored_mb = max; 86 + self 45 87 } 46 - pub fn get_reader(&'_ self) -> Result<SqliteReader<'_>, rusqlite::Error> { 47 - let select_stmt = self.conn.prepare("SELECT val FROM blocks WHERE key = ?1")?; 48 - Ok(SqliteReader { select_stmt }) 88 + /// Open and initialize the actual disk storage 89 + pub async fn open(&self, path: PathBuf) -> Result<DiskStore, DiskError> { 90 + DiskStore::new(path, self.cache_size_mb, self.max_stored_mb).await 49 91 } 50 92 } 51 93 52 - pub struct SqliteWriter<'conn> { 53 - tx: rusqlite::Transaction<'conn>, 94 + /// On-disk block storage 95 + pub struct DiskStore { 96 + #[allow(unused)] 97 + db: Database, 98 + partition: Keyspace, 99 + max_stored: usize, 100 + stored: usize, 54 101 } 55 102 56 - impl SqliteWriter<'_> { 57 - pub fn put_many( 103 + impl DiskStore { 104 + /// Initialize a new disk store 105 + pub async fn new( 106 + path: PathBuf, 107 + cache_mb: usize, 108 + max_stored_mb: usize, 109 + ) -> Result<Self, DiskError> { 110 + let max_stored = max_stored_mb * 2_usize.pow(20); 111 + let (db, partition) = tokio::task::spawn_blocking(move || { 112 + let db = Database::builder(path) 113 + // .manual_journal_persist(true) 114 + // .flush_workers(1) 115 + // .compaction_workers(1) 116 + .journal_compression(CompressionType::None) 117 + .cache_size(cache_mb as u64 * 2_u64.pow(20)) 118 + .temporary(true) 119 + .open()?; 120 + let opts = KeyspaceCreateOptions::default() 121 + .data_block_restart_interval_policy(RestartIntervalPolicy::all(8)) 122 + 
.filter_block_pinning_policy(PinningPolicy::disabled()) 123 + .expect_point_read_hits(true) 124 + .data_block_compression_policy(CompressionPolicy::disabled()) 125 + .manual_journal_persist(true) 126 + .max_memtable_size(32 * 2_u64.pow(20)); 127 + let partition = db.keyspace("z", || opts)?; 128 + 129 + Ok::<_, DiskError>((db, partition)) 130 + }) 131 + .await??; 132 + 133 + Ok(Self { 134 + db, 135 + partition, 136 + max_stored, 137 + stored: 0, 138 + }) 139 + } 140 + 141 + pub(crate) fn put_many( 58 142 &mut self, 59 143 kv: impl Iterator<Item = Result<(Vec<u8>, Vec<u8>), DriveError>>, 60 144 ) -> Result<(), DriveError> { 61 - let mut insert_stmt = self 62 - .tx 63 - .prepare_cached("INSERT INTO blocks (key, val) VALUES (?1, ?2)")?; 145 + let mut batch = self.db.batch(); 64 146 for pair in kv { 65 147 let (k, v) = pair?; 66 - insert_stmt.execute((k, v))?; 148 + self.stored += v.len(); 149 + if self.stored > self.max_stored { 150 + return Err(DiskError::MaxSizeExceeded.into()); 151 + } 152 + batch.insert(&self.partition, k, v); 67 153 } 68 - Ok(()) 69 - } 70 - pub fn commit(self) -> Result<(), rusqlite::Error> { 71 - self.tx.commit()?; 154 + batch.commit().map_err(DiskError::DbError)?; 72 155 Ok(()) 73 156 } 74 - } 75 157 76 - pub struct SqliteReader<'conn> { 77 - select_stmt: rusqlite::Statement<'conn>, 78 - } 79 - 80 - impl SqliteReader<'_> { 81 - pub fn get(&mut self, key: Vec<u8>) -> rusqlite::Result<Option<Vec<u8>>> { 82 - self.select_stmt 83 - .query_one((&key,), |row| row.get(0)) 84 - .optional() 158 + #[inline] 159 + pub(crate) fn get(&mut self, key: &[u8]) -> Result<Option<fjall::Slice>, FjallError> { 160 + self.partition.get(key) 85 161 } 86 162 }
+361 -190
src/drive.rs
··· 1 - //! Consume an MST block stream, producing an ordered stream of records 1 + //! Consume a CAR from an AsyncRead, producing an ordered stream of records 2 2 3 - use crate::disk::SqliteStore; 3 + use crate::disk::{DiskError, DiskStore}; 4 4 use crate::process::Processable; 5 5 use ipld_core::cid::Cid; 6 6 use iroh_car::CarReader; 7 7 use serde::{Deserialize, Serialize}; 8 8 use std::collections::HashMap; 9 9 use std::convert::Infallible; 10 - use tokio::io::AsyncRead; 10 + use tokio::{io::AsyncRead, sync::mpsc}; 11 11 12 12 use crate::mst::{Commit, Node}; 13 13 use crate::walk::{Step, WalkError, Walker}; ··· 28 28 #[error("CAR file had no roots")] 29 29 MissingRoot, 30 30 #[error("Storage error")] 31 - StorageError(#[from] rusqlite::Error), 31 + StorageError(#[from] DiskError), 32 32 #[error("Encode error: {0}")] 33 33 BincodeEncodeError(#[from] bincode::error::EncodeError), 34 34 #[error("Tried to send on a closed channel")] ··· 45 45 ExtraGarbage, 46 46 } 47 47 48 + /// An in-order chunk of Rkey + (processed) Block pairs 49 + pub type BlockChunk<T> = Vec<(String, T)>; 50 + 48 51 #[derive(Debug, Clone, Serialize, Deserialize)] 49 - pub enum MaybeProcessedBlock<T> { 52 + pub(crate) enum MaybeProcessedBlock<T> { 50 53 /// A block that's *probably* a Node (but we can't know yet) 51 54 /// 52 55 /// It *can be* a record that suspiciously looks a lot like a node, so we ··· 88 91 } 89 92 } 90 93 91 - pub enum Vehicle<R: AsyncRead + Unpin, T: Processable> { 92 - Lil(Commit, MemDriver<T>), 93 - Big(BigCar<R, T>), 94 + impl<T> MaybeProcessedBlock<T> { 95 + fn maybe(process: fn(Vec<u8>) -> T, data: Vec<u8>) -> Self { 96 + if Node::could_be(&data) { 97 + MaybeProcessedBlock::Raw(data) 98 + } else { 99 + MaybeProcessedBlock::Processed(process(data)) 100 + } 101 + } 94 102 } 95 103 96 - pub async fn load_car<R: AsyncRead + Unpin, T: Processable>( 97 - reader: R, 98 - process: fn(Vec<u8>) -> T, 99 - max_size: usize, 100 - ) -> Result<Vehicle<R, T>, DriveError> { 101 - let 
mut mem_blocks = HashMap::new(); 104 + /// Read a CAR file, buffering blocks in memory or to disk 105 + pub enum Driver<R: AsyncRead + Unpin, T: Processable> { 106 + /// All blocks fit within the memory limit 107 + /// 108 + /// You probably want to check the commit's signature. You can go ahead and 109 + /// walk the MST right away. 110 + Memory(Commit, MemDriver<T>), 111 + /// Blocks exceed the memory limit 112 + /// 113 + /// You'll need to provide a disk storage to continue. The commit will be 114 + /// returned and can be validated only once all blocks are loaded. 115 + Disk(NeedDisk<R, T>), 116 + } 102 117 103 - let mut car = CarReader::new(reader).await?; 118 + /// Builder-style driver setup 119 + #[derive(Debug, Clone)] 120 + pub struct DriverBuilder { 121 + pub mem_limit_mb: usize, 122 + } 104 123 105 - let root = *car 106 - .header() 107 - .roots() 108 - .first() 109 - .ok_or(DriveError::MissingRoot)?; 110 - log::debug!("root: {root:?}"); 124 + impl Default for DriverBuilder { 125 + fn default() -> Self { 126 + Self { mem_limit_mb: 16 } 127 + } 128 + } 111 129 112 - let mut commit = None; 130 + impl DriverBuilder { 131 + /// Begin configuring the driver with defaults 132 + pub fn new() -> Self { 133 + Default::default() 134 + } 135 + /// Set the in-memory size limit, in MiB 136 + /// 137 + /// Default: 16 MiB 138 + pub fn with_mem_limit_mb(self, new_limit: usize) -> Self { 139 + Self { 140 + mem_limit_mb: new_limit, 141 + } 142 + } 143 + /// Set the block processor 144 + /// 145 + /// Default: noop, raw blocks will be emitted 146 + pub fn with_block_processor<T: Processable>( 147 + self, 148 + p: fn(Vec<u8>) -> T, 149 + ) -> DriverBuilderWithProcessor<T> { 150 + DriverBuilderWithProcessor { 151 + mem_limit_mb: self.mem_limit_mb, 152 + block_processor: p, 153 + } 154 + } 155 + /// Begin processing an atproto MST from a CAR file 156 + pub async fn load_car<R: AsyncRead + Unpin>( 157 + &self, 158 + reader: R, 159 + ) -> Result<Driver<R, Vec<u8>>, DriveError> 
{ 160 + Driver::load_car(reader, crate::process::noop, self.mem_limit_mb).await 161 + } 162 + } 113 163 114 - // try to load all the blocks into memory 115 - let mut mem_size = 0; 116 - while let Some((cid, data)) = car.next_block().await? { 117 - // the root commit is a Special Third Kind of block that we need to make 118 - // sure not to optimistically send to the processing function 119 - if cid == root { 120 - let c: Commit = serde_ipld_dagcbor::from_slice(&data)?; 121 - commit = Some(c); 122 - continue; 164 + /// Builder-style driver intermediate step 165 + /// 166 + /// start from `DriverBuilder` 167 + #[derive(Debug, Clone)] 168 + pub struct DriverBuilderWithProcessor<T: Processable> { 169 + pub mem_limit_mb: usize, 170 + pub block_processor: fn(Vec<u8>) -> T, 171 + } 172 + 173 + impl<T: Processable> DriverBuilderWithProcessor<T> { 174 + /// Set the in-memory size limit, in MiB 175 + /// 176 + /// Default: 16 MiB 177 + pub fn with_mem_limit_mb(mut self, new_limit: usize) -> Self { 178 + self.mem_limit_mb = new_limit; 179 + self 180 + } 181 + /// Begin processing an atproto MST from a CAR file 182 + pub async fn load_car<R: AsyncRead + Unpin>( 183 + &self, 184 + reader: R, 185 + ) -> Result<Driver<R, T>, DriveError> { 186 + Driver::load_car(reader, self.block_processor, self.mem_limit_mb).await 187 + } 188 + } 189 + 190 + impl<R: AsyncRead + Unpin, T: Processable> Driver<R, T> { 191 + /// Begin processing an atproto MST from a CAR file 192 + /// 193 + /// Blocks will be loaded, processed, and buffered in memory. If the entire 194 + /// processed size is under the `mem_limit_mb` limit, a `Driver::Memory` 195 + /// will be returned along with a `Commit` ready for validation. 196 + /// 197 + /// If the `mem_limit_mb` limit is reached before loading all blocks, the 198 + /// partial state will be returned as `Driver::Disk(needed)`, which can be 199 + /// resumed by providing a `DiskStore` for on-disk block storage. 
200 + pub async fn load_car( 201 + reader: R, 202 + process: fn(Vec<u8>) -> T, 203 + mem_limit_mb: usize, 204 + ) -> Result<Driver<R, T>, DriveError> { 205 + let max_size = mem_limit_mb * 2_usize.pow(20); 206 + let mut mem_blocks = HashMap::new(); 207 + 208 + let mut car = CarReader::new(reader).await?; 209 + 210 + let root = *car 211 + .header() 212 + .roots() 213 + .first() 214 + .ok_or(DriveError::MissingRoot)?; 215 + log::debug!("root: {root:?}"); 216 + 217 + let mut commit = None; 218 + 219 + // try to load all the blocks into memory 220 + let mut mem_size = 0; 221 + while let Some((cid, data)) = car.next_block().await? { 222 + // the root commit is a Special Third Kind of block that we need to make 223 + // sure not to optimistically send to the processing function 224 + if cid == root { 225 + let c: Commit = serde_ipld_dagcbor::from_slice(&data)?; 226 + commit = Some(c); 227 + continue; 228 + } 229 + 230 + // remaining possible types: node, record, other. optimistically process 231 + let maybe_processed = MaybeProcessedBlock::maybe(process, data); 232 + 233 + // stash (maybe processed) blocks in memory as long as we have room 234 + mem_size += std::mem::size_of::<Cid>() + maybe_processed.get_size(); 235 + mem_blocks.insert(cid, maybe_processed); 236 + if mem_size >= max_size { 237 + return Ok(Driver::Disk(NeedDisk { 238 + car, 239 + root, 240 + process, 241 + max_size, 242 + mem_blocks, 243 + commit, 244 + })); 245 + } 123 246 } 124 247 125 - // remaining possible types: node, record, other. optimistically process 126 - let maybe_processed = if Node::could_be(&data) { 127 - MaybeProcessedBlock::Raw(data) 128 - } else { 129 - MaybeProcessedBlock::Processed(process(data)) 130 - }; 248 + // all blocks loaded and we fit in memory! hopefully we found the commit... 
249 + let commit = commit.ok_or(DriveError::MissingCommit)?; 250 + 251 + let walker = Walker::new(commit.data); 131 252 132 - // stash (maybe processed) blocks in memory as long as we have room 133 - mem_size += std::mem::size_of::<Cid>() + maybe_processed.get_size(); 134 - mem_blocks.insert(cid, maybe_processed); 135 - if mem_size >= max_size { 136 - return Ok(Vehicle::Big(BigCar { 137 - car, 138 - root, 253 + Ok(Driver::Memory( 254 + commit, 255 + MemDriver { 256 + blocks: mem_blocks, 257 + walker, 139 258 process, 140 - max_size, 141 - mem_blocks, 142 - commit, 143 - })); 144 - } 259 + }, 260 + )) 145 261 } 262 + } 146 263 147 - // all blocks loaded and we fit in memory! hopefully we found the commit... 148 - let commit = commit.ok_or(DriveError::MissingCommit)?; 264 + /// The core driver between the block stream and MST walker 265 + /// 266 + /// In the future, PDSs will export CARs in a stream-friendly order that will 267 + /// enable processing them with tiny memory overhead. But that future is not 268 + /// here yet. 269 + /// 270 + /// CARs are almost always in a stream-unfriendly order, so I'm reverting the 271 + /// optimistic stream features: we load all blocks first, then walk the MST. 272 + /// 273 + /// This makes things much simpler: we only need to worry about spilling to disk 274 + /// in one place, and we always have a reasonable expectation about how much 275 + /// work the init function will do. We can drop the CAR reader before walking, 276 + /// so the sync/async boundaries become a little easier to work around. 
277 + #[derive(Debug)] 278 + pub struct MemDriver<T: Processable> { 279 + blocks: HashMap<Cid, MaybeProcessedBlock<T>>, 280 + walker: Walker, 281 + process: fn(Vec<u8>) -> T, 282 + } 149 283 150 - let walker = Walker::new(commit.data); 284 + impl<T: Processable> MemDriver<T> { 285 + /// Step through the record outputs, in rkey order 286 + pub async fn next_chunk(&mut self, n: usize) -> Result<Option<BlockChunk<T>>, DriveError> { 287 + let mut out = Vec::with_capacity(n); 288 + for _ in 0..n { 289 + // walk as far as we can until we run out of blocks or find a record 290 + match self.walker.step(&mut self.blocks, self.process)? { 291 + Step::Missing(cid) => return Err(DriveError::MissingBlock(cid)), 292 + Step::Finish => break, 293 + Step::Found { rkey, data } => { 294 + out.push((rkey, data)); 295 + continue; 296 + } 297 + }; 298 + } 151 299 152 - Ok(Vehicle::Lil( 153 - commit, 154 - MemDriver { 155 - blocks: mem_blocks, 156 - walker, 157 - process, 158 - }, 159 - )) 300 + if out.is_empty() { 301 + Ok(None) 302 + } else { 303 + Ok(Some(out)) 304 + } 305 + } 160 306 } 161 307 162 - /// a paritally memory-loaded car file that needs disk spillover to continue 163 - pub struct BigCar<R: AsyncRead + Unpin, T: Processable> { 308 + /// A partially memory-loaded car file that needs disk spillover to continue 309 + pub struct NeedDisk<R: AsyncRead + Unpin, T: Processable> { 164 310 car: CarReader<R>, 165 311 root: Cid, 166 312 process: fn(Vec<u8>) -> T, ··· 173 319 bincode::serde::encode_to_vec(v, bincode::config::standard()) 174 320 } 175 321 176 - pub fn decode<T: Processable>(bytes: &[u8]) -> Result<T, DecodeError> { 322 + pub(crate) fn decode<T: Processable>(bytes: &[u8]) -> Result<T, DecodeError> { 177 323 let (t, n) = bincode::serde::decode_from_slice(bytes, bincode::config::standard())?; 178 324 if n != bytes.len() { 179 325 return Err(DecodeError::ExtraGarbage); ··· 181 327 Ok(t) 182 328 } 183 329 184 - impl<R: AsyncRead + Unpin, T: Processable + Send + 'static> 
BigCar<R, T> { 330 + impl<R: AsyncRead + Unpin, T: Processable + Send + 'static> NeedDisk<R, T> { 185 331 pub async fn finish_loading( 186 332 mut self, 187 - mut store: SqliteStore, 188 - ) -> Result<(Commit, BigCarReady<T>), DriveError> { 333 + mut store: DiskStore, 334 + ) -> Result<(Commit, DiskDriver<T>), DriveError> { 189 335 // move store in and back out so we can manage lifetimes 190 336 // dump mem blocks into the store 191 337 store = tokio::task::spawn(async move { 192 - let mut writer = store.get_writer()?; 193 - 194 338 let kvs = self 195 339 .mem_blocks 196 340 .into_iter() 197 341 .map(|(k, v)| Ok(encode(v).map(|v| (k.to_bytes(), v))?)); 198 342 199 - writer.put_many(kvs)?; 200 - writer.commit()?; 343 + store.put_many(kvs)?; 201 344 Ok::<_, DriveError>(store) 202 345 }) 203 346 .await??; 204 347 205 - let (tx, mut rx) = tokio::sync::mpsc::channel::<Vec<(Cid, MaybeProcessedBlock<T>)>>(2); 348 + let (tx, mut rx) = mpsc::channel::<Vec<(Cid, MaybeProcessedBlock<T>)>>(1); 206 349 207 350 let store_worker = tokio::task::spawn_blocking(move || { 208 - let mut writer = store.get_writer()?; 209 - 210 351 while let Some(chunk) = rx.blocking_recv() { 211 352 let kvs = chunk 212 353 .into_iter() 213 354 .map(|(k, v)| Ok(encode(v).map(|v| (k.to_bytes(), v))?)); 214 - writer.put_many(kvs)?; 355 + store.put_many(kvs)?; 215 356 } 216 - 217 - writer.commit()?; 218 357 Ok::<_, DriveError>(store) 219 358 }); // await later 220 359 ··· 235 374 } 236 375 // remaining possible types: node, record, other. 
optimistically process 237 376 // TODO: get the actual in-memory size to compute disk spill 238 - let maybe_processed = if Node::could_be(&data) { 239 - MaybeProcessedBlock::Raw(data) 240 - } else { 241 - MaybeProcessedBlock::Processed((self.process)(data)) 242 - }; 377 + let maybe_processed = MaybeProcessedBlock::maybe(self.process, data); 243 378 mem_size += std::mem::size_of::<Cid>() + maybe_processed.get_size(); 244 379 chunk.push((cid, maybe_processed)); 245 380 if mem_size >= self.max_size { ··· 269 404 270 405 Ok(( 271 406 commit, 272 - BigCarReady { 407 + DiskDriver { 273 408 process: self.process, 274 - store, 275 - walker, 409 + state: Some(BigState { store, walker }), 276 410 }, 277 411 )) 278 412 } 279 413 } 280 414 281 - pub struct BigCarReady<T: Clone> { 282 - process: fn(Vec<u8>) -> T, 283 - store: SqliteStore, 415 + struct BigState { 416 + store: DiskStore, 284 417 walker: Walker, 285 418 } 286 419 287 - impl<T: Processable + Send + 'static> BigCarReady<T> { 288 - pub async fn next_chunk( 289 - mut self, 290 - n: usize, 291 - ) -> Result<(Self, Option<Vec<(String, T)>>), DriveError> { 292 - let mut out = Vec::with_capacity(n); 293 - (self, out) = tokio::task::spawn_blocking(move || { 294 - let store = self.store; 295 - let mut reader = store.get_reader()?; 296 - 297 - for _ in 0..n { 298 - // walk as far as we can until we run out of blocks or find a record 299 - match self.walker.disk_step(&mut reader, self.process)? 
{ 300 - Step::Missing(cid) => return Err(DriveError::MissingBlock(cid)), 301 - Step::Finish => break, 302 - Step::Step { rkey, data } => { 303 - out.push((rkey, data)); 304 - continue; 305 - } 306 - }; 307 - } 420 + /// MST walker that reads from disk instead of an in-memory hashmap 421 + pub struct DiskDriver<T: Clone> { 422 + process: fn(Vec<u8>) -> T, 423 + state: Option<BigState>, 424 + } 308 425 309 - drop(reader); // cannot outlive store 310 - self.store = store; 311 - Ok::<_, DriveError>((self, out)) 312 - }) 313 - .await??; 314 - 315 - if out.is_empty() { 316 - Ok((self, None)) 317 - } else { 318 - Ok((self, Some(out))) 319 - } 426 + // for doctests only 427 + #[doc(hidden)] 428 + pub fn _get_fake_disk_driver() -> DiskDriver<Vec<u8>> { 429 + use crate::process::noop; 430 + DiskDriver { 431 + process: noop, 432 + state: None, 320 433 } 434 + } 321 435 322 - pub async fn rx( 323 - mut self, 324 - n: usize, 325 - ) -> Result< 326 - ( 327 - tokio::sync::mpsc::Receiver<Vec<(String, T)>>, 328 - tokio::task::JoinHandle<Result<(), DriveError>>, 329 - ), 330 - DriveError, 331 - > { 332 - let (tx, rx) = tokio::sync::mpsc::channel::<Vec<(String, T)>>(1); 436 + impl<T: Processable + Send + 'static> DiskDriver<T> { 437 + /// Walk the MST returning up to `n` rkey + record pairs 438 + /// 439 + /// ```no_run 440 + /// # use repo_stream::{drive::{DiskDriver, DriveError, _get_fake_disk_driver}, process::noop}; 441 + /// # #[tokio::main] 442 + /// # async fn main() -> Result<(), DriveError> { 443 + /// # let mut disk_driver = _get_fake_disk_driver(); 444 + /// while let Some(pairs) = disk_driver.next_chunk(256).await? 
{ 445 + /// for (rkey, record) in pairs { 446 + /// println!("{rkey}: size={}", record.len()); 447 + /// } 448 + /// } 449 + /// # Ok(()) 450 + /// # } 451 + /// ``` 452 + pub async fn next_chunk(&mut self, n: usize) -> Result<Option<BlockChunk<T>>, DriveError> { 453 + let process = self.process; 333 454 334 - // sketch: this worker is going to be allowed to execute without a join handle 335 - // ...should we return the join handle here so the caller at least knows about it? 336 - // yes probably for error handling?? (orrr put errors in the channel) 337 - let worker = tokio::task::spawn_blocking(move || { 338 - let mut reader = self.store.get_reader()?; 455 + // state should only *ever* be None transiently while inside here 456 + let mut state = self.state.take().expect("DiskDriver must have Some(state)"); 339 457 340 - loop { 458 + // the big pain here is that we don't want to leave self.state in an 459 + // invalid state (None), so all the error paths have to make sure it 460 + // comes out again. 461 + let (state, res) = tokio::task::spawn_blocking( 462 + move || -> (BigState, Result<BlockChunk<T>, DriveError>) { 341 463 let mut out = Vec::with_capacity(n); 342 464 343 465 for _ in 0..n { 344 466 // walk as far as we can until we run out of blocks or find a record 345 - match self.walker.disk_step(&mut reader, self.process)? 
{ 346 - Step::Missing(cid) => return Err(DriveError::MissingBlock(cid)), 347 - Step::Finish => break, 348 - Step::Step { rkey, data } => { 349 - out.push((rkey, data)); 350 - continue; 467 + let step = match state.walker.disk_step(&mut state.store, process) { 468 + Ok(s) => s, 469 + Err(e) => { 470 + return (state, Err(e.into())); 471 + } 472 + }; 473 + match step { 474 + Step::Missing(cid) => { 475 + return (state, Err(DriveError::MissingBlock(cid))); 351 476 } 477 + Step::Finish => break, 478 + Step::Found { rkey, data } => out.push((rkey, data)), 352 479 }; 353 480 } 354 481 355 - if out.is_empty() { 356 - break; 357 - } 358 - tx.blocking_send(out) 359 - .map_err(|_| DriveError::ChannelSendError)?; 360 - } 482 + (state, Ok::<_, DriveError>(out)) 483 + }, 484 + ) 485 + .await?; // on tokio JoinError, we'll be left with invalid state :( 361 486 362 - drop(reader); // cannot outlive store 363 - Ok(()) 364 - }); // await later 487 + // *must* restore state before dealing with the actual result 488 + self.state = Some(state); 365 489 366 - Ok((rx, worker)) 367 - } 368 - } 369 - 370 - /// The core driver between the block stream and MST walker 371 - /// 372 - /// In the future, PDSs will export CARs in a stream-friendly order that will 373 - /// enable processing them with tiny memory overhead. But that future is not 374 - /// here yet. 375 - /// 376 - /// CARs are almost always in a stream-unfriendly order, so I'm reverting the 377 - /// optimistic stream features: we load all block first, then walk the MST. 378 - /// 379 - /// This makes things much simpler: we only need to worry about spilling to disk 380 - /// in one place, and we always have a reasonable expecatation about how much 381 - /// work the init function will do. We can drop the CAR reader before walking, 382 - /// so the sync/async boundaries become a little easier to work around. 
383 - #[derive(Debug)] 384 - pub struct MemDriver<T: Processable> { 385 - blocks: HashMap<Cid, MaybeProcessedBlock<T>>, 386 - walker: Walker, 387 - process: fn(Vec<u8>) -> T, 388 - } 389 - 390 - impl<T: Processable> MemDriver<T> { 391 - /// Manually step through the record outputs 392 - pub async fn next_chunk(&mut self, n: usize) -> Result<Option<Vec<(String, T)>>, DriveError> { 393 - let mut out = Vec::with_capacity(n); 394 - for _ in 0..n { 395 - // walk as far as we can until we run out of blocks or find a record 396 - match self.walker.step(&mut self.blocks, self.process)? { 397 - Step::Missing(cid) => return Err(DriveError::MissingBlock(cid)), 398 - Step::Finish => break, 399 - Step::Step { rkey, data } => { 400 - out.push((rkey, data)); 401 - continue; 402 - } 403 - }; 404 - } 490 + let out = res?; 405 491 406 492 if out.is_empty() { 407 493 Ok(None) 408 494 } else { 409 495 Ok(Some(out)) 410 496 } 497 + } 498 + 499 + fn read_tx_blocking( 500 + &mut self, 501 + n: usize, 502 + tx: mpsc::Sender<Result<BlockChunk<T>, DriveError>>, 503 + ) -> Result<(), mpsc::error::SendError<Result<BlockChunk<T>, DriveError>>> { 504 + let BigState { store, walker } = self.state.as_mut().expect("valid state"); 505 + 506 + loop { 507 + let mut out: BlockChunk<T> = Vec::with_capacity(n); 508 + 509 + for _ in 0..n { 510 + // walk as far as we can until we run out of blocks or find a record 511 + 512 + let step = match walker.disk_step(store, self.process) { 513 + Ok(s) => s, 514 + Err(e) => return tx.blocking_send(Err(e.into())), 515 + }; 516 + 517 + match step { 518 + Step::Missing(cid) => { 519 + return tx.blocking_send(Err(DriveError::MissingBlock(cid))); 520 + } 521 + Step::Finish => return Ok(()), 522 + Step::Found { rkey, data } => { 523 + out.push((rkey, data)); 524 + continue; 525 + } 526 + }; 527 + } 528 + 529 + if out.is_empty() { 530 + break; 531 + } 532 + tx.blocking_send(Ok(out))?; 533 + } 534 + 535 + Ok(()) 536 + } 537 + 538 + /// Spawn the disk reading task into a 
tokio blocking thread 539 + /// 540 + /// The idea is to avoid so much sending back and forth to the blocking 541 + /// thread, letting a blocking task do all the disk reading work and sending 542 + /// records and rkeys back through an `mpsc` channel instead. 543 + /// 544 + /// This might also allow the disk work to continue while processing the 545 + /// records. It's still not yet clear if this method actually has much 546 + /// benefit over just using `.next_chunk(n)`. 547 + /// 548 + /// ```no_run 549 + /// # use repo_stream::{drive::{DiskDriver, DriveError, _get_fake_disk_driver}, process::noop}; 550 + /// # #[tokio::main] 551 + /// # async fn main() -> Result<(), DriveError> { 552 + /// # let mut disk_driver = _get_fake_disk_driver(); 553 + /// let (mut rx, join) = disk_driver.to_channel(512); 554 + /// while let Some(recvd) = rx.recv().await { 555 + /// let pairs = recvd?; 556 + /// for (rkey, record) in pairs { 557 + /// println!("{rkey}: size={}", record.len()); 558 + /// } 559 + /// 560 + /// } 561 + /// # Ok(()) 562 + /// # } 563 + /// ``` 564 + pub fn to_channel( 565 + mut self, 566 + n: usize, 567 + ) -> ( 568 + mpsc::Receiver<Result<BlockChunk<T>, DriveError>>, 569 + tokio::task::JoinHandle<Self>, 570 + ) { 571 + let (tx, rx) = mpsc::channel::<Result<BlockChunk<T>, DriveError>>(1); 572 + 573 + // sketch: this worker is going to be allowed to execute without a join handle 574 + let chan_task = tokio::task::spawn_blocking(move || { 575 + if let Err(mpsc::error::SendError(_)) = self.read_tx_blocking(n, tx) { 576 + log::debug!("big car reader exited early due to dropped receiver channel"); 577 + } 578 + self 579 + }); 580 + 581 + (rx, chan_task) 411 582 } 412 583 }
+80 -5
src/lib.rs
··· 1 - //! Fast and robust atproto CAR file processing in rust 2 - //! 3 - //! For now see the [examples](https://tangled.org/@microcosm.blue/repo-stream/tree/main/examples) 1 + /*! 2 + A robust CAR file -> MST walker for atproto 3 + 4 + Small CARs have their blocks buffered in memory. If a configurable memory limit 5 + is reached while reading blocks, CAR reading is suspended, and can be continued 6 + by providing disk storage to buffer the CAR blocks instead. 7 + 8 + A `process` function can be provided for tasks where records are transformed 9 + into a smaller representation, to save memory (and disk) during block reading. 10 + 11 + Once blocks are loaded, the MST is walked and emitted as chunks of 12 + `(rkey, processed_block)` pairs, in order (depth first, left-to-right). 13 + 14 + Some MST validations are applied: 15 + - Keys must appear in order 16 + - Keys must be at the correct MST tree depth 17 + 18 + `iroh_car` additionally applies a block size limit of `2MiB`. 19 + 20 + ``` 21 + use repo_stream::{Driver, DriverBuilder, DiskBuilder}; 22 + 23 + # #[tokio::main] 24 + # async fn main() -> Result<(), Box<dyn std::error::Error>> { 25 + # let reader = include_bytes!("../car-samples/tiny.car").as_slice(); 26 + let mut total_size = 0; 27 + 28 + match DriverBuilder::new() 29 + .with_mem_limit_mb(10) 30 + .with_block_processor(|rec| rec.len()) // block processing: just extract the raw record size 31 + .load_car(reader) 32 + .await? 33 + { 34 + 35 + // if all blocks fit within memory 36 + Driver::Memory(_commit, mut driver) => { 37 + while let Some(chunk) = driver.next_chunk(256).await? 
{ 38 + for (_rkey, size) in chunk { 39 + total_size += size; 40 + } 41 + } 42 + }, 43 + 44 + // if the CAR was too big for in-memory processing 45 + Driver::Disk(paused) => { 46 + // set up a disk store we can spill to 47 + let store = DiskBuilder::new().open("some/path.db".into()).await?; 48 + // do the spilling, get back a (similar) driver 49 + let (_commit, mut driver) = paused.finish_loading(store).await?; 50 + 51 + while let Some(chunk) = driver.next_chunk(256).await? { 52 + for (_rkey, size) in chunk { 53 + total_size += size; 54 + } 55 + } 56 + } 57 + }; 58 + println!("sum of size of all records: {total_size}"); 59 + # Ok(()) 60 + # } 61 + ``` 62 + 63 + Disk spilling suspends and returns a `Driver::Disk(paused)` instead of going 64 + ahead and eagerly using disk I/O. This means you have to write a bit more code 65 + to handle both cases, but it allows you to have finer control over resource 66 + usage. For example, you can drive a number of parallel memory CAR workers, and 67 + separately have a different number of disk workers picking up suspended disk 68 + tasks from a queue. 69 + 70 + Find more [examples in the repo](https://tangled.org/@microcosm.blue/repo-stream/tree/main/examples). 71 + 72 + */ 73 + 74 + pub mod mst; 75 + mod walk; 4 76 5 77 pub mod disk; 6 78 pub mod drive; 7 - pub mod mst; 8 79 pub mod process; 9 - pub mod walk; 80 + 81 + pub use disk::{DiskBuilder, DiskError, DiskStore}; 82 + pub use drive::{DriveError, Driver, DriverBuilder, NeedDisk}; 83 + pub use mst::Commit; 84 + pub use process::Processable;
+4 -4
src/mst.rs
··· 39 39 /// MST node data schema 40 40 #[derive(Debug, Deserialize, PartialEq)] 41 41 #[serde(deny_unknown_fields)] 42 - pub struct Node { 42 + pub(crate) struct Node { 43 43 /// link to sub-tree Node on a lower level and with all keys sorting before 44 44 /// keys at this node 45 45 #[serde(rename = "l")] ··· 62 62 /// so if a block *could be* a node, any record converter must postpone 63 63 /// processing. if it turns out it happens to be a very node-looking record, 64 64 /// well, sorry, it just has to only be processed later when that's known. 65 - pub fn could_be(bytes: impl AsRef<[u8]>) -> bool { 65 + pub(crate) fn could_be(bytes: impl AsRef<[u8]>) -> bool { 66 66 const NODE_FINGERPRINT: [u8; 3] = [ 67 67 0xA2, // map length 2 (for "l" and "e" keys) 68 68 0x61, // text length 1 ··· 83 83 /// with an empty array of entries. This is the only situation in which a 84 84 /// tree may contain an empty leaf node which does not either contain keys 85 85 /// ("entries") or point to a sub-tree containing entries. 86 - pub fn is_empty(&self) -> bool { 86 + pub(crate) fn is_empty(&self) -> bool { 87 87 self.left.is_none() && self.entries.is_empty() 88 88 } 89 89 } ··· 91 91 /// TreeEntry object 92 92 #[derive(Debug, Deserialize, PartialEq)] 93 93 #[serde(deny_unknown_fields)] 94 - pub struct Entry { 94 + pub(crate) struct Entry { 95 95 /// count of bytes shared with previous TreeEntry in this Node (if any) 96 96 #[serde(rename = "p")] 97 97 pub prefix_len: usize,
+97 -1
src/process.rs
··· 1 + /*! 2 + Record processor function output trait 3 + 4 + The return type must satisfy the `Processable` trait, which requires: 5 + 6 + - `Clone` because two rkeys can refer to the same record by CID, which may 7 + only appear once in the CAR file. 8 + - `Serialize + DeserializeOwned` so it can be spilled to disk. 9 + 10 + One required function must be implemented, `get_size()`: this should return the 11 + approximate total off-stack size of the type. (the on-stack size will be added 12 + automatically via `std::mem::size_of`). 13 + 14 + Note that it is **not guaranteed** that the `process` function will run on a 15 + block before storing it in memory or on disk: it's not possible to know if a 16 + block is a record without actually walking the MST, so the best we can do is 17 + apply `process` to any block that we know *cannot* be an MST node, and otherwise 18 + store the raw block bytes. 19 + 20 + Here's a silly processing function that just collects 'eyy's found in the raw 21 + record bytes 22 + 23 + ``` 24 + # use repo_stream::Processable; 25 + # use serde::{Serialize, Deserialize}; 26 + #[derive(Debug, Clone, Serialize, Deserialize)] 27 + struct Eyy(usize, String); 28 + 29 + impl Processable for Eyy { 30 + fn get_size(&self) -> usize { 31 + // don't need to compute the usize, it's on the stack 32 + self.1.capacity() // in-mem size from the string's capacity, in bytes 33 + } 34 + } 35 + 36 + fn process(raw: Vec<u8>) -> Vec<Eyy> { 37 + let mut out = Vec::new(); 38 + let to_find = "eyy".as_bytes(); 39 + for i in 0..(raw.len() - 3) { 40 + if &raw[i..(i+3)] == to_find { 41 + out.push(Eyy(i, "eyy".to_string())); 42 + } 43 + } 44 + out 45 + } 46 + ``` 47 + 48 + The memory sizing stuff is a little sketch but probably at least approximately 49 + works. 
50 + */ 51 + 1 52 use serde::{Serialize, de::DeserializeOwned}; 2 53 54 + /// Output trait for record processing 3 55 pub trait Processable: Clone + Serialize + DeserializeOwned { 4 - /// the additional size taken up (not including its mem::size_of) 56 + /// Any additional in-memory size taken by the processed type 57 + /// 58 + /// Do not include stack size (`std::mem::size_of`) 5 59 fn get_size(&self) -> usize; 6 60 } 7 61 62 + /// Processor that just returns the raw blocks 63 + #[inline] 64 + pub fn noop(block: Vec<u8>) -> Vec<u8> { 65 + block 66 + } 67 + 68 + impl Processable for u8 { 69 + fn get_size(&self) -> usize { 70 + 0 71 + } 72 + } 73 + 8 74 impl Processable for usize { 9 75 fn get_size(&self) -> usize { 10 76 0 // no additional space taken, just its stack size (newtype is free) 11 77 } 12 78 } 79 + 80 + impl Processable for String { 81 + fn get_size(&self) -> usize { 82 + self.capacity() 83 + } 84 + } 85 + 86 + impl<Item: Sized + Processable> Processable for Vec<Item> { 87 + fn get_size(&self) -> usize { 88 + let slot_size = std::mem::size_of::<Item>(); 89 + let direct_size = slot_size * self.capacity(); 90 + let items_referenced_size: usize = self.iter().map(|item| item.get_size()).sum(); 91 + direct_size + items_referenced_size 92 + } 93 + } 94 + 95 + impl<Item: Processable> Processable for Option<Item> { 96 + fn get_size(&self) -> usize { 97 + self.as_ref().map(|item| item.get_size()).unwrap_or(0) 98 + } 99 + } 100 + 101 + impl<Item: Processable, Error: Processable> Processable for Result<Item, Error> { 102 + fn get_size(&self) -> usize { 103 + match self { 104 + Ok(item) => item.get_size(), 105 + Err(err) => err.get_size(), 106 + } 107 + } 108 + }
+14 -211
src/walk.rs
··· 1 1 //! Depth-first MST traversal 2 2 3 - use crate::disk::SqliteReader; 3 + use crate::disk::DiskStore; 4 4 use crate::drive::{DecodeError, MaybeProcessedBlock}; 5 5 use crate::mst::Node; 6 6 use crate::process::Processable; ··· 19 19 #[error("Action node error: {0}")] 20 20 MstError(#[from] MstError), 21 21 #[error("storage error: {0}")] 22 - StorageError(#[from] rusqlite::Error), 22 + StorageError(#[from] fjall::Error), 23 23 #[error("Decode error: {0}")] 24 24 DecodeError(#[from] DecodeError), 25 25 } ··· 51 51 /// Reached the end of the MST! yay! 52 52 Finish, 53 53 /// A record was found! 54 - Step { rkey: String, data: T }, 54 + Found { rkey: String, data: T }, 55 55 } 56 56 57 57 #[derive(Debug, Clone, PartialEq)] ··· 87 87 } 88 88 89 89 fn push_from_node(stack: &mut Vec<Need>, node: &Node, parent_depth: Depth) -> Result<(), MstError> { 90 - // empty nodes are not allowed in the MST 91 - // ...except for a single one for empty MST, but we wouldn't be pushing that 90 + // empty nodes are not allowed in the MST except in an empty MST 92 91 if node.is_empty() { 93 - return Err(MstError::EmptyNode); 92 + if parent_depth == Depth::Root { 93 + return Ok(()); // empty mst, nothing to push 94 + } else { 95 + return Err(MstError::EmptyNode); 96 + } 94 97 } 95 98 96 99 let mut entries = Vec::with_capacity(node.entries.len()); ··· 227 230 } 228 231 self.prev = rkey.clone(); 229 232 230 - return Ok(Step::Step { rkey, data }); 233 + return Ok(Step::Found { rkey, data }); 231 234 } 232 235 } 233 236 } ··· 236 239 /// blocking!!!!!! 237 240 pub fn disk_step<T: Processable>( 238 241 &mut self, 239 - reader: &mut SqliteReader, 242 + reader: &mut DiskStore, 240 243 process: impl Fn(Vec<u8>) -> T, 241 244 ) -> Result<Step<T>, WalkError> { 242 245 loop { ··· 249 252 &mut Need::Node { depth, cid } => { 250 253 let cid_bytes = cid.to_bytes(); 251 254 log::trace!("need node {cid:?}"); 252 - let Some(block_bytes) = reader.get(cid_bytes)? 
else { 255 + let Some(block_bytes) = reader.get(&cid_bytes)? else { 253 256 log::trace!("node not found, resting"); 254 257 return Ok(Step::Missing(cid)); 255 258 }; ··· 271 274 Need::Record { rkey, cid } => { 272 275 log::trace!("need record {cid:?}"); 273 276 let cid_bytes = cid.to_bytes(); 274 - let Some(data_bytes) = reader.get(cid_bytes)? else { 277 + let Some(data_bytes) = reader.get(&cid_bytes)? else { 275 278 log::trace!("record block not found, resting"); 276 279 return Ok(Step::Missing(*cid)); 277 280 }; ··· 294 297 } 295 298 self.prev = rkey.clone(); 296 299 297 - return Ok(Step::Step { rkey, data }); 300 + return Ok(Step::Found { rkey, data }); 298 301 } 299 302 } 300 303 } ··· 304 307 #[cfg(test)] 305 308 mod test { 306 309 use super::*; 307 - // use crate::mst::Entry; 308 310 309 311 fn cid1() -> Cid { 310 312 "bafyreihixenvk3ahqbytas4hk4a26w43bh6eo3w6usjqtxkpzsvi655a3m" 311 313 .parse() 312 314 .unwrap() 313 315 } 314 - // fn cid2() -> Cid { 315 - // "QmY7Yh4UquoXHLPFo2XbhXkhBvFoPwmQUSa92pxnxjQuPU" 316 - // .parse() 317 - // .unwrap() 318 - // } 319 - // fn cid3() -> Cid { 320 - // "bafybeigdyrzt5sfp7udm7hu76uh7y26nf3efuylqabf3oclgtqy55fbzdi" 321 - // .parse() 322 - // .unwrap() 323 - // } 324 - // fn cid4() -> Cid { 325 - // "QmbWqxBEKC3P8tqsKc98xmWNzrzDtRLMiMPL8wBuTGsMnR" 326 - // .parse() 327 - // .unwrap() 328 - // } 329 - // fn cid5() -> Cid { 330 - // "QmSnuWmxptJZdLJpKRarxBMS2Ju2oANVrgbr2xWbie9b2D" 331 - // .parse() 332 - // .unwrap() 333 - // } 334 - // fn cid6() -> Cid { 335 - // "QmdmQXB2mzChmMeKY47C43LxUdg1NDJ5MWcKMKxDu7RgQm" 336 - // .parse() 337 - // .unwrap() 338 - // } 339 - // fn cid7() -> Cid { 340 - // "bafybeiaysi4s6lnjev27ln5icwm6tueaw2vdykrtjkwiphwekaywqhcjze" 341 - // .parse() 342 - // .unwrap() 343 - // } 344 - // fn cid8() -> Cid { 345 - // "bafyreif3tfdpr5n4jdrbielmcapwvbpcthepfkwq2vwonmlhirbjmotedi" 346 - // .parse() 347 - // .unwrap() 348 - // } 349 - // fn cid9() -> Cid { 350 - // 
"bafyreicnokmhmrnlp2wjhyk2haep4tqxiptwfrp2rrs7rzq7uk766chqvq" 351 - // .parse() 352 - // .unwrap() 353 - // } 354 316 355 317 #[test] 356 318 fn test_depth_spec_0() { ··· 441 403 .as_ref() 442 404 ); 443 405 } 444 - 445 - // #[test] 446 - // fn test_needs_from_node_just_one_record() { 447 - // let node = Node { 448 - // left: None, 449 - // entries: vec![Entry { 450 - // keysuffix: "asdf".into(), 451 - // prefix_len: 0, 452 - // value: cid1(), 453 - // tree: None, 454 - // }], 455 - // }; 456 - // assert_eq!( 457 - // needs_from_node(node).unwrap(), 458 - // vec![Need::Record { 459 - // rkey: "asdf".into(), 460 - // cid: cid1(), 461 - // },] 462 - // ); 463 - // } 464 - 465 - // #[test] 466 - // fn test_needs_from_node_two_records() { 467 - // let node = Node { 468 - // left: None, 469 - // entries: vec![ 470 - // Entry { 471 - // keysuffix: "asdf".into(), 472 - // prefix_len: 0, 473 - // value: cid1(), 474 - // tree: None, 475 - // }, 476 - // Entry { 477 - // keysuffix: "gh".into(), 478 - // prefix_len: 2, 479 - // value: cid2(), 480 - // tree: None, 481 - // }, 482 - // ], 483 - // }; 484 - // assert_eq!( 485 - // needs_from_node(node).unwrap(), 486 - // vec![ 487 - // Need::Record { 488 - // rkey: "asdf".into(), 489 - // cid: cid1(), 490 - // }, 491 - // Need::Record { 492 - // rkey: "asgh".into(), 493 - // cid: cid2(), 494 - // }, 495 - // ] 496 - // ); 497 - // } 498 - 499 - // #[test] 500 - // fn test_needs_from_node_with_both() { 501 - // let node = Node { 502 - // left: None, 503 - // entries: vec![Entry { 504 - // keysuffix: "asdf".into(), 505 - // prefix_len: 0, 506 - // value: cid1(), 507 - // tree: Some(cid2()), 508 - // }], 509 - // }; 510 - // assert_eq!( 511 - // needs_from_node(node).unwrap(), 512 - // vec![ 513 - // Need::Record { 514 - // rkey: "asdf".into(), 515 - // cid: cid1(), 516 - // }, 517 - // Need::Node(cid2()), 518 - // ] 519 - // ); 520 - // } 521 - 522 - // #[test] 523 - // fn test_needs_from_node_left_and_record() { 524 - // let node 
= Node { 525 - // left: Some(cid1()), 526 - // entries: vec![Entry { 527 - // keysuffix: "asdf".into(), 528 - // prefix_len: 0, 529 - // value: cid2(), 530 - // tree: None, 531 - // }], 532 - // }; 533 - // assert_eq!( 534 - // needs_from_node(node).unwrap(), 535 - // vec![ 536 - // Need::Node(cid1()), 537 - // Need::Record { 538 - // rkey: "asdf".into(), 539 - // cid: cid2(), 540 - // }, 541 - // ] 542 - // ); 543 - // } 544 - 545 - // #[test] 546 - // fn test_needs_from_full_node() { 547 - // let node = Node { 548 - // left: Some(cid1()), 549 - // entries: vec![ 550 - // Entry { 551 - // keysuffix: "asdf".into(), 552 - // prefix_len: 0, 553 - // value: cid2(), 554 - // tree: Some(cid3()), 555 - // }, 556 - // Entry { 557 - // keysuffix: "ghi".into(), 558 - // prefix_len: 1, 559 - // value: cid4(), 560 - // tree: Some(cid5()), 561 - // }, 562 - // Entry { 563 - // keysuffix: "jkl".into(), 564 - // prefix_len: 2, 565 - // value: cid6(), 566 - // tree: Some(cid7()), 567 - // }, 568 - // Entry { 569 - // keysuffix: "mno".into(), 570 - // prefix_len: 4, 571 - // value: cid8(), 572 - // tree: Some(cid9()), 573 - // }, 574 - // ], 575 - // }; 576 - // assert_eq!( 577 - // needs_from_node(node).unwrap(), 578 - // vec![ 579 - // Need::Node(cid1()), 580 - // Need::Record { 581 - // rkey: "asdf".into(), 582 - // cid: cid2(), 583 - // }, 584 - // Need::Node(cid3()), 585 - // Need::Record { 586 - // rkey: "aghi".into(), 587 - // cid: cid4(), 588 - // }, 589 - // Need::Node(cid5()), 590 - // Need::Record { 591 - // rkey: "agjkl".into(), 592 - // cid: cid6(), 593 - // }, 594 - // Need::Node(cid7()), 595 - // Need::Record { 596 - // rkey: "agjkmno".into(), 597 - // cid: cid8(), 598 - // }, 599 - // Need::Node(cid9()), 600 - // ] 601 - // ); 602 - // } 603 406 }
+20 -10
tests/non-huge-cars.rs
··· 1 1 extern crate repo_stream; 2 + use repo_stream::Driver; 2 3 4 + const EMPTY_CAR: &'static [u8] = include_bytes!("../car-samples/empty.car"); 3 5 const TINY_CAR: &'static [u8] = include_bytes!("../car-samples/tiny.car"); 4 6 const LITTLE_CAR: &'static [u8] = include_bytes!("../car-samples/little.car"); 5 7 const MIDSIZE_CAR: &'static [u8] = include_bytes!("../car-samples/midsize.car"); 6 8 7 - async fn test_car(bytes: &[u8], expected_records: usize, expected_sum: usize) { 8 - let mb = 2_usize.pow(20); 9 - 10 - let mut driver = match repo_stream::drive::load_car(bytes, |block| block.len(), 10 * mb) 9 + async fn test_car( 10 + bytes: &[u8], 11 + expected_records: usize, 12 + expected_sum: usize, 13 + expect_profile: bool, 14 + ) { 15 + let mut driver = match Driver::load_car(bytes, |block| block.len(), 10 /* MiB */) 11 16 .await 12 17 .unwrap() 13 18 { 14 - repo_stream::drive::Vehicle::Lil(_commit, mem_driver) => mem_driver, 15 - repo_stream::drive::Vehicle::Big(_) => panic!("too big"), 19 + Driver::Memory(_commit, mem_driver) => mem_driver, 20 + Driver::Disk(_) => panic!("too big"), 16 21 }; 17 22 18 23 let mut records = 0; ··· 34 39 35 40 assert_eq!(records, expected_records); 36 41 assert_eq!(sum, expected_sum); 37 - assert!(found_bsky_profile); 42 + assert_eq!(found_bsky_profile, expect_profile); 43 + } 44 + 45 + #[tokio::test] 46 + async fn test_empty_car() { 47 + test_car(EMPTY_CAR, 0, 0, false).await 38 48 } 39 49 40 50 #[tokio::test] 41 51 async fn test_tiny_car() { 42 - test_car(TINY_CAR, 8, 2071).await 52 + test_car(TINY_CAR, 8, 2071, true).await 43 53 } 44 54 45 55 #[tokio::test] 46 56 async fn test_little_car() { 47 - test_car(LITTLE_CAR, 278, 246960).await 57 + test_car(LITTLE_CAR, 278, 246960, true).await 48 58 } 49 59 50 60 #[tokio::test] 51 61 async fn test_midsize_car() { 52 - test_car(MIDSIZE_CAR, 11585, 3741393).await 62 + test_car(MIDSIZE_CAR, 11585, 3741393, true).await 53 63 }