Forked from smokesignal.events/quickdid
QuickDID is a high-performance AT Protocol identity resolution service written in Rust. It provides handle-to-DID resolution with Redis-backed caching and queue processing.
use crate::handle_resolution_result::HandleResolutionResult;
use crate::handle_resolver::{HandleResolver, HandleResolverError};
use crate::metrics::MetricsPublisher;
use crate::queue::{HandleResolutionWork, QueueAdapter};
use async_trait::async_trait;
use std::sync::Arc;
use std::time::{SystemTime, UNIX_EPOCH};
use tracing::{debug, trace};

/// Create a ProactiveRefreshResolver with default 80% threshold
///
/// # Arguments
/// * `inner` - The inner resolver to wrap
/// * `queue` - The queue adapter for background refresh tasks
/// * `cache_ttl` - The TTL in seconds for cache entries
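///
/// # Example
///
/// A minimal wiring sketch (marked `ignore`, so it is not compiled as a
/// doc-test). `CachedResolver` and `RedisQueue` are placeholder names for
/// whatever concrete resolver and queue implementations the application
/// actually wires up.
///
/// ```ignore
/// let resolver = create_proactive_refresh_resolver(
///     Arc::new(CachedResolver::new(redis.clone())),
///     Arc::new(RedisQueue::new(redis)),
///     3600, // one-hour TTL for cached handle resolutions
/// );
/// let (did, resolve_time_us) = resolver.resolve("alice.example.com").await?;
/// ```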
pub fn create_proactive_refresh_resolver<R, Q>(
    inner: Arc<R>,
    queue: Arc<Q>,
    cache_ttl: u64,
) -> Arc<ProactiveRefreshResolver<R, Q>>
where
    R: HandleResolver + Send + Sync + 'static,
    Q: QueueAdapter<HandleResolutionWork> + Send + Sync + 'static,
{
    Arc::new(ProactiveRefreshResolver::new(inner, queue, cache_ttl))
}

/// Create a ProactiveRefreshResolver with custom threshold
///
/// # Arguments
/// * `inner` - The inner resolver to wrap
/// * `queue` - The queue adapter for background refresh tasks
/// * `cache_ttl` - The TTL in seconds for cache entries
/// * `threshold` - The fraction of the TTL (0.0 to 1.0) at which to trigger a refresh
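///
/// # Example
///
/// A sketch of the threshold arithmetic only (marked `ignore`; `inner` and
/// `queue` stand in for already-constructed resolver and queue values):
///
/// ```ignore
/// // With a 600-second TTL and a 0.5 threshold, a cached entry becomes a
/// // refresh candidate once it is older than 600 * 0.5 = 300 seconds.
/// let resolver = create_proactive_refresh_resolver_with_threshold(inner, queue, 600, 0.5);
/// ```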
pub fn create_proactive_refresh_resolver_with_threshold<R, Q>(
    inner: Arc<R>,
    queue: Arc<Q>,
    cache_ttl: u64,
    threshold: f64,
) -> Arc<ProactiveRefreshResolver<R, Q>>
where
    R: HandleResolver + Send + Sync + 'static,
    Q: QueueAdapter<HandleResolutionWork> + Send + Sync + 'static,
{
    Arc::new(ProactiveRefreshResolver::with_threshold(
        inner, queue, cache_ttl, threshold,
    ))
}

/// Wrapper struct for dynamic dispatch with proactive refresh
/// This works with trait objects instead of concrete types
pub struct DynProactiveRefreshResolver {
    inner: Arc<dyn HandleResolver>,
    queue: Arc<dyn QueueAdapter<HandleResolutionWork>>,
    metrics: Option<Arc<dyn MetricsPublisher>>,
    #[allow(dead_code)]
    cache_ttl: u64,
    #[allow(dead_code)]
    refresh_threshold: f64,
}

impl DynProactiveRefreshResolver {
    pub fn new(
        inner: Arc<dyn HandleResolver>,
        queue: Arc<dyn QueueAdapter<HandleResolutionWork>>,
        cache_ttl: u64,
        refresh_threshold: f64,
    ) -> Self {
        Self::with_metrics(inner, queue, None, cache_ttl, refresh_threshold)
    }

    pub fn with_metrics(
        inner: Arc<dyn HandleResolver>,
        queue: Arc<dyn QueueAdapter<HandleResolutionWork>>,
        metrics: Option<Arc<dyn MetricsPublisher>>,
        cache_ttl: u64,
        refresh_threshold: f64,
    ) -> Self {
        Self {
            inner,
            queue,
            metrics,
            cache_ttl,
            refresh_threshold: refresh_threshold.clamp(0.0, 1.0),
        }
    }

    async fn maybe_queue_for_refresh(&self, handle: &str, resolve_time: u64) {
        // If resolution took less than 5ms (5,000 microseconds), it was probably a cache hit
        if resolve_time < 5000 {
            trace!(
                handle = handle,
                resolve_time_us = resolve_time,
                "Fast resolution detected, considering proactive refresh"
            );

            if let Some(metrics) = &self.metrics {
                metrics.incr("proactive_refresh.cache_hit_detected").await;
            }

            // Simple heuristic: rate-limit refreshes using the current Unix time
            let now = SystemTime::now()
                .duration_since(UNIX_EPOCH)
                .unwrap_or_default()
                .as_secs();

            // Only queue when the current time falls on a minute boundary, so
            // frequently accessed handles get refreshed without flooding the queue
            if now % 60 == 0 {
                let work = HandleResolutionWork {
                    handle: handle.to_string(),
                };

                if let Err(e) = self.queue.push(work).await {
                    debug!(
                        handle = handle,
                        error = %e,
                        "Failed to queue handle for proactive refresh"
                    );
                    if let Some(metrics) = &self.metrics {
                        metrics.incr("proactive_refresh.queue_error").await;
                    }
                } else {
                    debug!(handle = handle, "Queued handle for proactive refresh");
                    if let Some(metrics) = &self.metrics {
                        metrics.incr("proactive_refresh.queued").await;
                    }
                }
            }
        }
    }
}

#[async_trait]
impl HandleResolver for DynProactiveRefreshResolver {
    async fn resolve(&self, handle: &str) -> Result<(String, u64), HandleResolverError> {
        // Resolve through the inner resolver
        let (did, resolve_time) = self.inner.resolve(handle).await?;

        // Check if we should queue for refresh based on resolution time
        self.maybe_queue_for_refresh(handle, resolve_time).await;

        Ok((did, resolve_time))
    }

    async fn set(&self, handle: &str, did: &str) -> Result<(), HandleResolverError> {
        // Simply chain to the inner resolver - no proactive refresh needed for manual sets
        self.inner.set(handle, did).await
    }
}

/// Create a ProactiveRefreshResolver with custom threshold using trait objects
/// This version works with dyn HandleResolver and dyn QueueAdapter
///
/// # Arguments
/// * `inner` - The inner resolver to wrap
/// * `queue` - The queue adapter for background refresh tasks
/// * `cache_ttl` - The TTL in seconds for cache entries
/// * `threshold` - The fraction of the TTL (0.0 to 1.0) at which to trigger a refresh
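///
/// # Example
///
/// A trait-object wiring sketch (marked `ignore`; the `inner_resolver` and
/// `work_queue` values are assumed to be constructed elsewhere in the
/// application):
///
/// ```ignore
/// let resolver: Arc<dyn HandleResolver> = create_proactive_refresh_resolver_dyn(
///     inner_resolver, // Arc<dyn HandleResolver>
///     work_queue,     // Arc<dyn QueueAdapter<HandleResolutionWork>>
///     3600,
///     0.8,
/// );
/// ```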
pub fn create_proactive_refresh_resolver_dyn(
    inner: Arc<dyn HandleResolver>,
    queue: Arc<dyn QueueAdapter<HandleResolutionWork>>,
    cache_ttl: u64,
    threshold: f64,
) -> Arc<dyn HandleResolver> {
    Arc::new(DynProactiveRefreshResolver::new(
        inner, queue, cache_ttl, threshold,
    ))
}

/// Create a ProactiveRefreshResolver with metrics support
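///
/// # Example
///
/// A sketch only (marked `ignore`); `StatsdPublisher` is a placeholder for
/// whatever `MetricsPublisher` implementation the service configures.
///
/// ```ignore
/// let resolver = create_proactive_refresh_resolver_with_metrics(
///     inner_resolver,
///     work_queue,
///     Arc::new(StatsdPublisher::default()),
///     3600,
///     0.8,
/// );
/// ```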
pub fn create_proactive_refresh_resolver_with_metrics(
    inner: Arc<dyn HandleResolver>,
    queue: Arc<dyn QueueAdapter<HandleResolutionWork>>,
    metrics: Arc<dyn MetricsPublisher>,
    cache_ttl: u64,
    threshold: f64,
) -> Arc<dyn HandleResolver> {
    Arc::new(DynProactiveRefreshResolver::with_metrics(
        inner,
        queue,
        Some(metrics),
        cache_ttl,
        threshold,
    ))
}

/// A handle resolver that proactively refreshes cache entries when they reach
/// a certain staleness threshold (default 80% of TTL).
///
/// This resolver wraps another resolver and inspects successful resolutions served
/// from cache. When a cached entry has lived for more than the threshold percentage
/// of its TTL, it queues the handle for background refresh to keep the cache warm.
///
/// Note: Due to the current trait design, this implementation uses the resolution time
/// as a heuristic. When the resolution is very fast (an apparent cache hit), the handle
/// may be queued for refresh. For full functionality, the trait would need to expose
/// cache timestamps.
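///
/// # Example
///
/// A sketch of the intended staleness rule (marked `ignore`; `inner` and
/// `queue` are assumed to be concrete `HandleResolver` and `QueueAdapter`
/// implementations):
///
/// ```ignore
/// // 3600-second TTL with the default 0.8 threshold: an entry is considered
/// // stale for refresh purposes once it is at least 3600 * 0.8 = 2880 seconds old.
/// let resolver = ProactiveRefreshResolver::new(inner, queue, 3600);
/// ```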
pub struct ProactiveRefreshResolver<R: HandleResolver, Q: QueueAdapter<HandleResolutionWork>> {
    inner: Arc<R>,
    queue: Arc<Q>,
    /// TTL in seconds for cache entries
    cache_ttl: u64,
    /// Fraction of the TTL (0.0 to 1.0) at which to trigger a refresh.
    /// Default is 0.8 (80%)
    refresh_threshold: f64,
}

impl<R: HandleResolver, Q: QueueAdapter<HandleResolutionWork>> ProactiveRefreshResolver<R, Q> {
    pub fn new(inner: Arc<R>, queue: Arc<Q>, cache_ttl: u64) -> Self {
        Self::with_threshold(inner, queue, cache_ttl, 0.8)
    }

    pub fn with_threshold(
        inner: Arc<R>,
        queue: Arc<Q>,
        cache_ttl: u64,
        refresh_threshold: f64,
    ) -> Self {
        Self {
            inner,
            queue,
            cache_ttl,
            refresh_threshold: refresh_threshold.clamp(0.0, 1.0),
        }
    }

    /// Check if a cached entry needs proactive refresh based on its age
    #[allow(dead_code)]
    fn needs_refresh(&self, result: &HandleResolutionResult) -> bool {
        let now = SystemTime::now()
            .duration_since(UNIX_EPOCH)
            .unwrap_or_default()
            .as_secs();

        let age = now.saturating_sub(result.timestamp);
        let threshold = (self.cache_ttl as f64 * self.refresh_threshold) as u64;

        let needs_refresh = age >= threshold;

        if needs_refresh {
            debug!(
                handle = ?result.to_did(),
                age_seconds = age,
                threshold_seconds = threshold,
                cache_ttl = self.cache_ttl,
                "Cache entry needs proactive refresh"
            );
        } else {
            trace!(
                handle = ?result.to_did(),
                age_seconds = age,
                threshold_seconds = threshold,
                "Cache entry still fresh"
            );
        }

        needs_refresh
    }

    /// Queue a handle for background refresh
    async fn queue_for_refresh(&self, handle: &str) {
        let work = HandleResolutionWork {
            handle: handle.to_string(),
        };

        match self.queue.push(work).await {
            Ok(_) => {
                debug!(handle = handle, "Queued handle for proactive refresh");
            }
            Err(e) => {
                // Don't fail the request if we can't queue for refresh
                debug!(
                    handle = handle,
                    error = %e,
                    "Failed to queue handle for proactive refresh"
                );
            }
        }
    }

    /// Check if we should queue for refresh based on resolution time
    ///
    /// This is a heuristic approach:
    /// - If resolve_time is very low (< 5ms), it was likely a cache hit
    /// - We rate-limit refresh queueing by only acting when the current Unix time
    ///   falls on a minute boundary
    ///
    /// For a proper implementation, the HandleResolver trait would need to expose
    /// cache metadata or return HandleResolutionResult directly.
    async fn maybe_queue_for_refresh(&self, handle: &str, resolve_time: u64) {
        // If resolution took less than 5ms (5,000 microseconds), it was probably a cache hit
        if resolve_time < 5000 {
            // In production, you'd want access to the actual cache timestamp
            trace!(
                handle = handle,
                resolve_time_us = resolve_time,
                "Fast resolution detected, considering proactive refresh"
            );

            // Rate-limit queueing so frequently accessed handles don't overwhelm the
            // queue; ideally we'd have access to cache metadata instead
            let now = SystemTime::now()
                .duration_since(UNIX_EPOCH)
                .unwrap_or_default()
                .as_secs();

            // Only queue when the current time falls on a minute boundary
            if now % 60 == 0 {
                self.queue_for_refresh(handle).await;
            }
        }
    }
}

#[async_trait]
impl<R, Q> HandleResolver for ProactiveRefreshResolver<R, Q>
where
    R: HandleResolver + Send + Sync,
    Q: QueueAdapter<HandleResolutionWork> + Send + Sync,
{
    async fn resolve(&self, handle: &str) -> Result<(String, u64), HandleResolverError> {
        // Resolve through the inner resolver
        let (did, resolve_time) = self.inner.resolve(handle).await?;

        // Check if we should queue for refresh based on resolution time
        self.maybe_queue_for_refresh(handle, resolve_time).await;

        Ok((did, resolve_time))
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::handle_resolution_result::DidMethodType;

    #[test]
    fn test_needs_refresh_calculation() {
        // Create a resolver with 100 second TTL and 80% threshold
        let inner = Arc::new(MockResolver);
        let queue = Arc::new(MockQueueAdapter);
        let resolver = ProactiveRefreshResolver::new(inner, queue, 100);

        let now = SystemTime::now()
            .duration_since(UNIX_EPOCH)
            .unwrap()
            .as_secs();

        // Test entry that's 50% through TTL (should not refresh)
        let fresh_result = HandleResolutionResult {
            timestamp: now - 50,
            method_type: DidMethodType::Plc,
            payload: "alice123".to_string(),
        };
        assert!(!resolver.needs_refresh(&fresh_result));

        // Test entry that's 80% through TTL (should refresh)
        let stale_result = HandleResolutionResult {
            timestamp: now - 80,
            method_type: DidMethodType::Plc,
            payload: "alice123".to_string(),
        };
        assert!(resolver.needs_refresh(&stale_result));

        // Test entry that's 90% through TTL (should definitely refresh)
        let very_stale_result = HandleResolutionResult {
            timestamp: now - 90,
            method_type: DidMethodType::Plc,
            payload: "alice123".to_string(),
        };
        assert!(resolver.needs_refresh(&very_stale_result));
    }

    #[test]
    fn test_custom_threshold() {
        let inner = Arc::new(MockResolver);
        let queue = Arc::new(MockQueueAdapter);

        // Create resolver with 50% threshold
        let resolver = ProactiveRefreshResolver::with_threshold(inner, queue, 100, 0.5);

        let now = SystemTime::now()
            .duration_since(UNIX_EPOCH)
            .unwrap()
            .as_secs();

        // Test entry that's 40% through TTL (should not refresh with 50% threshold)
        let result_40 = HandleResolutionResult {
            timestamp: now - 40,
            method_type: DidMethodType::Plc,
            payload: "alice123".to_string(),
        };
        assert!(!resolver.needs_refresh(&result_40));

        // Test entry that's 60% through TTL (should refresh with 50% threshold)
        let result_60 = HandleResolutionResult {
            timestamp: now - 60,
            method_type: DidMethodType::Plc,
            payload: "alice123".to_string(),
        };
        assert!(resolver.needs_refresh(&result_60));
    }
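
    // Illustrative addition: the constructors clamp the threshold into the
    // 0.0..=1.0 range, so an out-of-range value behaves like the nearest bound.
    #[test]
    fn test_threshold_is_clamped() {
        let inner = Arc::new(MockResolver);
        let queue = Arc::new(MockQueueAdapter);

        // A threshold of 1.5 clamps to 1.0, so only entries at or past the
        // full 100-second TTL count as stale.
        let resolver = ProactiveRefreshResolver::with_threshold(inner, queue, 100, 1.5);

        let now = SystemTime::now()
            .duration_since(UNIX_EPOCH)
            .unwrap()
            .as_secs();

        let almost_expired = HandleResolutionResult {
            timestamp: now - 90,
            method_type: DidMethodType::Plc,
            payload: "alice123".to_string(),
        };
        assert!(!resolver.needs_refresh(&almost_expired));

        let fully_expired = HandleResolutionResult {
            timestamp: now - 100,
            method_type: DidMethodType::Plc,
            payload: "alice123".to_string(),
        };
        assert!(resolver.needs_refresh(&fully_expired));
    }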

    // Mock resolver for testing
    struct MockResolver;

    #[async_trait]
    impl HandleResolver for MockResolver {
        async fn resolve(&self, handle: &str) -> Result<(String, u64), HandleResolverError> {
            Ok((format!("did:plc:{}", handle), 1000))
        }
    }

    // Mock queue adapter for testing
    struct MockQueueAdapter;

    #[async_trait]
    impl QueueAdapter<HandleResolutionWork> for MockQueueAdapter {
        async fn pull(&self) -> Option<HandleResolutionWork> {
            None
        }

        async fn push(&self, _work: HandleResolutionWork) -> crate::queue::Result<()> {
            Ok(())
        }

        async fn ack(&self, _item: &HandleResolutionWork) -> crate::queue::Result<()> {
            Ok(())
        }

        async fn try_push(&self, _work: HandleResolutionWork) -> crate::queue::Result<()> {
            Ok(())
        }

        async fn is_healthy(&self) -> bool {
            true
        }
    }
}