//! `#commit` firehose event processing — nine-step ATProto sync1.1 pipeline. //! //! ## Validation steps //! //! 1. Resolve the DID; drop if resolution fails. //! 2. Check account status; drop if not active. //! 3. Verify the commit signature; on failure, evict the cached identity and //! retry once with a fresh resolution (only if the key actually changed). //! 4. Verify that the event's `commit` CID, `repo` DID, and `rev` TID all //! match the deserialized commit object found in the CAR. //! 5. Inductive proof: inverting all ops on the new MST must yield the //! previous MST root (prevData). Each op's prev CID is validated in-place. //! 6. Drop if `rev` is not strictly newer than `repo_prev.rev`. //! 7. Drop if `rev` timestamp is implausibly far in the future. //! 8. If we have a `repo_prev`: drop and queue resync if `since` ≠ `prev.rev`. //! 9. If we have a `repo_prev` and `prevData` is present: drop and queue //! resync if the bytes don't match `prev.prev_data`. use jacquard_api::com_atproto::sync::subscribe_repos::Commit; use jacquard_common::types::{string::Did, tid::Tid}; use jacquard_repo::commit::firehose::validate_v1_1; use tracing::{debug, error, info, warn}; use super::validate::{self, CarDrop}; use crate::identity::Resolver; use crate::mst::{self, mortality::ExtractResult}; use std::sync::atomic::Ordering; use crate::storage::{ self, DbRef, meta, pds_host::{self, Sync11Mode}, repo::{AccountStatus, RepoInfo, RepoPrev, RepoState}, }; use jacquard_common::url::Host; // --------------------------------------------------------------------------- // Public entry point // --------------------------------------------------------------------------- pub(crate) async fn process_commit_event( commit: Box>, seq: i64, resolver: &Resolver, db: &DbRef, client: &crate::http::ThrottledClient, ) -> crate::error::Result<()> { let did = commit.repo.clone(); // ── Step 1: Resolve DID ────────────────────────────────────────────────── let Some(resolved) = validate::resolve(&did, resolver, "commit").await else { metrics::counter!("lightrail_commit_dropped_total", "reason" => "did_resolution_failed") .increment(1); return Ok(()); }; let pds_host: Option = resolved.pds.host().map(|h| h.to_owned()); // ── Step 2: Account status + desync state + PDS mode ──────────────────── let rev = commit.rev.clone(); let db2 = db.clone(); let did2 = did.clone(); let pds_host2 = pds_host.clone(); let now = std::time::SystemTime::now(); let step2 = tokio::task::spawn_blocking(move || check_step2_blocking(&db2, did2, &rev, pds_host2, now)) .await??; let (info, prev, pds_mode) = match step2 { Step2Result::Proceed(info, prev, mode) => (info, prev, mode), Step2Result::Drop => return Ok(()), Step2Result::InactiveAccount(info, _prev) => { // Our local record says inactive, but we may have missed a // reactivation #account event. Check upstream before dropping. if !validate::upstream_says_active(&pds_host, &did, client).await { metrics::counter!("lightrail_event_dropped_total", "event_type" => "commit", "reason" => "account_inactive") .increment(1); debug!(did = %did, status = info.status.as_str(), "commit dropped: account not active (confirmed upstream)"); return Ok(()); } // Upstream says active — persist the reactivation and re-run // the step-2 checks with the updated status. metrics::counter!("lightrail_account_reactivated_total", "trigger" => "commit") .increment(1); info!(did = %did, "account reactivated via upstream check (triggered by #commit)"); let db_ra = db.clone(); let did_ra = did.clone(); let rev_ra = commit.rev.clone(); let pds_host_ra = pds_host.clone(); let step2_retry = tokio::task::spawn_blocking(move || { reactivate_and_recheck(&db_ra, &did_ra, &rev_ra, pds_host_ra, now) }) .await??; match step2_retry { Step2Result::Proceed(i, p, m) => (i, p, m), _ => return Ok(()), } } Step2Result::Buffer => { // Repo is mid-resync. Serialize the commit and buffer it so it can // be replayed after the resync fetch completes. let cbor = serde_ipld_dagcbor::to_vec(&*commit) .map_err(|e| crate::error::Error::Other(format!("commit cbor encode: {e}")))?; let seq_u64 = seq as u64; let db_buf = db.clone(); let did_buf = did.clone(); tokio::task::spawn_blocking(move || { storage::resync_buffer::push_buffer(&db_buf, did_buf, seq_u64, &cbor) }) .await??; metrics::counter!("lightrail_commit_buffered_total").increment(1); return Ok(()); } }; // ── Steps 3–4: CAR parse + signature verification + field consistency ──── let (new_mst_root_bytes, parsed) = match validate_car(&commit, &resolved.pubkey).await { Ok(v) => v, Err(CarDrop::InvalidSignature) => { let Some(fresh) = validate::fresh_key_after_sig_failure(&did, resolver, &resolved, "commit").await else { metrics::counter!("lightrail_commit_dropped_total", "reason" => "invalid_signature") .increment(1); return Ok(()); }; match validate_car(&commit, &fresh.pubkey).await { Ok(v) => v, Err(e) => { metrics::counter!("lightrail_commit_dropped_total", "reason" => e.label()) .increment(1); debug!(did = %did, reason = %e, "commit dropped: still invalid with fresh key"); return Ok(()); } } } Err(e) => { metrics::counter!("lightrail_commit_dropped_total", "reason" => e.label()) .increment(1); debug!(did = %did, reason = %e, "commit dropped"); return Ok(()); } }; // Clone the parsed CAR before consuming its blocks into the MST block store. // not super-cheap, but the Bytes values are refcounted at least so whatever let parsed_clone = parsed.clone(); // ── Step 5: Inductive proof ─────────────────────────────────────────────── if pds_mode == Sync11Mode::Strict && let Err(e) = validate_v1_1(&commit, &resolved.pubkey).await { metrics::counter!("lightrail_commit_dropped_total", "reason" => "proof_failed") .increment(1); debug!(did = %did, error = %e, "commit dropped: inductive proof failed"); return Ok(()); } metrics::histogram!("lightrail_commit_ops").record(commit.ops.len() as f64); // ── Collection birth/death detection ───────────────────────────────────── let mortality = mst::mortality::extract(&commit.ops, parsed_clone)?; // ── Steps 6–9: Blocking storage checks + repo_prev update ─────────────── let db = db.clone(); let rev = commit.rev.clone(); let since = commit.since.clone(); // Extract the raw CID bytes from the event's prevData field (sync1.1). let incoming_prev_data = commit .prev_data .map(|cid| { cid.to_ipld() .map_err(|e| crate::error::Error::Other(format!("bad CID format: {e}"))) }) .transpose()? .map(|ipld_cid| ipld_cid.to_bytes()); tokio::task::spawn_blocking(move || { process_blocking( &db, ValidationState { did, info, prev, rev, since, incoming_prev_data, new_mst_root_bytes, mortality, pds_host, current_mode: pds_mode, }, ) }) .await??; Ok(()) } // --------------------------------------------------------------------------- // CAR + signature validation (async, in-memory) // --------------------------------------------------------------------------- /// Parse the CAR, deserialize the commit, verify the signature, and check that /// the event fields are internally consistent with the commit object. /// /// Returns: /// - The new MST root CID as raw bytes (stored as `repo_prev.prev_data`). /// - The typed MST root [`IpldCid`] (for loading the MST block store). /// - The [`ParsedCar`] (blocks for the MST block store). async fn validate_car( commit: &Commit<'static>, pubkey: &jacquard_common::types::crypto::PublicKey<'_>, ) -> Result<(Vec, jacquard_repo::car::ParsedCar), CarDrop> { // Parse the CAR slice embedded in the firehose event. let parsed = jacquard_repo::car::parse_car_bytes(commit.blocks.as_ref()) .await .map_err(|_| CarDrop::CarParse)?; // Look up the commit block using the event's claimed commit CID. let commit_cid = commit.commit.to_ipld().map_err(|_| CarDrop::MalformedCid)?; let block = parsed .blocks .get(&commit_cid) .ok_or(CarDrop::CommitBlockMissing)?; // Deserialize, verify signature, and check DID + rev consistency. let repo_commit = validate::deserialize_and_verify( block.as_ref(), pubkey, commit.repo.clone(), commit.rev.clone(), )?; // Step 4 (CID): verify that the commit's content-addressed CID matches // the CID the event claimed (not just the one we used to find the block). let actual_cid = repo_commit .to_cid() .map_err(|_| CarDrop::CommitDeserialize)?; if actual_cid != commit_cid { return Err(CarDrop::CidMismatch); } // The new MST root (= `repo_commit.data`) becomes the next commit's // `prevData`. Return its bytes for storage alongside the typed CID (needed // for block store construction) and the parsed CAR (the blocks themselves). metrics::histogram!("lightrail_commit_car_bytes").record(commit.blocks.len() as f64); let mst_root_cid = repo_commit.data; Ok((mst_root_cid.to_bytes(), parsed)) } // --------------------------------------------------------------------------- // Storage checks (blocking) // --------------------------------------------------------------------------- /// Outcome of the step-2 storage check. enum Step2Result { /// All good — proceed with signature verification and the rest of the pipeline. Proceed(RepoInfo, Option, Sync11Mode), /// Drop this event (desynchronized state, stale rev, future rev, etc.). Drop, /// Repo is mid-resync. Caller should buffer the commit for replay. Buffer, /// Account is locally inactive. Caller may check upstream before dropping. InactiveAccount(RepoInfo, Option), } /// Step 2: load the repo state and decide how to handle this commit. /// /// - Unknown repo: creates a `Desynchronized` entry and enqueues a resync so /// we discover repos that appear on the firehose before backfill reaches them. /// - `Resyncing` repo: returns `Step2Result::Buffer` — the commit must be /// stored in the resync buffer and replayed after the fetch completes. /// - Inactive / desynchronized / stale / future rev: returns `Step2Result::Drop`. /// - All clear: returns `Step2Result::Proceed(info, prev)` so the caller can /// pass the pre-loaded repo state to `process_blocking`. fn check_step2_blocking( db: &DbRef, did: Did<'static>, rev: &Tid, pds_host: Option, now: std::time::SystemTime, ) -> crate::error::Result { let Some((info, prev)) = storage::repo::get(db, &did)? else { // Unknown repo — create an entry and enqueue for initial fetch so that // repos appearing on the firehose before backfill reaches them are not // silently skipped. let mut batch = db.database.batch(); storage::repo::put_info_into( &mut batch, db, &did, &RepoInfo { state: RepoState::Desynchronized, status: AccountStatus::Active, error: None, }, ); storage::resync_queue::enqueue_into( &mut batch, db, now, &crate::storage::resync_queue::ResyncItem { did, retry_count: 0, retry_reason: "first firehose event for unknown repo".to_string(), commit_cbor: vec![], }, ); batch .commit() .map_err(Into::::into)?; db.stats.repos_queued_total.fetch_add(1, Ordering::Relaxed); metrics::counter!("lightrail_commit_dropped_total", "reason" => "unknown_repo") .increment(1); return Ok(Step2Result::Drop); }; if info.state == RepoState::Resyncing { return Ok(Step2Result::Buffer); } // Separate inactive check so the caller can probe upstream before dropping. if !info.status.is_active() { return Ok(Step2Result::InactiveAccount(info, prev)); } if validate::should_drop(&info, prev.as_ref(), rev, "commit", &did, now) { return Ok(Step2Result::Drop); } let mode = match pds_host { Some(ref host) => pds_host::get(db, host)?.sync11_mode, None => Sync11Mode::Lenient, }; Ok(Step2Result::Proceed(info, prev, mode)) } /// Write `AccountStatus::Active` to storage for `did`, then re-run the /// step-2 checks so the commit pipeline can continue as normal. fn reactivate_and_recheck( db: &DbRef, did: &Did<'static>, rev: &Tid, pds_host: Option, now: std::time::SystemTime, ) -> crate::error::Result { let Some((mut info, _)) = storage::repo::get(db, did)? else { return Ok(Step2Result::Drop); }; info.status = AccountStatus::Active; let mut batch = db.database.batch(); storage::repo::put_info_into(&mut batch, db, did, &info); batch .commit() .map_err(Into::::into)?; // Re-run the full step-2 check; the updated Active status means // InactiveAccount will not be returned again. check_step2_blocking(db, did.clone(), rev, pds_host, now) } /// everything needed to ship off to the blocking step struct ValidationState { did: Did<'static>, info: RepoInfo, prev: Option, rev: Tid, since: Option, incoming_prev_data: Option>, new_mst_root_bytes: Vec, mortality: ExtractResult, pds_host: Option, current_mode: Sync11Mode, } /// Perform the storage-backed validation steps (6–9) and, if all pass, /// persist the updated `repo_prev`. /// /// `info` and `prev` are pre-loaded by [`check_step2_blocking`] (step 2). fn process_blocking( db: &DbRef, ValidationState { did, info, prev, rev, since, incoming_prev_data, new_mst_root_bytes, mortality, pds_host, current_mode, }: ValidationState, ) -> crate::error::Result<()> { if let Some(prev) = &prev { // Step 8: `since` must match repo_prev's rev (the previous rev in the chain). if let Some(since) = &since && since.compare_to(&prev.rev) != 0 { metrics::counter!("lightrail_commit_desync_total", "reason" => "since_mismatch") .increment(1); warn!(did = %did.as_str(), commit_since = since.as_str(), prev_rev = prev.rev.as_str(), "commit dropped: since/rev mismatch; queueing resync"); return enqueue_desync(db, did, info.status, "since/rev mismatch"); } // Step 9: `prevData` must match the MST root stored from the last commit. if let Some(incoming) = &incoming_prev_data && incoming != &prev.prev_data { metrics::counter!("lightrail_commit_desync_total", "reason" => "prev_data_mismatch") .increment(1); warn!(did = %did.as_str(), "commit dropped: prevData mismatch; queueing resync"); return enqueue_desync(db, did, info.status, "prevData mismatch"); } } if !mortality.born.is_empty() { let names = mortality.born.keys().collect::>(); info!(did = %did, collections = ?names, "collection birth"); } if !mortality.died.is_empty() { let names = mortality.died.keys().collect::>(); info!(did = %did, collections = ?names, "collection death"); } // All checks passed — atomically update the prev_data and the collection // index (born → insert, died → remove). Also record each born collection // in the global collection list (blind overwrite, never deleted). let n_born = mortality.born.len() as u64; let n_died = mortality.died.len() as u64; let mut batch = db.database.batch(); storage::repo::put_prev_into( &mut batch, db, &did, &RepoPrev { rev, prev_data: new_mst_root_bytes, }, ); for coll in mortality.born.keys() { // TODO(temporary): detect spurious births to confirm pre-sync1.1 hypothesis if storage::collection_index::has_collection(db, &did, coll.clone())? { if incoming_prev_data.is_some() { error!( did = %did, collection = %coll, sync11 = true, "spurious birth, already indexed" ); } else { error!( did = %did, collection = %coll, sync11 = false, "spurious birth, already indexed" ); } } storage::collection_index::insert_into(&mut batch, db, &did, coll.clone()); } let mut all_deaths_proven = true; for (coll, proven) in &mortality.died { // TODO(remove): detect phandom deaths??? if !storage::collection_index::has_collection(db, &did, coll.clone())? { if incoming_prev_data.is_some() { error!( did = %did, collection = %coll, proven, sync11 = true, "phantom death, collection not indexed" ); } else { error!( did = %did, collection = %coll, proven, sync11 = false, "phantom death, collection not indexed" ); } } if !proven { all_deaths_proven = false; continue; } storage::collection_index::remove_into(&mut batch, db, &did, coll.clone()); } // If mortality detection found a possible-but-unproven collection death, // queue a full-repo resync so the index can be reconciled once we have // a complete view of the repo's current MST. if !all_deaths_proven { let maybes: Vec<_> = mortality .died .iter() .filter(|(_, p)| !*p) .map(|(k, _)| k) .collect(); error!(did = %did, maybe_deaths = ?maybes, "queuing resync due to unprovable death"); storage::resync_queue::enqueue_into( &mut batch, db, std::time::SystemTime::now(), &crate::storage::resync_queue::ResyncItem { did: did.clone(), retry_count: 0, retry_reason: "unprovable_death".to_string(), commit_cbor: vec![], }, ); } // Upgrade PDS to strict on first prevData-bearing commit — atomically with // the prev_data and collection index writes above. if incoming_prev_data.is_some() && current_mode == Sync11Mode::Lenient && let Some(ref host) = pds_host { let mut pds_info = pds_host::get(db, host)?; pds_info.sync11_mode = Sync11Mode::Strict; pds_host::put_into(&mut batch, db, host, &pds_info); metrics::counter!("lightrail_pds_mode_upgraded_total", "trigger" => "commit").increment(1); info!(pds = %host, "PDS upgraded to strict sync1.1 mode"); } batch .commit() .map_err(Into::::into)?; if n_born > 0 { db.stats .collection_births_firehose .fetch_add(n_born, Ordering::Relaxed); metrics::counter!("lightrail_collection_births_total", "source" => "firehose") .increment(n_born); } if n_died > 0 { db.stats .collection_deaths_firehose .fetch_add(n_died, Ordering::Relaxed); metrics::counter!("lightrail_collection_deaths_total", "source" => "firehose") .increment(n_died); } meta::insert_str(&db.stats.sketch_accounts_all, did.as_str()); match current_mode { Sync11Mode::Strict => { meta::insert_str(&db.stats.sketch_accounts_commit_strict, did.as_str()) } Sync11Mode::Lenient => { meta::insert_str(&db.stats.sketch_accounts_commit_lenient, did.as_str()) } } metrics::counter!("lightrail_commits_indexed_total", "mode" => current_mode.as_str()) .increment(1); Ok(()) } /// Mark the repo as desynchronized and enqueue it for a full resync. fn enqueue_desync( db: &DbRef, did: Did<'static>, existing_status: AccountStatus, reason: &str, ) -> crate::error::Result<()> { let mut batch = db.database.batch(); storage::repo::put_info_into( &mut batch, db, &did, &RepoInfo { state: RepoState::Desynchronized, status: existing_status, error: None, }, ); storage::resync_queue::enqueue_into( &mut batch, db, std::time::SystemTime::now(), &crate::storage::resync_queue::ResyncItem { did: did.clone(), retry_count: 0, retry_reason: reason.to_string(), commit_cbor: vec![], }, ); batch .commit() .map_err(Into::::into)?; meta::insert_str(&db.stats.sketch_accounts_desynced, did.as_str()); Ok(()) }