lightweight com.atproto.sync.listReposByCollection
at main 504 lines 18 kB view raw
1//! Walk `com.atproto.sync.listRepos` and feed newly discovered repos into the 2//! resync queue for the dispatcher to process. 3//! 4//! Each page of results is processed and the cursor persisted before moving 5//! on, so the walk can be safely resumed after a restart. Already-known repos 6//! (any state) are skipped — the dispatcher's retry mechanism handles repos 7//! that need re-syncing. 8//! 9//! Accounts listed as non-active (takendown, suspended, deactivated, deleted) 10//! have their status recorded without queuing a resync — there's nothing to 11//! fetch. 12 13use std::sync::Arc; 14 15use jacquard_api::com_atproto::sync::list_repos::{ListRepos, RepoStatus}; 16use jacquard_common::{ 17 error::ClientErrorKind, 18 types::{crypto::PublicKey, string::Did}, 19 url::{Host, Url}, 20 {IntoStatic, xrpc::XrpcExt}, 21}; 22use tokio::time::Duration; 23use tokio_util::sync::CancellationToken; 24use tracing::{debug, error, info, trace, warn}; 25 26use crate::{ 27 error::Result, 28 http::ThrottledClient, 29 identity::Resolver, 30 storage::{ 31 self, DbRef, 32 backfill_progress::{BackfillProgress, get, set}, 33 repo::{AccountStatus, RepoInfo, RepoState}, 34 }, 35 sync::discovery_queue::{DiscoveryItem, DiscoveryQueue}, 36 util::TokenExt, 37}; 38use std::sync::atomic::Ordering; 39use std::time::SystemTime; 40 41const PAGE_LIMIT: i64 = 500; 42 43// --------------------------------------------------------------------------- 44// Mode + retry policy 45// --------------------------------------------------------------------------- 46 47/// Which host role this backfill is running against. 48/// 49/// Bundles two decisions that always move together: 50/// - whether to validate DIDs against the host (trust model), and 51/// - how patiently to retry transient page failures (criticality). 52#[derive(Clone, Copy, Debug, PartialEq, Eq)] 53pub enum BackfillMode { 54 /// Primary relay / upstream host. 
Trusted for DID listings; retried 55 /// generously because a transient outage here will take the whole service 56 /// down when the task exits. 57 Relay, 58 /// Untrusted PDS discovered via deep crawl. DIDs are validated against the 59 /// host, and a few transient failures are enough to move on. 60 DeepCrawl, 61} 62 63impl BackfillMode { 64 /// True when listed DIDs must be independently verified to actually live 65 /// on `host` before we trust their account status. 66 fn validates_dids(self) -> bool { 67 matches!(self, Self::DeepCrawl) 68 } 69 70 fn retry_policy(self) -> RetryPolicy { 71 match self { 72 Self::Relay => RetryPolicy::RELAY, 73 Self::DeepCrawl => RetryPolicy::DEEP_CRAWL, 74 } 75 } 76} 77 78/// How many transient page failures to tolerate and how long to wait between. 79struct RetryPolicy { 80 max_page_failures: u32, 81 retry_delay: Duration, 82} 83 84impl RetryPolicy { 85 /// Relay budget: 60 attempts × 30s = 30 minutes of retries before giving 86 /// up and triggering service shutdown. Enough to absorb typical relay 87 /// hiccups without flapping. 88 const RELAY: Self = Self { 89 max_page_failures: 60, 90 retry_delay: Duration::from_secs(30), 91 }; 92 /// Deep-crawl budget: 3 × 10s. Fast give-up so dead PDSes don't monopolise 93 /// crawl workers. 94 const DEEP_CRAWL: Self = Self { 95 max_page_failures: 3, 96 retry_delay: Duration::from_secs(10), 97 }; 98} 99 100// --------------------------------------------------------------------------- 101// Listed account state 102// --------------------------------------------------------------------------- 103 104/// Map the `active`/`status` fields from a listRepos `Repo` entry to a 105/// [`ListedAccountState`]. Follows the same mapping as firehose `#account` 106/// events in `account_event.rs`. 
107fn classify_repo(active: Option<bool>, status: &Option<RepoStatus<'_>>) -> RepoState { 108 // active-as-fallback is safe: 109 if active.unwrap_or(true) { 110 return RepoState::Active; 111 } 112 let Some(jac_status) = status else { 113 return RepoState::Active; 114 }; 115 match jac_status { 116 RepoStatus::Takendown => RepoState::Takendown, 117 RepoStatus::Suspended => RepoState::Suspended, 118 RepoStatus::Deleted => RepoState::Deleted, 119 RepoStatus::Deactivated => RepoState::Deactivated, 120 RepoStatus::Desynchronized => RepoState::Desynchronized, 121 RepoStatus::Throttled => RepoState::Throttled, 122 RepoStatus::Other(_) => RepoState::Error, 123 } 124} 125 126// --------------------------------------------------------------------------- 127// Main loop 128// --------------------------------------------------------------------------- 129 130/// Walk `listRepos` on `host` and enqueue new repos for resync. 131/// 132/// `mode` selects both the trust model (whether DIDs are validated against 133/// `host`) and the retry policy (how long we retry transient page failures 134/// before giving up). See [`BackfillMode`]. 135/// 136/// In `DeepCrawl` mode, DIDs are verified to actually live on `host` and 137/// non-matching ones are rejected. Validated DIDs are considered authoritative 138/// for account status — if the PDS says an account is deactivated, we trust 139/// it even if our local state is Active. In `Relay` mode we only write 140/// non-active status for accounts we haven't seen before, to avoid 141/// overwriting Active state with stale relay data (e.g. accounts that 142/// migrated off and appear deactivated on the old PDS). 
pub async fn run(
    host: Host,
    db: DbRef,
    client: ThrottledClient,
    token: CancellationToken,
    resolver: Arc<Resolver>,
    mode: BackfillMode,
    discovery_queue: Arc<DiscoveryQueue>,
) -> Result<bool> {
    // Returns Ok(true) when the walk reached the end of the listing,
    // Ok(false) when cancelled or when fetch_page gave up on the host.
    let validate = mode.validates_dids();
    let retry_policy = mode.retry_policy();
    let base: jacquard_common::url::Url = format!("https://{host}")
        .parse()
        .map_err(|e: jacquard_common::url::ParseError| crate::error::Error::Other(e.to_string()))?;

    // Resume from the last saved cursor (empty string → start from beginning).
    // DB access is blocking, so it runs on the blocking pool.
    let mut cursor: Option<String> = {
        let db = db.clone();
        let host = host.clone();
        tokio::task::spawn_blocking(move || get(&db, &host)).await??
    }
    .map(|p| p.cursor);

    info!(
        host = %host,
        resume_cursor = cursor.as_deref().unwrap_or("(start)"),
        "backfill started"
    );

    let mut total_queued: u64 = 0;

    loop {
        // Check for shutdown between pages; fetch_page also observes the
        // token mid-request.
        if token.is_cancelled() {
            return Ok(false);
        }

        let (dids, next_cursor) = match fetch_page(
            &host,
            &base,
            cursor.as_deref(),
            &client,
            &token,
            &retry_policy,
        )
        .await
        {
            Some(page) => page,
            // fetch_page exhausted its retry budget, hit a 4xx, or was
            // cancelled — either way, stop walking this host.
            None => return Ok(false),
        };

        let page_len = dids.len();

        // For untrusted hosts (deep crawl), filter DIDs to those whose
        // resolved PDS actually matches this host.
        let dids = if validate {
            validate_dids(dids, &resolver, &host, &token).await
        } else {
            dids
        };

        // Resolve each DID's actual PDS host for discovery queue routing.
        // NOTE(review): a DID whose resolution fails is skipped entirely —
        // there is no fallback to the listed host here.
        //
        // TODO: ...this is basically redundant with validate_dids now?
        let dids_with_hosts: Vec<(Did<'static>, Arc<Url>, PublicKey<'static>, RepoState)> = {
            let mut out = Vec::with_capacity(dids.len());
            for (did, account_state) in dids {
                let Some(res) = token.run(resolver.resolve(&did)).await else {
                    return Ok(false); // cancelled
                };
                let resolved = match res {
                    Ok(resolved) => resolved,
                    Err(e) => {
                        error!(did = %did, error = %e, "failed to resolve host for validated did; skipping");
                        continue;
                    }
                };
                out.push((
                    did,
                    resolved.pds.clone(),
                    resolved.pubkey.clone(),
                    account_state,
                ));
            }
            out
        };

        // Persist this page's results (and the next cursor) atomically with
        // respect to restart: a crash after this point resumes at next_cursor.
        let progress_cursor = next_cursor.clone().unwrap_or_default();
        let (page_queued, page_inactive, new_items) = {
            let db = db.clone();
            let host = host.clone();
            tokio::task::spawn_blocking(move || {
                persist_page(&db, &host, dids_with_hosts, progress_cursor, validate)
            })
            .await??
        };

        // Push newly-discovered repos into the in-memory discovery queue.
        // Each push may block if the queue is full (backpressure).
        for (did, pds, pubkey) in new_items {
            let Some(_) = token
                .run(discovery_queue.push(DiscoveryItem { did, pds, pubkey }))
                .await
            else {
                return Ok(false);
            };
        }

        total_queued += page_queued;

        trace!(
            host = %host,
            page_repos = page_len,
            page_queued,
            page_inactive,
            total_queued,
            next_cursor = next_cursor.as_deref().unwrap_or("(done)"),
            "backfill page"
        );

        // Normal progress: the host returned a different cursor — advance.
        if next_cursor != cursor {
            cursor = next_cursor;
            continue;
        }

        // next_cursor == cursor. Some(_) means the host is looping us on the
        // same cursor (misbehaving); None/None is the normal end of listing.
        if let Some(c) = next_cursor {
            warn!(host = %host, cursor = c, "evil cursor! (unchanged), bailing on this host.");
            // TODO: mark host as not trustworthy
        };

        // Record completion: empty cursor + completed_at timestamp, so a
        // restart does not re-walk this host from the saved cursor.
        let db = db.clone();
        let host_owned = host.clone();
        tokio::task::spawn_blocking(move || {
            set(
                &db,
                &host_owned,
                &BackfillProgress {
                    cursor: "".to_string(),
                    completed_at: Some(crate::util::to_millis(SystemTime::now()).to_string()),
                },
            )
        })
        .await??;
        info!(host = %host, total_queued, "backfill complete");
        return Ok(true);
    }
}

// ---------------------------------------------------------------------------
// Page fetch
// ---------------------------------------------------------------------------

/// Fetch one `listRepos` page with retry logic.
///
/// Returns `None` if the host gives a 4xx, exceeds `policy.max_page_failures`
/// transient errors, or the token is cancelled (including mid-request).
async fn fetch_page(
    host: &Host,
    base: &jacquard_common::url::Url,
    cursor: Option<&str>,
    client: &ThrottledClient,
    token: &CancellationToken,
    policy: &RetryPolicy,
) -> Option<(Vec<(Did<'static>, RepoState)>, Option<String>)> {
    let req = ListRepos {
        cursor: cursor.map(Into::into),
        limit: Some(PAGE_LIMIT),
    };
    let mut failures: u32 = 0;
    loop {
        // `token.run` yields None on cancellation; `?` propagates it out.
        let result = match token.run(client.xrpc(base.clone()).send(&req)).await? {
            Err(e) => {
                // 4xx means the host rejected the request outright — retrying
                // the same request will not help, so give up immediately.
                let is_client_err = matches!(
                    e.kind(),
                    ClientErrorKind::Http { status } if status.is_client_error()
                );
                if is_client_err {
                    warn!(error = %e, host = %host,
                        "listRepos failed with client error; giving up on this host");
                    return None;
                }
                // Anything else (network error, 5xx) is treated as transient.
                warn!(error = %e, host = %host, "listRepos request failed");
                None
            }
            Ok(resp) => match resp.parse() {
                Ok(out) => {
                    let next = out.cursor.as_deref().map(str::to_owned);
                    let listed = out
                        .repos
                        .into_iter()
                        .map(|r| {
                            let state = classify_repo(r.active, &r.status);
                            (r.did.into_static(), state)
                        })
                        .collect::<Vec<_>>();
                    Some((listed, next))
                }
                // Parse failures are also retried — the next fetch may return
                // a well-formed page.
                Err(e) => {
                    warn!(error = %e, host = %host, "listRepos response parse failed");
                    None
                }
            },
        };

        match result {
            Some(page) => return Some(page),
            None => {
                failures += 1;
                if failures >= policy.max_page_failures {
                    warn!(host = %host, failures, max = policy.max_page_failures,
                        "listRepos page failed too many times; giving up on this host");
                    return None;
                }
                // Cancellable sleep: returns false if the token fired.
                if !token.sleep(policy.retry_delay).await {
                    return None;
                }
            }
        }
    }
}

// ---------------------------------------------------------------------------
// DID validation
// ---------------------------------------------------------------------------

/// Filter `dids` to those whose resolved PDS endpoint matches `host`.
///
/// Returns early with whatever has been validated so far if the token is cancelled.
372async fn validate_dids( 373 dids: Vec<(Did<'static>, RepoState)>, 374 resolver: &Resolver, 375 host: &Host, 376 token: &CancellationToken, 377) -> Vec<(Did<'static>, RepoState)> { 378 let host_str = host.to_string(); 379 let mut valid = Vec::with_capacity(dids.len()); 380 for (did, account_state) in dids { 381 let Some(r) = token.run(resolver.resolve(&did)).await else { 382 break; 383 }; 384 match r { 385 Ok(resolved) if resolved.pds.host_str() == Some(host_str.as_str()) => { 386 valid.push((did, account_state)); 387 } 388 Ok(resolved) => { 389 metrics::counter!("lightrail_backfill_did_rejected_total", "reason" => "pds_mismatch") 390 .increment(1); 391 trace!(did = %did, resolved_pds = %resolved.pds, expected = %host, 392 "DID resolves to different PDS; skipping"); 393 } 394 Err(e) => { 395 metrics::counter!("lightrail_backfill_did_rejected_total", "reason" => "resolution_failed") 396 .increment(1); 397 trace!(did = %did, error = %e, 398 "DID resolution failed during backfill validation; skipping"); 399 } 400 }; 401 } 402 valid 403} 404 405// --------------------------------------------------------------------------- 406// Storage commit 407// --------------------------------------------------------------------------- 408 409type DidWithPds = (Did<'static>, Arc<Url>, PublicKey<'static>); 410 411/// Insert newly-seen DIDs into the repo table and persist the backfill cursor. 412/// 413/// Active accounts are returned for the caller to push into the discovery 414/// queue. Non-active accounts have their status written directly (no resync 415/// needed). When `authoritative` is false (relay backfill), non-active status 416/// is only written for accounts that have no existing Active record — we don't 417/// overwrite a locally-Active account with stale relay data. 418/// 419/// Returns `(active_count, inactive_count, new_active_items)`. 
420fn persist_page( 421 db: &DbRef, 422 host: &Host, 423 items: Vec<(Did<'static>, Arc<Url>, PublicKey<'static>, RepoState)>, 424 progress_cursor: String, 425 authoritative: bool, 426) -> Result<(u64, u64, Vec<DidWithPds>)> { 427 let mut new_active: Vec<DidWithPds> = Vec::new(); 428 let mut inactive_count: u64 = 0; 429 for (did, pds, pubkey, repo_state) in items { 430 if let Some(deactiveated_account_status) = repo_state.to_account_inactive() { 431 if write_inactive_status( 432 db, 433 &did, 434 deactiveated_account_status, 435 repo_state, 436 authoritative, 437 )? { 438 inactive_count += 1; 439 } 440 } else { 441 let newly_inserted = storage::repo::ensure_repo(db, &did)?; 442 if newly_inserted { 443 db.stats.repos_queued_total.fetch_add(1, Ordering::Relaxed); 444 new_active.push((did, pds, pubkey)); 445 } 446 } 447 } 448 // Persist progress before advancing so a crash during the next 449 // page restarts from here, not the beginning. 450 set( 451 db, 452 host, 453 &BackfillProgress { 454 cursor: progress_cursor, 455 completed_at: None, 456 }, 457 )?; 458 Ok((new_active.len() as u64, inactive_count, new_active)) 459} 460 461/// Write a non-active account status from a listRepos entry. 462/// 463/// Returns `true` if the status was written, `false` if skipped. 464/// 465/// When `authoritative` is true (deep crawl — PDS confirmed via identity 466/// resolution), the status is always written. When false (relay backfill), 467/// we only write if the existing local status is not Active, to avoid 468/// overwriting with stale data (e.g. an account that migrated off this PDS 469/// and shows as deactivated on the old host). 
470fn write_inactive_status( 471 db: &DbRef, 472 did: &Did<'_>, 473 status: AccountStatus, 474 state: RepoState, 475 authoritative: bool, 476) -> Result<bool> { 477 let existing = storage::repo::get_status(db, did)?; 478 match existing { 479 Some(ref existing_status) if existing_status.is_active() && !authoritative => { 480 debug!( 481 did = %did, 482 listed_status = status.as_str(), 483 "skipping non-active status from non-authoritative source; local account is active" 484 ); 485 return Ok(false); 486 } 487 Some(ref existing_status) if *existing_status == status => { 488 return Ok(false); // already matches 489 } 490 _ => {} 491 } 492 let info = RepoInfo { 493 state, 494 status: status.clone(), 495 error: None, 496 }; 497 storage::repo::put_info(db, did, &info)?; 498 metrics::counter!( 499 "lightrail_backfill_inactive_total", 500 "status" => status.as_str() 501 ) 502 .increment(1); 503 Ok(true) 504}