lightweight
com.atproto.sync.listReposByCollection
1//! DID identity resolution — maps DIDs to their home PDS endpoint and signing key.
2//!
3//! [`build_resolver`] constructs a [`Resolver`] from optional slingshot and
4//! PLC directory URLs. The four combinations are:
5//!
6//! | `slingshot_url` | `plc_url` | Behaviour |
7//! |-----------------|-------------|----------------------------------------------------|
8//! | `None` | `None` | PLC directory at `https://plc.directory` (default) |
9//! | `None` | `Some(url)` | PLC directory at `url` |
10//! | `Some(url)` | `None` | Slingshot at `url` |
11//! | `Some(url)` | `Some(fb)` | Slingshot primary, PLC directory fallback |
12//!
13//! When slingshot is active it covers both `did:plc` (via the `PlcHttp`
14//! resolution step) and `did:web` (via the `PdsResolveDid` → mini-doc
15//! fallback path built into jacquard-identity). When a PLC fallback is also
16//! configured, any error from the primary resolver retries against it.
17
18use jacquard_common::IntoStatic;
19use std::num::NonZeroU32;
20use std::sync::Arc;
21use std::time::Duration;
22
23use dashmap::DashMap;
24use governor::{DefaultDirectRateLimiter, Quota, RateLimiter};
25use jacquard_common::types::crypto::PublicKey;
26use jacquard_common::types::string::Did;
27use jacquard_common::url::Url;
28use jacquard_identity::JacquardResolver;
29use jacquard_identity::resolver::{
30 IdentityError as JacquardIdentityError, IdentityErrorKind as JacquardErrorKind,
31 IdentityResolver, PlcSource, ResolverOptions,
32};
33use mini_moka::sync::Cache;
34use tracing::{debug, info, warn};
35
36use crate::util::TokenExt;
37
38#[derive(Debug, thiserror::Error)]
39pub enum IdentityError {
40 #[error("jacquard: {0}")]
41 JacquardIdentity(Box<JacquardIdentityError>),
42 #[error("no pds endpoint for: {0}")]
43 MissingPdsEndpoint(String),
44 #[error("bad signing key: {0}")]
45 BadSigningKey(String),
46 #[error("identity: {0}")]
47 Identity(String),
48}
49
50impl From<JacquardIdentityError> for IdentityError {
51 fn from(je: JacquardIdentityError) -> Self {
52 Self::JacquardIdentity(Box::new(je))
53 }
54}
55
56/// Resolved identity for a DID: the home PDS endpoint and ATProto signing key.
57///
58/// Stored in the cache as `Arc<CachedIdentity>` so callers can borrow fields
59/// without copying. The PDS URL is an interned `Arc<Url>` (see [`UrlInterner`]);
60/// the public key bytes are owned per-entry and borrowed through the outer Arc.
61pub struct CachedIdentity {
62 pub pds: Arc<Url>,
63 pub pubkey: PublicKey<'static>,
64}
65
/// PLC directory used when no explicit PLC URL is configured.
const DEFAULT_PLC_URL: &str = "https://plc.directory";

/// Freshness window for a cached DID→PDS mapping: 24 hours.
///
/// PDS migrations are rare, and the firehose delivers `#identity` events when
/// they happen, so proactive [`Resolver::invalidate_did`] calls normally
/// refresh an entry long before this TTL would expire it.
const CACHE_TTL: Duration = Duration::from_secs(86_400);

/// How long a failed resolution is remembered before it is retried: 5 minutes.
///
/// Short enough for transient failures to clear quickly, long enough that a
/// consistently-broken DID does not hammer the identity service.
const NEGATIVE_CACHE_TTL: Duration = Duration::from_secs(300);

/// Maximum number of retries against one identity backend after an HTTP 429.
const RATE_LIMIT_RETRIES: u32 = 3;

