Fast and robust atproto CAR file processing in Rust


Changed files (+1296 -1515)

benches/
car-samples/
examples/
  disk-read-file/
  read-file/
src/
tests/
+60 -533
Cargo.lock
···

 [[package]]
 name = "bincode"
-version = "1.3.3"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "b1f45e9417d87227c7a56d22e471c6206462cba514c7590c09aff4cf6d1ddcad"
-dependencies = [
- "serde",
-]
-
-[[package]]
-name = "bincode"
 version = "2.0.1"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "36eaf5d7b090263e8150820482d5d93cd964a81e4019913c972f4edcc6edb740"
···
 version = "2.9.4"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "2261d10cca569e4643e526d8dc2e62e433cc8aba21ab764233731f8d369bf394"
+
+[[package]]
+name = "block-buffer"
+version = "0.10.4"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "3078c7629b62d3f0439517fa394996acacc5cbc91c5a20d8c658e77abd503a71"
 dependencies = [
- "serde",
+ "generic-array",
 ]

 [[package]]
···
 checksum = "46c5e41b57b8bba42a04676d81cb89e9ee8e859a1a66f80a5a72e1cb76b34d43"

 [[package]]
-name = "byteorder"
-version = "1.5.0"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "1fd0f2584146f6f2ef48085050886acf353beff7305ebd1ae69500e27c67f64b"
-
-[[package]]
 name = "bytes"
 version = "1.10.1"
 source = "registry+https://github.com/rust-lang/crates.io-index"
···
 checksum = "b544cf8c89359205f4f990d0e6f3828db42df85b5dac95d09157a250eb0749c4"
 dependencies = [
  "serde",
-]
-
-[[package]]
-name = "cc"
-version = "1.2.44"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "37521ac7aabe3d13122dc382493e20c9416f299d2ccd5b3a5340a2570cdeb0f3"
-dependencies = [
- "find-msvc-tools",
- "shlex",
 ]
···
 ]

 [[package]]
+name = "cpufeatures"
+version = "0.2.17"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "59ed5838eebb26a2bb2e58f6d5b5316989ae9d08bab10e0e6d103e656d1b0280"
+dependencies = [
+ "libc",
+]
+
+[[package]]
 name = "criterion"
 version = "0.7.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
···
 ]

 [[package]]
-name = "crossbeam-queue"
-version = "0.3.12"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "0f58bbc28f91df819d0aa2a2c00cd19754769c2fad90579b3592b1c9ba7a3115"
-dependencies = [
- "crossbeam-utils",
-]
-
-[[package]]
 name = "crossbeam-utils"
 version = "0.8.21"
 source = "registry+https://github.com/rust-lang/crates.io-index"
···
 checksum = "460fbee9c2c2f33933d720630a6a0bac33ba7053db5344fac858d4b8952d77d5"

 [[package]]
+name = "crypto-common"
+version = "0.1.6"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "1bfb12502f3fc46cca1bb51ac28df9d618d813cdc3d2f25b9fe775a34af26bb3"
+dependencies = [
+ "generic-array",
+ "typenum",
+]
+
+[[package]]
 name = "data-encoding"
 version = "2.9.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
···
 ]

 [[package]]
-name = "displaydoc"
-version = "0.2.5"
+name = "digest"
+version = "0.10.7"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "97369cbbc041bc366949bc74d34658d6cda5621039731c6310521892a3a20ae0"
+checksum = "9ed9a281f7bc9b7576e61468ba615a66a5c8cfdff42420a70aa82701a3b1e292"
 dependencies = [
- "proc-macro2",
- "quote",
- "syn 2.0.106",
-]
-
-[[package]]
-name = "doxygen-rs"
-version = "0.4.2"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "415b6ec780d34dcf624666747194393603d0373b7141eef01d12ee58881507d9"
-dependencies = [
- "phf",
+ "block-buffer",
+ "crypto-common",
 ]

 [[package]]
···
 checksum = "37909eebbb50d72f9059c3b6d82c0463f2ff062c9e95845c43a6c9c0355411be"

 [[package]]
-name = "find-msvc-tools"
-version = "0.1.4"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "52051878f80a721bb68ebfbc930e07b65ba72f2da88968ea5c06fd6ca3d3a127"
-
-[[package]]
 name = "foldhash"
 version = "0.1.5"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "d9c4f5dac5e15c24eb999c26181a6ca40b39fe946cbe4c263c7209467bc83af2"

 [[package]]
-name = "form_urlencoded"
-version = "1.2.2"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "cb4cb245038516f5f85277875cdaa4f7d2c9a0fa0468de06ed190163b1581fcf"
-dependencies = [
- "percent-encoding",
-]
-
-[[package]]
 name = "futures"
 version = "0.3.31"
 source = "registry+https://github.com/rust-lang/crates.io-index"
···
 ]

 [[package]]
-name = "getrandom"
-version = "0.2.16"
+name = "generic-array"
+version = "0.14.9"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "335ff9f135e4384c8150d6f27c6daed433577f86b4750418338c01a1a2528592"
+checksum = "4bb6743198531e02858aeaea5398fcc883e71851fcbcb5a2f773e2fb6cb1edf2"
 dependencies = [
- "cfg-if",
- "libc",
- "wasi 0.11.1+wasi-snapshot-preview1",
+ "typenum",
+ "version_check",
 ]

 [[package]]
···
 checksum = "2304e00983f87ffb38b55b444b5e3b60a884b5d30c0fca7d82fe33449bbe55ea"

 [[package]]
-name = "heed"
-version = "0.22.0"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "6a56c94661ddfb51aa9cdfbf102cfcc340aa69267f95ebccc4af08d7c530d393"
-dependencies = [
- "bitflags",
- "byteorder",
- "heed-traits",
- "heed-types",
- "libc",
- "lmdb-master-sys",
- "once_cell",
- "page_size",
- "serde",
- "synchronoise",
- "url",
-]
-
-[[package]]
-name = "heed-traits"
-version = "0.20.0"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "eb3130048d404c57ce5a1ac61a903696e8fcde7e8c2991e9fcfc1f27c3ef74ff"
-
-[[package]]
-name = "heed-types"
-version = "0.21.0"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "13c255bdf46e07fb840d120a36dcc81f385140d7191c76a7391672675c01a55d"
-dependencies = [
- "bincode 1.3.3",
- "byteorder",
- "heed-traits",
- "serde",
- "serde_json",
-]
-
-[[package]]
-name = "icu_collections"
-version = "2.1.1"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "4c6b649701667bbe825c3b7e6388cb521c23d88644678e83c0c4d0a621a34b43"
-dependencies = [
- "displaydoc",
- "potential_utf",
- "yoke",
- "zerofrom",
- "zerovec",
-]
-
-[[package]]
-name = "icu_locale_core"
-version = "2.1.1"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "edba7861004dd3714265b4db54a3c390e880ab658fec5f7db895fae2046b5bb6"
-dependencies = [
- "displaydoc",
- "litemap",
- "tinystr",
- "writeable",
- "zerovec",
-]
-
-[[package]]
-name = "icu_normalizer"
-version = "2.1.1"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "5f6c8828b67bf8908d82127b2054ea1b4427ff0230ee9141c54251934ab1b599"
-dependencies = [
- "icu_collections",
- "icu_normalizer_data",
- "icu_properties",
- "icu_provider",
- "smallvec",
- "zerovec",
-]
-
-[[package]]
-name = "icu_normalizer_data"
-version = "2.1.1"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "7aedcccd01fc5fe81e6b489c15b247b8b0690feb23304303a9e560f37efc560a"
-
-[[package]]
-name = "icu_properties"
-version = "2.1.1"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "e93fcd3157766c0c8da2f8cff6ce651a31f0810eaa1c51ec363ef790bbb5fb99"
-dependencies = [
- "icu_collections",
- "icu_locale_core",
- "icu_properties_data",
- "icu_provider",
- "zerotrie",
- "zerovec",
-]
-
-[[package]]
-name = "icu_properties_data"
-version = "2.1.1"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "02845b3647bb045f1100ecd6480ff52f34c35f82d9880e029d329c21d1054899"
-
-[[package]]
-name = "icu_provider"
-version = "2.1.1"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "85962cf0ce02e1e0a629cc34e7ca3e373ce20dda4c4d7294bbd0bf1fdb59e614"
-dependencies = [
- "displaydoc",
- "icu_locale_core",
- "writeable",
- "yoke",
- "zerofrom",
- "zerotrie",
- "zerovec",
-]
-
-[[package]]
-name = "idna"
-version = "1.1.0"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "3b0875f23caa03898994f6ddc501886a45c7d3d62d04d2d90788d47be1b1e4de"
-dependencies = [
- "idna_adapter",
- "smallvec",
- "utf8_iter",
-]
-
-[[package]]
-name = "idna_adapter"
-version = "1.2.1"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "3acae9609540aa318d1bc588455225fb2085b9ed0c4f6bd0d9d5bcd86f1a0344"
-dependencies = [
- "icu_normalizer",
- "icu_properties",
-]
-
-[[package]]
 name = "io-uring"
 version = "0.7.10"
 source = "registry+https://github.com/rust-lang/crates.io-index"
···
 checksum = "df1d3c3b53da64cf5760482273a98e575c651a67eec7f77df96b5b642de8f039"

 [[package]]
-name = "litemap"
-version = "0.8.1"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "6373607a59f0be73a39b6fe456b8192fcc3585f602af20751600e974dd455e77"
-
-[[package]]
-name = "lmdb-master-sys"
-version = "0.2.5"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "864808e0b19fb6dd3b70ba94ee671b82fce17554cf80aeb0a155c65bb08027df"
-dependencies = [
- "cc",
- "doxygen-rs",
- "libc",
-]
-
-[[package]]
 name = "lock_api"
 version = "0.4.14"
 source = "registry+https://github.com/rust-lang/crates.io-index"
···
 checksum = "d6790f58c7ff633d8771f42965289203411a5e5c68388703c06e14f24770b41e"

 [[package]]
-name = "page_size"
-version = "0.6.0"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "30d5b2194ed13191c1999ae0704b7839fb18384fa22e49b57eeaa97d79ce40da"
-dependencies = [
- "libc",
- "winapi",
-]
-
-[[package]]
 name = "parking_lot"
 version = "0.12.5"
 source = "registry+https://github.com/rust-lang/crates.io-index"
···
 ]

 [[package]]
-name = "percent-encoding"
-version = "2.3.2"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "9b4f627cb1b25917193a259e49bdad08f671f8d9708acfd5fe0a8c1455d87220"
-
-[[package]]
-name = "phf"
-version = "0.11.3"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "1fd6780a80ae0c52cc120a26a1a42c1ae51b247a253e4e06113d23d2c2edd078"
-dependencies = [
- "phf_macros",
- "phf_shared",
-]
-
-[[package]]
-name = "phf_generator"
-version = "0.11.3"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "3c80231409c20246a13fddb31776fb942c38553c51e871f8cbd687a4cfb5843d"
-dependencies = [
- "phf_shared",
- "rand",
-]
-
-[[package]]
-name = "phf_macros"
-version = "0.11.3"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "f84ac04429c13a7ff43785d75ad27569f2951ce0ffd30a3321230db2fc727216"
-dependencies = [
- "phf_generator",
- "phf_shared",
- "proc-macro2",
- "quote",
- "syn 2.0.106",
-]
-
-[[package]]
-name = "phf_shared"
-version = "0.11.3"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "67eabc2ef2a60eb7faa00097bd1ffdb5bd28e62bf39990626a582201b7a754e5"
-dependencies = [
- "siphasher",
-]
-
-[[package]]
 name = "pin-project-lite"
 version = "0.2.16"
 source = "registry+https://github.com/rust-lang/crates.io-index"
···
 ]

 [[package]]
-name = "potential_utf"
-version = "0.1.4"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "b73949432f5e2a09657003c25bca5e19a0e9c84f8058ca374f49e0ebe605af77"
-dependencies = [
- "zerovec",
-]
-
-[[package]]
-name = "ppv-lite86"
-version = "0.2.21"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "85eae3c4ed2f50dcfe72643da4befc30deadb458a9b590d720cde2f2b1e97da9"
-dependencies = [
- "zerocopy",
-]
-
-[[package]]
 name = "proc-macro2"
 version = "1.0.101"
 source = "registry+https://github.com/rust-lang/crates.io-index"
···
 checksum = "69cdb34c158ceb288df11e18b4bd39de994f6657d83847bdffdbd7f346754b0f"

 [[package]]
-name = "rand"
-version = "0.8.5"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "34af8d1a0e25924bc5b7c43c079c942339d8f0a8b57c39049bef581b46327404"
-dependencies = [
- "libc",
- "rand_chacha",
- "rand_core",
-]
-
-[[package]]
-name = "rand_chacha"
-version = "0.3.1"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "e6c10a63a0fa32252be49d21e7709d4d4baf8d231c2dbce1eaa8141b9b127d88"
-dependencies = [
- "ppv-lite86",
- "rand_core",
-]
-
-[[package]]
-name = "rand_core"
-version = "0.6.4"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "ec0be4795e2f6a28069bec0b5ff3e2ac9bafc99e6a9a7dc3547996c5c816922c"
-dependencies = [
- "getrandom 0.2.16",
-]
-
-[[package]]
 name = "rayon"
 version = "1.11.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
···
 ]

 [[package]]
-name = "redb"
-version = "3.1.0"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "ae323eb086579a3769daa2c753bb96deb95993c534711e0dbe881b5192906a06"
-dependencies = [
- "libc",
-]
-
-[[package]]
 name = "redox_syscall"
 version = "0.5.18"
 source = "registry+https://github.com/rust-lang/crates.io-index"
···

 [[package]]
 name = "repo-stream"
-version = "0.1.1"
+version = "0.2.2"
 dependencies = [
- "bincode 2.0.1",
+ "bincode",
  "clap",
  "criterion",
  "env_logger",
  "futures",
  "futures-core",
- "heed",
  "ipld-core",
  "iroh-car",
  "log",
  "multibase",
- "redb",
  "rusqlite",
- "rustcask",
  "serde",
  "serde_bytes",
  "serde_ipld_dagcbor",
+ "sha2",
  "tempfile",
  "thiserror 2.0.17",
  "tokio",
···
 checksum = "56f7d92ca342cea22a06f2121d944b4fd82af56988c270852495420f961d4ace"

 [[package]]
-name = "rustcask"
-version = "0.1.0"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "e17ed1a2733a60fea8495ddcb42c22cabd17afec7ffa7b024b161dd662da4003"
-dependencies = [
- "bincode 1.3.3",
- "bytes",
- "clap",
- "log",
- "rand",
- "regex",
- "serde",
- "tokio",
-]
-
-[[package]]
 name = "rustix"
 version = "1.1.2"
 source = "registry+https://github.com/rust-lang/crates.io-index"
···
 ]

 [[package]]
-name = "shlex"
-version = "1.3.0"
+name = "sha2"
+version = "0.10.9"
 source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "0fda2ff0d084019ba4d7c6f371c95d8fd75ce3524c3cb8fb653a3023f6323e64"
+checksum = "a7507d819769d01a365ab707794a4084392c824f54a7a6a7862f8c3d0892b283"
+dependencies = [
+ "cfg-if",
+ "cpufeatures",
+ "digest",
+]

 [[package]]
 name = "signal-hook-registry"
···
 ]

 [[package]]
-name = "siphasher"
-version = "1.0.1"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "56199f7ddabf13fe5074ce809e7d3f42b42ae711800501b5b16ea82ad029c39d"
-
-[[package]]
 name = "slab"
 version = "0.4.11"
 source = "registry+https://github.com/rust-lang/crates.io-index"
···
  "libc",
  "windows-sys 0.59.0",
 ]
-
-[[package]]
-name = "stable_deref_trait"
-version = "1.2.1"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "6ce2be8dc25455e1f91df71bfa12ad37d7af1092ae736f3a6cd0e37bc7810596"

 [[package]]
 name = "strsim"
···
 ]

 [[package]]
-name = "synchronoise"
-version = "1.0.1"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "3dbc01390fc626ce8d1cffe3376ded2b72a11bb70e1c75f404a210e4daa4def2"
-dependencies = [
- "crossbeam-queue",
-]
-
-[[package]]
-name = "synstructure"
-version = "0.13.2"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "728a70f3dbaf5bab7f0c4b1ac8d7ae5ea60a4b5549c8a5914361c99147a709d2"
-dependencies = [
- "proc-macro2",
- "quote",
- "syn 2.0.106",
-]
-
-[[package]]
 name = "tempfile"
 version = "3.23.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "2d31c77bdf42a745371d260a26ca7163f1e0924b64afa0b688e61b5a9fa02f16"
 dependencies = [
  "fastrand",
- "getrandom 0.3.3",
+ "getrandom",
  "once_cell",
  "rustix",
  "windows-sys 0.60.2",
···
 ]

 [[package]]
-name = "tinystr"
-version = "0.8.2"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "42d3e9c45c09de15d06dd8acf5f4e0e399e85927b7f00711024eb7ae10fa4869"
-dependencies = [
- "displaydoc",
- "zerovec",
-]
-
-[[package]]
 name = "tinytemplate"
 version = "1.2.1"
 source = "registry+https://github.com/rust-lang/crates.io-index"
···
  "quote",
  "syn 2.0.106",
 ]
+
+[[package]]
+name = "typenum"
+version = "1.19.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "562d481066bde0658276a35467c4af00bdc6ee726305698a55b86e61d7ad82bb"

 [[package]]
 name = "unicode-ident"
···
 version = "0.0.4"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "6d49784317cd0d1ee7ec5c716dd598ec5b4483ea832a2dced265471cc0f690ae"
-
-[[package]]
-name = "url"
-version = "2.5.7"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "08bc136a29a3d1758e07a9cca267be308aeebf5cfd5a10f3f67ab2097683ef5b"
-dependencies = [
- "form_urlencoded",
- "idna",
- "percent-encoding",
- "serde",
-]
-
-[[package]]
-name = "utf8_iter"
-version = "1.0.4"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "b6c140620e7ffbb22c2dee59cafe6084a59b5ffc27a8859a5f0d494b5d52b6be"

 [[package]]
 name = "utf8parse"
···
 checksum = "accd4ea62f7bb7a82fe23066fb0957d48ef677f6eeb8215f372f52e48bb32426"

 [[package]]
+name = "version_check"
+version = "0.9.5"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "0b928f33d975fc6ad9f86c8f283853ad26bdd5b10b7f1542aa2fa15e2289105a"
+
+[[package]]
 name = "virtue"
 version = "0.0.18"
 source = "registry+https://github.com/rust-lang/crates.io-index"
···
 ]

 [[package]]
-name = "winapi"
-version = "0.3.9"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "5c839a674fcd7a98952e593242ea400abe93992746761e38641405d28b00f419"
-dependencies = [
- "winapi-i686-pc-windows-gnu",
- "winapi-x86_64-pc-windows-gnu",
-]
-
-[[package]]
-name = "winapi-i686-pc-windows-gnu"
-version = "0.4.0"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "ac3b87c63620426dd9b991e5ce0329eff545bccbbb34f3be09ff6fb6ab51b7b6"
-
 name = "winapi-util"
 version = "0.1.11"
 source = "registry+https://github.com/rust-lang/crates.io-index"
···
 dependencies = [
  "windows-sys 0.60.2",
 ]
-
-[[package]]
-name = "winapi-x86_64-pc-windows-gnu"
-version = "0.4.0"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "712e227841d057c1ee1cd2fb22fa7e5a5461ae8e48fa2ca79ec42cfc1931183f"

 [[package]]
 name = "windows-link"
···
 checksum = "f17a85883d4e6d00e8a97c586de764dabcc06133f7f1d55dce5cdc070ad7fe59"

 [[package]]
-name = "writeable"
-version = "0.6.2"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "9edde0db4769d2dc68579893f2306b26c6ecfbe0ef499b013d731b7b9247e0b9"
-
-[[package]]
-name = "yoke"
-version = "0.8.1"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "72d6e5c6afb84d73944e5cedb052c4680d5657337201555f9f2a16b7406d4954"
-dependencies = [
- "stable_deref_trait",
- "yoke-derive",
- "zerofrom",
-]
-
-[[package]]
-name = "yoke-derive"
-version = "0.8.1"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "b659052874eb698efe5b9e8cf382204678a0086ebf46982b79d6ca3182927e5d"
-dependencies = [
- "proc-macro2",
- "quote",
- "syn 2.0.106",
- "synstructure",
-]
-
-[[package]]
 name = "zerocopy"
 version = "0.8.27"
 source = "registry+https://github.com/rust-lang/crates.io-index"
···
  "quote",
  "syn 2.0.106",
 ]
-
-[[package]]
-name = "zerofrom"
-version = "0.1.6"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "50cc42e0333e05660c3587f3bf9d0478688e15d870fab3346451ce7f8c9fbea5"
-dependencies = [
- "zerofrom-derive",
-]
-
-[[package]]
-name = "zerofrom-derive"
-version = "0.1.6"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "d71e5d6e06ab090c67b5e44993ec16b72dcbaabc526db883a360057678b48502"
-dependencies = [
- "proc-macro2",
- "quote",
- "syn 2.0.106",
- "synstructure",
-]
-
-[[package]]
-name = "zerotrie"
-version = "0.2.3"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "2a59c17a5562d507e4b54960e8569ebee33bee890c70aa3fe7b97e85a9fd7851"
-dependencies = [
- "displaydoc",
- "yoke",
- "zerofrom",
-]
-
-[[package]]
-name = "zerovec"
-version = "0.11.5"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "6c28719294829477f525be0186d13efa9a3c602f7ec202ca9e353d310fb9a002"
-dependencies = [
- "yoke",
- "zerofrom",
- "zerovec-derive",
-]
-
-[[package]]
-name = "zerovec-derive"
-version = "0.11.2"
-source = "registry+https://github.com/rust-lang/crates.io-index"
-checksum = "eadce39539ca5cb3985590102671f2567e659fca9666581ad3411d59207951f3"
-dependencies = [
- "proc-macro2",
- "quote",
- "syn 2.0.106",
-]
+6 -8
Cargo.toml
···
 [package]
 name = "repo-stream"
-version = "0.1.1"
+version = "0.2.2"
 edition = "2024"
 license = "MIT OR Apache-2.0"
-description = "Fast and robust atproto CAR file processing in rust"
+description = "A robust CAR file -> MST walker for atproto"
 repository = "https://tangled.org/@microcosm.blue/repo-stream"

 [dependencies]
 bincode = { version = "2.0.1", features = ["serde"] }
 futures = "0.3.31"
 futures-core = "0.3.31"
-heed = "0.22.0"
 ipld-core = { version = "0.4.2", features = ["serde"] }
 iroh-car = "0.5.1"
 log = "0.4.28"
 multibase = "0.9.2"
-redb = "3.1.0"
 rusqlite = "0.37.0"
-rustcask = "0.1.0"
 serde = { version = "1.0.228", features = ["derive"] }
 serde_bytes = "0.11.19"
 serde_ipld_dagcbor = "0.6.4"
+sha2 = "0.10.9"
 thiserror = "2.0.17"
-tokio = { version = "1.47.1", features = ["rt"] }
+tokio = { version = "1.47.1", features = ["rt", "sync"] }

 [dev-dependencies]
 clap = { version = "4.5.48", features = ["derive"] }
···
 inherits = "release"
 debug = true

-[profile.release]
-debug = true
+# [profile.release]
+# debug = true

 [[bench]]
 name = "non-huge-cars"
+12 -21
benches/huge-car.rs
···
 extern crate repo_stream;
-use futures::TryStreamExt;
-use iroh_car::CarReader;
-use std::convert::Infallible;
+use repo_stream::Driver;
 use std::path::{Path, PathBuf};

 use criterion::{Criterion, criterion_group, criterion_main};
···
     });
 }

-async fn drive_car(filename: impl AsRef<Path>) {
+async fn drive_car(filename: impl AsRef<Path>) -> usize {
     let reader = tokio::fs::File::open(filename).await.unwrap();
     let reader = tokio::io::BufReader::new(reader);
-    let reader = CarReader::new(reader).await.unwrap();

-    let root = reader
-        .header()
-        .roots()
-        .first()
-        .ok_or("missing root")
+    let mut driver = match Driver::load_car(reader, |block| block.len(), 1024)
+        .await
         .unwrap()
-        .clone();
-
-    let stream = std::pin::pin!(reader.stream());
-
-    let (_commit, v) =
-        repo_stream::drive::Vehicle::init(root, stream, |block| Ok::<_, Infallible>(block.len()))
-            .await
-            .unwrap();
-    let mut record_stream = std::pin::pin!(v.stream());
+    {
+        Driver::Memory(_, mem_driver) => mem_driver,
+        Driver::Disk(_) => panic!("not doing disk for benchmark"),
+    };

-    while let Some(_) = record_stream.try_next().await.unwrap() {
-        // just here for the drive
+    let mut n = 0;
+    while let Some(pairs) = driver.next_chunk(256).await.unwrap() {
+        n += pairs.len();
     }
+    n
 }

 criterion_group!(benches, criterion_benchmark);
+16 -22
benches/non-huge-cars.rs
···
 extern crate repo_stream;
-use futures::TryStreamExt;
-use iroh_car::CarReader;
-use std::convert::Infallible;
+use repo_stream::Driver;

 use criterion::{Criterion, criterion_group, criterion_main};

+const EMPTY_CAR: &'static [u8] = include_bytes!("../car-samples/empty.car");
 const TINY_CAR: &'static [u8] = include_bytes!("../car-samples/tiny.car");
 const LITTLE_CAR: &'static [u8] = include_bytes!("../car-samples/little.car");
 const MIDSIZE_CAR: &'static [u8] = include_bytes!("../car-samples/midsize.car");
···
         .build()
         .expect("Creating runtime failed");

+    c.bench_function("empty-car", |b| {
+        b.to_async(&rt).iter(async || drive_car(EMPTY_CAR).await)
+    });
     c.bench_function("tiny-car", |b| {
         b.to_async(&rt).iter(async || drive_car(TINY_CAR).await)
     });
···
     });
 }

-async fn drive_car(bytes: &[u8]) {
-    let reader = CarReader::new(bytes).await.unwrap();
-
-    let root = reader
-        .header()
-        .roots()
-        .first()
-        .ok_or("missing root")
+async fn drive_car(bytes: &[u8]) -> usize {
+    let mut driver = match Driver::load_car(bytes, |block| block.len(), 32)
+        .await
         .unwrap()
-        .clone();
-
-    let stream = std::pin::pin!(reader.stream());
+    {
+        Driver::Memory(_, mem_driver) => mem_driver,
+        Driver::Disk(_) => panic!("not benching big cars here"),
+    };

-    let (_commit, v) =
-        repo_stream::drive::Vehicle::init(root, stream, |block| Ok::<_, Infallible>(block.len()))
-            .await
-            .unwrap();
-    let mut record_stream = std::pin::pin!(v.stream());
-
-    while let Some(_) = record_stream.try_next().await.unwrap() {
-        // just here for the drive
+    let mut n = 0;
+    while let Some(pairs) = driver.next_chunk(256).await.unwrap() {
+        n += pairs.len();
     }
+    n
 }

 criterion_group!(benches, criterion_benchmark);
car-samples/empty.car

This is a binary file and will not be displayed.

+64 -43
examples/disk-read-file/main.rs
···
+/*!
+Read a CAR file by spilling to disk
+*/
+
 extern crate repo_stream;
 use clap::Parser;
-use repo_stream::drive::Processable;
-use serde::{Deserialize, Serialize};
+use repo_stream::{DiskBuilder, Driver, DriverBuilder};
 use std::path::PathBuf;
-
-type Result<T> = std::result::Result<T, Box<dyn std::error::Error>>;
+use std::time::Instant;

 #[derive(Debug, Parser)]
 struct Args {
···
     tmpfile: PathBuf,
 }

-#[derive(Clone, Serialize, Deserialize)]
-struct S(usize);
-
-impl Processable for S {
-    fn get_size(&self) -> usize {
-        0 // no additional space taken, just its stack size (newtype is free)
-    }
-}
-
 #[tokio::main]
-async fn main() -> Result<()> {
+async fn main() -> Result<(), Box<dyn std::error::Error>> {
     env_logger::init();

     let Args { car, tmpfile } = Args::parse();
+
+    // repo-stream takes an AsyncRead as input. wrapping a filesystem read in
+    // BufReader can provide a really significant performance win.
     let reader = tokio::fs::File::open(car).await?;
     let reader = tokio::io::BufReader::new(reader);

-    // let kb = 2_usize.pow(10);
-    let mb = 2_usize.pow(20);
+    log::info!("hello! reading the car...");
+    let t0 = Instant::now();

-    let mut driver =
-        match repo_stream::drive::load_car(reader, |block| S(block.len()), 5 * mb).await? {
-            repo_stream::drive::Vehicle::Lil(_, _) => panic!("try this on a bigger car"),
-            repo_stream::drive::Vehicle::Big(big_stuff) => {
-                let disk_store = repo_stream::disk::SqliteStore::new(tmpfile.clone());
-                // let disk_store = repo_stream::disk::RedbStore::new(tmpfile.clone());
-                // let disk_store = repo_stream::disk::RustcaskStore::new(tmpfile.clone());
-                // let disk_store = repo_stream::disk::HeedStore::new(tmpfile.clone());
-                let (commit, driver) = big_stuff.finish_loading(disk_store).await?;
-                log::warn!("big: {:?}", commit);
-                driver
-            }
-        };
+    // in this example we only bother handling CARs that are too big for memory
+    // `noop` helper means: do no block processing, store the raw blocks
+    let driver = match DriverBuilder::new()
+        .with_mem_limit_mb(10) // how much memory can be used before disk spill
+        .load_car(reader)
+        .await?
+    {
+        Driver::Memory(_, _) => panic!("try this on a bigger car"),
+        Driver::Disk(big_stuff) => {
+            // we reach here if the repo was too big and needs to be spilled to
+            // disk to continue
+
+            // set up a disk store we can spill to
+            let disk_store = DiskBuilder::new().open(tmpfile).await?;
+
+            // do the spilling, get back a (similar) driver
+            let (commit, driver) = big_stuff.finish_loading(disk_store).await?;
+
+            // at this point you might want to fetch the account's signing key
+            // via the DID from the commit, and then verify the signature.
+            log::warn!("big's commit ({:?}): {:?}", t0.elapsed(), commit);

-    println!("hello!");
+            // pop the driver back out to get some code indentation relief
+            driver
+        }
+    };

+    // collect some random stats about the blocks
     let mut n = 0;
-    loop {
-        let (d, p) = driver.next_chunk(1024).await?;
-        driver = d;
-        let Some(pairs) = p else {
-            break;
-        };
+    let mut zeros = 0;
+
+    log::info!("walking...");
+
+    // this example uses the disk driver's channel mode: the tree walking is
+    // spawned onto a blocking thread, and we get chunks of rkey+blocks back
+    let (mut rx, join) = driver.to_channel(512);
+    while let Some(r) = rx.recv().await {
+        let pairs = r?;
+
+        // keep a count of the total number of blocks seen
         n += pairs.len();
-        // log::info!("got {rkey:?}");
+
+        for (_, block) in pairs {
+            // for each block, count how many bytes are equal to '0'
+            // (this is just an example, you probably want to do something more
+            // interesting)
+            zeros += block.into_iter().filter(|&b| b == b'0').count()
+        }
     }
-    // log::info!("now is the time to check mem...");
-    // tokio::time::sleep(std::time::Duration::from_secs(22)).await;
-    drop(driver);
-    log::info!("bye! {n}");
+
+    log::info!("arrived! ({:?}) joining rx...", t0.elapsed());
+
+    // clean up the database. would be nice to do this in drop so it happens
+    // automatically, but some blocking work happens, so that's not allowed in
+    // async rust. 🤷‍♀️
+    join.await?.reset_store().await?;

-    std::fs::remove_file(tmpfile).unwrap();
-    // std::fs::remove_dir_all(tmpfile).unwrap();
+    log::info!("done. n={n} zeros={zeros}");

     Ok(())
 }
+14 -6
examples/read-file/main.rs
···
+/*!
+Read a CAR file with in-memory processing
+*/
+
 extern crate repo_stream;
 use clap::Parser;
+use repo_stream::{Driver, DriverBuilder};
 use std::path::PathBuf;

 type Result<T> = std::result::Result<T, Box<dyn std::error::Error>>;
···
     let reader = tokio::fs::File::open(file).await?;
     let reader = tokio::io::BufReader::new(reader);

-    let (commit, mut driver) =
-        match repo_stream::drive::load_car(reader, |block| block.len(), 1024 * 1024).await? {
-            repo_stream::drive::Vehicle::Lil(commit, mem_driver) => (commit, mem_driver),
-            repo_stream::drive::Vehicle::Big(_) => panic!("can't handle big cars yet"),
-        };
+    let (commit, mut driver) = match DriverBuilder::new()
+        .with_block_processor(|block| block.len())
+        .load_car(reader)
+        .await?
+    {
+        Driver::Memory(commit, mem_driver) => (commit, mem_driver),
+        Driver::Disk(_) => panic!("this example doesn't handle big CARs"),
+    };

     log::info!("got commit: {commit:?}");

···
         n += pairs.len();
         // log::info!("got {rkey:?}");
     }
-    log::info!("bye! {n}");
+    log::info!("bye! total records={n}");

     Ok(())
 }
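The read-file example maps each block to its length with `with_block_processor`. Since this release adds `sha2` to the dependencies, a natural user-level variation is hashing each record instead of measuring it. A sketch, assuming (as the examples above suggest) that a processor is a capture-free `fn(Vec<u8>) -> T` and that a `Vec<u8>` output is storable like the raw blocks are; the function names here are illustrative:

```rust
use repo_stream::{Driver, DriverBuilder};
use sha2::{Digest, Sha256};

// Hypothetical block processor: keep each record's SHA-256 digest rather
// than its length. It must be a plain fn (no captures), per the examples.
fn hash_block(block: Vec<u8>) -> Vec<u8> {
    Sha256::digest(&block).to_vec()
}

async fn digest_car(
    reader: impl tokio::io::AsyncRead + Unpin,
) -> Result<(), Box<dyn std::error::Error>> {
    let Driver::Memory(_commit, mut driver) = DriverBuilder::new()
        .with_block_processor(hash_block)
        .load_car(reader)
        .await?
    else {
        panic!("this sketch only handles CARs that fit in memory");
    };

    while let Some(pairs) = driver.next_chunk(256).await? {
        for (rkey, digest) in pairs {
            // render the digest as lowercase hex next to its record key
            let hex: String = digest.iter().map(|b| format!("{b:02x}")).collect();
            println!("{rkey}: {hex}");
        }
    }
    Ok(())
}
```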
+70 -2
readme.md
···
 # repo-stream

-Fast and (aspirationally) robust atproto CAR file processing in rust
+A robust CAR file -> MST walker for atproto
+
+[![Crates.io][crates-badge]](https://crates.io/crates/repo-stream)
+[![Documentation][docs-badge]](https://docs.rs/repo-stream)
+[![Sponsor][sponsor-badge]](https://github.com/sponsors/uniphil)
+
+[crates-badge]: https://img.shields.io/crates/v/repo-stream.svg
+[docs-badge]: https://docs.rs/repo-stream/badge.svg
+[sponsor-badge]: https://img.shields.io/badge/at-microcosm-b820f9?labelColor=b820f9&logo=githubsponsors&logoColor=fff
+
+```rust
+use repo_stream::{Driver, DriverBuilder, DriveError, DiskBuilder};
+
+#[tokio::main]
+async fn main() -> Result<(), DriveError> {
+    // repo-stream takes any AsyncRead as input, like a tokio::fs::File
+    let reader = tokio::fs::File::open("repo.car").await?;
+    let reader = tokio::io::BufReader::new(reader);
+
+    // example repo workload is simply counting the total record bytes
+    let mut total_size = 0;
+
+    match DriverBuilder::new()
+        .with_mem_limit_mb(10)
+        .with_block_processor(|rec| rec.len()) // block processing: just extract the raw record size
+        .load_car(reader)
+        .await?
+    {
+        // if all blocks fit within memory
+        Driver::Memory(_commit, mut driver) => {
+            while let Some(chunk) = driver.next_chunk(256).await? {
+                for (_rkey, size) in chunk {
+                    total_size += size;
+                }
+            }
+        },
+
+        // if the CAR was too big for in-memory processing
+        Driver::Disk(paused) => {
+            // set up a disk store we can spill to
+            let store = DiskBuilder::new().open("some/path.db".into()).await?;
+            // do the spilling, get back a (similar) driver
+            let (_commit, mut driver) = paused.finish_loading(store).await?;
+
+            while let Some(chunk) = driver.next_chunk(256).await? {
+                for (_rkey, size) in chunk {
+                    total_size += size;
+                }
+            }
+
+            // clean up the disk store (drop tables etc)
+            driver.reset_store().await?;
+        }
+    };
+    println!("sum of size of all records: {total_size}");
+    Ok(())
+}
+```
+
+more recent todo
+
+- [ ] get an *empty* car for the test suite
+- [x] implement a max size on disk limit
+
+
+-----
+
+older stuff (to clean up):


 current car processing times (records processed into their length usize, phil's dev machine):
···
   -> yeah the commit is returned from init
 - [ ] spec compliance todos
   - [x] assert that keys are ordered and fail if not
-  - [ ] verify node mst depth from key (possibly pending [interop test fixes](https://github.com/bluesky-social/atproto-interop-tests/issues/5))
+  - [x] verify node mst depth from key (possibly pending [interop test fixes](https://github.com/bluesky-social/atproto-interop-tests/issues/5))
 - [ ] performance todos
   - [x] consume the serialized nodes into a mutable efficient format
   - [ ] maybe customize the deserialize impl to do that directly?
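The readme example polls `next_chunk` in both arms; the disk driver additionally has the channel mode used by `examples/disk-read-file/main.rs` above, where the tree walk runs on a blocking thread. A condensed sketch of that shape (the path and buffer sizes are placeholders):

```rust
use repo_stream::{DiskBuilder, Driver, DriverBuilder};

async fn walk_big_car(
    reader: impl tokio::io::AsyncRead + Unpin,
) -> Result<(), Box<dyn std::error::Error>> {
    // only handle the spill-to-disk case in this sketch
    let Driver::Disk(paused) = DriverBuilder::new()
        .with_mem_limit_mb(10)
        .load_car(reader)
        .await?
    else {
        panic!("this sketch only handles CARs too big for memory");
    };

    let store = DiskBuilder::new().open("blocks.db".into()).await?;
    let (_commit, driver) = paused.finish_loading(store).await?;

    // chunks of (rkey, block) pairs arrive over an mpsc channel
    let (mut rx, join) = driver.to_channel(512);
    let mut records = 0;
    while let Some(chunk) = rx.recv().await {
        records += chunk?.len();
    }

    // join the walker task, then drop the on-disk tables
    join.await?.reset_store().await?;
    println!("saw {records} records");
    Ok(())
}
```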
+168 -372
src/disk.rs
···
-use redb::ReadableDatabase;
-use rusqlite::OptionalExtension;
-use std::error::Error;
-use std::path::PathBuf;
-
-pub trait StorageErrorBase: Error + Send + 'static {}
-
-/// high level potential storage resource
-///
-/// separating this allows (hopefully) implementing a storage pool that can
-/// async-block when until a member is available to use
-pub trait DiskStore {
-    type StorageError: StorageErrorBase + Send;
-    type Access: DiskAccess<StorageError = Self::StorageError>;
-    fn get_access(&mut self) -> impl Future<Output = Result<Self::Access, Self::StorageError>>;
-}
+/*!
+Disk storage for spilled blocks

-/// actual concrete access to disk storage
-pub trait DiskAccess: Send {
-    type StorageError: StorageErrorBase;
+Currently this uses sqlite. In testing sqlite wasn't the fastest, but it seemed
+to be the best behaved in terms of both on-disk space usage and memory usage.

-    fn get_writer(&mut self) -> Result<impl DiskWriter<Self::StorageError>, Self::StorageError>;
+```no_run
+# use repo_stream::{DiskBuilder, DiskError};
+# #[tokio::main]
+# async fn main() -> Result<(), DiskError> {
+let store = DiskBuilder::new()
+    .with_cache_size_mb(32)
+    .with_max_stored_mb(1024) // errors when >1GiB of processed blocks are inserted
+    .open("/some/path.db".into()).await?;
+# Ok(())
+# }
+```
+*/

-    fn get_reader(
-        &self,
-    ) -> Result<impl DiskReader<StorageError = Self::StorageError>, Self::StorageError>;
+use crate::drive::DriveError;
+use rusqlite::OptionalExtension;
+use std::path::PathBuf;

-    // TODO: force a cleanup implementation?
+#[derive(Debug, thiserror::Error)]
+pub enum DiskError {
+    /// A wrapped database error
+    ///
+    /// (The wrapped err should probably be obscured to remove public-facing
+    /// sqlite bits)
+    #[error(transparent)]
+    DbError(#[from] rusqlite::Error),
+    /// A tokio blocking task failed to join
+    #[error("Failed to join a tokio blocking task: {0}")]
+    JoinError(#[from] tokio::task::JoinError),
+    /// The total size of stored blocks exceeded the allowed size
+    ///
+    /// If you need to process *really* big CARs, you can configure a higher
+    /// limit.
+    #[error("Maximum disk size reached")]
+    MaxSizeExceeded,
+    #[error("this error was replaced, seeing this is a bug.")]
+    #[doc(hidden)]
+    Stolen,
 }

-pub trait DiskWriter<E: StorageErrorBase> {
-    fn put(&mut self, key: Vec<u8>, val: Vec<u8>) -> Result<(), E>;
-    fn put_many(&mut self, _kv: impl Iterator<Item = (Vec<u8>, Vec<u8>)>) -> Result<(), E>;
+impl DiskError {
+    /// hack for ownership challenges with the disk driver
+    pub(crate) fn steal(&mut self) -> Self {
+        let mut swapped = DiskError::Stolen;
+        std::mem::swap(self, &mut swapped);
+        swapped
+    }
 }

-pub trait DiskReader {
-    type StorageError: StorageErrorBase;
-    fn get(&mut self, key: Vec<u8>) -> Result<Option<Vec<u8>>, Self::StorageError>;
+/// Builder-style disk store setup
+#[derive(Debug, Clone)]
+pub struct DiskBuilder {
+    /// Database in-memory cache allowance
+    ///
+    /// Default: 32 MiB
+    pub cache_size_mb: usize,
+    /// Database stored block size limit
+    ///
+    /// Default: 10 GiB
+    ///
+    /// Note: actual size on disk may be more, but should approximately scale
+    /// with this limit
+    pub max_stored_mb: usize,
 }

-///////////////// sqlite
-
-pub struct SqliteStore {
-    path: PathBuf,
+impl Default for DiskBuilder {
+    fn default() -> Self {
+        Self {
+            cache_size_mb: 32,
+            max_stored_mb: 10 * 1024, // 10 GiB
+        }
+    }
 }

-impl SqliteStore {
-    pub fn new(path: PathBuf) -> Self {
-        Self { path }
+impl DiskBuilder {
+    /// Begin configuring the storage with defaults
+    pub fn new() -> Self {
+        Default::default()
+    }
+    /// Set the in-memory cache allowance for the database
+    ///
+    /// Default: 32 MiB
+    pub fn with_cache_size_mb(mut self, size: usize) -> Self {
+        self.cache_size_mb = size;
+        self
+    }
+    /// Set the approximate stored block size limit
+    ///
+    /// Default: 10 GiB
+    pub fn with_max_stored_mb(mut self, max: usize) -> Self {
+        self.max_stored_mb = max;
+        self
+    }
+    /// Open and initialize the actual disk storage
+    pub async fn open(&self, path: PathBuf) -> Result<DiskStore, DiskError> {
+        DiskStore::new(path, self.cache_size_mb, self.max_stored_mb).await
     }
 }

-impl StorageErrorBase for rusqlite::Error {}
+/// On-disk block storage
+pub struct DiskStore {
+    conn: rusqlite::Connection,
+    max_stored: usize,
+    stored: usize,
+}

-impl DiskStore for SqliteStore {
-    type StorageError = rusqlite::Error;
-    type Access = SqliteAccess;
-    async fn get_access(&mut self) -> Result<SqliteAccess, rusqlite::Error> {
-        let path = self.path.clone();
+impl DiskStore {
+    /// Initialize a new disk store
+    pub async fn new(
+        path: PathBuf,
+        cache_mb: usize,
+        max_stored_mb: usize,
+    ) -> Result<Self, DiskError> {
+        let max_stored = max_stored_mb * 2_usize.pow(20);
         let conn = tokio::task::spawn_blocking(move || {
             let conn = rusqlite::Connection::open(path)?;

-            let sq_mb = -(2_i64.pow(10)); // negative is kibibytes for sqlite cache_size
+            let sqlite_one_mb = -(2_i64.pow(10)); // negative is kibibytes for sqlite cache_size

             // conn.pragma_update(None, "journal_mode", "OFF")?;
             // conn.pragma_update(None, "journal_mode", "MEMORY")?;
             conn.pragma_update(None, "journal_mode", "WAL")?;
+            // conn.pragma_update(None, "wal_autocheckpoint", "0")?; // this lets things get a bit big on disk
             conn.pragma_update(None, "synchronous", "OFF")?;
-            conn.pragma_update(None, "cache_size", (5 * sq_mb).to_string())?;
-            conn.execute(
-                "CREATE TABLE blocks (
-                    key BLOB PRIMARY KEY NOT NULL,
-                    val BLOB NOT NULL
-                ) WITHOUT ROWID",
-                (),
+            conn.pragma_update(
+                None,
+                "cache_size",
+                (cache_mb as i64 * sqlite_one_mb).to_string(),
             )?;
+            Self::reset_tables(&conn)?;

-            Ok::<_, Self::StorageError>(conn)
+            Ok::<_, DiskError>(conn)
         })
-        .await
-        .expect("join error")?;
+        .await??;

-        Ok(SqliteAccess { conn })
+        Ok(Self {
+            conn,
+            max_stored,
+            stored: 0,
+        })
     }
-}
-
-pub struct SqliteAccess {
-    conn: rusqlite::Connection,
-}
-
-impl DiskAccess for SqliteAccess {
-    type StorageError = rusqlite::Error;
-    fn get_writer(&mut self) -> Result<impl DiskWriter<rusqlite::Error>, rusqlite::Error> {
+    pub(crate) fn get_writer(&'_ mut self) -> Result<SqliteWriter<'_>, DiskError> {
         let tx = self.conn.transaction()?;
-        // let insert_stmt = tx.prepare("INSERT INTO blocks (key, val) VALUES (?1, ?2)")?;
-        Ok(SqliteWriter { tx: Some(tx) })
+        Ok(SqliteWriter {
+            tx,
+            stored: &mut self.stored,
+            max: self.max_stored,
+        })
     }
-    fn get_reader(
-        &self,
-    ) -> Result<impl DiskReader<StorageError = rusqlite::Error>, rusqlite::Error> {
+    pub(crate) fn get_reader<'conn>(&'conn self) -> Result<SqliteReader<'conn>, DiskError> {
         let select_stmt = self.conn.prepare("SELECT val FROM blocks WHERE key = ?1")?;
         Ok(SqliteReader { select_stmt })
     }
-}
-
-pub struct SqliteWriter<'conn> {
-    tx: Option<rusqlite::Transaction<'conn>>,
-}
-
-/// oops careful in async
-impl Drop for SqliteWriter<'_> {
-    fn drop(&mut self) {
-        let tx = self.tx.take();
-        tx.unwrap().commit().unwrap();
-    }
-}
-
-impl DiskWriter<rusqlite::Error> for SqliteWriter<'_> {
-    fn put(&mut self, key: Vec<u8>, val: Vec<u8>) -> rusqlite::Result<()> {
-        let tx = self.tx.as_ref().unwrap();
-        let mut insert_stmt = tx.prepare_cached("INSERT INTO blocks (key, val) VALUES (?1, ?2)")?;
-        insert_stmt.execute((key, val))?;
-        Ok(())
-    }
-    fn put_many(&mut self, kv: impl Iterator<Item = (Vec<u8>, Vec<u8>)>) -> rusqlite::Result<()> {
-        let tx = self.tx.as_ref().unwrap();
-        let mut insert_stmt = tx.prepare_cached("INSERT INTO blocks (key, val) VALUES (?1, ?2)")?;
-        for (k, v) in kv {
-            insert_stmt.execute((k, v))?;
-        }
-        Ok(())
-    }
-}
-
-pub struct SqliteReader<'conn> {
-    select_stmt: rusqlite::Statement<'conn>,
-}
-
-impl DiskReader for SqliteReader<'_> {
-    type StorageError = rusqlite::Error;
-    fn get(&mut self, key: Vec<u8>) -> rusqlite::Result<Option<Vec<u8>>> {
-        self.select_stmt
-            .query_one((&key,), |row| row.get(0))
-            .optional()
-    }
-}
-
-//////////// redb why not
-
-const REDB_TABLE: redb::TableDefinition<&[u8], &[u8]> = redb::TableDefinition::new("blocks");
-
-pub struct RedbStore {
-    path: PathBuf,
-}
-
-impl RedbStore {
-    pub fn new(path: PathBuf) -> Self {
-        Self { path }
-    }
-}
-
-impl StorageErrorBase for redb::Error {}
-
-impl DiskStore for RedbStore {
-    type StorageError = redb::Error;
-    type Access = RedbAccess;
-    async fn get_access(&mut self) -> Result<RedbAccess, redb::Error> {
-        let path = self.path.clone();
-        let mb = 2_usize.pow(20);
-        let db = tokio::task::spawn_blocking(move || {
-            let db = redb::Database::builder()
-                .set_cache_size(5 * mb)
-                .create(path)?;
-            Ok::<_, Self::StorageError>(db)
+    /// Drop and recreate the kv table
+    pub async fn reset(self) -> Result<Self, DiskError> {
+        tokio::task::spawn_blocking(move || {
+            Self::reset_tables(&self.conn)?;
+            Ok(self)
         })
-        .await
-        .expect("join error")?;
-
-        Ok(RedbAccess { db })
-    }
-}
-
-pub struct RedbAccess {
-    db: redb::Database,
-}
-
-impl DiskAccess for RedbAccess {
-    type StorageError = redb::Error;
-    fn get_writer(&mut self) -> Result<impl DiskWriter<redb::Error>, redb::Error> {
-        let mut tx = self.db.begin_write()?;
-        tx.set_durability(redb::Durability::None)?;
-        Ok(RedbWriter { tx: Some(tx) })
-    }
-    fn get_reader(&self) -> Result<impl DiskReader<StorageError = redb::Error>, redb::Error> {
-        let tx = self.db.begin_read()?;
-        Ok(RedbReader { tx })
+        .await?
     }
-}
-
-pub struct RedbWriter {
-    tx: Option<redb::WriteTransaction>,
-}
-
-impl DiskWriter<redb::Error> for RedbWriter {
-    fn put(&mut self, key: Vec<u8>, val: Vec<u8>) -> Result<(), redb::Error> {
-        let mut table = self.tx.as_ref().unwrap().open_table(REDB_TABLE)?;
-        table.insert(&*key, &*val)?;
-        Ok(())
-    }
-    fn put_many(&mut self, kv: impl Iterator<Item = (Vec<u8>, Vec<u8>)>) -> Result<(), redb::Error> {
-        let mut table = self.tx.as_ref().unwrap().open_table(REDB_TABLE)?;
-        for (k, v) in kv {
-            table.insert(&*k, &*v)?;
-        }
+    fn reset_tables(conn: &rusqlite::Connection) -> Result<(), DiskError> {
+        conn.execute("DROP TABLE IF EXISTS blocks", ())?;
+        conn.execute(
+            "CREATE TABLE blocks (
+                key BLOB PRIMARY KEY NOT NULL,
+                val BLOB NOT NULL
+            ) WITHOUT ROWID",
+            (),
+        )?;
         Ok(())
     }
 }

-/// oops careful in async
-impl Drop for RedbWriter {
-    fn drop(&mut self) {
-        let tx = self.tx.take();
-        tx.unwrap().commit().unwrap();
-    }
+pub(crate) struct SqliteWriter<'conn> {
+    tx: rusqlite::Transaction<'conn>,
+    stored: &'conn mut usize,
+    max: usize,
 }

-pub struct RedbReader {
-    tx: redb::ReadTransaction,
-}
-
-impl DiskReader for RedbReader {
-    type StorageError = redb::Error;
-    fn get(&mut self, key: Vec<u8>) -> Result<Option<Vec<u8>>, redb::Error> {
-        let table = self.tx.open_table(REDB_TABLE)?;
-        let rv = table.get(&*key)?.map(|guard| guard.value().to_vec());
-        Ok(rv)
-    }
-}
-
-///// rustcask??
-
-pub struct RustcaskStore {
-    path: PathBuf,
-}
-
-impl RustcaskStore {
-    pub fn new(path: PathBuf) -> Self {
-        Self { path }
-    }
-}
-
-#[derive(Debug, thiserror::Error)]
-pub enum CaskError {
-    #[error(transparent)]
-    OpenError(#[from] rustcask::error::OpenError),
-    #[error(transparent)]
-    SetError(#[from] rustcask::error::SetError),
-    #[error("failed to get key: {0}")]
-    GetError(String),
-    #[error("failed to ensure directory: {0}")]
-    EnsureDirError(std::io::Error),
-}
-
-impl StorageErrorBase for CaskError {}
-
-impl DiskStore for RustcaskStore {
-    type StorageError = CaskError;
-    type Access = RustcaskAccess;
-    async fn get_access(&mut self) -> Result<RustcaskAccess, CaskError> {
-        let path = self.path.clone();
-        let db = tokio::task::spawn_blocking(move || {
-            std::fs::create_dir_all(&path).map_err(CaskError::EnsureDirError)?;
-            let db = rustcask::Rustcask::builder().open(&path)?;
-            Ok::<_, Self::StorageError>(db)
-        })
-        .await
-        .expect("join error")?;
-
-        Ok(RustcaskAccess { db })
-    }
-}
-
-pub struct RustcaskAccess {
-    db: rustcask::Rustcask,
-}
-
-impl DiskAccess for RustcaskAccess {
-    type StorageError = CaskError;
-    fn get_writer(&mut self) -> Result<impl DiskWriter<CaskError>, CaskError> {
-        Ok(RustcaskWriter { db: self.db.clone() })
-    }
-    fn get_reader(&self) -> Result<impl DiskReader<StorageError = CaskError>, CaskError> {
-        Ok(RustcaskReader { db: self.db.clone() })
-    }
-}
-
-pub struct RustcaskWriter {
-    db: rustcask::Rustcask,
-}
-
-impl DiskWriter<CaskError> for RustcaskWriter {
-    fn put(&mut self, key: Vec<u8>, val: Vec<u8>) -> Result<(), CaskError> {
-        self.db.set(key, val)?;
-        Ok(())
-    }
-    fn put_many(&mut self, kv: impl Iterator<Item = (Vec<u8>, Vec<u8>)>) -> Result<(), CaskError> {
-        for (k, v) in kv {
-            self.db.set(k, v)?;
+impl SqliteWriter<'_> {
+    pub(crate) fn put_many(
+        &mut self,
+        kv: impl Iterator<Item = Result<(Vec<u8>, Vec<u8>), DriveError>>,
+    ) -> Result<(), DriveError> {
+        let mut insert_stmt = self
+            .tx
+            .prepare_cached("INSERT INTO blocks (key, val) VALUES (?1, ?2)")
+            .map_err(DiskError::DbError)?;
+        for pair in kv {
+            let (k, v) = pair?;
+            *self.stored += v.len();
+            if *self.stored > self.max {
+                return Err(DiskError::MaxSizeExceeded.into());
+            }
+            insert_stmt.execute((k, v)).map_err(DiskError::DbError)?;
         }
         Ok(())
     }
-}
-
-pub struct RustcaskReader {
-    db: rustcask::Rustcask,
-}
-
-impl DiskReader for RustcaskReader {
-    type StorageError = CaskError;
-    fn get(&mut self, key: Vec<u8>) -> Result<Option<Vec<u8>>, CaskError> {
-        self.db
-            .get(&key)
-            .map_err(|e| CaskError::GetError(e.to_string()))
-    }
-}
-
-///////// heeeeeeeeeeeeed
-
-type HeedBytes = heed::types::SerdeBincode<Vec<u8>>;
-type HeedDb = heed::Database<HeedBytes, HeedBytes>;
-// type HeedDb = heed::Database<Vec<u8>, Vec<u8>>;
-
-pub struct HeedStore {
-    path: PathBuf,
-}
-
-impl HeedStore {
-    pub fn new(path: PathBuf) -> Self {
-        Self { path }
-    }
-}
-
-impl StorageErrorBase for heed::Error {}
-
-impl DiskStore for HeedStore {
-    type StorageError = heed::Error;
-    type Access = HeedAccess;
-    async fn get_access(&mut self) -> Result<HeedAccess, heed::Error> {
-        let path = self.path.clone();
-        let env = tokio::task::spawn_blocking(move || {
-            std::fs::create_dir_all(&path).unwrap();
-            let env = unsafe {
-                heed::EnvOpenOptions::new()
-                    .map_size(1 * 2_usize.pow(30))
-                    .open(path)?
-            };
-            Ok::<_, Self::StorageError>(env)
-        })
-        .await
-        .expect("join error")?;
-
-        Ok(HeedAccess { env, db: None })
-    }
-}
-
-pub struct HeedAccess {
-    env: heed::Env,
-    db: Option<HeedDb>,
-}
-
-impl DiskAccess for HeedAccess {
-    type StorageError = heed::Error;
-    fn get_writer(&mut self) -> Result<impl DiskWriter<heed::Error>, heed::Error> {
-        let mut tx = self.env.write_txn()?;
-        let db = self.env.create_database(&mut tx, None)?;
-        self.db = Some(db.clone());
-        Ok(HeedWriter { tx: Some(tx), db })
-    }
-    fn get_reader(&self) -> Result<impl DiskReader<StorageError = heed::Error>, heed::Error> {
-        let tx = self.env.read_txn()?;
-        let db = self.db.expect("should have called get_writer first");
-        Ok(HeedReader { tx, db })
-    }
-}
-
-pub struct HeedWriter<'tx> {
-    tx: Option<heed::RwTxn<'tx>>,
-    db: HeedDb,
-}
-
-impl DiskWriter<heed::Error> for HeedWriter<'_> {
-    fn put(&mut self, key: Vec<u8>, val: Vec<u8>) -> Result<(), heed::Error> {
-        let mut tx = self.tx.as_mut().unwrap();
-        self.db.put(&mut tx, &key, &val)?;
-        Ok(())
-    }
-    fn put_many(&mut self, kv: impl Iterator<Item = (Vec<u8>, Vec<u8>)>) -> Result<(), heed::Error> {
-        let mut tx = self.tx.as_mut().unwrap();
-        for (k, v) in kv {
-            self.db.put(&mut tx, &k, &v)?;
-        }
+    pub fn commit(self) -> Result<(), DiskError> {
+        self.tx.commit()?;
         Ok(())
     }
 }

-/// oops careful in async
-impl Drop for HeedWriter<'_> {
-    fn drop(&mut self) {
-        let tx = self.tx.take();
-        tx.unwrap().commit().unwrap();
-    }
+pub(crate) struct SqliteReader<'conn> {
+    select_stmt: rusqlite::Statement<'conn>,
 }

-pub struct HeedReader<'tx> {
-    tx: heed::RoTxn<'tx, heed::WithTls>,
-    db: HeedDb,
-}
-
-impl DiskReader for HeedReader<'_> {
-    type StorageError = heed::Error;
-    fn get(&mut self, key: Vec<u8>) -> Result<Option<Vec<u8>>, heed::Error> {
-        self.db.get(&self.tx, &key)
+impl SqliteReader<'_> {
+    pub(crate) fn get(&mut self, key: Vec<u8>) -> rusqlite::Result<Option<Vec<u8>>> {
+        self.select_stmt
+            .query_one((&key,), |row| row.get(0))
+            .optional()
     }
 }
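With the new cap, `put_many` fails with `DiskError::MaxSizeExceeded` once more than `max_stored_mb` worth of processed blocks have been inserted, and the `#[from]` conversions wrap that into `DriveError::StorageError` during loading. A sketch of treating an over-cap repo as skippable rather than fatal (the control flow and path are illustrative):

```rust
use repo_stream::{DiskBuilder, DiskError, DriveError, Driver, DriverBuilder};

async fn load_capped(
    reader: impl tokio::io::AsyncRead + Unpin,
) -> Result<(), DriveError> {
    if let Driver::Disk(paused) = DriverBuilder::new().load_car(reader).await? {
        let store = DiskBuilder::new()
            .with_max_stored_mb(1024) // fail once >1 GiB of blocks are spilled
            .open("spill.db".into())
            .await?;
        match paused.finish_loading(store).await {
            Ok((_commit, _driver)) => { /* walk chunks as in the examples */ }
            // the size cap surfaces through the StorageError wrapper
            Err(DriveError::StorageError(DiskError::MaxSizeExceeded)) => {
                eprintln!("repo exceeds the configured disk cap; skipping it");
            }
            Err(other) => return Err(other),
        }
    }
    Ok(())
}
```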
+459 -202
src/drive.rs
··· 1 - //! Consume an MST block stream, producing an ordered stream of records 1 + //! Consume a CAR from an AsyncRead, producing an ordered stream of records 2 2 3 - use crate::disk::{DiskAccess, DiskStore, DiskWriter, StorageErrorBase}; 3 + use crate::disk::{DiskError, DiskStore}; 4 + use crate::process::Processable; 4 5 use ipld_core::cid::Cid; 5 6 use iroh_car::CarReader; 6 - use serde::de::DeserializeOwned; 7 7 use serde::{Deserialize, Serialize}; 8 8 use std::collections::HashMap; 9 9 use std::convert::Infallible; 10 - use tokio::io::AsyncRead; 10 + use tokio::{io::AsyncRead, sync::mpsc}; 11 11 12 12 use crate::mst::{Commit, Node}; 13 - use crate::walk::{DiskTrip, Step, Trip, Walker}; 13 + use crate::walk::{Step, WalkError, Walker}; 14 14 15 15 /// Errors that can happen while consuming and emitting blocks and records 16 16 #[derive(Debug, thiserror::Error)] ··· 24 24 #[error("The MST block {0} could not be found")] 25 25 MissingBlock(Cid), 26 26 #[error("Failed to walk the mst tree: {0}")] 27 - Tripped(#[from] Trip), 27 + WalkError(#[from] WalkError), 28 28 #[error("CAR file had no roots")] 29 29 MissingRoot, 30 - } 31 - 32 - #[derive(Debug, thiserror::Error)] 33 - pub enum DiskDriveError<E: StorageErrorBase> { 34 - #[error("Error from iroh_car: {0}")] 35 - CarReader(#[from] iroh_car::Error), 36 - #[error("Failed to decode commit block: {0}")] 37 - BadBlock(#[from] serde_ipld_dagcbor::DecodeError<Infallible>), 38 30 #[error("Storage error")] 39 - StorageError(#[from] E), 40 - #[error("The Commit block reference by the root was not found")] 41 - MissingCommit, 42 - #[error("The MST block {0} could not be found")] 43 - MissingBlock(Cid), 31 + StorageError(#[from] DiskError), 44 32 #[error("Encode error: {0}")] 45 33 BincodeEncodeError(#[from] bincode::error::EncodeError), 46 - #[error("Decode error: {0}")] 34 + #[error("Tried to send on a closed channel")] 35 + ChannelSendError, // SendError takes <T> which we don't need 36 + #[error("Failed to join a task: {0}")] 37 + JoinError(#[from] tokio::task::JoinError), 38 + } 39 + 40 + #[derive(Debug, thiserror::Error)] 41 + pub enum DecodeError { 42 + #[error(transparent)] 47 43 BincodeDecodeError(#[from] bincode::error::DecodeError), 48 - #[error("disk tripped: {0}")] 49 - DiskTripped(#[from] DiskTrip<E>), 44 + #[error("extra bytes remained after decoding")] 45 + ExtraGarbage, 50 46 } 51 47 52 - pub trait Processable: Clone + Serialize + DeserializeOwned { 53 - /// the additional size taken up (not including its mem::size_of) 54 - fn get_size(&self) -> usize; 55 - } 48 + /// An in-order chunk of Rkey + (processed) Block pairs 49 + pub type BlockChunk<T> = Vec<(String, T)>; 56 50 57 51 #[derive(Debug, Clone, Serialize, Deserialize)] 58 - pub enum MaybeProcessedBlock<T> { 52 + pub(crate) enum MaybeProcessedBlock<T> { 59 53 /// A block that's *probably* a Node (but we can't know yet) 60 54 /// 61 55 /// It *can be* a record that suspiciously looks a lot like a node, so we ··· 97 91 } 98 92 } 99 93 100 - pub enum Vehicle<R: AsyncRead + Unpin, T: Processable> { 101 - Lil(Commit, MemDriver<T>), 102 - Big(BigCar<R, T>), 94 + impl<T> MaybeProcessedBlock<T> { 95 + fn maybe(process: fn(Vec<u8>) -> T, data: Vec<u8>) -> Self { 96 + if Node::could_be(&data) { 97 + MaybeProcessedBlock::Raw(data) 98 + } else { 99 + MaybeProcessedBlock::Processed(process(data)) 100 + } 101 + } 102 + } 103 + 104 + /// Read a CAR file, buffering blocks in memory or to disk 105 + pub enum Driver<R: AsyncRead + Unpin, T: Processable> { 106 + /// All blocks fit within the 
 memory limit 107 + /// 108 + /// You probably want to check the commit's signature. You can go ahead and 109 + /// walk the MST right away. 110 + Memory(Commit, MemDriver<T>), 111 + /// Blocks exceed the memory limit 112 + /// 113 + /// You'll need to provide disk storage to continue. The commit will be 114 + /// returned and can be validated only once all blocks are loaded. 115 + Disk(NeedDisk<R, T>), 116 + } 117 + 118 + /// Builder-style driver setup 119 + #[derive(Debug, Clone)] 120 + pub struct DriverBuilder { 121 + pub mem_limit_mb: usize, 103 122 } 104 123 105 - pub async fn load_car<R: AsyncRead + Unpin, T: Processable>( 106 - reader: R, 107 - process: fn(Vec<u8>) -> T, 108 - max_size: usize, 109 - ) -> Result<Vehicle<R, T>, DriveError> { 110 - let mut mem_blocks = HashMap::new(); 124 + impl Default for DriverBuilder { 125 + fn default() -> Self { 126 + Self { mem_limit_mb: 16 } 127 + } 128 + } 129 + 130 + impl DriverBuilder { 131 + /// Begin configuring the driver with defaults 132 + pub fn new() -> Self { 133 + Default::default() 134 + } 135 + /// Set the in-memory size limit, in MiB 136 + /// 137 + /// Default: 16 MiB 138 + pub fn with_mem_limit_mb(self, new_limit: usize) -> Self { 139 + Self { 140 + mem_limit_mb: new_limit, 141 + } 142 + } 143 + /// Set the block processor 144 + /// 145 + /// Default: noop, raw blocks will be emitted 146 + pub fn with_block_processor<T: Processable>( 147 + self, 148 + p: fn(Vec<u8>) -> T, 149 + ) -> DriverBuilderWithProcessor<T> { 150 + DriverBuilderWithProcessor { 151 + mem_limit_mb: self.mem_limit_mb, 152 + block_processor: p, 153 + } 154 + } 155 + /// Begin processing an atproto MST from a CAR file 156 + pub async fn load_car<R: AsyncRead + Unpin>( 157 + &self, 158 + reader: R, 159 + ) -> Result<Driver<R, Vec<u8>>, DriveError> { 160 + Driver::load_car(reader, crate::process::noop, self.mem_limit_mb).await 161 + } 162 + } 163 + 164 + /// Builder-style driver intermediate step 165 + /// 166 + /// Start from a `DriverBuilder` 167 + #[derive(Debug, Clone)] 168 + pub struct DriverBuilderWithProcessor<T: Processable> { 169 + pub mem_limit_mb: usize, 170 + pub block_processor: fn(Vec<u8>) -> T, 171 + } 172 + 173 + impl<T: Processable> DriverBuilderWithProcessor<T> { 174 + /// Set the in-memory size limit, in MiB 175 + /// 176 + /// Default: 16 MiB 177 + pub fn with_mem_limit_mb(mut self, new_limit: usize) -> Self { 178 + self.mem_limit_mb = new_limit; 179 + self 180 + } 181 + /// Begin processing an atproto MST from a CAR file 182 + pub async fn load_car<R: AsyncRead + Unpin>( 183 + &self, 184 + reader: R, 185 + ) -> Result<Driver<R, T>, DriveError> { 186 + Driver::load_car(reader, self.block_processor, self.mem_limit_mb).await 187 + } 188 + } 189 + 190 + impl<R: AsyncRead + Unpin, T: Processable> Driver<R, T> { 191 + /// Begin processing an atproto MST from a CAR file 192 + /// 193 + /// Blocks will be loaded, processed, and buffered in memory. If the entire 194 + /// processed size is under the `mem_limit_mb` limit, a `Driver::Memory` 195 + /// will be returned along with a `Commit` ready for validation. 196 + /// 197 + /// If the `mem_limit_mb` limit is reached before loading all blocks, the 198 + /// partial state will be returned as `Driver::Disk(needed)`, which can be 199 + /// resumed by providing a `DiskStore` for on-disk block storage. 
200 + pub async fn load_car( 201 + reader: R, 202 + process: fn(Vec<u8>) -> T, 203 + mem_limit_mb: usize, 204 + ) -> Result<Driver<R, T>, DriveError> { 205 + let max_size = mem_limit_mb * 2_usize.pow(20); 206 + let mut mem_blocks = HashMap::new(); 207 + 208 + let mut car = CarReader::new(reader).await?; 209 + 210 + let root = *car 211 + .header() 212 + .roots() 213 + .first() 214 + .ok_or(DriveError::MissingRoot)?; 215 + log::debug!("root: {root:?}"); 111 216 112 - let mut car = CarReader::new(reader).await?; 217 + let mut commit = None; 113 218 114 - let root = *car 115 - .header() 116 - .roots() 117 - .first() 118 - .ok_or(DriveError::MissingRoot)?; 119 - log::debug!("root: {root:?}"); 219 + // try to load all the blocks into memory 220 + let mut mem_size = 0; 221 + while let Some((cid, data)) = car.next_block().await? { 222 + // the root commit is a Special Third Kind of block that we need to make 223 + // sure not to optimistically send to the processing function 224 + if cid == root { 225 + let c: Commit = serde_ipld_dagcbor::from_slice(&data)?; 226 + commit = Some(c); 227 + continue; 228 + } 120 229 121 - let mut commit = None; 230 + // remaining possible types: node, record, other. optimistically process 231 + let maybe_processed = MaybeProcessedBlock::maybe(process, data); 122 232 123 - // try to load all the blocks into memory 124 - let mut mem_size = 0; 125 - while let Some((cid, data)) = car.next_block().await? { 126 - // the root commit is a Special Third Kind of block that we need to make 127 - // sure not to optimistically send to the processing function 128 - if cid == root { 129 - let c: Commit = serde_ipld_dagcbor::from_slice(&data)?; 130 - commit = Some(c); 131 - continue; 233 + // stash (maybe processed) blocks in memory as long as we have room 234 + mem_size += std::mem::size_of::<Cid>() + maybe_processed.get_size(); 235 + mem_blocks.insert(cid, maybe_processed); 236 + if mem_size >= max_size { 237 + return Ok(Driver::Disk(NeedDisk { 238 + car, 239 + root, 240 + process, 241 + max_size, 242 + mem_blocks, 243 + commit, 244 + })); 245 + } 132 246 } 133 247 134 - // remaining possible types: node, record, other. optimistically process 135 - // TODO: get the actual in-memory size to compute disk spill 136 - let maybe_processed = if Node::could_be(&data) { 137 - MaybeProcessedBlock::Raw(data) 138 - } else { 139 - MaybeProcessedBlock::Processed(process(data)) 140 - }; 248 + // all blocks loaded and we fit in memory! hopefully we found the commit... 249 + let commit = commit.ok_or(DriveError::MissingCommit)?; 250 + 251 + let walker = Walker::new(commit.data); 141 252 142 - // stash (maybe processed) blocks in memory as long as we have room 143 - mem_size += std::mem::size_of::<Cid>() + maybe_processed.get_size(); 144 - mem_blocks.insert(cid, maybe_processed); 145 - if mem_size >= max_size { 146 - return Ok(Vehicle::Big(BigCar { 147 - car, 148 - root, 253 + Ok(Driver::Memory( 254 + commit, 255 + MemDriver { 256 + blocks: mem_blocks, 257 + walker, 149 258 process, 150 - max_size, 151 - mem_blocks, 152 - commit, 153 - })); 154 - } 259 + }, 260 + )) 155 261 } 262 + } 156 263 157 - // all blocks loaded and we fit in memory! hopefully we found the commit... 158 - let commit = commit.ok_or(DriveError::MissingCommit)?; 264 + /// The core driver between the block stream and MST walker 265 + /// 266 + /// In the future, PDSs will export CARs in a stream-friendly order that will 267 + /// enable processing them with tiny memory overhead. But that future is not 268 + /// here yet. 
269 + /// 270 + /// CARs are almost always in a stream-unfriendly order, so I'm reverting the 271 + /// optimistic stream features: we load all blocks first, then walk the MST. 272 + /// 273 + /// This makes things much simpler: we only need to worry about spilling to disk 274 + /// in one place, and we always have a reasonable expectation about how much 275 + /// work the init function will do. We can drop the CAR reader before walking, 276 + /// so the sync/async boundaries become a little easier to work around. 277 + #[derive(Debug)] 278 + pub struct MemDriver<T: Processable> { 279 + blocks: HashMap<Cid, MaybeProcessedBlock<T>>, 280 + walker: Walker, 281 + process: fn(Vec<u8>) -> T, 282 + } 159 283 160 - let walker = Walker::new(commit.data); 284 + impl<T: Processable> MemDriver<T> { 285 + /// Step through the record outputs, in rkey order 286 + pub async fn next_chunk(&mut self, n: usize) -> Result<Option<BlockChunk<T>>, DriveError> { 287 + let mut out = Vec::with_capacity(n); 288 + for _ in 0..n { 289 + // walk as far as we can until we run out of blocks or find a record 290 + match self.walker.step(&mut self.blocks, self.process)? { 291 + Step::Missing(cid) => return Err(DriveError::MissingBlock(cid)), 292 + Step::Finish => break, 293 + Step::Found { rkey, data } => { 294 + out.push((rkey, data)); 295 + continue; 296 + } 297 + }; 298 + } 161 299 162 - Ok(Vehicle::Lil( 163 - commit, 164 - MemDriver { 165 - blocks: mem_blocks, 166 - walker, 167 - process, 168 - }, 169 - )) 300 + if out.is_empty() { 301 + Ok(None) 302 + } else { 303 + Ok(Some(out)) 304 + } 305 + } 306 + } 170 306 171 307 172 - /// a paritally memory-loaded car file that needs disk spillover to continue 173 - pub struct BigCar<R: AsyncRead + Unpin, T: Processable> { 308 + /// A partially memory-loaded car file that needs disk spillover to continue 309 + pub struct NeedDisk<R: AsyncRead + Unpin, T: Processable> { 174 310 car: CarReader<R>, 175 311 root: Cid, 176 312 process: fn(Vec<u8>) -> T, ··· 183 319 bincode::serde::encode_to_vec(v, bincode::config::standard()) 184 320 185 321 186 - pub fn decode<T: Processable>(bytes: &[u8]) -> Result<T, bincode::error::DecodeError> { 322 + pub(crate) fn decode<T: Processable>(bytes: &[u8]) -> Result<T, DecodeError> { 187 323 let (t, n) = bincode::serde::decode_from_slice(bytes, bincode::config::standard())?; 188 - assert_eq!(n, bytes.len(), "expected to decode all bytes"); // TODO 324 + if n != bytes.len() { 325 + return Err(DecodeError::ExtraGarbage); 326 + } 189 327 Ok(t) 190 328 } 191 329 192 - impl<R: AsyncRead + Unpin, T: Processable + Send + 'static> BigCar<R, T> { 193 - pub async fn finish_loading<S: DiskStore>( 330 + impl<R: AsyncRead + Unpin, T: Processable + Send + 'static> NeedDisk<R, T> { 331 + pub async fn finish_loading( 194 332 mut self, 195 - mut store: S, 196 - ) -> Result<(Commit, BigCarReady<T, S::Access>), DiskDriveError<S::StorageError>> 197 - where 198 - S::Access: Send + 'static, 199 - S::StorageError: 'static, 200 - { 201 - // set up access for real 202 - let mut access = store.get_access().await?; 203 - 204 - // move access in and back out so we can manage lifetimes 333 + mut store: DiskStore, 334 + ) -> Result<(Commit, DiskDriver<T>), DriveError> { 335 + // move store in and back out so we can manage lifetimes 205 336 // dump mem blocks into the store 206 - access = tokio::task::spawn(async move { 207 - let mut writer = access.get_writer()?; 337 + store = tokio::task::spawn(async move { 338 + let mut writer = store.get_writer()?; 208 339 209 340 let kvs = self 
210 341 .mem_blocks 211 342 .into_iter() 212 - .map(|(k, v)| (k.to_bytes(), encode(v).unwrap())); 343 + .map(|(k, v)| Ok(encode(v).map(|v| (k.to_bytes(), v))?)); 213 344 214 345 writer.put_many(kvs)?; 346 + writer.commit()?; 347 + Ok::<_, DriveError>(store) 348 + }) 349 + .await??; 215 350 216 - drop(writer); // cannot outlive access 217 - Ok::<_, DiskDriveError<S::StorageError>>(access) 218 - }) 219 - .await 220 - .unwrap()?; 351 + let (tx, mut rx) = mpsc::channel::<Vec<(Cid, MaybeProcessedBlock<T>)>>(1); 352 + 353 + let store_worker = tokio::task::spawn_blocking(move || { 354 + let mut writer = store.get_writer()?; 355 + 356 + while let Some(chunk) = rx.blocking_recv() { 357 + let kvs = chunk 358 + .into_iter() 359 + .map(|(k, v)| Ok(encode(v).map(|v| (k.to_bytes(), v))?)); 360 + writer.put_many(kvs)?; 361 + } 362 + 363 + writer.commit()?; 364 + Ok::<_, DriveError>(store) 365 + }); // await later 221 366 222 367 // dump the rest to disk (in chunks) 368 + log::debug!("dumping the rest of the stream..."); 223 369 loop { 224 - let mut chunk = vec![]; 225 370 let mut mem_size = 0; 371 + let mut chunk = vec![]; 226 372 loop { 227 373 let Some((cid, data)) = self.car.next_block().await? else { 228 374 break; ··· 235 381 } 236 382 // remaining possible types: node, record, other. optimistically process 237 383 // TODO: get the actual in-memory size to compute disk spill 238 - let maybe_processed = if Node::could_be(&data) { 239 - MaybeProcessedBlock::Raw(data) 240 - } else { 241 - MaybeProcessedBlock::Processed((self.process)(data)) 242 - }; 384 + let maybe_processed = MaybeProcessedBlock::maybe(self.process, data); 243 385 mem_size += std::mem::size_of::<Cid>() + maybe_processed.get_size(); 244 386 chunk.push((cid, maybe_processed)); 245 387 if mem_size >= self.max_size { 388 + // soooooo if we're setting the db cache to max_size and then letting 389 + // multiple chunks in the queue that are >= max_size, then at any time 390 + // we might be using some multiple of max_size? 246 391 break; 247 392 } 248 393 } 249 394 if chunk.is_empty() { 250 395 break; 251 396 } 397 + tx.send(chunk) 398 + .await 399 + .map_err(|_| DriveError::ChannelSendError)?; 400 + } 401 + drop(tx); 402 + log::debug!("done. 
waiting for worker to finish..."); 252 403 253 - // move access in and back out so we can manage lifetimes 254 - // dump mem blocks into the store 255 - access = tokio::task::spawn_blocking(move || { 256 - let mut writer = access.get_writer()?; 404 + store = store_worker.await??; 257 405 258 - let kvs = chunk 259 - .into_iter() 260 - .map(|(k, v)| (k.to_bytes(), encode(v).unwrap())); 406 + log::debug!("worker finished."); 261 407 262 - writer.put_many(kvs)?; 263 - 264 - drop(writer); // cannot outlive access 265 - Ok::<_, DiskDriveError<S::StorageError>>(access) 266 - }) 267 - .await 268 - .unwrap()?; // TODO 269 - } 270 - 271 - let commit = self.commit.ok_or(DiskDriveError::MissingCommit)?; 408 + let commit = self.commit.ok_or(DriveError::MissingCommit)?; 272 409 273 410 let walker = Walker::new(commit.data); 274 411 275 412 Ok(( 276 413 commit, 277 - BigCarReady { 414 + DiskDriver { 278 415 process: self.process, 279 - access, 280 - walker, 416 + state: Some(BigState { store, walker }), 281 417 }, 282 418 )) 283 419 } 284 420 } 285 421 286 - pub struct BigCarReady<T: Clone, A: DiskAccess> { 422 + struct BigState { 423 + store: DiskStore, 424 + walker: Walker, 425 + } 426 + 427 + /// MST walker that reads from disk instead of an in-memory hashmap 428 + pub struct DiskDriver<T: Clone> { 287 429 process: fn(Vec<u8>) -> T, 288 - access: A, 289 - walker: Walker, 430 + state: Option<BigState>, 431 + } 432 + 433 + // for doctests only 434 + #[doc(hidden)] 435 + pub fn _get_fake_disk_driver() -> DiskDriver<Vec<u8>> { 436 + use crate::process::noop; 437 + DiskDriver { 438 + process: noop, 439 + state: None, 440 + } 290 441 } 291 442 292 - impl<T: Processable + Send + 'static, A: DiskAccess + Send + 'static> BigCarReady<T, A> { 293 - pub async fn next_chunk( 294 - mut self, 443 + impl<T: Processable + Send + 'static> DiskDriver<T> { 444 + /// Walk the MST returning up to `n` rkey + record pairs 445 + /// 446 + /// ```no_run 447 + /// # use repo_stream::{drive::{DiskDriver, DriveError, _get_fake_disk_driver}, process::noop}; 448 + /// # #[tokio::main] 449 + /// # async fn main() -> Result<(), DriveError> { 450 + /// # let mut disk_driver = _get_fake_disk_driver(); 451 + /// while let Some(pairs) = disk_driver.next_chunk(256).await? { 452 + /// for (rkey, record) in pairs { 453 + /// println!("{rkey}: size={}", record.len()); 454 + /// } 455 + /// } 456 + /// let store = disk_driver.reset_store().await?; 457 + /// # Ok(()) 458 + /// # } 459 + /// ``` 460 + pub async fn next_chunk(&mut self, n: usize) -> Result<Option<BlockChunk<T>>, DriveError> { 461 + let process = self.process; 462 + 463 + // state should only *ever* be None transiently while inside here 464 + let mut state = self.state.take().expect("DiskDriver must have Some(state)"); 465 + 466 + // the big pain here is that we don't want to leave self.state in an 467 + // invalid state (None), so all the error paths have to make sure it 468 + // comes out again. 469 + let (state, res) = tokio::task::spawn_blocking( 470 + move || -> (BigState, Result<BlockChunk<T>, DriveError>) { 471 + let mut reader_res = state.store.get_reader(); 472 + let reader: &mut _ = match reader_res { 473 + Ok(ref mut r) => r, 474 + Err(ref mut e) => { 475 + // unfortunately we can't return the error directly because 476 + // (for some reason) it's attached to the lifetime of the 477 + // reader? 
478 + // hack a mem::swap so we can get it out :/ 479 + let e_swapped = e.steal(); 480 + // the pain: `state` *has to* outlive the reader 481 + drop(reader_res); 482 + return (state, Err(e_swapped.into())); 483 + } 484 + }; 485 + 486 + let mut out = Vec::with_capacity(n); 487 + 488 + for _ in 0..n { 489 + // walk as far as we can until we run out of blocks or find a record 490 + let step = match state.walker.disk_step(reader, process) { 491 + Ok(s) => s, 492 + Err(e) => { 493 + // the pain: `state` *has to* outlive the reader 494 + drop(reader_res); 495 + return (state, Err(e.into())); 496 + } 497 + }; 498 + match step { 499 + Step::Missing(cid) => { 500 + // the pain: `state` *has to* outlive the reader 501 + drop(reader_res); 502 + return (state, Err(DriveError::MissingBlock(cid))); 503 + } 504 + Step::Finish => break, 505 + Step::Found { rkey, data } => out.push((rkey, data)), 506 + }; 507 + } 508 + 509 + // `state` *has to* outlive the reader 510 + drop(reader_res); 511 + 512 + (state, Ok::<_, DriveError>(out)) 513 + }, 514 + ) 515 + .await?; // on tokio JoinError, we'll be left with invalid state :( 516 + 517 + // *must* restore state before dealing with the actual result 518 + self.state = Some(state); 519 + 520 + let out = res?; 521 + 522 + if out.is_empty() { 523 + Ok(None) 524 + } else { 525 + Ok(Some(out)) 526 + } 527 + } 528 + 529 + fn read_tx_blocking( 530 + &mut self, 295 531 n: usize, 296 - ) -> Result<(Self, Option<Vec<(String, T)>>), DiskDriveError<A::StorageError>> 297 - where 298 - A::StorageError: Send, 299 - { 300 - let mut out = Vec::with_capacity(n); 301 - (self, out) = tokio::task::spawn_blocking(move || { 302 - let access = self.access; 303 - let mut reader = access.get_reader()?; 532 + tx: mpsc::Sender<Result<BlockChunk<T>, DriveError>>, 533 + ) -> Result<(), mpsc::error::SendError<Result<BlockChunk<T>, DriveError>>> { 534 + let BigState { store, walker } = self.state.as_mut().expect("valid state"); 535 + let mut reader = match store.get_reader() { 536 + Ok(r) => r, 537 + Err(e) => return tx.blocking_send(Err(e.into())), 538 + }; 539 + 540 + loop { 541 + let mut out: BlockChunk<T> = Vec::with_capacity(n); 304 542 305 543 for _ in 0..n { 306 544 // walk as far as we can until we run out of blocks or find a record 307 - match self.walker.disk_step(&mut reader, self.process)? 
{ 308 - Step::Missing(cid) => return Err(DiskDriveError::MissingBlock(cid)), 309 - Step::Finish => break, 310 - Step::Step { rkey, data } => { 545 + 546 + let step = match walker.disk_step(&mut reader, self.process) { 547 + Ok(s) => s, 548 + Err(e) => return tx.blocking_send(Err(e.into())), 549 + }; 550 + 551 + match step { 552 + Step::Missing(cid) => { 553 + return tx.blocking_send(Err(DriveError::MissingBlock(cid))); 554 + } 555 + Step::Finish => return Ok(()), 556 + Step::Found { rkey, data } => { 311 557 out.push((rkey, data)); 312 558 continue; 313 559 } 314 560 }; 315 561 } 316 562 317 - drop(reader); // cannot outlive access 318 - self.access = access; 319 - Ok::<_, DiskDriveError<A::StorageError>>((self, out)) 320 - }) 321 - .await 322 - .unwrap()?; // TODO 563 + if out.is_empty() { 564 + break; 565 + } 566 + tx.blocking_send(Ok(out))?; 567 + } 323 568 324 - if out.is_empty() { 325 - Ok((self, None)) 326 - } else { 327 - Ok((self, Some(out))) 328 - } 569 + Ok(()) 329 570 } 330 - } 571 + 572 + /// Spawn the disk reading task into a tokio blocking thread 573 + /// 574 + /// The idea is to avoid so much sending back and forth to the blocking 575 + /// thread, letting a blocking task do all the disk reading work and sending 576 + /// records and rkeys back through an `mpsc` channel instead. 577 + /// 578 + /// This might also allow the disk work to continue while processing the 579 + /// records. It's still not yet clear if this method actually has much 580 + /// benefit over just using `.next_chunk(n)`. 581 + /// 582 + /// ```no_run 583 + /// # use repo_stream::{drive::{DiskDriver, DriveError, _get_fake_disk_driver}, process::noop}; 584 + /// # #[tokio::main] 585 + /// # async fn main() -> Result<(), DriveError> { 586 + /// # let mut disk_driver = _get_fake_disk_driver(); 587 + /// let (mut rx, join) = disk_driver.to_channel(512); 588 + /// while let Some(recvd) = rx.recv().await { 589 + /// let pairs = recvd?; 590 + /// for (rkey, record) in pairs { 591 + /// println!("{rkey}: size={}", record.len()); 592 + /// } 593 + /// 594 + /// } 595 + /// let store = join.await?.reset_store().await?; 596 + /// # Ok(()) 597 + /// # } 598 + /// ``` 599 + pub fn to_channel( 600 + mut self, 601 + n: usize, 602 + ) -> ( 603 + mpsc::Receiver<Result<BlockChunk<T>, DriveError>>, 604 + tokio::task::JoinHandle<Self>, 605 + ) { 606 + let (tx, rx) = mpsc::channel::<Result<BlockChunk<T>, DriveError>>(1); 331 607 332 - /// The core driver between the block stream and MST walker 333 - /// 334 - /// In the future, PDSs will export CARs in a stream-friendly order that will 335 - /// enable processing them with tiny memory overhead. But that future is not 336 - /// here yet. 337 - /// 338 - /// CARs are almost always in a stream-unfriendly order, so I'm reverting the 339 - /// optimistic stream features: we load all block first, then walk the MST. 340 - /// 341 - /// This makes things much simpler: we only need to worry about spilling to disk 342 - /// in one place, and we always have a reasonable expecatation about how much 343 - /// work the init function will do. We can drop the CAR reader before walking, 344 - /// so the sync/async boundaries become a little easier to work around. 
345 - #[derive(Debug)] 346 - pub struct MemDriver<T: Processable> { 347 - blocks: HashMap<Cid, MaybeProcessedBlock<T>>, 348 - walker: Walker, 349 - process: fn(Vec<u8>) -> T, 350 - } 608 + // sketch: this worker is going to be allowed to execute without a join handle 609 + let chan_task = tokio::task::spawn_blocking(move || { 610 + if let Err(mpsc::error::SendError(_)) = self.read_tx_blocking(n, tx) { 611 + log::debug!("big car reader exited early due to dropped receiver channel"); 612 + } 613 + self 614 + }); 351 615 352 - impl<T: Processable> MemDriver<T> { 353 - /// Manually step through the record outputs 354 - pub async fn next_chunk(&mut self, n: usize) -> Result<Option<Vec<(String, T)>>, DriveError> { 355 - let mut out = Vec::with_capacity(n); 356 - for _ in 0..n { 357 - // walk as far as we can until we run out of blocks or find a record 358 - match self.walker.step(&mut self.blocks, self.process)? { 359 - Step::Missing(cid) => return Err(DriveError::MissingBlock(cid)), 360 - Step::Finish => break, 361 - Step::Step { rkey, data } => { 362 - out.push((rkey, data)); 363 - continue; 364 - } 365 - }; 366 - } 616 + (rx, chan_task) 617 + } 367 618 368 - if out.is_empty() { 369 - Ok(None) 370 - } else { 371 - Ok(Some(out)) 372 - } 619 + /// Reset the disk storage so it can be reused. You must call this. 620 + /// 621 + /// Ideally we'd put this in an `impl Drop`, but since it makes blocking 622 + /// calls, that would be risky in an async context. For now you just have to 623 + /// carefully make sure you call it. 624 + /// 625 + /// The sqlite store is returned, so it can be reused for another 626 + /// `DiskDriver`. 627 + pub async fn reset_store(mut self) -> Result<DiskStore, DriveError> { 628 + let BigState { store, .. } = self.state.take().expect("valid state"); 629 + Ok(store.reset().await?) 373 630 } 374 631 }
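The spill loop in `finish_loading` above hands chunks from the async CAR reader to a blocking SQLite writer over a channel of capacity 1, so the reader can run at most one chunk ahead of the disk. A stripped-down sketch of that handoff (names invented, not the crate's API):

```rust
use tokio::sync::mpsc;

#[tokio::main]
async fn main() {
    // capacity 1: the producer stays at most one chunk ahead of the writer
    let (tx, mut rx) = mpsc::channel::<Vec<u8>>(1);

    // stand-in for the store_worker: a spawn_blocking task draining the queue
    let worker = tokio::task::spawn_blocking(move || {
        let mut total = 0;
        while let Some(chunk) = rx.blocking_recv() {
            total += chunk.len(); // stand-in for writer.put_many(chunk)
        }
        total // channel closed: everything has been "written"
    });

    for chunk in [vec![1u8, 2], vec![3, 4, 5]] {
        // awaits while the worker is still busy with the previous chunk
        tx.send(chunk).await.expect("worker hung up");
    }
    drop(tx); // close the channel so the worker's loop can end

    assert_eq!(worker.await.expect("join"), 5);
}
```

This is also what the "multiple of max_size" comment above is worrying about: the chunk being built, the chunk sitting in the channel, and the chunk being written can each approach `max_size` at the same time.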
+84 -5
src/lib.rs
··· 1 - //! Fast and robust atproto CAR file processing in rust 2 - //! 3 - //! For now see the [examples](https://tangled.org/@microcosm.blue/repo-stream/tree/main/examples) 1 + /*! 2 + A robust CAR file -> MST walker for atproto 3 + 4 + Small CARs have their blocks buffered in memory. If a configurable memory limit 5 + is reached while reading blocks, CAR reading is suspended, and can be continued 6 + by providing disk storage to buffer the CAR blocks instead. 7 + 8 + A `process` function can be provided for tasks where records are transformed 9 + into a smaller representation, to save memory (and disk) during block reading. 10 + 11 + Once blocks are loaded, the MST is walked and emitted as chunks of 12 + `(rkey, processed_block)` pairs, in order (depth first, left-to-right). 13 + 14 + Some MST validations are applied: 15 + - Keys must appear in order 16 + - Keys must be at the correct MST tree depth 17 + 18 + `iroh_car` additionally applies a block size limit of `2MiB`. 19 + 20 + ``` 21 + use repo_stream::{Driver, DriverBuilder, DiskBuilder}; 22 + 23 + # #[tokio::main] 24 + # async fn main() -> Result<(), Box<dyn std::error::Error>> { 25 + # let reader = include_bytes!("../car-samples/tiny.car").as_slice(); 26 + let mut total_size = 0; 27 + 28 + match DriverBuilder::new() 29 + .with_mem_limit_mb(10) 30 + .with_block_processor(|rec| rec.len()) // block processing: just extract the raw record size 31 + .load_car(reader) 32 + .await? 33 + { 34 + 35 + // if all blocks fit within memory 36 + Driver::Memory(_commit, mut driver) => { 37 + while let Some(chunk) = driver.next_chunk(256).await? { 38 + for (_rkey, size) in chunk { 39 + total_size += size; 40 + } 41 + } 42 + }, 43 + 44 + // if the CAR was too big for in-memory processing 45 + Driver::Disk(paused) => { 46 + // set up a disk store we can spill to 47 + let store = DiskBuilder::new().open("some/path.db".into()).await?; 48 + // do the spilling, get back a (similar) driver 49 + let (_commit, mut driver) = paused.finish_loading(store).await?; 50 + 51 + while let Some(chunk) = driver.next_chunk(256).await? { 52 + for (_rkey, size) in chunk { 53 + total_size += size; 54 + } 55 + } 56 + 57 + // clean up the disk store (drop tables etc) 58 + driver.reset_store().await?; 59 + } 60 + }; 61 + println!("sum of size of all records: {total_size}"); 62 + # Ok(()) 63 + # } 64 + ``` 65 + 66 + Disk spilling suspends and returns a `Driver::Disk(paused)` instead of going 67 + ahead and eagerly using disk I/O. This means you have to write a bit more code 68 + to handle both cases, but it allows you to have finer control over resource 69 + usage. For example, you can drive a number of parallel memory CAR workers, and 70 + separately have a different number of disk workers picking up suspended disk 71 + tasks from a queue. 72 + 73 + Find more [examples in the repo](https://tangled.org/@microcosm.blue/repo-stream/tree/main/examples). 74 + 75 + */ 76 + 77 + pub mod mst; 78 + mod walk; 4 79 5 80 pub mod disk; 6 81 pub mod drive; 7 - pub mod mst; 8 - pub mod walk; 82 + pub mod process; 83 + 84 + pub use disk::{DiskBuilder, DiskError, DiskStore}; 85 + pub use drive::{DriveError, Driver, DriverBuilder, NeedDisk}; 86 + pub use mst::Commit; 87 + pub use process::Processable;
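The last paragraph of the module docs sketches a worker-pool layout. Here's one way that could look, as a hedged sketch against only the public API shown above (paths, worker counts, and the boxed-error plumbing are all invented for the example):

```rust
use repo_stream::{DiskBuilder, Driver, DriverBuilder, NeedDisk};
use tokio::sync::mpsc;

type AnyError = Box<dyn std::error::Error + Send + Sync>;

#[tokio::main]
async fn main() -> Result<(), AnyError> {
    // suspended big-repo tasks flow from memory workers to a disk worker
    let (big_tx, mut big_rx) = mpsc::channel::<NeedDisk<tokio::fs::File, Vec<u8>>>(8);

    // one of possibly many memory workers
    let mem_worker = tokio::spawn(async move {
        let file = tokio::fs::File::open("some.car").await?; // hypothetical path
        match DriverBuilder::new().load_car(file).await? {
            Driver::Memory(_commit, mut driver) => {
                while let Some(chunk) = driver.next_chunk(256).await? {
                    println!("memory worker: {} records", chunk.len());
                }
            }
            // too big: hand it off rather than doing disk I/O here
            Driver::Disk(paused) => big_tx.send(paused).await.map_err(|_| "queue closed")?,
        }
        Ok::<_, AnyError>(())
    });

    // a single disk worker owns the spill store and recycles it per task
    let disk_worker = tokio::spawn(async move {
        let mut store = DiskBuilder::new().open("spill.db".into()).await?;
        while let Some(paused) = big_rx.recv().await {
            let (_commit, mut driver) = paused.finish_loading(store).await?;
            while let Some(chunk) = driver.next_chunk(256).await? {
                println!("disk worker: {} records", chunk.len());
            }
            store = driver.reset_store().await?; // reclaim for the next task
        }
        Ok::<_, AnyError>(())
    });

    mem_worker.await??;
    disk_worker.await??;
    Ok(())
}
```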
+4 -8
src/mst.rs
··· 39 39 /// MST node data schema 40 40 #[derive(Debug, Deserialize, PartialEq)] 41 41 #[serde(deny_unknown_fields)] 42 - pub struct Node { 42 + pub(crate) struct Node { 43 43 /// link to sub-tree Node on a lower level and with all keys sorting before 44 44 /// keys at this node 45 45 #[serde(rename = "l")] ··· 62 62 /// so if a block *could be* a node, any record converter must postpone 63 63 /// processing. if it turns out it happens to be a very node-looking record, 64 64 /// well, sorry, it just has to only be processed later when that's known. 65 - pub fn could_be(bytes: impl AsRef<[u8]>) -> bool { 65 + pub(crate) fn could_be(bytes: impl AsRef<[u8]>) -> bool { 66 66 const NODE_FINGERPRINT: [u8; 3] = [ 67 67 0xA2, // map length 2 (for "l" and "e" keys) 68 68 0x61, // text length 1 ··· 83 83 /// with an empty array of entries. This is the only situation in which a 84 84 /// tree may contain an empty leaf node which does not either contain keys 85 85 /// ("entries") or point to a sub-tree containing entries. 86 - /// 87 - /// TODO: to me this is slightly unclear with respect to `l` (ask someone). 88 - /// ...is that what "The top of the tree must not be a an empty node which 89 - /// only points to a sub-tree." is referring to? 90 - pub fn is_empty(&self) -> bool { 86 + pub(crate) fn is_empty(&self) -> bool { 91 87 self.left.is_none() && self.entries.is_empty() 92 88 } 93 89 } ··· 95 91 /// TreeEntry object 96 92 #[derive(Debug, Deserialize, PartialEq)] 97 93 #[serde(deny_unknown_fields)] 98 - pub struct Entry { 94 + pub(crate) struct Entry { 99 95 /// count of bytes shared with previous TreeEntry in this Node (if any) 100 96 #[serde(rename = "p")] 101 97 pub prefix_len: usize,
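Since the new depth validation in walk.rs leans on reconstructed rkeys, it's worth spelling out the `TreeEntry` prefix compression defined here: each entry stores only the byte count it shares with the previous rkey (`p`) plus its own key suffix. A tiny illustration with invented values:

```rust
fn main() {
    // rebuild an rkey the way the walker does: reuse prefix_len bytes of the
    // previous key, then append this entry's suffix
    let prev: &[u8] = b"app.bsky.feed.like/aaa";
    let (prefix_len, keysuffix): (usize, &[u8]) = (19, b"bbb");

    // walk.rs indexes with prefix.get(..prefix_len), so a lying prefix_len
    // surfaces as an error instead of a panic
    let mut rkey = prev[..prefix_len].to_vec();
    rkey.extend_from_slice(keysuffix);
    assert_eq!(rkey, b"app.bsky.feed.like/bbb");
}
```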
+108
src/process.rs
··· 1 + /*! 2 + Record processor function output trait 3 + 4 + The return type must satisfy the `Processable` trait, which requires: 5 + 6 + - `Clone` because two rkeys can refer to the same record by CID, which may 7 + only appear once in the CAR file. 8 + - `Serialize + DeserializeOwned` so it can be spilled to disk. 9 + 10 + One required function must be implemented, `get_size()`: this should return the 11 + approximate total off-stack size of the type. (The on-stack size will be added 12 + automatically via `std::mem::size_of`.) 13 + 14 + Note that it is **not guaranteed** that the `process` function will run on a 15 + block before storing it in memory or on disk: it's not possible to know if a 16 + block is a record without actually walking the MST, so the best we can do is 17 + apply `process` to any block that we know *cannot* be an MST node, and otherwise 18 + store the raw block bytes. 19 + 20 + Here's a silly processing function that just collects 'eyy's found in the raw 21 + record bytes: 22 + 23 + ``` 24 + # use repo_stream::Processable; 25 + # use serde::{Serialize, Deserialize}; 26 + #[derive(Debug, Clone, Serialize, Deserialize)] 27 + struct Eyy(usize, String); 28 + 29 + impl Processable for Eyy { 30 + fn get_size(&self) -> usize { 31 + // don't need to compute the usize, it's on the stack 32 + self.1.capacity() // in-mem size from the string's capacity, in bytes 33 + } 34 + } 35 + 36 + fn process(raw: Vec<u8>) -> Vec<Eyy> { 37 + let mut out = Vec::new(); 38 + let to_find = "eyy".as_bytes(); 39 + for (i, window) in raw.windows(3).enumerate() { 40 + if window == to_find { 41 + out.push(Eyy(i, "eyy".to_string())); 42 + } 43 + } 44 + out 45 + } 46 + ``` 47 + 48 + The memory sizing is approximate, but it should be close enough to keep the 49 + buffering limits honest. 50 + */ 51 + 52 + use serde::{Serialize, de::DeserializeOwned}; 53 + 54 + /// Output trait for record processing 55 + pub trait Processable: Clone + Serialize + DeserializeOwned { 56 + /// Any additional in-memory size taken by the processed type 57 + /// 58 + /// Do not include stack size (`std::mem::size_of`) 59 + fn get_size(&self) -> usize; 60 + } 61 + 62 + /// Processor that just returns the raw blocks 63 + #[inline] 64 + pub fn noop(block: Vec<u8>) -> Vec<u8> { 65 + block 66 + } 67 + 68 + impl Processable for u8 { 69 + fn get_size(&self) -> usize { 70 + 0 71 + } 72 + } 73 + 74 + impl Processable for usize { 75 + fn get_size(&self) -> usize { 76 + 0 // no additional space taken, just its stack size (newtype is free) 77 + } 78 + } 79 + 80 + impl Processable for String { 81 + fn get_size(&self) -> usize { 82 + self.capacity() 83 + } 84 + } 85 + 86 + impl<Item: Sized + Processable> Processable for Vec<Item> { 87 + fn get_size(&self) -> usize { 88 + let slot_size = std::mem::size_of::<Item>(); 89 + let direct_size = slot_size * self.capacity(); 90 + let items_referenced_size: usize = self.iter().map(|item| item.get_size()).sum(); 91 + direct_size + items_referenced_size 92 + } 93 + } 94 + 95 + impl<Item: Processable> Processable for Option<Item> { 96 + fn get_size(&self) -> usize { 97 + self.as_ref().map(|item| item.get_size()).unwrap_or(0) 98 + } 99 + } 100 + 101 + impl<Item: Processable, Error: Processable> Processable for Result<Item, Error> { 102 + fn get_size(&self) -> usize { 103 + match self { 104 + Ok(item) => item.get_size(), 105 + Err(err) => err.get_size(), 106 + } 107 + } 108 + }
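To make the sizing contract concrete, here's how the impls above compose for a `Vec<String>`: the `Vec` counts the slots it owns, each `String` adds its heap capacity, and the caller accounts for the stack part (drive.rs adds `std::mem::size_of::<Cid>()` per stored block itself). The arithmetic below assumes the usual exact allocation of `str::to_string`:

```rust
use repo_stream::Processable;

fn main() {
    let v = vec!["hello".to_string()];
    // one String slot held by the Vec, plus the 5 heap bytes in the String
    let expected = std::mem::size_of::<String>() * v.capacity() + 5;
    assert_eq!(v.get_size(), expected);
}
```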
+197 -262
src/walk.rs
··· 1 1 //! Depth-first MST traversal 2 2 3 - use crate::disk::{DiskReader, StorageErrorBase}; 4 - use crate::drive::{MaybeProcessedBlock, Processable}; 3 + use crate::disk::SqliteReader; 4 + use crate::drive::{DecodeError, MaybeProcessedBlock}; 5 5 use crate::mst::Node; 6 + use crate::process::Processable; 6 7 use ipld_core::cid::Cid; 8 + use sha2::{Digest, Sha256}; 7 9 use std::collections::HashMap; 8 10 use std::convert::Infallible; 9 11 10 12 /// Errors that can happen while walking 11 13 #[derive(Debug, thiserror::Error)] 12 - pub enum Trip { 13 - #[error("empty mst nodes are not allowed")] 14 - NodeEmpty, 14 + pub enum WalkError { 15 15 #[error("Failed to fingerprint commit block")] 16 16 BadCommitFingerprint, 17 17 #[error("Failed to decode commit block: {0}")] 18 18 BadCommit(#[from] serde_ipld_dagcbor::DecodeError<Infallible>), 19 19 #[error("Action node error: {0}")] 20 - RkeyError(#[from] RkeyError), 21 - #[error("Encountered an rkey out of order while walking the MST")] 22 - RkeyOutOfOrder, 23 - } 24 - 25 - /// Errors that can happen while walking 26 - #[derive(Debug, thiserror::Error)] 27 - pub enum DiskTrip<E: StorageErrorBase> { 28 - #[error("tripped: {0}")] 29 - Trip(#[from] Trip), 20 + MstError(#[from] MstError), 30 21 #[error("storage error: {0}")] 31 - StorageError(#[from] E), 22 + StorageError(#[from] rusqlite::Error), 32 23 #[error("Decode error: {0}")] 33 - BincodeDecodeError(#[from] bincode::error::DecodeError), 24 + DecodeError(#[from] DecodeError), 34 25 } 35 26 36 27 /// Errors from invalid Rkeys 37 - #[derive(Debug, thiserror::Error)] 38 - pub enum RkeyError { 28 + #[derive(Debug, PartialEq, thiserror::Error)] 29 + pub enum MstError { 39 30 #[error("Failed to compute an rkey due to invalid prefix_len")] 40 31 EntryPrefixOutOfbounds, 41 32 #[error("RKey was not utf-8")] 42 33 EntryRkeyNotUtf8(#[from] std::string::FromUtf8Error), 34 + #[error("Nodes cannot be empty (except for an entirely empty MST)")] 35 + EmptyNode, 36 + #[error("Found an entry with rkey at the wrong depth")] 37 + WrongDepth, 38 + #[error("Lost track of our depth (possible bug?)")] 39 + LostDepth, 40 + #[error("MST depth underflow: depth-0 node with child trees")] 41 + DepthUnderflow, 42 + #[error("Encountered an rkey out of order while walking the MST")] 43 + RkeyOutOfOrder, 43 44 } 44 45 45 46 /// Walker outputs ··· 50 51 /// Reached the end of the MST! yay! 51 52 Finish, 52 53 /// A record was found! 
53 - Step { rkey: String, data: T }, 54 + Found { rkey: String, data: T }, 54 55 } 55 56 56 57 #[derive(Debug, Clone, PartialEq)] 57 58 enum Need { 58 - Node(Cid), 59 + Node { depth: Depth, cid: Cid }, 59 60 Record { rkey: String, cid: Cid }, 60 61 } 61 62 62 - fn push_from_node(stack: &mut Vec<Need>, node: &Node) -> Result<(), RkeyError> { 63 - let mut entries = Vec::with_capacity(node.entries.len()); 63 + #[derive(Debug, Clone, Copy, PartialEq)] 64 + enum Depth { 65 + Root, 66 + Depth(u32), 67 + } 68 + 69 + impl Depth { 70 + fn from_key(key: &[u8]) -> Self { 71 + let mut zeros = 0; 72 + for byte in Sha256::digest(key) { 73 + let leading = byte.leading_zeros(); 74 + zeros += leading; 75 + if leading < 8 { 76 + break; 77 + } 78 + } 79 + Self::Depth(zeros / 2) // truncating divide (rounds down) 80 + } 81 + fn next_expected(&self) -> Result<Option<u32>, MstError> { 82 + match self { 83 + Self::Root => Ok(None), 84 + Self::Depth(d) => d.checked_sub(1).ok_or(MstError::DepthUnderflow).map(Some), 85 + } 86 + } 87 + } 88 + 89 + fn push_from_node(stack: &mut Vec<Need>, node: &Node, parent_depth: Depth) -> Result<(), MstError> { 90 + // empty nodes are not allowed in the MST except in an empty MST 91 + if node.is_empty() { 92 + if parent_depth == Depth::Root { 93 + return Ok(()); // empty mst, nothing to push 94 + } else { 95 + return Err(MstError::EmptyNode); 96 + } 97 + } 64 98 99 + let mut entries = Vec::with_capacity(node.entries.len()); 65 100 let mut prefix = vec![]; 101 + let mut this_depth = parent_depth.next_expected()?; 102 + 66 103 for entry in &node.entries { 67 104 let mut rkey = vec![]; 68 105 let pre_checked = prefix 69 - .get(..entry.prefix_len) 70 - .ok_or(RkeyError::EntryPrefixOutOfbounds)?; 106 + .get(..entry.prefix_len) 107 + .ok_or(MstError::EntryPrefixOutOfbounds)?; 71 108 rkey.extend_from_slice(pre_checked); 72 109 rkey.extend_from_slice(&entry.keysuffix); 110 + 111 + let Depth::Depth(key_depth) = Depth::from_key(&rkey) else { 112 + return Err(MstError::WrongDepth); 113 + }; 114 + 115 + // this_depth is `None` if this is the top node of the tree (its parent is the commit root); 116 + // in that case we accept whatever depth its first key claims 117 + let expected_depth = match this_depth { 118 + Some(d) => d, 119 + None => { 120 + this_depth = Some(key_depth); 121 + key_depth 122 + } 123 + }; 124 + 125 + // all keys we find should be at exactly this depth 126 + if key_depth != expected_depth { 127 + return Err(MstError::WrongDepth); 128 + } 129 + 73 130 prefix = rkey.clone(); 74 131 75 132 entries.push(Need::Record { ··· 77 134 cid: entry.value, 78 135 }); 79 136 if let Some(ref tree) = entry.tree { 80 - entries.push(Need::Node(*tree)); 137 + entries.push(Need::Node { 138 + depth: Depth::Depth(key_depth), 139 + cid: *tree, 140 + }); 81 141 } 82 142 } 83 143 84 144 entries.reverse(); 85 145 stack.append(&mut entries); 146 + 147 + let d = this_depth.ok_or(MstError::LostDepth)?; 86 148 87 149 if let Some(tree) = node.left { 88 - stack.push(Need::Node(tree)); 150 + stack.push(Need::Node { 151 + depth: Depth::Depth(d), 152 + cid: tree, 153 + }); 89 154 } 90 155 Ok(()) 91 156 } ··· 102 167 impl Walker { 103 168 pub fn new(tree_root_cid: Cid) -> Self { 104 169 Self { 105 - stack: vec![Need::Node(tree_root_cid)], 170 + stack: vec![Need::Node { 171 + depth: Depth::Root, 172 + cid: tree_root_cid, 173 + }], 106 174 prev: "".to_string(), 107 175 } 108 176 } ··· 112 180 &mut self, 113 181 blocks: &mut HashMap<Cid, MaybeProcessedBlock<T>>, 114 182 process: impl Fn(Vec<u8>) -> T, 115 - ) -> Result<Step<T>, Trip> { 183 + ) -> Result<Step<T>, 
WalkError> { 116 184 loop { 117 - let Some(mut need) = self.stack.last() else { 185 + let Some(need) = self.stack.last_mut() else { 118 186 log::trace!("tried to walk but we're actually done."); 119 187 return Ok(Step::Finish); 120 188 }; 121 189 122 - match &mut need { 123 - Need::Node(cid) => { 190 + match need { 191 + &mut Need::Node { depth, cid } => { 124 192 log::trace!("need node {cid:?}"); 125 - let Some(block) = blocks.remove(cid) else { 193 + let Some(block) = blocks.remove(&cid) else { 126 194 log::trace!("node not found, resting"); 127 - return Ok(Step::Missing(*cid)); 195 + return Ok(Step::Missing(cid)); 128 196 }; 129 197 130 198 let MaybeProcessedBlock::Raw(data) = block else { 131 - return Err(Trip::BadCommitFingerprint); 199 + return Err(WalkError::BadCommitFingerprint); 132 200 }; 133 - let node = 134 - serde_ipld_dagcbor::from_slice::<Node>(&data).map_err(Trip::BadCommit)?; 201 + let node = serde_ipld_dagcbor::from_slice::<Node>(&data) 202 + .map_err(WalkError::BadCommit)?; 135 203 136 204 // found node, make sure we remember 137 205 self.stack.pop(); 138 206 139 207 // queue up work on the found node next 140 - push_from_node(&mut self.stack, &node)?; 208 + push_from_node(&mut self.stack, &node, depth)?; 141 209 } 142 210 Need::Record { rkey, cid } => { 143 211 log::trace!("need record {cid:?}"); 212 + // note that we cannot *remove* a record block, sadly, since 213 + // there can be multiple rkeys pointing to the same cid. 144 214 let Some(data) = blocks.get_mut(cid) else { 145 - log::trace!("record block not found, resting"); 146 215 return Ok(Step::Missing(*cid)); 147 216 }; 148 217 let rkey = rkey.clone(); ··· 154 223 // found node, make sure we remember 155 224 self.stack.pop(); 156 225 157 - log::trace!("emitting a block as a step. depth={}", self.stack.len()); 158 - 159 226 // rkeys *must* be in order or else the tree is invalid (or 160 227 // we have a bug) 161 228 if rkey <= self.prev { 162 - return Err(Trip::RkeyOutOfOrder); 229 + return Err(MstError::RkeyOutOfOrder)?; 163 230 } 164 231 self.prev = rkey.clone(); 165 232 166 - return Ok(Step::Step { rkey, data }); 233 + return Ok(Step::Found { rkey, data }); 167 234 } 168 235 } 169 236 } 170 237 } 171 238 172 239 /// blocking!!!!!! 173 - pub fn disk_step<T: Processable, R: DiskReader>( 240 + pub fn disk_step<T: Processable>( 174 241 &mut self, 175 - reader: &mut R, 242 + reader: &mut SqliteReader, 176 243 process: impl Fn(Vec<u8>) -> T, 177 - ) -> Result<Step<T>, DiskTrip<R::StorageError>> { 244 + ) -> Result<Step<T>, WalkError> { 178 245 loop { 179 - let Some(mut need) = self.stack.last() else { 246 + let Some(need) = self.stack.last_mut() else { 180 247 log::trace!("tried to walk but we're actually done."); 181 248 return Ok(Step::Finish); 182 249 }; 183 250 184 - match &mut need { 185 - Need::Node(cid) => { 251 + match need { 252 + &mut Need::Node { depth, cid } => { 186 253 let cid_bytes = cid.to_bytes(); 187 254 log::trace!("need node {cid:?}"); 188 255 let Some(block_bytes) = reader.get(cid_bytes)? 
else { 189 256 log::trace!("node not found, resting"); 190 - return Ok(Step::Missing(*cid)); 257 + return Ok(Step::Missing(cid)); 191 258 }; 192 259 193 260 let block: MaybeProcessedBlock<T> = crate::drive::decode(&block_bytes)?; 194 261 195 262 let MaybeProcessedBlock::Raw(data) = block else { 196 - return Err(Trip::BadCommitFingerprint.into()); 263 + return Err(WalkError::BadCommitFingerprint); 197 264 }; 198 - let node = 199 - serde_ipld_dagcbor::from_slice::<Node>(&data).map_err(Trip::BadCommit)?; 265 + let node = serde_ipld_dagcbor::from_slice::<Node>(&data) 266 + .map_err(WalkError::BadCommit)?; 200 267 201 268 // found node, make sure we remember 202 269 self.stack.pop(); 203 270 204 271 // queue up work on the found node next 205 - push_from_node(&mut self.stack, &node).map_err(Trip::RkeyError)?; 272 + push_from_node(&mut self.stack, &node, depth).map_err(WalkError::MstError)?; 206 273 } 207 274 Need::Record { rkey, cid } => { 208 275 log::trace!("need record {cid:?}"); ··· 226 293 // rkeys *must* be in order or else the tree is invalid (or 227 294 // we have a bug) 228 295 if rkey <= self.prev { 229 - return Err(DiskTrip::Trip(Trip::RkeyOutOfOrder)); 296 + return Err(MstError::RkeyOutOfOrder)?; 230 297 } 231 298 self.prev = rkey.clone(); 232 299 233 - return Ok(Step::Step { rkey, data }); 300 + return Ok(Step::Found { rkey, data }); 234 301 } 235 302 } 236 303 } ··· 240 307 #[cfg(test)] 241 308 mod test { 242 309 use super::*; 243 - // use crate::mst::Entry; 244 310 245 311 fn cid1() -> Cid { 246 312 "bafyreihixenvk3ahqbytas4hk4a26w43bh6eo3w6usjqtxkpzsvi655a3m" 247 313 .parse() 248 314 .unwrap() 249 315 } 250 - // fn cid2() -> Cid { 251 - // "QmY7Yh4UquoXHLPFo2XbhXkhBvFoPwmQUSa92pxnxjQuPU" 252 - // .parse() 253 - // .unwrap() 254 - // } 255 - // fn cid3() -> Cid { 256 - // "bafybeigdyrzt5sfp7udm7hu76uh7y26nf3efuylqabf3oclgtqy55fbzdi" 257 - // .parse() 258 - // .unwrap() 259 - // } 260 - // fn cid4() -> Cid { 261 - // "QmbWqxBEKC3P8tqsKc98xmWNzrzDtRLMiMPL8wBuTGsMnR" 262 - // .parse() 263 - // .unwrap() 264 - // } 265 - // fn cid5() -> Cid { 266 - // "QmSnuWmxptJZdLJpKRarxBMS2Ju2oANVrgbr2xWbie9b2D" 267 - // .parse() 268 - // .unwrap() 269 - // } 270 - // fn cid6() -> Cid { 271 - // "QmdmQXB2mzChmMeKY47C43LxUdg1NDJ5MWcKMKxDu7RgQm" 272 - // .parse() 273 - // .unwrap() 274 - // } 275 - // fn cid7() -> Cid { 276 - // "bafybeiaysi4s6lnjev27ln5icwm6tueaw2vdykrtjkwiphwekaywqhcjze" 277 - // .parse() 278 - // .unwrap() 279 - // } 280 - // fn cid8() -> Cid { 281 - // "bafyreif3tfdpr5n4jdrbielmcapwvbpcthepfkwq2vwonmlhirbjmotedi" 282 - // .parse() 283 - // .unwrap() 284 - // } 285 - // fn cid9() -> Cid { 286 - // "bafyreicnokmhmrnlp2wjhyk2haep4tqxiptwfrp2rrs7rzq7uk766chqvq" 287 - // .parse() 288 - // .unwrap() 289 - // } 316 + 317 + #[test] 318 + fn test_depth_spec_0() { 319 + let d = Depth::from_key(b"2653ae71"); 320 + assert_eq!(d, Depth::Depth(0)) 321 + } 322 + 323 + #[test] 324 + fn test_depth_spec_1() { 325 + let d = Depth::from_key(b"blue"); 326 + assert_eq!(d, Depth::Depth(1)) 327 + } 328 + 329 + #[test] 330 + fn test_depth_spec_4() { 331 + let d = Depth::from_key(b"app.bsky.feed.post/454397e440ec"); 332 + assert_eq!(d, Depth::Depth(4)) 333 + } 334 + 335 + #[test] 336 + fn test_depth_spec_8() { 337 + let d = Depth::from_key(b"app.bsky.feed.post/9adeb165882c"); 338 + assert_eq!(d, Depth::Depth(8)) 339 + } 340 + 341 + #[test] 342 + fn test_depth_ietf_draft_0() { 343 + let d = Depth::from_key(b"key1"); 344 + assert_eq!(d, Depth::Depth(0)) 345 + } 346 + 347 + #[test] 348 + fn 
test_depth_ietf_draft_1() { 349 + let d = Depth::from_key(b"key7"); 350 + assert_eq!(d, Depth::Depth(1)) 351 + } 352 + 353 + #[test] 354 + fn test_depth_ietf_draft_4() { 355 + let d = Depth::from_key(b"key515"); 356 + assert_eq!(d, Depth::Depth(4)) 357 + } 358 + 359 + #[test] 360 + fn test_depth_interop() { 361 + // examples from https://github.com/bluesky-social/atproto-interop-tests/blob/main/mst/key_heights.json 362 + for (k, expected) in [ 363 + ("", 0), 364 + ("asdf", 0), 365 + ("blue", 1), 366 + ("2653ae71", 0), 367 + ("88bfafc7", 2), 368 + ("2a92d355", 4), 369 + ("884976f5", 6), 370 + ("app.bsky.feed.post/454397e440ec", 4), 371 + ("app.bsky.feed.post/9adeb165882c", 8), 372 + ] { 373 + let d = Depth::from_key(k.as_bytes()); 374 + assert_eq!(d, Depth::Depth(expected), "key: {}", k); 375 + } 376 + } 290 377 291 378 #[test] 292 - fn test_next_from_node_empty() { 293 - let node = Node { 379 + fn test_push_empty_fails() { 380 + let empty_node = Node { 294 381 left: None, 295 382 entries: vec![], 296 383 }; 297 384 let mut stack = vec![]; 298 - push_from_node(&mut stack, &node).unwrap(); 299 - assert_eq!(stack.last(), None); 385 + let err = push_from_node(&mut stack, &empty_node, Depth::Depth(4)); 386 + assert_eq!(err, Err(MstError::EmptyNode)); 300 387 } 301 388 302 389 #[test] 303 - fn test_needs_from_node_just_left() { 390 + fn test_push_one_node() { 304 391 let node = Node { 305 392 left: Some(cid1()), 306 393 entries: vec![], 307 394 }; 308 395 let mut stack = vec![]; 309 - push_from_node(&mut stack, &node).unwrap(); 310 - assert_eq!(stack.last(), Some(Need::Node(cid1())).as_ref()); 396 + push_from_node(&mut stack, &node, Depth::Depth(4)).unwrap(); 397 + assert_eq!( 398 + stack.last(), 399 + Some(Need::Node { 400 + depth: Depth::Depth(3), 401 + cid: cid1() 402 + }) 403 + .as_ref() 404 + ); 311 405 } 312 - 313 - // #[test] 314 - // fn test_needs_from_node_just_one_record() { 315 - // let node = Node { 316 - // left: None, 317 - // entries: vec![Entry { 318 - // keysuffix: "asdf".into(), 319 - // prefix_len: 0, 320 - // value: cid1(), 321 - // tree: None, 322 - // }], 323 - // }; 324 - // assert_eq!( 325 - // needs_from_node(node).unwrap(), 326 - // vec![Need::Record { 327 - // rkey: "asdf".into(), 328 - // cid: cid1(), 329 - // },] 330 - // ); 331 - // } 332 - 333 - // #[test] 334 - // fn test_needs_from_node_two_records() { 335 - // let node = Node { 336 - // left: None, 337 - // entries: vec![ 338 - // Entry { 339 - // keysuffix: "asdf".into(), 340 - // prefix_len: 0, 341 - // value: cid1(), 342 - // tree: None, 343 - // }, 344 - // Entry { 345 - // keysuffix: "gh".into(), 346 - // prefix_len: 2, 347 - // value: cid2(), 348 - // tree: None, 349 - // }, 350 - // ], 351 - // }; 352 - // assert_eq!( 353 - // needs_from_node(node).unwrap(), 354 - // vec![ 355 - // Need::Record { 356 - // rkey: "asdf".into(), 357 - // cid: cid1(), 358 - // }, 359 - // Need::Record { 360 - // rkey: "asgh".into(), 361 - // cid: cid2(), 362 - // }, 363 - // ] 364 - // ); 365 - // } 366 - 367 - // #[test] 368 - // fn test_needs_from_node_with_both() { 369 - // let node = Node { 370 - // left: None, 371 - // entries: vec![Entry { 372 - // keysuffix: "asdf".into(), 373 - // prefix_len: 0, 374 - // value: cid1(), 375 - // tree: Some(cid2()), 376 - // }], 377 - // }; 378 - // assert_eq!( 379 - // needs_from_node(node).unwrap(), 380 - // vec![ 381 - // Need::Record { 382 - // rkey: "asdf".into(), 383 - // cid: cid1(), 384 - // }, 385 - // Need::Node(cid2()), 386 - // ] 387 - // ); 388 - // } 389 - 390 - // #[test] 
391 - // fn test_needs_from_node_left_and_record() { 392 - // let node = Node { 393 - // left: Some(cid1()), 394 - // entries: vec![Entry { 395 - // keysuffix: "asdf".into(), 396 - // prefix_len: 0, 397 - // value: cid2(), 398 - // tree: None, 399 - // }], 400 - // }; 401 - // assert_eq!( 402 - // needs_from_node(node).unwrap(), 403 - // vec![ 404 - // Need::Node(cid1()), 405 - // Need::Record { 406 - // rkey: "asdf".into(), 407 - // cid: cid2(), 408 - // }, 409 - // ] 410 - // ); 411 - // } 412 - 413 - // #[test] 414 - // fn test_needs_from_full_node() { 415 - // let node = Node { 416 - // left: Some(cid1()), 417 - // entries: vec![ 418 - // Entry { 419 - // keysuffix: "asdf".into(), 420 - // prefix_len: 0, 421 - // value: cid2(), 422 - // tree: Some(cid3()), 423 - // }, 424 - // Entry { 425 - // keysuffix: "ghi".into(), 426 - // prefix_len: 1, 427 - // value: cid4(), 428 - // tree: Some(cid5()), 429 - // }, 430 - // Entry { 431 - // keysuffix: "jkl".into(), 432 - // prefix_len: 2, 433 - // value: cid6(), 434 - // tree: Some(cid7()), 435 - // }, 436 - // Entry { 437 - // keysuffix: "mno".into(), 438 - // prefix_len: 4, 439 - // value: cid8(), 440 - // tree: Some(cid9()), 441 - // }, 442 - // ], 443 - // }; 444 - // assert_eq!( 445 - // needs_from_node(node).unwrap(), 446 - // vec![ 447 - // Need::Node(cid1()), 448 - // Need::Record { 449 - // rkey: "asdf".into(), 450 - // cid: cid2(), 451 - // }, 452 - // Need::Node(cid3()), 453 - // Need::Record { 454 - // rkey: "aghi".into(), 455 - // cid: cid4(), 456 - // }, 457 - // Need::Node(cid5()), 458 - // Need::Record { 459 - // rkey: "agjkl".into(), 460 - // cid: cid6(), 461 - // }, 462 - // Need::Node(cid7()), 463 - // Need::Record { 464 - // rkey: "agjkmno".into(), 465 - // cid: cid8(), 466 - // }, 467 - // Need::Node(cid9()), 468 - // ] 469 - // ); 470 - // } 471 406 }
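One step in `push_from_node` above deserves a gloss: entries are reversed before `stack.append`, and the `left` subtree is pushed last. The walker's stack pops from the end of the `Vec`, so this is exactly what yields the depth-first, left-to-right rkey order the tests assert. In miniature:

```rust
fn main() {
    // the walker pops needs off the end of a Vec...
    let mut stack: Vec<&str> = vec![];

    // ...so a node's entries go on reversed, with `left` pushed last
    let mut entries = vec!["rkey-a", "rkey-b", "rkey-c"];
    entries.reverse();
    stack.append(&mut entries);
    stack.push("left-subtree");

    // pops come back out in traversal order: left subtree first,
    // then the entries left-to-right
    assert_eq!(stack.pop(), Some("left-subtree"));
    assert_eq!(stack.pop(), Some("rkey-a"));
    assert_eq!(stack.pop(), Some("rkey-b"));
    assert_eq!(stack.pop(), Some("rkey-c"));
}
```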
+34 -31
tests/non-huge-cars.rs
··· 1 1 extern crate repo_stream; 2 - use futures::TryStreamExt; 3 - use iroh_car::CarReader; 4 - use std::convert::Infallible; 2 + use repo_stream::Driver; 5 3 4 + const EMPTY_CAR: &'static [u8] = include_bytes!("../car-samples/empty.car"); 6 5 const TINY_CAR: &'static [u8] = include_bytes!("../car-samples/tiny.car"); 7 6 const LITTLE_CAR: &'static [u8] = include_bytes!("../car-samples/little.car"); 8 7 const MIDSIZE_CAR: &'static [u8] = include_bytes!("../car-samples/midsize.car"); 9 8 10 - async fn test_car(bytes: &[u8], expected_records: usize, expected_sum: usize) { 11 - let reader = CarReader::new(bytes).await.unwrap(); 12 - 13 - let root = reader 14 - .header() 15 - .roots() 16 - .first() 17 - .ok_or("missing root") 9 + async fn test_car( 10 + bytes: &[u8], 11 + expected_records: usize, 12 + expected_sum: usize, 13 + expect_profile: bool, 14 + ) { 15 + let mut driver = match Driver::load_car(bytes, |block| block.len(), 10 /* MiB */) 16 + .await 18 17 .unwrap() 19 - .clone(); 20 - 21 - let stream = std::pin::pin!(reader.stream()); 22 - 23 - let (_commit, v) = 24 - repo_stream::drive::Vehicle::init(root, stream, |block| Ok::<_, Infallible>(block.len())) 25 - .await 26 - .unwrap(); 27 - let mut record_stream = std::pin::pin!(v.stream()); 18 + { 19 + Driver::Memory(_commit, mem_driver) => mem_driver, 20 + Driver::Disk(_) => panic!("too big"), 21 + }; 28 22 29 23 let mut records = 0; 30 24 let mut sum = 0; 31 25 let mut found_bsky_profile = false; 32 26 let mut prev_rkey = "".to_string(); 33 - while let Some((rkey, size)) = record_stream.try_next().await.unwrap() { 34 - records += 1; 35 - sum += size; 36 - if rkey == "app.bsky.actor.profile/self" { 37 - found_bsky_profile = true; 27 + 28 + while let Some(pairs) = driver.next_chunk(256).await.unwrap() { 29 + for (rkey, size) in pairs { 30 + records += 1; 31 + sum += size; 32 + if rkey == "app.bsky.actor.profile/self" { 33 + found_bsky_profile = true; 34 + } 35 + assert!(rkey > prev_rkey, "rkeys are streamed in order"); 36 + prev_rkey = rkey; 38 37 } 39 - assert!(rkey > prev_rkey, "rkeys are streamed in order"); 40 - prev_rkey = rkey; 41 38 } 39 + 42 40 assert_eq!(records, expected_records); 43 41 assert_eq!(sum, expected_sum); 44 - assert!(found_bsky_profile); 42 + assert_eq!(found_bsky_profile, expect_profile); 43 + } 44 + 45 + #[tokio::test] 46 + async fn test_empty_car() { 47 + test_car(EMPTY_CAR, 0, 0, false).await 45 48 } 46 49 47 50 #[tokio::test] 48 51 async fn test_tiny_car() { 49 - test_car(TINY_CAR, 8, 2071).await 52 + test_car(TINY_CAR, 8, 2071, true).await 50 53 } 51 54 52 55 #[tokio::test] 53 56 async fn test_little_car() { 54 - test_car(LITTLE_CAR, 278, 246960).await 57 + test_car(LITTLE_CAR, 278, 246960, true).await 55 58 } 56 59 57 60 #[tokio::test] 58 61 async fn test_midsize_car() { 59 - test_car(MIDSIZE_CAR, 11585, 3741393).await 62 + test_car(MIDSIZE_CAR, 11585, 3741393, true).await 60 63 }