Fast and robust atproto CAR file processing in rust

Compare changes

Choose any two refs to compare.

Changed files
+337 -242
benches
car-samples
examples
disk-read-file
src
tests
+170 -61
Cargo.lock
··· 167 checksum = "46c5e41b57b8bba42a04676d81cb89e9ee8e859a1a66f80a5a72e1cb76b34d43" 168 169 [[package]] 170 name = "bytes" 171 version = "1.10.1" 172 source = "registry+https://github.com/rust-lang/crates.io-index" 173 checksum = "d71b6127be86fdcfddb610f7182ac57211d4b18a3e9c82eb2d17662f2227ad6a" 174 175 [[package]] 176 name = "cast" ··· 281 checksum = "b05b61dc5112cbb17e4b6cd61790d9845d13888356391624cbe7e41efeac1e75" 282 283 [[package]] 284 name = "const-str" 285 version = "0.4.3" 286 source = "registry+https://github.com/rust-lang/crates.io-index" ··· 358 ] 359 360 [[package]] 361 name = "crossbeam-utils" 362 version = "0.8.21" 363 source = "registry+https://github.com/rust-lang/crates.io-index" ··· 380 ] 381 382 [[package]] 383 name = "data-encoding" 384 version = "2.9.0" 385 source = "registry+https://github.com/rust-lang/crates.io-index" ··· 422 checksum = "48c757948c5ede0e46177b7add2e67155f70e33c07fea8284df6576da70b3719" 423 424 [[package]] 425 name = "env_filter" 426 version = "0.1.3" 427 source = "registry+https://github.com/rust-lang/crates.io-index" ··· 445 ] 446 447 [[package]] 448 name = "errno" 449 version = "0.3.14" 450 source = "registry+https://github.com/rust-lang/crates.io-index" ··· 455 ] 456 457 [[package]] 458 - name = "fallible-iterator" 459 - version = "0.3.0" 460 source = "registry+https://github.com/rust-lang/crates.io-index" 461 - checksum = "2acce4a10f12dc2fb14a218589d4f1f62ef011b2d0cc4b3cb1bba8e94da14649" 462 463 [[package]] 464 - name = "fallible-streaming-iterator" 465 - version = "0.1.9" 466 source = "registry+https://github.com/rust-lang/crates.io-index" 467 - checksum = "7360491ce676a36bf9bb3c56c1aa791658183a54d2744120f27285738d90465a" 468 469 [[package]] 470 - name = "fastrand" 471 - version = "2.3.0" 472 source = "registry+https://github.com/rust-lang/crates.io-index" 473 - checksum = "37909eebbb50d72f9059c3b6d82c0463f2ff062c9e95845c43a6c9c0355411be" 474 - 475 - [[package]] 476 - name = "foldhash" 477 - version = "0.1.5" 478 - source = "registry+https://github.com/rust-lang/crates.io-index" 479 - checksum = "d9c4f5dac5e15c24eb999c26181a6ca40b39fe946cbe4c263c7209467bc83af2" 480 481 [[package]] 482 name = "futures" ··· 608 609 [[package]] 610 name = "hashbrown" 611 - version = "0.15.5" 612 source = "registry+https://github.com/rust-lang/crates.io-index" 613 - checksum = "9229cfe53dfd69f0609a49f65461bd93001ea1ef889cd5529dd176593f5338a1" 614 - dependencies = [ 615 - "foldhash", 616 - ] 617 618 [[package]] 619 - name = "hashlink" 620 - version = "0.10.0" 621 source = "registry+https://github.com/rust-lang/crates.io-index" 622 - checksum = "7382cf6263419f2d8df38c55d7da83da5c18aef87fc7a7fc1fb1e344edfe14c1" 623 - dependencies = [ 624 - "hashbrown", 625 - ] 626 627 [[package]] 628 name = "heck" ··· 631 checksum = "2304e00983f87ffb38b55b444b5e3b60a884b5d30c0fca7d82fe33449bbe55ea" 632 633 [[package]] 634 name = "io-uring" 635 version = "0.7.10" 636 source = "registry+https://github.com/rust-lang/crates.io-index" ··· 730 checksum = "58f929b4d672ea937a23a1ab494143d968337a5f47e56d0815df1e0890ddf174" 731 732 [[package]] 733 - name = "libsqlite3-sys" 734 - version = "0.35.0" 735 - source = "registry+https://github.com/rust-lang/crates.io-index" 736 - checksum = "133c182a6a2c87864fe97778797e46c7e999672690dc9fa3ee8e241aa4a9c13f" 737 - dependencies = [ 738 - "pkg-config", 739 - "vcpkg", 740 - ] 741 - 742 - [[package]] 743 name = "linux-raw-sys" 744 version = "0.11.0" 745 source = "registry+https://github.com/rust-lang/crates.io-index" ··· 761 checksum = "34080505efa8e45a4b816c349525ebe327ceaa8559756f0356cba97ef3bf7432" 762 763 [[package]] 764 name = "match-lookup" 765 version = "0.1.1" 766 source = "registry+https://github.com/rust-lang/crates.io-index" ··· 892 checksum = "8b870d8c151b6f2fb93e84a13146138f05d02ed11c7e7c54f8826aaaf7c9f184" 893 894 [[package]] 895 - name = "pkg-config" 896 - version = "0.3.32" 897 - source = "registry+https://github.com/rust-lang/crates.io-index" 898 - checksum = "7edddbd0b52d732b21ad9a5fab5c704c14cd949e5e9a1ec5929a24fded1b904c" 899 - 900 - [[package]] 901 name = "plotters" 902 version = "0.3.7" 903 source = "registry+https://github.com/rust-lang/crates.io-index" ··· 950 ] 951 952 [[package]] 953 name = "quote" 954 version = "1.0.41" 955 source = "registry+https://github.com/rust-lang/crates.io-index" ··· 1024 1025 [[package]] 1026 name = "repo-stream" 1027 - version = "0.1.1" 1028 dependencies = [ 1029 "bincode", 1030 "clap", 1031 "criterion", 1032 "env_logger", 1033 "futures", 1034 "futures-core", 1035 "ipld-core", 1036 "iroh-car", 1037 "log", 1038 "multibase", 1039 - "rusqlite", 1040 "serde", 1041 "serde_bytes", 1042 "serde_ipld_dagcbor", ··· 1047 ] 1048 1049 [[package]] 1050 - name = "rusqlite" 1051 - version = "0.37.0" 1052 source = "registry+https://github.com/rust-lang/crates.io-index" 1053 - checksum = "165ca6e57b20e1351573e3729b958bc62f0e48025386970b6e4d29e7a7e71f3f" 1054 - dependencies = [ 1055 - "bitflags", 1056 - "fallible-iterator", 1057 - "fallible-streaming-iterator", 1058 - "hashlink", 1059 - "libsqlite3-sys", 1060 - "smallvec", 1061 - ] 1062 1063 [[package]] 1064 - name = "rustc-demangle" 1065 - version = "0.1.26" 1066 source = "registry+https://github.com/rust-lang/crates.io-index" 1067 - checksum = "56f7d92ca342cea22a06f2121d944b4fd82af56988c270852495420f961d4ace" 1068 1069 [[package]] 1070 name = "rustix" ··· 1105 version = "1.2.0" 1106 source = "registry+https://github.com/rust-lang/crates.io-index" 1107 checksum = "94143f37725109f92c262ed2cf5e59bce7498c01bcc1502d7b9afe439a4e9f49" 1108 1109 [[package]] 1110 name = "serde" ··· 1172 ] 1173 1174 [[package]] 1175 name = "sha2" 1176 version = "0.10.9" 1177 source = "registry+https://github.com/rust-lang/crates.io-index" ··· 1211 dependencies = [ 1212 "libc", 1213 "windows-sys 0.59.0", 1214 ] 1215 1216 [[package]] ··· 1372 checksum = "06abde3611657adf66d383f00b093d7faecc7fa57071cce2578660c9f1010821" 1373 1374 [[package]] 1375 - name = "vcpkg" 1376 - version = "0.2.15" 1377 source = "registry+https://github.com/rust-lang/crates.io-index" 1378 - checksum = "accd4ea62f7bb7a82fe23066fb0957d48ef677f6eeb8215f372f52e48bb32426" 1379 1380 [[package]] 1381 name = "version_check" ··· 1659 version = "0.46.0" 1660 source = "registry+https://github.com/rust-lang/crates.io-index" 1661 checksum = "f17a85883d4e6d00e8a97c586de764dabcc06133f7f1d55dce5cdc070ad7fe59" 1662 1663 [[package]] 1664 name = "zerocopy"
··· 167 checksum = "46c5e41b57b8bba42a04676d81cb89e9ee8e859a1a66f80a5a72e1cb76b34d43" 168 169 [[package]] 170 + name = "byteorder-lite" 171 + version = "0.1.0" 172 + source = "registry+https://github.com/rust-lang/crates.io-index" 173 + checksum = "8f1fe948ff07f4bd06c30984e69f5b4899c516a3ef74f34df92a2df2ab535495" 174 + 175 + [[package]] 176 name = "bytes" 177 version = "1.10.1" 178 source = "registry+https://github.com/rust-lang/crates.io-index" 179 checksum = "d71b6127be86fdcfddb610f7182ac57211d4b18a3e9c82eb2d17662f2227ad6a" 180 + 181 + [[package]] 182 + name = "byteview" 183 + version = "0.10.0" 184 + source = "registry+https://github.com/rust-lang/crates.io-index" 185 + checksum = "dda4398f387cc6395a3e93b3867cd9abda914c97a0b344d1eefb2e5c51785fca" 186 187 [[package]] 188 name = "cast" ··· 293 checksum = "b05b61dc5112cbb17e4b6cd61790d9845d13888356391624cbe7e41efeac1e75" 294 295 [[package]] 296 + name = "compare" 297 + version = "0.0.6" 298 + source = "registry+https://github.com/rust-lang/crates.io-index" 299 + checksum = "ea0095f6103c2a8b44acd6fd15960c801dafebf02e21940360833e0673f48ba7" 300 + 301 + [[package]] 302 name = "const-str" 303 version = "0.4.3" 304 source = "registry+https://github.com/rust-lang/crates.io-index" ··· 376 ] 377 378 [[package]] 379 + name = "crossbeam-skiplist" 380 + version = "0.1.3" 381 + source = "registry+https://github.com/rust-lang/crates.io-index" 382 + checksum = "df29de440c58ca2cc6e587ec3d22347551a32435fbde9d2bff64e78a9ffa151b" 383 + dependencies = [ 384 + "crossbeam-epoch", 385 + "crossbeam-utils", 386 + ] 387 + 388 + [[package]] 389 name = "crossbeam-utils" 390 version = "0.8.21" 391 source = "registry+https://github.com/rust-lang/crates.io-index" ··· 408 ] 409 410 [[package]] 411 + name = "dashmap" 412 + version = "6.1.0" 413 + source = "registry+https://github.com/rust-lang/crates.io-index" 414 + checksum = "5041cc499144891f3790297212f32a74fb938e5136a14943f338ef9e0ae276cf" 415 + dependencies = [ 416 + "cfg-if", 417 + "crossbeam-utils", 418 + "hashbrown 0.14.5", 419 + "lock_api", 420 + "once_cell", 421 + "parking_lot_core", 422 + ] 423 + 424 + [[package]] 425 name = "data-encoding" 426 version = "2.9.0" 427 source = "registry+https://github.com/rust-lang/crates.io-index" ··· 464 checksum = "48c757948c5ede0e46177b7add2e67155f70e33c07fea8284df6576da70b3719" 465 466 [[package]] 467 + name = "enum_dispatch" 468 + version = "0.3.13" 469 + source = "registry+https://github.com/rust-lang/crates.io-index" 470 + checksum = "aa18ce2bc66555b3218614519ac839ddb759a7d6720732f979ef8d13be147ecd" 471 + dependencies = [ 472 + "once_cell", 473 + "proc-macro2", 474 + "quote", 475 + "syn 2.0.106", 476 + ] 477 + 478 + [[package]] 479 name = "env_filter" 480 version = "0.1.3" 481 source = "registry+https://github.com/rust-lang/crates.io-index" ··· 499 ] 500 501 [[package]] 502 + name = "equivalent" 503 + version = "1.0.2" 504 + source = "registry+https://github.com/rust-lang/crates.io-index" 505 + checksum = "877a4ace8713b0bcf2a4e7eec82529c029f1d0619886d18145fea96c3ffe5c0f" 506 + 507 + [[package]] 508 name = "errno" 509 version = "0.3.14" 510 source = "registry+https://github.com/rust-lang/crates.io-index" ··· 515 ] 516 517 [[package]] 518 + name = "fastrand" 519 + version = "2.3.0" 520 source = "registry+https://github.com/rust-lang/crates.io-index" 521 + checksum = "37909eebbb50d72f9059c3b6d82c0463f2ff062c9e95845c43a6c9c0355411be" 522 523 [[package]] 524 + name = "fjall" 525 + version = "3.0.1" 526 source = "registry+https://github.com/rust-lang/crates.io-index" 527 + checksum = "4f69637c02d38ad1b0f003101d0195a60368130aa17d9ef78b1557d265a22093" 528 + dependencies = [ 529 + "byteorder-lite", 530 + "byteview", 531 + "dashmap", 532 + "flume", 533 + "log", 534 + "lsm-tree", 535 + "tempfile", 536 + "xxhash-rust", 537 + ] 538 539 [[package]] 540 + name = "flume" 541 + version = "0.12.0" 542 source = "registry+https://github.com/rust-lang/crates.io-index" 543 + checksum = "5e139bc46ca777eb5efaf62df0ab8cc5fd400866427e56c68b22e414e53bd3be" 544 + dependencies = [ 545 + "spin", 546 + ] 547 548 [[package]] 549 name = "futures" ··· 675 676 [[package]] 677 name = "hashbrown" 678 + version = "0.14.5" 679 source = "registry+https://github.com/rust-lang/crates.io-index" 680 + checksum = "e5274423e17b7c9fc20b6e7e208532f9b19825d82dfd615708b70edd83df41f1" 681 682 [[package]] 683 + name = "hashbrown" 684 + version = "0.16.1" 685 source = "registry+https://github.com/rust-lang/crates.io-index" 686 + checksum = "841d1cc9bed7f9236f321df977030373f4a4163ae1a7dbfe1a51a2c1a51d9100" 687 688 [[package]] 689 name = "heck" ··· 692 checksum = "2304e00983f87ffb38b55b444b5e3b60a884b5d30c0fca7d82fe33449bbe55ea" 693 694 [[package]] 695 + name = "interval-heap" 696 + version = "0.0.5" 697 + source = "registry+https://github.com/rust-lang/crates.io-index" 698 + checksum = "11274e5e8e89b8607cfedc2910b6626e998779b48a019151c7604d0adcb86ac6" 699 + dependencies = [ 700 + "compare", 701 + ] 702 + 703 + [[package]] 704 name = "io-uring" 705 version = "0.7.10" 706 source = "registry+https://github.com/rust-lang/crates.io-index" ··· 800 checksum = "58f929b4d672ea937a23a1ab494143d968337a5f47e56d0815df1e0890ddf174" 801 802 [[package]] 803 name = "linux-raw-sys" 804 version = "0.11.0" 805 source = "registry+https://github.com/rust-lang/crates.io-index" ··· 821 checksum = "34080505efa8e45a4b816c349525ebe327ceaa8559756f0356cba97ef3bf7432" 822 823 [[package]] 824 + name = "lsm-tree" 825 + version = "3.0.1" 826 + source = "registry+https://github.com/rust-lang/crates.io-index" 827 + checksum = "b875f1dfe14f557f805b167fb9b0fc54c5560c7a4bd6ae02535b2846f276a8cb" 828 + dependencies = [ 829 + "byteorder-lite", 830 + "byteview", 831 + "crossbeam-skiplist", 832 + "enum_dispatch", 833 + "interval-heap", 834 + "log", 835 + "quick_cache", 836 + "rustc-hash", 837 + "self_cell", 838 + "sfa", 839 + "tempfile", 840 + "varint-rs", 841 + "xxhash-rust", 842 + ] 843 + 844 + [[package]] 845 name = "match-lookup" 846 version = "0.1.1" 847 source = "registry+https://github.com/rust-lang/crates.io-index" ··· 973 checksum = "8b870d8c151b6f2fb93e84a13146138f05d02ed11c7e7c54f8826aaaf7c9f184" 974 975 [[package]] 976 name = "plotters" 977 version = "0.3.7" 978 source = "registry+https://github.com/rust-lang/crates.io-index" ··· 1025 ] 1026 1027 [[package]] 1028 + name = "quick_cache" 1029 + version = "0.6.18" 1030 + source = "registry+https://github.com/rust-lang/crates.io-index" 1031 + checksum = "7ada44a88ef953a3294f6eb55d2007ba44646015e18613d2f213016379203ef3" 1032 + dependencies = [ 1033 + "equivalent", 1034 + "hashbrown 0.16.1", 1035 + ] 1036 + 1037 + [[package]] 1038 name = "quote" 1039 version = "1.0.41" 1040 source = "registry+https://github.com/rust-lang/crates.io-index" ··· 1109 1110 [[package]] 1111 name = "repo-stream" 1112 + version = "0.2.2" 1113 dependencies = [ 1114 "bincode", 1115 "clap", 1116 "criterion", 1117 "env_logger", 1118 + "fjall", 1119 "futures", 1120 "futures-core", 1121 "ipld-core", 1122 "iroh-car", 1123 "log", 1124 "multibase", 1125 "serde", 1126 "serde_bytes", 1127 "serde_ipld_dagcbor", ··· 1132 ] 1133 1134 [[package]] 1135 + name = "rustc-demangle" 1136 + version = "0.1.26" 1137 source = "registry+https://github.com/rust-lang/crates.io-index" 1138 + checksum = "56f7d92ca342cea22a06f2121d944b4fd82af56988c270852495420f961d4ace" 1139 1140 [[package]] 1141 + name = "rustc-hash" 1142 + version = "2.1.1" 1143 source = "registry+https://github.com/rust-lang/crates.io-index" 1144 + checksum = "357703d41365b4b27c590e3ed91eabb1b663f07c4c084095e60cbed4362dff0d" 1145 1146 [[package]] 1147 name = "rustix" ··· 1182 version = "1.2.0" 1183 source = "registry+https://github.com/rust-lang/crates.io-index" 1184 checksum = "94143f37725109f92c262ed2cf5e59bce7498c01bcc1502d7b9afe439a4e9f49" 1185 + 1186 + [[package]] 1187 + name = "self_cell" 1188 + version = "1.2.2" 1189 + source = "registry+https://github.com/rust-lang/crates.io-index" 1190 + checksum = "b12e76d157a900eb52e81bc6e9f3069344290341720e9178cde2407113ac8d89" 1191 1192 [[package]] 1193 name = "serde" ··· 1255 ] 1256 1257 [[package]] 1258 + name = "sfa" 1259 + version = "1.0.0" 1260 + source = "registry+https://github.com/rust-lang/crates.io-index" 1261 + checksum = "a1296838937cab56cd6c4eeeb8718ec777383700c33f060e2869867bd01d1175" 1262 + dependencies = [ 1263 + "byteorder-lite", 1264 + "log", 1265 + "xxhash-rust", 1266 + ] 1267 + 1268 + [[package]] 1269 name = "sha2" 1270 version = "0.10.9" 1271 source = "registry+https://github.com/rust-lang/crates.io-index" ··· 1305 dependencies = [ 1306 "libc", 1307 "windows-sys 0.59.0", 1308 + ] 1309 + 1310 + [[package]] 1311 + name = "spin" 1312 + version = "0.9.8" 1313 + source = "registry+https://github.com/rust-lang/crates.io-index" 1314 + checksum = "6980e8d7511241f8acf4aebddbb1ff938df5eebe98691418c4468d0b72a96a67" 1315 + dependencies = [ 1316 + "lock_api", 1317 ] 1318 1319 [[package]] ··· 1475 checksum = "06abde3611657adf66d383f00b093d7faecc7fa57071cce2578660c9f1010821" 1476 1477 [[package]] 1478 + name = "varint-rs" 1479 + version = "2.2.0" 1480 source = "registry+https://github.com/rust-lang/crates.io-index" 1481 + checksum = "8f54a172d0620933a27a4360d3db3e2ae0dd6cceae9730751a036bbf182c4b23" 1482 1483 [[package]] 1484 name = "version_check" ··· 1762 version = "0.46.0" 1763 source = "registry+https://github.com/rust-lang/crates.io-index" 1764 checksum = "f17a85883d4e6d00e8a97c586de764dabcc06133f7f1d55dce5cdc070ad7fe59" 1765 + 1766 + [[package]] 1767 + name = "xxhash-rust" 1768 + version = "0.8.15" 1769 + source = "registry+https://github.com/rust-lang/crates.io-index" 1770 + checksum = "fdd20c5420375476fbd4394763288da7eb0cc0b8c11deed431a91562af7335d3" 1771 1772 [[package]] 1773 name = "zerocopy"
+3 -3
Cargo.toml
··· 1 [package] 2 name = "repo-stream" 3 - version = "0.1.1" 4 edition = "2024" 5 license = "MIT OR Apache-2.0" 6 - description = "Fast and robust atproto CAR file processing in rust" 7 repository = "https://tangled.org/@microcosm.blue/repo-stream" 8 9 [dependencies] 10 bincode = { version = "2.0.1", features = ["serde"] } 11 futures = "0.3.31" 12 futures-core = "0.3.31" 13 ipld-core = { version = "0.4.2", features = ["serde"] } 14 iroh-car = "0.5.1" 15 log = "0.4.28" 16 multibase = "0.9.2" 17 - rusqlite = "0.37.0" 18 serde = { version = "1.0.228", features = ["derive"] } 19 serde_bytes = "0.11.19" 20 serde_ipld_dagcbor = "0.6.4"
··· 1 [package] 2 name = "repo-stream" 3 + version = "0.2.2" 4 edition = "2024" 5 license = "MIT OR Apache-2.0" 6 + description = "A robust CAR file -> MST walker for atproto" 7 repository = "https://tangled.org/@microcosm.blue/repo-stream" 8 9 [dependencies] 10 bincode = { version = "2.0.1", features = ["serde"] } 11 + fjall = { version = "3.0.1", default-features = false } 12 futures = "0.3.31" 13 futures-core = "0.3.31" 14 ipld-core = { version = "0.4.2", features = ["serde"] } 15 iroh-car = "0.5.1" 16 log = "0.4.28" 17 multibase = "0.9.2" 18 serde = { version = "1.0.228", features = ["derive"] } 19 serde_bytes = "0.11.19" 20 serde_ipld_dagcbor = "0.6.4"
+4
benches/non-huge-cars.rs
··· 3 4 use criterion::{Criterion, criterion_group, criterion_main}; 5 6 const TINY_CAR: &'static [u8] = include_bytes!("../car-samples/tiny.car"); 7 const LITTLE_CAR: &'static [u8] = include_bytes!("../car-samples/little.car"); 8 const MIDSIZE_CAR: &'static [u8] = include_bytes!("../car-samples/midsize.car"); ··· 13 .build() 14 .expect("Creating runtime failed"); 15 16 c.bench_function("tiny-car", |b| { 17 b.to_async(&rt).iter(async || drive_car(TINY_CAR).await) 18 });
··· 3 4 use criterion::{Criterion, criterion_group, criterion_main}; 5 6 + const EMPTY_CAR: &'static [u8] = include_bytes!("../car-samples/empty.car"); 7 const TINY_CAR: &'static [u8] = include_bytes!("../car-samples/tiny.car"); 8 const LITTLE_CAR: &'static [u8] = include_bytes!("../car-samples/little.car"); 9 const MIDSIZE_CAR: &'static [u8] = include_bytes!("../car-samples/midsize.car"); ··· 14 .build() 15 .expect("Creating runtime failed"); 16 17 + c.bench_function("empty-car", |b| { 18 + b.to_async(&rt).iter(async || drive_car(EMPTY_CAR).await) 19 + }); 20 c.bench_function("tiny-car", |b| { 21 b.to_async(&rt).iter(async || drive_car(TINY_CAR).await) 22 });
car-samples/empty.car

