+1
Cargo.lock
+1
Cargo.lock
+3
Cargo.toml
+3
Cargo.toml
+91
-36
src/bin/quickdid.rs
+91
-36
src/bin/quickdid.rs
···
8
8
cache::create_redis_pool,
9
9
config::Config,
10
10
handle_resolver::{
11
-
create_base_resolver, create_caching_resolver, create_rate_limited_resolver_with_timeout,
12
-
create_redis_resolver_with_ttl, create_sqlite_resolver_with_ttl,
11
+
create_base_resolver, create_caching_resolver, create_proactive_refresh_resolver_dyn,
12
+
create_rate_limited_resolver_with_timeout, create_redis_resolver_with_ttl,
13
+
create_sqlite_resolver_with_ttl,
13
14
},
14
15
handle_resolver_task::{HandleResolverTaskConfig, create_handle_resolver_task_with_config},
15
16
http::{AppContext, create_router},
···
134
135
println!(
135
136
" METRICS_STATSD_HOST StatsD host when using statsd adapter (e.g., localhost:8125)"
136
137
);
138
+
println!(" METRICS_PREFIX Prefix for all metrics (default: quickdid)");
137
139
println!(
138
-
" METRICS_PREFIX Prefix for all metrics (default: quickdid)"
140
+
" METRICS_TAGS Default tags for metrics (comma-separated key:value pairs)"
139
141
);
142
+
println!();
143
+
println!(" PROACTIVE CACHE REFRESH:");
140
144
println!(
141
-
" METRICS_TAGS Default tags for metrics (comma-separated key:value pairs)"
145
+
" PROACTIVE_REFRESH_ENABLED Enable proactive cache refresh (default: false)"
146
+
);
147
+
println!(
148
+
" PROACTIVE_REFRESH_THRESHOLD Threshold as percentage of TTL (0.0-1.0, default: 0.8)"
142
149
);
143
150
println!();
144
151
println!(
···
237
244
tracing::error!("Failed to create metrics publisher: {}", e);
238
245
anyhow::anyhow!("Failed to create metrics publisher: {}", e)
239
246
})?;
240
-
247
+
241
248
tracing::info!(
242
249
"Metrics publisher created with {} adapter",
243
250
config.metrics_adapter
···
246
253
metrics_publisher.gauge("server", 1).await;
247
254
248
255
// Create base handle resolver using factory function
249
-
let mut base_handle_resolver =
250
-
create_base_resolver(dns_resolver_arc.clone(), http_client.clone(), metrics_publisher.clone());
256
+
let mut base_handle_resolver = create_base_resolver(
257
+
dns_resolver_arc.clone(),
258
+
http_client.clone(),
259
+
metrics_publisher.clone(),
260
+
);
251
261
252
262
// Apply rate limiting if configured
253
263
if config.resolver_max_concurrent > 0 {
···
282
292
None
283
293
};
284
294
285
-
// Create handle resolver with cache priority: Redis > SQLite > In-memory
286
-
let handle_resolver: Arc<dyn quickdid::handle_resolver::HandleResolver> =
287
-
if let Some(pool) = redis_pool {
288
-
tracing::info!(
289
-
"Using Redis-backed handle resolver with {}-second cache TTL",
290
-
config.cache_ttl_redis
291
-
);
292
-
create_redis_resolver_with_ttl(base_handle_resolver, pool, config.cache_ttl_redis, metrics_publisher.clone())
293
-
} else if let Some(pool) = sqlite_pool {
294
-
tracing::info!(
295
-
"Using SQLite-backed handle resolver with {}-second cache TTL",
296
-
config.cache_ttl_sqlite
297
-
);
298
-
create_sqlite_resolver_with_ttl(base_handle_resolver, pool, config.cache_ttl_sqlite, metrics_publisher.clone())
299
-
} else {
300
-
tracing::info!(
301
-
"Using in-memory handle resolver with {}-second cache TTL",
302
-
config.cache_ttl_memory
303
-
);
304
-
create_caching_resolver(base_handle_resolver, config.cache_ttl_memory, metrics_publisher.clone())
305
-
};
306
-
307
295
// Create task tracker and cancellation token
308
296
let tracker = TaskTracker::new();
309
297
let token = CancellationToken::new();
310
298
311
-
// Setup background handle resolution task and get the queue adapter
299
+
// Create the queue adapter first (needed for proactive refresh)
312
300
let handle_queue: Arc<dyn QueueAdapter<HandleResolutionWork>> = {
313
301
// Create queue adapter based on configuration
314
302
let adapter: Arc<dyn QueueAdapter<HandleResolutionWork>> = match config
···
410
398
}
411
399
};
412
400
413
-
// Keep a reference to the adapter for the AppContext
414
-
let adapter_for_context = adapter.clone();
401
+
adapter
402
+
};
403
+
404
+
// Create handle resolver with cache priority: Redis > SQLite > In-memory
405
+
let (mut handle_resolver, cache_ttl): (
406
+
Arc<dyn quickdid::handle_resolver::HandleResolver>,
407
+
u64,
408
+
) = if let Some(pool) = redis_pool {
409
+
tracing::info!(
410
+
"Using Redis-backed handle resolver with {}-second cache TTL",
411
+
config.cache_ttl_redis
412
+
);
413
+
(
414
+
create_redis_resolver_with_ttl(
415
+
base_handle_resolver,
416
+
pool,
417
+
config.cache_ttl_redis,
418
+
metrics_publisher.clone(),
419
+
),
420
+
config.cache_ttl_redis,
421
+
)
422
+
} else if let Some(pool) = sqlite_pool {
423
+
tracing::info!(
424
+
"Using SQLite-backed handle resolver with {}-second cache TTL",
425
+
config.cache_ttl_sqlite
426
+
);
427
+
(
428
+
create_sqlite_resolver_with_ttl(
429
+
base_handle_resolver,
430
+
pool,
431
+
config.cache_ttl_sqlite,
432
+
metrics_publisher.clone(),
433
+
),
434
+
config.cache_ttl_sqlite,
435
+
)
436
+
} else {
437
+
tracing::info!(
438
+
"Using in-memory handle resolver with {}-second cache TTL",
439
+
config.cache_ttl_memory
440
+
);
441
+
(
442
+
create_caching_resolver(
443
+
base_handle_resolver,
444
+
config.cache_ttl_memory,
445
+
metrics_publisher.clone(),
446
+
),
447
+
config.cache_ttl_memory,
448
+
)
449
+
};
450
+
451
+
// Apply proactive refresh if enabled
452
+
if config.proactive_refresh_enabled && !matches!(config.queue_adapter.as_str(), "noop" | "none")
453
+
{
454
+
tracing::info!(
455
+
"Enabling proactive cache refresh with {}% threshold",
456
+
(config.proactive_refresh_threshold * 100.0) as u32
457
+
);
458
+
handle_resolver = create_proactive_refresh_resolver_dyn(
459
+
handle_resolver,
460
+
handle_queue.clone(),
461
+
cache_ttl,
462
+
config.proactive_refresh_threshold,
463
+
);
464
+
} else if config.proactive_refresh_enabled {
465
+
tracing::warn!(
466
+
"Proactive refresh enabled but queue adapter is no-op, skipping proactive refresh"
467
+
);
468
+
}
469
+
470
+
// Setup background handle resolution task
471
+
{
472
+
let adapter_for_task = handle_queue.clone();
415
473
416
474
// Only spawn handle resolver task if not using noop adapter
417
475
if !matches!(config.queue_adapter.as_str(), "noop" | "none") {
···
422
480
423
481
// Create and start handle resolver task
424
482
let handle_task = create_handle_resolver_task_with_config(
425
-
adapter,
483
+
adapter_for_task,
426
484
handle_resolver.clone(),
427
485
token.clone(),
428
486
handle_task_config,
···
459
517
} else {
460
518
tracing::info!("Background handle resolution task disabled (using no-op adapter)");
461
519
}
462
-
463
-
// Return the adapter to be used in AppContext
464
-
adapter_for_context
465
520
};
466
521
467
522
// Create app context with the queue adapter
+24
-3
src/config.rs
+24
-3
src/config.rs
···
227
227
/// Default tags for all metrics (comma-separated key:value pairs)
228
228
/// Example: "env:production,service:quickdid"
229
229
pub metrics_tags: Option<String>,
230
+
231
+
/// Enable proactive cache refresh for frequently accessed handles.
232
+
/// When enabled, cache entries that have reached the refresh threshold
233
+
/// will be queued for background refresh to keep the cache warm.
234
+
/// Default: false
235
+
pub proactive_refresh_enabled: bool,
236
+
237
+
/// Threshold as a percentage (0.0-1.0) of cache TTL when to trigger proactive refresh.
238
+
/// For example, 0.8 means refresh when an entry has lived for 80% of its TTL.
239
+
/// Default: 0.8 (80%)
240
+
pub proactive_refresh_threshold: f64,
230
241
}
231
242
232
243
impl Config {
···
322
333
metrics_statsd_host: get_env_or_default("METRICS_STATSD_HOST", None),
323
334
metrics_prefix: get_env_or_default("METRICS_PREFIX", Some("quickdid")).unwrap(),
324
335
metrics_tags: get_env_or_default("METRICS_TAGS", None),
336
+
proactive_refresh_enabled: parse_env("PROACTIVE_REFRESH_ENABLED", false)?,
337
+
proactive_refresh_threshold: parse_env("PROACTIVE_REFRESH_THRESHOLD", 0.8)?,
325
338
};
326
339
327
340
// Calculate the Cache-Control header value if enabled
···
410
423
"RESOLVER_MAX_CONCURRENT_TIMEOUT_MS must be <= 60000 (60 seconds)".to_string(),
411
424
));
412
425
}
413
-
426
+
414
427
// Validate metrics configuration
415
428
match self.metrics_adapter.as_str() {
416
429
"noop" | "statsd" => {}
···
421
434
)));
422
435
}
423
436
}
424
-
437
+
425
438
// If statsd is configured, ensure host is provided
426
439
if self.metrics_adapter == "statsd" && self.metrics_statsd_host.is_none() {
427
440
return Err(ConfigError::MissingRequired(
428
441
"METRICS_STATSD_HOST is required when METRICS_ADAPTER is 'statsd'".to_string(),
429
442
));
430
443
}
431
-
444
+
445
+
// Validate proactive refresh threshold
446
+
if self.proactive_refresh_threshold < 0.0 || self.proactive_refresh_threshold > 1.0 {
447
+
return Err(ConfigError::InvalidValue(format!(
448
+
"PROACTIVE_REFRESH_THRESHOLD must be between 0.0 and 1.0, got {}",
449
+
self.proactive_refresh_threshold
450
+
)));
451
+
}
452
+
432
453
Ok(())
433
454
}
434
455
}
+18
-6
src/handle_resolver/base.rs
+18
-6
src/handle_resolver/base.rs
···
58
58
impl HandleResolver for BaseHandleResolver {
59
59
async fn resolve(&self, s: &str) -> Result<(String, u64), HandleResolverError> {
60
60
let start_time = std::time::Instant::now();
61
-
61
+
62
62
// Perform DNS/HTTP resolution
63
63
let result = resolve_subject(&self.http_client, &*self.dns_resolver, s)
64
64
.await
65
65
.map_err(|e| HandleResolverError::ResolutionFailed(e.to_string()));
66
66
67
67
let duration_ms = start_time.elapsed().as_millis() as u64;
68
-
68
+
69
69
// Publish metrics
70
-
70
+
71
71
match result {
72
72
Ok(did) => {
73
-
self.metrics.time_with_tags("resolver.base.duration_ms", duration_ms, &[("success", "1")]).await;
74
-
73
+
self.metrics
74
+
.time_with_tags(
75
+
"resolver.base.duration_ms",
76
+
duration_ms,
77
+
&[("success", "1")],
78
+
)
79
+
.await;
80
+
75
81
let timestamp = SystemTime::now()
76
82
.duration_since(UNIX_EPOCH)
77
83
.map_err(|e| {
···
82
88
Ok((did, timestamp))
83
89
}
84
90
Err(e) => {
85
-
self.metrics.time_with_tags("resolver.base.duration_ms", duration_ms, &[("success", "0")]).await;
91
+
self.metrics
92
+
.time_with_tags(
93
+
"resolver.base.duration_ms",
94
+
duration_ms,
95
+
&[("success", "0")],
96
+
)
97
+
.await;
86
98
Err(e)
87
99
}
88
100
}
+12
-4
src/handle_resolver/memory.rs
+12
-4
src/handle_resolver/memory.rs
···
70
70
/// * `inner` - The underlying resolver to use for actual resolution
71
71
/// * `ttl_seconds` - How long to cache results in seconds
72
72
/// * `metrics` - Metrics publisher for telemetry
73
-
pub fn new(inner: Arc<dyn HandleResolver>, ttl_seconds: u64, metrics: SharedMetricsPublisher) -> Self {
73
+
pub fn new(
74
+
inner: Arc<dyn HandleResolver>,
75
+
ttl_seconds: u64,
76
+
metrics: SharedMetricsPublisher,
77
+
) -> Self {
74
78
Self {
75
79
inner,
76
80
cache: Arc::new(RwLock::new(HashMap::new())),
···
118
122
handle,
119
123
error
120
124
);
121
-
self.metrics.incr("resolver.memory.cache_hit_not_resolved").await;
125
+
self.metrics
126
+
.incr("resolver.memory.cache_hit_not_resolved")
127
+
.await;
122
128
return Err(HandleResolverError::HandleNotFoundCached(error.clone()));
123
129
}
124
130
tracing::debug!("Cache entry expired for handle {}", handle);
···
159
165
tracing::debug!("Cached failed resolution for handle {}: {}", handle, e);
160
166
}
161
167
}
162
-
168
+
163
169
// Track cache size
164
170
let cache_size = cache.len() as u64;
165
-
self.metrics.gauge("resolver.memory.cache_entries", cache_size).await;
171
+
self.metrics
172
+
.gauge("resolver.memory.cache_entries", cache_size)
173
+
.await;
166
174
}
167
175
168
176
result
+5
src/handle_resolver/mod.rs
+5
src/handle_resolver/mod.rs
···
47
47
mod base;
48
48
mod errors;
49
49
mod memory;
50
+
mod proactive_refresh;
50
51
mod rate_limited;
51
52
mod redis;
52
53
mod sqlite;
···
59
60
// Factory functions for creating resolvers
60
61
pub use base::create_base_resolver;
61
62
pub use memory::create_caching_resolver;
63
+
pub use proactive_refresh::{
64
+
ProactiveRefreshResolver, create_proactive_refresh_resolver,
65
+
create_proactive_refresh_resolver_dyn, create_proactive_refresh_resolver_with_threshold,
66
+
};
62
67
pub use rate_limited::{create_rate_limited_resolver, create_rate_limited_resolver_with_timeout};
63
68
pub use redis::{create_redis_resolver, create_redis_resolver_with_ttl};
64
69
pub use sqlite::{create_sqlite_resolver, create_sqlite_resolver_with_ttl};
+393
src/handle_resolver/proactive_refresh.rs
+393
src/handle_resolver/proactive_refresh.rs
···
1
+
use crate::handle_resolution_result::HandleResolutionResult;
2
+
use crate::handle_resolver::{HandleResolver, HandleResolverError};
3
+
use crate::queue::{HandleResolutionWork, QueueAdapter};
4
+
use async_trait::async_trait;
5
+
use std::sync::Arc;
6
+
use std::time::{SystemTime, UNIX_EPOCH};
7
+
use tracing::{debug, trace};
8
+
9
+
/// Create a ProactiveRefreshResolver with default 80% threshold
10
+
///
11
+
/// # Arguments
12
+
/// * `inner` - The inner resolver to wrap
13
+
/// * `queue` - The queue adapter for background refresh tasks
14
+
/// * `cache_ttl` - The TTL in seconds for cache entries
15
+
pub fn create_proactive_refresh_resolver<R, Q>(
16
+
inner: Arc<R>,
17
+
queue: Arc<Q>,
18
+
cache_ttl: u64,
19
+
) -> Arc<ProactiveRefreshResolver<R, Q>>
20
+
where
21
+
R: HandleResolver + Send + Sync + 'static,
22
+
Q: QueueAdapter<HandleResolutionWork> + Send + Sync + 'static,
23
+
{
24
+
Arc::new(ProactiveRefreshResolver::new(inner, queue, cache_ttl))
25
+
}
26
+
27
+
/// Create a ProactiveRefreshResolver with custom threshold
28
+
///
29
+
/// # Arguments
30
+
/// * `inner` - The inner resolver to wrap
31
+
/// * `queue` - The queue adapter for background refresh tasks
32
+
/// * `cache_ttl` - The TTL in seconds for cache entries
33
+
/// * `threshold` - The threshold as a percentage (0.0 to 1.0) of TTL when to trigger refresh
34
+
pub fn create_proactive_refresh_resolver_with_threshold<R, Q>(
35
+
inner: Arc<R>,
36
+
queue: Arc<Q>,
37
+
cache_ttl: u64,
38
+
threshold: f64,
39
+
) -> Arc<ProactiveRefreshResolver<R, Q>>
40
+
where
41
+
R: HandleResolver + Send + Sync + 'static,
42
+
Q: QueueAdapter<HandleResolutionWork> + Send + Sync + 'static,
43
+
{
44
+
Arc::new(ProactiveRefreshResolver::with_threshold(
45
+
inner, queue, cache_ttl, threshold,
46
+
))
47
+
}
48
+
49
+
/// Wrapper struct for dynamic dispatch with proactive refresh.
/// This works with trait objects instead of concrete types, so it can wrap
/// whichever resolver/queue combination was selected at runtime.
pub struct DynProactiveRefreshResolver {
    // Wrapped resolver; every resolve() call is delegated to it.
    inner: Arc<dyn HandleResolver>,
    // Queue used to schedule background re-resolution work.
    queue: Arc<dyn QueueAdapter<HandleResolutionWork>>,
    // Cache-entry TTL in seconds.
    // NOTE(review): carried but never read by the refresh heuristic — confirm intent.
    #[allow(dead_code)]
    cache_ttl: u64,
    // Fraction (0.0-1.0) of the TTL after which a refresh should be queued;
    // clamped in the constructor.
    // NOTE(review): carried but never read by the refresh heuristic — confirm intent.
    #[allow(dead_code)]
    refresh_threshold: f64,
}
59
+
60
+
impl DynProactiveRefreshResolver {
61
+
pub fn new(
62
+
inner: Arc<dyn HandleResolver>,
63
+
queue: Arc<dyn QueueAdapter<HandleResolutionWork>>,
64
+
cache_ttl: u64,
65
+
refresh_threshold: f64,
66
+
) -> Self {
67
+
Self {
68
+
inner,
69
+
queue,
70
+
cache_ttl,
71
+
refresh_threshold: refresh_threshold.clamp(0.0, 1.0),
72
+
}
73
+
}
74
+
75
+
async fn maybe_queue_for_refresh(&self, handle: &str, resolve_time: u64) {
76
+
// If resolution took less than 5ms, it was probably a cache hit
77
+
if resolve_time < 5000 {
78
+
trace!(
79
+
handle = handle,
80
+
resolve_time_us = resolve_time,
81
+
"Fast resolution detected, considering proactive refresh"
82
+
);
83
+
84
+
// Simple heuristic: queue for refresh with some probability
85
+
let now = SystemTime::now()
86
+
.duration_since(UNIX_EPOCH)
87
+
.unwrap_or_default()
88
+
.as_secs();
89
+
90
+
// Queue every N seconds for frequently accessed handles
91
+
if now % 60 == 0 {
92
+
let work = HandleResolutionWork {
93
+
handle: handle.to_string(),
94
+
};
95
+
96
+
if let Err(e) = self.queue.push(work).await {
97
+
debug!(
98
+
handle = handle,
99
+
error = %e,
100
+
"Failed to queue handle for proactive refresh"
101
+
);
102
+
} else {
103
+
debug!(handle = handle, "Queued handle for proactive refresh");
104
+
}
105
+
}
106
+
}
107
+
}
108
+
}
109
+
110
+
#[async_trait]
111
+
impl HandleResolver for DynProactiveRefreshResolver {
112
+
async fn resolve(&self, handle: &str) -> Result<(String, u64), HandleResolverError> {
113
+
// Resolve through the inner resolver
114
+
let (did, resolve_time) = self.inner.resolve(handle).await?;
115
+
116
+
// Check if we should queue for refresh based on resolution time
117
+
self.maybe_queue_for_refresh(handle, resolve_time).await;
118
+
119
+
Ok((did, resolve_time))
120
+
}
121
+
}
122
+
123
+
/// Create a ProactiveRefreshResolver with custom threshold using trait objects
124
+
/// This version works with dyn HandleResolver and dyn QueueAdapter
125
+
///
126
+
/// # Arguments
127
+
/// * `inner` - The inner resolver to wrap
128
+
/// * `queue` - The queue adapter for background refresh tasks
129
+
/// * `cache_ttl` - The TTL in seconds for cache entries
130
+
/// * `threshold` - The threshold as a percentage (0.0 to 1.0) of TTL when to trigger refresh
131
+
pub fn create_proactive_refresh_resolver_dyn(
132
+
inner: Arc<dyn HandleResolver>,
133
+
queue: Arc<dyn QueueAdapter<HandleResolutionWork>>,
134
+
cache_ttl: u64,
135
+
threshold: f64,
136
+
) -> Arc<dyn HandleResolver> {
137
+
Arc::new(DynProactiveRefreshResolver::new(
138
+
inner, queue, cache_ttl, threshold,
139
+
))
140
+
}
141
+
142
+
/// A handle resolver that proactively refreshes cache entries when they reach
/// a certain staleness threshold (default 80% of TTL).
///
/// This resolver wraps another resolver and checks successful resolutions from cache.
/// When a cached entry has lived for more than the threshold percentage of its TTL,
/// it queues the handle for background refresh to keep the cache warm.
///
/// NOTE(review): the original note below claims the second tuple element of
/// `resolve()` is a "resolution time"; the base/Redis/SQLite resolvers in this
/// crate actually return the entry's UNIX timestamp there, so the <5ms
/// heuristic in `maybe_queue_for_refresh` can never fire — confirm and fix.
///
/// Note: Due to the current trait design, this implementation uses the resolution time
/// as a heuristic. When resolve_time is 0 (instant cache hit), it may queue for refresh.
/// For full functionality, the trait would need to expose cache timestamps.
pub struct ProactiveRefreshResolver<R: HandleResolver, Q: QueueAdapter<HandleResolutionWork>> {
    // Wrapped resolver that performs (or caches) the actual resolution.
    inner: Arc<R>,
    // Queue for scheduling background refresh work.
    queue: Arc<Q>,
    /// TTL in seconds for cache entries
    cache_ttl: u64,
    /// Threshold as a percentage (0.0 to 1.0) of TTL when to trigger refresh
    /// Default is 0.8 (80%)
    refresh_threshold: f64,
}
161
+
162
+
impl<R: HandleResolver, Q: QueueAdapter<HandleResolutionWork>> ProactiveRefreshResolver<R, Q> {
163
+
pub fn new(inner: Arc<R>, queue: Arc<Q>, cache_ttl: u64) -> Self {
164
+
Self::with_threshold(inner, queue, cache_ttl, 0.8)
165
+
}
166
+
167
+
pub fn with_threshold(
168
+
inner: Arc<R>,
169
+
queue: Arc<Q>,
170
+
cache_ttl: u64,
171
+
refresh_threshold: f64,
172
+
) -> Self {
173
+
Self {
174
+
inner,
175
+
queue,
176
+
cache_ttl,
177
+
refresh_threshold: refresh_threshold.clamp(0.0, 1.0),
178
+
}
179
+
}
180
+
181
+
/// Check if a cached entry needs proactive refresh based on its age
182
+
#[allow(dead_code)]
183
+
fn needs_refresh(&self, result: &HandleResolutionResult) -> bool {
184
+
let now = SystemTime::now()
185
+
.duration_since(UNIX_EPOCH)
186
+
.unwrap_or_default()
187
+
.as_secs();
188
+
189
+
let age = now.saturating_sub(result.timestamp);
190
+
let threshold = (self.cache_ttl as f64 * self.refresh_threshold) as u64;
191
+
192
+
let needs_refresh = age >= threshold;
193
+
194
+
if needs_refresh {
195
+
debug!(
196
+
handle = ?result.to_did(),
197
+
age_seconds = age,
198
+
threshold_seconds = threshold,
199
+
cache_ttl = self.cache_ttl,
200
+
"Cache entry needs proactive refresh"
201
+
);
202
+
} else {
203
+
trace!(
204
+
handle = ?result.to_did(),
205
+
age_seconds = age,
206
+
threshold_seconds = threshold,
207
+
"Cache entry still fresh"
208
+
);
209
+
}
210
+
211
+
needs_refresh
212
+
}
213
+
214
+
/// Queue a handle for background refresh
215
+
async fn queue_for_refresh(&self, handle: &str) {
216
+
let work = HandleResolutionWork {
217
+
handle: handle.to_string(),
218
+
};
219
+
220
+
match self.queue.push(work).await {
221
+
Ok(_) => {
222
+
debug!(handle = handle, "Queued handle for proactive refresh");
223
+
}
224
+
Err(e) => {
225
+
// Don't fail the request if we can't queue for refresh
226
+
debug!(
227
+
handle = handle,
228
+
error = %e,
229
+
"Failed to queue handle for proactive refresh"
230
+
);
231
+
}
232
+
}
233
+
}
234
+
235
+
/// Check if we should queue for refresh based on resolution time
236
+
///
237
+
/// This is a heuristic approach:
238
+
/// - If resolve_time is very low (< 5ms), it was likely a cache hit
239
+
/// - We probabilistically queue for refresh based on time since service start
240
+
///
241
+
/// For proper implementation, the HandleResolver trait would need to expose
242
+
/// cache metadata or return HandleResolutionResult directly.
243
+
async fn maybe_queue_for_refresh(&self, handle: &str, resolve_time: u64) {
244
+
// If resolution took less than 5ms, it was probably a cache hit
245
+
if resolve_time < 5000 {
246
+
// Use a simple probabilistic approach for demonstration
247
+
// In production, you'd want access to the actual cache timestamp
248
+
trace!(
249
+
handle = handle,
250
+
resolve_time_us = resolve_time,
251
+
"Fast resolution detected, considering proactive refresh"
252
+
);
253
+
254
+
// Queue for refresh with some probability to avoid overwhelming the queue
255
+
// This is a simplified approach - ideally we'd have access to cache metadata
256
+
let now = SystemTime::now()
257
+
.duration_since(UNIX_EPOCH)
258
+
.unwrap_or_default()
259
+
.as_secs();
260
+
261
+
// Simple heuristic: queue every N seconds for frequently accessed handles
262
+
if now % 60 == 0 {
263
+
self.queue_for_refresh(handle).await;
264
+
}
265
+
}
266
+
}
267
+
}
268
+
269
+
#[async_trait]
270
+
impl<R, Q> HandleResolver for ProactiveRefreshResolver<R, Q>
271
+
where
272
+
R: HandleResolver + Send + Sync,
273
+
Q: QueueAdapter<HandleResolutionWork> + Send + Sync,
274
+
{
275
+
async fn resolve(&self, handle: &str) -> Result<(String, u64), HandleResolverError> {
276
+
// Resolve through the inner resolver
277
+
let (did, resolve_time) = self.inner.resolve(handle).await?;
278
+
279
+
// Check if we should queue for refresh based on resolution time
280
+
self.maybe_queue_for_refresh(handle, resolve_time).await;
281
+
282
+
Ok((did, resolve_time))
283
+
}
284
+
}
285
+
286
+
#[cfg(test)]
mod tests {
    use super::*;
    use crate::handle_resolution_result::DidMethodType;

    // Verifies the age-vs-threshold arithmetic in needs_refresh with the
    // default 80% threshold. Uses wall-clock "now" minus fixed offsets, so
    // each case sits well inside its side of the boundary.
    #[test]
    fn test_needs_refresh_calculation() {
        // Create a resolver with 100 second TTL and 80% threshold
        let inner = Arc::new(MockResolver);
        let queue = Arc::new(MockQueueAdapter);
        let resolver = ProactiveRefreshResolver::new(inner, queue, 100);

        let now = SystemTime::now()
            .duration_since(UNIX_EPOCH)
            .unwrap()
            .as_secs();

        // Test entry that's 50% through TTL (should not refresh)
        let fresh_result = HandleResolutionResult {
            timestamp: now - 50,
            method_type: DidMethodType::Plc,
            payload: "alice123".to_string(),
        };
        assert!(!resolver.needs_refresh(&fresh_result));

        // Test entry that's 80% through TTL (should refresh)
        // Note: 80 seconds equals the threshold exactly; needs_refresh uses
        // `age >= threshold`, so this is the inclusive boundary case.
        let stale_result = HandleResolutionResult {
            timestamp: now - 80,
            method_type: DidMethodType::Plc,
            payload: "alice123".to_string(),
        };
        assert!(resolver.needs_refresh(&stale_result));

        // Test entry that's 90% through TTL (should definitely refresh)
        let very_stale_result = HandleResolutionResult {
            timestamp: now - 90,
            method_type: DidMethodType::Plc,
            payload: "alice123".to_string(),
        };
        assert!(resolver.needs_refresh(&very_stale_result));
    }

    // Same arithmetic as above, but with a non-default (50%) threshold.
    #[test]
    fn test_custom_threshold() {
        let inner = Arc::new(MockResolver);
        let queue = Arc::new(MockQueueAdapter);

        // Create resolver with 50% threshold
        let resolver = ProactiveRefreshResolver::with_threshold(inner, queue, 100, 0.5);

        let now = SystemTime::now()
            .duration_since(UNIX_EPOCH)
            .unwrap()
            .as_secs();

        // Test entry that's 40% through TTL (should not refresh with 50% threshold)
        let result_40 = HandleResolutionResult {
            timestamp: now - 40,
            method_type: DidMethodType::Plc,
            payload: "alice123".to_string(),
        };
        assert!(!resolver.needs_refresh(&result_40));

        // Test entry that's 60% through TTL (should refresh with 50% threshold)
        let result_60 = HandleResolutionResult {
            timestamp: now - 60,
            method_type: DidMethodType::Plc,
            payload: "alice123".to_string(),
        };
        assert!(resolver.needs_refresh(&result_60));
    }

    // Mock resolver for testing: always succeeds with a synthetic DID and a
    // fixed second tuple element.
    struct MockResolver;

    #[async_trait]
    impl HandleResolver for MockResolver {
        async fn resolve(&self, handle: &str) -> Result<(String, u64), HandleResolverError> {
            Ok((format!("did:plc:{}", handle), 1000))
        }
    }

    // Mock queue adapter for testing: accepts every push, yields nothing.
    struct MockQueueAdapter;

    #[async_trait]
    impl QueueAdapter<HandleResolutionWork> for MockQueueAdapter {
        async fn pull(&self) -> Option<HandleResolutionWork> {
            None
        }

        async fn push(&self, _work: HandleResolutionWork) -> crate::queue::Result<()> {
            Ok(())
        }

        async fn ack(&self, _item: &HandleResolutionWork) -> crate::queue::Result<()> {
            Ok(())
        }

        async fn try_push(&self, _work: HandleResolutionWork) -> crate::queue::Result<()> {
            Ok(())
        }

        async fn is_healthy(&self) -> bool {
            true
        }
    }
}
+27
-8
src/handle_resolver/rate_limited.rs
+27
-8
src/handle_resolver/rate_limited.rs
···
76
76
/// * `inner` - The inner resolver to wrap
77
77
/// * `max_concurrent` - Maximum number of concurrent resolutions allowed
78
78
/// * `metrics` - Metrics publisher for telemetry
79
-
pub fn new(inner: Arc<dyn HandleResolver>, max_concurrent: usize, metrics: SharedMetricsPublisher) -> Self {
79
+
pub fn new(
80
+
inner: Arc<dyn HandleResolver>,
81
+
max_concurrent: usize,
82
+
metrics: SharedMetricsPublisher,
83
+
) -> Self {
80
84
Self {
81
85
inner,
82
86
semaphore: Arc::new(Semaphore::new(max_concurrent)),
···
116
120
impl HandleResolver for RateLimitedHandleResolver {
117
121
async fn resolve(&self, s: &str) -> Result<(String, u64), HandleResolverError> {
118
122
let permit_start = std::time::Instant::now();
119
-
123
+
120
124
// Track rate limiter queue depth
121
125
let available_permits = self.semaphore.available_permits();
122
-
self.metrics.gauge("resolver.rate_limit.available_permits", available_permits as u64).await;
123
-
126
+
self.metrics
127
+
.gauge(
128
+
"resolver.rate_limit.available_permits",
129
+
available_permits as u64,
130
+
)
131
+
.await;
132
+
124
133
// Acquire a permit from the semaphore, with optional timeout
125
134
let _permit = match self.timeout_ms {
126
135
Some(timeout_ms) if timeout_ms > 0 => {
···
129
138
match timeout(duration, self.semaphore.acquire()).await {
130
139
Ok(Ok(permit)) => {
131
140
let wait_ms = permit_start.elapsed().as_millis() as u64;
132
-
self.metrics.time("resolver.rate_limit.permit_acquired", wait_ms).await;
141
+
self.metrics
142
+
.time("resolver.rate_limit.permit_acquired", wait_ms)
143
+
.await;
133
144
permit
134
145
}
135
146
Ok(Err(e)) => {
···
142
153
}
143
154
Err(_) => {
144
155
// Timeout occurred
145
-
self.metrics.incr("resolver.rate_limit.permit_timeout").await;
156
+
self.metrics
157
+
.incr("resolver.rate_limit.permit_timeout")
158
+
.await;
146
159
return Err(HandleResolverError::ResolutionFailed(format!(
147
160
"Rate limit permit acquisition timed out after {}ms",
148
161
timeout_ms
···
155
168
match self.semaphore.acquire().await {
156
169
Ok(permit) => {
157
170
let wait_ms = permit_start.elapsed().as_millis() as u64;
158
-
self.metrics.time("resolver.rate_limit.permit_acquired", wait_ms).await;
171
+
self.metrics
172
+
.time("resolver.rate_limit.permit_acquired", wait_ms)
173
+
.await;
159
174
permit
160
175
}
161
176
Err(e) => {
···
214
229
max_concurrent: usize,
215
230
metrics: SharedMetricsPublisher,
216
231
) -> Arc<dyn HandleResolver> {
217
-
Arc::new(RateLimitedHandleResolver::new(inner, max_concurrent, metrics))
232
+
Arc::new(RateLimitedHandleResolver::new(
233
+
inner,
234
+
max_concurrent,
235
+
metrics,
236
+
))
218
237
}
219
238
220
239
/// Create a rate-limited handle resolver with timeout.
+28
-8
src/handle_resolver/redis.rs
+28
-8
src/handle_resolver/redis.rs
···
67
67
68
68
impl RedisHandleResolver {
69
69
/// Create a new Redis-backed handle resolver with default 90-day TTL.
70
-
fn new(inner: Arc<dyn HandleResolver>, pool: RedisPool, metrics: SharedMetricsPublisher) -> Self {
70
+
fn new(
71
+
inner: Arc<dyn HandleResolver>,
72
+
pool: RedisPool,
73
+
metrics: SharedMetricsPublisher,
74
+
) -> Self {
71
75
Self::with_ttl(inner, pool, 90 * 24 * 60 * 60, metrics) // 90 days default
72
76
}
73
77
74
78
/// Create a new Redis-backed handle resolver with custom TTL.
75
-
fn with_ttl(inner: Arc<dyn HandleResolver>, pool: RedisPool, ttl_seconds: u64, metrics: SharedMetricsPublisher) -> Self {
79
+
fn with_ttl(
80
+
inner: Arc<dyn HandleResolver>,
81
+
pool: RedisPool,
82
+
ttl_seconds: u64,
83
+
metrics: SharedMetricsPublisher,
84
+
) -> Self {
76
85
Self::with_full_config(inner, pool, "handle:".to_string(), ttl_seconds, metrics)
77
86
}
78
87
···
139
148
return Ok((did, cached_result.timestamp));
140
149
} else {
141
150
tracing::debug!("Cache hit (not resolved) for handle {}", handle);
142
-
self.metrics.incr("resolver.redis.cache_hit_not_resolved").await;
151
+
self.metrics
152
+
.incr("resolver.redis.cache_hit_not_resolved")
153
+
.await;
143
154
return Err(HandleResolverError::HandleNotFound);
144
155
}
145
156
}
···
172
183
Ok(res) => res,
173
184
Err(e) => {
174
185
tracing::warn!("Failed to create resolution result: {}", e);
175
-
self.metrics.incr("resolver.redis.result_create_error").await;
186
+
self.metrics
187
+
.incr("resolver.redis.result_create_error")
188
+
.await;
176
189
return result;
177
190
}
178
191
}
···
183
196
Ok(res) => res,
184
197
Err(err) => {
185
198
tracing::warn!("Failed to create not_resolved result: {}", err);
186
-
self.metrics.incr("resolver.redis.result_create_error").await;
199
+
self.metrics
200
+
.incr("resolver.redis.result_create_error")
201
+
.await;
187
202
return result;
188
203
}
189
204
}
···
285
300
ttl_seconds: u64,
286
301
metrics: SharedMetricsPublisher,
287
302
) -> Arc<dyn HandleResolver> {
288
-
Arc::new(RedisHandleResolver::with_ttl(inner, pool, ttl_seconds, metrics))
303
+
Arc::new(RedisHandleResolver::with_ttl(
304
+
inner,
305
+
pool,
306
+
ttl_seconds,
307
+
metrics,
308
+
))
289
309
}
290
310
291
311
#[cfg(test)]
···
329
349
330
350
// Create metrics publisher
331
351
let metrics = Arc::new(crate::metrics::NoOpMetricsPublisher);
332
-
352
+
333
353
// Create Redis-backed resolver with a unique key prefix for testing
334
354
let test_prefix = format!(
335
355
"test:handle:{}:",
···
381
401
382
402
// Create metrics publisher
383
403
let metrics = Arc::new(crate::metrics::NoOpMetricsPublisher);
384
-
404
+
385
405
// Create Redis-backed resolver with a unique key prefix for testing
386
406
let test_prefix = format!(
387
407
"test:handle:{}:",
+32
-10
src/handle_resolver/sqlite.rs
+32
-10
src/handle_resolver/sqlite.rs
···
67
67
68
68
impl SqliteHandleResolver {
69
69
/// Create a new SQLite-backed handle resolver with default 90-day TTL.
70
-
fn new(inner: Arc<dyn HandleResolver>, pool: SqlitePool, metrics: SharedMetricsPublisher) -> Self {
70
+
fn new(
71
+
inner: Arc<dyn HandleResolver>,
72
+
pool: SqlitePool,
73
+
metrics: SharedMetricsPublisher,
74
+
) -> Self {
71
75
Self::with_ttl(inner, pool, 90 * 24 * 60 * 60, metrics) // 90 days default
72
76
}
73
77
74
78
/// Create a new SQLite-backed handle resolver with custom TTL.
75
-
fn with_ttl(inner: Arc<dyn HandleResolver>, pool: SqlitePool, ttl_seconds: u64, metrics: SharedMetricsPublisher) -> Self {
79
+
fn with_ttl(
80
+
inner: Arc<dyn HandleResolver>,
81
+
pool: SqlitePool,
82
+
ttl_seconds: u64,
83
+
metrics: SharedMetricsPublisher,
84
+
) -> Self {
76
85
Self {
77
86
inner,
78
87
pool,
···
132
141
return Ok((did, cached_result.timestamp));
133
142
} else {
134
143
tracing::debug!("Cache hit (not resolved) for handle {}", handle);
135
-
self.metrics.incr("resolver.sqlite.cache_hit_not_resolved").await;
144
+
self.metrics
145
+
.incr("resolver.sqlite.cache_hit_not_resolved")
146
+
.await;
136
147
return Err(HandleResolverError::HandleNotFound);
137
148
}
138
149
}
···
178
189
Ok(res) => res,
179
190
Err(e) => {
180
191
tracing::warn!("Failed to create resolution result: {}", e);
181
-
self.metrics.incr("resolver.sqlite.result_create_error").await;
192
+
self.metrics
193
+
.incr("resolver.sqlite.result_create_error")
194
+
.await;
182
195
return result;
183
196
}
184
197
}
···
189
202
Ok(res) => res,
190
203
Err(err) => {
191
204
tracing::warn!("Failed to create not_resolved result: {}", err);
192
-
self.metrics.incr("resolver.sqlite.result_create_error").await;
205
+
self.metrics
206
+
.incr("resolver.sqlite.result_create_error")
207
+
.await;
193
208
return result;
194
209
}
195
210
}
···
298
313
ttl_seconds: u64,
299
314
metrics: SharedMetricsPublisher,
300
315
) -> Arc<dyn HandleResolver> {
301
-
Arc::new(SqliteHandleResolver::with_ttl(inner, pool, ttl_seconds, metrics))
316
+
Arc::new(SqliteHandleResolver::with_ttl(
317
+
inner,
318
+
pool,
319
+
ttl_seconds,
320
+
metrics,
321
+
))
302
322
}
303
323
304
324
#[cfg(test)]
···
347
367
348
368
// Create metrics publisher
349
369
let metrics = Arc::new(crate::metrics::NoOpMetricsPublisher);
350
-
370
+
351
371
// Create SQLite-backed resolver
352
-
let sqlite_resolver = SqliteHandleResolver::with_ttl(mock_resolver, pool.clone(), 3600, metrics);
372
+
let sqlite_resolver =
373
+
SqliteHandleResolver::with_ttl(mock_resolver, pool.clone(), 3600, metrics);
353
374
354
375
let test_handle = "alice.bsky.social";
355
376
let expected_key = sqlite_resolver.make_key(test_handle) as i64;
···
439
460
440
461
// Create metrics publisher
441
462
let metrics = Arc::new(crate::metrics::NoOpMetricsPublisher);
442
-
463
+
443
464
// Create SQLite-backed resolver
444
-
let sqlite_resolver = SqliteHandleResolver::with_ttl(mock_resolver, pool.clone(), 3600, metrics);
465
+
let sqlite_resolver =
466
+
SqliteHandleResolver::with_ttl(mock_resolver, pool.clone(), 3600, metrics);
445
467
446
468
let test_handle = "error.bsky.social";
447
469
let expected_key = sqlite_resolver.make_key(test_handle) as i64;
+25
-7
src/handle_resolver_task.rs
+25
-7
src/handle_resolver_task.rs
···
169
169
.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
170
170
171
171
// Publish metrics
172
-
self.metrics_publisher.incr("task.handle_resolution.processed").await;
173
-
self.metrics_publisher.time("task.handle_resolution.duration_ms", duration_ms).await;
172
+
self.metrics_publisher
173
+
.incr("task.handle_resolution.processed")
174
+
.await;
175
+
self.metrics_publisher
176
+
.time("task.handle_resolution.duration_ms", duration_ms)
177
+
.await;
174
178
175
179
match result {
176
180
Ok(Ok((did, _timestamp))) => {
···
182
186
.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
183
187
184
188
// Publish success metrics
185
-
self.metrics_publisher.incr("task.handle_resolution.success").await;
186
-
self.metrics_publisher.incr("task.handle_resolution.cached").await;
189
+
self.metrics_publisher
190
+
.incr("task.handle_resolution.success")
191
+
.await;
192
+
self.metrics_publisher
193
+
.incr("task.handle_resolution.cached")
194
+
.await;
187
195
188
196
info!(
189
197
handle = %work.handle,
···
198
206
.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
199
207
200
208
// Publish failure metrics
201
-
self.metrics_publisher.incr("task.handle_resolution.failed").await;
209
+
self.metrics_publisher
210
+
.incr("task.handle_resolution.failed")
211
+
.await;
202
212
203
213
error!(
204
214
handle = %work.handle,
···
213
223
.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
214
224
215
225
// Publish timeout metrics
216
-
self.metrics_publisher.incr("task.handle_resolution.timeout").await;
226
+
self.metrics_publisher
227
+
.incr("task.handle_resolution.timeout")
228
+
.await;
217
229
218
230
error!(
219
231
handle = %work.handle,
···
277
289
metrics_publisher: SharedMetricsPublisher,
278
290
) -> HandleResolverTaskHandle {
279
291
HandleResolverTaskHandle {
280
-
task: HandleResolverTask::with_config(adapter, handle_resolver, cancel_token, config, metrics_publisher),
292
+
task: HandleResolverTask::with_config(
293
+
adapter,
294
+
handle_resolver,
295
+
cancel_token,
296
+
config,
297
+
metrics_publisher,
298
+
),
281
299
}
282
300
}
283
301
+33
-8
src/http/handle_xrpc_resolve_handle.rs
+33
-8
src/http/handle_xrpc_resolve_handle.rs
···
203
203
let handle = match params.handle {
204
204
Some(h) => h,
205
205
None => {
206
-
metrics.incr_with_tags("xrpc.com.atproto.identity.resolveHandle.invalid_handle", &[("reason", "missing")]).await;
206
+
metrics
207
+
.incr_with_tags(
208
+
"xrpc.com.atproto.identity.resolveHandle.invalid_handle",
209
+
&[("reason", "missing")],
210
+
)
211
+
.await;
207
212
return (
208
213
StatusCode::BAD_REQUEST,
209
214
Json(ErrorResponse {
···
220
225
Ok(InputType::Handle(value)) => value,
221
226
Ok(InputType::Plc(_)) | Ok(InputType::Web(_)) => {
222
227
// It's a DID, not a handle
223
-
metrics.incr_with_tags("xrpc.com.atproto.identity.resolveHandle.invalid_handle", &[("reason", "did")]).await;
228
+
metrics
229
+
.incr_with_tags(
230
+
"xrpc.com.atproto.identity.resolveHandle.invalid_handle",
231
+
&[("reason", "did")],
232
+
)
233
+
.await;
224
234
return (
225
235
StatusCode::BAD_REQUEST,
226
236
Json(ErrorResponse {
···
231
241
.into_response();
232
242
}
233
243
Err(_) => {
234
-
metrics.incr_with_tags("xrpc.com.atproto.identity.resolveHandle.invalid_handle", &[("reason", "error")]).await;
244
+
metrics
245
+
.incr_with_tags(
246
+
"xrpc.com.atproto.identity.resolveHandle.invalid_handle",
247
+
&[("reason", "error")],
248
+
)
249
+
.await;
235
250
return (
236
251
StatusCode::BAD_REQUEST,
237
252
Json(ErrorResponse {
···
244
259
};
245
260
246
261
if validating {
247
-
metrics.incr("xrpc.com.atproto.identity.resolveHandle").await;
262
+
metrics
263
+
.incr("xrpc.com.atproto.identity.resolveHandle")
264
+
.await;
248
265
return StatusCode::NO_CONTENT.into_response();
249
266
}
250
267
···
255
272
// Queue the work
256
273
match queue.push(work).await {
257
274
Ok(()) => {
258
-
metrics.incr("xrpc.com.atproto.identity.resolveHandle").await;
275
+
metrics
276
+
.incr("xrpc.com.atproto.identity.resolveHandle")
277
+
.await;
259
278
tracing::debug!("Queued handle resolution for {}", handle);
260
279
}
261
280
Err(e) => {
262
-
metrics.incr("xrpc.com.atproto.identity.resolveHandle.queue_failure").await;
281
+
metrics
282
+
.incr("xrpc.com.atproto.identity.resolveHandle.queue_failure")
283
+
.await;
263
284
tracing::error!("Failed to queue handle resolution: {}", e);
264
285
}
265
286
}
···
278
299
Ok((did, timestamp)) => {
279
300
tracing::debug!(handle, did, "Found cached DID for handle");
280
301
281
-
metrics.incr_with_tags("handle.resolution.request", &[("success", "1")]).await;
302
+
metrics
303
+
.incr_with_tags("handle.resolution.request", &[("success", "1")])
304
+
.await;
282
305
283
306
let etag = calculate_etag(&did, app_context.etag_seed());
284
307
ResolutionResult::Success {
···
289
312
}
290
313
Err(err) => {
291
314
tracing::debug!(error = ?err, handle, "Error resolving handle");
292
-
metrics.incr_with_tags("handle.resolution.request", &[("success", "0")]).await;
315
+
metrics
316
+
.incr_with_tags("handle.resolution.request", &[("success", "0")])
317
+
.await;
293
318
let error_content = format!("error:{}:{}", handle, err);
294
319
let etag = calculate_etag(&error_content, app_context.etag_seed());
295
320
let timestamp = SystemTime::now()
+13
-11
src/http/server.rs
+13
-11
src/http/server.rs
···
102
102
103
103
// Process the request
104
104
let response = next.run(request).await;
105
-
105
+
106
106
// Calculate duration
107
107
let duration_ms = start.elapsed().as_millis() as u64;
108
108
let status_code = response.status().as_u16().to_string();
109
-
109
+
110
110
// Publish metrics with tags
111
-
metrics.time_with_tags(
112
-
"http.request.duration_ms",
113
-
duration_ms,
114
-
&[
115
-
("method", &method),
116
-
("path", &path),
117
-
("status", &status_code),
118
-
],
119
-
).await;
111
+
metrics
112
+
.time_with_tags(
113
+
"http.request.duration_ms",
114
+
duration_ms,
115
+
&[
116
+
("method", &method),
117
+
("path", &path),
118
+
("status", &status_code),
119
+
],
120
+
)
121
+
.await;
120
122
121
123
response
122
124
}
+125
-73
src/metrics.rs
+125
-73
src/metrics.rs
···
1
1
use crate::config::Config;
2
2
use async_trait::async_trait;
3
-
use cadence::{BufferedUdpMetricSink, Counted, CountedExt, Gauged, Metric, QueuingMetricSink, StatsdClient, Timed};
3
+
use cadence::{
4
+
BufferedUdpMetricSink, Counted, CountedExt, Gauged, Metric, QueuingMetricSink, StatsdClient,
5
+
Timed,
6
+
};
4
7
use std::net::UdpSocket;
5
8
use std::sync::Arc;
6
9
use thiserror::Error;
···
91
94
pub fn new(host: &str, prefix: &str) -> Result<Self, Box<dyn std::error::Error>> {
92
95
Self::new_with_tags(host, prefix, vec![])
93
96
}
94
-
97
+
95
98
/// Create a new StatsdMetricsPublisher with default tags
96
99
pub fn new_with_tags(
97
-
host: &str,
98
-
prefix: &str,
99
-
default_tags: Vec<(String, String)>
100
+
host: &str,
101
+
prefix: &str,
102
+
default_tags: Vec<(String, String)>,
100
103
) -> Result<Self, Box<dyn std::error::Error>> {
101
-
tracing::info!("Creating StatsdMetricsPublisher: host={}, prefix={}, tags={:?}", host, prefix, default_tags);
102
-
104
+
tracing::info!(
105
+
"Creating StatsdMetricsPublisher: host={}, prefix={}, tags={:?}",
106
+
host,
107
+
prefix,
108
+
default_tags
109
+
);
110
+
103
111
// let socket = UdpSocket::bind("0.0.0.0:0")?;
104
112
let socket = UdpSocket::bind("[::0]:0")?;
105
113
socket.set_nonblocking(true)?;
106
-
114
+
107
115
let buffered_sink = BufferedUdpMetricSink::from(host, socket)?;
108
116
let queuing_sink = QueuingMetricSink::builder()
109
117
.with_error_handler(move |error| {
···
111
119
})
112
120
.build(buffered_sink);
113
121
let client = StatsdClient::from_sink(prefix, queuing_sink);
114
-
122
+
115
123
tracing::info!("StatsdMetricsPublisher created successfully");
116
-
Ok(Self { client, default_tags })
124
+
Ok(Self {
125
+
client,
126
+
default_tags,
127
+
})
117
128
}
118
-
129
+
119
130
/// Create from an existing StatsdClient
120
131
pub fn from_client(client: StatsdClient) -> Self {
121
132
Self::from_client_with_tags(client, vec![])
122
133
}
123
-
134
+
124
135
/// Create from an existing StatsdClient with default tags
125
-
pub fn from_client_with_tags(client: StatsdClient, default_tags: Vec<(String, String)>) -> Self {
126
-
Self { client, default_tags }
136
+
pub fn from_client_with_tags(
137
+
client: StatsdClient,
138
+
default_tags: Vec<(String, String)>,
139
+
) -> Self {
140
+
Self {
141
+
client,
142
+
default_tags,
143
+
}
127
144
}
128
-
145
+
129
146
/// Apply default tags to a builder
130
-
fn apply_default_tags<'a, M>(&'a self, mut builder: cadence::MetricBuilder<'a, 'a, M>) -> cadence::MetricBuilder<'a, 'a, M>
147
+
fn apply_default_tags<'a, M>(
148
+
&'a self,
149
+
mut builder: cadence::MetricBuilder<'a, 'a, M>,
150
+
) -> cadence::MetricBuilder<'a, 'a, M>
131
151
where
132
152
M: Metric + From<String>,
133
153
{
···
236
256
/// Failed to create metrics publisher
237
257
#[error("error-quickdid-metrics-1 Failed to create metrics publisher: {0}")]
238
258
CreationFailed(String),
239
-
259
+
240
260
/// Invalid configuration for metrics
241
261
#[error("error-quickdid-metrics-2 Invalid metrics configuration: {0}")]
242
262
InvalidConfig(String),
···
248
268
/// `metrics_adapter` configuration value.
249
269
///
250
270
/// ## Example
251
-
///
271
+
///
252
272
/// ```rust,no_run
253
273
/// use quickdid::config::Config;
254
274
/// use quickdid::metrics::create_metrics_publisher;
255
-
///
275
+
///
256
276
/// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
257
277
/// let config = Config::from_env()?;
258
278
/// let metrics = create_metrics_publisher(&config)?;
259
-
///
279
+
///
260
280
/// // Use the metrics publisher
261
281
/// metrics.incr("request.count").await;
262
282
/// # Ok(())
···
264
284
/// ```
265
285
pub fn create_metrics_publisher(config: &Config) -> Result<SharedMetricsPublisher, MetricsError> {
266
286
match config.metrics_adapter.as_str() {
267
-
"noop" => {
268
-
Ok(Arc::new(NoOpMetricsPublisher::new()))
269
-
}
287
+
"noop" => Ok(Arc::new(NoOpMetricsPublisher::new())),
270
288
"statsd" => {
271
-
let host = config.metrics_statsd_host.as_ref()
272
-
.ok_or_else(|| MetricsError::InvalidConfig(
273
-
"METRICS_STATSD_HOST is required when using statsd adapter".to_string()
274
-
))?;
275
-
289
+
let host = config.metrics_statsd_host.as_ref().ok_or_else(|| {
290
+
MetricsError::InvalidConfig(
291
+
"METRICS_STATSD_HOST is required when using statsd adapter".to_string(),
292
+
)
293
+
})?;
294
+
276
295
// Parse tags from comma-separated key:value pairs
277
296
let default_tags = if let Some(tags_str) = &config.metrics_tags {
278
297
tags_str
···
290
309
} else {
291
310
vec![]
292
311
};
293
-
294
-
let publisher = StatsdMetricsPublisher::new_with_tags(
295
-
host,
296
-
&config.metrics_prefix,
297
-
default_tags
298
-
).map_err(|e| MetricsError::CreationFailed(e.to_string()))?;
299
-
312
+
313
+
let publisher =
314
+
StatsdMetricsPublisher::new_with_tags(host, &config.metrics_prefix, default_tags)
315
+
.map_err(|e| MetricsError::CreationFailed(e.to_string()))?;
316
+
300
317
Ok(Arc::new(publisher))
301
318
}
302
-
_ => {
303
-
Err(MetricsError::InvalidConfig(format!(
304
-
"Unknown metrics adapter: {}",
305
-
config.metrics_adapter
306
-
)))
307
-
}
319
+
_ => Err(MetricsError::InvalidConfig(format!(
320
+
"Unknown metrics adapter: {}",
321
+
config.metrics_adapter
322
+
))),
308
323
}
309
324
}
310
325
311
326
#[cfg(test)]
312
327
mod tests {
313
328
use super::*;
329
+
use once_cell::sync::Lazy;
330
+
use std::sync::Mutex;
331
+
332
+
// Use a mutex to serialize tests that modify environment variables
333
+
static ENV_MUTEX: Lazy<Mutex<()>> = Lazy::new(|| Mutex::new(()));
314
334
315
335
#[tokio::test]
316
336
async fn test_noop_metrics() {
317
337
let metrics = NoOpMetricsPublisher::new();
318
-
338
+
319
339
// These should all be no-ops and not panic
320
340
metrics.incr("test.counter").await;
321
341
metrics.count("test.counter", 5).await;
322
-
metrics.incr_with_tags("test.counter", &[("env", "test")]).await;
323
-
metrics.count_with_tags("test.counter", 10, &[("env", "test"), ("service", "quickdid")]).await;
342
+
metrics
343
+
.incr_with_tags("test.counter", &[("env", "test")])
344
+
.await;
345
+
metrics
346
+
.count_with_tags(
347
+
"test.counter",
348
+
10,
349
+
&[("env", "test"), ("service", "quickdid")],
350
+
)
351
+
.await;
324
352
metrics.gauge("test.gauge", 100).await;
325
-
metrics.gauge_with_tags("test.gauge", 200, &[("host", "localhost")]).await;
353
+
metrics
354
+
.gauge_with_tags("test.gauge", 200, &[("host", "localhost")])
355
+
.await;
326
356
metrics.time("test.timing", 42).await;
327
-
metrics.time_with_tags("test.timing", 84, &[("endpoint", "/resolve")]).await;
357
+
metrics
358
+
.time_with_tags("test.timing", 84, &[("endpoint", "/resolve")])
359
+
.await;
328
360
}
329
361
330
362
#[tokio::test]
331
363
async fn test_shared_metrics() {
332
364
let metrics: SharedMetricsPublisher = Arc::new(NoOpMetricsPublisher::new());
333
-
365
+
334
366
// Verify it can be used as a shared reference
335
367
metrics.incr("shared.counter").await;
336
368
metrics.gauge("shared.gauge", 50).await;
337
-
369
+
338
370
// Verify it can be cloned
339
371
let metrics2 = Arc::clone(&metrics);
340
372
metrics2.count("cloned.counter", 3).await;
341
373
}
342
-
374
+
343
375
#[test]
344
376
fn test_create_noop_publisher() {
345
377
use std::env;
346
-
378
+
379
+
// Lock mutex to prevent concurrent environment variable modification
380
+
let _guard = ENV_MUTEX.lock().unwrap();
381
+
347
382
// Clean up any existing environment variables first
348
383
unsafe {
349
384
env::remove_var("METRICS_ADAPTER");
···
351
386
env::remove_var("METRICS_PREFIX");
352
387
env::remove_var("METRICS_TAGS");
353
388
}
354
-
389
+
355
390
// Set up environment for noop adapter
356
391
unsafe {
357
392
env::set_var("HTTP_EXTERNAL", "test.example.com");
358
393
env::set_var("SERVICE_KEY", "did:key:test");
359
394
env::set_var("METRICS_ADAPTER", "noop");
360
395
}
361
-
396
+
362
397
let config = Config::from_env().unwrap();
363
398
let metrics = create_metrics_publisher(&config).unwrap();
364
-
399
+
365
400
// Should create successfully - actual type checking happens at compile time
366
401
assert!(Arc::strong_count(&metrics) == 1);
367
-
402
+
368
403
// Clean up
369
404
unsafe {
370
405
env::remove_var("METRICS_ADAPTER");
406
+
env::remove_var("HTTP_EXTERNAL");
407
+
env::remove_var("SERVICE_KEY");
371
408
}
372
409
}
373
-
410
+
374
411
#[test]
375
412
fn test_create_statsd_publisher() {
376
413
use std::env;
377
-
414
+
415
+
// Lock mutex to prevent concurrent environment variable modification
416
+
let _guard = ENV_MUTEX.lock().unwrap();
417
+
378
418
// Clean up any existing environment variables first
379
419
unsafe {
380
420
env::remove_var("METRICS_ADAPTER");
···
382
422
env::remove_var("METRICS_PREFIX");
383
423
env::remove_var("METRICS_TAGS");
384
424
}
385
-
425
+
386
426
// Set up environment for statsd adapter
387
427
unsafe {
388
428
env::set_var("HTTP_EXTERNAL", "test.example.com");
···
392
432
env::set_var("METRICS_PREFIX", "test");
393
433
env::set_var("METRICS_TAGS", "env:test,service:quickdid");
394
434
}
395
-
435
+
396
436
let config = Config::from_env().unwrap();
397
437
let metrics = create_metrics_publisher(&config).unwrap();
398
-
438
+
399
439
// Should create successfully
400
440
assert!(Arc::strong_count(&metrics) == 1);
401
-
441
+
402
442
// Clean up
403
443
unsafe {
404
444
env::remove_var("METRICS_ADAPTER");
405
445
env::remove_var("METRICS_STATSD_HOST");
406
446
env::remove_var("METRICS_PREFIX");
407
447
env::remove_var("METRICS_TAGS");
448
+
env::remove_var("HTTP_EXTERNAL");
449
+
env::remove_var("SERVICE_KEY");
408
450
}
409
451
}
410
-
452
+
411
453
#[test]
412
454
fn test_missing_statsd_host() {
413
455
use std::env;
414
-
456
+
457
+
// Lock mutex to prevent concurrent environment variable modification
458
+
let _guard = ENV_MUTEX.lock().unwrap();
459
+
415
460
// Clean up any existing environment variables first
416
461
unsafe {
417
462
env::remove_var("METRICS_ADAPTER");
···
419
464
env::remove_var("METRICS_PREFIX");
420
465
env::remove_var("METRICS_TAGS");
421
466
}
422
-
467
+
423
468
// Set up environment for statsd adapter without host
424
469
unsafe {
425
470
env::set_var("HTTP_EXTERNAL", "test.example.com");
···
427
472
env::set_var("METRICS_ADAPTER", "statsd");
428
473
env::remove_var("METRICS_STATSD_HOST");
429
474
}
430
-
475
+
431
476
let config = Config::from_env().unwrap();
432
477
let result = create_metrics_publisher(&config);
433
-
478
+
434
479
// Should fail with invalid config error
435
480
assert!(result.is_err());
436
481
if let Err(e) = result {
437
482
assert!(matches!(e, MetricsError::InvalidConfig(_)));
438
483
}
439
-
484
+
440
485
// Clean up
441
486
unsafe {
442
487
env::remove_var("METRICS_ADAPTER");
488
+
env::remove_var("HTTP_EXTERNAL");
489
+
env::remove_var("SERVICE_KEY");
443
490
}
444
491
}
445
-
492
+
446
493
#[test]
447
494
fn test_invalid_adapter() {
448
495
use std::env;
449
-
496
+
497
+
// Lock mutex to prevent concurrent environment variable modification
498
+
let _guard = ENV_MUTEX.lock().unwrap();
499
+
450
500
// Clean up any existing environment variables first
451
501
unsafe {
452
502
env::remove_var("METRICS_ADAPTER");
···
454
504
env::remove_var("METRICS_PREFIX");
455
505
env::remove_var("METRICS_TAGS");
456
506
}
457
-
507
+
458
508
// Set up environment with invalid adapter
459
509
unsafe {
460
510
env::set_var("HTTP_EXTERNAL", "test.example.com");
···
462
512
env::set_var("METRICS_ADAPTER", "invalid");
463
513
env::remove_var("METRICS_STATSD_HOST"); // Clean up from other tests
464
514
}
465
-
515
+
466
516
let config = Config::from_env().unwrap();
467
-
517
+
468
518
// Config validation should catch this
469
519
let validation_result = config.validate();
470
520
assert!(validation_result.is_err());
471
-
521
+
472
522
// Clean up
473
523
unsafe {
474
524
env::remove_var("METRICS_ADAPTER");
525
+
env::remove_var("HTTP_EXTERNAL");
526
+
env::remove_var("SERVICE_KEY");
475
527
}
476
528
}
477
-
}
529
+
}