A better Rust ATProto crate

cleaned up a bunch of (lack of) imports

Orual fa3474ac 5651258d

+207 -252
+16 -31
crates/jacquard-repo/src/car/reader.rs
··· 3 3 //! Provides functions for reading CAR (Content Addressable aRchive) files into memory 4 4 //! or streaming them for large repositories. 5 5 6 - use crate::error::Result; 6 + use crate::error::{RepoError, Result}; 7 7 use bytes::Bytes; 8 8 use cid::Cid as IpldCid; 9 9 use iroh_car::CarReader; 10 - use n0_future::stream::StreamExt; 10 + use n0_future::stream::{Stream, StreamExt}; 11 11 use std::collections::BTreeMap; 12 12 use std::path::Path; 13 + use std::pin::Pin; 13 14 use tokio::fs::File; 14 15 15 16 /// Parsed CAR file data ··· 26 27 /// Returns BTreeMap of CID -> block data (sorted order for determinism). 27 28 /// For large CAR files, consider using `stream_car()` instead. 28 29 pub async fn read_car(path: impl AsRef<Path>) -> Result<BTreeMap<IpldCid, Bytes>> { 29 - let file = File::open(path) 30 - .await 31 - .map_err(|e| crate::error::RepoError::io(e))?; 30 + let file = File::open(path).await.map_err(|e| RepoError::io(e))?; 32 31 33 - let reader = CarReader::new(file) 34 - .await 35 - .map_err(|e| crate::error::RepoError::car(e))?; 32 + let reader = CarReader::new(file).await.map_err(|e| RepoError::car(e))?; 36 33 37 34 let mut blocks = BTreeMap::new(); 38 35 let stream = reader.stream(); 39 36 n0_future::pin!(stream); 40 37 41 38 while let Some(result) = stream.next().await { 42 - let (cid, data) = result.map_err(|e| crate::error::RepoError::car_parse(e))?; 39 + let (cid, data) = result.map_err(|e| RepoError::car_parse(e))?; 43 40 blocks.insert(cid, Bytes::from(data)); 44 41 } 45 42 ··· 50 47 /// 51 48 /// Useful for checking roots without loading all blocks. 52 49 pub async fn read_car_header(path: impl AsRef<Path>) -> Result<Vec<IpldCid>> { 53 - let file = File::open(path) 54 - .await 55 - .map_err(|e| crate::error::RepoError::io(e))?; 50 + let file = File::open(path).await.map_err(|e| RepoError::io(e))?; 56 51 57 - let reader = CarReader::new(file) 58 - .await 59 - .map_err(|e| crate::error::RepoError::car(e))?; 52 + let reader = CarReader::new(file).await.map_err(|e| RepoError::car(e))?; 60 53 61 54 Ok(reader.header().roots().to_vec()) 62 55 } ··· 68 61 pub async fn parse_car_bytes(data: &[u8]) -> Result<ParsedCar> { 69 62 let reader = CarReader::new(data) 70 63 .await 71 - .map_err(|e| crate::error::RepoError::car_parse(e))?; 64 + .map_err(|e| RepoError::car_parse(e))?; 72 65 73 66 let roots = reader.header().roots(); 74 67 let root = roots 75 68 .first() 76 69 .copied() 77 - .ok_or_else(|| crate::error::RepoError::invalid("CAR file has no roots"))?; 70 + .ok_or_else(|| RepoError::invalid("CAR file has no roots"))?; 78 71 79 72 let mut blocks = BTreeMap::new(); 80 73 let stream = reader.stream(); 81 74 n0_future::pin!(stream); 82 75 83 76 while let Some(result) = stream.next().await { 84 - let (cid, data) = result.map_err(|e| crate::error::RepoError::car_parse(e))?; 77 + let (cid, data) = result.map_err(|e| RepoError::car_parse(e))?; 85 78 blocks.insert(cid, Bytes::from(data)); 86 79 } 87 80 ··· 92 85 /// 93 86 /// Useful for processing large CAR files incrementally. 94 87 pub async fn stream_car(path: impl AsRef<Path>) -> Result<CarBlockStream> { 95 - let file = File::open(path) 96 - .await 97 - .map_err(|e| crate::error::RepoError::io(e))?; 88 + let file = File::open(path).await.map_err(|e| RepoError::io(e))?; 98 89 99 - let reader = CarReader::new(file) 100 - .await 101 - .map_err(|e| crate::error::RepoError::car(e))?; 90 + let reader = CarReader::new(file).await.map_err(|e| RepoError::car(e))?; 102 91 103 92 let roots = reader.header().roots().to_vec(); 104 93 let stream = Box::pin(reader.stream()); ··· 110 99 /// 111 100 /// Iterates through CAR blocks without loading entire file into memory. 112 101 pub struct CarBlockStream { 113 - stream: std::pin::Pin< 114 - Box< 115 - dyn n0_future::stream::Stream< 116 - Item = std::result::Result<(IpldCid, Vec<u8>), iroh_car::Error>, 117 - > + Send, 118 - >, 102 + stream: Pin< 103 + Box<dyn Stream<Item = std::result::Result<(IpldCid, Vec<u8>), iroh_car::Error>> + Send>, 119 104 >, 120 105 roots: Vec<IpldCid>, 121 106 } ··· 127 112 pub async fn next(&mut self) -> Result<Option<(IpldCid, Bytes)>> { 128 113 match self.stream.next().await { 129 114 Some(result) => { 130 - let (cid, data) = result.map_err(|e| crate::error::RepoError::car_parse(e))?; 115 + let (cid, data) = result.map_err(|e| RepoError::car_parse(e))?; 131 116 Ok(Some((cid, Bytes::from(data)))) 132 117 } 133 118 None => Ok(None),
+12 -28
crates/jacquard-repo/src/car/writer.rs
··· 2 2 //! 3 3 //! Provides functions for writing blocks to CAR (Content Addressable aRchive) files. 4 4 5 - use crate::error::Result; 5 + use crate::error::{RepoError, Result}; 6 6 use crate::mst::tree::Mst; 7 7 use crate::storage::BlockStore; 8 8 use bytes::Bytes; ··· 22 22 roots: Vec<IpldCid>, 23 23 blocks: BTreeMap<IpldCid, Bytes>, 24 24 ) -> Result<()> { 25 - let file = File::create(path) 26 - .await 27 - .map_err(|e| crate::error::RepoError::io(e))?; 25 + let file = File::create(path).await.map_err(|e| RepoError::io(e))?; 28 26 29 27 let header = iroh_car::CarHeader::new_v1(roots); 30 28 let mut writer = CarWriter::new(header, file); ··· 33 31 writer 34 32 .write(cid, data.as_ref()) 35 33 .await 36 - .map_err(|e| crate::error::RepoError::car(e))?; 34 + .map_err(|e| RepoError::car(e))?; 37 35 } 38 36 39 - writer 40 - .finish() 41 - .await 42 - .map_err(|e| crate::error::RepoError::car(e))?; 37 + writer.finish().await.map_err(|e| RepoError::car(e))?; 43 38 44 39 Ok(()) 45 40 } ··· 48 43 /// 49 44 /// Like `write_car()` but writes to a `Vec<u8>` instead of a file. 50 45 /// Useful for tests and proof generation. 51 - pub async fn write_car_bytes( 52 - root: IpldCid, 53 - blocks: BTreeMap<IpldCid, Bytes>, 54 - ) -> Result<Vec<u8>> { 46 + pub async fn write_car_bytes(root: IpldCid, blocks: BTreeMap<IpldCid, Bytes>) -> Result<Vec<u8>> { 55 47 let mut buffer = Vec::new(); 56 48 let header = iroh_car::CarHeader::new_v1(vec![root]); 57 49 let mut writer = CarWriter::new(header, &mut buffer); ··· 60 52 writer 61 53 .write(cid, data.as_ref()) 62 54 .await 63 - .map_err(|e| crate::error::RepoError::car(e))?; 55 + .map_err(|e| RepoError::car(e))?; 64 56 } 65 57 66 - writer 67 - .finish() 68 - .await 69 - .map_err(|e| crate::error::RepoError::car(e))?; 58 + writer.finish().await.map_err(|e| RepoError::car(e))?; 70 59 71 - buffer.flush().await.map_err(|e| crate::error::RepoError::io(e))?; 60 + buffer.flush().await.map_err(|e| RepoError::io(e))?; 72 61 73 62 Ok(buffer) 74 63 } ··· 86 75 commit_cid: IpldCid, 87 76 mst: &Mst<S>, 88 77 ) -> Result<()> { 89 - let file = File::create(path) 90 - .await 91 - .map_err(|e| crate::error::RepoError::io(e))?; 78 + let file = File::create(path).await.map_err(|e| RepoError::io(e))?; 92 79 93 80 let header = iroh_car::CarHeader::new_v1(vec![commit_cid]); 94 81 let mut writer = CarWriter::new(header, file); ··· 98 85 let commit_data = storage 99 86 .get(&commit_cid) 100 87 .await? 101 - .ok_or_else(|| crate::error::RepoError::not_found("commit", &commit_cid))?; 88 + .ok_or_else(|| RepoError::not_found("commit", &commit_cid))?; 102 89 103 90 writer 104 91 .write(commit_cid, &commit_data) 105 92 .await 106 - .map_err(|e| crate::error::RepoError::car(e))?; 93 + .map_err(|e| RepoError::car(e))?; 107 94 108 95 // Stream MST and record blocks 109 96 mst.write_blocks_to_car(&mut writer).await?; 110 97 111 98 // Finish writing 112 - writer 113 - .finish() 114 - .await 115 - .map_err(|e| crate::error::RepoError::car(e))?; 99 + writer.finish().await.map_err(|e| RepoError::car(e))?; 116 100 117 101 Ok(()) 118 102 }
+32 -39
crates/jacquard-repo/src/commit/firehose.rs
··· 5 5 //! which are DISTINCT from repository commit objects. 6 6 7 7 use bytes::Bytes; 8 - use jacquard_common::IntoStatic; 9 - use jacquard_common::types::string::{Did, Tid}; 8 + use jacquard_common::types::cid::CidLink; 9 + use jacquard_common::types::crypto::PublicKey; 10 + use jacquard_common::types::string::{Datetime, Did, Tid}; 11 + use jacquard_common::{CowStr, IntoStatic}; 10 12 11 13 /// Firehose commit message (sync v1.0 and v1.1) 12 14 /// ··· 33 35 pub since: Tid, 34 36 35 37 /// Timestamp of when this message was originally broadcast 36 - pub time: jacquard_common::types::string::Datetime, 38 + pub time: Datetime, 37 39 38 40 /// Repo commit object CID 39 41 /// 40 42 /// This CID points to the repository commit block (with did, version, data, rev, prev, sig). 41 43 /// It must be the first entry in the CAR header 'roots' list. 42 44 #[serde(borrow)] 43 - pub commit: jacquard_common::types::cid::CidLink<'a>, 45 + pub commit: CidLink<'a>, 44 46 45 47 /// CAR file containing relevant blocks 46 48 /// ··· 70 72 /// - Consumers must have previous repository state 71 73 #[serde(skip_serializing_if = "Option::is_none")] 72 74 #[serde(borrow)] 73 - pub prev_data: Option<jacquard_common::types::cid::CidLink<'a>>, 75 + pub prev_data: Option<CidLink<'a>>, 74 76 75 77 /// Blob CIDs referenced in this commit 76 78 #[serde(borrow)] 77 - pub blobs: Vec<jacquard_common::types::cid::CidLink<'a>>, 79 + pub blobs: Vec<CidLink<'a>>, 78 80 79 81 /// DEPRECATED: Replaced by #sync event and data limits 80 82 /// ··· 92 94 pub struct RepoOp<'a> { 93 95 /// Operation type: "create", "update", or "delete" 94 96 #[serde(borrow)] 95 - pub action: jacquard_common::CowStr<'a>, 97 + pub action: CowStr<'a>, 96 98 97 99 /// Collection/rkey path (e.g., "app.bsky.feed.post/abc123") 98 100 #[serde(borrow)] 99 - pub path: jacquard_common::CowStr<'a>, 101 + pub path: CowStr<'a>, 100 102 101 103 /// For creates and updates, the new record CID. For deletions, None (null). 102 104 #[serde(skip_serializing_if = "Option::is_none")] 103 105 #[serde(borrow)] 104 - pub cid: Option<jacquard_common::types::cid::CidLink<'a>>, 106 + pub cid: Option<CidLink<'a>>, 105 107 106 108 /// For updates and deletes, the previous record CID 107 109 /// ··· 109 111 /// For creates, this field should not be defined. 110 112 #[serde(skip_serializing_if = "Option::is_none")] 111 113 #[serde(borrow)] 112 - pub prev: Option<jacquard_common::types::cid::CidLink<'a>>, 114 + pub prev: Option<CidLink<'a>>, 113 115 } 114 116 115 117 impl IntoStatic for FirehoseCommit<'_> { ··· 146 148 } 147 149 } 148 150 151 + use crate::car::parse_car_bytes; 149 152 /// Validation functions for firehose commit messages 150 153 /// 151 154 /// These functions validate commits from the `com.atproto.sync.subscribeRepos` firehose. 152 - use crate::error::Result; 155 + use crate::error::{RepoError, Result}; 153 156 use crate::mst::Mst; 154 - use crate::storage::BlockStore; 157 + use crate::storage::{BlockStore, LayeredBlockStore, MemoryBlockStore}; 155 158 use cid::Cid as IpldCid; 156 159 use std::sync::Arc; 157 160 ··· 173 176 &self, 174 177 prev_mst_root: Option<IpldCid>, 175 178 prev_storage: Arc<S>, 176 - pubkey: &jacquard_common::types::crypto::PublicKey<'_>, 179 + pubkey: &PublicKey<'_>, 177 180 ) -> Result<IpldCid> { 178 181 // 1. Parse CAR blocks from the firehose message into temporary storage 179 - let parsed = crate::car::parse_car_bytes(&self.blocks).await?; 180 - let temp_storage = crate::storage::MemoryBlockStore::new_from_blocks(parsed.blocks); 182 + let parsed = parse_car_bytes(&self.blocks).await?; 183 + let temp_storage = MemoryBlockStore::new_from_blocks(parsed.blocks); 181 184 182 185 // 2. Create layered storage: reads from temp first, then prev; writes to temp only 183 186 // This avoids copying all previous MST blocks 184 - let layered_storage = 185 - crate::storage::LayeredBlockStore::new(temp_storage.clone(), prev_storage); 187 + let layered_storage = LayeredBlockStore::new(temp_storage.clone(), prev_storage); 186 188 187 189 // 3. Extract and verify commit object from temporary storage 188 190 let commit_cid: IpldCid = self 189 191 .commit 190 192 .to_ipld() 191 - .map_err(|e| crate::error::RepoError::invalid(format!("Invalid commit CID: {}", e)))?; 193 + .map_err(|e| RepoError::invalid(format!("Invalid commit CID: {}", e)))?; 192 194 let commit_bytes = temp_storage 193 195 .get(&commit_cid) 194 196 .await? 195 - .ok_or_else(|| crate::error::RepoError::not_found("commit block", &commit_cid))?; 197 + .ok_or_else(|| RepoError::not_found("commit block", &commit_cid))?; 196 198 197 199 let commit = super::Commit::from_cbor(&commit_bytes)?; 198 200 199 201 // Verify DID matches 200 202 if commit.did().as_ref() != self.repo.as_ref() { 201 - return Err(crate::error::RepoError::invalid_commit(format!( 203 + return Err(RepoError::invalid_commit(format!( 202 204 "DID mismatch: commit has {}, message has {}", 203 205 commit.did(), 204 206 self.repo ··· 232 234 let computed_root = computed_mst.get_pointer().await?; 233 235 234 236 if computed_root != expected_root { 235 - return Err(crate::error::RepoError::invalid_commit(format!( 237 + return Err(RepoError::invalid_commit(format!( 236 238 "MST root mismatch: expected {}, got {}", 237 239 expected_root, computed_root 238 240 ))); ··· 259 261 /// **Inductive property:** Can validate without any external state besides the blocks 260 262 /// in this message. The `prev_data` field provides the starting MST root, and operations 261 263 /// include `prev` CIDs for validation. All necessary blocks must be in the CAR bytes. 262 - pub async fn validate_v1_1( 263 - &self, 264 - pubkey: &jacquard_common::types::crypto::PublicKey<'_>, 265 - ) -> Result<IpldCid> { 264 + pub async fn validate_v1_1(&self, pubkey: &PublicKey<'_>) -> Result<IpldCid> { 266 265 // 1. Require prev_data for v1.1 267 266 let prev_data_cid: IpldCid = self 268 267 .prev_data 269 268 .as_ref() 270 269 .ok_or_else(|| { 271 - crate::error::RepoError::invalid_commit( 272 - "Sync v1.1 validation requires prev_data field", 273 - ) 270 + RepoError::invalid_commit("Sync v1.1 validation requires prev_data field") 274 271 })? 275 272 .to_ipld() 276 - .map_err(|e| { 277 - crate::error::RepoError::invalid(format!("Invalid prev_data CID: {}", e)) 278 - })?; 273 + .map_err(|e| RepoError::invalid(format!("Invalid prev_data CID: {}", e)))?; 279 274 280 275 // 2. Parse CAR blocks from the firehose message into temporary storage 281 - let parsed = crate::car::parse_car_bytes(&self.blocks).await?; 282 - let temp_storage = Arc::new(crate::storage::MemoryBlockStore::new_from_blocks( 283 - parsed.blocks, 284 - )); 276 + let parsed = parse_car_bytes(&self.blocks).await?; 277 + let temp_storage = Arc::new(MemoryBlockStore::new_from_blocks(parsed.blocks)); 285 278 286 279 // 3. Extract and verify commit object from temporary storage 287 280 let commit_cid: IpldCid = self 288 281 .commit 289 282 .to_ipld() 290 - .map_err(|e| crate::error::RepoError::invalid(format!("Invalid commit CID: {}", e)))?; 283 + .map_err(|e| RepoError::invalid(format!("Invalid commit CID: {}", e)))?; 291 284 let commit_bytes = temp_storage 292 285 .get(&commit_cid) 293 286 .await? 294 - .ok_or_else(|| crate::error::RepoError::not_found("commit block", &commit_cid))?; 287 + .ok_or_else(|| RepoError::not_found("commit block", &commit_cid))?; 295 288 296 289 let commit = super::Commit::from_cbor(&commit_bytes)?; 297 290 298 291 // Verify DID matches 299 292 if commit.did().as_ref() != self.repo.as_ref() { 300 - return Err(crate::error::RepoError::invalid_commit(format!( 293 + return Err(RepoError::invalid_commit(format!( 301 294 "DID mismatch: commit has {}, message has {}", 302 295 commit.did(), 303 296 self.repo ··· 325 318 let computed_root = computed_mst.get_pointer().await?; 326 319 327 320 if computed_root != expected_root { 328 - return Err(crate::error::RepoError::invalid_commit(format!( 321 + return Err(RepoError::invalid_commit(format!( 329 322 "MST root mismatch: expected {}, got {}", 330 323 expected_root, computed_root 331 324 )));
+3 -2
crates/jacquard-repo/src/commit/proof.rs
··· 22 22 use crate::mst::Mst; 23 23 use crate::storage::MemoryBlockStore; 24 24 use cid::Cid as IpldCid; 25 + use jacquard_common::CowStr; 25 26 use jacquard_common::types::string::Did; 26 27 use smol_str::format_smolstr; 27 28 use std::sync::Arc; ··· 30 31 #[derive(Debug, Clone, PartialEq, Eq)] 31 32 pub struct RecordClaim<'a> { 32 33 /// Collection NSID (e.g., "app.bsky.feed.post") 33 - pub collection: jacquard_common::CowStr<'a>, 34 + pub collection: CowStr<'a>, 34 35 35 36 /// Record key (TID or other identifier) 36 - pub rkey: jacquard_common::CowStr<'a>, 37 + pub rkey: CowStr<'a>, 37 38 38 39 /// Expected CID of the record 39 40 /// - Some(cid): claiming record exists with this CID
+1 -1
crates/jacquard-repo/src/error.rs
··· 269 269 } 270 270 271 271 /// Diff-specific errors 272 - #[derive(Debug, thiserror::Error)] 272 + #[derive(Debug, thiserror::Error, miette::Diagnostic)] 273 273 pub enum DiffError { 274 274 /// Too many operations 275 275 #[error("Too many operations: {count} (max {max})")]
+18 -23
crates/jacquard-repo/src/mst/diff.rs
··· 1 1 //! MST diff calculation 2 2 3 3 use std::collections::BTreeMap; 4 + use std::future::Future; 5 + use std::pin::Pin; 4 6 5 7 use super::cursor::{CursorPosition, MstCursor}; 6 8 use super::tree::Mst; 7 - use crate::error::Result; 9 + use super::util::serialize_node_data; 10 + use crate::commit::firehose::RepoOp; 11 + use crate::error::{RepoError, Result}; 12 + use crate::mst::NodeEntry; 8 13 use crate::storage::BlockStore; 9 14 use bytes::Bytes; 10 15 use cid::Cid as IpldCid; 16 + use jacquard_common::types::cid::CidLink; 11 17 use smol_str::SmolStr; 12 18 13 19 /// Diff between two MST states ··· 87 93 /// The sync protocol has a 200 operation limit per commit. 88 94 pub fn validate_limits(&self) -> Result<()> { 89 95 if self.op_count() > 200 { 90 - return Err(crate::error::RepoError::too_large( 96 + return Err(RepoError::too_large( 91 97 "diff operation count", 92 98 self.op_count(), 93 99 200, ··· 139 145 &self, 140 146 storage: &S, 141 147 ) -> Result<std::collections::BTreeMap<IpldCid, bytes::Bytes>> { 142 - use std::collections::BTreeMap; 143 - 144 148 let mut blocks = BTreeMap::new(); 145 149 146 150 for cid in &self.new_leaf_cids { ··· 156 160 /// 157 161 /// Returns operations in the format used by `com.atproto.sync.subscribeRepos`. 158 162 /// All update/delete operations include prev CIDs for sync v1.1 validation. 159 - pub fn to_repo_ops(&self) -> Vec<crate::commit::firehose::RepoOp<'_>> { 160 - use jacquard_common::types::cid::CidLink; 161 - 163 + pub fn to_repo_ops(&self) -> Vec<RepoOp<'_>> { 162 164 let mut ops = Vec::with_capacity(self.op_count()); 163 165 164 166 // Add creates 165 167 for (key, cid) in &self.creates { 166 - ops.push(crate::commit::firehose::RepoOp { 168 + ops.push(RepoOp { 167 169 action: "create".into(), 168 170 path: key.as_str().into(), 169 171 cid: Some(CidLink::from(*cid)), ··· 173 175 174 176 // Add updates 175 177 for (key, new_cid, old_cid) in &self.updates { 176 - ops.push(crate::commit::firehose::RepoOp { 178 + ops.push(RepoOp { 177 179 action: "update".into(), 178 180 path: key.as_str().into(), 179 181 cid: Some(CidLink::from(*new_cid)), ··· 183 185 184 186 // Add deletes 185 187 for (key, old_cid) in &self.deletes { 186 - ops.push(crate::commit::firehose::RepoOp { 188 + ops.push(RepoOp { 187 189 action: "delete".into(), 188 190 path: key.as_str().into(), 189 191 cid: None, // null for deletes ··· 223 225 old: &'a Mst<S>, 224 226 new: &'a Mst<S>, 225 227 diff: &'a mut MstDiff, 226 - ) -> std::pin::Pin<Box<dyn std::future::Future<Output = Result<()>> + Send + 'a>> { 228 + ) -> Pin<Box<dyn Future<Output = Result<()>> + Send + 'a>> { 227 229 Box::pin(async move { 228 230 // If CIDs are equal, trees are identical - skip entire subtree 229 231 let old_cid = old.get_pointer().await?; ··· 406 408 407 409 // Serialize the MST node 408 410 let entries = tree.get_entries().await?; 409 - let node_data = super::util::serialize_node_data(&entries).await?; 410 - let cbor = serde_ipld_dagcbor::to_vec(&node_data) 411 - .map_err(|e| crate::error::RepoError::serialization(e))?; 411 + let node_data = serialize_node_data(&entries).await?; 412 + let cbor = serde_ipld_dagcbor::to_vec(&node_data).map_err(|e| RepoError::serialization(e))?; 412 413 413 414 // Track the serialized block 414 415 diff.new_mst_blocks.insert(tree_cid, Bytes::from(cbor)); ··· 420 421 fn track_added_tree<'a, S: BlockStore + Sync + 'static>( 421 422 tree: &'a Mst<S>, 422 423 diff: &'a mut MstDiff, 423 - ) -> std::pin::Pin<Box<dyn std::future::Future<Output = Result<()>> + Send + 'a>> { 424 + ) -> Pin<Box<dyn Future<Output = Result<()>> + Send + 'a>> { 424 425 Box::pin(async move { 425 - use super::node::NodeEntry; 426 - 427 426 // Serialize and track this MST node 428 427 serialize_and_track_mst(tree, diff).await?; 429 428 ··· 448 447 fn track_removed_tree<'a, S: BlockStore + Sync + 'static>( 449 448 tree: &'a Mst<S>, 450 449 diff: &'a mut MstDiff, 451 - ) -> std::pin::Pin<Box<dyn std::future::Future<Output = Result<()>> + Send + 'a>> { 450 + ) -> Pin<Box<dyn Future<Output = Result<()>> + Send + 'a>> { 452 451 Box::pin(async move { 453 - use super::node::NodeEntry; 454 - 455 452 // Track this MST node as removed 456 453 let tree_cid = tree.get_pointer().await?; 457 454 diff.removed_mst_blocks.push(tree_cid); ··· 489 486 fn track_removed_tree_all<'a, S: BlockStore + Sync + 'static>( 490 487 tree: &'a Mst<S>, 491 488 diff: &'a mut MstDiff, 492 - ) -> std::pin::Pin<Box<dyn std::future::Future<Output = Result<()>> + Send + 'a>> { 489 + ) -> Pin<Box<dyn Future<Output = Result<()>> + Send + 'a>> { 493 490 Box::pin(async move { 494 - use super::node::NodeEntry; 495 - 496 491 // Track this node as removed 497 492 let tree_cid = tree.get_pointer().await?; 498 493 diff.removed_mst_blocks.push(tree_cid);
+6 -4
crates/jacquard-repo/src/mst/node.rs
··· 6 6 use cid::Cid as IpldCid; 7 7 use smol_str::SmolStr; 8 8 9 + use crate::{mst::Mst, storage::BlockStore}; 10 + 9 11 /// Entry in an MST node - either a subtree or a leaf 10 12 /// 11 13 /// This is the in-memory representation used for tree operations. ··· 14 16 /// 15 17 /// The wire format (CBOR) is different - see `NodeData` and `TreeEntry`. 16 18 #[derive(Clone)] 17 - pub enum NodeEntry<S: crate::storage::BlockStore> { 19 + pub enum NodeEntry<S> { 18 20 /// Subtree reference 19 21 /// 20 22 /// Will be lazily loaded from storage when needed. 21 - Tree(crate::mst::Mst<S>), 23 + Tree(Mst<S>), 22 24 23 25 /// Leaf node with key-value pair 24 26 Leaf { ··· 29 31 }, 30 32 } 31 33 32 - impl<S: crate::storage::BlockStore> fmt::Debug for NodeEntry<S> { 34 + impl<S: BlockStore> fmt::Debug for NodeEntry<S> { 33 35 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { 34 36 match self { 35 37 NodeEntry::Tree(t) => write!(f, "{:?}", t), ··· 40 42 } 41 43 } 42 44 43 - impl<S: crate::storage::BlockStore> NodeEntry<S> { 45 + impl<S: BlockStore> NodeEntry<S> { 44 46 /// Check if this is a tree entry 45 47 pub fn is_tree(&self) -> bool { 46 48 matches!(self, NodeEntry::Tree(_))
+18 -32
crates/jacquard-repo/src/mst/tree.rs
··· 171 171 /// - Layer = floor(leading_zeros / 2) for ~4 fanout 172 172 /// - Deterministic and insertion-order independent 173 173 #[derive(Clone)] 174 - pub struct Mst<S: BlockStore> { 174 + pub struct Mst<S> { 175 175 /// Block storage for loading/saving nodes (shared via Arc) 176 176 storage: Arc<S>, 177 177 ··· 366 366 /// For nodes with no leaves, recursively checks subtrees. 367 367 pub(crate) fn get_layer<'a>( 368 368 &'a self, 369 - ) -> std::pin::Pin<Box<dyn std::future::Future<Output = Result<usize>> + Send + 'a>> { 369 + ) -> Pin<Box<dyn Future<Output = Result<usize>> + Send + 'a>> { 370 370 Box::pin(async move { 371 371 if let Some(layer) = self.layer { 372 372 return Ok(layer); ··· 415 415 pub fn get<'a>( 416 416 &'a self, 417 417 key: &'a str, 418 - ) -> std::pin::Pin<Box<dyn std::future::Future<Output = Result<Option<IpldCid>>> + Send + 'a>> 419 - { 418 + ) -> Pin<Box<dyn Future<Output = Result<Option<IpldCid>>> + Send + 'a>> { 420 419 Box::pin(async move { 421 420 validate_key(key)?; 422 421 ··· 452 451 &'a self, 453 452 key: &'a str, 454 453 cid: IpldCid, 455 - ) -> std::pin::Pin<Box<dyn std::future::Future<Output = Result<Mst<S>>> + Send + 'a>> { 454 + ) -> Pin<Box<dyn Future<Output = Result<Mst<S>>> + Send + 'a>> { 456 455 Box::pin(async move { 457 456 validate_key(key)?; 458 457 ··· 575 574 pub fn delete<'a>( 576 575 &'a self, 577 576 key: &'a str, 578 - ) -> std::pin::Pin<Box<dyn std::future::Future<Output = Result<Mst<S>>> + Send + 'a>> { 577 + ) -> Pin<Box<dyn Future<Output = Result<Mst<S>>> + Send + 'a>> { 579 578 Box::pin(async move { 580 579 validate_key(key)?; 581 580 ··· 588 587 fn delete_recurse<'a>( 589 588 &'a self, 590 589 key: &'a str, 591 - ) -> std::pin::Pin<Box<dyn std::future::Future<Output = Result<Mst<S>>> + Send + 'a>> { 590 + ) -> Pin<Box<dyn Future<Output = Result<Mst<S>>> + Send + 'a>> { 592 591 Box::pin(async move { 593 592 let entries = self.get_entries().await?; 594 593 let index = Self::find_gt_or_equal_leaf_index_in(&entries, key); ··· 704 703 } 705 704 706 705 /// Trim top node if it only contains one subtree 707 - fn trim_top( 708 - self, 709 - ) -> std::pin::Pin<Box<dyn std::future::Future<Output = Result<Mst<S>>> + Send>> { 706 + fn trim_top(self) -> Pin<Box<dyn Future<Output = Result<Mst<S>>> + Send>> { 710 707 Box::pin(async move { 711 708 let entries = self.get_entries().await?; 712 709 ··· 730 727 pub fn split_around<'a>( 731 728 &'a self, 732 729 key: &'a str, 733 - ) -> std::pin::Pin< 734 - Box<dyn std::future::Future<Output = Result<(Option<Mst<S>>, Option<Mst<S>>)>> + Send + 'a>, 735 - > { 730 + ) -> Pin<Box<dyn Future<Output = Result<(Option<Mst<S>>, Option<Mst<S>>)>> + Send + 'a>> { 736 731 Box::pin(async move { 737 732 let entries = self.get_entries().await?; 738 733 let index = Self::find_gt_or_equal_leaf_index_in(&entries, key); ··· 783 778 pub fn append_merge<'a>( 784 779 &'a self, 785 780 to_merge: &'a Mst<S>, 786 - ) -> std::pin::Pin<Box<dyn std::future::Future<Output = Result<Mst<S>>> + Send + 'a>> { 781 + ) -> Pin<Box<dyn Future<Output = Result<Mst<S>>> + Send + 'a>> { 787 782 Box::pin(async move { 788 783 // Check same layer 789 784 let self_layer = self.get_layer().await?; ··· 878 873 /// Uses parallel traversal to collect leaves from independent subtrees concurrently. 879 874 pub fn leaves<'a>( 880 875 &'a self, 881 - ) -> std::pin::Pin< 882 - Box< 883 - dyn std::future::Future<Output = Result<Vec<(smol_str::SmolStr, IpldCid)>>> + Send + 'a, 884 - >, 885 - > { 876 + ) -> Pin<Box<dyn Future<Output = Result<Vec<(smol_str::SmolStr, IpldCid)>>> + Send + 'a>> { 886 877 Box::pin(async move { collect_leaves_parallel(self.clone()).await }) 887 878 } 888 879 889 880 /// Get all leaf entries sequentially (for benchmarking) 890 881 pub fn leaves_sequential<'a>( 891 882 &'a self, 892 - ) -> std::pin::Pin< 893 - Box< 894 - dyn std::future::Future<Output = Result<Vec<(smol_str::SmolStr, IpldCid)>>> + Send + 'a, 895 - >, 896 - > { 883 + ) -> Pin<Box<dyn Future<Output = Result<Vec<(smol_str::SmolStr, IpldCid)>>> + Send + 'a>> { 897 884 Box::pin(async move { 898 885 let mut result = Vec::new(); 899 886 self.collect_leaves_sequential(&mut result).await?; ··· 905 892 fn collect_leaves_sequential<'a>( 906 893 &'a self, 907 894 result: &'a mut Vec<(smol_str::SmolStr, IpldCid)>, 908 - ) -> std::pin::Pin<Box<dyn std::future::Future<Output = Result<()>> + Send + 'a>> { 895 + ) -> Pin<Box<dyn Future<Output = Result<()>> + Send + 'a>> { 909 896 Box::pin(async move { 910 897 let entries = self.get_entries().await?; 911 898 ··· 1011 998 /// Uses parallel traversal to collect blocks from independent subtrees concurrently. 1012 999 pub fn collect_blocks<'a>( 1013 1000 &'a self, 1014 - ) -> std::pin::Pin< 1001 + ) -> Pin< 1015 1002 Box< 1016 - dyn std::future::Future< 1003 + dyn Future< 1017 1004 Output = Result<(IpldCid, std::collections::BTreeMap<IpldCid, bytes::Bytes>)>, 1018 1005 > + Send 1019 1006 + 'a, ··· 1025 1012 /// Collect all blocks sequentially (for benchmarking) 1026 1013 pub fn collect_blocks_sequential<'a>( 1027 1014 &'a self, 1028 - ) -> std::pin::Pin< 1015 + ) -> Pin< 1029 1016 Box< 1030 - dyn std::future::Future< 1017 + dyn Future< 1031 1018 Output = Result<(IpldCid, std::collections::BTreeMap<IpldCid, bytes::Bytes>)>, 1032 1019 > + Send 1033 1020 + 'a, ··· 1123 1110 pub fn cids_for_path<'a>( 1124 1111 &'a self, 1125 1112 key: &'a str, 1126 - ) -> std::pin::Pin<Box<dyn std::future::Future<Output = Result<Vec<IpldCid>>> + Send + 'a>> 1127 - { 1113 + ) -> Pin<Box<dyn Future<Output = Result<Vec<IpldCid>>> + Send + 'a>> { 1128 1114 Box::pin(async move { 1129 1115 validate_key(key)?; 1130 1116 ··· 1195 1181 &'a self, 1196 1182 writer: &'a mut iroh_car::CarWriter<W>, 1197 1183 leaf_cids: &'a mut Vec<IpldCid>, 1198 - ) -> std::pin::Pin<Box<dyn std::future::Future<Output = Result<()>> + Send + 'a>> { 1184 + ) -> Pin<Box<dyn Future<Output = Result<()>> + Send + 'a>> { 1199 1185 Box::pin(async move { 1200 1186 let pointer = self.get_pointer().await?; 1201 1187
+7 -4
crates/jacquard-repo/src/mst/util.rs
··· 1 1 //! MST utility functions 2 2 3 + use std::future::Future; 4 + use std::pin::Pin; 5 + use std::sync::Arc; 6 + 3 7 use super::node::{NodeData, NodeEntry, TreeEntry}; 8 + use crate::Mst; 4 9 use crate::error::{MstError, Result}; 5 10 use crate::storage::BlockStore; 6 11 use bytes::Bytes; ··· 108 113 /// - `Tree` after `Leaf` → that leaf's `tree` pointer 109 114 pub fn serialize_node_data<'a, S: BlockStore + Sync + 'static>( 110 115 entries: &'a [NodeEntry<S>], 111 - ) -> std::pin::Pin<Box<dyn std::future::Future<Output = Result<NodeData>> + Send + 'a>> { 116 + ) -> Pin<Box<dyn Future<Output = Result<NodeData>> + Send + 'a>> { 112 117 Box::pin(async move { 113 118 let mut data = NodeData { 114 119 left: None, ··· 174 179 /// - Each entry → `Leaf` with reconstructed full key 175 180 /// - `tree` pointer → append `Tree` entry 176 181 pub fn deserialize_node_data<S: BlockStore + Sync + 'static>( 177 - storage: std::sync::Arc<S>, 182 + storage: Arc<S>, 178 183 data: &NodeData, 179 184 layer: Option<usize>, 180 185 ) -> Result<Vec<NodeEntry<S>>> { 181 - use crate::mst::Mst; 182 - 183 186 let mut entries = Vec::new(); 184 187 185 188 // Left pointer → prepend Tree
+77 -79
crates/jacquard-repo/src/repo.rs
··· 3 3 //! Optional convenience layer over MST primitives. Provides type-safe record operations, 4 4 //! batch writes, commit creation, and CAR export. 5 5 6 - use crate::commit::Commit; 7 - use crate::error::Result; 8 - use crate::mst::Mst; 6 + use crate::commit::firehose::{FirehoseCommit, RepoOp}; 7 + use crate::commit::{Commit, SigningKey}; 8 + use crate::error::{RepoError, Result}; 9 + use crate::mst::{Mst, MstDiff, RecordWriteOp}; 9 10 use crate::storage::BlockStore; 10 11 use cid::Cid as IpldCid; 11 12 use jacquard_common::IntoStatic; 12 - use jacquard_common::types::string::{Did, Nsid, RecordKey, Tid}; 13 + use jacquard_common::types::cid::CidLink; 14 + use jacquard_common::types::recordkey::RecordKeyType; 15 + use jacquard_common::types::string::{Datetime, Did, Nsid, RecordKey, Tid}; 13 16 use jacquard_common::types::tid::Ticker; 17 + use smol_str::format_smolstr; 14 18 use std::collections::BTreeMap; 15 19 use std::path::Path; 16 20 use std::sync::Arc; ··· 39 43 /// Previous MST root CID (for sync v1.1) 40 44 pub prev_data: Option<IpldCid>, 41 45 42 - /// All blocks to persist (MST nodes + record data + commit block) 43 - /// 44 - /// Includes: 45 - /// - All new MST node blocks from `diff.new_mst_blocks` 46 - /// - All new record data blocks (from creates + updates) 47 - /// - The commit block itself 46 + /// New blocks to persist (MST nodes + record data + commit block) 48 47 pub blocks: BTreeMap<IpldCid, bytes::Bytes>, 49 48 50 49 /// Relevant blocks for firehose (sync v1.1 inductive validation) ··· 55 54 /// - Includes "adjacent" blocks needed for operation inversion 56 55 pub relevant_blocks: BTreeMap<IpldCid, bytes::Bytes>, 57 56 58 - /// CIDs of blocks to delete from storage 59 - /// 60 - /// Contains CIDs that are no longer referenced by the current tree: 61 - /// - Record CIDs from deleted records 62 - /// - Old record CIDs from updated records 63 - /// 64 - /// **Note:** Actual deletion should consider whether previous commits still 65 - /// reference these CIDs. A proper GC strategy might: 66 - /// - Only delete if previous commits are also being GC'd 67 - /// - Use reference counting across all retained commits 68 - /// - Perform periodic reachability analysis 69 - /// 70 - /// For simple single-commit repos or when old commits are discarded, direct 71 - /// deletion is safe. 57 + /// CIDs of blocks to delete 72 58 pub deleted_cids: Vec<IpldCid>, 73 59 } 74 60 ··· 81 67 &self, 82 68 repo: &Did<'_>, 83 69 seq: i64, 84 - time: jacquard_common::types::string::Datetime, 85 - ops: Vec<crate::commit::firehose::RepoOp<'static>>, 86 - blobs: Vec<jacquard_common::types::cid::CidLink<'static>>, 87 - ) -> Result<crate::commit::firehose::FirehoseCommit<'static>> { 88 - use jacquard_common::types::cid::CidLink; 89 - 70 + time: Datetime, 71 + ops: Vec<RepoOp<'static>>, 72 + blobs: Vec<CidLink<'static>>, 73 + ) -> Result<FirehoseCommit<'static>> { 90 74 // Convert relevant blocks to CAR format 91 75 let blocks_car = 92 76 crate::car::write_car_bytes(self.cid, self.relevant_blocks.clone()).await?; 93 77 94 - Ok(crate::commit::firehose::FirehoseCommit { 78 + Ok(FirehoseCommit { 95 79 repo: repo.clone().into_static(), 96 80 rev: self.rev.clone(), 97 81 seq, ··· 161 145 let commit_bytes = storage 162 146 .get(commit_cid) 163 147 .await? 164 - .ok_or_else(|| crate::error::RepoError::not_found("commit", commit_cid))?; 148 + .ok_or_else(|| RepoError::not_found("commit", commit_cid))?; 165 149 166 150 let commit = Commit::from_cbor(&commit_bytes)?; 167 151 let mst_root = commit.data(); ··· 177 161 } 178 162 179 163 /// Get a record by collection and rkey 180 - pub async fn get_record<T: jacquard_common::types::recordkey::RecordKeyType>( 164 + pub async fn get_record<T: RecordKeyType>( 181 165 &self, 182 166 collection: &Nsid<'_>, 183 167 rkey: &RecordKey<T>, ··· 187 171 } 188 172 189 173 /// Create a record (error if exists) 190 - pub async fn create_record<T: jacquard_common::types::recordkey::RecordKeyType>( 174 + pub async fn create_record<T: RecordKeyType>( 191 175 &mut self, 192 176 collection: &Nsid<'_>, 193 177 rkey: &RecordKey<T>, ··· 196 180 let key = format!("{}/{}", collection.as_ref(), rkey.as_ref()); 197 181 198 182 if self.mst.get(&key).await?.is_some() { 199 - return Err(crate::error::RepoError::already_exists("record", &key)); 183 + return Err(RepoError::already_exists("record", &key)); 200 184 } 201 185 202 186 self.mst = self.mst.add(&key, record_cid).await?; ··· 204 188 } 205 189 206 190 /// Update a record (error if not exists, returns previous CID) 207 - pub async fn update_record<T: jacquard_common::types::recordkey::RecordKeyType>( 191 + pub async fn update_record<T: RecordKeyType>( 208 192 &mut self, 209 193 collection: &Nsid<'_>, 210 194 rkey: &RecordKey<T>, ··· 216 200 .mst 217 201 .get(&key) 218 202 .await? 219 - .ok_or_else(|| crate::error::RepoError::not_found("record", &key))?; 203 + .ok_or_else(|| RepoError::not_found("record", &key))?; 220 204 221 205 self.mst = self.mst.update(&key, record_cid).await?; 222 206 Ok(old_cid) 223 207 } 224 208 225 209 /// Delete a record (error if not exists, returns deleted CID) 226 - pub async fn delete_record<T: jacquard_common::types::recordkey::RecordKeyType>( 210 + pub async fn delete_record<T: RecordKeyType>( 227 211 &mut self, 228 212 collection: &Nsid<'_>, 229 213 rkey: &RecordKey<T>, ··· 234 218 .mst 235 219 .get(&key) 236 220 .await? 237 - .ok_or_else(|| crate::error::RepoError::not_found("record", &key))?; 221 + .ok_or_else(|| RepoError::not_found("record", &key))?; 238 222 239 223 self.mst = self.mst.delete(&key).await?; 240 224 Ok(old_cid) 241 225 } 242 226 227 + // TODO(cursor-based queries): Potential future API additions 228 + // 229 + // The current API is purely single-record CRUD. Cursor-based traversal (see mst/cursor.rs) 230 + // would enable efficient collection/range queries: 231 + // 232 + // - list_collection(collection: &Nsid) -> Vec<(RecordKey, IpldCid)> 233 + // Enumerate all records in a collection via prefix search on "collection/" 234 + // Uses cursor.advance() + cursor.skip_subtree() to skip irrelevant branches 235 + // 236 + // - list_collection_range(collection: &Nsid, start: &Rkey, end: &Rkey) -> Vec<...> 237 + // Range query: advance to start key, collect until > end, skip subtrees outside range 238 + // Useful for pagination / time-bounded queries (since Rkeys are often TIDs) 239 + // 240 + // - list_all_collections() -> Vec<Nsid> 241 + // Walk tree, track collection prefixes, skip subtrees once prefix changes 242 + // 243 + // Current single-key get() is already optimal (O(log n) targeted lookup). 244 + // But these bulk operations would benefit significantly from cursor's skip_subtree() 245 + // to avoid traversing unrelated branches when searching lexicographically-organized data. 246 + 243 247 /// Apply record write operations with inline data 244 248 /// 245 249 /// Serializes record data to DAG-CBOR, computes CIDs, stores data blocks, 246 250 /// then applies write operations to the MST. Returns the diff for inspection. 247 251 /// 248 252 /// For creating commits with operations, use `create_commit()` instead. 249 - pub async fn apply_record_writes( 250 - &mut self, 251 - ops: &[crate::mst::RecordWriteOp<'_>], 252 - ) -> Result<crate::mst::MstDiff> { 253 - use crate::mst::RecordWriteOp; 253 + pub async fn apply_record_writes(&mut self, ops: &[RecordWriteOp<'_>]) -> Result<MstDiff> { 254 254 use smol_str::format_smolstr; 255 255 256 256 let mut updated_tree = self.mst.clone(); ··· 266 266 267 267 // Serialize record to DAG-CBOR 268 268 let cbor = serde_ipld_dagcbor::to_vec(record) 269 - .map_err(|e| crate::error::RepoError::serialization(e))?; 269 + .map_err(|e| RepoError::serialization(e))?; 270 270 271 271 // Compute CID and store data 272 272 let cid = self.storage.put(&cbor).await?; ··· 283 283 284 284 // Serialize record to DAG-CBOR 285 285 let cbor = serde_ipld_dagcbor::to_vec(record) 286 - .map_err(|e| crate::error::RepoError::serialization(e))?; 286 + .map_err(|e| RepoError::serialization(e))?; 287 287 288 288 // Compute CID and store data 289 289 let cid = self.storage.put(&cbor).await?; ··· 291 291 // Validate prev if provided 292 292 if let Some(prev_cid) = prev { 293 293 if &cid != prev_cid { 294 - return Err(crate::error::RepoError::invalid(format!( 294 + return Err(RepoError::invalid(format!( 295 295 "Update prev CID mismatch for key {}: expected {}, got {}", 296 296 key, prev_cid, cid 297 297 ))); ··· 308 308 let key = format_smolstr!("{}/{}", collection.as_ref(), rkey.as_ref()); 309 309 310 310 // Check exists 311 - let current = self.mst.get(key.as_str()).await?.ok_or_else(|| { 312 - crate::error::RepoError::not_found("record", key.as_str()) 313 - })?; 311 + let current = self 312 + .mst 313 + .get(key.as_str()) 314 + .await? 315 + .ok_or_else(|| RepoError::not_found("record", key.as_str()))?; 314 316 315 317 // Validate prev if provided 316 318 if let Some(prev_cid) = prev { 317 319 if &current != prev_cid { 318 - return Err(crate::error::RepoError::invalid(format!( 320 + return Err(RepoError::invalid(format!( 319 321 "Delete prev CID mismatch for key {}: expected {}, got {}", 320 322 key, prev_cid, current 321 323 ))); ··· 347 349 /// Returns `(ops, CommitData)` - ops are needed for `to_firehose_commit()`. 348 350 pub async fn create_commit<K>( 349 351 &mut self, 350 - ops: &[crate::mst::RecordWriteOp<'_>], 352 + ops: &[RecordWriteOp<'_>], 351 353 did: &Did<'_>, 352 354 prev: Option<IpldCid>, 353 355 signing_key: &K, 354 - ) -> Result<(Vec<crate::commit::firehose::RepoOp<'static>>, CommitData)> 356 + ) -> Result<(Vec<RepoOp<'static>>, CommitData)> 355 357 where 356 - K: crate::commit::SigningKey, 358 + K: SigningKey, 357 359 { 358 - use crate::mst::RecordWriteOp; 359 - use smol_str::format_smolstr; 360 - 361 360 // Step 1: Apply all write operations to build new MST 362 361 let mut updated_tree = self.mst.clone(); 363 362 ··· 372 371 373 372 // Serialize record to DAG-CBOR 374 373 let cbor = serde_ipld_dagcbor::to_vec(record) 375 - .map_err(|e| crate::error::RepoError::serialization(e))?; 374 + .map_err(|e| RepoError::serialization(e))?; 376 375 377 376 // Compute CID and store data 378 377 let cid = self.storage.put(&cbor).await?; ··· 389 388 390 389 // Serialize record to DAG-CBOR 391 390 let cbor = serde_ipld_dagcbor::to_vec(record) 392 - .map_err(|e| crate::error::RepoError::serialization(e))?; 391 + .map_err(|e| RepoError::serialization(e))?; 393 392 394 393 // Compute CID and store data 395 394 let cid = self.storage.put(&cbor).await?; ··· 397 396 // Validate prev if provided 398 397 if let Some(prev_cid) = prev { 399 398 if &cid != prev_cid { 400 - return Err(crate::error::RepoError::invalid(format!( 399 + return Err(RepoError::invalid(format!( 401 400 "Update prev CID mismatch for key {}: expected {}, got {}", 402 401 key, prev_cid, cid 403 402 ))); ··· 414 413 let key = format_smolstr!("{}/{}", collection.as_ref(), rkey.as_ref()); 415 414 416 415 // Check exists 417 - let current = self.mst.get(key.as_str()).await?.ok_or_else(|| { 418 - crate::error::RepoError::not_found("record", key.as_str()) 419 - })?; 416 + let current = self 417 + .mst 418 + .get(key.as_str()) 419 + .await? 420 + .ok_or_else(|| RepoError::not_found("record", key.as_str()))?; 420 421 421 422 // Validate prev if provided 422 423 if let Some(prev_cid) = prev { 423 424 if &current != prev_cid { 424 - return Err(crate::error::RepoError::invalid(format!( 425 + return Err(RepoError::invalid(format!( 425 426 "Delete prev CID mismatch for key {}: expected {}, got {}", 426 427 key, prev_cid, current 427 428 ))); ··· 518 519 .storage 519 520 .get(&commit_cid) 520 521 .await? 521 - .ok_or_else(|| crate::error::RepoError::not_found("commit block", &commit_cid))?; 522 + .ok_or_else(|| RepoError::not_found("commit block", &commit_cid))?; 522 523 let commit = Commit::from_cbor(&commit_bytes)?; 523 524 524 525 self.commit = commit.into_static(); ··· 540 541 did: &Did<'_>, 541 542 prev: Option<IpldCid>, 542 543 signing_key: &K, 543 - ) -> Result<(Vec<crate::commit::firehose::RepoOp<'static>>, IpldCid)> 544 + ) -> Result<(Vec<RepoOp<'static>>, IpldCid)> 544 545 where 545 - K: crate::commit::SigningKey, 546 + K: SigningKey, 546 547 { 547 548 let (ops, commit_data) = self.create_commit(&[], did, prev, signing_key).await?; 548 549 Ok((ops, self.apply_commit(commit_data).await?)) ··· 581 582 582 583 #[cfg(test)] 583 584 mod tests { 584 - use std::str::FromStr; 585 + use std::{collections::BTreeMap, str::FromStr}; 585 586 586 587 use super::*; 587 588 use crate::storage::MemoryBlockStore; 588 - use jacquard_common::types::recordkey::Rkey; 589 + use jacquard_common::types::{ 590 + crypto::{KeyCodec, PublicKey}, 591 + recordkey::Rkey, 592 + value::RawData, 593 + }; 589 594 use smol_str::SmolStr; 590 595 591 596 fn make_test_cid(value: u8) -> IpldCid { ··· 598 603 IpldCid::new_v1(DAG_CBOR_CID_CODEC, mh) 599 604 } 600 605 601 - fn make_test_record( 602 - n: u32, 603 - ) -> std::collections::BTreeMap<SmolStr, jacquard_common::types::value::RawData<'static>> { 604 - use jacquard_common::types::value::RawData; 605 - use smol_str::SmolStr; 606 - 607 - let mut record = std::collections::BTreeMap::new(); 606 + fn make_test_record(n: u32) -> BTreeMap<SmolStr, RawData<'static>> { 607 + let mut record = BTreeMap::new(); 608 608 record.insert( 609 609 SmolStr::new("$type"), 610 610 RawData::String("app.bsky.feed.post".into()), ··· 887 887 888 888 #[tokio::test] 889 889 async fn test_commit_signature_verification() { 890 - use jacquard_common::types::crypto::{KeyCodec, PublicKey}; 891 - 892 890 let storage = Arc::new(MemoryBlockStore::new()); 893 891 let mut repo = create_test_repo(storage.clone()).await; 894 892 ··· 1085 1083 1086 1084 // Verify we can deserialize the record data 1087 1085 let record1_bytes = commit_data.blocks.get(&cid1).unwrap(); 1088 - let record1: std::collections::BTreeMap<SmolStr, jacquard_common::types::value::RawData> = 1086 + let record1: BTreeMap<SmolStr, RawData> = 1089 1087 serde_ipld_dagcbor::from_slice(record1_bytes).unwrap(); 1090 1088 assert_eq!( 1091 1089 record1.get(&SmolStr::new("text")).unwrap(),
+11 -6
crates/jacquard-repo/src/storage/layered.rs
··· 96 96 self.base.has(cid).await 97 97 } 98 98 99 - async fn put_many(&self, blocks: impl IntoIterator<Item = (IpldCid, Bytes)> + Send) -> Result<()> { 99 + async fn put_many( 100 + &self, 101 + blocks: impl IntoIterator<Item = (IpldCid, Bytes)> + Send, 102 + ) -> Result<()> { 100 103 // All writes go to writable layer 101 104 self.writable.put_many(blocks).await 102 105 } ··· 119 122 120 123 #[cfg(test)] 121 124 mod tests { 125 + use std::sync::Arc; 126 + 122 127 use super::*; 123 128 use crate::storage::MemoryBlockStore; 124 129 125 130 #[tokio::test] 126 131 async fn test_layered_read_from_writable() { 127 - let base = std::sync::Arc::new(MemoryBlockStore::new()); 132 + let base = Arc::new(MemoryBlockStore::new()); 128 133 let writable = MemoryBlockStore::new(); 129 134 130 135 // Put data in writable layer ··· 139 144 140 145 #[tokio::test] 141 146 async fn test_layered_fallback_to_base() { 142 - let base = std::sync::Arc::new(MemoryBlockStore::new()); 147 + let base = Arc::new(MemoryBlockStore::new()); 143 148 let writable = MemoryBlockStore::new(); 144 149 145 150 // Put data in base layer ··· 154 159 155 160 #[tokio::test] 156 161 async fn test_layered_writable_overrides_base() { 157 - let base = std::sync::Arc::new(MemoryBlockStore::new()); 162 + let base = Arc::new(MemoryBlockStore::new()); 158 163 let writable = MemoryBlockStore::new(); 159 164 160 165 // Put same content in both layers (will have same CID) ··· 183 188 184 189 #[tokio::test] 185 190 async fn test_layered_writes_to_writable_only() { 186 - let base = std::sync::Arc::new(MemoryBlockStore::new()); 191 + let base = Arc::new(MemoryBlockStore::new()); 187 192 let writable = MemoryBlockStore::new(); 188 193 189 194 let layered = LayeredBlockStore::new(writable.clone(), base.clone()); ··· 200 205 201 206 #[tokio::test] 202 207 async fn test_layered_has_checks_both_layers() { 203 - let base = std::sync::Arc::new(MemoryBlockStore::new()); 208 + let base = Arc::new(MemoryBlockStore::new()); 204 209 let writable = MemoryBlockStore::new(); 205 210 206 211 let base_cid = base.put(b"base").await.unwrap();
+6 -3
crates/jacquard-repo/src/storage/mod.rs
··· 1 1 //! Block storage abstraction for MST nodes and records 2 2 3 + use crate::{error::Result, repo::CommitData}; 3 4 use bytes::Bytes; 4 5 use cid::Cid as IpldCid; 5 - use crate::error::Result; 6 6 7 7 /// Async block storage trait 8 8 /// ··· 68 68 /// 69 69 /// The provided CIDs should match the data, but implementations may choose to 70 70 /// recalculate and validate them. 71 - async fn put_many(&self, blocks: impl IntoIterator<Item = (IpldCid, Bytes)> + Send) -> Result<()>; 71 + async fn put_many( 72 + &self, 73 + blocks: impl IntoIterator<Item = (IpldCid, Bytes)> + Send, 74 + ) -> Result<()>; 72 75 73 76 /// Get multiple blocks at once (optimization for batch reads) 74 77 /// ··· 87 90 /// This should be atomic where possible - either both operations succeed or both fail. 88 91 /// For implementations that don't support atomic operations, writes should happen first, 89 92 /// then deletes. 90 - async fn apply_commit(&self, commit: crate::repo::CommitData) -> Result<()>; 93 + async fn apply_commit(&self, commit: CommitData) -> Result<()>; 91 94 } 92 95 93 96 pub mod file;