# v0.4.0

_2026-01-15_

- use `Output { rkey, cid, data }` instead of the `(rkey, data)` tuple so that the `Cid` is exposed. this is to make tap-like diffing possible.
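a minimal sketch of consuming the new shape (hedged: `repo.car` is a placeholder path, and this assumes the repo fits the in-memory driver; the disk driver yields the same `Output` items):

```rust no_run
use repo_stream::{Driver, DriverBuilder, Output};

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let reader = tokio::io::BufReader::new(tokio::fs::File::open("repo.car").await?);
    // no block processor is set, so `data` is the raw record bytes
    let Driver::Memory(_commit, mut driver) = DriverBuilder::new().load_car(reader).await? else {
        return Ok(()); // too big for memory; see the readme for the disk path
    };
    while let Some(chunk) = driver.next_chunk(256).await? {
        for Output { rkey, cid, data } in chunk {
            println!("{rkey} ({cid}): {} bytes", data.len());
        }
    }
    Ok(())
}
```
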
# v0.3.1

_2026-01-15_

- bring back the disk driver's `reset` function for disk storage reuse
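a hedged sketch of the reuse flow (`drain_two`, `paused_a`/`paused_b`, and `blocks.db` are made-up names; the paused values are `NeedDisk` handles from two CARs that spilled past the memory limit):

```rust no_run
use repo_stream::{DiskBuilder, NeedDisk};
use tokio::io::AsyncRead;

async fn drain_two<R: AsyncRead + Unpin>(
    paused_a: NeedDisk<R>,
    paused_b: NeedDisk<R>,
) -> Result<(), Box<dyn std::error::Error>> {
    let store = DiskBuilder::new().open("blocks.db".into()).await?;

    // drain the first repo...
    let (_commit_a, mut driver_a) = paused_a.finish_loading(store).await?;
    while let Some(_chunk) = driver_a.next_chunk(256).await? {}
    // ...then reset: the stored blocks are cleared and the store comes back for reuse
    let store = driver_a.reset_store().await?;

    // the same on-disk store now backs the second repo's driver
    let (_commit_b, mut driver_b) = paused_b.finish_loading(store).await?;
    while let Some(_chunk) = driver_b.next_chunk(256).await? {}
    driver_b.reset_store().await?;
    Ok(())
}
```
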
# v0.3.0

_2026-01-15_

- drop sqlite, pick up fjall v3 for some speeeeeeed (and code simplification and easier build requirements)
- no more `Processable` trait, process functions are just `Vec<u8> -> Vec<u8>` now (bring your own ser/de; see the sketch after this list). there's a potential small cost here where processors now have to actually go through serialization even for in-memory car walking, but i think zero-copy approaches (e.g. rkyv) are low-cost enough
- custom deserialize for MST nodes that does as much depth calculation and rkey validation as possible in-line. (not clear if it actually made anything faster)
- check MST depth at every node properly (previously it could do some walking before being able to check and included some assumptions)
- check MST for empty leaf nodes (which are not allowed)
- shave 0.6 nanoseconds (really) from MST depth calculation (don't ask)
- drop and swap some dependencies: `bincode`, `futures`, `futures-core`, `ipld-core` -> `cid`, `multibase`, `rusqlite` -> `fjall`. and add `hashbrown` because it benchmarked a bit faster. (we hash on user-controlled CIDs -- is the lower DoS resistance a risk to worry about?)
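the sketch mentioned above: with `Processable` gone, a processor is just a plain function (or capture-free closure) over bytes. reducing each record to its length, like the bundled examples do (`repo.car` is a placeholder path):

```rust no_run
use repo_stream::{Driver, DriverBuilder, Output};

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let reader = tokio::io::BufReader::new(tokio::fs::File::open("repo.car").await?);
    let driver = DriverBuilder::new()
        // bring your own ser/de: this processor just encodes each record's length
        .with_block_processor(|rec| rec.len().to_ne_bytes().to_vec())
        .load_car(reader)
        .await?;
    if let Driver::Memory(_commit, mut driver) = driver {
        let mut total = 0usize;
        while let Some(chunk) = driver.next_chunk(256).await? {
            for Output { data, .. } in chunk {
                total += usize::from_ne_bytes(data.try_into().unwrap());
            }
        }
        println!("total record bytes: {total}");
    }
    Ok(())
}
```
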
+16-9
examples/disk-read-file/main.rs
···3*/
45extern crate repo_stream;
000006use clap::Parser;
7use repo_stream::{DiskBuilder, Driver, DriverBuilder};
8use std::path::PathBuf;
0910#[derive(Debug, Parser)]
11struct Args {
···27 let reader = tokio::io::BufReader::new(reader);
2829 log::info!("hello! reading the car...");
03031 // in this example we only bother handling CARs that are too big for memory
32 // `noop` helper means: do no block processing, store the raw blocks
33 let driver = match DriverBuilder::new()
34- .with_mem_limit_mb(10) // how much memory can be used before disk spill
35 .load_car(reader)
36 .await?
37 {
···4849 // at this point you might want to fetch the account's signing key
50 // via the DID from the commit, and then verify the signature.
51- log::warn!("big's comit: {:?}", commit);
0005253 // pop the driver back out to get some code indentation relief
54 driver
···70 // keep a count of the total number of blocks seen
71 n += pairs.len();
7273- for (_, block) in pairs {
74 // for each block, count how many bytes are equal to '0'
75 // (this is just an example, you probably want to do something more
76 // interesting)
77- zeros += block.into_iter().filter(|&b| b == b'0').count()
78 }
79 }
8081- log::info!("arrived! joining rx...");
8283- // clean up the database. would be nice to do this in drop so it happens
84- // automatically, but some blocking work happens, so that's not allowed in
85- // async rust. 🤷
86- join.await?.reset_store().await?;
8788 log::info!("done. n={n} zeros={zeros}");
89
···3*/
45extern crate repo_stream;
6+7+use mimalloc::MiMalloc;
8+#[global_allocator]
9+static GLOBAL: MiMalloc = MiMalloc;
10+11use clap::Parser;
12use repo_stream::{DiskBuilder, Driver, DriverBuilder};
13use std::path::PathBuf;
14+use std::time::Instant;
1516#[derive(Debug, Parser)]
17struct Args {
···33 let reader = tokio::io::BufReader::new(reader);
3435 log::info!("hello! reading the car...");
36+ let t0 = Instant::now();
3738 // in this example we only bother handling CARs that are too big for memory
39 // `noop` helper means: do no block processing, store the raw blocks
40 let driver = match DriverBuilder::new()
41+ .with_mem_limit_mb(32) // how much memory can be used before disk spill
42 .load_car(reader)
43 .await?
44 {
···5556 // at this point you might want to fetch the account's signing key
57 // via the DID from the commit, and then verify the signature.
58+ log::warn!("big's comit ({:?}): {:?}", t0.elapsed(), commit);
59+60+ // log::info!("now is good time to check mem usage...");
61+ // tokio::time::sleep(std::time::Duration::from_secs(15)).await;
6263 // pop the driver back out to get some code indentation relief
64 driver
···80 // keep a count of the total number of blocks seen
81 n += pairs.len();
8283+ for output in pairs {
84 // for each block, count how many bytes are equal to '0'
85 // (this is just an example, you probably want to do something more
86 // interesting)
87+ zeros += output.data.into_iter().filter(|&b| b == b'0').count()
88 }
89 }
9091+ log::info!("arrived! ({:?}) joining rx...", t0.elapsed());
9293+ join.await?;
0009495 log::info!("done. n={n} zeros={zeros}");
96
+1-1
examples/read-file/main.rs
···24 let reader = tokio::io::BufReader::new(reader);
2526 let (commit, mut driver) = match DriverBuilder::new()
27- .with_block_processor(|block| block.len())
28 .load_car(reader)
29 .await?
30 {
···24 let reader = tokio::io::BufReader::new(reader);
2526 let (commit, mut driver) = match DriverBuilder::new()
27+ .with_block_processor(|block| block.len().to_ne_bytes().to_vec())
28 .load_car(reader)
29 .await?
30 {
+115-17
readme.md
···45[![Crates.io][crates-badge]](https://crates.io/crates/repo-stream)
6[![Documentation][docs-badge]](https://docs.rs/repo-stream)
078[crates-badge]: https://img.shields.io/crates/v/repo-stream.svg
9[docs-badge]: https://docs.rs/repo-stream/badge.svg
010001112-todo
00000001314-- [ ] get an *emtpy* car for the test suite
15-- [ ] implement a max size on disk limit
0000016000000000000000000000000000000000000000001718-----
19···2223current car processing times (records processed into their length usize, phil's dev machine):
2425-- 128MiB CAR file: `347ms`
26-- 5.0MiB: `6.1ms`
27-- 279KiB: `139us`
28-- 3.4KiB: `4.9us`
000000000000000002903031-running the huge-car benchmark
000000000000000000000003233- to avoid committing it to the repo, you have to pass it in through the env for now.
34···42- [x] car file test fixtures & validation tests
43- [x] make sure we can get the did and signature out for verification
44 -> yeah the commit is returned from init
45-- [ ] spec compliance todos
46 - [x] assert that keys are ordered and fail if not
47 - [x] verify node mst depth from key (possibly pending [interop test fixes](https://github.com/bluesky-social/atproto-interop-tests/issues/5))
48-- [ ] performance todos
49 - [x] consume the serialized nodes into a mutable efficient format
50- - [ ] maybe customize the deserialize impl to do that directly?
51 - [x] benchmark and profile
52-- [ ] robustness todos
53- - [ ] swap the blocks hashmap for a BlockStore trait that can be dumped to redb
54- - [ ] maybe keep the redb function behind a feature flag?
55- - [ ] can we assert a max size for node blocks?
56 - [x] figure out why asserting the upper nibble of the fourth byte of a node fails fingerprinting
57 -> because it's the upper 3 bytes, not upper 4 byte nibble, oops.
58- - [ ] max mst depth (there is actually a hard limit but a malicious repo could do anything)
59- - [ ] i don't *think* we need a max recursion depth for processing cbor contents since we leave records to the user to decode
6061newer ideas
62
···45[![Crates.io][crates-badge]](https://crates.io/crates/repo-stream)
6[![Documentation][docs-badge]](https://docs.rs/repo-stream)
7+[![Sponsor][sponsor-badge]](https://github.com/sponsors/uniphil)
89[crates-badge]: https://img.shields.io/crates/v/repo-stream.svg
10[docs-badge]: https://docs.rs/repo-stream/badge.svg
11+[sponsor-badge]: https://img.shields.io/badge/at-microcosm-b820f9?labelColor=b820f9&logo=githubsponsors&logoColor=fff
1213+```rust no_run
14+use repo_stream::{Driver, DriverBuilder, DriveError, DiskBuilder, Output};
1516+#[tokio::main]
17+async fn main() -> Result<(), Box<dyn std::error::Error>> {
18+ // repo-stream takes any AsyncRead as input, like a tokio::fs::File
19+ let reader = tokio::fs::File::open("repo.car").await?;
20+ let reader = tokio::io::BufReader::new(reader);
21+22+ // example repo workload is simply counting the total record bytes
23+ let mut total_size = 0;
2425+ match DriverBuilder::new()
26+ .with_mem_limit_mb(10)
27+ .with_block_processor( // block processing: just extract the raw record size
28+ |rec| rec.len().to_ne_bytes().to_vec())
29+ .load_car(reader)
30+ .await?
31+ {
3233+ // if all blocks fit within memory
34+ Driver::Memory(_commit, mut driver) => {
35+ while let Some(chunk) = driver.next_chunk(256).await? {
36+ for Output { rkey: _, cid: _, data } in chunk {
37+ let size = usize::from_ne_bytes(data.try_into().unwrap());
38+ total_size += size;
39+ }
40+ }
41+ },
42+43+ // if the CAR was too big for in-memory processing
44+ Driver::Disk(paused) => {
45+ // set up a disk store we can spill to
46+ let store = DiskBuilder::new().open("some/path.db".into()).await?;
47+ // do the spilling, get back a (similar) driver
48+ let (_commit, mut driver) = paused.finish_loading(store).await?;
49+50+ while let Some(chunk) = driver.next_chunk(256).await? {
51+ for Output { rkey: _, cid: _, data } in chunk {
52+ let size = usize::from_ne_bytes(data.try_into().unwrap());
53+ total_size += size;
54+ }
55+ }
56+ }
57+ };
58+ println!("sum of size of all records: {total_size}");
59+ Ok(())
60+}
61+```
62+63+more recent todo
64+- [ ] add a zero-copy rkyv process function example
65+- [ ] repo car slices
66+- [ ] lazy-value stream (rkey -> CID diffing for tap-like `#sync` handling)
67+- [x] get an *empty* car for the test suite
68+- [x] implement a max size on disk limit
69+70+some ideas
71+- [ ] since the disk k/v get/set interface is now so similar to HashMap (blocking, no transactions), it's probably possible to make a single `Driver` and move the thread stuff from the disk one to generic helper functions. (might create async footguns though)
72+- [ ] fork iroh-car into a sync version so we can drop tokio as a hard requirement, and offer async via wrapper helper things
73+- [ ] feature-flag the sha2 crate for hmac-sha256? if someone wanted fewer deps?? then maybe make `hashbrown` also optional vs builtin hashmap?
7475-----
76···7980current car processing times (records processed into their length usize, phil's dev machine):
8182+- 450MiB CAR file (huge): `1.3s`
83+- 128MiB (huge): `350ms`
84+- 5.0MiB: `6.8ms`
85+- 279KiB: `160us`
86+- 3.4KiB: `5.1us`
87+- empty: `690ns`
88+89+it's a little faster with `mimalloc`
90+91+```rust
92+use mimalloc::MiMalloc;
93+#[global_allocator]
94+static GLOBAL: MiMalloc = MiMalloc;
95+```
96+97+- 450MiB CAR file: `1.2s` (-8%)
98+- 128MiB: `300ms` (-14%)
99+- 5.0MiB: `6.0ms` (-11%)
100+- 279KiB: `150us` (-7%)
101+- 3.4KiB: `4.7us` (-8%)
102+- empty: `670ns` (-4%)
103104+processing CARs requires buffering blocks, so it can consume a lot of memory. repo-stream's in-memory driver has minimal memory overhead, but there are two ways to make it work with less mem (you can do either or both!)
105106+1. spill blocks to disk
107+2. inline block processing
108+109+#### spill blocks to disk
110+111+this is a little slower but can greatly reduce the memory used. there's nothing special you need to do for this.
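a hedged sketch of the store setup (the path and cache size here are arbitrary; this is the `DiskStore` you hand to `paused.finish_loading(store)` in the example above):

```rust no_run
use repo_stream::DiskBuilder;

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    // the cache size knob is optional tuning for the on-disk block store
    let _store = DiskBuilder::new()
        .with_cache_size_mb(64)
        .open("blocks.db".into())
        .await?;
    Ok(())
}
```
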
112+113+114+#### inline block processing
115+116+if you don't need to store the complete records, you can have repo-stream try to optimistically apply a processing function to the raw blocks as they are streamed in.
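a hedged sketch (the `head` processor is a made-up example; `repo.car` is a placeholder). blocks that are definitely not MST nodes get processed as they stream in, so only the small processed outputs are buffered in memory or spilled to disk:

```rust no_run
use repo_stream::DriverBuilder;

// made-up processor: keep only the first 64 bytes of each record
fn head(mut rec: Vec<u8>) -> Vec<u8> {
    rec.truncate(64);
    rec
}

#[tokio::main]
async fn main() -> Result<(), Box<dyn std::error::Error>> {
    let reader = tokio::io::BufReader::new(tokio::fs::File::open("repo.car").await?);
    let _driver = DriverBuilder::new()
        .with_mem_limit_mb(10)
        .with_block_processor(head)
        .load_car(reader)
        .await?;
    // walk the returned driver as in the example at the top of this readme
    Ok(())
}
```
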
117+118+119+#### constrained mem perf comparison
120+121+sketchy benchmark but hey. mimalloc is enabled, and the processing spills to disk. inline processing reduces entire records to 8 bytes (usize of the raw record block size):
122+123+- 450MiB CAR file: `5.0s` (4.5x slowdown for disk)
124+- 128MiB: `1.27s` (4.1x slowdown)
125+126+fortunately, most CARs in the ATmosphere are very small, so e.g. for backfill purposes, the vast majority of inputs will not face this slowdown.
127+128+129+#### running the huge-car benchmark
130131- to avoid committing it to the repo, you have to pass it in through the env for now.
132···140- [x] car file test fixtures & validation tests
141- [x] make sure we can get the did and signature out for verification
142 -> yeah the commit is returned from init
143+- [x] spec compliance todos
144 - [x] assert that keys are ordered and fail if not
145 - [x] verify node mst depth from key (possibly pending [interop test fixes](https://github.com/bluesky-social/atproto-interop-tests/issues/5))
146+- [x] performance todos
147 - [x] consume the serialized nodes into a mutable efficient format
148+ - [x] maybe customize the deserialize impl to do that directly?
149 - [x] benchmark and profile
150+- [x] robustness todos
151+ - [x] swap the blocks hashmap for a BlockStore trait that can be dumped to redb
152+ - [x] maybe keep the redb function behind a feature flag?
153+ - [ ] can we assert a max size of entries for node blocks?
154 - [x] figure out why asserting the upper nibble of the fourth byte of a node fails fingerprinting
155 -> because it's the upper 3 bytes, not upper 4 byte nibble, oops.
156+ - [x] max mst depth (too expensive to attack actually)
157+ - [x] i don't *think* we need a max recursion depth for processing cbor contents since we leave records to the user to decode
158159newer ideas
160
+39-99
src/disk.rs
···17```
18*/
1920-use crate::drive::DriveError;
21-use rusqlite::OptionalExtension;
22use std::path::PathBuf;
2324#[derive(Debug, thiserror::Error)]
···28 /// (The wrapped err should probably be obscured to remove public-facing
29 /// sqlite bits)
30 #[error(transparent)]
31- DbError(#[from] rusqlite::Error),
32 /// A tokio blocking task failed to join
33 #[error("Failed to join a tokio blocking task: {0}")]
34 JoinError(#[from] tokio::task::JoinError),
···38 /// limit.
39 #[error("Maximum disk size reached")]
40 MaxSizeExceeded,
41- #[error("this error was replaced, seeing this is a bug.")]
42- #[doc(hidden)]
43- Stolen,
44-}
45-46-impl DiskError {
47- /// hack for ownership challenges with the disk driver
48- pub(crate) fn steal(&mut self) -> Self {
49- let mut swapped = DiskError::Stolen;
50- std::mem::swap(self, &mut swapped);
51- swapped
52- }
53}
5455/// Builder-style disk store setup
056pub struct DiskBuilder {
57 /// Database in-memory cache allowance
58 ///
···70impl Default for DiskBuilder {
71 fn default() -> Self {
72 Self {
73- cache_size_mb: 32,
74 max_stored_mb: 10 * 1024, // 10 GiB
75 }
76 }
···83 }
84 /// Set the in-memory cache allowance for the database
85 ///
86- /// Default: 32 MiB
87 pub fn with_cache_size_mb(mut self, size: usize) -> Self {
88 self.cache_size_mb = size;
89 self
···96 self
97 }
98 /// Open and initialize the actual disk storage
99- pub async fn open(self, path: PathBuf) -> Result<DiskStore, DiskError> {
100 DiskStore::new(path, self.cache_size_mb, self.max_stored_mb).await
101 }
102}
103104/// On-disk block storage
105pub struct DiskStore {
106- conn: rusqlite::Connection,
00107 max_stored: usize,
108 stored: usize,
109}
···116 max_stored_mb: usize,
117 ) -> Result<Self, DiskError> {
118 let max_stored = max_stored_mb * 2_usize.pow(20);
119- let conn = tokio::task::spawn_blocking(move || {
120- let conn = rusqlite::Connection::open(path)?;
121-122- let sqlite_one_mb = -(2_i64.pow(10)); // negative is kibibytes for sqlite cache_size
123-124- // conn.pragma_update(None, "journal_mode", "OFF")?;
125- // conn.pragma_update(None, "journal_mode", "MEMORY")?;
126- conn.pragma_update(None, "journal_mode", "WAL")?;
127- // conn.pragma_update(None, "wal_autocheckpoint", "0")?; // this lets things get a bit big on disk
128- conn.pragma_update(None, "synchronous", "OFF")?;
129- conn.pragma_update(
130- None,
131- "cache_size",
132- (cache_mb as i64 * sqlite_one_mb).to_string(),
133- )?;
134- Self::reset_tables(&conn)?;
135136- Ok::<_, DiskError>(conn)
137 })
138 .await??;
139140 Ok(Self {
141- conn,
0142 max_stored,
143 stored: 0,
144 })
145 }
146- pub(crate) fn get_writer(&'_ mut self) -> Result<SqliteWriter<'_>, DiskError> {
147- let tx = self.conn.transaction()?;
148- Ok(SqliteWriter {
149- tx,
150- stored: &mut self.stored,
151- max: self.max_stored,
152- })
153- }
154- pub(crate) fn get_reader<'conn>(&'conn self) -> Result<SqliteReader<'conn>, DiskError> {
155- let select_stmt = self.conn.prepare("SELECT val FROM blocks WHERE key = ?1")?;
156- Ok(SqliteReader { select_stmt })
157- }
158- /// Drop and recreate the kv table
159- pub async fn reset(self) -> Result<Self, DiskError> {
160- tokio::task::spawn_blocking(move || {
161- Self::reset_tables(&self.conn)?;
162- Ok(self)
163- })
164- .await?
165- }
166- fn reset_tables(conn: &rusqlite::Connection) -> Result<(), DiskError> {
167- conn.execute("DROP TABLE IF EXISTS blocks", ())?;
168- conn.execute(
169- "CREATE TABLE blocks (
170- key BLOB PRIMARY KEY NOT NULL,
171- val BLOB NOT NULL
172- ) WITHOUT ROWID",
173- (),
174- )?;
175- Ok(())
176- }
177-}
178179-pub(crate) struct SqliteWriter<'conn> {
180- tx: rusqlite::Transaction<'conn>,
181- stored: &'conn mut usize,
182- max: usize,
183-}
184-185-impl SqliteWriter<'_> {
186 pub(crate) fn put_many(
187 &mut self,
188- kv: impl Iterator<Item = Result<(Vec<u8>, Vec<u8>), DriveError>>,
189 ) -> Result<(), DriveError> {
190- let mut insert_stmt = self
191- .tx
192- .prepare_cached("INSERT INTO blocks (key, val) VALUES (?1, ?2)")
193- .map_err(DiskError::DbError)?;
194- for pair in kv {
195- let (k, v) = pair?;
196- *self.stored += v.len();
197- if *self.stored > self.max {
198 return Err(DiskError::MaxSizeExceeded.into());
199 }
200- insert_stmt.execute((k, v)).map_err(DiskError::DbError)?;
201 }
202- Ok(())
203- }
204- pub fn commit(self) -> Result<(), DiskError> {
205- self.tx.commit()?;
206 Ok(())
207 }
208-}
209210-pub(crate) struct SqliteReader<'conn> {
211- select_stmt: rusqlite::Statement<'conn>,
212-}
0213214-impl SqliteReader<'_> {
215- pub(crate) fn get(&mut self, key: Vec<u8>) -> rusqlite::Result<Option<Vec<u8>>> {
216- self.select_stmt
217- .query_one((&key,), |row| row.get(0))
218- .optional()
219 }
220}
···1//! Consume a CAR from an AsyncRead, producing an ordered stream of records
23-use crate::disk::{DiskError, DiskStore};
4-use crate::process::Processable;
5-use ipld_core::cid::Cid;
00006use iroh_car::CarReader;
7-use serde::{Deserialize, Serialize};
8-use std::collections::HashMap;
9use std::convert::Infallible;
10use tokio::{io::AsyncRead, sync::mpsc};
1112-use crate::mst::{Commit, Node};
13-use crate::walk::{Step, WalkError, Walker};
1415/// Errors that can happen while consuming and emitting blocks and records
16#[derive(Debug, thiserror::Error)]
···21 BadBlock(#[from] serde_ipld_dagcbor::DecodeError<Infallible>),
22 #[error("The Commit block reference by the root was not found")]
23 MissingCommit,
24- #[error("The MST block {0} could not be found")]
25- MissingBlock(Cid),
26 #[error("Failed to walk the mst tree: {0}")]
27 WalkError(#[from] WalkError),
28 #[error("CAR file had no roots")]
29 MissingRoot,
30 #[error("Storage error")]
31 StorageError(#[from] DiskError),
32- #[error("Encode error: {0}")]
33- BincodeEncodeError(#[from] bincode::error::EncodeError),
34 #[error("Tried to send on a closed channel")]
35 ChannelSendError, // SendError takes <T> which we don't need
36 #[error("Failed to join a task: {0}")]
37 JoinError(#[from] tokio::task::JoinError),
38}
3940-#[derive(Debug, thiserror::Error)]
41-pub enum DecodeError {
42- #[error(transparent)]
43- BincodeDecodeError(#[from] bincode::error::DecodeError),
44- #[error("extra bytes remained after decoding")]
45- ExtraGarbage,
46-}
47-48-/// An in-order chunk of Rkey + (processed) Block pairs
49-pub type BlockChunk<T> = Vec<(String, T)>;
5051-#[derive(Debug, Clone, Serialize, Deserialize)]
52-pub(crate) enum MaybeProcessedBlock<T> {
53 /// A block that's *probably* a Node (but we can't know yet)
54 ///
55 /// It *can be* a record that suspiciously looks a lot like a node, so we
56 /// cannot eagerly turn it into a Node. We only know for sure what it is
57 /// when we actually walk down the MST
58- Raw(Vec<u8>),
59 /// A processed record from a block that was definitely not a Node
60 ///
61 /// Processing has to be fallible because the CAR can have totally-unused
···71 /// There's an alternative here, which would be to kick unprocessable blocks
72 /// back to Raw, or maybe even a new RawUnprocessable variant. Then we could
73 /// surface the typed error later if needed by trying to reprocess.
74- Processed(T),
75}
7677-impl<T: Processable> Processable for MaybeProcessedBlock<T> {
78- /// TODO this is probably a little broken
79- fn get_size(&self) -> usize {
80- use std::{cmp::max, mem::size_of};
81-82- // enum is always as big as its biggest member?
83- let base_size = max(size_of::<Vec<u8>>(), size_of::<T>());
84-85- let extra = match self {
86- Self::Raw(bytes) => bytes.len(),
87- Self::Processed(t) => t.get_size(),
88- };
89-90- base_size + extra
91- }
92-}
93-94-impl<T> MaybeProcessedBlock<T> {
95- fn maybe(process: fn(Vec<u8>) -> T, data: Vec<u8>) -> Self {
96- if Node::could_be(&data) {
97 MaybeProcessedBlock::Raw(data)
98 } else {
99 MaybeProcessedBlock::Processed(process(data))
100 }
101 }
000000000000000000000000000102}
103104/// Read a CAR file, buffering blocks in memory or to disk
105-pub enum Driver<R: AsyncRead + Unpin, T: Processable> {
106 /// All blocks fit within the memory limit
107 ///
108 /// You probably want to check the commit's signature. You can go ahead and
109 /// walk the MST right away.
110- Memory(Commit, MemDriver<T>),
111 /// Blocks exceed the memory limit
112 ///
113 /// You'll need to provide a disk storage to continue. The commit will be
114 /// returned and can be validated only once all blocks are loaded.
115- Disk(NeedDisk<R, T>),
000000116}
117118/// Builder-style driver setup
0119pub struct DriverBuilder {
120 pub mem_limit_mb: usize,
0121}
122123impl Default for DriverBuilder {
124 fn default() -> Self {
125- Self { mem_limit_mb: 16 }
000126 }
127}
128···134 /// Set the in-memory size limit, in MiB
135 ///
136 /// Default: 16 MiB
137- pub fn with_mem_limit_mb(self, new_limit: usize) -> Self {
138- Self {
139- mem_limit_mb: new_limit,
140- }
141 }
0142 /// Set the block processor
143 ///
144 /// Default: noop, raw blocks will be emitted
145- pub fn with_block_processor<T: Processable>(
146- self,
147- p: fn(Vec<u8>) -> T,
148- ) -> DriverBuilderWithProcessor<T> {
149- DriverBuilderWithProcessor {
150- mem_limit_mb: self.mem_limit_mb,
151- block_processor: p,
152- }
153- }
154- /// Begin processing an atproto MST from a CAR file
155- pub async fn load_car<R: AsyncRead + Unpin>(
156- self,
157- reader: R,
158- ) -> Result<Driver<R, Vec<u8>>, DriveError> {
159- Driver::load_car(reader, crate::process::noop, self.mem_limit_mb).await
160- }
161-}
162-163-/// Builder-style driver intermediate step
164-///
165-/// start from `DriverBuilder`
166-pub struct DriverBuilderWithProcessor<T: Processable> {
167- pub mem_limit_mb: usize,
168- pub block_processor: fn(Vec<u8>) -> T,
169-}
170-171-impl<T: Processable> DriverBuilderWithProcessor<T> {
172- /// Set the in-memory size limit, in MiB
173- ///
174- /// Default: 16 MiB
175- pub fn with_mem_limit_mb(mut self, new_limit: usize) -> Self {
176- self.mem_limit_mb = new_limit;
177 self
178 }
0179 /// Begin processing an atproto MST from a CAR file
180- pub async fn load_car<R: AsyncRead + Unpin>(
181- self,
182- reader: R,
183- ) -> Result<Driver<R, T>, DriveError> {
184 Driver::load_car(reader, self.block_processor, self.mem_limit_mb).await
185 }
186}
187188-impl<R: AsyncRead + Unpin, T: Processable> Driver<R, T> {
189 /// Begin processing an atproto MST from a CAR file
190 ///
191 /// Blocks will be loaded, processed, and buffered in memory. If the entire
···197 /// resumed by providing a `SqliteStorage` for on-disk block storage.
198 pub async fn load_car(
199 reader: R,
200- process: fn(Vec<u8>) -> T,
201 mem_limit_mb: usize,
202- ) -> Result<Driver<R, T>, DriveError> {
203 let max_size = mem_limit_mb * 2_usize.pow(20);
204 let mut mem_blocks = HashMap::new();
205···229 let maybe_processed = MaybeProcessedBlock::maybe(process, data);
230231 // stash (maybe processed) blocks in memory as long as we have room
232- mem_size += std::mem::size_of::<Cid>() + maybe_processed.get_size();
233 mem_blocks.insert(cid, maybe_processed);
234 if mem_size >= max_size {
235 return Ok(Driver::Disk(NeedDisk {
···246 // all blocks loaded and we fit in memory! hopefully we found the commit...
247 let commit = commit.ok_or(DriveError::MissingCommit)?;
248249- let walker = Walker::new(commit.data);
00000000250251 Ok(Driver::Memory(
252 commit,
···273/// work the init function will do. We can drop the CAR reader before walking,
274/// so the sync/async boundaries become a little easier to work around.
275#[derive(Debug)]
276-pub struct MemDriver<T: Processable> {
277- blocks: HashMap<Cid, MaybeProcessedBlock<T>>,
278 walker: Walker,
279- process: fn(Vec<u8>) -> T,
280}
281282-impl<T: Processable> MemDriver<T> {
283 /// Step through the record outputs, in rkey order
284- pub async fn next_chunk(&mut self, n: usize) -> Result<Option<BlockChunk<T>>, DriveError> {
285 let mut out = Vec::with_capacity(n);
286 for _ in 0..n {
287 // walk as far as we can until we run out of blocks or find a record
288- match self.walker.step(&mut self.blocks, self.process)? {
289- Step::Missing(cid) => return Err(DriveError::MissingBlock(cid)),
290- Step::Finish => break,
291- Step::Found { rkey, data } => {
292- out.push((rkey, data));
293- continue;
294- }
295 };
0296 }
297-298 if out.is_empty() {
299 Ok(None)
300 } else {
···304}
305306/// A partially memory-loaded car file that needs disk spillover to continue
307-pub struct NeedDisk<R: AsyncRead + Unpin, T: Processable> {
308 car: CarReader<R>,
309 root: Cid,
310- process: fn(Vec<u8>) -> T,
311 max_size: usize,
312- mem_blocks: HashMap<Cid, MaybeProcessedBlock<T>>,
313 pub commit: Option<Commit>,
314}
315316-fn encode(v: impl Serialize) -> Result<Vec<u8>, bincode::error::EncodeError> {
317- bincode::serde::encode_to_vec(v, bincode::config::standard())
318-}
319-320-pub(crate) fn decode<T: Processable>(bytes: &[u8]) -> Result<T, DecodeError> {
321- let (t, n) = bincode::serde::decode_from_slice(bytes, bincode::config::standard())?;
322- if n != bytes.len() {
323- return Err(DecodeError::ExtraGarbage);
324- }
325- Ok(t)
326-}
327-328-impl<R: AsyncRead + Unpin, T: Processable + Send + 'static> NeedDisk<R, T> {
329 pub async fn finish_loading(
330 mut self,
331 mut store: DiskStore,
332- ) -> Result<(Commit, DiskDriver<T>), DriveError> {
333 // move store in and back out so we can manage lifetimes
334 // dump mem blocks into the store
335 store = tokio::task::spawn(async move {
336- let mut writer = store.get_writer()?;
337-338 let kvs = self
339 .mem_blocks
340 .into_iter()
341- .map(|(k, v)| Ok(encode(v).map(|v| (k.to_bytes(), v))?));
342343- writer.put_many(kvs)?;
344- writer.commit()?;
345 Ok::<_, DriveError>(store)
346 })
347 .await??;
348349- let (tx, mut rx) = mpsc::channel::<Vec<(Cid, MaybeProcessedBlock<T>)>>(2);
350351 let store_worker = tokio::task::spawn_blocking(move || {
352- let mut writer = store.get_writer()?;
353-354 while let Some(chunk) = rx.blocking_recv() {
355 let kvs = chunk
356 .into_iter()
357- .map(|(k, v)| Ok(encode(v).map(|v| (k.to_bytes(), v))?));
358- writer.put_many(kvs)?;
359 }
360-361- writer.commit()?;
362 Ok::<_, DriveError>(store)
363 }); // await later
364···377 self.commit = Some(c);
378 continue;
379 }
000380 // remaining possible types: node, record, other. optimistically process
381 // TODO: get the actual in-memory size to compute disk spill
382 let maybe_processed = MaybeProcessedBlock::maybe(self.process, data);
383- mem_size += std::mem::size_of::<Cid>() + maybe_processed.get_size();
384 chunk.push((cid, maybe_processed));
385- if mem_size >= self.max_size {
386 // soooooo if we're setting the db cache to max_size and then letting
387 // multiple chunks in the queue that are >= max_size, then at any time
388 // we might be using some multiple of max_size?
···405406 let commit = self.commit.ok_or(DriveError::MissingCommit)?;
407408- let walker = Walker::new(commit.data);
0000000000409410 Ok((
411 commit,
···423}
424425/// MST walker that reads from disk instead of an in-memory hashmap
426-pub struct DiskDriver<T: Clone> {
427- process: fn(Vec<u8>) -> T,
428 state: Option<BigState>,
429}
430431// for doctests only
432#[doc(hidden)]
433-pub fn _get_fake_disk_driver() -> DiskDriver<Vec<u8>> {
434- use crate::process::noop;
435 DiskDriver {
436 process: noop,
437 state: None,
438 }
439}
440441-impl<T: Processable + Send + 'static> DiskDriver<T> {
442 /// Walk the MST returning up to `n` rkey + record pairs
443 ///
444 /// ```no_run
445- /// # use repo_stream::{drive::{DiskDriver, DriveError, _get_fake_disk_driver}, process::noop};
446 /// # #[tokio::main]
447 /// # async fn main() -> Result<(), DriveError> {
448 /// # let mut disk_driver = _get_fake_disk_driver();
449 /// while let Some(pairs) = disk_driver.next_chunk(256).await? {
450- /// for (rkey, record) in pairs {
451- /// println!("{rkey}: size={}", record.len());
452 /// }
453 /// }
454- /// let store = disk_driver.reset_store().await?;
455 /// # Ok(())
456 /// # }
457 /// ```
458- pub async fn next_chunk(&mut self, n: usize) -> Result<Option<BlockChunk<T>>, DriveError> {
459 let process = self.process;
460461 // state should only *ever* be None transiently while inside here
···464 // the big pain here is that we don't want to leave self.state in an
465 // invalid state (None), so all the error paths have to make sure it
466 // comes out again.
467- let (state, res) = tokio::task::spawn_blocking(
468- move || -> (BigState, Result<BlockChunk<T>, DriveError>) {
469- let mut reader_res = state.store.get_reader();
470- let reader: &mut _ = match reader_res {
471- Ok(ref mut r) => r,
472- Err(ref mut e) => {
473- // unfortunately we can't return the error directly because
474- // (for some reason) it's attached to the lifetime of the
475- // reader?
476- // hack a mem::swap so we can get it out :/
477- let e_swapped = e.steal();
478- // the pain: `state` *has to* outlive the reader
479- drop(reader_res);
480- return (state, Err(e_swapped.into()));
481- }
482- };
483-484 let mut out = Vec::with_capacity(n);
485486 for _ in 0..n {
487 // walk as far as we can until we run out of blocks or find a record
488- let step = match state.walker.disk_step(reader, process) {
489 Ok(s) => s,
490 Err(e) => {
491- // the pain: `state` *has to* outlive the reader
492- drop(reader_res);
493 return (state, Err(e.into()));
494 }
495 };
496- match step {
497- Step::Missing(cid) => {
498- // the pain: `state` *has to* outlive the reader
499- drop(reader_res);
500- return (state, Err(DriveError::MissingBlock(cid)));
501- }
502- Step::Finish => break,
503- Step::Found { rkey, data } => out.push((rkey, data)),
504 };
0505 }
506507- // `state` *has to* outlive the reader
508- drop(reader_res);
509-510 (state, Ok::<_, DriveError>(out))
511- },
512- )
513- .await?; // on tokio JoinError, we'll be left with invalid state :(
514515 // *must* restore state before dealing with the actual result
516 self.state = Some(state);
···527 fn read_tx_blocking(
528 &mut self,
529 n: usize,
530- tx: mpsc::Sender<Result<BlockChunk<T>, DriveError>>,
531- ) -> Result<(), mpsc::error::SendError<Result<BlockChunk<T>, DriveError>>> {
532 let BigState { store, walker } = self.state.as_mut().expect("valid state");
533- let mut reader = match store.get_reader() {
534- Ok(r) => r,
535- Err(e) => return tx.blocking_send(Err(e.into())),
536- };
537538 loop {
539- let mut out: BlockChunk<T> = Vec::with_capacity(n);
540541 for _ in 0..n {
542 // walk as far as we can until we run out of blocks or find a record
543544- let step = match walker.disk_step(&mut reader, self.process) {
545 Ok(s) => s,
546 Err(e) => return tx.blocking_send(Err(e.into())),
547 };
548549- match step {
550- Step::Missing(cid) => {
551- return tx.blocking_send(Err(DriveError::MissingBlock(cid)));
552- }
553- Step::Finish => return Ok(()),
554- Step::Found { rkey, data } => {
555- out.push((rkey, data));
556- continue;
557- }
558 };
0559 }
560561 if out.is_empty() {
···578 /// benefit over just using `.next_chunk(n)`.
579 ///
580 /// ```no_run
581- /// # use repo_stream::{drive::{DiskDriver, DriveError, _get_fake_disk_driver}, process::noop};
582 /// # #[tokio::main]
583 /// # async fn main() -> Result<(), DriveError> {
584 /// # let mut disk_driver = _get_fake_disk_driver();
585 /// let (mut rx, join) = disk_driver.to_channel(512);
586 /// while let Some(recvd) = rx.recv().await {
587 /// let pairs = recvd?;
588- /// for (rkey, record) in pairs {
589- /// println!("{rkey}: size={}", record.len());
590 /// }
591 ///
592 /// }
593- /// let store = join.await?.reset_store().await?;
594 /// # Ok(())
595 /// # }
596 /// ```
···598 mut self,
599 n: usize,
600 ) -> (
601- mpsc::Receiver<Result<BlockChunk<T>, DriveError>>,
602 tokio::task::JoinHandle<Self>,
603 ) {
604- let (tx, rx) = mpsc::channel::<Result<BlockChunk<T>, DriveError>>(1);
605606 // sketch: this worker is going to be allowed to execute without a join handle
607 let chan_task = tokio::task::spawn_blocking(move || {
···614 (rx, chan_task)
615 }
616617- /// Reset the disk storage so it can be reused. You must call this.
618- ///
619- /// Ideally we'd put this in an `impl Drop`, but since it makes blocking
620- /// calls, that would be risky in an async context. For now you just have to
621- /// carefully make sure you call it.
622 ///
623- /// The sqlite store is returned, so it can be reused for another
624- /// `DiskDriver`.
625 pub async fn reset_store(mut self) -> Result<DiskStore, DriveError> {
626 let BigState { store, .. } = self.state.take().expect("valid state");
627- Ok(store.reset().await?)
0628 }
629}
···1//! Consume a CAR from an AsyncRead, producing an ordered stream of records
23+use crate::{
4+ Bytes, HashMap,
5+ disk::{DiskError, DiskStore},
6+ mst::MstNode,
7+ walk::Output,
8+};
9+use cid::Cid;
10use iroh_car::CarReader;
0011use std::convert::Infallible;
12use tokio::{io::AsyncRead, sync::mpsc};
1314+use crate::mst::Commit;
15+use crate::walk::{WalkError, Walker};
1617/// Errors that can happen while consuming and emitting blocks and records
18#[derive(Debug, thiserror::Error)]
···23 BadBlock(#[from] serde_ipld_dagcbor::DecodeError<Infallible>),
24 #[error("The Commit block reference by the root was not found")]
25 MissingCommit,
0026 #[error("Failed to walk the mst tree: {0}")]
27 WalkError(#[from] WalkError),
28 #[error("CAR file had no roots")]
29 MissingRoot,
30 #[error("Storage error")]
31 StorageError(#[from] DiskError),
0032 #[error("Tried to send on a closed channel")]
33 ChannelSendError, // SendError takes <T> which we don't need
34 #[error("Failed to join a task: {0}")]
35 JoinError(#[from] tokio::task::JoinError),
36}
3738+/// An in-order chunk of Rkey + CID + (processed) Block
39+pub type BlockChunk = Vec<Output>;
000000004041+#[derive(Debug, Clone)]
42+pub(crate) enum MaybeProcessedBlock {
43 /// A block that's *probably* a Node (but we can't know yet)
44 ///
45 /// It *can be* a record that suspiciously looks a lot like a node, so we
46 /// cannot eagerly turn it into a Node. We only know for sure what it is
47 /// when we actually walk down the MST
48+ Raw(Bytes),
49 /// A processed record from a block that was definitely not a Node
50 ///
51 /// Processing has to be fallible because the CAR can have totally-unused
···61 /// There's an alternative here, which would be to kick unprocessable blocks
62 /// back to Raw, or maybe even a new RawUnprocessable variant. Then we could
63 /// surface the typed error later if needed by trying to reprocess.
64+ Processed(Bytes),
65}
6667+impl MaybeProcessedBlock {
68+ pub(crate) fn maybe(process: fn(Bytes) -> Bytes, data: Bytes) -> Self {
69+ if MstNode::could_be(&data) {
0000000000000000070 MaybeProcessedBlock::Raw(data)
71 } else {
72 MaybeProcessedBlock::Processed(process(data))
73 }
74 }
75+ pub(crate) fn len(&self) -> usize {
76+ match self {
77+ MaybeProcessedBlock::Raw(b) => b.len(),
78+ MaybeProcessedBlock::Processed(b) => b.len(),
79+ }
80+ }
81+ pub(crate) fn into_bytes(self) -> Bytes {
82+ match self {
83+ MaybeProcessedBlock::Raw(mut b) => {
84+ b.push(0x00);
85+ b
86+ }
87+ MaybeProcessedBlock::Processed(mut b) => {
88+ b.push(0x01);
89+ b
90+ }
91+ }
92+ }
93+ pub(crate) fn from_bytes(mut b: Bytes) -> Self {
94+ // TODO: make sure bytes is not empty, that it's explicitly 0 or 1, etc
95+ let suffix = b.pop().unwrap();
96+ if suffix == 0x00 {
97+ MaybeProcessedBlock::Raw(b)
98+ } else {
99+ MaybeProcessedBlock::Processed(b)
100+ }
101+ }
102}
103104/// Read a CAR file, buffering blocks in memory or to disk
105+pub enum Driver<R: AsyncRead + Unpin> {
106 /// All blocks fit within the memory limit
107 ///
108 /// You probably want to check the commit's signature. You can go ahead and
109 /// walk the MST right away.
110+ Memory(Commit, MemDriver),
111 /// Blocks exceed the memory limit
112 ///
113 /// You'll need to provide a disk storage to continue. The commit will be
114 /// returned and can be validated only once all blocks are loaded.
115+ Disk(NeedDisk<R>),
116+}
117+118+/// Processor that just returns the raw blocks
119+#[inline]
120+pub fn noop(block: Bytes) -> Bytes {
121+ block
122}
123124/// Builder-style driver setup
125+#[derive(Debug, Clone)]
126pub struct DriverBuilder {
127 pub mem_limit_mb: usize,
128+ pub block_processor: fn(Bytes) -> Bytes,
129}
130131impl Default for DriverBuilder {
132 fn default() -> Self {
133+ Self {
134+ mem_limit_mb: 16,
135+ block_processor: noop,
136+ }
137 }
138}
139···145 /// Set the in-memory size limit, in MiB
146 ///
147 /// Default: 16 MiB
148+ pub fn with_mem_limit_mb(mut self, new_limit: usize) -> Self {
149+ self.mem_limit_mb = new_limit;
150+ self
0151 }
152+153 /// Set the block processor
154 ///
155 /// Default: noop, raw blocks will be emitted
156+ pub fn with_block_processor(mut self, new_processor: fn(Bytes) -> Bytes) -> DriverBuilder {
157+ self.block_processor = new_processor;
000000000000000000000000000000158 self
159 }
160+161 /// Begin processing an atproto MST from a CAR file
162+ pub async fn load_car<R: AsyncRead + Unpin>(&self, reader: R) -> Result<Driver<R>, DriveError> {
000163 Driver::load_car(reader, self.block_processor, self.mem_limit_mb).await
164 }
165}
166167+impl<R: AsyncRead + Unpin> Driver<R> {
168 /// Begin processing an atproto MST from a CAR file
169 ///
170 /// Blocks will be loaded, processed, and buffered in memory. If the entire
···176 /// resumed by providing a `SqliteStorage` for on-disk block storage.
177 pub async fn load_car(
178 reader: R,
179+ process: fn(Bytes) -> Bytes,
180 mem_limit_mb: usize,
181+ ) -> Result<Driver<R>, DriveError> {
182 let max_size = mem_limit_mb * 2_usize.pow(20);
183 let mut mem_blocks = HashMap::new();
184···208 let maybe_processed = MaybeProcessedBlock::maybe(process, data);
209210 // stash (maybe processed) blocks in memory as long as we have room
211+ mem_size += maybe_processed.len();
212 mem_blocks.insert(cid, maybe_processed);
213 if mem_size >= max_size {
214 return Ok(Driver::Disk(NeedDisk {
···225 // all blocks loaded and we fit in memory! hopefully we found the commit...
226 let commit = commit.ok_or(DriveError::MissingCommit)?;
227228+ // the commit always must point to a Node; empty node => empty MST special case
229+ let root_node: MstNode = match mem_blocks
230+ .get(&commit.data)
231+ .ok_or(DriveError::MissingCommit)?
232+ {
233+ MaybeProcessedBlock::Processed(_) => Err(WalkError::BadCommitFingerprint)?,
234+ MaybeProcessedBlock::Raw(bytes) => serde_ipld_dagcbor::from_slice(bytes)?,
235+ };
236+ let walker = Walker::new(root_node);
237238 Ok(Driver::Memory(
239 commit,
···260/// work the init function will do. We can drop the CAR reader before walking,
261/// so the sync/async boundaries become a little easier to work around.
262#[derive(Debug)]
263+pub struct MemDriver {
264+ blocks: HashMap<Cid, MaybeProcessedBlock>,
265 walker: Walker,
266+ process: fn(Bytes) -> Bytes,
267}
268269+impl MemDriver {
270 /// Step through the record outputs, in rkey order
271+ pub async fn next_chunk(&mut self, n: usize) -> Result<Option<BlockChunk>, DriveError> {
272 let mut out = Vec::with_capacity(n);
273 for _ in 0..n {
274 // walk as far as we can until we run out of blocks or find a record
275+ let Some(output) = self.walker.step(&mut self.blocks, self.process)? else {
276+ break;
00000277 };
278+ out.push(output);
279 }
0280 if out.is_empty() {
281 Ok(None)
282 } else {
···286}
287288/// A partially memory-loaded car file that needs disk spillover to continue
289+pub struct NeedDisk<R: AsyncRead + Unpin> {
290 car: CarReader<R>,
291 root: Cid,
292+ process: fn(Bytes) -> Bytes,
293 max_size: usize,
294+ mem_blocks: HashMap<Cid, MaybeProcessedBlock>,
295 pub commit: Option<Commit>,
296}
297298+impl<R: AsyncRead + Unpin> NeedDisk<R> {
000000000000299 pub async fn finish_loading(
300 mut self,
301 mut store: DiskStore,
302+ ) -> Result<(Commit, DiskDriver), DriveError> {
303 // move store in and back out so we can manage lifetimes
304 // dump mem blocks into the store
305 store = tokio::task::spawn(async move {
00306 let kvs = self
307 .mem_blocks
308 .into_iter()
309+ .map(|(k, v)| (k.to_bytes(), v.into_bytes()));
310311+ store.put_many(kvs)?;
0312 Ok::<_, DriveError>(store)
313 })
314 .await??;
315316+ let (tx, mut rx) = mpsc::channel::<Vec<(Cid, MaybeProcessedBlock)>>(1);
317318 let store_worker = tokio::task::spawn_blocking(move || {
00319 while let Some(chunk) = rx.blocking_recv() {
320 let kvs = chunk
321 .into_iter()
322+ .map(|(k, v)| (k.to_bytes(), v.into_bytes()));
323+ store.put_many(kvs)?;
324 }
00325 Ok::<_, DriveError>(store)
326 }); // await later
327···340 self.commit = Some(c);
341 continue;
342 }
343+344+ let data = Bytes::from(data);
345+346 // remaining possible types: node, record, other. optimistically process
347 // TODO: get the actual in-memory size to compute disk spill
348 let maybe_processed = MaybeProcessedBlock::maybe(self.process, data);
349+ mem_size += maybe_processed.len();
350 chunk.push((cid, maybe_processed));
351+ if mem_size >= (self.max_size / 2) {
352 // soooooo if we're setting the db cache to max_size and then letting
353 // multiple chunks in the queue that are >= max_size, then at any time
354 // we might be using some multiple of max_size?
···371372 let commit = self.commit.ok_or(DriveError::MissingCommit)?;
373374+ // the commit always must point to a Node; empty node => empty MST special case
375+ let db_bytes = store
376+ .get(&commit.data.to_bytes())
377+ .map_err(|e| DriveError::StorageError(DiskError::DbError(e)))?
378+ .ok_or(DriveError::MissingCommit)?;
379+380+ let node: MstNode = match MaybeProcessedBlock::from_bytes(db_bytes.to_vec()) {
381+ MaybeProcessedBlock::Processed(_) => Err(WalkError::BadCommitFingerprint)?,
382+ MaybeProcessedBlock::Raw(bytes) => serde_ipld_dagcbor::from_slice(&bytes)?,
383+ };
384+ let walker = Walker::new(node);
385386 Ok((
387 commit,
···399}
400401/// MST walker that reads from disk instead of an in-memory hashmap
402+pub struct DiskDriver {
403+ process: fn(Bytes) -> Bytes,
404 state: Option<BigState>,
405}
406407// for doctests only
408#[doc(hidden)]
409+pub fn _get_fake_disk_driver() -> DiskDriver {
0410 DiskDriver {
411 process: noop,
412 state: None,
413 }
414}
415416+impl DiskDriver {
417 /// Walk the MST returning up to `n` rkey + record pairs
418 ///
419 /// ```no_run
420+ /// # use repo_stream::{drive::{DiskDriver, DriveError, _get_fake_disk_driver}, noop};
421 /// # #[tokio::main]
422 /// # async fn main() -> Result<(), DriveError> {
423 /// # let mut disk_driver = _get_fake_disk_driver();
424 /// while let Some(pairs) = disk_driver.next_chunk(256).await? {
425+ /// for output in pairs {
426+ /// println!("{}: size={}", output.rkey, output.data.len());
427 /// }
428 /// }
0429 /// # Ok(())
430 /// # }
431 /// ```
432+ pub async fn next_chunk(&mut self, n: usize) -> Result<Option<BlockChunk>, DriveError> {
433 let process = self.process;
434435 // state should only *ever* be None transiently while inside here
···438 // the big pain here is that we don't want to leave self.state in an
439 // invalid state (None), so all the error paths have to make sure it
440 // comes out again.
441+ let (state, res) =
442+ tokio::task::spawn_blocking(move || -> (BigState, Result<BlockChunk, DriveError>) {
000000000000000443 let mut out = Vec::with_capacity(n);
444445 for _ in 0..n {
446 // walk as far as we can until we run out of blocks or find a record
447+ let step = match state.walker.disk_step(&mut state.store, process) {
448 Ok(s) => s,
449 Err(e) => {
00450 return (state, Err(e.into()));
451 }
452 };
453+ let Some(output) = step else {
454+ break;
000000455 };
456+ out.push(output);
457 }
458000459 (state, Ok::<_, DriveError>(out))
460+ })
461+ .await?; // on tokio JoinError, we'll be left with invalid state :(
0462463 // *must* restore state before dealing with the actual result
464 self.state = Some(state);
···475 fn read_tx_blocking(
476 &mut self,
477 n: usize,
478+ tx: mpsc::Sender<Result<BlockChunk, DriveError>>,
479+ ) -> Result<(), mpsc::error::SendError<Result<BlockChunk, DriveError>>> {
480 let BigState { store, walker } = self.state.as_mut().expect("valid state");
0000481482 loop {
483+ let mut out: BlockChunk = Vec::with_capacity(n);
484485 for _ in 0..n {
486 // walk as far as we can until we run out of blocks or find a record
487488+ let step = match walker.disk_step(store, self.process) {
489 Ok(s) => s,
490 Err(e) => return tx.blocking_send(Err(e.into())),
491 };
492493+ let Some(output) = step else {
494+ break;
0000000495 };
496+ out.push(output);
497 }
498499 if out.is_empty() {
···516 /// benefit over just using `.next_chunk(n)`.
517 ///
518 /// ```no_run
519+ /// # use repo_stream::{drive::{DiskDriver, DriveError, _get_fake_disk_driver}, noop};
520 /// # #[tokio::main]
521 /// # async fn main() -> Result<(), DriveError> {
522 /// # let mut disk_driver = _get_fake_disk_driver();
523 /// let (mut rx, join) = disk_driver.to_channel(512);
524 /// while let Some(recvd) = rx.recv().await {
525 /// let pairs = recvd?;
526+ /// for output in pairs {
527+ /// println!("{}: size={}", output.rkey, output.data.len());
528 /// }
529 ///
530 /// }
0531 /// # Ok(())
532 /// # }
533 /// ```
···535 mut self,
536 n: usize,
537 ) -> (
538+ mpsc::Receiver<Result<BlockChunk, DriveError>>,
539 tokio::task::JoinHandle<Self>,
540 ) {
541+ let (tx, rx) = mpsc::channel::<Result<BlockChunk, DriveError>>(1);
542543 // sketch: this worker is going to be allowed to execute without a join handle
544 let chan_task = tokio::task::spawn_blocking(move || {
···551 (rx, chan_task)
552 }
553554+ /// Reset the disk storage so it can be reused.
0000555 ///
556+ /// The store is returned, so it can be reused for another `DiskDriver`.
0557 pub async fn reset_store(mut self) -> Result<DiskStore, DriveError> {
558 let BigState { store, .. } = self.state.take().expect("valid state");
559+ store.reset().await?;
560+ Ok(store)
561 }
562}
+19-9
src/lib.rs
···2728match DriverBuilder::new()
29 .with_mem_limit_mb(10)
30- .with_block_processor(|rec| rec.len()) // block processing: just extract the raw record size
0031 .load_car(reader)
32 .await?
33{
···35 // if all blocks fit within memory
36 Driver::Memory(_commit, mut driver) => {
37 while let Some(chunk) = driver.next_chunk(256).await? {
38- for (_rkey, size) in chunk {
0039 total_size += size;
40 }
41 }
···49 let (_commit, mut driver) = paused.finish_loading(store).await?;
5051 while let Some(chunk) = driver.next_chunk(256).await? {
52- for (_rkey, size) in chunk {
0053 total_size += size;
54 }
55 }
56-57- // clean up the disk store (drop tables etc)
58- driver.reset_store().await?;
59 }
60};
61println!("sum of size of all records: {total_size}");
···7980pub mod disk;
81pub mod drive;
82-pub mod process;
8384pub use disk::{DiskBuilder, DiskError, DiskStore};
85-pub use drive::{DriveError, Driver, DriverBuilder};
86pub use mst::Commit;
87-pub use process::Processable;
00000000
···2728match DriverBuilder::new()
29 .with_mem_limit_mb(10)
30+ .with_block_processor(
31+ |rec| rec.len().to_ne_bytes().to_vec()
32+ ) // block processing: just extract the raw record size
33 .load_car(reader)
34 .await?
35{
···37 // if all blocks fit within memory
38 Driver::Memory(_commit, mut driver) => {
39 while let Some(chunk) = driver.next_chunk(256).await? {
40+ for output in chunk {
41+ let size = usize::from_ne_bytes(output.data.try_into().unwrap());
42+43 total_size += size;
44 }
45 }
···53 let (_commit, mut driver) = paused.finish_loading(store).await?;
5455 while let Some(chunk) = driver.next_chunk(256).await? {
56+ for output in chunk {
57+ let size = usize::from_ne_bytes(output.data.try_into().unwrap());
58+59 total_size += size;
60 }
61 }
00062 }
63};
64println!("sum of size of all records: {total_size}");
···8283pub mod disk;
84pub mod drive;
08586pub use disk::{DiskBuilder, DiskError, DiskStore};
87+pub use drive::{DriveError, Driver, DriverBuilder, NeedDisk, noop};
88pub use mst::Commit;
89+pub use walk::Output;
90+91+pub type Bytes = Vec<u8>;
92+93+pub(crate) use hashbrown::HashMap;
94+95+#[doc = include_str!("../readme.md")]
96+#[cfg(doctest)]
97+pub struct ReadmeDoctests;
+151-28
src/mst.rs
···3//! The primary aim is to work through the **tree** structure. Non-node blocks
4//! are left as raw bytes, for upper levels to parse into DAG-CBOR or whatever.
56-use ipld_core::cid::Cid;
7use serde::Deserialize;
089/// The top-level data object in a repository's tree is a signed commit.
10#[derive(Debug, Deserialize)]
···33 pub prev: Option<Cid>,
34 /// cryptographic signature of this commit, as raw bytes
35 #[serde(with = "serde_bytes")]
36- pub sig: Vec<u8>,
0000000000000000000000000000037}
3839-/// MST node data schema
40-#[derive(Debug, Deserialize, PartialEq)]
41-#[serde(deny_unknown_fields)]
42-pub(crate) struct Node {
43- /// link to sub-tree Node on a lower level and with all keys sorting before
44- /// keys at this node
45- #[serde(rename = "l")]
46- pub left: Option<Cid>,
47- /// ordered list of TreeEntry objects
48- ///
49- /// atproto MSTs have a fanout of 4, so there can be max 4 entries.
50- #[serde(rename = "e")]
51- pub entries: Vec<Entry>, // maybe we can do [Option<Entry>; 4]?
00000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000052}
5354-impl Node {
00055 /// test if a block could possibly be a node
56 ///
57 /// we can't eagerly decode records except where we're *sure* they cannot be
···62 /// so if a block *could be* a node, any record converter must postpone
63 /// processing. if it turns out it happens to be a very node-looking record,
64 /// well, sorry, it just has to only be processed later when that's known.
065 pub(crate) fn could_be(bytes: impl AsRef<[u8]>) -> bool {
66 const NODE_FINGERPRINT: [u8; 3] = [
67 0xA2, // map length 2 (for "l" and "e" keys)
···76 .map(|b| b & 0b1110_0000 == 0x80)
77 .unwrap_or(false)
78 }
79-80- /// Check if a node has any entries
81- ///
82- /// An empty repository with no records is represented as a single MST node
83- /// with an empty array of entries. This is the only situation in which a
84- /// tree may contain an empty leaf node which does not either contain keys
85- /// ("entries") or point to a sub-tree containing entries.
86- pub(crate) fn is_empty(&self) -> bool {
87- self.left.is_none() && self.entries.is_empty()
88- }
89}
9091/// TreeEntry object
···96 #[serde(rename = "p")]
97 pub prefix_len: usize,
98 /// remainder of key for this TreeEntry, after "prefixlen" have been removed
99- #[serde(rename = "k", with = "serde_bytes")]
100- pub keysuffix: Vec<u8>, // can we String this here?
101 /// link to the record data (CBOR) for this entry
102 #[serde(rename = "v")]
103 pub value: Cid,
···3//! The primary aim is to work through the **tree** structure. Non-node blocks
4//! are left as raw bytes, for upper levels to parse into DAG-CBOR or whatever.
56+use cid::Cid;
7use serde::Deserialize;
8+use sha2::{Digest, Sha256};
910/// The top-level data object in a repository's tree is a signed commit.
11#[derive(Debug, Deserialize)]
···34 pub prev: Option<Cid>,
35 /// cryptographic signature of this commit, as raw bytes
36 #[serde(with = "serde_bytes")]
37+ pub sig: serde_bytes::ByteBuf,
38+}
39+40+use serde::de::{self, Deserializer, MapAccess, Unexpected, Visitor};
41+use std::fmt;
42+43+pub type Depth = u32;
44+45+#[inline(always)]
46+pub fn atproto_mst_depth(key: &str) -> Depth {
47+ // 128 bits oughta be enough: https://bsky.app/profile/retr0.id/post/3jwwbf4izps24
48+ u128::from_be_bytes(Sha256::digest(key).split_at(16).0.try_into().unwrap()).leading_zeros() / 2
49+}
50+51+#[derive(Debug)]
52+pub(crate) struct MstNode {
53+ pub depth: Option<Depth>, // known for nodes with entries (required for root)
54+ pub things: Vec<NodeThing>,
55+}
56+57+#[derive(Debug)]
58+pub(crate) struct NodeThing {
59+ pub(crate) cid: Cid,
60+ pub(crate) kind: ThingKind,
61+}
62+63+#[derive(Debug)]
64+pub(crate) enum ThingKind {
65+ Tree,
66+ Value { rkey: String },
67}
6869+impl<'de> Deserialize<'de> for MstNode {
70+ fn deserialize<D>(deserializer: D) -> Result<Self, D::Error>
71+ where
72+ D: Deserializer<'de>,
73+ {
74+ struct NodeVisitor;
75+ impl<'de> Visitor<'de> for NodeVisitor {
76+ type Value = MstNode;
77+78+ fn expecting(&self, formatter: &mut fmt::Formatter) -> fmt::Result {
79+ formatter.write_str("struct MstNode")
80+ }
81+82+ fn visit_map<V>(self, mut map: V) -> Result<MstNode, V::Error>
83+ where
84+ V: MapAccess<'de>,
85+ {
86+ let mut found_left = false;
87+ let mut left = None;
88+ let mut found_entries = false;
89+ let mut things = Vec::new();
90+ let mut depth = None;
91+92+ while let Some(key) = map.next_key()? {
93+ match key {
94+ "l" => {
95+ if found_left {
96+ return Err(de::Error::duplicate_field("l"));
97+ }
98+ found_left = true;
99+ if let Some(cid) = map.next_value()? {
100+ left = Some(NodeThing {
101+ cid,
102+ kind: ThingKind::Tree,
103+ });
104+ }
105+ }
106+ "e" => {
107+ if found_entries {
108+ return Err(de::Error::duplicate_field("e"));
109+ }
110+ found_entries = true;
111+112+ let mut prefix: Vec<u8> = vec![];
113+114+ for entry in map.next_value::<Vec<Entry>>()? {
115+ let mut rkey: Vec<u8> = vec![];
116+ let pre_checked =
117+ prefix.get(..entry.prefix_len).ok_or_else(|| {
118+ de::Error::invalid_value(
119+ Unexpected::Bytes(&prefix),
120+ &"a prefix at least as long as the prefix_len",
121+ )
122+ })?;
123+124+ rkey.extend_from_slice(pre_checked);
125+ rkey.extend_from_slice(&entry.keysuffix);
126+127+ let rkey_s = String::from_utf8(rkey.clone()).map_err(|_| {
128+ de::Error::invalid_value(
129+ Unexpected::Bytes(&rkey),
130+ &"a valid utf-8 rkey",
131+ )
132+ })?;
133+134+ let key_depth = atproto_mst_depth(&rkey_s);
135+ if depth.is_none() {
136+ depth = Some(key_depth);
137+ } else if Some(key_depth) != depth {
138+ return Err(de::Error::invalid_value(
139+ Unexpected::Bytes(&prefix),
140+ &"all rkeys to have equal MST depth",
141+ ));
142+ }
143+144+ things.push(NodeThing {
145+ cid: entry.value,
146+ kind: ThingKind::Value { rkey: rkey_s },
147+ });
148+149+ if let Some(cid) = entry.tree {
150+ things.push(NodeThing {
151+ cid,
152+ kind: ThingKind::Tree,
153+ });
154+ }
155+156+ prefix = rkey;
157+ }
158+ }
159+ f => return Err(de::Error::unknown_field(f, NODE_FIELDS)),
160+ }
161+ }
162+ if !found_left {
163+ return Err(de::Error::missing_field("l"));
164+ }
165+ if !found_entries {
166+ return Err(de::Error::missing_field("e"));
167+ }
168+169+ things.reverse();
170+ if let Some(l) = left {
171+ things.push(l);
172+ }
173+174+ Ok(MstNode { depth, things })
175+ }
176+ }
177+178+ const NODE_FIELDS: &[&str] = &["l", "e"];
179+ deserializer.deserialize_struct("MstNode", NODE_FIELDS, NodeVisitor)
180+ }
181}
182183+impl MstNode {
184+ pub(crate) fn is_empty(&self) -> bool {
185+ self.things.is_empty()
186+ }
187 /// test if a block could possibly be a node
188 ///
189 /// we can't eagerly decode records except where we're *sure* they cannot be
···194 /// so if a block *could be* a node, any record converter must postpone
195 /// processing. if it turns out it happens to be a very node-looking record,
196 /// well, sorry, it just has to only be processed later when that's known.
197+ #[inline(always)]
198 pub(crate) fn could_be(bytes: impl AsRef<[u8]>) -> bool {
199 const NODE_FINGERPRINT: [u8; 3] = [
200 0xA2, // map length 2 (for "l" and "e" keys)
···209 .map(|b| b & 0b1110_0000 == 0x80)
210 .unwrap_or(false)
211 }
0000000000212}
213214/// TreeEntry object
···219 #[serde(rename = "p")]
220 pub prefix_len: usize,
221 /// remainder of key for this TreeEntry, after "prefixlen" have been removed
222+ #[serde(rename = "k")]
223+ pub keysuffix: serde_bytes::ByteBuf,
224 /// link to the record data (CBOR) for this entry
225 #[serde(rename = "v")]
226 pub value: Cid,
-87
src/process.rs
···1-/*!
2-Record processor function output trait
3-4-The return type must satisfy the `Processable` trait, which requires:
5-6-- `Clone` because two rkeys can refer to the same record by CID, which may
7- only appear once in the CAR file.
8-- `Serialize + DeserializeOwned` so it can be spilled to disk.
9-10-One required function must be implemented, `get_size()`: this should return the
11-approximate total off-stack size of the type. (the on-stack size will be added
12-automatically via `std::mem::get_size`).
13-14-Note that it is **not guaranteed** that the `process` function will run on a
15-block before storing it in memory or on disk: it's not possible to know if a
16-block is a record without actually walking the MST, so the best we can do is
17-apply `process` to any block that we know *cannot* be an MST node, and otherwise
18-store the raw block bytes.
19-20-Here's a silly processing function that just collects 'eyy's found in the raw
21-record bytes
22-23-```
24-# use repo_stream::Processable;
25-# use serde::{Serialize, Deserialize};
26-#[derive(Debug, Clone, Serialize, Deserialize)]
27-struct Eyy(usize, String);
28-29-impl Processable for Eyy {
30- fn get_size(&self) -> usize {
31- // don't need to compute the usize, it's on the stack
32- self.1.capacity() // in-mem size from the string's capacity, in bytes
33- }
34-}
35-36-fn process(raw: Vec<u8>) -> Vec<Eyy> {
37- let mut out = Vec::new();
38- let to_find = "eyy".as_bytes();
39- for i in 0..(raw.len() - 3) {
40- if &raw[i..(i+3)] == to_find {
41- out.push(Eyy(i, "eyy".to_string()));
42- }
43- }
44- out
45-}
46-```
47-48-The memory sizing stuff is a little sketch but probably at least approximately
49-works.
50-*/
51-52-use serde::{Serialize, de::DeserializeOwned};
53-54-/// Output trait for record processing
55-pub trait Processable: Clone + Serialize + DeserializeOwned {
56- /// Any additional in-memory size taken by the processed type
57- ///
58- /// Do not include stack size (`std::mem::size_of`)
59- fn get_size(&self) -> usize;
60-}
61-62-/// Processor that just returns the raw blocks
63-#[inline]
64-pub fn noop(block: Vec<u8>) -> Vec<u8> {
65- block
66-}
67-68-impl Processable for u8 {
69- fn get_size(&self) -> usize {
70- 0
71- }
72-}
73-74-impl Processable for usize {
75- fn get_size(&self) -> usize {
76- 0 // no additional space taken, just its stack size (newtype is free)
77- }
78-}
79-80-impl<Item: Sized + Processable> Processable for Vec<Item> {
81- fn get_size(&self) -> usize {
82- let slot_size = std::mem::size_of::<Item>();
83- let direct_size = slot_size * self.capacity();
84- let items_referenced_size: usize = self.iter().map(|item| item.get_size()).sum();
85- direct_size + items_referenced_size
86- }
87-}