/// First backoff delay after a 429; doubled on every further attempt
/// (500ms → 1s → 2s).
const RATE_LIMIT_BASE_DELAY: Duration = Duration::from_millis(500);
86
87/// Check whether a jacquard identity error is an HTTP 429 rate-limit response.
88fn is_rate_limited(err: &JacquardIdentityError) -> bool {
89 matches!(
90 err.kind(),
91 JacquardErrorKind::HttpStatus(status) if status.as_u16() == 429
92 )
93}
94
95// ---------------------------------------------------------------------------
96// Resolution throttle
97// ---------------------------------------------------------------------------
98
99/// Global identity-resolution rate limiter.
100///
101/// Backfill tasks call [`Resolver::throttle_wait`] to stay within the budget.
102/// All callers (firehose, resync, backfill) consume tokens on cache-miss
103/// resolutions via an internal notify, so firehose activity naturally reduces
104/// the budget available to backfill.
105struct IdentityThrottle {
106 limiter: DefaultDirectRateLimiter,
107 token_interval: Duration,
108}
109
110impl IdentityThrottle {
111 fn new(rate: NonZeroU32) -> Self {
112 Self {
113 limiter: RateLimiter::direct(Quota::per_second(rate)),
114 token_interval: Duration::from_secs(1) / rate.get(),
115 }
116 }
117}
118
119// ---------------------------------------------------------------------------
120// Interned PDS URL cache
121// ---------------------------------------------------------------------------
122
123/// Maximum number of distinct PDS URLs to intern.
124///
125/// The real network has ~2600 PDS hosts online; this cap is 4× that. Beyond
126/// it, `intern` returns a fresh `Arc` rather than a shared one, keeping the
127/// table bounded against identities with adversarially unique PDS URLs.
128const MAX_INTERN_ENTRIES: usize = 10_000;
129
130/// Interns `Url` values so that the ~2600 distinct PDS endpoints across the
131/// network are each allocated once, regardless of how many DIDs map to them.
132struct UrlInterner {
133 map: DashMap<String, Arc<Url>>,
134}
135
136impl UrlInterner {
137 fn new() -> Self {
138 Self {
139 map: DashMap::new(),
140 }
141 }
142
143 fn intern(&self, url: Url) -> Arc<Url> {
144 let key = url.as_str().to_owned();
145 if let Some(existing) = self.map.get(&key) {
146 return Arc::clone(&existing);
147 }
148 // Past the cap, skip interning to bound memory regardless of
149 // how many distinct PDS URLs external parties claim.
150 if self.map.len() >= MAX_INTERN_ENTRIES {
151 return Arc::new(url);
152 }
153 let arc = Arc::new(url);
154 Arc::clone(
155 self.map
156 .entry(key)
157 .or_insert_with(|| Arc::clone(&arc))
158 .value(),
159 )
160 }
161}
162
163/// Capacity-bounded, TTL-expiring cache from DID strings to resolved identities.
164///
165/// Successful resolutions are cached for [`CACHE_TTL`] (24 h). Failed
166/// resolutions are negative-cached for [`NEGATIVE_CACHE_TTL`] (5 min) so that
167/// consistently-broken DIDs don't hammer the identity service on every event.
168struct IdentityCache {
169 entries: Cache<Did<'static>, Arc<CachedIdentity>>,
170 /// DIDs whose last resolution failed. Presence means "don't retry yet".
171 negative: Cache<Did<'static>, ()>,
172 interner: UrlInterner,
173}
174
175impl IdentityCache {
176 fn new(capacity: u64) -> Self {
177 Self {
178 entries: Cache::builder()
179 .max_capacity(capacity)
180 .time_to_live(CACHE_TTL)
181 .build(),
182 negative: Cache::builder()
183 .max_capacity(capacity / 10)
184 .time_to_live(NEGATIVE_CACHE_TTL)
185 .build(),
186 interner: UrlInterner::new(),
187 }
188 }
189
190 fn get(&self, did: &Did<'_>) -> Option<Arc<CachedIdentity>> {
191 self.entries.get(&did.clone().into_static())
192 }
193
194 /// Returns `true` if this DID recently failed resolution.
195 fn is_negative(&self, did: &Did<'_>) -> bool {
196 self.negative.contains_key(&did.clone().into_static())
197 }
198
199 fn insert(
200 &self,
201 did: Did<'static>,
202 pds: Url,
203 pubkey: PublicKey<'static>,
204 ) -> Arc<CachedIdentity> {
205 let pds = self.interner.intern(pds);
206 let identity = Arc::new(CachedIdentity { pds, pubkey });
207 self.negative.invalidate(&did);
208 self.entries.insert(did, Arc::clone(&identity));
209 identity
210 }
211
212 fn insert_negative(&self, did: &Did<'_>) {
213 self.negative.insert(did.clone().into_static(), ());
214 }
215
216 fn invalidate(&self, did: &Did<'_>) {
217 let owned = did.clone().into_static();
218 self.entries.invalidate(&owned);
219 self.negative.invalidate(&owned);
220 }
221}
222
223// ---------------------------------------------------------------------------
224// Public resolver
225// ---------------------------------------------------------------------------
226
227/// DID identity resolver with a DID→identity cache and optional fallback.
228///
229/// Wrap in [`Arc`] to share across tasks; all cache operations use interior
230/// mutability so `&self` is sufficient everywhere.
231pub struct Resolver {
232 primary: JacquardResolver,
233 fallback: Option<JacquardResolver>,
234 cache: IdentityCache,
235 /// Cancellation token for cooperative shutdown.
236 token: tokio_util::sync::CancellationToken,
237 /// Optional global resolution rate limit.
238 throttle: Option<IdentityThrottle>,
239}
240
241impl Resolver {
242 /// Wait for a resolution token (backfill / resync).
243 ///
244 /// Blocks until a token is available or the cancellation token fires.
245 /// No-op when no throttle is configured.
246 async fn throttle_wait(&self) {
247 let Some(t) = &self.throttle else { return };
248 while t.limiter.check().is_err() {
249 if !self.token.sleep(t.token_interval).await {
250 return; // shutting down
251 }
252 }
253 }
254
255 /// Try to consume a resolution token without waiting (firehose).
256 ///
257 /// Subtracts from the shared budget so backfill/resync slow down when the
258 /// firehose is doing lots of resolutions. No-op if the rate is already
259 /// exceeded or no throttle is configured.
260 fn throttle_notify(&self) {
261 if let Some(t) = &self.throttle {
262 let _ = t.limiter.check();
263 }
264 }
265
266 /// Resolve the PDS endpoint and ATProto signing key for `did`.
267 ///
268 /// Returns a cached [`CachedIdentity`] on cache hit; otherwise waits for
269 /// a throttle token and resolves via the network. Used by backfill and
270 /// resync paths.
271 pub async fn resolve(&self, did: &Did<'_>) -> Result<Arc<CachedIdentity>, IdentityError> {
272 self.resolve_inner(did, false).await
273 }
274
275 /// Like [`resolve`] but never waits on the throttle — only notifies it.
276 ///
277 /// Used by the firehose path where latency matters. Cache-miss resolutions
278 /// still subtract from the shared budget so backfill/resync slow down.
279 pub async fn resolve_firehose(
280 &self,
281 did: &Did<'_>,
282 ) -> Result<Arc<CachedIdentity>, IdentityError> {
283 self.resolve_inner(did, true).await
284 }
285
286 async fn resolve_inner(
287 &self,
288 did: &Did<'_>,
289 firehose: bool,
290 ) -> Result<Arc<CachedIdentity>, IdentityError> {
291 if let Some(cached) = self.cache.get(did) {
292 metrics::counter!("lightrail_identity_resolution_total", "outcome" => "hit")
293 .increment(1);
294 return Ok(cached);
295 }
296
297 if self.cache.is_negative(did) {
298 metrics::counter!("lightrail_identity_resolution_total", "outcome" => "negative_hit")
299 .increment(1);
300 return Err(IdentityError::Identity(format!(
301 "recently failed for {}",
302 did.as_str()
303 )));
304 }
305
306 if firehose {
307 self.throttle_notify();
308 } else {
309 self.throttle_wait().await;
310 }
311
312 match self.resolve_uncached(did).await {
313 Ok((pds, pubkey)) => {
314 metrics::counter!("lightrail_identity_resolution_total", "outcome" => "miss")
315 .increment(1);
316 Ok(self.cache.insert(did.clone().into_static(), pds, pubkey))
317 }
318 Err(e) => {
319 metrics::counter!("lightrail_identity_resolution_total", "outcome" => "error")
320 .increment(1);
321 self.cache.insert_negative(did);
322 Err(e)
323 }
324 }
325 }
326
327 /// Look up `did` in the cache without making network calls.
328 /// Returns `None` on cache miss.
329 pub fn resolve_cached(&self, did: &Did<'_>) -> Option<Arc<CachedIdentity>> {
330 self.cache.get(did)
331 }
332
333 /// Insert a resolved identity into the cache without a network call.
334 ///
335 /// Used by the dispatcher to warm the cache for discovery-queue items
336 /// whose identity was already resolved during backfill. This avoids a
337 /// redundant network resolution in the resync worker.
338 pub fn insert_cached(
339 &self,
340 did: Did<'static>,
341 pds: Url,
342 pubkey: PublicKey<'static>,
343 ) -> Arc<CachedIdentity> {
344 self.cache.insert(did, pds, pubkey)
345 }
346
347 /// Evict `did` from the identity cache.
348 ///
349 /// Called when a `#identity` firehose event is received, after all
350 /// preceding commits for the same DID have been processed, so the next
351 /// resolution fetches the updated key and PDS endpoint.
352 pub fn invalidate_did(&self, did: &Did<'_>) {
353 self.cache.invalidate(did);
354 }
355
356 async fn resolve_uncached(
357 &self,
358 did: &Did<'_>,
359 ) -> Result<(Url, PublicKey<'static>), IdentityError> {
360 let doc = match &self.fallback {
361 None => self.try_backend(&self.primary, did).await,
362 Some(fb) => match self.try_backend(&self.primary, did).await {
363 ok @ Ok(_) => ok,
364 Err(primary_err) => {
365 debug!(
366 did = did.as_str(),
367 error = %primary_err,
368 "primary identity resolution failed, trying fallback"
369 );
370 self.try_backend(fb, did).await
371 }
372 },
373 }?;
374
375 let pds = doc
376 .pds_endpoint()
377 .ok_or_else(|| IdentityError::MissingPdsEndpoint(did.to_string()))?;
378
379 let pubkey = doc
380 .atproto_public_key()
381 .map_err(|e| IdentityError::BadSigningKey(e.to_string()))?
382 .ok_or_else(|| {
383 IdentityError::BadSigningKey(format!(
384 "no ATProto signing key in DID doc for {}",
385 did.as_str()
386 ))
387 })?;
388
389 Ok((pds, pubkey))
390 }
391
392 /// Try resolving a DID document against a single backend, retrying on 429.
393 ///
394 /// Non-rate-limit errors return immediately. After exhausting retries the
395 /// final 429 error is returned so the caller can fall through to a fallback
396 /// backend or propagate the error. Sleeps are cancellation-aware when a
397 /// token has been provided via [`Resolver::set_cancellation_token`].
398 async fn try_backend(
399 &self,
400 backend: &JacquardResolver,
401 did: &Did<'_>,
402 ) -> std::result::Result<
403 jacquard_common::types::did_doc::DidDocument<'static>,
404 JacquardIdentityError,
405 > {
406 for attempt in 0..=RATE_LIMIT_RETRIES {
407 match backend.resolve_did_doc_owned(did).await {
408 Ok(doc) => return Ok(doc),
409 Err(e) if is_rate_limited(&e) && attempt < RATE_LIMIT_RETRIES => {
410 metrics::counter!("lightrail_identity_rate_limited_total").increment(1);
411 let delay = RATE_LIMIT_BASE_DELAY * 2u32.pow(attempt);
412 metrics::counter!("lightrail_identity_rate_limit_backoff_ms")
413 .increment(delay.as_millis() as u64);
414 debug!(
415 did = did.as_str(),
416 attempt = attempt + 1,
417 delay_ms = delay.as_millis() as u64,
418 "identity service rate-limited, retrying"
419 );
420 if !self.token.sleep(delay).await {
421 return Err(e); // shutting down
422 }
423 }
424 Err(e) => {
425 if is_rate_limited(&e) {
426 metrics::counter!("lightrail_identity_rate_limited_total").increment(1);
427 warn!(
428 did = did.as_str(),
429 retries = RATE_LIMIT_RETRIES,
430 "identity resolution rate limit persisted after retries"
431 );
432 }
433 return Err(e);
434 }
435 }
436 }
437 unreachable!()
438 }
439}
440
441// ---------------------------------------------------------------------------
442// Construction
443// ---------------------------------------------------------------------------
444
445/// Build the identity resolver from the provided source URLs.
446///
447/// Emits an `INFO` trace on startup describing the active resolution behaviour.
448pub fn build_resolver(
449 slingshot_url: Option<Url>,
450 plc_url: Option<Url>,
451 cache_size: u64,
452 token: tokio_util::sync::CancellationToken,
453 identity_qps: Option<NonZeroU32>,
454) -> Resolver {
455 let cache = IdentityCache::new(cache_size);
456 let throttle = identity_qps.map(|rate| {
457 info!(rate = rate.get(), "identity resolution throttle enabled");
458 IdentityThrottle::new(rate)
459 });
460
461 match (slingshot_url, plc_url) {
462 (None, plc) => {
463 let base = plc.unwrap_or_else(|| DEFAULT_PLC_URL.parse().expect("valid url"));
464 info!(
465 source = "plc",
466 url = %base,
467 "did resolution: plc directory; \
468 did:plc via plc directory, did:web via https well-known"
469 );
470 Resolver {
471 primary: make_resolver(PlcSource::PlcDirectory { base }),
472 fallback: None,
473 cache,
474 token: token.clone(),
475 throttle,
476 }
477 }
478 (Some(slingshot), None) => {
479 info!(
480 source = "slingshot",
481 url = %slingshot,
482 "did resolution: slingshot; \
483 did:plc via slingshot, \
484 did:web via https well-known then slingshot mini-doc"
485 );
486 Resolver {
487 primary: make_resolver(PlcSource::Slingshot { base: slingshot }),
488 fallback: None,
489 cache,
490 token: token.clone(),
491 throttle,
492 }
493 }
494 (Some(slingshot), Some(plc)) => {
495 info!(
496 slingshot_url = %slingshot,
497 plc_fallback_url = %plc,
498 "did resolution: slingshot primary with plc fallback; \
499 did:plc via slingshot (plc on error), \
500 did:web via https well-known then slingshot mini-doc"
501 );
502 Resolver {
503 primary: make_resolver(PlcSource::Slingshot { base: slingshot }),
504 fallback: Some(make_resolver(PlcSource::PlcDirectory { base: plc })),
505 cache,
506 token,
507 throttle,
508 }
509 }
510 }
511}
512
513fn make_resolver(plc_source: PlcSource) -> JacquardResolver {
514 let opts = ResolverOptions {
515 plc_source,
516 ..Default::default()
517 };
518 let client = reqwest::Client::builder()
519 .user_agent(concat!("microcosm lightrail/v", env!("CARGO_PKG_VERSION")))
520 .build()
521 .expect("failed to build resolver HTTP client");
522 JacquardResolver::new(client, opts)
523}