//! Establish ground truth about a repository's collections.
//!
//! "Resync" covers three scenarios:
//!
//! - **Backfill**: first-time indexing of a repo found via `listRepos`; diff
//!   against an empty set.
//! - **`#sync` event**: the PDS declared a repo discontinuity; diff against
//!   the current index entries.
//! - **Detected discontinuity**: a firehose commit's `prev` CID doesn't match
//!   our record; diff against the current index entries.
//!
//! Each submodule implements one approach to fetching the ground-truth
//! collection list as
//! `fetch_collections(client, base, did: Did<'_>) -> std::result::Result<RepoSnapshot, GetCollectionsError>`.
//! This uniform signature makes it straightforward to test approaches in
//! isolation and to add new ones later.

pub mod describe_repo;
pub mod dispatcher;
pub mod get_repo;

use std::collections::BTreeSet;
use std::sync::Arc;
use std::sync::atomic::Ordering;
use std::time::Duration;

use cid::Cid as RawCid;
use jacquard_common::{
    IntoStatic,
    http_client::{HttpClient, HttpClientExt},
    types::string::{Did, Nsid, Tid},
};
use tracing::{error, info};

use crate::storage::DbRef;
use crate::storage::repo::{AccountStatus, RepoInfo, RepoPrev, RepoState};
use crate::util::TokenExt;

pub use dispatcher::DispatcherConfig;

/// Result type used by the resync entry points in this module.
type Result<T> = std::result::Result<T, ResyncError>;

