lightweight
com.atproto.sync.listReposByCollection
1//! Establish ground truth about a repository's collections.
2//!
3//! "Resync" covers three scenarios:
4//!
5//! - **Backfill**: first-time indexing of a repo found via `listRepos`; diff
6//! against an empty set.
7//! - **`#sync` event**: the PDS declared a repo discontinuity; diff against
8//! the current index entries.
9//! - **Detected discontinuity**: a firehose commit's `prev` CID doesn't match
10//! our record; diff against the current index entries.
11//!
//! The fetch submodules (`describe_repo` and `get_repo`) each implement one
//! approach to fetching the ground-truth collection list, exposed as
//! `fetch_collections(client, base, did: Did<'_>)
//! -> std::result::Result<RepoSnapshot, GetCollectionsError>`.
//! This uniform signature makes it straightforward to test approaches in
//! isolation and to add new ones later.
17
18pub mod describe_repo;
19pub mod dispatcher;
20pub mod get_repo;
21
22use std::collections::BTreeSet;
23use std::sync::Arc;
24use std::sync::atomic::Ordering;
25use std::time::Duration;
26
27use cid::Cid as RawCid;
28use jacquard_common::{
29 IntoStatic,
30 http_client::{HttpClient, HttpClientExt},
31 types::string::{Did, Nsid, Tid},
32};
33use tracing::{error, info};
34
35use crate::storage::DbRef;
36use crate::storage::repo::{AccountStatus, RepoInfo, RepoPrev, RepoState};
37use crate::util::TokenExt;
38
39pub use dispatcher::DispatcherConfig;
40
/// Module-local shorthand: a `Result` whose error type is [`ResyncError`].
type Result<T> = std::result::Result<T, ResyncError>;
42
/// Errors that can occur during a resync operation.
#[derive(Debug, thiserror::Error)]
pub enum ResyncError {
    /// Resolving the DID through the identity resolver failed.
    #[error("identity resolution failed: {0}")]
    Identity(#[from] crate::identity::IdentityError),
    /// Fetching the collection list failed for a reason that has no dedicated
    /// variant in this enum; carries the underlying fetch error.
    #[error("collection fetch failed: {0}")]
    Fetch(GetCollectionsError),
    /// A storage-layer operation failed.
    #[error(transparent)]
    Storage(#[from] crate::storage::StorageError),
    /// The PDS returned HTTP 429. Carries the PDS URL for host-level cooldown.
    #[error("rate limited by {0}")]
    RateLimited(Arc<jacquard_common::url::Url>),
    /// A queued DID string was not a valid DID.
    // NOTE(review): not constructed in this file; presumably produced by the
    // dispatcher when reading its queue — confirm.
    #[error("invalid DID in queue: {0}")]
    InvalidDid(String),
    /// A `spawn_blocking` storage task could not be joined; carries the join
    /// error's message.
    #[error("blocking storage task panicked: {0}")]
    TaskPanic(String),
    /// The PDS returned a definitive "repo not found" for this DID.
    /// The repo state has been updated; the caller may schedule a slow retry.
    #[error("repo not found on PDS")]
    RepoNotFound,
    /// The cancellation token fired before the operation completed.
    #[error("externally cancelled")]
    Cancelled,
    /// Something that should be impossible happened (e.g. a resolved PDS URL
    /// without a host); carries a free-form description.
    #[error("wtf, {0}")]
    Wtf(String),
}
68
/// A snapshot of a repository's state as observed during a resync.
#[derive(Debug, PartialEq)]
pub struct RepoSnapshot {
    /// Set of collection NSIDs present in the repository (ordered and
    /// deduplicated by the `BTreeSet`).
    pub collections: BTreeSet<Nsid<'static>>,
    /// Revision TID of the latest commit.
    pub rev: Tid,
    /// MST root CID
    pub data: RawCid,
}
79
/// Why fetching a repository's collection list failed.
#[derive(Debug, thiserror::Error)]
pub enum GetCollectionsError {
    /// No repository exists for this DID on the PDS.
    #[error("repo not found")]
    RepoNotFound,
    /// The repo exists but its current state makes it inaccessible.
    #[error("repo {0}")]
    RepoGone(RepoGoneReason),
    /// The PDS rate-limited this request (HTTP 429).
    // NOTE(review): the payload is interpolated as "rate limited by {0}", so it
    // presumably names the host — confirm against the fetch submodules.
    #[error("rate limited by {0} (HTTP 429)")]
    RateLimited(String),
    /// Network or HTTP failure; may be transient. Also used when the `getRepo`
    /// attempt times out.
    #[error("request failed: {0}")]
    Request(String),
    /// The server returned data that could not be parsed or is structurally invalid.
    #[error("invalid data: {0}")]
    InvalidData(String),
    /// The server returned an unrecognised XRPC error code.
    #[error("unexpected XRPC error: {0}")]
    UnexpectedXrpc(String),
    /// The repo is likely tiny, intentionally fall through to sync.getRepo
    #[error("should getRepo because it's likely tiny")]
    GetSmallRepo,
    /// The request was externally cancelled
    #[error("externally cancelled")]
    Cancelled,
    /// Something that should be impossible happened; carries a free-form
    /// description.
    #[error("wtf!?, {0}")]
    Wtf(String),
}
110
/// The specific reason a repository is inaccessible.
///
/// `index_repo` maps each reason onto the `RepoState` / `AccountStatus`
/// variant of the same name when recording it.
#[derive(Debug, thiserror::Error)]
pub enum RepoGoneReason {
    /// The account has been taken down.
    #[error("taken down")]
    Takendown,
    /// The account has been suspended.
    #[error("suspended")]
    Suspended,
    /// The account has been deactivated.
    #[error("deactivated")]
    Deactivated,
}
121
/// Bundle of useful stuff from the app
///
/// should probably become a proper AppState
#[derive(Clone)]
pub struct AppStuff {
    /// Identity resolver; `index_repo` uses it to find the PDS URL for a DID.
    resolver: Arc<crate::identity::Resolver>,
    /// HTTP client used for the collection-fetch requests against the PDS.
    client: crate::http::ThrottledClient,
    /// Handle to storage (repo records, collection index) and its stats counters.
    db: DbRef,
    /// Cancellation token used to abort in-flight resolution and fetches.
    token: tokio_util::sync::CancellationToken,
}
132
133/// Establish the current collection set for `did` and write it to the index.
134///
135/// Tries `describeRepo` first (cheap). Falls back to a full `getRepo` CAR walk
136/// if the PDS doesn't support it or returns an empty collection list.
137///
138/// Returns `Ok(())` silently if the repo is not found or inaccessible;
139/// the caller decides whether to retry on transient errors.
140pub async fn index_repo(
141 AppStuff {
142 resolver,
143 client,
144 db,
145 token,
146 }: AppStuff,
147 did: Did<'_>,
148 describe_timeout: Duration,
149 get_repo_timeout: Duration,
150 force_get_repo: bool,
151) -> Result<()> {
152 // Own the DID for the duration of the function so we can move it into
153 // spawn_blocking closures (which require 'static captures).
154 let did: Did<'static> = did.into_static();
155
156 let Some(resolved) = token.run(resolver.resolve(&did)).await else {
157 return Err(ResyncError::Cancelled);
158 };
159 let resolved = resolved?;
160 let base = &*resolved.pds;
161 let Some(_) = base.host_str() else {
162 error!(did = %did, host = %resolved.pds, "index_repo received a URL without a host???");
163 return Err(ResyncError::Wtf(format!(
164 "URL without host?? {:?}",
165 resolved.pds
166 )));
167 };
168
169 let repo_snapshot = match fetch_collections(
170 &client,
171 base,
172 did.clone(),
173 token,
174 describe_timeout,
175 get_repo_timeout,
176 force_get_repo,
177 )
178 .await
179 {
180 Ok(s) => s,
181 Err(GetCollectionsError::RepoNotFound) => {
182 info!(did = %did.as_str(), "repo not found on PDS; marking state");
183 let db = db.clone();
184 tokio::task::spawn_blocking(move || mark_not_found(&db, &did))
185 .await
186 .map_err(|e| ResyncError::TaskPanic(e.to_string()))??;
187 return Err(ResyncError::RepoNotFound);
188 }
189 Err(GetCollectionsError::RepoGone(reason)) => {
190 let (state, status) = match reason {
191 RepoGoneReason::Takendown => (RepoState::Takendown, AccountStatus::Takendown),
192 RepoGoneReason::Suspended => (RepoState::Suspended, AccountStatus::Suspended),
193 RepoGoneReason::Deactivated => (RepoState::Deactivated, AccountStatus::Deactivated),
194 };
195 info!(did = %did.as_str(), status = status.as_str(), "repo gone; updating account status");
196 let db = db.clone();
197 tokio::task::spawn_blocking(move || update_account_status(&db, &did, state, status))
198 .await
199 .map_err(|e| ResyncError::TaskPanic(e.to_string()))??;
200 return Ok(());
201 }
202 Err(GetCollectionsError::RateLimited(_)) => {
203 return Err(ResyncError::RateLimited(resolved.pds.clone()));
204 }
205 Err(e) => return Err(ResyncError::Fetch(e)),
206 };
207
208 let db_w = db.clone();
209 let collections = repo_snapshot.collections;
210 let rev = repo_snapshot.rev;
211 let prev_data = repo_snapshot.data.to_bytes();
212 let (n_inserted, n_removed) = tokio::task::spawn_blocking(move || {
213 crate::storage::repo::put_prev(&db_w, &did, &RepoPrev { rev, prev_data })?;
214 crate::storage::collection_index::sync_collections(&db_w, &did, &collections)
215 })
216 .await
217 .map_err(|e| ResyncError::TaskPanic(e.to_string()))??;
218 if n_inserted > 0 {
219 db.stats
220 .collection_births_resync
221 .fetch_add(n_inserted as u64, Ordering::Relaxed);
222 metrics::counter!("lightrail_collection_births_total", "source" => "resync")
223 .increment(n_inserted as u64);
224 }
225 if n_removed > 0 {
226 db.stats
227 .collection_deaths_resync
228 .fetch_add(n_removed as u64, Ordering::Relaxed);
229 metrics::counter!("lightrail_collection_deaths_total", "source" => "resync")
230 .increment(n_removed as u64);
231 }
232 Ok(())
233}
234
235/// Update the stored `RepoState` and `AccountStatus` for a repo that the PDS
236/// reports as gone (takendown, suspended, or deactivated).
237///
238/// If no record exists yet (e.g. a backfill race), one is created. The `error`
239/// field is cleared since the gone state is fully described by `state`/`status`.
240fn update_account_status(
241 db: &DbRef,
242 did: &Did<'_>,
243 state: RepoState,
244 status: AccountStatus,
245) -> Result<()> {
246 let new_info = match crate::storage::repo::get(db, did)? {
247 Some((mut info, _)) => {
248 info.state = state;
249 info.status = status;
250 info.error = None;
251 info
252 }
253 None => RepoInfo {
254 state,
255 status,
256 error: None,
257 },
258 };
259 crate::storage::repo::put_info(db, did, &new_info)?;
260 Ok(())
261}
262
263/// Set `RepoState::NotFound` for a DID whose PDS reports no repo.
264///
265/// Preserves the existing `AccountStatus` if a record exists (we don't know
266/// the true account status from a not-found response). The `error` field is
267/// cleared since the state is fully described by `RepoState::NotFound`.
268fn mark_not_found(db: &DbRef, did: &Did<'_>) -> Result<()> {
269 let new_info = match crate::storage::repo::get(db, did)? {
270 Some((mut info, _)) => {
271 info.state = RepoState::NotFound;
272 info.error = None;
273 info
274 }
275 None => RepoInfo {
276 state: RepoState::NotFound,
277 status: AccountStatus::Active,
278 error: None,
279 },
280 };
281 crate::storage::repo::put_info(db, did, &new_info)?;
282 Ok(())
283}
284
285/// Try each approach in preference order, falling back to `get_repo` selectively.
286///
287/// Falls back to the `get_repo` CAR walk when `describe_repo`:
288/// - returns an empty collection list (possible PDS bug or unpaginated large repo),
289/// - fails with any error other than rate-limiting or a definitive gone/not-found.
290///
291/// A 429 rate-limit response is propagated immediately rather than escalated to a
292/// heavier `getRepo` request. Definitive `RepoNotFound`/`RepoGone` results are also
293/// propagated, since `getRepo` would return the same answer.
294async fn fetch_collections<C>(
295 client: &C,
296 base: &jacquard_common::url::Url,
297 did: Did<'_>,
298 token: tokio_util::sync::CancellationToken,
299 describe_timeout: Duration,
300 get_repo_timeout: Duration,
301 force_get_repo: bool,
302) -> std::result::Result<RepoSnapshot, GetCollectionsError>
303where
304 C: HttpClient + HttpClientExt + Sync,
305{
306 if !force_get_repo {
307 let Some(describe_result) = token
308 .timeout(
309 describe_timeout,
310 describe_repo::fetch_collections(client, base, did.clone()),
311 )
312 .await
313 else {
314 return Err(GetCollectionsError::Cancelled);
315 };
316
317 match describe_result {
318 Ok(Ok(snapshot)) => {
319 metrics::counter!("lightrail_resync_fetch_total", "source" => "describe_repo")
320 .increment(1);
321 return Ok(snapshot);
322 }
323 // Rate-limited: don't escalate to a heavier getRepo request.
324 Ok(Err(e @ GetCollectionsError::RateLimited(_))) => return Err(e),
325 // Definitively gone: getRepo would return the same answer.
326 Ok(Err(e @ (GetCollectionsError::RepoNotFound | GetCollectionsError::RepoGone(_)))) => {
327 return Err(e);
328 }
329 // Any other failure or timeout: fall through. The PDS may not implement
330 // describeRepo, or may have a bug this endpoint doesn't hit.
331 Ok(Err(GetCollectionsError::GetSmallRepo)) | Ok(Err(_)) | Err(_) => {}
332 }
333 }
334
335 let res = token
336 .timeout(
337 get_repo_timeout,
338 get_repo::fetch_collections(client, base, did),
339 )
340 .await
341 .ok_or(GetCollectionsError::Cancelled)?
342 .unwrap_or_else(|_| {
343 Err(GetCollectionsError::Request(
344 "getRepo timed out".to_string(),
345 ))
346 })?;
347 metrics::counter!("lightrail_resync_fetch_total", "source" => "get_repo").increment(1);
348 Ok(res)
349}