Cargo.lock (+1 -1)
Cargo.toml (+2 -2)

```diff
@@ -1,9 +1,9 @@
 [package]
 name = "repo-stream"
-version = "0.1.1"
+version = "0.2.2"
 edition = "2024"
 license = "MIT OR Apache-2.0"
-description = "Fast and robust atproto CAR file processing in rust"
+description = "A robust CAR file -> MST walker for atproto"
 repository = "https://tangled.org/@microcosm.blue/repo-stream"
 
 [dependencies]
```
benches/non-huge-cars.rs (+4)

```diff
@@ -3,6 +3,7 @@
 
 use criterion::{Criterion, criterion_group, criterion_main};
 
+const EMPTY_CAR: &'static [u8] = include_bytes!("../car-samples/empty.car");
 const TINY_CAR: &'static [u8] = include_bytes!("../car-samples/tiny.car");
 const LITTLE_CAR: &'static [u8] = include_bytes!("../car-samples/little.car");
 const MIDSIZE_CAR: &'static [u8] = include_bytes!("../car-samples/midsize.car");
@@ -13,6 +14,9 @@
         .build()
         .expect("Creating runtime failed");
 
+    c.bench_function("empty-car", |b| {
+        b.to_async(&rt).iter(async || drive_car(EMPTY_CAR).await)
+    });
     c.bench_function("tiny-car", |b| {
         b.to_async(&rt).iter(async || drive_car(TINY_CAR).await)
     });
```
car-samples/empty.car (binary file, not displayed)
examples/disk-read-file/main.rs (+11 -12)

```diff
@@ -4,8 +4,9 @@
 
 extern crate repo_stream;
 use clap::Parser;
-use repo_stream::{DiskStore, Driver, process::noop};
+use repo_stream::{DiskBuilder, Driver, DriverBuilder};
 use std::path::PathBuf;
+use std::time::Instant;
 
 #[derive(Debug, Parser)]
 struct Args {
@@ -26,32 +27,30 @@
     let reader = tokio::fs::File::open(car).await?;
     let reader = tokio::io::BufReader::new(reader);
 
-    // configure how much memory can be used before spilling to disk.
-    // real memory usage may differ somewhat.
-    let in_mem_limit = 10; // MiB
-
-    // configure how much memory sqlite is allowed to use when dumping to disk
-    let db_cache_mb = 32; // MiB
-
     log::info!("hello! reading the car...");
+    let t0 = Instant::now();
 
     // in this example we only bother handling CARs that are too big for memory
     // `noop` helper means: do no block processing, store the raw blocks
-    let driver = match Driver::load_car(reader, noop, in_mem_limit).await? {
+    let driver = match DriverBuilder::new()
+        .with_mem_limit_mb(10) // how much memory can be used before disk spill
+        .load_car(reader)
+        .await?
+    {
         Driver::Memory(_, _) => panic!("try this on a bigger car"),
         Driver::Disk(big_stuff) => {
             // we reach here if the repo was too big and needs to be spilled to
             // disk to continue
 
             // set up a disk store we can spill to
-            let disk_store = DiskStore::new(tmpfile.clone(), db_cache_mb).await?;
+            let disk_store = DiskBuilder::new().open(tmpfile).await?;
 
             // do the spilling, get back a (similar) driver
             let (commit, driver) = big_stuff.finish_loading(disk_store).await?;
 
             // at this point you might want to fetch the account's signing key
             // via the DID from the commit, and then verify the signature.
-            log::warn!("big's comit: {:?}", commit);
+            log::warn!("big's commit ({:?}): {:?}", t0.elapsed(), commit);
 
             // pop the driver back out to get some code indentation relief
             driver
@@ -81,7 +80,7 @@
         }
     }
 
-    log::info!("arrived! joining rx...");
+    log::info!("arrived! ({:?}) joining rx...", t0.elapsed());
 
     // clean up the database. would be nice to do this in drop so it happens
     // automatically, but some blocking work happens, so that's not allowed in
```
examples/read-file/main.rs (+9 -6)

```diff
@@ -4,7 +4,7 @@
 
 extern crate repo_stream;
 use clap::Parser;
-use repo_stream::Driver;
+use repo_stream::{Driver, DriverBuilder};
 use std::path::PathBuf;
 
 type Result<T> = std::result::Result<T, Box<dyn std::error::Error>>;
@@ -23,11 +23,14 @@
     let reader = tokio::fs::File::open(file).await?;
     let reader = tokio::io::BufReader::new(reader);
 
-    let (commit, mut driver) =
-        match Driver::load_car(reader, |block| block.len(), 16 /* MiB */).await? {
-            Driver::Memory(commit, mem_driver) => (commit, mem_driver),
-            Driver::Disk(_) => panic!("this example doesn't handle big CARs"),
-        };
+    let (commit, mut driver) = match DriverBuilder::new()
+        .with_block_processor(|block| block.len())
+        .load_car(reader)
+        .await?
+    {
+        Driver::Memory(commit, mem_driver) => (commit, mem_driver),
+        Driver::Disk(_) => panic!("this example doesn't handle big CARs"),
+    };
 
     log::info!("got commit: {commit:?}");
 
```
readme.md (+70 -2)

````diff
@@ -1,6 +1,74 @@
 # repo-stream
 
-Fast and (aspirationally) robust atproto CAR file processing in rust
+A robust CAR file -> MST walker for atproto
+
+[![Crates.io][crates-badge]](https://crates.io/crates/repo-stream)
+[![Documentation][docs-badge]](https://docs.rs/repo-stream)
+[![Sponsor][sponsor-badge]](https://github.com/sponsors/uniphil)
+
+[crates-badge]: https://img.shields.io/crates/v/repo-stream.svg
+[docs-badge]: https://docs.rs/repo-stream/badge.svg
+[sponsor-badge]: https://img.shields.io/badge/at-microcosm-b820f9?labelColor=b820f9&logo=githubsponsors&logoColor=fff
+
+```rust
+use repo_stream::{Driver, DriverBuilder, DriveError, DiskBuilder};
+
+#[tokio::main]
+async fn main() -> Result<(), DriveError> {
+    // repo-stream takes any AsyncRead as input, like a tokio::fs::File
+    let reader = tokio::fs::File::open("repo.car").await?;
+    let reader = tokio::io::BufReader::new(reader);
+
+    // example repo workload is simply counting the total record bytes
+    let mut total_size = 0;
+
+    match DriverBuilder::new()
+        .with_mem_limit_mb(10)
+        .with_block_processor(|rec| rec.len()) // block processing: just extract the raw record size
+        .load_car(reader)
+        .await?
+    {
+        // if all blocks fit within memory
+        Driver::Memory(_commit, mut driver) => {
+            while let Some(chunk) = driver.next_chunk(256).await? {
+                for (_rkey, size) in chunk {
+                    total_size += size;
+                }
+            }
+        },
+
+        // if the CAR was too big for in-memory processing
+        Driver::Disk(paused) => {
+            // set up a disk store we can spill to
+            let store = DiskBuilder::new().open("some/path.db".into()).await?;
+            // do the spilling, get back a (similar) driver
+            let (_commit, mut driver) = paused.finish_loading(store).await?;
+
+            while let Some(chunk) = driver.next_chunk(256).await? {
+                for (_rkey, size) in chunk {
+                    total_size += size;
+                }
+            }
+
+            // clean up the disk store (drop tables etc)
+            driver.reset_store().await?;
+        }
+    };
+    println!("sum of size of all records: {total_size}");
+    Ok(())
+}
+```
+
+more recent todo
+
+- [ ] get an *empty* car for the test suite
+- [x] implement a max size on disk limit
+
+
+-----
+
+older stuff (to clean up):
 
 
 current car processing times (records processed into their length usize, phil's dev machine):
@@ -27,7 +95,7 @@
 -> yeah the commit is returned from init
 - [ ] spec compliance todos
   - [x] assert that keys are ordered and fail if not
-  - [ ] verify node mst depth from key (possibly pending [interop test fixes](https://github.com/bluesky-social/atproto-interop-tests/issues/5))
+  - [x] verify node mst depth from key (possibly pending [interop test fixes](https://github.com/bluesky-social/atproto-interop-tests/issues/5))
 - [ ] performance todos
   - [x] consume the serialized nodes into a mutable efficient format
   - [ ] maybe customize the deserialize impl to do that directly?
````
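The newly checked-off "max size on disk limit" todo corresponds to the `DiskBuilder` API in the `src/disk.rs` changes below. A minimal sketch of opting into a tighter cap than the 10 GiB default, using only the builder methods added in this changeset (the path and cap values here are illustrative):

```rust
use repo_stream::{DiskBuilder, DiskError};

#[tokio::main]
async fn main() -> Result<(), DiskError> {
    // Cap processed-block storage at ~1 GiB instead of the 10 GiB default;
    // inserts past the cap fail with DiskError::MaxSizeExceeded.
    let _store = DiskBuilder::new()
        .with_cache_size_mb(32)
        .with_max_stored_mb(1024)
        .open("some/path.db".into())
        .await?;
    Ok(())
}
```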
src/disk.rs (+85 -6)

````diff
@@ -5,11 +5,13 @@
 to be the best behaved in terms of both on-disk space usage and memory usage.
 
 ```no_run
-# use repo_stream::{DiskStore, DiskError};
+# use repo_stream::{DiskBuilder, DiskError};
 # #[tokio::main]
 # async fn main() -> Result<(), DiskError> {
-let db_cache_size = 32; // MiB
-let store = DiskStore::new("/some/path.db".into(), db_cache_size).await?;
+let store = DiskBuilder::new()
+    .with_cache_size_mb(32)
+    .with_max_stored_mb(1024) // errors when >1GiB of processed blocks are inserted
+    .open("/some/path.db".into()).await?;
 # Ok(())
 # }
 ```
@@ -30,6 +32,12 @@
     /// A tokio blocking task failed to join
     #[error("Failed to join a tokio blocking task: {0}")]
     JoinError(#[from] tokio::task::JoinError),
+    /// The total size of stored blocks exceeded the allowed size
+    ///
+    /// If you need to process *really* big CARs, you can configure a higher
+    /// limit.
+    #[error("Maximum disk size reached")]
+    MaxSizeExceeded,
     #[error("this error was replaced, seeing this is a bug.")]
     #[doc(hidden)]
     Stolen,
@@ -44,14 +52,71 @@
     }
 }
 
+/// Builder-style disk store setup
+#[derive(Debug, Clone)]
+pub struct DiskBuilder {
+    /// Database in-memory cache allowance
+    ///
+    /// Default: 32 MiB
+    pub cache_size_mb: usize,
+    /// Database stored block size limit
+    ///
+    /// Default: 10 GiB
+    ///
+    /// Note: actual size on disk may be more, but should approximately scale
+    /// with this limit
+    pub max_stored_mb: usize,
+}
+
+impl Default for DiskBuilder {
+    fn default() -> Self {
+        Self {
+            cache_size_mb: 32,
+            max_stored_mb: 10 * 1024, // 10 GiB
+        }
+    }
+}
+
+impl DiskBuilder {
+    /// Begin configuring the storage with defaults
+    pub fn new() -> Self {
+        Default::default()
+    }
+    /// Set the in-memory cache allowance for the database
+    ///
+    /// Default: 32 MiB
+    pub fn with_cache_size_mb(mut self, size: usize) -> Self {
+        self.cache_size_mb = size;
+        self
+    }
+    /// Set the approximate stored block size limit
+    ///
+    /// Default: 10 GiB
+    pub fn with_max_stored_mb(mut self, max: usize) -> Self {
+        self.max_stored_mb = max;
+        self
+    }
+    /// Open and initialize the actual disk storage
+    pub async fn open(&self, path: PathBuf) -> Result<DiskStore, DiskError> {
+        DiskStore::new(path, self.cache_size_mb, self.max_stored_mb).await
+    }
+}
+
 /// On-disk block storage
 pub struct DiskStore {
     conn: rusqlite::Connection,
+    max_stored: usize,
+    stored: usize,
 }
 
 impl DiskStore {
     /// Initialize a new disk store
-    pub async fn new(path: PathBuf, cache_mb: usize) -> Result<Self, DiskError> {
+    pub async fn new(
+        path: PathBuf,
+        cache_mb: usize,
+        max_stored_mb: usize,
+    ) -> Result<Self, DiskError> {
+        let max_stored = max_stored_mb * 2_usize.pow(20);
         let conn = tokio::task::spawn_blocking(move || {
             let conn = rusqlite::Connection::open(path)?;
 
@@ -73,11 +138,19 @@
         })
         .await??;
 
-        Ok(Self { conn })
+        Ok(Self {
+            conn,
+            max_stored,
+            stored: 0,
+        })
     }
     pub(crate) fn get_writer(&'_ mut self) -> Result<SqliteWriter<'_>, DiskError> {
         let tx = self.conn.transaction()?;
-        Ok(SqliteWriter { tx })
+        Ok(SqliteWriter {
+            tx,
+            stored: &mut self.stored,
+            max: self.max_stored,
+        })
     }
     pub(crate) fn get_reader<'conn>(&'conn self) -> Result<SqliteReader<'conn>, DiskError> {
         let select_stmt = self.conn.prepare("SELECT val FROM blocks WHERE key = ?1")?;
@@ -106,6 +179,8 @@
 
 pub(crate) struct SqliteWriter<'conn> {
     tx: rusqlite::Transaction<'conn>,
+    stored: &'conn mut usize,
+    max: usize,
 }
 
 impl SqliteWriter<'_> {
@@ -119,6 +194,10 @@
             .map_err(DiskError::DbError)?;
         for pair in kv {
             let (k, v) = pair?;
+            *self.stored += v.len();
+            if *self.stored > self.max {
+                return Err(DiskError::MaxSizeExceeded.into());
+            }
             insert_stmt.execute((k, v)).map_err(DiskError::DbError)?;
         }
         Ok(())
````
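The size accounting here threads a mutable borrow of the store's `stored` counter into each `SqliteWriter`, so the running total persists across write transactions while the cap itself stays on the store. A standalone sketch of that borrowed-counter pattern, with sqlite and the real types stripped out (all names here are illustrative, not crate API):

```rust
// Sketch of the borrowed-counter pattern used by SqliteWriter: the writer
// holds `&mut usize` into its parent store, so every insert updates the
// store's running total and can be checked against a fixed cap.
struct Store {
    stored: usize,
    max: usize,
}

struct Writer<'a> {
    stored: &'a mut usize,
    max: usize,
}

impl Store {
    fn writer(&mut self) -> Writer<'_> {
        Writer { stored: &mut self.stored, max: self.max }
    }
}

impl Writer<'_> {
    fn insert(&mut self, val: &[u8]) -> Result<(), &'static str> {
        *self.stored += val.len();
        if *self.stored > self.max {
            return Err("max size exceeded"); // DiskError::MaxSizeExceeded in the real code
        }
        Ok(()) // the real code executes the sqlite INSERT here
    }
}

fn main() {
    let mut store = Store { stored: 0, max: 4 };
    let mut w = store.writer();
    assert!(w.insert(b"ab").is_ok()); // 2 bytes stored, under the cap
    assert!(w.insert(b"cde").is_err()); // 5 > 4: over the cap
}
```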
src/drive.rs (+78 -6)

```diff
@@ -115,22 +115,94 @@
     Disk(NeedDisk<R, T>),
 }
 
+/// Builder-style driver setup
+#[derive(Debug, Clone)]
+pub struct DriverBuilder {
+    pub mem_limit_mb: usize,
+}
+
+impl Default for DriverBuilder {
+    fn default() -> Self {
+        Self { mem_limit_mb: 16 }
+    }
+}
+
+impl DriverBuilder {
+    /// Begin configuring the driver with defaults
+    pub fn new() -> Self {
+        Default::default()
+    }
+    /// Set the in-memory size limit, in MiB
+    ///
+    /// Default: 16 MiB
+    pub fn with_mem_limit_mb(self, new_limit: usize) -> Self {
+        Self {
+            mem_limit_mb: new_limit,
+        }
+    }
+    /// Set the block processor
+    ///
+    /// Default: noop, raw blocks will be emitted
+    pub fn with_block_processor<T: Processable>(
+        self,
+        p: fn(Vec<u8>) -> T,
+    ) -> DriverBuilderWithProcessor<T> {
+        DriverBuilderWithProcessor {
+            mem_limit_mb: self.mem_limit_mb,
+            block_processor: p,
+        }
+    }
+    /// Begin processing an atproto MST from a CAR file
+    pub async fn load_car<R: AsyncRead + Unpin>(
+        &self,
+        reader: R,
+    ) -> Result<Driver<R, Vec<u8>>, DriveError> {
+        Driver::load_car(reader, crate::process::noop, self.mem_limit_mb).await
+    }
+}
+
+/// Builder-style driver intermediate step
+///
+/// start from `DriverBuilder`
+#[derive(Debug, Clone)]
+pub struct DriverBuilderWithProcessor<T: Processable> {
+    pub mem_limit_mb: usize,
+    pub block_processor: fn(Vec<u8>) -> T,
+}
+
+impl<T: Processable> DriverBuilderWithProcessor<T> {
+    /// Set the in-memory size limit, in MiB
+    ///
+    /// Default: 16 MiB
+    pub fn with_mem_limit_mb(mut self, new_limit: usize) -> Self {
+        self.mem_limit_mb = new_limit;
+        self
+    }
+    /// Begin processing an atproto MST from a CAR file
+    pub async fn load_car<R: AsyncRead + Unpin>(
+        &self,
+        reader: R,
+    ) -> Result<Driver<R, T>, DriveError> {
+        Driver::load_car(reader, self.block_processor, self.mem_limit_mb).await
+    }
+}
+
 impl<R: AsyncRead + Unpin, T: Processable> Driver<R, T> {
     /// Begin processing an atproto MST from a CAR file
     ///
     /// Blocks will be loaded, processed, and buffered in memory. If the entire
-    /// processed size is under the `max_size_mb` limit, a `Driver::Memory` will
-    /// be returned along with a `Commit` ready for validation.
+    /// processed size is under the `mem_limit_mb` limit, a `Driver::Memory`
+    /// will be returned along with a `Commit` ready for validation.
     ///
-    /// If the `max_size_mb` limit is reached before loading all blocks, the
+    /// If the `mem_limit_mb` limit is reached before loading all blocks, the
     /// partial state will be returned as `Driver::Disk(needed)`, which can be
     /// resumed by providing a `SqliteStorage` for on-disk block storage.
     pub async fn load_car(
         reader: R,
         process: fn(Vec<u8>) -> T,
-        max_size_mb: usize,
+        mem_limit_mb: usize,
     ) -> Result<Driver<R, T>, DriveError> {
-        let max_size = max_size_mb * 2_usize.pow(20);
+        let max_size = mem_limit_mb * 2_usize.pow(20);
         let mut mem_blocks = HashMap::new();
 
         let mut car = CarReader::new(reader).await?;
@@ -276,7 +348,7 @@
         })
         .await??;
 
-        let (tx, mut rx) = mpsc::channel::<Vec<(Cid, MaybeProcessedBlock<T>)>>(2);
+        let (tx, mut rx) = mpsc::channel::<Vec<(Cid, MaybeProcessedBlock<T>)>>(1);
 
         let store_worker = tokio::task::spawn_blocking(move || {
             let mut writer = store.get_writer()?;
```
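A note on the builder design: `with_block_processor` is a type-state step. It converts `DriverBuilder` into `DriverBuilderWithProcessor<T>`, so the processed type `T` is fixed by the builder chain at compile time rather than by a runtime flag. A sketch of the two call-site shapes this produces, using only the API added above (the `demo` wrapper is illustrative):

```rust
use repo_stream::{DriveError, Driver, DriverBuilder};
use tokio::io::AsyncRead;

// Sketch: the two builder paths and the driver types they produce.
async fn demo<R: AsyncRead + Unpin>(reader_a: R, reader_b: R) -> Result<(), DriveError> {
    // Default path: no processor configured, so blocks are emitted as raw
    // bytes -- this `load_car` yields a Driver<R, Vec<u8>>.
    let _raw: Driver<R, Vec<u8>> = DriverBuilder::new()
        .with_mem_limit_mb(16)
        .load_car(reader_a)
        .await?;

    // Typed path: `with_block_processor` moves to
    // DriverBuilderWithProcessor<usize>, so this `load_car` yields a
    // Driver<R, usize> -- the processed type is pinned by the chain.
    let _sized: Driver<R, usize> = DriverBuilder::new()
        .with_block_processor(|block: Vec<u8>| block.len())
        .load_car(reader_b)
        .await?;

    Ok(())
}
```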
src/lib.rs (+10 -8)

````diff
@@ -18,17 +18,19 @@
 `iroh_car` additionally applies a block size limit of `2MiB`.
 
 ```
-use repo_stream::{Driver, DiskStore};
+use repo_stream::{Driver, DriverBuilder, DiskBuilder};
 
 # #[tokio::main]
 # async fn main() -> Result<(), Box<dyn std::error::Error>> {
 # let reader = include_bytes!("../car-samples/tiny.car").as_slice();
 let mut total_size = 0;
-let process = |rec: Vec<u8>| rec.len(); // block processing: just extract the size
-let in_mem_limit = 10; /* MiB */
-let db_cache_size = 32; /* MiB */
 
-match Driver::load_car(reader, process, in_mem_limit).await? {
+match DriverBuilder::new()
+    .with_mem_limit_mb(10)
+    .with_block_processor(|rec| rec.len()) // block processing: just extract the raw record size
+    .load_car(reader)
+    .await?
+{
 
     // if all blocks fit within memory
     Driver::Memory(_commit, mut driver) => {
@@ -42,7 +44,7 @@
     // if the CAR was too big for in-memory processing
     Driver::Disk(paused) => {
         // set up a disk store we can spill to
-        let store = DiskStore::new("some/path.db".into(), db_cache_size).await?;
+        let store = DiskBuilder::new().open("some/path.db".into()).await?;
         // do the spilling, get back a (similar) driver
         let (_commit, mut driver) = paused.finish_loading(store).await?;
 
@@ -79,7 +81,7 @@
 pub mod drive;
 pub mod process;
 
-pub use disk::{DiskError, DiskStore};
-pub use drive::{DriveError, Driver};
+pub use disk::{DiskBuilder, DiskError, DiskStore};
+pub use drive::{DriveError, Driver, DriverBuilder, NeedDisk};
 pub use mst::Commit;
 pub use process::Processable;
````
src/process.rs (+27)

```diff
@@ -11,6 +11,12 @@
 approximate total off-stack size of the type. (the on-stack size will be added
 automatically via `std::mem::size_of`).
 
+Note that it is **not guaranteed** that the `process` function will run on a
+block before storing it in memory or on disk: it's not possible to know if a
+block is a record without actually walking the MST, so the best we can do is
+apply `process` to any block that we know *cannot* be an MST node, and otherwise
+store the raw block bytes.
+
 Here's a silly processing function that just collects 'eyy's found in the raw
 record bytes
 
@@ -71,6 +77,12 @@
     }
 }
 
+impl Processable for String {
+    fn get_size(&self) -> usize {
+        self.capacity()
+    }
+}
+
 impl<Item: Sized + Processable> Processable for Vec<Item> {
     fn get_size(&self) -> usize {
         let slot_size = std::mem::size_of::<Item>();
@@ -79,3 +91,18 @@
         direct_size + items_referenced_size
     }
 }
+
+impl<Item: Processable> Processable for Option<Item> {
+    fn get_size(&self) -> usize {
+        self.as_ref().map(|item| item.get_size()).unwrap_or(0)
+    }
+}
+
+impl<Item: Processable, Error: Processable> Processable for Result<Item, Error> {
+    fn get_size(&self) -> usize {
+        match self {
+            Ok(item) => item.get_size(),
+            Err(err) => err.get_size(),
+        }
+    }
+}
```
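With the new `String`, `Option`, and `Result` impls in place, a processed type can compose them. A hedged sketch of what a custom processor output could look like, assuming `get_size` is the trait's only required method (the `ProfileSummary` type is invented for illustration, not part of the crate):

```rust
use repo_stream::Processable;

// Hypothetical processed-block type, for illustration only.
struct ProfileSummary {
    display_name: Option<String>, // off-stack bytes live here
    raw_len: usize,               // on-stack, counted automatically
}

impl Processable for ProfileSummary {
    // Report only the off-stack size; the on-stack size of the struct
    // itself is added by the caller via std::mem::size_of.
    fn get_size(&self) -> usize {
        self.display_name.get_size()
    }
}
```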
src/walk.rs (+6 -203)

```diff
@@ -87,10 +87,13 @@
 }
 
 fn push_from_node(stack: &mut Vec<Need>, node: &Node, parent_depth: Depth) -> Result<(), MstError> {
-    // empty nodes are not allowed in the MST
-    // ...except for a single one for empty MST, but we wouldn't be pushing that
+    // empty nodes are not allowed in the MST except in an empty MST
     if node.is_empty() {
-        return Err(MstError::EmptyNode);
+        if parent_depth == Depth::Root {
+            return Ok(()); // empty mst, nothing to push
+        } else {
+            return Err(MstError::EmptyNode);
+        }
     }
 
     let mut entries = Vec::with_capacity(node.entries.len());
@@ -304,53 +307,12 @@
 #[cfg(test)]
 mod test {
     use super::*;
-    // use crate::mst::Entry;
 
     fn cid1() -> Cid {
         "bafyreihixenvk3ahqbytas4hk4a26w43bh6eo3w6usjqtxkpzsvi655a3m"
             .parse()
             .unwrap()
     }
-    // fn cid2() -> Cid {
-    //     "QmY7Yh4UquoXHLPFo2XbhXkhBvFoPwmQUSa92pxnxjQuPU"
-    //         .parse()
-    //         .unwrap()
-    // }
-    // fn cid3() -> Cid {
-    //     "bafybeigdyrzt5sfp7udm7hu76uh7y26nf3efuylqabf3oclgtqy55fbzdi"
-    //         .parse()
-    //         .unwrap()
-    // }
-    // fn cid4() -> Cid {
-    //     "QmbWqxBEKC3P8tqsKc98xmWNzrzDtRLMiMPL8wBuTGsMnR"
-    //         .parse()
-    //         .unwrap()
-    // }
-    // fn cid5() -> Cid {
-    //     "QmSnuWmxptJZdLJpKRarxBMS2Ju2oANVrgbr2xWbie9b2D"
-    //         .parse()
-    //         .unwrap()
-    // }
-    // fn cid6() -> Cid {
-    //     "QmdmQXB2mzChmMeKY47C43LxUdg1NDJ5MWcKMKxDu7RgQm"
-    //         .parse()
-    //         .unwrap()
-    // }
-    // fn cid7() -> Cid {
-    //     "bafybeiaysi4s6lnjev27ln5icwm6tueaw2vdykrtjkwiphwekaywqhcjze"
-    //         .parse()
-    //         .unwrap()
-    // }
-    // fn cid8() -> Cid {
-    //     "bafyreif3tfdpr5n4jdrbielmcapwvbpcthepfkwq2vwonmlhirbjmotedi"
-    //         .parse()
-    //         .unwrap()
-    // }
-    // fn cid9() -> Cid {
-    //     "bafyreicnokmhmrnlp2wjhyk2haep4tqxiptwfrp2rrs7rzq7uk766chqvq"
-    //         .parse()
-    //         .unwrap()
-    // }
 
     #[test]
     fn test_depth_spec_0() {
@@ -441,163 +403,4 @@
             .as_ref()
         );
     }
-
-    // #[test]
-    // fn test_needs_from_node_just_one_record() {
-    //     let node = Node {
-    //         left: None,
-    //         entries: vec![Entry {
-    //             keysuffix: "asdf".into(),
-    //             prefix_len: 0,
-    //             value: cid1(),
-    //             tree: None,
-    //         }],
-    //     };
-    //     assert_eq!(
-    //         needs_from_node(node).unwrap(),
-    //         vec![Need::Record {
-    //             rkey: "asdf".into(),
-    //             cid: cid1(),
-    //         },]
-    //     );
-    // }
-
-    // #[test]
-    // fn test_needs_from_node_two_records() {
-    //     let node = Node {
-    //         left: None,
-    //         entries: vec![
-    //             Entry {
-    //                 keysuffix: "asdf".into(),
-    //                 prefix_len: 0,
-    //                 value: cid1(),
-    //                 tree: None,
-    //             },
-    //             Entry {
-    //                 keysuffix: "gh".into(),
-    //                 prefix_len: 2,
-    //                 value: cid2(),
-    //                 tree: None,
-    //             },
-    //         ],
-    //     };
-    //     assert_eq!(
-    //         needs_from_node(node).unwrap(),
-    //         vec![
-    //             Need::Record {
-    //                 rkey: "asdf".into(),
-    //                 cid: cid1(),
-    //             },
-    //             Need::Record {
-    //                 rkey: "asgh".into(),
-    //                 cid: cid2(),
-    //             },
-    //         ]
-    //     );
-    // }
-
-    // #[test]
-    // fn test_needs_from_node_with_both() {
-    //     let node = Node {
-    //         left: None,
-    //         entries: vec![Entry {
-    //             keysuffix: "asdf".into(),
-    //             prefix_len: 0,
-    //             value: cid1(),
-    //             tree: Some(cid2()),
-    //         }],
-    //     };
-    //     assert_eq!(
-    //         needs_from_node(node).unwrap(),
-    //         vec![
-    //             Need::Record {
-    //                 rkey: "asdf".into(),
-    //                 cid: cid1(),
-    //             },
-    //             Need::Node(cid2()),
-    //         ]
-    //     );
-    // }
-
-    // #[test]
-    // fn test_needs_from_node_left_and_record() {
-    //     let node = Node {
-    //         left: Some(cid1()),
-    //         entries: vec![Entry {
-    //             keysuffix: "asdf".into(),
-    //             prefix_len: 0,
-    //             value: cid2(),
-    //             tree: None,
-    //         }],
-    //     };
-    //     assert_eq!(
-    //         needs_from_node(node).unwrap(),
-    //         vec![
-    //             Need::Node(cid1()),
-    //             Need::Record {
-    //                 rkey: "asdf".into(),
-    //                 cid: cid2(),
-    //             },
-    //         ]
-    //     );
-    // }
-
-    // #[test]
-    // fn test_needs_from_full_node() {
-    //     let node = Node {
-    //         left: Some(cid1()),
-    //         entries: vec![
-    //             Entry {
-    //                 keysuffix: "asdf".into(),
-    //                 prefix_len: 0,
-    //                 value: cid2(),
-    //                 tree: Some(cid3()),
-    //             },
-    //             Entry {
-    //                 keysuffix: "ghi".into(),
-    //                 prefix_len: 1,
-    //                 value: cid4(),
-    //                 tree: Some(cid5()),
-    //             },
-    //             Entry {
-    //                 keysuffix: "jkl".into(),
-    //                 prefix_len: 2,
-    //                 value: cid6(),
-    //                 tree: Some(cid7()),
-    //             },
-    //             Entry {
-    //                 keysuffix: "mno".into(),
-    //                 prefix_len: 4,
-    //                 value: cid8(),
-    //                 tree: Some(cid9()),
-    //             },
-    //         ],
-    //     };
-    //     assert_eq!(
-    //         needs_from_node(node).unwrap(),
-    //         vec![
-    //             Need::Node(cid1()),
-    //             Need::Record {
-    //                 rkey: "asdf".into(),
-    //                 cid: cid2(),
-    //             },
-    //             Need::Node(cid3()),
-    //             Need::Record {
-    //                 rkey: "aghi".into(),
-    //                 cid: cid4(),
-    //             },
-    //             Need::Node(cid5()),
-    //             Need::Record {
-    //                 rkey: "agjkl".into(),
-    //                 cid: cid6(),
-    //             },
-    //             Need::Node(cid7()),
-    //             Need::Record {
-    //                 rkey: "agjkmno".into(),
-    //                 cid: cid8(),
-    //             },
-    //             Need::Node(cid9()),
-    //         ]
-    //     );
-    // }
 }
```
tests/non-huge-cars.rs (+16 -5)

```diff
@@ -1,11 +1,17 @@
 extern crate repo_stream;
 use repo_stream::Driver;
 
+const EMPTY_CAR: &'static [u8] = include_bytes!("../car-samples/empty.car");
 const TINY_CAR: &'static [u8] = include_bytes!("../car-samples/tiny.car");
 const LITTLE_CAR: &'static [u8] = include_bytes!("../car-samples/little.car");
 const MIDSIZE_CAR: &'static [u8] = include_bytes!("../car-samples/midsize.car");
 
-async fn test_car(bytes: &[u8], expected_records: usize, expected_sum: usize) {
+async fn test_car(
+    bytes: &[u8],
+    expected_records: usize,
+    expected_sum: usize,
+    expect_profile: bool,
+) {
     let mut driver = match Driver::load_car(bytes, |block| block.len(), 10 /* MiB */)
         .await
         .unwrap()
@@ -33,20 +39,25 @@
 
     assert_eq!(records, expected_records);
     assert_eq!(sum, expected_sum);
-    assert!(found_bsky_profile);
+    assert_eq!(found_bsky_profile, expect_profile);
+}
+
+#[tokio::test]
+async fn test_empty_car() {
+    test_car(EMPTY_CAR, 0, 0, false).await
 }
 
 #[tokio::test]
 async fn test_tiny_car() {
-    test_car(TINY_CAR, 8, 2071).await
+    test_car(TINY_CAR, 8, 2071, true).await
 }
 
 #[tokio::test]
 async fn test_little_car() {
-    test_car(LITTLE_CAR, 278, 246960).await
+    test_car(LITTLE_CAR, 278, 246960, true).await
 }
 
 #[tokio::test]
 async fn test_midsize_car() {
-    test_car(MIDSIZE_CAR, 11585, 3741393).await
+    test_car(MIDSIZE_CAR, 11585, 3741393, true).await
 }
```