Cargo.lock (+242 -71)
···
152
152
checksum = "2261d10cca569e4643e526d8dc2e62e433cc8aba21ab764233731f8d369bf394"
153
153
154
154
[[package]]
155
+
name = "block-buffer"
156
+
version = "0.10.4"
157
+
source = "registry+https://github.com/rust-lang/crates.io-index"
158
+
checksum = "3078c7629b62d3f0439517fa394996acacc5cbc91c5a20d8c658e77abd503a71"
159
+
dependencies = [
160
+
"generic-array",
161
+
]
162
+
163
+
[[package]]
155
164
name = "bumpalo"
156
165
version = "3.19.0"
157
166
source = "registry+https://github.com/rust-lang/crates.io-index"
158
167
checksum = "46c5e41b57b8bba42a04676d81cb89e9ee8e859a1a66f80a5a72e1cb76b34d43"
159
168
160
169
[[package]]
170
+
name = "byteorder-lite"
171
+
version = "0.1.0"
172
+
source = "registry+https://github.com/rust-lang/crates.io-index"
173
+
checksum = "8f1fe948ff07f4bd06c30984e69f5b4899c516a3ef74f34df92a2df2ab535495"
174
+
175
+
[[package]]
161
176
name = "bytes"
162
177
version = "1.10.1"
163
178
source = "registry+https://github.com/rust-lang/crates.io-index"
164
179
checksum = "d71b6127be86fdcfddb610f7182ac57211d4b18a3e9c82eb2d17662f2227ad6a"
165
180
166
181
[[package]]
182
+
name = "byteview"
183
+
version = "0.10.0"
184
+
source = "registry+https://github.com/rust-lang/crates.io-index"
185
+
checksum = "dda4398f387cc6395a3e93b3867cd9abda914c97a0b344d1eefb2e5c51785fca"
186
+
187
+
[[package]]
167
188
name = "cast"
168
189
version = "0.3.0"
169
190
source = "registry+https://github.com/rust-lang/crates.io-index"
···
272
293
checksum = "b05b61dc5112cbb17e4b6cd61790d9845d13888356391624cbe7e41efeac1e75"
273
294
274
295
[[package]]
296
+
name = "compare"
297
+
version = "0.0.6"
298
+
source = "registry+https://github.com/rust-lang/crates.io-index"
299
+
checksum = "ea0095f6103c2a8b44acd6fd15960c801dafebf02e21940360833e0673f48ba7"
300
+
301
+
[[package]]
275
302
name = "const-str"
276
303
version = "0.4.3"
277
304
source = "registry+https://github.com/rust-lang/crates.io-index"
···
284
311
checksum = "b49ba7ef1ad6107f8824dbe97de947cbaac53c44e7f9756a1fba0d37c1eec505"
285
312
dependencies = [
286
313
"memchr",
314
+
]
315
+
316
+
[[package]]
317
+
name = "cpufeatures"
318
+
version = "0.2.17"
319
+
source = "registry+https://github.com/rust-lang/crates.io-index"
320
+
checksum = "59ed5838eebb26a2bb2e58f6d5b5316989ae9d08bab10e0e6d103e656d1b0280"
321
+
dependencies = [
322
+
"libc",
287
323
]
288
324
289
325
[[package]]
···
340
376
]
341
377
342
378
[[package]]
379
+
name = "crossbeam-skiplist"
380
+
version = "0.1.3"
381
+
source = "registry+https://github.com/rust-lang/crates.io-index"
382
+
checksum = "df29de440c58ca2cc6e587ec3d22347551a32435fbde9d2bff64e78a9ffa151b"
383
+
dependencies = [
384
+
"crossbeam-epoch",
385
+
"crossbeam-utils",
386
+
]
387
+
388
+
[[package]]
343
389
name = "crossbeam-utils"
344
390
version = "0.8.21"
345
391
source = "registry+https://github.com/rust-lang/crates.io-index"
···
352
398
checksum = "460fbee9c2c2f33933d720630a6a0bac33ba7053db5344fac858d4b8952d77d5"
353
399
354
400
[[package]]
401
+
name = "crypto-common"
402
+
version = "0.1.6"
403
+
source = "registry+https://github.com/rust-lang/crates.io-index"
404
+
checksum = "1bfb12502f3fc46cca1bb51ac28df9d618d813cdc3d2f25b9fe775a34af26bb3"
405
+
dependencies = [
406
+
"generic-array",
407
+
"typenum",
408
+
]
409
+
410
+
[[package]]
411
+
name = "dashmap"
412
+
version = "6.1.0"
413
+
source = "registry+https://github.com/rust-lang/crates.io-index"
414
+
checksum = "5041cc499144891f3790297212f32a74fb938e5136a14943f338ef9e0ae276cf"
415
+
dependencies = [
416
+
"cfg-if",
417
+
"crossbeam-utils",
418
+
"hashbrown 0.14.5",
419
+
"lock_api",
420
+
"once_cell",
421
+
"parking_lot_core",
422
+
]
423
+
424
+
[[package]]
355
425
name = "data-encoding"
356
426
version = "2.9.0"
357
427
source = "registry+https://github.com/rust-lang/crates.io-index"
···
378
448
]
379
449
380
450
[[package]]
451
+
name = "digest"
452
+
version = "0.10.7"
453
+
source = "registry+https://github.com/rust-lang/crates.io-index"
454
+
checksum = "9ed9a281f7bc9b7576e61468ba615a66a5c8cfdff42420a70aa82701a3b1e292"
455
+
dependencies = [
456
+
"block-buffer",
457
+
"crypto-common",
458
+
]
459
+
460
+
[[package]]
381
461
name = "either"
382
462
version = "1.15.0"
383
463
source = "registry+https://github.com/rust-lang/crates.io-index"
384
464
checksum = "48c757948c5ede0e46177b7add2e67155f70e33c07fea8284df6576da70b3719"
385
465
386
466
[[package]]
467
+
name = "enum_dispatch"
468
+
version = "0.3.13"
469
+
source = "registry+https://github.com/rust-lang/crates.io-index"
470
+
checksum = "aa18ce2bc66555b3218614519ac839ddb759a7d6720732f979ef8d13be147ecd"
471
+
dependencies = [
472
+
"once_cell",
473
+
"proc-macro2",
474
+
"quote",
475
+
"syn 2.0.106",
476
+
]
477
+
478
+
[[package]]
387
479
name = "env_filter"
388
480
version = "0.1.3"
389
481
source = "registry+https://github.com/rust-lang/crates.io-index"
···
407
499
]
408
500
409
501
[[package]]
502
+
name = "equivalent"
503
+
version = "1.0.2"
504
+
source = "registry+https://github.com/rust-lang/crates.io-index"
505
+
checksum = "877a4ace8713b0bcf2a4e7eec82529c029f1d0619886d18145fea96c3ffe5c0f"
506
+
507
+
[[package]]
410
508
name = "errno"
411
509
version = "0.3.14"
412
510
source = "registry+https://github.com/rust-lang/crates.io-index"
···
417
515
]
418
516
419
517
[[package]]
420
-
name = "fallible-iterator"
421
-
version = "0.3.0"
422
-
source = "registry+https://github.com/rust-lang/crates.io-index"
423
-
checksum = "2acce4a10f12dc2fb14a218589d4f1f62ef011b2d0cc4b3cb1bba8e94da14649"
424
-
425
-
[[package]]
426
-
name = "fallible-streaming-iterator"
427
-
version = "0.1.9"
428
-
source = "registry+https://github.com/rust-lang/crates.io-index"
429
-
checksum = "7360491ce676a36bf9bb3c56c1aa791658183a54d2744120f27285738d90465a"
430
-
431
-
[[package]]
432
518
name = "fastrand"
433
519
version = "2.3.0"
434
520
source = "registry+https://github.com/rust-lang/crates.io-index"
435
521
checksum = "37909eebbb50d72f9059c3b6d82c0463f2ff062c9e95845c43a6c9c0355411be"
436
522
437
523
[[package]]
438
-
name = "foldhash"
439
-
version = "0.1.5"
524
+
name = "fjall"
525
+
version = "3.0.1"
440
526
source = "registry+https://github.com/rust-lang/crates.io-index"
441
-
checksum = "d9c4f5dac5e15c24eb999c26181a6ca40b39fe946cbe4c263c7209467bc83af2"
527
+
checksum = "4f69637c02d38ad1b0f003101d0195a60368130aa17d9ef78b1557d265a22093"
528
+
dependencies = [
529
+
"byteorder-lite",
530
+
"byteview",
531
+
"dashmap",
532
+
"flume",
533
+
"log",
534
+
"lsm-tree",
535
+
"tempfile",
536
+
"xxhash-rust",
537
+
]
538
+
539
+
[[package]]
540
+
name = "flume"
541
+
version = "0.12.0"
542
+
source = "registry+https://github.com/rust-lang/crates.io-index"
543
+
checksum = "5e139bc46ca777eb5efaf62df0ab8cc5fd400866427e56c68b22e414e53bd3be"
544
+
dependencies = [
545
+
"spin",
546
+
]
442
547
443
548
[[package]]
444
549
name = "futures"
···
530
635
]
531
636
532
637
[[package]]
638
+
name = "generic-array"
639
+
version = "0.14.9"
640
+
source = "registry+https://github.com/rust-lang/crates.io-index"
641
+
checksum = "4bb6743198531e02858aeaea5398fcc883e71851fcbcb5a2f773e2fb6cb1edf2"
642
+
dependencies = [
643
+
"typenum",
644
+
"version_check",
645
+
]
646
+
647
+
[[package]]
533
648
name = "getrandom"
534
649
version = "0.3.3"
535
650
source = "registry+https://github.com/rust-lang/crates.io-index"
···
560
675
561
676
[[package]]
562
677
name = "hashbrown"
563
-
version = "0.15.5"
678
+
version = "0.14.5"
564
679
source = "registry+https://github.com/rust-lang/crates.io-index"
565
-
checksum = "9229cfe53dfd69f0609a49f65461bd93001ea1ef889cd5529dd176593f5338a1"
566
-
dependencies = [
567
-
"foldhash",
568
-
]
680
+
checksum = "e5274423e17b7c9fc20b6e7e208532f9b19825d82dfd615708b70edd83df41f1"
569
681
570
682
[[package]]
571
-
name = "hashlink"
572
-
version = "0.10.0"
683
+
name = "hashbrown"
684
+
version = "0.16.1"
573
685
source = "registry+https://github.com/rust-lang/crates.io-index"
574
-
checksum = "7382cf6263419f2d8df38c55d7da83da5c18aef87fc7a7fc1fb1e344edfe14c1"
575
-
dependencies = [
576
-
"hashbrown",
577
-
]
686
+
checksum = "841d1cc9bed7f9236f321df977030373f4a4163ae1a7dbfe1a51a2c1a51d9100"
578
687
579
688
[[package]]
580
689
name = "heck"
581
690
version = "0.5.0"
582
691
source = "registry+https://github.com/rust-lang/crates.io-index"
583
692
checksum = "2304e00983f87ffb38b55b444b5e3b60a884b5d30c0fca7d82fe33449bbe55ea"
693
+
694
+
[[package]]
695
+
name = "interval-heap"
696
+
version = "0.0.5"
697
+
source = "registry+https://github.com/rust-lang/crates.io-index"
698
+
checksum = "11274e5e8e89b8607cfedc2910b6626e998779b48a019151c7604d0adcb86ac6"
699
+
dependencies = [
700
+
"compare",
701
+
]
584
702
585
703
[[package]]
586
704
name = "io-uring"
···
682
800
checksum = "58f929b4d672ea937a23a1ab494143d968337a5f47e56d0815df1e0890ddf174"
683
801
684
802
[[package]]
685
-
name = "libsqlite3-sys"
686
-
version = "0.35.0"
687
-
source = "registry+https://github.com/rust-lang/crates.io-index"
688
-
checksum = "133c182a6a2c87864fe97778797e46c7e999672690dc9fa3ee8e241aa4a9c13f"
689
-
dependencies = [
690
-
"pkg-config",
691
-
"vcpkg",
692
-
]
693
-
694
-
[[package]]
695
803
name = "linux-raw-sys"
696
804
version = "0.11.0"
697
805
source = "registry+https://github.com/rust-lang/crates.io-index"
···
713
821
checksum = "34080505efa8e45a4b816c349525ebe327ceaa8559756f0356cba97ef3bf7432"
714
822
715
823
[[package]]
824
+
name = "lsm-tree"
825
+
version = "3.0.1"
826
+
source = "registry+https://github.com/rust-lang/crates.io-index"
827
+
checksum = "b875f1dfe14f557f805b167fb9b0fc54c5560c7a4bd6ae02535b2846f276a8cb"
828
+
dependencies = [
829
+
"byteorder-lite",
830
+
"byteview",
831
+
"crossbeam-skiplist",
832
+
"enum_dispatch",
833
+
"interval-heap",
834
+
"log",
835
+
"quick_cache",
836
+
"rustc-hash",
837
+
"self_cell",
838
+
"sfa",
839
+
"tempfile",
840
+
"varint-rs",
841
+
"xxhash-rust",
842
+
]
843
+
844
+
[[package]]
716
845
name = "match-lookup"
717
846
version = "0.1.1"
718
847
source = "registry+https://github.com/rust-lang/crates.io-index"
···
844
973
checksum = "8b870d8c151b6f2fb93e84a13146138f05d02ed11c7e7c54f8826aaaf7c9f184"
845
974
846
975
[[package]]
847
-
name = "pkg-config"
848
-
version = "0.3.32"
849
-
source = "registry+https://github.com/rust-lang/crates.io-index"
850
-
checksum = "7edddbd0b52d732b21ad9a5fab5c704c14cd949e5e9a1ec5929a24fded1b904c"
851
-
852
-
[[package]]
853
976
name = "plotters"
854
977
version = "0.3.7"
855
978
source = "registry+https://github.com/rust-lang/crates.io-index"
···
902
1025
]
903
1026
904
1027
[[package]]
1028
+
name = "quick_cache"
1029
+
version = "0.6.18"
1030
+
source = "registry+https://github.com/rust-lang/crates.io-index"
1031
+
checksum = "7ada44a88ef953a3294f6eb55d2007ba44646015e18613d2f213016379203ef3"
1032
+
dependencies = [
1033
+
"equivalent",
1034
+
"hashbrown 0.16.1",
1035
+
]
1036
+
1037
+
[[package]]
905
1038
name = "quote"
906
1039
version = "1.0.41"
907
1040
source = "registry+https://github.com/rust-lang/crates.io-index"
···
934
1067
dependencies = [
935
1068
"crossbeam-deque",
936
1069
"crossbeam-utils",
937
-
]
938
-
939
-
[[package]]
940
-
name = "redb"
941
-
version = "3.1.0"
942
-
source = "registry+https://github.com/rust-lang/crates.io-index"
943
-
checksum = "ae323eb086579a3769daa2c753bb96deb95993c534711e0dbe881b5192906a06"
944
-
dependencies = [
945
-
"libc",
946
1070
]
947
1071
948
1072
[[package]]
···
985
1109
986
1110
[[package]]
987
1111
name = "repo-stream"
988
-
version = "0.1.1"
1112
+
version = "0.2.2"
989
1113
dependencies = [
990
1114
"bincode",
991
1115
"clap",
992
1116
"criterion",
993
1117
"env_logger",
1118
+
"fjall",
994
1119
"futures",
995
1120
"futures-core",
996
1121
"ipld-core",
997
1122
"iroh-car",
998
1123
"log",
999
1124
"multibase",
1000
-
"redb",
1001
-
"rusqlite",
1002
1125
"serde",
1003
1126
"serde_bytes",
1004
1127
"serde_ipld_dagcbor",
1128
+
"sha2",
1005
1129
"tempfile",
1006
1130
"thiserror 2.0.17",
1007
1131
"tokio",
1008
1132
]
1009
1133
1010
1134
[[package]]
1011
-
name = "rusqlite"
1012
-
version = "0.37.0"
1013
-
source = "registry+https://github.com/rust-lang/crates.io-index"
1014
-
checksum = "165ca6e57b20e1351573e3729b958bc62f0e48025386970b6e4d29e7a7e71f3f"
1015
-
dependencies = [
1016
-
"bitflags",
1017
-
"fallible-iterator",
1018
-
"fallible-streaming-iterator",
1019
-
"hashlink",
1020
-
"libsqlite3-sys",
1021
-
"smallvec",
1022
-
]
1023
-
1024
-
[[package]]
1025
1135
name = "rustc-demangle"
1026
1136
version = "0.1.26"
1027
1137
source = "registry+https://github.com/rust-lang/crates.io-index"
1028
1138
checksum = "56f7d92ca342cea22a06f2121d944b4fd82af56988c270852495420f961d4ace"
1139
+
1140
+
[[package]]
1141
+
name = "rustc-hash"
1142
+
version = "2.1.1"
1143
+
source = "registry+https://github.com/rust-lang/crates.io-index"
1144
+
checksum = "357703d41365b4b27c590e3ed91eabb1b663f07c4c084095e60cbed4362dff0d"
1029
1145
1030
1146
[[package]]
1031
1147
name = "rustix"
···
1068
1184
checksum = "94143f37725109f92c262ed2cf5e59bce7498c01bcc1502d7b9afe439a4e9f49"
1069
1185
1070
1186
[[package]]
1187
+
name = "self_cell"
1188
+
version = "1.2.2"
1189
+
source = "registry+https://github.com/rust-lang/crates.io-index"
1190
+
checksum = "b12e76d157a900eb52e81bc6e9f3069344290341720e9178cde2407113ac8d89"
1191
+
1192
+
[[package]]
1071
1193
name = "serde"
1072
1194
version = "1.0.228"
1073
1195
source = "registry+https://github.com/rust-lang/crates.io-index"
···
1133
1255
]
1134
1256
1135
1257
[[package]]
1258
+
name = "sfa"
1259
+
version = "1.0.0"
1260
+
source = "registry+https://github.com/rust-lang/crates.io-index"
1261
+
checksum = "a1296838937cab56cd6c4eeeb8718ec777383700c33f060e2869867bd01d1175"
1262
+
dependencies = [
1263
+
"byteorder-lite",
1264
+
"log",
1265
+
"xxhash-rust",
1266
+
]
1267
+
1268
+
[[package]]
1269
+
name = "sha2"
1270
+
version = "0.10.9"
1271
+
source = "registry+https://github.com/rust-lang/crates.io-index"
1272
+
checksum = "a7507d819769d01a365ab707794a4084392c824f54a7a6a7862f8c3d0892b283"
1273
+
dependencies = [
1274
+
"cfg-if",
1275
+
"cpufeatures",
1276
+
"digest",
1277
+
]
1278
+
1279
+
[[package]]
1136
1280
name = "signal-hook-registry"
1137
1281
version = "1.4.6"
1138
1282
source = "registry+https://github.com/rust-lang/crates.io-index"
···
1161
1305
dependencies = [
1162
1306
"libc",
1163
1307
"windows-sys 0.59.0",
1308
+
]
1309
+
1310
+
[[package]]
1311
+
name = "spin"
1312
+
version = "0.9.8"
1313
+
source = "registry+https://github.com/rust-lang/crates.io-index"
1314
+
checksum = "6980e8d7511241f8acf4aebddbb1ff938df5eebe98691418c4468d0b72a96a67"
1315
+
dependencies = [
1316
+
"lock_api",
1164
1317
]
1165
1318
1166
1319
[[package]]
···
1286
1439
]
1287
1440
1288
1441
[[package]]
1442
+
name = "typenum"
1443
+
version = "1.19.0"
1444
+
source = "registry+https://github.com/rust-lang/crates.io-index"
1445
+
checksum = "562d481066bde0658276a35467c4af00bdc6ee726305698a55b86e61d7ad82bb"
1446
+
1447
+
[[package]]
1289
1448
name = "unicode-ident"
1290
1449
version = "1.0.19"
1291
1450
source = "registry+https://github.com/rust-lang/crates.io-index"
···
1316
1475
checksum = "06abde3611657adf66d383f00b093d7faecc7fa57071cce2578660c9f1010821"
1317
1476
1318
1477
[[package]]
1319
-
name = "vcpkg"
1320
-
version = "0.2.15"
1478
+
name = "varint-rs"
1479
+
version = "2.2.0"
1321
1480
source = "registry+https://github.com/rust-lang/crates.io-index"
1322
-
checksum = "accd4ea62f7bb7a82fe23066fb0957d48ef677f6eeb8215f372f52e48bb32426"
1481
+
checksum = "8f54a172d0620933a27a4360d3db3e2ae0dd6cceae9730751a036bbf182c4b23"
1482
+
1483
+
[[package]]
1484
+
name = "version_check"
1485
+
version = "0.9.5"
1486
+
source = "registry+https://github.com/rust-lang/crates.io-index"
1487
+
checksum = "0b928f33d975fc6ad9f86c8f283853ad26bdd5b10b7f1542aa2fa15e2289105a"
1323
1488
1324
1489
[[package]]
1325
1490
name = "virtue"
···
1597
1762
version = "0.46.0"
1598
1763
source = "registry+https://github.com/rust-lang/crates.io-index"
1599
1764
checksum = "f17a85883d4e6d00e8a97c586de764dabcc06133f7f1d55dce5cdc070ad7fe59"
1765
+
1766
+
[[package]]
1767
+
name = "xxhash-rust"
1768
+
version = "0.8.15"
1769
+
source = "registry+https://github.com/rust-lang/crates.io-index"
1770
+
checksum = "fdd20c5420375476fbd4394763288da7eb0cc0b8c11deed431a91562af7335d3"
1600
1771
1601
1772
[[package]]
1602
1773
name = "zerocopy"
Cargo.toml (+7 -7)
···
 [package]
 name = "repo-stream"
-version = "0.1.1"
+version = "0.2.2"
 edition = "2024"
 license = "MIT OR Apache-2.0"
-description = "Fast and robust atproto CAR file processing in rust"
+description = "A robust CAR file -> MST walker for atproto"
 repository = "https://tangled.org/@microcosm.blue/repo-stream"

 [dependencies]
 bincode = { version = "2.0.1", features = ["serde"] }
+fjall = { version = "3.0.1", default-features = false }
 futures = "0.3.31"
 futures-core = "0.3.31"
 ipld-core = { version = "0.4.2", features = ["serde"] }
 iroh-car = "0.5.1"
 log = "0.4.28"
 multibase = "0.9.2"
-redb = "3.1.0"
-rusqlite = "0.37.0"
 serde = { version = "1.0.228", features = ["derive"] }
 serde_bytes = "0.11.19"
 serde_ipld_dagcbor = "0.6.4"
+sha2 = "0.10.9"
 thiserror = "2.0.17"
-tokio = { version = "1.47.1", features = ["rt"] }
+tokio = { version = "1.47.1", features = ["rt", "sync"] }

 [dev-dependencies]
 clap = { version = "4.5.48", features = ["derive"] }
···
 inherits = "release"
 debug = true

-[profile.release]
-debug = true
+# [profile.release]
+# debug = true

 [[bench]]
 name = "non-huge-cars"
benches/huge-car.rs (+12 -21)
···
 extern crate repo_stream;
-use futures::TryStreamExt;
-use iroh_car::CarReader;
-use std::convert::Infallible;
+use repo_stream::Driver;
 use std::path::{Path, PathBuf};

 use criterion::{Criterion, criterion_group, criterion_main};
···
     });
 }

-async fn drive_car(filename: impl AsRef<Path>) {
+async fn drive_car(filename: impl AsRef<Path>) -> usize {
     let reader = tokio::fs::File::open(filename).await.unwrap();
     let reader = tokio::io::BufReader::new(reader);
-    let reader = CarReader::new(reader).await.unwrap();

-    let root = reader
-        .header()
-        .roots()
-        .first()
-        .ok_or("missing root")
+    let mut driver = match Driver::load_car(reader, |block| block.len(), 1024)
+        .await
         .unwrap()
-        .clone();
-
-    let stream = std::pin::pin!(reader.stream());
-
-    let (_commit, v) =
-        repo_stream::drive::Vehicle::init(root, stream, |block| Ok::<_, Infallible>(block.len()))
-            .await
-            .unwrap();
-    let mut record_stream = std::pin::pin!(v.stream());
+    {
+        Driver::Memory(_, mem_driver) => mem_driver,
+        Driver::Disk(_) => panic!("not doing disk for benchmark"),
+    };

-    while let Some(_) = record_stream.try_next().await.unwrap() {
-        // just here for the drive
+    let mut n = 0;
+    while let Some(pairs) = driver.next_chunk(256).await.unwrap() {
+        n += pairs.len();
     }
+    n
 }

 criterion_group!(benches, criterion_benchmark);
benches/non-huge-cars.rs (+16 -22)
···
 extern crate repo_stream;
-use futures::TryStreamExt;
-use iroh_car::CarReader;
-use std::convert::Infallible;
+use repo_stream::Driver;

 use criterion::{Criterion, criterion_group, criterion_main};

+const EMPTY_CAR: &'static [u8] = include_bytes!("../car-samples/empty.car");
 const TINY_CAR: &'static [u8] = include_bytes!("../car-samples/tiny.car");
 const LITTLE_CAR: &'static [u8] = include_bytes!("../car-samples/little.car");
 const MIDSIZE_CAR: &'static [u8] = include_bytes!("../car-samples/midsize.car");
···
         .build()
         .expect("Creating runtime failed");

+    c.bench_function("empty-car", |b| {
+        b.to_async(&rt).iter(async || drive_car(EMPTY_CAR).await)
+    });
     c.bench_function("tiny-car", |b| {
         b.to_async(&rt).iter(async || drive_car(TINY_CAR).await)
     });
···
     });
 }

-async fn drive_car(bytes: &[u8]) {
-    let reader = CarReader::new(bytes).await.unwrap();
-
-    let root = reader
-        .header()
-        .roots()
-        .first()
-        .ok_or("missing root")
+async fn drive_car(bytes: &[u8]) -> usize {
+    let mut driver = match Driver::load_car(bytes, |block| block.len(), 32)
+        .await
         .unwrap()
-        .clone();
-
-    let stream = std::pin::pin!(reader.stream());
+    {
+        Driver::Memory(_, mem_driver) => mem_driver,
+        Driver::Disk(_) => panic!("not benching big cars here"),
+    };

-    let (_commit, v) =
-        repo_stream::drive::Vehicle::init(root, stream, |block| Ok::<_, Infallible>(block.len()))
-            .await
-            .unwrap();
-    let mut record_stream = std::pin::pin!(v.stream());
-
-    while let Some(_) = record_stream.try_next().await.unwrap() {
-        // just here for the drive
+    let mut n = 0;
+    while let Some(pairs) = driver.next_chunk(256).await.unwrap() {
+        n += pairs.len();
     }
+    n
 }

 criterion_group!(benches, criterion_benchmark);
car-samples/empty.car (binary file, not displayed)
examples/disk-read-file/main.rs (+62 -35)
···
+/*!
+Read a CAR file by spilling to disk
+*/
+
 extern crate repo_stream;
 use clap::Parser;
-use repo_stream::drive::Processable;
-use serde::{Deserialize, Serialize};
+use repo_stream::{DiskBuilder, Driver, DriverBuilder};
 use std::path::PathBuf;
-
-type Result<T> = std::result::Result<T, Box<dyn std::error::Error>>;
+use std::time::Instant;

 #[derive(Debug, Parser)]
 struct Args {
···
     tmpfile: PathBuf,
 }

-#[derive(Clone, Serialize, Deserialize)]
-struct S(usize);
-
-impl Processable for S {
-    fn get_size(&self) -> usize {
-        0 // no additional space taken, just its stack size (newtype is free)
-    }
-}
-
 #[tokio::main]
-async fn main() -> Result<()> {
+async fn main() -> Result<(), Box<dyn std::error::Error>> {
     env_logger::init();

     let Args { car, tmpfile } = Args::parse();
+
+    // repo-stream takes an AsyncRead as input. wrapping a filesystem read in
+    // BufReader can provide a really significant performance win.
     let reader = tokio::fs::File::open(car).await?;
     let reader = tokio::io::BufReader::new(reader);

-    // let kb = 2_usize.pow(10);
-    let mb = 2_usize.pow(20);
+    log::info!("hello! reading the car...");
+    let t0 = Instant::now();

-    let mut driver =
-        match repo_stream::drive::load_car(reader, |block| S(block.len()), 16 * mb).await? {
-            repo_stream::drive::Vehicle::Lil(_, _) => panic!("try this on a bigger car"),
-            repo_stream::drive::Vehicle::Big(big_stuff) => {
-                // let disk_store = repo_stream::disk::SqliteStore::new(tmpfile);
-                let disk_store = repo_stream::disk::RedbStore::new(tmpfile);
-                let (commit, driver) = big_stuff.finish_loading(disk_store).await?;
-                log::warn!("big: {:?}", commit);
-                driver
-            }
-        };
+    // in this example we only bother handling CARs that are too big for memory
+    // `noop` helper means: do no block processing, store the raw blocks
+    let driver = match DriverBuilder::new()
+        .with_mem_limit_mb(32) // how much memory can be used before disk spill
+        .load_car(reader)
+        .await?
+    {
+        Driver::Memory(_, _) => panic!("try this on a bigger car"),
+        Driver::Disk(big_stuff) => {
+            // we reach here if the repo was too big and needs to be spilled to
+            // disk to continue
+
+            // set up a disk store we can spill to
+            let disk_store = DiskBuilder::new().open(tmpfile).await?;
+
+            // do the spilling, get back a (similar) driver
+            let (commit, driver) = big_stuff.finish_loading(disk_store).await?;
+
+            // at this point you might want to fetch the account's signing key
+            // via the DID from the commit, and then verify the signature.
+            log::warn!("big's commit ({:?}): {:?}", t0.elapsed(), commit);

-    println!("hello!");
+            // pop the driver back out to get some code indentation relief
+            driver
+        }
+    };

+    // collect some random stats about the blocks
     let mut n = 0;
-    loop {
-        let (d, Some(pairs)) = driver.next_chunk(256).await? else {
-            break;
-        };
-        driver = d;
+    let mut zeros = 0;
+
+    log::info!("walking...");
+
+    // this example uses the disk driver's channel mode: the tree walking is
+    // spawned onto a blocking thread, and we get chunks of rkey+blocks back
+    let (mut rx, join) = driver.to_channel(512);
+    while let Some(r) = rx.recv().await {
+        let pairs = r?;
+
+        // keep a count of the total number of blocks seen
         n += pairs.len();
-        // log::info!("got {rkey:?}");
+
+        for (_, block) in pairs {
+            // for each block, count how many bytes are equal to '0'
+            // (this is just an example, you probably want to do something more
+            // interesting)
+            zeros += block.into_iter().filter(|&b| b == b'0').count()
+        }
     }
-    log::info!("bye! {n}");
+
+    log::info!("arrived! ({:?}) joining rx...", t0.elapsed());
+
+    join.await?;
+
+    log::info!("done. n={n} zeros={zeros}");

     Ok(())
 }
examples/read-file/main.rs (+14 -6)
···
+/*!
+Read a CAR file with in-memory processing
+*/
+
 extern crate repo_stream;
 use clap::Parser;
+use repo_stream::{Driver, DriverBuilder};
 use std::path::PathBuf;

 type Result<T> = std::result::Result<T, Box<dyn std::error::Error>>;
···
     let reader = tokio::fs::File::open(file).await?;
     let reader = tokio::io::BufReader::new(reader);

-    let (commit, mut driver) =
-        match repo_stream::drive::load_car(reader, |block| block.len(), 1024 * 1024).await? {
-            repo_stream::drive::Vehicle::Lil(commit, mem_driver) => (commit, mem_driver),
-            repo_stream::drive::Vehicle::Big(_) => panic!("can't handle big cars yet"),
-        };
+    let (commit, mut driver) = match DriverBuilder::new()
+        .with_block_processor(|block| block.len())
+        .load_car(reader)
+        .await?
+    {
+        Driver::Memory(commit, mem_driver) => (commit, mem_driver),
+        Driver::Disk(_) => panic!("this example doesn't handle big CARs"),
+    };

     log::info!("got commit: {commit:?}");

···
         n += pairs.len();
         // log::info!("got {rkey:?}");
     }
-    log::info!("bye! {n}");
+    log::info!("bye! total records={n}");

     Ok(())
 }
readme.md (+67 -2)
···
 # repo-stream

-Fast and (aspirationally) robust atproto CAR file processing in rust
+A robust CAR file -> MST walker for atproto
+
+[![Crates.io][crates-badge]](https://crates.io/crates/repo-stream)
+[![Documentation][docs-badge]](https://docs.rs/repo-stream)
+[![Sponsor][sponsor-badge]](https://github.com/sponsors/uniphil)
+
+[crates-badge]: https://img.shields.io/crates/v/repo-stream.svg
+[docs-badge]: https://docs.rs/repo-stream/badge.svg
+[sponsor-badge]: https://img.shields.io/badge/at-microcosm-b820f9?labelColor=b820f9&logo=githubsponsors&logoColor=fff
+
+```rust
+use repo_stream::{Driver, DriverBuilder, DriveError, DiskBuilder};
+
+#[tokio::main]
+async fn main() -> Result<(), DriveError> {
+    // repo-stream takes any AsyncRead as input, like a tokio::fs::File
+    let reader = tokio::fs::File::open("repo.car".into()).await?;
+    let reader = tokio::io::BufReader::new(reader);
+
+    // example repo workload is simply counting the total record bytes
+    let mut total_size = 0;
+
+    match DriverBuilder::new()
+        .with_mem_limit_mb(10)
+        .with_block_processor(|rec| rec.len()) // block processing: just extract the raw record size
+        .load_car(reader)
+        .await?
+    {
+
+        // if all blocks fit within memory
+        Driver::Memory(_commit, mut driver) => {
+            while let Some(chunk) = driver.next_chunk(256).await? {
+                for (_rkey, size) in chunk {
+                    total_size += size;
+                }
+            }
+        },
+
+        // if the CAR was too big for in-memory processing
+        Driver::Disk(paused) => {
+            // set up a disk store we can spill to
+            let store = DiskBuilder::new().open("some/path.db".into()).await?;
+            // do the spilling, get back a (similar) driver
+            let (_commit, mut driver) = paused.finish_loading(store).await?;
+
+            while let Some(chunk) = driver.next_chunk(256).await? {
+                for (_rkey, size) in chunk {
+                    total_size += size;
+                }
+            }
+        }
+    };
+    println!("sum of size of all records: {total_size}");
+    Ok(())
+}
+```
+
+more recent todo
+
+- [ ] get an *empty* car for the test suite
+- [x] implement a max size on disk limit
+
+
+-----
+
+older stuff (to clean up):


 current car processing times (records processed into their length usize, phil's dev machine):
···
   -> yeah the commit is returned from init
 - [ ] spec compliance todos
   - [x] assert that keys are ordered and fail if not
-  - [ ] verify node mst depth from key (possibly pending [interop test fixes](https://github.com/bluesky-social/atproto-interop-tests/issues/5))
+  - [x] verify node mst depth from key (possibly pending [interop test fixes](https://github.com/bluesky-social/atproto-interop-tests/issues/5))
 - [ ] performance todos
   - [x] consume the serialized nodes into a mutable efficient format
   - [ ] maybe customize the deserialize impl to do that directly?
src/disk.rs (+133 -182)
···
1
-
use redb::ReadableDatabase;
2
-
use rusqlite::OptionalExtension;
3
-
use std::error::Error;
4
-
use std::path::PathBuf;
1
+
/*!
2
+
Disk storage for blocks on disk
5
3
6
-
pub trait StorageErrorBase: Error + Send + 'static {}
4
+
Currently this uses sqlite. In testing sqlite wasn't the fastest, but it seemed
5
+
to be the best behaved in terms of both on-disk space usage and memory usage.
7
6
8
-
/// high level potential storage resource
9
-
///
10
-
/// separating this allows (hopefully) implementing a storage pool that can
11
-
/// async-block when until a member is available to use
12
-
pub trait DiskStore {
13
-
type StorageError: StorageErrorBase + Send;
14
-
type Access: DiskAccess<StorageError = Self::StorageError>;
15
-
fn get_access(&mut self) -> impl Future<Output = Result<Self::Access, Self::StorageError>>;
16
-
}
7
+
```no_run
8
+
# use repo_stream::{DiskBuilder, DiskError};
9
+
# #[tokio::main]
10
+
# async fn main() -> Result<(), DiskError> {
11
+
let store = DiskBuilder::new()
12
+
.with_cache_size_mb(32)
13
+
.with_max_stored_mb(1024) // errors when >1GiB of processed blocks are inserted
14
+
.open("/some/path.db".into()).await?;
15
+
# Ok(())
16
+
# }
17
+
```
18
+
*/
17
19
18
-
/// actual concrete access to disk storage
19
-
pub trait DiskAccess: Send {
20
-
type StorageError: StorageErrorBase;
20
+
use crate::drive::DriveError;
21
+
use fjall::config::{CompressionPolicy, PinningPolicy, RestartIntervalPolicy};
22
+
use fjall::{CompressionType, Database, Error as FjallError, Keyspace, KeyspaceCreateOptions};
23
+
use std::path::PathBuf;
21
24
22
-
fn get_writer(&mut self) -> Result<impl DiskWriter<Self::StorageError>, Self::StorageError>;
23
-
24
-
fn get_reader(
25
-
&self,
26
-
) -> Result<impl DiskReader<StorageError = Self::StorageError>, Self::StorageError>;
27
-
28
-
// TODO: force a cleanup implementation?
29
-
}
30
-
31
-
pub trait DiskWriter<E: StorageErrorBase> {
32
-
fn put(&mut self, key: Vec<u8>, val: Vec<u8>) -> Result<(), E>;
25
+
#[derive(Debug, thiserror::Error)]
26
+
pub enum DiskError {
27
+
/// A wrapped database error
28
+
///
29
+
/// (The wrapped err should probably be obscured to remove public-facing
30
+
/// fjall bits)
31
+
#[error(transparent)]
32
+
DbError(#[from] FjallError),
33
+
/// A tokio blocking task failed to join
34
+
#[error("Failed to join a tokio blocking task: {0}")]
35
+
JoinError(#[from] tokio::task::JoinError),
36
+
/// The total size of stored blocks exceeded the allowed size
37
+
///
38
+
/// If you need to process *really* big CARs, you can configure a higher
39
+
/// limit.
40
+
#[error("Maximum disk size reached")]
41
+
MaxSizeExceeded,
33
42
}
34
43
35
-
pub trait DiskReader {
36
-
type StorageError: StorageErrorBase;
37
-
fn get(&mut self, key: Vec<u8>) -> Result<Option<Vec<u8>>, Self::StorageError>;
38
-
}
39
-
40
-
///////////////// sqlite
41
-
42
-
pub struct SqliteStore {
43
-
path: PathBuf,
44
+
/// Builder-style disk store setup
45
+
#[derive(Debug, Clone)]
46
+
pub struct DiskBuilder {
47
+
/// Database in-memory cache allowance
48
+
///
49
+
/// Default: 64 MiB
50
+
pub cache_size_mb: usize,
51
+
/// Database stored block size limit
52
+
///
53
+
/// Default: 10 GiB
54
+
///
55
+
/// Note: actual size on disk may be more, but should approximately scale
56
+
/// with this limit
57
+
pub max_stored_mb: usize,
44
58
}
45
59
46
-
impl SqliteStore {
47
-
pub fn new(path: PathBuf) -> Self {
48
-
Self { path }
60
+
impl Default for DiskBuilder {
61
+
fn default() -> Self {
62
+
Self {
63
+
cache_size_mb: 64,
64
+
max_stored_mb: 10 * 1024, // 10 GiB
65
+
}
49
66
}
50
67
}
51
68
52
-
impl StorageErrorBase for rusqlite::Error {}
53
-
54
-
impl DiskStore for SqliteStore {
55
-
type StorageError = rusqlite::Error;
56
-
type Access = SqliteAccess;
57
-
async fn get_access(&mut self) -> Result<SqliteAccess, rusqlite::Error> {
58
-
let path = self.path.clone();
59
-
let conn = tokio::task::spawn_blocking(move || {
60
-
let conn = rusqlite::Connection::open(path)?;
61
-
62
-
conn.pragma_update(None, "journal_mode", "WAL")?;
63
-
conn.pragma_update(None, "synchronous", "OFF")?;
64
-
conn.pragma_update(None, "cache_size", (-4 * 2_i64.pow(10)).to_string())?;
65
-
conn.execute(
66
-
"CREATE TABLE blocks (
67
-
key BLOB PRIMARY KEY NOT NULL,
68
-
val BLOB NOT NULL
69
-
) WITHOUT ROWID",
70
-
(),
71
-
)?;
72
-
73
-
Ok::<_, Self::StorageError>(conn)
74
-
})
75
-
.await
76
-
.expect("join error")?;
77
-
78
-
Ok(SqliteAccess { conn })
69
+
impl DiskBuilder {
70
+
/// Begin configuring the storage with defaults
71
+
pub fn new() -> Self {
72
+
Default::default()
79
73
}
80
-
}
81
-
82
-
pub struct SqliteAccess {
83
-
conn: rusqlite::Connection,
84
-
}
85
-
86
-
impl DiskAccess for SqliteAccess {
87
-
type StorageError = rusqlite::Error;
88
-
fn get_writer(&mut self) -> Result<impl DiskWriter<rusqlite::Error>, rusqlite::Error> {
89
-
let insert_stmt = self
90
-
.conn
91
-
.prepare("INSERT INTO blocks (key, val) VALUES (?1, ?2)")?;
92
-
Ok(SqliteWriter { insert_stmt })
74
+
/// Set the in-memory cache allowance for the database
75
+
///
76
+
/// Default: 64 MiB
77
+
pub fn with_cache_size_mb(mut self, size: usize) -> Self {
78
+
self.cache_size_mb = size;
79
+
self
93
80
}
94
-
fn get_reader(
95
-
&self,
96
-
) -> Result<impl DiskReader<StorageError = rusqlite::Error>, rusqlite::Error> {
97
-
let select_stmt = self.conn.prepare("SELECT val FROM blocks WHERE key = ?1")?;
98
-
Ok(SqliteReader { select_stmt })
81
+
/// Set the approximate stored block size limit
82
+
///
83
+
/// Default: 10 GiB
84
+
pub fn with_max_stored_mb(mut self, max: usize) -> Self {
85
+
self.max_stored_mb = max;
86
+
self
99
87
}
100
-
}
101
-
102
-
pub struct SqliteWriter<'conn> {
103
-
insert_stmt: rusqlite::Statement<'conn>,
104
-
}
105
-
106
-
impl DiskWriter<rusqlite::Error> for SqliteWriter<'_> {
107
-
fn put(&mut self, key: Vec<u8>, val: Vec<u8>) -> rusqlite::Result<()> {
108
-
self.insert_stmt.execute((key, val))?;
109
-
Ok(())
88
+
/// Open and initialize the actual disk storage
89
+
pub async fn open(&self, path: PathBuf) -> Result<DiskStore, DiskError> {
90
+
DiskStore::new(path, self.cache_size_mb, self.max_stored_mb).await
110
91
}
111
92
}
112
93
113
-
pub struct SqliteReader<'conn> {
114
-
select_stmt: rusqlite::Statement<'conn>,
94
+
/// On-disk block storage
95
+
pub struct DiskStore {
96
+
#[allow(unused)]
97
+
db: Database,
98
+
partition: Keyspace,
99
+
max_stored: usize,
100
+
stored: usize,
115
101
}
116
102
117
-
impl DiskReader for SqliteReader<'_> {
118
-
type StorageError = rusqlite::Error;
119
-
fn get(&mut self, key: Vec<u8>) -> rusqlite::Result<Option<Vec<u8>>> {
120
-
self.select_stmt
121
-
.query_one((&key,), |row| row.get(0))
122
-
.optional()
123
-
}
124
-
}
103
+
impl DiskStore {
104
+
/// Initialize a new disk store
105
+
pub async fn new(
106
+
path: PathBuf,
107
+
cache_mb: usize,
108
+
max_stored_mb: usize,
109
+
) -> Result<Self, DiskError> {
110
+
let max_stored = max_stored_mb * 2_usize.pow(20);
111
+
let (db, partition) = tokio::task::spawn_blocking(move || {
112
+
let db = Database::builder(path)
113
+
// .manual_journal_persist(true)
114
+
// .flush_workers(1)
115
+
// .compaction_workers(1)
116
+
.journal_compression(CompressionType::None)
117
+
.cache_size(cache_mb as u64 * 2_u64.pow(20))
118
+
.temporary(true)
119
+
.open()?;
120
+
let opts = KeyspaceCreateOptions::default()
121
+
.data_block_restart_interval_policy(RestartIntervalPolicy::all(8))
122
+
.filter_block_pinning_policy(PinningPolicy::disabled())
123
+
.expect_point_read_hits(true)
124
+
.data_block_compression_policy(CompressionPolicy::disabled())
125
+
.manual_journal_persist(true)
126
+
.max_memtable_size(32 * 2_u64.pow(20));
127
+
let partition = db.keyspace("z", || opts)?;
125
128
126
-
//////////// redb why not
129
+
Ok::<_, DiskError>((db, partition))
130
+
})
131
+
.await??;
127
132
128
-
const REDB_TABLE: redb::TableDefinition<&[u8], &[u8]> = redb::TableDefinition::new("blocks");
129
-
130
-
pub struct RedbStore {
131
-
path: PathBuf,
132
-
}
133
-
134
-
impl RedbStore {
135
-
pub fn new(path: PathBuf) -> Self {
136
-
Self { path }
137
-
}
138
-
}
139
-
140
-
impl StorageErrorBase for redb::Error {}
141
-
142
-
impl DiskStore for RedbStore {
143
-
type StorageError = redb::Error;
144
-
type Access = RedbAccess;
145
-
async fn get_access(&mut self) -> Result<RedbAccess, redb::Error> {
146
-
let path = self.path.clone();
147
-
let kb = 2_usize.pow(10);
148
-
let db = tokio::task::spawn_blocking(move || {
149
-
let db = redb::Database::builder()
150
-
.set_cache_size(16 * kb)
151
-
.create(path)?;
152
-
Ok::<_, Self::StorageError>(db)
133
+
Ok(Self {
134
+
db,
135
+
partition,
136
+
max_stored,
137
+
stored: 0,
153
138
})
154
-
.await
155
-
.expect("join error")?;
156
-
157
-
Ok(RedbAccess { db })
158
139
}
159
-
}
160
140
161
-
pub struct RedbAccess {
162
-
db: redb::Database,
163
-
}
164
-
165
-
impl DiskAccess for RedbAccess {
166
-
type StorageError = redb::Error;
167
-
fn get_writer(&mut self) -> Result<impl DiskWriter<redb::Error>, redb::Error> {
168
-
let mut tx = self.db.begin_write()?;
169
-
tx.set_durability(redb::Durability::None)?;
170
-
Ok(RedbWriter { tx: Some(tx) })
171
-
}
172
-
fn get_reader(&self) -> Result<impl DiskReader<StorageError = redb::Error>, redb::Error> {
173
-
let tx = self.db.begin_read()?;
174
-
Ok(RedbReader { tx })
175
-
}
176
-
}
177
-
178
-
pub struct RedbWriter {
179
-
tx: Option<redb::WriteTransaction>,
180
-
}
181
-
182
-
impl DiskWriter<redb::Error> for RedbWriter {
183
-
fn put(&mut self, key: Vec<u8>, val: Vec<u8>) -> Result<(), redb::Error> {
184
-
let mut table = self.tx.as_ref().unwrap().open_table(REDB_TABLE)?;
185
-
table.insert(&*key, &*val)?;
141
+
pub(crate) fn put_many(
142
+
&mut self,
143
+
kv: impl Iterator<Item = Result<(Vec<u8>, Vec<u8>), DriveError>>,
144
+
) -> Result<(), DriveError> {
145
+
let mut batch = self.db.batch();
146
+
for pair in kv {
147
+
let (k, v) = pair?;
148
+
self.stored += v.len();
149
+
if self.stored > self.max_stored {
150
+
return Err(DiskError::MaxSizeExceeded.into());
151
+
}
152
+
batch.insert(&self.partition, k, v);
153
+
}
154
+
batch.commit().map_err(DiskError::DbError)?;
186
155
Ok(())
187
156
}
188
-
}
189
157
190
-
/// oops careful in async
191
-
impl Drop for RedbWriter {
192
-
fn drop(&mut self) {
193
-
let tx = self.tx.take();
194
-
tx.unwrap().commit().unwrap();
158
+
#[inline]
159
+
pub(crate) fn get(&mut self, key: &[u8]) -> Result<Option<fjall::Slice>, FjallError> {
160
+
self.partition.get(key)
195
161
}
196
162
}
197
-
198
-
pub struct RedbReader {
199
-
tx: redb::ReadTransaction,
200
-
}
201
-
202
-
impl DiskReader for RedbReader {
203
-
type StorageError = redb::Error;
204
-
fn get(&mut self, key: Vec<u8>) -> Result<Option<Vec<u8>>, redb::Error> {
205
-
let table = self.tx.open_table(REDB_TABLE)?;
206
-
let rv = table.get(&*key)?.map(|guard| guard.value().to_vec());
207
-
Ok(rv)
208
-
}
209
-
}
210
-
211
-
///// TODO: that other single file db thing to try
src/drive.rs (+419 -213)
···
1
-
//! Consume an MST block stream, producing an ordered stream of records
1
+
//! Consume a CAR from an AsyncRead, producing an ordered stream of records
2
2
3
-
use crate::disk::{DiskAccess, DiskStore, DiskWriter, StorageErrorBase};
3
+
use crate::disk::{DiskError, DiskStore};
4
+
use crate::process::Processable;
4
5
use ipld_core::cid::Cid;
5
6
use iroh_car::CarReader;
6
-
use serde::de::DeserializeOwned;
7
7
use serde::{Deserialize, Serialize};
8
8
use std::collections::HashMap;
9
9
use std::convert::Infallible;
10
-
use tokio::io::AsyncRead;
10
+
use tokio::{io::AsyncRead, sync::mpsc};
11
11
12
12
use crate::mst::{Commit, Node};
13
-
use crate::walk::{DiskTrip, Step, Trip, Walker};
13
+
use crate::walk::{Step, WalkError, Walker};
14
14
15
15
/// Errors that can happen while consuming and emitting blocks and records
16
16
#[derive(Debug, thiserror::Error)]
···
24
24
#[error("The MST block {0} could not be found")]
25
25
MissingBlock(Cid),
26
26
#[error("Failed to walk the mst tree: {0}")]
27
-
Tripped(#[from] Trip),
27
+
WalkError(#[from] WalkError),
28
28
#[error("CAR file had no roots")]
29
29
MissingRoot,
30
-
}
31
-
32
-
#[derive(Debug, thiserror::Error)]
33
-
pub enum DiskDriveError<E: StorageErrorBase> {
34
-
#[error("Error from iroh_car: {0}")]
35
-
CarReader(#[from] iroh_car::Error),
36
-
#[error("Failed to decode commit block: {0}")]
37
-
BadBlock(#[from] serde_ipld_dagcbor::DecodeError<Infallible>),
38
30
#[error("Storage error")]
39
-
StorageError(#[from] E),
40
-
#[error("The Commit block reference by the root was not found")]
41
-
MissingCommit,
42
-
#[error("The MST block {0} could not be found")]
43
-
MissingBlock(Cid),
31
+
StorageError(#[from] DiskError),
44
32
#[error("Encode error: {0}")]
45
33
BincodeEncodeError(#[from] bincode::error::EncodeError),
46
-
#[error("Decode error: {0}")]
34
+
#[error("Tried to send on a closed channel")]
35
+
ChannelSendError, // SendError takes <T> which we don't need
36
+
#[error("Failed to join a task: {0}")]
37
+
JoinError(#[from] tokio::task::JoinError),
38
+
}
39
+
40
+
#[derive(Debug, thiserror::Error)]
41
+
pub enum DecodeError {
42
+
#[error(transparent)]
47
43
BincodeDecodeError(#[from] bincode::error::DecodeError),
48
-
#[error("disk tripped: {0}")]
49
-
DiskTripped(#[from] DiskTrip<E>),
44
+
#[error("extra bytes remained after decoding")]
45
+
ExtraGarbage,
50
46
}
51
47
52
-
// #[derive(Debug, thiserror::Error)]
53
-
// pub enum Boooooo<E: StorageErrorBase> {
54
-
// #[error("disk tripped: {0}")]
55
-
// DiskTripped(#[from] DiskTrip<E>),
56
-
// #[error("dde whatever: {0}")]
57
-
// DiskDriveError(#[from] DiskDriveError<E>),
58
-
// }
59
-
60
-
pub trait Processable: Clone + Serialize + DeserializeOwned {
61
-
/// the additional size taken up (not including its mem::size_of)
62
-
fn get_size(&self) -> usize;
63
-
}
48
+
/// An in-order chunk of Rkey + (processed) Block pairs
49
+
pub type BlockChunk<T> = Vec<(String, T)>;
64
50
65
51
#[derive(Debug, Clone, Serialize, Deserialize)]
66
-
pub enum MaybeProcessedBlock<T> {
52
+
pub(crate) enum MaybeProcessedBlock<T> {
67
53
/// A block that's *probably* a Node (but we can't know yet)
68
54
///
69
55
/// It *can be* a record that suspiciously looks a lot like a node, so we
···
105
91
}
106
92
}
107
93
108
-
pub enum Vehicle<R: AsyncRead + Unpin, T: Processable> {
109
-
Lil(Commit, MemDriver<T>),
110
-
Big(BigCar<R, T>),
94
+
impl<T> MaybeProcessedBlock<T> {
95
+
fn maybe(process: fn(Vec<u8>) -> T, data: Vec<u8>) -> Self {
96
+
if Node::could_be(&data) {
97
+
MaybeProcessedBlock::Raw(data)
98
+
} else {
99
+
MaybeProcessedBlock::Processed(process(data))
100
+
}
101
+
}
111
102
}
112
103
113
-
pub async fn load_car<R: AsyncRead + Unpin, T: Processable>(
114
-
reader: R,
115
-
process: fn(&[u8]) -> T,
116
-
max_size: usize,
117
-
) -> Result<Vehicle<R, T>, DriveError> {
118
-
let mut mem_blocks = HashMap::new();
104
+
/// Read a CAR file, buffering blocks in memory or to disk
105
+
pub enum Driver<R: AsyncRead + Unpin, T: Processable> {
106
+
/// All blocks fit within the memory limit
107
+
///
108
+
/// You probably want to check the commit's signature. You can go ahead and
109
+
/// walk the MST right away.
110
+
Memory(Commit, MemDriver<T>),
111
+
/// Blocks exceed the memory limit
112
+
///
113
+
/// You'll need to provide a disk storage to continue. The commit will be
114
+
/// returned and can be validated only once all blocks are loaded.
115
+
Disk(NeedDisk<R, T>),
116
+
}
119
117
120
-
let mut car = CarReader::new(reader).await?;
118
+
/// Builder-style driver setup
119
+
#[derive(Debug, Clone)]
120
+
pub struct DriverBuilder {
121
+
pub mem_limit_mb: usize,
122
+
}
121
123
122
-
let root = *car
123
-
.header()
124
-
.roots()
125
-
.first()
126
-
.ok_or(DriveError::MissingRoot)?;
127
-
log::debug!("root: {root:?}");
124
+
impl Default for DriverBuilder {
125
+
fn default() -> Self {
126
+
Self { mem_limit_mb: 16 }
127
+
}
128
+
}
129
+
130
+
impl DriverBuilder {
131
+
/// Begin configuring the driver with defaults
132
+
pub fn new() -> Self {
133
+
Default::default()
134
+
}
135
+
/// Set the in-memory size limit, in MiB
136
+
///
137
+
/// Default: 16 MiB
138
+
pub fn with_mem_limit_mb(self, new_limit: usize) -> Self {
139
+
Self {
140
+
mem_limit_mb: new_limit,
141
+
}
142
+
}
143
+
/// Set the block processor
144
+
///
145
+
/// Default: noop, raw blocks will be emitted
146
+
pub fn with_block_processor<T: Processable>(
147
+
self,
148
+
p: fn(Vec<u8>) -> T,
149
+
) -> DriverBuilderWithProcessor<T> {
150
+
DriverBuilderWithProcessor {
151
+
mem_limit_mb: self.mem_limit_mb,
152
+
block_processor: p,
153
+
}
154
+
}
155
+
/// Begin processing an atproto MST from a CAR file
156
+
pub async fn load_car<R: AsyncRead + Unpin>(
157
+
&self,
158
+
reader: R,
159
+
) -> Result<Driver<R, Vec<u8>>, DriveError> {
160
+
Driver::load_car(reader, crate::process::noop, self.mem_limit_mb).await
161
+
}
162
+
}
128
163
129
-
let mut commit = None;
164
+
/// Builder-style driver intermediate step
165
+
///
166
+
/// start from `DriverBuilder`
167
+
#[derive(Debug, Clone)]
168
+
pub struct DriverBuilderWithProcessor<T: Processable> {
169
+
pub mem_limit_mb: usize,
170
+
pub block_processor: fn(Vec<u8>) -> T,
171
+
}
130
172
131
-
// try to load all the blocks into memory
132
-
let mut mem_size = 0;
133
-
while let Some((cid, data)) = car.next_block().await? {
134
-
// the root commit is a Special Third Kind of block that we need to make
135
-
// sure not to optimistically send to the processing function
136
-
if cid == root {
137
-
let c: Commit = serde_ipld_dagcbor::from_slice(&data)?;
138
-
commit = Some(c);
139
-
continue;
173
+
impl<T: Processable> DriverBuilderWithProcessor<T> {
174
+
/// Set the in-memory size limit, in MiB
175
+
///
176
+
/// Default: 16 MiB
177
+
pub fn with_mem_limit_mb(mut self, new_limit: usize) -> Self {
178
+
self.mem_limit_mb = new_limit;
179
+
self
180
+
}
181
+
/// Begin processing an atproto MST from a CAR file
182
+
pub async fn load_car<R: AsyncRead + Unpin>(
183
+
&self,
184
+
reader: R,
185
+
) -> Result<Driver<R, T>, DriveError> {
186
+
Driver::load_car(reader, self.block_processor, self.mem_limit_mb).await
187
+
}
188
+
}
189
+
190
+
impl<R: AsyncRead + Unpin, T: Processable> Driver<R, T> {
191
+
/// Begin processing an atproto MST from a CAR file
192
+
///
193
+
/// Blocks will be loaded, processed, and buffered in memory. If the entire
194
+
/// processed size is under the `mem_limit_mb` limit, a `Driver::Memory`
195
+
/// will be returned along with a `Commit` ready for validation.
196
+
///
197
+
/// If the `mem_limit_mb` limit is reached before loading all blocks, the
198
+
/// partial state will be returned as `Driver::Disk(needed)`, which can be
199
+
/// resumed by providing a `SqliteStorage` for on-disk block storage.
200
+
pub async fn load_car(
201
+
reader: R,
202
+
process: fn(Vec<u8>) -> T,
203
+
mem_limit_mb: usize,
204
+
) -> Result<Driver<R, T>, DriveError> {
205
+
let max_size = mem_limit_mb * 2_usize.pow(20);
206
+
let mut mem_blocks = HashMap::new();
207
+
208
+
let mut car = CarReader::new(reader).await?;
209
+
210
+
let root = *car
211
+
.header()
212
+
.roots()
213
+
.first()
214
+
.ok_or(DriveError::MissingRoot)?;
215
+
log::debug!("root: {root:?}");
216
+
217
+
let mut commit = None;
218
+
219
+
// try to load all the blocks into memory
220
+
let mut mem_size = 0;
221
+
while let Some((cid, data)) = car.next_block().await? {
222
+
// the root commit is a Special Third Kind of block that we need to make
223
+
// sure not to optimistically send to the processing function
224
+
if cid == root {
225
+
let c: Commit = serde_ipld_dagcbor::from_slice(&data)?;
226
+
commit = Some(c);
227
+
continue;
228
+
}
229
+
230
+
// remaining possible types: node, record, other. optimistically process
231
+
let maybe_processed = MaybeProcessedBlock::maybe(process, data);
232
+
233
+
// stash (maybe processed) blocks in memory as long as we have room
234
+
mem_size += std::mem::size_of::<Cid>() + maybe_processed.get_size();
235
+
mem_blocks.insert(cid, maybe_processed);
236
+
if mem_size >= max_size {
237
+
return Ok(Driver::Disk(NeedDisk {
238
+
car,
239
+
root,
240
+
process,
241
+
max_size,
242
+
mem_blocks,
243
+
commit,
244
+
}));
245
+
}
140
246
}
141
247
142
-
// remaining possible types: node, record, other. optimistically process
143
-
// TODO: get the actual in-memory size to compute disk spill
144
-
let maybe_processed = if Node::could_be(&data) {
145
-
MaybeProcessedBlock::Raw(data)
146
-
} else {
147
-
MaybeProcessedBlock::Processed(process(&data))
148
-
};
248
+
// all blocks loaded and we fit in memory! hopefully we found the commit...
249
+
let commit = commit.ok_or(DriveError::MissingCommit)?;
149
250
150
-
// stash (maybe processed) blocks in memory as long as we have room
151
-
mem_size += std::mem::size_of::<Cid>() + maybe_processed.get_size();
152
-
mem_blocks.insert(cid, maybe_processed);
153
-
if mem_size >= max_size {
154
-
return Ok(Vehicle::Big(BigCar {
155
-
car,
156
-
root,
251
+
let walker = Walker::new(commit.data);
252
+
253
+
Ok(Driver::Memory(
254
+
commit,
255
+
MemDriver {
256
+
blocks: mem_blocks,
257
+
walker,
157
258
process,
158
-
max_size,
159
-
mem_blocks,
160
-
commit,
161
-
}));
162
-
}
259
+
},
260
+
))
163
261
}
262
+
}
164
263
165
-
// all blocks loaded and we fit in memory! hopefully we found the commit...
166
-
let commit = commit.ok_or(DriveError::MissingCommit)?;
264
+
/// The core driver between the block stream and MST walker
265
+
///
266
+
/// In the future, PDSs will export CARs in a stream-friendly order that will
267
+
/// enable processing them with tiny memory overhead. But that future is not
268
+
/// here yet.
269
+
///
270
+
/// CARs are almost always in a stream-unfriendly order, so I'm reverting the
271
+
/// optimistic stream features: we load all block first, then walk the MST.
272
+
///
273
+
/// This makes things much simpler: we only need to worry about spilling to disk
274
+
/// in one place, and we always have a reasonable expecatation about how much
275
+
/// work the init function will do. We can drop the CAR reader before walking,
276
+
/// so the sync/async boundaries become a little easier to work around.
277
+
#[derive(Debug)]
278
+
pub struct MemDriver<T: Processable> {
279
+
blocks: HashMap<Cid, MaybeProcessedBlock<T>>,
280
+
walker: Walker,
281
+
process: fn(Vec<u8>) -> T,
282
+
}
167
283
168
-
let walker = Walker::new(commit.data);
284
+
impl<T: Processable> MemDriver<T> {
285
+
/// Step through the record outputs, in rkey order
286
+
pub async fn next_chunk(&mut self, n: usize) -> Result<Option<BlockChunk<T>>, DriveError> {
287
+
let mut out = Vec::with_capacity(n);
288
+
for _ in 0..n {
289
+
// walk as far as we can until we run out of blocks or find a record
290
+
match self.walker.step(&mut self.blocks, self.process)? {
291
+
Step::Missing(cid) => return Err(DriveError::MissingBlock(cid)),
292
+
Step::Finish => break,
293
+
Step::Found { rkey, data } => {
294
+
out.push((rkey, data));
295
+
continue;
296
+
}
297
+
};
298
+
}
169
299
170
-
Ok(Vehicle::Lil(
171
-
commit,
172
-
MemDriver {
173
-
blocks: mem_blocks,
174
-
walker,
175
-
process,
176
-
},
177
-
))
300
+
if out.is_empty() {
301
+
Ok(None)
302
+
} else {
303
+
Ok(Some(out))
304
+
}
305
+
}
178
306
}
179
307
180
-
/// a paritally memory-loaded car file that needs disk spillover to continue
181
-
pub struct BigCar<R: AsyncRead + Unpin, T: Processable> {
308
+
/// A partially memory-loaded car file that needs disk spillover to continue
309
+
pub struct NeedDisk<R: AsyncRead + Unpin, T: Processable> {
182
310
car: CarReader<R>,
183
311
root: Cid,
184
-
process: fn(&[u8]) -> T,
312
+
process: fn(Vec<u8>) -> T,
185
313
max_size: usize,
186
314
mem_blocks: HashMap<Cid, MaybeProcessedBlock<T>>,
187
315
pub commit: Option<Commit>,
···
191
319
bincode::serde::encode_to_vec(v, bincode::config::standard())
192
320
}
193
321
194
-
pub fn decode<T: Processable>(bytes: &[u8]) -> Result<T, bincode::error::DecodeError> {
322
+
pub(crate) fn decode<T: Processable>(bytes: &[u8]) -> Result<T, DecodeError> {
195
323
let (t, n) = bincode::serde::decode_from_slice(bytes, bincode::config::standard())?;
196
-
assert_eq!(n, bytes.len(), "expected to decode all bytes"); // TODO
324
+
if n != bytes.len() {
325
+
return Err(DecodeError::ExtraGarbage);
326
+
}
197
327
Ok(t)
198
328
}
199
329
200
-
impl<R: AsyncRead + Unpin, T: Processable + Send + 'static> BigCar<R, T> {
201
-
pub async fn finish_loading<S: DiskStore>(
330
+
impl<R: AsyncRead + Unpin, T: Processable + Send + 'static> NeedDisk<R, T> {
331
+
pub async fn finish_loading(
202
332
mut self,
203
-
mut store: S,
204
-
) -> Result<(Commit, BigCarReady<T, S::Access>), DiskDriveError<S::StorageError>>
205
-
where
206
-
S::Access: Send + 'static,
207
-
S::StorageError: 'static,
208
-
{
209
-
// set up access for real
210
-
let mut access = store.get_access().await?;
333
+
mut store: DiskStore,
334
+
) -> Result<(Commit, DiskDriver<T>), DriveError> {
335
+
// move store in and back out so we can manage lifetimes
336
+
// dump mem blocks into the store
337
+
store = tokio::task::spawn(async move {
338
+
let kvs = self
339
+
.mem_blocks
340
+
.into_iter()
341
+
.map(|(k, v)| Ok(encode(v).map(|v| (k.to_bytes(), v))?));
211
342
212
-
// move access in and back out so we can manage lifetimes
213
-
// dump mem blocks into the store
214
-
access = tokio::task::spawn(async move {
215
-
let mut writer = access.get_writer()?;
216
-
for (k, v) in self.mem_blocks {
217
-
let key_bytes = k.to_bytes();
218
-
let val_bytes = encode(v)?; // TODO
219
-
writer.put(key_bytes, val_bytes)?;
343
+
store.put_many(kvs)?;
344
+
Ok::<_, DriveError>(store)
345
+
})
346
+
.await??;
347
+
348
+
let (tx, mut rx) = mpsc::channel::<Vec<(Cid, MaybeProcessedBlock<T>)>>(1);
349
+
350
+
let store_worker = tokio::task::spawn_blocking(move || {
351
+
while let Some(chunk) = rx.blocking_recv() {
352
+
let kvs = chunk
353
+
.into_iter()
354
+
.map(|(k, v)| Ok(encode(v).map(|v| (k.to_bytes(), v))?));
355
+
store.put_many(kvs)?;
220
356
}
221
-
drop(writer); // cannot outlive access
222
-
Ok::<_, DiskDriveError<S::StorageError>>(access)
223
-
})
224
-
.await
225
-
.unwrap()?;
357
+
Ok::<_, DriveError>(store)
358
+
}); // await later
226
359
227
360
// dump the rest to disk (in chunks)
361
+
log::debug!("dumping the rest of the stream...");
228
362
loop {
229
-
let mut chunk = vec![];
230
363
let mut mem_size = 0;
364
+
let mut chunk = vec![];
231
365
loop {
232
366
let Some((cid, data)) = self.car.next_block().await? else {
233
367
break;
···
240
374
}
241
375
// remaining possible types: node, record, other. optimistically process
242
376
// TODO: get the actual in-memory size to compute disk spill
243
-
let maybe_processed = if Node::could_be(&data) {
244
-
MaybeProcessedBlock::Raw(data)
245
-
} else {
246
-
MaybeProcessedBlock::Processed((self.process)(&data))
247
-
};
377
+
let maybe_processed = MaybeProcessedBlock::maybe(self.process, data);
248
378
mem_size += std::mem::size_of::<Cid>() + maybe_processed.get_size();
249
379
chunk.push((cid, maybe_processed));
250
380
if mem_size >= self.max_size {
381
+
// note: if the db cache is set to max_size and multiple chunks that are
382
+
// each >= max_size can sit in the queue at once, peak memory use may be
383
+
// a small multiple of max_size
251
384
break;
252
385
}
253
386
}
254
387
if chunk.is_empty() {
255
388
break;
256
389
}
257
-
258
-
// move access in and back out so we can manage lifetimes
259
-
// dump mem blocks into the store
260
-
access = tokio::task::spawn_blocking(move || {
261
-
let mut writer = access.get_writer()?;
262
-
for (k, v) in chunk {
263
-
let key_bytes = k.to_bytes();
264
-
let val_bytes = encode(v)?; // TODO
265
-
writer.put(key_bytes, val_bytes)?;
266
-
}
267
-
drop(writer); // cannot outlive access
268
-
Ok::<_, DiskDriveError<S::StorageError>>(access)
269
-
})
270
-
.await
271
-
.unwrap()?; // TODO
390
+
tx.send(chunk)
391
+
.await
392
+
.map_err(|_| DriveError::ChannelSendError)?;
272
393
}
394
+
drop(tx);
395
+
log::debug!("done. waiting for worker to finish...");
273
396
274
-
let commit = self.commit.ok_or(DiskDriveError::MissingCommit)?;
397
+
store = store_worker.await??;
398
+
399
+
log::debug!("worker finished.");
400
+
401
+
let commit = self.commit.ok_or(DriveError::MissingCommit)?;
275
402
276
403
let walker = Walker::new(commit.data);
277
404
278
405
Ok((
279
406
commit,
280
-
BigCarReady {
407
+
DiskDriver {
281
408
process: self.process,
282
-
access,
283
-
walker,
409
+
state: Some(BigState { store, walker }),
284
410
},
285
411
))
286
412
}
287
413
}
288
414
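For readers skimming the diff, `finish_loading` above boils down to a bounded-channel handoff: the async side batches blocks into chunks while a single `spawn_blocking` worker owns the store and does all the writes. A rough standalone sketch of that pattern, with a made-up `Sink`/`put_many` standing in for the real disk store:

```
use tokio::sync::mpsc;

// `Sink` and `put_many` are stand-ins for illustration, not the real store API.
struct Sink;
impl Sink {
    fn put_many(&mut self, kvs: Vec<(Vec<u8>, Vec<u8>)>) -> std::io::Result<()> {
        let _ = kvs; // pretend this writes a batch to disk
        Ok(())
    }
}

async fn spill(mut blocks: impl Iterator<Item = (Vec<u8>, Vec<u8>)>, mut sink: Sink) -> Sink {
    // capacity 1: at most one chunk is queued, so memory stays roughly chunk-sized
    let (tx, mut rx) = mpsc::channel::<Vec<(Vec<u8>, Vec<u8>)>>(1);

    // the blocking worker owns the sink for the duration of the spill
    let worker = tokio::task::spawn_blocking(move || {
        while let Some(chunk) = rx.blocking_recv() {
            sink.put_many(chunk).expect("disk write failed");
        }
        sink // hand ownership back once the channel closes
    });

    loop {
        let chunk: Vec<_> = blocks.by_ref().take(1024).collect();
        if chunk.is_empty() {
            break;
        }
        if tx.send(chunk).await.is_err() {
            break; // worker gone; its panic surfaces at the join below
        }
    }
    drop(tx); // closing the sender lets the worker drain and exit

    worker.await.expect("spill worker panicked")
}
```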
289
-
pub struct BigCarReady<T: Clone, A: DiskAccess> {
290
-
process: fn(&[u8]) -> T,
291
-
access: A,
415
+
struct BigState {
416
+
store: DiskStore,
292
417
walker: Walker,
293
418
}
294
419
295
-
impl<T: Processable + Send + 'static, A: DiskAccess + Send + 'static> BigCarReady<T, A> {
296
-
pub async fn next_chunk(
297
-
mut self,
420
+
/// MST walker that reads from disk instead of an in-memory hashmap
421
+
pub struct DiskDriver<T: Clone> {
422
+
process: fn(Vec<u8>) -> T,
423
+
state: Option<BigState>,
424
+
}
425
+
426
+
// for doctests only
427
+
#[doc(hidden)]
428
+
pub fn _get_fake_disk_driver() -> DiskDriver<Vec<u8>> {
429
+
use crate::process::noop;
430
+
DiskDriver {
431
+
process: noop,
432
+
state: None,
433
+
}
434
+
}
435
+
436
+
impl<T: Processable + Send + 'static> DiskDriver<T> {
437
+
/// Walk the MST returning up to `n` rkey + record pairs
438
+
///
439
+
/// ```no_run
440
+
/// # use repo_stream::{drive::{DiskDriver, DriveError, _get_fake_disk_driver}, process::noop};
441
+
/// # #[tokio::main]
442
+
/// # async fn main() -> Result<(), DriveError> {
443
+
/// # let mut disk_driver = _get_fake_disk_driver();
444
+
/// while let Some(pairs) = disk_driver.next_chunk(256).await? {
445
+
/// for (rkey, record) in pairs {
446
+
/// println!("{rkey}: size={}", record.len());
447
+
/// }
448
+
/// }
449
+
/// # Ok(())
450
+
/// # }
451
+
/// ```
452
+
pub async fn next_chunk(&mut self, n: usize) -> Result<Option<BlockChunk<T>>, DriveError> {
453
+
let process = self.process;
454
+
455
+
// state should only *ever* be None transiently while inside here
456
+
let mut state = self.state.take().expect("DiskDriver must have Some(state)");
457
+
458
+
// the big pain here is that we don't want to leave self.state in an
459
+
// invalid state (None), so all the error paths have to make sure it
460
+
// comes out again.
461
+
let (state, res) = tokio::task::spawn_blocking(
462
+
move || -> (BigState, Result<BlockChunk<T>, DriveError>) {
463
+
let mut out = Vec::with_capacity(n);
464
+
465
+
for _ in 0..n {
466
+
// walk as far as we can until we run out of blocks or find a record
467
+
let step = match state.walker.disk_step(&mut state.store, process) {
468
+
Ok(s) => s,
469
+
Err(e) => {
470
+
return (state, Err(e.into()));
471
+
}
472
+
};
473
+
match step {
474
+
Step::Missing(cid) => {
475
+
return (state, Err(DriveError::MissingBlock(cid)));
476
+
}
477
+
Step::Finish => break,
478
+
Step::Found { rkey, data } => out.push((rkey, data)),
479
+
};
480
+
}
481
+
482
+
(state, Ok::<_, DriveError>(out))
483
+
},
484
+
)
485
+
.await?; // on tokio JoinError, we'll be left with invalid state :(
486
+
487
+
// *must* restore state before dealing with the actual result
488
+
self.state = Some(state);
489
+
490
+
let out = res?;
491
+
492
+
if out.is_empty() {
493
+
Ok(None)
494
+
} else {
495
+
Ok(Some(out))
496
+
}
497
+
}
498
+
499
+
fn read_tx_blocking(
500
+
&mut self,
298
501
n: usize,
299
-
) -> Result<(Self, Option<Vec<(String, T)>>), DiskDriveError<A::StorageError>>
300
-
where
301
-
A::StorageError: Send,
302
-
{
303
-
let mut out = Vec::with_capacity(n);
304
-
(self, out) = tokio::task::spawn_blocking(move || {
305
-
let access = self.access;
306
-
let mut reader = access.get_reader()?;
502
+
tx: mpsc::Sender<Result<BlockChunk<T>, DriveError>>,
503
+
) -> Result<(), mpsc::error::SendError<Result<BlockChunk<T>, DriveError>>> {
504
+
let BigState { store, walker } = self.state.as_mut().expect("valid state");
505
+
506
+
loop {
507
+
let mut out: BlockChunk<T> = Vec::with_capacity(n);
307
508
308
509
for _ in 0..n {
309
510
// walk as far as we can until we run out of blocks or find a record
310
-
match self.walker.disk_step(&mut reader, self.process)? {
311
-
Step::Missing(cid) => return Err(DiskDriveError::MissingBlock(cid)),
312
-
Step::Finish => break,
313
-
Step::Step { rkey, data } => {
511
+
512
+
let step = match walker.disk_step(store, self.process) {
513
+
Ok(s) => s,
514
+
Err(e) => return tx.blocking_send(Err(e.into())),
515
+
};
516
+
517
+
match step {
518
+
Step::Missing(cid) => {
519
+
return tx.blocking_send(Err(DriveError::MissingBlock(cid)));
520
+
}
521
+
Step::Finish => return Ok(()),
522
+
Step::Found { rkey, data } => {
314
523
out.push((rkey, data));
315
524
continue;
316
525
}
317
526
};
318
527
}
319
528
320
-
drop(reader); // cannot outlive access
321
-
self.access = access;
322
-
Ok::<_, DiskDriveError<A::StorageError>>((self, out))
323
-
})
324
-
.await
325
-
.unwrap()?; // TODO
529
+
if out.is_empty() {
530
+
break;
531
+
}
532
+
tx.blocking_send(Ok(out))?;
533
+
}
326
534
327
-
if out.is_empty() {
328
-
Ok((self, None))
329
-
} else {
330
-
Ok((self, Some(out)))
331
-
}
535
+
Ok(())
332
536
}
333
-
}
334
537
335
-
/// The core driver between the block stream and MST walker
336
-
///
337
-
/// In the future, PDSs will export CARs in a stream-friendly order that will
338
-
/// enable processing them with tiny memory overhead. But that future is not
339
-
/// here yet.
340
-
///
341
-
/// CARs are almost always in a stream-unfriendly order, so I'm reverting the
342
-
/// optimistic stream features: we load all block first, then walk the MST.
343
-
///
344
-
/// This makes things much simpler: we only need to worry about spilling to disk
345
-
/// in one place, and we always have a reasonable expecatation about how much
346
-
/// work the init function will do. We can drop the CAR reader before walking,
347
-
/// so the sync/async boundaries become a little easier to work around.
348
-
#[derive(Debug)]
349
-
pub struct MemDriver<T: Processable> {
350
-
blocks: HashMap<Cid, MaybeProcessedBlock<T>>,
351
-
walker: Walker,
352
-
process: fn(&[u8]) -> T,
353
-
}
538
+
/// Spawn the disk reading task into a tokio blocking thread
539
+
///
540
+
/// The idea is to avoid repeated round-trips to the blocking thread: a single
541
+
/// blocking task does all the disk reading work and sends rkey/record pairs
542
+
/// back through an `mpsc` channel instead.
543
+
///
544
+
/// This might also allow the disk work to continue while processing the
545
+
/// records. It's not yet clear whether this method has much
546
+
/// benefit over just using `.next_chunk(n)`.
547
+
///
548
+
/// ```no_run
549
+
/// # use repo_stream::{drive::{DiskDriver, DriveError, _get_fake_disk_driver}, process::noop};
550
+
/// # #[tokio::main]
551
+
/// # async fn main() -> Result<(), DriveError> {
552
+
/// # let mut disk_driver = _get_fake_disk_driver();
553
+
/// let (mut rx, join) = disk_driver.to_channel(512);
554
+
/// while let Some(recvd) = rx.recv().await {
555
+
/// let pairs = recvd?;
556
+
/// for (rkey, record) in pairs {
557
+
/// println!("{rkey}: size={}", record.len());
558
+
/// }
559
+
///
560
+
/// }
561
+
/// # Ok(())
562
+
/// # }
563
+
/// ```
564
+
pub fn to_channel(
565
+
mut self,
566
+
n: usize,
567
+
) -> (
568
+
mpsc::Receiver<Result<BlockChunk<T>, DriveError>>,
569
+
tokio::task::JoinHandle<Self>,
570
+
) {
571
+
let (tx, rx) = mpsc::channel::<Result<BlockChunk<T>, DriveError>>(1);
354
572
355
-
impl<T: Processable> MemDriver<T> {
356
-
/// Manually step through the record outputs
357
-
pub async fn next_chunk(&mut self, n: usize) -> Result<Option<Vec<(String, T)>>, DriveError> {
358
-
let mut out = Vec::with_capacity(n);
359
-
for _ in 0..n {
360
-
// walk as far as we can until we run out of blocks or find a record
361
-
match self.walker.step(&mut self.blocks, self.process)? {
362
-
Step::Missing(cid) => return Err(DriveError::MissingBlock(cid)),
363
-
Step::Finish => break,
364
-
Step::Step { rkey, data } => {
365
-
out.push((rkey, data));
366
-
continue;
367
-
}
368
-
};
369
-
}
573
+
// note: this blocking worker keeps running even if the returned join handle is dropped
574
+
let chan_task = tokio::task::spawn_blocking(move || {
575
+
if let Err(mpsc::error::SendError(_)) = self.read_tx_blocking(n, tx) {
576
+
log::debug!("big car reader exited early due to dropped receiver channel");
577
+
}
578
+
self
579
+
});
370
580
371
-
if out.is_empty() {
372
-
Ok(None)
373
-
} else {
374
-
Ok(Some(out))
375
-
}
581
+
(rx, chan_task)
376
582
}
377
583
}
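One pattern worth calling out from `next_chunk` above: because `spawn_blocking` needs owned, `'static` data, the driver's state is `Option`-wrapped, taken out before the blocking call, and restored on every path afterwards. In isolation, with a made-up `Counter`/`Holder` standing in for `BigState`/`DiskDriver`:

```
// `Counter` and `Holder` are made up for illustration; the point is the
// ownership round-trip around spawn_blocking, mirroring DiskDriver::next_chunk.
struct Counter {
    n: u64,
}

struct Holder {
    state: Option<Counter>,
}

impl Holder {
    async fn bump(&mut self) -> Result<u64, tokio::task::JoinError> {
        // take the state so the blocking closure can own it ('static)
        let mut state = self.state.take().expect("state must be present");

        let (state, n) = tokio::task::spawn_blocking(move || {
            state.n += 1; // stand-in for blocking disk work
            let n = state.n;
            (state, n) // always hand the state back alongside the result
        })
        .await?; // a JoinError is the one path that loses the state

        // restore the state *before* looking at the result
        self.state = Some(state);
        Ok(n)
    }
}
```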
+81
-5
src/lib.rs
+81
-5
src/lib.rs
···
1
-
//! Fast and robust atproto CAR file processing in rust
2
-
//!
3
-
//! For now see the [examples](https://tangled.org/@microcosm.blue/repo-stream/tree/main/examples)
1
+
/*!
2
+
A robust CAR file -> MST walker for atproto
3
+
4
+
Small CARs have their blocks buffered in memory. If a configurable memory limit
5
+
is reached while reading blocks, CAR reading is suspended, and can be continued
6
+
by providing disk storage to buffer the CAR blocks instead.
7
+
8
+
A `process` function can be provided for tasks where records are transformed
9
+
into a smaller representation, to save memory (and disk) during block reading.
10
+
11
+
Once blocks are loaded, the MST is walked and emitted as chunks of
12
+
`(rkey, processed_block)` pairs, in order (depth-first, left-to-right).
13
+
14
+
Some MST validations are applied
15
+
- Keys must appear in order
16
+
- Keys must be at the correct MST tree depth
17
+
18
+
`iroh_car` additionally applies a block size limit of `2MiB`.
19
+
20
+
```
21
+
use repo_stream::{Driver, DriverBuilder, DiskBuilder};
22
+
23
+
# #[tokio::main]
24
+
# async fn main() -> Result<(), Box<dyn std::error::Error>> {
25
+
# let reader = include_bytes!("../car-samples/tiny.car").as_slice();
26
+
let mut total_size = 0;
27
+
28
+
match DriverBuilder::new()
29
+
.with_mem_limit_mb(10)
30
+
.with_block_processor(|rec| rec.len()) // block processing: just extract the raw record size
31
+
.load_car(reader)
32
+
.await?
33
+
{
34
+
35
+
// if all blocks fit within memory
36
+
Driver::Memory(_commit, mut driver) => {
37
+
while let Some(chunk) = driver.next_chunk(256).await? {
38
+
for (_rkey, size) in chunk {
39
+
total_size += size;
40
+
}
41
+
}
42
+
},
43
+
44
+
// if the CAR was too big for in-memory processing
45
+
Driver::Disk(paused) => {
46
+
// set up a disk store we can spill to
47
+
let store = DiskBuilder::new().open("some/path.db".into()).await?;
48
+
// do the spilling, get back a (similar) driver
49
+
let (_commit, mut driver) = paused.finish_loading(store).await?;
50
+
51
+
while let Some(chunk) = driver.next_chunk(256).await? {
52
+
for (_rkey, size) in chunk {
53
+
total_size += size;
54
+
}
55
+
}
56
+
}
57
+
};
58
+
println!("sum of size of all records: {total_size}");
59
+
# Ok(())
60
+
# }
61
+
```
62
+
63
+
Disk spilling suspends and returns a `Driver::Disk(paused)` instead of going
64
+
ahead and eagerly using disk I/O. This means you have to write a bit more code
65
+
to handle both cases, but it allows you to have finer control over resource
66
+
usage. For example, you can drive a number of parallel memory CAR workers, and
67
+
separately have a different number of disk workers picking up suspended disk
68
+
tasks from a queue.
69
+
70
+
Find more [examples in the repo](https://tangled.org/@microcosm.blue/repo-stream/tree/main/examples).
71
+
72
+
*/
73
+
74
+
pub mod mst;
75
+
mod walk;
4
76
5
77
pub mod disk;
6
78
pub mod drive;
7
-
pub mod mst;
8
-
pub mod walk;
79
+
pub mod process;
80
+
81
+
pub use disk::{DiskBuilder, DiskError, DiskStore};
82
+
pub use drive::{DriveError, Driver, DriverBuilder, NeedDisk};
83
+
pub use mst::Commit;
84
+
pub use process::Processable;
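The docstring above mentions driving many cheap in-memory workers while a smaller set of disk workers picks suspended loads off a queue. A simplified sketch of that split using only the API shown in the example (sequential for brevity, and `cars` plus the store path are placeholders; a real setup would run the two passes as separate task pools):

```
use repo_stream::{DiskBuilder, Driver, DriverBuilder};

async fn run(cars: Vec<&'static [u8]>) -> Result<(), Box<dyn std::error::Error>> {
    // queue of loads that hit the memory limit and need disk spillover
    let mut needs_disk = Vec::new();

    // pass 1: memory-only work (cheap, easy to run many of in parallel)
    for bytes in cars {
        match DriverBuilder::new()
            .with_mem_limit_mb(10)
            .with_block_processor(|rec| rec.len())
            .load_car(bytes)
            .await?
        {
            Driver::Memory(_commit, mut driver) => {
                while let Some(chunk) = driver.next_chunk(256).await? {
                    let _ = chunk; // handle (rkey, size) pairs here
                }
            }
            // no disk I/O yet: just park the suspended load
            Driver::Disk(paused) => needs_disk.push(paused),
        }
    }

    // pass 2: a disk worker drains the queue with its own store
    for paused in needs_disk {
        let store = DiskBuilder::new().open("spill.db".into()).await?;
        let (_commit, mut driver) = paused.finish_loading(store).await?;
        while let Some(chunk) = driver.next_chunk(256).await? {
            let _ = chunk;
        }
    }
    Ok(())
}
```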
+4
-8
src/mst.rs
+4
-8
src/mst.rs
···
39
39
/// MST node data schema
40
40
#[derive(Debug, Deserialize, PartialEq)]
41
41
#[serde(deny_unknown_fields)]
42
-
pub struct Node {
42
+
pub(crate) struct Node {
43
43
/// link to sub-tree Node on a lower level and with all keys sorting before
44
44
/// keys at this node
45
45
#[serde(rename = "l")]
···
62
62
/// so if a block *could be* a node, any record converter must postpone
63
63
/// processing. if it turns out it happens to be a very node-looking record,
64
64
/// well, sorry, it just has to only be processed later when that's known.
65
-
pub fn could_be(bytes: impl AsRef<[u8]>) -> bool {
65
+
pub(crate) fn could_be(bytes: impl AsRef<[u8]>) -> bool {
66
66
const NODE_FINGERPRINT: [u8; 3] = [
67
67
0xA2, // map length 2 (for "l" and "e" keys)
68
68
0x61, // text length 1
···
83
83
/// with an empty array of entries. This is the only situation in which a
84
84
/// tree may contain an empty leaf node which does not either contain keys
85
85
/// ("entries") or point to a sub-tree containing entries.
86
-
///
87
-
/// TODO: to me this is slightly unclear with respect to `l` (ask someone).
88
-
/// ...is that what "The top of the tree must not be a an empty node which
89
-
/// only points to a sub-tree." is referring to?
90
-
pub fn is_empty(&self) -> bool {
86
+
pub(crate) fn is_empty(&self) -> bool {
91
87
self.left.is_none() && self.entries.is_empty()
92
88
}
93
89
}
···
95
91
/// TreeEntry object
96
92
#[derive(Debug, Deserialize, PartialEq)]
97
93
#[serde(deny_unknown_fields)]
98
-
pub struct Entry {
94
+
pub(crate) struct Entry {
99
95
/// count of bytes shared with previous TreeEntry in this Node (if any)
100
96
#[serde(rename = "p")]
101
97
pub prefix_len: usize,
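The `could_be` fingerprint above is what gates optimistic processing during block loading: any block that might be an MST node has to stay raw, and everything else can be run through `process` immediately. A self-contained sketch of that decision (the real version is factored into `MaybeProcessedBlock::maybe` in `src/drive.rs`; the names below are local to the sketch):

```
// Local stand-ins for the crate's internal types, for illustration only.
enum MaybeProcessed<T> {
    Raw(Vec<u8>), // might be an MST node: keep raw bytes for the walker
    Processed(T), // definitely not a node: processed record output
}

fn maybe_process<T>(
    could_be_node: fn(&[u8]) -> bool, // e.g. Node::could_be's fingerprint check
    process: fn(Vec<u8>) -> T,
    data: Vec<u8>,
) -> MaybeProcessed<T> {
    if could_be_node(&data) {
        MaybeProcessed::Raw(data)
    } else {
        MaybeProcessed::Processed(process(data))
    }
}
```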
+108
src/process.rs
+108
src/process.rs
···
1
+
/*!
2
+
Record processor function output trait
3
+
4
+
The return type must satisfy the `Processable` trait, which requires:
5
+
6
+
- `Clone` because two rkeys can refer to the same record by CID, which may
7
+
only appear once in the CAR file.
8
+
- `Serialize + DeserializeOwned` so it can be spilled to disk.
9
+
10
+
One required function must be implemented, `get_size()`: it should return the
11
+
approximate total off-stack size of the type (the on-stack size is added
12
+
automatically via `std::mem::size_of`).
13
+
14
+
Note that it is **not guaranteed** that the `process` function will run on a
15
+
block before storing it in memory or on disk: it's not possible to know if a
16
+
block is a record without actually walking the MST, so the best we can do is
17
+
apply `process` to any block that we know *cannot* be an MST node, and otherwise
18
+
store the raw block bytes.
19
+
20
+
Here's a silly processing function that just collects 'eyy's found in the raw
21
+
record bytes
22
+
23
+
```
24
+
# use repo_stream::Processable;
25
+
# use serde::{Serialize, Deserialize};
26
+
#[derive(Debug, Clone, Serialize, Deserialize)]
27
+
struct Eyy(usize, String);
28
+
29
+
impl Processable for Eyy {
30
+
fn get_size(&self) -> usize {
31
+
// don't need to compute the usize, it's on the stack
32
+
self.1.capacity() // in-mem size from the string's capacity, in bytes
33
+
}
34
+
}
35
+
36
+
fn process(raw: Vec<u8>) -> Vec<Eyy> {
37
+
let mut out = Vec::new();
38
+
let to_find = "eyy".as_bytes();
39
+
for i in 0..=raw.len().saturating_sub(3) {
40
+
if raw[i..].starts_with(to_find) {
41
+
out.push(Eyy(i, "eyy".to_string()));
42
+
}
43
+
}
44
+
out
45
+
}
46
+
```
47
+
48
+
The memory sizing is a little sketchy, but should be at least approximately
49
+
right.
50
+
*/
51
+
52
+
use serde::{Serialize, de::DeserializeOwned};
53
+
54
+
/// Output trait for record processing
55
+
pub trait Processable: Clone + Serialize + DeserializeOwned {
56
+
/// Any additional in-memory size taken by the processed type
57
+
///
58
+
/// Do not include stack size (`std::mem::size_of`)
59
+
fn get_size(&self) -> usize;
60
+
}
61
+
62
+
/// Processor that just returns the raw blocks
63
+
#[inline]
64
+
pub fn noop(block: Vec<u8>) -> Vec<u8> {
65
+
block
66
+
}
67
+
68
+
impl Processable for u8 {
69
+
fn get_size(&self) -> usize {
70
+
0
71
+
}
72
+
}
73
+
74
+
impl Processable for usize {
75
+
fn get_size(&self) -> usize {
76
+
0 // no additional space taken, just its stack size (newtype is free)
77
+
}
78
+
}
79
+
80
+
impl Processable for String {
81
+
fn get_size(&self) -> usize {
82
+
self.capacity()
83
+
}
84
+
}
85
+
86
+
impl<Item: Sized + Processable> Processable for Vec<Item> {
87
+
fn get_size(&self) -> usize {
88
+
let slot_size = std::mem::size_of::<Item>();
89
+
let direct_size = slot_size * self.capacity();
90
+
let items_referenced_size: usize = self.iter().map(|item| item.get_size()).sum();
91
+
direct_size + items_referenced_size
92
+
}
93
+
}
94
+
95
+
impl<Item: Processable> Processable for Option<Item> {
96
+
fn get_size(&self) -> usize {
97
+
self.as_ref().map(|item| item.get_size()).unwrap_or(0)
98
+
}
99
+
}
100
+
101
+
impl<Item: Processable, Error: Processable> Processable for Result<Item, Error> {
102
+
fn get_size(&self) -> usize {
103
+
match self {
104
+
Ok(item) => item.get_size(),
105
+
Err(err) => err.get_size(),
106
+
}
107
+
}
108
+
}
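Building on the sizing discussion above, a small example of how `get_size` composes for a custom processed type (`Summary` is made up; the `Vec` and `String` impls shown above already account for their own heap usage):

```
use repo_stream::Processable;
use serde::{Deserialize, Serialize};

// Made-up processed representation: a couple of extracted text fields.
#[derive(Debug, Clone, Serialize, Deserialize)]
struct Summary {
    langs: Vec<String>,
    text: String,
}

impl Processable for Summary {
    fn get_size(&self) -> usize {
        // off-stack size only: the Vec and String impls count their own heap
        // usage, so a struct can simply sum its fields
        self.langs.get_size() + self.text.get_size()
    }
}
```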
+203
-268
src/walk.rs
+203
-268
src/walk.rs
···
1
1
//! Depth-first MST traversal
2
2
3
-
use crate::disk::{DiskReader, StorageErrorBase};
4
-
use crate::drive::{MaybeProcessedBlock, Processable};
3
+
use crate::disk::DiskStore;
4
+
use crate::drive::{DecodeError, MaybeProcessedBlock};
5
5
use crate::mst::Node;
6
+
use crate::process::Processable;
6
7
use ipld_core::cid::Cid;
8
+
use sha2::{Digest, Sha256};
7
9
use std::collections::HashMap;
8
10
use std::convert::Infallible;
9
11
10
12
/// Errors that can happen while walking
11
13
#[derive(Debug, thiserror::Error)]
12
-
pub enum Trip {
13
-
#[error("empty mst nodes are not allowed")]
14
-
NodeEmpty,
14
+
pub enum WalkError {
15
15
#[error("Failed to fingerprint commit block")]
16
16
BadCommitFingerprint,
17
17
#[error("Failed to decode commit block: {0}")]
18
18
BadCommit(#[from] serde_ipld_dagcbor::DecodeError<Infallible>),
19
19
#[error("Action node error: {0}")]
20
-
RkeyError(#[from] RkeyError),
21
-
#[error("Encountered an rkey out of order while walking the MST")]
22
-
RkeyOutOfOrder,
23
-
}
24
-
25
-
/// Errors that can happen while walking
26
-
#[derive(Debug, thiserror::Error)]
27
-
pub enum DiskTrip<E: StorageErrorBase> {
28
-
#[error("tripped: {0}")]
29
-
Trip(#[from] Trip),
20
+
MstError(#[from] MstError),
30
21
#[error("storage error: {0}")]
31
-
StorageError(#[from] E),
22
+
StorageError(#[from] fjall::Error),
32
23
#[error("Decode error: {0}")]
33
-
BincodeDecodeError(#[from] bincode::error::DecodeError),
24
+
DecodeError(#[from] DecodeError),
34
25
}
35
26
36
27
/// Errors from invalid Rkeys
37
-
#[derive(Debug, thiserror::Error)]
38
-
pub enum RkeyError {
28
+
#[derive(Debug, PartialEq, thiserror::Error)]
29
+
pub enum MstError {
39
30
#[error("Failed to compute an rkey due to invalid prefix_len")]
40
31
EntryPrefixOutOfbounds,
41
32
#[error("RKey was not utf-8")]
42
33
EntryRkeyNotUtf8(#[from] std::string::FromUtf8Error),
34
+
#[error("Nodes cannot be empty (except for an entirely empty MST)")]
35
+
EmptyNode,
36
+
#[error("Found an entry with rkey at the wrong depth")]
37
+
WrongDepth,
38
+
#[error("Lost track of our depth (possible bug?)")]
39
+
LostDepth,
40
+
#[error("MST depth underflow: depth-0 node with child trees")]
41
+
DepthUnderflow,
42
+
#[error("Encountered an rkey out of order while walking the MST")]
43
+
RkeyOutOfOrder,
43
44
}
44
45
45
46
/// Walker outputs
···
50
51
/// Reached the end of the MST! yay!
51
52
Finish,
52
53
/// A record was found!
53
-
Step { rkey: String, data: T },
54
+
Found { rkey: String, data: T },
54
55
}
55
56
56
57
#[derive(Debug, Clone, PartialEq)]
57
58
enum Need {
58
-
Node(Cid),
59
+
Node { depth: Depth, cid: Cid },
59
60
Record { rkey: String, cid: Cid },
60
61
}
61
62
62
-
fn push_from_node(stack: &mut Vec<Need>, node: &Node) -> Result<(), RkeyError> {
63
-
let mut entries = Vec::with_capacity(node.entries.len());
63
+
#[derive(Debug, Clone, Copy, PartialEq)]
64
+
enum Depth {
65
+
Root,
66
+
Depth(u32),
67
+
}
64
68
69
+
impl Depth {
70
+
fn from_key(key: &[u8]) -> Self {
71
+
let mut zeros = 0;
72
+
for byte in Sha256::digest(key) {
73
+
let leading = byte.leading_zeros();
74
+
zeros += leading;
75
+
if leading < 8 {
76
+
break;
77
+
}
78
+
}
79
+
Self::Depth(zeros / 2) // truncating divide (rounds down)
80
+
}
81
+
fn next_expected(&self) -> Result<Option<u32>, MstError> {
82
+
match self {
83
+
Self::Root => Ok(None),
84
+
Self::Depth(d) => d.checked_sub(1).ok_or(MstError::DepthUnderflow).map(Some),
85
+
}
86
+
}
87
+
}
88
+
89
+
fn push_from_node(stack: &mut Vec<Need>, node: &Node, parent_depth: Depth) -> Result<(), MstError> {
90
+
// empty nodes are not allowed in the MST except in an empty MST
91
+
if node.is_empty() {
92
+
if parent_depth == Depth::Root {
93
+
return Ok(()); // empty mst, nothing to push
94
+
} else {
95
+
return Err(MstError::EmptyNode);
96
+
}
97
+
}
98
+
99
+
let mut entries = Vec::with_capacity(node.entries.len());
65
100
let mut prefix = vec![];
101
+
let mut this_depth = parent_depth.next_expected()?;
102
+
66
103
for entry in &node.entries {
67
104
let mut rkey = vec![];
68
105
let pre_checked = prefix
69
106
.get(..entry.prefix_len)
70
-
.ok_or(RkeyError::EntryPrefixOutOfbounds)?;
107
+
.ok_or(MstError::EntryPrefixOutOfbounds)?;
71
108
rkey.extend_from_slice(pre_checked);
72
109
rkey.extend_from_slice(&entry.keysuffix);
110
+
111
+
let Depth::Depth(key_depth) = Depth::from_key(&rkey) else {
112
+
return Err(MstError::WrongDepth);
113
+
};
114
+
115
+
// this_depth is `None` if our parent is the root, i.e. this is the top MST
116
+
// node; in that case we accept whatever depth its first key claims
117
+
let expected_depth = match this_depth {
118
+
Some(d) => d,
119
+
None => {
120
+
this_depth = Some(key_depth);
121
+
key_depth
122
+
}
123
+
};
124
+
125
+
// all keys we find should be this depth
126
+
if key_depth != expected_depth {
127
+
return Err(MstError::DepthUnderflow);
128
+
}
129
+
73
130
prefix = rkey.clone();
74
131
75
132
entries.push(Need::Record {
···
77
134
cid: entry.value,
78
135
});
79
136
if let Some(ref tree) = entry.tree {
80
-
entries.push(Need::Node(*tree));
137
+
entries.push(Need::Node {
138
+
depth: Depth::Depth(key_depth),
139
+
cid: *tree,
140
+
});
81
141
}
82
142
}
83
143
84
144
entries.reverse();
85
145
stack.append(&mut entries);
86
146
147
+
let d = this_depth.ok_or(MstError::LostDepth)?;
148
+
87
149
if let Some(tree) = node.left {
88
-
stack.push(Need::Node(tree));
150
+
stack.push(Need::Node {
151
+
depth: Depth::Depth(d),
152
+
cid: tree,
153
+
});
89
154
}
90
155
Ok(())
91
156
}
···
102
167
impl Walker {
103
168
pub fn new(tree_root_cid: Cid) -> Self {
104
169
Self {
105
-
stack: vec![Need::Node(tree_root_cid)],
170
+
stack: vec![Need::Node {
171
+
depth: Depth::Root,
172
+
cid: tree_root_cid,
173
+
}],
106
174
prev: "".to_string(),
107
175
}
108
176
}
···
111
179
pub fn step<T: Processable>(
112
180
&mut self,
113
181
blocks: &mut HashMap<Cid, MaybeProcessedBlock<T>>,
114
-
process: impl Fn(&[u8]) -> T,
115
-
) -> Result<Step<T>, Trip> {
182
+
process: impl Fn(Vec<u8>) -> T,
183
+
) -> Result<Step<T>, WalkError> {
116
184
loop {
117
-
let Some(mut need) = self.stack.last() else {
185
+
let Some(need) = self.stack.last_mut() else {
118
186
log::trace!("tried to walk but we're actually done.");
119
187
return Ok(Step::Finish);
120
188
};
121
189
122
-
match &mut need {
123
-
Need::Node(cid) => {
190
+
match need {
191
+
&mut Need::Node { depth, cid } => {
124
192
log::trace!("need node {cid:?}");
125
-
let Some(block) = blocks.remove(cid) else {
193
+
let Some(block) = blocks.remove(&cid) else {
126
194
log::trace!("node not found, resting");
127
-
return Ok(Step::Missing(*cid));
195
+
return Ok(Step::Missing(cid));
128
196
};
129
197
130
198
let MaybeProcessedBlock::Raw(data) = block else {
131
-
return Err(Trip::BadCommitFingerprint);
199
+
return Err(WalkError::BadCommitFingerprint);
132
200
};
133
-
let node =
134
-
serde_ipld_dagcbor::from_slice::<Node>(&data).map_err(Trip::BadCommit)?;
201
+
let node = serde_ipld_dagcbor::from_slice::<Node>(&data)
202
+
.map_err(WalkError::BadCommit)?;
135
203
136
204
// found node, make sure we remember
137
205
self.stack.pop();
138
206
139
207
// queue up work on the found node next
140
-
push_from_node(&mut self.stack, &node)?;
208
+
push_from_node(&mut self.stack, &node, depth)?;
141
209
}
142
210
Need::Record { rkey, cid } => {
143
211
log::trace!("need record {cid:?}");
212
+
// note that we cannot *remove* a record block, sadly, since
213
+
// there can be multiple rkeys pointing to the same cid.
144
214
let Some(data) = blocks.get_mut(cid) else {
145
-
log::trace!("record block not found, resting");
146
215
return Ok(Step::Missing(*cid));
147
216
};
148
217
let rkey = rkey.clone();
149
218
let data = match data {
150
-
MaybeProcessedBlock::Raw(data) => process(data),
219
+
MaybeProcessedBlock::Raw(data) => process(data.to_vec()),
151
220
MaybeProcessedBlock::Processed(t) => t.clone(),
152
221
};
153
222
154
223
// found node, make sure we remember
155
224
self.stack.pop();
156
225
157
-
log::trace!("emitting a block as a step. depth={}", self.stack.len());
158
-
159
226
// rkeys *must* be in order or else the tree is invalid (or
160
227
// we have a bug)
161
228
if rkey <= self.prev {
162
-
return Err(Trip::RkeyOutOfOrder);
229
+
return Err(MstError::RkeyOutOfOrder)?;
163
230
}
164
231
self.prev = rkey.clone();
165
232
166
-
return Ok(Step::Step { rkey, data });
233
+
return Ok(Step::Found { rkey, data });
167
234
}
168
235
}
169
236
}
170
237
}
171
238
172
239
/// blocking!!!!!!
173
-
pub fn disk_step<T: Processable, R: DiskReader>(
240
+
pub fn disk_step<T: Processable>(
174
241
&mut self,
175
-
reader: &mut R,
176
-
process: impl Fn(&[u8]) -> T,
177
-
) -> Result<Step<T>, DiskTrip<R::StorageError>> {
242
+
reader: &mut DiskStore,
243
+
process: impl Fn(Vec<u8>) -> T,
244
+
) -> Result<Step<T>, WalkError> {
178
245
loop {
179
-
let Some(mut need) = self.stack.last() else {
246
+
let Some(need) = self.stack.last_mut() else {
180
247
log::trace!("tried to walk but we're actually done.");
181
248
return Ok(Step::Finish);
182
249
};
183
250
184
-
match &mut need {
185
-
Need::Node(cid) => {
251
+
match need {
252
+
&mut Need::Node { depth, cid } => {
186
253
let cid_bytes = cid.to_bytes();
187
254
log::trace!("need node {cid:?}");
188
-
let Some(block_bytes) = reader.get(cid_bytes)? else {
255
+
let Some(block_bytes) = reader.get(&cid_bytes)? else {
189
256
log::trace!("node not found, resting");
190
-
return Ok(Step::Missing(*cid));
257
+
return Ok(Step::Missing(cid));
191
258
};
192
259
193
260
let block: MaybeProcessedBlock<T> = crate::drive::decode(&block_bytes)?;
194
261
195
262
let MaybeProcessedBlock::Raw(data) = block else {
196
-
return Err(Trip::BadCommitFingerprint.into());
263
+
return Err(WalkError::BadCommitFingerprint);
197
264
};
198
-
let node =
199
-
serde_ipld_dagcbor::from_slice::<Node>(&data).map_err(Trip::BadCommit)?;
265
+
let node = serde_ipld_dagcbor::from_slice::<Node>(&data)
266
+
.map_err(WalkError::BadCommit)?;
200
267
201
268
// found node, make sure we remember
202
269
self.stack.pop();
203
270
204
271
// queue up work on the found node next
205
-
push_from_node(&mut self.stack, &node).map_err(Trip::RkeyError)?;
272
+
push_from_node(&mut self.stack, &node, depth).map_err(WalkError::MstError)?;
206
273
}
207
274
Need::Record { rkey, cid } => {
208
275
log::trace!("need record {cid:?}");
209
276
let cid_bytes = cid.to_bytes();
210
-
let Some(data_bytes) = reader.get(cid_bytes)? else {
277
+
let Some(data_bytes) = reader.get(&cid_bytes)? else {
211
278
log::trace!("record block not found, resting");
212
279
return Ok(Step::Missing(*cid));
213
280
};
214
281
let data: MaybeProcessedBlock<T> = crate::drive::decode(&data_bytes)?;
215
282
let rkey = rkey.clone();
216
283
let data = match data {
217
-
MaybeProcessedBlock::Raw(data) => process(&data),
284
+
MaybeProcessedBlock::Raw(data) => process(data),
218
285
MaybeProcessedBlock::Processed(t) => t.clone(),
219
286
};
220
287
···
226
293
// rkeys *must* be in order or else the tree is invalid (or
227
294
// we have a bug)
228
295
if rkey <= self.prev {
229
-
return Err(DiskTrip::Trip(Trip::RkeyOutOfOrder));
296
+
return Err(MstError::RkeyOutOfOrder)?;
230
297
}
231
298
self.prev = rkey.clone();
232
299
233
-
return Ok(Step::Step { rkey, data });
300
+
return Ok(Step::Found { rkey, data });
234
301
}
235
302
}
236
303
}
···
240
307
#[cfg(test)]
241
308
mod test {
242
309
use super::*;
243
-
// use crate::mst::Entry;
244
310
245
311
fn cid1() -> Cid {
246
312
"bafyreihixenvk3ahqbytas4hk4a26w43bh6eo3w6usjqtxkpzsvi655a3m"
247
313
.parse()
248
314
.unwrap()
249
315
}
250
-
// fn cid2() -> Cid {
251
-
// "QmY7Yh4UquoXHLPFo2XbhXkhBvFoPwmQUSa92pxnxjQuPU"
252
-
// .parse()
253
-
// .unwrap()
254
-
// }
255
-
// fn cid3() -> Cid {
256
-
// "bafybeigdyrzt5sfp7udm7hu76uh7y26nf3efuylqabf3oclgtqy55fbzdi"
257
-
// .parse()
258
-
// .unwrap()
259
-
// }
260
-
// fn cid4() -> Cid {
261
-
// "QmbWqxBEKC3P8tqsKc98xmWNzrzDtRLMiMPL8wBuTGsMnR"
262
-
// .parse()
263
-
// .unwrap()
264
-
// }
265
-
// fn cid5() -> Cid {
266
-
// "QmSnuWmxptJZdLJpKRarxBMS2Ju2oANVrgbr2xWbie9b2D"
267
-
// .parse()
268
-
// .unwrap()
269
-
// }
270
-
// fn cid6() -> Cid {
271
-
// "QmdmQXB2mzChmMeKY47C43LxUdg1NDJ5MWcKMKxDu7RgQm"
272
-
// .parse()
273
-
// .unwrap()
274
-
// }
275
-
// fn cid7() -> Cid {
276
-
// "bafybeiaysi4s6lnjev27ln5icwm6tueaw2vdykrtjkwiphwekaywqhcjze"
277
-
// .parse()
278
-
// .unwrap()
279
-
// }
280
-
// fn cid8() -> Cid {
281
-
// "bafyreif3tfdpr5n4jdrbielmcapwvbpcthepfkwq2vwonmlhirbjmotedi"
282
-
// .parse()
283
-
// .unwrap()
284
-
// }
285
-
// fn cid9() -> Cid {
286
-
// "bafyreicnokmhmrnlp2wjhyk2haep4tqxiptwfrp2rrs7rzq7uk766chqvq"
287
-
// .parse()
288
-
// .unwrap()
289
-
// }
316
+
317
+
#[test]
318
+
fn test_depth_spec_0() {
319
+
let d = Depth::from_key(b"2653ae71");
320
+
assert_eq!(d, Depth::Depth(0))
321
+
}
322
+
323
+
#[test]
324
+
fn test_depth_spec_1() {
325
+
let d = Depth::from_key(b"blue");
326
+
assert_eq!(d, Depth::Depth(1))
327
+
}
328
+
329
+
#[test]
330
+
fn test_depth_spec_4() {
331
+
let d = Depth::from_key(b"app.bsky.feed.post/454397e440ec");
332
+
assert_eq!(d, Depth::Depth(4))
333
+
}
334
+
335
+
#[test]
336
+
fn test_depth_spec_8() {
337
+
let d = Depth::from_key(b"app.bsky.feed.post/9adeb165882c");
338
+
assert_eq!(d, Depth::Depth(8))
339
+
}
340
+
341
+
#[test]
342
+
fn test_depth_ietf_draft_0() {
343
+
let d = Depth::from_key(b"key1");
344
+
assert_eq!(d, Depth::Depth(0))
345
+
}
346
+
347
+
#[test]
348
+
fn test_depth_ietf_draft_1() {
349
+
let d = Depth::from_key(b"key7");
350
+
assert_eq!(d, Depth::Depth(1))
351
+
}
352
+
353
+
#[test]
354
+
fn test_depth_ietf_draft_4() {
355
+
let d = Depth::from_key(b"key515");
356
+
assert_eq!(d, Depth::Depth(4))
357
+
}
358
+
359
+
#[test]
360
+
fn test_depth_interop() {
361
+
// examples from https://github.com/bluesky-social/atproto-interop-tests/blob/main/mst/key_heights.json
362
+
for (k, expected) in [
363
+
("", 0),
364
+
("asdf", 0),
365
+
("blue", 1),
366
+
("2653ae71", 0),
367
+
("88bfafc7", 2),
368
+
("2a92d355", 4),
369
+
("884976f5", 6),
370
+
("app.bsky.feed.post/454397e440ec", 4),
371
+
("app.bsky.feed.post/9adeb165882c", 8),
372
+
] {
373
+
let d = Depth::from_key(k.as_bytes());
374
+
assert_eq!(d, Depth::Depth(expected), "key: {}", k);
375
+
}
376
+
}
290
377
291
378
#[test]
292
-
fn test_next_from_node_empty() {
293
-
let node = Node {
379
+
fn test_push_empty_fails() {
380
+
let empty_node = Node {
294
381
left: None,
295
382
entries: vec![],
296
383
};
297
384
let mut stack = vec![];
298
-
push_from_node(&mut stack, &node).unwrap();
299
-
assert_eq!(stack.last(), None);
385
+
let err = push_from_node(&mut stack, &empty_node, Depth::Depth(4));
386
+
assert_eq!(err, Err(MstError::EmptyNode));
300
387
}
301
388
302
389
#[test]
303
-
fn test_needs_from_node_just_left() {
390
+
fn test_push_one_node() {
304
391
let node = Node {
305
392
left: Some(cid1()),
306
393
entries: vec![],
307
394
};
308
395
let mut stack = vec![];
309
-
push_from_node(&mut stack, &node).unwrap();
310
-
assert_eq!(stack.last(), Some(Need::Node(cid1())).as_ref());
396
+
push_from_node(&mut stack, &node, Depth::Depth(4)).unwrap();
397
+
assert_eq!(
398
+
stack.last(),
399
+
Some(Need::Node {
400
+
depth: Depth::Depth(3),
401
+
cid: cid1()
402
+
})
403
+
.as_ref()
404
+
);
311
405
}
312
-
313
-
// #[test]
314
-
// fn test_needs_from_node_just_one_record() {
315
-
// let node = Node {
316
-
// left: None,
317
-
// entries: vec![Entry {
318
-
// keysuffix: "asdf".into(),
319
-
// prefix_len: 0,
320
-
// value: cid1(),
321
-
// tree: None,
322
-
// }],
323
-
// };
324
-
// assert_eq!(
325
-
// needs_from_node(node).unwrap(),
326
-
// vec![Need::Record {
327
-
// rkey: "asdf".into(),
328
-
// cid: cid1(),
329
-
// },]
330
-
// );
331
-
// }
332
-
333
-
// #[test]
334
-
// fn test_needs_from_node_two_records() {
335
-
// let node = Node {
336
-
// left: None,
337
-
// entries: vec![
338
-
// Entry {
339
-
// keysuffix: "asdf".into(),
340
-
// prefix_len: 0,
341
-
// value: cid1(),
342
-
// tree: None,
343
-
// },
344
-
// Entry {
345
-
// keysuffix: "gh".into(),
346
-
// prefix_len: 2,
347
-
// value: cid2(),
348
-
// tree: None,
349
-
// },
350
-
// ],
351
-
// };
352
-
// assert_eq!(
353
-
// needs_from_node(node).unwrap(),
354
-
// vec![
355
-
// Need::Record {
356
-
// rkey: "asdf".into(),
357
-
// cid: cid1(),
358
-
// },
359
-
// Need::Record {
360
-
// rkey: "asgh".into(),
361
-
// cid: cid2(),
362
-
// },
363
-
// ]
364
-
// );
365
-
// }
366
-
367
-
// #[test]
368
-
// fn test_needs_from_node_with_both() {
369
-
// let node = Node {
370
-
// left: None,
371
-
// entries: vec![Entry {
372
-
// keysuffix: "asdf".into(),
373
-
// prefix_len: 0,
374
-
// value: cid1(),
375
-
// tree: Some(cid2()),
376
-
// }],
377
-
// };
378
-
// assert_eq!(
379
-
// needs_from_node(node).unwrap(),
380
-
// vec![
381
-
// Need::Record {
382
-
// rkey: "asdf".into(),
383
-
// cid: cid1(),
384
-
// },
385
-
// Need::Node(cid2()),
386
-
// ]
387
-
// );
388
-
// }
389
-
390
-
// #[test]
391
-
// fn test_needs_from_node_left_and_record() {
392
-
// let node = Node {
393
-
// left: Some(cid1()),
394
-
// entries: vec![Entry {
395
-
// keysuffix: "asdf".into(),
396
-
// prefix_len: 0,
397
-
// value: cid2(),
398
-
// tree: None,
399
-
// }],
400
-
// };
401
-
// assert_eq!(
402
-
// needs_from_node(node).unwrap(),
403
-
// vec![
404
-
// Need::Node(cid1()),
405
-
// Need::Record {
406
-
// rkey: "asdf".into(),
407
-
// cid: cid2(),
408
-
// },
409
-
// ]
410
-
// );
411
-
// }
412
-
413
-
// #[test]
414
-
// fn test_needs_from_full_node() {
415
-
// let node = Node {
416
-
// left: Some(cid1()),
417
-
// entries: vec![
418
-
// Entry {
419
-
// keysuffix: "asdf".into(),
420
-
// prefix_len: 0,
421
-
// value: cid2(),
422
-
// tree: Some(cid3()),
423
-
// },
424
-
// Entry {
425
-
// keysuffix: "ghi".into(),
426
-
// prefix_len: 1,
427
-
// value: cid4(),
428
-
// tree: Some(cid5()),
429
-
// },
430
-
// Entry {
431
-
// keysuffix: "jkl".into(),
432
-
// prefix_len: 2,
433
-
// value: cid6(),
434
-
// tree: Some(cid7()),
435
-
// },
436
-
// Entry {
437
-
// keysuffix: "mno".into(),
438
-
// prefix_len: 4,
439
-
// value: cid8(),
440
-
// tree: Some(cid9()),
441
-
// },
442
-
// ],
443
-
// };
444
-
// assert_eq!(
445
-
// needs_from_node(node).unwrap(),
446
-
// vec![
447
-
// Need::Node(cid1()),
448
-
// Need::Record {
449
-
// rkey: "asdf".into(),
450
-
// cid: cid2(),
451
-
// },
452
-
// Need::Node(cid3()),
453
-
// Need::Record {
454
-
// rkey: "aghi".into(),
455
-
// cid: cid4(),
456
-
// },
457
-
// Need::Node(cid5()),
458
-
// Need::Record {
459
-
// rkey: "agjkl".into(),
460
-
// cid: cid6(),
461
-
// },
462
-
// Need::Node(cid7()),
463
-
// Need::Record {
464
-
// rkey: "agjkmno".into(),
465
-
// cid: cid8(),
466
-
// },
467
-
// Need::Node(cid9()),
468
-
// ]
469
-
// );
470
-
// }
471
406
}
+34
-31
tests/non-huge-cars.rs
+34
-31
tests/non-huge-cars.rs
···
1
1
extern crate repo_stream;
2
-
use futures::TryStreamExt;
3
-
use iroh_car::CarReader;
4
-
use std::convert::Infallible;
2
+
use repo_stream::Driver;
5
3
4
+
const EMPTY_CAR: &'static [u8] = include_bytes!("../car-samples/empty.car");
6
5
const TINY_CAR: &'static [u8] = include_bytes!("../car-samples/tiny.car");
7
6
const LITTLE_CAR: &'static [u8] = include_bytes!("../car-samples/little.car");
8
7
const MIDSIZE_CAR: &'static [u8] = include_bytes!("../car-samples/midsize.car");
9
8
10
-
async fn test_car(bytes: &[u8], expected_records: usize, expected_sum: usize) {
11
-
let reader = CarReader::new(bytes).await.unwrap();
12
-
13
-
let root = reader
14
-
.header()
15
-
.roots()
16
-
.first()
17
-
.ok_or("missing root")
9
+
async fn test_car(
10
+
bytes: &[u8],
11
+
expected_records: usize,
12
+
expected_sum: usize,
13
+
expect_profile: bool,
14
+
) {
15
+
let mut driver = match Driver::load_car(bytes, |block| block.len(), 10 /* MiB */)
16
+
.await
18
17
.unwrap()
19
-
.clone();
20
-
21
-
let stream = std::pin::pin!(reader.stream());
22
-
23
-
let (_commit, v) =
24
-
repo_stream::drive::Vehicle::init(root, stream, |block| Ok::<_, Infallible>(block.len()))
25
-
.await
26
-
.unwrap();
27
-
let mut record_stream = std::pin::pin!(v.stream());
18
+
{
19
+
Driver::Memory(_commit, mem_driver) => mem_driver,
20
+
Driver::Disk(_) => panic!("too big"),
21
+
};
28
22
29
23
let mut records = 0;
30
24
let mut sum = 0;
31
25
let mut found_bsky_profile = false;
32
26
let mut prev_rkey = "".to_string();
33
-
while let Some((rkey, size)) = record_stream.try_next().await.unwrap() {
34
-
records += 1;
35
-
sum += size;
36
-
if rkey == "app.bsky.actor.profile/self" {
37
-
found_bsky_profile = true;
27
+
28
+
while let Some(pairs) = driver.next_chunk(256).await.unwrap() {
29
+
for (rkey, size) in pairs {
30
+
records += 1;
31
+
sum += size;
32
+
if rkey == "app.bsky.actor.profile/self" {
33
+
found_bsky_profile = true;
34
+
}
35
+
assert!(rkey > prev_rkey, "rkeys are streamed in order");
36
+
prev_rkey = rkey;
38
37
}
39
-
assert!(rkey > prev_rkey, "rkeys are streamed in order");
40
-
prev_rkey = rkey;
41
38
}
39
+
42
40
assert_eq!(records, expected_records);
43
41
assert_eq!(sum, expected_sum);
44
-
assert!(found_bsky_profile);
42
+
assert_eq!(found_bsky_profile, expect_profile);
43
+
}
44
+
45
+
#[tokio::test]
46
+
async fn test_empty_car() {
47
+
test_car(EMPTY_CAR, 0, 0, false).await
45
48
}
46
49
47
50
#[tokio::test]
48
51
async fn test_tiny_car() {
49
-
test_car(TINY_CAR, 8, 2071).await
52
+
test_car(TINY_CAR, 8, 2071, true).await
50
53
}
51
54
52
55
#[tokio::test]
53
56
async fn test_little_car() {
54
-
test_car(LITTLE_CAR, 278, 246960).await
57
+
test_car(LITTLE_CAR, 278, 246960, true).await
55
58
}
56
59
57
60
#[tokio::test]
58
61
async fn test_midsize_car() {
59
-
test_car(MIDSIZE_CAR, 11585, 3741393).await
62
+
test_car(MIDSIZE_CAR, 11585, 3741393, true).await
60
63
}