This repository has no description.

Compare changes

Choose any two refs to compare.

Changed files
+16 -187
crates
jacquard-axum
jacquard-repo
+10 -3
crates/jacquard-axum/src/service_auth.rs
··· 572 572 573 573 match codec { 574 574 // p256-pub (0x1200) 575 - [0x80, 0x24] => PublicKey::from_p256_bytes(key_material).ok(), 575 + [0x80, 0x24] => PublicKey::from_p256_bytes(key_material).inspect_err(|e| { 576 + tracing::error!("Failed to parse p256 public key: {}", e); 577 + }).ok(), 576 578 // secp256k1-pub (0xe7) 577 - [0xe7, 0x01] => PublicKey::from_k256_bytes(key_material).ok(), 578 - _ => None, 579 + [0xe7, 0x01] => PublicKey::from_k256_bytes(key_material).inspect_err(|e| { 580 + tracing::error!("Failed to parse secp256k1 public key: {}", e); 581 + }).ok(), 582 + _ => { 583 + tracing::error!("Unsupported public key multicodec: {:?}", codec); 584 + None 585 + }, 579 586 } 580 587 } 581 588
+1 -3
crates/jacquard-repo/Cargo.toml
··· 45 45 # Async 46 46 trait-variant.workspace = true 47 47 n0-future.workspace = true 48 - tokio = { workspace = true, default-features = false, features = ["fs", "io-util"] } 49 - 48 + tokio = { workspace = true, default-features = false, features = ["io-util"] } 50 49 51 50 # Crypto (for commit signing/verification) 52 51 ed25519-dalek = { version = "2", features = ["rand_core"] } ··· 55 54 56 55 [dev-dependencies] 57 56 serde_ipld_dagjson = "0.2" 58 - tokio = { workspace = true, features = ["macros", "rt", "rt-multi-thread", "fs"] } 59 57 tempfile = "3.14" 60 58 rand = "0.8" 61 59 hex = "0.4"
+2 -2
crates/jacquard-repo/src/car/mod.rs
··· 24 24 pub mod writer; 25 25 26 26 // Re-export commonly used functions and types 27 - pub use reader::{parse_car_bytes, read_car, read_car_header, stream_car, ParsedCar}; 28 - pub use writer::{export_repo_car, write_car, write_car_bytes}; 27 + pub use reader::{parse_car_bytes, ParsedCar}; 28 + pub use writer::{write_car_bytes};
-86
crates/jacquard-repo/src/car/reader.rs
··· 11 11 use std::collections::BTreeMap; 12 12 use std::path::Path; 13 13 use std::pin::Pin; 14 - use tokio::fs::File; 15 14 16 15 /// Parsed CAR file data 17 16 #[derive(Debug, Clone)] ··· 22 21 pub blocks: BTreeMap<IpldCid, Bytes>, 23 22 } 24 23 25 - /// Read entire CAR file into memory 26 - /// 27 - /// Returns BTreeMap of CID -> block data (sorted order for determinism). 28 - /// For large CAR files, consider using `stream_car()` instead. 29 - pub async fn read_car(path: impl AsRef<Path>) -> Result<BTreeMap<IpldCid, Bytes>> { 30 - let path = path.as_ref(); 31 - let file = File::open(path) 32 - .await 33 - .map_err(|e| RepoError::io(e).with_context(format!("opening CAR file: {}", path.display())))?; 34 - 35 - let reader = CarReader::new(file).await.map_err(|e| RepoError::car(e))?; 36 - 37 - let mut blocks = BTreeMap::new(); 38 - let stream = reader.stream(); 39 - n0_future::pin!(stream); 40 - 41 - while let Some(result) = stream.next().await { 42 - let (cid, data) = result.map_err(|e| RepoError::car_parse(e))?; 43 - blocks.insert(cid, Bytes::from(data)); 44 - } 45 - 46 - Ok(blocks) 47 - } 48 - 49 - /// Read CAR file header (roots only) 50 - /// 51 - /// Useful for checking roots without loading all blocks. 52 - pub async fn read_car_header(path: impl AsRef<Path>) -> Result<Vec<IpldCid>> { 53 - let path = path.as_ref(); 54 - let file = File::open(path) 55 - .await 56 - .map_err(|e| RepoError::io(e).with_context(format!("opening CAR file: {}", path.display())))?; 57 - 58 - let reader = CarReader::new(file).await.map_err(|e| RepoError::car(e))?; 59 - 60 - Ok(reader.header().roots().to_vec()) 61 - } 62 - 63 24 /// Parse CAR bytes into root and block map 64 25 /// 65 26 /// For in-memory CAR data (e.g., from firehose commit messages, merkle proofs). ··· 85 46 } 86 47 87 48 Ok(ParsedCar { root, blocks }) 88 - } 89 - 90 - /// Stream CAR blocks without loading entire file into memory 91 - /// 92 - /// Useful for processing large CAR files incrementally. 
93 - pub async fn stream_car(path: impl AsRef<Path>) -> Result<CarBlockStream> { 94 - let path = path.as_ref(); 95 - let file = File::open(path) 96 - .await 97 - .map_err(|e| RepoError::io(e).with_context(format!("opening CAR file: {}", path.display())))?; 98 - 99 - let reader = CarReader::new(file).await.map_err(|e| RepoError::car(e))?; 100 - 101 - let roots = reader.header().roots().to_vec(); 102 - let stream = Box::pin(reader.stream()); 103 - 104 - Ok(CarBlockStream { stream, roots }) 105 - } 106 - 107 - /// Streaming CAR block reader 108 - /// 109 - /// Iterates through CAR blocks without loading entire file into memory. 110 - pub struct CarBlockStream { 111 - stream: Pin< 112 - Box<dyn Stream<Item = std::result::Result<(IpldCid, Vec<u8>), iroh_car::Error>> + Send>, 113 - >, 114 - roots: Vec<IpldCid>, 115 - } 116 - 117 - impl CarBlockStream { 118 - /// Get next block from the stream 119 - /// 120 - /// Returns `None` when stream is exhausted. 121 - pub async fn next(&mut self) -> Result<Option<(IpldCid, Bytes)>> { 122 - match self.stream.next().await { 123 - Some(result) => { 124 - let (cid, data) = result.map_err(|e| RepoError::car_parse(e))?; 125 - Ok(Some((cid, Bytes::from(data)))) 126 - } 127 - None => Ok(None), 128 - } 129 - } 130 - 131 - /// Get the CAR file roots 132 - pub fn roots(&self) -> &[IpldCid] { 133 - &self.roots 134 - } 135 49 } 136 50 137 51 #[cfg(test)]
-83
crates/jacquard-repo/src/car/writer.rs
··· 10 10 use iroh_car::CarWriter; 11 11 use std::collections::BTreeMap; 12 12 use std::path::Path; 13 - use tokio::fs::File; 14 13 use tokio::io::AsyncWriteExt; 15 14 16 - /// Write blocks to CAR file 17 - /// 18 - /// Roots should contain commit CID(s). 19 - /// Blocks are written in sorted CID order (BTreeMap) for determinism. 20 - pub async fn write_car( 21 - path: impl AsRef<Path>, 22 - roots: Vec<IpldCid>, 23 - blocks: BTreeMap<IpldCid, Bytes>, 24 - ) -> Result<()> { 25 - let path = path.as_ref(); 26 - let file = File::create(path).await.map_err(|e| { 27 - RepoError::io(e).with_context(format!("creating CAR file: {}", path.display())) 28 - })?; 29 - 30 - let header = iroh_car::CarHeader::new_v1(roots); 31 - let mut writer = CarWriter::new(header, file); 32 - 33 - for (cid, data) in blocks { 34 - writer 35 - .write(cid, data.as_ref()) 36 - .await 37 - .map_err(|e| RepoError::car(e).with_context(format!("writing block {}", cid)))?; 38 - } 39 - 40 - writer 41 - .finish() 42 - .await 43 - .map_err(|e| RepoError::car(e).with_context("finalizing CAR file"))?; 44 - 45 - Ok(()) 46 - } 47 - 48 15 /// Write blocks to CAR bytes (in-memory) 49 16 /// 50 17 /// Like `write_car()` but writes to a `Vec<u8>` instead of a file. ··· 72 39 .map_err(|e| RepoError::io(e).with_context("flushing CAR buffer"))?; 73 40 74 41 Ok(buffer) 75 - } 76 - 77 - /// Write MST + commit to CAR file 78 - /// 79 - /// Streams blocks directly to CAR file: 80 - /// - Commit block (from storage) 81 - /// - All MST node blocks (from storage) 82 - /// - All record blocks (from storage) 83 - /// 84 - /// Uses streaming to avoid loading all blocks into memory. 
85 - /// 86 - /// Should write in the correct order for [streaming car processing](https://github.com/bluesky-social/proposals/blob/main/0006-sync-iteration/README.md#streaming-car-processing) from sync v1.1 87 - pub async fn export_repo_car<S: BlockStore + Sync + 'static>( 88 - path: impl AsRef<Path>, 89 - commit_cid: IpldCid, 90 - mst: &Mst<S>, 91 - ) -> Result<()> { 92 - let path = path.as_ref(); 93 - let file = File::create(path).await.map_err(|e| { 94 - RepoError::io(e).with_context(format!("creating CAR export file: {}", path.display())) 95 - })?; 96 - 97 - let header = iroh_car::CarHeader::new_v1(vec![commit_cid]); 98 - let mut writer = CarWriter::new(header, file); 99 - 100 - // Write commit block first 101 - let storage = mst.storage(); 102 - let commit_data = storage 103 - .get(&commit_cid) 104 - .await? 105 - .ok_or_else(|| { 106 - RepoError::not_found("commit", &commit_cid) 107 - .with_help("Commit must be persisted to storage before exporting - ensure apply_commit() was called") 108 - })?; 109 - 110 - writer 111 - .write(commit_cid, &commit_data) 112 - .await 113 - .map_err(|e| RepoError::car(e).with_context("writing commit block"))?; 114 - 115 - // Stream MST and record blocks 116 - mst.write_blocks_to_car(&mut writer).await?; 117 - 118 - // Finish writing 119 - writer 120 - .finish() 121 - .await 122 - .map_err(|e| RepoError::car(e).with_context("finalizing CAR export"))?; 123 - 124 - Ok(()) 125 42 } 126 43 127 44 #[cfg(test)]
+1 -1
crates/jacquard-repo/src/lib.rs
··· 58 58 pub use error::{RepoError, RepoErrorKind, Result}; 59 59 pub use mst::{Mst, MstDiff, WriteOp}; 60 60 pub use repo::{CommitData, Repository}; 61 - pub use storage::{BlockStore, FileBlockStore, LayeredBlockStore, MemoryBlockStore}; 61 + pub use storage::{BlockStore, MemoryBlockStore}; 62 62 63 63 /// DAG-CBOR codec identifier for CIDs (0x71) 64 64 pub const DAG_CBOR_CID_CODEC: u64 = 0x71;
-5
crates/jacquard-repo/src/repo.rs
··· 583 583 Ok((ops, self.apply_commit(commit_data).await?)) 584 584 } 585 585 586 - /// Export repository to CAR file 587 - pub async fn export_car(&self, path: impl AsRef<Path>, commit_cid: IpldCid) -> Result<()> { 588 - crate::car::export_repo_car(path, commit_cid, &self.mst).await 589 - } 590 - 591 586 /// Get the underlying MST 592 587 pub fn mst(&self) -> &Mst<S> { 593 588 &self.mst
+2 -4
crates/jacquard-repo/src/storage/mod.rs
··· 93 93 async fn apply_commit(&self, commit: CommitData) -> Result<()>; 94 94 } 95 95 96 - pub mod file; 96 + pub mod memory; 97 97 pub mod layered; 98 - pub mod memory; 99 98 100 - pub use file::FileBlockStore; 99 + pub use memory::MemoryBlockStore; 101 100 pub use layered::LayeredBlockStore; 102 - pub use memory::MemoryBlockStore;