// lightweight com.atproto.sync.listReposByCollection
// at main 349 lines 12 kB view raw
1//! Establish ground truth about a repository's collections. 2//! 3//! "Resync" covers three scenarios: 4//! 5//! - **Backfill**: first-time indexing of a repo found via `listRepos`; diff 6//! against an empty set. 7//! - **`#sync` event**: the PDS declared a repo discontinuity; diff against 8//! the current index entries. 9//! - **Detected discontinuity**: a firehose commit's `prev` CID doesn't match 10//! our record; diff against the current index entries. 11//! 12//! Each submodule implements one approach to fetching the ground-truth 13//! collection list as `fetch_collections(client, base, did: Did<'_>) 14//! -> std::result::Result<RepoSnapshot, GetCollectionsError>`. 15//! This uniform signature makes it straightforward to test approaches in 16//! isolation and to add new ones later. 17 18pub mod describe_repo; 19pub mod dispatcher; 20pub mod get_repo; 21 22use std::collections::BTreeSet; 23use std::sync::Arc; 24use std::sync::atomic::Ordering; 25use std::time::Duration; 26 27use cid::Cid as RawCid; 28use jacquard_common::{ 29 IntoStatic, 30 http_client::{HttpClient, HttpClientExt}, 31 types::string::{Did, Nsid, Tid}, 32}; 33use tracing::{error, info}; 34 35use crate::storage::DbRef; 36use crate::storage::repo::{AccountStatus, RepoInfo, RepoPrev, RepoState}; 37use crate::util::TokenExt; 38 39pub use dispatcher::DispatcherConfig; 40 41type Result<T> = std::result::Result<T, ResyncError>; 42 43/// Errors that can occur during a resync operation. 44#[derive(Debug, thiserror::Error)] 45pub enum ResyncError { 46 #[error("identity resolution failed: {0}")] 47 Identity(#[from] crate::identity::IdentityError), 48 #[error("collection fetch failed: {0}")] 49 Fetch(GetCollectionsError), 50 #[error(transparent)] 51 Storage(#[from] crate::storage::StorageError), 52 /// The PDS returned HTTP 429. Carries the PDS URL for host-level cooldown. 
53 #[error("rate limited by {0}")] 54 RateLimited(Arc<jacquard_common::url::Url>), 55 #[error("invalid DID in queue: {0}")] 56 InvalidDid(String), 57 #[error("blocking storage task panicked: {0}")] 58 TaskPanic(String), 59 /// The PDS returned a definitive "repo not found" for this DID. 60 /// The repo state has been updated; the caller may schedule a slow retry. 61 #[error("repo not found on PDS")] 62 RepoNotFound, 63 #[error("externally cancelled")] 64 Cancelled, 65 #[error("wtf, {0}")] 66 Wtf(String), 67} 68 69/// A snapshot of a repository's state as observed during a resync. 70#[derive(Debug, PartialEq)] 71pub struct RepoSnapshot { 72 /// Sorted list of collection NSIDs present in the repository. 73 pub collections: BTreeSet<Nsid<'static>>, // TODO btreeset 74 /// Revision TID of the latest commit. 75 pub rev: Tid, 76 /// MST root CID 77 pub data: RawCid, 78} 79 80/// Why fetching a repository's collection list failed. 81#[derive(Debug, thiserror::Error)] 82pub enum GetCollectionsError { 83 /// No repository exists for this DID on the PDS. 84 #[error("repo not found")] 85 RepoNotFound, 86 /// The repo exists but its current state makes it inaccessible. 87 #[error("repo {0}")] 88 RepoGone(RepoGoneReason), 89 /// The PDS rate-limited this request (HTTP 429). 90 #[error("rate limited by {0} (HTTP 429)")] 91 RateLimited(String), 92 /// Network or HTTP failure; may be transient. 93 #[error("request failed: {0}")] 94 Request(String), 95 /// The server returned data that could not be parsed or is structurally invalid. 96 #[error("invalid data: {0}")] 97 InvalidData(String), 98 /// The server returned an unrecognised XRPC error code. 
99 #[error("unexpected XRPC error: {0}")] 100 UnexpectedXrpc(String), 101 /// The repo is likely tiny, intentionally fall through to sync.getRepo 102 #[error("should getRepo because it's likely tiny")] 103 GetSmallRepo, 104 /// The request was externally cancelled 105 #[error("externally cancelled")] 106 Cancelled, 107 #[error("wtf!?, {0}")] 108 Wtf(String), 109} 110 111/// The specific reason a repository is inaccessible. 112#[derive(Debug, thiserror::Error)] 113pub enum RepoGoneReason { 114 #[error("taken down")] 115 Takendown, 116 #[error("suspended")] 117 Suspended, 118 #[error("deactivated")] 119 Deactivated, 120} 121 122/// Bundle of useful stuff from the app 123/// 124/// should probably become a proper AppState 125#[derive(Clone)] 126pub struct AppStuff { 127 resolver: Arc<crate::identity::Resolver>, 128 client: crate::http::ThrottledClient, 129 db: DbRef, 130 token: tokio_util::sync::CancellationToken, 131} 132 133/// Establish the current collection set for `did` and write it to the index. 134/// 135/// Tries `describeRepo` first (cheap). Falls back to a full `getRepo` CAR walk 136/// if the PDS doesn't support it or returns an empty collection list. 137/// 138/// Returns `Ok(())` silently if the repo is not found or inaccessible; 139/// the caller decides whether to retry on transient errors. 140pub async fn index_repo( 141 AppStuff { 142 resolver, 143 client, 144 db, 145 token, 146 }: AppStuff, 147 did: Did<'_>, 148 describe_timeout: Duration, 149 get_repo_timeout: Duration, 150 force_get_repo: bool, 151) -> Result<()> { 152 // Own the DID for the duration of the function so we can move it into 153 // spawn_blocking closures (which require 'static captures). 
154 let did: Did<'static> = did.into_static(); 155 156 let Some(resolved) = token.run(resolver.resolve(&did)).await else { 157 return Err(ResyncError::Cancelled); 158 }; 159 let resolved = resolved?; 160 let base = &*resolved.pds; 161 let Some(_) = base.host_str() else { 162 error!(did = %did, host = %resolved.pds, "index_repo received a URL without a host???"); 163 return Err(ResyncError::Wtf(format!( 164 "URL without host?? {:?}", 165 resolved.pds 166 ))); 167 }; 168 169 let repo_snapshot = match fetch_collections( 170 &client, 171 base, 172 did.clone(), 173 token, 174 describe_timeout, 175 get_repo_timeout, 176 force_get_repo, 177 ) 178 .await 179 { 180 Ok(s) => s, 181 Err(GetCollectionsError::RepoNotFound) => { 182 info!(did = %did.as_str(), "repo not found on PDS; marking state"); 183 let db = db.clone(); 184 tokio::task::spawn_blocking(move || mark_not_found(&db, &did)) 185 .await 186 .map_err(|e| ResyncError::TaskPanic(e.to_string()))??; 187 return Err(ResyncError::RepoNotFound); 188 } 189 Err(GetCollectionsError::RepoGone(reason)) => { 190 let (state, status) = match reason { 191 RepoGoneReason::Takendown => (RepoState::Takendown, AccountStatus::Takendown), 192 RepoGoneReason::Suspended => (RepoState::Suspended, AccountStatus::Suspended), 193 RepoGoneReason::Deactivated => (RepoState::Deactivated, AccountStatus::Deactivated), 194 }; 195 info!(did = %did.as_str(), status = status.as_str(), "repo gone; updating account status"); 196 let db = db.clone(); 197 tokio::task::spawn_blocking(move || update_account_status(&db, &did, state, status)) 198 .await 199 .map_err(|e| ResyncError::TaskPanic(e.to_string()))??; 200 return Ok(()); 201 } 202 Err(GetCollectionsError::RateLimited(_)) => { 203 return Err(ResyncError::RateLimited(resolved.pds.clone())); 204 } 205 Err(e) => return Err(ResyncError::Fetch(e)), 206 }; 207 208 let db_w = db.clone(); 209 let collections = repo_snapshot.collections; 210 let rev = repo_snapshot.rev; 211 let prev_data = 
repo_snapshot.data.to_bytes(); 212 let (n_inserted, n_removed) = tokio::task::spawn_blocking(move || { 213 crate::storage::repo::put_prev(&db_w, &did, &RepoPrev { rev, prev_data })?; 214 crate::storage::collection_index::sync_collections(&db_w, &did, &collections) 215 }) 216 .await 217 .map_err(|e| ResyncError::TaskPanic(e.to_string()))??; 218 if n_inserted > 0 { 219 db.stats 220 .collection_births_resync 221 .fetch_add(n_inserted as u64, Ordering::Relaxed); 222 metrics::counter!("lightrail_collection_births_total", "source" => "resync") 223 .increment(n_inserted as u64); 224 } 225 if n_removed > 0 { 226 db.stats 227 .collection_deaths_resync 228 .fetch_add(n_removed as u64, Ordering::Relaxed); 229 metrics::counter!("lightrail_collection_deaths_total", "source" => "resync") 230 .increment(n_removed as u64); 231 } 232 Ok(()) 233} 234 235/// Update the stored `RepoState` and `AccountStatus` for a repo that the PDS 236/// reports as gone (takendown, suspended, or deactivated). 237/// 238/// If no record exists yet (e.g. a backfill race), one is created. The `error` 239/// field is cleared since the gone state is fully described by `state`/`status`. 240fn update_account_status( 241 db: &DbRef, 242 did: &Did<'_>, 243 state: RepoState, 244 status: AccountStatus, 245) -> Result<()> { 246 let new_info = match crate::storage::repo::get(db, did)? { 247 Some((mut info, _)) => { 248 info.state = state; 249 info.status = status; 250 info.error = None; 251 info 252 } 253 None => RepoInfo { 254 state, 255 status, 256 error: None, 257 }, 258 }; 259 crate::storage::repo::put_info(db, did, &new_info)?; 260 Ok(()) 261} 262 263/// Set `RepoState::NotFound` for a DID whose PDS reports no repo. 264/// 265/// Preserves the existing `AccountStatus` if a record exists (we don't know 266/// the true account status from a not-found response). The `error` field is 267/// cleared since the state is fully described by `RepoState::NotFound`. 
268fn mark_not_found(db: &DbRef, did: &Did<'_>) -> Result<()> { 269 let new_info = match crate::storage::repo::get(db, did)? { 270 Some((mut info, _)) => { 271 info.state = RepoState::NotFound; 272 info.error = None; 273 info 274 } 275 None => RepoInfo { 276 state: RepoState::NotFound, 277 status: AccountStatus::Active, 278 error: None, 279 }, 280 }; 281 crate::storage::repo::put_info(db, did, &new_info)?; 282 Ok(()) 283} 284 285/// Try each approach in preference order, falling back to `get_repo` selectively. 286/// 287/// Falls back to the `get_repo` CAR walk when `describe_repo`: 288/// - returns an empty collection list (possible PDS bug or unpaginated large repo), 289/// - fails with any error other than rate-limiting or a definitive gone/not-found. 290/// 291/// A 429 rate-limit response is propagated immediately rather than escalated to a 292/// heavier `getRepo` request. Definitive `RepoNotFound`/`RepoGone` results are also 293/// propagated, since `getRepo` would return the same answer. 294async fn fetch_collections<C>( 295 client: &C, 296 base: &jacquard_common::url::Url, 297 did: Did<'_>, 298 token: tokio_util::sync::CancellationToken, 299 describe_timeout: Duration, 300 get_repo_timeout: Duration, 301 force_get_repo: bool, 302) -> std::result::Result<RepoSnapshot, GetCollectionsError> 303where 304 C: HttpClient + HttpClientExt + Sync, 305{ 306 if !force_get_repo { 307 let Some(describe_result) = token 308 .timeout( 309 describe_timeout, 310 describe_repo::fetch_collections(client, base, did.clone()), 311 ) 312 .await 313 else { 314 return Err(GetCollectionsError::Cancelled); 315 }; 316 317 match describe_result { 318 Ok(Ok(snapshot)) => { 319 metrics::counter!("lightrail_resync_fetch_total", "source" => "describe_repo") 320 .increment(1); 321 return Ok(snapshot); 322 } 323 // Rate-limited: don't escalate to a heavier getRepo request. 
324 Ok(Err(e @ GetCollectionsError::RateLimited(_))) => return Err(e), 325 // Definitively gone: getRepo would return the same answer. 326 Ok(Err(e @ (GetCollectionsError::RepoNotFound | GetCollectionsError::RepoGone(_)))) => { 327 return Err(e); 328 } 329 // Any other failure or timeout: fall through. The PDS may not implement 330 // describeRepo, or may have a bug this endpoint doesn't hit. 331 Ok(Err(GetCollectionsError::GetSmallRepo)) | Ok(Err(_)) | Err(_) => {} 332 } 333 } 334 335 let res = token 336 .timeout( 337 get_repo_timeout, 338 get_repo::fetch_collections(client, base, did), 339 ) 340 .await 341 .ok_or(GetCollectionsError::Cancelled)? 342 .unwrap_or_else(|_| { 343 Err(GetCollectionsError::Request( 344 "getRepo timed out".to_string(), 345 )) 346 })?; 347 metrics::counter!("lightrail_resync_fetch_total", "source" => "get_repo").increment(1); 348 Ok(res) 349}