QuickDID is a high-performance AT Protocol identity resolution service written in Rust. It provides handle-to-DID resolution with Redis-backed caching and queue processing.
at main 15 kB view raw
1use crate::handle_resolution_result::HandleResolutionResult; 2use crate::handle_resolver::{HandleResolver, HandleResolverError}; 3use crate::metrics::MetricsPublisher; 4use crate::queue::{HandleResolutionWork, QueueAdapter}; 5use async_trait::async_trait; 6use std::sync::Arc; 7use std::time::{SystemTime, UNIX_EPOCH}; 8use tracing::{debug, trace}; 9 10/// Create a ProactiveRefreshResolver with default 80% threshold 11/// 12/// # Arguments 13/// * `inner` - The inner resolver to wrap 14/// * `queue` - The queue adapter for background refresh tasks 15/// * `cache_ttl` - The TTL in seconds for cache entries 16pub fn create_proactive_refresh_resolver<R, Q>( 17 inner: Arc<R>, 18 queue: Arc<Q>, 19 cache_ttl: u64, 20) -> Arc<ProactiveRefreshResolver<R, Q>> 21where 22 R: HandleResolver + Send + Sync + 'static, 23 Q: QueueAdapter<HandleResolutionWork> + Send + Sync + 'static, 24{ 25 Arc::new(ProactiveRefreshResolver::new(inner, queue, cache_ttl)) 26} 27 28/// Create a ProactiveRefreshResolver with custom threshold 29/// 30/// # Arguments 31/// * `inner` - The inner resolver to wrap 32/// * `queue` - The queue adapter for background refresh tasks 33/// * `cache_ttl` - The TTL in seconds for cache entries 34/// * `threshold` - The threshold as a percentage (0.0 to 1.0) of TTL when to trigger refresh 35pub fn create_proactive_refresh_resolver_with_threshold<R, Q>( 36 inner: Arc<R>, 37 queue: Arc<Q>, 38 cache_ttl: u64, 39 threshold: f64, 40) -> Arc<ProactiveRefreshResolver<R, Q>> 41where 42 R: HandleResolver + Send + Sync + 'static, 43 Q: QueueAdapter<HandleResolutionWork> + Send + Sync + 'static, 44{ 45 Arc::new(ProactiveRefreshResolver::with_threshold( 46 inner, queue, cache_ttl, threshold, 47 )) 48} 49 50/// Wrapper struct for dynamic dispatch with proactive refresh 51/// This works with trait objects instead of concrete types 52pub struct DynProactiveRefreshResolver { 53 inner: Arc<dyn HandleResolver>, 54 queue: Arc<dyn QueueAdapter<HandleResolutionWork>>, 55 metrics: Option<Arc<dyn MetricsPublisher>>, 56 #[allow(dead_code)] 57 cache_ttl: u64, 58 #[allow(dead_code)] 59 refresh_threshold: f64, 60} 61 62impl DynProactiveRefreshResolver { 63 pub fn new( 64 inner: Arc<dyn HandleResolver>, 65 queue: Arc<dyn QueueAdapter<HandleResolutionWork>>, 66 cache_ttl: u64, 67 refresh_threshold: f64, 68 ) -> Self { 69 Self::with_metrics(inner, queue, None, cache_ttl, refresh_threshold) 70 } 71 72 pub fn with_metrics( 73 inner: Arc<dyn HandleResolver>, 74 queue: Arc<dyn QueueAdapter<HandleResolutionWork>>, 75 metrics: Option<Arc<dyn MetricsPublisher>>, 76 cache_ttl: u64, 77 refresh_threshold: f64, 78 ) -> Self { 79 Self { 80 inner, 81 queue, 82 metrics, 83 cache_ttl, 84 refresh_threshold: refresh_threshold.clamp(0.0, 1.0), 85 } 86 } 87 88 async fn maybe_queue_for_refresh(&self, handle: &str, resolve_time: u64) { 89 // If resolution took less than 5ms, it was probably a cache hit 90 if resolve_time < 5000 { 91 trace!( 92 handle = handle, 93 resolve_time_us = resolve_time, 94 "Fast resolution detected, considering proactive refresh" 95 ); 96 97 if let Some(metrics) = &self.metrics { 98 metrics.incr("proactive_refresh.cache_hit_detected").await; 99 } 100 101 // Simple heuristic: queue for refresh with some probability 102 let now = SystemTime::now() 103 .duration_since(UNIX_EPOCH) 104 .unwrap_or_default() 105 .as_secs(); 106 107 // Queue every N seconds for frequently accessed handles 108 if now % 60 == 0 { 109 let work = HandleResolutionWork { 110 handle: handle.to_string(), 111 }; 112 113 if let Err(e) = self.queue.push(work).await { 114 debug!( 115 handle = handle, 116 error = %e, 117 "Failed to queue handle for proactive refresh" 118 ); 119 if let Some(metrics) = &self.metrics { 120 metrics.incr("proactive_refresh.queue_error").await; 121 } 122 } else { 123 debug!(handle = handle, "Queued handle for proactive refresh"); 124 if let Some(metrics) = &self.metrics { 125 metrics.incr("proactive_refresh.queued").await; 126 } 127 } 128 } 129 } 130 } 131} 132 133#[async_trait] 134impl HandleResolver for DynProactiveRefreshResolver { 135 async fn resolve(&self, handle: &str) -> Result<(String, u64), HandleResolverError> { 136 // Resolve through the inner resolver 137 let (did, resolve_time) = self.inner.resolve(handle).await?; 138 139 // Check if we should queue for refresh based on resolution time 140 self.maybe_queue_for_refresh(handle, resolve_time).await; 141 142 Ok((did, resolve_time)) 143 } 144 145 async fn set(&self, handle: &str, did: &str) -> Result<(), HandleResolverError> { 146 // Simply chain to inner resolver - no proactive refresh needed for manual sets 147 self.inner.set(handle, did).await 148 } 149} 150 151/// Create a ProactiveRefreshResolver with custom threshold using trait objects 152/// This version works with dyn HandleResolver and dyn QueueAdapter 153/// 154/// # Arguments 155/// * `inner` - The inner resolver to wrap 156/// * `queue` - The queue adapter for background refresh tasks 157/// * `cache_ttl` - The TTL in seconds for cache entries 158/// * `threshold` - The threshold as a percentage (0.0 to 1.0) of TTL when to trigger refresh 159pub fn create_proactive_refresh_resolver_dyn( 160 inner: Arc<dyn HandleResolver>, 161 queue: Arc<dyn QueueAdapter<HandleResolutionWork>>, 162 cache_ttl: u64, 163 threshold: f64, 164) -> Arc<dyn HandleResolver> { 165 Arc::new(DynProactiveRefreshResolver::new( 166 inner, queue, cache_ttl, threshold, 167 )) 168} 169 170/// Create a ProactiveRefreshResolver with metrics support 171pub fn create_proactive_refresh_resolver_with_metrics( 172 inner: Arc<dyn HandleResolver>, 173 queue: Arc<dyn QueueAdapter<HandleResolutionWork>>, 174 metrics: Arc<dyn MetricsPublisher>, 175 cache_ttl: u64, 176 threshold: f64, 177) -> Arc<dyn HandleResolver> { 178 Arc::new(DynProactiveRefreshResolver::with_metrics( 179 inner, 180 queue, 181 Some(metrics), 182 cache_ttl, 183 threshold, 184 )) 185} 186 187/// A handle resolver that proactively refreshes cache entries when they reach 188/// a certain staleness threshold (default 80% of TTL). 189/// 190/// This resolver wraps another resolver and checks successful resolutions from cache. 191/// When a cached entry has lived for more than the threshold percentage of its TTL, 192/// it queues the handle for background refresh to keep the cache warm. 193/// 194/// Note: Due to the current trait design, this implementation uses the resolution time 195/// as a heuristic. When resolve_time is 0 (instant cache hit), it may queue for refresh. 196/// For full functionality, the trait would need to expose cache timestamps. 197pub struct ProactiveRefreshResolver<R: HandleResolver, Q: QueueAdapter<HandleResolutionWork>> { 198 inner: Arc<R>, 199 queue: Arc<Q>, 200 /// TTL in seconds for cache entries 201 cache_ttl: u64, 202 /// Threshold as a percentage (0.0 to 1.0) of TTL when to trigger refresh 203 /// Default is 0.8 (80%) 204 refresh_threshold: f64, 205} 206 207impl<R: HandleResolver, Q: QueueAdapter<HandleResolutionWork>> ProactiveRefreshResolver<R, Q> { 208 pub fn new(inner: Arc<R>, queue: Arc<Q>, cache_ttl: u64) -> Self { 209 Self::with_threshold(inner, queue, cache_ttl, 0.8) 210 } 211 212 pub fn with_threshold( 213 inner: Arc<R>, 214 queue: Arc<Q>, 215 cache_ttl: u64, 216 refresh_threshold: f64, 217 ) -> Self { 218 Self { 219 inner, 220 queue, 221 cache_ttl, 222 refresh_threshold: refresh_threshold.clamp(0.0, 1.0), 223 } 224 } 225 226 /// Check if a cached entry needs proactive refresh based on its age 227 #[allow(dead_code)] 228 fn needs_refresh(&self, result: &HandleResolutionResult) -> bool { 229 let now = SystemTime::now() 230 .duration_since(UNIX_EPOCH) 231 .unwrap_or_default() 232 .as_secs(); 233 234 let age = now.saturating_sub(result.timestamp); 235 let threshold = (self.cache_ttl as f64 * self.refresh_threshold) as u64; 236 237 let needs_refresh = age >= threshold; 238 239 if needs_refresh { 240 debug!( 241 handle = ?result.to_did(), 242 age_seconds = age, 243 threshold_seconds = threshold, 244 cache_ttl = self.cache_ttl, 245 "Cache entry needs proactive refresh" 246 ); 247 } else { 248 trace!( 249 handle = ?result.to_did(), 250 age_seconds = age, 251 threshold_seconds = threshold, 252 "Cache entry still fresh" 253 ); 254 } 255 256 needs_refresh 257 } 258 259 /// Queue a handle for background refresh 260 async fn queue_for_refresh(&self, handle: &str) { 261 let work = HandleResolutionWork { 262 handle: handle.to_string(), 263 }; 264 265 match self.queue.push(work).await { 266 Ok(_) => { 267 debug!(handle = handle, "Queued handle for proactive refresh"); 268 } 269 Err(e) => { 270 // Don't fail the request if we can't queue for refresh 271 debug!( 272 handle = handle, 273 error = %e, 274 "Failed to queue handle for proactive refresh" 275 ); 276 } 277 } 278 } 279 280 /// Check if we should queue for refresh based on resolution time 281 /// 282 /// This is a heuristic approach: 283 /// - If resolve_time is very low (< 5ms), it was likely a cache hit 284 /// - We probabilistically queue for refresh based on time since service start 285 /// 286 /// For proper implementation, the HandleResolver trait would need to expose 287 /// cache metadata or return HandleResolutionResult directly. 288 async fn maybe_queue_for_refresh(&self, handle: &str, resolve_time: u64) { 289 // If resolution took less than 5ms, it was probably a cache hit 290 if resolve_time < 5000 { 291 // Use a simple probabilistic approach for demonstration 292 // In production, you'd want access to the actual cache timestamp 293 trace!( 294 handle = handle, 295 resolve_time_us = resolve_time, 296 "Fast resolution detected, considering proactive refresh" 297 ); 298 299 // Queue for refresh with some probability to avoid overwhelming the queue 300 // This is a simplified approach - ideally we'd have access to cache metadata 301 let now = SystemTime::now() 302 .duration_since(UNIX_EPOCH) 303 .unwrap_or_default() 304 .as_secs(); 305 306 // Simple heuristic: queue every N seconds for frequently accessed handles 307 if now % 60 == 0 { 308 self.queue_for_refresh(handle).await; 309 } 310 } 311 } 312} 313 314#[async_trait] 315impl<R, Q> HandleResolver for ProactiveRefreshResolver<R, Q> 316where 317 R: HandleResolver + Send + Sync, 318 Q: QueueAdapter<HandleResolutionWork> + Send + Sync, 319{ 320 async fn resolve(&self, handle: &str) -> Result<(String, u64), HandleResolverError> { 321 // Resolve through the inner resolver 322 let (did, resolve_time) = self.inner.resolve(handle).await?; 323 324 // Check if we should queue for refresh based on resolution time 325 self.maybe_queue_for_refresh(handle, resolve_time).await; 326 327 Ok((did, resolve_time)) 328 } 329} 330 331#[cfg(test)] 332mod tests { 333 use super::*; 334 use crate::handle_resolution_result::DidMethodType; 335 336 #[test] 337 fn test_needs_refresh_calculation() { 338 // Create a resolver with 100 second TTL and 80% threshold 339 let inner = Arc::new(MockResolver); 340 let queue = Arc::new(MockQueueAdapter); 341 let resolver = ProactiveRefreshResolver::new(inner, queue, 100); 342 343 let now = SystemTime::now() 344 .duration_since(UNIX_EPOCH) 345 .unwrap() 346 .as_secs(); 347 348 // Test entry that's 50% through TTL (should not refresh) 349 let fresh_result = HandleResolutionResult { 350 timestamp: now - 50, 351 method_type: DidMethodType::Plc, 352 payload: "alice123".to_string(), 353 }; 354 assert!(!resolver.needs_refresh(&fresh_result)); 355 356 // Test entry that's 80% through TTL (should refresh) 357 let stale_result = HandleResolutionResult { 358 timestamp: now - 80, 359 method_type: DidMethodType::Plc, 360 payload: "alice123".to_string(), 361 }; 362 assert!(resolver.needs_refresh(&stale_result)); 363 364 // Test entry that's 90% through TTL (should definitely refresh) 365 let very_stale_result = HandleResolutionResult { 366 timestamp: now - 90, 367 method_type: DidMethodType::Plc, 368 payload: "alice123".to_string(), 369 }; 370 assert!(resolver.needs_refresh(&very_stale_result)); 371 } 372 373 #[test] 374 fn test_custom_threshold() { 375 let inner = Arc::new(MockResolver); 376 let queue = Arc::new(MockQueueAdapter); 377 378 // Create resolver with 50% threshold 379 let resolver = ProactiveRefreshResolver::with_threshold(inner, queue, 100, 0.5); 380 381 let now = SystemTime::now() 382 .duration_since(UNIX_EPOCH) 383 .unwrap() 384 .as_secs(); 385 386 // Test entry that's 40% through TTL (should not refresh with 50% threshold) 387 let result_40 = HandleResolutionResult { 388 timestamp: now - 40, 389 method_type: DidMethodType::Plc, 390 payload: "alice123".to_string(), 391 }; 392 assert!(!resolver.needs_refresh(&result_40)); 393 394 // Test entry that's 60% through TTL (should refresh with 50% threshold) 395 let result_60 = HandleResolutionResult { 396 timestamp: now - 60, 397 method_type: DidMethodType::Plc, 398 payload: "alice123".to_string(), 399 }; 400 assert!(resolver.needs_refresh(&result_60)); 401 } 402 403 // Mock resolver for testing 404 struct MockResolver; 405 406 #[async_trait] 407 impl HandleResolver for MockResolver { 408 async fn resolve(&self, handle: &str) -> Result<(String, u64), HandleResolverError> { 409 Ok((format!("did:plc:{}", handle), 1000)) 410 } 411 } 412 413 // Mock queue adapter for testing 414 struct MockQueueAdapter; 415 416 #[async_trait] 417 impl QueueAdapter<HandleResolutionWork> for MockQueueAdapter { 418 async fn pull(&self) -> Option<HandleResolutionWork> { 419 None 420 } 421 422 async fn push(&self, _work: HandleResolutionWork) -> crate::queue::Result<()> { 423 Ok(()) 424 } 425 426 async fn ack(&self, _item: &HandleResolutionWork) -> crate::queue::Result<()> { 427 Ok(()) 428 } 429 430 async fn try_push(&self, _work: HandleResolutionWork) -> crate::queue::Result<()> { 431 Ok(()) 432 } 433 434 async fn is_healthy(&self) -> bool { 435 true 436 } 437 } 438}