+271 -2 Cargo.lock
···
126
126
]
127
127
128
128
[[package]]
129
+
name = "bincode"
130
+
version = "2.0.1"
131
+
source = "registry+https://github.com/rust-lang/crates.io-index"
132
+
checksum = "36eaf5d7b090263e8150820482d5d93cd964a81e4019913c972f4edcc6edb740"
133
+
dependencies = [
134
+
"bincode_derive",
135
+
"serde",
136
+
"unty",
137
+
]
138
+
139
+
[[package]]
140
+
name = "bincode_derive"
141
+
version = "2.0.1"
142
+
source = "registry+https://github.com/rust-lang/crates.io-index"
143
+
checksum = "bf95709a440f45e986983918d0e8a1f30a9b1df04918fc828670606804ac3c09"
144
+
dependencies = [
145
+
"virtue",
146
+
]
147
+
148
+
[[package]]
129
149
name = "bitflags"
130
150
version = "2.9.4"
131
151
source = "registry+https://github.com/rust-lang/crates.io-index"
132
152
checksum = "2261d10cca569e4643e526d8dc2e62e433cc8aba21ab764233731f8d369bf394"
153
+
154
+
[[package]]
155
+
name = "block-buffer"
156
+
version = "0.10.4"
157
+
source = "registry+https://github.com/rust-lang/crates.io-index"
158
+
checksum = "3078c7629b62d3f0439517fa394996acacc5cbc91c5a20d8c658e77abd503a71"
159
+
dependencies = [
160
+
"generic-array",
161
+
]
133
162
134
163
[[package]]
135
164
name = "bumpalo"
···
267
296
]
268
297
269
298
[[package]]
299
+
name = "cpufeatures"
300
+
version = "0.2.17"
301
+
source = "registry+https://github.com/rust-lang/crates.io-index"
302
+
checksum = "59ed5838eebb26a2bb2e58f6d5b5316989ae9d08bab10e0e6d103e656d1b0280"
303
+
dependencies = [
304
+
"libc",
305
+
]
306
+
307
+
[[package]]
270
308
name = "criterion"
271
309
version = "0.7.0"
272
310
source = "registry+https://github.com/rust-lang/crates.io-index"
···
332
370
checksum = "460fbee9c2c2f33933d720630a6a0bac33ba7053db5344fac858d4b8952d77d5"
333
371
334
372
[[package]]
373
+
name = "crypto-common"
374
+
version = "0.1.6"
375
+
source = "registry+https://github.com/rust-lang/crates.io-index"
376
+
checksum = "1bfb12502f3fc46cca1bb51ac28df9d618d813cdc3d2f25b9fe775a34af26bb3"
377
+
dependencies = [
378
+
"generic-array",
379
+
"typenum",
380
+
]
381
+
382
+
[[package]]
335
383
name = "data-encoding"
336
384
version = "2.9.0"
337
385
source = "registry+https://github.com/rust-lang/crates.io-index"
···
358
406
]
359
407
360
408
[[package]]
409
+
name = "digest"
410
+
version = "0.10.7"
411
+
source = "registry+https://github.com/rust-lang/crates.io-index"
412
+
checksum = "9ed9a281f7bc9b7576e61468ba615a66a5c8cfdff42420a70aa82701a3b1e292"
413
+
dependencies = [
414
+
"block-buffer",
415
+
"crypto-common",
416
+
]
417
+
418
+
[[package]]
361
419
name = "either"
362
420
version = "1.15.0"
363
421
source = "registry+https://github.com/rust-lang/crates.io-index"
···
387
445
]
388
446
389
447
[[package]]
448
+
name = "errno"
449
+
version = "0.3.14"
450
+
source = "registry+https://github.com/rust-lang/crates.io-index"
451
+
checksum = "39cab71617ae0d63f51a36d69f866391735b51691dbda63cf6f96d042b63efeb"
452
+
dependencies = [
453
+
"libc",
454
+
"windows-sys 0.60.2",
455
+
]
456
+
457
+
[[package]]
458
+
name = "fallible-iterator"
459
+
version = "0.3.0"
460
+
source = "registry+https://github.com/rust-lang/crates.io-index"
461
+
checksum = "2acce4a10f12dc2fb14a218589d4f1f62ef011b2d0cc4b3cb1bba8e94da14649"
462
+
463
+
[[package]]
464
+
name = "fallible-streaming-iterator"
465
+
version = "0.1.9"
466
+
source = "registry+https://github.com/rust-lang/crates.io-index"
467
+
checksum = "7360491ce676a36bf9bb3c56c1aa791658183a54d2744120f27285738d90465a"
468
+
469
+
[[package]]
470
+
name = "fastrand"
471
+
version = "2.3.0"
472
+
source = "registry+https://github.com/rust-lang/crates.io-index"
473
+
checksum = "37909eebbb50d72f9059c3b6d82c0463f2ff062c9e95845c43a6c9c0355411be"
474
+
475
+
[[package]]
476
+
name = "foldhash"
477
+
version = "0.1.5"
478
+
source = "registry+https://github.com/rust-lang/crates.io-index"
479
+
checksum = "d9c4f5dac5e15c24eb999c26181a6ca40b39fe946cbe4c263c7209467bc83af2"
480
+
481
+
[[package]]
390
482
name = "futures"
391
483
version = "0.3.31"
392
484
source = "registry+https://github.com/rust-lang/crates.io-index"
···
476
568
]
477
569
478
570
[[package]]
571
+
name = "generic-array"
572
+
version = "0.14.9"
573
+
source = "registry+https://github.com/rust-lang/crates.io-index"
574
+
checksum = "4bb6743198531e02858aeaea5398fcc883e71851fcbcb5a2f773e2fb6cb1edf2"
575
+
dependencies = [
576
+
"typenum",
577
+
"version_check",
578
+
]
579
+
580
+
[[package]]
581
+
name = "getrandom"
582
+
version = "0.3.3"
583
+
source = "registry+https://github.com/rust-lang/crates.io-index"
584
+
checksum = "26145e563e54f2cadc477553f1ec5ee650b00862f0a58bcd12cbdc5f0ea2d2f4"
585
+
dependencies = [
586
+
"cfg-if",
587
+
"libc",
588
+
"r-efi",
589
+
"wasi 0.14.7+wasi-0.2.4",
590
+
]
591
+
592
+
[[package]]
479
593
name = "gimli"
480
594
version = "0.32.3"
481
595
source = "registry+https://github.com/rust-lang/crates.io-index"
···
490
604
"cfg-if",
491
605
"crunchy",
492
606
"zerocopy",
607
+
]
608
+
609
+
[[package]]
610
+
name = "hashbrown"
611
+
version = "0.15.5"
612
+
source = "registry+https://github.com/rust-lang/crates.io-index"
613
+
checksum = "9229cfe53dfd69f0609a49f65461bd93001ea1ef889cd5529dd176593f5338a1"
614
+
dependencies = [
615
+
"foldhash",
616
+
]
617
+
618
+
[[package]]
619
+
name = "hashlink"
620
+
version = "0.10.0"
621
+
source = "registry+https://github.com/rust-lang/crates.io-index"
622
+
checksum = "7382cf6263419f2d8df38c55d7da83da5c18aef87fc7a7fc1fb1e344edfe14c1"
623
+
dependencies = [
624
+
"hashbrown",
493
625
]
494
626
495
627
[[package]]
···
598
730
checksum = "58f929b4d672ea937a23a1ab494143d968337a5f47e56d0815df1e0890ddf174"
599
731
600
732
[[package]]
733
+
name = "libsqlite3-sys"
734
+
version = "0.35.0"
735
+
source = "registry+https://github.com/rust-lang/crates.io-index"
736
+
checksum = "133c182a6a2c87864fe97778797e46c7e999672690dc9fa3ee8e241aa4a9c13f"
737
+
dependencies = [
738
+
"pkg-config",
739
+
"vcpkg",
740
+
]
741
+
742
+
[[package]]
743
+
name = "linux-raw-sys"
744
+
version = "0.11.0"
745
+
source = "registry+https://github.com/rust-lang/crates.io-index"
746
+
checksum = "df1d3c3b53da64cf5760482273a98e575c651a67eec7f77df96b5b642de8f039"
747
+
748
+
[[package]]
601
749
name = "lock_api"
602
750
version = "0.4.14"
603
751
source = "registry+https://github.com/rust-lang/crates.io-index"
···
645
793
checksum = "78bed444cc8a2160f01cbcf811ef18cac863ad68ae8ca62092e8db51d51c761c"
646
794
dependencies = [
647
795
"libc",
648
-
"wasi",
796
+
"wasi 0.11.1+wasi-snapshot-preview1",
649
797
"windows-sys 0.59.0",
650
798
]
651
799
···
744
892
checksum = "8b870d8c151b6f2fb93e84a13146138f05d02ed11c7e7c54f8826aaaf7c9f184"
745
893
746
894
[[package]]
895
+
name = "pkg-config"
896
+
version = "0.3.32"
897
+
source = "registry+https://github.com/rust-lang/crates.io-index"
898
+
checksum = "7edddbd0b52d732b21ad9a5fab5c704c14cd949e5e9a1ec5929a24fded1b904c"
899
+
900
+
[[package]]
747
901
name = "plotters"
748
902
version = "0.3.7"
749
903
source = "registry+https://github.com/rust-lang/crates.io-index"
···
805
959
]
806
960
807
961
[[package]]
962
+
name = "r-efi"
963
+
version = "5.3.0"
964
+
source = "registry+https://github.com/rust-lang/crates.io-index"
965
+
checksum = "69cdb34c158ceb288df11e18b4bd39de994f6657d83847bdffdbd7f346754b0f"
966
+
967
+
[[package]]
808
968
name = "rayon"
809
969
version = "1.11.0"
810
970
source = "registry+https://github.com/rust-lang/crates.io-index"
···
864
1024
865
1025
[[package]]
866
1026
name = "repo-stream"
867
-
version = "0.1.0"
1027
+
version = "0.2.2"
868
1028
dependencies = [
1029
+
"bincode",
869
1030
"clap",
870
1031
"criterion",
871
1032
"env_logger",
···
875
1036
"iroh-car",
876
1037
"log",
877
1038
"multibase",
1039
+
"rusqlite",
878
1040
"serde",
879
1041
"serde_bytes",
880
1042
"serde_ipld_dagcbor",
1043
+
"sha2",
1044
+
"tempfile",
881
1045
"thiserror 2.0.17",
882
1046
"tokio",
883
1047
]
884
1048
885
1049
[[package]]
1050
+
name = "rusqlite"
1051
+
version = "0.37.0"
1052
+
source = "registry+https://github.com/rust-lang/crates.io-index"
1053
+
checksum = "165ca6e57b20e1351573e3729b958bc62f0e48025386970b6e4d29e7a7e71f3f"
1054
+
dependencies = [
1055
+
"bitflags",
1056
+
"fallible-iterator",
1057
+
"fallible-streaming-iterator",
1058
+
"hashlink",
1059
+
"libsqlite3-sys",
1060
+
"smallvec",
1061
+
]
1062
+
1063
+
[[package]]
886
1064
name = "rustc-demangle"
887
1065
version = "0.1.26"
888
1066
source = "registry+https://github.com/rust-lang/crates.io-index"
889
1067
checksum = "56f7d92ca342cea22a06f2121d944b4fd82af56988c270852495420f961d4ace"
1068
+
1069
+
[[package]]
1070
+
name = "rustix"
1071
+
version = "1.1.2"
1072
+
source = "registry+https://github.com/rust-lang/crates.io-index"
1073
+
checksum = "cd15f8a2c5551a84d56efdc1cd049089e409ac19a3072d5037a17fd70719ff3e"
1074
+
dependencies = [
1075
+
"bitflags",
1076
+
"errno",
1077
+
"libc",
1078
+
"linux-raw-sys",
1079
+
"windows-sys 0.60.2",
1080
+
]
890
1081
891
1082
[[package]]
892
1083
name = "rustversion"
···
981
1172
]
982
1173
983
1174
[[package]]
1175
+
name = "sha2"
1176
+
version = "0.10.9"
1177
+
source = "registry+https://github.com/rust-lang/crates.io-index"
1178
+
checksum = "a7507d819769d01a365ab707794a4084392c824f54a7a6a7862f8c3d0892b283"
1179
+
dependencies = [
1180
+
"cfg-if",
1181
+
"cpufeatures",
1182
+
"digest",
1183
+
]
1184
+
1185
+
[[package]]
984
1186
name = "signal-hook-registry"
985
1187
version = "1.4.6"
986
1188
source = "registry+https://github.com/rust-lang/crates.io-index"
···
1037
1239
"proc-macro2",
1038
1240
"quote",
1039
1241
"unicode-ident",
1242
+
]
1243
+
1244
+
[[package]]
1245
+
name = "tempfile"
1246
+
version = "3.23.0"
1247
+
source = "registry+https://github.com/rust-lang/crates.io-index"
1248
+
checksum = "2d31c77bdf42a745371d260a26ca7163f1e0924b64afa0b688e61b5a9fa02f16"
1249
+
dependencies = [
1250
+
"fastrand",
1251
+
"getrandom",
1252
+
"once_cell",
1253
+
"rustix",
1254
+
"windows-sys 0.60.2",
1040
1255
]
1041
1256
1042
1257
[[package]]
···
1121
1336
]
1122
1337
1123
1338
[[package]]
1339
+
name = "typenum"
1340
+
version = "1.19.0"
1341
+
source = "registry+https://github.com/rust-lang/crates.io-index"
1342
+
checksum = "562d481066bde0658276a35467c4af00bdc6ee726305698a55b86e61d7ad82bb"
1343
+
1344
+
[[package]]
1124
1345
name = "unicode-ident"
1125
1346
version = "1.0.19"
1126
1347
source = "registry+https://github.com/rust-lang/crates.io-index"
···
1137
1358
version = "0.8.0"
1138
1359
source = "registry+https://github.com/rust-lang/crates.io-index"
1139
1360
checksum = "eb066959b24b5196ae73cb057f45598450d2c5f71460e98c49b738086eff9c06"
1361
+
1362
+
[[package]]
1363
+
name = "unty"
1364
+
version = "0.0.4"
1365
+
source = "registry+https://github.com/rust-lang/crates.io-index"
1366
+
checksum = "6d49784317cd0d1ee7ec5c716dd598ec5b4483ea832a2dced265471cc0f690ae"
1140
1367
1141
1368
[[package]]
1142
1369
name = "utf8parse"
···
1145
1372
checksum = "06abde3611657adf66d383f00b093d7faecc7fa57071cce2578660c9f1010821"
1146
1373
1147
1374
[[package]]
1375
+
name = "vcpkg"
1376
+
version = "0.2.15"
1377
+
source = "registry+https://github.com/rust-lang/crates.io-index"
1378
+
checksum = "accd4ea62f7bb7a82fe23066fb0957d48ef677f6eeb8215f372f52e48bb32426"
1379
+
1380
+
[[package]]
1381
+
name = "version_check"
1382
+
version = "0.9.5"
1383
+
source = "registry+https://github.com/rust-lang/crates.io-index"
1384
+
checksum = "0b928f33d975fc6ad9f86c8f283853ad26bdd5b10b7f1542aa2fa15e2289105a"
1385
+
1386
+
[[package]]
1387
+
name = "virtue"
1388
+
version = "0.0.18"
1389
+
source = "registry+https://github.com/rust-lang/crates.io-index"
1390
+
checksum = "051eb1abcf10076295e815102942cc58f9d5e3b4560e46e53c21e8ff6f3af7b1"
1391
+
1392
+
[[package]]
1148
1393
name = "walkdir"
1149
1394
version = "2.5.0"
1150
1395
source = "registry+https://github.com/rust-lang/crates.io-index"
···
1159
1404
version = "0.11.1+wasi-snapshot-preview1"
1160
1405
source = "registry+https://github.com/rust-lang/crates.io-index"
1161
1406
checksum = "ccf3ec651a847eb01de73ccad15eb7d99f80485de043efb2f370cd654f4ea44b"
1407
+
1408
+
[[package]]
1409
+
name = "wasi"
1410
+
version = "0.14.7+wasi-0.2.4"
1411
+
source = "registry+https://github.com/rust-lang/crates.io-index"
1412
+
checksum = "883478de20367e224c0090af9cf5f9fa85bed63a95c1abf3afc5c083ebc06e8c"
1413
+
dependencies = [
1414
+
"wasip2",
1415
+
]
1416
+
1417
+
[[package]]
1418
+
name = "wasip2"
1419
+
version = "1.0.1+wasi-0.2.4"
1420
+
source = "registry+https://github.com/rust-lang/crates.io-index"
1421
+
checksum = "0562428422c63773dad2c345a1882263bbf4d65cf3f42e90921f787ef5ad58e7"
1422
+
dependencies = [
1423
+
"wit-bindgen",
1424
+
]
1162
1425
1163
1426
[[package]]
1164
1427
name = "wasm-bindgen"
···
1390
1653
version = "0.53.1"
1391
1654
source = "registry+https://github.com/rust-lang/crates.io-index"
1392
1655
checksum = "d6bbff5f0aada427a1e5a6da5f1f98158182f26556f345ac9e04d36d0ebed650"
1656
+
1657
+
[[package]]
1658
+
name = "wit-bindgen"
1659
+
version = "0.46.0"
1660
+
source = "registry+https://github.com/rust-lang/crates.io-index"
1661
+
checksum = "f17a85883d4e6d00e8a97c586de764dabcc06133f7f1d55dce5cdc070ad7fe59"
1393
1662
1394
1663
[[package]]
1395
1664
name = "zerocopy"
+16 -2 Cargo.toml
···
1
1
[package]
2
2
name = "repo-stream"
3
-
version = "0.1.0"
3
+
version = "0.2.2"
4
4
edition = "2024"
5
+
license = "MIT OR Apache-2.0"
6
+
description = "A robust CAR file -> MST walker for atproto"
7
+
repository = "https://tangled.org/@microcosm.blue/repo-stream"
5
8
6
9
[dependencies]
10
+
bincode = { version = "2.0.1", features = ["serde"] }
7
11
futures = "0.3.31"
8
12
futures-core = "0.3.31"
9
13
ipld-core = { version = "0.4.2", features = ["serde"] }
10
14
iroh-car = "0.5.1"
11
15
log = "0.4.28"
12
16
multibase = "0.9.2"
17
+
rusqlite = "0.37.0"
13
18
serde = { version = "1.0.228", features = ["derive"] }
14
19
serde_bytes = "0.11.19"
15
20
serde_ipld_dagcbor = "0.6.4"
21
+
sha2 = "0.10.9"
16
22
thiserror = "2.0.17"
17
-
tokio = "1.47.1"
23
+
tokio = { version = "1.47.1", features = ["rt", "sync"] }
18
24
19
25
[dev-dependencies]
20
26
clap = { version = "4.5.48", features = ["derive"] }
21
27
criterion = { version = "0.7.0", features = ["async_tokio"] }
22
28
env_logger = "0.11.8"
23
29
multibase = "0.9.2"
30
+
tempfile = "3.23.0"
24
31
tokio = { version = "1.47.1", features = ["full"] }
25
32
26
33
[profile.profiling]
27
34
inherits = "release"
28
35
debug = true
36
+
37
+
# [profile.release]
38
+
# debug = true
29
39
30
40
[[bench]]
31
41
name = "non-huge-cars"
32
42
harness = false
43
+
44
+
[[bench]]
45
+
name = "huge-car"
46
+
harness = false
+190 LICENSE.Apache-2.0
···
1
+
Apache License
2
+
Version 2.0, January 2004
3
+
http://www.apache.org/licenses/
4
+
5
+
TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION
6
+
7
+
1. Definitions.
8
+
9
+
"License" shall mean the terms and conditions for use, reproduction,
10
+
and distribution as defined by Sections 1 through 9 of this document.
11
+
12
+
"Licensor" shall mean the copyright owner or entity authorized by
13
+
the copyright owner that is granting the License.
14
+
15
+
"Legal Entity" shall mean the union of the acting entity and all
16
+
other entities that control, are controlled by, or are under common
17
+
control with that entity. For the purposes of this definition,
18
+
"control" means (i) the power, direct or indirect, to cause the
19
+
direction or management of such entity, whether by contract or
20
+
otherwise, or (ii) ownership of fifty percent (50%) or more of the
21
+
outstanding shares, or (iii) beneficial ownership of such entity.
22
+
23
+
"You" (or "Your") shall mean an individual or Legal Entity
24
+
exercising permissions granted by this License.
25
+
26
+
"Source" form shall mean the preferred form for making modifications,
27
+
including but not limited to software source code, documentation
28
+
source, and configuration files.
29
+
30
+
"Object" form shall mean any form resulting from mechanical
31
+
transformation or translation of a Source form, including but
32
+
not limited to compiled object code, generated documentation,
33
+
and conversions to other media types.
34
+
35
+
"Work" shall mean the work of authorship, whether in Source or
36
+
Object form, made available under the License, as indicated by a
37
+
copyright notice that is included in or attached to the work
38
+
(an example is provided in the Appendix below).
39
+
40
+
"Derivative Works" shall mean any work, whether in Source or Object
41
+
form, that is based on (or derived from) the Work and for which the
42
+
editorial revisions, annotations, elaborations, or other modifications
43
+
represent, as a whole, an original work of authorship. For the purposes
44
+
of this License, Derivative Works shall not include works that remain
45
+
separable from, or merely link (or bind by name) to the interfaces of,
46
+
the Work and Derivative Works thereof.
47
+
48
+
"Contribution" shall mean any work of authorship, including
49
+
the original version of the Work and any modifications or additions
50
+
to that Work or Derivative Works thereof, that is intentionally
51
+
submitted to Licensor for inclusion in the Work by the copyright owner
52
+
or by an individual or Legal Entity authorized to submit on behalf of
53
+
the copyright owner. For the purposes of this definition, "submitted"
54
+
means any form of electronic, verbal, or written communication sent
55
+
to the Licensor or its representatives, including but not limited to
56
+
communication on electronic mailing lists, source code control systems,
57
+
and issue tracking systems that are managed by, or on behalf of, the
58
+
Licensor for the purpose of discussing and improving the Work, but
59
+
excluding communication that is conspicuously marked or otherwise
60
+
designated in writing by the copyright owner as "Not a Contribution."
61
+
62
+
"Contributor" shall mean Licensor and any individual or Legal Entity
63
+
on behalf of whom a Contribution has been received by Licensor and
64
+
subsequently incorporated within the Work.
65
+
66
+
2. Grant of Copyright License. Subject to the terms and conditions of
67
+
this License, each Contributor hereby grants to You a perpetual,
68
+
worldwide, non-exclusive, no-charge, royalty-free, irrevocable
69
+
copyright license to reproduce, prepare Derivative Works of,
70
+
publicly display, publicly perform, sublicense, and distribute the
71
+
Work and such Derivative Works in Source or Object form.
72
+
73
+
3. Grant of Patent License. Subject to the terms and conditions of
74
+
this License, each Contributor hereby grants to You a perpetual,
75
+
worldwide, non-exclusive, no-charge, royalty-free, irrevocable
76
+
(except as stated in this section) patent license to make, have made,
77
+
use, offer to sell, sell, import, and otherwise transfer the Work,
78
+
where such license applies only to those patent claims licensable
79
+
by such Contributor that are necessarily infringed by their
80
+
Contribution(s) alone or by combination of their Contribution(s)
81
+
with the Work to which such Contribution(s) was submitted. If You
82
+
institute patent litigation against any entity (including a
83
+
cross-claim or counterclaim in a lawsuit) alleging that the Work
84
+
or a Contribution incorporated within the Work constitutes direct
85
+
or contributory patent infringement, then any patent licenses
86
+
granted to You under this License for that Work shall terminate
87
+
as of the date such litigation is filed.
88
+
89
+
4. Redistribution. You may reproduce and distribute copies of the
90
+
Work or Derivative Works thereof in any medium, with or without
91
+
modifications, and in Source or Object form, provided that You
92
+
meet the following conditions:
93
+
94
+
(a) You must give any other recipients of the Work or
95
+
Derivative Works a copy of this License; and
96
+
97
+
(b) You must cause any modified files to carry prominent notices
98
+
stating that You changed the files; and
99
+
100
+
(c) You must retain, in the Source form of any Derivative Works
101
+
that You distribute, all copyright, patent, trademark, and
102
+
attribution notices from the Source form of the Work,
103
+
excluding those notices that do not pertain to any part of
104
+
the Derivative Works; and
105
+
106
+
(d) If the Work includes a "NOTICE" text file as part of its
107
+
distribution, then any Derivative Works that You distribute must
108
+
include a readable copy of the attribution notices contained
109
+
within such NOTICE file, excluding those notices that do not
110
+
pertain to any part of the Derivative Works, in at least one
111
+
of the following places: within a NOTICE text file distributed
112
+
as part of the Derivative Works; within the Source form or
113
+
documentation, if provided along with the Derivative Works; or,
114
+
within a display generated by the Derivative Works, if and
115
+
wherever such third-party notices normally appear. The contents
116
+
of the NOTICE file are for informational purposes only and
117
+
do not modify the License. You may add Your own attribution
118
+
notices within Derivative Works that You distribute, alongside
119
+
or as an addendum to the NOTICE text from the Work, provided
120
+
that such additional attribution notices cannot be construed
121
+
as modifying the License.
122
+
123
+
You may add Your own copyright statement to Your modifications and
124
+
may provide additional or different license terms and conditions
125
+
for use, reproduction, or distribution of Your modifications, or
126
+
for any such Derivative Works as a whole, provided Your use,
127
+
reproduction, and distribution of the Work otherwise complies with
128
+
the conditions stated in this License.
129
+
130
+
5. Submission of Contributions. Unless You explicitly state otherwise,
131
+
any Contribution intentionally submitted for inclusion in the Work
132
+
by You to the Licensor shall be under the terms and conditions of
133
+
this License, without any additional terms or conditions.
134
+
Notwithstanding the above, nothing herein shall supersede or modify
135
+
the terms of any separate license agreement you may have executed
136
+
with Licensor regarding such Contributions.
137
+
138
+
6. Trademarks. This License does not grant permission to use the trade
139
+
names, trademarks, service marks, or product names of the Licensor,
140
+
except as required for reasonable and customary use in describing the
141
+
origin of the Work and reproducing the content of the NOTICE file.
142
+
143
+
7. Disclaimer of Warranty. Unless required by applicable law or
144
+
agreed to in writing, Licensor provides the Work (and each
145
+
Contributor provides its Contributions) on an "AS IS" BASIS,
146
+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
147
+
implied, including, without limitation, any warranties or conditions
148
+
of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A
149
+
PARTICULAR PURPOSE. You are solely responsible for determining the
150
+
appropriateness of using or redistributing the Work and assume any
151
+
risks associated with Your exercise of permissions under this License.
152
+
153
+
8. Limitation of Liability. In no event and under no legal theory,
154
+
whether in tort (including negligence), contract, or otherwise,
155
+
unless required by applicable law (such as deliberate and grossly
156
+
negligent acts) or agreed to in writing, shall any Contributor be
157
+
liable to You for damages, including any direct, indirect, special,
158
+
incidental, or consequential damages of any character arising as a
159
+
result of this License or out of the use or inability to use the
160
+
Work (including but not limited to damages for loss of goodwill,
161
+
work stoppage, computer failure or malfunction, or any and all
162
+
other commercial damages or losses), even if such Contributor
163
+
has been advised of the possibility of such damages.
164
+
165
+
9. Accepting Warranty or Additional Liability. While redistributing
166
+
the Work or Derivative Works thereof, You may choose to offer,
167
+
and charge a fee for, acceptance of support, warranty, indemnity,
168
+
or other liability obligations and/or rights consistent with this
169
+
License. However, in accepting such obligations, You may act only
170
+
on Your own behalf and on Your sole responsibility, not on behalf
171
+
of any other Contributor, and only if You agree to indemnify,
172
+
defend, and hold each Contributor harmless for any liability
173
+
incurred by, or claims asserted against, such Contributor by reason
174
+
of your accepting any such warranty or additional liability.
175
+
176
+
END OF TERMS AND CONDITIONS
177
+
178
+
Copyright 2025 microcosm
179
+
180
+
Licensed under the Apache License, Version 2.0 (the "License");
181
+
you may not use this file except in compliance with the License.
182
+
You may obtain a copy of the License at
183
+
184
+
http://www.apache.org/licenses/LICENSE-2.0
185
+
186
+
Unless required by applicable law or agreed to in writing, software
187
+
distributed under the License is distributed on an "AS IS" BASIS,
188
+
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
189
+
See the License for the specific language governing permissions and
190
+
limitations under the License.
+21 LICENSE.MIT
···
1
+
MIT License
2
+
3
+
Copyright (c) 2025 microcosm
4
+
5
+
Permission is hereby granted, free of charge, to any person obtaining a copy
6
+
of this software and associated documentation files (the "Software"), to deal
7
+
in the Software without restriction, including without limitation the rights
8
+
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
9
+
copies of the Software, and to permit persons to whom the Software is
10
+
furnished to do so, subject to the following conditions:
11
+
12
+
The above copyright notice and this permission notice shall be included in all
13
+
copies or substantial portions of the Software.
14
+
15
+
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
16
+
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
17
+
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
18
+
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
19
+
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
20
+
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
21
+
SOFTWARE.
+41 benches/huge-car.rs
···
1
+
extern crate repo_stream;
2
+
use repo_stream::Driver;
3
+
use std::path::{Path, PathBuf};
4
+
5
+
use criterion::{Criterion, criterion_group, criterion_main};
6
+
7
+
pub fn criterion_benchmark(c: &mut Criterion) {
8
+
let rt = tokio::runtime::Builder::new_multi_thread()
9
+
.enable_all()
10
+
.build()
11
+
.expect("Creating runtime failed");
12
+
13
+
let filename = std::env::var("HUGE_CAR").expect("HUGE_CAR env var");
14
+
let filename: PathBuf = filename.try_into().unwrap();
15
+
16
+
c.bench_function("huge-car", |b| {
17
+
b.to_async(&rt).iter(async || drive_car(&filename).await)
18
+
});
19
+
}
20
+
21
+
async fn drive_car(filename: impl AsRef<Path>) -> usize {
22
+
let reader = tokio::fs::File::open(filename).await.unwrap();
23
+
let reader = tokio::io::BufReader::new(reader);
24
+
25
+
let mut driver = match Driver::load_car(reader, |block| block.len(), 1024)
26
+
.await
27
+
.unwrap()
28
+
{
29
+
Driver::Memory(_, mem_driver) => mem_driver,
30
+
Driver::Disk(_) => panic!("not doing disk for benchmark"),
31
+
};
32
+
33
+
let mut n = 0;
34
+
while let Some(pairs) = driver.next_chunk(256).await.unwrap() {
35
+
n += pairs.len();
36
+
}
37
+
n
38
+
}
39
+
40
+
criterion_group!(benches, criterion_benchmark);
41
+
criterion_main!(benches);
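The benchmark above panics on the `Driver::Disk` branch, keeping the measurement purely in-memory. For reference, a minimal sketch of driving a CAR of either size, mirroring the crate's readme example in this change (the temp-file path and the 1024 MiB limit are illustrative):

```rust
use repo_stream::{DiskBuilder, Driver};
use std::path::Path;

// Walk a CAR whether or not it fits in memory; returns the record count.
async fn drive_car_any_size(filename: impl AsRef<Path>) -> Result<usize, Box<dyn std::error::Error>> {
    let reader = tokio::io::BufReader::new(tokio::fs::File::open(filename).await?);

    let mut n = 0;
    match Driver::load_car(reader, |block| block.len(), 1024).await? {
        // small enough: walk the MST straight out of memory, like the bench does
        Driver::Memory(_commit, mut driver) => {
            while let Some(pairs) = driver.next_chunk(256).await? {
                n += pairs.len();
            }
        }
        // too big: spill processed blocks to sqlite, then walk from there
        Driver::Disk(paused) => {
            let store = DiskBuilder::new().open("/tmp/huge-car-bench.db".into()).await?;
            let (_commit, mut driver) = paused.finish_loading(store).await?;
            while let Some(pairs) = driver.next_chunk(256).await? {
                n += pairs.len();
            }
            driver.reset_store().await?; // clean up the scratch tables
        }
    }
    Ok(n)
}
```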
+16 -22 benches/non-huge-cars.rs
···
1
1
extern crate repo_stream;
2
-
use futures::TryStreamExt;
3
-
use iroh_car::CarReader;
4
-
use std::convert::Infallible;
2
+
use repo_stream::Driver;
5
3
6
4
use criterion::{Criterion, criterion_group, criterion_main};
7
5
6
+
const EMPTY_CAR: &'static [u8] = include_bytes!("../car-samples/empty.car");
8
7
const TINY_CAR: &'static [u8] = include_bytes!("../car-samples/tiny.car");
9
8
const LITTLE_CAR: &'static [u8] = include_bytes!("../car-samples/little.car");
10
9
const MIDSIZE_CAR: &'static [u8] = include_bytes!("../car-samples/midsize.car");
···
15
14
.build()
16
15
.expect("Creating runtime failed");
17
16
17
+
c.bench_function("empty-car", |b| {
18
+
b.to_async(&rt).iter(async || drive_car(EMPTY_CAR).await)
19
+
});
18
20
c.bench_function("tiny-car", |b| {
19
21
b.to_async(&rt).iter(async || drive_car(TINY_CAR).await)
20
22
});
···
26
28
});
27
29
}
28
30
29
-
async fn drive_car(bytes: &[u8]) {
30
-
let reader = CarReader::new(bytes).await.unwrap();
31
-
32
-
let root = reader
33
-
.header()
34
-
.roots()
35
-
.first()
36
-
.ok_or("missing root")
31
+
async fn drive_car(bytes: &[u8]) -> usize {
32
+
let mut driver = match Driver::load_car(bytes, |block| block.len(), 32)
33
+
.await
37
34
.unwrap()
38
-
.clone();
39
-
40
-
let stream = std::pin::pin!(reader.stream());
35
+
{
36
+
Driver::Memory(_, mem_driver) => mem_driver,
37
+
Driver::Disk(_) => panic!("not benching big cars here"),
38
+
};
41
39
42
-
let (_commit, v) =
43
-
repo_stream::drive::Vehicle::init(root, stream, |block| Ok::<_, Infallible>(block.len()))
44
-
.await
45
-
.unwrap();
46
-
let mut record_stream = std::pin::pin!(v.stream());
47
-
48
-
while let Some(_) = record_stream.try_next().await.unwrap() {
49
-
// just here for the drive
40
+
let mut n = 0;
41
+
while let Some(pairs) = driver.next_chunk(256).await.unwrap() {
42
+
n += pairs.len();
50
43
}
44
+
n
51
45
}
52
46
53
47
criterion_group!(benches, criterion_benchmark);
car-samples/empty.car
This is a binary file and will not be displayed.
+93 examples/disk-read-file/main.rs
···
1
+
/*!
2
+
Read a CAR file by spilling to disk
3
+
*/
4
+
5
+
extern crate repo_stream;
6
+
use clap::Parser;
7
+
use repo_stream::{DiskBuilder, Driver, DriverBuilder};
8
+
use std::path::PathBuf;
9
+
use std::time::Instant;
10
+
11
+
#[derive(Debug, Parser)]
12
+
struct Args {
13
+
#[arg()]
14
+
car: PathBuf,
15
+
#[arg()]
16
+
tmpfile: PathBuf,
17
+
}
18
+
19
+
#[tokio::main]
20
+
async fn main() -> Result<(), Box<dyn std::error::Error>> {
21
+
env_logger::init();
22
+
23
+
let Args { car, tmpfile } = Args::parse();
24
+
25
+
// repo-stream takes an AsyncRead as input. wrapping a filesystem read in
26
+
// BufReader can provide a really significant performance win.
27
+
let reader = tokio::fs::File::open(car).await?;
28
+
let reader = tokio::io::BufReader::new(reader);
29
+
30
+
log::info!("hello! reading the car...");
31
+
let t0 = Instant::now();
32
+
33
+
// in this example we only bother handling CARs that are too big for memory
34
+
// `noop` helper means: do no block processing, store the raw blocks
35
+
let driver = match DriverBuilder::new()
36
+
.with_mem_limit_mb(10) // how much memory can be used before disk spill
37
+
.load_car(reader)
38
+
.await?
39
+
{
40
+
Driver::Memory(_, _) => panic!("try this on a bigger car"),
41
+
Driver::Disk(big_stuff) => {
42
+
// we reach here if the repo was too big and needs to be spilled to
43
+
// disk to continue
44
+
45
+
// set up a disk store we can spill to
46
+
let disk_store = DiskBuilder::new().open(tmpfile).await?;
47
+
48
+
// do the spilling, get back a (similar) driver
49
+
let (commit, driver) = big_stuff.finish_loading(disk_store).await?;
50
+
51
+
// at this point you might want to fetch the account's signing key
52
+
// via the DID from the commit, and then verify the signature.
53
+
            log::warn!("big's commit ({:?}): {:?}", t0.elapsed(), commit);
54
+
55
+
// pop the driver back out to get some code indentation relief
56
+
driver
57
+
}
58
+
};
59
+
60
+
// collect some random stats about the blocks
61
+
let mut n = 0;
62
+
let mut zeros = 0;
63
+
64
+
log::info!("walking...");
65
+
66
+
// this example uses the disk driver's channel mode: the tree walking is
67
+
// spawned onto a blocking thread, and we get chunks of rkey+blocks back
68
+
let (mut rx, join) = driver.to_channel(512);
69
+
while let Some(r) = rx.recv().await {
70
+
let pairs = r?;
71
+
72
+
// keep a count of the total number of blocks seen
73
+
n += pairs.len();
74
+
75
+
for (_, block) in pairs {
76
+
// for each block, count how many bytes are equal to '0'
77
+
// (this is just an example, you probably want to do something more
78
+
// interesting)
79
+
zeros += block.into_iter().filter(|&b| b == b'0').count()
80
+
}
81
+
}
82
+
83
+
log::info!("arrived! ({:?}) joining rx...", t0.elapsed());
84
+
85
+
// clean up the database. would be nice to do this in drop so it happens
86
+
// automatically, but some blocking work happens, so that's not allowed in
87
+
    // async rust. 🤷
88
+
join.await?.reset_store().await?;
89
+
90
+
log::info!("done. n={n} zeros={zeros}");
91
+
92
+
Ok(())
93
+
}
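Since the example takes the CAR path and a scratch database path as positional arguments, it can presumably be run with something like `RUST_LOG=info cargo run --example disk-read-file -- some-repo.car /tmp/spill.db`.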
+18 -25 examples/read-file/main.rs
···
1
+
/*!
2
+
Read a CAR file with in-memory processing
3
+
*/
4
+
1
5
extern crate repo_stream;
2
6
use clap::Parser;
3
-
use futures::TryStreamExt;
4
-
use iroh_car::CarReader;
5
-
use std::convert::Infallible;
7
+
use repo_stream::{Driver, DriverBuilder};
6
8
use std::path::PathBuf;
7
9
8
10
type Result<T> = std::result::Result<T, Box<dyn std::error::Error>>;
···
21
23
let reader = tokio::fs::File::open(file).await?;
22
24
let reader = tokio::io::BufReader::new(reader);
23
25
24
-
println!("hello!");
25
-
26
-
let reader = CarReader::new(reader).await?;
27
-
28
-
let root = reader
29
-
.header()
30
-
.roots()
31
-
.first()
32
-
.ok_or("missing root")?
33
-
.clone();
34
-
log::debug!("root: {root:?}");
35
-
36
-
// let stream = Box::pin(reader.stream());
37
-
let stream = std::pin::pin!(reader.stream());
38
-
39
-
let (commit, v) =
40
-
repo_stream::drive::Vehicle::init(root, stream, |block| Ok::<_, Infallible>(block.len()))
41
-
.await?;
42
-
let mut record_stream = std::pin::pin!(v.stream());
26
+
let (commit, mut driver) = match DriverBuilder::new()
27
+
.with_block_processor(|block| block.len())
28
+
.load_car(reader)
29
+
.await?
30
+
{
31
+
Driver::Memory(commit, mem_driver) => (commit, mem_driver),
32
+
Driver::Disk(_) => panic!("this example doesn't handle big CARs"),
33
+
};
43
34
44
35
log::info!("got commit: {commit:?}");
45
36
46
-
while let Some((rkey, _rec)) = record_stream.try_next().await? {
47
-
log::info!("got {rkey:?}");
37
+
let mut n = 0;
38
+
while let Some(pairs) = driver.next_chunk(256).await? {
39
+
n += pairs.len();
40
+
// log::info!("got {rkey:?}");
48
41
}
49
-
log::info!("bye!");
42
+
log::info!("bye! total records={n}");
50
43
51
44
Ok(())
52
45
}
+103 -9 readme.md
···
1
1
# repo-stream
2
2
3
-
Fast and (aspirationally) robust atproto CAR file processing in rust
3
+
A robust CAR file -> MST walker for atproto
4
+
5
+
[![Crates.io][crates-badge]](https://crates.io/crates/repo-stream)
6
+
[![Documentation][docs-badge]](https://docs.rs/repo-stream)
7
+
[![Sponsor][sponsor-badge]](https://github.com/sponsors/uniphil)
8
+
9
+
[crates-badge]: https://img.shields.io/crates/v/repo-stream.svg
10
+
[docs-badge]: https://docs.rs/repo-stream/badge.svg
11
+
[sponsor-badge]: https://img.shields.io/badge/at-microcosm-b820f9?labelColor=b820f9&logo=githubsponsors&logoColor=fff
12
+
13
+
```rust
14
+
use repo_stream::{Driver, DriverBuilder, DriveError, DiskBuilder};
15
+
16
+
#[tokio::main]
17
+
async fn main() -> Result<(), Box<dyn std::error::Error>> {
18
+
// repo-stream takes any AsyncRead as input, like a tokio::fs::File
19
+
    let reader = tokio::fs::File::open("repo.car").await?;
20
+
let reader = tokio::io::BufReader::new(reader);
21
+
22
+
// example repo workload is simply counting the total record bytes
23
+
let mut total_size = 0;
24
+
25
+
match DriverBuilder::new()
26
+
.with_mem_limit_mb(10)
27
+
.with_block_processor(|rec| rec.len()) // block processing: just extract the raw record size
28
+
.load_car(reader)
29
+
.await?
30
+
{
31
+
32
+
// if all blocks fit within memory
33
+
Driver::Memory(_commit, mut driver) => {
34
+
while let Some(chunk) = driver.next_chunk(256).await? {
35
+
for (_rkey, size) in chunk {
36
+
total_size += size;
37
+
}
38
+
}
39
+
},
40
+
41
+
// if the CAR was too big for in-memory processing
42
+
Driver::Disk(paused) => {
43
+
// set up a disk store we can spill to
44
+
let store = DiskBuilder::new().open("some/path.db".into()).await?;
45
+
// do the spilling, get back a (similar) driver
46
+
let (_commit, mut driver) = paused.finish_loading(store).await?;
47
+
48
+
while let Some(chunk) = driver.next_chunk(256).await? {
49
+
for (_rkey, size) in chunk {
50
+
total_size += size;
51
+
}
52
+
}
53
+
54
+
// clean up the disk store (drop tables etc)
55
+
driver.reset_store().await?;
56
+
}
57
+
};
58
+
println!("sum of size of all records: {total_size}");
59
+
Ok(())
60
+
}
61
+
```
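If no block processor is configured, the builder's default (per the `src/drive.rs` docs in this diff) is a no-op and each chunk carries the raw block bytes. A minimal sketch of that mode, assuming the same API as above (the path is illustrative and the disk branch is omitted for brevity):

```rust
use repo_stream::{Driver, DriverBuilder};

async fn print_raw_sizes(path: &str) -> Result<(), Box<dyn std::error::Error>> {
    let reader = tokio::io::BufReader::new(tokio::fs::File::open(path).await?);

    // no .with_block_processor(...): each chunk item is (rkey, Vec<u8>) of raw bytes
    if let Driver::Memory(_commit, mut driver) = DriverBuilder::new().load_car(reader).await? {
        while let Some(chunk) = driver.next_chunk(256).await? {
            for (rkey, raw) in chunk {
                println!("{rkey}: {} bytes", raw.len());
            }
        }
    }
    Ok(())
}
```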
62
+
63
+
more recent todo
64
+
65
+
- [ ] get an *empty* car for the test suite
66
+
- [x] implement a max size on disk limit
67
+
68
+
69
+
-----
70
+
71
+
older stuff (to clean up):
72
+
73
+
74
+
current car processing times (records processed into their length usize, phil's dev machine):
75
+
76
+
- 128MiB CAR file: `347ms`
77
+
- 5.0MiB: `6.1ms`
78
+
- 279KiB: `139us`
79
+
- 3.4KiB: `4.9us`
80
+
81
+
82
+
running the huge-car benchmark
83
+
84
+
- to avoid committing it to the repo, you have to pass it in through the env for now.
85
+
86
+
```bash
87
+
HUGE_CAR=~/Downloads/did_plc_redacted.car cargo bench -- huge-car
88
+
```
4
89
5
90
6
91
todo
7
92
8
-
- [ ] car file test fixtures & validation tests
9
-
- [ ] make sure we can get the did and signature out for verification
93
+
- [x] car file test fixtures & validation tests
94
+
- [x] make sure we can get the did and signature out for verification
95
+
-> yeah the commit is returned from init
10
96
- [ ] spec compliance todos
11
-
- [ ] assert that keys are ordered and fail if not
12
-
- [ ] verify node mst depth from key (possibly pending [interop test fixes](https://github.com/bluesky-social/atproto-interop-tests/issues/5))
97
+
- [x] assert that keys are ordered and fail if not
98
+
- [x] verify node mst depth from key (possibly pending [interop test fixes](https://github.com/bluesky-social/atproto-interop-tests/issues/5))
13
99
- [ ] performance todos
14
-
- [ ] consume the serialized nodes into a mutable efficient format
100
+
- [x] consume the serialized nodes into a mutable efficient format
15
101
- [ ] maybe customize the deserialize impl to do that directly?
16
-
- [ ] benchmark and profile
102
+
- [x] benchmark and profile
17
103
- [ ] robustness todos
18
104
- [ ] swap the blocks hashmap for a BlockStore trait that can be dumped to redb
19
105
- [ ] maybe keep the redb function behind a feature flag?
20
106
- [ ] can we assert a max size for node blocks?
21
-
- [ ] figure out why asserting the upper nibble of the fourth byte of a node fails fingerprinting
107
+
- [x] figure out why asserting the upper nibble of the fourth byte of a node fails fingerprinting
108
+
-> because it's the upper 3 bytes, not the upper nibble of the fourth byte, oops.
22
109
- [ ] max mst depth (there is actually a hard limit but a malicious repo could do anything)
23
-
- [ ] i don't think we need a max recursion depth for processing cbor contents since we leave records to the user to decode
110
+
- [ ] i don't *think* we need a max recursion depth for processing cbor contents since we leave records to the user to decode
24
111
25
112
newer ideas
26
113
···
47
134
- either just generally to handle huge CARs, or as a fallback when streaming fails
48
135
49
136
redb has an in-memory backend, so it would be possible to *always* use it for block caching. user can choose if they want to allow disk or just do memory, and then "spilling" from the cache to disk would be mostly free?
137
+
138
+
139
+
## license
140
+
141
+
This work is dual-licensed under MIT and Apache 2.0. You can choose between one of them if you use this work.
142
+
143
+
`SPDX-License-Identifier: MIT OR Apache-2.0`
+221 src/disk.rs
···
1
+
/*!
2
+
On-disk storage for blocks
3
+
4
+
Currently this uses sqlite. In testing sqlite wasn't the fastest, but it seemed
5
+
to be the best behaved in terms of both on-disk space usage and memory usage.
6
+
7
+
```no_run
8
+
# use repo_stream::{DiskBuilder, DiskError};
9
+
# #[tokio::main]
10
+
# async fn main() -> Result<(), DiskError> {
11
+
let store = DiskBuilder::new()
12
+
.with_cache_size_mb(32)
13
+
.with_max_stored_mb(1024) // errors when >1GiB of processed blocks are inserted
14
+
.open("/some/path.db".into()).await?;
15
+
# Ok(())
16
+
# }
17
+
```
18
+
*/
19
+
20
+
use crate::drive::DriveError;
21
+
use rusqlite::OptionalExtension;
22
+
use std::path::PathBuf;
23
+
24
+
#[derive(Debug, thiserror::Error)]
25
+
pub enum DiskError {
26
+
/// A wrapped database error
27
+
///
28
+
/// (The wrapped err should probably be obscured to remove public-facing
29
+
/// sqlite bits)
30
+
#[error(transparent)]
31
+
DbError(#[from] rusqlite::Error),
32
+
/// A tokio blocking task failed to join
33
+
#[error("Failed to join a tokio blocking task: {0}")]
34
+
JoinError(#[from] tokio::task::JoinError),
35
+
/// The total size of stored blocks exceeded the allowed size
36
+
///
37
+
/// If you need to process *really* big CARs, you can configure a higher
38
+
/// limit.
39
+
#[error("Maximum disk size reached")]
40
+
MaxSizeExceeded,
41
+
#[error("this error was replaced, seeing this is a bug.")]
42
+
#[doc(hidden)]
43
+
Stolen,
44
+
}
45
+
46
+
impl DiskError {
47
+
/// hack for ownership challenges with the disk driver
48
+
pub(crate) fn steal(&mut self) -> Self {
49
+
let mut swapped = DiskError::Stolen;
50
+
std::mem::swap(self, &mut swapped);
51
+
swapped
52
+
}
53
+
}
54
+
55
+
/// Builder-style disk store setup
56
+
#[derive(Debug, Clone)]
57
+
pub struct DiskBuilder {
58
+
/// Database in-memory cache allowance
59
+
///
60
+
/// Default: 32 MiB
61
+
pub cache_size_mb: usize,
62
+
/// Database stored block size limit
63
+
///
64
+
/// Default: 10 GiB
65
+
///
66
+
/// Note: actual size on disk may be more, but should approximately scale
67
+
/// with this limit
68
+
pub max_stored_mb: usize,
69
+
}
70
+
71
+
impl Default for DiskBuilder {
72
+
fn default() -> Self {
73
+
Self {
74
+
cache_size_mb: 32,
75
+
max_stored_mb: 10 * 1024, // 10 GiB
76
+
}
77
+
}
78
+
}
79
+
80
+
impl DiskBuilder {
81
+
/// Begin configuring the storage with defaults
82
+
pub fn new() -> Self {
83
+
Default::default()
84
+
}
85
+
/// Set the in-memory cache allowance for the database
86
+
///
87
+
/// Default: 32 MiB
88
+
pub fn with_cache_size_mb(mut self, size: usize) -> Self {
89
+
self.cache_size_mb = size;
90
+
self
91
+
}
92
+
/// Set the approximate stored block size limit
93
+
///
94
+
/// Default: 10 GiB
95
+
pub fn with_max_stored_mb(mut self, max: usize) -> Self {
96
+
self.max_stored_mb = max;
97
+
self
98
+
}
99
+
/// Open and initialize the actual disk storage
100
+
pub async fn open(&self, path: PathBuf) -> Result<DiskStore, DiskError> {
101
+
DiskStore::new(path, self.cache_size_mb, self.max_stored_mb).await
102
+
}
103
+
}
104
+
105
+
/// On-disk block storage
106
+
pub struct DiskStore {
107
+
conn: rusqlite::Connection,
108
+
max_stored: usize,
109
+
stored: usize,
110
+
}
111
+
112
+
impl DiskStore {
113
+
/// Initialize a new disk store
114
+
pub async fn new(
115
+
path: PathBuf,
116
+
cache_mb: usize,
117
+
max_stored_mb: usize,
118
+
) -> Result<Self, DiskError> {
119
+
let max_stored = max_stored_mb * 2_usize.pow(20);
120
+
let conn = tokio::task::spawn_blocking(move || {
121
+
let conn = rusqlite::Connection::open(path)?;
122
+
123
+
let sqlite_one_mb = -(2_i64.pow(10)); // negative is kibibytes for sqlite cache_size
124
+
125
+
// conn.pragma_update(None, "journal_mode", "OFF")?;
126
+
// conn.pragma_update(None, "journal_mode", "MEMORY")?;
127
+
conn.pragma_update(None, "journal_mode", "WAL")?;
128
+
// conn.pragma_update(None, "wal_autocheckpoint", "0")?; // this lets things get a bit big on disk
129
+
conn.pragma_update(None, "synchronous", "OFF")?;
130
+
conn.pragma_update(
131
+
None,
132
+
"cache_size",
133
+
(cache_mb as i64 * sqlite_one_mb).to_string(),
134
+
)?;
135
+
Self::reset_tables(&conn)?;
136
+
137
+
Ok::<_, DiskError>(conn)
138
+
})
139
+
.await??;
140
+
141
+
Ok(Self {
142
+
conn,
143
+
max_stored,
144
+
stored: 0,
145
+
})
146
+
}
147
+
pub(crate) fn get_writer(&'_ mut self) -> Result<SqliteWriter<'_>, DiskError> {
148
+
let tx = self.conn.transaction()?;
149
+
Ok(SqliteWriter {
150
+
tx,
151
+
stored: &mut self.stored,
152
+
max: self.max_stored,
153
+
})
154
+
}
155
+
pub(crate) fn get_reader<'conn>(&'conn self) -> Result<SqliteReader<'conn>, DiskError> {
156
+
let select_stmt = self.conn.prepare("SELECT val FROM blocks WHERE key = ?1")?;
157
+
Ok(SqliteReader { select_stmt })
158
+
}
159
+
/// Drop and recreate the kv table
160
+
pub async fn reset(self) -> Result<Self, DiskError> {
161
+
tokio::task::spawn_blocking(move || {
162
+
Self::reset_tables(&self.conn)?;
163
+
Ok(self)
164
+
})
165
+
.await?
166
+
}
167
+
fn reset_tables(conn: &rusqlite::Connection) -> Result<(), DiskError> {
168
+
conn.execute("DROP TABLE IF EXISTS blocks", ())?;
169
+
conn.execute(
170
+
"CREATE TABLE blocks (
171
+
key BLOB PRIMARY KEY NOT NULL,
172
+
val BLOB NOT NULL
173
+
) WITHOUT ROWID",
174
+
(),
175
+
)?;
176
+
Ok(())
177
+
}
178
+
}
179
+
180
+
pub(crate) struct SqliteWriter<'conn> {
181
+
tx: rusqlite::Transaction<'conn>,
182
+
stored: &'conn mut usize,
183
+
max: usize,
184
+
}
185
+
186
+
impl SqliteWriter<'_> {
187
+
pub(crate) fn put_many(
188
+
&mut self,
189
+
kv: impl Iterator<Item = Result<(Vec<u8>, Vec<u8>), DriveError>>,
190
+
) -> Result<(), DriveError> {
191
+
let mut insert_stmt = self
192
+
.tx
193
+
.prepare_cached("INSERT INTO blocks (key, val) VALUES (?1, ?2)")
194
+
.map_err(DiskError::DbError)?;
195
+
for pair in kv {
196
+
let (k, v) = pair?;
197
+
*self.stored += v.len();
198
+
if *self.stored > self.max {
199
+
return Err(DiskError::MaxSizeExceeded.into());
200
+
}
201
+
insert_stmt.execute((k, v)).map_err(DiskError::DbError)?;
202
+
}
203
+
Ok(())
204
+
}
205
+
pub fn commit(self) -> Result<(), DiskError> {
206
+
self.tx.commit()?;
207
+
Ok(())
208
+
}
209
+
}
210
+
211
+
pub(crate) struct SqliteReader<'conn> {
212
+
select_stmt: rusqlite::Statement<'conn>,
213
+
}
214
+
215
+
impl SqliteReader<'_> {
216
+
pub(crate) fn get(&mut self, key: Vec<u8>) -> rusqlite::Result<Option<Vec<u8>>> {
217
+
self.select_stmt
218
+
.query_one((&key,), |row| row.get(0))
219
+
.optional()
220
+
}
221
+
}
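The `max_stored_mb` cap shows up at the call site while spilling; a minimal sketch of handling it, assuming (as the error enums in this diff suggest) that it surfaces as `DriveError::StorageError(DiskError::MaxSizeExceeded)`; the path and 512 MiB budget are illustrative:

```rust
use repo_stream::{DiskBuilder, DiskError, DriveError, Driver, DriverBuilder};

async fn load_with_disk_budget(
    reader: impl tokio::io::AsyncRead + Unpin,
) -> Result<(), Box<dyn std::error::Error>> {
    if let Driver::Disk(paused) = DriverBuilder::new().with_mem_limit_mb(16).load_car(reader).await? {
        let store = DiskBuilder::new()
            .with_max_stored_mb(512) // refuse to spill more than ~512 MiB of processed blocks
            .open("/tmp/spill.db".into())
            .await?;
        match paused.finish_loading(store).await {
            Ok((_commit, _driver)) => { /* walk the MST as usual */ }
            Err(DriveError::StorageError(DiskError::MaxSizeExceeded)) => {
                eprintln!("repo exceeds the configured disk budget, giving up");
            }
            Err(e) => return Err(e.into()),
        }
    }
    Ok(())
}
```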
+562 -107 src/drive.rs
···
1
-
use futures::{Stream, TryStreamExt};
1
+
//! Consume a CAR from an AsyncRead, producing an ordered stream of records
2
+
3
+
use crate::disk::{DiskError, DiskStore};
4
+
use crate::process::Processable;
2
5
use ipld_core::cid::Cid;
6
+
use iroh_car::CarReader;
7
+
use serde::{Deserialize, Serialize};
3
8
use std::collections::HashMap;
4
-
use std::error::Error;
9
+
use std::convert::Infallible;
10
+
use tokio::{io::AsyncRead, sync::mpsc};
5
11
6
12
use crate::mst::{Commit, Node};
7
-
use crate::walk::{Step, Trip, Walker};
13
+
use crate::walk::{Step, WalkError, Walker};
8
14
15
+
/// Errors that can happen while consuming and emitting blocks and records
9
16
#[derive(Debug, thiserror::Error)]
10
-
pub enum DriveError<E: Error> {
11
-
#[error("Failed to initialize CarReader: {0}")]
17
+
pub enum DriveError {
18
+
#[error("Error from iroh_car: {0}")]
12
19
CarReader(#[from] iroh_car::Error),
13
-
#[error("CAR file requires a root to be present")]
14
-
MissingRoot,
15
-
#[error("Car block stream error: {0}")]
16
-
CarBlockError(Box<dyn Error>),
17
20
#[error("Failed to decode commit block: {0}")]
18
-
BadCommit(Box<dyn Error>),
19
-
#[error("Failed to decode record block: {0}")]
20
-
BadRecord(Box<dyn Error>),
21
+
BadBlock(#[from] serde_ipld_dagcbor::DecodeError<Infallible>),
21
22
#[error("The Commit block reference by the root was not found")]
22
23
MissingCommit,
23
24
#[error("The MST block {0} could not be found")]
24
25
MissingBlock(Cid),
25
26
#[error("Failed to walk the mst tree: {0}")]
26
-
Tripped(#[from] Trip<E>),
27
-
#[error("Not finished walking, but no more blocks are available to continue")]
28
-
Dnf,
27
+
WalkError(#[from] WalkError),
28
+
#[error("CAR file had no roots")]
29
+
MissingRoot,
30
+
#[error("Storage error")]
31
+
StorageError(#[from] DiskError),
32
+
#[error("Encode error: {0}")]
33
+
BincodeEncodeError(#[from] bincode::error::EncodeError),
34
+
#[error("Tried to send on a closed channel")]
35
+
ChannelSendError, // SendError takes <T> which we don't need
36
+
#[error("Failed to join a task: {0}")]
37
+
JoinError(#[from] tokio::task::JoinError),
29
38
}
30
39
31
-
type CarBlock<E> = Result<(Cid, Vec<u8>), E>;
40
+
#[derive(Debug, thiserror::Error)]
41
+
pub enum DecodeError {
42
+
#[error(transparent)]
43
+
BincodeDecodeError(#[from] bincode::error::DecodeError),
44
+
#[error("extra bytes remained after decoding")]
45
+
ExtraGarbage,
46
+
}
32
47
33
-
#[derive(Debug)]
34
-
pub struct Rkey(pub String);
48
+
/// An in-order chunk of Rkey + (processed) Block pairs
49
+
pub type BlockChunk<T> = Vec<(String, T)>;
35
50
36
-
#[derive(Debug)]
37
-
pub enum MaybeProcessedBlock<T, E> {
51
+
#[derive(Debug, Clone, Serialize, Deserialize)]
52
+
pub(crate) enum MaybeProcessedBlock<T> {
38
53
/// A block that's *probably* a Node (but we can't know yet)
39
54
///
40
55
/// It *can be* a record that suspiciously looks a lot like a node, so we
···
53
68
/// If we _never_ needed this block, then we may have wasted a bit of effort
54
69
/// trying to process it. Oh well.
55
70
///
56
-
/// It would be nice to store the real error type from the processing
57
-
/// function, but I'm leaving that generics puzzle for later.
58
-
///
59
71
/// There's an alternative here, which would be to kick unprocessable blocks
60
72
/// back to Raw, or maybe even a new RawUnprocessable variant. Then we could
61
73
/// surface the typed error later if needed by trying to reprocess.
62
-
Processed(Result<T, E>),
74
+
Processed(T),
63
75
}
64
76
65
-
// TODO: generic error not box dyn nonsense.
66
-
pub type ProcRes<T, E> = Result<T, E>;
77
+
impl<T: Processable> Processable for MaybeProcessedBlock<T> {
78
+
/// TODO this is probably a little broken
79
+
fn get_size(&self) -> usize {
80
+
use std::{cmp::max, mem::size_of};
67
81
68
-
pub struct Vehicle<SE, S, T, P, PE>
69
-
where
70
-
S: Stream<Item = CarBlock<SE>>,
71
-
P: Fn(&[u8]) -> ProcRes<T, PE>,
72
-
PE: Error,
73
-
{
74
-
block_stream: S,
75
-
blocks: HashMap<Cid, MaybeProcessedBlock<T, PE>>,
76
-
walker: Walker,
77
-
process: P,
82
+
// enum is always as big as its biggest member?
83
+
let base_size = max(size_of::<Vec<u8>>(), size_of::<T>());
84
+
85
+
let extra = match self {
86
+
Self::Raw(bytes) => bytes.len(),
87
+
Self::Processed(t) => t.get_size(),
88
+
};
89
+
90
+
base_size + extra
91
+
}
92
+
}
93
+
94
+
impl<T> MaybeProcessedBlock<T> {
95
+
fn maybe(process: fn(Vec<u8>) -> T, data: Vec<u8>) -> Self {
96
+
if Node::could_be(&data) {
97
+
MaybeProcessedBlock::Raw(data)
98
+
} else {
99
+
MaybeProcessedBlock::Processed(process(data))
100
+
}
101
+
}
102
+
}
103
+
104
+
/// Read a CAR file, buffering blocks in memory or to disk
105
+
pub enum Driver<R: AsyncRead + Unpin, T: Processable> {
106
+
/// All blocks fit within the memory limit
107
+
///
108
+
/// You probably want to check the commit's signature. You can go ahead and
109
+
/// walk the MST right away.
110
+
Memory(Commit, MemDriver<T>),
111
+
/// Blocks exceed the memory limit
112
+
///
113
+
/// You'll need to provide a disk storage to continue. The commit will be
114
+
/// returned and can be validated only once all blocks are loaded.
115
+
Disk(NeedDisk<R, T>),
116
+
}
117
+
118
+
/// Builder-style driver setup
119
+
#[derive(Debug, Clone)]
120
+
pub struct DriverBuilder {
121
+
pub mem_limit_mb: usize,
122
+
}
123
+
124
+
impl Default for DriverBuilder {
125
+
fn default() -> Self {
126
+
Self { mem_limit_mb: 16 }
127
+
}
128
+
}
129
+
130
+
impl DriverBuilder {
131
+
/// Begin configuring the driver with defaults
132
+
pub fn new() -> Self {
133
+
Default::default()
134
+
}
135
+
/// Set the in-memory size limit, in MiB
136
+
///
137
+
/// Default: 16 MiB
138
+
pub fn with_mem_limit_mb(self, new_limit: usize) -> Self {
139
+
Self {
140
+
mem_limit_mb: new_limit,
141
+
}
142
+
}
143
+
/// Set the block processor
144
+
///
145
+
/// Default: noop, raw blocks will be emitted
146
+
pub fn with_block_processor<T: Processable>(
147
+
self,
148
+
p: fn(Vec<u8>) -> T,
149
+
) -> DriverBuilderWithProcessor<T> {
150
+
DriverBuilderWithProcessor {
151
+
mem_limit_mb: self.mem_limit_mb,
152
+
block_processor: p,
153
+
}
154
+
}
155
+
/// Begin processing an atproto MST from a CAR file
156
+
pub async fn load_car<R: AsyncRead + Unpin>(
157
+
&self,
158
+
reader: R,
159
+
) -> Result<Driver<R, Vec<u8>>, DriveError> {
160
+
Driver::load_car(reader, crate::process::noop, self.mem_limit_mb).await
161
+
}
162
+
}
163
+
164
+
/// Builder-style driver intermediate step
165
+
///
166
+
/// start from `DriverBuilder`
167
+
#[derive(Debug, Clone)]
168
+
pub struct DriverBuilderWithProcessor<T: Processable> {
169
+
pub mem_limit_mb: usize,
170
+
pub block_processor: fn(Vec<u8>) -> T,
171
+
}
172
+
173
+
impl<T: Processable> DriverBuilderWithProcessor<T> {
174
+
/// Set the in-memory size limit, in MiB
175
+
///
176
+
/// Default: 16 MiB
177
+
pub fn with_mem_limit_mb(mut self, new_limit: usize) -> Self {
178
+
self.mem_limit_mb = new_limit;
179
+
self
180
+
}
181
+
/// Begin processing an atproto MST from a CAR file
182
+
pub async fn load_car<R: AsyncRead + Unpin>(
183
+
&self,
184
+
reader: R,
185
+
) -> Result<Driver<R, T>, DriveError> {
186
+
Driver::load_car(reader, self.block_processor, self.mem_limit_mb).await
187
+
}
78
188
}
79
189
80
-
impl<SE, S, T: Clone, P, PE> Vehicle<SE, S, T, P, PE>
81
-
where
82
-
SE: Error + 'static,
83
-
S: Stream<Item = CarBlock<SE>> + Unpin,
84
-
P: Fn(&[u8]) -> ProcRes<T, PE>,
85
-
PE: Error,
86
-
{
87
-
pub async fn init(
88
-
root: Cid,
89
-
mut block_stream: S,
90
-
process: P,
91
-
) -> Result<(Commit, Self), DriveError<PE>> {
92
-
let mut blocks = HashMap::new();
190
+
impl<R: AsyncRead + Unpin, T: Processable> Driver<R, T> {
191
+
/// Begin processing an atproto MST from a CAR file
192
+
///
193
+
/// Blocks will be loaded, processed, and buffered in memory. If the entire
194
+
/// processed size is under the `mem_limit_mb` limit, a `Driver::Memory`
195
+
/// will be returned along with a `Commit` ready for validation.
196
+
///
197
+
/// If the `mem_limit_mb` limit is reached before loading all blocks, the
198
+
/// partial state will be returned as `Driver::Disk(needed)`, which can be
199
+
/// resumed by providing a `SqliteStorage` for on-disk block storage.
200
+
pub async fn load_car(
201
+
reader: R,
202
+
process: fn(Vec<u8>) -> T,
203
+
mem_limit_mb: usize,
204
+
) -> Result<Driver<R, T>, DriveError> {
205
+
let max_size = mem_limit_mb * 2_usize.pow(20);
206
+
let mut mem_blocks = HashMap::new();
207
+
208
+
let mut car = CarReader::new(reader).await?;
209
+
210
+
let root = *car
211
+
.header()
212
+
.roots()
213
+
.first()
214
+
.ok_or(DriveError::MissingRoot)?;
215
+
log::debug!("root: {root:?}");
93
216
94
217
let mut commit = None;
95
218
96
-
while let Some((cid, data)) = block_stream
97
-
.try_next()
98
-
.await
99
-
.map_err(|e| DriveError::CarBlockError(e.into()))?
100
-
{
219
+
// try to load all the blocks into memory
220
+
let mut mem_size = 0;
221
+
while let Some((cid, data)) = car.next_block().await? {
222
+
// the root commit is a Special Third Kind of block that we need to make
223
+
// sure not to optimistically send to the processing function
101
224
if cid == root {
102
-
let c: Commit = serde_ipld_dagcbor::from_slice(&data)
103
-
.map_err(|e| DriveError::BadCommit(e.into()))?;
225
+
let c: Commit = serde_ipld_dagcbor::from_slice(&data)?;
104
226
commit = Some(c);
105
-
break; // inner while
106
-
} else {
107
-
blocks.insert(
108
-
cid,
109
-
if Node::could_be(&data) {
110
-
MaybeProcessedBlock::Raw(data)
111
-
} else {
112
-
MaybeProcessedBlock::Processed(process(&data))
113
-
},
114
-
);
227
+
continue;
228
+
}
229
+
230
+
// remaining possible types: node, record, other. optimistically process
231
+
let maybe_processed = MaybeProcessedBlock::maybe(process, data);
232
+
233
+
// stash (maybe processed) blocks in memory as long as we have room
234
+
mem_size += std::mem::size_of::<Cid>() + maybe_processed.get_size();
235
+
mem_blocks.insert(cid, maybe_processed);
236
+
if mem_size >= max_size {
237
+
return Ok(Driver::Disk(NeedDisk {
238
+
car,
239
+
root,
240
+
process,
241
+
max_size,
242
+
mem_blocks,
243
+
commit,
244
+
}));
115
245
}
116
246
}
117
247
118
-
// we either broke out or read all the blocks without finding the commit...
248
+
// all blocks loaded and we fit in memory! hopefully we found the commit...
119
249
let commit = commit.ok_or(DriveError::MissingCommit)?;
120
250
121
251
let walker = Walker::new(commit.data);
122
252
123
-
let me = Self {
124
-
block_stream,
125
-
blocks,
126
-
walker,
127
-
process,
128
-
};
129
-
Ok((commit, me))
253
+
Ok(Driver::Memory(
254
+
commit,
255
+
MemDriver {
256
+
blocks: mem_blocks,
257
+
walker,
258
+
process,
259
+
},
260
+
))
261
+
}
262
+
}
263
+
264
+
/// The core driver between the block stream and MST walker
265
+
///
266
+
/// In the future, PDSs will export CARs in a stream-friendly order that will
267
+
/// enable processing them with tiny memory overhead. But that future is not
268
+
/// here yet.
269
+
///
270
+
/// CARs are almost always in a stream-unfriendly order, so I'm reverting the
271
+
/// optimistic stream features: we load all blocks first, then walk the MST.
272
+
///
273
+
/// This makes things much simpler: we only need to worry about spilling to disk
274
+
/// in one place, and we always have a reasonable expectation about how much
275
+
/// work the init function will do. We can drop the CAR reader before walking,
276
+
/// so the sync/async boundaries become a little easier to work around.
277
+
#[derive(Debug)]
278
+
pub struct MemDriver<T: Processable> {
279
+
blocks: HashMap<Cid, MaybeProcessedBlock<T>>,
280
+
walker: Walker,
281
+
process: fn(Vec<u8>) -> T,
282
+
}
283
+
284
+
impl<T: Processable> MemDriver<T> {
285
+
/// Step through the record outputs, in rkey order
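///
/// A minimal sketch, mirroring the crate-level example (in-memory case only):
///
/// ```no_run
/// # use repo_stream::{Driver, DriverBuilder};
/// # #[tokio::main]
/// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
/// # let reader = include_bytes!("../car-samples/tiny.car").as_slice();
/// if let Driver::Memory(_commit, mut driver) = DriverBuilder::new()
///     .with_block_processor(|rec| rec.len())
///     .load_car(reader)
///     .await?
/// {
///     while let Some(chunk) = driver.next_chunk(256).await? {
///         for (rkey, size) in chunk {
///             println!("{rkey}: size={size}");
///         }
///     }
/// }
/// # Ok(())
/// # }
/// ```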
286
+
pub async fn next_chunk(&mut self, n: usize) -> Result<Option<BlockChunk<T>>, DriveError> {
287
+
let mut out = Vec::with_capacity(n);
288
+
for _ in 0..n {
289
+
// walk as far as we can until we run out of blocks or find a record
290
+
match self.walker.step(&mut self.blocks, self.process)? {
291
+
Step::Missing(cid) => return Err(DriveError::MissingBlock(cid)),
292
+
Step::Finish => break,
293
+
Step::Found { rkey, data } => {
294
+
out.push((rkey, data));
295
+
continue;
296
+
}
297
+
};
298
+
}
299
+
300
+
if out.is_empty() {
301
+
Ok(None)
302
+
} else {
303
+
Ok(Some(out))
304
+
}
305
+
}
306
+
}
307
+
308
+
/// A partially memory-loaded CAR file that needs disk spillover to continue
309
+
pub struct NeedDisk<R: AsyncRead + Unpin, T: Processable> {
310
+
car: CarReader<R>,
311
+
root: Cid,
312
+
process: fn(Vec<u8>) -> T,
313
+
max_size: usize,
314
+
mem_blocks: HashMap<Cid, MaybeProcessedBlock<T>>,
315
+
pub commit: Option<Commit>,
316
+
}
317
+
318
+
fn encode(v: impl Serialize) -> Result<Vec<u8>, bincode::error::EncodeError> {
319
+
bincode::serde::encode_to_vec(v, bincode::config::standard())
320
+
}
321
+
322
+
pub(crate) fn decode<T: Processable>(bytes: &[u8]) -> Result<T, DecodeError> {
323
+
let (t, n) = bincode::serde::decode_from_slice(bytes, bincode::config::standard())?;
324
+
if n != bytes.len() {
325
+
return Err(DecodeError::ExtraGarbage);
130
326
}
327
+
Ok(t)
328
+
}
131
329
132
-
async fn drive_until(&mut self, cid_needed: Cid) -> Result<(), DriveError<PE>> {
133
-
while let Some((cid, data)) = self
134
-
.block_stream
135
-
.try_next()
136
-
.await
137
-
.map_err(|e| DriveError::CarBlockError(e.into()))?
138
-
{
139
-
self.blocks.insert(
140
-
cid,
141
-
if Node::could_be(&data) {
142
-
MaybeProcessedBlock::Raw(data)
143
-
} else {
144
-
MaybeProcessedBlock::Processed((self.process)(&data))
145
-
},
146
-
);
147
-
if cid == cid_needed {
148
-
return Ok(());
330
+
impl<R: AsyncRead + Unpin, T: Processable + Send + 'static> NeedDisk<R, T> {
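/// Spill the buffered blocks into the provided `DiskStore`, read the rest of
/// the CAR to disk, and return the `Commit` along with a `DiskDriver` for
/// walking the MST from disk storage.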
331
+
pub async fn finish_loading(
332
+
mut self,
333
+
mut store: DiskStore,
334
+
) -> Result<(Commit, DiskDriver<T>), DriveError> {
335
+
// move store in and back out so we can manage lifetimes
336
+
// dump mem blocks into the store
337
+
store = tokio::task::spawn(async move {
338
+
let mut writer = store.get_writer()?;
339
+
340
+
let kvs = self
341
+
.mem_blocks
342
+
.into_iter()
343
+
.map(|(k, v)| Ok(encode(v).map(|v| (k.to_bytes(), v))?));
344
+
345
+
writer.put_many(kvs)?;
346
+
writer.commit()?;
347
+
Ok::<_, DriveError>(store)
348
+
})
349
+
.await??;
350
+
351
+
let (tx, mut rx) = mpsc::channel::<Vec<(Cid, MaybeProcessedBlock<T>)>>(1);
352
+
353
+
let store_worker = tokio::task::spawn_blocking(move || {
354
+
let mut writer = store.get_writer()?;
355
+
356
+
while let Some(chunk) = rx.blocking_recv() {
357
+
let kvs = chunk
358
+
.into_iter()
359
+
.map(|(k, v)| Ok(encode(v).map(|v| (k.to_bytes(), v))?));
360
+
writer.put_many(kvs)?;
361
+
}
362
+
363
+
writer.commit()?;
364
+
Ok::<_, DriveError>(store)
365
+
}); // await later
366
+
367
+
// dump the rest to disk (in chunks)
368
+
log::debug!("dumping the rest of the stream...");
369
+
loop {
370
+
let mut mem_size = 0;
371
+
let mut chunk = vec![];
372
+
loop {
373
+
let Some((cid, data)) = self.car.next_block().await? else {
374
+
break;
375
+
};
376
+
// we still gotta keep checking for the root since we might not have it
377
+
if cid == self.root {
378
+
let c: Commit = serde_ipld_dagcbor::from_slice(&data)?;
379
+
self.commit = Some(c);
380
+
continue;
381
+
}
382
+
// remaining possible types: node, record, other. optimistically process
383
+
// TODO: get the actual in-memory size to compute disk spill
384
+
let maybe_processed = MaybeProcessedBlock::maybe(self.process, data);
385
+
mem_size += std::mem::size_of::<Cid>() + maybe_processed.get_size();
386
+
chunk.push((cid, maybe_processed));
387
+
if mem_size >= self.max_size {
388
+
// soooooo if we're setting the db cache to max_size and then letting
389
+
// multiple chunks in the queue that are >= max_size, then at any time
390
+
// we might be using some multiple of max_size?
391
+
break;
392
+
}
393
+
}
394
+
if chunk.is_empty() {
395
+
break;
149
396
}
397
+
tx.send(chunk)
398
+
.await
399
+
.map_err(|_| DriveError::ChannelSendError)?;
150
400
}
401
+
drop(tx);
402
+
log::debug!("done. waiting for worker to finish...");
403
+
404
+
store = store_worker.await??;
405
+
406
+
log::debug!("worker finished.");
407
+
408
+
let commit = self.commit.ok_or(DriveError::MissingCommit)?;
151
409
152
-
// if we never found the block
153
-
Err(DriveError::MissingBlock(cid_needed))
410
+
let walker = Walker::new(commit.data);
411
+
412
+
Ok((
413
+
commit,
414
+
DiskDriver {
415
+
process: self.process,
416
+
state: Some(BigState { store, walker }),
417
+
},
418
+
))
419
+
}
420
+
}
421
+
422
+
struct BigState {
423
+
store: DiskStore,
424
+
walker: Walker,
425
+
}
426
+
427
+
/// MST walker that reads from disk instead of an in-memory hashmap
428
+
pub struct DiskDriver<T: Clone> {
429
+
process: fn(Vec<u8>) -> T,
430
+
state: Option<BigState>,
431
+
}
432
+
433
+
// for doctests only
434
+
#[doc(hidden)]
435
+
pub fn _get_fake_disk_driver() -> DiskDriver<Vec<u8>> {
436
+
use crate::process::noop;
437
+
DiskDriver {
438
+
process: noop,
439
+
state: None,
154
440
}
441
+
}
155
442
156
-
pub async fn next_record(&mut self) -> Result<Option<(Rkey, T)>, DriveError<PE>> {
443
+
impl<T: Processable + Send + 'static> DiskDriver<T> {
444
+
/// Walk the MST returning up to `n` rkey + record pairs
445
+
///
446
+
/// ```no_run
447
+
/// # use repo_stream::{drive::{DiskDriver, DriveError, _get_fake_disk_driver}, process::noop};
448
+
/// # #[tokio::main]
449
+
/// # async fn main() -> Result<(), DriveError> {
450
+
/// # let mut disk_driver = _get_fake_disk_driver();
451
+
/// while let Some(pairs) = disk_driver.next_chunk(256).await? {
452
+
/// for (rkey, record) in pairs {
453
+
/// println!("{rkey}: size={}", record.len());
454
+
/// }
455
+
/// }
456
+
/// let store = disk_driver.reset_store().await?;
457
+
/// # Ok(())
458
+
/// # }
459
+
/// ```
460
+
pub async fn next_chunk(&mut self, n: usize) -> Result<Option<BlockChunk<T>>, DriveError> {
461
+
let process = self.process;
462
+
463
+
// state should only *ever* be None transiently while inside here
464
+
let mut state = self.state.take().expect("DiskDriver must have Some(state)");
465
+
466
+
// the big pain here is that we don't want to leave self.state in an
467
+
// invalid state (None), so all the error paths have to make sure it
468
+
// comes out again.
469
+
let (state, res) = tokio::task::spawn_blocking(
470
+
move || -> (BigState, Result<BlockChunk<T>, DriveError>) {
471
+
let mut reader_res = state.store.get_reader();
472
+
let reader: &mut _ = match reader_res {
473
+
Ok(ref mut r) => r,
474
+
Err(ref mut e) => {
475
+
// unfortunately we can't return the error directly because
476
+
// (for some reason) it's attached to the lifetime of the
477
+
// reader?
478
+
// hack a mem::swap so we can get it out :/
479
+
let e_swapped = e.steal();
480
+
// the pain: `state` *has to* outlive the reader
481
+
drop(reader_res);
482
+
return (state, Err(e_swapped.into()));
483
+
}
484
+
};
485
+
486
+
let mut out = Vec::with_capacity(n);
487
+
488
+
for _ in 0..n {
489
+
// walk as far as we can until we run out of blocks or find a record
490
+
let step = match state.walker.disk_step(reader, process) {
491
+
Ok(s) => s,
492
+
Err(e) => {
493
+
// the pain: `state` *has to* outlive the reader
494
+
drop(reader_res);
495
+
return (state, Err(e.into()));
496
+
}
497
+
};
498
+
match step {
499
+
Step::Missing(cid) => {
500
+
// the pain: `state` *has to* outlive the reader
501
+
drop(reader_res);
502
+
return (state, Err(DriveError::MissingBlock(cid)));
503
+
}
504
+
Step::Finish => break,
505
+
Step::Found { rkey, data } => out.push((rkey, data)),
506
+
};
507
+
}
508
+
509
+
// `state` *has to* outlive the reader
510
+
drop(reader_res);
511
+
512
+
(state, Ok::<_, DriveError>(out))
513
+
},
514
+
)
515
+
.await?; // on tokio JoinError, we'll be left with invalid state :(
516
+
517
+
// *must* restore state before dealing with the actual result
518
+
self.state = Some(state);
519
+
520
+
let out = res?;
521
+
522
+
if out.is_empty() {
523
+
Ok(None)
524
+
} else {
525
+
Ok(Some(out))
526
+
}
527
+
}
528
+
529
+
fn read_tx_blocking(
530
+
&mut self,
531
+
n: usize,
532
+
tx: mpsc::Sender<Result<BlockChunk<T>, DriveError>>,
533
+
) -> Result<(), mpsc::error::SendError<Result<BlockChunk<T>, DriveError>>> {
534
+
let BigState { store, walker } = self.state.as_mut().expect("valid state");
535
+
let mut reader = match store.get_reader() {
536
+
Ok(r) => r,
537
+
Err(e) => return tx.blocking_send(Err(e.into())),
538
+
};
539
+
157
540
loop {
158
-
// walk as far as we can until we run out of blocks or find a record
159
-
let cid_needed = match self.walker.walk(&mut self.blocks, &self.process)? {
160
-
Step::Rest(cid) => cid,
161
-
Step::Finish => return Ok(None),
162
-
Step::Step { rkey, data } => return Ok(Some((Rkey(rkey), data))),
163
-
};
541
+
let mut out: BlockChunk<T> = Vec::with_capacity(n);
542
+
543
+
for _ in 0..n {
544
+
// walk as far as we can until we run out of blocks or find a record
545
+
546
+
let step = match walker.disk_step(&mut reader, self.process) {
547
+
Ok(s) => s,
548
+
Err(e) => return tx.blocking_send(Err(e.into())),
549
+
};
550
+
551
+
match step {
552
+
Step::Missing(cid) => {
553
+
return tx.blocking_send(Err(DriveError::MissingBlock(cid)));
554
+
}
555
+
Step::Finish => return Ok(()),
556
+
Step::Found { rkey, data } => {
557
+
out.push((rkey, data));
558
+
continue;
559
+
}
560
+
};
561
+
}
164
562
165
-
// load blocks until we reach that cid
166
-
self.drive_until(cid_needed).await?;
563
+
if out.is_empty() {
564
+
break;
565
+
}
566
+
tx.blocking_send(Ok(out))?;
167
567
}
568
+
569
+
Ok(())
168
570
}
169
571
170
-
pub fn stream(self) -> impl Stream<Item = Result<(Rkey, T), DriveError<PE>>> {
171
-
futures::stream::try_unfold(self, |mut this| async move {
172
-
let maybe_record = this.next_record().await?;
173
-
Ok(maybe_record.map(|b| (b, this)))
174
-
})
572
+
/// Spawn the disk reading task into a tokio blocking thread
573
+
///
574
+
/// The idea is to cut down on back-and-forth with the blocking thread: a
575
+
/// single blocking task does all the disk reading and sends records and
576
+
/// rkeys back through an `mpsc` channel instead.
577
+
///
578
+
/// This might also allow the disk work to continue while processing the
579
+
/// records. It's still not yet clear if this method actually has much
580
+
/// benefit over just using `.next_chunk(n)`.
581
+
///
582
+
/// ```no_run
583
+
/// # use repo_stream::{drive::{DiskDriver, DriveError, _get_fake_disk_driver}, process::noop};
584
+
/// # #[tokio::main]
585
+
/// # async fn main() -> Result<(), DriveError> {
586
+
/// # let mut disk_driver = _get_fake_disk_driver();
587
+
/// let (mut rx, join) = disk_driver.to_channel(512);
588
+
/// while let Some(recvd) = rx.recv().await {
589
+
/// let pairs = recvd?;
590
+
/// for (rkey, record) in pairs {
591
+
/// println!("{rkey}: size={}", record.len());
592
+
/// }
593
+
///
594
+
/// }
595
+
/// let store = join.await?.reset_store().await?;
596
+
/// # Ok(())
597
+
/// # }
598
+
/// ```
599
+
pub fn to_channel(
600
+
mut self,
601
+
n: usize,
602
+
) -> (
603
+
mpsc::Receiver<Result<BlockChunk<T>, DriveError>>,
604
+
tokio::task::JoinHandle<Self>,
605
+
) {
606
+
let (tx, rx) = mpsc::channel::<Result<BlockChunk<T>, DriveError>>(1);
607
+
608
+
// sketch: this worker keeps running on its own; the returned join handle can be awaited later (or dropped)
609
+
let chan_task = tokio::task::spawn_blocking(move || {
610
+
if let Err(mpsc::error::SendError(_)) = self.read_tx_blocking(n, tx) {
611
+
log::debug!("big car reader exited early due to dropped receiver channel");
612
+
}
613
+
self
614
+
});
615
+
616
+
(rx, chan_task)
617
+
}
618
+
619
+
/// Reset the disk storage so it can be reused. You must call this.
620
+
///
621
+
/// Ideally we'd put this in an `impl Drop`, but since it makes blocking
622
+
/// calls, that would be risky in an async context. For now you just have to
623
+
/// carefully make sure you call it.
624
+
///
625
+
/// The sqlite store is returned, so it can be reused for another
626
+
/// `DiskDriver`.
627
+
pub async fn reset_store(mut self) -> Result<DiskStore, DriveError> {
628
+
let BigState { store, .. } = self.state.take().expect("valid state");
629
+
Ok(store.reset().await?)
175
630
}
176
631
}
+86
-2
src/lib.rs
···
1
-
pub mod drive;
1
+
/*!
2
+
A robust CAR file -> MST walker for atproto
3
+
4
+
Small CARs have their blocks buffered in memory. If a configurable memory limit
5
+
is reached while reading blocks, CAR reading is suspended, and can be continued
6
+
by providing disk storage to buffer the CAR blocks instead.
7
+
8
+
A `process` function can be provided for tasks where records are transformed
9
+
into a smaller representation, to save memory (and disk) during block reading.
10
+
11
+
Once blocks are loaded, the MST is walked and emitted as chunks of
12
+
`(rkey, processed_block)` pairs, in rkey order (depth-first, left-to-right).
13
+
14
+
Some MST validations are applied:
15
+
- Keys must appear in order
16
+
- Keys must be at the correct MST tree depth
17
+
18
+
`iroh_car` additionally applies a block size limit of `2MiB`.
19
+
20
+
```
21
+
use repo_stream::{Driver, DriverBuilder, DiskBuilder};
22
+
23
+
# #[tokio::main]
24
+
# async fn main() -> Result<(), Box<dyn std::error::Error>> {
25
+
# let reader = include_bytes!("../car-samples/tiny.car").as_slice();
26
+
let mut total_size = 0;
27
+
28
+
match DriverBuilder::new()
29
+
.with_mem_limit_mb(10)
30
+
.with_block_processor(|rec| rec.len()) // block processing: just extract the raw record size
31
+
.load_car(reader)
32
+
.await?
33
+
{
34
+
35
+
// if all blocks fit within memory
36
+
Driver::Memory(_commit, mut driver) => {
37
+
while let Some(chunk) = driver.next_chunk(256).await? {
38
+
for (_rkey, size) in chunk {
39
+
total_size += size;
40
+
}
41
+
}
42
+
},
43
+
44
+
// if the CAR was too big for in-memory processing
45
+
Driver::Disk(paused) => {
46
+
// set up a disk store we can spill to
47
+
let store = DiskBuilder::new().open("some/path.db".into()).await?;
48
+
// do the spilling, get back a (similar) driver
49
+
let (_commit, mut driver) = paused.finish_loading(store).await?;
50
+
51
+
while let Some(chunk) = driver.next_chunk(256).await? {
52
+
for (_rkey, size) in chunk {
53
+
total_size += size;
54
+
}
55
+
}
56
+
57
+
// clean up the disk store (drop tables etc)
58
+
driver.reset_store().await?;
59
+
}
60
+
};
61
+
println!("sum of size of all records: {total_size}");
62
+
# Ok(())
63
+
# }
64
+
```
65
+
66
+
Disk spilling suspends and returns a `Driver::Disk(paused)` instead of going
67
+
ahead and eagerly using disk I/O. This means you have to write a bit more code
68
+
to handle both cases, but it allows you to have finer control over resource
69
+
usage. For example, you can drive a number of parallel memory CAR workers, and
70
+
separately have a different number of disk workers picking up suspended disk
71
+
tasks from a queue.
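
A rough sketch of that shape (illustrative only: the channel wiring, the
`spill.db` path, and the worker layout here are hypothetical glue code, not
part of this crate's API):

```no_run
use repo_stream::{DiskBuilder, DriveError, Driver, DriverBuilder, NeedDisk};
use tokio::sync::mpsc;

# #[tokio::main]
# async fn main() -> Result<(), Box<dyn std::error::Error>> {
// illustrative input: CARs as in-memory byte slices (they could come from anywhere)
let cars: Vec<&'static [u8]> = vec![include_bytes!("../car-samples/tiny.car").as_slice()];

// a single disk worker owns the DiskStore and drains CARs that were too big for memory
let store = DiskBuilder::new().open("spill.db".into()).await?;
let (tx, mut rx) = mpsc::channel::<NeedDisk<&'static [u8], usize>>(16);
let disk_worker = tokio::spawn(async move {
    let mut store = store;
    while let Some(paused) = rx.recv().await {
        let (_commit, mut driver) = paused.finish_loading(store).await?;
        while let Some(chunk) = driver.next_chunk(256).await? {
            for (_rkey, _size) in chunk { /* handle records */ }
        }
        // reclaim the store so the next paused CAR can reuse it
        store = driver.reset_store().await?;
    }
    Ok::<_, DriveError>(())
});

// memory workers: take the fast path, handing off to the disk worker when needed
for reader in cars {
    match DriverBuilder::new()
        .with_mem_limit_mb(10)
        .with_block_processor(|rec| rec.len())
        .load_car(reader)
        .await?
    {
        Driver::Memory(_commit, mut driver) => {
            while let Some(chunk) = driver.next_chunk(256).await? {
                for (_rkey, _size) in chunk { /* handle records */ }
            }
        }
        Driver::Disk(paused) => tx.send(paused).await.map_err(|_| "disk worker exited")?,
    }
}
drop(tx); // let the disk worker drain and finish
disk_worker.await??;
# Ok(())
# }
```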
72
+
73
+
Find more [examples in the repo](https://tangled.org/@microcosm.blue/repo-stream/tree/main/examples).
74
+
75
+
*/
76
+
2
77
pub mod mst;
3
-
pub mod walk;
78
+
mod walk;
79
+
80
+
pub mod disk;
81
+
pub mod drive;
82
+
pub mod process;
83
+
84
+
pub use disk::{DiskBuilder, DiskError, DiskStore};
85
+
pub use drive::{DriveError, Driver, DriverBuilder, NeedDisk};
86
+
pub use mst::Commit;
87
+
pub use process::Processable;
+9
-10
src/mst.rs
···
39
39
/// MST node data schema
40
40
#[derive(Debug, Deserialize, PartialEq)]
41
41
#[serde(deny_unknown_fields)]
42
-
pub struct Node {
42
+
pub(crate) struct Node {
43
43
/// link to sub-tree Node on a lower level and with all keys sorting before
44
44
/// keys at this node
45
45
#[serde(rename = "l")]
···
62
62
/// so if a block *could be* a node, any record converter must postpone
63
63
/// processing. if it turns out it happens to be a very node-looking record,
64
64
/// well, sorry, it just has to only be processed later when that's known.
65
-
pub fn could_be(bytes: impl AsRef<[u8]>) -> bool {
65
+
pub(crate) fn could_be(bytes: impl AsRef<[u8]>) -> bool {
66
66
const NODE_FINGERPRINT: [u8; 3] = [
67
67
0xA2, // map length 2 (for "l" and "e" keys)
68
68
0x61, // text length 1
69
69
b'e', // "e" before "l" because map keys have to be lex-sorted
70
-
// 0x8?: "e" contains an array (0x8 nibble) of some length (low nib)
70
+
// 0x8?: "e" has array (0x100 upper 3 bits) of some length
71
71
];
72
72
let bytes = bytes.as_ref();
73
73
bytes.starts_with(&NODE_FINGERPRINT)
74
-
// && bytes.get(3).map(|b| b & 0xF0 == 0x80).unwrap_or(false)
74
+
&& bytes
75
+
.get(3)
76
+
.map(|b| b & 0b1110_0000 == 0x80)
77
+
.unwrap_or(false)
75
78
}
76
79
77
80
/// Check if a node has any entries
···
80
83
/// with an empty array of entries. This is the only situation in which a
81
84
/// tree may contain an empty leaf node which does not either contain keys
82
85
/// ("entries") or point to a sub-tree containing entries.
83
-
///
84
-
/// TODO: to me this is slightly unclear with respect to `l` (ask someone).
85
-
/// ...is that what "The top of the tree must not be a an empty node which
86
-
/// only points to a sub-tree." is referring to?
87
-
pub fn is_empty(&self) -> bool {
86
+
pub(crate) fn is_empty(&self) -> bool {
88
87
self.left.is_none() && self.entries.is_empty()
89
88
}
90
89
}
···
92
91
/// TreeEntry object
93
92
#[derive(Debug, Deserialize, PartialEq)]
94
93
#[serde(deny_unknown_fields)]
95
-
pub struct Entry {
94
+
pub(crate) struct Entry {
96
95
/// count of bytes shared with previous TreeEntry in this Node (if any)
97
96
#[serde(rename = "p")]
98
97
pub prefix_len: usize,
+108
src/process.rs
···
1
+
/*!
2
+
Record processor function output trait
3
+
4
+
The return type must satisfy the `Processable` trait, which requires:
5
+
6
+
- `Clone` because two rkeys can refer to the same record by CID, which may
7
+
only appear once in the CAR file.
8
+
- `Serialize + DeserializeOwned` so it can be spilled to disk.
9
+
10
+
One required function must be implemented, `get_size()`: this should return the
11
+
approximate total off-stack size of the type. (the on-stack size will be added
12
+
automatically via `std::mem::size_of`).
13
+
14
+
Note that it is **not guaranteed** that the `process` function will run on a
15
+
block before storing it in memory or on disk: it's not possible to know if a
16
+
block is a record without actually walking the MST, so the best we can do is
17
+
apply `process` to any block that we know *cannot* be an MST node, and otherwise
18
+
store the raw block bytes.
19
+
20
+
Here's a silly processing function that just collects 'eyy's found in the raw
21
+
record bytes
22
+
23
+
```
24
+
# use repo_stream::Processable;
25
+
# use serde::{Serialize, Deserialize};
26
+
#[derive(Debug, Clone, Serialize, Deserialize)]
27
+
struct Eyy(usize, String);
28
+
29
+
impl Processable for Eyy {
30
+
fn get_size(&self) -> usize {
31
+
// don't need to compute the usize, it's on the stack
32
+
self.1.capacity() // in-mem size from the string's capacity, in bytes
33
+
}
34
+
}
35
+
36
+
fn process(raw: Vec<u8>) -> Vec<Eyy> {
37
+
let mut out = Vec::new();
38
+
let to_find = "eyy".as_bytes();
39
+
for i in 0..raw.len().saturating_sub(2) {
40
+
if &raw[i..(i+3)] == to_find {
41
+
out.push(Eyy(i, "eyy".to_string()));
42
+
}
43
+
}
44
+
out
45
+
}
46
+
```
47
+
48
+
The memory sizing stuff is a little sketch but probably at least approximately
49
+
works.
50
+
*/
51
+
52
+
use serde::{Serialize, de::DeserializeOwned};
53
+
54
+
/// Output trait for record processing
55
+
pub trait Processable: Clone + Serialize + DeserializeOwned {
56
+
/// Any additional in-memory size taken by the processed type
57
+
///
58
+
/// Do not include stack size (`std::mem::size_of`)
59
+
fn get_size(&self) -> usize;
60
+
}
61
+
62
+
/// Processor that just returns the raw blocks
63
+
#[inline]
64
+
pub fn noop(block: Vec<u8>) -> Vec<u8> {
65
+
block
66
+
}
67
+
68
+
impl Processable for u8 {
69
+
fn get_size(&self) -> usize {
70
+
0
71
+
}
72
+
}
73
+
74
+
impl Processable for usize {
75
+
fn get_size(&self) -> usize {
76
+
0 // no additional space taken, just its stack size (newtype is free)
77
+
}
78
+
}
79
+
80
+
impl Processable for String {
81
+
fn get_size(&self) -> usize {
82
+
self.capacity()
83
+
}
84
+
}
85
+
86
+
impl<Item: Sized + Processable> Processable for Vec<Item> {
87
+
fn get_size(&self) -> usize {
88
+
let slot_size = std::mem::size_of::<Item>();
89
+
let direct_size = slot_size * self.capacity();
90
+
let items_referenced_size: usize = self.iter().map(|item| item.get_size()).sum();
91
+
direct_size + items_referenced_size
92
+
}
93
+
}
94
+
95
+
impl<Item: Processable> Processable for Option<Item> {
96
+
fn get_size(&self) -> usize {
97
+
self.as_ref().map(|item| item.get_size()).unwrap_or(0)
98
+
}
99
+
}
100
+
101
+
impl<Item: Processable, Error: Processable> Processable for Result<Item, Error> {
102
+
fn get_size(&self) -> usize {
103
+
match self {
104
+
Ok(item) => item.get_size(),
105
+
Err(err) => err.get_size(),
106
+
}
107
+
}
108
+
}
+278
-255
src/walk.rs
···
1
1
//! Depth-first MST traversal
2
2
3
-
use crate::drive::{MaybeProcessedBlock, ProcRes};
3
+
use crate::disk::SqliteReader;
4
+
use crate::drive::{DecodeError, MaybeProcessedBlock};
4
5
use crate::mst::Node;
6
+
use crate::process::Processable;
5
7
use ipld_core::cid::Cid;
8
+
use sha2::{Digest, Sha256};
6
9
use std::collections::HashMap;
7
-
use std::error::Error;
10
+
use std::convert::Infallible;
8
11
12
+
/// Errors that can happen while walking
9
13
#[derive(Debug, thiserror::Error)]
10
-
pub enum Trip<E: Error> {
11
-
#[error("empty mst nodes are not allowed")]
12
-
NodeEmpty,
14
+
pub enum WalkError {
15
+
#[error("Failed to fingerprint commit block")]
16
+
BadCommitFingerprint,
13
17
#[error("Failed to decode commit block: {0}")]
14
-
BadCommit(Box<dyn std::error::Error>),
15
-
#[error("Failed to process record: {0}")]
16
-
RecordFailedProcessing(Box<dyn Error>),
18
+
BadCommit(#[from] serde_ipld_dagcbor::DecodeError<Infallible>),
17
19
#[error("Action node error: {0}")]
18
-
ActionNode(#[from] ActionNodeError),
19
-
#[error("Process failed: {0}")]
20
-
ProcessFailed(E),
20
+
MstError(#[from] MstError),
21
+
#[error("storage error: {0}")]
22
+
StorageError(#[from] rusqlite::Error),
23
+
#[error("Decode error: {0}")]
24
+
DecodeError(#[from] DecodeError),
21
25
}
22
26
23
-
#[derive(Debug, thiserror::Error)]
24
-
pub enum ActionNodeError {
27
+
/// Errors from invalid Rkeys
28
+
#[derive(Debug, PartialEq, thiserror::Error)]
29
+
pub enum MstError {
25
30
#[error("Failed to compute an rkey due to invalid prefix_len")]
26
31
EntryPrefixOutOfbounds,
27
32
#[error("RKey was not utf-8")]
28
33
EntryRkeyNotUtf8(#[from] std::string::FromUtf8Error),
34
+
#[error("Nodes cannot be empty (except for an entirely empty MST)")]
35
+
EmptyNode,
36
+
#[error("Found an entry with rkey at the wrong depth")]
37
+
WrongDepth,
38
+
#[error("Lost track of our depth (possible bug?)")]
39
+
LostDepth,
40
+
#[error("MST depth underflow: depth-0 node with child trees")]
41
+
DepthUnderflow,
42
+
#[error("Encountered an rkey out of order while walking the MST")]
43
+
RkeyOutOfOrder,
29
44
}
30
45
46
+
/// Walker outputs
31
47
#[derive(Debug)]
32
48
pub enum Step<T> {
33
-
Rest(Cid),
49
+
/// We needed this CID but it's not in the block store
50
+
Missing(Cid),
51
+
/// Reached the end of the MST! yay!
34
52
Finish,
35
-
Step { rkey: String, data: T },
53
+
/// A record was found!
54
+
Found { rkey: String, data: T },
36
55
}
37
56
38
57
#[derive(Debug, Clone, PartialEq)]
39
58
enum Need {
40
-
Node(Cid),
59
+
Node { depth: Depth, cid: Cid },
41
60
Record { rkey: String, cid: Cid },
42
61
}
43
62
44
-
fn push_from_node(stack: &mut Vec<Need>, node: &Node) -> Result<(), ActionNodeError> {
45
-
let mut entries = Vec::with_capacity(node.entries.len());
63
+
#[derive(Debug, Clone, Copy, PartialEq)]
64
+
enum Depth {
65
+
Root,
66
+
Depth(u32),
67
+
}
46
68
69
+
impl Depth {
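/// A key's MST level, per the atproto spec: count the leading zero bits of
/// sha-256(key), two bits per level (see the interop test vectors below)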
70
+
fn from_key(key: &[u8]) -> Self {
71
+
let mut zeros = 0;
72
+
for byte in Sha256::digest(key) {
73
+
let leading = byte.leading_zeros();
74
+
zeros += leading;
75
+
if leading < 8 {
76
+
break;
77
+
}
78
+
}
79
+
Self::Depth(zeros / 2) // truncating divide (rounds down)
80
+
}
81
+
fn next_expected(&self) -> Result<Option<u32>, MstError> {
82
+
match self {
83
+
Self::Root => Ok(None),
84
+
Self::Depth(d) => d.checked_sub(1).ok_or(MstError::DepthUnderflow).map(Some),
85
+
}
86
+
}
87
+
}
88
+
89
+
fn push_from_node(stack: &mut Vec<Need>, node: &Node, parent_depth: Depth) -> Result<(), MstError> {
90
+
// empty nodes are not allowed in the MST except in an empty MST
91
+
if node.is_empty() {
92
+
if parent_depth == Depth::Root {
93
+
return Ok(()); // empty mst, nothing to push
94
+
} else {
95
+
return Err(MstError::EmptyNode);
96
+
}
97
+
}
98
+
99
+
let mut entries = Vec::with_capacity(node.entries.len());
47
100
let mut prefix = vec![];
101
+
let mut this_depth = parent_depth.next_expected()?;
102
+
48
103
for entry in &node.entries {
49
104
let mut rkey = vec![];
50
105
let pre_checked = prefix
51
106
.get(..entry.prefix_len)
52
-
.ok_or(ActionNodeError::EntryPrefixOutOfbounds)?;
107
+
.ok_or(MstError::EntryPrefixOutOfbounds)?;
53
108
rkey.extend_from_slice(pre_checked);
54
109
rkey.extend_from_slice(&entry.keysuffix);
110
+
111
+
let Depth::Depth(key_depth) = Depth::from_key(&rkey) else {
112
+
return Err(MstError::WrongDepth);
113
+
};
114
+
115
+
// this_depth is `None` if this is the top MST node (directly below the commit root);
116
+
// in that case we accept whatever highest depth is claimed
117
+
let expected_depth = match this_depth {
118
+
Some(d) => d,
119
+
None => {
120
+
this_depth = Some(key_depth);
121
+
key_depth
122
+
}
123
+
};
124
+
125
+
// all keys we find should be this depth
126
+
if key_depth != expected_depth {
127
+
return Err(MstError::WrongDepth);
128
+
}
129
+
55
130
prefix = rkey.clone();
56
131
57
132
entries.push(Need::Record {
···
59
134
cid: entry.value,
60
135
});
61
136
if let Some(ref tree) = entry.tree {
62
-
entries.push(Need::Node(*tree));
137
+
entries.push(Need::Node {
138
+
depth: Depth::Depth(key_depth),
139
+
cid: *tree,
140
+
});
63
141
}
64
142
}
65
143
66
144
entries.reverse();
67
145
stack.append(&mut entries);
68
146
147
+
let d = this_depth.ok_or(MstError::LostDepth)?;
148
+
69
149
if let Some(tree) = node.left {
70
-
stack.push(Need::Node(tree));
150
+
stack.push(Need::Node {
151
+
depth: Depth::Depth(d),
152
+
cid: tree,
153
+
});
71
154
}
72
155
Ok(())
73
156
}
74
157
158
+
/// Traverser of an atproto MST
159
+
///
160
+
/// Walks the tree from left-to-right in depth-first order
75
161
#[derive(Debug)]
76
162
pub struct Walker {
77
163
stack: Vec<Need>,
164
+
prev: String,
78
165
}
79
166
80
167
impl Walker {
81
168
pub fn new(tree_root_cid: Cid) -> Self {
82
169
Self {
83
-
stack: vec![Need::Node(tree_root_cid)],
170
+
stack: vec![Need::Node {
171
+
depth: Depth::Root,
172
+
cid: tree_root_cid,
173
+
}],
174
+
prev: "".to_string(),
84
175
}
85
176
}
86
177
87
-
pub fn walk<T: Clone, E: Error>(
178
+
/// Advance through nodes until we find a record or can't go further
179
+
pub fn step<T: Processable>(
88
180
&mut self,
89
-
blocks: &mut HashMap<Cid, MaybeProcessedBlock<T, E>>,
90
-
process: impl Fn(&[u8]) -> ProcRes<T, E>,
91
-
) -> Result<Step<T>, Trip<E>> {
181
+
blocks: &mut HashMap<Cid, MaybeProcessedBlock<T>>,
182
+
process: impl Fn(Vec<u8>) -> T,
183
+
) -> Result<Step<T>, WalkError> {
92
184
loop {
93
-
let Some(mut need) = self.stack.last() else {
185
+
let Some(need) = self.stack.last_mut() else {
94
186
log::trace!("tried to walk but we're actually done.");
95
187
return Ok(Step::Finish);
96
188
};
97
189
98
-
match &mut need {
99
-
Need::Node(cid) => {
190
+
match need {
191
+
&mut Need::Node { depth, cid } => {
100
192
log::trace!("need node {cid:?}");
101
-
let Some(block) = blocks.remove(cid) else {
193
+
let Some(block) = blocks.remove(&cid) else {
102
194
log::trace!("node not found, resting");
103
-
return Ok(Step::Rest(*cid));
195
+
return Ok(Step::Missing(cid));
104
196
};
105
197
106
198
let MaybeProcessedBlock::Raw(data) = block else {
107
-
return Err(Trip::BadCommit("failed commit fingerprint".into()));
199
+
return Err(WalkError::BadCommitFingerprint);
108
200
};
109
201
let node = serde_ipld_dagcbor::from_slice::<Node>(&data)
110
-
.map_err(|e| Trip::BadCommit(e.into()))?;
202
+
.map_err(WalkError::BadCommit)?;
111
203
112
204
// found node, make sure we remember
113
205
self.stack.pop();
114
206
115
207
// queue up work on the found node next
116
-
push_from_node(&mut self.stack, &node)?;
208
+
push_from_node(&mut self.stack, &node, depth)?;
117
209
}
118
210
Need::Record { rkey, cid } => {
119
211
log::trace!("need record {cid:?}");
212
+
// note that we cannot *remove* a record block, sadly, since
213
+
// there can be multiple rkeys pointing to the same cid.
120
214
let Some(data) = blocks.get_mut(cid) else {
215
+
return Ok(Step::Missing(*cid));
216
+
};
217
+
let rkey = rkey.clone();
218
+
let data = match data {
219
+
MaybeProcessedBlock::Raw(data) => process(data.to_vec()),
220
+
MaybeProcessedBlock::Processed(t) => t.clone(),
221
+
};
222
+
223
+
// found the record, make sure we remember
224
+
self.stack.pop();
225
+
226
+
// rkeys *must* be in order or else the tree is invalid (or
227
+
// we have a bug)
228
+
if rkey <= self.prev {
229
+
return Err(MstError::RkeyOutOfOrder)?;
230
+
}
231
+
self.prev = rkey.clone();
232
+
233
+
return Ok(Step::Found { rkey, data });
234
+
}
235
+
}
236
+
}
237
+
}
238
+
239
+
/// Blocking!!! Reads sqlite synchronously, so call it from a blocking context (e.g. inside `spawn_blocking`)
240
+
pub fn disk_step<T: Processable>(
241
+
&mut self,
242
+
reader: &mut SqliteReader,
243
+
process: impl Fn(Vec<u8>) -> T,
244
+
) -> Result<Step<T>, WalkError> {
245
+
loop {
246
+
let Some(need) = self.stack.last_mut() else {
247
+
log::trace!("tried to walk but we're actually done.");
248
+
return Ok(Step::Finish);
249
+
};
250
+
251
+
match need {
252
+
&mut Need::Node { depth, cid } => {
253
+
let cid_bytes = cid.to_bytes();
254
+
log::trace!("need node {cid:?}");
255
+
let Some(block_bytes) = reader.get(cid_bytes)? else {
256
+
log::trace!("node not found, resting");
257
+
return Ok(Step::Missing(cid));
258
+
};
259
+
260
+
let block: MaybeProcessedBlock<T> = crate::drive::decode(&block_bytes)?;
261
+
262
+
let MaybeProcessedBlock::Raw(data) = block else {
263
+
return Err(WalkError::BadCommitFingerprint);
264
+
};
265
+
let node = serde_ipld_dagcbor::from_slice::<Node>(&data)
266
+
.map_err(WalkError::BadCommit)?;
267
+
268
+
// found node, make sure we remember
269
+
self.stack.pop();
270
+
271
+
// queue up work on the found node next
272
+
push_from_node(&mut self.stack, &node, depth).map_err(WalkError::MstError)?;
273
+
}
274
+
Need::Record { rkey, cid } => {
275
+
log::trace!("need record {cid:?}");
276
+
let cid_bytes = cid.to_bytes();
277
+
let Some(data_bytes) = reader.get(cid_bytes)? else {
121
278
log::trace!("record block not found, resting");
122
-
return Ok(Step::Rest(*cid));
279
+
return Ok(Step::Missing(*cid));
123
280
};
281
+
let data: MaybeProcessedBlock<T> = crate::drive::decode(&data_bytes)?;
124
282
let rkey = rkey.clone();
125
283
let data = match data {
126
284
MaybeProcessedBlock::Raw(data) => process(data),
127
-
MaybeProcessedBlock::Processed(Ok(t)) => Ok(t.clone()),
128
-
bad => {
129
-
// big hack to pull the error out -- this corrupts
130
-
// a block, so we should not continue trying to work
131
-
let mut steal = MaybeProcessedBlock::Raw(vec![]);
132
-
std::mem::swap(&mut steal, bad);
133
-
let MaybeProcessedBlock::Processed(Err(e)) = steal else {
134
-
unreachable!();
135
-
};
136
-
return Err(Trip::ProcessFailed(e));
137
-
}
285
+
MaybeProcessedBlock::Processed(t) => t.clone(),
138
286
};
139
287
140
288
// found node, make sure we remember
141
289
self.stack.pop();
142
290
143
291
log::trace!("emitting a block as a step. depth={}", self.stack.len());
144
-
let data = data.map_err(Trip::ProcessFailed)?;
145
-
return Ok(Step::Step { rkey, data });
292
+
293
+
// rkeys *must* be in order or else the tree is invalid (or
294
+
// we have a bug)
295
+
if rkey <= self.prev {
296
+
return Err(MstError::RkeyOutOfOrder)?;
297
+
}
298
+
self.prev = rkey.clone();
299
+
300
+
return Ok(Step::Found { rkey, data });
146
301
}
147
302
}
148
303
}
···
152
307
#[cfg(test)]
153
308
mod test {
154
309
use super::*;
155
-
// use crate::mst::Entry;
156
310
157
311
fn cid1() -> Cid {
158
312
"bafyreihixenvk3ahqbytas4hk4a26w43bh6eo3w6usjqtxkpzsvi655a3m"
159
313
.parse()
160
314
.unwrap()
161
315
}
162
-
// fn cid2() -> Cid {
163
-
// "QmY7Yh4UquoXHLPFo2XbhXkhBvFoPwmQUSa92pxnxjQuPU"
164
-
// .parse()
165
-
// .unwrap()
166
-
// }
167
-
// fn cid3() -> Cid {
168
-
// "bafybeigdyrzt5sfp7udm7hu76uh7y26nf3efuylqabf3oclgtqy55fbzdi"
169
-
// .parse()
170
-
// .unwrap()
171
-
// }
172
-
// fn cid4() -> Cid {
173
-
// "QmbWqxBEKC3P8tqsKc98xmWNzrzDtRLMiMPL8wBuTGsMnR"
174
-
// .parse()
175
-
// .unwrap()
176
-
// }
177
-
// fn cid5() -> Cid {
178
-
// "QmSnuWmxptJZdLJpKRarxBMS2Ju2oANVrgbr2xWbie9b2D"
179
-
// .parse()
180
-
// .unwrap()
181
-
// }
182
-
// fn cid6() -> Cid {
183
-
// "QmdmQXB2mzChmMeKY47C43LxUdg1NDJ5MWcKMKxDu7RgQm"
184
-
// .parse()
185
-
// .unwrap()
186
-
// }
187
-
// fn cid7() -> Cid {
188
-
// "bafybeiaysi4s6lnjev27ln5icwm6tueaw2vdykrtjkwiphwekaywqhcjze"
189
-
// .parse()
190
-
// .unwrap()
191
-
// }
192
-
// fn cid8() -> Cid {
193
-
// "bafyreif3tfdpr5n4jdrbielmcapwvbpcthepfkwq2vwonmlhirbjmotedi"
194
-
// .parse()
195
-
// .unwrap()
196
-
// }
197
-
// fn cid9() -> Cid {
198
-
// "bafyreicnokmhmrnlp2wjhyk2haep4tqxiptwfrp2rrs7rzq7uk766chqvq"
199
-
// .parse()
200
-
// .unwrap()
201
-
// }
316
+
317
+
#[test]
318
+
fn test_depth_spec_0() {
319
+
let d = Depth::from_key(b"2653ae71");
320
+
assert_eq!(d, Depth::Depth(0))
321
+
}
322
+
323
+
#[test]
324
+
fn test_depth_spec_1() {
325
+
let d = Depth::from_key(b"blue");
326
+
assert_eq!(d, Depth::Depth(1))
327
+
}
328
+
329
+
#[test]
330
+
fn test_depth_spec_4() {
331
+
let d = Depth::from_key(b"app.bsky.feed.post/454397e440ec");
332
+
assert_eq!(d, Depth::Depth(4))
333
+
}
334
+
335
+
#[test]
336
+
fn test_depth_spec_8() {
337
+
let d = Depth::from_key(b"app.bsky.feed.post/9adeb165882c");
338
+
assert_eq!(d, Depth::Depth(8))
339
+
}
340
+
341
+
#[test]
342
+
fn test_depth_ietf_draft_0() {
343
+
let d = Depth::from_key(b"key1");
344
+
assert_eq!(d, Depth::Depth(0))
345
+
}
346
+
347
+
#[test]
348
+
fn test_depth_ietf_draft_1() {
349
+
let d = Depth::from_key(b"key7");
350
+
assert_eq!(d, Depth::Depth(1))
351
+
}
352
+
353
+
#[test]
354
+
fn test_depth_ietf_draft_4() {
355
+
let d = Depth::from_key(b"key515");
356
+
assert_eq!(d, Depth::Depth(4))
357
+
}
358
+
359
+
#[test]
360
+
fn test_depth_interop() {
361
+
// examples from https://github.com/bluesky-social/atproto-interop-tests/blob/main/mst/key_heights.json
362
+
for (k, expected) in [
363
+
("", 0),
364
+
("asdf", 0),
365
+
("blue", 1),
366
+
("2653ae71", 0),
367
+
("88bfafc7", 2),
368
+
("2a92d355", 4),
369
+
("884976f5", 6),
370
+
("app.bsky.feed.post/454397e440ec", 4),
371
+
("app.bsky.feed.post/9adeb165882c", 8),
372
+
] {
373
+
let d = Depth::from_key(k.as_bytes());
374
+
assert_eq!(d, Depth::Depth(expected), "key: {}", k);
375
+
}
376
+
}
202
377
203
378
#[test]
204
-
fn test_next_from_node_empty() {
205
-
let node = Node {
379
+
fn test_push_empty_fails() {
380
+
let empty_node = Node {
206
381
left: None,
207
382
entries: vec![],
208
383
};
209
384
let mut stack = vec![];
210
-
push_from_node(&mut stack, &node).unwrap();
211
-
assert_eq!(stack.last(), None);
385
+
let err = push_from_node(&mut stack, &empty_node, Depth::Depth(4));
386
+
assert_eq!(err, Err(MstError::EmptyNode));
212
387
}
213
388
214
389
#[test]
215
-
fn test_needs_from_node_just_left() {
390
+
fn test_push_one_node() {
216
391
let node = Node {
217
392
left: Some(cid1()),
218
393
entries: vec![],
219
394
};
220
395
let mut stack = vec![];
221
-
push_from_node(&mut stack, &node).unwrap();
222
-
assert_eq!(stack.last(), Some(Need::Node(cid1())).as_ref());
396
+
push_from_node(&mut stack, &node, Depth::Depth(4)).unwrap();
397
+
assert_eq!(
398
+
stack.last(),
399
+
Some(Need::Node {
400
+
depth: Depth::Depth(3),
401
+
cid: cid1()
402
+
})
403
+
.as_ref()
404
+
);
223
405
}
224
-
225
-
// #[test]
226
-
// fn test_needs_from_node_just_one_record() {
227
-
// let node = Node {
228
-
// left: None,
229
-
// entries: vec![Entry {
230
-
// keysuffix: "asdf".into(),
231
-
// prefix_len: 0,
232
-
// value: cid1(),
233
-
// tree: None,
234
-
// }],
235
-
// };
236
-
// assert_eq!(
237
-
// needs_from_node(node).unwrap(),
238
-
// vec![Need::Record {
239
-
// rkey: "asdf".into(),
240
-
// cid: cid1(),
241
-
// },]
242
-
// );
243
-
// }
244
-
245
-
// #[test]
246
-
// fn test_needs_from_node_two_records() {
247
-
// let node = Node {
248
-
// left: None,
249
-
// entries: vec![
250
-
// Entry {
251
-
// keysuffix: "asdf".into(),
252
-
// prefix_len: 0,
253
-
// value: cid1(),
254
-
// tree: None,
255
-
// },
256
-
// Entry {
257
-
// keysuffix: "gh".into(),
258
-
// prefix_len: 2,
259
-
// value: cid2(),
260
-
// tree: None,
261
-
// },
262
-
// ],
263
-
// };
264
-
// assert_eq!(
265
-
// needs_from_node(node).unwrap(),
266
-
// vec![
267
-
// Need::Record {
268
-
// rkey: "asdf".into(),
269
-
// cid: cid1(),
270
-
// },
271
-
// Need::Record {
272
-
// rkey: "asgh".into(),
273
-
// cid: cid2(),
274
-
// },
275
-
// ]
276
-
// );
277
-
// }
278
-
279
-
// #[test]
280
-
// fn test_needs_from_node_with_both() {
281
-
// let node = Node {
282
-
// left: None,
283
-
// entries: vec![Entry {
284
-
// keysuffix: "asdf".into(),
285
-
// prefix_len: 0,
286
-
// value: cid1(),
287
-
// tree: Some(cid2()),
288
-
// }],
289
-
// };
290
-
// assert_eq!(
291
-
// needs_from_node(node).unwrap(),
292
-
// vec![
293
-
// Need::Record {
294
-
// rkey: "asdf".into(),
295
-
// cid: cid1(),
296
-
// },
297
-
// Need::Node(cid2()),
298
-
// ]
299
-
// );
300
-
// }
301
-
302
-
// #[test]
303
-
// fn test_needs_from_node_left_and_record() {
304
-
// let node = Node {
305
-
// left: Some(cid1()),
306
-
// entries: vec![Entry {
307
-
// keysuffix: "asdf".into(),
308
-
// prefix_len: 0,
309
-
// value: cid2(),
310
-
// tree: None,
311
-
// }],
312
-
// };
313
-
// assert_eq!(
314
-
// needs_from_node(node).unwrap(),
315
-
// vec![
316
-
// Need::Node(cid1()),
317
-
// Need::Record {
318
-
// rkey: "asdf".into(),
319
-
// cid: cid2(),
320
-
// },
321
-
// ]
322
-
// );
323
-
// }
324
-
325
-
// #[test]
326
-
// fn test_needs_from_full_node() {
327
-
// let node = Node {
328
-
// left: Some(cid1()),
329
-
// entries: vec![
330
-
// Entry {
331
-
// keysuffix: "asdf".into(),
332
-
// prefix_len: 0,
333
-
// value: cid2(),
334
-
// tree: Some(cid3()),
335
-
// },
336
-
// Entry {
337
-
// keysuffix: "ghi".into(),
338
-
// prefix_len: 1,
339
-
// value: cid4(),
340
-
// tree: Some(cid5()),
341
-
// },
342
-
// Entry {
343
-
// keysuffix: "jkl".into(),
344
-
// prefix_len: 2,
345
-
// value: cid6(),
346
-
// tree: Some(cid7()),
347
-
// },
348
-
// Entry {
349
-
// keysuffix: "mno".into(),
350
-
// prefix_len: 4,
351
-
// value: cid8(),
352
-
// tree: Some(cid9()),
353
-
// },
354
-
// ],
355
-
// };
356
-
// assert_eq!(
357
-
// needs_from_node(node).unwrap(),
358
-
// vec![
359
-
// Need::Node(cid1()),
360
-
// Need::Record {
361
-
// rkey: "asdf".into(),
362
-
// cid: cid2(),
363
-
// },
364
-
// Need::Node(cid3()),
365
-
// Need::Record {
366
-
// rkey: "aghi".into(),
367
-
// cid: cid4(),
368
-
// },
369
-
// Need::Node(cid5()),
370
-
// Need::Record {
371
-
// rkey: "agjkl".into(),
372
-
// cid: cid6(),
373
-
// },
374
-
// Need::Node(cid7()),
375
-
// Need::Record {
376
-
// rkey: "agjkmno".into(),
377
-
// cid: cid8(),
378
-
// },
379
-
// Need::Node(cid9()),
380
-
// ]
381
-
// );
382
-
// }
383
406
}
+34
-31
tests/non-huge-cars.rs
···
1
1
extern crate repo_stream;
2
-
use futures::TryStreamExt;
3
-
use iroh_car::CarReader;
4
-
use std::convert::Infallible;
2
+
use repo_stream::Driver;
5
3
4
+
const EMPTY_CAR: &'static [u8] = include_bytes!("../car-samples/empty.car");
6
5
const TINY_CAR: &'static [u8] = include_bytes!("../car-samples/tiny.car");
7
6
const LITTLE_CAR: &'static [u8] = include_bytes!("../car-samples/little.car");
8
7
const MIDSIZE_CAR: &'static [u8] = include_bytes!("../car-samples/midsize.car");
9
8
10
-
async fn test_car(bytes: &[u8], expected_records: usize, expected_sum: usize) {
11
-
let reader = CarReader::new(bytes).await.unwrap();
12
-
13
-
let root = reader
14
-
.header()
15
-
.roots()
16
-
.first()
17
-
.ok_or("missing root")
9
+
async fn test_car(
10
+
bytes: &[u8],
11
+
expected_records: usize,
12
+
expected_sum: usize,
13
+
expect_profile: bool,
14
+
) {
15
+
let mut driver = match Driver::load_car(bytes, |block| block.len(), 10 /* MiB */)
16
+
.await
18
17
.unwrap()
19
-
.clone();
20
-
21
-
let stream = std::pin::pin!(reader.stream());
22
-
23
-
let (_commit, v) =
24
-
repo_stream::drive::Vehicle::init(root, stream, |block| Ok::<_, Infallible>(block.len()))
25
-
.await
26
-
.unwrap();
27
-
let mut record_stream = std::pin::pin!(v.stream());
18
+
{
19
+
Driver::Memory(_commit, mem_driver) => mem_driver,
20
+
Driver::Disk(_) => panic!("too big"),
21
+
};
28
22
29
23
let mut records = 0;
30
24
let mut sum = 0;
31
25
let mut found_bsky_profile = false;
32
26
let mut prev_rkey = "".to_string();
33
-
while let Some((rkey, size)) = record_stream.try_next().await.unwrap() {
34
-
records += 1;
35
-
sum += size;
36
-
if rkey.0 == "app.bsky.actor.profile/self" {
37
-
found_bsky_profile = true;
27
+
28
+
while let Some(pairs) = driver.next_chunk(256).await.unwrap() {
29
+
for (rkey, size) in pairs {
30
+
records += 1;
31
+
sum += size;
32
+
if rkey == "app.bsky.actor.profile/self" {
33
+
found_bsky_profile = true;
34
+
}
35
+
assert!(rkey > prev_rkey, "rkeys are streamed in order");
36
+
prev_rkey = rkey;
38
37
}
39
-
assert!(rkey.0 > prev_rkey, "rkeys are streamed in order");
40
-
prev_rkey = rkey.0;
41
38
}
39
+
42
40
assert_eq!(records, expected_records);
43
41
assert_eq!(sum, expected_sum);
44
-
assert!(found_bsky_profile);
42
+
assert_eq!(found_bsky_profile, expect_profile);
43
+
}
44
+
45
+
#[tokio::test]
46
+
async fn test_empty_car() {
47
+
test_car(EMPTY_CAR, 0, 0, false).await
45
48
}
46
49
47
50
#[tokio::test]
48
51
async fn test_tiny_car() {
49
-
test_car(TINY_CAR, 8, 2071).await
52
+
test_car(TINY_CAR, 8, 2071, true).await
50
53
}
51
54
52
55
#[tokio::test]
53
56
async fn test_little_car() {
54
-
test_car(LITTLE_CAR, 278, 246960).await
57
+
test_car(LITTLE_CAR, 278, 246960, true).await
55
58
}
56
59
57
60
#[tokio::test]
58
61
async fn test_midsize_car() {
59
-
test_car(MIDSIZE_CAR, 11585, 3741393).await
62
+
test_car(MIDSIZE_CAR, 11585, 3741393, true).await
60
63
}