/// Errors that can occur during a resync operation.
#[derive(Debug, thiserror::Error)]
pub enum ResyncError {
    /// Resolving the DID (to find its PDS) failed.
    #[error("identity resolution failed: {0}")]
    Identity(#[from] crate::identity::IdentityError),
    /// Fetching the collection list failed for a reason not covered by a
    /// more specific variant.
    #[error("collection fetch failed: {0}")]
    Fetch(GetCollectionsError),
    /// Reading or writing local storage failed.
    #[error(transparent)]
    Storage(#[from] crate::storage::StorageError),
    /// The PDS returned HTTP 429. Carries the PDS URL for host-level cooldown.
    #[error("rate limited by {0}")]
    RateLimited(Arc<jacquard_common::url::Url>),
    /// A DID string taken from the queue was not a valid DID.
    #[error("invalid DID in queue: {0}")]
    InvalidDid(String),
    /// A blocking storage task did not complete; carries the join error text.
    #[error("blocking storage task panicked: {0}")]
    TaskPanic(String),
    /// The PDS returned a definitive "repo not found" for this DID.
    /// The repo state has been updated; the caller may schedule a slow retry.
    #[error("repo not found on PDS")]
    RepoNotFound,
    /// The operation was cancelled via its cancellation token.
    #[error("externally cancelled")]
    Cancelled,
    /// An invariant that should never be violated was (e.g. a resolved PDS
    /// URL without a host).
    #[error("wtf, {0}")]
    Wtf(String),
}

/// A snapshot of a repository's state as observed during a resync.
#[derive(Debug, PartialEq)]
pub struct RepoSnapshot {
    /// Collection NSIDs present in the repository (ordered and deduplicated).
    pub collections: BTreeSet<Nsid<'static>>,
    /// Revision TID of the latest commit.
    pub rev: Tid,
    /// MST root CID.
    pub data: RawCid,
}

/// Why fetching a repository's collection list failed.
#[derive(Debug, thiserror::Error)]
pub enum GetCollectionsError {
    /// No repository exists for this DID on the PDS.
    #[error("repo not found")]
    RepoNotFound,
    /// The repo exists but its current state makes it inaccessible.
    #[error("repo {0}")]
    RepoGone(RepoGoneReason),
    /// The PDS rate-limited this request (HTTP 429).
    #[error("rate limited by {0} (HTTP 429)")]
    RateLimited(String),
    /// Network or HTTP failure; may be transient.
    #[error("request failed: {0}")]
    Request(String),
    /// The server returned data that could not be parsed or is structurally invalid.
    #[error("invalid data: {0}")]
    InvalidData(String),
    /// The server returned an unrecognised XRPC error code.
    #[error("unexpected XRPC error: {0}")]
    UnexpectedXrpc(String),
    /// The repo is likely tiny; intentionally fall through to `sync.getRepo`.
    #[error("should getRepo because it's likely tiny")]
    GetSmallRepo,
    /// The request was externally cancelled.
    #[error("externally cancelled")]
    Cancelled,
    /// An invariant that should never be violated was.
    #[error("wtf!?, {0}")]
    Wtf(String),
}

/// The specific reason a repository is inaccessible.
#[derive(Debug, thiserror::Error)] pub enum RepoGoneReason { #[error("taken down")] Takendown, #[error("suspended")] Suspended, #[error("deactivated")] Deactivated, } /// Bundle of useful stuff from the app /// /// should probably become a proper AppState #[derive(Clone)] pub struct AppStuff { resolver: Arc, client: crate::http::ThrottledClient, db: DbRef, token: tokio_util::sync::CancellationToken, } /// Establish the current collection set for `did` and write it to the index. /// /// Tries `describeRepo` first (cheap). Falls back to a full `getRepo` CAR walk /// if the PDS doesn't support it or returns an empty collection list. /// /// Returns `Ok(())` silently if the repo is not found or inaccessible; /// the caller decides whether to retry on transient errors. pub async fn index_repo( AppStuff { resolver, client, db, token, }: AppStuff, did: Did<'_>, describe_timeout: Duration, get_repo_timeout: Duration, force_get_repo: bool, ) -> Result<()> { // Own the DID for the duration of the function so we can move it into // spawn_blocking closures (which require 'static captures). let did: Did<'static> = did.into_static(); let Some(resolved) = token.run(resolver.resolve(&did)).await else { return Err(ResyncError::Cancelled); }; let resolved = resolved?; let base = &*resolved.pds; let Some(_) = base.host_str() else { error!(did = %did, host = %resolved.pds, "index_repo received a URL without a host???"); return Err(ResyncError::Wtf(format!( "URL without host?? 
{:?}", resolved.pds ))); }; let repo_snapshot = match fetch_collections( &client, base, did.clone(), token, describe_timeout, get_repo_timeout, force_get_repo, ) .await { Ok(s) => s, Err(GetCollectionsError::RepoNotFound) => { info!(did = %did.as_str(), "repo not found on PDS; marking state"); let db = db.clone(); tokio::task::spawn_blocking(move || mark_not_found(&db, &did)) .await .map_err(|e| ResyncError::TaskPanic(e.to_string()))??; return Err(ResyncError::RepoNotFound); } Err(GetCollectionsError::RepoGone(reason)) => { let (state, status) = match reason { RepoGoneReason::Takendown => (RepoState::Takendown, AccountStatus::Takendown), RepoGoneReason::Suspended => (RepoState::Suspended, AccountStatus::Suspended), RepoGoneReason::Deactivated => (RepoState::Deactivated, AccountStatus::Deactivated), }; info!(did = %did.as_str(), status = status.as_str(), "repo gone; updating account status"); let db = db.clone(); tokio::task::spawn_blocking(move || update_account_status(&db, &did, state, status)) .await .map_err(|e| ResyncError::TaskPanic(e.to_string()))??; return Ok(()); } Err(GetCollectionsError::RateLimited(_)) => { return Err(ResyncError::RateLimited(resolved.pds.clone())); } Err(e) => return Err(ResyncError::Fetch(e)), }; let db_w = db.clone(); let collections = repo_snapshot.collections; let rev = repo_snapshot.rev; let prev_data = repo_snapshot.data.to_bytes(); let (n_inserted, n_removed) = tokio::task::spawn_blocking(move || { crate::storage::repo::put_prev(&db_w, &did, &RepoPrev { rev, prev_data })?; crate::storage::collection_index::sync_collections(&db_w, &did, &collections) }) .await .map_err(|e| ResyncError::TaskPanic(e.to_string()))??; if n_inserted > 0 { db.stats .collection_births_resync .fetch_add(n_inserted as u64, Ordering::Relaxed); metrics::counter!("lightrail_collection_births_total", "source" => "resync") .increment(n_inserted as u64); } if n_removed > 0 { db.stats .collection_deaths_resync .fetch_add(n_removed as u64, Ordering::Relaxed); 
metrics::counter!("lightrail_collection_deaths_total", "source" => "resync") .increment(n_removed as u64); } Ok(()) } /// Update the stored `RepoState` and `AccountStatus` for a repo that the PDS /// reports as gone (takendown, suspended, or deactivated). /// /// If no record exists yet (e.g. a backfill race), one is created. The `error` /// field is cleared since the gone state is fully described by `state`/`status`. fn update_account_status( db: &DbRef, did: &Did<'_>, state: RepoState, status: AccountStatus, ) -> Result<()> { let new_info = match crate::storage::repo::get(db, did)? { Some((mut info, _)) => { info.state = state; info.status = status; info.error = None; info } None => RepoInfo { state, status, error: None, }, }; crate::storage::repo::put_info(db, did, &new_info)?; Ok(()) } /// Set `RepoState::NotFound` for a DID whose PDS reports no repo. /// /// Preserves the existing `AccountStatus` if a record exists (we don't know /// the true account status from a not-found response). The `error` field is /// cleared since the state is fully described by `RepoState::NotFound`. fn mark_not_found(db: &DbRef, did: &Did<'_>) -> Result<()> { let new_info = match crate::storage::repo::get(db, did)? { Some((mut info, _)) => { info.state = RepoState::NotFound; info.error = None; info } None => RepoInfo { state: RepoState::NotFound, status: AccountStatus::Active, error: None, }, }; crate::storage::repo::put_info(db, did, &new_info)?; Ok(()) } /// Try each approach in preference order, falling back to `get_repo` selectively. /// /// Falls back to the `get_repo` CAR walk when `describe_repo`: /// - returns an empty collection list (possible PDS bug or unpaginated large repo), /// - fails with any error other than rate-limiting or a definitive gone/not-found. /// /// A 429 rate-limit response is propagated immediately rather than escalated to a /// heavier `getRepo` request. 
Definitive `RepoNotFound`/`RepoGone` results are also /// propagated, since `getRepo` would return the same answer. async fn fetch_collections( client: &C, base: &jacquard_common::url::Url, did: Did<'_>, token: tokio_util::sync::CancellationToken, describe_timeout: Duration, get_repo_timeout: Duration, force_get_repo: bool, ) -> std::result::Result where C: HttpClient + HttpClientExt + Sync, { if !force_get_repo { let Some(describe_result) = token .timeout( describe_timeout, describe_repo::fetch_collections(client, base, did.clone()), ) .await else { return Err(GetCollectionsError::Cancelled); }; match describe_result { Ok(Ok(snapshot)) => { metrics::counter!("lightrail_resync_fetch_total", "source" => "describe_repo") .increment(1); return Ok(snapshot); } // Rate-limited: don't escalate to a heavier getRepo request. Ok(Err(e @ GetCollectionsError::RateLimited(_))) => return Err(e), // Definitively gone: getRepo would return the same answer. Ok(Err(e @ (GetCollectionsError::RepoNotFound | GetCollectionsError::RepoGone(_)))) => { return Err(e); } // Any other failure or timeout: fall through. The PDS may not implement // describeRepo, or may have a bug this endpoint doesn't hit. Ok(Err(GetCollectionsError::GetSmallRepo)) | Ok(Err(_)) | Err(_) => {} } } let res = token .timeout( get_repo_timeout, get_repo::fetch_collections(client, base, did), ) .await .ok_or(GetCollectionsError::Cancelled)? .unwrap_or_else(|_| { Err(GetCollectionsError::Request( "getRepo timed out".to_string(), )) })?; metrics::counter!("lightrail_resync_fetch_total", "source" => "get_repo").increment(1); Ok(res) }