+10
-3
crates/jacquard-axum/src/service_auth.rs
+10
-3
crates/jacquard-axum/src/service_auth.rs
···
572
572
573
573
match codec {
574
574
// p256-pub (0x1200)
575
-
[0x80, 0x24] => PublicKey::from_p256_bytes(key_material).ok(),
575
+
[0x80, 0x24] => PublicKey::from_p256_bytes(key_material).inspect_err(|e| {
576
+
tracing::error!("Failed to parse p256 public key: {}", e);
577
+
}).ok(),
576
578
// secp256k1-pub (0xe7)
577
-
[0xe7, 0x01] => PublicKey::from_k256_bytes(key_material).ok(),
578
-
_ => None,
579
+
[0xe7, 0x01] => PublicKey::from_k256_bytes(key_material).inspect_err(|e| {
580
+
tracing::error!("Failed to parse secp256k1 public key: {}", e);
581
+
}).ok(),
582
+
_ => {
583
+
tracing::error!("Unsupported public key multicodec: {:?}", codec);
584
+
None
585
+
},
579
586
}
580
587
}
581
588
+1
-3
crates/jacquard-repo/Cargo.toml
+1
-3
crates/jacquard-repo/Cargo.toml
···
45
45
# Async
46
46
trait-variant.workspace = true
47
47
n0-future.workspace = true
48
-
tokio = { workspace = true, default-features = false, features = ["fs", "io-util"] }
49
-
48
+
tokio = { workspace = true, default-features = false, features = ["io-util"] }
50
49
51
50
# Crypto (for commit signing/verification)
52
51
ed25519-dalek = { version = "2", features = ["rand_core"] }
···
55
54
56
55
[dev-dependencies]
57
56
serde_ipld_dagjson = "0.2"
58
-
tokio = { workspace = true, features = ["macros", "rt", "rt-multi-thread", "fs"] }
59
57
tempfile = "3.14"
60
58
rand = "0.8"
61
59
hex = "0.4"
+2
-2
crates/jacquard-repo/src/car/mod.rs
+2
-2
crates/jacquard-repo/src/car/mod.rs
···
24
24
pub mod writer;
25
25
26
26
// Re-export commonly used functions and types
27
-
pub use reader::{parse_car_bytes, read_car, read_car_header, stream_car, ParsedCar};
28
-
pub use writer::{export_repo_car, write_car, write_car_bytes};
27
+
pub use reader::{parse_car_bytes, ParsedCar};
28
+
pub use writer::{write_car_bytes};
-86
crates/jacquard-repo/src/car/reader.rs
-86
crates/jacquard-repo/src/car/reader.rs
···
11
11
use std::collections::BTreeMap;
12
12
use std::path::Path;
13
13
use std::pin::Pin;
14
-
use tokio::fs::File;
15
14
16
15
/// Parsed CAR file data
17
16
#[derive(Debug, Clone)]
···
22
21
pub blocks: BTreeMap<IpldCid, Bytes>,
23
22
}
24
23
25
-
/// Read entire CAR file into memory
26
-
///
27
-
/// Returns BTreeMap of CID -> block data (sorted order for determinism).
28
-
/// For large CAR files, consider using `stream_car()` instead.
29
-
pub async fn read_car(path: impl AsRef<Path>) -> Result<BTreeMap<IpldCid, Bytes>> {
30
-
let path = path.as_ref();
31
-
let file = File::open(path)
32
-
.await
33
-
.map_err(|e| RepoError::io(e).with_context(format!("opening CAR file: {}", path.display())))?;
34
-
35
-
let reader = CarReader::new(file).await.map_err(|e| RepoError::car(e))?;
36
-
37
-
let mut blocks = BTreeMap::new();
38
-
let stream = reader.stream();
39
-
n0_future::pin!(stream);
40
-
41
-
while let Some(result) = stream.next().await {
42
-
let (cid, data) = result.map_err(|e| RepoError::car_parse(e))?;
43
-
blocks.insert(cid, Bytes::from(data));
44
-
}
45
-
46
-
Ok(blocks)
47
-
}
48
-
49
-
/// Read CAR file header (roots only)
50
-
///
51
-
/// Useful for checking roots without loading all blocks.
52
-
pub async fn read_car_header(path: impl AsRef<Path>) -> Result<Vec<IpldCid>> {
53
-
let path = path.as_ref();
54
-
let file = File::open(path)
55
-
.await
56
-
.map_err(|e| RepoError::io(e).with_context(format!("opening CAR file: {}", path.display())))?;
57
-
58
-
let reader = CarReader::new(file).await.map_err(|e| RepoError::car(e))?;
59
-
60
-
Ok(reader.header().roots().to_vec())
61
-
}
62
-
63
24
/// Parse CAR bytes into root and block map
64
25
///
65
26
/// For in-memory CAR data (e.g., from firehose commit messages, merkle proofs).
···
85
46
}
86
47
87
48
Ok(ParsedCar { root, blocks })
88
-
}
89
-
90
-
/// Stream CAR blocks without loading entire file into memory
91
-
///
92
-
/// Useful for processing large CAR files incrementally.
93
-
pub async fn stream_car(path: impl AsRef<Path>) -> Result<CarBlockStream> {
94
-
let path = path.as_ref();
95
-
let file = File::open(path)
96
-
.await
97
-
.map_err(|e| RepoError::io(e).with_context(format!("opening CAR file: {}", path.display())))?;
98
-
99
-
let reader = CarReader::new(file).await.map_err(|e| RepoError::car(e))?;
100
-
101
-
let roots = reader.header().roots().to_vec();
102
-
let stream = Box::pin(reader.stream());
103
-
104
-
Ok(CarBlockStream { stream, roots })
105
-
}
106
-
107
-
/// Streaming CAR block reader
108
-
///
109
-
/// Iterates through CAR blocks without loading entire file into memory.
110
-
pub struct CarBlockStream {
111
-
stream: Pin<
112
-
Box<dyn Stream<Item = std::result::Result<(IpldCid, Vec<u8>), iroh_car::Error>> + Send>,
113
-
>,
114
-
roots: Vec<IpldCid>,
115
-
}
116
-
117
-
impl CarBlockStream {
118
-
/// Get next block from the stream
119
-
///
120
-
/// Returns `None` when stream is exhausted.
121
-
pub async fn next(&mut self) -> Result<Option<(IpldCid, Bytes)>> {
122
-
match self.stream.next().await {
123
-
Some(result) => {
124
-
let (cid, data) = result.map_err(|e| RepoError::car_parse(e))?;
125
-
Ok(Some((cid, Bytes::from(data))))
126
-
}
127
-
None => Ok(None),
128
-
}
129
-
}
130
-
131
-
/// Get the CAR file roots
132
-
pub fn roots(&self) -> &[IpldCid] {
133
-
&self.roots
134
-
}
135
49
}
136
50
137
51
#[cfg(test)]
-83
crates/jacquard-repo/src/car/writer.rs
-83
crates/jacquard-repo/src/car/writer.rs
···
10
10
use iroh_car::CarWriter;
11
11
use std::collections::BTreeMap;
12
12
use std::path::Path;
13
-
use tokio::fs::File;
14
13
use tokio::io::AsyncWriteExt;
15
14
16
-
/// Write blocks to CAR file
17
-
///
18
-
/// Roots should contain commit CID(s).
19
-
/// Blocks are written in sorted CID order (BTreeMap) for determinism.
20
-
pub async fn write_car(
21
-
path: impl AsRef<Path>,
22
-
roots: Vec<IpldCid>,
23
-
blocks: BTreeMap<IpldCid, Bytes>,
24
-
) -> Result<()> {
25
-
let path = path.as_ref();
26
-
let file = File::create(path).await.map_err(|e| {
27
-
RepoError::io(e).with_context(format!("creating CAR file: {}", path.display()))
28
-
})?;
29
-
30
-
let header = iroh_car::CarHeader::new_v1(roots);
31
-
let mut writer = CarWriter::new(header, file);
32
-
33
-
for (cid, data) in blocks {
34
-
writer
35
-
.write(cid, data.as_ref())
36
-
.await
37
-
.map_err(|e| RepoError::car(e).with_context(format!("writing block {}", cid)))?;
38
-
}
39
-
40
-
writer
41
-
.finish()
42
-
.await
43
-
.map_err(|e| RepoError::car(e).with_context("finalizing CAR file"))?;
44
-
45
-
Ok(())
46
-
}
47
-
48
15
/// Write blocks to CAR bytes (in-memory)
49
16
///
50
17
/// Like `write_car()` but writes to a `Vec<u8>` instead of a file.
···
72
39
.map_err(|e| RepoError::io(e).with_context("flushing CAR buffer"))?;
73
40
74
41
Ok(buffer)
75
-
}
76
-
77
-
/// Write MST + commit to CAR file
78
-
///
79
-
/// Streams blocks directly to CAR file:
80
-
/// - Commit block (from storage)
81
-
/// - All MST node blocks (from storage)
82
-
/// - All record blocks (from storage)
83
-
///
84
-
/// Uses streaming to avoid loading all blocks into memory.
85
-
///
86
-
/// Should write in the correct order for [streaming car processing](https://github.com/bluesky-social/proposals/blob/main/0006-sync-iteration/README.md#streaming-car-processing) from sync v1.1
87
-
pub async fn export_repo_car<S: BlockStore + Sync + 'static>(
88
-
path: impl AsRef<Path>,
89
-
commit_cid: IpldCid,
90
-
mst: &Mst<S>,
91
-
) -> Result<()> {
92
-
let path = path.as_ref();
93
-
let file = File::create(path).await.map_err(|e| {
94
-
RepoError::io(e).with_context(format!("creating CAR export file: {}", path.display()))
95
-
})?;
96
-
97
-
let header = iroh_car::CarHeader::new_v1(vec![commit_cid]);
98
-
let mut writer = CarWriter::new(header, file);
99
-
100
-
// Write commit block first
101
-
let storage = mst.storage();
102
-
let commit_data = storage
103
-
.get(&commit_cid)
104
-
.await?
105
-
.ok_or_else(|| {
106
-
RepoError::not_found("commit", &commit_cid)
107
-
.with_help("Commit must be persisted to storage before exporting - ensure apply_commit() was called")
108
-
})?;
109
-
110
-
writer
111
-
.write(commit_cid, &commit_data)
112
-
.await
113
-
.map_err(|e| RepoError::car(e).with_context("writing commit block"))?;
114
-
115
-
// Stream MST and record blocks
116
-
mst.write_blocks_to_car(&mut writer).await?;
117
-
118
-
// Finish writing
119
-
writer
120
-
.finish()
121
-
.await
122
-
.map_err(|e| RepoError::car(e).with_context("finalizing CAR export"))?;
123
-
124
-
Ok(())
125
42
}
126
43
127
44
#[cfg(test)]
+1
-1
crates/jacquard-repo/src/lib.rs
+1
-1
crates/jacquard-repo/src/lib.rs
···
58
58
pub use error::{RepoError, RepoErrorKind, Result};
59
59
pub use mst::{Mst, MstDiff, WriteOp};
60
60
pub use repo::{CommitData, Repository};
61
-
pub use storage::{BlockStore, FileBlockStore, LayeredBlockStore, MemoryBlockStore};
61
+
pub use storage::{BlockStore, MemoryBlockStore};
62
62
63
63
/// DAG-CBOR codec identifier for CIDs (0x71)
64
64
pub const DAG_CBOR_CID_CODEC: u64 = 0x71;
-5
crates/jacquard-repo/src/repo.rs
-5
crates/jacquard-repo/src/repo.rs
···
583
583
Ok((ops, self.apply_commit(commit_data).await?))
584
584
}
585
585
586
-
/// Export repository to CAR file
587
-
pub async fn export_car(&self, path: impl AsRef<Path>, commit_cid: IpldCid) -> Result<()> {
588
-
crate::car::export_repo_car(path, commit_cid, &self.mst).await
589
-
}
590
-
591
586
/// Get the underlying MST
592
587
pub fn mst(&self) -> &Mst<S> {
593
588
&self.mst
+2
-4
crates/jacquard-repo/src/storage/mod.rs
+2
-4
crates/jacquard-repo/src/storage/mod.rs
···
93
93
async fn apply_commit(&self, commit: CommitData) -> Result<()>;
94
94
}
95
95
96
-
pub mod file;
96
+
pub mod memory;
97
97
pub mod layered;
98
-
pub mod memory;
99
98
100
-
pub use file::FileBlockStore;
99
+
pub use memory::MemoryBlockStore;
101
100
pub use layered::LayeredBlockStore;
102
-
pub use memory::MemoryBlockStore;