+73
-11
Cargo.lock
···
152
checksum = "2261d10cca569e4643e526d8dc2e62e433cc8aba21ab764233731f8d369bf394"
153
154
[[package]]
155
name = "bumpalo"
156
version = "3.19.0"
157
source = "registry+https://github.com/rust-lang/crates.io-index"
···
287
]
288
289
[[package]]
290
name = "criterion"
291
version = "0.7.0"
292
source = "registry+https://github.com/rust-lang/crates.io-index"
···
352
checksum = "460fbee9c2c2f33933d720630a6a0bac33ba7053db5344fac858d4b8952d77d5"
353
354
[[package]]
355
name = "data-encoding"
356
version = "2.9.0"
357
source = "registry+https://github.com/rust-lang/crates.io-index"
···
375
dependencies = [
376
"data-encoding",
377
"syn 2.0.106",
378
]
379
380
[[package]]
···
530
]
531
532
[[package]]
533
name = "getrandom"
534
version = "0.3.3"
535
source = "registry+https://github.com/rust-lang/crates.io-index"
···
937
]
938
939
[[package]]
940
-
name = "redb"
941
-
version = "3.1.0"
942
-
source = "registry+https://github.com/rust-lang/crates.io-index"
943
-
checksum = "ae323eb086579a3769daa2c753bb96deb95993c534711e0dbe881b5192906a06"
944
-
dependencies = [
945
-
"libc",
946
-
]
947
-
948
-
[[package]]
949
name = "redox_syscall"
950
version = "0.5.18"
951
source = "registry+https://github.com/rust-lang/crates.io-index"
···
985
986
[[package]]
987
name = "repo-stream"
988
-
version = "0.1.1"
989
dependencies = [
990
"bincode",
991
"clap",
···
997
"iroh-car",
998
"log",
999
"multibase",
1000
-
"redb",
1001
"rusqlite",
1002
"serde",
1003
"serde_bytes",
1004
"serde_ipld_dagcbor",
1005
"tempfile",
1006
"thiserror 2.0.17",
1007
"tokio",
···
1133
]
1134
1135
[[package]]
1136
name = "signal-hook-registry"
1137
version = "1.4.6"
1138
source = "registry+https://github.com/rust-lang/crates.io-index"
···
1286
]
1287
1288
[[package]]
1289
name = "unicode-ident"
1290
version = "1.0.19"
1291
source = "registry+https://github.com/rust-lang/crates.io-index"
···
1320
version = "0.2.15"
1321
source = "registry+https://github.com/rust-lang/crates.io-index"
1322
checksum = "accd4ea62f7bb7a82fe23066fb0957d48ef677f6eeb8215f372f52e48bb32426"
1323
1324
[[package]]
1325
name = "virtue"
···
152
checksum = "2261d10cca569e4643e526d8dc2e62e433cc8aba21ab764233731f8d369bf394"
153
154
[[package]]
155
+
name = "block-buffer"
156
+
version = "0.10.4"
157
+
source = "registry+https://github.com/rust-lang/crates.io-index"
158
+
checksum = "3078c7629b62d3f0439517fa394996acacc5cbc91c5a20d8c658e77abd503a71"
159
+
dependencies = [
160
+
"generic-array",
161
+
]
162
+
163
+
[[package]]
164
name = "bumpalo"
165
version = "3.19.0"
166
source = "registry+https://github.com/rust-lang/crates.io-index"
···
296
]
297
298
[[package]]
299
+
name = "cpufeatures"
300
+
version = "0.2.17"
301
+
source = "registry+https://github.com/rust-lang/crates.io-index"
302
+
checksum = "59ed5838eebb26a2bb2e58f6d5b5316989ae9d08bab10e0e6d103e656d1b0280"
303
+
dependencies = [
304
+
"libc",
305
+
]
306
+
307
+
[[package]]
308
name = "criterion"
309
version = "0.7.0"
310
source = "registry+https://github.com/rust-lang/crates.io-index"
···
370
checksum = "460fbee9c2c2f33933d720630a6a0bac33ba7053db5344fac858d4b8952d77d5"
371
372
[[package]]
373
+
name = "crypto-common"
374
+
version = "0.1.6"
375
+
source = "registry+https://github.com/rust-lang/crates.io-index"
376
+
checksum = "1bfb12502f3fc46cca1bb51ac28df9d618d813cdc3d2f25b9fe775a34af26bb3"
377
+
dependencies = [
378
+
"generic-array",
379
+
"typenum",
380
+
]
381
+
382
+
[[package]]
383
name = "data-encoding"
384
version = "2.9.0"
385
source = "registry+https://github.com/rust-lang/crates.io-index"
···
403
dependencies = [
404
"data-encoding",
405
"syn 2.0.106",
406
+
]
407
+
408
+
[[package]]
409
+
name = "digest"
410
+
version = "0.10.7"
411
+
source = "registry+https://github.com/rust-lang/crates.io-index"
412
+
checksum = "9ed9a281f7bc9b7576e61468ba615a66a5c8cfdff42420a70aa82701a3b1e292"
413
+
dependencies = [
414
+
"block-buffer",
415
+
"crypto-common",
416
]
417
418
[[package]]
···
568
]
569
570
[[package]]
571
+
name = "generic-array"
572
+
version = "0.14.9"
573
+
source = "registry+https://github.com/rust-lang/crates.io-index"
574
+
checksum = "4bb6743198531e02858aeaea5398fcc883e71851fcbcb5a2f773e2fb6cb1edf2"
575
+
dependencies = [
576
+
"typenum",
577
+
"version_check",
578
+
]
579
+
580
+
[[package]]
581
name = "getrandom"
582
version = "0.3.3"
583
source = "registry+https://github.com/rust-lang/crates.io-index"
···
985
]
986
987
[[package]]
988
name = "redox_syscall"
989
version = "0.5.18"
990
source = "registry+https://github.com/rust-lang/crates.io-index"
···
1024
1025
[[package]]
1026
name = "repo-stream"
1027
+
version = "0.2.2"
1028
dependencies = [
1029
"bincode",
1030
"clap",
···
1036
"iroh-car",
1037
"log",
1038
"multibase",
1039
"rusqlite",
1040
"serde",
1041
"serde_bytes",
1042
"serde_ipld_dagcbor",
1043
+
"sha2",
1044
"tempfile",
1045
"thiserror 2.0.17",
1046
"tokio",
···
1172
]
1173
1174
[[package]]
1175
+
name = "sha2"
1176
+
version = "0.10.9"
1177
+
source = "registry+https://github.com/rust-lang/crates.io-index"
1178
+
checksum = "a7507d819769d01a365ab707794a4084392c824f54a7a6a7862f8c3d0892b283"
1179
+
dependencies = [
1180
+
"cfg-if",
1181
+
"cpufeatures",
1182
+
"digest",
1183
+
]
1184
+
1185
+
[[package]]
1186
name = "signal-hook-registry"
1187
version = "1.4.6"
1188
source = "registry+https://github.com/rust-lang/crates.io-index"
···
1336
]
1337
1338
[[package]]
1339
+
name = "typenum"
1340
+
version = "1.19.0"
1341
+
source = "registry+https://github.com/rust-lang/crates.io-index"
1342
+
checksum = "562d481066bde0658276a35467c4af00bdc6ee726305698a55b86e61d7ad82bb"
1343
+
1344
+
[[package]]
1345
name = "unicode-ident"
1346
version = "1.0.19"
1347
source = "registry+https://github.com/rust-lang/crates.io-index"
···
1376
version = "0.2.15"
1377
source = "registry+https://github.com/rust-lang/crates.io-index"
1378
checksum = "accd4ea62f7bb7a82fe23066fb0957d48ef677f6eeb8215f372f52e48bb32426"
1379
+
1380
+
[[package]]
1381
+
name = "version_check"
1382
+
version = "0.9.5"
1383
+
source = "registry+https://github.com/rust-lang/crates.io-index"
1384
+
checksum = "0b928f33d975fc6ad9f86c8f283853ad26bdd5b10b7f1542aa2fa15e2289105a"
1385
1386
[[package]]
1387
name = "virtue"
+7
-4
Cargo.toml
···
1
[package]
2
name = "repo-stream"
3
-
version = "0.1.1"
4
edition = "2024"
5
license = "MIT OR Apache-2.0"
6
-
description = "Fast and robust atproto CAR file processing in rust"
7
repository = "https://tangled.org/@microcosm.blue/repo-stream"
8
9
[dependencies]
···
14
iroh-car = "0.5.1"
15
log = "0.4.28"
16
multibase = "0.9.2"
17
-
redb = "3.1.0"
18
rusqlite = "0.37.0"
19
serde = { version = "1.0.228", features = ["derive"] }
20
serde_bytes = "0.11.19"
21
serde_ipld_dagcbor = "0.6.4"
22
thiserror = "2.0.17"
23
-
tokio = "1.47.1"
24
25
[dev-dependencies]
26
clap = { version = "4.5.48", features = ["derive"] }
···
33
[profile.profiling]
34
inherits = "release"
35
debug = true
36
37
[[bench]]
38
name = "non-huge-cars"
···
1
[package]
2
name = "repo-stream"
3
+
version = "0.2.2"
4
edition = "2024"
5
license = "MIT OR Apache-2.0"
6
+
description = "A robust CAR file -> MST walker for atproto"
7
repository = "https://tangled.org/@microcosm.blue/repo-stream"
8
9
[dependencies]
···
14
iroh-car = "0.5.1"
15
log = "0.4.28"
16
multibase = "0.9.2"
17
rusqlite = "0.37.0"
18
serde = { version = "1.0.228", features = ["derive"] }
19
serde_bytes = "0.11.19"
20
serde_ipld_dagcbor = "0.6.4"
21
+
sha2 = "0.10.9"
22
thiserror = "2.0.17"
23
+
tokio = { version = "1.47.1", features = ["rt", "sync"] }
24
25
[dev-dependencies]
26
clap = { version = "4.5.48", features = ["derive"] }
···
33
[profile.profiling]
34
inherits = "release"
35
debug = true
36
+
37
+
# [profile.release]
38
+
# debug = true
39
40
[[bench]]
41
name = "non-huge-cars"
+12
-21
benches/huge-car.rs
···
1
extern crate repo_stream;
2
-
use futures::TryStreamExt;
3
-
use iroh_car::CarReader;
4
-
use std::convert::Infallible;
5
use std::path::{Path, PathBuf};
6
7
use criterion::{Criterion, criterion_group, criterion_main};
···
20
});
21
}
22
23
-
async fn drive_car(filename: impl AsRef<Path>) {
24
let reader = tokio::fs::File::open(filename).await.unwrap();
25
let reader = tokio::io::BufReader::new(reader);
26
-
let reader = CarReader::new(reader).await.unwrap();
27
28
-
let root = reader
29
-
.header()
30
-
.roots()
31
-
.first()
32
-
.ok_or("missing root")
33
.unwrap()
34
-
.clone();
35
-
36
-
let stream = std::pin::pin!(reader.stream());
37
-
38
-
let (_commit, v) =
39
-
repo_stream::drive::Vehicle::init(root, stream, |block| Ok::<_, Infallible>(block.len()))
40
-
.await
41
-
.unwrap();
42
-
let mut record_stream = std::pin::pin!(v.stream());
43
44
-
while let Some(_) = record_stream.try_next().await.unwrap() {
45
-
// just here for the drive
46
}
47
}
48
49
criterion_group!(benches, criterion_benchmark);
···
1
extern crate repo_stream;
2
+
use repo_stream::Driver;
3
use std::path::{Path, PathBuf};
4
5
use criterion::{Criterion, criterion_group, criterion_main};
···
18
});
19
}
20
21
+
async fn drive_car(filename: impl AsRef<Path>) -> usize {
22
let reader = tokio::fs::File::open(filename).await.unwrap();
23
let reader = tokio::io::BufReader::new(reader);
24
25
+
let mut driver = match Driver::load_car(reader, |block| block.len(), 1024)
26
+
.await
27
.unwrap()
28
+
{
29
+
Driver::Memory(_, mem_driver) => mem_driver,
30
+
Driver::Disk(_) => panic!("not doing disk for benchmark"),
31
+
};
32
33
+
let mut n = 0;
34
+
while let Some(pairs) = driver.next_chunk(256).await.unwrap() {
35
+
n += pairs.len();
36
}
37
+
n
38
}
39
40
criterion_group!(benches, criterion_benchmark);
+16
-22
benches/non-huge-cars.rs
···
1
extern crate repo_stream;
2
-
use futures::TryStreamExt;
3
-
use iroh_car::CarReader;
4
-
use std::convert::Infallible;
5
6
use criterion::{Criterion, criterion_group, criterion_main};
7
8
const TINY_CAR: &'static [u8] = include_bytes!("../car-samples/tiny.car");
9
const LITTLE_CAR: &'static [u8] = include_bytes!("../car-samples/little.car");
10
const MIDSIZE_CAR: &'static [u8] = include_bytes!("../car-samples/midsize.car");
···
15
.build()
16
.expect("Creating runtime failed");
17
18
c.bench_function("tiny-car", |b| {
19
b.to_async(&rt).iter(async || drive_car(TINY_CAR).await)
20
});
···
26
});
27
}
28
29
-
async fn drive_car(bytes: &[u8]) {
30
-
let reader = CarReader::new(bytes).await.unwrap();
31
-
32
-
let root = reader
33
-
.header()
34
-
.roots()
35
-
.first()
36
-
.ok_or("missing root")
37
.unwrap()
38
-
.clone();
39
-
40
-
let stream = std::pin::pin!(reader.stream());
41
42
-
let (_commit, v) =
43
-
repo_stream::drive::Vehicle::init(root, stream, |block| Ok::<_, Infallible>(block.len()))
44
-
.await
45
-
.unwrap();
46
-
let mut record_stream = std::pin::pin!(v.stream());
47
-
48
-
while let Some(_) = record_stream.try_next().await.unwrap() {
49
-
// just here for the drive
50
}
51
}
52
53
criterion_group!(benches, criterion_benchmark);
···
1
extern crate repo_stream;
2
+
use repo_stream::Driver;
3
4
use criterion::{Criterion, criterion_group, criterion_main};
5
6
+
const EMPTY_CAR: &'static [u8] = include_bytes!("../car-samples/empty.car");
7
const TINY_CAR: &'static [u8] = include_bytes!("../car-samples/tiny.car");
8
const LITTLE_CAR: &'static [u8] = include_bytes!("../car-samples/little.car");
9
const MIDSIZE_CAR: &'static [u8] = include_bytes!("../car-samples/midsize.car");
···
14
.build()
15
.expect("Creating runtime failed");
16
17
+
c.bench_function("empty-car", |b| {
18
+
b.to_async(&rt).iter(async || drive_car(EMPTY_CAR).await)
19
+
});
20
c.bench_function("tiny-car", |b| {
21
b.to_async(&rt).iter(async || drive_car(TINY_CAR).await)
22
});
···
28
});
29
}
30
31
+
async fn drive_car(bytes: &[u8]) -> usize {
32
+
let mut driver = match Driver::load_car(bytes, |block| block.len(), 32)
33
+
.await
34
.unwrap()
35
+
{
36
+
Driver::Memory(_, mem_driver) => mem_driver,
37
+
Driver::Disk(_) => panic!("not benching big cars here"),
38
+
};
39
40
+
let mut n = 0;
41
+
while let Some(pairs) = driver.next_chunk(256).await.unwrap() {
42
+
n += pairs.len();
43
}
44
+
n
45
}
46
47
criterion_group!(benches, criterion_benchmark);
car-samples/empty.car
This is a binary file and will not be displayed.
+63
-27
examples/disk-read-file/main.rs
···
1
extern crate repo_stream;
2
use clap::Parser;
3
-
use futures::TryStreamExt;
4
-
use iroh_car::CarReader;
5
-
use std::convert::Infallible;
6
use std::path::PathBuf;
7
-
8
-
type Result<T> = std::result::Result<T, Box<dyn std::error::Error>>;
9
10
#[derive(Debug, Parser)]
11
struct Args {
···
16
}
17
18
#[tokio::main]
19
-
async fn main() -> Result<()> {
20
env_logger::init();
21
22
let Args { car, tmpfile } = Args::parse();
23
let reader = tokio::fs::File::open(car).await?;
24
let reader = tokio::io::BufReader::new(reader);
25
26
-
println!("hello!");
27
28
-
let reader = CarReader::new(reader).await?;
29
30
-
let redb_store = repo_stream::disk_redb::RedbStore::new(tmpfile)?;
31
32
-
let root = reader
33
-
.header()
34
-
.roots()
35
-
.first()
36
-
.ok_or("missing root")?
37
-
.clone();
38
-
log::debug!("root: {root:?}");
39
40
-
// let stream = Box::pin(reader.stream());
41
-
let stream = std::pin::pin!(reader.stream());
42
43
-
let (commit, v) = repo_stream::disk_drive::Vehicle::init(root, stream, redb_store, |block| {
44
-
Ok::<_, Infallible>(block.len())
45
-
})
46
-
.await?;
47
-
let mut record_stream = std::pin::pin!(v.stream());
48
49
-
log::info!("got commit: {commit:?}");
50
51
-
while let Some((rkey, _rec)) = record_stream.try_next().await? {
52
-
log::info!("got {rkey:?}");
53
}
54
-
log::info!("bye!");
55
56
Ok(())
57
}
···
1
+
/*!
2
+
Read a CAR file by spilling to disk
3
+
*/
4
+
5
extern crate repo_stream;
6
use clap::Parser;
7
+
use repo_stream::{DiskBuilder, Driver, DriverBuilder};
8
use std::path::PathBuf;
9
+
use std::time::Instant;
10
11
#[derive(Debug, Parser)]
12
struct Args {
···
17
}
18
19
#[tokio::main]
20
+
async fn main() -> Result<(), Box<dyn std::error::Error>> {
21
env_logger::init();
22
23
let Args { car, tmpfile } = Args::parse();
24
+
25
+
// repo-stream takes an AsyncRead as input. wrapping a filesystem read in
26
+
// BufReader can provide a really significant performance win.
27
let reader = tokio::fs::File::open(car).await?;
28
let reader = tokio::io::BufReader::new(reader);
29
30
+
log::info!("hello! reading the car...");
31
+
let t0 = Instant::now();
32
33
+
// in this example we only bother handling CARs that are too big for memory
34
+
// `noop` helper means: do no block processing, store the raw blocks
35
+
let driver = match DriverBuilder::new()
36
+
.with_mem_limit_mb(10) // how much memory can be used before disk spill
37
+
.load_car(reader)
38
+
.await?
39
+
{
40
+
Driver::Memory(_, _) => panic!("try this on a bigger car"),
41
+
Driver::Disk(big_stuff) => {
42
+
// we reach here if the repo was too big and needs to be spilled to
43
+
// disk to continue
44
45
+
// set up a disk store we can spill to
46
+
let disk_store = DiskBuilder::new().open(tmpfile).await?;
47
48
+
// do the spilling, get back a (similar) driver
49
+
let (commit, driver) = big_stuff.finish_loading(disk_store).await?;
50
51
+
// at this point you might want to fetch the account's signing key
52
+
// via the DID from the commit, and then verify the signature.
53
+
log::warn!("big's comit ({:?}): {:?}", t0.elapsed(), commit);
54
+
55
+
// pop the driver back out to get some code indentation relief
56
+
driver
57
+
}
58
+
};
59
+
60
+
// collect some random stats about the blocks
61
+
let mut n = 0;
62
+
let mut zeros = 0;
63
64
+
log::info!("walking...");
65
+
66
+
// this example uses the disk driver's channel mode: the tree walking is
67
+
// spawned onto a blocking thread, and we get chunks of rkey+blocks back
68
+
let (mut rx, join) = driver.to_channel(512);
69
+
while let Some(r) = rx.recv().await {
70
+
let pairs = r?;
71
72
+
// keep a count of the total number of blocks seen
73
+
n += pairs.len();
74
75
+
for (_, block) in pairs {
76
+
// for each block, count how many bytes are equal to '0'
77
+
// (this is just an example, you probably want to do something more
78
+
// interesting)
79
+
zeros += block.into_iter().filter(|&b| b == b'0').count()
80
+
}
81
}
82
+
83
+
log::info!("arrived! ({:?}) joining rx...", t0.elapsed());
84
+
85
+
// clean up the database. would be nice to do this in drop so it happens
86
+
// automatically, but some blocking work happens, so that's not allowed in
87
+
// async rust. 🤷
88
+
join.await?.reset_store().await?;
89
+
90
+
log::info!("done. n={n} zeros={zeros}");
91
92
Ok(())
93
}
+18
-25
examples/read-file/main.rs
···
1
extern crate repo_stream;
2
use clap::Parser;
3
-
use futures::TryStreamExt;
4
-
use iroh_car::CarReader;
5
-
use std::convert::Infallible;
6
use std::path::PathBuf;
7
8
type Result<T> = std::result::Result<T, Box<dyn std::error::Error>>;
···
21
let reader = tokio::fs::File::open(file).await?;
22
let reader = tokio::io::BufReader::new(reader);
23
24
-
println!("hello!");
25
-
26
-
let reader = CarReader::new(reader).await?;
27
-
28
-
let root = reader
29
-
.header()
30
-
.roots()
31
-
.first()
32
-
.ok_or("missing root")?
33
-
.clone();
34
-
log::debug!("root: {root:?}");
35
-
36
-
// let stream = Box::pin(reader.stream());
37
-
let stream = std::pin::pin!(reader.stream());
38
-
39
-
let (commit, v) =
40
-
repo_stream::drive::Vehicle::init(root, stream, |block| Ok::<_, Infallible>(block.len()))
41
-
.await?;
42
-
let mut record_stream = std::pin::pin!(v.stream());
43
44
log::info!("got commit: {commit:?}");
45
46
-
while let Some((rkey, _rec)) = record_stream.try_next().await? {
47
-
log::info!("got {rkey:?}");
48
}
49
-
log::info!("bye!");
50
51
Ok(())
52
}
···
1
+
/*!
2
+
Read a CAR file with in-memory processing
3
+
*/
4
+
5
extern crate repo_stream;
6
use clap::Parser;
7
+
use repo_stream::{Driver, DriverBuilder};
8
use std::path::PathBuf;
9
10
type Result<T> = std::result::Result<T, Box<dyn std::error::Error>>;
···
23
let reader = tokio::fs::File::open(file).await?;
24
let reader = tokio::io::BufReader::new(reader);
25
26
+
let (commit, mut driver) = match DriverBuilder::new()
27
+
.with_block_processor(|block| block.len())
28
+
.load_car(reader)
29
+
.await?
30
+
{
31
+
Driver::Memory(commit, mem_driver) => (commit, mem_driver),
32
+
Driver::Disk(_) => panic!("this example doesn't handle big CARs"),
33
+
};
34
35
log::info!("got commit: {commit:?}");
36
37
+
let mut n = 0;
38
+
while let Some(pairs) = driver.next_chunk(256).await? {
39
+
n += pairs.len();
40
+
// log::info!("got {rkey:?}");
41
}
42
+
log::info!("bye! total records={n}");
43
44
Ok(())
45
}
+70
-2
readme.md
···
1
# repo-stream
2
3
-
Fast and (aspirationally) robust atproto CAR file processing in rust
4
5
6
current car processing times (records processed into their length usize, phil's dev machine):
···
27
-> yeah the commit is returned from init
28
- [ ] spec compliance todos
29
- [x] assert that keys are ordered and fail if not
30
-
- [ ] verify node mst depth from key (possibly pending [interop test fixes](https://github.com/bluesky-social/atproto-interop-tests/issues/5))
31
- [ ] performance todos
32
- [x] consume the serialized nodes into a mutable efficient format
33
- [ ] maybe customize the deserialize impl to do that directly?
···
1
# repo-stream
2
3
+
A robust CAR file -> MST walker for atproto
4
+
5
+
[![Crates.io][crates-badge]](https://crates.io/crates/repo-stream)
6
+
[![Documentation][docs-badge]](https://docs.rs/repo-stream)
7
+
[![Sponsor][sponsor-badge]](https://github.com/sponsors/uniphil)
8
+
9
+
[crates-badge]: https://img.shields.io/crates/v/repo-stream.svg
10
+
[docs-badge]: https://docs.rs/repo-stream/badge.svg
11
+
[sponsor-badge]: https://img.shields.io/badge/at-microcosm-b820f9?labelColor=b820f9&logo=githubsponsors&logoColor=fff
12
+
13
+
```rust
14
+
use repo_stream::{Driver, DriverBuilder, DriveError, DiskBuilder};
15
+
16
+
#[tokio::main]
17
+
async fn main() -> Result<(), DriveError> {
18
+
// repo-stream takes any AsyncRead as input, like a tokio::fs::File
19
+
let reader = tokio::fs::File::open("repo.car".into()).await?;
20
+
let reader = tokio::io::BufReader::new(reader);
21
+
22
+
// example repo workload is simply counting the total record bytes
23
+
let mut total_size = 0;
24
+
25
+
match DriverBuilder::new()
26
+
.with_mem_limit_mb(10)
27
+
.with_block_processor(|rec| rec.len()) // block processing: just extract the raw record size
28
+
.load_car(reader)
29
+
.await?
30
+
{
31
+
32
+
// if all blocks fit within memory
33
+
Driver::Memory(_commit, mut driver) => {
34
+
while let Some(chunk) = driver.next_chunk(256).await? {
35
+
for (_rkey, size) in chunk {
36
+
total_size += size;
37
+
}
38
+
}
39
+
},
40
+
41
+
// if the CAR was too big for in-memory processing
42
+
Driver::Disk(paused) => {
43
+
// set up a disk store we can spill to
44
+
let store = DiskBuilder::new().open("some/path.db".into()).await?;
45
+
// do the spilling, get back a (similar) driver
46
+
let (_commit, mut driver) = paused.finish_loading(store).await?;
47
+
48
+
while let Some(chunk) = driver.next_chunk(256).await? {
49
+
for (_rkey, size) in chunk {
50
+
total_size += size;
51
+
}
52
+
}
53
+
54
+
// clean up the disk store (drop tables etc)
55
+
driver.reset_store().await?;
56
+
}
57
+
};
58
+
println!("sum of size of all records: {total_size}");
59
+
Ok(())
60
+
}
61
+
```
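
The disk driver also has a channel mode (used by `examples/disk-read-file` in this change): the MST walk runs on a blocking task and chunks of rkey/record pairs arrive over a channel. A minimal sketch, assuming the same `driver` and size-counting workload as the `Driver::Disk` arm above:

```rust
// sketch only: mirrors examples/disk-read-file in this change; assumes `driver`
// came from paused.finish_loading(store) and records were processed into sizes
let (mut rx, join) = driver.to_channel(512);
while let Some(chunk) = rx.recv().await {
    for (_rkey, size) in chunk? {
        total_size += size;
    }
}
// the walker is handed back once the channel drains; clean up the disk store
join.await?.reset_store().await?;
```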
62
+
63
+
more recent todo
64
+
65
+
- [ ] get an *empty* car for the test suite
66
+
- [x] implement a max size on disk limit
67
+
68
+
69
+
-----
70
+
71
+
older stuff (to clean up):
72
73
74
current car processing times (records processed into their length usize, phil's dev machine):
···
95
-> yeah the commit is returned from init
96
- [ ] spec compliance todos
97
- [x] assert that keys are ordered and fail if not
98
+
- [x] verify node mst depth from key (possibly pending [interop test fixes](https://github.com/bluesky-social/atproto-interop-tests/issues/5))
99
- [ ] performance todos
100
- [x] consume the serialized nodes into a mutable efficient format
101
- [ ] maybe customize the deserialize impl to do that directly?
+221
src/disk.rs
···
···
1
+
/*!
2
+
On-disk storage for blocks
3
+
4
+
Currently this uses sqlite. In testing sqlite wasn't the fastest, but it seemed
5
+
to be the best behaved in terms of both on-disk space usage and memory usage.
6
+
7
+
```no_run
8
+
# use repo_stream::{DiskBuilder, DiskError};
9
+
# #[tokio::main]
10
+
# async fn main() -> Result<(), DiskError> {
11
+
let store = DiskBuilder::new()
12
+
.with_cache_size_mb(32)
13
+
.with_max_stored_mb(1024) // errors when >1GiB of processed blocks are inserted
14
+
.open("/some/path.db".into()).await?;
15
+
# Ok(())
16
+
# }
17
+
```
18
+
*/
19
+
20
+
use crate::drive::DriveError;
21
+
use rusqlite::OptionalExtension;
22
+
use std::path::PathBuf;
23
+
24
+
#[derive(Debug, thiserror::Error)]
25
+
pub enum DiskError {
26
+
/// A wrapped database error
27
+
///
28
+
/// (The wrapped err should probably be obscured to remove public-facing
29
+
/// sqlite bits)
30
+
#[error(transparent)]
31
+
DbError(#[from] rusqlite::Error),
32
+
/// A tokio blocking task failed to join
33
+
#[error("Failed to join a tokio blocking task: {0}")]
34
+
JoinError(#[from] tokio::task::JoinError),
35
+
/// The total size of stored blocks exceeded the allowed size
36
+
///
37
+
/// If you need to process *really* big CARs, you can configure a higher
38
+
/// limit.
39
+
#[error("Maximum disk size reached")]
40
+
MaxSizeExceeded,
41
+
#[error("this error was replaced, seeing this is a bug.")]
42
+
#[doc(hidden)]
43
+
Stolen,
44
+
}
45
+
46
+
impl DiskError {
47
+
/// hack for ownership challenges with the disk driver
48
+
pub(crate) fn steal(&mut self) -> Self {
49
+
let mut swapped = DiskError::Stolen;
50
+
std::mem::swap(self, &mut swapped);
51
+
swapped
52
+
}
53
+
}
54
+
55
+
/// Builder-style disk store setup
56
+
#[derive(Debug, Clone)]
57
+
pub struct DiskBuilder {
58
+
/// Database in-memory cache allowance
59
+
///
60
+
/// Default: 32 MiB
61
+
pub cache_size_mb: usize,
62
+
/// Database stored block size limit
63
+
///
64
+
/// Default: 10 GiB
65
+
///
66
+
/// Note: actual size on disk may be more, but should approximately scale
67
+
/// with this limit
68
+
pub max_stored_mb: usize,
69
+
}
70
+
71
+
impl Default for DiskBuilder {
72
+
fn default() -> Self {
73
+
Self {
74
+
cache_size_mb: 32,
75
+
max_stored_mb: 10 * 1024, // 10 GiB
76
+
}
77
+
}
78
+
}
79
+
80
+
impl DiskBuilder {
81
+
/// Begin configuring the storage with defaults
82
+
pub fn new() -> Self {
83
+
Default::default()
84
+
}
85
+
/// Set the in-memory cache allowance for the database
86
+
///
87
+
/// Default: 32 MiB
88
+
pub fn with_cache_size_mb(mut self, size: usize) -> Self {
89
+
self.cache_size_mb = size;
90
+
self
91
+
}
92
+
/// Set the approximate stored block size limit
93
+
///
94
+
/// Default: 10 GiB
95
+
pub fn with_max_stored_mb(mut self, max: usize) -> Self {
96
+
self.max_stored_mb = max;
97
+
self
98
+
}
99
+
/// Open and initialize the actual disk storage
100
+
pub async fn open(&self, path: PathBuf) -> Result<DiskStore, DiskError> {
101
+
DiskStore::new(path, self.cache_size_mb, self.max_stored_mb).await
102
+
}
103
+
}
104
+
105
+
/// On-disk block storage
106
+
pub struct DiskStore {
107
+
conn: rusqlite::Connection,
108
+
max_stored: usize,
109
+
stored: usize,
110
+
}
111
+
112
+
impl DiskStore {
113
+
/// Initialize a new disk store
114
+
pub async fn new(
115
+
path: PathBuf,
116
+
cache_mb: usize,
117
+
max_stored_mb: usize,
118
+
) -> Result<Self, DiskError> {
119
+
let max_stored = max_stored_mb * 2_usize.pow(20);
120
+
let conn = tokio::task::spawn_blocking(move || {
121
+
let conn = rusqlite::Connection::open(path)?;
122
+
123
+
let sqlite_one_mb = -(2_i64.pow(10)); // negative is kibibytes for sqlite cache_size
124
+
125
+
// conn.pragma_update(None, "journal_mode", "OFF")?;
126
+
// conn.pragma_update(None, "journal_mode", "MEMORY")?;
127
+
conn.pragma_update(None, "journal_mode", "WAL")?;
128
+
// conn.pragma_update(None, "wal_autocheckpoint", "0")?; // this lets things get a bit big on disk
129
+
conn.pragma_update(None, "synchronous", "OFF")?;
130
+
conn.pragma_update(
131
+
None,
132
+
"cache_size",
133
+
(cache_mb as i64 * sqlite_one_mb).to_string(),
134
+
)?;
135
+
Self::reset_tables(&conn)?;
136
+
137
+
Ok::<_, DiskError>(conn)
138
+
})
139
+
.await??;
140
+
141
+
Ok(Self {
142
+
conn,
143
+
max_stored,
144
+
stored: 0,
145
+
})
146
+
}
147
+
pub(crate) fn get_writer(&'_ mut self) -> Result<SqliteWriter<'_>, DiskError> {
148
+
let tx = self.conn.transaction()?;
149
+
Ok(SqliteWriter {
150
+
tx,
151
+
stored: &mut self.stored,
152
+
max: self.max_stored,
153
+
})
154
+
}
155
+
pub(crate) fn get_reader<'conn>(&'conn self) -> Result<SqliteReader<'conn>, DiskError> {
156
+
let select_stmt = self.conn.prepare("SELECT val FROM blocks WHERE key = ?1")?;
157
+
Ok(SqliteReader { select_stmt })
158
+
}
159
+
/// Drop and recreate the kv table
160
+
pub async fn reset(self) -> Result<Self, DiskError> {
161
+
tokio::task::spawn_blocking(move || {
162
+
Self::reset_tables(&self.conn)?;
163
+
Ok(self)
164
+
})
165
+
.await?
166
+
}
167
+
fn reset_tables(conn: &rusqlite::Connection) -> Result<(), DiskError> {
168
+
conn.execute("DROP TABLE IF EXISTS blocks", ())?;
169
+
conn.execute(
170
+
"CREATE TABLE blocks (
171
+
key BLOB PRIMARY KEY NOT NULL,
172
+
val BLOB NOT NULL
173
+
) WITHOUT ROWID",
174
+
(),
175
+
)?;
176
+
Ok(())
177
+
}
178
+
}
179
+
180
+
pub(crate) struct SqliteWriter<'conn> {
181
+
tx: rusqlite::Transaction<'conn>,
182
+
stored: &'conn mut usize,
183
+
max: usize,
184
+
}
185
+
186
+
impl SqliteWriter<'_> {
187
+
pub(crate) fn put_many(
188
+
&mut self,
189
+
kv: impl Iterator<Item = Result<(Vec<u8>, Vec<u8>), DriveError>>,
190
+
) -> Result<(), DriveError> {
191
+
let mut insert_stmt = self
192
+
.tx
193
+
.prepare_cached("INSERT INTO blocks (key, val) VALUES (?1, ?2)")
194
+
.map_err(DiskError::DbError)?;
195
+
for pair in kv {
196
+
let (k, v) = pair?;
197
+
*self.stored += v.len();
198
+
if *self.stored > self.max {
199
+
return Err(DiskError::MaxSizeExceeded.into());
200
+
}
201
+
insert_stmt.execute((k, v)).map_err(DiskError::DbError)?;
202
+
}
203
+
Ok(())
204
+
}
205
+
pub fn commit(self) -> Result<(), DiskError> {
206
+
self.tx.commit()?;
207
+
Ok(())
208
+
}
209
+
}
210
+
211
+
pub(crate) struct SqliteReader<'conn> {
212
+
select_stmt: rusqlite::Statement<'conn>,
213
+
}
214
+
215
+
impl SqliteReader<'_> {
216
+
pub(crate) fn get(&mut self, key: Vec<u8>) -> rusqlite::Result<Option<Vec<u8>>> {
217
+
self.select_stmt
218
+
.query_one((&key,), |row| row.get(0))
219
+
.optional()
220
+
}
221
+
}
-143
src/disk_drive.rs
···
1
-
use futures::Stream;
2
-
use futures::TryStreamExt;
3
-
use std::error::Error;
4
-
5
-
use crate::disk_walk::{Step, Trip, Walker};
6
-
use crate::mst::Commit;
7
-
8
-
use ipld_core::cid::Cid;
9
-
use serde::{Serialize, de::DeserializeOwned};
10
-
11
-
/// Errors that can happen while consuming and emitting blocks and records
12
-
#[derive(Debug, thiserror::Error)]
13
-
pub enum DriveError {
14
-
#[error("Failed to initialize CarReader: {0}")]
15
-
CarReader(#[from] iroh_car::Error),
16
-
#[error("Car block stream error: {0}")]
17
-
CarBlockError(Box<dyn Error>),
18
-
#[error("Failed to decode commit block: {0}")]
19
-
BadCommit(Box<dyn Error>),
20
-
#[error("The Commit block reference by the root was not found")]
21
-
MissingCommit,
22
-
#[error("The MST block {0} could not be found")]
23
-
MissingBlock(Cid),
24
-
#[error("Failed to walk the mst tree: {0}")]
25
-
Tripped(#[from] Trip),
26
-
}
27
-
28
-
pub trait BlockStore<MPB: Serialize + DeserializeOwned> {
29
-
fn put_batch(&self, blocks: Vec<(Cid, MPB)>); // unwraps for now
30
-
fn get(&self, key: Cid) -> Option<MPB>;
31
-
}
32
-
33
-
type CarBlock<E> = Result<(Cid, Vec<u8>), E>;
34
-
35
-
/// The core driver between the block stream and MST walker
36
-
pub struct Vehicle<SE, S, T, BS, P, PE>
37
-
where
38
-
SE: Error + 'static,
39
-
S: Stream<Item = CarBlock<SE>>,
40
-
T: Clone + Serialize + DeserializeOwned,
41
-
BS: BlockStore<Vec<u8>>,
42
-
P: Fn(&[u8]) -> Result<T, PE>,
43
-
PE: Error,
44
-
{
45
-
#[allow(dead_code)]
46
-
block_stream: Option<S>,
47
-
block_store: BS,
48
-
walker: Walker,
49
-
process: P,
50
-
}
51
-
52
-
impl<SE, S, T, BS, P, PE> Vehicle<SE, S, T, BS, P, PE>
53
-
where
54
-
SE: Error + 'static,
55
-
S: Stream<Item = CarBlock<SE>> + Unpin,
56
-
T: Clone + Serialize + DeserializeOwned,
57
-
BS: BlockStore<Vec<u8>>,
58
-
P: Fn(&[u8]) -> Result<T, PE>,
59
-
PE: Error,
60
-
{
61
-
/// Set up the stream
62
-
///
63
-
/// This will eagerly consume blocks until the `Commit` object is found.
64
-
/// *Usually* the it's the first block, but there is no guarantee.
65
-
///
66
-
/// ### Parameters
67
-
///
68
-
/// `root`: CID of the commit object that is the root of the MST
69
-
///
70
-
/// `block_stream`: Input stream of raw CAR blocks
71
-
///
72
-
/// `process`: record-transforming callback:
73
-
///
74
-
/// For tasks where records can be quickly processed into a *smaller*
75
-
/// useful representation, you can do that eagerly as blocks come in by
76
-
/// passing the processor as a callback here. This can reduce overall
77
-
/// memory usage.
78
-
pub async fn init(
79
-
root: Cid,
80
-
block_stream: S,
81
-
block_store: BS,
82
-
process: P,
83
-
) -> Result<(Commit, Self), DriveError> {
84
-
let mut commit = None;
85
-
86
-
log::warn!("init: load blocks");
87
-
88
-
let mut chunked = block_stream.try_chunks(4096);
89
-
90
-
// go ahead and put all blocks in the block store
91
-
while let Some(chunk) = chunked
92
-
.try_next()
93
-
.await
94
-
.map_err(|e| DriveError::CarBlockError(e.into()))?
95
-
{
96
-
let mut to_insert = Vec::with_capacity(chunk.len());
97
-
for (cid, data) in chunk {
98
-
if cid == root {
99
-
let c: Commit = serde_ipld_dagcbor::from_slice(&data)
100
-
.map_err(|e| DriveError::BadCommit(e.into()))?;
101
-
commit = Some(c);
102
-
} else {
103
-
to_insert.push((cid, data));
104
-
}
105
-
}
106
-
block_store.put_batch(to_insert)
107
-
}
108
-
109
-
log::warn!("init: got commit?");
110
-
111
-
// we either broke out or read all the blocks without finding the commit...
112
-
let commit = commit.ok_or(DriveError::MissingCommit)?;
113
-
114
-
let walker = Walker::new(commit.data);
115
-
116
-
log::warn!("init: wrapping up");
117
-
118
-
let me = Self {
119
-
block_stream: None,
120
-
block_store,
121
-
walker,
122
-
process,
123
-
};
124
-
Ok((commit, me))
125
-
}
126
-
127
-
/// Manually step through the record outputs
128
-
pub async fn next_record(&mut self) -> Result<Option<(String, T)>, DriveError> {
129
-
match self.walker.step(&mut self.block_store, &self.process)? {
130
-
Step::Rest(cid) => Err(DriveError::MissingBlock(cid)),
131
-
Step::Finish => Ok(None),
132
-
Step::Step { rkey, data } => Ok(Some((rkey, data))),
133
-
}
134
-
}
135
-
136
-
/// Convert to a futures::stream of record outputs
137
-
pub fn stream(self) -> impl Stream<Item = Result<(String, T), DriveError>> {
138
-
futures::stream::try_unfold(self, |mut this| async move {
139
-
let maybe_record = this.next_record().await?;
140
-
Ok(maybe_record.map(|b| (b, this)))
141
-
})
142
-
}
143
-
}
···
-61
src/disk_redb.rs
···
1
-
use crate::disk_drive::BlockStore;
2
-
use ipld_core::cid::Cid;
3
-
use redb::{Database, Durability, Error, ReadableDatabase, TableDefinition};
4
-
use serde::{Serialize, de::DeserializeOwned};
5
-
use std::path::Path;
6
-
7
-
const TABLE: TableDefinition<&[u8], &[u8]> = TableDefinition::new("blocks");
8
-
9
-
pub struct RedbStore {
10
-
#[allow(dead_code)]
11
-
db: Database,
12
-
}
13
-
14
-
impl RedbStore {
15
-
pub fn new(path: impl AsRef<Path>) -> Result<Self, Error> {
16
-
log::warn!("redb new");
17
-
let db = Database::create(path)?;
18
-
log::warn!("db created");
19
-
Ok(Self { db })
20
-
}
21
-
}
22
-
23
-
impl Drop for RedbStore {
24
-
fn drop(&mut self) {
25
-
let mut tx = self.db.begin_write().unwrap();
26
-
tx.set_durability(Durability::None).unwrap();
27
-
tx.delete_table(TABLE).unwrap();
28
-
tx.commit().unwrap();
29
-
}
30
-
}
31
-
32
-
impl<MPB: Serialize + DeserializeOwned> BlockStore<MPB> for RedbStore {
33
-
fn put_batch(&self, blocks: Vec<(Cid, MPB)>) {
34
-
let mut tx = self.db.begin_write().unwrap();
35
-
tx.set_durability(Durability::None).unwrap();
36
-
37
-
{
38
-
let mut table = tx.open_table(TABLE).unwrap();
39
-
for (cid, t) in blocks {
40
-
let key_bytes = cid.to_bytes();
41
-
let val_bytes =
42
-
bincode::serde::encode_to_vec(t, bincode::config::standard()).unwrap();
43
-
table.insert(&*key_bytes, &*val_bytes).unwrap();
44
-
}
45
-
}
46
-
47
-
tx.commit().unwrap();
48
-
}
49
-
50
-
fn get(&self, c: Cid) -> Option<MPB> {
51
-
let key_bytes = c.to_bytes();
52
-
let tx = self.db.begin_read().unwrap();
53
-
let table = tx.open_table(TABLE).unwrap();
54
-
let maybe_val_bytes = table.get(&*key_bytes).unwrap()?;
55
-
let (t, n): (MPB, usize) =
56
-
bincode::serde::decode_from_slice(maybe_val_bytes.value(), bincode::config::standard())
57
-
.unwrap();
58
-
assert_eq!(maybe_val_bytes.value().len(), n);
59
-
Some(t)
60
-
}
61
-
}
···
-65
src/disk_sqlite.rs
···
1
-
// use crate::disk_drive::BlockStore;
2
-
// use ipld_core::cid::Cid;
3
-
// use rusqlite::{Connection, OptionalExtension, Result};
4
-
// use serde::{Serialize, de::DeserializeOwned};
5
-
// use std::path::Path;
6
-
7
-
// pub struct SqliteStore {
8
-
// conn: Connection,
9
-
// }
10
-
11
-
// impl SqliteStore {
12
-
// pub fn new(path: impl AsRef<Path>) -> Result<Self> {
13
-
// let conn = Connection::open(path)?;
14
-
// conn.pragma_update(None, "journal_mode", "WAL")?;
15
-
// conn.pragma_update(None, "synchronous", "OFF")?;
16
-
// conn.pragma_update(None, "cache_size", (-32 * 2_i64.pow(10)).to_string())?;
17
-
// conn.execute(
18
-
// "CREATE TABLE blocks (
19
-
// key BLOB PRIMARY KEY NOT NULL,
20
-
// val BLOB NOT NULL
21
-
// ) WITHOUT ROWID",
22
-
// (),
23
-
// )?;
24
-
25
-
// Ok(Self { conn })
26
-
// }
27
-
// }
28
-
29
-
// impl Drop for SqliteStore {
30
-
// fn drop(&mut self) {
31
-
// self.conn.execute("DROP TABLE blocks", ()).unwrap();
32
-
// }
33
-
// }
34
-
35
-
// impl<MPB: Serialize + DeserializeOwned> BlockStore<MPB> for SqliteStore {
36
-
// fn put(&self, c: Cid, t: MPB) {
37
-
// let key_bytes = c.to_bytes();
38
-
// let val_bytes = bincode::serde::encode_to_vec(t, bincode::config::standard()).unwrap();
39
-
40
-
// self.conn
41
-
// .execute(
42
-
// "INSERT INTO blocks (key, val) VALUES (?1, ?2)",
43
-
// (&key_bytes, &val_bytes),
44
-
// )
45
-
// .unwrap();
46
-
// }
47
-
// fn get(&self, c: Cid) -> Option<MPB> {
48
-
// let key_bytes = c.to_bytes();
49
-
50
-
// let val_bytes: Vec<u8> = self
51
-
// .conn
52
-
// .query_one(
53
-
// "SELECT val FROM blocks WHERE key = ?1",
54
-
// (&key_bytes,),
55
-
// |row| row.get(0),
56
-
// )
57
-
// .optional()
58
-
// .unwrap()?;
59
-
60
-
// let (t, n): (MPB, usize) =
61
-
// bincode::serde::decode_from_slice(&val_bytes, bincode::config::standard()).unwrap();
62
-
// assert_eq!(val_bytes.len(), n);
63
-
// Some(t)
64
-
// }
65
-
// }
···
-392
src/disk_walk.rs
···
1
-
//! Depth-first MST traversal
2
-
3
-
use crate::disk_drive::BlockStore;
4
-
use crate::mst::Node;
5
-
6
-
use ipld_core::cid::Cid;
7
-
use serde::{Serialize, de::DeserializeOwned};
8
-
use std::error::Error;
9
-
10
-
/// Errors that can happen while walking
11
-
#[derive(Debug, thiserror::Error)]
12
-
pub enum Trip {
13
-
#[error("empty mst nodes are not allowed")]
14
-
NodeEmpty,
15
-
#[error("Failed to decode commit block: {0}")]
16
-
BadCommit(Box<dyn std::error::Error>),
17
-
#[error("Action node error: {0}")]
18
-
RkeyError(#[from] RkeyError),
19
-
#[error("Process failed: {0}")]
20
-
ProcessFailed(String),
21
-
#[error("Encountered an rkey out of order while walking the MST")]
22
-
RkeyOutOfOrder,
23
-
}
24
-
25
-
/// Errors from invalid Rkeys
26
-
#[derive(Debug, thiserror::Error)]
27
-
pub enum RkeyError {
28
-
#[error("Failed to compute an rkey due to invalid prefix_len")]
29
-
EntryPrefixOutOfbounds,
30
-
#[error("RKey was not utf-8")]
31
-
EntryRkeyNotUtf8(#[from] std::string::FromUtf8Error),
32
-
}
33
-
34
-
/// Walker outputs
35
-
#[derive(Debug)]
36
-
pub enum Step<T: Serialize + DeserializeOwned> {
37
-
/// We need a CID but it's not in the block store
38
-
///
39
-
/// Give the needed CID to the driver so it can load blocks until it's found
40
-
Rest(Cid),
41
-
/// Reached the end of the MST! yay!
42
-
Finish,
43
-
/// A record was found!
44
-
Step { rkey: String, data: T },
45
-
}
46
-
47
-
#[derive(Debug, Clone, PartialEq)]
48
-
enum Need {
49
-
Node(Cid),
50
-
Record { rkey: String, cid: Cid },
51
-
}
52
-
53
-
fn push_from_node(stack: &mut Vec<Need>, node: &Node) -> Result<(), RkeyError> {
54
-
let mut entries = Vec::with_capacity(node.entries.len());
55
-
56
-
let mut prefix = vec![];
57
-
for entry in &node.entries {
58
-
let mut rkey = vec![];
59
-
let pre_checked = prefix
60
-
.get(..entry.prefix_len)
61
-
.ok_or(RkeyError::EntryPrefixOutOfbounds)?;
62
-
rkey.extend_from_slice(pre_checked);
63
-
rkey.extend_from_slice(&entry.keysuffix);
64
-
prefix = rkey.clone();
65
-
66
-
entries.push(Need::Record {
67
-
rkey: String::from_utf8(rkey)?,
68
-
cid: entry.value,
69
-
});
70
-
if let Some(ref tree) = entry.tree {
71
-
entries.push(Need::Node(*tree));
72
-
}
73
-
}
74
-
75
-
entries.reverse();
76
-
stack.append(&mut entries);
77
-
78
-
if let Some(tree) = node.left {
79
-
stack.push(Need::Node(tree));
80
-
}
81
-
Ok(())
82
-
}
83
-
84
-
/// Traverser of an atproto MST
85
-
///
86
-
/// Walks the tree from left-to-right in depth-first order
87
-
#[derive(Debug)]
88
-
pub struct Walker {
89
-
stack: Vec<Need>,
90
-
prev: String,
91
-
}
92
-
93
-
impl Walker {
94
-
pub fn new(tree_root_cid: Cid) -> Self {
95
-
Self {
96
-
stack: vec![Need::Node(tree_root_cid)],
97
-
prev: "".to_string(),
98
-
}
99
-
}
100
-
101
-
/// Advance through nodes until we find a record or can't go further
102
-
pub fn step<T: Clone + Serialize + DeserializeOwned, E: Error>(
103
-
&mut self,
104
-
block_store: &mut impl BlockStore<Vec<u8>>,
105
-
process: impl Fn(&[u8]) -> Result<T, E>,
106
-
) -> Result<Step<T>, Trip> {
107
-
loop {
108
-
let Some(mut need) = self.stack.last() else {
109
-
log::trace!("tried to walk but we're actually done.");
110
-
return Ok(Step::Finish);
111
-
};
112
-
113
-
match &mut need {
114
-
Need::Node(cid) => {
115
-
log::trace!("need node {cid:?}");
116
-
let Some(block) = block_store.get(*cid) else {
117
-
log::trace!("node not found, resting");
118
-
return Ok(Step::Rest(*cid));
119
-
};
120
-
121
-
let node = serde_ipld_dagcbor::from_slice::<Node>(&block)
122
-
.map_err(|e| Trip::BadCommit(e.into()))?;
123
-
124
-
// found node, make sure we remember
125
-
self.stack.pop();
126
-
127
-
// queue up work on the found node next
128
-
push_from_node(&mut self.stack, &node)?;
129
-
}
130
-
Need::Record { rkey, cid } => {
131
-
log::trace!("need record {cid:?}");
132
-
let Some(block) = block_store.get(*cid) else {
133
-
log::trace!("record block not found, resting");
134
-
return Ok(Step::Rest(*cid));
135
-
};
136
-
let rkey = rkey.clone();
137
-
138
-
let data = process(&block).map_err(|e| Trip::ProcessFailed(e.to_string()));
139
-
140
-
// found node, make sure we remember
141
-
self.stack.pop();
142
-
143
-
log::trace!("emitting a block as a step. depth={}", self.stack.len());
144
-
145
-
let data = data.map_err(|e| Trip::ProcessFailed(e.to_string()))?;
146
-
147
-
// rkeys *must* be in order or else the tree is invalid (or
148
-
// we have a bug)
149
-
if rkey <= self.prev {
150
-
return Err(Trip::RkeyOutOfOrder);
151
-
}
152
-
self.prev = rkey.clone();
153
-
154
-
return Ok(Step::Step { rkey, data });
155
-
}
156
-
}
157
-
}
158
-
}
159
-
}
160
-
161
-
#[cfg(test)]
162
-
mod test {
163
-
use super::*;
164
-
// use crate::mst::Entry;
165
-
166
-
fn cid1() -> Cid {
167
-
"bafyreihixenvk3ahqbytas4hk4a26w43bh6eo3w6usjqtxkpzsvi655a3m"
168
-
.parse()
169
-
.unwrap()
170
-
}
171
-
// fn cid2() -> Cid {
172
-
// "QmY7Yh4UquoXHLPFo2XbhXkhBvFoPwmQUSa92pxnxjQuPU"
173
-
// .parse()
174
-
// .unwrap()
175
-
// }
176
-
// fn cid3() -> Cid {
177
-
// "bafybeigdyrzt5sfp7udm7hu76uh7y26nf3efuylqabf3oclgtqy55fbzdi"
178
-
// .parse()
179
-
// .unwrap()
180
-
// }
181
-
// fn cid4() -> Cid {
182
-
// "QmbWqxBEKC3P8tqsKc98xmWNzrzDtRLMiMPL8wBuTGsMnR"
183
-
// .parse()
184
-
// .unwrap()
185
-
// }
186
-
// fn cid5() -> Cid {
187
-
// "QmSnuWmxptJZdLJpKRarxBMS2Ju2oANVrgbr2xWbie9b2D"
188
-
// .parse()
189
-
// .unwrap()
190
-
// }
191
-
// fn cid6() -> Cid {
192
-
// "QmdmQXB2mzChmMeKY47C43LxUdg1NDJ5MWcKMKxDu7RgQm"
193
-
// .parse()
194
-
// .unwrap()
195
-
// }
196
-
// fn cid7() -> Cid {
197
-
// "bafybeiaysi4s6lnjev27ln5icwm6tueaw2vdykrtjkwiphwekaywqhcjze"
198
-
// .parse()
199
-
// .unwrap()
200
-
// }
201
-
// fn cid8() -> Cid {
202
-
// "bafyreif3tfdpr5n4jdrbielmcapwvbpcthepfkwq2vwonmlhirbjmotedi"
203
-
// .parse()
204
-
// .unwrap()
205
-
// }
206
-
// fn cid9() -> Cid {
207
-
// "bafyreicnokmhmrnlp2wjhyk2haep4tqxiptwfrp2rrs7rzq7uk766chqvq"
208
-
// .parse()
209
-
// .unwrap()
210
-
// }
211
-
212
-
#[test]
213
-
fn test_next_from_node_empty() {
214
-
let node = Node {
215
-
left: None,
216
-
entries: vec![],
217
-
};
218
-
let mut stack = vec![];
219
-
push_from_node(&mut stack, &node).unwrap();
220
-
assert_eq!(stack.last(), None);
221
-
}
222
-
223
-
#[test]
224
-
fn test_needs_from_node_just_left() {
225
-
let node = Node {
226
-
left: Some(cid1()),
227
-
entries: vec![],
228
-
};
229
-
let mut stack = vec![];
230
-
push_from_node(&mut stack, &node).unwrap();
231
-
assert_eq!(stack.last(), Some(Need::Node(cid1())).as_ref());
232
-
}
233
-
234
-
// #[test]
235
-
// fn test_needs_from_node_just_one_record() {
236
-
// let node = Node {
237
-
// left: None,
238
-
// entries: vec![Entry {
239
-
// keysuffix: "asdf".into(),
240
-
// prefix_len: 0,
241
-
// value: cid1(),
242
-
// tree: None,
243
-
// }],
244
-
// };
245
-
// assert_eq!(
246
-
// needs_from_node(node).unwrap(),
247
-
// vec![Need::Record {
248
-
// rkey: "asdf".into(),
249
-
// cid: cid1(),
250
-
// },]
251
-
// );
252
-
// }
253
-
254
-
// #[test]
255
-
// fn test_needs_from_node_two_records() {
256
-
// let node = Node {
257
-
// left: None,
258
-
// entries: vec![
259
-
// Entry {
260
-
// keysuffix: "asdf".into(),
261
-
// prefix_len: 0,
262
-
// value: cid1(),
263
-
// tree: None,
264
-
// },
265
-
// Entry {
266
-
// keysuffix: "gh".into(),
267
-
// prefix_len: 2,
268
-
// value: cid2(),
269
-
// tree: None,
270
-
// },
271
-
// ],
272
-
// };
273
-
// assert_eq!(
274
-
// needs_from_node(node).unwrap(),
275
-
// vec![
276
-
// Need::Record {
277
-
// rkey: "asdf".into(),
278
-
// cid: cid1(),
279
-
// },
280
-
// Need::Record {
281
-
// rkey: "asgh".into(),
282
-
// cid: cid2(),
283
-
// },
284
-
// ]
285
-
// );
286
-
// }
287
-
288
-
// #[test]
289
-
// fn test_needs_from_node_with_both() {
290
-
// let node = Node {
291
-
// left: None,
292
-
// entries: vec![Entry {
293
-
// keysuffix: "asdf".into(),
294
-
// prefix_len: 0,
295
-
// value: cid1(),
296
-
// tree: Some(cid2()),
297
-
// }],
298
-
// };
299
-
// assert_eq!(
300
-
// needs_from_node(node).unwrap(),
301
-
// vec![
302
-
// Need::Record {
303
-
// rkey: "asdf".into(),
304
-
// cid: cid1(),
305
-
// },
306
-
// Need::Node(cid2()),
307
-
// ]
308
-
// );
309
-
// }
310
-
311
-
// #[test]
312
-
// fn test_needs_from_node_left_and_record() {
313
-
// let node = Node {
314
-
// left: Some(cid1()),
315
-
// entries: vec![Entry {
316
-
// keysuffix: "asdf".into(),
317
-
// prefix_len: 0,
318
-
// value: cid2(),
319
-
// tree: None,
320
-
// }],
321
-
// };
322
-
// assert_eq!(
323
-
// needs_from_node(node).unwrap(),
324
-
// vec![
325
-
// Need::Node(cid1()),
326
-
// Need::Record {
327
-
// rkey: "asdf".into(),
328
-
// cid: cid2(),
329
-
// },
330
-
// ]
331
-
// );
332
-
// }
333
-
334
-
// #[test]
335
-
// fn test_needs_from_full_node() {
336
-
// let node = Node {
337
-
// left: Some(cid1()),
338
-
// entries: vec![
339
-
// Entry {
340
-
// keysuffix: "asdf".into(),
341
-
// prefix_len: 0,
342
-
// value: cid2(),
343
-
// tree: Some(cid3()),
344
-
// },
345
-
// Entry {
346
-
// keysuffix: "ghi".into(),
347
-
// prefix_len: 1,
348
-
// value: cid4(),
349
-
// tree: Some(cid5()),
350
-
// },
351
-
// Entry {
352
-
// keysuffix: "jkl".into(),
353
-
// prefix_len: 2,
354
-
// value: cid6(),
355
-
// tree: Some(cid7()),
356
-
// },
357
-
// Entry {
358
-
// keysuffix: "mno".into(),
359
-
// prefix_len: 4,
360
-
// value: cid8(),
361
-
// tree: Some(cid9()),
362
-
// },
363
-
// ],
364
-
// };
365
-
// assert_eq!(
366
-
// needs_from_node(node).unwrap(),
367
-
// vec![
368
-
// Need::Node(cid1()),
369
-
// Need::Record {
370
-
// rkey: "asdf".into(),
371
-
// cid: cid2(),
372
-
// },
373
-
// Need::Node(cid3()),
374
-
// Need::Record {
375
-
// rkey: "aghi".into(),
376
-
// cid: cid4(),
377
-
// },
378
-
// Need::Node(cid5()),
379
-
// Need::Record {
380
-
// rkey: "agjkl".into(),
381
-
// cid: cid6(),
382
-
// },
383
-
// Need::Node(cid7()),
384
-
// Need::Record {
385
-
// rkey: "agjkmno".into(),
386
-
// cid: cid8(),
387
-
// },
388
-
// Need::Node(cid9()),
389
-
// ]
390
-
// );
391
-
// }
392
-
}
···
+556
-109
src/drive.rs
···
1
-
//! Consume an MST block stream, producing an ordered stream of records
2
3
-
use futures::{Stream, TryStreamExt};
4
use ipld_core::cid::Cid;
5
use std::collections::HashMap;
6
-
use std::error::Error;
7
8
use crate::mst::{Commit, Node};
9
-
use crate::walk::{Step, Trip, Walker};
10
11
/// Errors that can happen while consuming and emitting blocks and records
12
#[derive(Debug, thiserror::Error)]
13
-
pub enum DriveError<E: Error> {
14
-
#[error("Failed to initialize CarReader: {0}")]
15
CarReader(#[from] iroh_car::Error),
16
-
#[error("Car block stream error: {0}")]
17
-
CarBlockError(Box<dyn Error>),
18
#[error("Failed to decode commit block: {0}")]
19
-
BadCommit(Box<dyn Error>),
20
#[error("The Commit block reference by the root was not found")]
21
MissingCommit,
22
#[error("The MST block {0} could not be found")]
23
MissingBlock(Cid),
24
#[error("Failed to walk the mst tree: {0}")]
25
-
Tripped(#[from] Trip<E>),
26
}
27
28
-
type CarBlock<E> = Result<(Cid, Vec<u8>), E>;
29
30
-
#[derive(Debug)]
31
-
pub enum MaybeProcessedBlock<T, E> {
32
/// A block that's *probably* a Node (but we can't know yet)
33
///
34
/// It *can be* a record that suspiciously looks a lot like a node, so we
···
50
/// There's an alternative here, which would be to kick unprocessable blocks
51
/// back to Raw, or maybe even a new RawUnprocessable variant. Then we could
52
/// surface the typed error later if needed by trying to reprocess.
53
-
Processed(Result<T, E>),
54
}
55
56
-
/// The core driver between the block stream and MST walker
57
-
pub struct Vehicle<SE, S, T, P, PE>
58
-
where
59
-
S: Stream<Item = CarBlock<SE>>,
60
-
P: Fn(&[u8]) -> Result<T, PE>,
61
-
PE: Error,
62
-
{
63
-
block_stream: S,
64
-
blocks: HashMap<Cid, MaybeProcessedBlock<T, PE>>,
65
-
walker: Walker,
66
-
process: P,
67
}
68
69
-
impl<SE, S, T: Clone, P, PE> Vehicle<SE, S, T, P, PE>
70
-
where
71
-
SE: Error + 'static,
72
-
S: Stream<Item = CarBlock<SE>> + Unpin,
73
-
P: Fn(&[u8]) -> Result<T, PE>,
74
-
PE: Error,
75
-
{
76
-
/// Set up the stream
77
///
78
-
/// This will eagerly consume blocks until the `Commit` object is found.
79
-
/// *Usually* the it's the first block, but there is no guarantee.
80
///
81
-
/// ### Parameters
82
///
83
-
/// `root`: CID of the commit object that is the root of the MST
84
///
85
-
/// `block_stream`: Input stream of raw CAR blocks
86
///
87
-
/// `process`: record-transforming callback:
88
///
89
-
/// For tasks where records can be quickly processed into a *smaller*
90
-
/// useful representation, you can do that eagerly as blocks come in by
91
-
/// passing the processor as a callback here. This can reduce overall
92
-
/// memory usage.
93
-
pub async fn init(
94
-
root: Cid,
95
-
mut block_stream: S,
96
-
process: P,
97
-
) -> Result<(Commit, Self), DriveError<PE>> {
98
-
let mut blocks = HashMap::new();
99
100
let mut commit = None;
101
102
-
while let Some((cid, data)) = block_stream
103
-
.try_next()
104
-
.await
105
-
.map_err(|e| DriveError::CarBlockError(e.into()))?
106
-
{
107
if cid == root {
108
-
let c: Commit = serde_ipld_dagcbor::from_slice(&data)
109
-
.map_err(|e| DriveError::BadCommit(e.into()))?;
110
commit = Some(c);
111
-
break;
112
-
} else {
113
-
blocks.insert(
114
-
cid,
115
-
if Node::could_be(&data) {
116
-
MaybeProcessedBlock::Raw(data)
117
-
} else {
118
-
MaybeProcessedBlock::Processed(process(&data))
119
-
},
120
-
);
121
}
122
}
123
124
-
// we either broke out or read all the blocks without finding the commit...
125
let commit = commit.ok_or(DriveError::MissingCommit)?;
126
127
let walker = Walker::new(commit.data);
128
129
-
let me = Self {
130
-
block_stream,
131
-
blocks,
132
-
walker,
133
-
process,
134
-
};
135
-
Ok((commit, me))
136
}
137
138
-
async fn drive_until(&mut self, cid_needed: Cid) -> Result<(), DriveError<PE>> {
139
-
while let Some((cid, data)) = self
140
-
.block_stream
141
-
.try_next()
142
-
.await
143
-
.map_err(|e| DriveError::CarBlockError(e.into()))?
144
-
{
145
-
self.blocks.insert(
146
-
cid,
147
-
if Node::could_be(&data) {
148
-
MaybeProcessedBlock::Raw(data)
149
-
} else {
150
-
MaybeProcessedBlock::Processed((self.process)(&data))
151
-
},
152
-
);
153
-
if cid == cid_needed {
154
-
return Ok(());
155
}
156
}
157
158
-
// if we never found the block
159
-
Err(DriveError::MissingBlock(cid_needed))
160
}
161
162
-
/// Manually step through the record outputs
163
-
pub async fn next_record(&mut self) -> Result<Option<(String, T)>, DriveError<PE>> {
164
loop {
165
-
// walk as far as we can until we run out of blocks or find a record
166
-
let cid_needed = match self.walker.step(&mut self.blocks, &self.process)? {
167
-
Step::Rest(cid) => cid,
168
-
Step::Finish => return Ok(None),
169
-
Step::Step { rkey, data } => return Ok(Some((rkey, data))),
170
-
};
171
172
-
// load blocks until we reach that cid
173
-
self.drive_until(cid_needed).await?;
174
}
175
}
176
177
-
/// Convert to a futures::stream of record outputs
178
-
pub fn stream(self) -> impl Stream<Item = Result<(String, T), DriveError<PE>>> {
179
-
futures::stream::try_unfold(self, |mut this| async move {
180
-
let maybe_record = this.next_record().await?;
181
-
Ok(maybe_record.map(|b| (b, this)))
182
-
})
183
}
184
}
···
1
+
//! Consume a CAR from an AsyncRead, producing an ordered stream of records
2
3
+
use crate::disk::{DiskError, DiskStore};
4
+
use crate::process::Processable;
5
use ipld_core::cid::Cid;
6
+
use iroh_car::CarReader;
7
+
use serde::{Deserialize, Serialize};
8
use std::collections::HashMap;
9
+
use std::convert::Infallible;
10
+
use tokio::{io::AsyncRead, sync::mpsc};
11
12
use crate::mst::{Commit, Node};
13
+
use crate::walk::{Step, WalkError, Walker};
14
15
/// Errors that can happen while consuming and emitting blocks and records
16
#[derive(Debug, thiserror::Error)]
17
+
pub enum DriveError {
18
+
#[error("Error from iroh_car: {0}")]
19
CarReader(#[from] iroh_car::Error),
20
#[error("Failed to decode commit block: {0}")]
21
+
BadBlock(#[from] serde_ipld_dagcbor::DecodeError<Infallible>),
22
#[error("The Commit block reference by the root was not found")]
23
MissingCommit,
24
#[error("The MST block {0} could not be found")]
25
MissingBlock(Cid),
26
#[error("Failed to walk the mst tree: {0}")]
27
+
WalkError(#[from] WalkError),
28
+
#[error("CAR file had no roots")]
29
+
MissingRoot,
30
+
#[error("Storage error")]
31
+
StorageError(#[from] DiskError),
32
+
#[error("Encode error: {0}")]
33
+
BincodeEncodeError(#[from] bincode::error::EncodeError),
34
+
#[error("Tried to send on a closed channel")]
35
+
ChannelSendError, // SendError takes <T> which we don't need
36
+
#[error("Failed to join a task: {0}")]
37
+
JoinError(#[from] tokio::task::JoinError),
38
}
39
40
+
#[derive(Debug, thiserror::Error)]
41
+
pub enum DecodeError {
42
+
#[error(transparent)]
43
+
BincodeDecodeError(#[from] bincode::error::DecodeError),
44
+
#[error("extra bytes remained after decoding")]
45
+
ExtraGarbage,
46
+
}
47
48
+
/// An in-order chunk of Rkey + (processed) Block pairs
49
+
pub type BlockChunk<T> = Vec<(String, T)>;
50
+
51
+
#[derive(Debug, Clone, Serialize, Deserialize)]
52
+
pub(crate) enum MaybeProcessedBlock<T> {
53
/// A block that's *probably* a Node (but we can't know yet)
54
///
55
/// It *can be* a record that suspiciously looks a lot like a node, so we
···
71
/// There's an alternative here, which would be to kick unprocessable blocks
72
/// back to Raw, or maybe even a new RawUnprocessable variant. Then we could
73
/// surface the typed error later if needed by trying to reprocess.
74
+
Processed(T),
75
}
76
77
+
impl<T: Processable> Processable for MaybeProcessedBlock<T> {
78
+
/// TODO this is probably a little broken
79
+
fn get_size(&self) -> usize {
80
+
use std::{cmp::max, mem::size_of};
81
+
82
+
// enum is always as big as its biggest member?
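// (rough approximation, per the TODO above: size_of::<Self>() is at least the
// larger variant payload, and may add a discriminant plus padding on top)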
83
+
let base_size = max(size_of::<Vec<u8>>(), size_of::<T>());
84
+
85
+
let extra = match self {
86
+
Self::Raw(bytes) => bytes.len(),
87
+
Self::Processed(t) => t.get_size(),
88
+
};
89
+
90
+
base_size + extra
91
+
}
92
+
}
93
+
94
+
impl<T> MaybeProcessedBlock<T> {
95
+
fn maybe(process: fn(Vec<u8>) -> T, data: Vec<u8>) -> Self {
96
+
if Node::could_be(&data) {
97
+
MaybeProcessedBlock::Raw(data)
98
+
} else {
99
+
MaybeProcessedBlock::Processed(process(data))
100
+
}
101
+
}
102
}
103
104
+
/// Read a CAR file, buffering blocks in memory or to disk
105
+
pub enum Driver<R: AsyncRead + Unpin, T: Processable> {
106
+
/// All blocks fit within the memory limit
107
+
///
108
+
/// You probably want to check the commit's signature. You can go ahead and
109
+
/// walk the MST right away.
110
+
Memory(Commit, MemDriver<T>),
111
+
/// Blocks exceed the memory limit
112
///
113
+
/// You'll need to provide a disk store to continue. The commit will be
114
+
/// returned and can be validated only once all blocks are loaded.
115
+
Disk(NeedDisk<R, T>),
116
+
}
117
+
118
+
/// Builder-style driver setup
119
+
#[derive(Debug, Clone)]
120
+
pub struct DriverBuilder {
121
+
pub mem_limit_mb: usize,
122
+
}
123
+
124
+
impl Default for DriverBuilder {
125
+
fn default() -> Self {
126
+
Self { mem_limit_mb: 16 }
127
+
}
128
+
}
129
+
130
+
impl DriverBuilder {
131
+
/// Begin configuring the driver with defaults
132
+
pub fn new() -> Self {
133
+
Default::default()
134
+
}
135
+
/// Set the in-memory size limit, in MiB
136
///
137
+
/// Default: 16 MiB
138
+
pub fn with_mem_limit_mb(self, new_limit: usize) -> Self {
139
+
Self {
140
+
mem_limit_mb: new_limit,
141
+
}
142
+
}
143
+
/// Set the block processor
144
///
145
+
/// Default: noop, raw blocks will be emitted
146
+
pub fn with_block_processor<T: Processable>(
147
+
self,
148
+
p: fn(Vec<u8>) -> T,
149
+
) -> DriverBuilderWithProcessor<T> {
150
+
DriverBuilderWithProcessor {
151
+
mem_limit_mb: self.mem_limit_mb,
152
+
block_processor: p,
153
+
}
154
+
}
155
+
/// Begin processing an atproto MST from a CAR file
156
+
pub async fn load_car<R: AsyncRead + Unpin>(
157
+
&self,
158
+
reader: R,
159
+
) -> Result<Driver<R, Vec<u8>>, DriveError> {
160
+
Driver::load_car(reader, crate::process::noop, self.mem_limit_mb).await
161
+
}
162
+
}
163
+
164
+
/// Builder-style driver intermediate step
165
+
///
166
+
/// Start from `DriverBuilder`.
167
+
#[derive(Debug, Clone)]
168
+
pub struct DriverBuilderWithProcessor<T: Processable> {
169
+
pub mem_limit_mb: usize,
170
+
pub block_processor: fn(Vec<u8>) -> T,
171
+
}
172
+
173
+
impl<T: Processable> DriverBuilderWithProcessor<T> {
174
+
/// Set the in-memory size limit, in MiB
175
///
176
+
/// Default: 16 MiB
177
+
pub fn with_mem_limit_mb(mut self, new_limit: usize) -> Self {
178
+
self.mem_limit_mb = new_limit;
179
+
self
180
+
}
181
+
/// Begin processing an atproto MST from a CAR file
182
+
pub async fn load_car<R: AsyncRead + Unpin>(
183
+
&self,
184
+
reader: R,
185
+
) -> Result<Driver<R, T>, DriveError> {
186
+
Driver::load_car(reader, self.block_processor, self.mem_limit_mb).await
187
+
}
188
+
}
189
+
190
+
impl<R: AsyncRead + Unpin, T: Processable> Driver<R, T> {
191
+
/// Begin processing an atproto MST from a CAR file
192
///
193
+
/// Blocks will be loaded, processed, and buffered in memory. If the entire
194
+
/// processed size is under the `mem_limit_mb` limit, a `Driver::Memory`
195
+
/// will be returned along with a `Commit` ready for validation.
196
///
197
+
/// If the `mem_limit_mb` limit is reached before loading all blocks, the
198
+
/// partial state will be returned as `Driver::Disk(needed)`, which can be
199
+
/// resumed by providing a `DiskStore` for on-disk block storage.
200
+
pub async fn load_car(
201
+
reader: R,
202
+
process: fn(Vec<u8>) -> T,
203
+
mem_limit_mb: usize,
204
+
) -> Result<Driver<R, T>, DriveError> {
205
+
let max_size = mem_limit_mb * 2_usize.pow(20);
206
+
let mut mem_blocks = HashMap::new();
207
+
208
+
let mut car = CarReader::new(reader).await?;
209
+
210
+
let root = *car
211
+
.header()
212
+
.roots()
213
+
.first()
214
+
.ok_or(DriveError::MissingRoot)?;
215
+
log::debug!("root: {root:?}");
216
217
let mut commit = None;
218
219
+
// try to load all the blocks into memory
220
+
let mut mem_size = 0;
221
+
while let Some((cid, data)) = car.next_block().await? {
222
+
// the root commit is a Special Third Kind of block that we need to make
223
+
// sure not to optimistically send to the processing function
224
if cid == root {
225
+
let c: Commit = serde_ipld_dagcbor::from_slice(&data)?;
226
commit = Some(c);
227
+
continue;
228
+
}
229
+
230
+
// remaining possible types: node, record, other. optimistically process
231
+
let maybe_processed = MaybeProcessedBlock::maybe(process, data);
232
+
233
+
// stash (maybe processed) blocks in memory as long as we have room
234
+
mem_size += std::mem::size_of::<Cid>() + maybe_processed.get_size();
235
+
mem_blocks.insert(cid, maybe_processed);
236
+
if mem_size >= max_size {
237
+
return Ok(Driver::Disk(NeedDisk {
238
+
car,
239
+
root,
240
+
process,
241
+
max_size,
242
+
mem_blocks,
243
+
commit,
244
+
}));
245
}
246
}
247
248
+
// all blocks loaded and we fit in memory! hopefully we found the commit...
249
let commit = commit.ok_or(DriveError::MissingCommit)?;
250
251
let walker = Walker::new(commit.data);
252
253
+
Ok(Driver::Memory(
254
+
commit,
255
+
MemDriver {
256
+
blocks: mem_blocks,
257
+
walker,
258
+
process,
259
+
},
260
+
))
261
+
}
262
+
}
263
+
264
+
/// The core driver between the block stream and MST walker
265
+
///
266
+
/// In the future, PDSs will export CARs in a stream-friendly order that will
267
+
/// enable processing them with tiny memory overhead. But that future is not
268
+
/// here yet.
269
+
///
270
+
/// CARs are almost always in a stream-unfriendly order, so I'm reverting the
271
+
/// optimistic stream features: we load all blocks first, then walk the MST.
272
+
///
273
+
/// This makes things much simpler: we only need to worry about spilling to disk
274
+
/// in one place, and we always have a reasonable expectation about how much
275
+
/// work the init function will do. We can drop the CAR reader before walking,
276
+
/// so the sync/async boundaries become a little easier to work around.
277
+
#[derive(Debug)]
278
+
pub struct MemDriver<T: Processable> {
279
+
blocks: HashMap<Cid, MaybeProcessedBlock<T>>,
280
+
walker: Walker,
281
+
process: fn(Vec<u8>) -> T,
282
+
}
283
+
284
+
impl<T: Processable> MemDriver<T> {
285
+
/// Step through the record outputs, in rkey order
286
+
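///
/// A minimal usage sketch (assuming the CAR fits in memory; the empty
/// reader here is just a placeholder for a real CAR source):
///
/// ```no_run
/// # use repo_stream::{Driver, DriverBuilder};
/// # #[tokio::main]
/// # async fn main() -> Result<(), Box<dyn std::error::Error>> {
/// # let reader: &[u8] = &[];
/// if let Driver::Memory(_commit, mut driver) = DriverBuilder::new().load_car(reader).await? {
///     while let Some(pairs) = driver.next_chunk(256).await? {
///         for (rkey, block) in pairs {
///             println!("{rkey}: {} bytes", block.len());
///         }
///     }
/// }
/// # Ok(())
/// # }
/// ```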
pub async fn next_chunk(&mut self, n: usize) -> Result<Option<BlockChunk<T>>, DriveError> {
287
+
let mut out = Vec::with_capacity(n);
288
+
for _ in 0..n {
289
+
// walk as far as we can until we run out of blocks or find a record
290
+
match self.walker.step(&mut self.blocks, self.process)? {
291
+
Step::Missing(cid) => return Err(DriveError::MissingBlock(cid)),
292
+
Step::Finish => break,
293
+
Step::Found { rkey, data } => {
294
+
out.push((rkey, data));
295
+
continue;
296
+
}
297
+
};
298
+
}
299
+
300
+
if out.is_empty() {
301
+
Ok(None)
302
+
} else {
303
+
Ok(Some(out))
304
+
}
305
+
}
306
+
}
307
+
308
+
/// A partially memory-loaded car file that needs disk spillover to continue
309
+
pub struct NeedDisk<R: AsyncRead + Unpin, T: Processable> {
310
+
car: CarReader<R>,
311
+
root: Cid,
312
+
process: fn(Vec<u8>) -> T,
313
+
max_size: usize,
314
+
mem_blocks: HashMap<Cid, MaybeProcessedBlock<T>>,
315
+
pub commit: Option<Commit>,
316
+
}
317
+
318
+
fn encode(v: impl Serialize) -> Result<Vec<u8>, bincode::error::EncodeError> {
319
+
bincode::serde::encode_to_vec(v, bincode::config::standard())
320
+
}
321
+
322
+
pub(crate) fn decode<T: Processable>(bytes: &[u8]) -> Result<T, DecodeError> {
323
+
let (t, n) = bincode::serde::decode_from_slice(bytes, bincode::config::standard())?;
324
+
if n != bytes.len() {
325
+
return Err(DecodeError::ExtraGarbage);
326
}
327
+
Ok(t)
328
+
}
329
330
+
impl<R: AsyncRead + Unpin, T: Processable + Send + 'static> NeedDisk<R, T> {
331
+
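/// Spill the blocks buffered so far into `store`, stream the rest of the
/// CAR to disk, and return the `Commit` along with a `DiskDriver` for
/// walking the MST from disk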
pub async fn finish_loading(
332
+
mut self,
333
+
mut store: DiskStore,
334
+
) -> Result<(Commit, DiskDriver<T>), DriveError> {
335
+
// move store in and back out so we can manage lifetimes
336
+
// dump mem blocks into the store
337
+
store = tokio::task::spawn(async move {
338
+
let mut writer = store.get_writer()?;
339
+
340
+
let kvs = self
341
+
.mem_blocks
342
+
.into_iter()
343
+
.map(|(k, v)| Ok(encode(v).map(|v| (k.to_bytes(), v))?));
344
+
345
+
writer.put_many(kvs)?;
346
+
writer.commit()?;
347
+
Ok::<_, DriveError>(store)
348
+
})
349
+
.await??;
350
+
351
+
let (tx, mut rx) = mpsc::channel::<Vec<(Cid, MaybeProcessedBlock<T>)>>(1);
352
+
353
+
let store_worker = tokio::task::spawn_blocking(move || {
354
+
let mut writer = store.get_writer()?;
355
+
356
+
while let Some(chunk) = rx.blocking_recv() {
357
+
let kvs = chunk
358
+
.into_iter()
359
+
.map(|(k, v)| Ok(encode(v).map(|v| (k.to_bytes(), v))?));
360
+
writer.put_many(kvs)?;
361
}
362
+
363
+
writer.commit()?;
364
+
Ok::<_, DriveError>(store)
365
+
}); // await later
366
+
367
+
// dump the rest to disk (in chunks)
368
+
log::debug!("dumping the rest of the stream...");
369
+
loop {
370
+
let mut mem_size = 0;
371
+
let mut chunk = vec![];
372
+
loop {
373
+
let Some((cid, data)) = self.car.next_block().await? else {
374
+
break;
375
+
};
376
+
// we still gotta keep checking for the root since we might not have it
377
+
if cid == self.root {
378
+
let c: Commit = serde_ipld_dagcbor::from_slice(&data)?;
379
+
self.commit = Some(c);
380
+
continue;
381
+
}
382
+
// remaining possible types: node, record, other. optimistically process
383
+
// TODO: get the actual in-memory size to compute disk spill
384
+
let maybe_processed = MaybeProcessedBlock::maybe(self.process, data);
385
+
mem_size += std::mem::size_of::<Cid>() + maybe_processed.get_size();
386
+
chunk.push((cid, maybe_processed));
387
+
if mem_size >= self.max_size {
388
+
// note: with the db cache set to max_size, and multiple chunks that are
389
+
// each >= max_size allowed into the queue, peak memory use at any moment
390
+
// could be some multiple of max_size
391
+
break;
392
+
}
393
+
}
394
+
if chunk.is_empty() {
395
+
break;
396
+
}
397
+
tx.send(chunk)
398
+
.await
399
+
.map_err(|_| DriveError::ChannelSendError)?;
400
}
401
+
drop(tx);
402
+
log::debug!("done. waiting for worker to finish...");
403
404
+
store = store_worker.await??;
405
+
406
+
log::debug!("worker finished.");
407
+
408
+
let commit = self.commit.ok_or(DriveError::MissingCommit)?;
409
+
410
+
let walker = Walker::new(commit.data);
411
+
412
+
Ok((
413
+
commit,
414
+
DiskDriver {
415
+
process: self.process,
416
+
state: Some(BigState { store, walker }),
417
+
},
418
+
))
419
+
}
420
+
}
421
+
422
+
struct BigState {
423
+
store: DiskStore,
424
+
walker: Walker,
425
+
}
426
+
427
+
/// MST walker that reads from disk instead of an in-memory hashmap
428
+
pub struct DiskDriver<T: Clone> {
429
+
process: fn(Vec<u8>) -> T,
430
+
state: Option<BigState>,
431
+
}
432
+
433
+
// for doctests only
434
+
#[doc(hidden)]
435
+
pub fn _get_fake_disk_driver() -> DiskDriver<Vec<u8>> {
436
+
use crate::process::noop;
437
+
DiskDriver {
438
+
process: noop,
439
+
state: None,
440
}
441
+
}
442
443
+
impl<T: Processable + Send + 'static> DiskDriver<T> {
444
+
/// Walk the MST returning up to `n` rkey + record pairs
445
+
///
446
+
/// ```no_run
447
+
/// # use repo_stream::{drive::{DiskDriver, DriveError, _get_fake_disk_driver}, process::noop};
448
+
/// # #[tokio::main]
449
+
/// # async fn main() -> Result<(), DriveError> {
450
+
/// # let mut disk_driver = _get_fake_disk_driver();
451
+
/// while let Some(pairs) = disk_driver.next_chunk(256).await? {
452
+
/// for (rkey, record) in pairs {
453
+
/// println!("{rkey}: size={}", record.len());
454
+
/// }
455
+
/// }
456
+
/// let store = disk_driver.reset_store().await?;
457
+
/// # Ok(())
458
+
/// # }
459
+
/// ```
460
+
pub async fn next_chunk(&mut self, n: usize) -> Result<Option<BlockChunk<T>>, DriveError> {
461
+
let process = self.process;
462
+
463
+
// state should only *ever* be None transiently while inside here
464
+
let mut state = self.state.take().expect("DiskDriver must have Some(state)");
465
+
466
+
// the big pain here is that we don't want to leave self.state in an
467
+
// invalid state (None), so all the error paths have to make sure it
468
+
// comes out again.
469
+
let (state, res) = tokio::task::spawn_blocking(
470
+
move || -> (BigState, Result<BlockChunk<T>, DriveError>) {
471
+
let mut reader_res = state.store.get_reader();
472
+
let reader: &mut _ = match reader_res {
473
+
Ok(ref mut r) => r,
474
+
Err(ref mut e) => {
475
+
// unfortunately we can't return the error directly because
476
+
// (for some reason) it's attached to the lifetime of the
477
+
// reader?
478
+
// hack a mem::swap so we can get it out :/
479
+
let e_swapped = e.steal();
480
+
// the pain: `state` *has to* outlive the reader
481
+
drop(reader_res);
482
+
return (state, Err(e_swapped.into()));
483
+
}
484
+
};
485
+
486
+
let mut out = Vec::with_capacity(n);
487
+
488
+
for _ in 0..n {
489
+
// walk as far as we can until we run out of blocks or find a record
490
+
let step = match state.walker.disk_step(reader, process) {
491
+
Ok(s) => s,
492
+
Err(e) => {
493
+
// the pain: `state` *has to* outlive the reader
494
+
drop(reader_res);
495
+
return (state, Err(e.into()));
496
+
}
497
+
};
498
+
match step {
499
+
Step::Missing(cid) => {
500
+
// the pain: `state` *has to* outlive the reader
501
+
drop(reader_res);
502
+
return (state, Err(DriveError::MissingBlock(cid)));
503
+
}
504
+
Step::Finish => break,
505
+
Step::Found { rkey, data } => out.push((rkey, data)),
506
+
};
507
+
}
508
+
509
+
// `state` *has to* outlive the reader
510
+
drop(reader_res);
511
+
512
+
(state, Ok::<_, DriveError>(out))
513
+
},
514
+
)
515
+
.await?; // on tokio JoinError, we'll be left with invalid state :(
516
+
517
+
// *must* restore state before dealing with the actual result
518
+
self.state = Some(state);
519
+
520
+
let out = res?;
521
+
522
+
if out.is_empty() {
523
+
Ok(None)
524
+
} else {
525
+
Ok(Some(out))
526
+
}
527
+
}
528
+
529
+
fn read_tx_blocking(
530
+
&mut self,
531
+
n: usize,
532
+
tx: mpsc::Sender<Result<BlockChunk<T>, DriveError>>,
533
+
) -> Result<(), mpsc::error::SendError<Result<BlockChunk<T>, DriveError>>> {
534
+
let BigState { store, walker } = self.state.as_mut().expect("valid state");
535
+
let mut reader = match store.get_reader() {
536
+
Ok(r) => r,
537
+
Err(e) => return tx.blocking_send(Err(e.into())),
538
+
};
539
+
540
loop {
541
+
let mut out: BlockChunk<T> = Vec::with_capacity(n);
542
543
+
for _ in 0..n {
544
+
// walk as far as we can until we run out of blocks or find a record
545
+
546
+
let step = match walker.disk_step(&mut reader, self.process) {
547
+
Ok(s) => s,
548
+
Err(e) => return tx.blocking_send(Err(e.into())),
549
+
};
550
+
551
+
match step {
552
+
Step::Missing(cid) => {
553
+
return tx.blocking_send(Err(DriveError::MissingBlock(cid)));
554
+
}
555
+
Step::Finish => return Ok(()),
556
+
Step::Found { rkey, data } => {
557
+
out.push((rkey, data));
558
+
continue;
559
+
}
560
+
};
561
+
}
562
+
563
+
if out.is_empty() {
564
+
break;
565
+
}
566
+
tx.blocking_send(Ok(out))?;
567
}
568
+
569
+
Ok(())
570
}
571
572
+
/// Spawn the disk reading task into a tokio blocking thread
573
+
///
574
+
/// The idea is to avoid repeatedly bouncing work back and forth to the blocking
575
+
/// thread, letting a blocking task do all the disk reading work and sending
576
+
/// records and rkeys back through an `mpsc` channel instead.
577
+
///
578
+
/// This might also allow the disk work to continue while processing the
579
+
/// records. It's still not yet clear if this method actually has much
580
+
/// benefit over just using `.next_chunk(n)`.
581
+
///
582
+
/// ```no_run
583
+
/// # use repo_stream::{drive::{DiskDriver, DriveError, _get_fake_disk_driver}, process::noop};
584
+
/// # #[tokio::main]
585
+
/// # async fn main() -> Result<(), DriveError> {
586
+
/// # let mut disk_driver = _get_fake_disk_driver();
587
+
/// let (mut rx, join) = disk_driver.to_channel(512);
588
+
/// while let Some(recvd) = rx.recv().await {
589
+
/// let pairs = recvd?;
590
+
/// for (rkey, record) in pairs {
591
+
/// println!("{rkey}: size={}", record.len());
592
+
/// }
593
+
///
594
+
/// }
595
+
/// let store = join.await?.reset_store().await?;
596
+
/// # Ok(())
597
+
/// # }
598
+
/// ```
599
+
pub fn to_channel(
600
+
mut self,
601
+
n: usize,
602
+
) -> (
603
+
mpsc::Receiver<Result<BlockChunk<T>, DriveError>>,
604
+
tokio::task::JoinHandle<Self>,
605
+
) {
606
+
let (tx, rx) = mpsc::channel::<Result<BlockChunk<T>, DriveError>>(1);
607
+
608
+
// sketch: this worker keeps running even if the caller drops the join handle
609
+
let chan_task = tokio::task::spawn_blocking(move || {
610
+
if let Err(mpsc::error::SendError(_)) = self.read_tx_blocking(n, tx) {
611
+
log::debug!("big car reader exited early due to dropped receiver channel");
612
+
}
613
+
self
614
+
});
615
+
616
+
(rx, chan_task)
617
+
}
618
+
619
+
/// Reset the disk storage so it can be reused. You must call this.
620
+
///
621
+
/// Ideally we'd put this in an `impl Drop`, but since it makes blocking
622
+
/// calls, that would be risky in an async context. For now you just have to
623
+
/// carefully make sure you call it.
624
+
///
625
+
/// The sqlite store is returned, so it can be reused for another
626
+
/// `DiskDriver`.
627
+
pub async fn reset_store(mut self) -> Result<DiskStore, DriveError> {
628
+
let BigState { store, .. } = self.state.take().expect("valid state");
629
+
Ok(store.reset().await?)
630
}
631
}
+85
-9
src/lib.rs
···
1
-
//! Fast and robust atproto CAR file processing in rust
2
-
//!
3
-
//! For now see the [examples](https://tangled.org/@microcosm.blue/repo-stream/tree/main/examples)
4
5
-
pub mod disk_drive;
6
-
pub mod disk_redb;
7
-
pub mod disk_sqlite;
8
-
pub mod disk_walk;
9
-
pub mod drive;
10
pub mod mst;
11
-
pub mod walk;
···
1
+
/*!
2
+
A robust CAR file -> MST walker for atproto
3
+
4
+
Small CARs have their blocks buffered in memory. If a configurable memory limit
5
+
is reached while reading blocks, CAR reading is suspended, and can be continued
6
+
by providing disk storage to buffer the CAR blocks instead.
7
+
8
+
A `process` function can be provided for tasks where records are transformed
9
+
into a smaller representation, to save memory (and disk) during block reading.
10
+
11
+
Once blocks are loaded, the MST is walked and emitted as chunks of
12
+
`(rkey, processed_block)` pairs, in order (depth first, left-to-right).
13
+
14
+
Some MST validations are applied:
15
+
- Keys must appear in order
16
+
- Keys must be at the correct MST tree depth
17
+
18
+
`iroh_car` additionally applies a block size limit of `2MiB`.
19
+
20
+
```
21
+
use repo_stream::{Driver, DriverBuilder, DiskBuilder};
22
+
23
+
# #[tokio::main]
24
+
# async fn main() -> Result<(), Box<dyn std::error::Error>> {
25
+
# let reader = include_bytes!("../car-samples/tiny.car").as_slice();
26
+
let mut total_size = 0;
27
+
28
+
match DriverBuilder::new()
29
+
.with_mem_limit_mb(10)
30
+
.with_block_processor(|rec| rec.len()) // block processing: just extract the raw record size
31
+
.load_car(reader)
32
+
.await?
33
+
{
34
35
+
// if all blocks fit within memory
36
+
Driver::Memory(_commit, mut driver) => {
37
+
while let Some(chunk) = driver.next_chunk(256).await? {
38
+
for (_rkey, size) in chunk {
39
+
total_size += size;
40
+
}
41
+
}
42
+
},
43
+
44
+
// if the CAR was too big for in-memory processing
45
+
Driver::Disk(paused) => {
46
+
// set up a disk store we can spill to
47
+
let store = DiskBuilder::new().open("some/path.db".into()).await?;
48
+
// do the spilling, get back a (similar) driver
49
+
let (_commit, mut driver) = paused.finish_loading(store).await?;
50
+
51
+
while let Some(chunk) = driver.next_chunk(256).await? {
52
+
for (_rkey, size) in chunk {
53
+
total_size += size;
54
+
}
55
+
}
56
+
57
+
// clean up the disk store (drop tables etc)
58
+
driver.reset_store().await?;
59
+
}
60
+
};
61
+
println!("sum of size of all records: {total_size}");
62
+
# Ok(())
63
+
# }
64
+
```
65
+
66
+
Disk spilling suspends and returns a `Driver::Disk(paused)` instead of going
67
+
ahead and eagerly using disk I/O. This means you have to write a bit more code
68
+
to handle both cases, but it allows you to have finer control over resource
69
+
usage. For example, you can drive a number of parallel memory CAR workers, and
70
+
separately have a different number of disk workers picking up suspended disk
71
+
tasks from a queue.
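
As a rough sketch of that pattern (sequential here for brevity; the store path
and names are illustrative, and each pass could just as well be its own set of
worker tasks):

```no_run
# use repo_stream::{Driver, DriverBuilder, DiskBuilder};
# #[tokio::main]
# async fn main() -> Result<(), Box<dyn std::error::Error>> {
# let readers: Vec<&[u8]> = vec![];
let mut too_big = Vec::new();

// first pass: handle everything that fits in memory, queue the rest
for reader in readers {
    match DriverBuilder::new().load_car(reader).await? {
        Driver::Memory(_commit, mut driver) => {
            while let Some(_chunk) = driver.next_chunk(256).await? {}
        }
        Driver::Disk(paused) => too_big.push(paused),
    }
}

// second pass: one disk store, reused for each oversized CAR
let mut store = DiskBuilder::new().open("spill.db".into()).await?;
for paused in too_big {
    let (_commit, mut driver) = paused.finish_loading(store).await?;
    while let Some(_chunk) = driver.next_chunk(256).await? {}
    store = driver.reset_store().await?;
}
# Ok(())
# }
```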
72
+
73
+
Find more [examples in the repo](https://tangled.org/@microcosm.blue/repo-stream/tree/main/examples).
74
+
75
+
*/
76
+
77
pub mod mst;
78
+
mod walk;
79
+
80
+
pub mod disk;
81
+
pub mod drive;
82
+
pub mod process;
83
+
84
+
pub use disk::{DiskBuilder, DiskError, DiskStore};
85
+
pub use drive::{DriveError, Driver, DriverBuilder, NeedDisk};
86
+
pub use mst::Commit;
87
+
pub use process::Processable;
+4
-8
src/mst.rs
···
39
/// MST node data schema
40
#[derive(Debug, Deserialize, PartialEq)]
41
#[serde(deny_unknown_fields)]
42
-
pub struct Node {
43
/// link to sub-tree Node on a lower level and with all keys sorting before
44
/// keys at this node
45
#[serde(rename = "l")]
···
62
/// so if a block *could be* a node, any record converter must postpone
63
/// processing. if it turns out it happens to be a very node-looking record,
64
/// well, sorry, it just has to only be processed later when that's known.
65
-
pub fn could_be(bytes: impl AsRef<[u8]>) -> bool {
66
const NODE_FINGERPRINT: [u8; 3] = [
67
0xA2, // map length 2 (for "l" and "e" keys)
68
0x61, // text length 1
···
83
/// with an empty array of entries. This is the only situation in which a
84
/// tree may contain an empty leaf node which does not either contain keys
85
/// ("entries") or point to a sub-tree containing entries.
86
-
///
87
-
/// TODO: to me this is slightly unclear with respect to `l` (ask someone).
88
-
/// ...is that what "The top of the tree must not be a an empty node which
89
-
/// only points to a sub-tree." is referring to?
90
-
pub fn is_empty(&self) -> bool {
91
self.left.is_none() && self.entries.is_empty()
92
}
93
}
···
95
/// TreeEntry object
96
#[derive(Debug, Deserialize, PartialEq)]
97
#[serde(deny_unknown_fields)]
98
-
pub struct Entry {
99
/// count of bytes shared with previous TreeEntry in this Node (if any)
100
#[serde(rename = "p")]
101
pub prefix_len: usize,
···
39
/// MST node data schema
40
#[derive(Debug, Deserialize, PartialEq)]
41
#[serde(deny_unknown_fields)]
42
+
pub(crate) struct Node {
43
/// link to sub-tree Node on a lower level and with all keys sorting before
44
/// keys at this node
45
#[serde(rename = "l")]
···
62
/// so if a block *could be* a node, any record converter must postpone
63
/// processing. if it turns out it happens to be a very node-looking record,
64
/// well, sorry, it just has to only be processed later when that's known.
65
+
pub(crate) fn could_be(bytes: impl AsRef<[u8]>) -> bool {
66
const NODE_FINGERPRINT: [u8; 3] = [
67
0xA2, // map length 2 (for "l" and "e" keys)
68
0x61, // text length 1
···
83
/// with an empty array of entries. This is the only situation in which a
84
/// tree may contain an empty leaf node which does not either contain keys
85
/// ("entries") or point to a sub-tree containing entries.
86
+
pub(crate) fn is_empty(&self) -> bool {
87
self.left.is_none() && self.entries.is_empty()
88
}
89
}
···
91
/// TreeEntry object
92
#[derive(Debug, Deserialize, PartialEq)]
93
#[serde(deny_unknown_fields)]
94
+
pub(crate) struct Entry {
95
/// count of bytes shared with previous TreeEntry in this Node (if any)
96
#[serde(rename = "p")]
97
pub prefix_len: usize,
+108
src/process.rs
···
···
1
+
/*!
2
+
Record processor function output trait
3
+
4
+
The return type must satisfy the `Processable` trait, which requires:
5
+
6
+
- `Clone` because two rkeys can refer to the same record by CID, which may
7
+
only appear once in the CAR file.
8
+
- `Serialize + DeserializeOwned` so it can be spilled to disk.
9
+
10
+
One required function must be implemented, `get_size()`: this should return the
11
+
approximate total off-stack size of the type. (the on-stack size will be added
12
+
automatically via `std::mem::size_of`).
13
+
14
+
Note that it is **not guaranteed** that the `process` function will run on a
15
+
block before storing it in memory or on disk: it's not possible to know if a
16
+
block is a record without actually walking the MST, so the best we can do is
17
+
apply `process` to any block that we know *cannot* be an MST node, and otherwise
18
+
store the raw block bytes.
19
+
20
+
Here's a silly processing function that just collects 'eyy's found in the raw
21
+
record bytes
22
+
23
+
```
24
+
# use repo_stream::Processable;
25
+
# use serde::{Serialize, Deserialize};
26
+
#[derive(Debug, Clone, Serialize, Deserialize)]
27
+
struct Eyy(usize, String);
28
+
29
+
impl Processable for Eyy {
30
+
fn get_size(&self) -> usize {
31
+
// don't need to compute the usize, it's on the stack
32
+
self.1.capacity() // in-mem size from the string's capacity, in bytes
33
+
}
34
+
}
35
+
36
+
fn process(raw: Vec<u8>) -> Vec<Eyy> {
37
+
let mut out = Vec::new();
38
+
let to_find = "eyy".as_bytes();
39
+
for i in 0..raw.len().saturating_sub(2) {
40
+
if &raw[i..(i+3)] == to_find {
41
+
out.push(Eyy(i, "eyy".to_string()));
42
+
}
43
+
}
44
+
out
45
+
}
46
+
```
47
+
48
+
The memory sizing stuff is a little sketch but probably at least approximately
49
+
works.
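
As a quick illustration of how the provided impls compose, the `Vec` impl
charges one slot per unit of capacity plus each element's own `get_size()`:

```
# use repo_stream::Processable;
let v = vec!["abc".to_string()];
assert_eq!(
    v.get_size(),
    // one String slot per unit of capacity, plus the string's heap capacity
    std::mem::size_of::<String>() * v.capacity() + v[0].capacity(),
);
```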
50
+
*/
51
+
52
+
use serde::{Serialize, de::DeserializeOwned};
53
+
54
+
/// Output trait for record processing
55
+
pub trait Processable: Clone + Serialize + DeserializeOwned {
56
+
/// Any additional in-memory size taken by the processed type
57
+
///
58
+
/// Do not include stack size (`std::mem::size_of`)
59
+
fn get_size(&self) -> usize;
60
+
}
61
+
62
+
/// Processor that just returns the raw blocks
63
+
#[inline]
64
+
pub fn noop(block: Vec<u8>) -> Vec<u8> {
65
+
block
66
+
}
67
+
68
+
impl Processable for u8 {
69
+
fn get_size(&self) -> usize {
70
+
0
71
+
}
72
+
}
73
+
74
+
impl Processable for usize {
75
+
fn get_size(&self) -> usize {
76
+
0 // no additional space taken, just its stack size (newtype is free)
77
+
}
78
+
}
79
+
80
+
impl Processable for String {
81
+
fn get_size(&self) -> usize {
82
+
self.capacity()
83
+
}
84
+
}
85
+
86
+
impl<Item: Sized + Processable> Processable for Vec<Item> {
87
+
fn get_size(&self) -> usize {
88
+
let slot_size = std::mem::size_of::<Item>();
89
+
let direct_size = slot_size * self.capacity();
90
+
let items_referenced_size: usize = self.iter().map(|item| item.get_size()).sum();
91
+
direct_size + items_referenced_size
92
+
}
93
+
}
94
+
95
+
impl<Item: Processable> Processable for Option<Item> {
96
+
fn get_size(&self) -> usize {
97
+
self.as_ref().map(|item| item.get_size()).unwrap_or(0)
98
+
}
99
+
}
100
+
101
+
impl<Item: Processable, Error: Processable> Processable for Result<Item, Error> {
102
+
fn get_size(&self) -> usize {
103
+
match self {
104
+
Ok(item) => item.get_size(),
105
+
Err(err) => err.get_size(),
106
+
}
107
+
}
108
+
}
+260
-259
src/walk.rs
···
1
//! Depth-first MST traversal
2
3
-
use crate::drive::MaybeProcessedBlock;
4
use crate::mst::Node;
5
use ipld_core::cid::Cid;
6
use std::collections::HashMap;
7
-
use std::error::Error;
8
9
/// Errors that can happen while walking
10
#[derive(Debug, thiserror::Error)]
11
-
pub enum Trip<E: Error> {
12
-
#[error("empty mst nodes are not allowed")]
13
-
NodeEmpty,
14
#[error("Failed to decode commit block: {0}")]
15
-
BadCommit(Box<dyn std::error::Error>),
16
#[error("Action node error: {0}")]
17
-
RkeyError(#[from] RkeyError),
18
-
#[error("Process failed: {0}")]
19
-
ProcessFailed(E),
20
-
#[error("Encountered an rkey out of order while walking the MST")]
21
-
RkeyOutOfOrder,
22
}
23
24
/// Errors from invalid Rkeys
25
-
#[derive(Debug, thiserror::Error)]
26
-
pub enum RkeyError {
27
#[error("Failed to compute an rkey due to invalid prefix_len")]
28
EntryPrefixOutOfbounds,
29
#[error("RKey was not utf-8")]
30
EntryRkeyNotUtf8(#[from] std::string::FromUtf8Error),
31
}
32
33
/// Walker outputs
34
#[derive(Debug)]
35
pub enum Step<T> {
36
-
/// We need a CID but it's not in the block store
37
-
///
38
-
/// Give the needed CID to the driver so it can load blocks until it's found
39
-
Rest(Cid),
40
/// Reached the end of the MST! yay!
41
Finish,
42
/// A record was found!
43
-
Step { rkey: String, data: T },
44
}
45
46
#[derive(Debug, Clone, PartialEq)]
47
enum Need {
48
-
Node(Cid),
49
Record { rkey: String, cid: Cid },
50
}
51
52
-
fn push_from_node(stack: &mut Vec<Need>, node: &Node) -> Result<(), RkeyError> {
53
-
let mut entries = Vec::with_capacity(node.entries.len());
54
55
let mut prefix = vec![];
56
for entry in &node.entries {
57
let mut rkey = vec![];
58
let pre_checked = prefix
59
.get(..entry.prefix_len)
60
-
.ok_or(RkeyError::EntryPrefixOutOfbounds)?;
61
rkey.extend_from_slice(pre_checked);
62
rkey.extend_from_slice(&entry.keysuffix);
63
prefix = rkey.clone();
64
65
entries.push(Need::Record {
···
67
cid: entry.value,
68
});
69
if let Some(ref tree) = entry.tree {
70
-
entries.push(Need::Node(*tree));
71
}
72
}
73
74
entries.reverse();
75
stack.append(&mut entries);
76
77
if let Some(tree) = node.left {
78
-
stack.push(Need::Node(tree));
79
}
80
Ok(())
81
}
···
92
impl Walker {
93
pub fn new(tree_root_cid: Cid) -> Self {
94
Self {
95
-
stack: vec![Need::Node(tree_root_cid)],
96
prev: "".to_string(),
97
}
98
}
99
100
/// Advance through nodes until we find a record or can't go further
101
-
pub fn step<T: Clone, E: Error>(
102
&mut self,
103
-
blocks: &mut HashMap<Cid, MaybeProcessedBlock<T, E>>,
104
-
process: impl Fn(&[u8]) -> Result<T, E>,
105
-
) -> Result<Step<T>, Trip<E>> {
106
loop {
107
-
let Some(mut need) = self.stack.last() else {
108
log::trace!("tried to walk but we're actually done.");
109
return Ok(Step::Finish);
110
};
111
112
-
match &mut need {
113
-
Need::Node(cid) => {
114
log::trace!("need node {cid:?}");
115
-
let Some(block) = blocks.remove(cid) else {
116
log::trace!("node not found, resting");
117
-
return Ok(Step::Rest(*cid));
118
};
119
120
let MaybeProcessedBlock::Raw(data) = block else {
121
-
return Err(Trip::BadCommit("failed commit fingerprint".into()));
122
};
123
let node = serde_ipld_dagcbor::from_slice::<Node>(&data)
124
-
.map_err(|e| Trip::BadCommit(e.into()))?;
125
126
// found node, make sure we remember
127
self.stack.pop();
128
129
// queue up work on the found node next
130
-
push_from_node(&mut self.stack, &node)?;
131
}
132
Need::Record { rkey, cid } => {
133
log::trace!("need record {cid:?}");
134
let Some(data) = blocks.get_mut(cid) else {
135
log::trace!("record block not found, resting");
136
-
return Ok(Step::Rest(*cid));
137
};
138
let rkey = rkey.clone();
139
let data = match data {
140
MaybeProcessedBlock::Raw(data) => process(data),
141
-
MaybeProcessedBlock::Processed(Ok(t)) => Ok(t.clone()),
142
-
bad => {
143
-
// big hack to pull the error out -- this corrupts
144
-
// a block, so we should not continue trying to work
145
-
let mut steal = MaybeProcessedBlock::Raw(vec![]);
146
-
std::mem::swap(&mut steal, bad);
147
-
let MaybeProcessedBlock::Processed(Err(e)) = steal else {
148
-
unreachable!();
149
-
};
150
-
return Err(Trip::ProcessFailed(e));
151
-
}
152
};
153
154
// found node, make sure we remember
155
self.stack.pop();
156
157
log::trace!("emitting a block as a step. depth={}", self.stack.len());
158
-
let data = data.map_err(Trip::ProcessFailed)?;
159
160
// rkeys *must* be in order or else the tree is invalid (or
161
// we have a bug)
162
if rkey <= self.prev {
163
-
return Err(Trip::RkeyOutOfOrder);
164
}
165
self.prev = rkey.clone();
166
167
-
return Ok(Step::Step { rkey, data });
168
}
169
}
170
}
···
174
#[cfg(test)]
175
mod test {
176
use super::*;
177
-
// use crate::mst::Entry;
178
179
fn cid1() -> Cid {
180
"bafyreihixenvk3ahqbytas4hk4a26w43bh6eo3w6usjqtxkpzsvi655a3m"
181
.parse()
182
.unwrap()
183
}
184
-
// fn cid2() -> Cid {
185
-
// "QmY7Yh4UquoXHLPFo2XbhXkhBvFoPwmQUSa92pxnxjQuPU"
186
-
// .parse()
187
-
// .unwrap()
188
-
// }
189
-
// fn cid3() -> Cid {
190
-
// "bafybeigdyrzt5sfp7udm7hu76uh7y26nf3efuylqabf3oclgtqy55fbzdi"
191
-
// .parse()
192
-
// .unwrap()
193
-
// }
194
-
// fn cid4() -> Cid {
195
-
// "QmbWqxBEKC3P8tqsKc98xmWNzrzDtRLMiMPL8wBuTGsMnR"
196
-
// .parse()
197
-
// .unwrap()
198
-
// }
199
-
// fn cid5() -> Cid {
200
-
// "QmSnuWmxptJZdLJpKRarxBMS2Ju2oANVrgbr2xWbie9b2D"
201
-
// .parse()
202
-
// .unwrap()
203
-
// }
204
-
// fn cid6() -> Cid {
205
-
// "QmdmQXB2mzChmMeKY47C43LxUdg1NDJ5MWcKMKxDu7RgQm"
206
-
// .parse()
207
-
// .unwrap()
208
-
// }
209
-
// fn cid7() -> Cid {
210
-
// "bafybeiaysi4s6lnjev27ln5icwm6tueaw2vdykrtjkwiphwekaywqhcjze"
211
-
// .parse()
212
-
// .unwrap()
213
-
// }
214
-
// fn cid8() -> Cid {
215
-
// "bafyreif3tfdpr5n4jdrbielmcapwvbpcthepfkwq2vwonmlhirbjmotedi"
216
-
// .parse()
217
-
// .unwrap()
218
-
// }
219
-
// fn cid9() -> Cid {
220
-
// "bafyreicnokmhmrnlp2wjhyk2haep4tqxiptwfrp2rrs7rzq7uk766chqvq"
221
-
// .parse()
222
-
// .unwrap()
223
-
// }
224
225
#[test]
226
-
fn test_next_from_node_empty() {
227
-
let node = Node {
228
left: None,
229
entries: vec![],
230
};
231
let mut stack = vec![];
232
-
push_from_node(&mut stack, &node).unwrap();
233
-
assert_eq!(stack.last(), None);
234
}
235
236
#[test]
237
-
fn test_needs_from_node_just_left() {
238
let node = Node {
239
left: Some(cid1()),
240
entries: vec![],
241
};
242
let mut stack = vec![];
243
-
push_from_node(&mut stack, &node).unwrap();
244
-
assert_eq!(stack.last(), Some(Need::Node(cid1())).as_ref());
245
}
246
-
247
-
// #[test]
248
-
// fn test_needs_from_node_just_one_record() {
249
-
// let node = Node {
250
-
// left: None,
251
-
// entries: vec![Entry {
252
-
// keysuffix: "asdf".into(),
253
-
// prefix_len: 0,
254
-
// value: cid1(),
255
-
// tree: None,
256
-
// }],
257
-
// };
258
-
// assert_eq!(
259
-
// needs_from_node(node).unwrap(),
260
-
// vec![Need::Record {
261
-
// rkey: "asdf".into(),
262
-
// cid: cid1(),
263
-
// },]
264
-
// );
265
-
// }
266
-
267
-
// #[test]
268
-
// fn test_needs_from_node_two_records() {
269
-
// let node = Node {
270
-
// left: None,
271
-
// entries: vec![
272
-
// Entry {
273
-
// keysuffix: "asdf".into(),
274
-
// prefix_len: 0,
275
-
// value: cid1(),
276
-
// tree: None,
277
-
// },
278
-
// Entry {
279
-
// keysuffix: "gh".into(),
280
-
// prefix_len: 2,
281
-
// value: cid2(),
282
-
// tree: None,
283
-
// },
284
-
// ],
285
-
// };
286
-
// assert_eq!(
287
-
// needs_from_node(node).unwrap(),
288
-
// vec![
289
-
// Need::Record {
290
-
// rkey: "asdf".into(),
291
-
// cid: cid1(),
292
-
// },
293
-
// Need::Record {
294
-
// rkey: "asgh".into(),
295
-
// cid: cid2(),
296
-
// },
297
-
// ]
298
-
// );
299
-
// }
300
-
301
-
// #[test]
302
-
// fn test_needs_from_node_with_both() {
303
-
// let node = Node {
304
-
// left: None,
305
-
// entries: vec![Entry {
306
-
// keysuffix: "asdf".into(),
307
-
// prefix_len: 0,
308
-
// value: cid1(),
309
-
// tree: Some(cid2()),
310
-
// }],
311
-
// };
312
-
// assert_eq!(
313
-
// needs_from_node(node).unwrap(),
314
-
// vec![
315
-
// Need::Record {
316
-
// rkey: "asdf".into(),
317
-
// cid: cid1(),
318
-
// },
319
-
// Need::Node(cid2()),
320
-
// ]
321
-
// );
322
-
// }
323
-
324
-
// #[test]
325
-
// fn test_needs_from_node_left_and_record() {
326
-
// let node = Node {
327
-
// left: Some(cid1()),
328
-
// entries: vec![Entry {
329
-
// keysuffix: "asdf".into(),
330
-
// prefix_len: 0,
331
-
// value: cid2(),
332
-
// tree: None,
333
-
// }],
334
-
// };
335
-
// assert_eq!(
336
-
// needs_from_node(node).unwrap(),
337
-
// vec![
338
-
// Need::Node(cid1()),
339
-
// Need::Record {
340
-
// rkey: "asdf".into(),
341
-
// cid: cid2(),
342
-
// },
343
-
// ]
344
-
// );
345
-
// }
346
-
347
-
// #[test]
348
-
// fn test_needs_from_full_node() {
349
-
// let node = Node {
350
-
// left: Some(cid1()),
351
-
// entries: vec![
352
-
// Entry {
353
-
// keysuffix: "asdf".into(),
354
-
// prefix_len: 0,
355
-
// value: cid2(),
356
-
// tree: Some(cid3()),
357
-
// },
358
-
// Entry {
359
-
// keysuffix: "ghi".into(),
360
-
// prefix_len: 1,
361
-
// value: cid4(),
362
-
// tree: Some(cid5()),
363
-
// },
364
-
// Entry {
365
-
// keysuffix: "jkl".into(),
366
-
// prefix_len: 2,
367
-
// value: cid6(),
368
-
// tree: Some(cid7()),
369
-
// },
370
-
// Entry {
371
-
// keysuffix: "mno".into(),
372
-
// prefix_len: 4,
373
-
// value: cid8(),
374
-
// tree: Some(cid9()),
375
-
// },
376
-
// ],
377
-
// };
378
-
// assert_eq!(
379
-
// needs_from_node(node).unwrap(),
380
-
// vec![
381
-
// Need::Node(cid1()),
382
-
// Need::Record {
383
-
// rkey: "asdf".into(),
384
-
// cid: cid2(),
385
-
// },
386
-
// Need::Node(cid3()),
387
-
// Need::Record {
388
-
// rkey: "aghi".into(),
389
-
// cid: cid4(),
390
-
// },
391
-
// Need::Node(cid5()),
392
-
// Need::Record {
393
-
// rkey: "agjkl".into(),
394
-
// cid: cid6(),
395
-
// },
396
-
// Need::Node(cid7()),
397
-
// Need::Record {
398
-
// rkey: "agjkmno".into(),
399
-
// cid: cid8(),
400
-
// },
401
-
// Need::Node(cid9()),
402
-
// ]
403
-
// );
404
-
// }
405
}
···
1
//! Depth-first MST traversal
2
3
+
use crate::disk::SqliteReader;
4
+
use crate::drive::{DecodeError, MaybeProcessedBlock};
5
use crate::mst::Node;
6
+
use crate::process::Processable;
7
use ipld_core::cid::Cid;
8
+
use sha2::{Digest, Sha256};
9
use std::collections::HashMap;
10
+
use std::convert::Infallible;
11
12
/// Errors that can happen while walking
13
#[derive(Debug, thiserror::Error)]
14
+
pub enum WalkError {
15
+
#[error("Failed to fingerprint commit block")]
16
+
BadCommitFingerprint,
17
#[error("Failed to decode commit block: {0}")]
18
+
BadCommit(#[from] serde_ipld_dagcbor::DecodeError<Infallible>),
19
#[error("Action node error: {0}")]
20
+
MstError(#[from] MstError),
21
+
#[error("storage error: {0}")]
22
+
StorageError(#[from] rusqlite::Error),
23
+
#[error("Decode error: {0}")]
24
+
DecodeError(#[from] DecodeError),
25
}
26
27
/// Errors from invalid Rkeys
28
+
#[derive(Debug, PartialEq, thiserror::Error)]
29
+
pub enum MstError {
30
#[error("Failed to compute an rkey due to invalid prefix_len")]
31
EntryPrefixOutOfbounds,
32
#[error("RKey was not utf-8")]
33
EntryRkeyNotUtf8(#[from] std::string::FromUtf8Error),
34
+
#[error("Nodes cannot be empty (except for an entirely empty MST)")]
35
+
EmptyNode,
36
+
#[error("Found an entry with rkey at the wrong depth")]
37
+
WrongDepth,
38
+
#[error("Lost track of our depth (possible bug?)")]
39
+
LostDepth,
40
+
#[error("MST depth underflow: depth-0 node with child trees")]
41
+
DepthUnderflow,
42
+
#[error("Encountered an rkey out of order while walking the MST")]
43
+
RkeyOutOfOrder,
44
}
45
46
/// Walker outputs
47
#[derive(Debug)]
48
pub enum Step<T> {
49
+
/// We needed this CID but it's not in the block store
50
+
Missing(Cid),
51
/// Reached the end of the MST! yay!
52
Finish,
53
/// A record was found!
54
+
Found { rkey: String, data: T },
55
}
56
57
#[derive(Debug, Clone, PartialEq)]
58
enum Need {
59
+
Node { depth: Depth, cid: Cid },
60
Record { rkey: String, cid: Cid },
61
}
62
63
+
#[derive(Debug, Clone, Copy, PartialEq)]
64
+
enum Depth {
65
+
Root,
66
+
Depth(u32),
67
+
}
68
69
+
impl Depth {
70
+
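/// Key depth per the atproto MST: count the leading zero bits of
/// SHA-256(key), two bits per level (4-way fanout)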
fn from_key(key: &[u8]) -> Self {
71
+
let mut zeros = 0;
72
+
for byte in Sha256::digest(key) {
73
+
let leading = byte.leading_zeros();
74
+
zeros += leading;
75
+
if leading < 8 {
76
+
break;
77
+
}
78
+
}
79
+
Self::Depth(zeros / 2) // truncating divide (rounds down)
80
+
}
81
+
fn next_expected(&self) -> Result<Option<u32>, MstError> {
82
+
match self {
83
+
Self::Root => Ok(None),
84
+
Self::Depth(d) => d.checked_sub(1).ok_or(MstError::DepthUnderflow).map(Some),
85
+
}
86
+
}
87
+
}
88
+
89
+
fn push_from_node(stack: &mut Vec<Need>, node: &Node, parent_depth: Depth) -> Result<(), MstError> {
90
+
// empty nodes are not allowed in the MST except in an empty MST
91
+
if node.is_empty() {
92
+
if parent_depth == Depth::Root {
93
+
return Ok(()); // empty mst, nothing to push
94
+
} else {
95
+
return Err(MstError::EmptyNode);
96
+
}
97
+
}
98
+
99
+
let mut entries = Vec::with_capacity(node.entries.len());
100
let mut prefix = vec![];
101
+
let mut this_depth = parent_depth.next_expected()?;
102
+
103
for entry in &node.entries {
104
let mut rkey = vec![];
105
let pre_checked = prefix
106
.get(..entry.prefix_len)
107
+
.ok_or(MstError::EntryPrefixOutOfbounds)?;
108
rkey.extend_from_slice(pre_checked);
109
rkey.extend_from_slice(&entry.keysuffix);
110
+
111
+
let Depth::Depth(key_depth) = Depth::from_key(&rkey) else {
112
+
return Err(MstError::WrongDepth);
113
+
};
114
+
115
+
// this_depth is `None` if this is the topmost MST node (directly below the commit root)
116
+
// in that case, accept whatever depth its first key claims
117
+
let expected_depth = match this_depth {
118
+
Some(d) => d,
119
+
None => {
120
+
this_depth = Some(key_depth);
121
+
key_depth
122
+
}
123
+
};
124
+
125
+
// all keys we find should be this depth
126
+
if key_depth != expected_depth {
127
+
return Err(MstError::WrongDepth);
128
+
}
129
+
130
prefix = rkey.clone();
131
132
entries.push(Need::Record {
···
134
cid: entry.value,
135
});
136
if let Some(ref tree) = entry.tree {
137
+
entries.push(Need::Node {
138
+
depth: Depth::Depth(key_depth),
139
+
cid: *tree,
140
+
});
141
}
142
}
143
144
entries.reverse();
145
stack.append(&mut entries);
146
+
147
+
let d = this_depth.ok_or(MstError::LostDepth)?;
148
149
if let Some(tree) = node.left {
150
+
stack.push(Need::Node {
151
+
depth: Depth::Depth(d),
152
+
cid: tree,
153
+
});
154
}
155
Ok(())
156
}
···
167
impl Walker {
168
pub fn new(tree_root_cid: Cid) -> Self {
169
Self {
170
+
stack: vec![Need::Node {
171
+
depth: Depth::Root,
172
+
cid: tree_root_cid,
173
+
}],
174
prev: "".to_string(),
175
}
176
}
177
178
/// Advance through nodes until we find a record or can't go further
179
+
pub fn step<T: Processable>(
180
&mut self,
181
+
blocks: &mut HashMap<Cid, MaybeProcessedBlock<T>>,
182
+
process: impl Fn(Vec<u8>) -> T,
183
+
) -> Result<Step<T>, WalkError> {
184
loop {
185
+
let Some(need) = self.stack.last_mut() else {
186
log::trace!("tried to walk but we're actually done.");
187
return Ok(Step::Finish);
188
};
189
190
+
match need {
191
+
&mut Need::Node { depth, cid } => {
192
log::trace!("need node {cid:?}");
193
+
let Some(block) = blocks.remove(&cid) else {
194
log::trace!("node not found, resting");
195
+
return Ok(Step::Missing(cid));
196
};
197
198
let MaybeProcessedBlock::Raw(data) = block else {
199
+
return Err(WalkError::BadCommitFingerprint);
200
};
201
let node = serde_ipld_dagcbor::from_slice::<Node>(&data)
202
+
.map_err(WalkError::BadCommit)?;
203
204
// found node, make sure we remember
205
self.stack.pop();
206
207
// queue up work on the found node next
208
+
push_from_node(&mut self.stack, &node, depth)?;
209
}
210
Need::Record { rkey, cid } => {
211
log::trace!("need record {cid:?}");
212
+
// note that we cannot *remove* a record block, sadly, since
213
+
// there can be multiple rkeys pointing to the same cid.
214
let Some(data) = blocks.get_mut(cid) else {
215
+
return Ok(Step::Missing(*cid));
216
+
};
217
+
let rkey = rkey.clone();
218
+
let data = match data {
219
+
MaybeProcessedBlock::Raw(data) => process(data.to_vec()),
220
+
MaybeProcessedBlock::Processed(t) => t.clone(),
221
+
};
222
+
223
+
// found node, make sure we remember
224
+
self.stack.pop();
225
+
226
+
// rkeys *must* be in order or else the tree is invalid (or
227
+
// we have a bug)
228
+
if rkey <= self.prev {
229
+
return Err(MstError::RkeyOutOfOrder)?;
230
+
}
231
+
self.prev = rkey.clone();
232
+
233
+
return Ok(Step::Found { rkey, data });
234
+
}
235
+
}
236
+
}
237
+
}
238
+
239
+
/// Blocking: reads from sqlite synchronously, so call this from a blocking context
240
+
pub fn disk_step<T: Processable>(
241
+
&mut self,
242
+
reader: &mut SqliteReader,
243
+
process: impl Fn(Vec<u8>) -> T,
244
+
) -> Result<Step<T>, WalkError> {
245
+
loop {
246
+
let Some(need) = self.stack.last_mut() else {
247
+
log::trace!("tried to walk but we're actually done.");
248
+
return Ok(Step::Finish);
249
+
};
250
+
251
+
match need {
252
+
&mut Need::Node { depth, cid } => {
253
+
let cid_bytes = cid.to_bytes();
254
+
log::trace!("need node {cid:?}");
255
+
let Some(block_bytes) = reader.get(cid_bytes)? else {
256
+
log::trace!("node not found, resting");
257
+
return Ok(Step::Missing(cid));
258
+
};
259
+
260
+
let block: MaybeProcessedBlock<T> = crate::drive::decode(&block_bytes)?;
261
+
262
+
let MaybeProcessedBlock::Raw(data) = block else {
263
+
return Err(WalkError::BadCommitFingerprint);
264
+
};
265
+
let node = serde_ipld_dagcbor::from_slice::<Node>(&data)
266
+
.map_err(WalkError::BadCommit)?;
267
+
268
+
// found node, make sure we remember
269
+
self.stack.pop();
270
+
271
+
// queue up work on the found node next
272
+
push_from_node(&mut self.stack, &node, depth).map_err(WalkError::MstError)?;
273
+
}
274
+
Need::Record { rkey, cid } => {
275
+
log::trace!("need record {cid:?}");
276
+
let cid_bytes = cid.to_bytes();
277
+
let Some(data_bytes) = reader.get(cid_bytes)? else {
278
log::trace!("record block not found, resting");
279
+
return Ok(Step::Missing(*cid));
280
};
281
+
let data: MaybeProcessedBlock<T> = crate::drive::decode(&data_bytes)?;
282
let rkey = rkey.clone();
283
let data = match data {
284
MaybeProcessedBlock::Raw(data) => process(data),
285
+
MaybeProcessedBlock::Processed(t) => t.clone(),
286
};
287
288
// found node, make sure we remember
289
self.stack.pop();
290
291
log::trace!("emitting a block as a step. depth={}", self.stack.len());
292
293
// rkeys *must* be in order or else the tree is invalid (or
294
// we have a bug)
295
if rkey <= self.prev {
296
+
return Err(MstError::RkeyOutOfOrder)?;
297
}
298
self.prev = rkey.clone();
299
300
+
return Ok(Step::Found { rkey, data });
301
}
302
}
303
}
···
307
#[cfg(test)]
308
mod test {
309
use super::*;
310
311
fn cid1() -> Cid {
312
"bafyreihixenvk3ahqbytas4hk4a26w43bh6eo3w6usjqtxkpzsvi655a3m"
313
.parse()
314
.unwrap()
315
}
316
+
317
+
#[test]
318
+
fn test_depth_spec_0() {
319
+
let d = Depth::from_key(b"2653ae71");
320
+
assert_eq!(d, Depth::Depth(0))
321
+
}
322
+
323
+
#[test]
324
+
fn test_depth_spec_1() {
325
+
let d = Depth::from_key(b"blue");
326
+
assert_eq!(d, Depth::Depth(1))
327
+
}
328
+
329
+
#[test]
330
+
fn test_depth_spec_4() {
331
+
let d = Depth::from_key(b"app.bsky.feed.post/454397e440ec");
332
+
assert_eq!(d, Depth::Depth(4))
333
+
}
334
+
335
+
#[test]
336
+
fn test_depth_spec_8() {
337
+
let d = Depth::from_key(b"app.bsky.feed.post/9adeb165882c");
338
+
assert_eq!(d, Depth::Depth(8))
339
+
}
340
+
341
+
#[test]
342
+
fn test_depth_ietf_draft_0() {
343
+
let d = Depth::from_key(b"key1");
344
+
assert_eq!(d, Depth::Depth(0))
345
+
}
346
+
347
+
#[test]
348
+
fn test_depth_ietf_draft_1() {
349
+
let d = Depth::from_key(b"key7");
350
+
assert_eq!(d, Depth::Depth(1))
351
+
}
352
+
353
+
#[test]
354
+
fn test_depth_ietf_draft_4() {
355
+
let d = Depth::from_key(b"key515");
356
+
assert_eq!(d, Depth::Depth(4))
357
+
}
358
359
#[test]
360
+
fn test_depth_interop() {
361
+
// examples from https://github.com/bluesky-social/atproto-interop-tests/blob/main/mst/key_heights.json
362
+
for (k, expected) in [
363
+
("", 0),
364
+
("asdf", 0),
365
+
("blue", 1),
366
+
("2653ae71", 0),
367
+
("88bfafc7", 2),
368
+
("2a92d355", 4),
369
+
("884976f5", 6),
370
+
("app.bsky.feed.post/454397e440ec", 4),
371
+
("app.bsky.feed.post/9adeb165882c", 8),
372
+
] {
373
+
let d = Depth::from_key(k.as_bytes());
374
+
assert_eq!(d, Depth::Depth(expected), "key: {}", k);
375
+
}
376
+
}
377
+
378
+
#[test]
379
+
fn test_push_empty_fails() {
380
+
let empty_node = Node {
381
left: None,
382
entries: vec![],
383
};
384
let mut stack = vec![];
385
+
let err = push_from_node(&mut stack, &empty_node, Depth::Depth(4));
386
+
assert_eq!(err, Err(MstError::EmptyNode));
387
}
388
389
#[test]
390
+
fn test_push_one_node() {
391
let node = Node {
392
left: Some(cid1()),
393
entries: vec![],
394
};
395
let mut stack = vec![];
396
+
push_from_node(&mut stack, &node, Depth::Depth(4)).unwrap();
397
+
assert_eq!(
398
+
stack.last(),
399
+
Some(Need::Node {
400
+
depth: Depth::Depth(3),
401
+
cid: cid1()
402
+
})
403
+
.as_ref()
404
+
);
405
}
406
}
+34
-31
tests/non-huge-cars.rs
···
1
extern crate repo_stream;
2
-
use futures::TryStreamExt;
3
-
use iroh_car::CarReader;
4
-
use std::convert::Infallible;
5
6
const TINY_CAR: &'static [u8] = include_bytes!("../car-samples/tiny.car");
7
const LITTLE_CAR: &'static [u8] = include_bytes!("../car-samples/little.car");
8
const MIDSIZE_CAR: &'static [u8] = include_bytes!("../car-samples/midsize.car");
9
10
-
async fn test_car(bytes: &[u8], expected_records: usize, expected_sum: usize) {
11
-
let reader = CarReader::new(bytes).await.unwrap();
12
-
13
-
let root = reader
14
-
.header()
15
-
.roots()
16
-
.first()
17
-
.ok_or("missing root")
18
.unwrap()
19
-
.clone();
20
-
21
-
let stream = std::pin::pin!(reader.stream());
22
-
23
-
let (_commit, v) =
24
-
repo_stream::drive::Vehicle::init(root, stream, |block| Ok::<_, Infallible>(block.len()))
25
-
.await
26
-
.unwrap();
27
-
let mut record_stream = std::pin::pin!(v.stream());
28
29
let mut records = 0;
30
let mut sum = 0;
31
let mut found_bsky_profile = false;
32
let mut prev_rkey = "".to_string();
33
-
while let Some((rkey, size)) = record_stream.try_next().await.unwrap() {
34
-
records += 1;
35
-
sum += size;
36
-
if rkey == "app.bsky.actor.profile/self" {
37
-
found_bsky_profile = true;
38
}
39
-
assert!(rkey > prev_rkey, "rkeys are streamed in order");
40
-
prev_rkey = rkey;
41
}
42
assert_eq!(records, expected_records);
43
assert_eq!(sum, expected_sum);
44
-
assert!(found_bsky_profile);
45
}
46
47
#[tokio::test]
48
async fn test_tiny_car() {
49
-
test_car(TINY_CAR, 8, 2071).await
50
}
51
52
#[tokio::test]
53
async fn test_little_car() {
54
-
test_car(LITTLE_CAR, 278, 246960).await
55
}
56
57
#[tokio::test]
58
async fn test_midsize_car() {
59
-
test_car(MIDSIZE_CAR, 11585, 3741393).await
60
}
···
1
extern crate repo_stream;
2
+
use repo_stream::Driver;
3
4
+
const EMPTY_CAR: &'static [u8] = include_bytes!("../car-samples/empty.car");
5
const TINY_CAR: &'static [u8] = include_bytes!("../car-samples/tiny.car");
6
const LITTLE_CAR: &'static [u8] = include_bytes!("../car-samples/little.car");
7
const MIDSIZE_CAR: &'static [u8] = include_bytes!("../car-samples/midsize.car");
8
9
+
async fn test_car(
10
+
bytes: &[u8],
11
+
expected_records: usize,
12
+
expected_sum: usize,
13
+
expect_profile: bool,
14
+
) {
15
+
let mut driver = match Driver::load_car(bytes, |block| block.len(), 10 /* MiB */)
16
+
.await
17
.unwrap()
18
+
{
19
+
Driver::Memory(_commit, mem_driver) => mem_driver,
20
+
Driver::Disk(_) => panic!("too big"),
21
+
};
22
23
let mut records = 0;
24
let mut sum = 0;
25
let mut found_bsky_profile = false;
26
let mut prev_rkey = "".to_string();
27
+
28
+
while let Some(pairs) = driver.next_chunk(256).await.unwrap() {
29
+
for (rkey, size) in pairs {
30
+
records += 1;
31
+
sum += size;
32
+
if rkey == "app.bsky.actor.profile/self" {
33
+
found_bsky_profile = true;
34
+
}
35
+
assert!(rkey > prev_rkey, "rkeys are streamed in order");
36
+
prev_rkey = rkey;
37
}
38
}
39
+
40
assert_eq!(records, expected_records);
41
assert_eq!(sum, expected_sum);
42
+
assert_eq!(found_bsky_profile, expect_profile);
43
+
}
44
+
45
+
#[tokio::test]
46
+
async fn test_empty_car() {
47
+
test_car(EMPTY_CAR, 0, 0, false).await
48
}
49
50
#[tokio::test]
51
async fn test_tiny_car() {
52
+
test_car(TINY_CAR, 8, 2071, true).await
53
}
54
55
#[tokio::test]
56
async fn test_little_car() {
57
+
test_car(LITTLE_CAR, 278, 246960, true).await
58
}
59
60
#[tokio::test]
61
async fn test_midsize_car() {
62
+
test_car(MIDSIZE_CAR, 11585, 3741393, true).await
63
}