lightweight
com.atproto.sync.listReposByCollection
1//! `#commit` firehose event processing — nine-step ATProto sync1.1 pipeline.
2//!
3//! ## Validation steps
4//!
5//! 1. Resolve the DID; drop if resolution fails.
6//! 2. Check account status; drop if not active.
7//! 3. Verify the commit signature; on failure, evict the cached identity and
8//! retry once with a fresh resolution (only if the key actually changed).
9//! 4. Verify that the event's `commit` CID, `repo` DID, and `rev` TID all
10//! match the deserialized commit object found in the CAR.
11//! 5. Inductive proof: inverting all ops on the new MST must yield the
12//! previous MST root (prevData). Each op's prev CID is validated in-place.
13//! 6. Drop if `rev` is not strictly newer than `repo_prev.rev`.
14//! 7. Drop if `rev` timestamp is implausibly far in the future.
15//! 8. If we have a `repo_prev`: drop and queue resync if `since` ≠ `prev.rev`.
16//! 9. If we have a `repo_prev` and `prevData` is present: drop and queue
17//! resync if the bytes don't match `prev.prev_data`.
18
19use jacquard_api::com_atproto::sync::subscribe_repos::Commit;
20use jacquard_common::types::{string::Did, tid::Tid};
21use jacquard_repo::commit::firehose::validate_v1_1;
22use tracing::{debug, error, info, warn};
23
24use super::validate::{self, CarDrop};
25use crate::identity::Resolver;
26use crate::mst::{self, mortality::ExtractResult};
27use std::sync::atomic::Ordering;
28
29use crate::storage::{
30 self, DbRef, meta,
31 pds_host::{self, Sync11Mode},
32 repo::{AccountStatus, RepoInfo, RepoPrev, RepoState},
33};
34use jacquard_common::url::Host;
35
36// ---------------------------------------------------------------------------
37// Public entry point
38// ---------------------------------------------------------------------------
39
/// Process one `#commit` firehose event through the nine-step sync1.1
/// validation pipeline described in the module docs.
///
/// Resolution, CAR/signature validation, and the inductive proof run inline
/// on the async runtime; storage-backed steps (account status, rev ordering,
/// prev-data chain) run via `spawn_blocking`. Invalid events are *dropped*
/// (returning `Ok(())`) with a metrics counter bump; infrastructure failures
/// (task join, storage, encoding) propagate as errors.
pub(crate) async fn process_commit_event(
    commit: Box<Commit<'static>>,
    seq: i64,
    resolver: &Resolver,
    db: &DbRef,
    client: &crate::http::ThrottledClient,
) -> crate::error::Result<()> {
    let did = commit.repo.clone();

    // ── Step 1: Resolve DID ──────────────────────────────────────────────────
    let Some(resolved) = validate::resolve(&did, resolver, "commit").await else {
        metrics::counter!("lightrail_commit_dropped_total", "reason" => "did_resolution_failed")
            .increment(1);
        return Ok(());
    };

    let pds_host: Option<Host> = resolved.pds.host().map(|h| h.to_owned());

    // ── Step 2: Account status + desync state + PDS mode ────────────────────
    let rev = commit.rev.clone();
    let db2 = db.clone();
    let did2 = did.clone();
    let pds_host2 = pds_host.clone();
    let now = std::time::SystemTime::now();
    let step2 =
        tokio::task::spawn_blocking(move || check_step2_blocking(&db2, did2, &rev, pds_host2, now))
            .await??;
    let (info, prev, pds_mode) = match step2 {
        Step2Result::Proceed(info, prev, mode) => (info, prev, mode),
        Step2Result::Drop => return Ok(()),
        Step2Result::InactiveAccount(info, _prev) => {
            // Our local record says inactive, but we may have missed a
            // reactivation #account event. Check upstream before dropping.
            if !validate::upstream_says_active(&pds_host, &did, client).await {
                metrics::counter!("lightrail_event_dropped_total",
                    "event_type" => "commit", "reason" => "account_inactive")
                .increment(1);
                debug!(did = %did, status = info.status.as_str(),
                    "commit dropped: account not active (confirmed upstream)");
                return Ok(());
            }
            // Upstream says active — persist the reactivation and re-run
            // the step-2 checks with the updated status.
            metrics::counter!("lightrail_account_reactivated_total",
                "trigger" => "commit")
            .increment(1);
            info!(did = %did, "account reactivated via upstream check (triggered by #commit)");
            let db_ra = db.clone();
            let did_ra = did.clone();
            let rev_ra = commit.rev.clone();
            let pds_host_ra = pds_host.clone();
            let step2_retry = tokio::task::spawn_blocking(move || {
                reactivate_and_recheck(&db_ra, &did_ra, &rev_ra, pds_host_ra, now)
            })
            .await??;
            match step2_retry {
                Step2Result::Proceed(i, p, m) => (i, p, m),
                // Any non-Proceed outcome after reactivation still drops.
                _ => return Ok(()),
            }
        }
        Step2Result::Buffer => {
            // Repo is mid-resync. Serialize the commit and buffer it so it can
            // be replayed after the resync fetch completes.
            let cbor = serde_ipld_dagcbor::to_vec(&*commit)
                .map_err(|e| crate::error::Error::Other(format!("commit cbor encode: {e}")))?;
            let seq_u64 = seq as u64;
            let db_buf = db.clone();
            let did_buf = did.clone();
            tokio::task::spawn_blocking(move || {
                storage::resync_buffer::push_buffer(&db_buf, did_buf, seq_u64, &cbor)
            })
            .await??;
            metrics::counter!("lightrail_commit_buffered_total").increment(1);
            return Ok(());
        }
    };

    // ── Steps 3–4: CAR parse + signature verification + field consistency ────
    let (new_mst_root_bytes, parsed) = match validate_car(&commit, &resolved.pubkey).await {
        Ok(v) => v,
        Err(CarDrop::InvalidSignature) => {
            // A bad signature may mean the signing key rotated since we cached
            // the identity; retry once with a fresh resolution, but only if
            // the key actually changed (step 3 of the module docs).
            let Some(fresh) =
                validate::fresh_key_after_sig_failure(&did, resolver, &resolved, "commit").await
            else {
                metrics::counter!("lightrail_commit_dropped_total",
                    "reason" => "invalid_signature")
                .increment(1);
                return Ok(());
            };
            match validate_car(&commit, &fresh.pubkey).await {
                Ok(v) => v,
                Err(e) => {
                    metrics::counter!("lightrail_commit_dropped_total",
                        "reason" => e.label())
                    .increment(1);
                    debug!(did = %did, reason = %e,
                        "commit dropped: still invalid with fresh key");
                    return Ok(());
                }
            }
        }
        Err(e) => {
            metrics::counter!("lightrail_commit_dropped_total",
                "reason" => e.label())
            .increment(1);
            debug!(did = %did, reason = %e, "commit dropped");
            return Ok(());
        }
    };

    // Clone the parsed CAR before consuming its blocks into the MST block store.
    // not super-cheap, but the Bytes values are refcounted at least so whatever
    // NOTE(review): in the code visible here, `parsed` is never used after this
    // clone — the clone may be removable; confirm against the full file.
    let parsed_clone = parsed.clone();

    // ── Step 5: Inductive proof ───────────────────────────────────────────────
    // Only enforced for PDS hosts already known to emit sync1.1 commits.
    if pds_mode == Sync11Mode::Strict
        && let Err(e) = validate_v1_1(&commit, &resolved.pubkey).await
    {
        metrics::counter!("lightrail_commit_dropped_total", "reason" => "proof_failed")
            .increment(1);
        debug!(did = %did, error = %e, "commit dropped: inductive proof failed");
        return Ok(());
    }

    metrics::histogram!("lightrail_commit_ops").record(commit.ops.len() as f64);

    // ── Collection birth/death detection ─────────────────────────────────────
    let mortality = mst::mortality::extract(&commit.ops, parsed_clone)?;

    // ── Steps 6–9: Blocking storage checks + repo_prev update ───────────────
    let db = db.clone();
    let rev = commit.rev.clone();
    let since = commit.since.clone();
    // Extract the raw CID bytes from the event's prevData field (sync1.1).
    let incoming_prev_data = commit
        .prev_data
        .map(|cid| {
            cid.to_ipld()
                .map_err(|e| crate::error::Error::Other(format!("bad CID format: {e}")))
        })
        .transpose()?
        .map(|ipld_cid| ipld_cid.to_bytes());

    tokio::task::spawn_blocking(move || {
        process_blocking(
            &db,
            ValidationState {
                did,
                info,
                prev,
                rev,
                since,
                incoming_prev_data,
                new_mst_root_bytes,
                mortality,
                pds_host,
                current_mode: pds_mode,
            },
        )
    })
    .await??;

    Ok(())
}
204
205// ---------------------------------------------------------------------------
206// CAR + signature validation (async, in-memory)
207// ---------------------------------------------------------------------------
208
209/// Parse the CAR, deserialize the commit, verify the signature, and check that
210/// the event fields are internally consistent with the commit object.
211///
212/// Returns:
213/// - The new MST root CID as raw bytes (stored as `repo_prev.prev_data`).
214/// - The typed MST root [`IpldCid`] (for loading the MST block store).
215/// - The [`ParsedCar`] (blocks for the MST block store).
216async fn validate_car(
217 commit: &Commit<'static>,
218 pubkey: &jacquard_common::types::crypto::PublicKey<'_>,
219) -> Result<(Vec<u8>, jacquard_repo::car::ParsedCar), CarDrop> {
220 // Parse the CAR slice embedded in the firehose event.
221 let parsed = jacquard_repo::car::parse_car_bytes(commit.blocks.as_ref())
222 .await
223 .map_err(|_| CarDrop::CarParse)?;
224
225 // Look up the commit block using the event's claimed commit CID.
226 let commit_cid = commit.commit.to_ipld().map_err(|_| CarDrop::MalformedCid)?;
227 let block = parsed
228 .blocks
229 .get(&commit_cid)
230 .ok_or(CarDrop::CommitBlockMissing)?;
231
232 // Deserialize, verify signature, and check DID + rev consistency.
233 let repo_commit = validate::deserialize_and_verify(
234 block.as_ref(),
235 pubkey,
236 commit.repo.clone(),
237 commit.rev.clone(),
238 )?;
239
240 // Step 4 (CID): verify that the commit's content-addressed CID matches
241 // the CID the event claimed (not just the one we used to find the block).
242 let actual_cid = repo_commit
243 .to_cid()
244 .map_err(|_| CarDrop::CommitDeserialize)?;
245 if actual_cid != commit_cid {
246 return Err(CarDrop::CidMismatch);
247 }
248
249 // The new MST root (= `repo_commit.data`) becomes the next commit's
250 // `prevData`. Return its bytes for storage alongside the typed CID (needed
251 // for block store construction) and the parsed CAR (the blocks themselves).
252 metrics::histogram!("lightrail_commit_car_bytes").record(commit.blocks.len() as f64);
253 let mst_root_cid = repo_commit.data;
254 Ok((mst_root_cid.to_bytes(), parsed))
255}
256
257// ---------------------------------------------------------------------------
258// Storage checks (blocking)
259// ---------------------------------------------------------------------------
260
/// Outcome of the step-2 storage check.
///
/// Produced by [`check_step2_blocking`] (and the retry path in
/// [`reactivate_and_recheck`]); consumed by [`process_commit_event`].
enum Step2Result {
    /// All good — proceed with signature verification and the rest of the pipeline.
    /// Carries the pre-loaded repo state plus the PDS's sync1.1 mode.
    Proceed(RepoInfo, Option<RepoPrev>, Sync11Mode),
    /// Drop this event (desynchronized state, stale rev, future rev, etc.).
    Drop,
    /// Repo is mid-resync. Caller should buffer the commit for replay.
    Buffer,
    /// Account is locally inactive. Caller may check upstream before dropping.
    InactiveAccount(RepoInfo, Option<RepoPrev>),
}
272
273/// Step 2: load the repo state and decide how to handle this commit.
274///
275/// - Unknown repo: creates a `Desynchronized` entry and enqueues a resync so
276/// we discover repos that appear on the firehose before backfill reaches them.
277/// - `Resyncing` repo: returns `Step2Result::Buffer` — the commit must be
278/// stored in the resync buffer and replayed after the fetch completes.
279/// - Inactive / desynchronized / stale / future rev: returns `Step2Result::Drop`.
280/// - All clear: returns `Step2Result::Proceed(info, prev)` so the caller can
281/// pass the pre-loaded repo state to `process_blocking`.
282fn check_step2_blocking(
283 db: &DbRef,
284 did: Did<'static>,
285 rev: &Tid,
286 pds_host: Option<Host>,
287 now: std::time::SystemTime,
288) -> crate::error::Result<Step2Result> {
289 let Some((info, prev)) = storage::repo::get(db, &did)? else {
290 // Unknown repo — create an entry and enqueue for initial fetch so that
291 // repos appearing on the firehose before backfill reaches them are not
292 // silently skipped.
293 let mut batch = db.database.batch();
294 storage::repo::put_info_into(
295 &mut batch,
296 db,
297 &did,
298 &RepoInfo {
299 state: RepoState::Desynchronized,
300 status: AccountStatus::Active,
301 error: None,
302 },
303 );
304 storage::resync_queue::enqueue_into(
305 &mut batch,
306 db,
307 now,
308 &crate::storage::resync_queue::ResyncItem {
309 did,
310 retry_count: 0,
311 retry_reason: "first firehose event for unknown repo".to_string(),
312 commit_cbor: vec![],
313 },
314 );
315 batch
316 .commit()
317 .map_err(Into::<crate::storage::StorageError>::into)?;
318 db.stats.repos_queued_total.fetch_add(1, Ordering::Relaxed);
319 metrics::counter!("lightrail_commit_dropped_total", "reason" => "unknown_repo")
320 .increment(1);
321 return Ok(Step2Result::Drop);
322 };
323 if info.state == RepoState::Resyncing {
324 return Ok(Step2Result::Buffer);
325 }
326 // Separate inactive check so the caller can probe upstream before dropping.
327 if !info.status.is_active() {
328 return Ok(Step2Result::InactiveAccount(info, prev));
329 }
330 if validate::should_drop(&info, prev.as_ref(), rev, "commit", &did, now) {
331 return Ok(Step2Result::Drop);
332 }
333 let mode = match pds_host {
334 Some(ref host) => pds_host::get(db, host)?.sync11_mode,
335 None => Sync11Mode::Lenient,
336 };
337 Ok(Step2Result::Proceed(info, prev, mode))
338}
339
340/// Write `AccountStatus::Active` to storage for `did`, then re-run the
341/// step-2 checks so the commit pipeline can continue as normal.
342fn reactivate_and_recheck(
343 db: &DbRef,
344 did: &Did<'static>,
345 rev: &Tid,
346 pds_host: Option<Host>,
347 now: std::time::SystemTime,
348) -> crate::error::Result<Step2Result> {
349 let Some((mut info, _)) = storage::repo::get(db, did)? else {
350 return Ok(Step2Result::Drop);
351 };
352 info.status = AccountStatus::Active;
353 let mut batch = db.database.batch();
354 storage::repo::put_info_into(&mut batch, db, did, &info);
355 batch
356 .commit()
357 .map_err(Into::<crate::storage::StorageError>::into)?;
358 // Re-run the full step-2 check; the updated Active status means
359 // InactiveAccount will not be returned again.
360 check_step2_blocking(db, did.clone(), rev, pds_host, now)
361}
362
/// everything needed to ship off to the blocking step
struct ValidationState {
    /// Repo DID this commit applies to.
    did: Did<'static>,
    /// Repo state pre-loaded by [`check_step2_blocking`] (step 2).
    info: RepoInfo,
    /// Last accepted commit's rev + MST root bytes, if any.
    prev: Option<RepoPrev>,
    /// This commit's rev (TID).
    rev: Tid,
    /// The event's `since` field; expected to equal `prev.rev` (step 8).
    since: Option<Tid>,
    /// Raw CID bytes of the event's sync1.1 `prevData`, if present (step 9).
    incoming_prev_data: Option<Vec<u8>>,
    /// New MST root CID bytes; persisted as the next `repo_prev.prev_data`.
    new_mst_root_bytes: Vec<u8>,
    /// Collection births/deaths extracted from the commit's ops.
    mortality: ExtractResult,
    /// Host of the repo's PDS, if the resolved identity had one.
    pds_host: Option<Host>,
    /// The PDS's sync1.1 mode as loaded during step 2.
    current_mode: Sync11Mode,
}
376
377/// Perform the storage-backed validation steps (6–9) and, if all pass,
378/// persist the updated `repo_prev`.
379///
380/// `info` and `prev` are pre-loaded by [`check_step2_blocking`] (step 2).
381fn process_blocking(
382 db: &DbRef,
383 ValidationState {
384 did,
385 info,
386 prev,
387 rev,
388 since,
389 incoming_prev_data,
390 new_mst_root_bytes,
391 mortality,
392 pds_host,
393 current_mode,
394 }: ValidationState,
395) -> crate::error::Result<()> {
396 if let Some(prev) = &prev {
397 // Step 8: `since` must match repo_prev's rev (the previous rev in the chain).
398 if let Some(since) = &since
399 && since.compare_to(&prev.rev) != 0
400 {
401 metrics::counter!("lightrail_commit_desync_total",
402 "reason" => "since_mismatch")
403 .increment(1);
404 warn!(did = %did.as_str(),
405 commit_since = since.as_str(), prev_rev = prev.rev.as_str(),
406 "commit dropped: since/rev mismatch; queueing resync");
407 return enqueue_desync(db, did, info.status, "since/rev mismatch");
408 }
409
410 // Step 9: `prevData` must match the MST root stored from the last commit.
411 if let Some(incoming) = &incoming_prev_data
412 && incoming != &prev.prev_data
413 {
414 metrics::counter!("lightrail_commit_desync_total",
415 "reason" => "prev_data_mismatch")
416 .increment(1);
417 warn!(did = %did.as_str(),
418 "commit dropped: prevData mismatch; queueing resync");
419 return enqueue_desync(db, did, info.status, "prevData mismatch");
420 }
421 }
422
423 if !mortality.born.is_empty() {
424 let names = mortality.born.keys().collect::<Vec<_>>();
425 info!(did = %did, collections = ?names, "collection birth");
426 }
427 if !mortality.died.is_empty() {
428 let names = mortality.died.keys().collect::<Vec<_>>();
429 info!(did = %did, collections = ?names, "collection death");
430 }
431
432 // All checks passed — atomically update the prev_data and the collection
433 // index (born → insert, died → remove). Also record each born collection
434 // in the global collection list (blind overwrite, never deleted).
435 let n_born = mortality.born.len() as u64;
436 let n_died = mortality.died.len() as u64;
437 let mut batch = db.database.batch();
438 storage::repo::put_prev_into(
439 &mut batch,
440 db,
441 &did,
442 &RepoPrev {
443 rev,
444 prev_data: new_mst_root_bytes,
445 },
446 );
447 for coll in mortality.born.keys() {
448 // TODO(temporary): detect spurious births to confirm pre-sync1.1 hypothesis
449 if storage::collection_index::has_collection(db, &did, coll.clone())? {
450 if incoming_prev_data.is_some() {
451 error!(
452 did = %did,
453 collection = %coll,
454 sync11 = true,
455 "spurious birth, already indexed"
456 );
457 } else {
458 error!(
459 did = %did,
460 collection = %coll,
461 sync11 = false,
462 "spurious birth, already indexed"
463 );
464 }
465 }
466 storage::collection_index::insert_into(&mut batch, db, &did, coll.clone());
467 }
468 let mut all_deaths_proven = true;
469 for (coll, proven) in &mortality.died {
470 // TODO(remove): detect phandom deaths???
471 if !storage::collection_index::has_collection(db, &did, coll.clone())? {
472 if incoming_prev_data.is_some() {
473 error!(
474 did = %did,
475 collection = %coll,
476 proven,
477 sync11 = true,
478 "phantom death, collection not indexed"
479 );
480 } else {
481 error!(
482 did = %did,
483 collection = %coll,
484 proven,
485 sync11 = false,
486 "phantom death, collection not indexed"
487 );
488 }
489 }
490
491 if !proven {
492 all_deaths_proven = false;
493 continue;
494 }
495
496 storage::collection_index::remove_into(&mut batch, db, &did, coll.clone());
497 }
498
499 // If mortality detection found a possible-but-unproven collection death,
500 // queue a full-repo resync so the index can be reconciled once we have
501 // a complete view of the repo's current MST.
502 if !all_deaths_proven {
503 let maybes: Vec<_> = mortality
504 .died
505 .iter()
506 .filter(|(_, p)| !*p)
507 .map(|(k, _)| k)
508 .collect();
509 error!(did = %did, maybe_deaths = ?maybes, "queuing resync due to unprovable death");
510 storage::resync_queue::enqueue_into(
511 &mut batch,
512 db,
513 std::time::SystemTime::now(),
514 &crate::storage::resync_queue::ResyncItem {
515 did: did.clone(),
516 retry_count: 0,
517 retry_reason: "unprovable_death".to_string(),
518 commit_cbor: vec![],
519 },
520 );
521 }
522
523 // Upgrade PDS to strict on first prevData-bearing commit — atomically with
524 // the prev_data and collection index writes above.
525 if incoming_prev_data.is_some()
526 && current_mode == Sync11Mode::Lenient
527 && let Some(ref host) = pds_host
528 {
529 let mut pds_info = pds_host::get(db, host)?;
530 pds_info.sync11_mode = Sync11Mode::Strict;
531 pds_host::put_into(&mut batch, db, host, &pds_info);
532 metrics::counter!("lightrail_pds_mode_upgraded_total", "trigger" => "commit").increment(1);
533 info!(pds = %host, "PDS upgraded to strict sync1.1 mode");
534 }
535
536 batch
537 .commit()
538 .map_err(Into::<crate::storage::StorageError>::into)?;
539
540 if n_born > 0 {
541 db.stats
542 .collection_births_firehose
543 .fetch_add(n_born, Ordering::Relaxed);
544 metrics::counter!("lightrail_collection_births_total", "source" => "firehose")
545 .increment(n_born);
546 }
547 if n_died > 0 {
548 db.stats
549 .collection_deaths_firehose
550 .fetch_add(n_died, Ordering::Relaxed);
551 metrics::counter!("lightrail_collection_deaths_total", "source" => "firehose")
552 .increment(n_died);
553 }
554 meta::insert_str(&db.stats.sketch_accounts_all, did.as_str());
555 match current_mode {
556 Sync11Mode::Strict => {
557 meta::insert_str(&db.stats.sketch_accounts_commit_strict, did.as_str())
558 }
559 Sync11Mode::Lenient => {
560 meta::insert_str(&db.stats.sketch_accounts_commit_lenient, did.as_str())
561 }
562 }
563 metrics::counter!("lightrail_commits_indexed_total", "mode" => current_mode.as_str())
564 .increment(1);
565
566 Ok(())
567}
568
569/// Mark the repo as desynchronized and enqueue it for a full resync.
570fn enqueue_desync(
571 db: &DbRef,
572 did: Did<'static>,
573 existing_status: AccountStatus,
574 reason: &str,
575) -> crate::error::Result<()> {
576 let mut batch = db.database.batch();
577 storage::repo::put_info_into(
578 &mut batch,
579 db,
580 &did,
581 &RepoInfo {
582 state: RepoState::Desynchronized,
583 status: existing_status,
584 error: None,
585 },
586 );
587 storage::resync_queue::enqueue_into(
588 &mut batch,
589 db,
590 std::time::SystemTime::now(),
591 &crate::storage::resync_queue::ResyncItem {
592 did: did.clone(),
593 retry_count: 0,
594 retry_reason: reason.to_string(),
595 commit_cbor: vec![],
596 },
597 );
598 batch
599 .commit()
600 .map_err(Into::<crate::storage::StorageError>::into)?;
601 meta::insert_str(&db.stats.sketch_accounts_desynced, did.as_str());
602 Ok(())
603}