lightweight com.atproto.sync.listReposByCollection
at main 523 lines 19 kB view raw
1//! DID identity resolution — maps DIDs to their home PDS endpoint and signing key. 2//! 3//! [`build_resolver`] constructs a [`Resolver`] from optional slingshot and 4//! PLC directory URLs. The four combinations are: 5//! 6//! | `slingshot_url` | `plc_url` | Behaviour | 7//! |-----------------|-------------|----------------------------------------------------| 8//! | `None` | `None` | PLC directory at `https://plc.directory` (default) | 9//! | `None` | `Some(url)` | PLC directory at `url` | 10//! | `Some(url)` | `None` | Slingshot at `url` | 11//! | `Some(url)` | `Some(fb)` | Slingshot primary, PLC directory fallback | 12//! 13//! When slingshot is active it covers both `did:plc` (via the `PlcHttp` 14//! resolution step) and `did:web` (via the `PdsResolveDid` → mini-doc 15//! fallback path built into jacquard-identity). When a PLC fallback is also 16//! configured, any error from the primary resolver retries against it. 17 18use jacquard_common::IntoStatic; 19use std::num::NonZeroU32; 20use std::sync::Arc; 21use std::time::Duration; 22 23use dashmap::DashMap; 24use governor::{DefaultDirectRateLimiter, Quota, RateLimiter}; 25use jacquard_common::types::crypto::PublicKey; 26use jacquard_common::types::string::Did; 27use jacquard_common::url::Url; 28use jacquard_identity::JacquardResolver; 29use jacquard_identity::resolver::{ 30 IdentityError as JacquardIdentityError, IdentityErrorKind as JacquardErrorKind, 31 IdentityResolver, PlcSource, ResolverOptions, 32}; 33use mini_moka::sync::Cache; 34use tracing::{debug, info, warn}; 35 36use crate::util::TokenExt; 37 38#[derive(Debug, thiserror::Error)] 39pub enum IdentityError { 40 #[error("jacquard: {0}")] 41 JacquardIdentity(Box<JacquardIdentityError>), 42 #[error("no pds endpoint for: {0}")] 43 MissingPdsEndpoint(String), 44 #[error("bad signing key: {0}")] 45 BadSigningKey(String), 46 #[error("identity: {0}")] 47 Identity(String), 48} 49 50impl From<JacquardIdentityError> for IdentityError { 51 fn from(je: JacquardIdentityError) -> Self { 52 Self::JacquardIdentity(Box::new(je)) 53 } 54} 55 56/// Resolved identity for a DID: the home PDS endpoint and ATProto signing key. 57/// 58/// Stored in the cache as `Arc<CachedIdentity>` so callers can borrow fields 59/// without copying. The PDS URL is an interned `Arc<Url>` (see [`UrlInterner`]); 60/// the public key bytes are owned per-entry and borrowed through the outer Arc. 61pub struct CachedIdentity { 62 pub pds: Arc<Url>, 63 pub pubkey: PublicKey<'static>, 64} 65 66const DEFAULT_PLC_URL: &str = "https://plc.directory"; 67 68/// How long a cached DID→PDS mapping is considered fresh. 69/// 70/// PDS migrations are rare; the firehose delivers `#identity` events when 71/// they happen, so proactive [`Resolver::invalidate_did`] handles updates 72/// before this TTL expires in practice. 73const CACHE_TTL: Duration = Duration::from_secs(24 * 60 * 60); 74 75/// How long a failed resolution is cached before retrying. 76/// 77/// Short enough that transient failures recover quickly, long enough to avoid 78/// hammering the identity service for consistently-broken DIDs. 79const NEGATIVE_CACHE_TTL: Duration = Duration::from_secs(5 * 60); 80 81/// Maximum retries per identity backend on HTTP 429 responses. 82const RATE_LIMIT_RETRIES: u32 = 3; 83 84/// Initial delay between rate-limit retries; doubles each attempt (500ms → 1s → 2s). 85const RATE_LIMIT_BASE_DELAY: Duration = Duration::from_millis(500); 86 87/// Check whether a jacquard identity error is an HTTP 429 rate-limit response. 88fn is_rate_limited(err: &JacquardIdentityError) -> bool { 89 matches!( 90 err.kind(), 91 JacquardErrorKind::HttpStatus(status) if status.as_u16() == 429 92 ) 93} 94 95// --------------------------------------------------------------------------- 96// Resolution throttle 97// --------------------------------------------------------------------------- 98 99/// Global identity-resolution rate limiter. 100/// 101/// Backfill tasks call [`Resolver::throttle_wait`] to stay within the budget. 102/// All callers (firehose, resync, backfill) consume tokens on cache-miss 103/// resolutions via an internal notify, so firehose activity naturally reduces 104/// the budget available to backfill. 105struct IdentityThrottle { 106 limiter: DefaultDirectRateLimiter, 107 token_interval: Duration, 108} 109 110impl IdentityThrottle { 111 fn new(rate: NonZeroU32) -> Self { 112 Self { 113 limiter: RateLimiter::direct(Quota::per_second(rate)), 114 token_interval: Duration::from_secs(1) / rate.get(), 115 } 116 } 117} 118 119// --------------------------------------------------------------------------- 120// Interned PDS URL cache 121// --------------------------------------------------------------------------- 122 123/// Maximum number of distinct PDS URLs to intern. 124/// 125/// The real network has ~2600 PDS hosts online; this cap is 4× that. Beyond 126/// it, `intern` returns a fresh `Arc` rather than a shared one, keeping the 127/// table bounded against identities with adversarially unique PDS URLs. 128const MAX_INTERN_ENTRIES: usize = 10_000; 129 130/// Interns `Url` values so that the ~2600 distinct PDS endpoints across the 131/// network are each allocated once, regardless of how many DIDs map to them. 132struct UrlInterner { 133 map: DashMap<String, Arc<Url>>, 134} 135 136impl UrlInterner { 137 fn new() -> Self { 138 Self { 139 map: DashMap::new(), 140 } 141 } 142 143 fn intern(&self, url: Url) -> Arc<Url> { 144 let key = url.as_str().to_owned(); 145 if let Some(existing) = self.map.get(&key) { 146 return Arc::clone(&existing); 147 } 148 // Past the cap, skip interning to bound memory regardless of 149 // how many distinct PDS URLs external parties claim. 150 if self.map.len() >= MAX_INTERN_ENTRIES { 151 return Arc::new(url); 152 } 153 let arc = Arc::new(url); 154 Arc::clone( 155 self.map 156 .entry(key) 157 .or_insert_with(|| Arc::clone(&arc)) 158 .value(), 159 ) 160 } 161} 162 163/// Capacity-bounded, TTL-expiring cache from DID strings to resolved identities. 164/// 165/// Successful resolutions are cached for [`CACHE_TTL`] (24 h). Failed 166/// resolutions are negative-cached for [`NEGATIVE_CACHE_TTL`] (5 min) so that 167/// consistently-broken DIDs don't hammer the identity service on every event. 168struct IdentityCache { 169 entries: Cache<Did<'static>, Arc<CachedIdentity>>, 170 /// DIDs whose last resolution failed. Presence means "don't retry yet". 171 negative: Cache<Did<'static>, ()>, 172 interner: UrlInterner, 173} 174 175impl IdentityCache { 176 fn new(capacity: u64) -> Self { 177 Self { 178 entries: Cache::builder() 179 .max_capacity(capacity) 180 .time_to_live(CACHE_TTL) 181 .build(), 182 negative: Cache::builder() 183 .max_capacity(capacity / 10) 184 .time_to_live(NEGATIVE_CACHE_TTL) 185 .build(), 186 interner: UrlInterner::new(), 187 } 188 } 189 190 fn get(&self, did: &Did<'_>) -> Option<Arc<CachedIdentity>> { 191 self.entries.get(&did.clone().into_static()) 192 } 193 194 /// Returns `true` if this DID recently failed resolution. 195 fn is_negative(&self, did: &Did<'_>) -> bool { 196 self.negative.contains_key(&did.clone().into_static()) 197 } 198 199 fn insert( 200 &self, 201 did: Did<'static>, 202 pds: Url, 203 pubkey: PublicKey<'static>, 204 ) -> Arc<CachedIdentity> { 205 let pds = self.interner.intern(pds); 206 let identity = Arc::new(CachedIdentity { pds, pubkey }); 207 self.negative.invalidate(&did); 208 self.entries.insert(did, Arc::clone(&identity)); 209 identity 210 } 211 212 fn insert_negative(&self, did: &Did<'_>) { 213 self.negative.insert(did.clone().into_static(), ()); 214 } 215 216 fn invalidate(&self, did: &Did<'_>) { 217 let owned = did.clone().into_static(); 218 self.entries.invalidate(&owned); 219 self.negative.invalidate(&owned); 220 } 221} 222 223// --------------------------------------------------------------------------- 224// Public resolver 225// --------------------------------------------------------------------------- 226 227/// DID identity resolver with a DID→identity cache and optional fallback. 228/// 229/// Wrap in [`Arc`] to share across tasks; all cache operations use interior 230/// mutability so `&self` is sufficient everywhere. 231pub struct Resolver { 232 primary: JacquardResolver, 233 fallback: Option<JacquardResolver>, 234 cache: IdentityCache, 235 /// Cancellation token for cooperative shutdown. 236 token: tokio_util::sync::CancellationToken, 237 /// Optional global resolution rate limit. 238 throttle: Option<IdentityThrottle>, 239} 240 241impl Resolver { 242 /// Wait for a resolution token (backfill / resync). 243 /// 244 /// Blocks until a token is available or the cancellation token fires. 245 /// No-op when no throttle is configured. 246 async fn throttle_wait(&self) { 247 let Some(t) = &self.throttle else { return }; 248 while t.limiter.check().is_err() { 249 if !self.token.sleep(t.token_interval).await { 250 return; // shutting down 251 } 252 } 253 } 254 255 /// Try to consume a resolution token without waiting (firehose). 256 /// 257 /// Subtracts from the shared budget so backfill/resync slow down when the 258 /// firehose is doing lots of resolutions. No-op if the rate is already 259 /// exceeded or no throttle is configured. 260 fn throttle_notify(&self) { 261 if let Some(t) = &self.throttle { 262 let _ = t.limiter.check(); 263 } 264 } 265 266 /// Resolve the PDS endpoint and ATProto signing key for `did`. 267 /// 268 /// Returns a cached [`CachedIdentity`] on cache hit; otherwise waits for 269 /// a throttle token and resolves via the network. Used by backfill and 270 /// resync paths. 271 pub async fn resolve(&self, did: &Did<'_>) -> Result<Arc<CachedIdentity>, IdentityError> { 272 self.resolve_inner(did, false).await 273 } 274 275 /// Like [`resolve`] but never waits on the throttle — only notifies it. 276 /// 277 /// Used by the firehose path where latency matters. Cache-miss resolutions 278 /// still subtract from the shared budget so backfill/resync slow down. 279 pub async fn resolve_firehose( 280 &self, 281 did: &Did<'_>, 282 ) -> Result<Arc<CachedIdentity>, IdentityError> { 283 self.resolve_inner(did, true).await 284 } 285 286 async fn resolve_inner( 287 &self, 288 did: &Did<'_>, 289 firehose: bool, 290 ) -> Result<Arc<CachedIdentity>, IdentityError> { 291 if let Some(cached) = self.cache.get(did) { 292 metrics::counter!("lightrail_identity_resolution_total", "outcome" => "hit") 293 .increment(1); 294 return Ok(cached); 295 } 296 297 if self.cache.is_negative(did) { 298 metrics::counter!("lightrail_identity_resolution_total", "outcome" => "negative_hit") 299 .increment(1); 300 return Err(IdentityError::Identity(format!( 301 "recently failed for {}", 302 did.as_str() 303 ))); 304 } 305 306 if firehose { 307 self.throttle_notify(); 308 } else { 309 self.throttle_wait().await; 310 } 311 312 match self.resolve_uncached(did).await { 313 Ok((pds, pubkey)) => { 314 metrics::counter!("lightrail_identity_resolution_total", "outcome" => "miss") 315 .increment(1); 316 Ok(self.cache.insert(did.clone().into_static(), pds, pubkey)) 317 } 318 Err(e) => { 319 metrics::counter!("lightrail_identity_resolution_total", "outcome" => "error") 320 .increment(1); 321 self.cache.insert_negative(did); 322 Err(e) 323 } 324 } 325 } 326 327 /// Look up `did` in the cache without making network calls. 328 /// Returns `None` on cache miss. 329 pub fn resolve_cached(&self, did: &Did<'_>) -> Option<Arc<CachedIdentity>> { 330 self.cache.get(did) 331 } 332 333 /// Insert a resolved identity into the cache without a network call. 334 /// 335 /// Used by the dispatcher to warm the cache for discovery-queue items 336 /// whose identity was already resolved during backfill. This avoids a 337 /// redundant network resolution in the resync worker. 338 pub fn insert_cached( 339 &self, 340 did: Did<'static>, 341 pds: Url, 342 pubkey: PublicKey<'static>, 343 ) -> Arc<CachedIdentity> { 344 self.cache.insert(did, pds, pubkey) 345 } 346 347 /// Evict `did` from the identity cache. 348 /// 349 /// Called when a `#identity` firehose event is received, after all 350 /// preceding commits for the same DID have been processed, so the next 351 /// resolution fetches the updated key and PDS endpoint. 352 pub fn invalidate_did(&self, did: &Did<'_>) { 353 self.cache.invalidate(did); 354 } 355 356 async fn resolve_uncached( 357 &self, 358 did: &Did<'_>, 359 ) -> Result<(Url, PublicKey<'static>), IdentityError> { 360 let doc = match &self.fallback { 361 None => self.try_backend(&self.primary, did).await, 362 Some(fb) => match self.try_backend(&self.primary, did).await { 363 ok @ Ok(_) => ok, 364 Err(primary_err) => { 365 debug!( 366 did = did.as_str(), 367 error = %primary_err, 368 "primary identity resolution failed, trying fallback" 369 ); 370 self.try_backend(fb, did).await 371 } 372 }, 373 }?; 374 375 let pds = doc 376 .pds_endpoint() 377 .ok_or_else(|| IdentityError::MissingPdsEndpoint(did.to_string()))?; 378 379 let pubkey = doc 380 .atproto_public_key() 381 .map_err(|e| IdentityError::BadSigningKey(e.to_string()))? 382 .ok_or_else(|| { 383 IdentityError::BadSigningKey(format!( 384 "no ATProto signing key in DID doc for {}", 385 did.as_str() 386 )) 387 })?; 388 389 Ok((pds, pubkey)) 390 } 391 392 /// Try resolving a DID document against a single backend, retrying on 429. 393 /// 394 /// Non-rate-limit errors return immediately. After exhausting retries the 395 /// final 429 error is returned so the caller can fall through to a fallback 396 /// backend or propagate the error. Sleeps are cancellation-aware when a 397 /// token has been provided via [`Resolver::set_cancellation_token`]. 398 async fn try_backend( 399 &self, 400 backend: &JacquardResolver, 401 did: &Did<'_>, 402 ) -> std::result::Result< 403 jacquard_common::types::did_doc::DidDocument<'static>, 404 JacquardIdentityError, 405 > { 406 for attempt in 0..=RATE_LIMIT_RETRIES { 407 match backend.resolve_did_doc_owned(did).await { 408 Ok(doc) => return Ok(doc), 409 Err(e) if is_rate_limited(&e) && attempt < RATE_LIMIT_RETRIES => { 410 metrics::counter!("lightrail_identity_rate_limited_total").increment(1); 411 let delay = RATE_LIMIT_BASE_DELAY * 2u32.pow(attempt); 412 metrics::counter!("lightrail_identity_rate_limit_backoff_ms") 413 .increment(delay.as_millis() as u64); 414 debug!( 415 did = did.as_str(), 416 attempt = attempt + 1, 417 delay_ms = delay.as_millis() as u64, 418 "identity service rate-limited, retrying" 419 ); 420 if !self.token.sleep(delay).await { 421 return Err(e); // shutting down 422 } 423 } 424 Err(e) => { 425 if is_rate_limited(&e) { 426 metrics::counter!("lightrail_identity_rate_limited_total").increment(1); 427 warn!( 428 did = did.as_str(), 429 retries = RATE_LIMIT_RETRIES, 430 "identity resolution rate limit persisted after retries" 431 ); 432 } 433 return Err(e); 434 } 435 } 436 } 437 unreachable!() 438 } 439} 440 441// --------------------------------------------------------------------------- 442// Construction 443// --------------------------------------------------------------------------- 444 445/// Build the identity resolver from the provided source URLs. 446/// 447/// Emits an `INFO` trace on startup describing the active resolution behaviour. 448pub fn build_resolver( 449 slingshot_url: Option<Url>, 450 plc_url: Option<Url>, 451 cache_size: u64, 452 token: tokio_util::sync::CancellationToken, 453 identity_qps: Option<NonZeroU32>, 454) -> Resolver { 455 let cache = IdentityCache::new(cache_size); 456 let throttle = identity_qps.map(|rate| { 457 info!(rate = rate.get(), "identity resolution throttle enabled"); 458 IdentityThrottle::new(rate) 459 }); 460 461 match (slingshot_url, plc_url) { 462 (None, plc) => { 463 let base = plc.unwrap_or_else(|| DEFAULT_PLC_URL.parse().expect("valid url")); 464 info!( 465 source = "plc", 466 url = %base, 467 "did resolution: plc directory; \ 468 did:plc via plc directory, did:web via https well-known" 469 ); 470 Resolver { 471 primary: make_resolver(PlcSource::PlcDirectory { base }), 472 fallback: None, 473 cache, 474 token: token.clone(), 475 throttle, 476 } 477 } 478 (Some(slingshot), None) => { 479 info!( 480 source = "slingshot", 481 url = %slingshot, 482 "did resolution: slingshot; \ 483 did:plc via slingshot, \ 484 did:web via https well-known then slingshot mini-doc" 485 ); 486 Resolver { 487 primary: make_resolver(PlcSource::Slingshot { base: slingshot }), 488 fallback: None, 489 cache, 490 token: token.clone(), 491 throttle, 492 } 493 } 494 (Some(slingshot), Some(plc)) => { 495 info!( 496 slingshot_url = %slingshot, 497 plc_fallback_url = %plc, 498 "did resolution: slingshot primary with plc fallback; \ 499 did:plc via slingshot (plc on error), \ 500 did:web via https well-known then slingshot mini-doc" 501 ); 502 Resolver { 503 primary: make_resolver(PlcSource::Slingshot { base: slingshot }), 504 fallback: Some(make_resolver(PlcSource::PlcDirectory { base: plc })), 505 cache, 506 token, 507 throttle, 508 } 509 } 510 } 511} 512 513fn make_resolver(plc_source: PlcSource) -> JacquardResolver { 514 let opts = ResolverOptions { 515 plc_source, 516 ..Default::default() 517 }; 518 let client = reqwest::Client::builder() 519 .user_agent(concat!("microcosm lightrail/v", env!("CARGO_PKG_VERSION"))) 520 .build() 521 .expect("failed to build resolver HTTP client"); 522 JacquardResolver::new(client, opts) 523}