// lightweight
// com.atproto.sync.listReposByCollection
1//! Walk `com.atproto.sync.listRepos` and feed newly discovered repos into the
2//! resync queue for the dispatcher to process.
3//!
4//! Each page of results is processed and the cursor persisted before moving
5//! on, so the walk can be safely resumed after a restart. Already-known repos
6//! (any state) are skipped — the dispatcher's retry mechanism handles repos
7//! that need re-syncing.
8//!
9//! Accounts listed as non-active (takendown, suspended, deactivated, deleted)
10//! have their status recorded without queuing a resync — there's nothing to
11//! fetch.
12
13use std::sync::Arc;
14
15use jacquard_api::com_atproto::sync::list_repos::{ListRepos, RepoStatus};
16use jacquard_common::{
17 error::ClientErrorKind,
18 types::{crypto::PublicKey, string::Did},
19 url::{Host, Url},
20 {IntoStatic, xrpc::XrpcExt},
21};
22use tokio::time::Duration;
23use tokio_util::sync::CancellationToken;
24use tracing::{debug, error, info, trace, warn};
25
26use crate::{
27 error::Result,
28 http::ThrottledClient,
29 identity::Resolver,
30 storage::{
31 self, DbRef,
32 backfill_progress::{BackfillProgress, get, set},
33 repo::{AccountStatus, RepoInfo, RepoState},
34 },
35 sync::discovery_queue::{DiscoveryItem, DiscoveryQueue},
36 util::TokenExt,
37};
38use std::sync::atomic::Ordering;
39use std::time::SystemTime;
40
/// Number of repos requested per `listRepos` page (the server may return fewer).
const PAGE_LIMIT: i64 = 500;
42
43// ---------------------------------------------------------------------------
44// Mode + retry policy
45// ---------------------------------------------------------------------------
46
/// Which host role this backfill is running against.
///
/// Bundles two decisions that always move together:
/// - whether to validate DIDs against the host (trust model), and
/// - how patiently to retry transient page failures (criticality).
///
/// See [`BackfillMode::validates_dids`] and [`BackfillMode::retry_policy`].
#[derive(Clone, Copy, Debug, PartialEq, Eq)]
pub enum BackfillMode {
    /// Primary relay / upstream host. Trusted for DID listings; retried
    /// generously because a transient outage here will take the whole service
    /// down when the task exits.
    Relay,
    /// Untrusted PDS discovered via deep crawl. DIDs are validated against the
    /// host, and a few transient failures are enough to move on.
    DeepCrawl,
}
62
63impl BackfillMode {
64 /// True when listed DIDs must be independently verified to actually live
65 /// on `host` before we trust their account status.
66 fn validates_dids(self) -> bool {
67 matches!(self, Self::DeepCrawl)
68 }
69
70 fn retry_policy(self) -> RetryPolicy {
71 match self {
72 Self::Relay => RetryPolicy::RELAY,
73 Self::DeepCrawl => RetryPolicy::DEEP_CRAWL,
74 }
75 }
76}
77
/// How many transient page failures to tolerate and how long to wait between.
struct RetryPolicy {
    /// Give up on the current page (and host) once this many attempts within
    /// a single page fetch have failed.
    max_page_failures: u32,
    /// Pause between retry attempts.
    retry_delay: Duration,
}
83
impl RetryPolicy {
    /// Relay budget: 60 attempts × 30s = 30 minutes of retries before giving
    /// up and triggering service shutdown. Enough to absorb typical relay
    /// hiccups without flapping.
    const RELAY: Self = Self {
        max_page_failures: 60,
        retry_delay: Duration::from_secs(30),
    };
    /// Deep-crawl budget: 3 × 10s. Fast give-up so dead PDSes don't monopolise
    /// crawl workers.
    const DEEP_CRAWL: Self = Self {
        max_page_failures: 3,
        retry_delay: Duration::from_secs(10),
    };
}
99
100// ---------------------------------------------------------------------------
101// Listed account state
102// ---------------------------------------------------------------------------
103
/// Map the `active`/`status` fields from a listRepos `Repo` entry to a
/// [`RepoState`]. Follows the same mapping as firehose `#account`
/// events in `account_event.rs`.
///
/// Missing fields default to [`RepoState::Active`]: an absent `active` flag is
/// read as "active", and an inactive entry carrying no `status` gives us
/// nothing more specific to record.
fn classify_repo(active: Option<bool>, status: &Option<RepoStatus<'_>>) -> RepoState {
    // active-as-fallback is safe:
    // `active == None` is treated as active. NOTE(review): an entry with
    // `active == Some(false)` but no status also falls through to Active
    // below — confirm this matches the `#account` handling in
    // account_event.rs.
    if active.unwrap_or(true) {
        return RepoState::Active;
    }
    let Some(jac_status) = status else {
        return RepoState::Active;
    };
    match jac_status {
        RepoStatus::Takendown => RepoState::Takendown,
        RepoStatus::Suspended => RepoState::Suspended,
        RepoStatus::Deleted => RepoState::Deleted,
        RepoStatus::Deactivated => RepoState::Deactivated,
        RepoStatus::Desynchronized => RepoState::Desynchronized,
        RepoStatus::Throttled => RepoState::Throttled,
        // Unknown status strings are surfaced as a generic error state.
        RepoStatus::Other(_) => RepoState::Error,
    }
}
125
126// ---------------------------------------------------------------------------
127// Main loop
128// ---------------------------------------------------------------------------
129
130/// Walk `listRepos` on `host` and enqueue new repos for resync.
131///
132/// `mode` selects both the trust model (whether DIDs are validated against
133/// `host`) and the retry policy (how long we retry transient page failures
134/// before giving up). See [`BackfillMode`].
135///
136/// In `DeepCrawl` mode, DIDs are verified to actually live on `host` and
137/// non-matching ones are rejected. Validated DIDs are considered authoritative
138/// for account status — if the PDS says an account is deactivated, we trust
139/// it even if our local state is Active. In `Relay` mode we only write
140/// non-active status for accounts we haven't seen before, to avoid
141/// overwriting Active state with stale relay data (e.g. accounts that
142/// migrated off and appear deactivated on the old PDS).
143pub async fn run(
144 host: Host,
145 db: DbRef,
146 client: ThrottledClient,
147 token: CancellationToken,
148 resolver: Arc<Resolver>,
149 mode: BackfillMode,
150 discovery_queue: Arc<DiscoveryQueue>,
151) -> Result<bool> {
152 let validate = mode.validates_dids();
153 let retry_policy = mode.retry_policy();
154 let base: jacquard_common::url::Url = format!("https://{host}")
155 .parse()
156 .map_err(|e: jacquard_common::url::ParseError| crate::error::Error::Other(e.to_string()))?;
157
158 // Resume from the last saved cursor (empty string → start from beginning).
159 let mut cursor: Option<String> = {
160 let db = db.clone();
161 let host = host.clone();
162 tokio::task::spawn_blocking(move || get(&db, &host)).await??
163 }
164 .map(|p| p.cursor);
165
166 info!(
167 host = %host,
168 resume_cursor = cursor.as_deref().unwrap_or("(start)"),
169 "backfill started"
170 );
171
172 let mut total_queued: u64 = 0;
173
174 loop {
175 if token.is_cancelled() {
176 return Ok(false);
177 }
178
179 let (dids, next_cursor) = match fetch_page(
180 &host,
181 &base,
182 cursor.as_deref(),
183 &client,
184 &token,
185 &retry_policy,
186 )
187 .await
188 {
189 Some(page) => page,
190 None => return Ok(false),
191 };
192
193 let page_len = dids.len();
194
195 // For untrusted hosts (deep crawl), filter DIDs to those whose
196 // resolved PDS actually matches this host.
197 let dids = if validate {
198 validate_dids(dids, &resolver, &host, &token).await
199 } else {
200 dids
201 };
202
203 // Resolve each DID's actual PDS host for discovery queue routing.
204 // Cache hits are free; misses fall back to the listed host.
205 //
206 // TODO: ...this is basically redundant with validate_dids now?
207 let dids_with_hosts: Vec<(Did<'static>, Arc<Url>, PublicKey<'static>, RepoState)> = {
208 let mut out = Vec::with_capacity(dids.len());
209 for (did, account_state) in dids {
210 let Some(res) = token.run(resolver.resolve(&did)).await else {
211 return Ok(false); // cancelled
212 };
213 let resolved = match res {
214 Ok(resolved) => resolved,
215 Err(e) => {
216 error!(did = %did, error = %e, "failed to resolve host for validated did; skipping");
217 continue;
218 }
219 };
220 out.push((
221 did,
222 resolved.pds.clone(),
223 resolved.pubkey.clone(),
224 account_state,
225 ));
226 }
227 out
228 };
229
230 let progress_cursor = next_cursor.clone().unwrap_or_default();
231 let (page_queued, page_inactive, new_items) = {
232 let db = db.clone();
233 let host = host.clone();
234 tokio::task::spawn_blocking(move || {
235 persist_page(&db, &host, dids_with_hosts, progress_cursor, validate)
236 })
237 .await??
238 };
239
240 // Push newly-discovered repos into the in-memory discovery queue.
241 // Each push may block if the queue is full (backpressure).
242 for (did, pds, pubkey) in new_items {
243 let Some(_) = token
244 .run(discovery_queue.push(DiscoveryItem { did, pds, pubkey }))
245 .await
246 else {
247 return Ok(false);
248 };
249 }
250
251 total_queued += page_queued;
252
253 trace!(
254 host = %host,
255 page_repos = page_len,
256 page_queued,
257 page_inactive,
258 total_queued,
259 next_cursor = next_cursor.as_deref().unwrap_or("(done)"),
260 "backfill page"
261 );
262
263 if next_cursor != cursor {
264 cursor = next_cursor;
265 continue;
266 }
267
268 if let Some(c) = next_cursor {
269 warn!(host = %host, cursor = c, "evil cursor! (unchanged), bailing on this host.");
270 // TODO: mark host as not trustworthy
271 };
272
273 // persist cursor so we can restart
274 let db = db.clone();
275 let host_owned = host.clone();
276 tokio::task::spawn_blocking(move || {
277 set(
278 &db,
279 &host_owned,
280 &BackfillProgress {
281 cursor: "".to_string(),
282 completed_at: Some(crate::util::to_millis(SystemTime::now()).to_string()),
283 },
284 )
285 })
286 .await??;
287 info!(host = %host, total_queued, "backfill complete");
288 return Ok(true);
289 }
290}
291
292// ---------------------------------------------------------------------------
293// Page fetch
294// ---------------------------------------------------------------------------
295
/// Fetch one `listRepos` page with retry logic.
///
/// Returns `None` if the host gives a 4xx, exceeds `policy.max_page_failures`
/// transient errors, or the token is cancelled (including mid-request).
///
/// On success, yields the page's `(did, state)` pairs (states mapped via
/// [`classify_repo`]) and the server-provided next cursor, if any.
async fn fetch_page(
    host: &Host,
    base: &jacquard_common::url::Url,
    cursor: Option<&str>,
    client: &ThrottledClient,
    token: &CancellationToken,
    policy: &RetryPolicy,
) -> Option<(Vec<(Did<'static>, RepoState)>, Option<String>)> {
    let req = ListRepos {
        cursor: cursor.map(Into::into),
        limit: Some(PAGE_LIMIT),
    };
    // The failure budget is per page: every call starts from zero.
    let mut failures: u32 = 0;
    loop {
        // `token.run(..)` yields None when cancelled mid-request; the `?`
        // propagates that straight out as this function's own None.
        let result = match token.run(client.xrpc(base.clone()).send(&req)).await? {
            Err(e) => {
                let is_client_err = matches!(
                    e.kind(),
                    ClientErrorKind::Http { status } if status.is_client_error()
                );
                if is_client_err {
                    // 4xx: the host understood the request and rejected it —
                    // retrying the same request won't help.
                    warn!(error = %e, host = %host,
                        "listRepos failed with client error; giving up on this host");
                    return None;
                }
                warn!(error = %e, host = %host, "listRepos request failed");
                None // transient failure: fall through to retry accounting
            }
            Ok(resp) => match resp.parse() {
                Ok(out) => {
                    let next = out.cursor.as_deref().map(str::to_owned);
                    let listed = out
                        .repos
                        .into_iter()
                        .map(|r| {
                            let state = classify_repo(r.active, &r.status);
                            (r.did.into_static(), state)
                        })
                        .collect::<Vec<_>>();
                    Some((listed, next))
                }
                Err(e) => {
                    // Parse failures count as transient too — the host may be
                    // flapping rather than permanently broken.
                    warn!(error = %e, host = %host, "listRepos response parse failed");
                    None
                }
            },
        };

        match result {
            Some(page) => return Some(page),
            None => {
                failures += 1;
                if failures >= policy.max_page_failures {
                    warn!(host = %host, failures, max = policy.max_page_failures,
                        "listRepos page failed too many times; giving up on this host");
                    return None;
                }
                // `token.sleep` returns false when cancelled during the wait.
                if !token.sleep(policy.retry_delay).await {
                    return None;
                }
            }
        }
    }
}
364
365// ---------------------------------------------------------------------------
366// DID validation
367// ---------------------------------------------------------------------------
368
369/// Filter `dids` to those whose resolved PDS endpoint matches `host`.
370///
371/// Returns early with whatever has been validated so far if the token is cancelled.
372async fn validate_dids(
373 dids: Vec<(Did<'static>, RepoState)>,
374 resolver: &Resolver,
375 host: &Host,
376 token: &CancellationToken,
377) -> Vec<(Did<'static>, RepoState)> {
378 let host_str = host.to_string();
379 let mut valid = Vec::with_capacity(dids.len());
380 for (did, account_state) in dids {
381 let Some(r) = token.run(resolver.resolve(&did)).await else {
382 break;
383 };
384 match r {
385 Ok(resolved) if resolved.pds.host_str() == Some(host_str.as_str()) => {
386 valid.push((did, account_state));
387 }
388 Ok(resolved) => {
389 metrics::counter!("lightrail_backfill_did_rejected_total", "reason" => "pds_mismatch")
390 .increment(1);
391 trace!(did = %did, resolved_pds = %resolved.pds, expected = %host,
392 "DID resolves to different PDS; skipping");
393 }
394 Err(e) => {
395 metrics::counter!("lightrail_backfill_did_rejected_total", "reason" => "resolution_failed")
396 .increment(1);
397 trace!(did = %did, error = %e,
398 "DID resolution failed during backfill validation; skipping");
399 }
400 };
401 }
402 valid
403}
404
405// ---------------------------------------------------------------------------
406// Storage commit
407// ---------------------------------------------------------------------------
408
/// A repo ready for the discovery queue: its DID plus the resolved PDS URL
/// and signing key.
type DidWithPds = (Did<'static>, Arc<Url>, PublicKey<'static>);
410
411/// Insert newly-seen DIDs into the repo table and persist the backfill cursor.
412///
413/// Active accounts are returned for the caller to push into the discovery
414/// queue. Non-active accounts have their status written directly (no resync
415/// needed). When `authoritative` is false (relay backfill), non-active status
416/// is only written for accounts that have no existing Active record — we don't
417/// overwrite a locally-Active account with stale relay data.
418///
419/// Returns `(active_count, inactive_count, new_active_items)`.
420fn persist_page(
421 db: &DbRef,
422 host: &Host,
423 items: Vec<(Did<'static>, Arc<Url>, PublicKey<'static>, RepoState)>,
424 progress_cursor: String,
425 authoritative: bool,
426) -> Result<(u64, u64, Vec<DidWithPds>)> {
427 let mut new_active: Vec<DidWithPds> = Vec::new();
428 let mut inactive_count: u64 = 0;
429 for (did, pds, pubkey, repo_state) in items {
430 if let Some(deactiveated_account_status) = repo_state.to_account_inactive() {
431 if write_inactive_status(
432 db,
433 &did,
434 deactiveated_account_status,
435 repo_state,
436 authoritative,
437 )? {
438 inactive_count += 1;
439 }
440 } else {
441 let newly_inserted = storage::repo::ensure_repo(db, &did)?;
442 if newly_inserted {
443 db.stats.repos_queued_total.fetch_add(1, Ordering::Relaxed);
444 new_active.push((did, pds, pubkey));
445 }
446 }
447 }
448 // Persist progress before advancing so a crash during the next
449 // page restarts from here, not the beginning.
450 set(
451 db,
452 host,
453 &BackfillProgress {
454 cursor: progress_cursor,
455 completed_at: None,
456 },
457 )?;
458 Ok((new_active.len() as u64, inactive_count, new_active))
459}
460
461/// Write a non-active account status from a listRepos entry.
462///
463/// Returns `true` if the status was written, `false` if skipped.
464///
465/// When `authoritative` is true (deep crawl — PDS confirmed via identity
466/// resolution), the status is always written. When false (relay backfill),
467/// we only write if the existing local status is not Active, to avoid
468/// overwriting with stale data (e.g. an account that migrated off this PDS
469/// and shows as deactivated on the old host).
470fn write_inactive_status(
471 db: &DbRef,
472 did: &Did<'_>,
473 status: AccountStatus,
474 state: RepoState,
475 authoritative: bool,
476) -> Result<bool> {
477 let existing = storage::repo::get_status(db, did)?;
478 match existing {
479 Some(ref existing_status) if existing_status.is_active() && !authoritative => {
480 debug!(
481 did = %did,
482 listed_status = status.as_str(),
483 "skipping non-active status from non-authoritative source; local account is active"
484 );
485 return Ok(false);
486 }
487 Some(ref existing_status) if *existing_status == status => {
488 return Ok(false); // already matches
489 }
490 _ => {}
491 }
492 let info = RepoInfo {
493 state,
494 status: status.clone(),
495 error: None,
496 };
497 storage::repo::put_info(db, did, &info)?;
498 metrics::counter!(
499 "lightrail_backfill_inactive_total",
500 "status" => status.as_str()
501 )
502 .increment(1);
503 Ok(true)
504}