+170
-61
Cargo.lock
+170
-61
Cargo.lock
···
167
167
checksum = "46c5e41b57b8bba42a04676d81cb89e9ee8e859a1a66f80a5a72e1cb76b34d43"
168
168
169
169
[[package]]
170
+
name = "byteorder-lite"
171
+
version = "0.1.0"
172
+
source = "registry+https://github.com/rust-lang/crates.io-index"
173
+
checksum = "8f1fe948ff07f4bd06c30984e69f5b4899c516a3ef74f34df92a2df2ab535495"
174
+
175
+
[[package]]
170
176
name = "bytes"
171
177
version = "1.10.1"
172
178
source = "registry+https://github.com/rust-lang/crates.io-index"
173
179
checksum = "d71b6127be86fdcfddb610f7182ac57211d4b18a3e9c82eb2d17662f2227ad6a"
180
+
181
+
[[package]]
182
+
name = "byteview"
183
+
version = "0.10.0"
184
+
source = "registry+https://github.com/rust-lang/crates.io-index"
185
+
checksum = "dda4398f387cc6395a3e93b3867cd9abda914c97a0b344d1eefb2e5c51785fca"
174
186
175
187
[[package]]
176
188
name = "cast"
···
281
293
checksum = "b05b61dc5112cbb17e4b6cd61790d9845d13888356391624cbe7e41efeac1e75"
282
294
283
295
[[package]]
296
+
name = "compare"
297
+
version = "0.0.6"
298
+
source = "registry+https://github.com/rust-lang/crates.io-index"
299
+
checksum = "ea0095f6103c2a8b44acd6fd15960c801dafebf02e21940360833e0673f48ba7"
300
+
301
+
[[package]]
284
302
name = "const-str"
285
303
version = "0.4.3"
286
304
source = "registry+https://github.com/rust-lang/crates.io-index"
···
358
376
]
359
377
360
378
[[package]]
379
+
name = "crossbeam-skiplist"
380
+
version = "0.1.3"
381
+
source = "registry+https://github.com/rust-lang/crates.io-index"
382
+
checksum = "df29de440c58ca2cc6e587ec3d22347551a32435fbde9d2bff64e78a9ffa151b"
383
+
dependencies = [
384
+
"crossbeam-epoch",
385
+
"crossbeam-utils",
386
+
]
387
+
388
+
[[package]]
361
389
name = "crossbeam-utils"
362
390
version = "0.8.21"
363
391
source = "registry+https://github.com/rust-lang/crates.io-index"
···
380
408
]
381
409
382
410
[[package]]
411
+
name = "dashmap"
412
+
version = "6.1.0"
413
+
source = "registry+https://github.com/rust-lang/crates.io-index"
414
+
checksum = "5041cc499144891f3790297212f32a74fb938e5136a14943f338ef9e0ae276cf"
415
+
dependencies = [
416
+
"cfg-if",
417
+
"crossbeam-utils",
418
+
"hashbrown 0.14.5",
419
+
"lock_api",
420
+
"once_cell",
421
+
"parking_lot_core",
422
+
]
423
+
424
+
[[package]]
383
425
name = "data-encoding"
384
426
version = "2.9.0"
385
427
source = "registry+https://github.com/rust-lang/crates.io-index"
···
422
464
checksum = "48c757948c5ede0e46177b7add2e67155f70e33c07fea8284df6576da70b3719"
423
465
424
466
[[package]]
467
+
name = "enum_dispatch"
468
+
version = "0.3.13"
469
+
source = "registry+https://github.com/rust-lang/crates.io-index"
470
+
checksum = "aa18ce2bc66555b3218614519ac839ddb759a7d6720732f979ef8d13be147ecd"
471
+
dependencies = [
472
+
"once_cell",
473
+
"proc-macro2",
474
+
"quote",
475
+
"syn 2.0.106",
476
+
]
477
+
478
+
[[package]]
425
479
name = "env_filter"
426
480
version = "0.1.3"
427
481
source = "registry+https://github.com/rust-lang/crates.io-index"
···
445
499
]
446
500
447
501
[[package]]
502
+
name = "equivalent"
503
+
version = "1.0.2"
504
+
source = "registry+https://github.com/rust-lang/crates.io-index"
505
+
checksum = "877a4ace8713b0bcf2a4e7eec82529c029f1d0619886d18145fea96c3ffe5c0f"
506
+
507
+
[[package]]
448
508
name = "errno"
449
509
version = "0.3.14"
450
510
source = "registry+https://github.com/rust-lang/crates.io-index"
···
455
515
]
456
516
457
517
[[package]]
458
-
name = "fallible-iterator"
459
-
version = "0.3.0"
518
+
name = "fastrand"
519
+
version = "2.3.0"
460
520
source = "registry+https://github.com/rust-lang/crates.io-index"
461
-
checksum = "2acce4a10f12dc2fb14a218589d4f1f62ef011b2d0cc4b3cb1bba8e94da14649"
521
+
checksum = "37909eebbb50d72f9059c3b6d82c0463f2ff062c9e95845c43a6c9c0355411be"
462
522
463
523
[[package]]
464
-
name = "fallible-streaming-iterator"
465
-
version = "0.1.9"
524
+
name = "fjall"
525
+
version = "3.0.1"
466
526
source = "registry+https://github.com/rust-lang/crates.io-index"
467
-
checksum = "7360491ce676a36bf9bb3c56c1aa791658183a54d2744120f27285738d90465a"
527
+
checksum = "4f69637c02d38ad1b0f003101d0195a60368130aa17d9ef78b1557d265a22093"
528
+
dependencies = [
529
+
"byteorder-lite",
530
+
"byteview",
531
+
"dashmap",
532
+
"flume",
533
+
"log",
534
+
"lsm-tree",
535
+
"tempfile",
536
+
"xxhash-rust",
537
+
]
468
538
469
539
[[package]]
470
-
name = "fastrand"
471
-
version = "2.3.0"
540
+
name = "flume"
541
+
version = "0.12.0"
472
542
source = "registry+https://github.com/rust-lang/crates.io-index"
473
-
checksum = "37909eebbb50d72f9059c3b6d82c0463f2ff062c9e95845c43a6c9c0355411be"
474
-
475
-
[[package]]
476
-
name = "foldhash"
477
-
version = "0.1.5"
478
-
source = "registry+https://github.com/rust-lang/crates.io-index"
479
-
checksum = "d9c4f5dac5e15c24eb999c26181a6ca40b39fe946cbe4c263c7209467bc83af2"
543
+
checksum = "5e139bc46ca777eb5efaf62df0ab8cc5fd400866427e56c68b22e414e53bd3be"
544
+
dependencies = [
545
+
"spin",
546
+
]
480
547
481
548
[[package]]
482
549
name = "futures"
···
608
675
609
676
[[package]]
610
677
name = "hashbrown"
611
-
version = "0.15.5"
678
+
version = "0.14.5"
612
679
source = "registry+https://github.com/rust-lang/crates.io-index"
613
-
checksum = "9229cfe53dfd69f0609a49f65461bd93001ea1ef889cd5529dd176593f5338a1"
614
-
dependencies = [
615
-
"foldhash",
616
-
]
680
+
checksum = "e5274423e17b7c9fc20b6e7e208532f9b19825d82dfd615708b70edd83df41f1"
617
681
618
682
[[package]]
619
-
name = "hashlink"
620
-
version = "0.10.0"
683
+
name = "hashbrown"
684
+
version = "0.16.1"
621
685
source = "registry+https://github.com/rust-lang/crates.io-index"
622
-
checksum = "7382cf6263419f2d8df38c55d7da83da5c18aef87fc7a7fc1fb1e344edfe14c1"
623
-
dependencies = [
624
-
"hashbrown",
625
-
]
686
+
checksum = "841d1cc9bed7f9236f321df977030373f4a4163ae1a7dbfe1a51a2c1a51d9100"
626
687
627
688
[[package]]
628
689
name = "heck"
···
631
692
checksum = "2304e00983f87ffb38b55b444b5e3b60a884b5d30c0fca7d82fe33449bbe55ea"
632
693
633
694
[[package]]
695
+
name = "interval-heap"
696
+
version = "0.0.5"
697
+
source = "registry+https://github.com/rust-lang/crates.io-index"
698
+
checksum = "11274e5e8e89b8607cfedc2910b6626e998779b48a019151c7604d0adcb86ac6"
699
+
dependencies = [
700
+
"compare",
701
+
]
702
+
703
+
[[package]]
634
704
name = "io-uring"
635
705
version = "0.7.10"
636
706
source = "registry+https://github.com/rust-lang/crates.io-index"
···
730
800
checksum = "58f929b4d672ea937a23a1ab494143d968337a5f47e56d0815df1e0890ddf174"
731
801
732
802
[[package]]
733
-
name = "libsqlite3-sys"
734
-
version = "0.35.0"
735
-
source = "registry+https://github.com/rust-lang/crates.io-index"
736
-
checksum = "133c182a6a2c87864fe97778797e46c7e999672690dc9fa3ee8e241aa4a9c13f"
737
-
dependencies = [
738
-
"pkg-config",
739
-
"vcpkg",
740
-
]
741
-
742
-
[[package]]
743
803
name = "linux-raw-sys"
744
804
version = "0.11.0"
745
805
source = "registry+https://github.com/rust-lang/crates.io-index"
···
761
821
checksum = "34080505efa8e45a4b816c349525ebe327ceaa8559756f0356cba97ef3bf7432"
762
822
763
823
[[package]]
824
+
name = "lsm-tree"
825
+
version = "3.0.1"
826
+
source = "registry+https://github.com/rust-lang/crates.io-index"
827
+
checksum = "b875f1dfe14f557f805b167fb9b0fc54c5560c7a4bd6ae02535b2846f276a8cb"
828
+
dependencies = [
829
+
"byteorder-lite",
830
+
"byteview",
831
+
"crossbeam-skiplist",
832
+
"enum_dispatch",
833
+
"interval-heap",
834
+
"log",
835
+
"quick_cache",
836
+
"rustc-hash",
837
+
"self_cell",
838
+
"sfa",
839
+
"tempfile",
840
+
"varint-rs",
841
+
"xxhash-rust",
842
+
]
843
+
844
+
[[package]]
764
845
name = "match-lookup"
765
846
version = "0.1.1"
766
847
source = "registry+https://github.com/rust-lang/crates.io-index"
···
892
973
checksum = "8b870d8c151b6f2fb93e84a13146138f05d02ed11c7e7c54f8826aaaf7c9f184"
893
974
894
975
[[package]]
895
-
name = "pkg-config"
896
-
version = "0.3.32"
897
-
source = "registry+https://github.com/rust-lang/crates.io-index"
898
-
checksum = "7edddbd0b52d732b21ad9a5fab5c704c14cd949e5e9a1ec5929a24fded1b904c"
899
-
900
-
[[package]]
901
976
name = "plotters"
902
977
version = "0.3.7"
903
978
source = "registry+https://github.com/rust-lang/crates.io-index"
···
950
1025
]
951
1026
952
1027
[[package]]
1028
+
name = "quick_cache"
1029
+
version = "0.6.18"
1030
+
source = "registry+https://github.com/rust-lang/crates.io-index"
1031
+
checksum = "7ada44a88ef953a3294f6eb55d2007ba44646015e18613d2f213016379203ef3"
1032
+
dependencies = [
1033
+
"equivalent",
1034
+
"hashbrown 0.16.1",
1035
+
]
1036
+
1037
+
[[package]]
953
1038
name = "quote"
954
1039
version = "1.0.41"
955
1040
source = "registry+https://github.com/rust-lang/crates.io-index"
···
1024
1109
1025
1110
[[package]]
1026
1111
name = "repo-stream"
1027
-
version = "0.1.1"
1112
+
version = "0.2.2"
1028
1113
dependencies = [
1029
1114
"bincode",
1030
1115
"clap",
1031
1116
"criterion",
1032
1117
"env_logger",
1118
+
"fjall",
1033
1119
"futures",
1034
1120
"futures-core",
1035
1121
"ipld-core",
1036
1122
"iroh-car",
1037
1123
"log",
1038
1124
"multibase",
1039
-
"rusqlite",
1040
1125
"serde",
1041
1126
"serde_bytes",
1042
1127
"serde_ipld_dagcbor",
···
1047
1132
]
1048
1133
1049
1134
[[package]]
1050
-
name = "rusqlite"
1051
-
version = "0.37.0"
1135
+
name = "rustc-demangle"
1136
+
version = "0.1.26"
1052
1137
source = "registry+https://github.com/rust-lang/crates.io-index"
1053
-
checksum = "165ca6e57b20e1351573e3729b958bc62f0e48025386970b6e4d29e7a7e71f3f"
1054
-
dependencies = [
1055
-
"bitflags",
1056
-
"fallible-iterator",
1057
-
"fallible-streaming-iterator",
1058
-
"hashlink",
1059
-
"libsqlite3-sys",
1060
-
"smallvec",
1061
-
]
1138
+
checksum = "56f7d92ca342cea22a06f2121d944b4fd82af56988c270852495420f961d4ace"
1062
1139
1063
1140
[[package]]
1064
-
name = "rustc-demangle"
1065
-
version = "0.1.26"
1141
+
name = "rustc-hash"
1142
+
version = "2.1.1"
1066
1143
source = "registry+https://github.com/rust-lang/crates.io-index"
1067
-
checksum = "56f7d92ca342cea22a06f2121d944b4fd82af56988c270852495420f961d4ace"
1144
+
checksum = "357703d41365b4b27c590e3ed91eabb1b663f07c4c084095e60cbed4362dff0d"
1068
1145
1069
1146
[[package]]
1070
1147
name = "rustix"
···
1105
1182
version = "1.2.0"
1106
1183
source = "registry+https://github.com/rust-lang/crates.io-index"
1107
1184
checksum = "94143f37725109f92c262ed2cf5e59bce7498c01bcc1502d7b9afe439a4e9f49"
1185
+
1186
+
[[package]]
1187
+
name = "self_cell"
1188
+
version = "1.2.2"
1189
+
source = "registry+https://github.com/rust-lang/crates.io-index"
1190
+
checksum = "b12e76d157a900eb52e81bc6e9f3069344290341720e9178cde2407113ac8d89"
1108
1191
1109
1192
[[package]]
1110
1193
name = "serde"
···
1172
1255
]
1173
1256
1174
1257
[[package]]
1258
+
name = "sfa"
1259
+
version = "1.0.0"
1260
+
source = "registry+https://github.com/rust-lang/crates.io-index"
1261
+
checksum = "a1296838937cab56cd6c4eeeb8718ec777383700c33f060e2869867bd01d1175"
1262
+
dependencies = [
1263
+
"byteorder-lite",
1264
+
"log",
1265
+
"xxhash-rust",
1266
+
]
1267
+
1268
+
[[package]]
1175
1269
name = "sha2"
1176
1270
version = "0.10.9"
1177
1271
source = "registry+https://github.com/rust-lang/crates.io-index"
···
1211
1305
dependencies = [
1212
1306
"libc",
1213
1307
"windows-sys 0.59.0",
1308
+
]
1309
+
1310
+
[[package]]
1311
+
name = "spin"
1312
+
version = "0.9.8"
1313
+
source = "registry+https://github.com/rust-lang/crates.io-index"
1314
+
checksum = "6980e8d7511241f8acf4aebddbb1ff938df5eebe98691418c4468d0b72a96a67"
1315
+
dependencies = [
1316
+
"lock_api",
1214
1317
]
1215
1318
1216
1319
[[package]]
···
1372
1475
checksum = "06abde3611657adf66d383f00b093d7faecc7fa57071cce2578660c9f1010821"
1373
1476
1374
1477
[[package]]
1375
-
name = "vcpkg"
1376
-
version = "0.2.15"
1478
+
name = "varint-rs"
1479
+
version = "2.2.0"
1377
1480
source = "registry+https://github.com/rust-lang/crates.io-index"
1378
-
checksum = "accd4ea62f7bb7a82fe23066fb0957d48ef677f6eeb8215f372f52e48bb32426"
1481
+
checksum = "8f54a172d0620933a27a4360d3db3e2ae0dd6cceae9730751a036bbf182c4b23"
1379
1482
1380
1483
[[package]]
1381
1484
name = "version_check"
···
1659
1762
version = "0.46.0"
1660
1763
source = "registry+https://github.com/rust-lang/crates.io-index"
1661
1764
checksum = "f17a85883d4e6d00e8a97c586de764dabcc06133f7f1d55dce5cdc070ad7fe59"
1765
+
1766
+
[[package]]
1767
+
name = "xxhash-rust"
1768
+
version = "0.8.15"
1769
+
source = "registry+https://github.com/rust-lang/crates.io-index"
1770
+
checksum = "fdd20c5420375476fbd4394763288da7eb0cc0b8c11deed431a91562af7335d3"
1662
1771
1663
1772
[[package]]
1664
1773
name = "zerocopy"
+3
-3
Cargo.toml
+3
-3
Cargo.toml
···
1
1
[package]
2
2
name = "repo-stream"
3
-
version = "0.1.1"
3
+
version = "0.2.2"
4
4
edition = "2024"
5
5
license = "MIT OR Apache-2.0"
6
-
description = "Fast and robust atproto CAR file processing in rust"
6
+
description = "A robust CAR file -> MST walker for atproto"
7
7
repository = "https://tangled.org/@microcosm.blue/repo-stream"
8
8
9
9
[dependencies]
10
10
bincode = { version = "2.0.1", features = ["serde"] }
11
+
fjall = { version = "3.0.1", default-features = false }
11
12
futures = "0.3.31"
12
13
futures-core = "0.3.31"
13
14
ipld-core = { version = "0.4.2", features = ["serde"] }
14
15
iroh-car = "0.5.1"
15
16
log = "0.4.28"
16
17
multibase = "0.9.2"
17
-
rusqlite = "0.37.0"
18
18
serde = { version = "1.0.228", features = ["derive"] }
19
19
serde_bytes = "0.11.19"
20
20
serde_ipld_dagcbor = "0.6.4"
+4
benches/non-huge-cars.rs
+4
benches/non-huge-cars.rs
···
3
3
4
4
use criterion::{Criterion, criterion_group, criterion_main};
5
5
6
+
const EMPTY_CAR: &'static [u8] = include_bytes!("../car-samples/empty.car");
6
7
const TINY_CAR: &'static [u8] = include_bytes!("../car-samples/tiny.car");
7
8
const LITTLE_CAR: &'static [u8] = include_bytes!("../car-samples/little.car");
8
9
const MIDSIZE_CAR: &'static [u8] = include_bytes!("../car-samples/midsize.car");
···
13
14
.build()
14
15
.expect("Creating runtime failed");
15
16
17
+
c.bench_function("empty-car", |b| {
18
+
b.to_async(&rt).iter(async || drive_car(EMPTY_CAR).await)
19
+
});
16
20
c.bench_function("tiny-car", |b| {
17
21
b.to_async(&rt).iter(async || drive_car(TINY_CAR).await)
18
22
});
car-samples/empty.car
car-samples/empty.car
This is a binary file and will not be displayed.
+12
-16
examples/disk-read-file/main.rs
+12
-16
examples/disk-read-file/main.rs
···
4
4
5
5
extern crate repo_stream;
6
6
use clap::Parser;
7
-
use repo_stream::{DiskStore, Driver, process::noop};
7
+
use repo_stream::{DiskBuilder, Driver, DriverBuilder};
8
8
use std::path::PathBuf;
9
+
use std::time::Instant;
9
10
10
11
#[derive(Debug, Parser)]
11
12
struct Args {
···
26
27
let reader = tokio::fs::File::open(car).await?;
27
28
let reader = tokio::io::BufReader::new(reader);
28
29
29
-
// configure how much memory can be used before spilling to disk.
30
-
// real memory usage may differ somewhat.
31
-
let in_mem_limit = 10; // MiB
32
-
33
-
// configure how much memory sqlite is allowed to use when dumping to disk
34
-
let db_cache_mb = 32; // MiB
35
-
36
30
log::info!("hello! reading the car...");
31
+
let t0 = Instant::now();
37
32
38
33
// in this example we only bother handling CARs that are too big for memory
39
34
// `noop` helper means: do no block processing, store the raw blocks
40
-
let driver = match Driver::load_car(reader, noop, in_mem_limit).await? {
35
+
let driver = match DriverBuilder::new()
36
+
.with_mem_limit_mb(32) // how much memory can be used before disk spill
37
+
.load_car(reader)
38
+
.await?
39
+
{
41
40
Driver::Memory(_, _) => panic!("try this on a bigger car"),
42
41
Driver::Disk(big_stuff) => {
43
42
// we reach here if the repo was too big and needs to be spilled to
44
43
// disk to continue
45
44
46
45
// set up a disk store we can spill to
47
-
let disk_store = DiskStore::new(tmpfile.clone(), db_cache_mb).await?;
46
+
let disk_store = DiskBuilder::new().open(tmpfile).await?;
48
47
49
48
// do the spilling, get back a (similar) driver
50
49
let (commit, driver) = big_stuff.finish_loading(disk_store).await?;
51
50
52
51
// at this point you might want to fetch the account's signing key
53
52
// via the DID from the commit, and then verify the signature.
54
-
log::warn!("big's comit: {:?}", commit);
53
+
log::warn!("big's comit ({:?}): {:?}", t0.elapsed(), commit);
55
54
56
55
// pop the driver back out to get some code indentation relief
57
56
driver
···
81
80
}
82
81
}
83
82
84
-
log::info!("arrived! joining rx...");
83
+
log::info!("arrived! ({:?}) joining rx...", t0.elapsed());
85
84
86
-
// clean up the database. would be nice to do this in drop so it happens
87
-
// automatically, but some blocking work happens, so that's not allowed in
88
-
// async rust. 🤷‍♀️
89
-
join.await?.reset_store().await?;
85
+
join.await?;
90
86
91
87
log::info!("done. n={n} zeros={zeros}");
92
88
+9
-6
examples/read-file/main.rs
+9
-6
examples/read-file/main.rs
···
4
4
5
5
extern crate repo_stream;
6
6
use clap::Parser;
7
-
use repo_stream::Driver;
7
+
use repo_stream::{Driver, DriverBuilder};
8
8
use std::path::PathBuf;
9
9
10
10
type Result<T> = std::result::Result<T, Box<dyn std::error::Error>>;
···
23
23
let reader = tokio::fs::File::open(file).await?;
24
24
let reader = tokio::io::BufReader::new(reader);
25
25
26
-
let (commit, mut driver) =
27
-
match Driver::load_car(reader, |block| block.len(), 16 /* MiB */).await? {
28
-
Driver::Memory(commit, mem_driver) => (commit, mem_driver),
29
-
Driver::Disk(_) => panic!("this example doesn't handle big CARs"),
30
-
};
26
+
let (commit, mut driver) = match DriverBuilder::new()
27
+
.with_block_processor(|block| block.len())
28
+
.load_car(reader)
29
+
.await?
30
+
{
31
+
Driver::Memory(commit, mem_driver) => (commit, mem_driver),
32
+
Driver::Disk(_) => panic!("this example doesn't handle big CARs"),
33
+
};
31
34
32
35
log::info!("got commit: {commit:?}");
33
36
+67
-2
readme.md
+67
-2
readme.md
···
1
1
# repo-stream
2
2
3
-
Fast and (aspirationally) robust atproto CAR file processing in rust
3
+
A robust CAR file -> MST walker for atproto
4
+
5
+
[![Crates.io][crates-badge]](https://crates.io/crates/repo-stream)
6
+
[![Documentation][docs-badge]](https://docs.rs/repo-stream)
7
+
[![Sponsor][sponsor-badge]](https://github.com/sponsors/uniphil)
8
+
9
+
[crates-badge]: https://img.shields.io/crates/v/repo-stream.svg
10
+
[docs-badge]: https://docs.rs/repo-stream/badge.svg
11
+
[sponsor-badge]: https://img.shields.io/badge/at-microcosm-b820f9?labelColor=b820f9&logo=githubsponsors&logoColor=fff
12
+
13
+
```rust
14
+
use repo_stream::{Driver, DriverBuilder, DriveError, DiskBuilder};
15
+
16
+
#[tokio::main]
17
+
async fn main() -> Result<(), DriveError> {
18
+
// repo-stream takes any AsyncRead as input, like a tokio::fs::File
19
+
let reader = tokio::fs::File::open("repo.car".into()).await?;
20
+
let reader = tokio::io::BufReader::new(reader);
21
+
22
+
// example repo workload is simply counting the total record bytes
23
+
let mut total_size = 0;
24
+
25
+
match DriverBuilder::new()
26
+
.with_mem_limit_mb(10)
27
+
.with_block_processor(|rec| rec.len()) // block processing: just extract the raw record size
28
+
.load_car(reader)
29
+
.await?
30
+
{
31
+
32
+
// if all blocks fit within memory
33
+
Driver::Memory(_commit, mut driver) => {
34
+
while let Some(chunk) = driver.next_chunk(256).await? {
35
+
for (_rkey, size) in chunk {
36
+
total_size += size;
37
+
}
38
+
}
39
+
},
40
+
41
+
// if the CAR was too big for in-memory processing
42
+
Driver::Disk(paused) => {
43
+
// set up a disk store we can spill to
44
+
let store = DiskBuilder::new().open("some/path.db".into()).await?;
45
+
// do the spilling, get back a (similar) driver
46
+
let (_commit, mut driver) = paused.finish_loading(store).await?;
47
+
48
+
while let Some(chunk) = driver.next_chunk(256).await? {
49
+
for (_rkey, size) in chunk {
50
+
total_size += size;
51
+
}
52
+
}
53
+
}
54
+
};
55
+
println!("sum of size of all records: {total_size}");
56
+
Ok(())
57
+
}
58
+
```
59
+
60
+
more recent todo
61
+
62
+
- [ ] get an *empty* car for the test suite
63
+
- [x] implement a max size on disk limit
64
+
65
+
66
+
-----
67
+
68
+
older stuff (to clean up):
4
69
5
70
6
71
current car processing times (records processed into their length usize, phil's dev machine):
···
27
92
-> yeah the commit is returned from init
28
93
- [ ] spec compliance todos
29
94
- [x] assert that keys are ordered and fail if not
30
-
- [ ] verify node mst depth from key (possibly pending [interop test fixes](https://github.com/bluesky-social/atproto-interop-tests/issues/5))
95
+
- [x] verify node mst depth from key (possibly pending [interop test fixes](https://github.com/bluesky-social/atproto-interop-tests/issues/5))
31
96
- [ ] performance todos
32
97
- [x] consume the serialized nodes into a mutable efficient format
33
98
- [ ] maybe customize the deserialize impl to do that directly?
+105
-85
src/disk.rs
+105
-85
src/disk.rs
···
5
5
to be the best behaved in terms of both on-disk space usage and memory usage.
6
6
7
7
```no_run
8
-
# use repo_stream::{DiskStore, DiskError};
8
+
# use repo_stream::{DiskBuilder, DiskError};
9
9
# #[tokio::main]
10
10
# async fn main() -> Result<(), DiskError> {
11
-
let db_cache_size = 32; // MiB
12
-
let store = DiskStore::new("/some/path.db".into(), db_cache_size).await?;
11
+
let store = DiskBuilder::new()
12
+
.with_cache_size_mb(32)
13
+
.with_max_stored_mb(1024) // errors when >1GiB of processed blocks are inserted
14
+
.open("/some/path.db".into()).await?;
13
15
# Ok(())
14
16
# }
15
17
```
16
18
*/
17
19
18
20
use crate::drive::DriveError;
19
-
use rusqlite::OptionalExtension;
21
+
use fjall::config::{CompressionPolicy, PinningPolicy, RestartIntervalPolicy};
22
+
use fjall::{CompressionType, Database, Error as FjallError, Keyspace, KeyspaceCreateOptions};
20
23
use std::path::PathBuf;
21
24
22
25
#[derive(Debug, thiserror::Error)]
···
26
29
/// (The wrapped err should probably be obscured to remove public-facing
27
30
/// fjall bits)
28
31
#[error(transparent)]
29
-
DbError(#[from] rusqlite::Error),
32
+
DbError(#[from] FjallError),
30
33
/// A tokio blocking task failed to join
31
34
#[error("Failed to join a tokio blocking task: {0}")]
32
35
JoinError(#[from] tokio::task::JoinError),
33
-
#[error("this error was replaced, seeing this is a bug.")]
34
-
#[doc(hidden)]
35
-
Stolen,
36
+
/// The total size of stored blocks exceeded the allowed size
37
+
///
38
+
/// If you need to process *really* big CARs, you can configure a higher
39
+
/// limit.
40
+
#[error("Maximum disk size reached")]
41
+
MaxSizeExceeded,
36
42
}
37
43
38
-
impl DiskError {
39
-
/// hack for ownership challenges with the disk driver
40
-
pub(crate) fn steal(&mut self) -> Self {
41
-
let mut swapped = DiskError::Stolen;
42
-
std::mem::swap(self, &mut swapped);
43
-
swapped
44
+
/// Builder-style disk store setup
45
+
#[derive(Debug, Clone)]
46
+
pub struct DiskBuilder {
47
+
/// Database in-memory cache allowance
48
+
///
49
+
/// Default: 64 MiB
50
+
pub cache_size_mb: usize,
51
+
/// Database stored block size limit
52
+
///
53
+
/// Default: 10 GiB
54
+
///
55
+
/// Note: actual size on disk may be more, but should approximately scale
56
+
/// with this limit
57
+
pub max_stored_mb: usize,
58
+
}
59
+
60
+
impl Default for DiskBuilder {
61
+
fn default() -> Self {
62
+
Self {
63
+
cache_size_mb: 64,
64
+
max_stored_mb: 10 * 1024, // 10 GiB
65
+
}
66
+
}
67
+
}
68
+
69
+
impl DiskBuilder {
70
+
/// Begin configuring the storage with defaults
71
+
pub fn new() -> Self {
72
+
Default::default()
73
+
}
74
+
/// Set the in-memory cache allowance for the database
75
+
///
76
+
/// Default: 64 MiB
77
+
pub fn with_cache_size_mb(mut self, size: usize) -> Self {
78
+
self.cache_size_mb = size;
79
+
self
80
+
}
81
+
/// Set the approximate stored block size limit
82
+
///
83
+
/// Default: 10 GiB
84
+
pub fn with_max_stored_mb(mut self, max: usize) -> Self {
85
+
self.max_stored_mb = max;
86
+
self
87
+
}
88
+
/// Open and initialize the actual disk storage
89
+
pub async fn open(&self, path: PathBuf) -> Result<DiskStore, DiskError> {
90
+
DiskStore::new(path, self.cache_size_mb, self.max_stored_mb).await
44
91
}
45
92
}
46
93
47
94
/// On-disk block storage
48
95
pub struct DiskStore {
49
-
conn: rusqlite::Connection,
96
+
#[allow(unused)]
97
+
db: Database,
98
+
partition: Keyspace,
99
+
max_stored: usize,
100
+
stored: usize,
50
101
}
51
102
52
103
impl DiskStore {
53
104
/// Initialize a new disk store
54
-
pub async fn new(path: PathBuf, cache_mb: usize) -> Result<Self, DiskError> {
55
-
let conn = tokio::task::spawn_blocking(move || {
56
-
let conn = rusqlite::Connection::open(path)?;
57
-
58
-
let sqlite_one_mb = -(2_i64.pow(10)); // negative is kibibytes for sqlite cache_size
59
-
60
-
// conn.pragma_update(None, "journal_mode", "OFF")?;
61
-
// conn.pragma_update(None, "journal_mode", "MEMORY")?;
62
-
conn.pragma_update(None, "journal_mode", "WAL")?;
63
-
// conn.pragma_update(None, "wal_autocheckpoint", "0")?; // this lets things get a bit big on disk
64
-
conn.pragma_update(None, "synchronous", "OFF")?;
65
-
conn.pragma_update(
66
-
None,
67
-
"cache_size",
68
-
(cache_mb as i64 * sqlite_one_mb).to_string(),
69
-
)?;
70
-
Self::reset_tables(&conn)?;
105
+
pub async fn new(
106
+
path: PathBuf,
107
+
cache_mb: usize,
108
+
max_stored_mb: usize,
109
+
) -> Result<Self, DiskError> {
110
+
let max_stored = max_stored_mb * 2_usize.pow(20);
111
+
let (db, partition) = tokio::task::spawn_blocking(move || {
112
+
let db = Database::builder(path)
113
+
// .manual_journal_persist(true)
114
+
// .flush_workers(1)
115
+
// .compaction_workers(1)
116
+
.journal_compression(CompressionType::None)
117
+
.cache_size(cache_mb as u64 * 2_u64.pow(20))
118
+
.temporary(true)
119
+
.open()?;
120
+
let opts = KeyspaceCreateOptions::default()
121
+
.data_block_restart_interval_policy(RestartIntervalPolicy::all(8))
122
+
.filter_block_pinning_policy(PinningPolicy::disabled())
123
+
.expect_point_read_hits(true)
124
+
.data_block_compression_policy(CompressionPolicy::disabled())
125
+
.manual_journal_persist(true)
126
+
.max_memtable_size(32 * 2_u64.pow(20));
127
+
let partition = db.keyspace("z", || opts)?;
71
128
72
-
Ok::<_, DiskError>(conn)
129
+
Ok::<_, DiskError>((db, partition))
73
130
})
74
131
.await??;
75
132
76
-
Ok(Self { conn })
77
-
}
78
-
pub(crate) fn get_writer(&'_ mut self) -> Result<SqliteWriter<'_>, DiskError> {
79
-
let tx = self.conn.transaction()?;
80
-
Ok(SqliteWriter { tx })
81
-
}
82
-
pub(crate) fn get_reader<'conn>(&'conn self) -> Result<SqliteReader<'conn>, DiskError> {
83
-
let select_stmt = self.conn.prepare("SELECT val FROM blocks WHERE key = ?1")?;
84
-
Ok(SqliteReader { select_stmt })
85
-
}
86
-
/// Drop and recreate the kv table
87
-
pub async fn reset(self) -> Result<Self, DiskError> {
88
-
tokio::task::spawn_blocking(move || {
89
-
Self::reset_tables(&self.conn)?;
90
-
Ok(self)
133
+
Ok(Self {
134
+
db,
135
+
partition,
136
+
max_stored,
137
+
stored: 0,
91
138
})
92
-
.await?
93
-
}
94
-
fn reset_tables(conn: &rusqlite::Connection) -> Result<(), DiskError> {
95
-
conn.execute("DROP TABLE IF EXISTS blocks", ())?;
96
-
conn.execute(
97
-
"CREATE TABLE blocks (
98
-
key BLOB PRIMARY KEY NOT NULL,
99
-
val BLOB NOT NULL
100
-
) WITHOUT ROWID",
101
-
(),
102
-
)?;
103
-
Ok(())
104
139
}
105
-
}
106
140
107
-
pub(crate) struct SqliteWriter<'conn> {
108
-
tx: rusqlite::Transaction<'conn>,
109
-
}
110
-
111
-
impl SqliteWriter<'_> {
112
141
pub(crate) fn put_many(
113
142
&mut self,
114
143
kv: impl Iterator<Item = Result<(Vec<u8>, Vec<u8>), DriveError>>,
115
144
) -> Result<(), DriveError> {
116
-
let mut insert_stmt = self
117
-
.tx
118
-
.prepare_cached("INSERT INTO blocks (key, val) VALUES (?1, ?2)")
119
-
.map_err(DiskError::DbError)?;
145
+
let mut batch = self.db.batch();
120
146
for pair in kv {
121
147
let (k, v) = pair?;
122
-
insert_stmt.execute((k, v)).map_err(DiskError::DbError)?;
148
+
self.stored += v.len();
149
+
if self.stored > self.max_stored {
150
+
return Err(DiskError::MaxSizeExceeded.into());
151
+
}
152
+
batch.insert(&self.partition, k, v);
123
153
}
124
-
Ok(())
125
-
}
126
-
pub fn commit(self) -> Result<(), DiskError> {
127
-
self.tx.commit()?;
154
+
batch.commit().map_err(DiskError::DbError)?;
128
155
Ok(())
129
156
}
130
-
}
131
157
132
-
pub(crate) struct SqliteReader<'conn> {
133
-
select_stmt: rusqlite::Statement<'conn>,
134
-
}
135
-
136
-
impl SqliteReader<'_> {
137
-
pub(crate) fn get(&mut self, key: Vec<u8>) -> rusqlite::Result<Option<Vec<u8>>> {
138
-
self.select_stmt
139
-
.query_one((&key,), |row| row.get(0))
140
-
.optional()
158
+
#[inline]
159
+
pub(crate) fn get(&mut self, key: &[u8]) -> Result<Option<fjall::Slice>, FjallError> {
160
+
self.partition.get(key)
141
161
}
142
162
}
+82
-58
src/drive.rs
+82
-58
src/drive.rs
···
115
115
Disk(NeedDisk<R, T>),
116
116
}
117
117
118
+
/// Builder-style driver setup
119
+
#[derive(Debug, Clone)]
120
+
pub struct DriverBuilder {
121
+
pub mem_limit_mb: usize,
122
+
}
123
+
124
+
impl Default for DriverBuilder {
125
+
fn default() -> Self {
126
+
Self { mem_limit_mb: 16 }
127
+
}
128
+
}
129
+
130
+
impl DriverBuilder {
131
+
/// Begin configuring the driver with defaults
132
+
pub fn new() -> Self {
133
+
Default::default()
134
+
}
135
+
/// Set the in-memory size limit, in MiB
136
+
///
137
+
/// Default: 16 MiB
138
+
pub fn with_mem_limit_mb(self, new_limit: usize) -> Self {
139
+
Self {
140
+
mem_limit_mb: new_limit,
141
+
}
142
+
}
143
+
/// Set the block processor
144
+
///
145
+
/// Default: noop, raw blocks will be emitted
146
+
pub fn with_block_processor<T: Processable>(
147
+
self,
148
+
p: fn(Vec<u8>) -> T,
149
+
) -> DriverBuilderWithProcessor<T> {
150
+
DriverBuilderWithProcessor {
151
+
mem_limit_mb: self.mem_limit_mb,
152
+
block_processor: p,
153
+
}
154
+
}
155
+
/// Begin processing an atproto MST from a CAR file
156
+
pub async fn load_car<R: AsyncRead + Unpin>(
157
+
&self,
158
+
reader: R,
159
+
) -> Result<Driver<R, Vec<u8>>, DriveError> {
160
+
Driver::load_car(reader, crate::process::noop, self.mem_limit_mb).await
161
+
}
162
+
}
163
+
164
+
/// Builder-style driver intermediate step
165
+
///
166
+
/// start from `DriverBuilder`
167
+
#[derive(Debug, Clone)]
168
+
pub struct DriverBuilderWithProcessor<T: Processable> {
169
+
pub mem_limit_mb: usize,
170
+
pub block_processor: fn(Vec<u8>) -> T,
171
+
}
172
+
173
+
impl<T: Processable> DriverBuilderWithProcessor<T> {
174
+
/// Set the in-memory size limit, in MiB
175
+
///
176
+
/// Default: 16 MiB
177
+
pub fn with_mem_limit_mb(mut self, new_limit: usize) -> Self {
178
+
self.mem_limit_mb = new_limit;
179
+
self
180
+
}
181
+
/// Begin processing an atproto MST from a CAR file
182
+
pub async fn load_car<R: AsyncRead + Unpin>(
183
+
&self,
184
+
reader: R,
185
+
) -> Result<Driver<R, T>, DriveError> {
186
+
Driver::load_car(reader, self.block_processor, self.mem_limit_mb).await
187
+
}
188
+
}
189
+
118
190
impl<R: AsyncRead + Unpin, T: Processable> Driver<R, T> {
119
191
/// Begin processing an atproto MST from a CAR file
120
192
///
121
193
/// Blocks will be loaded, processed, and buffered in memory. If the entire
122
-
/// processed size is under the `max_size_mb` limit, a `Driver::Memory` will
123
-
/// be returned along with a `Commit` ready for validation.
194
+
/// processed size is under the `mem_limit_mb` limit, a `Driver::Memory`
195
+
/// will be returned along with a `Commit` ready for validation.
124
196
///
125
-
/// If the `max_size_mb` limit is reached before loading all blocks, the
197
+
/// If the `mem_limit_mb` limit is reached before loading all blocks, the
126
198
/// partial state will be returned as `Driver::Disk(needed)`, which can be
127
199
/// resumed by providing a `DiskStore` for on-disk block storage.
128
200
pub async fn load_car(
129
201
reader: R,
130
202
process: fn(Vec<u8>) -> T,
131
-
max_size_mb: usize,
203
+
mem_limit_mb: usize,
132
204
) -> Result<Driver<R, T>, DriveError> {
133
-
let max_size = max_size_mb * 2_usize.pow(20);
205
+
let max_size = mem_limit_mb * 2_usize.pow(20);
134
206
let mut mem_blocks = HashMap::new();
135
207
136
208
let mut car = CarReader::new(reader).await?;
···
263
335
// move store in and back out so we can manage lifetimes
264
336
// dump mem blocks into the store
265
337
store = tokio::task::spawn(async move {
266
-
let mut writer = store.get_writer()?;
267
-
268
338
let kvs = self
269
339
.mem_blocks
270
340
.into_iter()
271
341
.map(|(k, v)| Ok(encode(v).map(|v| (k.to_bytes(), v))?));
272
342
273
-
writer.put_many(kvs)?;
274
-
writer.commit()?;
343
+
store.put_many(kvs)?;
275
344
Ok::<_, DriveError>(store)
276
345
})
277
346
.await??;
278
347
279
-
let (tx, mut rx) = mpsc::channel::<Vec<(Cid, MaybeProcessedBlock<T>)>>(2);
348
+
let (tx, mut rx) = mpsc::channel::<Vec<(Cid, MaybeProcessedBlock<T>)>>(1);
280
349
281
350
let store_worker = tokio::task::spawn_blocking(move || {
282
-
let mut writer = store.get_writer()?;
283
-
284
351
while let Some(chunk) = rx.blocking_recv() {
285
352
let kvs = chunk
286
353
.into_iter()
287
354
.map(|(k, v)| Ok(encode(v).map(|v| (k.to_bytes(), v))?));
288
-
writer.put_many(kvs)?;
355
+
store.put_many(kvs)?;
289
356
}
290
-
291
-
writer.commit()?;
292
357
Ok::<_, DriveError>(store)
293
358
}); // await later
294
359
···
381
446
/// println!("{rkey}: size={}", record.len());
382
447
/// }
383
448
/// }
384
-
/// let store = disk_driver.reset_store().await?;
385
449
/// # Ok(())
386
450
/// # }
387
451
/// ```
···
396
460
// comes out again.
397
461
let (state, res) = tokio::task::spawn_blocking(
398
462
move || -> (BigState, Result<BlockChunk<T>, DriveError>) {
399
-
let mut reader_res = state.store.get_reader();
400
-
let reader: &mut _ = match reader_res {
401
-
Ok(ref mut r) => r,
402
-
Err(ref mut e) => {
403
-
// unfortunately we can't return the error directly because
404
-
// (for some reason) it's attached to the lifetime of the
405
-
// reader?
406
-
// hack a mem::swap so we can get it out :/
407
-
let e_swapped = e.steal();
408
-
// the pain: `state` *has to* outlive the reader
409
-
drop(reader_res);
410
-
return (state, Err(e_swapped.into()));
411
-
}
412
-
};
413
-
414
463
let mut out = Vec::with_capacity(n);
415
464
416
465
for _ in 0..n {
417
466
// walk as far as we can until we run out of blocks or find a record
418
-
let step = match state.walker.disk_step(reader, process) {
467
+
let step = match state.walker.disk_step(&mut state.store, process) {
419
468
Ok(s) => s,
420
469
Err(e) => {
421
-
// the pain: `state` *has to* outlive the reader
422
-
drop(reader_res);
423
470
return (state, Err(e.into()));
424
471
}
425
472
};
426
473
match step {
427
474
Step::Missing(cid) => {
428
-
// the pain: `state` *has to* outlive the reader
429
-
drop(reader_res);
430
475
return (state, Err(DriveError::MissingBlock(cid)));
431
476
}
432
477
Step::Finish => break,
433
478
Step::Found { rkey, data } => out.push((rkey, data)),
434
479
};
435
480
}
436
-
437
-
// `state` *has to* outlive the reader
438
-
drop(reader_res);
439
481
440
482
(state, Ok::<_, DriveError>(out))
441
483
},
···
460
502
tx: mpsc::Sender<Result<BlockChunk<T>, DriveError>>,
461
503
) -> Result<(), mpsc::error::SendError<Result<BlockChunk<T>, DriveError>>> {
462
504
let BigState { store, walker } = self.state.as_mut().expect("valid state");
463
-
let mut reader = match store.get_reader() {
464
-
Ok(r) => r,
465
-
Err(e) => return tx.blocking_send(Err(e.into())),
466
-
};
467
505
468
506
loop {
469
507
let mut out: BlockChunk<T> = Vec::with_capacity(n);
···
471
509
for _ in 0..n {
472
510
// walk as far as we can until we run out of blocks or find a record
473
511
474
-
let step = match walker.disk_step(&mut reader, self.process) {
512
+
let step = match walker.disk_step(store, self.process) {
475
513
Ok(s) => s,
476
514
Err(e) => return tx.blocking_send(Err(e.into())),
477
515
};
···
520
558
/// }
521
559
///
522
560
/// }
523
-
/// let store = join.await?.reset_store().await?;
524
561
/// # Ok(())
525
562
/// # }
526
563
/// ```
···
542
579
});
543
580
544
581
(rx, chan_task)
545
-
}
546
-
547
-
/// Reset the disk storage so it can be reused. You must call this.
548
-
///
549
-
/// Ideally we'd put this in an `impl Drop`, but since it makes blocking
550
-
/// calls, that would be risky in an async context. For now you just have to
551
-
/// carefully make sure you call it.
552
-
///
553
-
/// The sqlite store is returned, so it can be reused for another
554
-
/// `DiskDriver`.
555
-
pub async fn reset_store(mut self) -> Result<DiskStore, DriveError> {
556
-
let BigState { store, .. } = self.state.take().expect("valid state");
557
-
Ok(store.reset().await?)
558
582
}
559
583
}
+10
-11
src/lib.rs
+10
-11
src/lib.rs
···
18
18
`iroh_car` additionally applies a block size limit of `2MiB`.
19
19
20
20
```
21
-
use repo_stream::{Driver, DiskStore};
21
+
use repo_stream::{Driver, DriverBuilder, DiskBuilder};
22
22
23
23
# #[tokio::main]
24
24
# async fn main() -> Result<(), Box<dyn std::error::Error>> {
25
25
# let reader = include_bytes!("../car-samples/tiny.car").as_slice();
26
26
let mut total_size = 0;
27
-
let process = |rec: Vec<u8>| rec.len(); // block processing: just extract the size
28
-
let in_mem_limit = 10; /* MiB */
29
-
let db_cache_size = 32; /* MiB */
30
27
31
-
match Driver::load_car(reader, process, in_mem_limit).await? {
28
+
match DriverBuilder::new()
29
+
.with_mem_limit_mb(10)
30
+
.with_block_processor(|rec| rec.len()) // block processing: just extract the raw record size
31
+
.load_car(reader)
32
+
.await?
33
+
{
32
34
33
35
// if all blocks fit within memory
34
36
Driver::Memory(_commit, mut driver) => {
···
42
44
// if the CAR was too big for in-memory processing
43
45
Driver::Disk(paused) => {
44
46
// set up a disk store we can spill to
45
-
let store = DiskStore::new("some/path.db".into(), db_cache_size).await?;
47
+
let store = DiskBuilder::new().open("some/path.db".into()).await?;
46
48
// do the spilling, get back a (similar) driver
47
49
let (_commit, mut driver) = paused.finish_loading(store).await?;
48
50
···
51
53
total_size += size;
52
54
}
53
55
}
54
-
55
-
// clean up the disk store (drop tables etc)
56
-
driver.reset_store().await?;
57
56
}
58
57
};
59
58
println!("sum of size of all records: {total_size}");
···
79
78
pub mod drive;
80
79
pub mod process;
81
80
82
-
pub use disk::{DiskError, DiskStore};
83
-
pub use drive::{DriveError, Driver};
81
+
pub use disk::{DiskBuilder, DiskError, DiskStore};
82
+
pub use drive::{DriveError, Driver, DriverBuilder, NeedDisk};
84
83
pub use mst::Commit;
85
84
pub use process::Processable;
+27
src/process.rs
+27
src/process.rs
···
11
11
approximate total off-stack size of the type. (the on-stack size will be added
12
12
automatically via `std::mem::size_of`).
13
13
14
+
Note that it is **not guaranteed** that the `process` function will run on a
15
+
block before storing it in memory or on disk: it's not possible to know if a
16
+
block is a record without actually walking the MST, so the best we can do is
17
+
apply `process` to any block that we know *cannot* be an MST node, and otherwise
18
+
store the raw block bytes.
19
+
14
20
Here's a silly processing function that just collects 'eyy's found in the raw
15
21
record bytes
16
22
···
71
77
}
72
78
}
73
79
80
+
impl Processable for String {
81
+
fn get_size(&self) -> usize {
82
+
self.capacity()
83
+
}
84
+
}
85
+
74
86
impl<Item: Sized + Processable> Processable for Vec<Item> {
75
87
fn get_size(&self) -> usize {
76
88
let slot_size = std::mem::size_of::<Item>();
···
79
91
direct_size + items_referenced_size
80
92
}
81
93
}
94
+
95
+
impl<Item: Processable> Processable for Option<Item> {
96
+
fn get_size(&self) -> usize {
97
+
self.as_ref().map(|item| item.get_size()).unwrap_or(0)
98
+
}
99
+
}
100
+
101
+
impl<Item: Processable, Error: Processable> Processable for Result<Item, Error> {
102
+
fn get_size(&self) -> usize {
103
+
match self {
104
+
Ok(item) => item.get_size(),
105
+
Err(err) => err.get_size(),
106
+
}
107
+
}
108
+
}
+11
-208
src/walk.rs
+11
-208
src/walk.rs
···
1
1
//! Depth-first MST traversal
2
2
3
-
use crate::disk::SqliteReader;
3
+
use crate::disk::DiskStore;
4
4
use crate::drive::{DecodeError, MaybeProcessedBlock};
5
5
use crate::mst::Node;
6
6
use crate::process::Processable;
···
19
19
#[error("Action node error: {0}")]
20
20
MstError(#[from] MstError),
21
21
#[error("storage error: {0}")]
22
-
StorageError(#[from] rusqlite::Error),
22
+
StorageError(#[from] fjall::Error),
23
23
#[error("Decode error: {0}")]
24
24
DecodeError(#[from] DecodeError),
25
25
}
···
87
87
}
88
88
89
89
fn push_from_node(stack: &mut Vec<Need>, node: &Node, parent_depth: Depth) -> Result<(), MstError> {
90
-
// empty nodes are not allowed in the MST
91
-
// ...except for a single one for empty MST, but we wouldn't be pushing that
90
+
// empty nodes are not allowed in the MST, except for the root node of an empty MST
92
91
if node.is_empty() {
93
-
return Err(MstError::EmptyNode);
92
+
if parent_depth == Depth::Root {
93
+
return Ok(()); // empty mst, nothing to push
94
+
} else {
95
+
return Err(MstError::EmptyNode);
96
+
}
94
97
}
95
98
96
99
let mut entries = Vec::with_capacity(node.entries.len());
···
236
239
/// Blocking: reads from the disk store synchronously, so call it from a blocking context (e.g. `spawn_blocking`).
237
240
pub fn disk_step<T: Processable>(
238
241
&mut self,
239
-
reader: &mut SqliteReader,
242
+
reader: &mut DiskStore,
240
243
process: impl Fn(Vec<u8>) -> T,
241
244
) -> Result<Step<T>, WalkError> {
242
245
loop {
···
249
252
&mut Need::Node { depth, cid } => {
250
253
let cid_bytes = cid.to_bytes();
251
254
log::trace!("need node {cid:?}");
252
-
let Some(block_bytes) = reader.get(cid_bytes)? else {
255
+
let Some(block_bytes) = reader.get(&cid_bytes)? else {
253
256
log::trace!("node not found, resting");
254
257
return Ok(Step::Missing(cid));
255
258
};
···
271
274
Need::Record { rkey, cid } => {
272
275
log::trace!("need record {cid:?}");
273
276
let cid_bytes = cid.to_bytes();
274
-
let Some(data_bytes) = reader.get(cid_bytes)? else {
277
+
let Some(data_bytes) = reader.get(&cid_bytes)? else {
275
278
log::trace!("record block not found, resting");
276
279
return Ok(Step::Missing(*cid));
277
280
};
···
304
307
#[cfg(test)]
305
308
mod test {
306
309
use super::*;
307
-
// use crate::mst::Entry;
308
310
309
311
fn cid1() -> Cid {
310
312
"bafyreihixenvk3ahqbytas4hk4a26w43bh6eo3w6usjqtxkpzsvi655a3m"
311
313
.parse()
312
314
.unwrap()
313
315
}
314
-
// fn cid2() -> Cid {
315
-
// "QmY7Yh4UquoXHLPFo2XbhXkhBvFoPwmQUSa92pxnxjQuPU"
316
-
// .parse()
317
-
// .unwrap()
318
-
// }
319
-
// fn cid3() -> Cid {
320
-
// "bafybeigdyrzt5sfp7udm7hu76uh7y26nf3efuylqabf3oclgtqy55fbzdi"
321
-
// .parse()
322
-
// .unwrap()
323
-
// }
324
-
// fn cid4() -> Cid {
325
-
// "QmbWqxBEKC3P8tqsKc98xmWNzrzDtRLMiMPL8wBuTGsMnR"
326
-
// .parse()
327
-
// .unwrap()
328
-
// }
329
-
// fn cid5() -> Cid {
330
-
// "QmSnuWmxptJZdLJpKRarxBMS2Ju2oANVrgbr2xWbie9b2D"
331
-
// .parse()
332
-
// .unwrap()
333
-
// }
334
-
// fn cid6() -> Cid {
335
-
// "QmdmQXB2mzChmMeKY47C43LxUdg1NDJ5MWcKMKxDu7RgQm"
336
-
// .parse()
337
-
// .unwrap()
338
-
// }
339
-
// fn cid7() -> Cid {
340
-
// "bafybeiaysi4s6lnjev27ln5icwm6tueaw2vdykrtjkwiphwekaywqhcjze"
341
-
// .parse()
342
-
// .unwrap()
343
-
// }
344
-
// fn cid8() -> Cid {
345
-
// "bafyreif3tfdpr5n4jdrbielmcapwvbpcthepfkwq2vwonmlhirbjmotedi"
346
-
// .parse()
347
-
// .unwrap()
348
-
// }
349
-
// fn cid9() -> Cid {
350
-
// "bafyreicnokmhmrnlp2wjhyk2haep4tqxiptwfrp2rrs7rzq7uk766chqvq"
351
-
// .parse()
352
-
// .unwrap()
353
-
// }
354
316
355
317
#[test]
356
318
fn test_depth_spec_0() {
···
441
403
.as_ref()
442
404
);
443
405
}
444
-
445
-
// #[test]
446
-
// fn test_needs_from_node_just_one_record() {
447
-
// let node = Node {
448
-
// left: None,
449
-
// entries: vec![Entry {
450
-
// keysuffix: "asdf".into(),
451
-
// prefix_len: 0,
452
-
// value: cid1(),
453
-
// tree: None,
454
-
// }],
455
-
// };
456
-
// assert_eq!(
457
-
// needs_from_node(node).unwrap(),
458
-
// vec![Need::Record {
459
-
// rkey: "asdf".into(),
460
-
// cid: cid1(),
461
-
// },]
462
-
// );
463
-
// }
464
-
465
-
// #[test]
466
-
// fn test_needs_from_node_two_records() {
467
-
// let node = Node {
468
-
// left: None,
469
-
// entries: vec![
470
-
// Entry {
471
-
// keysuffix: "asdf".into(),
472
-
// prefix_len: 0,
473
-
// value: cid1(),
474
-
// tree: None,
475
-
// },
476
-
// Entry {
477
-
// keysuffix: "gh".into(),
478
-
// prefix_len: 2,
479
-
// value: cid2(),
480
-
// tree: None,
481
-
// },
482
-
// ],
483
-
// };
484
-
// assert_eq!(
485
-
// needs_from_node(node).unwrap(),
486
-
// vec![
487
-
// Need::Record {
488
-
// rkey: "asdf".into(),
489
-
// cid: cid1(),
490
-
// },
491
-
// Need::Record {
492
-
// rkey: "asgh".into(),
493
-
// cid: cid2(),
494
-
// },
495
-
// ]
496
-
// );
497
-
// }
498
-
499
-
// #[test]
500
-
// fn test_needs_from_node_with_both() {
501
-
// let node = Node {
502
-
// left: None,
503
-
// entries: vec![Entry {
504
-
// keysuffix: "asdf".into(),
505
-
// prefix_len: 0,
506
-
// value: cid1(),
507
-
// tree: Some(cid2()),
508
-
// }],
509
-
// };
510
-
// assert_eq!(
511
-
// needs_from_node(node).unwrap(),
512
-
// vec![
513
-
// Need::Record {
514
-
// rkey: "asdf".into(),
515
-
// cid: cid1(),
516
-
// },
517
-
// Need::Node(cid2()),
518
-
// ]
519
-
// );
520
-
// }
521
-
522
-
// #[test]
523
-
// fn test_needs_from_node_left_and_record() {
524
-
// let node = Node {
525
-
// left: Some(cid1()),
526
-
// entries: vec![Entry {
527
-
// keysuffix: "asdf".into(),
528
-
// prefix_len: 0,
529
-
// value: cid2(),
530
-
// tree: None,
531
-
// }],
532
-
// };
533
-
// assert_eq!(
534
-
// needs_from_node(node).unwrap(),
535
-
// vec![
536
-
// Need::Node(cid1()),
537
-
// Need::Record {
538
-
// rkey: "asdf".into(),
539
-
// cid: cid2(),
540
-
// },
541
-
// ]
542
-
// );
543
-
// }
544
-
545
-
// #[test]
546
-
// fn test_needs_from_full_node() {
547
-
// let node = Node {
548
-
// left: Some(cid1()),
549
-
// entries: vec![
550
-
// Entry {
551
-
// keysuffix: "asdf".into(),
552
-
// prefix_len: 0,
553
-
// value: cid2(),
554
-
// tree: Some(cid3()),
555
-
// },
556
-
// Entry {
557
-
// keysuffix: "ghi".into(),
558
-
// prefix_len: 1,
559
-
// value: cid4(),
560
-
// tree: Some(cid5()),
561
-
// },
562
-
// Entry {
563
-
// keysuffix: "jkl".into(),
564
-
// prefix_len: 2,
565
-
// value: cid6(),
566
-
// tree: Some(cid7()),
567
-
// },
568
-
// Entry {
569
-
// keysuffix: "mno".into(),
570
-
// prefix_len: 4,
571
-
// value: cid8(),
572
-
// tree: Some(cid9()),
573
-
// },
574
-
// ],
575
-
// };
576
-
// assert_eq!(
577
-
// needs_from_node(node).unwrap(),
578
-
// vec![
579
-
// Need::Node(cid1()),
580
-
// Need::Record {
581
-
// rkey: "asdf".into(),
582
-
// cid: cid2(),
583
-
// },
584
-
// Need::Node(cid3()),
585
-
// Need::Record {
586
-
// rkey: "aghi".into(),
587
-
// cid: cid4(),
588
-
// },
589
-
// Need::Node(cid5()),
590
-
// Need::Record {
591
-
// rkey: "agjkl".into(),
592
-
// cid: cid6(),
593
-
// },
594
-
// Need::Node(cid7()),
595
-
// Need::Record {
596
-
// rkey: "agjkmno".into(),
597
-
// cid: cid8(),
598
-
// },
599
-
// Need::Node(cid9()),
600
-
// ]
601
-
// );
602
-
// }
603
406
}
+16
-5
tests/non-huge-cars.rs
+16
-5
tests/non-huge-cars.rs
···
1
1
extern crate repo_stream;
2
2
use repo_stream::Driver;
3
3
4
+
const EMPTY_CAR: &'static [u8] = include_bytes!("../car-samples/empty.car");
4
5
const TINY_CAR: &'static [u8] = include_bytes!("../car-samples/tiny.car");
5
6
const LITTLE_CAR: &'static [u8] = include_bytes!("../car-samples/little.car");
6
7
const MIDSIZE_CAR: &'static [u8] = include_bytes!("../car-samples/midsize.car");
7
8
8
-
async fn test_car(bytes: &[u8], expected_records: usize, expected_sum: usize) {
9
+
async fn test_car(
10
+
bytes: &[u8],
11
+
expected_records: usize,
12
+
expected_sum: usize,
13
+
expect_profile: bool,
14
+
) {
9
15
let mut driver = match Driver::load_car(bytes, |block| block.len(), 10 /* MiB */)
10
16
.await
11
17
.unwrap()
···
33
39
34
40
assert_eq!(records, expected_records);
35
41
assert_eq!(sum, expected_sum);
36
-
assert!(found_bsky_profile);
42
+
assert_eq!(found_bsky_profile, expect_profile);
43
+
}
44
+
45
+
#[tokio::test]
46
+
async fn test_empty_car() {
47
+
test_car(EMPTY_CAR, 0, 0, false).await
37
48
}
38
49
39
50
#[tokio::test]
40
51
async fn test_tiny_car() {
41
-
test_car(TINY_CAR, 8, 2071).await
52
+
test_car(TINY_CAR, 8, 2071, true).await
42
53
}
43
54
44
55
#[tokio::test]
45
56
async fn test_little_car() {
46
-
test_car(LITTLE_CAR, 278, 246960).await
57
+
test_car(LITTLE_CAR, 278, 246960, true).await
47
58
}
48
59
49
60
#[tokio::test]
50
61
async fn test_midsize_car() {
51
-
test_car(MIDSIZE_CAR, 11585, 3741393).await
62
+
test_car(MIDSIZE_CAR, 11585, 3741393, true).await
52
63
}