Fast and robust atproto CAR file processing in Rust


+1192 -920 (total)
+229 -88
Cargo.lock
````diff
···
  ]

  [[package]]
+ name = "allocator-api2"
+ version = "0.2.21"
+ source = "registry+https://github.com/rust-lang/crates.io-index"
+ checksum = "683d7910e743518b0e34f1186f92494becacb047c7b6bf616c96772180fef923"
+
+ [[package]]
  name = "anes"
  version = "0.1.6"
  source = "registry+https://github.com/rust-lang/crates.io-index"
···
  ]

  [[package]]
- name = "bincode"
- version = "2.0.1"
- source = "registry+https://github.com/rust-lang/crates.io-index"
- checksum = "36eaf5d7b090263e8150820482d5d93cd964a81e4019913c972f4edcc6edb740"
- dependencies = [
-  "bincode_derive",
-  "serde",
-  "unty",
- ]
-
- [[package]]
- name = "bincode_derive"
- version = "2.0.1"
- source = "registry+https://github.com/rust-lang/crates.io-index"
- checksum = "bf95709a440f45e986983918d0e8a1f30a9b1df04918fc828670606804ac3c09"
- dependencies = [
-  "virtue",
- ]
-
- [[package]]
  name = "bitflags"
  version = "2.9.4"
  source = "registry+https://github.com/rust-lang/crates.io-index"
···
  checksum = "46c5e41b57b8bba42a04676d81cb89e9ee8e859a1a66f80a5a72e1cb76b34d43"

  [[package]]
+ name = "byteorder-lite"
+ version = "0.1.0"
+ source = "registry+https://github.com/rust-lang/crates.io-index"
+ checksum = "8f1fe948ff07f4bd06c30984e69f5b4899c516a3ef74f34df92a2df2ab535495"
+
+ [[package]]
  name = "bytes"
- version = "1.10.1"
+ version = "1.11.0"
  source = "registry+https://github.com/rust-lang/crates.io-index"
- checksum = "d71b6127be86fdcfddb610f7182ac57211d4b18a3e9c82eb2d17662f2227ad6a"
+ checksum = "b35204fbdc0b3f4446b89fc1ac2cf84a8a68971995d0bf2e925ec7cd960f9cb3"
+
+ [[package]]
+ name = "byteview"
+ version = "0.10.0"
+ source = "registry+https://github.com/rust-lang/crates.io-index"
+ checksum = "dda4398f387cc6395a3e93b3867cd9abda914c97a0b344d1eefb2e5c51785fca"

  [[package]]
  name = "cast"
···
  ]

  [[package]]
+ name = "cc"
+ version = "1.2.52"
+ source = "registry+https://github.com/rust-lang/crates.io-index"
+ checksum = "cd4932aefd12402b36c60956a4fe0035421f544799057659ff86f923657aada3"
+ dependencies = [
+  "find-msvc-tools",
+  "shlex",
+ ]
+
+ [[package]]
  name = "cfg-if"
  version = "1.0.3"
  source = "registry+https://github.com/rust-lang/crates.io-index"
···
  checksum = "b05b61dc5112cbb17e4b6cd61790d9845d13888356391624cbe7e41efeac1e75"

  [[package]]
+ name = "compare"
+ version = "0.0.6"
+ source = "registry+https://github.com/rust-lang/crates.io-index"
+ checksum = "ea0095f6103c2a8b44acd6fd15960c801dafebf02e21940360833e0673f48ba7"
+
+ [[package]]
  name = "const-str"
  version = "0.4.3"
  source = "registry+https://github.com/rust-lang/crates.io-index"
···
  ]

  [[package]]
+ name = "crossbeam-skiplist"
+ version = "0.1.3"
+ source = "registry+https://github.com/rust-lang/crates.io-index"
+ checksum = "df29de440c58ca2cc6e587ec3d22347551a32435fbde9d2bff64e78a9ffa151b"
+ dependencies = [
+  "crossbeam-epoch",
+  "crossbeam-utils",
+ ]
+
+ [[package]]
  name = "crossbeam-utils"
  version = "0.8.21"
  source = "registry+https://github.com/rust-lang/crates.io-index"
···
  ]

  [[package]]
+ name = "dashmap"
+ version = "6.1.0"
+ source = "registry+https://github.com/rust-lang/crates.io-index"
+ checksum = "5041cc499144891f3790297212f32a74fb938e5136a14943f338ef9e0ae276cf"
+ dependencies = [
+  "cfg-if",
+  "crossbeam-utils",
+  "hashbrown 0.14.5",
+  "lock_api",
+  "once_cell",
+  "parking_lot_core",
+ ]
+
+ [[package]]
  name = "data-encoding"
  version = "2.9.0"
  source = "registry+https://github.com/rust-lang/crates.io-index"
···
  checksum = "48c757948c5ede0e46177b7add2e67155f70e33c07fea8284df6576da70b3719"

  [[package]]
+ name = "enum_dispatch"
+ version = "0.3.13"
+ source = "registry+https://github.com/rust-lang/crates.io-index"
+ checksum = "aa18ce2bc66555b3218614519ac839ddb759a7d6720732f979ef8d13be147ecd"
+ dependencies = [
+  "once_cell",
+  "proc-macro2",
+  "quote",
+  "syn 2.0.106",
+ ]
+
+ [[package]]
  name = "env_filter"
  version = "0.1.3"
  source = "registry+https://github.com/rust-lang/crates.io-index"
···
   "jiff",
   "log",
  ]
+
+ [[package]]
+ name = "equivalent"
+ version = "1.0.2"
+ source = "registry+https://github.com/rust-lang/crates.io-index"
+ checksum = "877a4ace8713b0bcf2a4e7eec82529c029f1d0619886d18145fea96c3ffe5c0f"

  [[package]]
  name = "errno"
···
  ]

  [[package]]
- name = "fallible-iterator"
- version = "0.3.0"
+ name = "fastrand"
+ version = "2.3.0"
  source = "registry+https://github.com/rust-lang/crates.io-index"
- checksum = "2acce4a10f12dc2fb14a218589d4f1f62ef011b2d0cc4b3cb1bba8e94da14649"
+ checksum = "37909eebbb50d72f9059c3b6d82c0463f2ff062c9e95845c43a6c9c0355411be"

  [[package]]
- name = "fallible-streaming-iterator"
- version = "0.1.9"
+ name = "find-msvc-tools"
+ version = "0.1.7"
  source = "registry+https://github.com/rust-lang/crates.io-index"
- checksum = "7360491ce676a36bf9bb3c56c1aa791658183a54d2744120f27285738d90465a"
+ checksum = "f449e6c6c08c865631d4890cfacf252b3d396c9bcc83adb6623cdb02a8336c41"
+
+ [[package]]
+ name = "fjall"
+ version = "3.0.1"
+ source = "registry+https://github.com/rust-lang/crates.io-index"
+ checksum = "4f69637c02d38ad1b0f003101d0195a60368130aa17d9ef78b1557d265a22093"
+ dependencies = [
+  "byteorder-lite",
+  "byteview",
+  "dashmap",
+  "flume",
+  "log",
+  "lsm-tree",
+  "tempfile",
+  "xxhash-rust",
+ ]

  [[package]]
- name = "fastrand"
- version = "2.3.0"
+ name = "flume"
+ version = "0.12.0"
  source = "registry+https://github.com/rust-lang/crates.io-index"
- checksum = "37909eebbb50d72f9059c3b6d82c0463f2ff062c9e95845c43a6c9c0355411be"
+ checksum = "5e139bc46ca777eb5efaf62df0ab8cc5fd400866427e56c68b22e414e53bd3be"
+ dependencies = [
+  "spin",
+ ]

  [[package]]
  name = "foldhash"
- version = "0.1.5"
+ version = "0.2.0"
  source = "registry+https://github.com/rust-lang/crates.io-index"
- checksum = "d9c4f5dac5e15c24eb999c26181a6ca40b39fe946cbe4c263c7209467bc83af2"
+ checksum = "77ce24cb58228fbb8aa041425bb1050850ac19177686ea6e0f41a70416f56fdb"

  [[package]]
  name = "futures"
···

  [[package]]
  name = "hashbrown"
- version = "0.15.5"
+ version = "0.14.5"
  source = "registry+https://github.com/rust-lang/crates.io-index"
- checksum = "9229cfe53dfd69f0609a49f65461bd93001ea1ef889cd5529dd176593f5338a1"
- dependencies = [
-  "foldhash",
- ]
+ checksum = "e5274423e17b7c9fc20b6e7e208532f9b19825d82dfd615708b70edd83df41f1"

  [[package]]
- name = "hashlink"
- version = "0.10.0"
+ name = "hashbrown"
+ version = "0.16.1"
  source = "registry+https://github.com/rust-lang/crates.io-index"
- checksum = "7382cf6263419f2d8df38c55d7da83da5c18aef87fc7a7fc1fb1e344edfe14c1"
+ checksum = "841d1cc9bed7f9236f321df977030373f4a4163ae1a7dbfe1a51a2c1a51d9100"
  dependencies = [
-  "hashbrown",
+  "allocator-api2",
+  "equivalent",
+  "foldhash",
  ]

  [[package]]
···
  checksum = "2304e00983f87ffb38b55b444b5e3b60a884b5d30c0fca7d82fe33449bbe55ea"

  [[package]]
+ name = "hmac-sha256"
+ version = "1.1.12"
+ source = "registry+https://github.com/rust-lang/crates.io-index"
+ checksum = "ad6880c8d4a9ebf39c6e8b77007ce223f646a4d21ce29d99f70cb16420545425"
+
+ [[package]]
+ name = "interval-heap"
+ version = "0.0.5"
+ source = "registry+https://github.com/rust-lang/crates.io-index"
+ checksum = "11274e5e8e89b8607cfedc2910b6626e998779b48a019151c7604d0adcb86ac6"
+ dependencies = [
+  "compare",
+ ]
+
+ [[package]]
  name = "io-uring"
  version = "0.7.10"
  source = "registry+https://github.com/rust-lang/crates.io-index"
···
  checksum = "58f929b4d672ea937a23a1ab494143d968337a5f47e56d0815df1e0890ddf174"

  [[package]]
- name = "libsqlite3-sys"
- version = "0.35.0"
+ name = "libmimalloc-sys"
+ version = "0.1.44"
  source = "registry+https://github.com/rust-lang/crates.io-index"
- checksum = "133c182a6a2c87864fe97778797e46c7e999672690dc9fa3ee8e241aa4a9c13f"
+ checksum = "667f4fec20f29dfc6bc7357c582d91796c169ad7e2fce709468aefeb2c099870"
  dependencies = [
-  "pkg-config",
-  "vcpkg",
+  "cc",
+  "libc",
  ]

  [[package]]
···
  checksum = "34080505efa8e45a4b816c349525ebe327ceaa8559756f0356cba97ef3bf7432"

  [[package]]
+ name = "lsm-tree"
+ version = "3.0.1"
+ source = "registry+https://github.com/rust-lang/crates.io-index"
+ checksum = "b875f1dfe14f557f805b167fb9b0fc54c5560c7a4bd6ae02535b2846f276a8cb"
+ dependencies = [
+  "byteorder-lite",
+  "byteview",
+  "crossbeam-skiplist",
+  "enum_dispatch",
+  "interval-heap",
+  "log",
+  "quick_cache",
+  "rustc-hash",
+  "self_cell",
+  "sfa",
+  "tempfile",
+  "varint-rs",
+  "xxhash-rust",
+ ]
+
+ [[package]]
  name = "match-lookup"
  version = "0.1.1"
  source = "registry+https://github.com/rust-lang/crates.io-index"
···
  version = "2.7.6"
  source = "registry+https://github.com/rust-lang/crates.io-index"
  checksum = "f52b00d39961fc5b2736ea853c9cc86238e165017a493d1d5c8eac6bdc4cc273"
+
+ [[package]]
+ name = "mimalloc"
+ version = "0.1.48"
+ source = "registry+https://github.com/rust-lang/crates.io-index"
+ checksum = "e1ee66a4b64c74f4ef288bcbb9192ad9c3feaad75193129ac8509af543894fd8"
+ dependencies = [
+  "libmimalloc-sys",
+ ]

  [[package]]
  name = "miniz_oxide"
···
  version = "0.1.0"
  source = "registry+https://github.com/rust-lang/crates.io-index"
  checksum = "8b870d8c151b6f2fb93e84a13146138f05d02ed11c7e7c54f8826aaaf7c9f184"
-
- [[package]]
- name = "pkg-config"
- version = "0.3.32"
- source = "registry+https://github.com/rust-lang/crates.io-index"
- checksum = "7edddbd0b52d732b21ad9a5fab5c704c14cd949e5e9a1ec5929a24fded1b904c"

  [[package]]
  name = "plotters"
···
  ]

  [[package]]
+ name = "quick_cache"
+ version = "0.6.18"
+ source = "registry+https://github.com/rust-lang/crates.io-index"
+ checksum = "7ada44a88ef953a3294f6eb55d2007ba44646015e18613d2f213016379203ef3"
+ dependencies = [
+  "equivalent",
+  "hashbrown 0.16.1",
+ ]
+
+ [[package]]
  name = "quote"
  version = "1.0.41"
  source = "registry+https://github.com/rust-lang/crates.io-index"
···

  [[package]]
  name = "repo-stream"
- version = "0.2.0"
+ version = "0.4.0"
  dependencies = [
-  "bincode",
+  "cid",
   "clap",
   "criterion",
   "env_logger",
-  "futures",
-  "futures-core",
-  "ipld-core",
+  "fjall",
+  "hashbrown 0.16.1",
+  "hmac-sha256",
   "iroh-car",
   "log",
+  "mimalloc",
   "multibase",
-  "rusqlite",
   "serde",
   "serde_bytes",
   "serde_ipld_dagcbor",
···
  ]

  [[package]]
- name = "rusqlite"
- version = "0.37.0"
- source = "registry+https://github.com/rust-lang/crates.io-index"
- checksum = "165ca6e57b20e1351573e3729b958bc62f0e48025386970b6e4d29e7a7e71f3f"
- dependencies = [
-  "bitflags",
-  "fallible-iterator",
-  "fallible-streaming-iterator",
-  "hashlink",
-  "libsqlite3-sys",
-  "smallvec",
- ]
-
- [[package]]
  name = "rustc-demangle"
  version = "0.1.26"
  source = "registry+https://github.com/rust-lang/crates.io-index"
  checksum = "56f7d92ca342cea22a06f2121d944b4fd82af56988c270852495420f961d4ace"

  [[package]]
+ name = "rustc-hash"
+ version = "2.1.1"
+ source = "registry+https://github.com/rust-lang/crates.io-index"
+ checksum = "357703d41365b4b27c590e3ed91eabb1b663f07c4c084095e60cbed4362dff0d"
+
+ [[package]]
  name = "rustix"
  version = "1.1.2"
  source = "registry+https://github.com/rust-lang/crates.io-index"
···
  version = "1.2.0"
  source = "registry+https://github.com/rust-lang/crates.io-index"
  checksum = "94143f37725109f92c262ed2cf5e59bce7498c01bcc1502d7b9afe439a4e9f49"
+
+ [[package]]
+ name = "self_cell"
+ version = "1.2.2"
+ source = "registry+https://github.com/rust-lang/crates.io-index"
+ checksum = "b12e76d157a900eb52e81bc6e9f3069344290341720e9178cde2407113ac8d89"

  [[package]]
  name = "serde"
···
   "ryu",
   "serde",
   "serde_core",
+ ]
+
+ [[package]]
+ name = "sfa"
+ version = "1.0.0"
+ source = "registry+https://github.com/rust-lang/crates.io-index"
+ checksum = "a1296838937cab56cd6c4eeeb8718ec777383700c33f060e2869867bd01d1175"
+ dependencies = [
+  "byteorder-lite",
+  "log",
+  "xxhash-rust",
  ]

  [[package]]
···
  ]

  [[package]]
+ name = "shlex"
+ version = "1.3.0"
+ source = "registry+https://github.com/rust-lang/crates.io-index"
+ checksum = "0fda2ff0d084019ba4d7c6f371c95d8fd75ce3524c3cb8fb653a3023f6323e64"
+
+ [[package]]
  name = "signal-hook-registry"
  version = "1.4.6"
  source = "registry+https://github.com/rust-lang/crates.io-index"
···
  dependencies = [
   "libc",
   "windows-sys 0.59.0",
+ ]
+
+ [[package]]
+ name = "spin"
+ version = "0.9.8"
+ source = "registry+https://github.com/rust-lang/crates.io-index"
+ checksum = "6980e8d7511241f8acf4aebddbb1ff938df5eebe98691418c4468d0b72a96a67"
+ dependencies = [
+  "lock_api",
  ]

  [[package]]
···
  checksum = "eb066959b24b5196ae73cb057f45598450d2c5f71460e98c49b738086eff9c06"

  [[package]]
- name = "unty"
- version = "0.0.4"
- source = "registry+https://github.com/rust-lang/crates.io-index"
- checksum = "6d49784317cd0d1ee7ec5c716dd598ec5b4483ea832a2dced265471cc0f690ae"
-
- [[package]]
  name = "utf8parse"
  version = "0.2.2"
  source = "registry+https://github.com/rust-lang/crates.io-index"
  checksum = "06abde3611657adf66d383f00b093d7faecc7fa57071cce2578660c9f1010821"

  [[package]]
- name = "vcpkg"
- version = "0.2.15"
+ name = "varint-rs"
+ version = "2.2.0"
  source = "registry+https://github.com/rust-lang/crates.io-index"
- checksum = "accd4ea62f7bb7a82fe23066fb0957d48ef677f6eeb8215f372f52e48bb32426"
+ checksum = "8f54a172d0620933a27a4360d3db3e2ae0dd6cceae9730751a036bbf182c4b23"

  [[package]]
  name = "version_check"
  version = "0.9.5"
  source = "registry+https://github.com/rust-lang/crates.io-index"
  checksum = "0b928f33d975fc6ad9f86c8f283853ad26bdd5b10b7f1542aa2fa15e2289105a"
-
- [[package]]
- name = "virtue"
- version = "0.0.18"
- source = "registry+https://github.com/rust-lang/crates.io-index"
- checksum = "051eb1abcf10076295e815102942cc58f9d5e3b4560e46e53c21e8ff6f3af7b1"

  [[package]]
  name = "walkdir"
···
  version = "0.46.0"
  source = "registry+https://github.com/rust-lang/crates.io-index"
  checksum = "f17a85883d4e6d00e8a97c586de764dabcc06133f7f1d55dce5cdc070ad7fe59"
+
+ [[package]]
+ name = "xxhash-rust"
+ version = "0.8.15"
+ source = "registry+https://github.com/rust-lang/crates.io-index"
+ checksum = "fdd20c5420375476fbd4394763288da7eb0cc0b8c11deed431a91562af7335d3"

  [[package]]
  name = "zerocopy"
````
+12 -9
Cargo.toml
````diff
  [package]
  name = "repo-stream"
- version = "0.2.0"
+ version = "0.4.0"
  edition = "2024"
  license = "MIT OR Apache-2.0"
- description = "A robust CAR file -> MST walker for atproto"
+ description = "Fast and robust atproto CAR file processing"
  repository = "https://tangled.org/@microcosm.blue/repo-stream"

  [dependencies]
- bincode = { version = "2.0.1", features = ["serde"] }
- futures = "0.3.31"
- futures-core = "0.3.31"
- ipld-core = { version = "0.4.2", features = ["serde"] }
+ fjall = { version = "3.0.1", default-features = false }
+ hashbrown = "0.16.1"
+ cid = { version = "0.11.1", features = ["serde"] }
  iroh-car = "0.5.1"
  log = "0.4.28"
- multibase = "0.9.2"
- rusqlite = "0.37.0"
  serde = { version = "1.0.228", features = ["derive"] }
  serde_bytes = "0.11.19"
  serde_ipld_dagcbor = "0.6.4"
- sha2 = "0.10.9"
+ sha2 = "0.10.9" # note: hmac-sha256 is simpler, smaller, benches ~15ns slower
  thiserror = "2.0.17"
  tokio = { version = "1.47.1", features = ["rt", "sync"] }
···
  multibase = "0.9.2"
  tempfile = "3.23.0"
  tokio = { version = "1.47.1", features = ["full"] }
+ mimalloc = "0.1.48"
+ hmac-sha256 = "1.1.12"

  [profile.profiling]
  inherits = "release"
···
  [[bench]]
  name = "huge-car"
  harness = false
+
+ # [[bench]]
+ # name = "leading"
+ # harness = false
````
+11 -4
benches/huge-car.rs
````diff
···
  use criterion::{Criterion, criterion_group, criterion_main};

+ // use mimalloc::MiMalloc;
+ // #[global_allocator]
+ // static GLOBAL: MiMalloc = MiMalloc;
+
  pub fn criterion_benchmark(c: &mut Criterion) {
      let rt = tokio::runtime::Builder::new_multi_thread()
          .enable_all()
···
      });
  }

+ #[inline(always)]
+ fn ser(block: Vec<u8>) -> Vec<u8> {
+     let s = block.len();
+     usize::to_ne_bytes(s).to_vec()
+ }
+
  async fn drive_car(filename: impl AsRef<Path>) -> usize {
      let reader = tokio::fs::File::open(filename).await.unwrap();
      let reader = tokio::io::BufReader::new(reader);

-     let mut driver = match Driver::load_car(reader, |block| block.len(), 1024)
-         .await
-         .unwrap()
-     {
+     let mut driver = match Driver::load_car(reader, ser, 1024).await.unwrap() {
          Driver::Memory(_, mem_driver) => mem_driver,
          Driver::Disk(_) => panic!("not doing disk for benchmark"),
      };
````
+66
benches/leading.rs
````diff
+ use criterion::{BatchSize, Criterion, criterion_group, criterion_main};
+ use hmac_sha256::Hash;
+ use sha2::{Digest, Sha256};
+
+ pub fn compute(bytes: [u8; 32]) -> u32 {
+     let mut zeros = 0;
+     for byte in bytes {
+         if byte == 0 {
+             zeros += 8
+         } else {
+             zeros += byte.leading_zeros();
+             break;
+         }
+     }
+     zeros / 2
+ }
+
+ pub fn compute2(bytes: [u8; 32]) -> u32 {
+     u128::from_be_bytes(bytes.split_at(16).0.try_into().unwrap()).leading_zeros() / 2
+ }
+
+ fn from_key_old(key: &[u8]) -> u32 {
+     compute2(Sha256::digest(key).into())
+ }
+
+ fn from_key_new(key: &[u8]) -> u32 {
+     compute2(Hash::hash(key).into())
+ }
+
+ pub fn criterion_benchmark(c: &mut Criterion) {
+     for (name, case) in [
+         ("no zeros", [0xFF; 32]),
+         ("two zeros", [0x3F; 32]),
+         (
+             "some zeros",
+             [
+                 0, 0, 0, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1,
+                 1, 1, 1, 1,
+             ],
+         ),
+         (
+             "many zeros",
+             [
+                 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1, 1,
+                 1, 1, 1, 1,
+             ],
+         ),
+     ] {
+         let mut g = c.benchmark_group(name);
+         g.bench_function("old", |b| {
+             b.iter_batched(|| case.clone(), |c| compute(c), BatchSize::SmallInput)
+         });
+         g.bench_function("new", |b| {
+             b.iter_batched(|| case.clone(), |c| compute2(c), BatchSize::SmallInput)
+         });
+     }
+
+     for case in ["a", "aa", "aaa", "aaaa"] {
+         let mut g = c.benchmark_group(case);
+         g.bench_function("old", |b| b.iter(|| from_key_old(case.as_bytes())));
+         g.bench_function("new", |b| b.iter(|| from_key_new(case.as_bytes())));
+     }
+ }
+
+ criterion_group!(benches, criterion_benchmark);
+ criterion_main!(benches);
````
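The `old`/`new` groups above compare the byte-loop depth computation against the `u128` version, and then sha2 against hmac-sha256 for the full key hash. Worth noting: the two depth functions only agree while the digest has a nonzero byte somewhere in its first 16 bytes, since `compute2` inspects only the top half of the hash. A quick property check along those lines — my own sketch, not part of the benchmark:

```rust
// sketch: check that compute2 matches compute on realistic sha-256 digests
// (anything with fewer than 128 leading zero bits). for an astronomically
// unlikely all-zero 16-byte prefix, compute2 saturates at depth 64 while
// compute keeps counting into the second half.
#[test]
fn new_depth_matches_old_for_realistic_digests() {
    let mut some_zeros = [1u8; 32];
    some_zeros[..3].fill(0); // 24 + 7 = 31 leading zero bits -> depth 15
    for case in [[0xFF; 32], [0x3F; 32], some_zeros] {
        assert_eq!(compute(case), compute2(case));
    }

    // divergence point: 16 zero bytes, then 0x01
    let mut many_zeros = [1u8; 32];
    many_zeros[..16].fill(0);
    assert_eq!(compute2(many_zeros), 64); // saturated: 128 / 2
    assert_eq!(compute(many_zeros), 67); // keeps counting: 135 / 2
}
```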
+15 -4
benches/non-huge-cars.rs
````diff
···
  use criterion::{Criterion, criterion_group, criterion_main};

+ // use mimalloc::MiMalloc;
+ // #[global_allocator]
+ // static GLOBAL: MiMalloc = MiMalloc;
+
+ const EMPTY_CAR: &'static [u8] = include_bytes!("../car-samples/empty.car");
  const TINY_CAR: &'static [u8] = include_bytes!("../car-samples/tiny.car");
  const LITTLE_CAR: &'static [u8] = include_bytes!("../car-samples/little.car");
  const MIDSIZE_CAR: &'static [u8] = include_bytes!("../car-samples/midsize.car");
···
          .build()
          .expect("Creating runtime failed");

+     c.bench_function("empty-car", |b| {
+         b.to_async(&rt).iter(async || drive_car(EMPTY_CAR).await)
+     });
      c.bench_function("tiny-car", |b| {
          b.to_async(&rt).iter(async || drive_car(TINY_CAR).await)
      });
···
      });
  }

+ #[inline(always)]
+ fn ser(block: Vec<u8>) -> Vec<u8> {
+     let s = block.len();
+     usize::to_ne_bytes(s).to_vec()
+ }
+
  async fn drive_car(bytes: &[u8]) -> usize {
-     let mut driver = match Driver::load_car(bytes, |block| block.len(), 32)
-         .await
-         .unwrap()
-     {
+     let mut driver = match Driver::load_car(bytes, ser, 32).await.unwrap() {
          Driver::Memory(_, mem_driver) => mem_driver,
          Driver::Disk(_) => panic!("not benching big cars here"),
      };
````
car-samples/empty.car

This is a binary file and will not be displayed.

+25
changelog.md
````diff
+ # v0.4.0
+
+ _2026-01-15_
+
+ - use `Output { rkey, cid, data }` instead of the `(rkey, data)` tuple so that the `Cid` is exposed. this is to make tap-like diffing possible.
+
+
+ # v0.3.1
+
+ _2026-01-15_
+
+ - bring back the disk driver's `reset` function for disk storage reuse
+
+
+ # v0.3.0
+
+ _2026-01-15_
+
+ - drop sqlite, pick up fjall v3 for some speeeeeeed (and code simplification, and easier build requirements)
+ - no more `Processable` trait, process functions are just `Vec<u8> -> Vec<u8>` now (bring your own ser/de). there's a potential small cost here where processors now have to actually go through serialization even for in-memory car walking, but i think zero-copy approaches (eg. rkyv) are low-cost enough
+ - custom deserialize for MST nodes that does as much depth calculation and rkey validation as possible in-line (not clear if it actually made anything faster)
+ - check MST depth at every node properly (previously it could do some walking before being able to check, and included some assumptions)
+ - check MST for empty leaf nodes (which are not allowed)
+ - shave 0.6 nanoseconds (really) from MST depth calculation (don't ask)
+ - drop and swap some dependencies: `bincode`, `futures`, `futures-core`, `ipld-core` -> `cid`, `multibase`, `rusqlite` -> `fjall`. and add `hashbrown` bc it benchmarked a bit faster. (we hash on user-controlled CIDs -- is the lower DoS-resistance a risk to worry about?)
````
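Since process functions are now plain `Vec<u8> -> Vec<u8>`, "bring your own ser/de" can mean anything from the length encoding used in the benches to a real decode step. A hypothetical sketch using the `serde_ipld_dagcbor` dependency the crate already has — the `$type` extraction and the empty-output error convention are my own illustration, not an API of this repo:

```rust
// hypothetical processor: shrink each record block down to just its
// `$type` string, decoded from dag-cbor
fn extract_type(block: Vec<u8>) -> Vec<u8> {
    #[derive(serde::Deserialize)]
    struct TypeOnly {
        #[serde(rename = "$type")]
        type_: String,
    }
    serde_ipld_dagcbor::from_slice::<TypeOnly>(&block)
        .map(|t| t.type_.into_bytes())
        // blocks that aren't records (or lack a $type) become empty output;
        // the function must be infallible, so encode "no data" somehow
        .unwrap_or_default()
}
```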
+16 -9
examples/disk-read-file/main.rs
````diff
···
  */

  extern crate repo_stream;
+
+ use mimalloc::MiMalloc;
+ #[global_allocator]
+ static GLOBAL: MiMalloc = MiMalloc;
+
  use clap::Parser;
  use repo_stream::{DiskBuilder, Driver, DriverBuilder};
  use std::path::PathBuf;
+ use std::time::Instant;

  #[derive(Debug, Parser)]
  struct Args {
···
      let reader = tokio::io::BufReader::new(reader);

      log::info!("hello! reading the car...");
+     let t0 = Instant::now();

      // in this example we only bother handling CARs that are too big for memory
      // `noop` helper means: do no block processing, store the raw blocks
      let driver = match DriverBuilder::new()
-         .with_mem_limit_mb(10) // how much memory can be used before disk spill
+         .with_mem_limit_mb(32) // how much memory can be used before disk spill
          .load_car(reader)
          .await?
      {
···

      // at this point you might want to fetch the account's signing key
      // via the DID from the commit, and then verify the signature.
-     log::warn!("big's commit: {:?}", commit);
+     log::warn!("big's commit ({:?}): {:?}", t0.elapsed(), commit);
+
+     // log::info!("now is a good time to check mem usage...");
+     // tokio::time::sleep(std::time::Duration::from_secs(15)).await;

      // pop the driver back out to get some code indentation relief
      driver
···
          // keep a count of the total number of blocks seen
          n += pairs.len();

-         for (_, block) in pairs {
+         for output in pairs {
              // for each block, count how many bytes are equal to '0'
              // (this is just an example, you probably want to do something more
              // interesting)
-             zeros += block.into_iter().filter(|&b| b == b'0').count()
+             zeros += output.data.into_iter().filter(|&b| b == b'0').count()
          }
      }

-     log::info!("arrived! joining rx...");
+     log::info!("arrived! ({:?}) joining rx...", t0.elapsed());

-     // clean up the database. would be nice to do this in drop so it happens
-     // automatically, but some blocking work happens, so that's not allowed in
-     // async rust. 🤷‍♀️
-     join.await?.reset_store().await?;
+     join.await?;

      log::info!("done. n={n} zeros={zeros}");
````
+1 -1
examples/read-file/main.rs
````diff
···
      let reader = tokio::io::BufReader::new(reader);

      let (commit, mut driver) = match DriverBuilder::new()
-         .with_block_processor(|block| block.len())
+         .with_block_processor(|block| block.len().to_ne_bytes().to_vec())
          .load_car(reader)
          .await?
      {
````
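The processed bytes round-trip on the consuming side; a matching decode helper for the length-encoding processor above, mirroring the readme example further down:

```rust
// decode the 8 native-endian bytes the processor above emits back into a usize
fn decode_len(data: Vec<u8>) -> usize {
    usize::from_ne_bytes(data.try_into().expect("processor emits 8 bytes"))
}
```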
+115 -17
readme.md
````diff
···

  [![Crates.io][crates-badge]](https://crates.io/crates/repo-stream)
  [![Documentation][docs-badge]](https://docs.rs/repo-stream)
+ [![Sponsor][sponsor-badge]](https://github.com/sponsors/uniphil)

  [crates-badge]: https://img.shields.io/crates/v/repo-stream.svg
  [docs-badge]: https://docs.rs/repo-stream/badge.svg
+ [sponsor-badge]: https://img.shields.io/badge/at-microcosm-b820f9?labelColor=b820f9&logo=githubsponsors&logoColor=fff

- todo
-
- - [ ] get an *empty* car for the test suite
- - [ ] implement a max size on disk limit
-
+ ```rust no_run
+ use repo_stream::{Driver, DriverBuilder, DriveError, DiskBuilder, Output};
+
+ #[tokio::main]
+ async fn main() -> Result<(), Box<dyn std::error::Error>> {
+     // repo-stream takes any AsyncRead as input, like a tokio::fs::File
+     let reader = tokio::fs::File::open("repo.car").await?;
+     let reader = tokio::io::BufReader::new(reader);
+
+     // example repo workload is simply counting the total record bytes
+     let mut total_size = 0;
+
+     match DriverBuilder::new()
+         .with_mem_limit_mb(10)
+         .with_block_processor( // block processing: just extract the raw record size
+             |rec| rec.len().to_ne_bytes().to_vec())
+         .load_car(reader)
+         .await?
+     {
+         // if all blocks fit within memory
+         Driver::Memory(_commit, mut driver) => {
+             while let Some(chunk) = driver.next_chunk(256).await? {
+                 for Output { rkey: _, cid: _, data } in chunk {
+                     let size = usize::from_ne_bytes(data.try_into().unwrap());
+                     total_size += size;
+                 }
+             }
+         },
+
+         // if the CAR was too big for in-memory processing
+         Driver::Disk(paused) => {
+             // set up a disk store we can spill to
+             let store = DiskBuilder::new().open("some/path.db".into()).await?;
+             // do the spilling, get back a (similar) driver
+             let (_commit, mut driver) = paused.finish_loading(store).await?;
+
+             while let Some(chunk) = driver.next_chunk(256).await? {
+                 for Output { rkey: _, cid: _, data } in chunk {
+                     let size = usize::from_ne_bytes(data.try_into().unwrap());
+                     total_size += size;
+                 }
+             }
+         }
+     };
+     println!("sum of size of all records: {total_size}");
+     Ok(())
+ }
+ ```
+
+ more recent todo
+ - [ ] add a zero-copy rkyv process function example
+ - [ ] repo car slices
+ - [ ] lazy-value stream (rkey -> CID diffing for tap-like `#sync` handling)
+ - [x] get an *empty* car for the test suite
+ - [x] implement a max size on disk limit
+
+ some ideas
+ - [ ] since the disk k/v get/set interface is now so similar to HashMap (blocking, no transactions), it's probably possible to make a single `Driver` and move the thread stuff from the disk one to generic helper functions. (might create async footguns though)
+ - [ ] fork iroh-car into a sync version so we can drop tokio as a hard requirement, and offer async via wrapper helpers
+ - [ ] feature-flag the sha2 crate for hmac-sha256? if someone wanted fewer deps?? then maybe make `hashbrown` also optional vs the builtin hashmap?

  -----
···

  current car processing times (records processed into their length usize, phil's dev machine):

- - 128MiB CAR file: `347ms`
- - 5.0MiB: `6.1ms`
- - 279KiB: `139us`
- - 3.4KiB: `4.9us`
+ - 450MiB CAR file (huge): `1.3s`
+ - 128MiB (huge): `350ms`
+ - 5.0MiB: `6.8ms`
+ - 279KiB: `160us`
+ - 3.4KiB: `5.1us`
+ - empty: `690ns`
+
+ it's a little faster with `mimalloc`
+
+ ```rust
+ use mimalloc::MiMalloc;
+ #[global_allocator]
+ static GLOBAL: MiMalloc = MiMalloc;
+ ```
+
+ - 450MiB CAR file: `1.2s` (-8%)
+ - 128MiB: `300ms` (-14%)
+ - 5.0MiB: `6.0ms` (-11%)
+ - 279KiB: `150us` (-7%)
+ - 3.4KiB: `4.7us` (-8%)
+ - empty: `670ns` (-4%)
+
+ processing CARs requires buffering blocks, so it can consume a lot of memory. repo-stream's in-memory driver has minimal memory overhead, but there are two ways to make it work with less mem (you can do either or both!)
+
+ 1. spill blocks to disk
+ 2. inline block processing
+
+ #### spill blocks to disk
+
+ this is a little slower but can greatly reduce the memory used. there's nothing special you need to do for this.
+
+ #### inline block processing
+
+ if you don't need to store the complete records, you can have repo-stream try to optimistically apply a processing function to the raw blocks as they are streamed in.
+
+ #### constrained mem perf comparison
+
+ sketchy benchmark but hey. mimalloc is enabled, and the processing spills to disk. inline processing reduces entire records to 8 bytes (the usize of the raw record block size):
+
+ - 450MiB CAR file: `5.0s` (4.5x slowdown for disk)
+ - 128MiB: `1.27s` (4.1x slowdown)
+
+ fortunately, most CARs in the ATmosphere are very small, so for eg. backfill purposes, the vast majority of inputs will not face this slowdown.
+
- running the huge-car benchmark
+ #### running the huge-car benchmark

  - to avoid committing it to the repo, you have to pass it in through the env for now.
···
  - [x] car file test fixtures & validation tests
  - [x] make sure we can get the did and signature out for verification
        -> yeah the commit is returned from init
- - [ ] spec compliance todos
+ - [x] spec compliance todos
    - [x] assert that keys are ordered and fail if not
    - [x] verify node mst depth from key (possibly pending [interop test fixes](https://github.com/bluesky-social/atproto-interop-tests/issues/5))
- - [ ] performance todos
+ - [x] performance todos
    - [x] consume the serialized nodes into a mutable efficient format
-   - [ ] maybe customize the deserialize impl to do that directly?
+   - [x] maybe customize the deserialize impl to do that directly?
    - [x] benchmark and profile
- - [ ] robustness todos
-   - [ ] swap the blocks hashmap for a BlockStore trait that can be dumped to redb
-   - [ ] maybe keep the redb function behind a feature flag?
-   - [ ] can we assert a max size for node blocks?
+ - [x] robustness todos
+   - [x] swap the blocks hashmap for a BlockStore trait that can be dumped to redb
+   - [x] maybe keep the redb function behind a feature flag?
+   - [ ] can we assert a max size of entries for node blocks?
    - [x] figure out why asserting the upper nibble of the fourth byte of a node fails fingerprinting
          -> because it's the upper 3 bytes, not upper 4 byte nibble, oops.
- - [ ] max mst depth (there is actually a hard limit but a malicious repo could do anything)
+ - [x] max mst depth (too expensive to attack actually)
- - [ ] i don't *think* we need a max recursion depth for processing cbor contents since we leave records to the user to decode
+ - [x] i don't *think* we need a max recursion depth for processing cbor contents since we leave records to the user to decode

  newer ideas
````
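For the disk-spill path, the store can also be tuned before it's opened. A minimal sketch using the builder methods from the `src/disk.rs` diff below — the values shown are the documented defaults, and the path is made up:

```rust
use repo_stream::DiskBuilder;

// sketch: configuring the spill store (builder methods per src/disk.rs)
async fn open_spill_store() -> Result<(), Box<dyn std::error::Error>> {
    let _store = DiskBuilder::new()
        .with_cache_size_mb(64)        // database in-memory cache allowance
        .with_max_stored_mb(10 * 1024) // hard cap before DiskError::MaxSizeExceeded
        .open("spill.db".into())
        .await?;
    Ok(())
}
```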
+39 -99
src/disk.rs
````diff
···
  ```
  */

- use crate::drive::DriveError;
- use rusqlite::OptionalExtension;
+ use crate::{Bytes, drive::DriveError};
+ use fjall::{Database, Error as FjallError, Keyspace, KeyspaceCreateOptions};
  use std::path::PathBuf;

  #[derive(Debug, thiserror::Error)]
···
      /// (The wrapped err should probably be obscured to remove public-facing
      /// sqlite bits)
      #[error(transparent)]
-     DbError(#[from] rusqlite::Error),
+     DbError(#[from] FjallError),
      /// A tokio blocking task failed to join
      #[error("Failed to join a tokio blocking task: {0}")]
      JoinError(#[from] tokio::task::JoinError),
···
      /// limit.
      #[error("Maximum disk size reached")]
      MaxSizeExceeded,
-     #[error("this error was replaced, seeing this is a bug.")]
-     #[doc(hidden)]
-     Stolen,
- }
-
- impl DiskError {
-     /// hack for ownership challenges with the disk driver
-     pub(crate) fn steal(&mut self) -> Self {
-         let mut swapped = DiskError::Stolen;
-         std::mem::swap(self, &mut swapped);
-         swapped
-     }
  }

  /// Builder-style disk store setup
+ #[derive(Debug, Clone)]
  pub struct DiskBuilder {
      /// Database in-memory cache allowance
      ///
···
  impl Default for DiskBuilder {
      fn default() -> Self {
          Self {
-             cache_size_mb: 32,
+             cache_size_mb: 64,
              max_stored_mb: 10 * 1024, // 10 GiB
          }
      }
···
      }
      /// Set the in-memory cache allowance for the database
      ///
-     /// Default: 32 MiB
+     /// Default: 64 MiB
      pub fn with_cache_size_mb(mut self, size: usize) -> Self {
          self.cache_size_mb = size;
          self
···
          self
      }
      /// Open and initialize the actual disk storage
-     pub async fn open(self, path: PathBuf) -> Result<DiskStore, DiskError> {
+     pub async fn open(&self, path: PathBuf) -> Result<DiskStore, DiskError> {
          DiskStore::new(path, self.cache_size_mb, self.max_stored_mb).await
      }
  }

  /// On-disk block storage
  pub struct DiskStore {
-     conn: rusqlite::Connection,
+     #[allow(unused)]
+     db: Database,
+     keyspace: Keyspace,
      max_stored: usize,
      stored: usize,
  }
···
          max_stored_mb: usize,
      ) -> Result<Self, DiskError> {
          let max_stored = max_stored_mb * 2_usize.pow(20);
-         let conn = tokio::task::spawn_blocking(move || {
-             let conn = rusqlite::Connection::open(path)?;
-
-             let sqlite_one_mb = -(2_i64.pow(10)); // negative is kibibytes for sqlite cache_size
-
-             // conn.pragma_update(None, "journal_mode", "OFF")?;
-             // conn.pragma_update(None, "journal_mode", "MEMORY")?;
-             conn.pragma_update(None, "journal_mode", "WAL")?;
-             // conn.pragma_update(None, "wal_autocheckpoint", "0")?; // this lets things get a bit big on disk
-             conn.pragma_update(None, "synchronous", "OFF")?;
-             conn.pragma_update(
-                 None,
-                 "cache_size",
-                 (cache_mb as i64 * sqlite_one_mb).to_string(),
-             )?;
-             Self::reset_tables(&conn)?;
-
-             Ok::<_, DiskError>(conn)
+         let (db, keyspace) = tokio::task::spawn_blocking(move || {
+             let db = Database::builder(path)
+                 .manual_journal_persist(true)
+                 .worker_threads(1)
+                 .cache_size(cache_mb as u64 * 2_u64.pow(20) / 2)
+                 .temporary(true)
+                 .open()?;
+             let opts = KeyspaceCreateOptions::default()
+                 .expect_point_read_hits(true)
+                 .max_memtable_size(16 * 2_u64.pow(20));
+             let keyspace = db.keyspace("z", || opts)?;
+
+             Ok::<_, DiskError>((db, keyspace))
          })
          .await??;

          Ok(Self {
-             conn,
+             db,
+             keyspace,
              max_stored,
              stored: 0,
          })
      }
-     pub(crate) fn get_writer(&'_ mut self) -> Result<SqliteWriter<'_>, DiskError> {
-         let tx = self.conn.transaction()?;
-         Ok(SqliteWriter {
-             tx,
-             stored: &mut self.stored,
-             max: self.max_stored,
-         })
-     }
-     pub(crate) fn get_reader<'conn>(&'conn self) -> Result<SqliteReader<'conn>, DiskError> {
-         let select_stmt = self.conn.prepare("SELECT val FROM blocks WHERE key = ?1")?;
-         Ok(SqliteReader { select_stmt })
-     }
-     /// Drop and recreate the kv table
-     pub async fn reset(self) -> Result<Self, DiskError> {
-         tokio::task::spawn_blocking(move || {
-             Self::reset_tables(&self.conn)?;
-             Ok(self)
-         })
-         .await?
-     }
-     fn reset_tables(conn: &rusqlite::Connection) -> Result<(), DiskError> {
-         conn.execute("DROP TABLE IF EXISTS blocks", ())?;
-         conn.execute(
-             "CREATE TABLE blocks (
-                 key BLOB PRIMARY KEY NOT NULL,
-                 val BLOB NOT NULL
-             ) WITHOUT ROWID",
-             (),
-         )?;
-         Ok(())
-     }
- }
-
- pub(crate) struct SqliteWriter<'conn> {
-     tx: rusqlite::Transaction<'conn>,
-     stored: &'conn mut usize,
-     max: usize,
- }
-
- impl SqliteWriter<'_> {
+
      pub(crate) fn put_many(
          &mut self,
-         kv: impl Iterator<Item = Result<(Vec<u8>, Vec<u8>), DriveError>>,
+         kv: impl Iterator<Item = (Vec<u8>, Bytes)>,
      ) -> Result<(), DriveError> {
-         let mut insert_stmt = self
-             .tx
-             .prepare_cached("INSERT INTO blocks (key, val) VALUES (?1, ?2)")
-             .map_err(DiskError::DbError)?;
-         for pair in kv {
-             let (k, v) = pair?;
-             *self.stored += v.len();
-             if *self.stored > self.max {
+         let mut batch = self.db.batch();
+         for (k, v) in kv {
+             self.stored += v.len();
+             if self.stored > self.max_stored {
                  return Err(DiskError::MaxSizeExceeded.into());
              }
-             insert_stmt.execute((k, v)).map_err(DiskError::DbError)?;
+             batch.insert(&self.keyspace, k, v);
          }
-         Ok(())
-     }
-     pub fn commit(self) -> Result<(), DiskError> {
-         self.tx.commit()?;
+         batch.commit().map_err(DiskError::DbError)?;
          Ok(())
      }
- }

- pub(crate) struct SqliteReader<'conn> {
-     select_stmt: rusqlite::Statement<'conn>,
- }
+     #[inline]
+     pub(crate) fn get(&mut self, key: &[u8]) -> Result<Option<fjall::Slice>, FjallError> {
+         self.keyspace.get(key)
+     }

- impl SqliteReader<'_> {
-     pub(crate) fn get(&mut self, key: Vec<u8>) -> rusqlite::Result<Option<Vec<u8>>> {
-         self.select_stmt
-             .query_one((&key,), |row| row.get(0))
-             .optional()
+     /// Drop and recreate the kv table
+     pub async fn reset(&self) -> Result<(), DiskError> {
+         let keyspace = self.keyspace.clone();
+         Ok(tokio::task::spawn_blocking(move || keyspace.clear()).await??)
      }
  }
````
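Condensed out of the code above, the fjall calls `DiskStore` leans on, as a rough standalone sketch — the path, keyspace name, key, and exact argument types are loosely assumed; the call names themselves all appear in the diff:

```rust
use fjall::{Database, KeyspaceCreateOptions};

// sketch: the write/read cycle DiskStore performs, reduced to bare fjall v3
// calls (temporary(true) wipes the on-disk files when the database drops)
fn roundtrip() -> Result<(), fjall::Error> {
    let db = Database::builder(std::path::PathBuf::from("example.fjall"))
        .temporary(true)
        .open()?;
    let ks = db.keyspace("z", || KeyspaceCreateOptions::default())?;

    let mut batch = db.batch(); // put_many writes each chunk as one batch
    batch.insert(&ks, b"cid-bytes".to_vec(), b"block-bytes".to_vec());
    batch.commit()?;

    assert!(ks.get(b"cid-bytes".as_slice())?.is_some()); // point read during the walk
    ks.clear()?; // what reset() does, for store reuse
    Ok(())
}
```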
+147 -214
src/drive.rs
````diff
  //! Consume a CAR from an AsyncRead, producing an ordered stream of records

- use crate::disk::{DiskError, DiskStore};
- use crate::process::Processable;
- use ipld_core::cid::Cid;
  use iroh_car::CarReader;
- use serde::{Deserialize, Serialize};
- use std::collections::HashMap;
  use std::convert::Infallible;
  use tokio::{io::AsyncRead, sync::mpsc};

- use crate::mst::{Commit, Node};
- use crate::walk::{Step, WalkError, Walker};

  /// Errors that can happen while consuming and emitting blocks and records
  #[derive(Debug, thiserror::Error)]
···
      BadBlock(#[from] serde_ipld_dagcbor::DecodeError<Infallible>),
      #[error("The Commit block reference by the root was not found")]
      MissingCommit,
-     #[error("The MST block {0} could not be found")]
-     MissingBlock(Cid),
      #[error("Failed to walk the mst tree: {0}")]
      WalkError(#[from] WalkError),
      #[error("CAR file had no roots")]
      MissingRoot,
      #[error("Storage error")]
      StorageError(#[from] DiskError),
-     #[error("Encode error: {0}")]
-     BincodeEncodeError(#[from] bincode::error::EncodeError),
      #[error("Tried to send on a closed channel")]
      ChannelSendError, // SendError takes <T> which we don't need
      #[error("Failed to join a task: {0}")]
      JoinError(#[from] tokio::task::JoinError),
  }

- #[derive(Debug, thiserror::Error)]
- pub enum DecodeError {
-     #[error(transparent)]
-     BincodeDecodeError(#[from] bincode::error::DecodeError),
-     #[error("extra bytes remained after decoding")]
-     ExtraGarbage,
- }
-
- /// An in-order chunk of Rkey + (processed) Block pairs
- pub type BlockChunk<T> = Vec<(String, T)>;

- #[derive(Debug, Clone, Serialize, Deserialize)]
- pub(crate) enum MaybeProcessedBlock<T> {
      /// A block that's *probably* a Node (but we can't know yet)
      ///
      /// It *can be* a record that suspiciously looks a lot like a node, so we
      /// cannot eagerly turn it into a Node. We only know for sure what it is
      /// when we actually walk down the MST
-     Raw(Vec<u8>),
      /// A processed record from a block that was definitely not a Node
      ///
      /// Processing has to be fallible because the CAR can have totally-unused
···
      /// There's an alternative here, which would be to kick unprocessable blocks
      /// back to Raw, or maybe even a new RawUnprocessable variant. Then we could
      /// surface the typed error later if needed by trying to reprocess.
-     Processed(T),
  }

- impl<T: Processable> Processable for MaybeProcessedBlock<T> {
-     /// TODO this is probably a little broken
-     fn get_size(&self) -> usize {
-         use std::{cmp::max, mem::size_of};
-
-         // enum is always as big as its biggest member?
-         let base_size = max(size_of::<Vec<u8>>(), size_of::<T>());
-
-         let extra = match self {
-             Self::Raw(bytes) => bytes.len(),
-             Self::Processed(t) => t.get_size(),
-         };
-
-         base_size + extra
-     }
- }
-
- impl<T> MaybeProcessedBlock<T> {
-     fn maybe(process: fn(Vec<u8>) -> T, data: Vec<u8>) -> Self {
-         if Node::could_be(&data) {
              MaybeProcessedBlock::Raw(data)
          } else {
              MaybeProcessedBlock::Processed(process(data))
          }
      }
  }

  /// Read a CAR file, buffering blocks in memory or to disk
- pub enum Driver<R: AsyncRead + Unpin, T: Processable> {
      /// All blocks fit within the memory limit
      ///
      /// You probably want to check the commit's signature. You can go ahead and
      /// walk the MST right away.
-     Memory(Commit, MemDriver<T>),
      /// Blocks exceed the memory limit
      ///
      /// You'll need to provide a disk storage to continue. The commit will be
      /// returned and can be validated only once all blocks are loaded.
-     Disk(NeedDisk<R, T>),
  }

  /// Builder-style driver setup
  pub struct DriverBuilder {
      pub mem_limit_mb: usize,
  }

  impl Default for DriverBuilder {
      fn default() -> Self {
-         Self { mem_limit_mb: 16 }
      }
  }
···
      /// Set the in-memory size limit, in MiB
      ///
      /// Default: 16 MiB
-     pub fn with_mem_limit_mb(self, new_limit: usize) -> Self {
-         Self {
-             mem_limit_mb: new_limit,
-         }
      }
      /// Set the block processor
      ///
      /// Default: noop, raw blocks will be emitted
-     pub fn with_block_processor<T: Processable>(
-         self,
-         p: fn(Vec<u8>) -> T,
-     ) -> DriverBuilderWithProcessor<T> {
-         DriverBuilderWithProcessor {
-             mem_limit_mb: self.mem_limit_mb,
-             block_processor: p,
-         }
-     }
-     /// Begin processing an atproto MST from a CAR file
-     pub async fn load_car<R: AsyncRead + Unpin>(
-         self,
-         reader: R,
-     ) -> Result<Driver<R, Vec<u8>>, DriveError> {
-         Driver::load_car(reader, crate::process::noop, self.mem_limit_mb).await
-     }
- }
-
- /// Builder-style driver intermediate step
- ///
- /// start from `DriverBuilder`
- pub struct DriverBuilderWithProcessor<T: Processable> {
-     pub mem_limit_mb: usize,
-     pub block_processor: fn(Vec<u8>) -> T,
- }
-
- impl<T: Processable> DriverBuilderWithProcessor<T> {
-     /// Set the in-memory size limit, in MiB
-     ///
-     /// Default: 16 MiB
-     pub fn with_mem_limit_mb(mut self, new_limit: usize) -> Self {
-         self.mem_limit_mb = new_limit;
          self
      }
      /// Begin processing an atproto MST from a CAR file
-     pub async fn load_car<R: AsyncRead + Unpin>(
-         self,
-         reader: R,
-     ) -> Result<Driver<R, T>, DriveError> {
          Driver::load_car(reader, self.block_processor, self.mem_limit_mb).await
      }
  }

- impl<R: AsyncRead + Unpin, T: Processable> Driver<R, T> {
      /// Begin processing an atproto MST from a CAR file
      ///
      /// Blocks will be loaded, processed, and buffered in memory. If the entire
···
      /// resumed by providing a `SqliteStorage` for on-disk block storage.
      pub async fn load_car(
          reader: R,
-         process: fn(Vec<u8>) -> T,
          mem_limit_mb: usize,
-     ) -> Result<Driver<R, T>, DriveError> {
          let max_size = mem_limit_mb * 2_usize.pow(20);
          let mut mem_blocks = HashMap::new();
···
              let maybe_processed = MaybeProcessedBlock::maybe(process, data);

              // stash (maybe processed) blocks in memory as long as we have room
-             mem_size += std::mem::size_of::<Cid>() + maybe_processed.get_size();
              mem_blocks.insert(cid, maybe_processed);
              if mem_size >= max_size {
                  return Ok(Driver::Disk(NeedDisk {
···
          // all blocks loaded and we fit in memory! hopefully we found the commit...
          let commit = commit.ok_or(DriveError::MissingCommit)?;

-         let walker = Walker::new(commit.data);

          Ok(Driver::Memory(
              commit,
···
  /// work the init function will do. We can drop the CAR reader before walking,
  /// so the sync/async boundaries become a little easier to work around.
  #[derive(Debug)]
- pub struct MemDriver<T: Processable> {
-     blocks: HashMap<Cid, MaybeProcessedBlock<T>>,
      walker: Walker,
-     process: fn(Vec<u8>) -> T,
  }

- impl<T: Processable> MemDriver<T> {
      /// Step through the record outputs, in rkey order
-     pub async fn next_chunk(&mut self, n: usize) -> Result<Option<BlockChunk<T>>, DriveError> {
          let mut out = Vec::with_capacity(n);
          for _ in 0..n {
              // walk as far as we can until we run out of blocks or find a record
-             match self.walker.step(&mut self.blocks, self.process)? {
-                 Step::Missing(cid) => return Err(DriveError::MissingBlock(cid)),
-                 Step::Finish => break,
-                 Step::Found { rkey, data } => {
-                     out.push((rkey, data));
-                     continue;
-                 }
              };
          }
-
          if out.is_empty() {
              Ok(None)
          } else {
···
  }

  /// A partially memory-loaded car file that needs disk spillover to continue
- pub struct NeedDisk<R: AsyncRead + Unpin, T: Processable> {
      car: CarReader<R>,
      root: Cid,
-     process: fn(Vec<u8>) -> T,
      max_size: usize,
-     mem_blocks: HashMap<Cid, MaybeProcessedBlock<T>>,
      pub commit: Option<Commit>,
  }

- fn encode(v: impl Serialize) -> Result<Vec<u8>, bincode::error::EncodeError> {
-     bincode::serde::encode_to_vec(v, bincode::config::standard())
- }
-
- pub(crate) fn decode<T: Processable>(bytes: &[u8]) -> Result<T, DecodeError> {
-     let (t, n) = bincode::serde::decode_from_slice(bytes, bincode::config::standard())?;
-     if n != bytes.len() {
-         return Err(DecodeError::ExtraGarbage);
-     }
-     Ok(t)
- }
-
- impl<R: AsyncRead + Unpin, T: Processable + Send + 'static> NeedDisk<R, T> {
      pub async fn finish_loading(
          mut self,
          mut store: DiskStore,
-     ) -> Result<(Commit, DiskDriver<T>), DriveError> {
          // move store in and back out so we can manage lifetimes
          // dump mem blocks into the store
          store = tokio::task::spawn(async move {
-             let mut writer = store.get_writer()?;
-
              let kvs = self
                  .mem_blocks
                  .into_iter()
-                 .map(|(k, v)| Ok(encode(v).map(|v| (k.to_bytes(), v))?));

-             writer.put_many(kvs)?;
-             writer.commit()?;
              Ok::<_, DriveError>(store)
          })
          .await??;

-         let (tx, mut rx) = mpsc::channel::<Vec<(Cid, MaybeProcessedBlock<T>)>>(2);

          let store_worker = tokio::task::spawn_blocking(move || {
-             let mut writer = store.get_writer()?;
-
              while let Some(chunk) = rx.blocking_recv() {
                  let kvs = chunk
                      .into_iter()
-                     .map(|(k, v)| Ok(encode(v).map(|v| (k.to_bytes(), v))?));
-                 writer.put_many(kvs)?;
              }
-
-             writer.commit()?;
              Ok::<_, DriveError>(store)
          }); // await later
···
                  self.commit = Some(c);
                  continue;
              }
              // remaining possible types: node, record, other. optimistically process
              // TODO: get the actual in-memory size to compute disk spill
              let maybe_processed = MaybeProcessedBlock::maybe(self.process, data);
-             mem_size += std::mem::size_of::<Cid>() + maybe_processed.get_size();
              chunk.push((cid, maybe_processed));
-             if mem_size >= self.max_size {
                  // soooooo if we're setting the db cache to max_size and then letting
                  // multiple chunks in the queue that are >= max_size, then at any time
                  // we might be using some multiple of max_size?
···
          let commit = self.commit.ok_or(DriveError::MissingCommit)?;

-         let walker = Walker::new(commit.data);

          Ok((
              commit,
···
  }

  /// MST walker that reads from disk instead of an in-memory hashmap
- pub struct DiskDriver<T: Clone> {
-     process: fn(Vec<u8>) -> T,
      state: Option<BigState>,
  }

  // for doctests only
  #[doc(hidden)]
- pub fn _get_fake_disk_driver() -> DiskDriver<Vec<u8>> {
-     use crate::process::noop;
      DiskDriver {
          process: noop,
          state: None,
      }
  }

- impl<T: Processable + Send + 'static> DiskDriver<T> {
      /// Walk the MST returning up to `n` rkey + record pairs
      ///
      /// ```no_run
-     /// # use repo_stream::{drive::{DiskDriver, DriveError, _get_fake_disk_driver}, process::noop};
      /// # #[tokio::main]
      /// # async fn main() -> Result<(), DriveError> {
      /// # let mut disk_driver = _get_fake_disk_driver();
      /// while let Some(pairs) = disk_driver.next_chunk(256).await? {
-     ///     for (rkey, record) in pairs {
-     ///         println!("{rkey}: size={}", record.len());
      ///     }
      /// }
-     /// let store = disk_driver.reset_store().await?;
      /// # Ok(())
      /// # }
      /// ```
-     pub async fn next_chunk(&mut self, n: usize) -> Result<Option<BlockChunk<T>>, DriveError> {
          let process = self.process;

          // state should only *ever* be None transiently while inside here
···
          // the big pain here is that we don't want to leave self.state in an
          // invalid state (None), so all the error paths have to make sure it
          // comes out again.
-         let (state, res) = tokio::task::spawn_blocking(
-             move || -> (BigState, Result<BlockChunk<T>, DriveError>) {
-                 let mut reader_res = state.store.get_reader();
-                 let reader: &mut _ = match reader_res {
-                     Ok(ref mut r) => r,
-                     Err(ref mut e) => {
-                         // unfortunately we can't return the error directly because
-                         // (for some reason) it's attached to the lifetime of the
-                         // reader?
···
````
476 - // hack a mem::swap so we can get it out :/ 477 - let e_swapped = e.steal(); 478 - // the pain: `state` *has to* outlive the reader 479 - drop(reader_res); 480 - return (state, Err(e_swapped.into())); 481 - } 482 - }; 483 - 484 let mut out = Vec::with_capacity(n); 485 486 for _ in 0..n { 487 // walk as far as we can until we run out of blocks or find a record 488 - let step = match state.walker.disk_step(reader, process) { 489 Ok(s) => s, 490 Err(e) => { 491 - // the pain: `state` *has to* outlive the reader 492 - drop(reader_res); 493 return (state, Err(e.into())); 494 } 495 }; 496 - match step { 497 - Step::Missing(cid) => { 498 - // the pain: `state` *has to* outlive the reader 499 - drop(reader_res); 500 - return (state, Err(DriveError::MissingBlock(cid))); 501 - } 502 - Step::Finish => break, 503 - Step::Found { rkey, data } => out.push((rkey, data)), 504 }; 505 } 506 507 - // `state` *has to* outlive the reader 508 - drop(reader_res); 509 - 510 (state, Ok::<_, DriveError>(out)) 511 - }, 512 - ) 513 - .await?; // on tokio JoinError, we'll be left with invalid state :( 514 515 // *must* restore state before dealing with the actual result 516 self.state = Some(state); ··· 527 fn read_tx_blocking( 528 &mut self, 529 n: usize, 530 - tx: mpsc::Sender<Result<BlockChunk<T>, DriveError>>, 531 - ) -> Result<(), mpsc::error::SendError<Result<BlockChunk<T>, DriveError>>> { 532 let BigState { store, walker } = self.state.as_mut().expect("valid state"); 533 - let mut reader = match store.get_reader() { 534 - Ok(r) => r, 535 - Err(e) => return tx.blocking_send(Err(e.into())), 536 - }; 537 538 loop { 539 - let mut out: BlockChunk<T> = Vec::with_capacity(n); 540 541 for _ in 0..n { 542 // walk as far as we can until we run out of blocks or find a record 543 544 - let step = match walker.disk_step(&mut reader, self.process) { 545 Ok(s) => s, 546 Err(e) => return tx.blocking_send(Err(e.into())), 547 }; 548 549 - match step { 550 - Step::Missing(cid) => { 551 - return tx.blocking_send(Err(DriveError::MissingBlock(cid))); 552 - } 553 - Step::Finish => return Ok(()), 554 - Step::Found { rkey, data } => { 555 - out.push((rkey, data)); 556 - continue; 557 - } 558 }; 559 } 560 561 if out.is_empty() { ··· 578 /// benefit over just using `.next_chunk(n)`. 579 /// 580 /// ```no_run 581 - /// # use repo_stream::{drive::{DiskDriver, DriveError, _get_fake_disk_driver}, process::noop}; 582 /// # #[tokio::main] 583 /// # async fn main() -> Result<(), DriveError> { 584 /// # let mut disk_driver = _get_fake_disk_driver(); 585 /// let (mut rx, join) = disk_driver.to_channel(512); 586 /// while let Some(recvd) = rx.recv().await { 587 /// let pairs = recvd?; 588 - /// for (rkey, record) in pairs { 589 - /// println!("{rkey}: size={}", record.len()); 590 /// } 591 /// 592 /// } 593 - /// let store = join.await?.reset_store().await?; 594 /// # Ok(()) 595 /// # } 596 /// ``` ··· 598 mut self, 599 n: usize, 600 ) -> ( 601 - mpsc::Receiver<Result<BlockChunk<T>, DriveError>>, 602 tokio::task::JoinHandle<Self>, 603 ) { 604 - let (tx, rx) = mpsc::channel::<Result<BlockChunk<T>, DriveError>>(1); 605 606 // sketch: this worker is going to be allowed to execute without a join handle 607 let chan_task = tokio::task::spawn_blocking(move || { ··· 614 (rx, chan_task) 615 } 616 617 - /// Reset the disk storage so it can be reused. You must call this. 618 - /// 619 - /// Ideally we'd put this in an `impl Drop`, but since it makes blocking 620 - /// calls, that would be risky in an async context. 
For now you just have to 621 - /// carefully make sure you call it. 622 /// 623 - /// The sqlite store is returned, so it can be reused for another 624 - /// `DiskDriver`. 625 pub async fn reset_store(mut self) -> Result<DiskStore, DriveError> { 626 let BigState { store, .. } = self.state.take().expect("valid state"); 627 - Ok(store.reset().await?) 628 } 629 }
···
1 //! Consume a CAR from an AsyncRead, producing an ordered stream of records
2
3 + use crate::{
4 + Bytes, HashMap,
5 + disk::{DiskError, DiskStore},
6 + mst::MstNode,
7 + walk::Output,
8 + };
9 + use cid::Cid;
10 use iroh_car::CarReader;
11 use std::convert::Infallible;
12 use tokio::{io::AsyncRead, sync::mpsc};
13
14 + use crate::mst::Commit;
15 + use crate::walk::{WalkError, Walker};
16
17 /// Errors that can happen while consuming and emitting blocks and records
18 #[derive(Debug, thiserror::Error)]
···
23 BadBlock(#[from] serde_ipld_dagcbor::DecodeError<Infallible>),
24 #[error("The Commit block referenced by the root was not found")]
25 MissingCommit,
26 #[error("Failed to walk the mst tree: {0}")]
27 WalkError(#[from] WalkError),
28 #[error("CAR file had no roots")]
29 MissingRoot,
30 #[error("Storage error")]
31 StorageError(#[from] DiskError),
32 #[error("Tried to send on a closed channel")]
33 ChannelSendError, // SendError takes <T> which we don't need
34 #[error("Failed to join a task: {0}")]
35 JoinError(#[from] tokio::task::JoinError),
36 }
37
38 + /// An in-order chunk of Rkey + CID + (processed) Block outputs
39 + pub type BlockChunk = Vec<Output>;
40
41 + #[derive(Debug, Clone)]
42 + pub(crate) enum MaybeProcessedBlock {
43 /// A block that's *probably* a Node (but we can't know yet)
44 ///
45 /// It *can be* a record that suspiciously looks a lot like a node, so we
46 /// cannot eagerly turn it into a Node. We only know for sure what it is
47 /// when we actually walk down the MST
48 + Raw(Bytes),
49 /// A processed record from a block that was definitely not a Node
50 ///
51 /// Processing has to be fallible because the CAR can have totally-unused
···
61 /// There's an alternative here, which would be to kick unprocessable blocks
62 /// back to Raw, or maybe even a new RawUnprocessable variant. Then we could
63 /// surface the typed error later if needed by trying to reprocess.
64 + Processed(Bytes),
65 }
66
67 + impl MaybeProcessedBlock {
68 + pub(crate) fn maybe(process: fn(Bytes) -> Bytes, data: Bytes) -> Self {
69 + if MstNode::could_be(&data) {
70 MaybeProcessedBlock::Raw(data)
71 } else {
72 MaybeProcessedBlock::Processed(process(data))
73 }
74 }
75 + pub(crate) fn len(&self) -> usize {
76 + match self {
77 + MaybeProcessedBlock::Raw(b) => b.len(),
78 + MaybeProcessedBlock::Processed(b) => b.len(),
79 + }
80 + }
81 + pub(crate) fn into_bytes(self) -> Bytes {
82 + match self {
83 + MaybeProcessedBlock::Raw(mut b) => {
84 + b.push(0x00);
85 + b
86 + }
87 + MaybeProcessedBlock::Processed(mut b) => {
88 + b.push(0x01);
89 + b
90 + }
91 + }
92 + }
93 + pub(crate) fn from_bytes(mut b: Bytes) -> Self {
94 + // TODO: make sure bytes is not empty, that it's explicitly 0 or 1, etc
95 + let suffix = b.pop().unwrap();
96 + if suffix == 0x00 {
97 + MaybeProcessedBlock::Raw(b)
98 + } else {
99 + MaybeProcessedBlock::Processed(b)
100 + }
101 + }
102 }
103
104 /// Read a CAR file, buffering blocks in memory or to disk
105 + pub enum Driver<R: AsyncRead + Unpin> {
106 /// All blocks fit within the memory limit
107 ///
108 /// You probably want to check the commit's signature. You can go ahead and
109 /// walk the MST right away.
110 + Memory(Commit, MemDriver),
111 /// Blocks exceed the memory limit
112 ///
113 /// You'll need to provide a disk storage to continue. The commit will be
114 /// returned and can be validated only once all blocks are loaded.
115 + Disk(NeedDisk<R>),
116 + }
117 +
118 + /// Processor that just returns the raw blocks
119 + #[inline]
120 + pub fn noop(block: Bytes) -> Bytes {
121 + block
122 + }
123
124 /// Builder-style driver setup
125 + #[derive(Debug, Clone)]
126 pub struct DriverBuilder {
127 pub mem_limit_mb: usize,
128 + pub block_processor: fn(Bytes) -> Bytes,
129 }
130
131 impl Default for DriverBuilder {
132 fn default() -> Self {
133 + Self {
134 + mem_limit_mb: 16,
135 + block_processor: noop,
136 + }
137 }
138 }
139
···
145 /// Set the in-memory size limit, in MiB
146 ///
147 /// Default: 16 MiB
148 + pub fn with_mem_limit_mb(mut self, new_limit: usize) -> Self {
149 + self.mem_limit_mb = new_limit;
150 + self
151 }
152 +
153 /// Set the block processor
154 ///
155 /// Default: noop, raw blocks will be emitted
156 + pub fn with_block_processor(mut self, new_processor: fn(Bytes) -> Bytes) -> DriverBuilder {
157 + self.block_processor = new_processor;
158 self
159 }
160 +
161 /// Begin processing an atproto MST from a CAR file
162 + pub async fn load_car<R: AsyncRead + Unpin>(&self, reader: R) -> Result<Driver<R>, DriveError> {
163 Driver::load_car(reader, self.block_processor, self.mem_limit_mb).await
164 }
165 }
166
167 + impl<R: AsyncRead + Unpin> Driver<R> {
168 /// Begin processing an atproto MST from a CAR file
169 ///
170 /// Blocks will be loaded, processed, and buffered in memory. If the entire
···
176 /// resumed by providing a `DiskStore` for on-disk block storage.
177 pub async fn load_car(
178 reader: R,
179 + process: fn(Bytes) -> Bytes,
180 mem_limit_mb: usize,
181 + ) -> Result<Driver<R>, DriveError> {
182 let max_size = mem_limit_mb * 2_usize.pow(20);
183 let mut mem_blocks = HashMap::new();
184
···
208 let maybe_processed = MaybeProcessedBlock::maybe(process, data);
209
210 // stash (maybe processed) blocks in memory as long as we have room
211 + mem_size += maybe_processed.len();
212 mem_blocks.insert(cid, maybe_processed);
213 if mem_size >= max_size {
214 return Ok(Driver::Disk(NeedDisk {
···
225 // all blocks loaded and we fit in memory! hopefully we found the commit...
226 let commit = commit.ok_or(DriveError::MissingCommit)?;
227
228 + // the commit always must point to a Node; empty node => empty MST special case
229 + let root_node: MstNode = match mem_blocks
230 + .get(&commit.data)
231 + .ok_or(DriveError::MissingCommit)?
232 + {
233 + MaybeProcessedBlock::Processed(_) => Err(WalkError::BadCommitFingerprint)?,
234 + MaybeProcessedBlock::Raw(bytes) => serde_ipld_dagcbor::from_slice(bytes)?,
235 + };
236 + let walker = Walker::new(root_node);
237
238 Ok(Driver::Memory(
239 commit,
···
260 /// work the init function will do. We can drop the CAR reader before walking,
261 /// so the sync/async boundaries become a little easier to work around.
262 #[derive(Debug)]
263 + pub struct MemDriver {
264 + blocks: HashMap<Cid, MaybeProcessedBlock>,
265 walker: Walker,
266 + process: fn(Bytes) -> Bytes,
267 }
268
269 + impl MemDriver {
270 /// Step through the record outputs, in rkey order
271 + pub async fn next_chunk(&mut self, n: usize) -> Result<Option<BlockChunk>, DriveError> {
272 let mut out = Vec::with_capacity(n);
273 for _ in 0..n {
274 // walk as far as we can until we run out of blocks or find a record
275 + let Some(output) = self.walker.step(&mut self.blocks, self.process)?
else { 276 + break; 277 }; 278 + out.push(output); 279 } 280 if out.is_empty() { 281 Ok(None) 282 } else { ··· 286 } 287 288 /// A partially memory-loaded car file that needs disk spillover to continue 289 + pub struct NeedDisk<R: AsyncRead + Unpin> { 290 car: CarReader<R>, 291 root: Cid, 292 + process: fn(Bytes) -> Bytes, 293 max_size: usize, 294 + mem_blocks: HashMap<Cid, MaybeProcessedBlock>, 295 pub commit: Option<Commit>, 296 } 297 298 + impl<R: AsyncRead + Unpin> NeedDisk<R> { 299 pub async fn finish_loading( 300 mut self, 301 mut store: DiskStore, 302 + ) -> Result<(Commit, DiskDriver), DriveError> { 303 // move store in and back out so we can manage lifetimes 304 // dump mem blocks into the store 305 store = tokio::task::spawn(async move { 306 let kvs = self 307 .mem_blocks 308 .into_iter() 309 + .map(|(k, v)| (k.to_bytes(), v.into_bytes())); 310 311 + store.put_many(kvs)?; 312 Ok::<_, DriveError>(store) 313 }) 314 .await??; 315 316 + let (tx, mut rx) = mpsc::channel::<Vec<(Cid, MaybeProcessedBlock)>>(1); 317 318 let store_worker = tokio::task::spawn_blocking(move || { 319 while let Some(chunk) = rx.blocking_recv() { 320 let kvs = chunk 321 .into_iter() 322 + .map(|(k, v)| (k.to_bytes(), v.into_bytes())); 323 + store.put_many(kvs)?; 324 } 325 Ok::<_, DriveError>(store) 326 }); // await later 327 ··· 340 self.commit = Some(c); 341 continue; 342 } 343 + 344 + let data = Bytes::from(data); 345 + 346 // remaining possible types: node, record, other. optimistically process 347 // TODO: get the actual in-memory size to compute disk spill 348 let maybe_processed = MaybeProcessedBlock::maybe(self.process, data); 349 + mem_size += maybe_processed.len(); 350 chunk.push((cid, maybe_processed)); 351 + if mem_size >= (self.max_size / 2) { 352 // soooooo if we're setting the db cache to max_size and then letting 353 // multiple chunks in the queue that are >= max_size, then at any time 354 // we might be using some multiple of max_size? ··· 371 372 let commit = self.commit.ok_or(DriveError::MissingCommit)?; 373 374 + // the commit always must point to a Node; empty node => empty MST special case 375 + let db_bytes = store 376 + .get(&commit.data.to_bytes()) 377 + .map_err(|e| DriveError::StorageError(DiskError::DbError(e)))? 378 + .ok_or(DriveError::MissingCommit)?; 379 + 380 + let node: MstNode = match MaybeProcessedBlock::from_bytes(db_bytes.to_vec()) { 381 + MaybeProcessedBlock::Processed(_) => Err(WalkError::BadCommitFingerprint)?, 382 + MaybeProcessedBlock::Raw(bytes) => serde_ipld_dagcbor::from_slice(&bytes)?, 383 + }; 384 + let walker = Walker::new(node); 385 386 Ok(( 387 commit, ··· 399 } 400 401 /// MST walker that reads from disk instead of an in-memory hashmap 402 + pub struct DiskDriver { 403 + process: fn(Bytes) -> Bytes, 404 state: Option<BigState>, 405 } 406 407 // for doctests only 408 #[doc(hidden)] 409 + pub fn _get_fake_disk_driver() -> DiskDriver { 410 DiskDriver { 411 process: noop, 412 state: None, 413 } 414 } 415 416 + impl DiskDriver { 417 /// Walk the MST returning up to `n` rkey + record pairs 418 /// 419 /// ```no_run 420 + /// # use repo_stream::{drive::{DiskDriver, DriveError, _get_fake_disk_driver}, noop}; 421 /// # #[tokio::main] 422 /// # async fn main() -> Result<(), DriveError> { 423 /// # let mut disk_driver = _get_fake_disk_driver(); 424 /// while let Some(pairs) = disk_driver.next_chunk(256).await? 
{ 425 + /// for output in pairs { 426 + /// println!("{}: size={}", output.rkey, output.data.len()); 427 /// } 428 /// } 429 /// # Ok(()) 430 /// # } 431 /// ``` 432 + pub async fn next_chunk(&mut self, n: usize) -> Result<Option<BlockChunk>, DriveError> { 433 let process = self.process; 434 435 // state should only *ever* be None transiently while inside here ··· 438 // the big pain here is that we don't want to leave self.state in an 439 // invalid state (None), so all the error paths have to make sure it 440 // comes out again. 441 + let (state, res) = 442 + tokio::task::spawn_blocking(move || -> (BigState, Result<BlockChunk, DriveError>) { 443 let mut out = Vec::with_capacity(n); 444 445 for _ in 0..n { 446 // walk as far as we can until we run out of blocks or find a record 447 + let step = match state.walker.disk_step(&mut state.store, process) { 448 Ok(s) => s, 449 Err(e) => { 450 return (state, Err(e.into())); 451 } 452 }; 453 + let Some(output) = step else { 454 + break; 455 }; 456 + out.push(output); 457 } 458 459 (state, Ok::<_, DriveError>(out)) 460 + }) 461 + .await?; // on tokio JoinError, we'll be left with invalid state :( 462 463 // *must* restore state before dealing with the actual result 464 self.state = Some(state); ··· 475 fn read_tx_blocking( 476 &mut self, 477 n: usize, 478 + tx: mpsc::Sender<Result<BlockChunk, DriveError>>, 479 + ) -> Result<(), mpsc::error::SendError<Result<BlockChunk, DriveError>>> { 480 let BigState { store, walker } = self.state.as_mut().expect("valid state"); 481 482 loop { 483 + let mut out: BlockChunk = Vec::with_capacity(n); 484 485 for _ in 0..n { 486 // walk as far as we can until we run out of blocks or find a record 487 488 + let step = match walker.disk_step(store, self.process) { 489 Ok(s) => s, 490 Err(e) => return tx.blocking_send(Err(e.into())), 491 }; 492 493 + let Some(output) = step else { 494 + break; 495 }; 496 + out.push(output); 497 } 498 499 if out.is_empty() { ··· 516 /// benefit over just using `.next_chunk(n)`. 517 /// 518 /// ```no_run 519 + /// # use repo_stream::{drive::{DiskDriver, DriveError, _get_fake_disk_driver}, noop}; 520 /// # #[tokio::main] 521 /// # async fn main() -> Result<(), DriveError> { 522 /// # let mut disk_driver = _get_fake_disk_driver(); 523 /// let (mut rx, join) = disk_driver.to_channel(512); 524 /// while let Some(recvd) = rx.recv().await { 525 /// let pairs = recvd?; 526 + /// for output in pairs { 527 + /// println!("{}: size={}", output.rkey, output.data.len()); 528 /// } 529 /// 530 /// } 531 /// # Ok(()) 532 /// # } 533 /// ``` ··· 535 mut self, 536 n: usize, 537 ) -> ( 538 + mpsc::Receiver<Result<BlockChunk, DriveError>>, 539 tokio::task::JoinHandle<Self>, 540 ) { 541 + let (tx, rx) = mpsc::channel::<Result<BlockChunk, DriveError>>(1); 542 543 // sketch: this worker is going to be allowed to execute without a join handle 544 let chan_task = tokio::task::spawn_blocking(move || { ··· 551 (rx, chan_task) 552 } 553 554 + /// Reset the disk storage so it can be reused. 555 /// 556 + /// The store is returned, so it can be reused for another `DiskDriver`. 557 pub async fn reset_store(mut self) -> Result<DiskStore, DriveError> { 558 let BigState { store, .. } = self.state.take().expect("valid state"); 559 + store.reset().await?; 560 + Ok(store) 561 } 562 }
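
Aside on the new disk encoding: with bincode gone, `into_bytes`/`from_bytes` above frame a spilled block by appending a single tag byte (0x00 = raw possible-node, 0x01 = processed record), so the payload itself is never re-encoded. A minimal standalone sketch of the same scheme; the `Tagged`, `encode`, and `decode` names here are illustrative, not part of the crate:

    // Stand-in for MaybeProcessedBlock's disk framing (illustrative names).
    enum Tagged {
        Raw(Vec<u8>),
        Processed(Vec<u8>),
    }

    fn encode(t: Tagged) -> Vec<u8> {
        // the tag goes at the *end*, so the payload bytes never have to shift
        match t {
            Tagged::Raw(mut b) => { b.push(0x00); b }
            Tagged::Processed(mut b) => { b.push(0x01); b }
        }
    }

    fn decode(mut b: Vec<u8>) -> Option<Tagged> {
        // popping the tag restores the payload in place; like from_bytes above
        // (and per its TODO), any non-zero tag is treated as Processed
        match b.pop()? {
            0x00 => Some(Tagged::Raw(b)),
            _ => Some(Tagged::Processed(b)),
        }
    }
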
+19 -9
src/lib.rs
··· 27 28 match DriverBuilder::new() 29 .with_mem_limit_mb(10) 30 - .with_block_processor(|rec| rec.len()) // block processing: just extract the raw record size 31 .load_car(reader) 32 .await? 33 { ··· 35 // if all blocks fit within memory 36 Driver::Memory(_commit, mut driver) => { 37 while let Some(chunk) = driver.next_chunk(256).await? { 38 - for (_rkey, size) in chunk { 39 total_size += size; 40 } 41 } ··· 49 let (_commit, mut driver) = paused.finish_loading(store).await?; 50 51 while let Some(chunk) = driver.next_chunk(256).await? { 52 - for (_rkey, size) in chunk { 53 total_size += size; 54 } 55 } 56 - 57 - // clean up the disk store (drop tables etc) 58 - driver.reset_store().await?; 59 } 60 }; 61 println!("sum of size of all records: {total_size}"); ··· 79 80 pub mod disk; 81 pub mod drive; 82 - pub mod process; 83 84 pub use disk::{DiskBuilder, DiskError, DiskStore}; 85 - pub use drive::{DriveError, Driver, DriverBuilder}; 86 pub use mst::Commit; 87 - pub use process::Processable;
··· 27 28 match DriverBuilder::new() 29 .with_mem_limit_mb(10) 30 + .with_block_processor( 31 + |rec| rec.len().to_ne_bytes().to_vec() 32 + ) // block processing: just extract the raw record size 33 .load_car(reader) 34 .await? 35 { ··· 37 // if all blocks fit within memory 38 Driver::Memory(_commit, mut driver) => { 39 while let Some(chunk) = driver.next_chunk(256).await? { 40 + for output in chunk { 41 + let size = usize::from_ne_bytes(output.data.try_into().unwrap()); 42 + 43 total_size += size; 44 } 45 } ··· 53 let (_commit, mut driver) = paused.finish_loading(store).await?; 54 55 while let Some(chunk) = driver.next_chunk(256).await? { 56 + for output in chunk { 57 + let size = usize::from_ne_bytes(output.data.try_into().unwrap()); 58 + 59 total_size += size; 60 } 61 } 62 } 63 }; 64 println!("sum of size of all records: {total_size}"); ··· 82 83 pub mod disk; 84 pub mod drive; 85 86 pub use disk::{DiskBuilder, DiskError, DiskStore}; 87 + pub use drive::{DriveError, Driver, DriverBuilder, NeedDisk, noop}; 88 pub use mst::Commit; 89 + pub use walk::Output; 90 + 91 + pub type Bytes = Vec<u8>; 92 + 93 + pub(crate) use hashbrown::HashMap; 94 + 95 + #[doc = include_str!("../readme.md")] 96 + #[cfg(doctest)] 97 + pub struct ReadmeDoctests;
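
Worth noting for the new API: `Bytes` is a plain `Vec<u8>` alias and `with_block_processor` takes a `fn(Bytes) -> Bytes` function pointer, so processors cannot capture state; any free function or non-capturing closure works. A hedged sketch of another processor in the same spirit as the record-size example above (`truncate_64` is an illustrative name, not crate API):

    use repo_stream::{Bytes, DriverBuilder};

    /// Keep at most the first 64 bytes of each record before it is
    /// buffered in memory or spilled to disk.
    fn truncate_64(mut rec: Bytes) -> Bytes {
        rec.truncate(64);
        rec
    }

    // usage: DriverBuilder::new().with_block_processor(truncate_64).load_car(reader)
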
+151 -28
src/mst.rs
··· 3 //! The primary aim is to work through the **tree** structure. Non-node blocks 4 //! are left as raw bytes, for upper levels to parse into DAG-CBOR or whatever. 5 6 - use ipld_core::cid::Cid; 7 use serde::Deserialize; 8 9 /// The top-level data object in a repository's tree is a signed commit. 10 #[derive(Debug, Deserialize)] ··· 33 pub prev: Option<Cid>, 34 /// cryptographic signature of this commit, as raw bytes 35 #[serde(with = "serde_bytes")] 36 - pub sig: Vec<u8>, 37 } 38 39 - /// MST node data schema 40 - #[derive(Debug, Deserialize, PartialEq)] 41 - #[serde(deny_unknown_fields)] 42 - pub(crate) struct Node { 43 - /// link to sub-tree Node on a lower level and with all keys sorting before 44 - /// keys at this node 45 - #[serde(rename = "l")] 46 - pub left: Option<Cid>, 47 - /// ordered list of TreeEntry objects 48 - /// 49 - /// atproto MSTs have a fanout of 4, so there can be max 4 entries. 50 - #[serde(rename = "e")] 51 - pub entries: Vec<Entry>, // maybe we can do [Option<Entry>; 4]? 52 } 53 54 - impl Node { 55 /// test if a block could possibly be a node 56 /// 57 /// we can't eagerly decode records except where we're *sure* they cannot be ··· 62 /// so if a block *could be* a node, any record converter must postpone 63 /// processing. if it turns out it happens to be a very node-looking record, 64 /// well, sorry, it just has to only be processed later when that's known. 65 pub(crate) fn could_be(bytes: impl AsRef<[u8]>) -> bool { 66 const NODE_FINGERPRINT: [u8; 3] = [ 67 0xA2, // map length 2 (for "l" and "e" keys) ··· 76 .map(|b| b & 0b1110_0000 == 0x80) 77 .unwrap_or(false) 78 } 79 - 80 - /// Check if a node has any entries 81 - /// 82 - /// An empty repository with no records is represented as a single MST node 83 - /// with an empty array of entries. This is the only situation in which a 84 - /// tree may contain an empty leaf node which does not either contain keys 85 - /// ("entries") or point to a sub-tree containing entries. 86 - pub(crate) fn is_empty(&self) -> bool { 87 - self.left.is_none() && self.entries.is_empty() 88 - } 89 } 90 91 /// TreeEntry object ··· 96 #[serde(rename = "p")] 97 pub prefix_len: usize, 98 /// remainder of key for this TreeEntry, after "prefixlen" have been removed 99 - #[serde(rename = "k", with = "serde_bytes")] 100 - pub keysuffix: Vec<u8>, // can we String this here? 101 /// link to the record data (CBOR) for this entry 102 #[serde(rename = "v")] 103 pub value: Cid,
··· 3 //! The primary aim is to work through the **tree** structure. Non-node blocks 4 //! are left as raw bytes, for upper levels to parse into DAG-CBOR or whatever. 5 6 + use cid::Cid; 7 use serde::Deserialize; 8 + use sha2::{Digest, Sha256}; 9 10 /// The top-level data object in a repository's tree is a signed commit. 11 #[derive(Debug, Deserialize)] ··· 34 pub prev: Option<Cid>, 35 /// cryptographic signature of this commit, as raw bytes 36 #[serde(with = "serde_bytes")] 37 + pub sig: serde_bytes::ByteBuf, 38 + } 39 + 40 + use serde::de::{self, Deserializer, MapAccess, Unexpected, Visitor}; 41 + use std::fmt; 42 + 43 + pub type Depth = u32; 44 + 45 + #[inline(always)] 46 + pub fn atproto_mst_depth(key: &str) -> Depth { 47 + // 128 bits oughta be enough: https://bsky.app/profile/retr0.id/post/3jwwbf4izps24 48 + u128::from_be_bytes(Sha256::digest(key).split_at(16).0.try_into().unwrap()).leading_zeros() / 2 49 + } 50 + 51 + #[derive(Debug)] 52 + pub(crate) struct MstNode { 53 + pub depth: Option<Depth>, // known for nodes with entries (required for root) 54 + pub things: Vec<NodeThing>, 55 + } 56 + 57 + #[derive(Debug)] 58 + pub(crate) struct NodeThing { 59 + pub(crate) cid: Cid, 60 + pub(crate) kind: ThingKind, 61 + } 62 + 63 + #[derive(Debug)] 64 + pub(crate) enum ThingKind { 65 + Tree, 66 + Value { rkey: String }, 67 } 68 69 + impl<'de> Deserialize<'de> for MstNode { 70 + fn deserialize<D>(deserializer: D) -> Result<Self, D::Error> 71 + where 72 + D: Deserializer<'de>, 73 + { 74 + struct NodeVisitor; 75 + impl<'de> Visitor<'de> for NodeVisitor { 76 + type Value = MstNode; 77 + 78 + fn expecting(&self, formatter: &mut fmt::Formatter) -> fmt::Result { 79 + formatter.write_str("struct MstNode") 80 + } 81 + 82 + fn visit_map<V>(self, mut map: V) -> Result<MstNode, V::Error> 83 + where 84 + V: MapAccess<'de>, 85 + { 86 + let mut found_left = false; 87 + let mut left = None; 88 + let mut found_entries = false; 89 + let mut things = Vec::new(); 90 + let mut depth = None; 91 + 92 + while let Some(key) = map.next_key()? { 93 + match key { 94 + "l" => { 95 + if found_left { 96 + return Err(de::Error::duplicate_field("l")); 97 + } 98 + found_left = true; 99 + if let Some(cid) = map.next_value()? { 100 + left = Some(NodeThing { 101 + cid, 102 + kind: ThingKind::Tree, 103 + }); 104 + } 105 + } 106 + "e" => { 107 + if found_entries { 108 + return Err(de::Error::duplicate_field("e")); 109 + } 110 + found_entries = true; 111 + 112 + let mut prefix: Vec<u8> = vec![]; 113 + 114 + for entry in map.next_value::<Vec<Entry>>()? 
{ 115 + let mut rkey: Vec<u8> = vec![]; 116 + let pre_checked = 117 + prefix.get(..entry.prefix_len).ok_or_else(|| { 118 + de::Error::invalid_value( 119 + Unexpected::Bytes(&prefix), 120 + &"a prefix at least as long as the prefix_len", 121 + ) 122 + })?; 123 + 124 + rkey.extend_from_slice(pre_checked); 125 + rkey.extend_from_slice(&entry.keysuffix); 126 + 127 + let rkey_s = String::from_utf8(rkey.clone()).map_err(|_| { 128 + de::Error::invalid_value( 129 + Unexpected::Bytes(&rkey), 130 + &"a valid utf-8 rkey", 131 + ) 132 + })?; 133 + 134 + let key_depth = atproto_mst_depth(&rkey_s); 135 + if depth.is_none() { 136 + depth = Some(key_depth); 137 + } else if Some(key_depth) != depth { 138 + return Err(de::Error::invalid_value( 139 + Unexpected::Bytes(&prefix), 140 + &"all rkeys to have equal MST depth", 141 + )); 142 + } 143 + 144 + things.push(NodeThing { 145 + cid: entry.value, 146 + kind: ThingKind::Value { rkey: rkey_s }, 147 + }); 148 + 149 + if let Some(cid) = entry.tree { 150 + things.push(NodeThing { 151 + cid, 152 + kind: ThingKind::Tree, 153 + }); 154 + } 155 + 156 + prefix = rkey; 157 + } 158 + } 159 + f => return Err(de::Error::unknown_field(f, NODE_FIELDS)), 160 + } 161 + } 162 + if !found_left { 163 + return Err(de::Error::missing_field("l")); 164 + } 165 + if !found_entries { 166 + return Err(de::Error::missing_field("e")); 167 + } 168 + 169 + things.reverse(); 170 + if let Some(l) = left { 171 + things.push(l); 172 + } 173 + 174 + Ok(MstNode { depth, things }) 175 + } 176 + } 177 + 178 + const NODE_FIELDS: &[&str] = &["l", "e"]; 179 + deserializer.deserialize_struct("MstNode", NODE_FIELDS, NodeVisitor) 180 + } 181 } 182 183 + impl MstNode { 184 + pub(crate) fn is_empty(&self) -> bool { 185 + self.things.is_empty() 186 + } 187 /// test if a block could possibly be a node 188 /// 189 /// we can't eagerly decode records except where we're *sure* they cannot be ··· 194 /// so if a block *could be* a node, any record converter must postpone 195 /// processing. if it turns out it happens to be a very node-looking record, 196 /// well, sorry, it just has to only be processed later when that's known. 197 + #[inline(always)] 198 pub(crate) fn could_be(bytes: impl AsRef<[u8]>) -> bool { 199 const NODE_FINGERPRINT: [u8; 3] = [ 200 0xA2, // map length 2 (for "l" and "e" keys) ··· 209 .map(|b| b & 0b1110_0000 == 0x80) 210 .unwrap_or(false) 211 } 212 } 213 214 /// TreeEntry object ··· 219 #[serde(rename = "p")] 220 pub prefix_len: usize, 221 /// remainder of key for this TreeEntry, after "prefixlen" have been removed 222 + #[serde(rename = "k")] 223 + pub keysuffix: serde_bytes::ByteBuf, 224 /// link to the record data (CBOR) for this entry 225 #[serde(rename = "v")] 226 pub value: Cid,
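
The new `atproto_mst_depth` collapses the removed byte-at-a-time zero counting into a single `leading_zeros` over the first 128 digest bits; the two agree whenever the digest has a set bit somewhere in its first 16 bytes, which is the "128 bits oughta be enough" joke above. A sketch of the equivalence (assumes the `sha2` crate and the test-visible `repo_stream::mst::atproto_mst_depth`):

    use repo_stream::mst::atproto_mst_depth;
    use sha2::{Digest, Sha256};

    // the removed byte-wise depth computation, reproduced for comparison
    fn depth_bytewise(key: &str) -> u32 {
        let mut zeros = 0;
        for byte in Sha256::digest(key) {
            let leading = byte.leading_zeros(); // zeros within this u8
            zeros += leading;
            if leading < 8 {
                break; // a set bit ends the run of leading zeros
            }
        }
        zeros / 2 // two bits per level, matching the fanout-4 MST
    }

    #[test]
    fn bytewise_agrees_with_u128_shortcut() {
        for key in ["", "blue", "app.bsky.feed.post/454397e440ec"] {
            assert_eq!(depth_bytewise(key), atproto_mst_depth(key));
        }
    }
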
-87
src/process.rs
··· 1 - /*! 2 - Record processor function output trait 3 - 4 - The return type must satisfy the `Processable` trait, which requires: 5 - 6 - - `Clone` because two rkeys can refer to the same record by CID, which may 7 - only appear once in the CAR file. 8 - - `Serialize + DeserializeOwned` so it can be spilled to disk. 9 - 10 - One required function must be implemented, `get_size()`: this should return the 11 - approximate total off-stack size of the type. (the on-stack size will be added 12 - automatically via `std::mem::get_size`). 13 - 14 - Note that it is **not guaranteed** that the `process` function will run on a 15 - block before storing it in memory or on disk: it's not possible to know if a 16 - block is a record without actually walking the MST, so the best we can do is 17 - apply `process` to any block that we know *cannot* be an MST node, and otherwise 18 - store the raw block bytes. 19 - 20 - Here's a silly processing function that just collects 'eyy's found in the raw 21 - record bytes 22 - 23 - ``` 24 - # use repo_stream::Processable; 25 - # use serde::{Serialize, Deserialize}; 26 - #[derive(Debug, Clone, Serialize, Deserialize)] 27 - struct Eyy(usize, String); 28 - 29 - impl Processable for Eyy { 30 - fn get_size(&self) -> usize { 31 - // don't need to compute the usize, it's on the stack 32 - self.1.capacity() // in-mem size from the string's capacity, in bytes 33 - } 34 - } 35 - 36 - fn process(raw: Vec<u8>) -> Vec<Eyy> { 37 - let mut out = Vec::new(); 38 - let to_find = "eyy".as_bytes(); 39 - for i in 0..(raw.len() - 3) { 40 - if &raw[i..(i+3)] == to_find { 41 - out.push(Eyy(i, "eyy".to_string())); 42 - } 43 - } 44 - out 45 - } 46 - ``` 47 - 48 - The memory sizing stuff is a little sketch but probably at least approximately 49 - works. 50 - */ 51 - 52 - use serde::{Serialize, de::DeserializeOwned}; 53 - 54 - /// Output trait for record processing 55 - pub trait Processable: Clone + Serialize + DeserializeOwned { 56 - /// Any additional in-memory size taken by the processed type 57 - /// 58 - /// Do not include stack size (`std::mem::size_of`) 59 - fn get_size(&self) -> usize; 60 - } 61 - 62 - /// Processor that just returns the raw blocks 63 - #[inline] 64 - pub fn noop(block: Vec<u8>) -> Vec<u8> { 65 - block 66 - } 67 - 68 - impl Processable for u8 { 69 - fn get_size(&self) -> usize { 70 - 0 71 - } 72 - } 73 - 74 - impl Processable for usize { 75 - fn get_size(&self) -> usize { 76 - 0 // no additional space taken, just its stack size (newtype is free) 77 - } 78 - } 79 - 80 - impl<Item: Sized + Processable> Processable for Vec<Item> { 81 - fn get_size(&self) -> usize { 82 - let slot_size = std::mem::size_of::<Item>(); 83 - let direct_size = slot_size * self.capacity(); 84 - let items_referenced_size: usize = self.iter().map(|item| item.get_size()).sum(); 85 - direct_size + items_referenced_size 86 - } 87 - }
···
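
With `Processable` removed, a processor that used to return a typed value now serializes whatever it extracts into the `Bytes` it returns, and the consumer decodes it back out of `Output::data`. A hedged sketch porting the removed "eyy" example to the new scheme, encoding just the match count as native-endian bytes (`count_eyys` is an illustrative name):

    use repo_stream::Bytes;

    /// Count "eyy" occurrences in the raw record bytes and return the
    /// count as native-endian bytes.
    fn count_eyys(raw: Bytes) -> Bytes {
        let count = raw.windows(3).filter(|&w| w == b"eyy").count();
        count.to_ne_bytes().to_vec()
    }

    // consumer side, per emitted Output:
    // let count = usize::from_ne_bytes(output.data.as_slice().try_into().unwrap());
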
+105 -342
src/walk.rs
··· 1 //! Depth-first MST traversal 2 3 - use crate::disk::SqliteReader; 4 - use crate::drive::{DecodeError, MaybeProcessedBlock}; 5 - use crate::mst::Node; 6 - use crate::process::Processable; 7 - use ipld_core::cid::Cid; 8 - use sha2::{Digest, Sha256}; 9 - use std::collections::HashMap; 10 use std::convert::Infallible; 11 12 /// Errors that can happen while walking ··· 19 #[error("Action node error: {0}")] 20 MstError(#[from] MstError), 21 #[error("storage error: {0}")] 22 - StorageError(#[from] rusqlite::Error), 23 - #[error("Decode error: {0}")] 24 - DecodeError(#[from] DecodeError), 25 } 26 27 /// Errors from invalid Rkeys 28 #[derive(Debug, PartialEq, thiserror::Error)] 29 pub enum MstError { 30 - #[error("Failed to compute an rkey due to invalid prefix_len")] 31 - EntryPrefixOutOfbounds, 32 - #[error("RKey was not utf-8")] 33 - EntryRkeyNotUtf8(#[from] std::string::FromUtf8Error), 34 #[error("Nodes cannot be empty (except for an entirely empty MST)")] 35 EmptyNode, 36 - #[error("Found an entry with rkey at the wrong depth")] 37 - WrongDepth, 38 - #[error("Lost track of our depth (possible bug?)")] 39 - LostDepth, 40 #[error("MST depth underflow: depth-0 node with child trees")] 41 DepthUnderflow, 42 - #[error("Encountered an rkey out of order while walking the MST")] 43 - RkeyOutOfOrder, 44 } 45 46 /// Walker outputs 47 - #[derive(Debug)] 48 - pub enum Step<T> { 49 - /// We needed this CID but it's not in the block store 50 - Missing(Cid), 51 - /// Reached the end of the MST! yay! 52 - Finish, 53 - /// A record was found! 54 - Found { rkey: String, data: T }, 55 - } 56 - 57 - #[derive(Debug, Clone, PartialEq)] 58 - enum Need { 59 - Node { depth: Depth, cid: Cid }, 60 - Record { rkey: String, cid: Cid }, 61 - } 62 - 63 - #[derive(Debug, Clone, Copy, PartialEq)] 64 - enum Depth { 65 - Root, 66 - Depth(u32), 67 - } 68 - 69 - impl Depth { 70 - fn from_key(key: &[u8]) -> Self { 71 - let mut zeros = 0; 72 - for byte in Sha256::digest(key) { 73 - let leading = byte.leading_zeros(); 74 - zeros += leading; 75 - if leading < 8 { 76 - break; 77 - } 78 - } 79 - Self::Depth(zeros / 2) // truncating divide (rounds down) 80 - } 81 - fn next_expected(&self) -> Result<Option<u32>, MstError> { 82 - match self { 83 - Self::Root => Ok(None), 84 - Self::Depth(d) => d.checked_sub(1).ok_or(MstError::DepthUnderflow).map(Some), 85 - } 86 - } 87 - } 88 - 89 - fn push_from_node(stack: &mut Vec<Need>, node: &Node, parent_depth: Depth) -> Result<(), MstError> { 90 - // empty nodes are not allowed in the MST 91 - // ...except for a single one for empty MST, but we wouldn't be pushing that 92 - if node.is_empty() { 93 - return Err(MstError::EmptyNode); 94 - } 95 - 96 - let mut entries = Vec::with_capacity(node.entries.len()); 97 - let mut prefix = vec![]; 98 - let mut this_depth = parent_depth.next_expected()?; 99 - 100 - for entry in &node.entries { 101 - let mut rkey = vec![]; 102 - let pre_checked = prefix 103 - .get(..entry.prefix_len) 104 - .ok_or(MstError::EntryPrefixOutOfbounds)?; 105 - rkey.extend_from_slice(pre_checked); 106 - rkey.extend_from_slice(&entry.keysuffix); 107 - 108 - let Depth::Depth(key_depth) = Depth::from_key(&rkey) else { 109 - return Err(MstError::WrongDepth); 110 - }; 111 - 112 - // this_depth is `none` if we are the deepest child (directly below root) 113 - // in that case we accept whatever highest depth is claimed 114 - let expected_depth = match this_depth { 115 - Some(d) => d, 116 - None => { 117 - this_depth = Some(key_depth); 118 - key_depth 119 - } 120 - }; 121 - 122 - // all 
keys we find should be this depth 123 - if key_depth != expected_depth { 124 - return Err(MstError::DepthUnderflow); 125 - } 126 - 127 - prefix = rkey.clone(); 128 - 129 - entries.push(Need::Record { 130 - rkey: String::from_utf8(rkey)?, 131 - cid: entry.value, 132 - }); 133 - if let Some(ref tree) = entry.tree { 134 - entries.push(Need::Node { 135 - depth: Depth::Depth(key_depth), 136 - cid: *tree, 137 - }); 138 - } 139 - } 140 - 141 - entries.reverse(); 142 - stack.append(&mut entries); 143 - 144 - let d = this_depth.ok_or(MstError::LostDepth)?; 145 - 146 - if let Some(tree) = node.left { 147 - stack.push(Need::Node { 148 - depth: Depth::Depth(d), 149 - cid: tree, 150 - }); 151 - } 152 - Ok(()) 153 } 154 155 /// Traverser of an atproto MST ··· 157 /// Walks the tree from left-to-right in depth-first order 158 #[derive(Debug)] 159 pub struct Walker { 160 - stack: Vec<Need>, 161 - prev: String, 162 } 163 164 impl Walker { 165 - pub fn new(tree_root_cid: Cid) -> Self { 166 Self { 167 - stack: vec![Need::Node { 168 - depth: Depth::Root, 169 - cid: tree_root_cid, 170 - }], 171 - prev: "".to_string(), 172 } 173 } 174 175 - /// Advance through nodes until we find a record or can't go further 176 - pub fn step<T: Processable>( 177 &mut self, 178 - blocks: &mut HashMap<Cid, MaybeProcessedBlock<T>>, 179 - process: impl Fn(Vec<u8>) -> T, 180 - ) -> Result<Step<T>, WalkError> { 181 - loop { 182 - let Some(need) = self.stack.last_mut() else { 183 - log::trace!("tried to walk but we're actually done."); 184 - return Ok(Step::Finish); 185 - }; 186 187 - match need { 188 - &mut Need::Node { depth, cid } => { 189 - log::trace!("need node {cid:?}"); 190 - let Some(block) = blocks.remove(&cid) else { 191 - log::trace!("node not found, resting"); 192 - return Ok(Step::Missing(cid)); 193 - }; 194 195 - let MaybeProcessedBlock::Raw(data) = block else { 196 - return Err(WalkError::BadCommitFingerprint); 197 - }; 198 - let node = serde_ipld_dagcbor::from_slice::<Node>(&data) 199 - .map_err(WalkError::BadCommit)?; 200 201 - // found node, make sure we remember 202 - self.stack.pop(); 203 204 - // queue up work on the found node next 205 - push_from_node(&mut self.stack, &node, depth)?; 206 } 207 - Need::Record { rkey, cid } => { 208 - log::trace!("need record {cid:?}"); 209 - // note that we cannot *remove* a record block, sadly, since 210 - // there can be multiple rkeys pointing to the same cid. 211 - let Some(data) = blocks.get_mut(cid) else { 212 - return Ok(Step::Missing(*cid)); 213 - }; 214 - let rkey = rkey.clone(); 215 - let data = match data { 216 - MaybeProcessedBlock::Raw(data) => process(data.to_vec()), 217 - MaybeProcessedBlock::Processed(t) => t.clone(), 218 - }; 219 220 - // found node, make sure we remember 221 - self.stack.pop(); 222 - 223 - // rkeys *must* be in order or else the tree is invalid (or 224 - // we have a bug) 225 - if rkey <= self.prev { 226 - return Err(MstError::RkeyOutOfOrder)?; 227 - } 228 - self.prev = rkey.clone(); 229 - 230 - return Ok(Step::Found { rkey, data }); 231 } 232 } 233 } 234 } 235 236 - /// blocking!!!!!! 
237 - pub fn disk_step<T: Processable>( 238 - &mut self, 239 - reader: &mut SqliteReader, 240 - process: impl Fn(Vec<u8>) -> T, 241 - ) -> Result<Step<T>, WalkError> { 242 - loop { 243 - let Some(need) = self.stack.last_mut() else { 244 - log::trace!("tried to walk but we're actually done."); 245 - return Ok(Step::Finish); 246 }; 247 - 248 - match need { 249 - &mut Need::Node { depth, cid } => { 250 - let cid_bytes = cid.to_bytes(); 251 - log::trace!("need node {cid:?}"); 252 - let Some(block_bytes) = reader.get(cid_bytes)? else { 253 - log::trace!("node not found, resting"); 254 - return Ok(Step::Missing(cid)); 255 - }; 256 - 257 - let block: MaybeProcessedBlock<T> = crate::drive::decode(&block_bytes)?; 258 - 259 - let MaybeProcessedBlock::Raw(data) = block else { 260 - return Err(WalkError::BadCommitFingerprint); 261 - }; 262 - let node = serde_ipld_dagcbor::from_slice::<Node>(&data) 263 - .map_err(WalkError::BadCommit)?; 264 - 265 - // found node, make sure we remember 266 - self.stack.pop(); 267 - 268 - // queue up work on the found node next 269 - push_from_node(&mut self.stack, &node, depth).map_err(WalkError::MstError)?; 270 - } 271 - Need::Record { rkey, cid } => { 272 - log::trace!("need record {cid:?}"); 273 - let cid_bytes = cid.to_bytes(); 274 - let Some(data_bytes) = reader.get(cid_bytes)? else { 275 - log::trace!("record block not found, resting"); 276 - return Ok(Step::Missing(*cid)); 277 - }; 278 - let data: MaybeProcessedBlock<T> = crate::drive::decode(&data_bytes)?; 279 - let rkey = rkey.clone(); 280 - let data = match data { 281 - MaybeProcessedBlock::Raw(data) => process(data), 282 - MaybeProcessedBlock::Processed(t) => t.clone(), 283 - }; 284 - 285 - // found node, make sure we remember 286 - self.stack.pop(); 287 - 288 - log::trace!("emitting a block as a step. 
depth={}", self.stack.len()); 289 - 290 - // rkeys *must* be in order or else the tree is invalid (or 291 - // we have a bug) 292 - if rkey <= self.prev { 293 - return Err(MstError::RkeyOutOfOrder)?; 294 - } 295 - self.prev = rkey.clone(); 296 297 - return Ok(Step::Found { rkey, data }); 298 - } 299 } 300 } 301 } 302 - } 303 304 - #[cfg(test)] 305 - mod test { 306 - use super::*; 307 - 308 - fn cid1() -> Cid { 309 - "bafyreihixenvk3ahqbytas4hk4a26w43bh6eo3w6usjqtxkpzsvi655a3m" 310 - .parse() 311 - .unwrap() 312 - } 313 - 314 - #[test] 315 - fn test_depth_spec_0() { 316 - let d = Depth::from_key(b"2653ae71"); 317 - assert_eq!(d, Depth::Depth(0)) 318 - } 319 - 320 - #[test] 321 - fn test_depth_spec_1() { 322 - let d = Depth::from_key(b"blue"); 323 - assert_eq!(d, Depth::Depth(1)) 324 - } 325 - 326 - #[test] 327 - fn test_depth_spec_4() { 328 - let d = Depth::from_key(b"app.bsky.feed.post/454397e440ec"); 329 - assert_eq!(d, Depth::Depth(4)) 330 - } 331 - 332 - #[test] 333 - fn test_depth_spec_8() { 334 - let d = Depth::from_key(b"app.bsky.feed.post/9adeb165882c"); 335 - assert_eq!(d, Depth::Depth(8)) 336 - } 337 - 338 - #[test] 339 - fn test_depth_ietf_draft_0() { 340 - let d = Depth::from_key(b"key1"); 341 - assert_eq!(d, Depth::Depth(0)) 342 - } 343 - 344 - #[test] 345 - fn test_depth_ietf_draft_1() { 346 - let d = Depth::from_key(b"key7"); 347 - assert_eq!(d, Depth::Depth(1)) 348 - } 349 - 350 - #[test] 351 - fn test_depth_ietf_draft_4() { 352 - let d = Depth::from_key(b"key515"); 353 - assert_eq!(d, Depth::Depth(4)) 354 - } 355 - 356 - #[test] 357 - fn test_depth_interop() { 358 - // examples from https://github.com/bluesky-social/atproto-interop-tests/blob/main/mst/key_heights.json 359 - for (k, expected) in [ 360 - ("", 0), 361 - ("asdf", 0), 362 - ("blue", 1), 363 - ("2653ae71", 0), 364 - ("88bfafc7", 2), 365 - ("2a92d355", 4), 366 - ("884976f5", 6), 367 - ("app.bsky.feed.post/454397e440ec", 4), 368 - ("app.bsky.feed.post/9adeb165882c", 8), 369 - ] { 370 - let d = Depth::from_key(k.as_bytes()); 371 - assert_eq!(d, Depth::Depth(expected), "key: {}", k); 372 } 373 - } 374 - 375 - #[test] 376 - fn test_push_empty_fails() { 377 - let empty_node = Node { 378 - left: None, 379 - entries: vec![], 380 - }; 381 - let mut stack = vec![]; 382 - let err = push_from_node(&mut stack, &empty_node, Depth::Depth(4)); 383 - assert_eq!(err, Err(MstError::EmptyNode)); 384 - } 385 - 386 - #[test] 387 - fn test_push_one_node() { 388 - let node = Node { 389 - left: Some(cid1()), 390 - entries: vec![], 391 - }; 392 - let mut stack = vec![]; 393 - push_from_node(&mut stack, &node, Depth::Depth(4)).unwrap(); 394 - assert_eq!( 395 - stack.last(), 396 - Some(Need::Node { 397 - depth: Depth::Depth(3), 398 - cid: cid1() 399 - }) 400 - .as_ref() 401 - ); 402 } 403 }
··· 1 //! Depth-first MST traversal 2 3 + use crate::mst::{Depth, MstNode, NodeThing, ThingKind}; 4 + use crate::{Bytes, HashMap, disk::DiskStore, drive::MaybeProcessedBlock}; 5 + use cid::Cid; 6 use std::convert::Infallible; 7 8 /// Errors that can happen while walking ··· 15 #[error("Action node error: {0}")] 16 MstError(#[from] MstError), 17 #[error("storage error: {0}")] 18 + StorageError(#[from] fjall::Error), 19 + #[error("block not found: {0}")] 20 + MissingBlock(Cid), 21 } 22 23 /// Errors from invalid Rkeys 24 #[derive(Debug, PartialEq, thiserror::Error)] 25 pub enum MstError { 26 #[error("Nodes cannot be empty (except for an entirely empty MST)")] 27 EmptyNode, 28 + #[error("Expected node to be at depth {expected}, but it was at {depth}")] 29 + WrongDepth { depth: Depth, expected: Depth }, 30 #[error("MST depth underflow: depth-0 node with child trees")] 31 DepthUnderflow, 32 + #[error("Encountered rkey {rkey:?} which cannot follow the previous: {prev:?}")] 33 + RkeyOutOfOrder { prev: String, rkey: String }, 34 } 35 36 /// Walker outputs 37 + #[derive(Debug, PartialEq)] 38 + pub struct Output { 39 + pub rkey: String, 40 + pub cid: Cid, 41 + pub data: Bytes, 42 } 43 44 /// Traverser of an atproto MST ··· 46 /// Walks the tree from left-to-right in depth-first order 47 #[derive(Debug)] 48 pub struct Walker { 49 + prev_rkey: String, 50 + root_depth: Depth, 51 + todo: Vec<Vec<NodeThing>>, 52 } 53 54 impl Walker { 55 + pub fn new(root_node: MstNode) -> Self { 56 Self { 57 + prev_rkey: "".to_string(), 58 + root_depth: root_node.depth.unwrap_or(0), // empty root node = empty mst 59 + todo: vec![root_node.things], 60 } 61 } 62 63 + fn mpb_step( 64 &mut self, 65 + kind: ThingKind, 66 + cid: Cid, 67 + mpb: &MaybeProcessedBlock, 68 + process: impl Fn(Bytes) -> Bytes, 69 + ) -> Result<Option<Output>, WalkError> { 70 + match kind { 71 + ThingKind::Value { rkey } => { 72 + let data = match mpb { 73 + MaybeProcessedBlock::Raw(data) => process(data.clone()), 74 + MaybeProcessedBlock::Processed(t) => t.clone(), 75 + }; 76 77 + if rkey <= self.prev_rkey { 78 + return Err(WalkError::MstError(MstError::RkeyOutOfOrder { 79 + rkey, 80 + prev: self.prev_rkey.clone(), 81 + })); 82 + } 83 + self.prev_rkey = rkey.clone(); 84 85 + log::trace!("val @ {rkey}"); 86 + Ok(Some(Output { rkey, cid, data })) 87 + } 88 + ThingKind::Tree => { 89 + let MaybeProcessedBlock::Raw(data) = mpb else { 90 + return Err(WalkError::BadCommitFingerprint); 91 + }; 92 93 + let node: MstNode = 94 + serde_ipld_dagcbor::from_slice(data).map_err(WalkError::BadCommit)?; 95 96 + if node.is_empty() { 97 + return Err(WalkError::MstError(MstError::EmptyNode)); 98 } 99 100 + let current_depth = self.root_depth - (self.todo.len() - 1) as u32; 101 + let next_depth = current_depth 102 + .checked_sub(1) 103 + .ok_or(MstError::DepthUnderflow)?; 104 + if let Some(d) = node.depth 105 + && d != next_depth 106 + { 107 + return Err(WalkError::MstError(MstError::WrongDepth { 108 + depth: d, 109 + expected: next_depth, 110 + })); 111 } 112 + 113 + log::trace!("node into depth {next_depth}"); 114 + self.todo.push(node.things); 115 + Ok(None) 116 } 117 } 118 } 119 120 + #[inline(always)] 121 + fn next_todo(&mut self) -> Option<NodeThing> { 122 + while let Some(last) = self.todo.last_mut() { 123 + let Some(thing) = last.pop() else { 124 + self.todo.pop(); 125 + continue; 126 }; 127 + return Some(thing); 128 + } 129 + None 130 + } 131 132 + /// Advance through nodes until we find a record or can't go further 133 + pub fn step( 134 + &mut self, 135 + 
blocks: &mut HashMap<Cid, MaybeProcessedBlock>, 136 + process: impl Fn(Bytes) -> Bytes, 137 + ) -> Result<Option<Output>, WalkError> { 138 + while let Some(NodeThing { cid, kind }) = self.next_todo() { 139 + let Some(mpb) = blocks.get(&cid) else { 140 + return Err(WalkError::MissingBlock(cid)); 141 + }; 142 + if let Some(out) = self.mpb_step(kind, cid, mpb, &process)? { 143 + return Ok(Some(out)); 144 } 145 } 146 + Ok(None) 147 } 148 149 + /// blocking!!!!!! 150 + pub fn disk_step( 151 + &mut self, 152 + blocks: &mut DiskStore, 153 + process: impl Fn(Bytes) -> Bytes, 154 + ) -> Result<Option<Output>, WalkError> { 155 + while let Some(NodeThing { cid, kind }) = self.next_todo() { 156 + let Some(block_slice) = blocks.get(&cid.to_bytes())? else { 157 + return Err(WalkError::MissingBlock(cid)); 158 + }; 159 + let mpb = MaybeProcessedBlock::from_bytes(block_slice.to_vec()); 160 + if let Some(out) = self.mpb_step(kind, cid, &mpb, &process)? { 161 + return Ok(Some(out)); 162 + } 163 } 164 + Ok(None) 165 } 166 }
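
The walker's in-order output now falls directly out of data layout: the `MstNode` deserializer pushes each entry's value and then its right subtree, reverses the list, and stacks `left` on top, so `Vec::pop` in `next_todo` yields the left subtree first and each rkey before the subtree that follows it. A toy illustration of that pop order:

    fn main() {
        // `things` as built for a node with entries [(v1, t1), (v2, t2)]
        // and a left pointer l (v = value/rkey, t = right subtree):
        let mut things = vec!["v1", "t1", "v2", "t2"];
        things.reverse(); // ["t2", "v2", "t1", "v1"]
        things.push("l"); // left subtree lands on top of the stack

        let mut order = Vec::new();
        while let Some(thing) = things.pop() {
            order.push(thing);
        }
        // depth-first, left-to-right: l, then v1, t1, v2, t2
        assert_eq!(order, ["l", "v1", "t1", "v2", "t2"]);
    }
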
+212
tests/mst-depth.rs
···
···
1 + // use repo_stream::Driver;
2 + use repo_stream::mst::atproto_mst_depth;
3 +
4 + // https://github.com/bluesky-social/atproto-interop-tests/blob/main/mst/example_keys.txt
5 + const INTEROP_EXAMPLE_KEYS: &str = "\
6 + A0/374913
7 + A1/076595
8 + A2/827942
9 + A3/578971
10 + A4/055903
11 + A5/518415
12 + B0/601692
13 + B1/986427
14 + B2/827649
15 + B3/095483
16 + B4/774183
17 + B5/116729
18 + C0/451630
19 + C1/438573
20 + C2/014073
21 + C3/564755
22 + C4/134079
23 + C5/141153
24 + D0/952776
25 + D1/834852
26 + D2/269196
27 + D3/038750
28 + D4/052059
29 + D5/563177
30 + E0/670489
31 + E1/091396
32 + E2/819540
33 + E3/391311
34 + E4/820614
35 + E5/512478
36 + F0/697858
37 + F1/085263
38 + F2/483591
39 + F3/409933
40 + F4/789697
41 + F5/271416
42 + G0/765327
43 + G1/209912
44 + G2/611528
45 + G3/649394
46 + G4/585887
47 + G5/298495
48 + H0/131238
49 + H1/566929
50 + H2/618272
51 + H3/500151
52 + H4/841548
53 + H5/642354
54 + I0/536928
55 + I1/525517
56 + I2/800680
57 + I3/818503
58 + I4/561177
59 + I5/010047
60 + J0/453243
61 + J1/217783
62 + J2/960389
63 + J3/501274
64 + J4/042054
65 + J5/743154
66 + K0/125271
67 + K1/317361
68 + K2/453868
69 + K3/214010
70 + K4/164720
71 + K5/177856
72 + L0/502889
73 + L1/574576
74 + L2/596333
75 + L3/683657
76 + L4/724989
77 + L5/093883
78 + M0/141744
79 + M1/643368
80 + M2/919782
81 + M3/836327
82 + M4/177463
83 + M5/563354
84 + N0/370604
85 + N1/563732
86 + N2/177587
87 + N3/678428
88 + N4/599183
89 + N5/567564
90 + O0/523870
91 + O1/052141
92 + O2/037651
93 + O3/773808
94 + O4/140952
95 + O5/318605
96 + P0/133157
97 + P1/394633
98 + P2/521462
99 + P3/493488
100 + P4/908754
101 + P5/109455
102 + Q0/835234
103 + Q1/131542
104 + Q2/680035
105 + Q3/253381
106 + Q4/019053
107 + Q5/658167
108 + R0/129386
109 + R1/363149
110 + R2/742766
111 + R3/039235
112 + R4/482275
113 + R5/817312
114 + S0/340283
115 + S1/561525
116 + S2/914574
117 + S3/909434
118 + S4/789708
119 + S5/803866
120 + T0/255204
121 + T1/716687
122 + T2/256231
123 + T3/054247
124 + T4/419247
125 + T5/509584
126 + U0/298296
127 + U1/851680
128 + U2/342856
129 + U3/597327
130 + U4/311686
131 + U5/030156
132 + V0/221100
133 + V1/741554
134 + V2/267990
135 + V3/674163
136 + V4/739931
137 + V5/573718
138 + W0/034202
139 + W1/697411
140 + W2/460313
141 + W3/189647
142 + W4/847299
143 + W5/648086
144 + X0/287498
145 + X1/044093
146 + X2/613770
147 + X3/577587
148 + X4/779391
149 + X5/339246
150 + Y0/986350
151 + Y1/044567
152 + Y2/478044
153 + Y3/757097
154 + Y4/396913
155 + Y5/802264
156 + Z0/425878
157 + Z1/127557
158 + Z2/441927
159 + Z3/064474
160 + Z4/888344
161 + Z5/977983";
162 +
163 + #[test]
164 + fn test_interop_example_keys() {
165 + for key in INTEROP_EXAMPLE_KEYS.split('\n') {
166 + let expected: u32 = key.chars().nth(1).unwrap().to_digit(16).unwrap();
167 + let computed: u32 = atproto_mst_depth(key);
168 + assert_eq!(computed, expected);
169 + }
170 + }
171 +
172 + #[test]
173 + fn test_interop_key_heights() {
174 + // examples from https://github.com/bluesky-social/atproto-interop-tests/blob/main/mst/key_heights.json
175 + for (key, expected) in [
176 + ("", 0),
177 + ("asdf", 0),
178 + ("blue", 1),
179 + ("2653ae71", 0),
180 + ("88bfafc7", 2),
181 + ("2a92d355", 4),
182 + ("884976f5", 6),
183 + ("app.bsky.feed.post/454397e440ec", 4),
184 + ("app.bsky.feed.post/9adeb165882c", 8),
185 + ] {
186 + let computed = atproto_mst_depth(key);
187 + assert_eq!(computed, expected);
188 + }
189 + }
190 +
191 + #[test]
192 + fn test_spec_example_keys() {
193 + // 
https://atproto.com/specs/repository#mst-structure 194 + for (key, expected) in [ 195 + ("2653ae71", 0), 196 + ("blue", 1), 197 + ("app.bsky.feed.post/454397e440ec", 4), 198 + ("app.bsky.feed.post/9adeb165882c", 8), 199 + ] { 200 + let computed = atproto_mst_depth(key); 201 + assert_eq!(computed, expected); 202 + } 203 + } 204 + 205 + #[test] 206 + fn test_ietf_example_keys() { 207 + // https://atproto.com/specs/repository#mst-structure 208 + for (key, expected) in [("key1", 0), ("key7", 1), ("key515", 4)] { 209 + let computed = atproto_mst_depth(key); 210 + assert_eq!(computed, expected); 211 + } 212 + }
+29 -9
tests/non-huge-cars.rs
··· 1 extern crate repo_stream; 2 use repo_stream::Driver; 3 4 const TINY_CAR: &'static [u8] = include_bytes!("../car-samples/tiny.car"); 5 const LITTLE_CAR: &'static [u8] = include_bytes!("../car-samples/little.car"); 6 const MIDSIZE_CAR: &'static [u8] = include_bytes!("../car-samples/midsize.car"); 7 8 - async fn test_car(bytes: &[u8], expected_records: usize, expected_sum: usize) { 9 - let mut driver = match Driver::load_car(bytes, |block| block.len(), 10 /* MiB */) 10 - .await 11 - .unwrap() 12 { 13 Driver::Memory(_commit, mem_driver) => mem_driver, 14 Driver::Disk(_) => panic!("too big"), ··· 20 let mut prev_rkey = "".to_string(); 21 22 while let Some(pairs) = driver.next_chunk(256).await.unwrap() { 23 - for (rkey, size) in pairs { 24 records += 1; 25 sum += size; 26 if rkey == "app.bsky.actor.profile/self" { 27 found_bsky_profile = true; ··· 33 34 assert_eq!(records, expected_records); 35 assert_eq!(sum, expected_sum); 36 - assert!(found_bsky_profile); 37 } 38 39 #[tokio::test] 40 async fn test_tiny_car() { 41 - test_car(TINY_CAR, 8, 2071).await 42 } 43 44 #[tokio::test] 45 async fn test_little_car() { 46 - test_car(LITTLE_CAR, 278, 246960).await 47 } 48 49 #[tokio::test] 50 async fn test_midsize_car() { 51 - test_car(MIDSIZE_CAR, 11585, 3741393).await 52 }
··· 1 extern crate repo_stream; 2 use repo_stream::Driver; 3 + use repo_stream::Output; 4 5 + const EMPTY_CAR: &'static [u8] = include_bytes!("../car-samples/empty.car"); 6 const TINY_CAR: &'static [u8] = include_bytes!("../car-samples/tiny.car"); 7 const LITTLE_CAR: &'static [u8] = include_bytes!("../car-samples/little.car"); 8 const MIDSIZE_CAR: &'static [u8] = include_bytes!("../car-samples/midsize.car"); 9 10 + async fn test_car( 11 + bytes: &[u8], 12 + expected_records: usize, 13 + expected_sum: usize, 14 + expect_profile: bool, 15 + ) { 16 + let mut driver = match Driver::load_car( 17 + bytes, 18 + |block| block.len().to_ne_bytes().to_vec().into(), 19 + 10, /* MiB */ 20 + ) 21 + .await 22 + .unwrap() 23 { 24 Driver::Memory(_commit, mem_driver) => mem_driver, 25 Driver::Disk(_) => panic!("too big"), ··· 31 let mut prev_rkey = "".to_string(); 32 33 while let Some(pairs) = driver.next_chunk(256).await.unwrap() { 34 + for Output { rkey, cid: _, data } in pairs { 35 records += 1; 36 + 37 + let (int_bytes, _) = data.split_at(size_of::<usize>()); 38 + let size = usize::from_ne_bytes(int_bytes.try_into().unwrap()); 39 + 40 sum += size; 41 if rkey == "app.bsky.actor.profile/self" { 42 found_bsky_profile = true; ··· 48 49 assert_eq!(records, expected_records); 50 assert_eq!(sum, expected_sum); 51 + assert_eq!(found_bsky_profile, expect_profile); 52 + } 53 + 54 + #[tokio::test] 55 + async fn test_empty_car() { 56 + test_car(EMPTY_CAR, 0, 0, false).await 57 } 58 59 #[tokio::test] 60 async fn test_tiny_car() { 61 + test_car(TINY_CAR, 8, 2071, true).await 62 } 63 64 #[tokio::test] 65 async fn test_little_car() { 66 + test_car(LITTLE_CAR, 278, 246960, true).await 67 } 68 69 #[tokio::test] 70 async fn test_midsize_car() { 71 + test_car(MIDSIZE_CAR, 11585, 3741393, true).await 72 }