tangled
alpha
login
or
join now
nonbinary.computer
/
jacquard
81
fork
atom
A better Rust ATProto crate
81
fork
atom
overview
issues
9
pulls
pipelines
swapped repo firehose types to use generated api
Orual
2 days ago
f2f45e46
bc139173
0/1
build.yml
failed
1min 11s
+244
-377
6 changed files
expand all
collapse all
unified
split
Cargo.lock
crates
jacquard-repo
Cargo.toml
src
commit
firehose.rs
mst
diff.rs
repo.rs
tests
large_proof_tests.rs
+1
Cargo.lock
···
2628
"ed25519-dalek",
2629
"hex",
2630
"iroh-car",
0
2631
"jacquard-common",
2632
"jacquard-derive",
2633
"k256",
···
2628
"ed25519-dalek",
2629
"hex",
2630
"iroh-car",
2631
+
"jacquard-api",
2632
"jacquard-common",
2633
"jacquard-derive",
2634
"k256",
+1
crates/jacquard-repo/Cargo.toml
···
18
# Internal
19
jacquard-common = { path = "../jacquard-common", version = "0.9", features = ["crypto-ed25519", "crypto-k256", "crypto-p256"] }
20
jacquard-derive = { path = "../jacquard-derive", version = "0.9" }
0
21
22
# Serialization
23
serde.workspace = true
···
18
# Internal
19
jacquard-common = { path = "../jacquard-common", version = "0.9", features = ["crypto-ed25519", "crypto-k256", "crypto-p256"] }
20
jacquard-derive = { path = "../jacquard-derive", version = "0.9" }
21
+
jacquard-api = { path = "../jacquard-api", version = "0.9", features = ["streaming"] }
22
23
# Serialization
24
serde.workspace = true
+215
-360
crates/jacquard-repo/src/commit/firehose.rs
···
4
//! to avoid a dependency on the full API crate. They represent firehose protocol messages,
5
//! which are DISTINCT from repository commit objects.
6
7
-
use bytes::Bytes;
8
-
use jacquard_common::types::cid::CidLink;
0
9
use jacquard_common::types::crypto::PublicKey;
10
-
use jacquard_common::types::string::{Datetime, Did, Tid};
11
-
use jacquard_common::{CowStr, IntoStatic};
12
use smol_str::ToSmolStr;
13
14
-
/// Firehose commit message (sync v1.0 and v1.1)
15
-
///
16
-
/// Represents an update of repository state in the firehose stream.
17
-
/// This is the message format sent over `com.atproto.sync.subscribeRepos`.
18
///
19
-
/// **Sync v1.0 vs v1.1:**
20
-
/// - v1.0: `prev_data` is None/skipped, consumers must have sufficient previous repository state to validate
21
-
/// - v1.1: `prev_data` includes previous MST root for inductive validation
22
-
#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
23
-
#[serde(rename_all = "camelCase")]
24
-
pub struct FirehoseCommit<'a> {
25
-
/// The repo this event comes from
26
-
#[serde(borrow)]
27
-
pub repo: Did<'a>,
28
-
29
-
/// The rev of the emitted commit
30
-
pub rev: Tid,
31
-
32
-
/// The stream sequence number of this message
33
-
pub seq: i64,
34
-
35
-
/// The rev of the last emitted commit from this repo (if any)
36
-
pub since: Tid,
37
-
38
-
/// Timestamp of when this message was originally broadcast
39
-
pub time: Datetime,
40
-
41
-
/// Repo commit object CID
42
-
///
43
-
/// This CID points to the repository commit block (with did, version, data, rev, prev, sig).
44
-
/// It must be the first entry in the CAR header 'roots' list.
45
-
#[serde(borrow)]
46
-
pub commit: CidLink<'a>,
47
-
48
-
/// CAR file containing relevant blocks
49
-
///
50
-
/// Contains blocks as a diff since the previous repo state. The commit block
51
-
/// must be included, and its CID must be the first root in the CAR header.
52
-
///
53
-
/// For sync v1.1, may include additional MST node blocks needed for operation inversion.
54
-
#[serde(with = "super::serde_bytes_helper")]
55
-
pub blocks: Bytes,
56
-
57
-
/// Operations in this commit
58
-
#[serde(borrow)]
59
-
pub ops: Vec<RepoOp<'a>>,
60
-
61
-
/// Previous MST root CID (sync v1.1 only)
62
-
///
63
-
/// The root CID of the MST tree for the previous commit (indicated by the 'since' field).
64
-
/// Corresponds to the 'data' field in the previous repo commit object.
65
-
///
66
-
/// **Sync v1.1 inductive validation:**
67
-
/// - Enables validation without local MST state
68
-
/// - Operations can be inverted (creates→deletes, deletes→creates with prev values)
69
-
/// - Required for "inductive firehose" consumption
70
-
///
71
-
/// **Sync v1.0:**
72
-
/// - This field is None
73
-
/// - Consumers must have previous repository state
74
-
#[serde(skip_serializing_if = "Option::is_none")]
75
-
#[serde(borrow)]
76
-
pub prev_data: Option<CidLink<'a>>,
77
-
78
-
/// Blob CIDs referenced in this commit
79
-
#[serde(borrow)]
80
-
pub blobs: Vec<CidLink<'a>>,
81
-
82
-
/// DEPRECATED: Replaced by #sync event and data limits
83
-
///
84
-
/// Indicates that this commit contained too many ops, or data size was too large.
85
-
/// Consumers will need to make a separate request to get missing data.
86
-
pub too_big: bool,
87
-
88
-
/// DEPRECATED: Unused
89
-
pub rebase: bool,
90
-
}
91
-
92
-
/// A repository operation (mutation of a single record)
93
-
#[derive(Debug, Clone, PartialEq, Eq, serde::Serialize, serde::Deserialize)]
94
-
#[serde(rename_all = "camelCase")]
95
-
pub struct RepoOp<'a> {
96
-
/// Operation type: "create", "update", or "delete"
97
-
#[serde(borrow)]
98
-
pub action: CowStr<'a>,
99
-
100
-
/// Collection/rkey path (e.g., "app.bsky.feed.post/abc123")
101
-
#[serde(borrow)]
102
-
pub path: CowStr<'a>,
103
-
104
-
/// For creates and updates, the new record CID. For deletions, None (null).
105
-
#[serde(skip_serializing_if = "Option::is_none")]
106
-
#[serde(borrow)]
107
-
pub cid: Option<CidLink<'a>>,
108
-
109
-
/// For updates and deletes, the previous record CID
110
-
///
111
-
/// Required for sync v1.1 inductive firehose validation.
112
-
/// For creates, this field should not be defined.
113
-
#[serde(skip_serializing_if = "Option::is_none")]
114
-
#[serde(borrow)]
115
-
pub prev: Option<CidLink<'a>>,
116
-
}
117
-
118
-
impl<'a> RepoOp<'a> {
119
-
/// Convert to VerifiedWriteOp for v1.1 validation
120
-
///
121
-
/// Validates that all required fields are present for inversion.
122
-
pub fn to_invertible_op(&self) -> Result<VerifiedWriteOp> {
123
-
let key = self.path.to_smolstr();
124
-
125
-
match self.action.as_ref() {
126
-
"create" => {
127
-
let cid = self
128
-
.cid
129
-
.as_ref()
130
-
.ok_or_else(|| RepoError::invalid_commit("create operation missing cid field"))?
131
-
.to_ipld()
132
-
.map_err(|e| RepoError::invalid_cid_conversion(e, "create cid"))?;
133
-
134
-
Ok(VerifiedWriteOp::Create { key, cid })
135
-
}
136
-
"update" => {
137
-
let cid = self
138
-
.cid
139
-
.as_ref()
140
-
.ok_or_else(|| RepoError::invalid_commit("update operation missing cid field"))?
141
-
.to_ipld()
142
-
.map_err(|e| RepoError::invalid_cid_conversion(e, "update cid"))?;
143
-
144
-
let prev = self
145
-
.prev
146
-
.as_ref()
147
-
.ok_or_else(|| {
148
-
RepoError::invalid_commit(
149
-
"update operation missing prev field for v1.1 validation",
150
-
)
151
-
})?
152
-
.to_ipld()
153
-
.map_err(|e| RepoError::invalid_cid_conversion(e, "update prev"))?;
154
-
155
-
Ok(VerifiedWriteOp::Update { key, cid, prev })
156
-
}
157
-
"delete" => {
158
-
let prev = self
159
-
.prev
160
-
.as_ref()
161
-
.ok_or_else(|| {
162
-
RepoError::invalid_commit(
163
-
"delete operation missing prev field for v1.1 validation",
164
-
)
165
-
})?
166
-
.to_ipld()
167
-
.map_err(|e| RepoError::invalid_cid_conversion(e, "delete prev"))?;
168
169
-
Ok(VerifiedWriteOp::Delete { key, prev })
170
-
}
171
-
action => Err(RepoError::invalid_commit(format!(
172
-
"unknown action type: {}",
173
-
action
174
-
))),
175
}
176
-
}
177
-
}
0
0
0
0
0
178
179
-
impl IntoStatic for FirehoseCommit<'_> {
180
-
type Output = FirehoseCommit<'static>;
0
0
0
0
0
0
0
0
181
182
-
fn into_static(self) -> Self::Output {
183
-
FirehoseCommit {
184
-
repo: self.repo.into_static(),
185
-
rev: self.rev,
186
-
seq: self.seq,
187
-
since: self.since,
188
-
time: self.time,
189
-
commit: self.commit.into_static(),
190
-
blocks: self.blocks,
191
-
ops: self.ops.into_iter().map(|op| op.into_static()).collect(),
192
-
prev_data: self.prev_data.map(|pd| pd.into_static()),
193
-
blobs: self.blobs.into_iter().map(|b| b.into_static()).collect(),
194
-
too_big: self.too_big,
195
-
rebase: self.rebase,
196
}
197
-
}
198
-
}
199
-
200
-
impl IntoStatic for RepoOp<'_> {
201
-
type Output = RepoOp<'static>;
0
0
0
0
0
0
202
203
-
fn into_static(self) -> Self::Output {
204
-
RepoOp {
205
-
action: self.action.into_static(),
206
-
path: self.path.into_static(),
207
-
cid: self.cid.into_static(),
208
-
prev: self.prev.map(|p| p.into_static()),
209
}
0
0
0
0
210
}
211
}
212
···
220
use cid::Cid as IpldCid;
221
use std::sync::Arc;
222
223
-
impl<'a> FirehoseCommit<'a> {
224
-
/// Validate a sync v1.0 commit
225
-
///
226
-
/// **Requirements:**
227
-
/// - Must have previous MST state (potentially full repository)
228
-
/// - All blocks needed for validation must be in `self.blocks`
229
-
///
230
-
/// **Validation steps:**
231
-
/// 1. Parse CAR blocks from `self.blocks` into temporary storage
232
-
/// 2. Load commit object and verify signature
233
-
/// 3. Apply operations to previous MST (using temporary storage for new blocks)
234
-
/// 4. Verify result matches commit.data (new MST root)
235
-
///
236
-
/// Returns the new MST root CID on success.
237
-
pub async fn validate_v1_0<S: BlockStore + Sync + 'static>(
238
-
&self,
239
-
prev_mst_root: Option<IpldCid>,
240
-
prev_storage: Arc<S>,
241
-
pubkey: &PublicKey<'_>,
242
-
) -> Result<IpldCid> {
243
-
// 1. Parse CAR blocks from the firehose message into temporary storage
244
-
let parsed = parse_car_bytes(&self.blocks).await?;
245
-
let temp_storage = MemoryBlockStore::new_from_blocks(parsed.blocks);
246
247
-
// 2. Create layered storage: reads from temp first, then prev; writes to temp only
248
-
// This avoids copying all previous MST blocks
249
-
let layered_storage = LayeredBlockStore::new(temp_storage.clone(), prev_storage);
250
251
-
// 3. Extract and verify commit object from temporary storage
252
-
let commit_cid: IpldCid = self
253
-
.commit
254
-
.to_ipld()
255
-
.map_err(|e| RepoError::invalid_cid_conversion(e, "commit CID"))?;
256
-
let commit_bytes = temp_storage
257
-
.get(&commit_cid)
258
-
.await?
259
-
.ok_or_else(|| RepoError::not_found("commit block", &commit_cid))?;
260
261
-
let commit = super::Commit::from_cbor(&commit_bytes)?;
262
263
-
// Verify DID matches
264
-
if commit.did().as_ref() != self.repo.as_ref() {
265
-
return Err(RepoError::invalid_commit(format!(
266
"DID mismatch: commit has {}, message has {}",
267
commit.did(),
268
-
self.repo
269
))
270
.with_help("DID mismatch indicates the commit was signed by a different identity - verify the commit is from the expected repository"));
271
-
}
272
-
273
-
// Verify signature
274
-
commit.verify(pubkey)?;
275
276
-
let layered_arc = Arc::new(layered_storage);
0
277
278
-
// 4. Load previous MST state from layered storage (or start empty)
279
-
let prev_mst = if let Some(prev_root) = prev_mst_root {
280
-
Mst::load(layered_arc.clone(), prev_root, None)
281
-
} else {
282
-
Mst::new(layered_arc.clone())
283
-
};
284
285
-
// 5. Load new MST from commit.data (claimed result)
286
-
let expected_root = *commit.data();
287
-
let new_mst = Mst::load(layered_arc, expected_root, None);
0
0
0
288
289
-
// 6. Compute diff to get verified write ops (with actual prev values from tree state)
290
-
let diff = prev_mst.diff(&new_mst).await?;
291
-
let verified_ops = diff.to_verified_ops();
292
293
-
// 7. Apply verified ops to prev MST
294
-
let computed_mst = prev_mst.batch(&verified_ops).await?;
0
295
296
-
// 8. Verify computed result matches claimed result
297
-
let computed_root = computed_mst.get_pointer().await?;
298
299
-
if computed_root != expected_root {
300
-
return Err(RepoError::cid_mismatch(format!(
301
-
"MST root mismatch: expected {}, got {}",
302
-
expected_root, computed_root
303
-
)));
304
-
}
305
306
-
Ok(expected_root)
0
0
0
0
307
}
308
309
-
/// Validate a sync v1.1 commit (inductive validation)
310
-
///
311
-
/// **Requirements:**
312
-
/// - `self.prev_data` must be Some (contains previous MST root)
313
-
/// - All blocks needed for validation must be in `self.blocks`
314
-
///
315
-
/// **Validation steps:**
316
-
/// 1. Parse CAR blocks from `self.blocks` into temporary storage
317
-
/// 2. Load commit object and verify signature
318
-
/// 3. Start from `prev_data` MST root (loaded from temp storage)
319
-
/// 4. Apply operations (with prev CID validation for updates/deletes)
320
-
/// 5. Verify result matches commit.data (new MST root)
321
-
///
322
-
/// Returns the new MST root CID on success.
323
-
///
324
-
/// **Inductive property:** Can validate without any external state besides the blocks
325
-
/// in this message. The `prev_data` field provides the starting MST root, and operations
326
-
/// include `prev` CIDs for validation. All necessary blocks must be in the CAR bytes.
327
-
///
328
-
/// Note: Because this uses the same merkle search tree struct as the repository itself,
329
-
/// this is far from the most efficient possible validation function possible. The repo
330
-
/// tree struct carries extra information. However,
331
-
/// it has the virtue of making everything self-validating.
332
-
pub async fn validate_v1_1(&self, pubkey: &PublicKey<'_>) -> Result<IpldCid> {
333
-
// 1. Require prev_data for v1.1
334
-
let prev_data_cid: IpldCid = self
335
-
.prev_data
336
-
.as_ref()
337
-
.ok_or_else(|| {
338
-
RepoError::invalid_commit("Sync v1.1 validation requires prev_data field")
339
-
})?
340
-
.to_ipld()
341
-
.map_err(|e| RepoError::invalid_cid_conversion(e, "prev_data CID"))?;
0
342
343
-
// 2. Parse CAR blocks from the firehose message into temporary storage
344
-
let parsed = parse_car_bytes(&self.blocks).await?;
345
346
-
let temp_storage = Arc::new(MemoryBlockStore::new_from_blocks(parsed.blocks));
347
348
-
// 3. Extract and verify commit object from temporary storage
349
-
let commit_cid: IpldCid = self
350
-
.commit
351
-
.to_ipld()
352
-
.map_err(|e| RepoError::invalid_cid_conversion(e, "commit CID"))?;
353
-
let commit_bytes = temp_storage
354
-
.get(&commit_cid)
355
-
.await?
356
-
.ok_or_else(|| RepoError::not_found("commit block", &commit_cid))?;
357
358
-
let commit = super::Commit::from_cbor(&commit_bytes)?;
359
360
-
// Verify DID matches
361
-
if commit.did().as_ref() != self.repo.as_ref() {
362
-
return Err(RepoError::invalid_commit(format!(
363
"DID mismatch: commit has {}, message has {}",
364
commit.did(),
365
-
self.repo
366
))
367
.with_help("DID mismatch indicates the commit was signed by a different identity - verify the commit is from the expected repository"));
368
-
}
369
370
-
// Verify signature
371
-
commit.verify(pubkey)?;
372
373
-
// 5. Load new MST from commit.data (claimed result)
374
-
let expected_root = *commit.data();
375
376
-
let mut new_mst = Mst::load(temp_storage, expected_root, None);
377
378
-
let verified_ops = self
379
-
.ops
380
-
.iter()
381
-
.filter_map(|op| op.to_invertible_op().ok())
382
-
.collect::<Vec<_>>();
383
-
if verified_ops.len() != self.ops.len() {
384
-
return Err(RepoError::invalid_commit(format!(
385
-
"Invalid commit: expected {} ops, got {}",
386
-
self.ops.len(),
387
-
verified_ops.len()
388
-
)));
389
-
}
390
391
-
for op in verified_ops {
392
-
if let Ok(inverted) = new_mst.invert_op(op.clone()).await {
393
-
if !inverted {
394
-
return Err(RepoError::invalid_commit(format!(
395
-
"Invalid commit: op {:?} is not invertible",
396
-
op
397
-
)));
398
-
}
399
}
400
}
401
-
// 8. Verify computed previous state matches claimed previous state
402
-
let computed_root = new_mst.get_pointer().await?;
0
403
404
-
if computed_root != prev_data_cid {
405
-
return Err(RepoError::cid_mismatch(format!(
406
-
"MST root mismatch: expected {}, got {}",
407
-
prev_data_cid, computed_root
408
-
)));
409
-
}
410
411
-
Ok(expected_root)
412
-
}
413
}
414
415
#[cfg(test)]
···
419
use crate::commit::Commit;
420
use crate::mst::{Mst, RecordWriteOp};
421
use crate::storage::MemoryBlockStore;
0
422
use jacquard_common::types::crypto::{KeyCodec, PublicKey};
0
423
use jacquard_common::types::recordkey::Rkey;
424
-
use jacquard_common::types::string::{Nsid, RecordKey};
425
use jacquard_common::types::tid::Ticker;
426
use jacquard_common::types::value::RawData;
427
use smol_str::SmolStr;
···
507
.unwrap();
508
509
// Validate using v1.1 validation
510
-
let result = firehose_commit.validate_v1_1(&pubkey).await;
511
if let Err(ref e) = result {
512
eprintln!("Validation error: {}", e);
513
}
···
560
firehose_commit.prev_data = None;
561
562
// Validate using v1.0 validation with previous storage
563
-
let result = firehose_commit
564
-
.validate_v1_0(Some(prev_root), storage.clone(), &pubkey)
565
-
.await;
566
567
assert!(result.is_ok(), "Valid v1.0 commit should pass validation");
568
···
612
.await
613
.unwrap();
614
615
-
let result = firehose_commit.validate_v1_1(&pubkey).await;
616
assert!(result.is_ok(), "Multiple creates should validate");
617
}
618
···
685
.await
686
.unwrap();
687
688
-
let result = firehose_commit.validate_v1_1(&pubkey).await;
689
assert!(
690
result.is_ok(),
691
"Update and delete operations should validate"
···
740
741
firehose_commit.blocks = bad_car.into();
742
743
-
let result = firehose_commit.validate_v1_1(&pubkey).await;
744
assert!(
745
result.is_err(),
746
"Validation should fail when commit block is missing"
···
802
803
firehose_commit.blocks = bad_car.into();
804
805
-
let result = firehose_commit.validate_v1_1(&pubkey).await;
806
assert!(
807
result.is_err(),
808
"Validation should fail when MST blocks are missing"
···
863
.await
864
.unwrap();
865
866
-
let result = firehose_commit.validate_v1_1(&pubkey).await;
867
assert!(
868
result.is_err(),
869
"Validation should fail when commit has wrong MST root"
···
905
906
firehose_commit.repo = wrong_did;
907
908
-
let result = firehose_commit.validate_v1_1(&pubkey).await;
909
assert!(
910
result.is_err(),
911
"Validation should fail with mismatched DID"
···
952
.await
953
.unwrap();
954
955
-
let result = firehose_commit.validate_v1_1(&wrong_pubkey).await;
956
assert!(
957
result.is_err(),
958
"Validation should fail with wrong public key"
···
993
// Strip prev_data to make it invalid for v1.1
994
firehose_commit.prev_data = None;
995
996
-
let result = firehose_commit.validate_v1_1(&pubkey).await;
997
assert!(
998
result.is_err(),
999
"v1.1 validation should fail without prev_data"
···
1040
// Use wrong prev_data CID (point to commit instead of MST root)
1041
firehose_commit.prev_data = Some(firehose_commit.commit.clone());
1042
1043
-
let result = firehose_commit.validate_v1_1(&pubkey).await;
1044
assert!(
1045
result.is_err(),
1046
"Validation should fail with wrong prev_data CID"
···
4
//! to avoid a dependency on the full API crate. They represent firehose protocol messages,
5
//! which are DISTINCT from repository commit objects.
6
7
+
pub use jacquard_api::com_atproto::sync::subscribe_repos::Commit as FirehoseCommit;
8
+
pub use jacquard_api::com_atproto::sync::subscribe_repos::RepoOp;
9
+
use jacquard_api::com_atproto::sync::subscribe_repos::{Commit, RepoOpAction};
10
use jacquard_common::types::crypto::PublicKey;
0
0
11
use smol_str::ToSmolStr;
12
13
+
/// Convert to VerifiedWriteOp for v1.1 validation
0
0
0
14
///
15
+
/// Validates that all required fields are present for inversion.
16
+
pub fn to_invertible_op(op: &RepoOp<'_>) -> Result<VerifiedWriteOp> {
17
+
let key = op.path.to_smolstr();
18
+
match op.action {
19
+
RepoOpAction::Create => {
20
+
let cid = op
21
+
.cid
22
+
.as_ref()
23
+
.ok_or_else(|| RepoError::invalid_commit("create operation missing cid field"))?
24
+
.to_ipld()
25
+
.map_err(|e| RepoError::invalid_cid_conversion(e, "create cid"))?;
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
0
26
27
+
Ok(VerifiedWriteOp::Create { key, cid })
0
0
0
0
0
28
}
29
+
RepoOpAction::Update => {
30
+
let cid = op
31
+
.cid
32
+
.as_ref()
33
+
.ok_or_else(|| RepoError::invalid_commit("update operation missing cid field"))?
34
+
.to_ipld()
35
+
.map_err(|e| RepoError::invalid_cid_conversion(e, "update cid"))?;
36
37
+
let prev = op
38
+
.prev
39
+
.as_ref()
40
+
.ok_or_else(|| {
41
+
RepoError::invalid_commit(
42
+
"update operation missing prev field for v1.1 validation",
43
+
)
44
+
})?
45
+
.to_ipld()
46
+
.map_err(|e| RepoError::invalid_cid_conversion(e, "update prev"))?;
47
48
+
Ok(VerifiedWriteOp::Update { key, cid, prev })
0
0
0
0
0
0
0
0
0
0
0
0
0
49
}
50
+
RepoOpAction::Delete => {
51
+
let prev = op
52
+
.prev
53
+
.as_ref()
54
+
.ok_or_else(|| {
55
+
RepoError::invalid_commit(
56
+
"delete operation missing prev field for v1.1 validation",
57
+
)
58
+
})?
59
+
.to_ipld()
60
+
.map_err(|e| RepoError::invalid_cid_conversion(e, "delete prev"))?;
61
62
+
Ok(VerifiedWriteOp::Delete { key, prev })
0
0
0
0
0
63
}
64
+
RepoOpAction::Other(ref action) => Err(RepoError::invalid_commit(format!(
65
+
"unknown action type: {}",
66
+
action
67
+
))),
68
}
69
}
70
···
78
use cid::Cid as IpldCid;
79
use std::sync::Arc;
80
81
+
/// Validate a sync v1.0 commit
82
+
///
83
+
/// **Requirements:**
84
+
/// - Must have previous MST state (potentially full repository)
85
+
/// - All blocks needed for validation must be in `fh_commit.blocks`
86
+
///
87
+
/// **Validation steps:**
88
+
/// 1. Parse CAR blocks from `fh_commit.blocks` into temporary storage
89
+
/// 2. Load commit object and verify signature
90
+
/// 3. Apply operations to previous MST (using temporary storage for new blocks)
91
+
/// 4. Verify result matches commit.data (new MST root)
92
+
///
93
+
/// Returns the new MST root CID on success.
94
+
pub async fn validate_v1_0<S: BlockStore + Sync + 'static>(
95
+
fh_commit: &Commit<'_>,
96
+
prev_mst_root: Option<IpldCid>,
97
+
prev_storage: Arc<S>,
98
+
pubkey: &PublicKey<'_>,
99
+
) -> Result<IpldCid> {
100
+
// 1. Parse CAR blocks from the firehose message into temporary storage
101
+
let parsed = parse_car_bytes(&fh_commit.blocks).await?;
102
+
let temp_storage = MemoryBlockStore::new_from_blocks(parsed.blocks);
0
103
104
+
// 2. Create layered storage: reads from temp first, then prev; writes to temp only
105
+
// This avoids copying all previous MST blocks
106
+
let layered_storage = LayeredBlockStore::new(temp_storage.clone(), prev_storage);
107
108
+
// 3. Extract and verify commit object from temporary storage
109
+
let commit_cid: IpldCid = fh_commit
110
+
.commit
111
+
.to_ipld()
112
+
.map_err(|e| RepoError::invalid_cid_conversion(e, "commit CID"))?;
113
+
let commit_bytes = temp_storage
114
+
.get(&commit_cid)
115
+
.await?
116
+
.ok_or_else(|| RepoError::not_found("commit block", &commit_cid))?;
117
118
+
let commit = super::Commit::from_cbor(&commit_bytes)?;
119
120
+
// Verify DID matches
121
+
if commit.did().as_ref() != fh_commit.repo.as_ref() {
122
+
return Err(RepoError::invalid_commit(format!(
123
"DID mismatch: commit has {}, message has {}",
124
commit.did(),
125
+
fh_commit.repo
126
))
127
.with_help("DID mismatch indicates the commit was signed by a different identity - verify the commit is from the expected repository"));
128
+
}
0
0
0
129
130
+
// Verify signature
131
+
commit.verify(pubkey)?;
132
133
+
let layered_arc = Arc::new(layered_storage);
0
0
0
0
0
134
135
+
// 4. Load previous MST state from layered storage (or start empty)
136
+
let prev_mst = if let Some(prev_root) = prev_mst_root {
137
+
Mst::load(layered_arc.clone(), prev_root, None)
138
+
} else {
139
+
Mst::new(layered_arc.clone())
140
+
};
141
142
+
// 5. Load new MST from commit.data (claimed result)
143
+
let expected_root = *commit.data();
144
+
let new_mst = Mst::load(layered_arc, expected_root, None);
145
146
+
// 6. Compute diff to get verified write ops (with actual prev values from tree state)
147
+
let diff = prev_mst.diff(&new_mst).await?;
148
+
let verified_ops = diff.to_verified_ops();
149
150
+
// 7. Apply verified ops to prev MST
151
+
let computed_mst = prev_mst.batch(&verified_ops).await?;
152
153
+
// 8. Verify computed result matches claimed result
154
+
let computed_root = computed_mst.get_pointer().await?;
0
0
0
0
155
156
+
if computed_root != expected_root {
157
+
return Err(RepoError::cid_mismatch(format!(
158
+
"MST root mismatch: expected {}, got {}",
159
+
expected_root, computed_root
160
+
)));
161
}
162
163
+
Ok(expected_root)
164
+
}
165
+
166
+
/// Validate a sync v1.1 commit (inductive validation)
167
+
///
168
+
/// **Requirements:**
169
+
/// - `fh_commit.prev_data` must be Some (contains previous MST root)
170
+
/// - All blocks needed for validation must be in `fh_commit.blocks`
171
+
///
172
+
/// **Validation steps:**
173
+
/// 1. Parse CAR blocks from `fh_commit.blocks` into temporary storage
174
+
/// 2. Load commit object and verify signature
175
+
/// 3. Start from the new MST root in `commit.data` (loaded from temp storage)
176
+
/// 4. Invert each operation against that tree (updates/deletes must carry `prev` CIDs)
177
+
/// 5. Verify the resulting root matches `prev_data` (previous MST root)
178
+
///
179
+
/// Returns the new MST root CID on success.
180
+
///
181
+
/// **Inductive property:** Can validate without any external state besides the blocks
182
+
/// in this message. The `prev_data` field provides the starting MST root, and operations
183
+
/// include `prev` CIDs for validation. All necessary blocks must be in the CAR bytes.
184
+
///
185
+
/// Note: Because this uses the same merkle search tree struct as the repository itself,
186
+
/// this is far from the most efficient validation function possible. The repo
187
+
/// tree struct carries extra information. However,
188
+
/// it has the virtue of making everything self-validating.
189
+
pub async fn validate_v1_1(fh_commit: &Commit<'_>, pubkey: &PublicKey<'_>) -> Result<IpldCid> {
190
+
// 1. Require prev_data for v1.1
191
+
let prev_data_cid: IpldCid = fh_commit
192
+
.prev_data
193
+
.as_ref()
194
+
.ok_or_else(|| RepoError::invalid_commit("Sync v1.1 validation requires prev_data field"))?
195
+
.to_ipld()
196
+
.map_err(|e| RepoError::invalid_cid_conversion(e, "prev_data CID"))?;
197
198
+
// 2. Parse CAR blocks from the firehose message into temporary storage
199
+
let parsed = parse_car_bytes(&fh_commit.blocks).await?;
200
201
+
let temp_storage = Arc::new(MemoryBlockStore::new_from_blocks(parsed.blocks));
202
203
+
// 3. Extract and verify commit object from temporary storage
204
+
let commit_cid: IpldCid = fh_commit
205
+
.commit
206
+
.to_ipld()
207
+
.map_err(|e| RepoError::invalid_cid_conversion(e, "commit CID"))?;
208
+
let commit_bytes = temp_storage
209
+
.get(&commit_cid)
210
+
.await?
211
+
.ok_or_else(|| RepoError::not_found("commit block", &commit_cid))?;
212
213
+
let commit = super::Commit::from_cbor(&commit_bytes)?;
214
215
+
// Verify DID matches
216
+
if commit.did().as_ref() != fh_commit.repo.as_ref() {
217
+
return Err(RepoError::invalid_commit(format!(
218
"DID mismatch: commit has {}, message has {}",
219
commit.did(),
220
+
fh_commit.repo
221
))
222
.with_help("DID mismatch indicates the commit was signed by a different identity - verify the commit is from the expected repository"));
223
+
}
224
225
+
// Verify signature
226
+
commit.verify(pubkey)?;
227
228
+
// 5. Load new MST from commit.data (claimed result)
229
+
let expected_root = *commit.data();
230
231
+
let mut new_mst = Mst::load(temp_storage, expected_root, None);
232
233
+
let verified_ops = fh_commit
234
+
.ops
235
+
.iter()
236
+
.filter_map(|op| to_invertible_op(op).ok())
237
+
.collect::<Vec<_>>();
238
+
if verified_ops.len() != fh_commit.ops.len() {
239
+
return Err(RepoError::invalid_commit(format!(
240
+
"Invalid commit: expected {} ops, got {}",
241
+
fh_commit.ops.len(),
242
+
verified_ops.len()
243
+
)));
244
+
}
245
246
+
for op in verified_ops {
247
+
if let Ok(inverted) = new_mst.invert_op(op.clone()).await {
248
+
if !inverted {
249
+
return Err(RepoError::invalid_commit(format!(
250
+
"Invalid commit: op {:?} is not invertible",
251
+
op
252
+
)));
0
253
}
254
}
255
+
}
256
+
// 8. Verify computed previous state matches claimed previous state
257
+
let computed_root = new_mst.get_pointer().await?;
258
259
+
if computed_root != prev_data_cid {
260
+
return Err(RepoError::cid_mismatch(format!(
261
+
"MST root mismatch: expected {}, got {}",
262
+
prev_data_cid, computed_root
263
+
)));
264
+
}
265
266
+
Ok(expected_root)
0
267
}
268
269
#[cfg(test)]
···
273
use crate::commit::Commit;
274
use crate::mst::{Mst, RecordWriteOp};
275
use crate::storage::MemoryBlockStore;
276
+
use jacquard_common::IntoStatic;
277
use jacquard_common::types::crypto::{KeyCodec, PublicKey};
278
+
use jacquard_common::types::did::Did;
279
use jacquard_common::types::recordkey::Rkey;
280
+
use jacquard_common::types::string::{Datetime, Nsid, RecordKey};
281
use jacquard_common::types::tid::Ticker;
282
use jacquard_common::types::value::RawData;
283
use smol_str::SmolStr;
···
363
.unwrap();
364
365
// Validate using v1.1 validation
366
+
let result = validate_v1_1(&firehose_commit, &pubkey).await;
367
if let Err(ref e) = result {
368
eprintln!("Validation error: {}", e);
369
}
···
416
firehose_commit.prev_data = None;
417
418
// Validate using v1.0 validation with previous storage
419
+
let result =
420
+
validate_v1_0(&firehose_commit, Some(prev_root), storage.clone(), &pubkey).await;
0
421
422
assert!(result.is_ok(), "Valid v1.0 commit should pass validation");
423
···
467
.await
468
.unwrap();
469
470
+
let result = validate_v1_1(&firehose_commit, &pubkey).await;
471
assert!(result.is_ok(), "Multiple creates should validate");
472
}
473
···
540
.await
541
.unwrap();
542
543
+
let result = validate_v1_1(&firehose_commit, &pubkey).await;
544
assert!(
545
result.is_ok(),
546
"Update and delete operations should validate"
···
595
596
firehose_commit.blocks = bad_car.into();
597
598
+
let result = validate_v1_1(&firehose_commit, &pubkey).await;
599
assert!(
600
result.is_err(),
601
"Validation should fail when commit block is missing"
···
657
658
firehose_commit.blocks = bad_car.into();
659
660
+
let result = validate_v1_1(&firehose_commit, &pubkey).await;
661
assert!(
662
result.is_err(),
663
"Validation should fail when MST blocks are missing"
···
718
.await
719
.unwrap();
720
721
+
let result = validate_v1_1(&firehose_commit, &pubkey).await;
722
assert!(
723
result.is_err(),
724
"Validation should fail when commit has wrong MST root"
···
760
761
firehose_commit.repo = wrong_did;
762
763
+
let result = validate_v1_1(&firehose_commit, &pubkey).await;
764
assert!(
765
result.is_err(),
766
"Validation should fail with mismatched DID"
···
807
.await
808
.unwrap();
809
810
+
let result = validate_v1_1(&firehose_commit, &wrong_pubkey).await;
811
assert!(
812
result.is_err(),
813
"Validation should fail with wrong public key"
···
848
// Strip prev_data to make it invalid for v1.1
849
firehose_commit.prev_data = None;
850
851
+
let result = validate_v1_1(&firehose_commit, &pubkey).await;
852
assert!(
853
result.is_err(),
854
"v1.1 validation should fail without prev_data"
···
895
// Use wrong prev_data CID (point to commit instead of MST root)
896
firehose_commit.prev_data = Some(firehose_commit.commit.clone());
897
898
+
let result = validate_v1_1(&firehose_commit, &pubkey).await;
899
assert!(
900
result.is_err(),
901
"Validation should fail with wrong prev_data CID"
+19
-7
crates/jacquard-repo/src/mst/diff.rs
···
170
path: key.as_str().into(),
171
cid: Some(CidLink::from(*cid)),
172
prev: None,
0
173
});
174
}
175
···
180
path: key.as_str().into(),
181
cid: Some(CidLink::from(*new_cid)),
182
prev: Some(CidLink::from(*old_cid)),
0
183
});
184
}
185
···
190
path: key.as_str().into(),
191
cid: None, // null for deletes
192
prev: Some(CidLink::from(*old_cid)),
0
193
});
194
}
195
···
220
// Remove duplicate blocks: nodes that appear in both new_mst_blocks and removed_mst_blocks
221
// are unchanged nodes that were traversed during the diff but shouldn't be counted as created/deleted.
222
// This happens when we step into subtrees with different parent CIDs but encounter identical child nodes.
223
-
let created_set: std::collections::HashSet<_> = diff.new_mst_blocks.keys().copied().collect();
224
-
let removed_set: std::collections::HashSet<_> = diff.removed_mst_blocks.iter().copied().collect();
225
-
let duplicates: std::collections::HashSet<_> = created_set.intersection(&removed_set).copied().collect();
0
0
0
226
227
-
diff.new_mst_blocks.retain(|cid, _| !duplicates.contains(cid));
228
-
diff.removed_mst_blocks.retain(|cid| !duplicates.contains(cid));
0
0
229
230
Ok(diff)
231
}
···
420
// Serialize the MST node
421
let entries = tree.get_entries().await?;
422
let node_data = serialize_node_data(&entries).await?;
423
-
let cbor = serde_ipld_dagcbor::to_vec(&node_data)
424
-
.map_err(|e| RepoError::serialization(e).with_context(format!("serializing MST node for diff tracking: {}", tree_cid)))?;
0
0
0
0
425
426
// Track the serialized block
427
diff.new_mst_blocks.insert(tree_cid, Bytes::from(cbor));
···
170
path: key.as_str().into(),
171
cid: Some(CidLink::from(*cid)),
172
prev: None,
173
+
extra_data: None,
174
});
175
}
176
···
181
path: key.as_str().into(),
182
cid: Some(CidLink::from(*new_cid)),
183
prev: Some(CidLink::from(*old_cid)),
184
+
extra_data: None,
185
});
186
}
187
···
192
path: key.as_str().into(),
193
cid: None, // null for deletes
194
prev: Some(CidLink::from(*old_cid)),
195
+
extra_data: None,
196
});
197
}
198
···
223
// Remove duplicate blocks: nodes that appear in both new_mst_blocks and removed_mst_blocks
224
// are unchanged nodes that were traversed during the diff but shouldn't be counted as created/deleted.
225
// This happens when we step into subtrees with different parent CIDs but encounter identical child nodes.
226
+
let created_set: std::collections::HashSet<_> =
227
+
diff.new_mst_blocks.keys().copied().collect();
228
+
let removed_set: std::collections::HashSet<_> =
229
+
diff.removed_mst_blocks.iter().copied().collect();
230
+
let duplicates: std::collections::HashSet<_> =
231
+
created_set.intersection(&removed_set).copied().collect();
232
233
+
diff.new_mst_blocks
234
+
.retain(|cid, _| !duplicates.contains(cid));
235
+
diff.removed_mst_blocks
236
+
.retain(|cid| !duplicates.contains(cid));
237
238
Ok(diff)
239
}
···
428
// Serialize the MST node
429
let entries = tree.get_entries().await?;
430
let node_data = serialize_node_data(&entries).await?;
431
+
let cbor = serde_ipld_dagcbor::to_vec(&node_data).map_err(|e| {
432
+
RepoError::serialization(e).with_context(format!(
433
+
"serializing MST node for diff tracking: {}",
434
+
tree_cid
435
+
))
436
+
})?;
437
438
// Track the serialized block
439
diff.new_mst_blocks.insert(tree_cid, Bytes::from(cbor));
+2
-1
crates/jacquard-repo/src/repo.rs
···
82
repo: repo.clone().into_static(),
83
rev: self.rev.clone(),
84
seq,
85
-
since: self.since.clone().unwrap_or_else(|| self.rev.clone()),
86
time,
87
commit: CidLink::from(self.cid),
88
blocks: blocks_car.into(),
···
91
blobs,
92
too_big: false,
93
rebase: false,
0
94
})
95
}
96
}
···
82
repo: repo.clone().into_static(),
83
rev: self.rev.clone(),
84
seq,
85
+
since: Some(self.since.clone().unwrap_or_else(|| self.rev.clone())),
86
time,
87
commit: CidLink::from(self.cid),
88
blocks: blocks_car.into(),
···
91
blobs,
92
too_big: false,
93
rebase: false,
94
+
extra_data: None,
95
})
96
}
97
}
+6
-9
crates/jacquard-repo/tests/large_proof_tests.rs
···
10
use jacquard_common::types::value::RawData;
11
use jacquard_repo::Repository;
12
use jacquard_repo::car::read_car_header;
0
13
use jacquard_repo::mst::RecordWriteOp;
14
use jacquard_repo::storage::{BlockStore, MemoryBlockStore};
15
use rand::Rng;
···
224
.await
225
.unwrap();
226
227
-
firehose_commit
228
-
.validate_v1_1(&pubkey)
229
.await
230
.expect("Initial batch should validate");
231
···
266
.await
267
.unwrap();
268
269
-
firehose_commit
270
-
.validate_v1_1(&pubkey)
271
.await
272
.unwrap_or_else(|e| {
273
eprintln!(
···
336
.await
337
.unwrap();
338
339
-
firehose_commit.validate_v1_1(&pubkey).await.unwrap();
340
341
for batch_num in 1..=5000 {
342
let batch_size = rng.gen_range(1..=20);
···
355
.await
356
.unwrap();
357
358
-
firehose_commit
359
-
.validate_v1_1(&pubkey)
360
.await
361
.unwrap_or_else(|e| {
362
panic!(
···
441
.await
442
.unwrap();
443
444
-
firehose_commit
445
-
.validate_v1_1(&pubkey)
446
.await
447
.unwrap_or_else(|e| panic!("Fixture validation failed at batch {}: {}", batch_num, e));
448
}
···
10
use jacquard_common::types::value::RawData;
11
use jacquard_repo::Repository;
12
use jacquard_repo::car::read_car_header;
13
+
use jacquard_repo::commit::firehose::validate_v1_1;
14
use jacquard_repo::mst::RecordWriteOp;
15
use jacquard_repo::storage::{BlockStore, MemoryBlockStore};
16
use rand::Rng;
···
225
.await
226
.unwrap();
227
228
+
validate_v1_1(&firehose_commit, &pubkey)
0
229
.await
230
.expect("Initial batch should validate");
231
···
266
.await
267
.unwrap();
268
269
+
validate_v1_1(&firehose_commit, &pubkey)
0
270
.await
271
.unwrap_or_else(|e| {
272
eprintln!(
···
335
.await
336
.unwrap();
337
338
+
validate_v1_1(&firehose_commit, &pubkey).await.unwrap();
339
340
for batch_num in 1..=5000 {
341
let batch_size = rng.gen_range(1..=20);
···
354
.await
355
.unwrap();
356
357
+
validate_v1_1(&firehose_commit, &pubkey)
0
358
.await
359
.unwrap_or_else(|e| {
360
panic!(
···
439
.await
440
.unwrap();
441
442
+
validate_v1_1(&firehose_commit, &pubkey)
0
443
.await
444
.unwrap_or_else(|e| panic!("Fixture validation failed at batch {}: {}", batch_num, e));
445
}