Fast and robust atproto CAR file processing in rust

Compare changes

Choose any two refs to compare.

Changed files
+217 -224
examples
disk-read-file
src
+169 -60
Cargo.lock
··· 167 checksum = "46c5e41b57b8bba42a04676d81cb89e9ee8e859a1a66f80a5a72e1cb76b34d43" 168 169 [[package]] 170 name = "bytes" 171 version = "1.10.1" 172 source = "registry+https://github.com/rust-lang/crates.io-index" 173 checksum = "d71b6127be86fdcfddb610f7182ac57211d4b18a3e9c82eb2d17662f2227ad6a" 174 175 [[package]] 176 name = "cast" ··· 281 checksum = "b05b61dc5112cbb17e4b6cd61790d9845d13888356391624cbe7e41efeac1e75" 282 283 [[package]] 284 name = "const-str" 285 version = "0.4.3" 286 source = "registry+https://github.com/rust-lang/crates.io-index" ··· 358 ] 359 360 [[package]] 361 name = "crossbeam-utils" 362 version = "0.8.21" 363 source = "registry+https://github.com/rust-lang/crates.io-index" ··· 380 ] 381 382 [[package]] 383 name = "data-encoding" 384 version = "2.9.0" 385 source = "registry+https://github.com/rust-lang/crates.io-index" ··· 422 checksum = "48c757948c5ede0e46177b7add2e67155f70e33c07fea8284df6576da70b3719" 423 424 [[package]] 425 name = "env_filter" 426 version = "0.1.3" 427 source = "registry+https://github.com/rust-lang/crates.io-index" ··· 445 ] 446 447 [[package]] 448 name = "errno" 449 version = "0.3.14" 450 source = "registry+https://github.com/rust-lang/crates.io-index" ··· 455 ] 456 457 [[package]] 458 - name = "fallible-iterator" 459 - version = "0.3.0" 460 source = "registry+https://github.com/rust-lang/crates.io-index" 461 - checksum = "2acce4a10f12dc2fb14a218589d4f1f62ef011b2d0cc4b3cb1bba8e94da14649" 462 463 [[package]] 464 - name = "fallible-streaming-iterator" 465 - version = "0.1.9" 466 source = "registry+https://github.com/rust-lang/crates.io-index" 467 - checksum = "7360491ce676a36bf9bb3c56c1aa791658183a54d2744120f27285738d90465a" 468 469 [[package]] 470 - name = "fastrand" 471 - version = "2.3.0" 472 source = "registry+https://github.com/rust-lang/crates.io-index" 473 - checksum = "37909eebbb50d72f9059c3b6d82c0463f2ff062c9e95845c43a6c9c0355411be" 474 - 475 - [[package]] 476 - name = "foldhash" 477 - version = "0.1.5" 478 - source = "registry+https://github.com/rust-lang/crates.io-index" 479 - checksum = "d9c4f5dac5e15c24eb999c26181a6ca40b39fe946cbe4c263c7209467bc83af2" 480 481 [[package]] 482 name = "futures" ··· 608 609 [[package]] 610 name = "hashbrown" 611 - version = "0.15.5" 612 source = "registry+https://github.com/rust-lang/crates.io-index" 613 - checksum = "9229cfe53dfd69f0609a49f65461bd93001ea1ef889cd5529dd176593f5338a1" 614 - dependencies = [ 615 - "foldhash", 616 - ] 617 618 [[package]] 619 - name = "hashlink" 620 - version = "0.10.0" 621 source = "registry+https://github.com/rust-lang/crates.io-index" 622 - checksum = "7382cf6263419f2d8df38c55d7da83da5c18aef87fc7a7fc1fb1e344edfe14c1" 623 - dependencies = [ 624 - "hashbrown", 625 - ] 626 627 [[package]] 628 name = "heck" ··· 631 checksum = "2304e00983f87ffb38b55b444b5e3b60a884b5d30c0fca7d82fe33449bbe55ea" 632 633 [[package]] 634 name = "io-uring" 635 version = "0.7.10" 636 source = "registry+https://github.com/rust-lang/crates.io-index" ··· 730 checksum = "58f929b4d672ea937a23a1ab494143d968337a5f47e56d0815df1e0890ddf174" 731 732 [[package]] 733 - name = "libsqlite3-sys" 734 - version = "0.35.0" 735 - source = "registry+https://github.com/rust-lang/crates.io-index" 736 - checksum = "133c182a6a2c87864fe97778797e46c7e999672690dc9fa3ee8e241aa4a9c13f" 737 - dependencies = [ 738 - "pkg-config", 739 - "vcpkg", 740 - ] 741 - 742 - [[package]] 743 name = "linux-raw-sys" 744 version = "0.11.0" 745 source = "registry+https://github.com/rust-lang/crates.io-index" ··· 761 checksum = "34080505efa8e45a4b816c349525ebe327ceaa8559756f0356cba97ef3bf7432" 762 763 [[package]] 764 name = "match-lookup" 765 version = "0.1.1" 766 source = "registry+https://github.com/rust-lang/crates.io-index" ··· 892 checksum = "8b870d8c151b6f2fb93e84a13146138f05d02ed11c7e7c54f8826aaaf7c9f184" 893 894 [[package]] 895 - name = "pkg-config" 896 - version = "0.3.32" 897 - source = "registry+https://github.com/rust-lang/crates.io-index" 898 - checksum = "7edddbd0b52d732b21ad9a5fab5c704c14cd949e5e9a1ec5929a24fded1b904c" 899 - 900 - [[package]] 901 name = "plotters" 902 version = "0.3.7" 903 source = "registry+https://github.com/rust-lang/crates.io-index" ··· 947 checksum = "89ae43fd86e4158d6db51ad8e2b80f313af9cc74f5c0e03ccb87de09998732de" 948 dependencies = [ 949 "unicode-ident", 950 ] 951 952 [[package]] ··· 1030 "clap", 1031 "criterion", 1032 "env_logger", 1033 "futures", 1034 "futures-core", 1035 "ipld-core", 1036 "iroh-car", 1037 "log", 1038 "multibase", 1039 - "rusqlite", 1040 "serde", 1041 "serde_bytes", 1042 "serde_ipld_dagcbor", ··· 1047 ] 1048 1049 [[package]] 1050 - name = "rusqlite" 1051 - version = "0.37.0" 1052 source = "registry+https://github.com/rust-lang/crates.io-index" 1053 - checksum = "165ca6e57b20e1351573e3729b958bc62f0e48025386970b6e4d29e7a7e71f3f" 1054 - dependencies = [ 1055 - "bitflags", 1056 - "fallible-iterator", 1057 - "fallible-streaming-iterator", 1058 - "hashlink", 1059 - "libsqlite3-sys", 1060 - "smallvec", 1061 - ] 1062 1063 [[package]] 1064 - name = "rustc-demangle" 1065 - version = "0.1.26" 1066 source = "registry+https://github.com/rust-lang/crates.io-index" 1067 - checksum = "56f7d92ca342cea22a06f2121d944b4fd82af56988c270852495420f961d4ace" 1068 1069 [[package]] 1070 name = "rustix" ··· 1105 version = "1.2.0" 1106 source = "registry+https://github.com/rust-lang/crates.io-index" 1107 checksum = "94143f37725109f92c262ed2cf5e59bce7498c01bcc1502d7b9afe439a4e9f49" 1108 1109 [[package]] 1110 name = "serde" ··· 1172 ] 1173 1174 [[package]] 1175 name = "sha2" 1176 version = "0.10.9" 1177 source = "registry+https://github.com/rust-lang/crates.io-index" ··· 1211 dependencies = [ 1212 "libc", 1213 "windows-sys 0.59.0", 1214 ] 1215 1216 [[package]] ··· 1372 checksum = "06abde3611657adf66d383f00b093d7faecc7fa57071cce2578660c9f1010821" 1373 1374 [[package]] 1375 - name = "vcpkg" 1376 - version = "0.2.15" 1377 source = "registry+https://github.com/rust-lang/crates.io-index" 1378 - checksum = "accd4ea62f7bb7a82fe23066fb0957d48ef677f6eeb8215f372f52e48bb32426" 1379 1380 [[package]] 1381 name = "version_check" ··· 1659 version = "0.46.0" 1660 source = "registry+https://github.com/rust-lang/crates.io-index" 1661 checksum = "f17a85883d4e6d00e8a97c586de764dabcc06133f7f1d55dce5cdc070ad7fe59" 1662 1663 [[package]] 1664 name = "zerocopy"
··· 167 checksum = "46c5e41b57b8bba42a04676d81cb89e9ee8e859a1a66f80a5a72e1cb76b34d43" 168 169 [[package]] 170 + name = "byteorder-lite" 171 + version = "0.1.0" 172 + source = "registry+https://github.com/rust-lang/crates.io-index" 173 + checksum = "8f1fe948ff07f4bd06c30984e69f5b4899c516a3ef74f34df92a2df2ab535495" 174 + 175 + [[package]] 176 name = "bytes" 177 version = "1.10.1" 178 source = "registry+https://github.com/rust-lang/crates.io-index" 179 checksum = "d71b6127be86fdcfddb610f7182ac57211d4b18a3e9c82eb2d17662f2227ad6a" 180 + 181 + [[package]] 182 + name = "byteview" 183 + version = "0.10.0" 184 + source = "registry+https://github.com/rust-lang/crates.io-index" 185 + checksum = "dda4398f387cc6395a3e93b3867cd9abda914c97a0b344d1eefb2e5c51785fca" 186 187 [[package]] 188 name = "cast" ··· 293 checksum = "b05b61dc5112cbb17e4b6cd61790d9845d13888356391624cbe7e41efeac1e75" 294 295 [[package]] 296 + name = "compare" 297 + version = "0.0.6" 298 + source = "registry+https://github.com/rust-lang/crates.io-index" 299 + checksum = "ea0095f6103c2a8b44acd6fd15960c801dafebf02e21940360833e0673f48ba7" 300 + 301 + [[package]] 302 name = "const-str" 303 version = "0.4.3" 304 source = "registry+https://github.com/rust-lang/crates.io-index" ··· 376 ] 377 378 [[package]] 379 + name = "crossbeam-skiplist" 380 + version = "0.1.3" 381 + source = "registry+https://github.com/rust-lang/crates.io-index" 382 + checksum = "df29de440c58ca2cc6e587ec3d22347551a32435fbde9d2bff64e78a9ffa151b" 383 + dependencies = [ 384 + "crossbeam-epoch", 385 + "crossbeam-utils", 386 + ] 387 + 388 + [[package]] 389 name = "crossbeam-utils" 390 version = "0.8.21" 391 source = "registry+https://github.com/rust-lang/crates.io-index" ··· 408 ] 409 410 [[package]] 411 + name = "dashmap" 412 + version = "6.1.0" 413 + source = "registry+https://github.com/rust-lang/crates.io-index" 414 + checksum = "5041cc499144891f3790297212f32a74fb938e5136a14943f338ef9e0ae276cf" 415 + dependencies = [ 416 + "cfg-if", 417 + "crossbeam-utils", 418 + "hashbrown 0.14.5", 419 + "lock_api", 420 + "once_cell", 421 + "parking_lot_core", 422 + ] 423 + 424 + [[package]] 425 name = "data-encoding" 426 version = "2.9.0" 427 source = "registry+https://github.com/rust-lang/crates.io-index" ··· 464 checksum = "48c757948c5ede0e46177b7add2e67155f70e33c07fea8284df6576da70b3719" 465 466 [[package]] 467 + name = "enum_dispatch" 468 + version = "0.3.13" 469 + source = "registry+https://github.com/rust-lang/crates.io-index" 470 + checksum = "aa18ce2bc66555b3218614519ac839ddb759a7d6720732f979ef8d13be147ecd" 471 + dependencies = [ 472 + "once_cell", 473 + "proc-macro2", 474 + "quote", 475 + "syn 2.0.106", 476 + ] 477 + 478 + [[package]] 479 name = "env_filter" 480 version = "0.1.3" 481 source = "registry+https://github.com/rust-lang/crates.io-index" ··· 499 ] 500 501 [[package]] 502 + name = "equivalent" 503 + version = "1.0.2" 504 + source = "registry+https://github.com/rust-lang/crates.io-index" 505 + checksum = "877a4ace8713b0bcf2a4e7eec82529c029f1d0619886d18145fea96c3ffe5c0f" 506 + 507 + [[package]] 508 name = "errno" 509 version = "0.3.14" 510 source = "registry+https://github.com/rust-lang/crates.io-index" ··· 515 ] 516 517 [[package]] 518 + name = "fastrand" 519 + version = "2.3.0" 520 source = "registry+https://github.com/rust-lang/crates.io-index" 521 + checksum = "37909eebbb50d72f9059c3b6d82c0463f2ff062c9e95845c43a6c9c0355411be" 522 523 [[package]] 524 + name = "fjall" 525 + version = "3.0.1" 526 source = "registry+https://github.com/rust-lang/crates.io-index" 527 + checksum = "4f69637c02d38ad1b0f003101d0195a60368130aa17d9ef78b1557d265a22093" 528 + dependencies = [ 529 + "byteorder-lite", 530 + "byteview", 531 + "dashmap", 532 + "flume", 533 + "log", 534 + "lsm-tree", 535 + "tempfile", 536 + "xxhash-rust", 537 + ] 538 539 [[package]] 540 + name = "flume" 541 + version = "0.12.0" 542 source = "registry+https://github.com/rust-lang/crates.io-index" 543 + checksum = "5e139bc46ca777eb5efaf62df0ab8cc5fd400866427e56c68b22e414e53bd3be" 544 + dependencies = [ 545 + "spin", 546 + ] 547 548 [[package]] 549 name = "futures" ··· 675 676 [[package]] 677 name = "hashbrown" 678 + version = "0.14.5" 679 source = "registry+https://github.com/rust-lang/crates.io-index" 680 + checksum = "e5274423e17b7c9fc20b6e7e208532f9b19825d82dfd615708b70edd83df41f1" 681 682 [[package]] 683 + name = "hashbrown" 684 + version = "0.16.1" 685 source = "registry+https://github.com/rust-lang/crates.io-index" 686 + checksum = "841d1cc9bed7f9236f321df977030373f4a4163ae1a7dbfe1a51a2c1a51d9100" 687 688 [[package]] 689 name = "heck" ··· 692 checksum = "2304e00983f87ffb38b55b444b5e3b60a884b5d30c0fca7d82fe33449bbe55ea" 693 694 [[package]] 695 + name = "interval-heap" 696 + version = "0.0.5" 697 + source = "registry+https://github.com/rust-lang/crates.io-index" 698 + checksum = "11274e5e8e89b8607cfedc2910b6626e998779b48a019151c7604d0adcb86ac6" 699 + dependencies = [ 700 + "compare", 701 + ] 702 + 703 + [[package]] 704 name = "io-uring" 705 version = "0.7.10" 706 source = "registry+https://github.com/rust-lang/crates.io-index" ··· 800 checksum = "58f929b4d672ea937a23a1ab494143d968337a5f47e56d0815df1e0890ddf174" 801 802 [[package]] 803 name = "linux-raw-sys" 804 version = "0.11.0" 805 source = "registry+https://github.com/rust-lang/crates.io-index" ··· 821 checksum = "34080505efa8e45a4b816c349525ebe327ceaa8559756f0356cba97ef3bf7432" 822 823 [[package]] 824 + name = "lsm-tree" 825 + version = "3.0.1" 826 + source = "registry+https://github.com/rust-lang/crates.io-index" 827 + checksum = "b875f1dfe14f557f805b167fb9b0fc54c5560c7a4bd6ae02535b2846f276a8cb" 828 + dependencies = [ 829 + "byteorder-lite", 830 + "byteview", 831 + "crossbeam-skiplist", 832 + "enum_dispatch", 833 + "interval-heap", 834 + "log", 835 + "quick_cache", 836 + "rustc-hash", 837 + "self_cell", 838 + "sfa", 839 + "tempfile", 840 + "varint-rs", 841 + "xxhash-rust", 842 + ] 843 + 844 + [[package]] 845 name = "match-lookup" 846 version = "0.1.1" 847 source = "registry+https://github.com/rust-lang/crates.io-index" ··· 973 checksum = "8b870d8c151b6f2fb93e84a13146138f05d02ed11c7e7c54f8826aaaf7c9f184" 974 975 [[package]] 976 name = "plotters" 977 version = "0.3.7" 978 source = "registry+https://github.com/rust-lang/crates.io-index" ··· 1022 checksum = "89ae43fd86e4158d6db51ad8e2b80f313af9cc74f5c0e03ccb87de09998732de" 1023 dependencies = [ 1024 "unicode-ident", 1025 + ] 1026 + 1027 + [[package]] 1028 + name = "quick_cache" 1029 + version = "0.6.18" 1030 + source = "registry+https://github.com/rust-lang/crates.io-index" 1031 + checksum = "7ada44a88ef953a3294f6eb55d2007ba44646015e18613d2f213016379203ef3" 1032 + dependencies = [ 1033 + "equivalent", 1034 + "hashbrown 0.16.1", 1035 ] 1036 1037 [[package]] ··· 1115 "clap", 1116 "criterion", 1117 "env_logger", 1118 + "fjall", 1119 "futures", 1120 "futures-core", 1121 "ipld-core", 1122 "iroh-car", 1123 "log", 1124 "multibase", 1125 "serde", 1126 "serde_bytes", 1127 "serde_ipld_dagcbor", ··· 1132 ] 1133 1134 [[package]] 1135 + name = "rustc-demangle" 1136 + version = "0.1.26" 1137 source = "registry+https://github.com/rust-lang/crates.io-index" 1138 + checksum = "56f7d92ca342cea22a06f2121d944b4fd82af56988c270852495420f961d4ace" 1139 1140 [[package]] 1141 + name = "rustc-hash" 1142 + version = "2.1.1" 1143 source = "registry+https://github.com/rust-lang/crates.io-index" 1144 + checksum = "357703d41365b4b27c590e3ed91eabb1b663f07c4c084095e60cbed4362dff0d" 1145 1146 [[package]] 1147 name = "rustix" ··· 1182 version = "1.2.0" 1183 source = "registry+https://github.com/rust-lang/crates.io-index" 1184 checksum = "94143f37725109f92c262ed2cf5e59bce7498c01bcc1502d7b9afe439a4e9f49" 1185 + 1186 + [[package]] 1187 + name = "self_cell" 1188 + version = "1.2.2" 1189 + source = "registry+https://github.com/rust-lang/crates.io-index" 1190 + checksum = "b12e76d157a900eb52e81bc6e9f3069344290341720e9178cde2407113ac8d89" 1191 1192 [[package]] 1193 name = "serde" ··· 1255 ] 1256 1257 [[package]] 1258 + name = "sfa" 1259 + version = "1.0.0" 1260 + source = "registry+https://github.com/rust-lang/crates.io-index" 1261 + checksum = "a1296838937cab56cd6c4eeeb8718ec777383700c33f060e2869867bd01d1175" 1262 + dependencies = [ 1263 + "byteorder-lite", 1264 + "log", 1265 + "xxhash-rust", 1266 + ] 1267 + 1268 + [[package]] 1269 name = "sha2" 1270 version = "0.10.9" 1271 source = "registry+https://github.com/rust-lang/crates.io-index" ··· 1305 dependencies = [ 1306 "libc", 1307 "windows-sys 0.59.0", 1308 + ] 1309 + 1310 + [[package]] 1311 + name = "spin" 1312 + version = "0.9.8" 1313 + source = "registry+https://github.com/rust-lang/crates.io-index" 1314 + checksum = "6980e8d7511241f8acf4aebddbb1ff938df5eebe98691418c4468d0b72a96a67" 1315 + dependencies = [ 1316 + "lock_api", 1317 ] 1318 1319 [[package]] ··· 1475 checksum = "06abde3611657adf66d383f00b093d7faecc7fa57071cce2578660c9f1010821" 1476 1477 [[package]] 1478 + name = "varint-rs" 1479 + version = "2.2.0" 1480 source = "registry+https://github.com/rust-lang/crates.io-index" 1481 + checksum = "8f54a172d0620933a27a4360d3db3e2ae0dd6cceae9730751a036bbf182c4b23" 1482 1483 [[package]] 1484 name = "version_check" ··· 1762 version = "0.46.0" 1763 source = "registry+https://github.com/rust-lang/crates.io-index" 1764 checksum = "f17a85883d4e6d00e8a97c586de764dabcc06133f7f1d55dce5cdc070ad7fe59" 1765 + 1766 + [[package]] 1767 + name = "xxhash-rust" 1768 + version = "0.8.15" 1769 + source = "registry+https://github.com/rust-lang/crates.io-index" 1770 + checksum = "fdd20c5420375476fbd4394763288da7eb0cc0b8c11deed431a91562af7335d3" 1771 1772 [[package]] 1773 name = "zerocopy"
+1 -1
Cargo.toml
··· 8 9 [dependencies] 10 bincode = { version = "2.0.1", features = ["serde"] } 11 futures = "0.3.31" 12 futures-core = "0.3.31" 13 ipld-core = { version = "0.4.2", features = ["serde"] } 14 iroh-car = "0.5.1" 15 log = "0.4.28" 16 multibase = "0.9.2" 17 - rusqlite = "0.37.0" 18 serde = { version = "1.0.228", features = ["derive"] } 19 serde_bytes = "0.11.19" 20 serde_ipld_dagcbor = "0.6.4"
··· 8 9 [dependencies] 10 bincode = { version = "2.0.1", features = ["serde"] } 11 + fjall = { version = "3.0.1", default-features = false } 12 futures = "0.3.31" 13 futures-core = "0.3.31" 14 ipld-core = { version = "0.4.2", features = ["serde"] } 15 iroh-car = "0.5.1" 16 log = "0.4.28" 17 multibase = "0.9.2" 18 serde = { version = "1.0.228", features = ["derive"] } 19 serde_bytes = "0.11.19" 20 serde_ipld_dagcbor = "0.6.4"
+2 -5
examples/disk-read-file/main.rs
··· 33 // in this example we only bother handling CARs that are too big for memory 34 // `noop` helper means: do no block processing, store the raw blocks 35 let driver = match DriverBuilder::new() 36 - .with_mem_limit_mb(10) // how much memory can be used before disk spill 37 .load_car(reader) 38 .await? 39 { ··· 82 83 log::info!("arrived! ({:?}) joining rx...", t0.elapsed()); 84 85 - // clean up the database. would be nice to do this in drop so it happens 86 - // automatically, but some blocking work happens, so that's not allowed in 87 - // async rust. ๐Ÿคทโ€โ™€๏ธ 88 - join.await?.reset_store().await?; 89 90 log::info!("done. n={n} zeros={zeros}"); 91
··· 33 // in this example we only bother handling CARs that are too big for memory 34 // `noop` helper means: do no block processing, store the raw blocks 35 let driver = match DriverBuilder::new() 36 + .with_mem_limit_mb(32) // how much memory can be used before disk spill 37 .load_car(reader) 38 .await? 39 { ··· 82 83 log::info!("arrived! ({:?}) joining rx...", t0.elapsed()); 84 85 + join.await?; 86 87 log::info!("done. n={n} zeros={zeros}"); 88
-3
readme.md
··· 50 total_size += size; 51 } 52 } 53 - 54 - // clean up the disk store (drop tables etc) 55 - driver.reset_store().await?; 56 } 57 }; 58 println!("sum of size of all records: {total_size}");
··· 50 total_size += size; 51 } 52 } 53 } 54 }; 55 println!("sum of size of all records: {total_size}");
+36 -95
src/disk.rs
··· 18 */ 19 20 use crate::drive::DriveError; 21 - use rusqlite::OptionalExtension; 22 use std::path::PathBuf; 23 24 #[derive(Debug, thiserror::Error)] ··· 28 /// (The wrapped err should probably be obscured to remove public-facing 29 /// sqlite bits) 30 #[error(transparent)] 31 - DbError(#[from] rusqlite::Error), 32 /// A tokio blocking task failed to join 33 #[error("Failed to join a tokio blocking task: {0}")] 34 JoinError(#[from] tokio::task::JoinError), ··· 38 /// limit. 39 #[error("Maximum disk size reached")] 40 MaxSizeExceeded, 41 - #[error("this error was replaced, seeing this is a bug.")] 42 - #[doc(hidden)] 43 - Stolen, 44 - } 45 - 46 - impl DiskError { 47 - /// hack for ownership challenges with the disk driver 48 - pub(crate) fn steal(&mut self) -> Self { 49 - let mut swapped = DiskError::Stolen; 50 - std::mem::swap(self, &mut swapped); 51 - swapped 52 - } 53 } 54 55 /// Builder-style disk store setup ··· 71 impl Default for DiskBuilder { 72 fn default() -> Self { 73 Self { 74 - cache_size_mb: 32, 75 max_stored_mb: 10 * 1024, // 10 GiB 76 } 77 } ··· 84 } 85 /// Set the in-memory cache allowance for the database 86 /// 87 - /// Default: 32 MiB 88 pub fn with_cache_size_mb(mut self, size: usize) -> Self { 89 self.cache_size_mb = size; 90 self ··· 104 105 /// On-disk block storage 106 pub struct DiskStore { 107 - conn: rusqlite::Connection, 108 max_stored: usize, 109 stored: usize, 110 } ··· 117 max_stored_mb: usize, 118 ) -> Result<Self, DiskError> { 119 let max_stored = max_stored_mb * 2_usize.pow(20); 120 - let conn = tokio::task::spawn_blocking(move || { 121 - let conn = rusqlite::Connection::open(path)?; 122 - 123 - let sqlite_one_mb = -(2_i64.pow(10)); // negative is kibibytes for sqlite cache_size 124 - 125 - // conn.pragma_update(None, "journal_mode", "OFF")?; 126 - // conn.pragma_update(None, "journal_mode", "MEMORY")?; 127 - conn.pragma_update(None, "journal_mode", "WAL")?; 128 - // conn.pragma_update(None, "wal_autocheckpoint", "0")?; // this lets things get a bit big on disk 129 - conn.pragma_update(None, "synchronous", "OFF")?; 130 - conn.pragma_update( 131 - None, 132 - "cache_size", 133 - (cache_mb as i64 * sqlite_one_mb).to_string(), 134 - )?; 135 - Self::reset_tables(&conn)?; 136 137 - Ok::<_, DiskError>(conn) 138 }) 139 .await??; 140 141 Ok(Self { 142 - conn, 143 max_stored, 144 stored: 0, 145 }) 146 } 147 - pub(crate) fn get_writer(&'_ mut self) -> Result<SqliteWriter<'_>, DiskError> { 148 - let tx = self.conn.transaction()?; 149 - Ok(SqliteWriter { 150 - tx, 151 - stored: &mut self.stored, 152 - max: self.max_stored, 153 - }) 154 - } 155 - pub(crate) fn get_reader<'conn>(&'conn self) -> Result<SqliteReader<'conn>, DiskError> { 156 - let select_stmt = self.conn.prepare("SELECT val FROM blocks WHERE key = ?1")?; 157 - Ok(SqliteReader { select_stmt }) 158 - } 159 - /// Drop and recreate the kv table 160 - pub async fn reset(self) -> Result<Self, DiskError> { 161 - tokio::task::spawn_blocking(move || { 162 - Self::reset_tables(&self.conn)?; 163 - Ok(self) 164 - }) 165 - .await? 166 - } 167 - fn reset_tables(conn: &rusqlite::Connection) -> Result<(), DiskError> { 168 - conn.execute("DROP TABLE IF EXISTS blocks", ())?; 169 - conn.execute( 170 - "CREATE TABLE blocks ( 171 - key BLOB PRIMARY KEY NOT NULL, 172 - val BLOB NOT NULL 173 - ) WITHOUT ROWID", 174 - (), 175 - )?; 176 - Ok(()) 177 - } 178 - } 179 180 - pub(crate) struct SqliteWriter<'conn> { 181 - tx: rusqlite::Transaction<'conn>, 182 - stored: &'conn mut usize, 183 - max: usize, 184 - } 185 - 186 - impl SqliteWriter<'_> { 187 pub(crate) fn put_many( 188 &mut self, 189 kv: impl Iterator<Item = Result<(Vec<u8>, Vec<u8>), DriveError>>, 190 ) -> Result<(), DriveError> { 191 - let mut insert_stmt = self 192 - .tx 193 - .prepare_cached("INSERT INTO blocks (key, val) VALUES (?1, ?2)") 194 - .map_err(DiskError::DbError)?; 195 for pair in kv { 196 let (k, v) = pair?; 197 - *self.stored += v.len(); 198 - if *self.stored > self.max { 199 return Err(DiskError::MaxSizeExceeded.into()); 200 } 201 - insert_stmt.execute((k, v)).map_err(DiskError::DbError)?; 202 } 203 - Ok(()) 204 - } 205 - pub fn commit(self) -> Result<(), DiskError> { 206 - self.tx.commit()?; 207 Ok(()) 208 } 209 - } 210 - 211 - pub(crate) struct SqliteReader<'conn> { 212 - select_stmt: rusqlite::Statement<'conn>, 213 - } 214 215 - impl SqliteReader<'_> { 216 - pub(crate) fn get(&mut self, key: Vec<u8>) -> rusqlite::Result<Option<Vec<u8>>> { 217 - self.select_stmt 218 - .query_one((&key,), |row| row.get(0)) 219 - .optional() 220 } 221 }
··· 18 */ 19 20 use crate::drive::DriveError; 21 + use fjall::config::{CompressionPolicy, PinningPolicy, RestartIntervalPolicy}; 22 + use fjall::{CompressionType, Database, Error as FjallError, Keyspace, KeyspaceCreateOptions}; 23 use std::path::PathBuf; 24 25 #[derive(Debug, thiserror::Error)] ··· 29 /// (The wrapped err should probably be obscured to remove public-facing 30 /// sqlite bits) 31 #[error(transparent)] 32 + DbError(#[from] FjallError), 33 /// A tokio blocking task failed to join 34 #[error("Failed to join a tokio blocking task: {0}")] 35 JoinError(#[from] tokio::task::JoinError), ··· 39 /// limit. 40 #[error("Maximum disk size reached")] 41 MaxSizeExceeded, 42 } 43 44 /// Builder-style disk store setup ··· 60 impl Default for DiskBuilder { 61 fn default() -> Self { 62 Self { 63 + cache_size_mb: 64, 64 max_stored_mb: 10 * 1024, // 10 GiB 65 } 66 } ··· 73 } 74 /// Set the in-memory cache allowance for the database 75 /// 76 + /// Default: 64 MiB 77 pub fn with_cache_size_mb(mut self, size: usize) -> Self { 78 self.cache_size_mb = size; 79 self ··· 93 94 /// On-disk block storage 95 pub struct DiskStore { 96 + #[allow(unused)] 97 + db: Database, 98 + partition: Keyspace, 99 max_stored: usize, 100 stored: usize, 101 } ··· 108 max_stored_mb: usize, 109 ) -> Result<Self, DiskError> { 110 let max_stored = max_stored_mb * 2_usize.pow(20); 111 + let (db, partition) = tokio::task::spawn_blocking(move || { 112 + let db = Database::builder(path) 113 + // .manual_journal_persist(true) 114 + // .flush_workers(1) 115 + // .compaction_workers(1) 116 + .journal_compression(CompressionType::None) 117 + .cache_size(cache_mb as u64 * 2_u64.pow(20)) 118 + .temporary(true) 119 + .open()?; 120 + let opts = KeyspaceCreateOptions::default() 121 + .data_block_restart_interval_policy(RestartIntervalPolicy::all(8)) 122 + .filter_block_pinning_policy(PinningPolicy::disabled()) 123 + .expect_point_read_hits(true) 124 + .data_block_compression_policy(CompressionPolicy::disabled()) 125 + .manual_journal_persist(true) 126 + .max_memtable_size(32 * 2_u64.pow(20)); 127 + let partition = db.keyspace("z", || opts)?; 128 129 + Ok::<_, DiskError>((db, partition)) 130 }) 131 .await??; 132 133 Ok(Self { 134 + db, 135 + partition, 136 max_stored, 137 stored: 0, 138 }) 139 } 140 141 pub(crate) fn put_many( 142 &mut self, 143 kv: impl Iterator<Item = Result<(Vec<u8>, Vec<u8>), DriveError>>, 144 ) -> Result<(), DriveError> { 145 + let mut batch = self.db.batch(); 146 for pair in kv { 147 let (k, v) = pair?; 148 + self.stored += v.len(); 149 + if self.stored > self.max_stored { 150 return Err(DiskError::MaxSizeExceeded.into()); 151 } 152 + batch.insert(&self.partition, k, v); 153 } 154 + batch.commit().map_err(DiskError::DbError)?; 155 Ok(()) 156 } 157 158 + #[inline] 159 + pub(crate) fn get(&mut self, key: &[u8]) -> Result<Option<fjall::Slice>, FjallError> { 160 + self.partition.get(key) 161 } 162 }
+4 -52
src/drive.rs
··· 335 // move store in and back out so we can manage lifetimes 336 // dump mem blocks into the store 337 store = tokio::task::spawn(async move { 338 - let mut writer = store.get_writer()?; 339 - 340 let kvs = self 341 .mem_blocks 342 .into_iter() 343 .map(|(k, v)| Ok(encode(v).map(|v| (k.to_bytes(), v))?)); 344 345 - writer.put_many(kvs)?; 346 - writer.commit()?; 347 Ok::<_, DriveError>(store) 348 }) 349 .await??; ··· 351 let (tx, mut rx) = mpsc::channel::<Vec<(Cid, MaybeProcessedBlock<T>)>>(1); 352 353 let store_worker = tokio::task::spawn_blocking(move || { 354 - let mut writer = store.get_writer()?; 355 - 356 while let Some(chunk) = rx.blocking_recv() { 357 let kvs = chunk 358 .into_iter() 359 .map(|(k, v)| Ok(encode(v).map(|v| (k.to_bytes(), v))?)); 360 - writer.put_many(kvs)?; 361 } 362 - 363 - writer.commit()?; 364 Ok::<_, DriveError>(store) 365 }); // await later 366 ··· 453 /// println!("{rkey}: size={}", record.len()); 454 /// } 455 /// } 456 - /// let store = disk_driver.reset_store().await?; 457 /// # Ok(()) 458 /// # } 459 /// ``` ··· 468 // comes out again. 469 let (state, res) = tokio::task::spawn_blocking( 470 move || -> (BigState, Result<BlockChunk<T>, DriveError>) { 471 - let mut reader_res = state.store.get_reader(); 472 - let reader: &mut _ = match reader_res { 473 - Ok(ref mut r) => r, 474 - Err(ref mut e) => { 475 - // unfortunately we can't return the error directly because 476 - // (for some reason) it's attached to the lifetime of the 477 - // reader? 478 - // hack a mem::swap so we can get it out :/ 479 - let e_swapped = e.steal(); 480 - // the pain: `state` *has to* outlive the reader 481 - drop(reader_res); 482 - return (state, Err(e_swapped.into())); 483 - } 484 - }; 485 - 486 let mut out = Vec::with_capacity(n); 487 488 for _ in 0..n { 489 // walk as far as we can until we run out of blocks or find a record 490 - let step = match state.walker.disk_step(reader, process) { 491 Ok(s) => s, 492 Err(e) => { 493 - // the pain: `state` *has to* outlive the reader 494 - drop(reader_res); 495 return (state, Err(e.into())); 496 } 497 }; 498 match step { 499 Step::Missing(cid) => { 500 - // the pain: `state` *has to* outlive the reader 501 - drop(reader_res); 502 return (state, Err(DriveError::MissingBlock(cid))); 503 } 504 Step::Finish => break, ··· 506 }; 507 } 508 509 - // `state` *has to* outlive the reader 510 - drop(reader_res); 511 - 512 (state, Ok::<_, DriveError>(out)) 513 }, 514 ) ··· 532 tx: mpsc::Sender<Result<BlockChunk<T>, DriveError>>, 533 ) -> Result<(), mpsc::error::SendError<Result<BlockChunk<T>, DriveError>>> { 534 let BigState { store, walker } = self.state.as_mut().expect("valid state"); 535 - let mut reader = match store.get_reader() { 536 - Ok(r) => r, 537 - Err(e) => return tx.blocking_send(Err(e.into())), 538 - }; 539 540 loop { 541 let mut out: BlockChunk<T> = Vec::with_capacity(n); ··· 543 for _ in 0..n { 544 // walk as far as we can until we run out of blocks or find a record 545 546 - let step = match walker.disk_step(&mut reader, self.process) { 547 Ok(s) => s, 548 Err(e) => return tx.blocking_send(Err(e.into())), 549 }; ··· 592 /// } 593 /// 594 /// } 595 - /// let store = join.await?.reset_store().await?; 596 /// # Ok(()) 597 /// # } 598 /// ``` ··· 614 }); 615 616 (rx, chan_task) 617 - } 618 - 619 - /// Reset the disk storage so it can be reused. You must call this. 620 - /// 621 - /// Ideally we'd put this in an `impl Drop`, but since it makes blocking 622 - /// calls, that would be risky in an async context. For now you just have to 623 - /// carefully make sure you call it. 624 - /// 625 - /// The sqlite store is returned, so it can be reused for another 626 - /// `DiskDriver`. 627 - pub async fn reset_store(mut self) -> Result<DiskStore, DriveError> { 628 - let BigState { store, .. } = self.state.take().expect("valid state"); 629 - Ok(store.reset().await?) 630 } 631 }
··· 335 // move store in and back out so we can manage lifetimes 336 // dump mem blocks into the store 337 store = tokio::task::spawn(async move { 338 let kvs = self 339 .mem_blocks 340 .into_iter() 341 .map(|(k, v)| Ok(encode(v).map(|v| (k.to_bytes(), v))?)); 342 343 + store.put_many(kvs)?; 344 Ok::<_, DriveError>(store) 345 }) 346 .await??; ··· 348 let (tx, mut rx) = mpsc::channel::<Vec<(Cid, MaybeProcessedBlock<T>)>>(1); 349 350 let store_worker = tokio::task::spawn_blocking(move || { 351 while let Some(chunk) = rx.blocking_recv() { 352 let kvs = chunk 353 .into_iter() 354 .map(|(k, v)| Ok(encode(v).map(|v| (k.to_bytes(), v))?)); 355 + store.put_many(kvs)?; 356 } 357 Ok::<_, DriveError>(store) 358 }); // await later 359 ··· 446 /// println!("{rkey}: size={}", record.len()); 447 /// } 448 /// } 449 /// # Ok(()) 450 /// # } 451 /// ``` ··· 460 // comes out again. 461 let (state, res) = tokio::task::spawn_blocking( 462 move || -> (BigState, Result<BlockChunk<T>, DriveError>) { 463 let mut out = Vec::with_capacity(n); 464 465 for _ in 0..n { 466 // walk as far as we can until we run out of blocks or find a record 467 + let step = match state.walker.disk_step(&mut state.store, process) { 468 Ok(s) => s, 469 Err(e) => { 470 return (state, Err(e.into())); 471 } 472 }; 473 match step { 474 Step::Missing(cid) => { 475 return (state, Err(DriveError::MissingBlock(cid))); 476 } 477 Step::Finish => break, ··· 479 }; 480 } 481 482 (state, Ok::<_, DriveError>(out)) 483 }, 484 ) ··· 502 tx: mpsc::Sender<Result<BlockChunk<T>, DriveError>>, 503 ) -> Result<(), mpsc::error::SendError<Result<BlockChunk<T>, DriveError>>> { 504 let BigState { store, walker } = self.state.as_mut().expect("valid state"); 505 506 loop { 507 let mut out: BlockChunk<T> = Vec::with_capacity(n); ··· 509 for _ in 0..n { 510 // walk as far as we can until we run out of blocks or find a record 511 512 + let step = match walker.disk_step(store, self.process) { 513 Ok(s) => s, 514 Err(e) => return tx.blocking_send(Err(e.into())), 515 }; ··· 558 /// } 559 /// 560 /// } 561 /// # Ok(()) 562 /// # } 563 /// ``` ··· 579 }); 580 581 (rx, chan_task) 582 } 583 }
-3
src/lib.rs
··· 53 total_size += size; 54 } 55 } 56 - 57 - // clean up the disk store (drop tables etc) 58 - driver.reset_store().await?; 59 } 60 }; 61 println!("sum of size of all records: {total_size}");
··· 53 total_size += size; 54 } 55 } 56 } 57 }; 58 println!("sum of size of all records: {total_size}");
+5 -5
src/walk.rs
··· 1 //! Depth-first MST traversal 2 3 - use crate::disk::SqliteReader; 4 use crate::drive::{DecodeError, MaybeProcessedBlock}; 5 use crate::mst::Node; 6 use crate::process::Processable; ··· 19 #[error("Action node error: {0}")] 20 MstError(#[from] MstError), 21 #[error("storage error: {0}")] 22 - StorageError(#[from] rusqlite::Error), 23 #[error("Decode error: {0}")] 24 DecodeError(#[from] DecodeError), 25 } ··· 239 /// blocking!!!!!! 240 pub fn disk_step<T: Processable>( 241 &mut self, 242 - reader: &mut SqliteReader, 243 process: impl Fn(Vec<u8>) -> T, 244 ) -> Result<Step<T>, WalkError> { 245 loop { ··· 252 &mut Need::Node { depth, cid } => { 253 let cid_bytes = cid.to_bytes(); 254 log::trace!("need node {cid:?}"); 255 - let Some(block_bytes) = reader.get(cid_bytes)? else { 256 log::trace!("node not found, resting"); 257 return Ok(Step::Missing(cid)); 258 }; ··· 274 Need::Record { rkey, cid } => { 275 log::trace!("need record {cid:?}"); 276 let cid_bytes = cid.to_bytes(); 277 - let Some(data_bytes) = reader.get(cid_bytes)? else { 278 log::trace!("record block not found, resting"); 279 return Ok(Step::Missing(*cid)); 280 };
··· 1 //! Depth-first MST traversal 2 3 + use crate::disk::DiskStore; 4 use crate::drive::{DecodeError, MaybeProcessedBlock}; 5 use crate::mst::Node; 6 use crate::process::Processable; ··· 19 #[error("Action node error: {0}")] 20 MstError(#[from] MstError), 21 #[error("storage error: {0}")] 22 + StorageError(#[from] fjall::Error), 23 #[error("Decode error: {0}")] 24 DecodeError(#[from] DecodeError), 25 } ··· 239 /// blocking!!!!!! 240 pub fn disk_step<T: Processable>( 241 &mut self, 242 + reader: &mut DiskStore, 243 process: impl Fn(Vec<u8>) -> T, 244 ) -> Result<Step<T>, WalkError> { 245 loop { ··· 252 &mut Need::Node { depth, cid } => { 253 let cid_bytes = cid.to_bytes(); 254 log::trace!("need node {cid:?}"); 255 + let Some(block_bytes) = reader.get(&cid_bytes)? else { 256 log::trace!("node not found, resting"); 257 return Ok(Step::Missing(cid)); 258 }; ··· 274 Need::Record { rkey, cid } => { 275 log::trace!("need record {cid:?}"); 276 let cid_bytes = cid.to_bytes(); 277 + let Some(data_bytes) = reader.get(&cid_bytes)? else { 278 log::trace!("record block not found, resting"); 279 return Ok(Step::Missing(*cid)); 280 };