lightweight com.atproto.sync.listReposByCollection
at main 603 lines 23 kB view raw
//! `#commit` firehose event processing — nine-step ATProto sync1.1 pipeline.
//!
//! ## Validation steps
//!
//! 1. Resolve the DID; drop if resolution fails.
//! 2. Check account status; drop if not active. If the local record says the
//!    account is inactive, the upstream PDS is asked first, in case a
//!    reactivation `#account` event was missed.
//! 3. Verify the commit signature; on failure, evict the cached identity and
//!    retry once with a fresh resolution (only if the key actually changed).
//! 4. Verify that the event's `commit` CID, `repo` DID, and `rev` TID all
//!    match the deserialized commit object found in the CAR.
//! 5. Inductive proof: inverting all ops on the new MST must yield the
//!    previous MST root (prevData). Each op's prev CID is validated in-place.
//!    This step is only enforced for PDS hosts in strict sync1.1 mode.
//! 6. Drop if `rev` is not strictly newer than `repo_prev.rev`.
//! 7. Drop if `rev` timestamp is implausibly far in the future.
//! 8. If we have a `repo_prev`: drop and queue resync if `since` ≠ `prev.rev`.
//! 9. If we have a `repo_prev` and `prevData` is present: drop and queue
//!    resync if the bytes don't match `prev.prev_data`.
//!
//! Steps 6 and 7 are applied together with step 2, before any signature work.
18 19use jacquard_api::com_atproto::sync::subscribe_repos::Commit; 20use jacquard_common::types::{string::Did, tid::Tid}; 21use jacquard_repo::commit::firehose::validate_v1_1; 22use tracing::{debug, error, info, warn}; 23 24use super::validate::{self, CarDrop}; 25use crate::identity::Resolver; 26use crate::mst::{self, mortality::ExtractResult}; 27use std::sync::atomic::Ordering; 28 29use crate::storage::{ 30 self, DbRef, meta, 31 pds_host::{self, Sync11Mode}, 32 repo::{AccountStatus, RepoInfo, RepoPrev, RepoState}, 33}; 34use jacquard_common::url::Host; 35 36// --------------------------------------------------------------------------- 37// Public entry point 38// --------------------------------------------------------------------------- 39 40pub(crate) async fn process_commit_event( 41 commit: Box<Commit<'static>>, 42 seq: i64, 43 resolver: &Resolver, 44 db: &DbRef, 45 client: &crate::http::ThrottledClient, 46) -> crate::error::Result<()> { 47 let did = commit.repo.clone(); 48 49 // ── Step 1: Resolve DID ────────────────────────────────────────────────── 50 let Some(resolved) = validate::resolve(&did, resolver, "commit").await else { 51 metrics::counter!("lightrail_commit_dropped_total", "reason" => "did_resolution_failed") 52 .increment(1); 53 return Ok(()); 54 }; 55 56 let pds_host: Option<Host> = resolved.pds.host().map(|h| h.to_owned()); 57 58 // ── Step 2: Account status + desync state + PDS mode ──────────────────── 59 let rev = commit.rev.clone(); 60 let db2 = db.clone(); 61 let did2 = did.clone(); 62 let pds_host2 = pds_host.clone(); 63 let now = std::time::SystemTime::now(); 64 let step2 = 65 tokio::task::spawn_blocking(move || check_step2_blocking(&db2, did2, &rev, pds_host2, now)) 66 .await??; 67 let (info, prev, pds_mode) = match step2 { 68 Step2Result::Proceed(info, prev, mode) => (info, prev, mode), 69 Step2Result::Drop => return Ok(()), 70 Step2Result::InactiveAccount(info, _prev) => { 71 // Our local record says inactive, but we may have missed a 
72 // reactivation #account event. Check upstream before dropping. 73 if !validate::upstream_says_active(&pds_host, &did, client).await { 74 metrics::counter!("lightrail_event_dropped_total", 75 "event_type" => "commit", "reason" => "account_inactive") 76 .increment(1); 77 debug!(did = %did, status = info.status.as_str(), 78 "commit dropped: account not active (confirmed upstream)"); 79 return Ok(()); 80 } 81 // Upstream says active — persist the reactivation and re-run 82 // the step-2 checks with the updated status. 83 metrics::counter!("lightrail_account_reactivated_total", 84 "trigger" => "commit") 85 .increment(1); 86 info!(did = %did, "account reactivated via upstream check (triggered by #commit)"); 87 let db_ra = db.clone(); 88 let did_ra = did.clone(); 89 let rev_ra = commit.rev.clone(); 90 let pds_host_ra = pds_host.clone(); 91 let step2_retry = tokio::task::spawn_blocking(move || { 92 reactivate_and_recheck(&db_ra, &did_ra, &rev_ra, pds_host_ra, now) 93 }) 94 .await??; 95 match step2_retry { 96 Step2Result::Proceed(i, p, m) => (i, p, m), 97 _ => return Ok(()), 98 } 99 } 100 Step2Result::Buffer => { 101 // Repo is mid-resync. Serialize the commit and buffer it so it can 102 // be replayed after the resync fetch completes. 
103 let cbor = serde_ipld_dagcbor::to_vec(&*commit) 104 .map_err(|e| crate::error::Error::Other(format!("commit cbor encode: {e}")))?; 105 let seq_u64 = seq as u64; 106 let db_buf = db.clone(); 107 let did_buf = did.clone(); 108 tokio::task::spawn_blocking(move || { 109 storage::resync_buffer::push_buffer(&db_buf, did_buf, seq_u64, &cbor) 110 }) 111 .await??; 112 metrics::counter!("lightrail_commit_buffered_total").increment(1); 113 return Ok(()); 114 } 115 }; 116 117 // ── Steps 3–4: CAR parse + signature verification + field consistency ──── 118 let (new_mst_root_bytes, parsed) = match validate_car(&commit, &resolved.pubkey).await { 119 Ok(v) => v, 120 Err(CarDrop::InvalidSignature) => { 121 let Some(fresh) = 122 validate::fresh_key_after_sig_failure(&did, resolver, &resolved, "commit").await 123 else { 124 metrics::counter!("lightrail_commit_dropped_total", 125 "reason" => "invalid_signature") 126 .increment(1); 127 return Ok(()); 128 }; 129 match validate_car(&commit, &fresh.pubkey).await { 130 Ok(v) => v, 131 Err(e) => { 132 metrics::counter!("lightrail_commit_dropped_total", 133 "reason" => e.label()) 134 .increment(1); 135 debug!(did = %did, reason = %e, 136 "commit dropped: still invalid with fresh key"); 137 return Ok(()); 138 } 139 } 140 } 141 Err(e) => { 142 metrics::counter!("lightrail_commit_dropped_total", 143 "reason" => e.label()) 144 .increment(1); 145 debug!(did = %did, reason = %e, "commit dropped"); 146 return Ok(()); 147 } 148 }; 149 150 // Clone the parsed CAR before consuming its blocks into the MST block store. 
151 // not super-cheap, but the Bytes values are refcounted at least so whatever 152 let parsed_clone = parsed.clone(); 153 154 // ── Step 5: Inductive proof ─────────────────────────────────────────────── 155 if pds_mode == Sync11Mode::Strict 156 && let Err(e) = validate_v1_1(&commit, &resolved.pubkey).await 157 { 158 metrics::counter!("lightrail_commit_dropped_total", "reason" => "proof_failed") 159 .increment(1); 160 debug!(did = %did, error = %e, "commit dropped: inductive proof failed"); 161 return Ok(()); 162 } 163 164 metrics::histogram!("lightrail_commit_ops").record(commit.ops.len() as f64); 165 166 // ── Collection birth/death detection ───────────────────────────────────── 167 let mortality = mst::mortality::extract(&commit.ops, parsed_clone)?; 168 169 // ── Steps 6–9: Blocking storage checks + repo_prev update ─────────────── 170 let db = db.clone(); 171 let rev = commit.rev.clone(); 172 let since = commit.since.clone(); 173 // Extract the raw CID bytes from the event's prevData field (sync1.1). 174 let incoming_prev_data = commit 175 .prev_data 176 .map(|cid| { 177 cid.to_ipld() 178 .map_err(|e| crate::error::Error::Other(format!("bad CID format: {e}"))) 179 }) 180 .transpose()? 181 .map(|ipld_cid| ipld_cid.to_bytes()); 182 183 tokio::task::spawn_blocking(move || { 184 process_blocking( 185 &db, 186 ValidationState { 187 did, 188 info, 189 prev, 190 rev, 191 since, 192 incoming_prev_data, 193 new_mst_root_bytes, 194 mortality, 195 pds_host, 196 current_mode: pds_mode, 197 }, 198 ) 199 }) 200 .await??; 201 202 Ok(()) 203} 204 205// --------------------------------------------------------------------------- 206// CAR + signature validation (async, in-memory) 207// --------------------------------------------------------------------------- 208 209/// Parse the CAR, deserialize the commit, verify the signature, and check that 210/// the event fields are internally consistent with the commit object. 
211/// 212/// Returns: 213/// - The new MST root CID as raw bytes (stored as `repo_prev.prev_data`). 214/// - The typed MST root [`IpldCid`] (for loading the MST block store). 215/// - The [`ParsedCar`] (blocks for the MST block store). 216async fn validate_car( 217 commit: &Commit<'static>, 218 pubkey: &jacquard_common::types::crypto::PublicKey<'_>, 219) -> Result<(Vec<u8>, jacquard_repo::car::ParsedCar), CarDrop> { 220 // Parse the CAR slice embedded in the firehose event. 221 let parsed = jacquard_repo::car::parse_car_bytes(commit.blocks.as_ref()) 222 .await 223 .map_err(|_| CarDrop::CarParse)?; 224 225 // Look up the commit block using the event's claimed commit CID. 226 let commit_cid = commit.commit.to_ipld().map_err(|_| CarDrop::MalformedCid)?; 227 let block = parsed 228 .blocks 229 .get(&commit_cid) 230 .ok_or(CarDrop::CommitBlockMissing)?; 231 232 // Deserialize, verify signature, and check DID + rev consistency. 233 let repo_commit = validate::deserialize_and_verify( 234 block.as_ref(), 235 pubkey, 236 commit.repo.clone(), 237 commit.rev.clone(), 238 )?; 239 240 // Step 4 (CID): verify that the commit's content-addressed CID matches 241 // the CID the event claimed (not just the one we used to find the block). 242 let actual_cid = repo_commit 243 .to_cid() 244 .map_err(|_| CarDrop::CommitDeserialize)?; 245 if actual_cid != commit_cid { 246 return Err(CarDrop::CidMismatch); 247 } 248 249 // The new MST root (= `repo_commit.data`) becomes the next commit's 250 // `prevData`. Return its bytes for storage alongside the typed CID (needed 251 // for block store construction) and the parsed CAR (the blocks themselves). 
252 metrics::histogram!("lightrail_commit_car_bytes").record(commit.blocks.len() as f64); 253 let mst_root_cid = repo_commit.data; 254 Ok((mst_root_cid.to_bytes(), parsed)) 255} 256 257// --------------------------------------------------------------------------- 258// Storage checks (blocking) 259// --------------------------------------------------------------------------- 260 261/// Outcome of the step-2 storage check. 262enum Step2Result { 263 /// All good — proceed with signature verification and the rest of the pipeline. 264 Proceed(RepoInfo, Option<RepoPrev>, Sync11Mode), 265 /// Drop this event (desynchronized state, stale rev, future rev, etc.). 266 Drop, 267 /// Repo is mid-resync. Caller should buffer the commit for replay. 268 Buffer, 269 /// Account is locally inactive. Caller may check upstream before dropping. 270 InactiveAccount(RepoInfo, Option<RepoPrev>), 271} 272 273/// Step 2: load the repo state and decide how to handle this commit. 274/// 275/// - Unknown repo: creates a `Desynchronized` entry and enqueues a resync so 276/// we discover repos that appear on the firehose before backfill reaches them. 277/// - `Resyncing` repo: returns `Step2Result::Buffer` — the commit must be 278/// stored in the resync buffer and replayed after the fetch completes. 279/// - Inactive / desynchronized / stale / future rev: returns `Step2Result::Drop`. 280/// - All clear: returns `Step2Result::Proceed(info, prev)` so the caller can 281/// pass the pre-loaded repo state to `process_blocking`. 282fn check_step2_blocking( 283 db: &DbRef, 284 did: Did<'static>, 285 rev: &Tid, 286 pds_host: Option<Host>, 287 now: std::time::SystemTime, 288) -> crate::error::Result<Step2Result> { 289 let Some((info, prev)) = storage::repo::get(db, &did)? else { 290 // Unknown repo — create an entry and enqueue for initial fetch so that 291 // repos appearing on the firehose before backfill reaches them are not 292 // silently skipped. 
293 let mut batch = db.database.batch(); 294 storage::repo::put_info_into( 295 &mut batch, 296 db, 297 &did, 298 &RepoInfo { 299 state: RepoState::Desynchronized, 300 status: AccountStatus::Active, 301 error: None, 302 }, 303 ); 304 storage::resync_queue::enqueue_into( 305 &mut batch, 306 db, 307 now, 308 &crate::storage::resync_queue::ResyncItem { 309 did, 310 retry_count: 0, 311 retry_reason: "first firehose event for unknown repo".to_string(), 312 commit_cbor: vec![], 313 }, 314 ); 315 batch 316 .commit() 317 .map_err(Into::<crate::storage::StorageError>::into)?; 318 db.stats.repos_queued_total.fetch_add(1, Ordering::Relaxed); 319 metrics::counter!("lightrail_commit_dropped_total", "reason" => "unknown_repo") 320 .increment(1); 321 return Ok(Step2Result::Drop); 322 }; 323 if info.state == RepoState::Resyncing { 324 return Ok(Step2Result::Buffer); 325 } 326 // Separate inactive check so the caller can probe upstream before dropping. 327 if !info.status.is_active() { 328 return Ok(Step2Result::InactiveAccount(info, prev)); 329 } 330 if validate::should_drop(&info, prev.as_ref(), rev, "commit", &did, now) { 331 return Ok(Step2Result::Drop); 332 } 333 let mode = match pds_host { 334 Some(ref host) => pds_host::get(db, host)?.sync11_mode, 335 None => Sync11Mode::Lenient, 336 }; 337 Ok(Step2Result::Proceed(info, prev, mode)) 338} 339 340/// Write `AccountStatus::Active` to storage for `did`, then re-run the 341/// step-2 checks so the commit pipeline can continue as normal. 342fn reactivate_and_recheck( 343 db: &DbRef, 344 did: &Did<'static>, 345 rev: &Tid, 346 pds_host: Option<Host>, 347 now: std::time::SystemTime, 348) -> crate::error::Result<Step2Result> { 349 let Some((mut info, _)) = storage::repo::get(db, did)? 
else { 350 return Ok(Step2Result::Drop); 351 }; 352 info.status = AccountStatus::Active; 353 let mut batch = db.database.batch(); 354 storage::repo::put_info_into(&mut batch, db, did, &info); 355 batch 356 .commit() 357 .map_err(Into::<crate::storage::StorageError>::into)?; 358 // Re-run the full step-2 check; the updated Active status means 359 // InactiveAccount will not be returned again. 360 check_step2_blocking(db, did.clone(), rev, pds_host, now) 361} 362 363/// everything needed to ship off to the blocking step 364struct ValidationState { 365 did: Did<'static>, 366 info: RepoInfo, 367 prev: Option<RepoPrev>, 368 rev: Tid, 369 since: Option<Tid>, 370 incoming_prev_data: Option<Vec<u8>>, 371 new_mst_root_bytes: Vec<u8>, 372 mortality: ExtractResult, 373 pds_host: Option<Host>, 374 current_mode: Sync11Mode, 375} 376 377/// Perform the storage-backed validation steps (6–9) and, if all pass, 378/// persist the updated `repo_prev`. 379/// 380/// `info` and `prev` are pre-loaded by [`check_step2_blocking`] (step 2). 381fn process_blocking( 382 db: &DbRef, 383 ValidationState { 384 did, 385 info, 386 prev, 387 rev, 388 since, 389 incoming_prev_data, 390 new_mst_root_bytes, 391 mortality, 392 pds_host, 393 current_mode, 394 }: ValidationState, 395) -> crate::error::Result<()> { 396 if let Some(prev) = &prev { 397 // Step 8: `since` must match repo_prev's rev (the previous rev in the chain). 398 if let Some(since) = &since 399 && since.compare_to(&prev.rev) != 0 400 { 401 metrics::counter!("lightrail_commit_desync_total", 402 "reason" => "since_mismatch") 403 .increment(1); 404 warn!(did = %did.as_str(), 405 commit_since = since.as_str(), prev_rev = prev.rev.as_str(), 406 "commit dropped: since/rev mismatch; queueing resync"); 407 return enqueue_desync(db, did, info.status, "since/rev mismatch"); 408 } 409 410 // Step 9: `prevData` must match the MST root stored from the last commit. 
411 if let Some(incoming) = &incoming_prev_data 412 && incoming != &prev.prev_data 413 { 414 metrics::counter!("lightrail_commit_desync_total", 415 "reason" => "prev_data_mismatch") 416 .increment(1); 417 warn!(did = %did.as_str(), 418 "commit dropped: prevData mismatch; queueing resync"); 419 return enqueue_desync(db, did, info.status, "prevData mismatch"); 420 } 421 } 422 423 if !mortality.born.is_empty() { 424 let names = mortality.born.keys().collect::<Vec<_>>(); 425 info!(did = %did, collections = ?names, "collection birth"); 426 } 427 if !mortality.died.is_empty() { 428 let names = mortality.died.keys().collect::<Vec<_>>(); 429 info!(did = %did, collections = ?names, "collection death"); 430 } 431 432 // All checks passed — atomically update the prev_data and the collection 433 // index (born → insert, died → remove). Also record each born collection 434 // in the global collection list (blind overwrite, never deleted). 435 let n_born = mortality.born.len() as u64; 436 let n_died = mortality.died.len() as u64; 437 let mut batch = db.database.batch(); 438 storage::repo::put_prev_into( 439 &mut batch, 440 db, 441 &did, 442 &RepoPrev { 443 rev, 444 prev_data: new_mst_root_bytes, 445 }, 446 ); 447 for coll in mortality.born.keys() { 448 // TODO(temporary): detect spurious births to confirm pre-sync1.1 hypothesis 449 if storage::collection_index::has_collection(db, &did, coll.clone())? { 450 if incoming_prev_data.is_some() { 451 error!( 452 did = %did, 453 collection = %coll, 454 sync11 = true, 455 "spurious birth, already indexed" 456 ); 457 } else { 458 error!( 459 did = %did, 460 collection = %coll, 461 sync11 = false, 462 "spurious birth, already indexed" 463 ); 464 } 465 } 466 storage::collection_index::insert_into(&mut batch, db, &did, coll.clone()); 467 } 468 let mut all_deaths_proven = true; 469 for (coll, proven) in &mortality.died { 470 // TODO(remove): detect phandom deaths??? 471 if !storage::collection_index::has_collection(db, &did, coll.clone())? 
{ 472 if incoming_prev_data.is_some() { 473 error!( 474 did = %did, 475 collection = %coll, 476 proven, 477 sync11 = true, 478 "phantom death, collection not indexed" 479 ); 480 } else { 481 error!( 482 did = %did, 483 collection = %coll, 484 proven, 485 sync11 = false, 486 "phantom death, collection not indexed" 487 ); 488 } 489 } 490 491 if !proven { 492 all_deaths_proven = false; 493 continue; 494 } 495 496 storage::collection_index::remove_into(&mut batch, db, &did, coll.clone()); 497 } 498 499 // If mortality detection found a possible-but-unproven collection death, 500 // queue a full-repo resync so the index can be reconciled once we have 501 // a complete view of the repo's current MST. 502 if !all_deaths_proven { 503 let maybes: Vec<_> = mortality 504 .died 505 .iter() 506 .filter(|(_, p)| !*p) 507 .map(|(k, _)| k) 508 .collect(); 509 error!(did = %did, maybe_deaths = ?maybes, "queuing resync due to unprovable death"); 510 storage::resync_queue::enqueue_into( 511 &mut batch, 512 db, 513 std::time::SystemTime::now(), 514 &crate::storage::resync_queue::ResyncItem { 515 did: did.clone(), 516 retry_count: 0, 517 retry_reason: "unprovable_death".to_string(), 518 commit_cbor: vec![], 519 }, 520 ); 521 } 522 523 // Upgrade PDS to strict on first prevData-bearing commit — atomically with 524 // the prev_data and collection index writes above. 
525 if incoming_prev_data.is_some() 526 && current_mode == Sync11Mode::Lenient 527 && let Some(ref host) = pds_host 528 { 529 let mut pds_info = pds_host::get(db, host)?; 530 pds_info.sync11_mode = Sync11Mode::Strict; 531 pds_host::put_into(&mut batch, db, host, &pds_info); 532 metrics::counter!("lightrail_pds_mode_upgraded_total", "trigger" => "commit").increment(1); 533 info!(pds = %host, "PDS upgraded to strict sync1.1 mode"); 534 } 535 536 batch 537 .commit() 538 .map_err(Into::<crate::storage::StorageError>::into)?; 539 540 if n_born > 0 { 541 db.stats 542 .collection_births_firehose 543 .fetch_add(n_born, Ordering::Relaxed); 544 metrics::counter!("lightrail_collection_births_total", "source" => "firehose") 545 .increment(n_born); 546 } 547 if n_died > 0 { 548 db.stats 549 .collection_deaths_firehose 550 .fetch_add(n_died, Ordering::Relaxed); 551 metrics::counter!("lightrail_collection_deaths_total", "source" => "firehose") 552 .increment(n_died); 553 } 554 meta::insert_str(&db.stats.sketch_accounts_all, did.as_str()); 555 match current_mode { 556 Sync11Mode::Strict => { 557 meta::insert_str(&db.stats.sketch_accounts_commit_strict, did.as_str()) 558 } 559 Sync11Mode::Lenient => { 560 meta::insert_str(&db.stats.sketch_accounts_commit_lenient, did.as_str()) 561 } 562 } 563 metrics::counter!("lightrail_commits_indexed_total", "mode" => current_mode.as_str()) 564 .increment(1); 565 566 Ok(()) 567} 568 569/// Mark the repo as desynchronized and enqueue it for a full resync. 
570fn enqueue_desync( 571 db: &DbRef, 572 did: Did<'static>, 573 existing_status: AccountStatus, 574 reason: &str, 575) -> crate::error::Result<()> { 576 let mut batch = db.database.batch(); 577 storage::repo::put_info_into( 578 &mut batch, 579 db, 580 &did, 581 &RepoInfo { 582 state: RepoState::Desynchronized, 583 status: existing_status, 584 error: None, 585 }, 586 ); 587 storage::resync_queue::enqueue_into( 588 &mut batch, 589 db, 590 std::time::SystemTime::now(), 591 &crate::storage::resync_queue::ResyncItem { 592 did: did.clone(), 593 retry_count: 0, 594 retry_reason: reason.to_string(), 595 commit_cbor: vec![], 596 }, 597 ); 598 batch 599 .commit() 600 .map_err(Into::<crate::storage::StorageError>::into)?; 601 meta::insert_str(&db.stats.sketch_accounts_desynced, did.as_str()); 602 Ok(()) 603}