+169
-60
Cargo.lock
+169
-60
Cargo.lock
···
167
checksum = "46c5e41b57b8bba42a04676d81cb89e9ee8e859a1a66f80a5a72e1cb76b34d43"
168
169
[[package]]
170
name = "bytes"
171
version = "1.10.1"
172
source = "registry+https://github.com/rust-lang/crates.io-index"
173
checksum = "d71b6127be86fdcfddb610f7182ac57211d4b18a3e9c82eb2d17662f2227ad6a"
174
175
[[package]]
176
name = "cast"
···
281
checksum = "b05b61dc5112cbb17e4b6cd61790d9845d13888356391624cbe7e41efeac1e75"
282
283
[[package]]
284
name = "const-str"
285
version = "0.4.3"
286
source = "registry+https://github.com/rust-lang/crates.io-index"
···
358
]
359
360
[[package]]
361
name = "crossbeam-utils"
362
version = "0.8.21"
363
source = "registry+https://github.com/rust-lang/crates.io-index"
···
380
]
381
382
[[package]]
383
name = "data-encoding"
384
version = "2.9.0"
385
source = "registry+https://github.com/rust-lang/crates.io-index"
···
422
checksum = "48c757948c5ede0e46177b7add2e67155f70e33c07fea8284df6576da70b3719"
423
424
[[package]]
425
name = "env_filter"
426
version = "0.1.3"
427
source = "registry+https://github.com/rust-lang/crates.io-index"
···
445
]
446
447
[[package]]
448
name = "errno"
449
version = "0.3.14"
450
source = "registry+https://github.com/rust-lang/crates.io-index"
···
455
]
456
457
[[package]]
458
-
name = "fallible-iterator"
459
-
version = "0.3.0"
460
source = "registry+https://github.com/rust-lang/crates.io-index"
461
-
checksum = "2acce4a10f12dc2fb14a218589d4f1f62ef011b2d0cc4b3cb1bba8e94da14649"
462
463
[[package]]
464
-
name = "fallible-streaming-iterator"
465
-
version = "0.1.9"
466
source = "registry+https://github.com/rust-lang/crates.io-index"
467
-
checksum = "7360491ce676a36bf9bb3c56c1aa791658183a54d2744120f27285738d90465a"
468
469
[[package]]
470
-
name = "fastrand"
471
-
version = "2.3.0"
472
source = "registry+https://github.com/rust-lang/crates.io-index"
473
-
checksum = "37909eebbb50d72f9059c3b6d82c0463f2ff062c9e95845c43a6c9c0355411be"
474
-
475
-
[[package]]
476
-
name = "foldhash"
477
-
version = "0.1.5"
478
-
source = "registry+https://github.com/rust-lang/crates.io-index"
479
-
checksum = "d9c4f5dac5e15c24eb999c26181a6ca40b39fe946cbe4c263c7209467bc83af2"
480
481
[[package]]
482
name = "futures"
···
608
609
[[package]]
610
name = "hashbrown"
611
-
version = "0.15.5"
612
source = "registry+https://github.com/rust-lang/crates.io-index"
613
-
checksum = "9229cfe53dfd69f0609a49f65461bd93001ea1ef889cd5529dd176593f5338a1"
614
-
dependencies = [
615
-
"foldhash",
616
-
]
617
618
[[package]]
619
-
name = "hashlink"
620
-
version = "0.10.0"
621
source = "registry+https://github.com/rust-lang/crates.io-index"
622
-
checksum = "7382cf6263419f2d8df38c55d7da83da5c18aef87fc7a7fc1fb1e344edfe14c1"
623
-
dependencies = [
624
-
"hashbrown",
625
-
]
626
627
[[package]]
628
name = "heck"
···
631
checksum = "2304e00983f87ffb38b55b444b5e3b60a884b5d30c0fca7d82fe33449bbe55ea"
632
633
[[package]]
634
name = "io-uring"
635
version = "0.7.10"
636
source = "registry+https://github.com/rust-lang/crates.io-index"
···
730
checksum = "58f929b4d672ea937a23a1ab494143d968337a5f47e56d0815df1e0890ddf174"
731
732
[[package]]
733
-
name = "libsqlite3-sys"
734
-
version = "0.35.0"
735
-
source = "registry+https://github.com/rust-lang/crates.io-index"
736
-
checksum = "133c182a6a2c87864fe97778797e46c7e999672690dc9fa3ee8e241aa4a9c13f"
737
-
dependencies = [
738
-
"pkg-config",
739
-
"vcpkg",
740
-
]
741
-
742
-
[[package]]
743
name = "linux-raw-sys"
744
version = "0.11.0"
745
source = "registry+https://github.com/rust-lang/crates.io-index"
···
761
checksum = "34080505efa8e45a4b816c349525ebe327ceaa8559756f0356cba97ef3bf7432"
762
763
[[package]]
764
name = "match-lookup"
765
version = "0.1.1"
766
source = "registry+https://github.com/rust-lang/crates.io-index"
···
892
checksum = "8b870d8c151b6f2fb93e84a13146138f05d02ed11c7e7c54f8826aaaf7c9f184"
893
894
[[package]]
895
-
name = "pkg-config"
896
-
version = "0.3.32"
897
-
source = "registry+https://github.com/rust-lang/crates.io-index"
898
-
checksum = "7edddbd0b52d732b21ad9a5fab5c704c14cd949e5e9a1ec5929a24fded1b904c"
899
-
900
-
[[package]]
901
name = "plotters"
902
version = "0.3.7"
903
source = "registry+https://github.com/rust-lang/crates.io-index"
···
947
checksum = "89ae43fd86e4158d6db51ad8e2b80f313af9cc74f5c0e03ccb87de09998732de"
948
dependencies = [
949
"unicode-ident",
950
]
951
952
[[package]]
···
1030
"clap",
1031
"criterion",
1032
"env_logger",
1033
"futures",
1034
"futures-core",
1035
"ipld-core",
1036
"iroh-car",
1037
"log",
1038
"multibase",
1039
-
"rusqlite",
1040
"serde",
1041
"serde_bytes",
1042
"serde_ipld_dagcbor",
···
1047
]
1048
1049
[[package]]
1050
-
name = "rusqlite"
1051
-
version = "0.37.0"
1052
source = "registry+https://github.com/rust-lang/crates.io-index"
1053
-
checksum = "165ca6e57b20e1351573e3729b958bc62f0e48025386970b6e4d29e7a7e71f3f"
1054
-
dependencies = [
1055
-
"bitflags",
1056
-
"fallible-iterator",
1057
-
"fallible-streaming-iterator",
1058
-
"hashlink",
1059
-
"libsqlite3-sys",
1060
-
"smallvec",
1061
-
]
1062
1063
[[package]]
1064
-
name = "rustc-demangle"
1065
-
version = "0.1.26"
1066
source = "registry+https://github.com/rust-lang/crates.io-index"
1067
-
checksum = "56f7d92ca342cea22a06f2121d944b4fd82af56988c270852495420f961d4ace"
1068
1069
[[package]]
1070
name = "rustix"
···
1105
version = "1.2.0"
1106
source = "registry+https://github.com/rust-lang/crates.io-index"
1107
checksum = "94143f37725109f92c262ed2cf5e59bce7498c01bcc1502d7b9afe439a4e9f49"
1108
1109
[[package]]
1110
name = "serde"
···
1172
]
1173
1174
[[package]]
1175
name = "sha2"
1176
version = "0.10.9"
1177
source = "registry+https://github.com/rust-lang/crates.io-index"
···
1211
dependencies = [
1212
"libc",
1213
"windows-sys 0.59.0",
1214
]
1215
1216
[[package]]
···
1372
checksum = "06abde3611657adf66d383f00b093d7faecc7fa57071cce2578660c9f1010821"
1373
1374
[[package]]
1375
-
name = "vcpkg"
1376
-
version = "0.2.15"
1377
source = "registry+https://github.com/rust-lang/crates.io-index"
1378
-
checksum = "accd4ea62f7bb7a82fe23066fb0957d48ef677f6eeb8215f372f52e48bb32426"
1379
1380
[[package]]
1381
name = "version_check"
···
1659
version = "0.46.0"
1660
source = "registry+https://github.com/rust-lang/crates.io-index"
1661
checksum = "f17a85883d4e6d00e8a97c586de764dabcc06133f7f1d55dce5cdc070ad7fe59"
1662
1663
[[package]]
1664
name = "zerocopy"
···
167
checksum = "46c5e41b57b8bba42a04676d81cb89e9ee8e859a1a66f80a5a72e1cb76b34d43"
168
169
[[package]]
170
+
name = "byteorder-lite"
171
+
version = "0.1.0"
172
+
source = "registry+https://github.com/rust-lang/crates.io-index"
173
+
checksum = "8f1fe948ff07f4bd06c30984e69f5b4899c516a3ef74f34df92a2df2ab535495"
174
+
175
+
[[package]]
176
name = "bytes"
177
version = "1.10.1"
178
source = "registry+https://github.com/rust-lang/crates.io-index"
179
checksum = "d71b6127be86fdcfddb610f7182ac57211d4b18a3e9c82eb2d17662f2227ad6a"
180
+
181
+
[[package]]
182
+
name = "byteview"
183
+
version = "0.10.0"
184
+
source = "registry+https://github.com/rust-lang/crates.io-index"
185
+
checksum = "dda4398f387cc6395a3e93b3867cd9abda914c97a0b344d1eefb2e5c51785fca"
186
187
[[package]]
188
name = "cast"
···
293
checksum = "b05b61dc5112cbb17e4b6cd61790d9845d13888356391624cbe7e41efeac1e75"
294
295
[[package]]
296
+
name = "compare"
297
+
version = "0.0.6"
298
+
source = "registry+https://github.com/rust-lang/crates.io-index"
299
+
checksum = "ea0095f6103c2a8b44acd6fd15960c801dafebf02e21940360833e0673f48ba7"
300
+
301
+
[[package]]
302
name = "const-str"
303
version = "0.4.3"
304
source = "registry+https://github.com/rust-lang/crates.io-index"
···
376
]
377
378
[[package]]
379
+
name = "crossbeam-skiplist"
380
+
version = "0.1.3"
381
+
source = "registry+https://github.com/rust-lang/crates.io-index"
382
+
checksum = "df29de440c58ca2cc6e587ec3d22347551a32435fbde9d2bff64e78a9ffa151b"
383
+
dependencies = [
384
+
"crossbeam-epoch",
385
+
"crossbeam-utils",
386
+
]
387
+
388
+
[[package]]
389
name = "crossbeam-utils"
390
version = "0.8.21"
391
source = "registry+https://github.com/rust-lang/crates.io-index"
···
408
]
409
410
[[package]]
411
+
name = "dashmap"
412
+
version = "6.1.0"
413
+
source = "registry+https://github.com/rust-lang/crates.io-index"
414
+
checksum = "5041cc499144891f3790297212f32a74fb938e5136a14943f338ef9e0ae276cf"
415
+
dependencies = [
416
+
"cfg-if",
417
+
"crossbeam-utils",
418
+
"hashbrown 0.14.5",
419
+
"lock_api",
420
+
"once_cell",
421
+
"parking_lot_core",
422
+
]
423
+
424
+
[[package]]
425
name = "data-encoding"
426
version = "2.9.0"
427
source = "registry+https://github.com/rust-lang/crates.io-index"
···
464
checksum = "48c757948c5ede0e46177b7add2e67155f70e33c07fea8284df6576da70b3719"
465
466
[[package]]
467
+
name = "enum_dispatch"
468
+
version = "0.3.13"
469
+
source = "registry+https://github.com/rust-lang/crates.io-index"
470
+
checksum = "aa18ce2bc66555b3218614519ac839ddb759a7d6720732f979ef8d13be147ecd"
471
+
dependencies = [
472
+
"once_cell",
473
+
"proc-macro2",
474
+
"quote",
475
+
"syn 2.0.106",
476
+
]
477
+
478
+
[[package]]
479
name = "env_filter"
480
version = "0.1.3"
481
source = "registry+https://github.com/rust-lang/crates.io-index"
···
499
]
500
501
[[package]]
502
+
name = "equivalent"
503
+
version = "1.0.2"
504
+
source = "registry+https://github.com/rust-lang/crates.io-index"
505
+
checksum = "877a4ace8713b0bcf2a4e7eec82529c029f1d0619886d18145fea96c3ffe5c0f"
506
+
507
+
[[package]]
508
name = "errno"
509
version = "0.3.14"
510
source = "registry+https://github.com/rust-lang/crates.io-index"
···
515
]
516
517
[[package]]
518
+
name = "fastrand"
519
+
version = "2.3.0"
520
source = "registry+https://github.com/rust-lang/crates.io-index"
521
+
checksum = "37909eebbb50d72f9059c3b6d82c0463f2ff062c9e95845c43a6c9c0355411be"
522
523
[[package]]
524
+
name = "fjall"
525
+
version = "3.0.1"
526
source = "registry+https://github.com/rust-lang/crates.io-index"
527
+
checksum = "4f69637c02d38ad1b0f003101d0195a60368130aa17d9ef78b1557d265a22093"
528
+
dependencies = [
529
+
"byteorder-lite",
530
+
"byteview",
531
+
"dashmap",
532
+
"flume",
533
+
"log",
534
+
"lsm-tree",
535
+
"tempfile",
536
+
"xxhash-rust",
537
+
]
538
539
[[package]]
540
+
name = "flume"
541
+
version = "0.12.0"
542
source = "registry+https://github.com/rust-lang/crates.io-index"
543
+
checksum = "5e139bc46ca777eb5efaf62df0ab8cc5fd400866427e56c68b22e414e53bd3be"
544
+
dependencies = [
545
+
"spin",
546
+
]
547
548
[[package]]
549
name = "futures"
···
675
676
[[package]]
677
name = "hashbrown"
678
+
version = "0.14.5"
679
source = "registry+https://github.com/rust-lang/crates.io-index"
680
+
checksum = "e5274423e17b7c9fc20b6e7e208532f9b19825d82dfd615708b70edd83df41f1"
681
682
[[package]]
683
+
name = "hashbrown"
684
+
version = "0.16.1"
685
source = "registry+https://github.com/rust-lang/crates.io-index"
686
+
checksum = "841d1cc9bed7f9236f321df977030373f4a4163ae1a7dbfe1a51a2c1a51d9100"
687
688
[[package]]
689
name = "heck"
···
692
checksum = "2304e00983f87ffb38b55b444b5e3b60a884b5d30c0fca7d82fe33449bbe55ea"
693
694
[[package]]
695
+
name = "interval-heap"
696
+
version = "0.0.5"
697
+
source = "registry+https://github.com/rust-lang/crates.io-index"
698
+
checksum = "11274e5e8e89b8607cfedc2910b6626e998779b48a019151c7604d0adcb86ac6"
699
+
dependencies = [
700
+
"compare",
701
+
]
702
+
703
+
[[package]]
704
name = "io-uring"
705
version = "0.7.10"
706
source = "registry+https://github.com/rust-lang/crates.io-index"
···
800
checksum = "58f929b4d672ea937a23a1ab494143d968337a5f47e56d0815df1e0890ddf174"
801
802
[[package]]
803
name = "linux-raw-sys"
804
version = "0.11.0"
805
source = "registry+https://github.com/rust-lang/crates.io-index"
···
821
checksum = "34080505efa8e45a4b816c349525ebe327ceaa8559756f0356cba97ef3bf7432"
822
823
[[package]]
824
+
name = "lsm-tree"
825
+
version = "3.0.1"
826
+
source = "registry+https://github.com/rust-lang/crates.io-index"
827
+
checksum = "b875f1dfe14f557f805b167fb9b0fc54c5560c7a4bd6ae02535b2846f276a8cb"
828
+
dependencies = [
829
+
"byteorder-lite",
830
+
"byteview",
831
+
"crossbeam-skiplist",
832
+
"enum_dispatch",
833
+
"interval-heap",
834
+
"log",
835
+
"quick_cache",
836
+
"rustc-hash",
837
+
"self_cell",
838
+
"sfa",
839
+
"tempfile",
840
+
"varint-rs",
841
+
"xxhash-rust",
842
+
]
843
+
844
+
[[package]]
845
name = "match-lookup"
846
version = "0.1.1"
847
source = "registry+https://github.com/rust-lang/crates.io-index"
···
973
checksum = "8b870d8c151b6f2fb93e84a13146138f05d02ed11c7e7c54f8826aaaf7c9f184"
974
975
[[package]]
976
name = "plotters"
977
version = "0.3.7"
978
source = "registry+https://github.com/rust-lang/crates.io-index"
···
1022
checksum = "89ae43fd86e4158d6db51ad8e2b80f313af9cc74f5c0e03ccb87de09998732de"
1023
dependencies = [
1024
"unicode-ident",
1025
+
]
1026
+
1027
+
[[package]]
1028
+
name = "quick_cache"
1029
+
version = "0.6.18"
1030
+
source = "registry+https://github.com/rust-lang/crates.io-index"
1031
+
checksum = "7ada44a88ef953a3294f6eb55d2007ba44646015e18613d2f213016379203ef3"
1032
+
dependencies = [
1033
+
"equivalent",
1034
+
"hashbrown 0.16.1",
1035
]
1036
1037
[[package]]
···
1115
"clap",
1116
"criterion",
1117
"env_logger",
1118
+
"fjall",
1119
"futures",
1120
"futures-core",
1121
"ipld-core",
1122
"iroh-car",
1123
"log",
1124
"multibase",
1125
"serde",
1126
"serde_bytes",
1127
"serde_ipld_dagcbor",
···
1132
]
1133
1134
[[package]]
1135
+
name = "rustc-demangle"
1136
+
version = "0.1.26"
1137
source = "registry+https://github.com/rust-lang/crates.io-index"
1138
+
checksum = "56f7d92ca342cea22a06f2121d944b4fd82af56988c270852495420f961d4ace"
1139
1140
[[package]]
1141
+
name = "rustc-hash"
1142
+
version = "2.1.1"
1143
source = "registry+https://github.com/rust-lang/crates.io-index"
1144
+
checksum = "357703d41365b4b27c590e3ed91eabb1b663f07c4c084095e60cbed4362dff0d"
1145
1146
[[package]]
1147
name = "rustix"
···
1182
version = "1.2.0"
1183
source = "registry+https://github.com/rust-lang/crates.io-index"
1184
checksum = "94143f37725109f92c262ed2cf5e59bce7498c01bcc1502d7b9afe439a4e9f49"
1185
+
1186
+
[[package]]
1187
+
name = "self_cell"
1188
+
version = "1.2.2"
1189
+
source = "registry+https://github.com/rust-lang/crates.io-index"
1190
+
checksum = "b12e76d157a900eb52e81bc6e9f3069344290341720e9178cde2407113ac8d89"
1191
1192
[[package]]
1193
name = "serde"
···
1255
]
1256
1257
[[package]]
1258
+
name = "sfa"
1259
+
version = "1.0.0"
1260
+
source = "registry+https://github.com/rust-lang/crates.io-index"
1261
+
checksum = "a1296838937cab56cd6c4eeeb8718ec777383700c33f060e2869867bd01d1175"
1262
+
dependencies = [
1263
+
"byteorder-lite",
1264
+
"log",
1265
+
"xxhash-rust",
1266
+
]
1267
+
1268
+
[[package]]
1269
name = "sha2"
1270
version = "0.10.9"
1271
source = "registry+https://github.com/rust-lang/crates.io-index"
···
1305
dependencies = [
1306
"libc",
1307
"windows-sys 0.59.0",
1308
+
]
1309
+
1310
+
[[package]]
1311
+
name = "spin"
1312
+
version = "0.9.8"
1313
+
source = "registry+https://github.com/rust-lang/crates.io-index"
1314
+
checksum = "6980e8d7511241f8acf4aebddbb1ff938df5eebe98691418c4468d0b72a96a67"
1315
+
dependencies = [
1316
+
"lock_api",
1317
]
1318
1319
[[package]]
···
1475
checksum = "06abde3611657adf66d383f00b093d7faecc7fa57071cce2578660c9f1010821"
1476
1477
[[package]]
1478
+
name = "varint-rs"
1479
+
version = "2.2.0"
1480
source = "registry+https://github.com/rust-lang/crates.io-index"
1481
+
checksum = "8f54a172d0620933a27a4360d3db3e2ae0dd6cceae9730751a036bbf182c4b23"
1482
1483
[[package]]
1484
name = "version_check"
···
1762
version = "0.46.0"
1763
source = "registry+https://github.com/rust-lang/crates.io-index"
1764
checksum = "f17a85883d4e6d00e8a97c586de764dabcc06133f7f1d55dce5cdc070ad7fe59"
1765
+
1766
+
[[package]]
1767
+
name = "xxhash-rust"
1768
+
version = "0.8.15"
1769
+
source = "registry+https://github.com/rust-lang/crates.io-index"
1770
+
checksum = "fdd20c5420375476fbd4394763288da7eb0cc0b8c11deed431a91562af7335d3"
1771
1772
[[package]]
1773
name = "zerocopy"
+1
-1
Cargo.toml
+1
-1
Cargo.toml
···
8
9
[dependencies]
10
bincode = { version = "2.0.1", features = ["serde"] }
11
futures = "0.3.31"
12
futures-core = "0.3.31"
13
ipld-core = { version = "0.4.2", features = ["serde"] }
14
iroh-car = "0.5.1"
15
log = "0.4.28"
16
multibase = "0.9.2"
17
-
rusqlite = "0.37.0"
18
serde = { version = "1.0.228", features = ["derive"] }
19
serde_bytes = "0.11.19"
20
serde_ipld_dagcbor = "0.6.4"
···
8
9
[dependencies]
10
bincode = { version = "2.0.1", features = ["serde"] }
11
+
fjall = { version = "3.0.1", default-features = false }
12
futures = "0.3.31"
13
futures-core = "0.3.31"
14
ipld-core = { version = "0.4.2", features = ["serde"] }
15
iroh-car = "0.5.1"
16
log = "0.4.28"
17
multibase = "0.9.2"
18
serde = { version = "1.0.228", features = ["derive"] }
19
serde_bytes = "0.11.19"
20
serde_ipld_dagcbor = "0.6.4"
+2
-5
examples/disk-read-file/main.rs
+2
-5
examples/disk-read-file/main.rs
···
33
// in this example we only bother handling CARs that are too big for memory
34
// `noop` helper means: do no block processing, store the raw blocks
35
let driver = match DriverBuilder::new()
36
-
.with_mem_limit_mb(10) // how much memory can be used before disk spill
37
.load_car(reader)
38
.await?
39
{
···
82
83
log::info!("arrived! ({:?}) joining rx...", t0.elapsed());
84
85
-
// clean up the database. would be nice to do this in drop so it happens
86
-
// automatically, but some blocking work happens, so that's not allowed in
87
-
// async rust. ๐คทโโ๏ธ
88
-
join.await?.reset_store().await?;
89
90
log::info!("done. n={n} zeros={zeros}");
91
···
33
// in this example we only bother handling CARs that are too big for memory
34
// `noop` helper means: do no block processing, store the raw blocks
35
let driver = match DriverBuilder::new()
36
+
.with_mem_limit_mb(32) // how much memory can be used before disk spill
37
.load_car(reader)
38
.await?
39
{
···
82
83
log::info!("arrived! ({:?}) joining rx...", t0.elapsed());
84
85
+
join.await?;
86
87
log::info!("done. n={n} zeros={zeros}");
88
-3
readme.md
-3
readme.md
+36
-95
src/disk.rs
+36
-95
src/disk.rs
···
18
*/
19
20
use crate::drive::DriveError;
21
-
use rusqlite::OptionalExtension;
22
use std::path::PathBuf;
23
24
#[derive(Debug, thiserror::Error)]
···
28
/// (The wrapped err should probably be obscured to remove public-facing
29
/// sqlite bits)
30
#[error(transparent)]
31
-
DbError(#[from] rusqlite::Error),
32
/// A tokio blocking task failed to join
33
#[error("Failed to join a tokio blocking task: {0}")]
34
JoinError(#[from] tokio::task::JoinError),
···
38
/// limit.
39
#[error("Maximum disk size reached")]
40
MaxSizeExceeded,
41
-
#[error("this error was replaced, seeing this is a bug.")]
42
-
#[doc(hidden)]
43
-
Stolen,
44
-
}
45
-
46
-
impl DiskError {
47
-
/// hack for ownership challenges with the disk driver
48
-
pub(crate) fn steal(&mut self) -> Self {
49
-
let mut swapped = DiskError::Stolen;
50
-
std::mem::swap(self, &mut swapped);
51
-
swapped
52
-
}
53
}
54
55
/// Builder-style disk store setup
···
71
impl Default for DiskBuilder {
72
fn default() -> Self {
73
Self {
74
-
cache_size_mb: 32,
75
max_stored_mb: 10 * 1024, // 10 GiB
76
}
77
}
···
84
}
85
/// Set the in-memory cache allowance for the database
86
///
87
-
/// Default: 32 MiB
88
pub fn with_cache_size_mb(mut self, size: usize) -> Self {
89
self.cache_size_mb = size;
90
self
···
104
105
/// On-disk block storage
106
pub struct DiskStore {
107
-
conn: rusqlite::Connection,
108
max_stored: usize,
109
stored: usize,
110
}
···
117
max_stored_mb: usize,
118
) -> Result<Self, DiskError> {
119
let max_stored = max_stored_mb * 2_usize.pow(20);
120
-
let conn = tokio::task::spawn_blocking(move || {
121
-
let conn = rusqlite::Connection::open(path)?;
122
-
123
-
let sqlite_one_mb = -(2_i64.pow(10)); // negative is kibibytes for sqlite cache_size
124
-
125
-
// conn.pragma_update(None, "journal_mode", "OFF")?;
126
-
// conn.pragma_update(None, "journal_mode", "MEMORY")?;
127
-
conn.pragma_update(None, "journal_mode", "WAL")?;
128
-
// conn.pragma_update(None, "wal_autocheckpoint", "0")?; // this lets things get a bit big on disk
129
-
conn.pragma_update(None, "synchronous", "OFF")?;
130
-
conn.pragma_update(
131
-
None,
132
-
"cache_size",
133
-
(cache_mb as i64 * sqlite_one_mb).to_string(),
134
-
)?;
135
-
Self::reset_tables(&conn)?;
136
137
-
Ok::<_, DiskError>(conn)
138
})
139
.await??;
140
141
Ok(Self {
142
-
conn,
143
max_stored,
144
stored: 0,
145
})
146
}
147
-
pub(crate) fn get_writer(&'_ mut self) -> Result<SqliteWriter<'_>, DiskError> {
148
-
let tx = self.conn.transaction()?;
149
-
Ok(SqliteWriter {
150
-
tx,
151
-
stored: &mut self.stored,
152
-
max: self.max_stored,
153
-
})
154
-
}
155
-
pub(crate) fn get_reader<'conn>(&'conn self) -> Result<SqliteReader<'conn>, DiskError> {
156
-
let select_stmt = self.conn.prepare("SELECT val FROM blocks WHERE key = ?1")?;
157
-
Ok(SqliteReader { select_stmt })
158
-
}
159
-
/// Drop and recreate the kv table
160
-
pub async fn reset(self) -> Result<Self, DiskError> {
161
-
tokio::task::spawn_blocking(move || {
162
-
Self::reset_tables(&self.conn)?;
163
-
Ok(self)
164
-
})
165
-
.await?
166
-
}
167
-
fn reset_tables(conn: &rusqlite::Connection) -> Result<(), DiskError> {
168
-
conn.execute("DROP TABLE IF EXISTS blocks", ())?;
169
-
conn.execute(
170
-
"CREATE TABLE blocks (
171
-
key BLOB PRIMARY KEY NOT NULL,
172
-
val BLOB NOT NULL
173
-
) WITHOUT ROWID",
174
-
(),
175
-
)?;
176
-
Ok(())
177
-
}
178
-
}
179
180
-
pub(crate) struct SqliteWriter<'conn> {
181
-
tx: rusqlite::Transaction<'conn>,
182
-
stored: &'conn mut usize,
183
-
max: usize,
184
-
}
185
-
186
-
impl SqliteWriter<'_> {
187
pub(crate) fn put_many(
188
&mut self,
189
kv: impl Iterator<Item = Result<(Vec<u8>, Vec<u8>), DriveError>>,
190
) -> Result<(), DriveError> {
191
-
let mut insert_stmt = self
192
-
.tx
193
-
.prepare_cached("INSERT INTO blocks (key, val) VALUES (?1, ?2)")
194
-
.map_err(DiskError::DbError)?;
195
for pair in kv {
196
let (k, v) = pair?;
197
-
*self.stored += v.len();
198
-
if *self.stored > self.max {
199
return Err(DiskError::MaxSizeExceeded.into());
200
}
201
-
insert_stmt.execute((k, v)).map_err(DiskError::DbError)?;
202
}
203
-
Ok(())
204
-
}
205
-
pub fn commit(self) -> Result<(), DiskError> {
206
-
self.tx.commit()?;
207
Ok(())
208
}
209
-
}
210
-
211
-
pub(crate) struct SqliteReader<'conn> {
212
-
select_stmt: rusqlite::Statement<'conn>,
213
-
}
214
215
-
impl SqliteReader<'_> {
216
-
pub(crate) fn get(&mut self, key: Vec<u8>) -> rusqlite::Result<Option<Vec<u8>>> {
217
-
self.select_stmt
218
-
.query_one((&key,), |row| row.get(0))
219
-
.optional()
220
}
221
}
···
18
*/
19
20
use crate::drive::DriveError;
21
+
use fjall::config::{CompressionPolicy, PinningPolicy, RestartIntervalPolicy};
22
+
use fjall::{CompressionType, Database, Error as FjallError, Keyspace, KeyspaceCreateOptions};
23
use std::path::PathBuf;
24
25
#[derive(Debug, thiserror::Error)]
···
29
/// (The wrapped err should probably be obscured to remove public-facing
30
/// sqlite bits)
31
#[error(transparent)]
32
+
DbError(#[from] FjallError),
33
/// A tokio blocking task failed to join
34
#[error("Failed to join a tokio blocking task: {0}")]
35
JoinError(#[from] tokio::task::JoinError),
···
39
/// limit.
40
#[error("Maximum disk size reached")]
41
MaxSizeExceeded,
42
}
43
44
/// Builder-style disk store setup
···
60
impl Default for DiskBuilder {
61
fn default() -> Self {
62
Self {
63
+
cache_size_mb: 64,
64
max_stored_mb: 10 * 1024, // 10 GiB
65
}
66
}
···
73
}
74
/// Set the in-memory cache allowance for the database
75
///
76
+
/// Default: 64 MiB
77
pub fn with_cache_size_mb(mut self, size: usize) -> Self {
78
self.cache_size_mb = size;
79
self
···
93
94
/// On-disk block storage
95
pub struct DiskStore {
96
+
#[allow(unused)]
97
+
db: Database,
98
+
partition: Keyspace,
99
max_stored: usize,
100
stored: usize,
101
}
···
108
max_stored_mb: usize,
109
) -> Result<Self, DiskError> {
110
let max_stored = max_stored_mb * 2_usize.pow(20);
111
+
let (db, partition) = tokio::task::spawn_blocking(move || {
112
+
let db = Database::builder(path)
113
+
// .manual_journal_persist(true)
114
+
// .flush_workers(1)
115
+
// .compaction_workers(1)
116
+
.journal_compression(CompressionType::None)
117
+
.cache_size(cache_mb as u64 * 2_u64.pow(20))
118
+
.temporary(true)
119
+
.open()?;
120
+
let opts = KeyspaceCreateOptions::default()
121
+
.data_block_restart_interval_policy(RestartIntervalPolicy::all(8))
122
+
.filter_block_pinning_policy(PinningPolicy::disabled())
123
+
.expect_point_read_hits(true)
124
+
.data_block_compression_policy(CompressionPolicy::disabled())
125
+
.manual_journal_persist(true)
126
+
.max_memtable_size(32 * 2_u64.pow(20));
127
+
let partition = db.keyspace("z", || opts)?;
128
129
+
Ok::<_, DiskError>((db, partition))
130
})
131
.await??;
132
133
Ok(Self {
134
+
db,
135
+
partition,
136
max_stored,
137
stored: 0,
138
})
139
}
140
141
pub(crate) fn put_many(
142
&mut self,
143
kv: impl Iterator<Item = Result<(Vec<u8>, Vec<u8>), DriveError>>,
144
) -> Result<(), DriveError> {
145
+
let mut batch = self.db.batch();
146
for pair in kv {
147
let (k, v) = pair?;
148
+
self.stored += v.len();
149
+
if self.stored > self.max_stored {
150
return Err(DiskError::MaxSizeExceeded.into());
151
}
152
+
batch.insert(&self.partition, k, v);
153
}
154
+
batch.commit().map_err(DiskError::DbError)?;
155
Ok(())
156
}
157
158
+
#[inline]
159
+
pub(crate) fn get(&mut self, key: &[u8]) -> Result<Option<fjall::Slice>, FjallError> {
160
+
self.partition.get(key)
161
}
162
}
+4
-52
src/drive.rs
+4
-52
src/drive.rs
···
335
// move store in and back out so we can manage lifetimes
336
// dump mem blocks into the store
337
store = tokio::task::spawn(async move {
338
-
let mut writer = store.get_writer()?;
339
-
340
let kvs = self
341
.mem_blocks
342
.into_iter()
343
.map(|(k, v)| Ok(encode(v).map(|v| (k.to_bytes(), v))?));
344
345
-
writer.put_many(kvs)?;
346
-
writer.commit()?;
347
Ok::<_, DriveError>(store)
348
})
349
.await??;
···
351
let (tx, mut rx) = mpsc::channel::<Vec<(Cid, MaybeProcessedBlock<T>)>>(1);
352
353
let store_worker = tokio::task::spawn_blocking(move || {
354
-
let mut writer = store.get_writer()?;
355
-
356
while let Some(chunk) = rx.blocking_recv() {
357
let kvs = chunk
358
.into_iter()
359
.map(|(k, v)| Ok(encode(v).map(|v| (k.to_bytes(), v))?));
360
-
writer.put_many(kvs)?;
361
}
362
-
363
-
writer.commit()?;
364
Ok::<_, DriveError>(store)
365
}); // await later
366
···
453
/// println!("{rkey}: size={}", record.len());
454
/// }
455
/// }
456
-
/// let store = disk_driver.reset_store().await?;
457
/// # Ok(())
458
/// # }
459
/// ```
···
468
// comes out again.
469
let (state, res) = tokio::task::spawn_blocking(
470
move || -> (BigState, Result<BlockChunk<T>, DriveError>) {
471
-
let mut reader_res = state.store.get_reader();
472
-
let reader: &mut _ = match reader_res {
473
-
Ok(ref mut r) => r,
474
-
Err(ref mut e) => {
475
-
// unfortunately we can't return the error directly because
476
-
// (for some reason) it's attached to the lifetime of the
477
-
// reader?
478
-
// hack a mem::swap so we can get it out :/
479
-
let e_swapped = e.steal();
480
-
// the pain: `state` *has to* outlive the reader
481
-
drop(reader_res);
482
-
return (state, Err(e_swapped.into()));
483
-
}
484
-
};
485
-
486
let mut out = Vec::with_capacity(n);
487
488
for _ in 0..n {
489
// walk as far as we can until we run out of blocks or find a record
490
-
let step = match state.walker.disk_step(reader, process) {
491
Ok(s) => s,
492
Err(e) => {
493
-
// the pain: `state` *has to* outlive the reader
494
-
drop(reader_res);
495
return (state, Err(e.into()));
496
}
497
};
498
match step {
499
Step::Missing(cid) => {
500
-
// the pain: `state` *has to* outlive the reader
501
-
drop(reader_res);
502
return (state, Err(DriveError::MissingBlock(cid)));
503
}
504
Step::Finish => break,
···
506
};
507
}
508
509
-
// `state` *has to* outlive the reader
510
-
drop(reader_res);
511
-
512
(state, Ok::<_, DriveError>(out))
513
},
514
)
···
532
tx: mpsc::Sender<Result<BlockChunk<T>, DriveError>>,
533
) -> Result<(), mpsc::error::SendError<Result<BlockChunk<T>, DriveError>>> {
534
let BigState { store, walker } = self.state.as_mut().expect("valid state");
535
-
let mut reader = match store.get_reader() {
536
-
Ok(r) => r,
537
-
Err(e) => return tx.blocking_send(Err(e.into())),
538
-
};
539
540
loop {
541
let mut out: BlockChunk<T> = Vec::with_capacity(n);
···
543
for _ in 0..n {
544
// walk as far as we can until we run out of blocks or find a record
545
546
-
let step = match walker.disk_step(&mut reader, self.process) {
547
Ok(s) => s,
548
Err(e) => return tx.blocking_send(Err(e.into())),
549
};
···
592
/// }
593
///
594
/// }
595
-
/// let store = join.await?.reset_store().await?;
596
/// # Ok(())
597
/// # }
598
/// ```
···
614
});
615
616
(rx, chan_task)
617
-
}
618
-
619
-
/// Reset the disk storage so it can be reused. You must call this.
620
-
///
621
-
/// Ideally we'd put this in an `impl Drop`, but since it makes blocking
622
-
/// calls, that would be risky in an async context. For now you just have to
623
-
/// carefully make sure you call it.
624
-
///
625
-
/// The sqlite store is returned, so it can be reused for another
626
-
/// `DiskDriver`.
627
-
pub async fn reset_store(mut self) -> Result<DiskStore, DriveError> {
628
-
let BigState { store, .. } = self.state.take().expect("valid state");
629
-
Ok(store.reset().await?)
630
}
631
}
···
335
// move store in and back out so we can manage lifetimes
336
// dump mem blocks into the store
337
store = tokio::task::spawn(async move {
338
let kvs = self
339
.mem_blocks
340
.into_iter()
341
.map(|(k, v)| Ok(encode(v).map(|v| (k.to_bytes(), v))?));
342
343
+
store.put_many(kvs)?;
344
Ok::<_, DriveError>(store)
345
})
346
.await??;
···
348
let (tx, mut rx) = mpsc::channel::<Vec<(Cid, MaybeProcessedBlock<T>)>>(1);
349
350
let store_worker = tokio::task::spawn_blocking(move || {
351
while let Some(chunk) = rx.blocking_recv() {
352
let kvs = chunk
353
.into_iter()
354
.map(|(k, v)| Ok(encode(v).map(|v| (k.to_bytes(), v))?));
355
+
store.put_many(kvs)?;
356
}
357
Ok::<_, DriveError>(store)
358
}); // await later
359
···
446
/// println!("{rkey}: size={}", record.len());
447
/// }
448
/// }
449
/// # Ok(())
450
/// # }
451
/// ```
···
460
// comes out again.
461
let (state, res) = tokio::task::spawn_blocking(
462
move || -> (BigState, Result<BlockChunk<T>, DriveError>) {
463
let mut out = Vec::with_capacity(n);
464
465
for _ in 0..n {
466
// walk as far as we can until we run out of blocks or find a record
467
+
let step = match state.walker.disk_step(&mut state.store, process) {
468
Ok(s) => s,
469
Err(e) => {
470
return (state, Err(e.into()));
471
}
472
};
473
match step {
474
Step::Missing(cid) => {
475
return (state, Err(DriveError::MissingBlock(cid)));
476
}
477
Step::Finish => break,
···
479
};
480
}
481
482
(state, Ok::<_, DriveError>(out))
483
},
484
)
···
502
tx: mpsc::Sender<Result<BlockChunk<T>, DriveError>>,
503
) -> Result<(), mpsc::error::SendError<Result<BlockChunk<T>, DriveError>>> {
504
let BigState { store, walker } = self.state.as_mut().expect("valid state");
505
506
loop {
507
let mut out: BlockChunk<T> = Vec::with_capacity(n);
···
509
for _ in 0..n {
510
// walk as far as we can until we run out of blocks or find a record
511
512
+
let step = match walker.disk_step(store, self.process) {
513
Ok(s) => s,
514
Err(e) => return tx.blocking_send(Err(e.into())),
515
};
···
558
/// }
559
///
560
/// }
561
/// # Ok(())
562
/// # }
563
/// ```
···
579
});
580
581
(rx, chan_task)
582
}
583
}
-3
src/lib.rs
-3
src/lib.rs
+5
-5
src/walk.rs
+5
-5
src/walk.rs
···
1
//! Depth-first MST traversal
2
3
-
use crate::disk::SqliteReader;
4
use crate::drive::{DecodeError, MaybeProcessedBlock};
5
use crate::mst::Node;
6
use crate::process::Processable;
···
19
#[error("Action node error: {0}")]
20
MstError(#[from] MstError),
21
#[error("storage error: {0}")]
22
-
StorageError(#[from] rusqlite::Error),
23
#[error("Decode error: {0}")]
24
DecodeError(#[from] DecodeError),
25
}
···
239
/// blocking!!!!!!
240
pub fn disk_step<T: Processable>(
241
&mut self,
242
-
reader: &mut SqliteReader,
243
process: impl Fn(Vec<u8>) -> T,
244
) -> Result<Step<T>, WalkError> {
245
loop {
···
252
&mut Need::Node { depth, cid } => {
253
let cid_bytes = cid.to_bytes();
254
log::trace!("need node {cid:?}");
255
-
let Some(block_bytes) = reader.get(cid_bytes)? else {
256
log::trace!("node not found, resting");
257
return Ok(Step::Missing(cid));
258
};
···
274
Need::Record { rkey, cid } => {
275
log::trace!("need record {cid:?}");
276
let cid_bytes = cid.to_bytes();
277
-
let Some(data_bytes) = reader.get(cid_bytes)? else {
278
log::trace!("record block not found, resting");
279
return Ok(Step::Missing(*cid));
280
};
···
1
//! Depth-first MST traversal
2
3
+
use crate::disk::DiskStore;
4
use crate::drive::{DecodeError, MaybeProcessedBlock};
5
use crate::mst::Node;
6
use crate::process::Processable;
···
19
#[error("Action node error: {0}")]
20
MstError(#[from] MstError),
21
#[error("storage error: {0}")]
22
+
StorageError(#[from] fjall::Error),
23
#[error("Decode error: {0}")]
24
DecodeError(#[from] DecodeError),
25
}
···
239
/// blocking!!!!!!
240
pub fn disk_step<T: Processable>(
241
&mut self,
242
+
reader: &mut DiskStore,
243
process: impl Fn(Vec<u8>) -> T,
244
) -> Result<Step<T>, WalkError> {
245
loop {
···
252
&mut Need::Node { depth, cid } => {
253
let cid_bytes = cid.to_bytes();
254
log::trace!("need node {cid:?}");
255
+
let Some(block_bytes) = reader.get(&cid_bytes)? else {
256
log::trace!("node not found, resting");
257
return Ok(Step::Missing(cid));
258
};
···
274
Need::Record { rkey, cid } => {
275
log::trace!("need record {cid:?}");
276
let cid_bytes = cid.to_bytes();
277
+
let Some(data_bytes) = reader.get(&cid_bytes)? else {
278
log::trace!("record block not found, resting");
279
return Ok(Step::Missing(*cid));
280
};