+1 -1  Cargo.lock
+2 -2  Cargo.toml
···
 [package]
 name = "repo-stream"
-version = "0.1.1"
+version = "0.2.2"
 edition = "2024"
 license = "MIT OR Apache-2.0"
-description = "Fast and robust atproto CAR file processing in rust"
+description = "A robust CAR file -> MST walker for atproto"
 repository = "https://tangled.org/@microcosm.blue/repo-stream"

 [dependencies]
+4 -16  benches/huge-car.rs
···
 extern crate repo_stream;
-use repo_stream::drive::Processable;
-use serde::{Deserialize, Serialize};
+use repo_stream::Driver;
 use std::path::{Path, PathBuf};

 use criterion::{Criterion, criterion_group, criterion_main};

-#[derive(Clone, Serialize, Deserialize)]
-struct S(usize);
-
-impl Processable for S {
-    fn get_size(&self) -> usize {
-        0 // no additional space taken, just its stack size (newtype is free)
-    }
-}
-
 pub fn criterion_benchmark(c: &mut Criterion) {
     let rt = tokio::runtime::Builder::new_multi_thread()
         .enable_all()
···
     let reader = tokio::fs::File::open(filename).await.unwrap();
     let reader = tokio::io::BufReader::new(reader);

-    let mb = 2_usize.pow(20);
-
-    let mut driver = match repo_stream::drive::load_car(reader, |block| S(block.len()), 1024 * mb)
+    let mut driver = match Driver::load_car(reader, |block| block.len(), 1024)
         .await
         .unwrap()
     {
-        repo_stream::drive::Vehicle::Lil(_, mem_driver) => mem_driver,
-        repo_stream::drive::Vehicle::Big(_) => panic!("not doing disk for benchmark"),
+        Driver::Memory(_, mem_driver) => mem_driver,
+        Driver::Disk(_) => panic!("not doing disk for benchmark"),
    };

     let mut n = 0;
+12 -19  benches/non-huge-cars.rs
···
 extern crate repo_stream;
+use repo_stream::Driver;

 use criterion::{Criterion, criterion_group, criterion_main};
-use repo_stream::drive::Processable;
-use serde::{Deserialize, Serialize};

+const EMPTY_CAR: &'static [u8] = include_bytes!("../car-samples/empty.car");
 const TINY_CAR: &'static [u8] = include_bytes!("../car-samples/tiny.car");
 const LITTLE_CAR: &'static [u8] = include_bytes!("../car-samples/little.car");
 const MIDSIZE_CAR: &'static [u8] = include_bytes!("../car-samples/midsize.car");

-#[derive(Clone, Serialize, Deserialize)]
-struct S(usize);
-
-impl Processable for S {
-    fn get_size(&self) -> usize {
-        0 // no additional space taken, just its stack size (newtype is free)
-    }
-}
-
 pub fn criterion_benchmark(c: &mut Criterion) {
     let rt = tokio::runtime::Builder::new_multi_thread()
         .enable_all()
         .build()
         .expect("Creating runtime failed");

+    c.bench_function("empty-car", |b| {
+        b.to_async(&rt).iter(async || drive_car(EMPTY_CAR).await)
+    });
     c.bench_function("tiny-car", |b| {
         b.to_async(&rt).iter(async || drive_car(TINY_CAR).await)
     });
···
 }

 async fn drive_car(bytes: &[u8]) -> usize {
-    let mut driver =
-        match repo_stream::drive::load_car(bytes, |block| S(block.len()), 32 * 2_usize.pow(20))
-            .await
-            .unwrap()
-        {
-            repo_stream::drive::Vehicle::Lil(_, mem_driver) => mem_driver,
-            repo_stream::drive::Vehicle::Big(_) => panic!("not benching big cars here"),
-        };
+    let mut driver = match Driver::load_car(bytes, |block| block.len(), 32)
+        .await
+        .unwrap()
+    {
+        Driver::Memory(_, mem_driver) => mem_driver,
+        Driver::Disk(_) => panic!("not benching big cars here"),
+    };

     let mut n = 0;
     while let Some(pairs) = driver.next_chunk(256).await.unwrap() {
car-samples/empty.car
This is a binary file and will not be displayed.
+57 -34  examples/disk-read-file/main.rs
···
+/*!
+Read a CAR file by spilling to disk
+*/
+
 extern crate repo_stream;
 use clap::Parser;
-use repo_stream::drive::Processable;
-use serde::{Deserialize, Serialize};
+use repo_stream::{DiskBuilder, Driver, DriverBuilder};
 use std::path::PathBuf;
-
-type Result<T> = std::result::Result<T, Box<dyn std::error::Error>>;
+use std::time::Instant;

 #[derive(Debug, Parser)]
 struct Args {
···
     tmpfile: PathBuf,
 }

-#[derive(Clone, Serialize, Deserialize)]
-struct S(usize);
-
-impl Processable for S {
-    fn get_size(&self) -> usize {
-        0 // no additional space taken, just its stack size (newtype is free)
-    }
-}
-
 #[tokio::main]
-async fn main() -> Result<()> {
+async fn main() -> Result<(), Box<dyn std::error::Error>> {
     env_logger::init();

     let Args { car, tmpfile } = Args::parse();
+
+    // repo-stream takes an AsyncRead as input. wrapping a filesystem read in
+    // BufReader can provide a really significant performance win.
     let reader = tokio::fs::File::open(car).await?;
     let reader = tokio::io::BufReader::new(reader);

-    // let kb = 2_usize.pow(10);
-    let mb = 2_usize.pow(20);
+    log::info!("hello! reading the car...");
+    let t0 = Instant::now();
+
+    // in this example we only bother handling CARs that are too big for memory
+    // `noop` helper means: do no block processing, store the raw blocks
+    let driver = match DriverBuilder::new()
+        .with_mem_limit_mb(10) // how much memory can be used before disk spill
+        .load_car(reader)
+        .await?
+    {
+        Driver::Memory(_, _) => panic!("try this on a bigger car"),
+        Driver::Disk(big_stuff) => {
+            // we reach here if the repo was too big and needs to be spilled to
+            // disk to continue

-    let limit_mb = 32;
+            // set up a disk store we can spill to
+            let disk_store = DiskBuilder::new().open(tmpfile).await?;

-    let driver = match repo_stream::drive::load_car(reader, |block| S(block.len()), 10 * mb).await?
-    {
-        repo_stream::drive::Vehicle::Lil(_, _) => panic!("try this on a bigger car"),
-        repo_stream::drive::Vehicle::Big(big_stuff) => {
-            let disk_store = repo_stream::disk::SqliteStore::new(tmpfile.clone(), limit_mb);
+            // do the spilling, get back a (similar) driver
             let (commit, driver) = big_stuff.finish_loading(disk_store).await?;
-            log::warn!("big: {:?}", commit);
+
+            // at this point you might want to fetch the account's signing key
+            // via the DID from the commit, and then verify the signature.
+            log::warn!("big's commit ({:?}): {:?}", t0.elapsed(), commit);
+
+            // pop the driver back out to get some code indentation relief
             driver
         }
     };

+    // collect some random stats about the blocks
     let mut n = 0;
-    let (mut rx, worker) = driver.rx(512).await?;
+    let mut zeros = 0;
+
+    log::info!("walking...");
+
+    // this example uses the disk driver's channel mode: the tree walking is
+    // spawned onto a blocking thread, and we get chunks of rkey+blocks back
+    let (mut rx, join) = driver.to_channel(512);
+    while let Some(r) = rx.recv().await {
+        let pairs = r?;

-    log::debug!("walking...");
-    while let Some(pairs) = rx.recv().await {
+        // keep a count of the total number of blocks seen
         n += pairs.len();
+
+        for (_, block) in pairs {
+            // for each block, count how many bytes are equal to '0'
+            // (this is just an example, you probably want to do something more
+            // interesting)
+            zeros += block.into_iter().filter(|&b| b == b'0').count()
+        }
     }
-    log::debug!("done walking! joining...");
-
-    worker.await.unwrap().unwrap();

-    log::debug!("joined.");
+    log::info!("arrived! ({:?}) joining rx...", t0.elapsed());

-    // log::info!("now is the time to check mem...");
-    // tokio::time::sleep(std::time::Duration::from_secs(22)).await;
-    log::info!("bye! {n}");
+    // clean up the database. would be nice to do this in drop so it happens
+    // automatically, but some blocking work happens, so that's not allowed in
+    // async rust. 🤷‍♂️
+    join.await?.reset_store().await?;

-    std::fs::remove_file(tmpfile).unwrap(); // need to also remove -shm -wal
+    log::info!("done. n={n} zeros={zeros}");

     Ok(())
 }
+14 -17  examples/read-file/main.rs
···
+/*!
+Read a CAR file with in-memory processing
+*/
+
 extern crate repo_stream;
 use clap::Parser;
-use repo_stream::drive::Processable;
-use serde::{Deserialize, Serialize};
+use repo_stream::{Driver, DriverBuilder};
 use std::path::PathBuf;

 type Result<T> = std::result::Result<T, Box<dyn std::error::Error>>;
···
 struct Args {
     #[arg()]
     file: PathBuf,
-}
-
-#[derive(Clone, Serialize, Deserialize)]
-struct S(usize);
-
-impl Processable for S {
-    fn get_size(&self) -> usize {
-        0 // no additional space taken, just its stack size (newtype is free)
-    }
 }

 #[tokio::main]
···
     let reader = tokio::fs::File::open(file).await?;
     let reader = tokio::io::BufReader::new(reader);

-    let (commit, mut driver) =
-        match repo_stream::drive::load_car(reader, |block| S(block.len()), 1024 * 1024).await? {
-            repo_stream::drive::Vehicle::Lil(commit, mem_driver) => (commit, mem_driver),
-            repo_stream::drive::Vehicle::Big(_) => panic!("can't handle big cars yet"),
-        };
+    let (commit, mut driver) = match DriverBuilder::new()
+        .with_block_processor(|block| block.len())
+        .load_car(reader)
+        .await?
+    {
+        Driver::Memory(commit, mem_driver) => (commit, mem_driver),
+        Driver::Disk(_) => panic!("this example doesn't handle big CARs"),
+    };

     log::info!("got commit: {commit:?}");

···
         n += pairs.len();
         // log::info!("got {rkey:?}");
     }
-    log::info!("bye! {n}");
+    log::info!("bye! total records={n}");

     Ok(())
 }
+70 -2  readme.md
···
 # repo-stream

-Fast and (aspirationally) robust atproto CAR file processing in rust
+A robust CAR file -> MST walker for atproto
+
+[![Crates.io][crates-badge]](https://crates.io/crates/repo-stream)
+[![Documentation][docs-badge]](https://docs.rs/repo-stream)
+[![Sponsor][sponsor-badge]](https://github.com/sponsors/uniphil)
+
+[crates-badge]: https://img.shields.io/crates/v/repo-stream.svg
+[docs-badge]: https://docs.rs/repo-stream/badge.svg
+[sponsor-badge]: https://img.shields.io/badge/at-microcosm-b820f9?labelColor=b820f9&logo=githubsponsors&logoColor=fff
+
+```rust
+use repo_stream::{Driver, DriverBuilder, DriveError, DiskBuilder};
+
+#[tokio::main]
+async fn main() -> Result<(), DriveError> {
+    // repo-stream takes any AsyncRead as input, like a tokio::fs::File
+    let reader = tokio::fs::File::open("repo.car").await?;
+    let reader = tokio::io::BufReader::new(reader);
+
+    // example repo workload is simply counting the total record bytes
+    let mut total_size = 0;
+
+    match DriverBuilder::new()
+        .with_mem_limit_mb(10)
+        .with_block_processor(|rec| rec.len()) // block processing: just extract the raw record size
+        .load_car(reader)
+        .await?
+    {
+
+        // if all blocks fit within memory
+        Driver::Memory(_commit, mut driver) => {
+            while let Some(chunk) = driver.next_chunk(256).await? {
+                for (_rkey, size) in chunk {
+                    total_size += size;
+                }
+            }
+        },
+
+        // if the CAR was too big for in-memory processing
+        Driver::Disk(paused) => {
+            // set up a disk store we can spill to
+            let store = DiskBuilder::new().open("some/path.db".into()).await?;
+            // do the spilling, get back a (similar) driver
+            let (_commit, mut driver) = paused.finish_loading(store).await?;
+
+            while let Some(chunk) = driver.next_chunk(256).await? {
+                for (_rkey, size) in chunk {
+                    total_size += size;
+                }
+            }
+
+            // clean up the disk store (drop tables etc)
+            driver.reset_store().await?;
+        }
+    };
+    println!("sum of size of all records: {total_size}");
+    Ok(())
+}
+```
+
+more recent todo
+
+- [ ] get an *empty* car for the test suite
+- [x] implement a max size on disk limit
+
+
+-----
+
+older stuff (to clean up):


 current car processing times (records processed into their length usize, phil's dev machine):
···
 -> yeah the commit is returned from init
 - [ ] spec compliance todos
   - [x] assert that keys are ordered and fail if not
-  - [ ] verify node mst depth from key (possibly pending [interop test fixes](https://github.com/bluesky-social/atproto-interop-tests/issues/5))
+  - [x] verify node mst depth from key (possibly pending [interop test fixes](https://github.com/bluesky-social/atproto-interop-tests/issues/5))
 - [ ] performance todos
   - [x] consume the serialized nodes into a mutable efficient format
     - [ ] maybe customize the deserialize impl to do that directly?
+163 -40  src/disk.rs
···
+/*!
+Disk storage for blocks on disk
+
+Currently this uses sqlite. In testing sqlite wasn't the fastest, but it seemed
+to be the best behaved in terms of both on-disk space usage and memory usage.
+
+```no_run
+# use repo_stream::{DiskBuilder, DiskError};
+# #[tokio::main]
+# async fn main() -> Result<(), DiskError> {
+let store = DiskBuilder::new()
+    .with_cache_size_mb(32)
+    .with_max_stored_mb(1024) // errors when >1GiB of processed blocks are inserted
+    .open("/some/path.db".into()).await?;
+# Ok(())
+# }
+```
+*/
+
 use crate::drive::DriveError;
 use rusqlite::OptionalExtension;
 use std::path::PathBuf;

-pub struct SqliteStore {
-    path: PathBuf,
-    limit_mb: usize,
+#[derive(Debug, thiserror::Error)]
+pub enum DiskError {
+    /// A wrapped database error
+    ///
+    /// (The wrapped err should probably be obscured to remove public-facing
+    /// sqlite bits)
+    #[error(transparent)]
+    DbError(#[from] rusqlite::Error),
+    /// A tokio blocking task failed to join
+    #[error("Failed to join a tokio blocking task: {0}")]
+    JoinError(#[from] tokio::task::JoinError),
+    /// The total size of stored blocks exceeded the allowed size
+    ///
+    /// If you need to process *really* big CARs, you can configure a higher
+    /// limit.
+    #[error("Maximum disk size reached")]
+    MaxSizeExceeded,
+    #[error("this error was replaced, seeing this is a bug.")]
+    #[doc(hidden)]
+    Stolen,
 }

-impl SqliteStore {
-    pub fn new(path: PathBuf, limit_mb: usize) -> Self {
-        Self { path, limit_mb }
+impl DiskError {
+    /// hack for ownership challenges with the disk driver
+    pub(crate) fn steal(&mut self) -> Self {
+        let mut swapped = DiskError::Stolen;
+        std::mem::swap(self, &mut swapped);
+        swapped
     }
 }

-impl SqliteStore {
-    pub async fn get_access(&mut self) -> Result<SqliteAccess, rusqlite::Error> {
-        let path = self.path.clone();
-        let limit_mb = self.limit_mb;
+/// Builder-style disk store setup
+#[derive(Debug, Clone)]
+pub struct DiskBuilder {
+    /// Database in-memory cache allowance
+    ///
+    /// Default: 32 MiB
+    pub cache_size_mb: usize,
+    /// Database stored block size limit
+    ///
+    /// Default: 10 GiB
+    ///
+    /// Note: actual size on disk may be more, but should approximately scale
+    /// with this limit
+    pub max_stored_mb: usize,
+}
+
+impl Default for DiskBuilder {
+    fn default() -> Self {
+        Self {
+            cache_size_mb: 32,
+            max_stored_mb: 10 * 1024, // 10 GiB
+        }
+    }
+}
+
+impl DiskBuilder {
+    /// Begin configuring the storage with defaults
+    pub fn new() -> Self {
+        Default::default()
+    }
+    /// Set the in-memory cache allowance for the database
+    ///
+    /// Default: 32 MiB
+    pub fn with_cache_size_mb(mut self, size: usize) -> Self {
+        self.cache_size_mb = size;
+        self
+    }
+    /// Set the approximate stored block size limit
+    ///
+    /// Default: 10 GiB
+    pub fn with_max_stored_mb(mut self, max: usize) -> Self {
+        self.max_stored_mb = max;
+        self
+    }
+    /// Open and initialize the actual disk storage
+    pub async fn open(&self, path: PathBuf) -> Result<DiskStore, DiskError> {
+        DiskStore::new(path, self.cache_size_mb, self.max_stored_mb).await
+    }
+}
+
+/// On-disk block storage
+pub struct DiskStore {
+    conn: rusqlite::Connection,
+    max_stored: usize,
+    stored: usize,
+}
+
+impl DiskStore {
+    /// Initialize a new disk store
+    pub async fn new(
+        path: PathBuf,
+        cache_mb: usize,
+        max_stored_mb: usize,
+    ) -> Result<Self, DiskError> {
+        let max_stored = max_stored_mb * 2_usize.pow(20);
         let conn = tokio::task::spawn_blocking(move || {
             let conn = rusqlite::Connection::open(path)?;

-            let sq_mb = -(2_i64.pow(10)); // negative is kibibytes for sqlite cache_size
+            let sqlite_one_mb = -(2_i64.pow(10)); // negative is kibibytes for sqlite cache_size

             // conn.pragma_update(None, "journal_mode", "OFF")?;
             // conn.pragma_update(None, "journal_mode", "MEMORY")?;
             conn.pragma_update(None, "journal_mode", "WAL")?;
             // conn.pragma_update(None, "wal_autocheckpoint", "0")?; // this lets things get a bit big on disk
             conn.pragma_update(None, "synchronous", "OFF")?;
-            conn.pragma_update(None, "cache_size", (limit_mb as i64 * sq_mb).to_string())?;
-            conn.execute(
-                "CREATE TABLE blocks (
-                    key BLOB PRIMARY KEY NOT NULL,
-                    val BLOB NOT NULL
-                ) WITHOUT ROWID",
-                (),
+            conn.pragma_update(
+                None,
+                "cache_size",
+                (cache_mb as i64 * sqlite_one_mb).to_string(),
             )?;
+            Self::reset_tables(&conn)?;

-            Ok::<_, rusqlite::Error>(conn)
+            Ok::<_, DiskError>(conn)
         })
-        .await
-        .expect("join error")?;
+        .await??;

-        Ok(SqliteAccess { conn })
+        Ok(Self {
+            conn,
+            max_stored,
+            stored: 0,
+        })
     }
-}
-
-pub struct SqliteAccess {
-    conn: rusqlite::Connection,
-}
-
-impl SqliteAccess {
-    pub fn get_writer(&'_ mut self) -> Result<SqliteWriter<'_>, rusqlite::Error> {
+    pub(crate) fn get_writer(&'_ mut self) -> Result<SqliteWriter<'_>, DiskError> {
         let tx = self.conn.transaction()?;
-        // let insert_stmt = tx.prepare("INSERT INTO blocks (key, val) VALUES (?1, ?2)")?;
-        Ok(SqliteWriter { tx })
+        Ok(SqliteWriter {
+            tx,
+            stored: &mut self.stored,
+            max: self.max_stored,
+        })
     }
-    pub fn get_reader(&'_ self) -> Result<SqliteReader<'_>, rusqlite::Error> {
+    pub(crate) fn get_reader<'conn>(&'conn self) -> Result<SqliteReader<'conn>, DiskError> {
         let select_stmt = self.conn.prepare("SELECT val FROM blocks WHERE key = ?1")?;
         Ok(SqliteReader { select_stmt })
     }
+    /// Drop and recreate the kv table
+    pub async fn reset(self) -> Result<Self, DiskError> {
+        tokio::task::spawn_blocking(move || {
+            Self::reset_tables(&self.conn)?;
+            Ok(self)
+        })
+        .await?
+    }
+    fn reset_tables(conn: &rusqlite::Connection) -> Result<(), DiskError> {
+        conn.execute("DROP TABLE IF EXISTS blocks", ())?;
+        conn.execute(
+            "CREATE TABLE blocks (
+                key BLOB PRIMARY KEY NOT NULL,
+                val BLOB NOT NULL
+            ) WITHOUT ROWID",
+            (),
+        )?;
+        Ok(())
+    }
 }

-pub struct SqliteWriter<'conn> {
+pub(crate) struct SqliteWriter<'conn> {
     tx: rusqlite::Transaction<'conn>,
+    stored: &'conn mut usize,
+    max: usize,
 }

 impl SqliteWriter<'_> {
-    pub fn put_many(
+    pub(crate) fn put_many(
         &mut self,
         kv: impl Iterator<Item = Result<(Vec<u8>, Vec<u8>), DriveError>>,
     ) -> Result<(), DriveError> {
         let mut insert_stmt = self
             .tx
-            .prepare_cached("INSERT INTO blocks (key, val) VALUES (?1, ?2)")?;
+            .prepare_cached("INSERT INTO blocks (key, val) VALUES (?1, ?2)")
+            .map_err(DiskError::DbError)?;
         for pair in kv {
             let (k, v) = pair?;
-            insert_stmt.execute((k, v))?;
+            *self.stored += v.len();
+            if *self.stored > self.max {
+                return Err(DiskError::MaxSizeExceeded.into());
+            }
+            insert_stmt.execute((k, v)).map_err(DiskError::DbError)?;
         }
         Ok(())
     }
-    pub fn commit(self) -> Result<(), rusqlite::Error> {
+    pub fn commit(self) -> Result<(), DiskError> {
         self.tx.commit()?;
         Ok(())
     }
 }

-pub struct SqliteReader<'conn> {
+pub(crate) struct SqliteReader<'conn> {
     select_stmt: rusqlite::Statement<'conn>,
 }

 impl SqliteReader<'_> {
-    pub fn get(&mut self, key: Vec<u8>) -> rusqlite::Result<Option<Vec<u8>>> {
+    pub(crate) fn get(&mut self, key: Vec<u8>) -> rusqlite::Result<Option<Vec<u8>>> {
         self.select_stmt
             .query_one((&key,), |row| row.get(0))
             .optional()
+407 -196  src/drive.rs
···
-//! Consume an MST block stream, producing an ordered stream of records
+//! Consume a CAR from an AsyncRead, producing an ordered stream of records

-use crate::disk::{SqliteAccess, SqliteStore};
+use crate::disk::{DiskError, DiskStore};
+use crate::process::Processable;
 use ipld_core::cid::Cid;
 use iroh_car::CarReader;
-use serde::de::DeserializeOwned;
 use serde::{Deserialize, Serialize};
 use std::collections::HashMap;
 use std::convert::Infallible;
-use tokio::io::AsyncRead;
+use tokio::{io::AsyncRead, sync::mpsc};

 use crate::mst::{Commit, Node};
 use crate::walk::{Step, WalkError, Walker};
···
     #[error("CAR file had no roots")]
     MissingRoot,
     #[error("Storage error")]
-    StorageError(#[from] rusqlite::Error),
+    StorageError(#[from] DiskError),
     #[error("Encode error: {0}")]
     BincodeEncodeError(#[from] bincode::error::EncodeError),
     #[error("Tried to send on a closed channel")]
···
     ExtraGarbage,
 }

-pub trait Processable: Clone + Serialize + DeserializeOwned {
-    /// the additional size taken up (not including its mem::size_of)
-    fn get_size(&self) -> usize;
-}
+/// An in-order chunk of Rkey + (processed) Block pairs
+pub type BlockChunk<T> = Vec<(String, T)>;

 #[derive(Debug, Clone, Serialize, Deserialize)]
-pub enum MaybeProcessedBlock<T> {
+pub(crate) enum MaybeProcessedBlock<T> {
     /// A block that's *probably* a Node (but we can't know yet)
     ///
     /// It *can be* a record that suspiciously looks a lot like a node, so we
···
     }
 }

-pub enum Vehicle<R: AsyncRead + Unpin, T: Processable> {
-    Lil(Commit, MemDriver<T>),
-    Big(BigCar<R, T>),
+impl<T> MaybeProcessedBlock<T> {
+    fn maybe(process: fn(Vec<u8>) -> T, data: Vec<u8>) -> Self {
+        if Node::could_be(&data) {
+            MaybeProcessedBlock::Raw(data)
+        } else {
+            MaybeProcessedBlock::Processed(process(data))
+        }
+    }
 }

-pub async fn load_car<R: AsyncRead + Unpin, T: Processable>(
-    reader: R,
-    process: fn(Vec<u8>) -> T,
-    max_size: usize,
-) -> Result<Vehicle<R, T>, DriveError> {
-    let mut mem_blocks = HashMap::new();
+/// Read a CAR file, buffering blocks in memory or to disk
+pub enum Driver<R: AsyncRead + Unpin, T: Processable> {
+    /// All blocks fit within the memory limit
+    ///
+    /// You probably want to check the commit's signature. You can go ahead and
+    /// walk the MST right away.
+    Memory(Commit, MemDriver<T>),
+    /// Blocks exceed the memory limit
+    ///
+    /// You'll need to provide a disk storage to continue. The commit will be
+    /// returned and can be validated only once all blocks are loaded.
+    Disk(NeedDisk<R, T>),
+}

-    let mut car = CarReader::new(reader).await?;
+/// Builder-style driver setup
+#[derive(Debug, Clone)]
+pub struct DriverBuilder {
+    pub mem_limit_mb: usize,
+}
+
+impl Default for DriverBuilder {
+    fn default() -> Self {
+        Self { mem_limit_mb: 16 }
+    }
+}
+
+impl DriverBuilder {
+    /// Begin configuring the driver with defaults
+    pub fn new() -> Self {
+        Default::default()
+    }
+    /// Set the in-memory size limit, in MiB
+    ///
+    /// Default: 16 MiB
+    pub fn with_mem_limit_mb(self, new_limit: usize) -> Self {
+        Self {
+            mem_limit_mb: new_limit,
+        }
+    }
+    /// Set the block processor
+    ///
+    /// Default: noop, raw blocks will be emitted
+    pub fn with_block_processor<T: Processable>(
+        self,
+        p: fn(Vec<u8>) -> T,
+    ) -> DriverBuilderWithProcessor<T> {
+        DriverBuilderWithProcessor {
+            mem_limit_mb: self.mem_limit_mb,
+            block_processor: p,
+        }
+    }
+    /// Begin processing an atproto MST from a CAR file
+    pub async fn load_car<R: AsyncRead + Unpin>(
+        &self,
+        reader: R,
+    ) -> Result<Driver<R, Vec<u8>>, DriveError> {
+        Driver::load_car(reader, crate::process::noop, self.mem_limit_mb).await
+    }
+}

-    let root = *car
-        .header()
-        .roots()
-        .first()
-        .ok_or(DriveError::MissingRoot)?;
-    log::debug!("root: {root:?}");
+/// Builder-style driver intermediate step
+///
+/// start from `DriverBuilder`
+#[derive(Debug, Clone)]
+pub struct DriverBuilderWithProcessor<T: Processable> {
+    pub mem_limit_mb: usize,
+    pub block_processor: fn(Vec<u8>) -> T,
+}

-    let mut commit = None;
+impl<T: Processable> DriverBuilderWithProcessor<T> {
+    /// Set the in-memory size limit, in MiB
+    ///
+    /// Default: 16 MiB
+    pub fn with_mem_limit_mb(mut self, new_limit: usize) -> Self {
+        self.mem_limit_mb = new_limit;
+        self
+    }
+    /// Begin processing an atproto MST from a CAR file
+    pub async fn load_car<R: AsyncRead + Unpin>(
+        &self,
+        reader: R,
+    ) -> Result<Driver<R, T>, DriveError> {
+        Driver::load_car(reader, self.block_processor, self.mem_limit_mb).await
+    }
+}

-    // try to load all the blocks into memory
-    let mut mem_size = 0;
-    while let Some((cid, data)) = car.next_block().await? {
-        // the root commit is a Special Third Kind of block that we need to make
-        // sure not to optimistically send to the processing function
-        if cid == root {
-            let c: Commit = serde_ipld_dagcbor::from_slice(&data)?;
-            commit = Some(c);
-            continue;
+impl<R: AsyncRead + Unpin, T: Processable> Driver<R, T> {
+    /// Begin processing an atproto MST from a CAR file
+    ///
+    /// Blocks will be loaded, processed, and buffered in memory. If the entire
+    /// processed size is under the `mem_limit_mb` limit, a `Driver::Memory`
+    /// will be returned along with a `Commit` ready for validation.
+    ///
+    /// If the `mem_limit_mb` limit is reached before loading all blocks, the
+    /// partial state will be returned as `Driver::Disk(needed)`, which can be
+    /// resumed by providing a `DiskStore` for on-disk block storage.
+    pub async fn load_car(
+        reader: R,
+        process: fn(Vec<u8>) -> T,
+        mem_limit_mb: usize,
+    ) -> Result<Driver<R, T>, DriveError> {
+        let max_size = mem_limit_mb * 2_usize.pow(20);
+        let mut mem_blocks = HashMap::new();
+
+        let mut car = CarReader::new(reader).await?;
+
+        let root = *car
+            .header()
+            .roots()
+            .first()
+            .ok_or(DriveError::MissingRoot)?;
+        log::debug!("root: {root:?}");
+
+        let mut commit = None;
+
+        // try to load all the blocks into memory
+        let mut mem_size = 0;
+        while let Some((cid, data)) = car.next_block().await? {
+            // the root commit is a Special Third Kind of block that we need to make
+            // sure not to optimistically send to the processing function
+            if cid == root {
+                let c: Commit = serde_ipld_dagcbor::from_slice(&data)?;
+                commit = Some(c);
+                continue;
+            }
+
+            // remaining possible types: node, record, other. optimistically process
+            let maybe_processed = MaybeProcessedBlock::maybe(process, data);
+
+            // stash (maybe processed) blocks in memory as long as we have room
+            mem_size += std::mem::size_of::<Cid>() + maybe_processed.get_size();
+            mem_blocks.insert(cid, maybe_processed);
+            if mem_size >= max_size {
+                return Ok(Driver::Disk(NeedDisk {
+                    car,
+                    root,
+                    process,
+                    max_size,
+                    mem_blocks,
+                    commit,
+                }));
+            }
         }

-        // remaining possible types: node, record, other. optimistically process
-        let maybe_processed = if Node::could_be(&data) {
-            MaybeProcessedBlock::Raw(data)
-        } else {
-            MaybeProcessedBlock::Processed(process(data))
-        };
+        // all blocks loaded and we fit in memory! hopefully we found the commit...
+        let commit = commit.ok_or(DriveError::MissingCommit)?;
+
+        let walker = Walker::new(commit.data);

-        // stash (maybe processed) blocks in memory as long as we have room
-        mem_size += std::mem::size_of::<Cid>() + maybe_processed.get_size();
-        mem_blocks.insert(cid, maybe_processed);
-        if mem_size >= max_size {
-            return Ok(Vehicle::Big(BigCar {
-                car,
-                root,
+        Ok(Driver::Memory(
+            commit,
+            MemDriver {
+                blocks: mem_blocks,
+                walker,
                 process,
-                max_size,
-                mem_blocks,
-                commit,
-            }));
-        }
+            },
+        ))
     }
+}

-    // all blocks loaded and we fit in memory! hopefully we found the commit...
-    let commit = commit.ok_or(DriveError::MissingCommit)?;
+/// The core driver between the block stream and MST walker
+///
+/// In the future, PDSs will export CARs in a stream-friendly order that will
+/// enable processing them with tiny memory overhead. But that future is not
+/// here yet.
+///
+/// CARs are almost always in a stream-unfriendly order, so I'm reverting the
+/// optimistic stream features: we load all blocks first, then walk the MST.
+///
+/// This makes things much simpler: we only need to worry about spilling to disk
+/// in one place, and we always have a reasonable expectation about how much
+/// work the init function will do. We can drop the CAR reader before walking,
+/// so the sync/async boundaries become a little easier to work around.
+#[derive(Debug)]
+pub struct MemDriver<T: Processable> {
+    blocks: HashMap<Cid, MaybeProcessedBlock<T>>,
+    walker: Walker,
+    process: fn(Vec<u8>) -> T,
+}

-    let walker = Walker::new(commit.data);
+impl<T: Processable> MemDriver<T> {
+    /// Step through the record outputs, in rkey order
+    pub async fn next_chunk(&mut self, n: usize) -> Result<Option<BlockChunk<T>>, DriveError> {
+        let mut out = Vec::with_capacity(n);
+        for _ in 0..n {
+            // walk as far as we can until we run out of blocks or find a record
+            match self.walker.step(&mut self.blocks, self.process)? {
+                Step::Missing(cid) => return Err(DriveError::MissingBlock(cid)),
+                Step::Finish => break,
+                Step::Found { rkey, data } => {
+                    out.push((rkey, data));
+                    continue;
+                }
+            };
+        }

-    Ok(Vehicle::Lil(
-        commit,
-        MemDriver {
-            blocks: mem_blocks,
-            walker,
-            process,
-        },
-    ))
+        if out.is_empty() {
+            Ok(None)
+        } else {
+            Ok(Some(out))
+        }
+    }
 }

-/// a paritally memory-loaded car file that needs disk spillover to continue
-pub struct BigCar<R: AsyncRead + Unpin, T: Processable> {
+/// A partially memory-loaded car file that needs disk spillover to continue
+pub struct NeedDisk<R: AsyncRead + Unpin, T: Processable> {
     car: CarReader<R>,
     root: Cid,
     process: fn(Vec<u8>) -> T,
···
     bincode::serde::encode_to_vec(v, bincode::config::standard())
 }

-pub fn decode<T: Processable>(bytes: &[u8]) -> Result<T, DecodeError> {
+pub(crate) fn decode<T: Processable>(bytes: &[u8]) -> Result<T, DecodeError> {
     let (t, n) = bincode::serde::decode_from_slice(bytes, bincode::config::standard())?;
     if n != bytes.len() {
         return Err(DecodeError::ExtraGarbage);
···
     Ok(t)
 }

-impl<R: AsyncRead + Unpin, T: Processable + Send + 'static> BigCar<R, T> {
+impl<R: AsyncRead + Unpin, T: Processable + Send + 'static> NeedDisk<R, T> {
     pub async fn finish_loading(
         mut self,
-        mut store: SqliteStore,
-    ) -> Result<(Commit, BigCarReady<T>), DriveError> {
-        // set up access for real
-        let mut access = store.get_access().await?;
-
-        // move access in and back out so we can manage lifetimes
+        mut store: DiskStore,
+    ) -> Result<(Commit, DiskDriver<T>), DriveError> {
+        // move store in and back out so we can manage lifetimes
         // dump mem blocks into the store
-        access = tokio::task::spawn(async move {
-            let mut writer = access.get_writer()?;
+        store = tokio::task::spawn(async move {
+            let mut writer = store.get_writer()?;

             let kvs = self
                 .mem_blocks
···

             writer.put_many(kvs)?;
             writer.commit()?;
-            Ok::<_, DriveError>(access)
+            Ok::<_, DriveError>(store)
         })
         .await??;

-        let (tx, mut rx) = tokio::sync::mpsc::channel::<Vec<(Cid, MaybeProcessedBlock<T>)>>(2);
+        let (tx, mut rx) = mpsc::channel::<Vec<(Cid, MaybeProcessedBlock<T>)>>(1);

-        let access_worker = tokio::task::spawn_blocking(move || {
-            let mut writer = access.get_writer()?;
+        let store_worker = tokio::task::spawn_blocking(move || {
+            let mut writer = store.get_writer()?;

             while let Some(chunk) = rx.blocking_recv() {
                 let kvs = chunk
···
             }

             writer.commit()?;
-            Ok::<_, DriveError>(access)
+            Ok::<_, DriveError>(store)
         }); // await later

         // dump the rest to disk (in chunks)
···
             }
             // remaining possible types: node, record, other. optimistically process
             // TODO: get the actual in-memory size to compute disk spill
-            let maybe_processed = if Node::could_be(&data) {
-                MaybeProcessedBlock::Raw(data)
-            } else {
-                MaybeProcessedBlock::Processed((self.process)(data))
-            };
+            let maybe_processed = MaybeProcessedBlock::maybe(self.process, data);
             mem_size += std::mem::size_of::<Cid>() + maybe_processed.get_size();
             chunk.push((cid, maybe_processed));
             if mem_size >= self.max_size {
···
         drop(tx);
         log::debug!("done. waiting for worker to finish...");

-        access = access_worker.await??;
+        store = store_worker.await??;

         log::debug!("worker finished.");

···

         Ok((
             commit,
-            BigCarReady {
+            DiskDriver {
                 process: self.process,
-                access,
-                walker,
+                state: Some(BigState { store, walker }),
             },
         ))
     }
 }

-pub struct BigCarReady<T: Clone> {
-    process: fn(Vec<u8>) -> T,
-    access: SqliteAccess,
+struct BigState {
+    store: DiskStore,
     walker: Walker,
 }

-impl<T: Processable + Send + 'static> BigCarReady<T> {
-    pub async fn next_chunk(
-        mut self,
-        n: usize,
-    ) -> Result<(Self, Option<Vec<(String, T)>>), DriveError> {
-        let mut out = Vec::with_capacity(n);
-        (self, out) = tokio::task::spawn_blocking(move || {
-            let access = self.access;
-            let mut reader = access.get_reader()?;
+/// MST walker that reads from disk instead of an in-memory hashmap
+pub struct DiskDriver<T: Clone> {
+    process: fn(Vec<u8>) -> T,
+    state: Option<BigState>,
+}

-            for _ in 0..n {
-                // walk as far as we can until we run out of blocks or find a record
-                match self.walker.disk_step(&mut reader, self.process)? {
-                    Step::Missing(cid) => return Err(DriveError::MissingBlock(cid)),
-                    Step::Finish => break,
-                    Step::Step { rkey, data } => {
-                        out.push((rkey, data));
-                        continue;
+// for doctests only
+#[doc(hidden)]
+pub fn _get_fake_disk_driver() -> DiskDriver<Vec<u8>> {
+    use crate::process::noop;
+    DiskDriver {
+        process: noop,
+        state: None,
+    }
+}
+
+impl<T: Processable + Send + 'static> DiskDriver<T> {
+    /// Walk the MST returning up to `n` rkey + record pairs
+    ///
+    /// ```no_run
+    /// # use repo_stream::{drive::{DiskDriver, DriveError, _get_fake_disk_driver}, process::noop};
+    /// # #[tokio::main]
+    /// # async fn main() -> Result<(), DriveError> {
+    /// # let mut disk_driver = _get_fake_disk_driver();
+    /// while let Some(pairs) = disk_driver.next_chunk(256).await? {
+    ///     for (rkey, record) in pairs {
+    ///         println!("{rkey}: size={}", record.len());
+    ///     }
+    /// }
+    /// let store = disk_driver.reset_store().await?;
+    /// # Ok(())
+    /// # }
+    /// ```
+    pub async fn next_chunk(&mut self, n: usize) -> Result<Option<BlockChunk<T>>, DriveError> {
+        let process = self.process;
+
+        // state should only *ever* be None transiently while inside here
+        let mut state = self.state.take().expect("DiskDriver must have Some(state)");
+
+        // the big pain here is that we don't want to leave self.state in an
+        // invalid state (None), so all the error paths have to make sure it
+        // comes out again.
+        let (state, res) = tokio::task::spawn_blocking(
+            move || -> (BigState, Result<BlockChunk<T>, DriveError>) {
+                let mut reader_res = state.store.get_reader();
+                let reader: &mut _ = match reader_res {
+                    Ok(ref mut r) => r,
+                    Err(ref mut e) => {
+                        // unfortunately we can't return the error directly because
+                        // (for some reason) it's attached to the lifetime of the
+                        // reader?
+                        // hack a mem::swap so we can get it out :/
+                        let e_swapped = e.steal();
+                        // the pain: `state` *has to* outlive the reader
+                        drop(reader_res);
+                        return (state, Err(e_swapped.into()));
                     }
                 };
-            }
+
+                let mut out = Vec::with_capacity(n);
+
+                for _ in 0..n {
+                    // walk as far as we can until we run out of blocks or find a record
+                    let step = match state.walker.disk_step(reader, process) {
+                        Ok(s) => s,
+                        Err(e) => {
+                            // the pain: `state` *has to* outlive the reader
+                            drop(reader_res);
+                            return (state, Err(e.into()));
+                        }
+                    };
+                    match step {
+                        Step::Missing(cid) => {
+                            // the pain: `state` *has to* outlive the reader
+                            drop(reader_res);
+                            return (state, Err(DriveError::MissingBlock(cid)));
+                        }
+                        Step::Finish => break,
+                        Step::Found { rkey, data } => out.push((rkey, data)),
+                    };
+                }
+
+                // `state` *has to* outlive the reader
+                drop(reader_res);
+
+                (state, Ok::<_, DriveError>(out))
+            },
+        )
+        .await?; // on tokio JoinError, we'll be left with invalid state :(
+
+        // *must* restore state before dealing with the actual result
+        self.state = Some(state);

-            drop(reader); // cannot outlive access
-            self.access = access;
-            Ok::<_, DriveError>((self, out))
-        })
-        .await??;
+        let out = res?;

         if out.is_empty() {
-            Ok((self, None))
+            Ok(None)
         } else {
-            Ok((self, Some(out)))
+            Ok(Some(out))
         }
     }

-    pub async fn rx(
-        mut self,
+    fn read_tx_blocking(
+        &mut self,
         n: usize,
-    ) -> Result<
-        (
-            tokio::sync::mpsc::Receiver<Vec<(String, T)>>,
-            tokio::task::JoinHandle<Result<(), DriveError>>,
-        ),
-        DriveError,
-    > {
-        let (tx, rx) = tokio::sync::mpsc::channel::<Vec<(String, T)>>(1);
+        tx: mpsc::Sender<Result<BlockChunk<T>, DriveError>>,
+    ) -> Result<(), mpsc::error::SendError<Result<BlockChunk<T>, DriveError>>> {
+        let BigState { store, walker } = self.state.as_mut().expect("valid state");
+        let mut reader = match store.get_reader() {
+            Ok(r) => r,
+            Err(e) => return tx.blocking_send(Err(e.into())),
+        };

-        // sketch: this worker is going to be allowed to execute without a join handle
-        // ...should we return the join handle here so the caller at least knows about it?
-        // yes probably for error handling?? (orrr put errors in the channel)
-        let worker = tokio::task::spawn_blocking(move || {
-            let mut reader = self.access.get_reader()?;
+        loop {
+            let mut out: BlockChunk<T> = Vec::with_capacity(n);

-            loop {
-                let mut out = Vec::with_capacity(n);
+            for _ in 0..n {
+                // walk as far as we can until we run out of blocks or find a record

-                for _ in 0..n {
-                    // walk as far as we can until we run out of blocks or find a record
-                    match self.walker.disk_step(&mut reader, self.process)? {
-                        Step::Missing(cid) => return Err(DriveError::MissingBlock(cid)),
-                        Step::Finish => break,
-                        Step::Step { rkey, data } => {
-                            out.push((rkey, data));
-                            continue;
-                        }
-                    };
-                }
+                let step = match walker.disk_step(&mut reader, self.process) {
+                    Ok(s) => s,
+                    Err(e) => return tx.blocking_send(Err(e.into())),
+                };

-                if out.is_empty() {
-                    break;
-                }
-                tx.blocking_send(out)
-                    .map_err(|_| DriveError::ChannelSendError)?;
+                match step {
+                    Step::Missing(cid) => {
+                        return tx.blocking_send(Err(DriveError::MissingBlock(cid)));
+                    }
+                    Step::Finish => return Ok(()),
+                    Step::Found { rkey, data } => {
+                        out.push((rkey, data));
+                        continue;
+                    }
+                };
             }

-            drop(reader); // cannot outlive access
-            Ok(())
-        }); // await later
+            if out.is_empty() {
+                break;
+            }
+            tx.blocking_send(Ok(out))?;
+        }

-        Ok((rx, worker))
+        Ok(())
     }
-}

-/// The core driver between the block stream and MST walker
-///
-/// In the future, PDSs will export CARs in a stream-friendly order that will
-/// enable processing them with tiny memory overhead. But that future is not
-/// here yet.
-///
-/// CARs are almost always in a stream-unfriendly order, so I'm reverting the
-/// optimistic stream features: we load all block first, then walk the MST.
-///
-/// This makes things much simpler: we only need to worry about spilling to disk
-/// in one place, and we always have a reasonable expecatation about how much
-/// work the init function will do. We can drop the CAR reader before walking,
-/// so the sync/async boundaries become a little easier to work around.
-#[derive(Debug)]
-pub struct MemDriver<T: Processable> {
-    blocks: HashMap<Cid, MaybeProcessedBlock<T>>,
-    walker: Walker,
-    process: fn(Vec<u8>) -> T,
-}
+    /// Spawn the disk reading task into a tokio blocking thread
+    ///
+    /// The idea is to avoid so much sending back and forth to the blocking
+    /// thread, letting a blocking task do all the disk reading work and sending
+    /// records and rkeys back through an `mpsc` channel instead.
+    ///
+    /// This might also allow the disk work to continue while processing the
+    /// records. It's still not yet clear if this method actually has much
+    /// benefit over just using `.next_chunk(n)`.
+    ///
+    /// ```no_run
+    /// # use repo_stream::{drive::{DiskDriver, DriveError, _get_fake_disk_driver}, process::noop};
+    /// # #[tokio::main]
+    /// # async fn main() -> Result<(), DriveError> {
+    /// # let mut disk_driver = _get_fake_disk_driver();
+    /// let (mut rx, join) = disk_driver.to_channel(512);
+    /// while let Some(recvd) = rx.recv().await {
+    ///     let pairs = recvd?;
+    ///     for (rkey, record) in pairs {
+    ///         println!("{rkey}: size={}", record.len());
+    ///     }
+    ///
+    /// }
+    /// let store = join.await?.reset_store().await?;
+    /// # Ok(())
+    /// # }
+    /// ```
+    pub fn to_channel(
+        mut self,
+        n: usize,
+    ) -> (
+        mpsc::Receiver<Result<BlockChunk<T>, DriveError>>,
+        tokio::task::JoinHandle<Self>,
+    ) {
+        let (tx, rx) = mpsc::channel::<Result<BlockChunk<T>, DriveError>>(1);

-impl<T: Processable> MemDriver<T> {
-    /// Manually step through the record outputs
-    pub async fn next_chunk(&mut self, n: usize) -> Result<Option<Vec<(String, T)>>, DriveError> {
-        let mut out = Vec::with_capacity(n);
-        for _ in 0..n {
-            // walk as far as we can until we run out of blocks or find a record
-            match self.walker.step(&mut self.blocks, self.process)? {
-                Step::Missing(cid) => return Err(DriveError::MissingBlock(cid)),
-                Step::Finish => break,
-                Step::Step { rkey, data } => {
-                    out.push((rkey, data));
-                    continue;
-                }
-            };
-        }
+        // sketch: this worker is going to be allowed to execute without a join handle
+        let chan_task = tokio::task::spawn_blocking(move || {
+            if let Err(mpsc::error::SendError(_)) = self.read_tx_blocking(n, tx) {
+                log::debug!("big car reader exited early due to dropped receiver channel");
+            }
+            self
+        });
+
+        (rx, chan_task)
+    }

-        if out.is_empty() {
-            Ok(None)
-        } else {
-            Ok(Some(out))
-        }
+    /// Reset the disk storage so it can be reused. You must call this.
+    ///
+    /// Ideally we'd put this in an `impl Drop`, but since it makes blocking
+    /// calls, that would be risky in an async context. For now you just have to
+    /// carefully make sure you call it.
+    ///
+    /// The sqlite store is returned, so it can be reused for another
+    /// `DiskDriver`.
+    pub async fn reset_store(mut self) -> Result<DiskStore, DriveError> {
+        let BigState { store, .. } = self.state.take().expect("valid state");
+        Ok(store.reset().await?)
     }
 }
+84 -5  src/lib.rs
···
-//! Fast and robust atproto CAR file processing in rust
-//!
-//! For now see the [examples](https://tangled.org/@microcosm.blue/repo-stream/tree/main/examples)
+/*!
+A robust CAR file -> MST walker for atproto
+
+Small CARs have their blocks buffered in memory. If a configurable memory limit
+is reached while reading blocks, CAR reading is suspended, and can be continued
+by providing disk storage to buffer the CAR blocks instead.
+
+A `process` function can be provided for tasks where records are transformed
+into a smaller representation, to save memory (and disk) during block reading.
+
+Once blocks are loaded, the MST is walked and emitted as chunks of
+`(rkey, processed_block)` pairs, in order (depth first, left-to-right).
+
+Some MST validations are applied:
+- Keys must appear in order
+- Keys must be at the correct MST tree depth
+
+`iroh_car` additionally applies a block size limit of `2MiB`.
+
+```
+use repo_stream::{Driver, DriverBuilder, DiskBuilder};
+
+# #[tokio::main]
+# async fn main() -> Result<(), Box<dyn std::error::Error>> {
+# let reader = include_bytes!("../car-samples/tiny.car").as_slice();
+let mut total_size = 0;
+
+match DriverBuilder::new()
+    .with_mem_limit_mb(10)
+    .with_block_processor(|rec| rec.len()) // block processing: just extract the raw record size
+    .load_car(reader)
+    .await?
+{
+
+    // if all blocks fit within memory
+    Driver::Memory(_commit, mut driver) => {
+        while let Some(chunk) = driver.next_chunk(256).await? {
+            for (_rkey, size) in chunk {
+                total_size += size;
+            }
+        }
+    },
+
+    // if the CAR was too big for in-memory processing
+    Driver::Disk(paused) => {
+        // set up a disk store we can spill to
+        let store = DiskBuilder::new().open("some/path.db".into()).await?;
+        // do the spilling, get back a (similar) driver
+        let (_commit, mut driver) = paused.finish_loading(store).await?;
+
+        while let Some(chunk) = driver.next_chunk(256).await? {
+            for (_rkey, size) in chunk {
+                total_size += size;
+            }
+        }
+
+        // clean up the disk store (drop tables etc)
+        driver.reset_store().await?;
+    }
+};
+println!("sum of size of all records: {total_size}");
+# Ok(())
+# }
+```
+
+Disk spilling suspends and returns a `Driver::Disk(paused)` instead of going
+ahead and eagerly using disk I/O. This means you have to write a bit more code
+to handle both cases, but it allows you to have finer control over resource
+usage. For example, you can drive a number of parallel memory CAR workers, and
+separately have a different number of disk workers picking up suspended disk
+tasks from a queue.
+
+Find more [examples in the repo](https://tangled.org/@microcosm.blue/repo-stream/tree/main/examples).
+
+*/
+
+pub mod mst;
+mod walk;

 pub mod disk;
 pub mod drive;
-pub mod mst;
-pub mod walk;
+pub mod process;
+
+pub use disk::{DiskBuilder, DiskError, DiskStore};
+pub use drive::{DriveError, Driver, DriverBuilder, NeedDisk};
+pub use mst::Commit;
+pub use process::Processable;
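
Editor's sketch (not part of this diff): the lib.rs docs above suggest running several cheap in-memory CAR workers while a separate disk worker picks up the CARs that spill. The code below is one hedged way to wire that up with the APIs introduced in this change; the file paths, channel size, and the assumption that `NeedDisk` can be sent across a channel are all mine, not the author's.

```rust
use repo_stream::{DiskBuilder, Driver, DriverBuilder, NeedDisk};
use std::path::PathBuf;
use tokio::io::BufReader;
use tokio::sync::mpsc;

// a paused, too-big-for-memory CAR load (assumed to be Send)
type Paused = NeedDisk<BufReader<tokio::fs::File>, usize>;

async fn run(cars: Vec<PathBuf>) {
    let (spill_tx, mut spill_rx) = mpsc::channel::<Paused>(4);

    // a single disk worker drains the queue of CARs that didn't fit in memory
    let disk_worker = tokio::spawn(async move {
        let mut store = DiskBuilder::new().open("spill.db".into()).await.unwrap();
        while let Some(paused) = spill_rx.recv().await {
            let (_commit, mut driver) = paused.finish_loading(store).await.unwrap();
            while let Some(chunk) = driver.next_chunk(256).await.unwrap() {
                // ... do something with the (rkey, size) pairs ...
                let _ = chunk;
            }
            // reset the sqlite tables and reuse the same store for the next CAR
            store = driver.reset_store().await.unwrap();
        }
    });

    // the cheap in-memory side; a real service would spawn these as workers too
    for path in cars {
        let reader = BufReader::new(tokio::fs::File::open(&path).await.unwrap());
        match DriverBuilder::new()
            .with_block_processor(|rec| rec.len()) // records -> just their size
            .with_mem_limit_mb(16)
            .load_car(reader)
            .await
            .unwrap()
        {
            Driver::Memory(_commit, mut driver) => {
                while let Some(chunk) = driver.next_chunk(256).await.unwrap() {
                    let _ = chunk;
                }
            }
            Driver::Disk(paused) => {
                // too big: queue it for the disk worker instead of blocking here
                let _ = spill_tx.send(paused).await;
            }
        }
    }

    drop(spill_tx); // closing the queue lets the disk worker finish
    disk_worker.await.unwrap();
}
```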
+4 -4  src/mst.rs
···
 /// MST node data schema
 #[derive(Debug, Deserialize, PartialEq)]
 #[serde(deny_unknown_fields)]
-pub struct Node {
+pub(crate) struct Node {
     /// link to sub-tree Node on a lower level and with all keys sorting before
     /// keys at this node
     #[serde(rename = "l")]
···
     /// so if a block *could be* a node, any record converter must postpone
     /// processing. if it turns out it happens to be a very node-looking record,
     /// well, sorry, it just has to only be processed later when that's known.
-    pub fn could_be(bytes: impl AsRef<[u8]>) -> bool {
+    pub(crate) fn could_be(bytes: impl AsRef<[u8]>) -> bool {
         const NODE_FINGERPRINT: [u8; 3] = [
             0xA2, // map length 2 (for "l" and "e" keys)
             0x61, // text length 1
···
     /// with an empty array of entries. This is the only situation in which a
     /// tree may contain an empty leaf node which does not either contain keys
     /// ("entries") or point to a sub-tree containing entries.
-    pub fn is_empty(&self) -> bool {
+    pub(crate) fn is_empty(&self) -> bool {
         self.left.is_none() && self.entries.is_empty()
     }
 }
···
 /// TreeEntry object
 #[derive(Debug, Deserialize, PartialEq)]
 #[serde(deny_unknown_fields)]
-pub struct Entry {
+pub(crate) struct Entry {
     /// count of bytes shared with previous TreeEntry in this Node (if any)
     #[serde(rename = "p")]
     pub prefix_len: usize,
+108  src/process.rs
···
+/*!
+Record processor function output trait
+
+The return type must satisfy the `Processable` trait, which requires:
+
+- `Clone` because two rkeys can refer to the same record by CID, which may
+  only appear once in the CAR file.
+- `Serialize + DeserializeOwned` so it can be spilled to disk.
+
+One required function must be implemented, `get_size()`: this should return the
+approximate total off-stack size of the type. (the on-stack size will be added
+automatically via `std::mem::size_of`).
+
+Note that it is **not guaranteed** that the `process` function will run on a
+block before storing it in memory or on disk: it's not possible to know if a
+block is a record without actually walking the MST, so the best we can do is
+apply `process` to any block that we know *cannot* be an MST node, and otherwise
+store the raw block bytes.
+
+Here's a silly processing function that just collects 'eyy's found in the raw
+record bytes
+
+```
+# use repo_stream::Processable;
+# use serde::{Serialize, Deserialize};
+#[derive(Debug, Clone, Serialize, Deserialize)]
+struct Eyy(usize, String);
+
+impl Processable for Eyy {
+    fn get_size(&self) -> usize {
+        // don't need to compute the usize, it's on the stack
+        self.1.capacity() // in-mem size from the string's capacity, in bytes
+    }
+}
+
+fn process(raw: Vec<u8>) -> Vec<Eyy> {
+    let mut out = Vec::new();
+    let to_find = "eyy".as_bytes();
+    for i in 0..(raw.len() - 3) {
+        if &raw[i..(i+3)] == to_find {
+            out.push(Eyy(i, "eyy".to_string()));
+        }
+    }
+    out
+}
+```
+
+The memory sizing stuff is a little sketchy but probably at least approximately
+works.
+*/
+
+use serde::{Serialize, de::DeserializeOwned};
+
+/// Output trait for record processing
+pub trait Processable: Clone + Serialize + DeserializeOwned {
+    /// Any additional in-memory size taken by the processed type
+    ///
+    /// Do not include stack size (`std::mem::size_of`)
+    fn get_size(&self) -> usize;
+}
+
+/// Processor that just returns the raw blocks
+#[inline]
+pub fn noop(block: Vec<u8>) -> Vec<u8> {
+    block
+}
+
+impl Processable for u8 {
+    fn get_size(&self) -> usize {
+        0
+    }
+}
+
+impl Processable for usize {
+    fn get_size(&self) -> usize {
+        0 // no additional space taken, just its stack size (newtype is free)
+    }
+}
+
+impl Processable for String {
+    fn get_size(&self) -> usize {
+        self.capacity()
+    }
+}
+
+impl<Item: Sized + Processable> Processable for Vec<Item> {
+    fn get_size(&self) -> usize {
+        let slot_size = std::mem::size_of::<Item>();
+        let direct_size = slot_size * self.capacity();
+        let items_referenced_size: usize = self.iter().map(|item| item.get_size()).sum();
+        direct_size + items_referenced_size
+    }
+}
+
+impl<Item: Processable> Processable for Option<Item> {
+    fn get_size(&self) -> usize {
+        self.as_ref().map(|item| item.get_size()).unwrap_or(0)
+    }
+}
+
+impl<Item: Processable, Error: Processable> Processable for Result<Item, Error> {
+    fn get_size(&self) -> usize {
+        match self {
+            Ok(item) => item.get_size(),
+            Err(err) => err.get_size(),
+        }
+    }
+}
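
Editor's sketch (not part of this diff): the module above explains that a `process` function can shrink records as they're loaded. Here is a hedged example of a processor that keeps only each record's `$type` field and plugs into the `DriverBuilder` from this change; the struct and function names are made up, and it leans on `serde_ipld_dagcbor`, which repo-stream itself already uses.

```rust
use repo_stream::{Driver, DriverBuilder};
use serde::Deserialize;

// a hypothetical partial record: we only care about the `$type` field
#[derive(Deserialize)]
struct TypeOnly {
    #[serde(rename = "$type")]
    collection: String,
}

// blocks that aren't valid records (or have no `$type`) just become None
fn just_the_type(raw: Vec<u8>) -> Option<String> {
    serde_ipld_dagcbor::from_slice::<TypeOnly>(&raw)
        .ok()
        .map(|t| t.collection)
}

async fn count_posts(car: &[u8]) -> Result<usize, repo_stream::DriveError> {
    let mut posts = 0;
    match DriverBuilder::new()
        .with_block_processor(just_the_type)
        .load_car(car)
        .await?
    {
        Driver::Memory(_commit, mut driver) => {
            while let Some(chunk) = driver.next_chunk(256).await? {
                for (_rkey, typ) in chunk {
                    if typ.as_deref() == Some("app.bsky.feed.post") {
                        posts += 1;
                    }
                }
            }
        }
        Driver::Disk(_) => panic!("this sketch only handles CARs that fit in memory"),
    }
    Ok(posts)
}
```

Because `Option<String>` already implements `Processable` (see the impls above), no extra trait implementation is needed for this output type.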
+11 -207  src/walk.rs
···
 //! Depth-first MST traversal

 use crate::disk::SqliteReader;
-use crate::drive::{DecodeError, MaybeProcessedBlock, Processable};
+use crate::drive::{DecodeError, MaybeProcessedBlock};
 use crate::mst::Node;
+use crate::process::Processable;
 use ipld_core::cid::Cid;
 use sha2::{Digest, Sha256};
 use std::collections::HashMap;
···
     /// Reached the end of the MST! yay!
     Finish,
     /// A record was found!
-    Step { rkey: String, data: T },
+    Found { rkey: String, data: T },
 }

 #[derive(Debug, Clone, PartialEq)]
···
 }

 fn push_from_node(stack: &mut Vec<Need>, node: &Node, parent_depth: Depth) -> Result<(), MstError> {
-    // empty nodes are not allowed in the MST
-    // ...except for a single one for empty MST, but we wouldn't be pushing that
+    // empty nodes are not allowed in the MST except in an empty MST
     if node.is_empty() {
-        return Err(MstError::EmptyNode);
+        if parent_depth == Depth::Root {
+            return Ok(()); // empty mst, nothing to push
+        } else {
+            return Err(MstError::EmptyNode);
+        }
     }

     let mut entries = Vec::with_capacity(node.entries.len());
···
                 }
                 self.prev = rkey.clone();

-                return Ok(Step::Step { rkey, data });
+                return Ok(Step::Found { rkey, data });
             }
         }
     }
···
                 }
                 self.prev = rkey.clone();

-                return Ok(Step::Step { rkey, data });
+                return Ok(Step::Found { rkey, data });
             }
         }
     }
···
 #[cfg(test)]
 mod test {
     use super::*;
-    // use crate::mst::Entry;

     fn cid1() -> Cid {
         "bafyreihixenvk3ahqbytas4hk4a26w43bh6eo3w6usjqtxkpzsvi655a3m"
             .parse()
             .unwrap()
     }
-    // fn cid2() -> Cid {
-    //     "QmY7Yh4UquoXHLPFo2XbhXkhBvFoPwmQUSa92pxnxjQuPU"
-    //         .parse()
-    //         .unwrap()
-    // }
-    // fn cid3() -> Cid {
-    //     "bafybeigdyrzt5sfp7udm7hu76uh7y26nf3efuylqabf3oclgtqy55fbzdi"
-    //         .parse()
-    //         .unwrap()
-    // }
-    // fn cid4() -> Cid {
-    //     "QmbWqxBEKC3P8tqsKc98xmWNzrzDtRLMiMPL8wBuTGsMnR"
-    //         .parse()
-    //         .unwrap()
-    // }
-    // fn cid5() -> Cid {
-    //     "QmSnuWmxptJZdLJpKRarxBMS2Ju2oANVrgbr2xWbie9b2D"
-    //         .parse()
-    //         .unwrap()
-    // }
-    // fn cid6() -> Cid {
-    //     "QmdmQXB2mzChmMeKY47C43LxUdg1NDJ5MWcKMKxDu7RgQm"
-    //         .parse()
-    //         .unwrap()
-    // }
-    // fn cid7() -> Cid {
-    //     "bafybeiaysi4s6lnjev27ln5icwm6tueaw2vdykrtjkwiphwekaywqhcjze"
-    //         .parse()
-    //         .unwrap()
-    // }
-    // fn cid8() -> Cid {
-    //     "bafyreif3tfdpr5n4jdrbielmcapwvbpcthepfkwq2vwonmlhirbjmotedi"
-    //         .parse()
-    //         .unwrap()
-    // }
-    // fn cid9() -> Cid {
-    //     "bafyreicnokmhmrnlp2wjhyk2haep4tqxiptwfrp2rrs7rzq7uk766chqvq"
-    //         .parse()
-    //         .unwrap()
-    // }

     #[test]
     fn test_depth_spec_0() {
···
             .as_ref()
         );
     }
-
-    // #[test]
-    // fn test_needs_from_node_just_one_record() {
-    //     let node = Node {
-    //         left: None,
-    //         entries: vec![Entry {
-    //             keysuffix: "asdf".into(),
-    //             prefix_len: 0,
-    //             value: cid1(),
-    //             tree: None,
-    //         }],
-    //     };
-    //     assert_eq!(
-    //         needs_from_node(node).unwrap(),
-    //         vec![Need::Record {
-    //             rkey: "asdf".into(),
-    //             cid: cid1(),
-    //         },]
-    //     );
-    // }
-
-    // #[test]
-    // fn test_needs_from_node_two_records() {
-    //     let node = Node {
-    //         left: None,
-    //         entries: vec![
-    //             Entry {
-    //                 keysuffix: "asdf".into(),
-    //                 prefix_len: 0,
-    //                 value: cid1(),
-    //                 tree: None,
-    //             },
-    //             Entry {
-    //                 keysuffix: "gh".into(),
-    //                 prefix_len: 2,
-    //                 value: cid2(),
-    //                 tree: None,
-    //             },
-    //         ],
-    //     };
-    //     assert_eq!(
-    //         needs_from_node(node).unwrap(),
-    //         vec![
-    //             Need::Record {
-    //                 rkey: "asdf".into(),
-    //                 cid: cid1(),
-    //             },
-    //             Need::Record {
-    //                 rkey: "asgh".into(),
-    //                 cid: cid2(),
-    //             },
-    //         ]
-    //     );
-    // }
-
-    // #[test]
-    // fn test_needs_from_node_with_both() {
-    //     let node = Node {
-    //         left: None,
-    //         entries: vec![Entry {
-    //             keysuffix: "asdf".into(),
-    //             prefix_len: 0,
-    //             value: cid1(),
-    //             tree: Some(cid2()),
-    //         }],
-    //     };
-    //     assert_eq!(
-    //         needs_from_node(node).unwrap(),
-    //         vec![
-    //             Need::Record {
-    //                 rkey: "asdf".into(),
-    //                 cid: cid1(),
-    //             },
-    //             Need::Node(cid2()),
-    //         ]
-    //     );
-    // }
-
-    // #[test]
-    // fn test_needs_from_node_left_and_record() {
-    //     let node = Node {
-    //         left: Some(cid1()),
-    //         entries: vec![Entry {
-    //             keysuffix: "asdf".into(),
-    //             prefix_len: 0,
-    //             value: cid2(),
-    //             tree: None,
-    //         }],
-    //     };
-    //     assert_eq!(
-    //         needs_from_node(node).unwrap(),
-    //         vec![
-    //             Need::Node(cid1()),
-    //             Need::Record {
-    //                 rkey: "asdf".into(),
-    //                 cid: cid2(),
-    //             },
-    //         ]
-    //     );
-    // }
-
-    // #[test]
-    // fn test_needs_from_full_node() {
-    //     let node = Node {
-    //         left: Some(cid1()),
-    //         entries: vec![
-    //             Entry {
-    //                 keysuffix: "asdf".into(),
-    //                 prefix_len: 0,
-    //                 value: cid2(),
-    //                 tree: Some(cid3()),
-
// },
555
-
// Entry {
556
-
// keysuffix: "ghi".into(),
557
-
// prefix_len: 1,
558
-
// value: cid4(),
559
-
// tree: Some(cid5()),
560
-
// },
561
-
// Entry {
562
-
// keysuffix: "jkl".into(),
563
-
// prefix_len: 2,
564
-
// value: cid6(),
565
-
// tree: Some(cid7()),
566
-
// },
567
-
// Entry {
568
-
// keysuffix: "mno".into(),
569
-
// prefix_len: 4,
570
-
// value: cid8(),
571
-
// tree: Some(cid9()),
572
-
// },
573
-
// ],
574
-
// };
575
-
// assert_eq!(
576
-
// needs_from_node(node).unwrap(),
577
-
// vec![
578
-
// Need::Node(cid1()),
579
-
// Need::Record {
580
-
// rkey: "asdf".into(),
581
-
// cid: cid2(),
582
-
// },
583
-
// Need::Node(cid3()),
584
-
// Need::Record {
585
-
// rkey: "aghi".into(),
586
-
// cid: cid4(),
587
-
// },
588
-
// Need::Node(cid5()),
589
-
// Need::Record {
590
-
// rkey: "agjkl".into(),
591
-
// cid: cid6(),
592
-
// },
593
-
// Need::Node(cid7()),
594
-
// Need::Record {
595
-
// rkey: "agjkmno".into(),
596
-
// cid: cid8(),
597
-
// },
598
-
// Need::Node(cid9()),
599
-
// ]
600
-
// );
601
-
// }
602
406
}
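The variant rename from Step::Step to Step::Found also changes what callers match on. A minimal sketch of a consuming loop under assumed names: walker, its step() method, and handle_record are hypothetical illustrations, not an API confirmed by this diff:

// Hypothetical driving loop; `walker.step()` and `handle_record` are assumed names.
loop {
    match walker.step()? {
        Step::Found { rkey, data } => handle_record(&rkey, data),
        Step::Finish => break, // reached the end of the MST
    }
}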
+21
-22
tests/non-huge-cars.rs
+21
-22
tests/non-huge-cars.rs
···
1
1
extern crate repo_stream;
2
-
use repo_stream::drive::Processable;
3
-
use serde::{Deserialize, Serialize};
2
+
use repo_stream::Driver;
4
3
4
+
const EMPTY_CAR: &'static [u8] = include_bytes!("../car-samples/empty.car");
5
5
const TINY_CAR: &'static [u8] = include_bytes!("../car-samples/tiny.car");
6
6
const LITTLE_CAR: &'static [u8] = include_bytes!("../car-samples/little.car");
7
7
const MIDSIZE_CAR: &'static [u8] = include_bytes!("../car-samples/midsize.car");
8
8
9
-
#[derive(Clone, Serialize, Deserialize)]
10
-
struct S(usize);
11
-
12
-
impl Processable for S {
13
-
fn get_size(&self) -> usize {
14
-
0 // no additional space taken, just its stack size (newtype is free)
15
-
}
16
-
}
17
-
18
-
async fn test_car(bytes: &[u8], expected_records: usize, expected_sum: usize) {
19
-
let mb = 2_usize.pow(20);
20
-
21
-
let mut driver = match repo_stream::drive::load_car(bytes, |block| S(block.len()), 10 * mb)
9
+
async fn test_car(
10
+
bytes: &[u8],
11
+
expected_records: usize,
12
+
expected_sum: usize,
13
+
expect_profile: bool,
14
+
) {
15
+
let mut driver = match Driver::load_car(bytes, |block| block.len(), 10 /* MiB */)
22
16
.await
23
17
.unwrap()
24
18
{
25
-
repo_stream::drive::Vehicle::Lil(_commit, mem_driver) => mem_driver,
26
-
repo_stream::drive::Vehicle::Big(_) => panic!("too big"),
19
+
Driver::Memory(_commit, mem_driver) => mem_driver,
20
+
Driver::Disk(_) => panic!("too big"),
27
21
};
28
22
29
23
let mut records = 0;
···
32
26
let mut prev_rkey = "".to_string();
33
27
34
28
while let Some(pairs) = driver.next_chunk(256).await.unwrap() {
35
-
for (rkey, S(size)) in pairs {
29
+
for (rkey, size) in pairs {
36
30
records += 1;
37
31
sum += size;
38
32
if rkey == "app.bsky.actor.profile/self" {
···
45
39
46
40
assert_eq!(records, expected_records);
47
41
assert_eq!(sum, expected_sum);
48
-
assert!(found_bsky_profile);
42
+
assert_eq!(found_bsky_profile, expect_profile);
43
+
}
44
+
45
+
#[tokio::test]
46
+
async fn test_empty_car() {
47
+
test_car(EMPTY_CAR, 0, 0, false).await
49
48
}
50
49
51
50
#[tokio::test]
52
51
async fn test_tiny_car() {
53
-
test_car(TINY_CAR, 8, 2071).await
52
+
test_car(TINY_CAR, 8, 2071, true).await
54
53
}
55
54
56
55
#[tokio::test]
57
56
async fn test_little_car() {
58
-
test_car(LITTLE_CAR, 278, 246960).await
57
+
test_car(LITTLE_CAR, 278, 246960, true).await
59
58
}
60
59
61
60
#[tokio::test]
62
61
async fn test_midsize_car() {
63
-
test_car(MIDSIZE_CAR, 11585, 3741393).await
62
+
test_car(MIDSIZE_CAR, 11585, 3741393, true).await
64
63
}
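Because the diff adds Processable impls for u8 and Vec<Item>, Vec<u8> is itself a valid processed type, so the new noop processor can hand raw record bytes straight through the driver. A sketch in the style of the tests above; the repo_stream::process::noop path is an assumption, and the 10 MiB in-memory threshold simply mirrors the test:

use repo_stream::Driver;

// Hypothetical helper printing each record's raw size; `noop`'s module path is assumed.
async fn dump_raw(bytes: &[u8]) {
    let mut driver = match Driver::load_car(bytes, repo_stream::process::noop, 10).await.unwrap() {
        Driver::Memory(_commit, mem_driver) => mem_driver,
        Driver::Disk(_) => panic!("example only covers the in-memory case"),
    };
    while let Some(pairs) = driver.next_chunk(256).await.unwrap() {
        for (rkey, raw) in pairs {
            println!("{rkey}: {} raw bytes", raw.len());
        }
    }
}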