+380 -2 Cargo.lock
···
 ]
 
 [[package]]
+name = "bincode"
+version = "2.0.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "36eaf5d7b090263e8150820482d5d93cd964a81e4019913c972f4edcc6edb740"
+dependencies = [
+ "bincode_derive",
+ "serde",
+ "unty",
+]
+
+[[package]]
+name = "bincode_derive"
+version = "2.0.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "bf95709a440f45e986983918d0e8a1f30a9b1df04918fc828670606804ac3c09"
+dependencies = [
+ "virtue",
+]
+
+[[package]]
 name = "bitflags"
 version = "2.9.4"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "2261d10cca569e4643e526d8dc2e62e433cc8aba21ab764233731f8d369bf394"
 
 [[package]]
+name = "block-buffer"
+version = "0.10.4"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "3078c7629b62d3f0439517fa394996acacc5cbc91c5a20d8c658e77abd503a71"
+dependencies = [
+ "generic-array",
+]
+
+[[package]]
 name = "bumpalo"
 version = "3.19.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "46c5e41b57b8bba42a04676d81cb89e9ee8e859a1a66f80a5a72e1cb76b34d43"
 
 [[package]]
+name = "byteorder-lite"
+version = "0.1.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "8f1fe948ff07f4bd06c30984e69f5b4899c516a3ef74f34df92a2df2ab535495"
+
+[[package]]
 name = "bytes"
 version = "1.10.1"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "d71b6127be86fdcfddb610f7182ac57211d4b18a3e9c82eb2d17662f2227ad6a"
+
+[[package]]
+name = "byteview"
+version = "0.10.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "dda4398f387cc6395a3e93b3867cd9abda914c97a0b344d1eefb2e5c51785fca"
 
 [[package]]
 name = "cast"
···
 checksum = "b05b61dc5112cbb17e4b6cd61790d9845d13888356391624cbe7e41efeac1e75"
 
 [[package]]
+name = "compare"
+version = "0.0.6"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "ea0095f6103c2a8b44acd6fd15960c801dafebf02e21940360833e0673f48ba7"
+
+[[package]]
 name = "const-str"
 version = "0.4.3"
 source = "registry+https://github.com/rust-lang/crates.io-index"
···
 ]
 
 [[package]]
+name = "cpufeatures"
+version = "0.2.17"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "59ed5838eebb26a2bb2e58f6d5b5316989ae9d08bab10e0e6d103e656d1b0280"
+dependencies = [
+ "libc",
+]
+
+[[package]]
 name = "criterion"
 version = "0.7.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
···
 ]
 
 [[package]]
+name = "crossbeam-skiplist"
+version = "0.1.3"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "df29de440c58ca2cc6e587ec3d22347551a32435fbde9d2bff64e78a9ffa151b"
+dependencies = [
+ "crossbeam-epoch",
+ "crossbeam-utils",
+]
+
+[[package]]
 name = "crossbeam-utils"
 version = "0.8.21"
 source = "registry+https://github.com/rust-lang/crates.io-index"
···
 checksum = "460fbee9c2c2f33933d720630a6a0bac33ba7053db5344fac858d4b8952d77d5"
 
 [[package]]
+name = "crypto-common"
+version = "0.1.6"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "1bfb12502f3fc46cca1bb51ac28df9d618d813cdc3d2f25b9fe775a34af26bb3"
+dependencies = [
+ "generic-array",
+ "typenum",
+]
+
+[[package]]
+name = "dashmap"
+version = "6.1.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "5041cc499144891f3790297212f32a74fb938e5136a14943f338ef9e0ae276cf"
+dependencies = [
+ "cfg-if",
+ "crossbeam-utils",
+ "hashbrown 0.14.5",
+ "lock_api",
+ "once_cell",
+ "parking_lot_core",
+]
+
+[[package]]
 name = "data-encoding"
 version = "2.9.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
···
 ]
 
 [[package]]
+name = "digest"
+version = "0.10.7"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "9ed9a281f7bc9b7576e61468ba615a66a5c8cfdff42420a70aa82701a3b1e292"
+dependencies = [
+ "block-buffer",
+ "crypto-common",
+]
+
+[[package]]
 name = "either"
 version = "1.15.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "48c757948c5ede0e46177b7add2e67155f70e33c07fea8284df6576da70b3719"
 
 [[package]]
+name = "enum_dispatch"
+version = "0.3.13"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "aa18ce2bc66555b3218614519ac839ddb759a7d6720732f979ef8d13be147ecd"
+dependencies = [
+ "once_cell",
+ "proc-macro2",
+ "quote",
+ "syn 2.0.106",
+]
+
+[[package]]
 name = "env_filter"
 version = "0.1.3"
 source = "registry+https://github.com/rust-lang/crates.io-index"
···
 ]
 
 [[package]]
+name = "equivalent"
+version = "1.0.2"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "877a4ace8713b0bcf2a4e7eec82529c029f1d0619886d18145fea96c3ffe5c0f"
+
+[[package]]
+name = "errno"
+version = "0.3.14"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "39cab71617ae0d63f51a36d69f866391735b51691dbda63cf6f96d042b63efeb"
+dependencies = [
+ "libc",
+ "windows-sys 0.60.2",
+]
+
+[[package]]
+name = "fastrand"
+version = "2.3.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "37909eebbb50d72f9059c3b6d82c0463f2ff062c9e95845c43a6c9c0355411be"
+
+[[package]]
+name = "fjall"
+version = "3.0.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "4f69637c02d38ad1b0f003101d0195a60368130aa17d9ef78b1557d265a22093"
+dependencies = [
+ "byteorder-lite",
+ "byteview",
+ "dashmap",
+ "flume",
+ "log",
+ "lsm-tree",
+ "tempfile",
+ "xxhash-rust",
+]
+
+[[package]]
+name = "flume"
+version = "0.12.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "5e139bc46ca777eb5efaf62df0ab8cc5fd400866427e56c68b22e414e53bd3be"
+dependencies = [
+ "spin",
+]
+
+[[package]]
 name = "futures"
 version = "0.3.31"
 source = "registry+https://github.com/rust-lang/crates.io-index"
···
 ]
 
 [[package]]
+name = "generic-array"
+version = "0.14.9"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "4bb6743198531e02858aeaea5398fcc883e71851fcbcb5a2f773e2fb6cb1edf2"
+dependencies = [
+ "typenum",
+ "version_check",
+]
+
+[[package]]
+name = "getrandom"
+version = "0.3.3"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "26145e563e54f2cadc477553f1ec5ee650b00862f0a58bcd12cbdc5f0ea2d2f4"
+dependencies = [
+ "cfg-if",
+ "libc",
+ "r-efi",
+ "wasi 0.14.7+wasi-0.2.4",
+]
+
+[[package]]
 name = "gimli"
 version = "0.32.3"
 source = "registry+https://github.com/rust-lang/crates.io-index"
···
 ]
 
 [[package]]
+name = "hashbrown"
+version = "0.14.5"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "e5274423e17b7c9fc20b6e7e208532f9b19825d82dfd615708b70edd83df41f1"
+
+[[package]]
+name = "hashbrown"
+version = "0.16.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "841d1cc9bed7f9236f321df977030373f4a4163ae1a7dbfe1a51a2c1a51d9100"
+
+[[package]]
 name = "heck"
 version = "0.5.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "2304e00983f87ffb38b55b444b5e3b60a884b5d30c0fca7d82fe33449bbe55ea"
+
+[[package]]
+name = "interval-heap"
+version = "0.0.5"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "11274e5e8e89b8607cfedc2910b6626e998779b48a019151c7604d0adcb86ac6"
+dependencies = [
+ "compare",
+]
 
 [[package]]
 name = "io-uring"
···
 checksum = "58f929b4d672ea937a23a1ab494143d968337a5f47e56d0815df1e0890ddf174"
 
 [[package]]
+name = "linux-raw-sys"
+version = "0.11.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "df1d3c3b53da64cf5760482273a98e575c651a67eec7f77df96b5b642de8f039"
+
+[[package]]
 name = "lock_api"
 version = "0.4.14"
 source = "registry+https://github.com/rust-lang/crates.io-index"
···
 checksum = "34080505efa8e45a4b816c349525ebe327ceaa8559756f0356cba97ef3bf7432"
 
 [[package]]
+name = "lsm-tree"
+version = "3.0.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "b875f1dfe14f557f805b167fb9b0fc54c5560c7a4bd6ae02535b2846f276a8cb"
+dependencies = [
+ "byteorder-lite",
+ "byteview",
+ "crossbeam-skiplist",
+ "enum_dispatch",
+ "interval-heap",
+ "log",
+ "quick_cache",
+ "rustc-hash",
+ "self_cell",
+ "sfa",
+ "tempfile",
+ "varint-rs",
+ "xxhash-rust",
+]
+
+[[package]]
 name = "match-lookup"
 version = "0.1.1"
 source = "registry+https://github.com/rust-lang/crates.io-index"
···
 checksum = "78bed444cc8a2160f01cbcf811ef18cac863ad68ae8ca62092e8db51d51c761c"
 dependencies = [
  "libc",
- "wasi",
+ "wasi 0.11.1+wasi-snapshot-preview1",
  "windows-sys 0.59.0",
 ]
 
···
 ]
 
 [[package]]
+name = "quick_cache"
+version = "0.6.18"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "7ada44a88ef953a3294f6eb55d2007ba44646015e18613d2f213016379203ef3"
+dependencies = [
+ "equivalent",
+ "hashbrown 0.16.1",
+]
+
+[[package]]
 name = "quote"
 version = "1.0.41"
 source = "registry+https://github.com/rust-lang/crates.io-index"
···
 dependencies = [
  "proc-macro2",
 ]
+
+[[package]]
+name = "r-efi"
+version = "5.3.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "69cdb34c158ceb288df11e18b4bd39de994f6657d83847bdffdbd7f346754b0f"
 
 [[package]]
 name = "rayon"
···
 
 [[package]]
 name = "repo-stream"
-version = "0.1.0"
+version = "0.2.2"
 dependencies = [
+ "bincode",
  "clap",
  "criterion",
  "env_logger",
+ "fjall",
  "futures",
  "futures-core",
  "ipld-core",
···
  "serde",
  "serde_bytes",
  "serde_ipld_dagcbor",
+ "sha2",
+ "tempfile",
  "thiserror 2.0.17",
  "tokio",
 ]
···
 checksum = "56f7d92ca342cea22a06f2121d944b4fd82af56988c270852495420f961d4ace"
 
 [[package]]
+name = "rustc-hash"
+version = "2.1.1"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "357703d41365b4b27c590e3ed91eabb1b663f07c4c084095e60cbed4362dff0d"
+
+[[package]]
+name = "rustix"
+version = "1.1.2"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "cd15f8a2c5551a84d56efdc1cd049089e409ac19a3072d5037a17fd70719ff3e"
+dependencies = [
+ "bitflags",
+ "errno",
+ "libc",
+ "linux-raw-sys",
+ "windows-sys 0.60.2",
+]
+
+[[package]]
 name = "rustversion"
 version = "1.0.22"
 source = "registry+https://github.com/rust-lang/crates.io-index"
···
 version = "1.2.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "94143f37725109f92c262ed2cf5e59bce7498c01bcc1502d7b9afe439a4e9f49"
+
+[[package]]
+name = "self_cell"
+version = "1.2.2"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "b12e76d157a900eb52e81bc6e9f3069344290341720e9178cde2407113ac8d89"
 
 [[package]]
 name = "serde"
···
 ]
 
 [[package]]
+name = "sfa"
+version = "1.0.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "a1296838937cab56cd6c4eeeb8718ec777383700c33f060e2869867bd01d1175"
+dependencies = [
+ "byteorder-lite",
+ "log",
+ "xxhash-rust",
+]
+
+[[package]]
+name = "sha2"
+version = "0.10.9"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "a7507d819769d01a365ab707794a4084392c824f54a7a6a7862f8c3d0892b283"
+dependencies = [
+ "cfg-if",
+ "cpufeatures",
+ "digest",
+]
+
+[[package]]
 name = "signal-hook-registry"
 version = "1.4.6"
 source = "registry+https://github.com/rust-lang/crates.io-index"
···
 ]
 
 [[package]]
+name = "spin"
+version = "0.9.8"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "6980e8d7511241f8acf4aebddbb1ff938df5eebe98691418c4468d0b72a96a67"
+dependencies = [
+ "lock_api",
+]
+
+[[package]]
 name = "strsim"
 version = "0.11.1"
 source = "registry+https://github.com/rust-lang/crates.io-index"
···
 ]
 
 [[package]]
+name = "tempfile"
+version = "3.23.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "2d31c77bdf42a745371d260a26ca7163f1e0924b64afa0b688e61b5a9fa02f16"
+dependencies = [
+ "fastrand",
+ "getrandom",
+ "once_cell",
+ "rustix",
+ "windows-sys 0.60.2",
+]
+
+[[package]]
 name = "thiserror"
 version = "1.0.69"
 source = "registry+https://github.com/rust-lang/crates.io-index"
···
 ]
 
 [[package]]
+name = "typenum"
+version = "1.19.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "562d481066bde0658276a35467c4af00bdc6ee726305698a55b86e61d7ad82bb"
+
+[[package]]
 name = "unicode-ident"
 version = "1.0.19"
 source = "registry+https://github.com/rust-lang/crates.io-index"
···
 checksum = "eb066959b24b5196ae73cb057f45598450d2c5f71460e98c49b738086eff9c06"
 
 [[package]]
+name = "unty"
+version = "0.0.4"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "6d49784317cd0d1ee7ec5c716dd598ec5b4483ea832a2dced265471cc0f690ae"
+
+[[package]]
 name = "utf8parse"
 version = "0.2.2"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "06abde3611657adf66d383f00b093d7faecc7fa57071cce2578660c9f1010821"
 
 [[package]]
+name = "varint-rs"
+version = "2.2.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "8f54a172d0620933a27a4360d3db3e2ae0dd6cceae9730751a036bbf182c4b23"
+
+[[package]]
+name = "version_check"
+version = "0.9.5"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "0b928f33d975fc6ad9f86c8f283853ad26bdd5b10b7f1542aa2fa15e2289105a"
+
+[[package]]
+name = "virtue"
+version = "0.0.18"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "051eb1abcf10076295e815102942cc58f9d5e3b4560e46e53c21e8ff6f3af7b1"
+
+[[package]]
 name = "walkdir"
 version = "2.5.0"
 source = "registry+https://github.com/rust-lang/crates.io-index"
···
 version = "0.11.1+wasi-snapshot-preview1"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "ccf3ec651a847eb01de73ccad15eb7d99f80485de043efb2f370cd654f4ea44b"
+
+[[package]]
+name = "wasi"
+version = "0.14.7+wasi-0.2.4"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "883478de20367e224c0090af9cf5f9fa85bed63a95c1abf3afc5c083ebc06e8c"
+dependencies = [
+ "wasip2",
+]
+
+[[package]]
+name = "wasip2"
+version = "1.0.1+wasi-0.2.4"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "0562428422c63773dad2c345a1882263bbf4d65cf3f42e90921f787ef5ad58e7"
+dependencies = [
+ "wit-bindgen",
+]
 
 [[package]]
 name = "wasm-bindgen"
···
 version = "0.53.1"
 source = "registry+https://github.com/rust-lang/crates.io-index"
 checksum = "d6bbff5f0aada427a1e5a6da5f1f98158182f26556f345ac9e04d36d0ebed650"
+
+[[package]]
+name = "wit-bindgen"
+version = "0.46.0"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "f17a85883d4e6d00e8a97c586de764dabcc06133f7f1d55dce5cdc070ad7fe59"
+
+[[package]]
+name = "xxhash-rust"
+version = "0.8.15"
+source = "registry+https://github.com/rust-lang/crates.io-index"
+checksum = "fdd20c5420375476fbd4394763288da7eb0cc0b8c11deed431a91562af7335d3"
 
 [[package]]
 name = "zerocopy"
+12 -2 Cargo.toml
···
 [package]
 name = "repo-stream"
-version = "0.1.0"
+version = "0.2.2"
 edition = "2024"
+license = "MIT OR Apache-2.0"
+description = "A robust CAR file -> MST walker for atproto"
+repository = "https://tangled.org/@microcosm.blue/repo-stream"
 
 [dependencies]
+bincode = { version = "2.0.1", features = ["serde"] }
+fjall = { version = "3.0.1", default-features = false }
 futures = "0.3.31"
 futures-core = "0.3.31"
 ipld-core = { version = "0.4.2", features = ["serde"] }
···
 serde = { version = "1.0.228", features = ["derive"] }
 serde_bytes = "0.11.19"
 serde_ipld_dagcbor = "0.6.4"
+sha2 = "0.10.9"
 thiserror = "2.0.17"
-tokio = "1.47.1"
+tokio = { version = "1.47.1", features = ["rt", "sync"] }
 
 [dev-dependencies]
 clap = { version = "4.5.48", features = ["derive"] }
 criterion = { version = "0.7.0", features = ["async_tokio"] }
 env_logger = "0.11.8"
 multibase = "0.9.2"
+tempfile = "3.23.0"
 tokio = { version = "1.47.1", features = ["full"] }
 
 [profile.profiling]
 inherits = "release"
 debug = true
+
+# [profile.release]
+# debug = true
 
 [[bench]]
 name = "non-huge-cars"
+190 LICENSE.Apache-2.0
···
+Apache License
+Version 2.0, January 2004
+http://www.apache.org/licenses/
+
+TERMS AND CONDITIONS FOR USE, REPRODUCTION, AND DISTRIBUTION
+
+1. Definitions.
+
+"License" shall mean the terms and conditions for use, reproduction,
+and distribution as defined by Sections 1 through 9 of this document.
+
+"Licensor" shall mean the copyright owner or entity authorized by
+the copyright owner that is granting the License.
+
+"Legal Entity" shall mean the union of the acting entity and all
+other entities that control, are controlled by, or are under common
+control with that entity. For the purposes of this definition,
+"control" means (i) the power, direct or indirect, to cause the
+direction or management of such entity, whether by contract or
+otherwise, or (ii) ownership of fifty percent (50%) or more of the
+outstanding shares, or (iii) beneficial ownership of such entity.
+
+"You" (or "Your") shall mean an individual or Legal Entity
+exercising permissions granted by this License.
+
+"Source" form shall mean the preferred form for making modifications,
+including but not limited to software source code, documentation
+source, and configuration files.
+
+"Object" form shall mean any form resulting from mechanical
+transformation or translation of a Source form, including but
+not limited to compiled object code, generated documentation,
+and conversions to other media types.
+
+"Work" shall mean the work of authorship, whether in Source or
+Object form, made available under the License, as indicated by a
+copyright notice that is included in or attached to the work
+(an example is provided in the Appendix below).
+
+"Derivative Works" shall mean any work, whether in Source or Object
+form, that is based on (or derived from) the Work and for which the
+editorial revisions, annotations, elaborations, or other modifications
+represent, as a whole, an original work of authorship. For the purposes
+of this License, Derivative Works shall not include works that remain
+separable from, or merely link (or bind by name) to the interfaces of,
+the Work and Derivative Works thereof.
+
+"Contribution" shall mean any work of authorship, including
+the original version of the Work and any modifications or additions
+to that Work or Derivative Works thereof, that is intentionally
+submitted to Licensor for inclusion in the Work by the copyright owner
+or by an individual or Legal Entity authorized to submit on behalf of
+the copyright owner. For the purposes of this definition, "submitted"
+means any form of electronic, verbal, or written communication sent
+to the Licensor or its representatives, including but not limited to
+communication on electronic mailing lists, source code control systems,
+and issue tracking systems that are managed by, or on behalf of, the
+Licensor for the purpose of discussing and improving the Work, but
+excluding communication that is conspicuously marked or otherwise
+designated in writing by the copyright owner as "Not a Contribution."
+
+"Contributor" shall mean Licensor and any individual or Legal Entity
+on behalf of whom a Contribution has been received by Licensor and
+subsequently incorporated within the Work.
+
+2. Grant of Copyright License. Subject to the terms and conditions of
+this License, each Contributor hereby grants to You a perpetual,
+worldwide, non-exclusive, no-charge, royalty-free, irrevocable
+copyright license to reproduce, prepare Derivative Works of,
+publicly display, publicly perform, sublicense, and distribute the
+Work and such Derivative Works in Source or Object form.
+
+3. Grant of Patent License. Subject to the terms and conditions of
+this License, each Contributor hereby grants to You a perpetual,
+worldwide, non-exclusive, no-charge, royalty-free, irrevocable
+(except as stated in this section) patent license to make, have made,
+use, offer to sell, sell, import, and otherwise transfer the Work,
+where such license applies only to those patent claims licensable
+by such Contributor that are necessarily infringed by their
+Contribution(s) alone or by combination of their Contribution(s)
+with the Work to which such Contribution(s) was submitted. If You
+institute patent litigation against any entity (including a
+cross-claim or counterclaim in a lawsuit) alleging that the Work
+or a Contribution incorporated within the Work constitutes direct
+or contributory patent infringement, then any patent licenses
+granted to You under this License for that Work shall terminate
+as of the date such litigation is filed.
+
+4. Redistribution. You may reproduce and distribute copies of the
+Work or Derivative Works thereof in any medium, with or without
+modifications, and in Source or Object form, provided that You
+meet the following conditions:
+
+(a) You must give any other recipients of the Work or
+Derivative Works a copy of this License; and
+
+(b) You must cause any modified files to carry prominent notices
+stating that You changed the files; and
+
+(c) You must retain, in the Source form of any Derivative Works
+that You distribute, all copyright, patent, trademark, and
+attribution notices from the Source form of the Work,
+excluding those notices that do not pertain to any part of
+the Derivative Works; and
+
+(d) If the Work includes a "NOTICE" text file as part of its
+distribution, then any Derivative Works that You distribute must
+include a readable copy of the attribution notices contained
+within such NOTICE file, excluding those notices that do not
+pertain to any part of the Derivative Works, in at least one
+of the following places: within a NOTICE text file distributed
+as part of the Derivative Works; within the Source form or
+documentation, if provided along with the Derivative Works; or,
+within a display generated by the Derivative Works, if and
+wherever such third-party notices normally appear. The contents
+of the NOTICE file are for informational purposes only and
+do not modify the License. You may add Your own attribution
+notices within Derivative Works that You distribute, alongside
+or as an addendum to the NOTICE text from the Work, provided
+that such additional attribution notices cannot be construed
+as modifying the License.
+
+You may add Your own copyright statement to Your modifications and
+may provide additional or different license terms and conditions
+for use, reproduction, or distribution of Your modifications, or
+for any such Derivative Works as a whole, provided Your use,
+reproduction, and distribution of the Work otherwise complies with
+the conditions stated in this License.
+
+5. Submission of Contributions. Unless You explicitly state otherwise,
+any Contribution intentionally submitted for inclusion in the Work
+by You to the Licensor shall be under the terms and conditions of
+this License, without any additional terms or conditions.
+Notwithstanding the above, nothing herein shall supersede or modify
+the terms of any separate license agreement you may have executed
+with Licensor regarding such Contributions.
+
+6. Trademarks. This License does not grant permission to use the trade
+names, trademarks, service marks, or product names of the Licensor,
+except as required for reasonable and customary use in describing the
+origin of the Work and reproducing the content of the NOTICE file.
+
+7. Disclaimer of Warranty. Unless required by applicable law or
+agreed to in writing, Licensor provides the Work (and each
+Contributor provides its Contributions) on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
+implied, including, without limitation, any warranties or conditions
+of TITLE, NON-INFRINGEMENT, MERCHANTABILITY, or FITNESS FOR A
+PARTICULAR PURPOSE. You are solely responsible for determining the
+appropriateness of using or redistributing the Work and assume any
+risks associated with Your exercise of permissions under this License.
+
+8. Limitation of Liability. In no event and under no legal theory,
+whether in tort (including negligence), contract, or otherwise,
+unless required by applicable law (such as deliberate and grossly
+negligent acts) or agreed to in writing, shall any Contributor be
+liable to You for damages, including any direct, indirect, special,
+incidental, or consequential damages of any character arising as a
+result of this License or out of the use or inability to use the
+Work (including but not limited to damages for loss of goodwill,
+work stoppage, computer failure or malfunction, or any and all
+other commercial damages or losses), even if such Contributor
+has been advised of the possibility of such damages.
+
+9. Accepting Warranty or Additional Liability. While redistributing
+the Work or Derivative Works thereof, You may choose to offer,
+and charge a fee for, acceptance of support, warranty, indemnity,
+or other liability obligations and/or rights consistent with this
+License. However, in accepting such obligations, You may act only
+on Your own behalf and on Your sole responsibility, not on behalf
+of any other Contributor, and only if You agree to indemnify,
+defend, and hold each Contributor harmless for any liability
+incurred by, or claims asserted against, such Contributor by reason
+of your accepting any such warranty or additional liability.
+
+END OF TERMS AND CONDITIONS
+
+Copyright 2025 microcosm
+
+Licensed under the Apache License, Version 2.0 (the "License");
+you may not use this file except in compliance with the License.
+You may obtain a copy of the License at
+
+http://www.apache.org/licenses/LICENSE-2.0
+
+Unless required by applicable law or agreed to in writing, software
+distributed under the License is distributed on an "AS IS" BASIS,
+WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+See the License for the specific language governing permissions and
+limitations under the License.
+21 LICENSE.MIT
···
+MIT License
+
+Copyright (c) 2025 microcosm
+
+Permission is hereby granted, free of charge, to any person obtaining a copy
+of this software and associated documentation files (the "Software"), to deal
+in the Software without restriction, including without limitation the rights
+to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
+copies of the Software, and to permit persons to whom the Software is
+furnished to do so, subject to the following conditions:
+
+The above copyright notice and this permission notice shall be included in all
+copies or substantial portions of the Software.
+
+THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
+IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
+FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
+AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
+LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
+OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
+SOFTWARE.
+12 -21 benches/huge-car.rs
···
 extern crate repo_stream;
-use futures::TryStreamExt;
-use iroh_car::CarReader;
-use std::convert::Infallible;
+use repo_stream::Driver;
 use std::path::{Path, PathBuf};
 
 use criterion::{Criterion, criterion_group, criterion_main};
···
     });
 }
 
-async fn drive_car(filename: impl AsRef<Path>) {
+async fn drive_car(filename: impl AsRef<Path>) -> usize {
     let reader = tokio::fs::File::open(filename).await.unwrap();
     let reader = tokio::io::BufReader::new(reader);
-    let reader = CarReader::new(reader).await.unwrap();
 
-    let root = reader
-        .header()
-        .roots()
-        .first()
-        .ok_or("missing root")
+    let mut driver = match Driver::load_car(reader, |block| block.len(), 1024)
+        .await
         .unwrap()
-        .clone();
-
-    let stream = std::pin::pin!(reader.stream());
-
-    let (_commit, v) =
-        repo_stream::drive::Vehicle::init(root, stream, |block| Ok::<_, Infallible>(block.len()))
-            .await
-            .unwrap();
-    let mut record_stream = std::pin::pin!(v.stream());
+    {
+        Driver::Memory(_, mem_driver) => mem_driver,
+        Driver::Disk(_) => panic!("not doing disk for benchmark"),
+    };
 
-    while let Some(_) = record_stream.try_next().await.unwrap() {
-        // just here for the drive
+    let mut n = 0;
+    while let Some(pairs) = driver.next_chunk(256).await.unwrap() {
+        n += pairs.len();
     }
+    n
 }
 
 criterion_group!(benches, criterion_benchmark);
+16 -22 benches/non-huge-cars.rs
···
 extern crate repo_stream;
-use futures::TryStreamExt;
-use iroh_car::CarReader;
-use std::convert::Infallible;
+use repo_stream::Driver;
 
 use criterion::{Criterion, criterion_group, criterion_main};
 
+const EMPTY_CAR: &'static [u8] = include_bytes!("../car-samples/empty.car");
 const TINY_CAR: &'static [u8] = include_bytes!("../car-samples/tiny.car");
 const LITTLE_CAR: &'static [u8] = include_bytes!("../car-samples/little.car");
 const MIDSIZE_CAR: &'static [u8] = include_bytes!("../car-samples/midsize.car");
···
         .build()
         .expect("Creating runtime failed");
 
+    c.bench_function("empty-car", |b| {
+        b.to_async(&rt).iter(async || drive_car(EMPTY_CAR).await)
+    });
     c.bench_function("tiny-car", |b| {
         b.to_async(&rt).iter(async || drive_car(TINY_CAR).await)
     });
···
     });
 }
 
-async fn drive_car(bytes: &[u8]) {
-    let reader = CarReader::new(bytes).await.unwrap();
-
-    let root = reader
-        .header()
-        .roots()
-        .first()
-        .ok_or("missing root")
+async fn drive_car(bytes: &[u8]) -> usize {
+    let mut driver = match Driver::load_car(bytes, |block| block.len(), 32)
+        .await
         .unwrap()
-        .clone();
-
-    let stream = std::pin::pin!(reader.stream());
+    {
+        Driver::Memory(_, mem_driver) => mem_driver,
+        Driver::Disk(_) => panic!("not benching big cars here"),
+    };
 
-    let (_commit, v) =
-        repo_stream::drive::Vehicle::init(root, stream, |block| Ok::<_, Infallible>(block.len()))
-            .await
-            .unwrap();
-    let mut record_stream = std::pin::pin!(v.stream());
-
-    while let Some(_) = record_stream.try_next().await.unwrap() {
-        // just here for the drive
+    let mut n = 0;
+    while let Some(pairs) = driver.next_chunk(256).await.unwrap() {
+        n += pairs.len();
     }
+    n
 }
 
 criterion_group!(benches, criterion_benchmark);
car-samples/empty.car
This is a binary file and will not be displayed.
+90 examples/disk-read-file/main.rs
···
+/*!
+Read a CAR file by spilling to disk
+*/
+
+extern crate repo_stream;
+use clap::Parser;
+use repo_stream::{DiskBuilder, Driver, DriverBuilder};
+use std::path::PathBuf;
+use std::time::Instant;
+
+#[derive(Debug, Parser)]
+struct Args {
+    #[arg()]
+    car: PathBuf,
+    #[arg()]
+    tmpfile: PathBuf,
+}
+
+#[tokio::main]
+async fn main() -> Result<(), Box<dyn std::error::Error>> {
+    env_logger::init();
+
+    let Args { car, tmpfile } = Args::parse();
+
+    // repo-stream takes an AsyncRead as input. wrapping a filesystem read in
+    // BufReader can provide a really significant performance win.
+    let reader = tokio::fs::File::open(car).await?;
+    let reader = tokio::io::BufReader::new(reader);
+
+    log::info!("hello! reading the car...");
+    let t0 = Instant::now();
+
+    // in this example we only bother handling CARs that are too big for memory
+    // `noop` helper means: do no block processing, store the raw blocks
+    let driver = match DriverBuilder::new()
+        .with_mem_limit_mb(32) // how much memory can be used before disk spill
+        .load_car(reader)
+        .await?
+    {
+        Driver::Memory(_, _) => panic!("try this on a bigger car"),
+        Driver::Disk(big_stuff) => {
+            // we reach here if the repo was too big and needs to be spilled to
+            // disk to continue
+
+            // set up a disk store we can spill to
+            let disk_store = DiskBuilder::new().open(tmpfile).await?;
+
+            // do the spilling, get back a (similar) driver
+            let (commit, driver) = big_stuff.finish_loading(disk_store).await?;
+
+            // at this point you might want to fetch the account's signing key
+            // via the DID from the commit, and then verify the signature.
+            log::warn!("big's commit ({:?}): {:?}", t0.elapsed(), commit);
+
+            // pop the driver back out to get some code indentation relief
+            driver
+        }
+    };
+
+    // collect some random stats about the blocks
+    let mut n = 0;
+    let mut zeros = 0;
+
+    log::info!("walking...");
+
+    // this example uses the disk driver's channel mode: the tree walking is
+    // spawned onto a blocking thread, and we get chunks of rkey+blocks back
+    let (mut rx, join) = driver.to_channel(512);
+    while let Some(r) = rx.recv().await {
+        let pairs = r?;
+
+        // keep a count of the total number of blocks seen
+        n += pairs.len();
+
+        for (_, block) in pairs {
+            // for each block, count how many bytes are equal to '0'
+            // (this is just an example, you probably want to do something more
+            // interesting)
+            zeros += block.into_iter().filter(|&b| b == b'0').count()
+        }
+    }
+
+    log::info!("arrived! ({:?}) joining rx...", t0.elapsed());
+
+    join.await?;
+
+    log::info!("done. n={n} zeros={zeros}");
+
+    Ok(())
+}
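
Assuming cargo's standard example layout (`examples/disk-read-file/main.rs` is auto-discovered as an example named after its directory), the example above should be runnable as `RUST_LOG=info cargo run --example disk-read-file -- some.car /tmp/spill.db`, where both paths are placeholders.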
+18 -25 examples/read-file/main.rs
···
+/*!
+Read a CAR file with in-memory processing
+*/
+
 extern crate repo_stream;
 use clap::Parser;
-use futures::TryStreamExt;
-use iroh_car::CarReader;
-use std::convert::Infallible;
+use repo_stream::{Driver, DriverBuilder};
 use std::path::PathBuf;
 
 type Result<T> = std::result::Result<T, Box<dyn std::error::Error>>;
···
     let reader = tokio::fs::File::open(file).await?;
     let reader = tokio::io::BufReader::new(reader);
 
-    println!("hello!");
-
-    let reader = CarReader::new(reader).await?;
-
-    let root = reader
-        .header()
-        .roots()
-        .first()
-        .ok_or("missing root")?
-        .clone();
-    log::debug!("root: {root:?}");
-
-    // let stream = Box::pin(reader.stream());
-    let stream = std::pin::pin!(reader.stream());
-
-    let (commit, v) =
-        repo_stream::drive::Vehicle::init(root, stream, |block| Ok::<_, Infallible>(block.len()))
-            .await?;
-    let mut record_stream = std::pin::pin!(v.stream());
+    let (commit, mut driver) = match DriverBuilder::new()
+        .with_block_processor(|block| block.len())
+        .load_car(reader)
+        .await?
+    {
+        Driver::Memory(commit, mem_driver) => (commit, mem_driver),
+        Driver::Disk(_) => panic!("this example doesn't handle big CARs"),
+    };
 
     log::info!("got commit: {commit:?}");
 
-    while let Some((rkey, _rec)) = record_stream.try_next().await? {
-        log::info!("got {rkey:?}");
+    let mut n = 0;
+    while let Some(pairs) = driver.next_chunk(256).await? {
+        n += pairs.len();
+        // log::info!("got {rkey:?}");
     }
-    log::info!("bye!");
+    log::info!("bye! total records={n}");
 
     Ok(())
 }
+81 -7 readme.md
···
 # repo-stream
 
-Fast and (aspirationally) robust atproto CAR file processing in rust
+A robust CAR file -> MST walker for atproto
+
+[![Crates.io][crates-badge]](https://crates.io/crates/repo-stream)
+[![Documentation][docs-badge]](https://docs.rs/repo-stream)
+[![Sponsor][sponsor-badge]](https://github.com/sponsors/uniphil)
+
+[crates-badge]: https://img.shields.io/crates/v/repo-stream.svg
+[docs-badge]: https://docs.rs/repo-stream/badge.svg
+[sponsor-badge]: https://img.shields.io/badge/at-microcosm-b820f9?labelColor=b820f9&logo=githubsponsors&logoColor=fff
+
+```rust
+use repo_stream::{Driver, DriverBuilder, DiskBuilder};
+
+#[tokio::main]
+async fn main() -> Result<(), Box<dyn std::error::Error>> {
+    // repo-stream takes any AsyncRead as input, like a tokio::fs::File
+    let reader = tokio::fs::File::open("repo.car").await?;
+    let reader = tokio::io::BufReader::new(reader);
+
+    // example repo workload is simply counting the total record bytes
+    let mut total_size = 0;
+
+    match DriverBuilder::new()
+        .with_mem_limit_mb(10)
+        .with_block_processor(|rec| rec.len()) // block processing: just extract the raw record size
+        .load_car(reader)
+        .await?
+    {
+
+        // if all blocks fit within memory
+        Driver::Memory(_commit, mut driver) => {
+            while let Some(chunk) = driver.next_chunk(256).await? {
+                for (_rkey, size) in chunk {
+                    total_size += size;
+                }
+            }
+        },
+
+        // if the CAR was too big for in-memory processing
+        Driver::Disk(paused) => {
+            // set up a disk store we can spill to
+            let store = DiskBuilder::new().open("some/path.db".into()).await?;
+            // do the spilling, get back a (similar) driver
+            let (_commit, mut driver) = paused.finish_loading(store).await?;
+
+            while let Some(chunk) = driver.next_chunk(256).await? {
+                for (_rkey, size) in chunk {
+                    total_size += size;
+                }
+            }
+        }
+    };
+    println!("sum of size of all records: {total_size}");
+    Ok(())
+}
+```
+
+more recent todo
+
+- [ ] get an *empty* car for the test suite
+- [x] implement a max size on disk limit
+
+
+-----
+
+older stuff (to clean up):
 
 
 current car processing times (records processed into their length usize, phil's dev machine):
···
 todo
 
 - [x] car file test fixtures & validation tests
-- [ ] make sure we can get the did and signature out for verification
+- [x] make sure we can get the did and signature out for verification
+  -> yeah the commit is returned from init
 - [ ] spec compliance todos
-  - [ ] assert that keys are ordered and fail if not
-  - [ ] verify node mst depth from key (possibly pending [interop test fixes](https://github.com/bluesky-social/atproto-interop-tests/issues/5))
+  - [x] assert that keys are ordered and fail if not
+  - [x] verify node mst depth from key (possibly pending [interop test fixes](https://github.com/bluesky-social/atproto-interop-tests/issues/5))
 - [ ] performance todos
-  - [ ] consume the serialized nodes into a mutable efficient format
+  - [x] consume the serialized nodes into a mutable efficient format
   - [ ] maybe customize the deserialize impl to do that directly?
   - [x] benchmark and profile
 - [ ] robustness todos
   - [ ] swap the blocks hashmap for a BlockStore trait that can be dumped to redb
   - [ ] maybe keep the redb function behind a feature flag?
   - [ ] can we assert a max size for node blocks?
-  - [ ] figure out why asserting the upper nibble of the fourth byte of a node fails fingerprinting
+  - [x] figure out why asserting the upper nibble of the fourth byte of a node fails fingerprinting
+    -> because it's the upper 3 bytes, not the fourth byte's upper nibble, oops.
 - [ ] max mst depth (there is actually a hard limit but a malicious repo could do anything)
-- [ ] i don't think we need a max recursion depth for processing cbor contents since we leave records to the user to decode
+- [ ] i don't *think* we need a max recursion depth for processing cbor contents since we leave records to the user to decode
 
 newer ideas
 
···
 - either just generally to handle huge CARs, or as a fallback when streaming fails
 
 redb has an in-memory backend, so it would be possible to *always* use it for block caching. user can choose if they want to allow disk or just do memory, and then "spilling" from the cache to disk would be mostly free?
+
+
+## license
+
+This work is dual-licensed under MIT and Apache 2.0. You can choose between one of them if you use this work.
+
+`SPDX-License-Identifier: MIT OR Apache-2.0`
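
Besides the `next_chunk` polling shown in the readme example, the disk driver also has a channel mode (used by `examples/disk-read-file/main.rs` above): `to_channel` spawns the MST walk onto a blocking task and hands chunks back over an mpsc receiver. A minimal sketch of that consumption loop, reusing the readme's byte-counting workload and assuming `driver` is the disk driver returned by `finish_loading`:

```rust
// sketch only: `driver` is the disk driver from `finish_loading`, and
// `total_size` is the readme example's running byte counter
let (mut rx, join) = driver.to_channel(512);
while let Some(chunk) = rx.recv().await {
    // each message is a Result carrying one in-order chunk of (rkey, processed) pairs
    for (_rkey, size) in chunk? {
        total_size += size;
    }
}
// surface any error from the spawned walker task
join.await?;
```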
+162 src/disk.rs
···
+/*!
+Disk storage for spilled blocks
+
+Currently this uses fjall. In testing it wasn't the fastest, but it seemed
+to be the best behaved in terms of both on-disk space usage and memory usage.
+
+```no_run
+# use repo_stream::{DiskBuilder, DiskError};
+# #[tokio::main]
+# async fn main() -> Result<(), DiskError> {
+let store = DiskBuilder::new()
+    .with_cache_size_mb(32)
+    .with_max_stored_mb(1024) // errors when >1GiB of processed blocks are inserted
+    .open("/some/path.db".into()).await?;
+# Ok(())
+# }
+```
+*/
+
+use crate::drive::DriveError;
+use fjall::config::{CompressionPolicy, PinningPolicy, RestartIntervalPolicy};
+use fjall::{CompressionType, Database, Error as FjallError, Keyspace, KeyspaceCreateOptions};
+use std::path::PathBuf;
+
+#[derive(Debug, thiserror::Error)]
+pub enum DiskError {
+    /// A wrapped database error
+    ///
+    /// (The wrapped err should probably be obscured to remove public-facing
+    /// fjall bits)
+    #[error(transparent)]
+    DbError(#[from] FjallError),
+    /// A tokio blocking task failed to join
+    #[error("Failed to join a tokio blocking task: {0}")]
+    JoinError(#[from] tokio::task::JoinError),
+    /// The total size of stored blocks exceeded the allowed size
+    ///
+    /// If you need to process *really* big CARs, you can configure a higher
+    /// limit.
+    #[error("Maximum disk size reached")]
+    MaxSizeExceeded,
+}
+
+/// Builder-style disk store setup
+#[derive(Debug, Clone)]
+pub struct DiskBuilder {
+    /// Database in-memory cache allowance
+    ///
+    /// Default: 64 MiB
+    pub cache_size_mb: usize,
+    /// Database stored block size limit
+    ///
+    /// Default: 10 GiB
+    ///
+    /// Note: actual size on disk may be more, but should approximately scale
+    /// with this limit
+    pub max_stored_mb: usize,
+}
+
+impl Default for DiskBuilder {
+    fn default() -> Self {
+        Self {
+            cache_size_mb: 64,
+            max_stored_mb: 10 * 1024, // 10 GiB
+        }
+    }
+}
+
+impl DiskBuilder {
+    /// Begin configuring the storage with defaults
+    pub fn new() -> Self {
+        Default::default()
+    }
+    /// Set the in-memory cache allowance for the database
+    ///
+    /// Default: 64 MiB
+    pub fn with_cache_size_mb(mut self, size: usize) -> Self {
+        self.cache_size_mb = size;
+        self
+    }
+    /// Set the approximate stored block size limit
+    ///
+    /// Default: 10 GiB
+    pub fn with_max_stored_mb(mut self, max: usize) -> Self {
+        self.max_stored_mb = max;
+        self
+    }
+    /// Open and initialize the actual disk storage
+    pub async fn open(&self, path: PathBuf) -> Result<DiskStore, DiskError> {
+        DiskStore::new(path, self.cache_size_mb, self.max_stored_mb).await
+    }
+}
+
+/// On-disk block storage
+pub struct DiskStore {
+    #[allow(unused)]
+    db: Database,
+    partition: Keyspace,
+    max_stored: usize,
+    stored: usize,
+}
+
+impl DiskStore {
+    /// Initialize a new disk store
+    pub async fn new(
+        path: PathBuf,
+        cache_mb: usize,
+        max_stored_mb: usize,
+    ) -> Result<Self, DiskError> {
+        let max_stored = max_stored_mb * 2_usize.pow(20);
+        let (db, partition) = tokio::task::spawn_blocking(move || {
+            let db = Database::builder(path)
+                // .manual_journal_persist(true)
+                // .flush_workers(1)
+                // .compaction_workers(1)
+                .journal_compression(CompressionType::None)
+                .cache_size(cache_mb as u64 * 2_u64.pow(20))
+                .temporary(true)
+                .open()?;
+            let opts = KeyspaceCreateOptions::default()
+                .data_block_restart_interval_policy(RestartIntervalPolicy::all(8))
+                .filter_block_pinning_policy(PinningPolicy::disabled())
+                .expect_point_read_hits(true)
+                .data_block_compression_policy(CompressionPolicy::disabled())
+                .manual_journal_persist(true)
+                .max_memtable_size(32 * 2_u64.pow(20));
+            let partition = db.keyspace("z", || opts)?;
+
+            Ok::<_, DiskError>((db, partition))
+        })
+        .await??;
+
+        Ok(Self {
+            db,
+            partition,
+            max_stored,
+            stored: 0,
+        })
+    }
+
+    pub(crate) fn put_many(
+        &mut self,
+        kv: impl Iterator<Item = Result<(Vec<u8>, Vec<u8>), DriveError>>,
+    ) -> Result<(), DriveError> {
+        let mut batch = self.db.batch();
+        for pair in kv {
+            let (k, v) = pair?;
+            self.stored += v.len();
+            if self.stored > self.max_stored {
+                return Err(DiskError::MaxSizeExceeded.into());
+            }
+            batch.insert(&self.partition, k, v);
+        }
+        batch.commit().map_err(DiskError::DbError)?;
+        Ok(())
+    }
+
+    #[inline]
+    pub(crate) fn get(&mut self, key: &[u8]) -> Result<Option<fjall::Slice>, FjallError> {
+        self.partition.get(key)
+    }
+}
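
One consequence of `put_many` above: it charges each inserted value against `max_stored` and bails with `MaxSizeExceeded` once the budget is crossed, so an oversized repo surfaces as an error during the spill step rather than while walking. A sketch of catching that case, assuming `finish_loading` fails with the `DriveError::StorageError` wrapping defined in `src/drive.rs` below:

```rust
use repo_stream::{DiskBuilder, DiskError, DriveError};

// sketch: `paused` is the `NeedDisk` payload from a `Driver::Disk(paused)` match arm
let store = DiskBuilder::new()
    .with_max_stored_mb(1024) // refuse repos beyond ~1 GiB of processed blocks
    .open("/tmp/spill.db".into())
    .await?;

match paused.finish_loading(store).await {
    Ok((commit, mut driver)) => {
        // verify `commit`, then drain `driver` as in the readme example
    }
    Err(DriveError::StorageError(DiskError::MaxSizeExceeded)) => {
        // this repo blew the on-disk budget: skip it instead of crashing
    }
    Err(other) => return Err(other.into()),
}
```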
+514
-107
src/drive.rs
+514
-107
src/drive.rs
···
1
-
use futures::{Stream, TryStreamExt};
1
+
//! Consume a CAR from an AsyncRead, producing an ordered stream of records
2
+
3
+
use crate::disk::{DiskError, DiskStore};
4
+
use crate::process::Processable;
2
5
use ipld_core::cid::Cid;
6
+
use iroh_car::CarReader;
7
+
use serde::{Deserialize, Serialize};
3
8
use std::collections::HashMap;
4
-
use std::error::Error;
9
+
use std::convert::Infallible;
10
+
use tokio::{io::AsyncRead, sync::mpsc};
5
11
6
12
use crate::mst::{Commit, Node};
7
-
use crate::walk::{Step, Trip, Walker};
13
+
use crate::walk::{Step, WalkError, Walker};
8
14
15
+
/// Errors that can happen while consuming and emitting blocks and records
9
16
#[derive(Debug, thiserror::Error)]
10
-
pub enum DriveError<E: Error> {
11
-
#[error("Failed to initialize CarReader: {0}")]
17
+
pub enum DriveError {
18
+
#[error("Error from iroh_car: {0}")]
12
19
CarReader(#[from] iroh_car::Error),
13
-
#[error("CAR file requires a root to be present")]
14
-
MissingRoot,
15
-
#[error("Car block stream error: {0}")]
16
-
CarBlockError(Box<dyn Error>),
17
20
#[error("Failed to decode commit block: {0}")]
18
-
BadCommit(Box<dyn Error>),
19
-
#[error("Failed to decode record block: {0}")]
20
-
BadRecord(Box<dyn Error>),
21
+
BadBlock(#[from] serde_ipld_dagcbor::DecodeError<Infallible>),
21
22
#[error("The Commit block reference by the root was not found")]
22
23
MissingCommit,
23
24
#[error("The MST block {0} could not be found")]
24
25
MissingBlock(Cid),
25
26
#[error("Failed to walk the mst tree: {0}")]
26
-
Tripped(#[from] Trip<E>),
27
-
#[error("Not finished walking, but no more blocks are available to continue")]
28
-
Dnf,
27
+
WalkError(#[from] WalkError),
28
+
#[error("CAR file had no roots")]
29
+
MissingRoot,
30
+
#[error("Storage error")]
31
+
StorageError(#[from] DiskError),
32
+
#[error("Encode error: {0}")]
33
+
BincodeEncodeError(#[from] bincode::error::EncodeError),
34
+
#[error("Tried to send on a closed channel")]
35
+
ChannelSendError, // SendError takes <T> which we don't need
36
+
#[error("Failed to join a task: {0}")]
37
+
JoinError(#[from] tokio::task::JoinError),
29
38
}
30
39
31
-
type CarBlock<E> = Result<(Cid, Vec<u8>), E>;
40
+
#[derive(Debug, thiserror::Error)]
41
+
pub enum DecodeError {
42
+
#[error(transparent)]
43
+
BincodeDecodeError(#[from] bincode::error::DecodeError),
44
+
#[error("extra bytes remained after decoding")]
45
+
ExtraGarbage,
46
+
}
32
47
33
-
#[derive(Debug)]
34
-
pub struct Rkey(pub String);
48
+
/// An in-order chunk of Rkey + (processed) Block pairs
49
+
pub type BlockChunk<T> = Vec<(String, T)>;
35
50
36
-
#[derive(Debug)]
37
-
pub enum MaybeProcessedBlock<T, E> {
51
+
#[derive(Debug, Clone, Serialize, Deserialize)]
52
+
pub(crate) enum MaybeProcessedBlock<T> {
38
53
/// A block that's *probably* a Node (but we can't know yet)
39
54
///
40
55
/// It *can be* a record that suspiciously looks a lot like a node, so we
···
53
68
/// If we _never_ needed this block, then we may have wasted a bit of effort
54
69
/// trying to process it. Oh well.
55
70
///
56
-
/// It would be nice to store the real error type from the processing
57
-
/// function, but I'm leaving that generics puzzle for later.
58
-
///
59
71
/// There's an alternative here, which would be to kick unprocessable blocks
60
72
/// back to Raw, or maybe even a new RawUnprocessable variant. Then we could
61
73
/// surface the typed error later if needed by trying to reprocess.
-    Processed(Result<T, E>),
+    Processed(T),
+}
+
+impl<T: Processable> Processable for MaybeProcessedBlock<T> {
+    /// TODO this is probably a little broken
+    fn get_size(&self) -> usize {
+        use std::{cmp::max, mem::size_of};
+
+        // enum is always as big as its biggest member?
+        let base_size = max(size_of::<Vec<u8>>(), size_of::<T>());
+
+        let extra = match self {
+            Self::Raw(bytes) => bytes.len(),
+            Self::Processed(t) => t.get_size(),
+        };
+
+        base_size + extra
+    }
 }
 
-// TODO: generic error not box dyn nonsense.
-pub type ProcRes<T, E> = Result<T, E>;
+impl<T> MaybeProcessedBlock<T> {
+    fn maybe(process: fn(Vec<u8>) -> T, data: Vec<u8>) -> Self {
+        if Node::could_be(&data) {
+            MaybeProcessedBlock::Raw(data)
+        } else {
+            MaybeProcessedBlock::Processed(process(data))
+        }
+    }
+}
+
+/// Read a CAR file, buffering blocks in memory or to disk
+pub enum Driver<R: AsyncRead + Unpin, T: Processable> {
+    /// All blocks fit within the memory limit
+    ///
+    /// You probably want to check the commit's signature. You can go ahead and
+    /// walk the MST right away.
+    Memory(Commit, MemDriver<T>),
+    /// Blocks exceed the memory limit
+    ///
+    /// You'll need to provide disk storage to continue. The commit will be
+    /// returned and can be validated only once all blocks are loaded.
+    Disk(NeedDisk<R, T>),
+}
+
+/// Builder-style driver setup
+#[derive(Debug, Clone)]
+pub struct DriverBuilder {
+    pub mem_limit_mb: usize,
+}
+
+impl Default for DriverBuilder {
+    fn default() -> Self {
+        Self { mem_limit_mb: 16 }
+    }
+}
+
+impl DriverBuilder {
+    /// Begin configuring the driver with defaults
+    pub fn new() -> Self {
+        Default::default()
+    }
+    /// Set the in-memory size limit, in MiB
+    ///
+    /// Default: 16 MiB
+    pub fn with_mem_limit_mb(self, new_limit: usize) -> Self {
+        Self {
+            mem_limit_mb: new_limit,
+        }
+    }
+    /// Set the block processor
+    ///
+    /// Default: noop, raw blocks will be emitted
+    pub fn with_block_processor<T: Processable>(
+        self,
+        p: fn(Vec<u8>) -> T,
+    ) -> DriverBuilderWithProcessor<T> {
+        DriverBuilderWithProcessor {
+            mem_limit_mb: self.mem_limit_mb,
+            block_processor: p,
+        }
+    }
+    /// Begin processing an atproto MST from a CAR file
+    pub async fn load_car<R: AsyncRead + Unpin>(
+        &self,
+        reader: R,
+    ) -> Result<Driver<R, Vec<u8>>, DriveError> {
+        Driver::load_car(reader, crate::process::noop, self.mem_limit_mb).await
+    }
+}
+
+/// Builder-style driver intermediate step
+///
+/// Start from `DriverBuilder`.
+#[derive(Debug, Clone)]
+pub struct DriverBuilderWithProcessor<T: Processable> {
+    pub mem_limit_mb: usize,
+    pub block_processor: fn(Vec<u8>) -> T,
+}
 
-pub struct Vehicle<SE, S, T, P, PE>
-where
-    S: Stream<Item = CarBlock<SE>>,
-    P: Fn(&[u8]) -> ProcRes<T, PE>,
-    PE: Error,
-{
-    block_stream: S,
-    blocks: HashMap<Cid, MaybeProcessedBlock<T, PE>>,
-    walker: Walker,
-    process: P,
+impl<T: Processable> DriverBuilderWithProcessor<T> {
+    /// Set the in-memory size limit, in MiB
+    ///
+    /// Default: 16 MiB
+    pub fn with_mem_limit_mb(mut self, new_limit: usize) -> Self {
+        self.mem_limit_mb = new_limit;
+        self
+    }
+    /// Begin processing an atproto MST from a CAR file
+    pub async fn load_car<R: AsyncRead + Unpin>(
+        &self,
+        reader: R,
+    ) -> Result<Driver<R, T>, DriveError> {
+        Driver::load_car(reader, self.block_processor, self.mem_limit_mb).await
+    }
 }
 
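For reference, the two-step builder above chains like this — a minimal sketch; the `Preview` type, the 64-byte cutoff, and the `tokio` entrypoint wiring are illustrative assumptions, not part of the crate:

```rust
use repo_stream::{Driver, DriverBuilder, Processable};
use serde::{Deserialize, Serialize};

// hypothetical processed type: a short, lossy text preview of each record
#[derive(Debug, Clone, Serialize, Deserialize)]
struct Preview(String);

impl Processable for Preview {
    fn get_size(&self) -> usize {
        self.0.capacity() // off-stack bytes held by the String
    }
}

fn preview(raw: Vec<u8>) -> Preview {
    let cut = raw.len().min(64); // keep at most the first 64 bytes
    Preview(String::from_utf8_lossy(&raw[..cut]).into_owned())
}

#[tokio::main]
async fn main() -> Result<(), repo_stream::DriveError> {
    let reader = include_bytes!("../car-samples/tiny.car").as_slice();

    // DriverBuilder -> DriverBuilderWithProcessor -> Driver
    match DriverBuilder::new()
        .with_block_processor(preview)
        .with_mem_limit_mb(32)
        .load_car(reader)
        .await?
    {
        Driver::Memory(_commit, mut driver) => {
            while let Some(chunk) = driver.next_chunk(256).await? {
                for (rkey, Preview(text)) in chunk {
                    println!("{rkey}: {text}");
                }
            }
        }
        Driver::Disk(_paused) => { /* provide a DiskStore and finish_loading */ }
    }
    Ok(())
}
```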
-impl<SE, S, T: Clone, P, PE> Vehicle<SE, S, T, P, PE>
-where
-    SE: Error + 'static,
-    S: Stream<Item = CarBlock<SE>> + Unpin,
-    P: Fn(&[u8]) -> ProcRes<T, PE>,
-    PE: Error,
-{
-    pub async fn init(
-        root: Cid,
-        mut block_stream: S,
-        process: P,
-    ) -> Result<(Commit, Self), DriveError<PE>> {
-        let mut blocks = HashMap::new();
+impl<R: AsyncRead + Unpin, T: Processable> Driver<R, T> {
+    /// Begin processing an atproto MST from a CAR file
+    ///
+    /// Blocks will be loaded, processed, and buffered in memory. If the entire
+    /// processed size is under the `mem_limit_mb` limit, a `Driver::Memory`
+    /// will be returned along with a `Commit` ready for validation.
+    ///
+    /// If the `mem_limit_mb` limit is reached before loading all blocks, the
+    /// partial state will be returned as `Driver::Disk(needed)`, which can be
+    /// resumed by providing a `DiskStore` for on-disk block storage.
+    pub async fn load_car(
+        reader: R,
+        process: fn(Vec<u8>) -> T,
+        mem_limit_mb: usize,
+    ) -> Result<Driver<R, T>, DriveError> {
+        let max_size = mem_limit_mb * 2_usize.pow(20);
+        let mut mem_blocks = HashMap::new();
+
+        let mut car = CarReader::new(reader).await?;
+
+        let root = *car
+            .header()
+            .roots()
+            .first()
+            .ok_or(DriveError::MissingRoot)?;
+        log::debug!("root: {root:?}");
 
         let mut commit = None;
 
-        while let Some((cid, data)) = block_stream
-            .try_next()
-            .await
-            .map_err(|e| DriveError::CarBlockError(e.into()))?
-        {
+        // try to load all the blocks into memory
+        let mut mem_size = 0;
+        while let Some((cid, data)) = car.next_block().await? {
+            // the root commit is a Special Third Kind of block that we need to make
+            // sure not to optimistically send to the processing function
             if cid == root {
-                let c: Commit = serde_ipld_dagcbor::from_slice(&data)
-                    .map_err(|e| DriveError::BadCommit(e.into()))?;
+                let c: Commit = serde_ipld_dagcbor::from_slice(&data)?;
                 commit = Some(c);
-                break; // inner while
-            } else {
-                blocks.insert(
-                    cid,
-                    if Node::could_be(&data) {
-                        MaybeProcessedBlock::Raw(data)
-                    } else {
-                        MaybeProcessedBlock::Processed(process(&data))
-                    },
-                );
+                continue;
+            }
+
+            // remaining possible types: node, record, other. optimistically process
+            let maybe_processed = MaybeProcessedBlock::maybe(process, data);
+
+            // stash (maybe processed) blocks in memory as long as we have room
+            mem_size += std::mem::size_of::<Cid>() + maybe_processed.get_size();
+            mem_blocks.insert(cid, maybe_processed);
+            if mem_size >= max_size {
+                return Ok(Driver::Disk(NeedDisk {
+                    car,
+                    root,
+                    process,
+                    max_size,
+                    mem_blocks,
+                    commit,
+                }));
             }
         }
 
-        // we either broke out or read all the blocks without finding the commit...
+        // all blocks loaded and we fit in memory! hopefully we found the commit...
         let commit = commit.ok_or(DriveError::MissingCommit)?;
 
         let walker = Walker::new(commit.data);
 
-        let me = Self {
-            block_stream,
-            blocks,
-            walker,
-            process,
-        };
-        Ok((commit, me))
+        Ok(Driver::Memory(
+            commit,
+            MemDriver {
+                blocks: mem_blocks,
+                walker,
+                process,
+            },
+        ))
+    }
+}
+
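A quick gloss on the accounting in `load_car` above: the MiB limit becomes bytes via `mem_limit_mb * 2_usize.pow(20)`, and each stored block is charged its `Cid` key plus its `get_size()` estimate. A minimal sketch of that threshold logic (the `CID_COST` constant is a stand-in for `std::mem::size_of::<Cid>()`):

```rust
/// Stand-in for `std::mem::size_of::<Cid>()` in the real accounting.
const CID_COST: usize = 64;

// returns true where load_car would suspend into Driver::Disk(NeedDisk { .. })
fn spills(mem_limit_mb: usize, block_costs: &[usize]) -> bool {
    let max_size = mem_limit_mb * 2_usize.pow(20); // MiB -> bytes: 16 MiB = 16_777_216
    let mut mem_size = 0;
    for cost in block_costs {
        mem_size += CID_COST + cost; // key + (maybe processed) block estimate
        if mem_size >= max_size {
            return true;
        }
    }
    false // everything fit: load_car returns Driver::Memory instead
}
```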
+/// The core driver between the block stream and MST walker
+///
+/// In the future, PDSs will export CARs in a stream-friendly order that will
+/// enable processing them with tiny memory overhead. But that future is not
+/// here yet.
+///
+/// CARs are almost always in a stream-unfriendly order, so I'm reverting the
+/// optimistic stream features: we load all blocks first, then walk the MST.
+///
+/// This makes things much simpler: we only need to worry about spilling to disk
+/// in one place, and we always have a reasonable expectation about how much
+/// work the init function will do. We can drop the CAR reader before walking,
+/// so the sync/async boundaries become a little easier to work around.
+#[derive(Debug)]
+pub struct MemDriver<T: Processable> {
+    blocks: HashMap<Cid, MaybeProcessedBlock<T>>,
+    walker: Walker,
+    process: fn(Vec<u8>) -> T,
+}
+
+impl<T: Processable> MemDriver<T> {
+    /// Step through the record outputs, in rkey order
+    pub async fn next_chunk(&mut self, n: usize) -> Result<Option<BlockChunk<T>>, DriveError> {
+        let mut out = Vec::with_capacity(n);
+        for _ in 0..n {
+            // walk as far as we can until we run out of blocks or find a record
+            match self.walker.step(&mut self.blocks, self.process)? {
+                Step::Missing(cid) => return Err(DriveError::MissingBlock(cid)),
+                Step::Finish => break,
+                Step::Found { rkey, data } => {
+                    out.push((rkey, data));
+                    continue;
+                }
+            };
+        }
+
+        if out.is_empty() {
+            Ok(None)
+        } else {
+            Ok(Some(out))
+        }
+    }
+}
+
+/// A partially memory-loaded CAR file that needs disk spillover to continue
+pub struct NeedDisk<R: AsyncRead + Unpin, T: Processable> {
+    car: CarReader<R>,
+    root: Cid,
+    process: fn(Vec<u8>) -> T,
+    max_size: usize,
+    mem_blocks: HashMap<Cid, MaybeProcessedBlock<T>>,
+    pub commit: Option<Commit>,
+}
+
+fn encode(v: impl Serialize) -> Result<Vec<u8>, bincode::error::EncodeError> {
+    bincode::serde::encode_to_vec(v, bincode::config::standard())
+}
+
+pub(crate) fn decode<T: Processable>(bytes: &[u8]) -> Result<T, DecodeError> {
+    let (t, n) = bincode::serde::decode_from_slice(bytes, bincode::config::standard())?;
+    if n != bytes.len() {
+        return Err(DecodeError::ExtraGarbage);
     }
+    Ok(t)
+}
 
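The `encode`/`decode` pair above round-trips blocks through bincode's serde integration, and the length check rejects trailing bytes. A standalone sketch of the same pattern, with a `String` standing in for a processed block:

```rust
use bincode::config::standard;

fn main() -> Result<(), Box<dyn std::error::Error>> {
    let block = "some processed block".to_string();

    // the same calls encode()/decode() make above
    let bytes = bincode::serde::encode_to_vec(&block, standard())?;
    let (back, n): (String, usize) = bincode::serde::decode_from_slice(&bytes, standard())?;

    assert_eq!(back, block);
    assert_eq!(n, bytes.len()); // decode() turns a mismatch here into DecodeError::ExtraGarbage

    Ok(())
}
```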
-    async fn drive_until(&mut self, cid_needed: Cid) -> Result<(), DriveError<PE>> {
-        while let Some((cid, data)) = self
-            .block_stream
-            .try_next()
-            .await
-            .map_err(|e| DriveError::CarBlockError(e.into()))?
-        {
-            self.blocks.insert(
-                cid,
-                if Node::could_be(&data) {
-                    MaybeProcessedBlock::Raw(data)
-                } else {
-                    MaybeProcessedBlock::Processed((self.process)(&data))
-                },
-            );
-            if cid == cid_needed {
-                return Ok(());
+impl<R: AsyncRead + Unpin, T: Processable + Send + 'static> NeedDisk<R, T> {
+    pub async fn finish_loading(
+        mut self,
+        mut store: DiskStore,
+    ) -> Result<(Commit, DiskDriver<T>), DriveError> {
+        // move store in and back out so we can manage lifetimes
+        // dump mem blocks into the store
+        store = tokio::task::spawn(async move {
+            let kvs = self
+                .mem_blocks
+                .into_iter()
+                .map(|(k, v)| Ok(encode(v).map(|v| (k.to_bytes(), v))?));
+
+            store.put_many(kvs)?;
+            Ok::<_, DriveError>(store)
+        })
+        .await??;
+
+        let (tx, mut rx) = mpsc::channel::<Vec<(Cid, MaybeProcessedBlock<T>)>>(1);
+
+        let store_worker = tokio::task::spawn_blocking(move || {
+            while let Some(chunk) = rx.blocking_recv() {
+                let kvs = chunk
+                    .into_iter()
+                    .map(|(k, v)| Ok(encode(v).map(|v| (k.to_bytes(), v))?));
+                store.put_many(kvs)?;
+            }
+            Ok::<_, DriveError>(store)
+        }); // await later
+
+        // dump the rest to disk (in chunks)
+        log::debug!("dumping the rest of the stream...");
+        loop {
+            let mut mem_size = 0;
+            let mut chunk = vec![];
+            loop {
+                let Some((cid, data)) = self.car.next_block().await? else {
+                    break;
+                };
+                // we still gotta keep checking for the root since we might not have it
+                if cid == self.root {
+                    let c: Commit = serde_ipld_dagcbor::from_slice(&data)?;
+                    self.commit = Some(c);
+                    continue;
+                }
+                // remaining possible types: node, record, other. optimistically process
+                // TODO: get the actual in-memory size to compute disk spill
+                let maybe_processed = MaybeProcessedBlock::maybe(self.process, data);
+                mem_size += std::mem::size_of::<Cid>() + maybe_processed.get_size();
+                chunk.push((cid, maybe_processed));
+                if mem_size >= self.max_size {
+                    // soooooo if we're setting the db cache to max_size and then letting
+                    // multiple chunks in the queue that are >= max_size, then at any time
+                    // we might be using some multiple of max_size?
+                    break;
+                }
+            }
+            if chunk.is_empty() {
+                break;
             }
+            tx.send(chunk)
+                .await
+                .map_err(|_| DriveError::ChannelSendError)?;
         }
+        drop(tx);
+        log::debug!("done. waiting for worker to finish...");
 
-        // if we never found the block
-        Err(DriveError::MissingBlock(cid_needed))
+        store = store_worker.await??;
+
+        log::debug!("worker finished.");
+
+        let commit = self.commit.ok_or(DriveError::MissingCommit)?;
+
+        let walker = Walker::new(commit.data);
+
+        Ok((
+            commit,
+            DiskDriver {
+                process: self.process,
+                state: Some(BigState { store, walker }),
+            },
+        ))
     }
+}
 
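One property worth noting in `finish_loading` above: the channel has capacity 1 and each chunk is capped near `max_size`, so (per the inline comment) the reading side and the store worker together hold only a small multiple of `max_size` at once. A minimal sketch of that bounded producer/consumer shape, with plain byte vectors standing in for block batches:

```rust
use tokio::sync::mpsc;

#[tokio::main]
async fn main() {
    // capacity 1: the producer can be at most one full chunk ahead
    let (tx, mut rx) = mpsc::channel::<Vec<u8>>(1);

    // blocking consumer, like the store worker that calls put_many
    let worker = tokio::task::spawn_blocking(move || {
        let mut total = 0;
        while let Some(chunk) = rx.blocking_recv() {
            total += chunk.len(); // stand-in for store.put_many(..)
        }
        total
    });

    // async producer, like the CAR-reading loop
    for _ in 0..4 {
        let chunk = vec![0u8; 1024]; // stand-in for a ~max_size batch of blocks
        if tx.send(chunk).await.is_err() {
            break; // receiver dropped
        }
    }
    drop(tx); // closes the channel so the worker's recv loop ends

    assert_eq!(worker.await.unwrap(), 4 * 1024);
}
```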
-    pub async fn next_record(&mut self) -> Result<Option<(Rkey, T)>, DriveError<PE>> {
+struct BigState {
+    store: DiskStore,
+    walker: Walker,
+}
+
+/// MST walker that reads from disk instead of an in-memory hashmap
+pub struct DiskDriver<T: Clone> {
+    process: fn(Vec<u8>) -> T,
+    state: Option<BigState>,
+}
+
+// for doctests only
+#[doc(hidden)]
+pub fn _get_fake_disk_driver() -> DiskDriver<Vec<u8>> {
+    use crate::process::noop;
+    DiskDriver {
+        process: noop,
+        state: None,
+    }
+}
+
+impl<T: Processable + Send + 'static> DiskDriver<T> {
+    /// Walk the MST returning up to `n` rkey + record pairs
+    ///
+    /// ```no_run
+    /// # use repo_stream::{drive::{DiskDriver, DriveError, _get_fake_disk_driver}, process::noop};
+    /// # #[tokio::main]
+    /// # async fn main() -> Result<(), DriveError> {
+    /// # let mut disk_driver = _get_fake_disk_driver();
+    /// while let Some(pairs) = disk_driver.next_chunk(256).await? {
+    ///     for (rkey, record) in pairs {
+    ///         println!("{rkey}: size={}", record.len());
+    ///     }
+    /// }
+    /// # Ok(())
+    /// # }
+    /// ```
+    pub async fn next_chunk(&mut self, n: usize) -> Result<Option<BlockChunk<T>>, DriveError> {
+        let process = self.process;
+
+        // state should only *ever* be None transiently while inside here
+        let mut state = self.state.take().expect("DiskDriver must have Some(state)");
+
+        // the big pain here is that we don't want to leave self.state in an
+        // invalid state (None), so all the error paths have to make sure it
+        // comes out again.
+        let (state, res) = tokio::task::spawn_blocking(
+            move || -> (BigState, Result<BlockChunk<T>, DriveError>) {
+                let mut out = Vec::with_capacity(n);
+
+                for _ in 0..n {
+                    // walk as far as we can until we run out of blocks or find a record
+                    let step = match state.walker.disk_step(&mut state.store, process) {
+                        Ok(s) => s,
+                        Err(e) => {
+                            return (state, Err(e.into()));
+                        }
+                    };
+                    match step {
+                        Step::Missing(cid) => {
+                            return (state, Err(DriveError::MissingBlock(cid)));
+                        }
+                        Step::Finish => break,
+                        Step::Found { rkey, data } => out.push((rkey, data)),
+                    };
+                }
+
+                (state, Ok::<_, DriveError>(out))
+            },
+        )
+        .await?; // on tokio JoinError, we'll be left with invalid state :(
+
+        // *must* restore state before dealing with the actual result
+        self.state = Some(state);
+
+        let out = res?;
+
+        if out.is_empty() {
+            Ok(None)
+        } else {
+            Ok(Some(out))
+        }
+    }
+
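The `state: Option<BigState>` dance in `next_chunk` above — take the state out, move it into `spawn_blocking`, and restore it before touching the result — is a general pattern for lending owned state to a blocking task. A stripped-down sketch (the `Counter` state is illustrative):

```rust
struct Counter {
    n: u64,
}

struct Owner {
    // None only transiently, while the state is lent to a blocking task
    state: Option<Counter>,
}

impl Owner {
    async fn bump(&mut self) -> Result<u64, tokio::task::JoinError> {
        let mut state = self.state.take().expect("Owner must have Some(state)");

        // the closure must hand the state back on every path, even errors
        let (state, n) = tokio::task::spawn_blocking(move || {
            state.n += 1; // stand-in for the blocking disk walk
            let n = state.n;
            (state, n)
        })
        .await?; // a JoinError here strands self.state as None, same caveat as above

        self.state = Some(state); // restore before returning the result
        Ok(n)
    }
}
```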
+    fn read_tx_blocking(
+        &mut self,
+        n: usize,
+        tx: mpsc::Sender<Result<BlockChunk<T>, DriveError>>,
+    ) -> Result<(), mpsc::error::SendError<Result<BlockChunk<T>, DriveError>>> {
+        let BigState { store, walker } = self.state.as_mut().expect("valid state");
+
         loop {
-            // walk as far as we can until we run out of blocks or find a record
-            let cid_needed = match self.walker.walk(&mut self.blocks, &self.process)? {
-                Step::Rest(cid) => cid,
-                Step::Finish => return Ok(None),
-                Step::Step { rkey, data } => return Ok(Some((Rkey(rkey), data))),
-            };
+            let mut out: BlockChunk<T> = Vec::with_capacity(n);
+
+            for _ in 0..n {
+                // walk as far as we can until we run out of blocks or find a record
+                let step = match walker.disk_step(store, self.process) {
+                    Ok(s) => s,
+                    Err(e) => return tx.blocking_send(Err(e.into())),
+                };
+
+                match step {
+                    Step::Missing(cid) => {
+                        return tx.blocking_send(Err(DriveError::MissingBlock(cid)));
+                    }
+                    Step::Finish => return Ok(()),
+                    Step::Found { rkey, data } => {
+                        out.push((rkey, data));
+                        continue;
+                    }
+                };
+            }
 
-            // load blocks until we reach that cid
-            self.drive_until(cid_needed).await?;
+            if out.is_empty() {
+                break;
+            }
+            tx.blocking_send(Ok(out))?;
         }
+
+        Ok(())
     }
 
-    pub fn stream(self) -> impl Stream<Item = Result<(Rkey, T), DriveError<PE>>> {
-        futures::stream::try_unfold(self, |mut this| async move {
-            let maybe_record = this.next_record().await?;
-            Ok(maybe_record.map(|b| (b, this)))
-        })
+    /// Spawn the disk reading task into a tokio blocking thread
+    ///
+    /// The idea is to avoid so much sending back and forth to the blocking
+    /// thread, letting a blocking task do all the disk reading work and sending
+    /// records and rkeys back through an `mpsc` channel instead.
+    ///
+    /// This might also allow the disk work to continue while processing the
+    /// records. It's still not yet clear if this method actually has much
+    /// benefit over just using `.next_chunk(n)`.
+    ///
+    /// ```no_run
+    /// # use repo_stream::{drive::{DiskDriver, DriveError, _get_fake_disk_driver}, process::noop};
+    /// # #[tokio::main]
+    /// # async fn main() -> Result<(), DriveError> {
+    /// # let mut disk_driver = _get_fake_disk_driver();
+    /// let (mut rx, join) = disk_driver.to_channel(512);
+    /// while let Some(recvd) = rx.recv().await {
+    ///     let pairs = recvd?;
+    ///     for (rkey, record) in pairs {
+    ///         println!("{rkey}: size={}", record.len());
+    ///     }
+    /// }
+    /// # Ok(())
+    /// # }
+    /// ```
+    pub fn to_channel(
+        mut self,
+        n: usize,
+    ) -> (
+        mpsc::Receiver<Result<BlockChunk<T>, DriveError>>,
+        tokio::task::JoinHandle<Self>,
+    ) {
+        let (tx, rx) = mpsc::channel::<Result<BlockChunk<T>, DriveError>>(1);
+
+        // this worker runs detached from any .await; the returned join handle
+        // hands `self` back once the reading is done
+        let chan_task = tokio::task::spawn_blocking(move || {
+            if let Err(mpsc::error::SendError(_)) = self.read_tx_blocking(n, tx) {
+                log::debug!("big car reader exited early due to dropped receiver channel");
+            }
+            self
+        });
+
+        (rx, chan_task)
     }
 }
src/lib.rs  (+83 -2)
···
-pub mod drive;
+/*!
+A robust CAR file -> MST walker for atproto
+
+Small CARs have their blocks buffered in memory. If a configurable memory limit
+is reached while reading blocks, CAR reading is suspended, and can be continued
+by providing disk storage to buffer the CAR blocks instead.
+
+A `process` function can be provided for tasks where records are transformed
+into a smaller representation, to save memory (and disk) during block reading.
+
+Once blocks are loaded, the MST is walked and emitted as chunks of
+`(rkey, processed_block)` pairs, in order (depth-first, left-to-right).
+
+Some MST validations are applied:
+- Keys must appear in order
+- Keys must be at the correct MST tree depth
+
+`iroh_car` additionally applies a block size limit of 2 MiB.
+
+```
+use repo_stream::{Driver, DriverBuilder, DiskBuilder};
+
+# #[tokio::main]
+# async fn main() -> Result<(), Box<dyn std::error::Error>> {
+# let reader = include_bytes!("../car-samples/tiny.car").as_slice();
+let mut total_size = 0;
+
+match DriverBuilder::new()
+    .with_mem_limit_mb(10)
+    .with_block_processor(|rec| rec.len()) // block processing: just extract the raw record size
+    .load_car(reader)
+    .await?
+{
+    // if all blocks fit within memory
+    Driver::Memory(_commit, mut driver) => {
+        while let Some(chunk) = driver.next_chunk(256).await? {
+            for (_rkey, size) in chunk {
+                total_size += size;
+            }
+        }
+    },
+
+    // if the CAR was too big for in-memory processing
+    Driver::Disk(paused) => {
+        // set up a disk store we can spill to
+        let store = DiskBuilder::new().open("some/path.db".into()).await?;
+        // do the spilling, get back a (similar) driver
+        let (_commit, mut driver) = paused.finish_loading(store).await?;
+
+        while let Some(chunk) = driver.next_chunk(256).await? {
+            for (_rkey, size) in chunk {
+                total_size += size;
+            }
+        }
+    }
+};
+println!("sum of size of all records: {total_size}");
+# Ok(())
+# }
+```
+
+Disk spilling suspends and returns a `Driver::Disk(paused)` instead of going
+ahead and eagerly using disk I/O. This means you have to write a bit more code
+to handle both cases, but it gives you finer control over resource usage. For
+example, you can drive a number of parallel in-memory CAR workers, and
+separately have a different number of disk workers picking up suspended disk
+tasks from a queue.
+
+Find more [examples in the repo](https://tangled.org/@microcosm.blue/repo-stream/tree/main/examples).
+*/
+
 pub mod mst;
-pub mod walk;
+mod walk;
+
+pub mod disk;
+pub mod drive;
+pub mod process;
+
+pub use disk::{DiskBuilder, DiskError, DiskStore};
+pub use drive::{DriveError, Driver, DriverBuilder, NeedDisk};
+pub use mst::Commit;
+pub use process::Processable;
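The module docs above mention driving parallel in-memory workers while a separate pool picks up suspended disk tasks. A rough sketch of that arrangement — the channel layout, `Cursor` reader, worker split, and spill path are all illustrative assumptions, not prescribed by the crate:

```rust
use repo_stream::{DiskBuilder, Driver, DriverBuilder, NeedDisk};
use tokio::sync::mpsc;

type Suspended = NeedDisk<std::io::Cursor<Vec<u8>>, Vec<u8>>;

// memory-side worker: cheap, run many of these concurrently
async fn mem_worker(
    mut cars: mpsc::Receiver<Vec<u8>>,
    disk_queue: mpsc::Sender<Suspended>,
) -> Result<(), Box<dyn std::error::Error>> {
    while let Some(bytes) = cars.recv().await {
        match DriverBuilder::new().load_car(std::io::Cursor::new(bytes)).await? {
            Driver::Memory(_commit, mut driver) => {
                while let Some(_chunk) = driver.next_chunk(256).await? {
                    // handle records entirely in memory
                }
            }
            // too big: don't do disk I/O here, hand it off instead
            Driver::Disk(paused) => disk_queue.send(paused).await.map_err(|_| "queue closed")?,
        }
    }
    Ok(())
}

// disk-side worker: run fewer of these; each spill needs its own store path
async fn disk_worker(mut queue: mpsc::Receiver<Suspended>) -> Result<(), Box<dyn std::error::Error>> {
    while let Some(paused) = queue.recv().await {
        let store = DiskBuilder::new().open("spill.db".into()).await?;
        let (_commit, mut driver) = paused.finish_loading(store).await?;
        while let Some(_chunk) = driver.next_chunk(256).await? {
            // handle records from disk
        }
    }
    Ok(())
}
```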
src/mst.rs  (+9 -10)
···
 /// MST node data schema
 #[derive(Debug, Deserialize, PartialEq)]
 #[serde(deny_unknown_fields)]
-pub struct Node {
+pub(crate) struct Node {
     /// link to sub-tree Node on a lower level and with all keys sorting before
     /// keys at this node
     #[serde(rename = "l")]
···
     /// so if a block *could be* a node, any record converter must postpone
     /// processing. if it turns out it happens to be a very node-looking record,
     /// well, sorry, it just has to only be processed later when that's known.
-    pub fn could_be(bytes: impl AsRef<[u8]>) -> bool {
+    pub(crate) fn could_be(bytes: impl AsRef<[u8]>) -> bool {
         const NODE_FINGERPRINT: [u8; 3] = [
             0xA2, // map length 2 (for "l" and "e" keys)
             0x61, // text length 1
             b'e', // "e" before "l" because map keys have to be lex-sorted
-                  // 0x8?: "e" contains an array (0x8 nibble) of some length (low nib)
+                  // 0x8?: "e" has an array (major type 0b100 in the upper 3 bits) of some length
         ];
         let bytes = bytes.as_ref();
         bytes.starts_with(&NODE_FINGERPRINT)
-        // && bytes.get(3).map(|b| b & 0xF0 == 0x80).unwrap_or(false)
+            && bytes
+                .get(3)
+                .map(|b| b & 0b1110_0000 == 0x80)
+                .unwrap_or(false)
     }
 
     /// Check if a node has any entries
···
     /// with an empty array of entries. This is the only situation in which a
     /// tree may contain an empty leaf node which does not either contain keys
     /// ("entries") or point to a sub-tree containing entries.
-    ///
-    /// TODO: to me this is slightly unclear with respect to `l` (ask someone).
-    /// ...is that what "The top of the tree must not be a an empty node which
-    /// only points to a sub-tree." is referring to?
-    pub fn is_empty(&self) -> bool {
+    pub(crate) fn is_empty(&self) -> bool {
         self.left.is_none() && self.entries.is_empty()
     }
 }
···
 /// TreeEntry object
 #[derive(Debug, Deserialize, PartialEq)]
 #[serde(deny_unknown_fields)]
-pub struct Entry {
+pub(crate) struct Entry {
     /// count of bytes shared with previous TreeEntry in this Node (if any)
     #[serde(rename = "p")]
     pub prefix_len: usize,
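To make the `could_be` fingerprint above concrete: a DAG-CBOR MST node always serializes as a two-key map `{"e": [...], "l": ...}`, so its first bytes are fixed. A standalone sketch of the same check (it mirrors the helper rather than calling it, since `could_be` is now `pub(crate)`):

```rust
// mirror of mst::Node::could_be, which is pub(crate) and not callable from here
fn looks_like_node(bytes: &[u8]) -> bool {
    bytes.starts_with(&[
        0xA2, // CBOR map of length 2 ("e" and "l")
        0x61, // text string of length 1
        b'e', // key "e" sorts before "l"
    ]) && bytes
        .get(3)
        .map(|b| b & 0b1110_0000 == 0x80) // 4th byte: CBOR array (major type 0b100)
        .unwrap_or(false)
}

fn main() {
    // 0xA2 0x61 'e' 0x80 begins a map {"e": []} -- matches the fingerprint
    assert!(looks_like_node(&[0xA2, 0x61, b'e', 0x80]));
    // a map starting with a different single-byte key does not
    assert!(!looks_like_node(&[0xA2, 0x61, b'a', 0x80]));
}
```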
src/process.rs  (+108)
···
+/*!
+Record processor function output trait
+
+The return type must satisfy the `Processable` trait, which requires:
+
+- `Clone` because two rkeys can refer to the same record by CID, which may
+  only appear once in the CAR file.
+- `Serialize + DeserializeOwned` so it can be spilled to disk.
+
+One required function must be implemented, `get_size()`: this should return the
+approximate total off-stack size of the type. (The on-stack size will be added
+automatically via `std::mem::size_of`.)
+
+Note that it is **not guaranteed** that the `process` function will run on a
+block before storing it in memory or on disk: it's not possible to know if a
+block is a record without actually walking the MST, so the best we can do is
+apply `process` to any block that we know *cannot* be an MST node, and otherwise
+store the raw block bytes.
+
+Here's a silly processing function that just collects 'eyy's found in the raw
+record bytes:
+
+```
+# use repo_stream::Processable;
+# use serde::{Serialize, Deserialize};
+#[derive(Debug, Clone, Serialize, Deserialize)]
+struct Eyy(usize, String);
+
+impl Processable for Eyy {
+    fn get_size(&self) -> usize {
+        // don't need to count the usize, it's on the stack
+        self.1.capacity() // in-mem size from the string's capacity, in bytes
+    }
+}
+
+fn process(raw: Vec<u8>) -> Vec<Eyy> {
+    let mut out = Vec::new();
+    let to_find = "eyy".as_bytes();
+    for (i, window) in raw.windows(3).enumerate() {
+        if window == to_find {
+            out.push(Eyy(i, "eyy".to_string()));
+        }
+    }
+    out
+}
+```
+
+The memory sizing stuff is a little sketchy but should be at least
+approximately right.
+*/
+
+use serde::{Serialize, de::DeserializeOwned};
+
+/// Output trait for record processing
+pub trait Processable: Clone + Serialize + DeserializeOwned {
+    /// Any additional in-memory size taken by the processed type
+    ///
+    /// Do not include stack size (`std::mem::size_of`)
+    fn get_size(&self) -> usize;
+}
+
+/// Processor that just returns the raw blocks
+#[inline]
+pub fn noop(block: Vec<u8>) -> Vec<u8> {
+    block
+}
+
+impl Processable for u8 {
+    fn get_size(&self) -> usize {
+        0
+    }
+}
+
+impl Processable for usize {
+    fn get_size(&self) -> usize {
+        0 // no additional space taken, just its stack size (newtype is free)
+    }
+}
+
+impl Processable for String {
+    fn get_size(&self) -> usize {
+        self.capacity()
+    }
+}
+
+impl<Item: Sized + Processable> Processable for Vec<Item> {
+    fn get_size(&self) -> usize {
+        let slot_size = std::mem::size_of::<Item>();
+        let direct_size = slot_size * self.capacity();
+        let items_referenced_size: usize = self.iter().map(|item| item.get_size()).sum();
+        direct_size + items_referenced_size
+    }
+}
+
+impl<Item: Processable> Processable for Option<Item> {
+    fn get_size(&self) -> usize {
+        self.as_ref().map(|item| item.get_size()).unwrap_or(0)
+    }
+}
+
+impl<Item: Processable, Error: Processable> Processable for Result<Item, Error> {
+    fn get_size(&self) -> usize {
+        match self {
+            Ok(item) => item.get_size(),
+            Err(err) => err.get_size(),
+        }
+    }
+}
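A worked example of the sizing rules above, combining the provided `String` and `Vec` impls. Slot sizes are platform-dependent, so the concrete numbers in the comments assume a typical 64-bit target; `to_string` allocating exactly the string's length is also assumed:

```rust
use repo_stream::Processable;

fn main() {
    let v: Vec<String> = vec!["eyy".to_string(), "eyyyy".to_string()];

    // Vec's impl: slot size * capacity, plus each item's own get_size()
    let slot = std::mem::size_of::<String>(); // 24 on typical 64-bit targets
    let expected = slot * v.capacity() + 3 + 5; // the two Strings hold 3 and 5 bytes

    assert_eq!(v.get_size(), expected); // e.g. 24 * 2 + 8 = 56 on 64-bit
}
```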
src/walk.rs  (+278 -255)
···
 //! Depth-first MST traversal
 
-use crate::drive::{MaybeProcessedBlock, ProcRes};
+use crate::disk::DiskStore;
+use crate::drive::{DecodeError, MaybeProcessedBlock};
 use crate::mst::Node;
+use crate::process::Processable;
 use ipld_core::cid::Cid;
+use sha2::{Digest, Sha256};
 use std::collections::HashMap;
-use std::error::Error;
+use std::convert::Infallible;
 
+/// Errors that can happen while walking
 #[derive(Debug, thiserror::Error)]
-pub enum Trip<E: Error> {
-    #[error("empty mst nodes are not allowed")]
-    NodeEmpty,
+pub enum WalkError {
+    #[error("Failed to fingerprint commit block")]
+    BadCommitFingerprint,
     #[error("Failed to decode commit block: {0}")]
-    BadCommit(Box<dyn std::error::Error>),
-    #[error("Failed to process record: {0}")]
-    RecordFailedProcessing(Box<dyn Error>),
+    BadCommit(#[from] serde_ipld_dagcbor::DecodeError<Infallible>),
     #[error("Action node error: {0}")]
-    ActionNode(#[from] ActionNodeError),
-    #[error("Process failed: {0}")]
-    ProcessFailed(E),
+    MstError(#[from] MstError),
+    #[error("storage error: {0}")]
+    StorageError(#[from] fjall::Error),
+    #[error("Decode error: {0}")]
+    DecodeError(#[from] DecodeError),
 }
 
-#[derive(Debug, thiserror::Error)]
-pub enum ActionNodeError {
+/// Errors from an invalid MST (bad rkeys, nodes, or depths)
+#[derive(Debug, PartialEq, thiserror::Error)]
+pub enum MstError {
     #[error("Failed to compute an rkey due to invalid prefix_len")]
     EntryPrefixOutOfbounds,
     #[error("RKey was not utf-8")]
     EntryRkeyNotUtf8(#[from] std::string::FromUtf8Error),
+    #[error("Nodes cannot be empty (except for an entirely empty MST)")]
+    EmptyNode,
+    #[error("Found an entry with rkey at the wrong depth")]
+    WrongDepth,
+    #[error("Lost track of our depth (possible bug?)")]
+    LostDepth,
+    #[error("MST depth underflow: depth-0 node with child trees")]
+    DepthUnderflow,
+    #[error("Encountered an rkey out of order while walking the MST")]
+    RkeyOutOfOrder,
 }
 
+/// Walker outputs
 #[derive(Debug)]
 pub enum Step<T> {
-    Rest(Cid),
+    /// We needed this CID but it's not in the block store
+    Missing(Cid),
+    /// Reached the end of the MST! yay!
     Finish,
-    Step { rkey: String, data: T },
+    /// A record was found!
+    Found { rkey: String, data: T },
 }
 
 #[derive(Debug, Clone, PartialEq)]
 enum Need {
-    Node(Cid),
+    Node { depth: Depth, cid: Cid },
     Record { rkey: String, cid: Cid },
 }
 
-fn push_from_node(stack: &mut Vec<Need>, node: &Node) -> Result<(), ActionNodeError> {
-    let mut entries = Vec::with_capacity(node.entries.len());
+#[derive(Debug, Clone, Copy, PartialEq)]
+enum Depth {
+    Root,
+    Depth(u32),
+}
 
+impl Depth {
+    fn from_key(key: &[u8]) -> Self {
+        let mut zeros = 0;
+        for byte in Sha256::digest(key) {
+            let leading = byte.leading_zeros();
+            zeros += leading;
+            if leading < 8 {
+                break;
+            }
+        }
+        Self::Depth(zeros / 2) // truncating divide (rounds down)
+    }
+    fn next_expected(&self) -> Result<Option<u32>, MstError> {
+        match self {
+            Self::Root => Ok(None),
+            Self::Depth(d) => d.checked_sub(1).ok_or(MstError::DepthUnderflow).map(Some),
+        }
+    }
+}
+
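The `from_key` logic above is the standard atproto MST depth rule: count the leading zero bits of sha-256(key), then halve (two zero bits per level). A standalone check against the spec examples used by the tests later in this file (it assumes only the `sha2` crate, which this module already imports):

```rust
use sha2::{Digest, Sha256};

// same counting loop as Depth::from_key above: two leading zero bits = one level
fn key_depth(key: &[u8]) -> u32 {
    let mut zeros = 0;
    for byte in Sha256::digest(key) {
        let leading = byte.leading_zeros(); // u8::leading_zeros, 0..=8
        zeros += leading;
        if leading < 8 {
            break; // a one-bit appeared inside this byte; stop counting
        }
    }
    zeros / 2 // truncating divide
}

fn main() {
    // expected values taken from the depth tests below
    assert_eq!(key_depth(b"2653ae71"), 0);
    assert_eq!(key_depth(b"blue"), 1);
    assert_eq!(key_depth(b"app.bsky.feed.post/454397e440ec"), 4);
    assert_eq!(key_depth(b"app.bsky.feed.post/9adeb165882c"), 8);
}
```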
+fn push_from_node(stack: &mut Vec<Need>, node: &Node, parent_depth: Depth) -> Result<(), MstError> {
+    // empty nodes are not allowed in the MST except in an empty MST
+    if node.is_empty() {
+        if parent_depth == Depth::Root {
+            return Ok(()); // empty mst, nothing to push
+        } else {
+            return Err(MstError::EmptyNode);
+        }
+    }
+
+    let mut entries = Vec::with_capacity(node.entries.len());
     let mut prefix = vec![];
+    let mut this_depth = parent_depth.next_expected()?;
+
     for entry in &node.entries {
         let mut rkey = vec![];
         let pre_checked = prefix
             .get(..entry.prefix_len)
-            .ok_or(ActionNodeError::EntryPrefixOutOfbounds)?;
+            .ok_or(MstError::EntryPrefixOutOfbounds)?;
         rkey.extend_from_slice(pre_checked);
         rkey.extend_from_slice(&entry.keysuffix);
+
+        let Depth::Depth(key_depth) = Depth::from_key(&rkey) else {
+            return Err(MstError::WrongDepth);
+        };
+
+        // this_depth is `None` when this is the tree's top node (directly below
+        // the commit); in that case we accept whatever depth its keys claim
+        let expected_depth = match this_depth {
+            Some(d) => d,
+            None => {
+                this_depth = Some(key_depth);
+                key_depth
+            }
+        };
+
+        // all keys we find should be at this depth
+        if key_depth != expected_depth {
+            return Err(MstError::WrongDepth);
+        }
+
         prefix = rkey.clone();
 
         entries.push(Need::Record {
···
             cid: entry.value,
         });
         if let Some(ref tree) = entry.tree {
-            entries.push(Need::Node(*tree));
+            entries.push(Need::Node {
+                depth: Depth::Depth(key_depth),
+                cid: *tree,
+            });
         }
     }
 
     entries.reverse();
     stack.append(&mut entries);
 
+    let d = this_depth.ok_or(MstError::LostDepth)?;
+
     if let Some(tree) = node.left {
-        stack.push(Need::Node(tree));
+        stack.push(Need::Node {
+            depth: Depth::Depth(d),
+            cid: tree,
+        });
     }
     Ok(())
 }
 
+/// Traverser of an atproto MST
+///
+/// Walks the tree from left-to-right in depth-first order
 #[derive(Debug)]
 pub struct Walker {
     stack: Vec<Need>,
+    prev: String,
 }
 
 impl Walker {
     pub fn new(tree_root_cid: Cid) -> Self {
         Self {
-            stack: vec![Need::Node(tree_root_cid)],
+            stack: vec![Need::Node {
+                depth: Depth::Root,
+                cid: tree_root_cid,
+            }],
+            prev: "".to_string(),
         }
     }
 
-    pub fn walk<T: Clone, E: Error>(
+    /// Advance through nodes until we find a record or can't go further
+    pub fn step<T: Processable>(
         &mut self,
-        blocks: &mut HashMap<Cid, MaybeProcessedBlock<T, E>>,
-        process: impl Fn(&[u8]) -> ProcRes<T, E>,
-    ) -> Result<Step<T>, Trip<E>> {
+        blocks: &mut HashMap<Cid, MaybeProcessedBlock<T>>,
+        process: impl Fn(Vec<u8>) -> T,
+    ) -> Result<Step<T>, WalkError> {
         loop {
-            let Some(mut need) = self.stack.last() else {
+            let Some(need) = self.stack.last_mut() else {
                 log::trace!("tried to walk but we're actually done.");
                 return Ok(Step::Finish);
             };
 
-            match &mut need {
-                Need::Node(cid) => {
+            match need {
+                &mut Need::Node { depth, cid } => {
                     log::trace!("need node {cid:?}");
-                    let Some(block) = blocks.remove(cid) else {
+                    let Some(block) = blocks.remove(&cid) else {
                         log::trace!("node not found, resting");
-                        return Ok(Step::Rest(*cid));
+                        return Ok(Step::Missing(cid));
                     };
 
                     let MaybeProcessedBlock::Raw(data) = block else {
-                        return Err(Trip::BadCommit("failed commit fingerprint".into()));
+                        return Err(WalkError::BadCommitFingerprint);
                     };
                     let node = serde_ipld_dagcbor::from_slice::<Node>(&data)
-                        .map_err(|e| Trip::BadCommit(e.into()))?;
+                        .map_err(WalkError::BadCommit)?;
 
                     // found node, make sure we remember
                     self.stack.pop();
 
                     // queue up work on the found node next
-                    push_from_node(&mut self.stack, &node)?;
+                    push_from_node(&mut self.stack, &node, depth)?;
                 }
                 Need::Record { rkey, cid } => {
                     log::trace!("need record {cid:?}");
+                    // note that we cannot *remove* a record block, sadly, since
+                    // there can be multiple rkeys pointing to the same cid.
                     let Some(data) = blocks.get_mut(cid) else {
+                        return Ok(Step::Missing(*cid));
+                    };
+                    let rkey = rkey.clone();
+                    let data = match data {
+                        MaybeProcessedBlock::Raw(data) => process(data.to_vec()),
+                        MaybeProcessedBlock::Processed(t) => t.clone(),
+                    };
+
+                    // found the record, make sure we remember
+                    self.stack.pop();
+
+                    // rkeys *must* be in order or else the tree is invalid (or
+                    // we have a bug)
+                    if rkey <= self.prev {
+                        return Err(MstError::RkeyOutOfOrder.into());
+                    }
+                    self.prev = rkey.clone();
+
+                    return Ok(Step::Found { rkey, data });
+                }
+            }
+        }
+    }
+
+    /// Blocking: performs disk reads, so call it from a blocking context
+    pub fn disk_step<T: Processable>(
+        &mut self,
+        reader: &mut DiskStore,
+        process: impl Fn(Vec<u8>) -> T,
+    ) -> Result<Step<T>, WalkError> {
+        loop {
+            let Some(need) = self.stack.last_mut() else {
+                log::trace!("tried to walk but we're actually done.");
+                return Ok(Step::Finish);
+            };
+
+            match need {
+                &mut Need::Node { depth, cid } => {
+                    let cid_bytes = cid.to_bytes();
+                    log::trace!("need node {cid:?}");
+                    let Some(block_bytes) = reader.get(&cid_bytes)? else {
+                        log::trace!("node not found, resting");
+                        return Ok(Step::Missing(cid));
+                    };
+
+                    let block: MaybeProcessedBlock<T> = crate::drive::decode(&block_bytes)?;
+
+                    let MaybeProcessedBlock::Raw(data) = block else {
+                        return Err(WalkError::BadCommitFingerprint);
+                    };
+                    let node = serde_ipld_dagcbor::from_slice::<Node>(&data)
+                        .map_err(WalkError::BadCommit)?;
+
+                    // found node, make sure we remember
+                    self.stack.pop();
+
+                    // queue up work on the found node next
+                    push_from_node(&mut self.stack, &node, depth).map_err(WalkError::MstError)?;
+                }
+                Need::Record { rkey, cid } => {
+                    log::trace!("need record {cid:?}");
+                    let cid_bytes = cid.to_bytes();
+                    let Some(data_bytes) = reader.get(&cid_bytes)? else {
                         log::trace!("record block not found, resting");
-                        return Ok(Step::Rest(*cid));
+                        return Ok(Step::Missing(*cid));
                     };
+                    let data: MaybeProcessedBlock<T> = crate::drive::decode(&data_bytes)?;
                     let rkey = rkey.clone();
                     let data = match data {
                         MaybeProcessedBlock::Raw(data) => process(data),
-                        MaybeProcessedBlock::Processed(Ok(t)) => Ok(t.clone()),
-                        bad => {
-                            // big hack to pull the error out -- this corrupts
-                            // a block, so we should not continue trying to work
-                            let mut steal = MaybeProcessedBlock::Raw(vec![]);
-                            std::mem::swap(&mut steal, bad);
-                            let MaybeProcessedBlock::Processed(Err(e)) = steal else {
-                                unreachable!();
-                            };
-                            return Err(Trip::ProcessFailed(e));
-                        }
+                        MaybeProcessedBlock::Processed(t) => t.clone(),
                     };
 
                     // found node, make sure we remember
                     self.stack.pop();
 
                     log::trace!("emitting a block as a step. depth={}", self.stack.len());
-                    let data = data.map_err(Trip::ProcessFailed)?;
-                    return Ok(Step::Step { rkey, data });
+
+                    // rkeys *must* be in order or else the tree is invalid (or
+                    // we have a bug)
+                    if rkey <= self.prev {
+                        return Err(MstError::RkeyOutOfOrder.into());
+                    }
+                    self.prev = rkey.clone();
+
+                    return Ok(Step::Found { rkey, data });
                 }
             }
         }
···
 #[cfg(test)]
 mod test {
     use super::*;
-    // use crate::mst::Entry;
 
     fn cid1() -> Cid {
         "bafyreihixenvk3ahqbytas4hk4a26w43bh6eo3w6usjqtxkpzsvi655a3m"
             .parse()
             .unwrap()
     }
-    // fn cid2() -> Cid {
-    //     "QmY7Yh4UquoXHLPFo2XbhXkhBvFoPwmQUSa92pxnxjQuPU"
-    //         .parse()
-    //         .unwrap()
-    // }
-    // fn cid3() -> Cid {
-    //     "bafybeigdyrzt5sfp7udm7hu76uh7y26nf3efuylqabf3oclgtqy55fbzdi"
-    //         .parse()
-    //         .unwrap()
-    // }
-    // fn cid4() -> Cid {
-    //     "QmbWqxBEKC3P8tqsKc98xmWNzrzDtRLMiMPL8wBuTGsMnR"
-    //         .parse()
-    //         .unwrap()
-    // }
-    // fn cid5() -> Cid {
-    //     "QmSnuWmxptJZdLJpKRarxBMS2Ju2oANVrgbr2xWbie9b2D"
-    //         .parse()
-    //         .unwrap()
-    // }
-    // fn cid6() -> Cid {
-    //     "QmdmQXB2mzChmMeKY47C43LxUdg1NDJ5MWcKMKxDu7RgQm"
-    //         .parse()
-    //         .unwrap()
-    // }
-    // fn cid7() -> Cid {
-    //     "bafybeiaysi4s6lnjev27ln5icwm6tueaw2vdykrtjkwiphwekaywqhcjze"
-    //         .parse()
-    //         .unwrap()
-    // }
-    // fn cid8() -> Cid {
-    //     "bafyreif3tfdpr5n4jdrbielmcapwvbpcthepfkwq2vwonmlhirbjmotedi"
-    //         .parse()
-    //         .unwrap()
-    // }
-    // fn cid9() -> Cid {
-    //     "bafyreicnokmhmrnlp2wjhyk2haep4tqxiptwfrp2rrs7rzq7uk766chqvq"
-    //         .parse()
-    //         .unwrap()
-    // }
+
+    #[test]
+    fn test_depth_spec_0() {
+        let d = Depth::from_key(b"2653ae71");
+        assert_eq!(d, Depth::Depth(0))
+    }
+
+    #[test]
+    fn test_depth_spec_1() {
+        let d = Depth::from_key(b"blue");
+        assert_eq!(d, Depth::Depth(1))
+    }
+
+    #[test]
+    fn test_depth_spec_4() {
+        let d = Depth::from_key(b"app.bsky.feed.post/454397e440ec");
+        assert_eq!(d, Depth::Depth(4))
+    }
+
+    #[test]
+    fn test_depth_spec_8() {
+        let d = Depth::from_key(b"app.bsky.feed.post/9adeb165882c");
+        assert_eq!(d, Depth::Depth(8))
+    }
+
+    #[test]
+    fn test_depth_ietf_draft_0() {
+        let d = Depth::from_key(b"key1");
+        assert_eq!(d, Depth::Depth(0))
+    }
+
+    #[test]
+    fn test_depth_ietf_draft_1() {
+        let d = Depth::from_key(b"key7");
+        assert_eq!(d, Depth::Depth(1))
+    }
+
+    #[test]
+    fn test_depth_ietf_draft_4() {
+        let d = Depth::from_key(b"key515");
+        assert_eq!(d, Depth::Depth(4))
+    }
+
+    #[test]
+    fn test_depth_interop() {
+        // examples from https://github.com/bluesky-social/atproto-interop-tests/blob/main/mst/key_heights.json
+        for (k, expected) in [
+            ("", 0),
+            ("asdf", 0),
+            ("blue", 1),
+            ("2653ae71", 0),
+            ("88bfafc7", 2),
+            ("2a92d355", 4),
+            ("884976f5", 6),
+            ("app.bsky.feed.post/454397e440ec", 4),
+            ("app.bsky.feed.post/9adeb165882c", 8),
+        ] {
+            let d = Depth::from_key(k.as_bytes());
+            assert_eq!(d, Depth::Depth(expected), "key: {}", k);
+        }
+    }
 
     #[test]
-    fn test_next_from_node_empty() {
-        let node = Node {
+    fn test_push_empty_fails() {
+        let empty_node = Node {
             left: None,
             entries: vec![],
         };
         let mut stack = vec![];
-        push_from_node(&mut stack, &node).unwrap();
-        assert_eq!(stack.last(), None);
+        let err = push_from_node(&mut stack, &empty_node, Depth::Depth(4));
+        assert_eq!(err, Err(MstError::EmptyNode));
     }
 
     #[test]
-    fn test_needs_from_node_just_left() {
+    fn test_push_one_node() {
         let node = Node {
             left: Some(cid1()),
             entries: vec![],
         };
         let mut stack = vec![];
-        push_from_node(&mut stack, &node).unwrap();
-        assert_eq!(stack.last(), Some(Need::Node(cid1())).as_ref());
+        push_from_node(&mut stack, &node, Depth::Depth(4)).unwrap();
+        assert_eq!(
+            stack.last(),
+            Some(Need::Node {
+                depth: Depth::Depth(3),
+                cid: cid1()
+            })
+            .as_ref()
+        );
     }
-
-    // #[test]
-    // fn test_needs_from_node_just_one_record() {
-    //     let node = Node {
-    //         left: None,
-    //         entries: vec![Entry {
-    //             keysuffix: "asdf".into(),
-    //             prefix_len: 0,
-    //             value: cid1(),
-    //             tree: None,
-    //         }],
-    //     };
-    //     assert_eq!(
-    //         needs_from_node(node).unwrap(),
-    //         vec![Need::Record {
-    //             rkey: "asdf".into(),
-    //             cid: cid1(),
-    //         },]
-    //     );
-    // }
-
-    // #[test]
-    // fn test_needs_from_node_two_records() {
-    //     let node = Node {
-    //         left: None,
-    //         entries: vec![
-    //             Entry {
-    //                 keysuffix: "asdf".into(),
-    //                 prefix_len: 0,
-    //                 value: cid1(),
-    //                 tree: None,
-    //             },
-    //             Entry {
-    //                 keysuffix: "gh".into(),
-    //                 prefix_len: 2,
-    //                 value: cid2(),
-    //                 tree: None,
-    //             },
-    //         ],
-    //     };
-    //     assert_eq!(
-    //         needs_from_node(node).unwrap(),
-    //         vec![
-    //             Need::Record {
-    //                 rkey: "asdf".into(),
-    //                 cid: cid1(),
-    //             },
-    //             Need::Record {
-    //                 rkey: "asgh".into(),
-    //                 cid: cid2(),
-    //             },
-    //         ]
-    //     );
-    // }
-
-    // #[test]
-    // fn test_needs_from_node_with_both() {
-    //     let node = Node {
-    //         left: None,
-    //         entries: vec![Entry {
-    //             keysuffix: "asdf".into(),
-    //             prefix_len: 0,
-    //             value: cid1(),
-    //             tree: Some(cid2()),
-    //         }],
-    //     };
-    //     assert_eq!(
-    //         needs_from_node(node).unwrap(),
-    //         vec![
-    //             Need::Record {
-    //                 rkey: "asdf".into(),
-    //                 cid: cid1(),
-    //             },
-    //             Need::Node(cid2()),
-    //         ]
-    //     );
-    // }
-
-    // #[test]
-    // fn test_needs_from_node_left_and_record() {
-    //     let node = Node {
-    //         left: Some(cid1()),
-    //         entries: vec![Entry {
-    //             keysuffix: "asdf".into(),
-    //             prefix_len: 0,
-    //             value: cid2(),
-    //             tree: None,
-    //         }],
-    //     };
-    //     assert_eq!(
-    //         needs_from_node(node).unwrap(),
-    //         vec![
-    //             Need::Node(cid1()),
-    //             Need::Record {
-    //                 rkey: "asdf".into(),
-    //                 cid: cid2(),
-    //             },
-    //         ]
-    //     );
-    // }
-
-    // #[test]
-    // fn test_needs_from_full_node() {
-    //     let node = Node {
-    //         left: Some(cid1()),
-    //         entries: vec![
-    //             Entry {
-    //                 keysuffix: "asdf".into(),
-    //                 prefix_len: 0,
-    //                 value: cid2(),
-    //                 tree: Some(cid3()),
-    //             },
-    //             Entry {
-    //                 keysuffix: "ghi".into(),
-    //                 prefix_len: 1,
-    //                 value: cid4(),
-    //                 tree: Some(cid5()),
-    //             },
-    //             Entry {
-    //                 keysuffix: "jkl".into(),
-    //                 prefix_len: 2,
-    //                 value: cid6(),
-    //                 tree: Some(cid7()),
-    //             },
-    //             Entry {
-    //                 keysuffix: "mno".into(),
-    //                 prefix_len: 4,
-    //                 value: cid8(),
-    //                 tree: Some(cid9()),
-    //             },
-    //         ],
-    //     };
-    //     assert_eq!(
-    //         needs_from_node(node).unwrap(),
-    //         vec![
-    //             Need::Node(cid1()),
-    //             Need::Record {
-    //                 rkey: "asdf".into(),
-    //                 cid: cid2(),
-    //             },
-    //             Need::Node(cid3()),
-    //             Need::Record {
-    //                 rkey: "aghi".into(),
-    //                 cid: cid4(),
-    //             },
-    //             Need::Node(cid5()),
-    //             Need::Record {
-    //                 rkey: "agjkl".into(),
-    //                 cid: cid6(),
-    //             },
-    //             Need::Node(cid7()),
-    //             Need::Record {
-    //                 rkey: "agjkmno".into(),
-    //                 cid: cid8(),
-    //             },
-    //             Need::Node(cid9()),
-    //         ]
-    //     );
-    // }
 }
tests/non-huge-cars.rs  (+34 -31)
···
 extern crate repo_stream;
-use futures::TryStreamExt;
-use iroh_car::CarReader;
-use std::convert::Infallible;
+use repo_stream::Driver;
 
+const EMPTY_CAR: &'static [u8] = include_bytes!("../car-samples/empty.car");
 const TINY_CAR: &'static [u8] = include_bytes!("../car-samples/tiny.car");
 const LITTLE_CAR: &'static [u8] = include_bytes!("../car-samples/little.car");
 const MIDSIZE_CAR: &'static [u8] = include_bytes!("../car-samples/midsize.car");
 
-async fn test_car(bytes: &[u8], expected_records: usize, expected_sum: usize) {
-    let reader = CarReader::new(bytes).await.unwrap();
-
-    let root = reader
-        .header()
-        .roots()
-        .first()
-        .ok_or("missing root")
+async fn test_car(
+    bytes: &[u8],
+    expected_records: usize,
+    expected_sum: usize,
+    expect_profile: bool,
+) {
+    let mut driver = match Driver::load_car(bytes, |block| block.len(), 10 /* MiB */)
+        .await
         .unwrap()
-        .clone();
-
-    let stream = std::pin::pin!(reader.stream());
-
-    let (_commit, v) =
-        repo_stream::drive::Vehicle::init(root, stream, |block| Ok::<_, Infallible>(block.len()))
-            .await
-            .unwrap();
-    let mut record_stream = std::pin::pin!(v.stream());
+    {
+        Driver::Memory(_commit, mem_driver) => mem_driver,
+        Driver::Disk(_) => panic!("too big"),
+    };
 
     let mut records = 0;
     let mut sum = 0;
     let mut found_bsky_profile = false;
     let mut prev_rkey = "".to_string();
-    while let Some((rkey, size)) = record_stream.try_next().await.unwrap() {
-        records += 1;
-        sum += size;
-        if rkey.0 == "app.bsky.actor.profile/self" {
-            found_bsky_profile = true;
+
+    while let Some(pairs) = driver.next_chunk(256).await.unwrap() {
+        for (rkey, size) in pairs {
+            records += 1;
+            sum += size;
+            if rkey == "app.bsky.actor.profile/self" {
+                found_bsky_profile = true;
+            }
+            assert!(rkey > prev_rkey, "rkeys are streamed in order");
+            prev_rkey = rkey;
         }
-        assert!(rkey.0 > prev_rkey, "rkeys are streamed in order");
-        prev_rkey = rkey.0;
     }
+
     assert_eq!(records, expected_records);
     assert_eq!(sum, expected_sum);
-    assert!(found_bsky_profile);
+    assert_eq!(found_bsky_profile, expect_profile);
+}
+
+#[tokio::test]
+async fn test_empty_car() {
+    test_car(EMPTY_CAR, 0, 0, false).await
 }
 
 #[tokio::test]
 async fn test_tiny_car() {
-    test_car(TINY_CAR, 8, 2071).await
+    test_car(TINY_CAR, 8, 2071, true).await
 }
 
 #[tokio::test]
 async fn test_little_car() {
-    test_car(LITTLE_CAR, 278, 246960).await
+    test_car(LITTLE_CAR, 278, 246960, true).await
 }
 
 #[tokio::test]
 async fn test_midsize_car() {
-    test_car(MIDSIZE_CAR, 11585, 3741393).await
+    test_car(MIDSIZE_CAR, 11585, 3741393, true).await
 }