+1
-1
Cargo.lock
+1
-1
Cargo.lock
+2
-1
Cargo.toml
+2
-1
Cargo.toml
···
1
1
[package]
2
2
name = "repo-stream"
3
-
version = "0.1.0"
3
+
version = "0.1.1"
4
4
edition = "2024"
5
5
license = "MIT OR Apache-2.0"
6
6
description = "Fast and robust atproto CAR file processing in rust"
7
+
repository = "https://tangled.org/@microcosm.blue/repo-stream"
7
8
8
9
[dependencies]
9
10
futures = "0.3.31"
+28
-13
src/drive.rs
+28
-13
src/drive.rs
···
1
+
//! Consume an MST block stream, producing an ordered stream of records
2
+
1
3
use futures::{Stream, TryStreamExt};
2
4
use ipld_core::cid::Cid;
3
5
use std::collections::HashMap;
···
6
8
use crate::mst::{Commit, Node};
7
9
use crate::walk::{Step, Trip, Walker};
8
10
11
+
/// Errors that can happen while consuming and emitting blocks and records
9
12
#[derive(Debug, thiserror::Error)]
10
13
pub enum DriveError<E: Error> {
11
14
#[error("Failed to initialize CarReader: {0}")]
···
20
23
MissingBlock(Cid),
21
24
#[error("Failed to walk the mst tree: {0}")]
22
25
Tripped(#[from] Trip<E>),
23
-
#[error("Encountered an rkey out of order while walking the MST")]
24
-
RkeyOutOfOrder,
25
26
}
26
27
27
28
type CarBlock<E> = Result<(Cid, Vec<u8>), E>;
28
29
30
+
/// Newtype because i'll mix up strings somewhere if i don't
29
31
#[derive(Debug)]
30
32
pub struct Rkey(pub String);
31
33
···
55
57
Processed(Result<T, E>),
56
58
}
57
59
60
+
/// The core driver between the block stream and MST walker
58
61
pub struct Vehicle<SE, S, T, P, PE>
59
62
where
60
63
S: Stream<Item = CarBlock<SE>>,
···
65
68
blocks: HashMap<Cid, MaybeProcessedBlock<T, PE>>,
66
69
walker: Walker,
67
70
process: P,
68
-
prev_rkey: String,
69
71
}
70
72
71
73
impl<SE, S, T: Clone, P, PE> Vehicle<SE, S, T, P, PE>
···
75
77
P: Fn(&[u8]) -> Result<T, PE>,
76
78
PE: Error,
77
79
{
80
+
/// Set up the stream
81
+
///
82
+
/// This will eagerly consume blocks until the `Commit` object is found.
83
+
/// *Usually* the it's the first block, but there is no guarantee.
84
+
///
85
+
/// ### Parameters
86
+
///
87
+
/// `root`: CID of the commit object that is the root of the MST
88
+
///
89
+
/// `block_stream`: Input stream of raw CAR blocks
90
+
///
91
+
/// `process`: record-transforming callback:
92
+
///
93
+
/// For tasks where records can be quickly processed into a *smaller*
94
+
/// useful representation, you can do that eagerly as blocks come in by
95
+
/// passing the processor as a callback here. This can reduce overall
96
+
/// memory usage.
78
97
pub async fn init(
79
98
root: Cid,
80
99
mut block_stream: S,
···
116
135
blocks,
117
136
walker,
118
137
process,
119
-
prev_rkey: "".to_string(),
120
138
};
121
139
Ok((commit, me))
122
140
}
···
145
163
Err(DriveError::MissingBlock(cid_needed))
146
164
}
147
165
148
-
pub async fn next_record(&mut self) -> Result<Option<(Rkey, T)>, DriveError<PE>> {
166
+
/// Manually step through the record outputs
167
+
pub async fn next_record(&mut self) -> Result<Option<(String, T)>, DriveError<PE>> {
149
168
loop {
150
169
// walk as far as we can until we run out of blocks or find a record
151
-
let cid_needed = match self.walker.walk(&mut self.blocks, &self.process)? {
170
+
let cid_needed = match self.walker.step(&mut self.blocks, &self.process)? {
152
171
Step::Rest(cid) => cid,
153
172
Step::Finish => return Ok(None),
154
-
Step::Step { rkey, data } => {
155
-
if rkey <= self.prev_rkey {
156
-
return Err(DriveError::RkeyOutOfOrder);
157
-
}
158
-
return Ok(Some((Rkey(rkey), data)));
159
-
}
173
+
Step::Step { rkey, data } => return Ok(Some((rkey, data))),
160
174
};
161
175
162
176
// load blocks until we reach that cid
···
164
178
}
165
179
}
166
180
167
-
pub fn stream(self) -> impl Stream<Item = Result<(Rkey, T), DriveError<PE>>> {
181
+
/// Convert to a futures::stream of record outputs
182
+
pub fn stream(self) -> impl Stream<Item = Result<(String, T), DriveError<PE>>> {
168
183
futures::stream::try_unfold(self, |mut this| async move {
169
184
let maybe_record = this.next_record().await?;
170
185
Ok(maybe_record.map(|b| (b, this)))
+4
src/lib.rs
+4
src/lib.rs
+29
-5
src/walk.rs
+29
-5
src/walk.rs
···
6
6
use std::collections::HashMap;
7
7
use std::error::Error;
8
8
9
+
/// Errors that can happen while walking
9
10
#[derive(Debug, thiserror::Error)]
10
11
pub enum Trip<E: Error> {
11
12
#[error("empty mst nodes are not allowed")]
···
13
14
#[error("Failed to decode commit block: {0}")]
14
15
BadCommit(Box<dyn std::error::Error>),
15
16
#[error("Action node error: {0}")]
16
-
ActionNode(#[from] ActionNodeError),
17
+
RkeyError(#[from] RkeyError),
17
18
#[error("Process failed: {0}")]
18
19
ProcessFailed(E),
20
+
#[error("Encountered an rkey out of order while walking the MST")]
21
+
RkeyOutOfOrder,
19
22
}
20
23
24
+
/// Errors from invalid Rkeys
21
25
#[derive(Debug, thiserror::Error)]
22
-
pub enum ActionNodeError {
26
+
pub enum RkeyError {
23
27
#[error("Failed to compute an rkey due to invalid prefix_len")]
24
28
EntryPrefixOutOfbounds,
25
29
#[error("RKey was not utf-8")]
26
30
EntryRkeyNotUtf8(#[from] std::string::FromUtf8Error),
27
31
}
28
32
33
+
/// Walker outputs
29
34
#[derive(Debug)]
30
35
pub enum Step<T> {
36
+
/// We need a CID but it's not in the block store
37
+
///
38
+
/// Give the needed CID to the driver so it can load blocks until it's found
31
39
Rest(Cid),
40
+
/// Reached the end of the MST! yay!
32
41
Finish,
42
+
/// A record was found!
33
43
Step { rkey: String, data: T },
34
44
}
35
45
···
39
49
Record { rkey: String, cid: Cid },
40
50
}
41
51
42
-
fn push_from_node(stack: &mut Vec<Need>, node: &Node) -> Result<(), ActionNodeError> {
52
+
fn push_from_node(stack: &mut Vec<Need>, node: &Node) -> Result<(), RkeyError> {
43
53
let mut entries = Vec::with_capacity(node.entries.len());
44
54
45
55
let mut prefix = vec![];
···
47
57
let mut rkey = vec![];
48
58
let pre_checked = prefix
49
59
.get(..entry.prefix_len)
50
-
.ok_or(ActionNodeError::EntryPrefixOutOfbounds)?;
60
+
.ok_or(RkeyError::EntryPrefixOutOfbounds)?;
51
61
rkey.extend_from_slice(pre_checked);
52
62
rkey.extend_from_slice(&entry.keysuffix);
53
63
prefix = rkey.clone();
···
70
80
Ok(())
71
81
}
72
82
83
+
/// Traverser of an atproto MST
84
+
///
85
+
/// Walks the tree from left-to-right in depth-first order
73
86
#[derive(Debug)]
74
87
pub struct Walker {
75
88
stack: Vec<Need>,
89
+
prev: String,
76
90
}
77
91
78
92
impl Walker {
79
93
pub fn new(tree_root_cid: Cid) -> Self {
80
94
Self {
81
95
stack: vec![Need::Node(tree_root_cid)],
96
+
prev: "".to_string(),
82
97
}
83
98
}
84
99
85
-
pub fn walk<T: Clone, E: Error>(
100
+
/// Advance through nodes until we find a record or can't go further
101
+
pub fn step<T: Clone, E: Error>(
86
102
&mut self,
87
103
blocks: &mut HashMap<Cid, MaybeProcessedBlock<T, E>>,
88
104
process: impl Fn(&[u8]) -> Result<T, E>,
···
140
156
141
157
log::trace!("emitting a block as a step. depth={}", self.stack.len());
142
158
let data = data.map_err(Trip::ProcessFailed)?;
159
+
160
+
// rkeys *must* be in order or else the tree is invalid (or
161
+
// we have a bug)
162
+
if rkey <= self.prev {
163
+
return Err(Trip::RkeyOutOfOrder);
164
+
}
165
+
self.prev = rkey.clone();
166
+
143
167
return Ok(Step::Step { rkey, data });
144
168
}
145
169
}
+3
-3
tests/non-huge-cars.rs
+3
-3
tests/non-huge-cars.rs
···
33
33
while let Some((rkey, size)) = record_stream.try_next().await.unwrap() {
34
34
records += 1;
35
35
sum += size;
36
-
if rkey.0 == "app.bsky.actor.profile/self" {
36
+
if rkey == "app.bsky.actor.profile/self" {
37
37
found_bsky_profile = true;
38
38
}
39
-
assert!(rkey.0 > prev_rkey, "rkeys are streamed in order");
40
-
prev_rkey = rkey.0;
39
+
assert!(rkey > prev_rkey, "rkeys are streamed in order");
40
+
prev_rkey = rkey;
41
41
}
42
42
assert_eq!(records, expected_records);
43
43
assert_eq!(sum, expected_sum);