This is a binary file and will not be displayed.

+6 -7
examples/disk-read-file/main.rs
··· 6 use clap::Parser; 7 use repo_stream::{DiskBuilder, Driver, DriverBuilder}; 8 use std::path::PathBuf; 9 10 #[derive(Debug, Parser)] 11 struct Args { ··· 27 let reader = tokio::io::BufReader::new(reader); 28 29 log::info!("hello! reading the car..."); 30 31 // in this example we only bother handling CARs that are too big for memory 32 // `noop` helper means: do no block processing, store the raw blocks 33 let driver = match DriverBuilder::new() 34 - .with_mem_limit_mb(10) // how much memory can be used before disk spill 35 .load_car(reader) 36 .await? 37 { ··· 48 49 // at this point you might want to fetch the account's signing key 50 // via the DID from the commit, and then verify the signature. 51 - log::warn!("big's comit: {:?}", commit); 52 53 // pop the driver back out to get some code indentation relief 54 driver ··· 78 } 79 } 80 81 - log::info!("arrived! joining rx..."); 82 83 - // clean up the database. would be nice to do this in drop so it happens 84 - // automatically, but some blocking work happens, so that's not allowed in 85 - // async rust. ๐Ÿคทโ€โ™€๏ธ 86 - join.await?.reset_store().await?; 87 88 log::info!("done. n={n} zeros={zeros}"); 89
··· 6 use clap::Parser; 7 use repo_stream::{DiskBuilder, Driver, DriverBuilder}; 8 use std::path::PathBuf; 9 + use std::time::Instant; 10 11 #[derive(Debug, Parser)] 12 struct Args { ··· 28 let reader = tokio::io::BufReader::new(reader); 29 30 log::info!("hello! reading the car..."); 31 + let t0 = Instant::now(); 32 33 // in this example we only bother handling CARs that are too big for memory 34 // `noop` helper means: do no block processing, store the raw blocks 35 let driver = match DriverBuilder::new() 36 + .with_mem_limit_mb(32) // how much memory can be used before disk spill 37 .load_car(reader) 38 .await? 39 { ··· 50 51 // at this point you might want to fetch the account's signing key 52 // via the DID from the commit, and then verify the signature. 53 + log::warn!("big's comit ({:?}): {:?}", t0.elapsed(), commit); 54 55 // pop the driver back out to get some code indentation relief 56 driver ··· 80 } 81 } 82 83 + log::info!("arrived! ({:?}) joining rx...", t0.elapsed()); 84 85 + join.await?; 86 87 log::info!("done. n={n} zeros={zeros}"); 88
+58 -3
readme.md
··· 1 # repo-stream 2 3 - Efficient and robust atproto CAR file processing in rust 4 5 - todo 6 7 - [ ] get an *emtpy* car for the test suite 8 - - [ ] implement a max size on disk limit 9 10 11 -----
··· 1 # repo-stream 2 3 + A robust CAR file -> MST walker for atproto 4 + 5 + [![Crates.io][crates-badge]](https://crates.io/crates/repo-stream) 6 + [![Documentation][docs-badge]](https://docs.rs/repo-stream) 7 + [![Sponsor][sponsor-badge]](https://github.com/sponsors/uniphil) 8 + 9 + [crates-badge]: https://img.shields.io/crates/v/repo-stream.svg 10 + [docs-badge]: https://docs.rs/repo-stream/badge.svg 11 + [sponsor-badge]: https://img.shields.io/badge/at-microcosm-b820f9?labelColor=b820f9&logo=githubsponsors&logoColor=fff 12 + 13 + ```rust 14 + use repo_stream::{Driver, DriverBuilder, DriveError, DiskBuilder}; 15 + 16 + #[tokio::main] 17 + async fn main() -> Result<(), DriveError> { 18 + // repo-stream takes any AsyncRead as input, like a tokio::fs::File 19 + let reader = tokio::fs::File::open("repo.car".into()).await?; 20 + let reader = tokio::io::BufReader::new(reader); 21 + 22 + // example repo workload is simply counting the total record bytes 23 + let mut total_size = 0; 24 + 25 + match DriverBuilder::new() 26 + .with_mem_limit_mb(10) 27 + .with_block_processor(|rec| rec.len()) // block processing: just extract the raw record size 28 + .load_car(reader) 29 + .await? 30 + { 31 + 32 + // if all blocks fit within memory 33 + Driver::Memory(_commit, mut driver) => { 34 + while let Some(chunk) = driver.next_chunk(256).await? { 35 + for (_rkey, size) in chunk { 36 + total_size += size; 37 + } 38 + } 39 + }, 40 41 + // if the CAR was too big for in-memory processing 42 + Driver::Disk(paused) => { 43 + // set up a disk store we can spill to 44 + let store = DiskBuilder::new().open("some/path.db".into()).await?; 45 + // do the spilling, get back a (similar) driver 46 + let (_commit, mut driver) = paused.finish_loading(store).await?; 47 + 48 + while let Some(chunk) = driver.next_chunk(256).await? { 49 + for (_rkey, size) in chunk { 50 + total_size += size; 51 + } 52 + } 53 + } 54 + }; 55 + println!("sum of size of all records: {total_size}"); 56 + Ok(()) 57 + } 58 + ``` 59 + 60 + more recent todo 61 62 - [ ] get an *emtpy* car for the test suite 63 + - [x] implement a max size on disk limit 64 65 66 -----
+38 -96
src/disk.rs
··· 18 */ 19 20 use crate::drive::DriveError; 21 - use rusqlite::OptionalExtension; 22 use std::path::PathBuf; 23 24 #[derive(Debug, thiserror::Error)] ··· 28 /// (The wrapped err should probably be obscured to remove public-facing 29 /// sqlite bits) 30 #[error(transparent)] 31 - DbError(#[from] rusqlite::Error), 32 /// A tokio blocking task failed to join 33 #[error("Failed to join a tokio blocking task: {0}")] 34 JoinError(#[from] tokio::task::JoinError), ··· 38 /// limit. 39 #[error("Maximum disk size reached")] 40 MaxSizeExceeded, 41 - #[error("this error was replaced, seeing this is a bug.")] 42 - #[doc(hidden)] 43 - Stolen, 44 - } 45 - 46 - impl DiskError { 47 - /// hack for ownership challenges with the disk driver 48 - pub(crate) fn steal(&mut self) -> Self { 49 - let mut swapped = DiskError::Stolen; 50 - std::mem::swap(self, &mut swapped); 51 - swapped 52 - } 53 } 54 55 /// Builder-style disk store setup 56 pub struct DiskBuilder { 57 /// Database in-memory cache allowance 58 /// ··· 70 impl Default for DiskBuilder { 71 fn default() -> Self { 72 Self { 73 - cache_size_mb: 32, 74 max_stored_mb: 10 * 1024, // 10 GiB 75 } 76 } ··· 83 } 84 /// Set the in-memory cache allowance for the database 85 /// 86 - /// Default: 32 MiB 87 pub fn with_cache_size_mb(mut self, size: usize) -> Self { 88 self.cache_size_mb = size; 89 self ··· 96 self 97 } 98 /// Open and initialize the actual disk storage 99 - pub async fn open(self, path: PathBuf) -> Result<DiskStore, DiskError> { 100 DiskStore::new(path, self.cache_size_mb, self.max_stored_mb).await 101 } 102 } 103 104 /// On-disk block storage 105 pub struct DiskStore { 106 - conn: rusqlite::Connection, 107 max_stored: usize, 108 stored: usize, 109 } ··· 116 max_stored_mb: usize, 117 ) -> Result<Self, DiskError> { 118 let max_stored = max_stored_mb * 2_usize.pow(20); 119 - let conn = tokio::task::spawn_blocking(move || { 120 - let conn = rusqlite::Connection::open(path)?; 121 - 122 - let sqlite_one_mb = -(2_i64.pow(10)); // negative is kibibytes for sqlite cache_size 123 124 - // conn.pragma_update(None, "journal_mode", "OFF")?; 125 - // conn.pragma_update(None, "journal_mode", "MEMORY")?; 126 - conn.pragma_update(None, "journal_mode", "WAL")?; 127 - // conn.pragma_update(None, "wal_autocheckpoint", "0")?; // this lets things get a bit big on disk 128 - conn.pragma_update(None, "synchronous", "OFF")?; 129 - conn.pragma_update( 130 - None, 131 - "cache_size", 132 - (cache_mb as i64 * sqlite_one_mb).to_string(), 133 - )?; 134 - Self::reset_tables(&conn)?; 135 - 136 - Ok::<_, DiskError>(conn) 137 }) 138 .await??; 139 140 Ok(Self { 141 - conn, 142 max_stored, 143 stored: 0, 144 }) 145 } 146 - pub(crate) fn get_writer(&'_ mut self) -> Result<SqliteWriter<'_>, DiskError> { 147 - let tx = self.conn.transaction()?; 148 - Ok(SqliteWriter { 149 - tx, 150 - stored: &mut self.stored, 151 - max: self.max_stored, 152 - }) 153 - } 154 - pub(crate) fn get_reader<'conn>(&'conn self) -> Result<SqliteReader<'conn>, DiskError> { 155 - let select_stmt = self.conn.prepare("SELECT val FROM blocks WHERE key = ?1")?; 156 - Ok(SqliteReader { select_stmt }) 157 - } 158 - /// Drop and recreate the kv table 159 - pub async fn reset(self) -> Result<Self, DiskError> { 160 - tokio::task::spawn_blocking(move || { 161 - Self::reset_tables(&self.conn)?; 162 - Ok(self) 163 - }) 164 - .await? 165 - } 166 - fn reset_tables(conn: &rusqlite::Connection) -> Result<(), DiskError> { 167 - conn.execute("DROP TABLE IF EXISTS blocks", ())?; 168 - conn.execute( 169 - "CREATE TABLE blocks ( 170 - key BLOB PRIMARY KEY NOT NULL, 171 - val BLOB NOT NULL 172 - ) WITHOUT ROWID", 173 - (), 174 - )?; 175 - Ok(()) 176 - } 177 - } 178 179 - pub(crate) struct SqliteWriter<'conn> { 180 - tx: rusqlite::Transaction<'conn>, 181 - stored: &'conn mut usize, 182 - max: usize, 183 - } 184 - 185 - impl SqliteWriter<'_> { 186 pub(crate) fn put_many( 187 &mut self, 188 kv: impl Iterator<Item = Result<(Vec<u8>, Vec<u8>), DriveError>>, 189 ) -> Result<(), DriveError> { 190 - let mut insert_stmt = self 191 - .tx 192 - .prepare_cached("INSERT INTO blocks (key, val) VALUES (?1, ?2)") 193 - .map_err(DiskError::DbError)?; 194 for pair in kv { 195 let (k, v) = pair?; 196 - *self.stored += v.len(); 197 - if *self.stored > self.max { 198 return Err(DiskError::MaxSizeExceeded.into()); 199 } 200 - insert_stmt.execute((k, v)).map_err(DiskError::DbError)?; 201 } 202 Ok(()) 203 } 204 - pub fn commit(self) -> Result<(), DiskError> { 205 - self.tx.commit()?; 206 - Ok(()) 207 - } 208 - } 209 - 210 - pub(crate) struct SqliteReader<'conn> { 211 - select_stmt: rusqlite::Statement<'conn>, 212 - } 213 214 - impl SqliteReader<'_> { 215 - pub(crate) fn get(&mut self, key: Vec<u8>) -> rusqlite::Result<Option<Vec<u8>>> { 216 - self.select_stmt 217 - .query_one((&key,), |row| row.get(0)) 218 - .optional() 219 } 220 }
··· 18 */ 19 20 use crate::drive::DriveError; 21 + use fjall::config::{CompressionPolicy, PinningPolicy, RestartIntervalPolicy}; 22 + use fjall::{CompressionType, Database, Error as FjallError, Keyspace, KeyspaceCreateOptions}; 23 use std::path::PathBuf; 24 25 #[derive(Debug, thiserror::Error)] ··· 29 /// (The wrapped err should probably be obscured to remove public-facing 30 /// sqlite bits) 31 #[error(transparent)] 32 + DbError(#[from] FjallError), 33 /// A tokio blocking task failed to join 34 #[error("Failed to join a tokio blocking task: {0}")] 35 JoinError(#[from] tokio::task::JoinError), ··· 39 /// limit. 40 #[error("Maximum disk size reached")] 41 MaxSizeExceeded, 42 } 43 44 /// Builder-style disk store setup 45 + #[derive(Debug, Clone)] 46 pub struct DiskBuilder { 47 /// Database in-memory cache allowance 48 /// ··· 60 impl Default for DiskBuilder { 61 fn default() -> Self { 62 Self { 63 + cache_size_mb: 64, 64 max_stored_mb: 10 * 1024, // 10 GiB 65 } 66 } ··· 73 } 74 /// Set the in-memory cache allowance for the database 75 /// 76 + /// Default: 64 MiB 77 pub fn with_cache_size_mb(mut self, size: usize) -> Self { 78 self.cache_size_mb = size; 79 self ··· 86 self 87 } 88 /// Open and initialize the actual disk storage 89 + pub async fn open(&self, path: PathBuf) -> Result<DiskStore, DiskError> { 90 DiskStore::new(path, self.cache_size_mb, self.max_stored_mb).await 91 } 92 } 93 94 /// On-disk block storage 95 pub struct DiskStore { 96 + #[allow(unused)] 97 + db: Database, 98 + partition: Keyspace, 99 max_stored: usize, 100 stored: usize, 101 } ··· 108 max_stored_mb: usize, 109 ) -> Result<Self, DiskError> { 110 let max_stored = max_stored_mb * 2_usize.pow(20); 111 + let (db, partition) = tokio::task::spawn_blocking(move || { 112 + let db = Database::builder(path) 113 + // .manual_journal_persist(true) 114 + // .flush_workers(1) 115 + // .compaction_workers(1) 116 + .journal_compression(CompressionType::None) 117 + .cache_size(cache_mb as u64 * 2_u64.pow(20)) 118 + .temporary(true) 119 + .open()?; 120 + let opts = KeyspaceCreateOptions::default() 121 + .data_block_restart_interval_policy(RestartIntervalPolicy::all(8)) 122 + .filter_block_pinning_policy(PinningPolicy::disabled()) 123 + .expect_point_read_hits(true) 124 + .data_block_compression_policy(CompressionPolicy::disabled()) 125 + .manual_journal_persist(true) 126 + .max_memtable_size(32 * 2_u64.pow(20)); 127 + let partition = db.keyspace("z", || opts)?; 128 129 + Ok::<_, DiskError>((db, partition)) 130 }) 131 .await??; 132 133 Ok(Self { 134 + db, 135 + partition, 136 max_stored, 137 stored: 0, 138 }) 139 } 140 141 pub(crate) fn put_many( 142 &mut self, 143 kv: impl Iterator<Item = Result<(Vec<u8>, Vec<u8>), DriveError>>, 144 ) -> Result<(), DriveError> { 145 + let mut batch = self.db.batch(); 146 for pair in kv { 147 let (k, v) = pair?; 148 + self.stored += v.len(); 149 + if self.stored > self.max_stored { 150 return Err(DiskError::MaxSizeExceeded.into()); 151 } 152 + batch.insert(&self.partition, k, v); 153 } 154 + batch.commit().map_err(DiskError::DbError)?; 155 Ok(()) 156 } 157 158 + #[inline] 159 + pub(crate) fn get(&mut self, key: &[u8]) -> Result<Option<fjall::Slice>, FjallError> { 160 + self.partition.get(key) 161 } 162 }
+9 -55
src/drive.rs
··· 116 } 117 118 /// Builder-style driver setup 119 pub struct DriverBuilder { 120 pub mem_limit_mb: usize, 121 } ··· 153 } 154 /// Begin processing an atproto MST from a CAR file 155 pub async fn load_car<R: AsyncRead + Unpin>( 156 - self, 157 reader: R, 158 ) -> Result<Driver<R, Vec<u8>>, DriveError> { 159 Driver::load_car(reader, crate::process::noop, self.mem_limit_mb).await ··· 163 /// Builder-style driver intermediate step 164 /// 165 /// start from `DriverBuilder` 166 pub struct DriverBuilderWithProcessor<T: Processable> { 167 pub mem_limit_mb: usize, 168 pub block_processor: fn(Vec<u8>) -> T, ··· 178 } 179 /// Begin processing an atproto MST from a CAR file 180 pub async fn load_car<R: AsyncRead + Unpin>( 181 - self, 182 reader: R, 183 ) -> Result<Driver<R, T>, DriveError> { 184 Driver::load_car(reader, self.block_processor, self.mem_limit_mb).await ··· 333 // move store in and back out so we can manage lifetimes 334 // dump mem blocks into the store 335 store = tokio::task::spawn(async move { 336 - let mut writer = store.get_writer()?; 337 - 338 let kvs = self 339 .mem_blocks 340 .into_iter() 341 .map(|(k, v)| Ok(encode(v).map(|v| (k.to_bytes(), v))?)); 342 343 - writer.put_many(kvs)?; 344 - writer.commit()?; 345 Ok::<_, DriveError>(store) 346 }) 347 .await??; 348 349 - let (tx, mut rx) = mpsc::channel::<Vec<(Cid, MaybeProcessedBlock<T>)>>(2); 350 351 let store_worker = tokio::task::spawn_blocking(move || { 352 - let mut writer = store.get_writer()?; 353 - 354 while let Some(chunk) = rx.blocking_recv() { 355 let kvs = chunk 356 .into_iter() 357 .map(|(k, v)| Ok(encode(v).map(|v| (k.to_bytes(), v))?)); 358 - writer.put_many(kvs)?; 359 } 360 - 361 - writer.commit()?; 362 Ok::<_, DriveError>(store) 363 }); // await later 364 ··· 451 /// println!("{rkey}: size={}", record.len()); 452 /// } 453 /// } 454 - /// let store = disk_driver.reset_store().await?; 455 /// # Ok(()) 456 /// # } 457 /// ``` ··· 466 // comes out again. 467 let (state, res) = tokio::task::spawn_blocking( 468 move || -> (BigState, Result<BlockChunk<T>, DriveError>) { 469 - let mut reader_res = state.store.get_reader(); 470 - let reader: &mut _ = match reader_res { 471 - Ok(ref mut r) => r, 472 - Err(ref mut e) => { 473 - // unfortunately we can't return the error directly because 474 - // (for some reason) it's attached to the lifetime of the 475 - // reader? 476 - // hack a mem::swap so we can get it out :/ 477 - let e_swapped = e.steal(); 478 - // the pain: `state` *has to* outlive the reader 479 - drop(reader_res); 480 - return (state, Err(e_swapped.into())); 481 - } 482 - }; 483 - 484 let mut out = Vec::with_capacity(n); 485 486 for _ in 0..n { 487 // walk as far as we can until we run out of blocks or find a record 488 - let step = match state.walker.disk_step(reader, process) { 489 Ok(s) => s, 490 Err(e) => { 491 - // the pain: `state` *has to* outlive the reader 492 - drop(reader_res); 493 return (state, Err(e.into())); 494 } 495 }; 496 match step { 497 Step::Missing(cid) => { 498 - // the pain: `state` *has to* outlive the reader 499 - drop(reader_res); 500 return (state, Err(DriveError::MissingBlock(cid))); 501 } 502 Step::Finish => break, 503 Step::Found { rkey, data } => out.push((rkey, data)), 504 }; 505 } 506 - 507 - // `state` *has to* outlive the reader 508 - drop(reader_res); 509 510 (state, Ok::<_, DriveError>(out)) 511 }, ··· 530 tx: mpsc::Sender<Result<BlockChunk<T>, DriveError>>, 531 ) -> Result<(), mpsc::error::SendError<Result<BlockChunk<T>, DriveError>>> { 532 let BigState { store, walker } = self.state.as_mut().expect("valid state"); 533 - let mut reader = match store.get_reader() { 534 - Ok(r) => r, 535 - Err(e) => return tx.blocking_send(Err(e.into())), 536 - }; 537 538 loop { 539 let mut out: BlockChunk<T> = Vec::with_capacity(n); ··· 541 for _ in 0..n { 542 // walk as far as we can until we run out of blocks or find a record 543 544 - let step = match walker.disk_step(&mut reader, self.process) { 545 Ok(s) => s, 546 Err(e) => return tx.blocking_send(Err(e.into())), 547 }; ··· 590 /// } 591 /// 592 /// } 593 - /// let store = join.await?.reset_store().await?; 594 /// # Ok(()) 595 /// # } 596 /// ``` ··· 612 }); 613 614 (rx, chan_task) 615 - } 616 - 617 - /// Reset the disk storage so it can be reused. You must call this. 618 - /// 619 - /// Ideally we'd put this in an `impl Drop`, but since it makes blocking 620 - /// calls, that would be risky in an async context. For now you just have to 621 - /// carefully make sure you call it. 622 - /// 623 - /// The sqlite store is returned, so it can be reused for another 624 - /// `DiskDriver`. 625 - pub async fn reset_store(mut self) -> Result<DiskStore, DriveError> { 626 - let BigState { store, .. } = self.state.take().expect("valid state"); 627 - Ok(store.reset().await?) 628 } 629 }
··· 116 } 117 118 /// Builder-style driver setup 119 + #[derive(Debug, Clone)] 120 pub struct DriverBuilder { 121 pub mem_limit_mb: usize, 122 } ··· 154 } 155 /// Begin processing an atproto MST from a CAR file 156 pub async fn load_car<R: AsyncRead + Unpin>( 157 + &self, 158 reader: R, 159 ) -> Result<Driver<R, Vec<u8>>, DriveError> { 160 Driver::load_car(reader, crate::process::noop, self.mem_limit_mb).await ··· 164 /// Builder-style driver intermediate step 165 /// 166 /// start from `DriverBuilder` 167 + #[derive(Debug, Clone)] 168 pub struct DriverBuilderWithProcessor<T: Processable> { 169 pub mem_limit_mb: usize, 170 pub block_processor: fn(Vec<u8>) -> T, ··· 180 } 181 /// Begin processing an atproto MST from a CAR file 182 pub async fn load_car<R: AsyncRead + Unpin>( 183 + &self, 184 reader: R, 185 ) -> Result<Driver<R, T>, DriveError> { 186 Driver::load_car(reader, self.block_processor, self.mem_limit_mb).await ··· 335 // move store in and back out so we can manage lifetimes 336 // dump mem blocks into the store 337 store = tokio::task::spawn(async move { 338 let kvs = self 339 .mem_blocks 340 .into_iter() 341 .map(|(k, v)| Ok(encode(v).map(|v| (k.to_bytes(), v))?)); 342 343 + store.put_many(kvs)?; 344 Ok::<_, DriveError>(store) 345 }) 346 .await??; 347 348 + let (tx, mut rx) = mpsc::channel::<Vec<(Cid, MaybeProcessedBlock<T>)>>(1); 349 350 let store_worker = tokio::task::spawn_blocking(move || { 351 while let Some(chunk) = rx.blocking_recv() { 352 let kvs = chunk 353 .into_iter() 354 .map(|(k, v)| Ok(encode(v).map(|v| (k.to_bytes(), v))?)); 355 + store.put_many(kvs)?; 356 } 357 Ok::<_, DriveError>(store) 358 }); // await later 359 ··· 446 /// println!("{rkey}: size={}", record.len()); 447 /// } 448 /// } 449 /// # Ok(()) 450 /// # } 451 /// ``` ··· 460 // comes out again. 461 let (state, res) = tokio::task::spawn_blocking( 462 move || -> (BigState, Result<BlockChunk<T>, DriveError>) { 463 let mut out = Vec::with_capacity(n); 464 465 for _ in 0..n { 466 // walk as far as we can until we run out of blocks or find a record 467 + let step = match state.walker.disk_step(&mut state.store, process) { 468 Ok(s) => s, 469 Err(e) => { 470 return (state, Err(e.into())); 471 } 472 }; 473 match step { 474 Step::Missing(cid) => { 475 return (state, Err(DriveError::MissingBlock(cid))); 476 } 477 Step::Finish => break, 478 Step::Found { rkey, data } => out.push((rkey, data)), 479 }; 480 } 481 482 (state, Ok::<_, DriveError>(out)) 483 }, ··· 502 tx: mpsc::Sender<Result<BlockChunk<T>, DriveError>>, 503 ) -> Result<(), mpsc::error::SendError<Result<BlockChunk<T>, DriveError>>> { 504 let BigState { store, walker } = self.state.as_mut().expect("valid state"); 505 506 loop { 507 let mut out: BlockChunk<T> = Vec::with_capacity(n); ··· 509 for _ in 0..n { 510 // walk as far as we can until we run out of blocks or find a record 511 512 + let step = match walker.disk_step(store, self.process) { 513 Ok(s) => s, 514 Err(e) => return tx.blocking_send(Err(e.into())), 515 }; ··· 558 /// } 559 /// 560 /// } 561 /// # Ok(()) 562 /// # } 563 /// ``` ··· 579 }); 580 581 (rx, chan_task) 582 } 583 }
+1 -4
src/lib.rs
··· 53 total_size += size; 54 } 55 } 56 - 57 - // clean up the disk store (drop tables etc) 58 - driver.reset_store().await?; 59 } 60 }; 61 println!("sum of size of all records: {total_size}"); ··· 82 pub mod process; 83 84 pub use disk::{DiskBuilder, DiskError, DiskStore}; 85 - pub use drive::{DriveError, Driver, DriverBuilder}; 86 pub use mst::Commit; 87 pub use process::Processable;
··· 53 total_size += size; 54 } 55 } 56 } 57 }; 58 println!("sum of size of all records: {total_size}"); ··· 79 pub mod process; 80 81 pub use disk::{DiskBuilder, DiskError, DiskStore}; 82 + pub use drive::{DriveError, Driver, DriverBuilder, NeedDisk}; 83 pub use mst::Commit; 84 pub use process::Processable;
+21
src/process.rs
··· 77 } 78 } 79 80 impl<Item: Sized + Processable> Processable for Vec<Item> { 81 fn get_size(&self) -> usize { 82 let slot_size = std::mem::size_of::<Item>(); ··· 85 direct_size + items_referenced_size 86 } 87 }
··· 77 } 78 } 79 80 + impl Processable for String { 81 + fn get_size(&self) -> usize { 82 + self.capacity() 83 + } 84 + } 85 + 86 impl<Item: Sized + Processable> Processable for Vec<Item> { 87 fn get_size(&self) -> usize { 88 let slot_size = std::mem::size_of::<Item>(); ··· 91 direct_size + items_referenced_size 92 } 93 } 94 + 95 + impl<Item: Processable> Processable for Option<Item> { 96 + fn get_size(&self) -> usize { 97 + self.as_ref().map(|item| item.get_size()).unwrap_or(0) 98 + } 99 + } 100 + 101 + impl<Item: Processable, Error: Processable> Processable for Result<Item, Error> { 102 + fn get_size(&self) -> usize { 103 + match self { 104 + Ok(item) => item.get_size(), 105 + Err(err) => err.get_size(), 106 + } 107 + } 108 + }
+11 -8
src/walk.rs
··· 1 //! Depth-first MST traversal 2 3 - use crate::disk::SqliteReader; 4 use crate::drive::{DecodeError, MaybeProcessedBlock}; 5 use crate::mst::Node; 6 use crate::process::Processable; ··· 19 #[error("Action node error: {0}")] 20 MstError(#[from] MstError), 21 #[error("storage error: {0}")] 22 - StorageError(#[from] rusqlite::Error), 23 #[error("Decode error: {0}")] 24 DecodeError(#[from] DecodeError), 25 } ··· 87 } 88 89 fn push_from_node(stack: &mut Vec<Need>, node: &Node, parent_depth: Depth) -> Result<(), MstError> { 90 - // empty nodes are not allowed in the MST 91 - // ...except for a single one for empty MST, but we wouldn't be pushing that 92 if node.is_empty() { 93 - return Err(MstError::EmptyNode); 94 } 95 96 let mut entries = Vec::with_capacity(node.entries.len()); ··· 236 /// blocking!!!!!! 237 pub fn disk_step<T: Processable>( 238 &mut self, 239 - reader: &mut SqliteReader, 240 process: impl Fn(Vec<u8>) -> T, 241 ) -> Result<Step<T>, WalkError> { 242 loop { ··· 249 &mut Need::Node { depth, cid } => { 250 let cid_bytes = cid.to_bytes(); 251 log::trace!("need node {cid:?}"); 252 - let Some(block_bytes) = reader.get(cid_bytes)? else { 253 log::trace!("node not found, resting"); 254 return Ok(Step::Missing(cid)); 255 }; ··· 271 Need::Record { rkey, cid } => { 272 log::trace!("need record {cid:?}"); 273 let cid_bytes = cid.to_bytes(); 274 - let Some(data_bytes) = reader.get(cid_bytes)? else { 275 log::trace!("record block not found, resting"); 276 return Ok(Step::Missing(*cid)); 277 };
··· 1 //! Depth-first MST traversal 2 3 + use crate::disk::DiskStore; 4 use crate::drive::{DecodeError, MaybeProcessedBlock}; 5 use crate::mst::Node; 6 use crate::process::Processable; ··· 19 #[error("Action node error: {0}")] 20 MstError(#[from] MstError), 21 #[error("storage error: {0}")] 22 + StorageError(#[from] fjall::Error), 23 #[error("Decode error: {0}")] 24 DecodeError(#[from] DecodeError), 25 } ··· 87 } 88 89 fn push_from_node(stack: &mut Vec<Need>, node: &Node, parent_depth: Depth) -> Result<(), MstError> { 90 + // empty nodes are not allowed in the MST except in an empty MST 91 if node.is_empty() { 92 + if parent_depth == Depth::Root { 93 + return Ok(()); // empty mst, nothing to push 94 + } else { 95 + return Err(MstError::EmptyNode); 96 + } 97 } 98 99 let mut entries = Vec::with_capacity(node.entries.len()); ··· 239 /// blocking!!!!!! 240 pub fn disk_step<T: Processable>( 241 &mut self, 242 + reader: &mut DiskStore, 243 process: impl Fn(Vec<u8>) -> T, 244 ) -> Result<Step<T>, WalkError> { 245 loop { ··· 252 &mut Need::Node { depth, cid } => { 253 let cid_bytes = cid.to_bytes(); 254 log::trace!("need node {cid:?}"); 255 + let Some(block_bytes) = reader.get(&cid_bytes)? else { 256 log::trace!("node not found, resting"); 257 return Ok(Step::Missing(cid)); 258 }; ··· 274 Need::Record { rkey, cid } => { 275 log::trace!("need record {cid:?}"); 276 let cid_bytes = cid.to_bytes(); 277 + let Some(data_bytes) = reader.get(&cid_bytes)? else { 278 log::trace!("record block not found, resting"); 279 return Ok(Step::Missing(*cid)); 280 };
+16 -5
tests/non-huge-cars.rs
··· 1 extern crate repo_stream; 2 use repo_stream::Driver; 3 4 const TINY_CAR: &'static [u8] = include_bytes!("../car-samples/tiny.car"); 5 const LITTLE_CAR: &'static [u8] = include_bytes!("../car-samples/little.car"); 6 const MIDSIZE_CAR: &'static [u8] = include_bytes!("../car-samples/midsize.car"); 7 8 - async fn test_car(bytes: &[u8], expected_records: usize, expected_sum: usize) { 9 let mut driver = match Driver::load_car(bytes, |block| block.len(), 10 /* MiB */) 10 .await 11 .unwrap() ··· 33 34 assert_eq!(records, expected_records); 35 assert_eq!(sum, expected_sum); 36 - assert!(found_bsky_profile); 37 } 38 39 #[tokio::test] 40 async fn test_tiny_car() { 41 - test_car(TINY_CAR, 8, 2071).await 42 } 43 44 #[tokio::test] 45 async fn test_little_car() { 46 - test_car(LITTLE_CAR, 278, 246960).await 47 } 48 49 #[tokio::test] 50 async fn test_midsize_car() { 51 - test_car(MIDSIZE_CAR, 11585, 3741393).await 52 }
··· 1 extern crate repo_stream; 2 use repo_stream::Driver; 3 4 + const EMPTY_CAR: &'static [u8] = include_bytes!("../car-samples/empty.car"); 5 const TINY_CAR: &'static [u8] = include_bytes!("../car-samples/tiny.car"); 6 const LITTLE_CAR: &'static [u8] = include_bytes!("../car-samples/little.car"); 7 const MIDSIZE_CAR: &'static [u8] = include_bytes!("../car-samples/midsize.car"); 8 9 + async fn test_car( 10 + bytes: &[u8], 11 + expected_records: usize, 12 + expected_sum: usize, 13 + expect_profile: bool, 14 + ) { 15 let mut driver = match Driver::load_car(bytes, |block| block.len(), 10 /* MiB */) 16 .await 17 .unwrap() ··· 39 40 assert_eq!(records, expected_records); 41 assert_eq!(sum, expected_sum); 42 + assert_eq!(found_bsky_profile, expect_profile); 43 + } 44 + 45 + #[tokio::test] 46 + async fn test_empty_car() { 47 + test_car(EMPTY_CAR, 0, 0, false).await 48 } 49 50 #[tokio::test] 51 async fn test_tiny_car() { 52 + test_car(TINY_CAR, 8, 2071, true).await 53 } 54 55 #[tokio::test] 56 async fn test_little_car() { 57 + test_car(LITTLE_CAR, 278, 246960, true).await 58 } 59 60 #[tokio::test] 61 async fn test_midsize_car() { 62 + test_car(MIDSIZE_CAR, 11585, 3741393, true).await 63 }