use crate::handle_resolution_result::HandleResolutionResult; use crate::handle_resolver::{HandleResolver, HandleResolverError}; use crate::metrics::MetricsPublisher; use crate::queue::{HandleResolutionWork, QueueAdapter}; use async_trait::async_trait; use std::sync::Arc; use std::time::{SystemTime, UNIX_EPOCH}; use tracing::{debug, trace}; /// Create a ProactiveRefreshResolver with default 80% threshold /// /// # Arguments /// * `inner` - The inner resolver to wrap /// * `queue` - The queue adapter for background refresh tasks /// * `cache_ttl` - The TTL in seconds for cache entries pub fn create_proactive_refresh_resolver( inner: Arc, queue: Arc, cache_ttl: u64, ) -> Arc> where R: HandleResolver + Send + Sync + 'static, Q: QueueAdapter + Send + Sync + 'static, { Arc::new(ProactiveRefreshResolver::new(inner, queue, cache_ttl)) } /// Create a ProactiveRefreshResolver with custom threshold /// /// # Arguments /// * `inner` - The inner resolver to wrap /// * `queue` - The queue adapter for background refresh tasks /// * `cache_ttl` - The TTL in seconds for cache entries /// * `threshold` - The threshold as a percentage (0.0 to 1.0) of TTL when to trigger refresh pub fn create_proactive_refresh_resolver_with_threshold( inner: Arc, queue: Arc, cache_ttl: u64, threshold: f64, ) -> Arc> where R: HandleResolver + Send + Sync + 'static, Q: QueueAdapter + Send + Sync + 'static, { Arc::new(ProactiveRefreshResolver::with_threshold( inner, queue, cache_ttl, threshold, )) } /// Wrapper struct for dynamic dispatch with proactive refresh /// This works with trait objects instead of concrete types pub struct DynProactiveRefreshResolver { inner: Arc, queue: Arc>, metrics: Option>, #[allow(dead_code)] cache_ttl: u64, #[allow(dead_code)] refresh_threshold: f64, } impl DynProactiveRefreshResolver { pub fn new( inner: Arc, queue: Arc>, cache_ttl: u64, refresh_threshold: f64, ) -> Self { Self::with_metrics(inner, queue, None, cache_ttl, refresh_threshold) } pub fn with_metrics( inner: Arc, queue: Arc>, metrics: Option>, cache_ttl: u64, refresh_threshold: f64, ) -> Self { Self { inner, queue, metrics, cache_ttl, refresh_threshold: refresh_threshold.clamp(0.0, 1.0), } } async fn maybe_queue_for_refresh(&self, handle: &str, resolve_time: u64) { // If resolution took less than 5ms, it was probably a cache hit if resolve_time < 5000 { trace!( handle = handle, resolve_time_us = resolve_time, "Fast resolution detected, considering proactive refresh" ); if let Some(metrics) = &self.metrics { metrics.incr("proactive_refresh.cache_hit_detected").await; } // Simple heuristic: queue for refresh with some probability let now = SystemTime::now() .duration_since(UNIX_EPOCH) .unwrap_or_default() .as_secs(); // Queue every N seconds for frequently accessed handles if now % 60 == 0 { let work = HandleResolutionWork { handle: handle.to_string(), }; if let Err(e) = self.queue.push(work).await { debug!( handle = handle, error = %e, "Failed to queue handle for proactive refresh" ); if let Some(metrics) = &self.metrics { metrics.incr("proactive_refresh.queue_error").await; } } else { debug!(handle = handle, "Queued handle for proactive refresh"); if let Some(metrics) = &self.metrics { metrics.incr("proactive_refresh.queued").await; } } } } } } #[async_trait] impl HandleResolver for DynProactiveRefreshResolver { async fn resolve(&self, handle: &str) -> Result<(String, u64), HandleResolverError> { // Resolve through the inner resolver let (did, resolve_time) = self.inner.resolve(handle).await?; // Check if we should queue for refresh based on resolution time self.maybe_queue_for_refresh(handle, resolve_time).await; Ok((did, resolve_time)) } async fn set(&self, handle: &str, did: &str) -> Result<(), HandleResolverError> { // Simply chain to inner resolver - no proactive refresh needed for manual sets self.inner.set(handle, did).await } } /// Create a ProactiveRefreshResolver with custom threshold using trait objects /// This version works with dyn HandleResolver and dyn QueueAdapter /// /// # Arguments /// * `inner` - The inner resolver to wrap /// * `queue` - The queue adapter for background refresh tasks /// * `cache_ttl` - The TTL in seconds for cache entries /// * `threshold` - The threshold as a percentage (0.0 to 1.0) of TTL when to trigger refresh pub fn create_proactive_refresh_resolver_dyn( inner: Arc, queue: Arc>, cache_ttl: u64, threshold: f64, ) -> Arc { Arc::new(DynProactiveRefreshResolver::new( inner, queue, cache_ttl, threshold, )) } /// Create a ProactiveRefreshResolver with metrics support pub fn create_proactive_refresh_resolver_with_metrics( inner: Arc, queue: Arc>, metrics: Arc, cache_ttl: u64, threshold: f64, ) -> Arc { Arc::new(DynProactiveRefreshResolver::with_metrics( inner, queue, Some(metrics), cache_ttl, threshold, )) } /// A handle resolver that proactively refreshes cache entries when they reach /// a certain staleness threshold (default 80% of TTL). /// /// This resolver wraps another resolver and checks successful resolutions from cache. /// When a cached entry has lived for more than the threshold percentage of its TTL, /// it queues the handle for background refresh to keep the cache warm. /// /// Note: Due to the current trait design, this implementation uses the resolution time /// as a heuristic. When resolve_time is 0 (instant cache hit), it may queue for refresh. /// For full functionality, the trait would need to expose cache timestamps. pub struct ProactiveRefreshResolver> { inner: Arc, queue: Arc, /// TTL in seconds for cache entries cache_ttl: u64, /// Threshold as a percentage (0.0 to 1.0) of TTL when to trigger refresh /// Default is 0.8 (80%) refresh_threshold: f64, } impl> ProactiveRefreshResolver { pub fn new(inner: Arc, queue: Arc, cache_ttl: u64) -> Self { Self::with_threshold(inner, queue, cache_ttl, 0.8) } pub fn with_threshold( inner: Arc, queue: Arc, cache_ttl: u64, refresh_threshold: f64, ) -> Self { Self { inner, queue, cache_ttl, refresh_threshold: refresh_threshold.clamp(0.0, 1.0), } } /// Check if a cached entry needs proactive refresh based on its age #[allow(dead_code)] fn needs_refresh(&self, result: &HandleResolutionResult) -> bool { let now = SystemTime::now() .duration_since(UNIX_EPOCH) .unwrap_or_default() .as_secs(); let age = now.saturating_sub(result.timestamp); let threshold = (self.cache_ttl as f64 * self.refresh_threshold) as u64; let needs_refresh = age >= threshold; if needs_refresh { debug!( handle = ?result.to_did(), age_seconds = age, threshold_seconds = threshold, cache_ttl = self.cache_ttl, "Cache entry needs proactive refresh" ); } else { trace!( handle = ?result.to_did(), age_seconds = age, threshold_seconds = threshold, "Cache entry still fresh" ); } needs_refresh } /// Queue a handle for background refresh async fn queue_for_refresh(&self, handle: &str) { let work = HandleResolutionWork { handle: handle.to_string(), }; match self.queue.push(work).await { Ok(_) => { debug!(handle = handle, "Queued handle for proactive refresh"); } Err(e) => { // Don't fail the request if we can't queue for refresh debug!( handle = handle, error = %e, "Failed to queue handle for proactive refresh" ); } } } /// Check if we should queue for refresh based on resolution time /// /// This is a heuristic approach: /// - If resolve_time is very low (< 5ms), it was likely a cache hit /// - We probabilistically queue for refresh based on time since service start /// /// For proper implementation, the HandleResolver trait would need to expose /// cache metadata or return HandleResolutionResult directly. async fn maybe_queue_for_refresh(&self, handle: &str, resolve_time: u64) { // If resolution took less than 5ms, it was probably a cache hit if resolve_time < 5000 { // Use a simple probabilistic approach for demonstration // In production, you'd want access to the actual cache timestamp trace!( handle = handle, resolve_time_us = resolve_time, "Fast resolution detected, considering proactive refresh" ); // Queue for refresh with some probability to avoid overwhelming the queue // This is a simplified approach - ideally we'd have access to cache metadata let now = SystemTime::now() .duration_since(UNIX_EPOCH) .unwrap_or_default() .as_secs(); // Simple heuristic: queue every N seconds for frequently accessed handles if now % 60 == 0 { self.queue_for_refresh(handle).await; } } } } #[async_trait] impl HandleResolver for ProactiveRefreshResolver where R: HandleResolver + Send + Sync, Q: QueueAdapter + Send + Sync, { async fn resolve(&self, handle: &str) -> Result<(String, u64), HandleResolverError> { // Resolve through the inner resolver let (did, resolve_time) = self.inner.resolve(handle).await?; // Check if we should queue for refresh based on resolution time self.maybe_queue_for_refresh(handle, resolve_time).await; Ok((did, resolve_time)) } } #[cfg(test)] mod tests { use super::*; use crate::handle_resolution_result::DidMethodType; #[test] fn test_needs_refresh_calculation() { // Create a resolver with 100 second TTL and 80% threshold let inner = Arc::new(MockResolver); let queue = Arc::new(MockQueueAdapter); let resolver = ProactiveRefreshResolver::new(inner, queue, 100); let now = SystemTime::now() .duration_since(UNIX_EPOCH) .unwrap() .as_secs(); // Test entry that's 50% through TTL (should not refresh) let fresh_result = HandleResolutionResult { timestamp: now - 50, method_type: DidMethodType::Plc, payload: "alice123".to_string(), }; assert!(!resolver.needs_refresh(&fresh_result)); // Test entry that's 80% through TTL (should refresh) let stale_result = HandleResolutionResult { timestamp: now - 80, method_type: DidMethodType::Plc, payload: "alice123".to_string(), }; assert!(resolver.needs_refresh(&stale_result)); // Test entry that's 90% through TTL (should definitely refresh) let very_stale_result = HandleResolutionResult { timestamp: now - 90, method_type: DidMethodType::Plc, payload: "alice123".to_string(), }; assert!(resolver.needs_refresh(&very_stale_result)); } #[test] fn test_custom_threshold() { let inner = Arc::new(MockResolver); let queue = Arc::new(MockQueueAdapter); // Create resolver with 50% threshold let resolver = ProactiveRefreshResolver::with_threshold(inner, queue, 100, 0.5); let now = SystemTime::now() .duration_since(UNIX_EPOCH) .unwrap() .as_secs(); // Test entry that's 40% through TTL (should not refresh with 50% threshold) let result_40 = HandleResolutionResult { timestamp: now - 40, method_type: DidMethodType::Plc, payload: "alice123".to_string(), }; assert!(!resolver.needs_refresh(&result_40)); // Test entry that's 60% through TTL (should refresh with 50% threshold) let result_60 = HandleResolutionResult { timestamp: now - 60, method_type: DidMethodType::Plc, payload: "alice123".to_string(), }; assert!(resolver.needs_refresh(&result_60)); } // Mock resolver for testing struct MockResolver; #[async_trait] impl HandleResolver for MockResolver { async fn resolve(&self, handle: &str) -> Result<(String, u64), HandleResolverError> { Ok((format!("did:plc:{}", handle), 1000)) } } // Mock queue adapter for testing struct MockQueueAdapter; #[async_trait] impl QueueAdapter for MockQueueAdapter { async fn pull(&self) -> Option { None } async fn push(&self, _work: HandleResolutionWork) -> crate::queue::Result<()> { Ok(()) } async fn ack(&self, _item: &HandleResolutionWork) -> crate::queue::Result<()> { Ok(()) } async fn try_push(&self, _work: HandleResolutionWork) -> crate::queue::Result<()> { Ok(()) } async fn is_healthy(&self) -> bool { true } } }