+10 Cargo.lock
···
261
261
checksum = "d71b6127be86fdcfddb610f7182ac57211d4b18a3e9c82eb2d17662f2227ad6a"
262
262
263
263
[[package]]
264
+
name = "cadence"
265
+
version = "1.6.0"
266
+
source = "registry+https://github.com/rust-lang/crates.io-index"
267
+
checksum = "3075f133bee430b7644c54fb629b9b4420346ffa275a45c81a6babe8b09b4f51"
268
+
dependencies = [
269
+
"crossbeam-channel",
270
+
]
271
+
272
+
[[package]]
264
273
name = "cbor4ii"
265
274
version = "0.2.14"
266
275
source = "registry+https://github.com/rust-lang/crates.io-index"
···
1851
1860
"atproto-identity",
1852
1861
"axum",
1853
1862
"bincode",
1863
+
"cadence",
1854
1864
"deadpool-redis",
1855
1865
"httpdate",
1856
1866
"metrohash",
+1 Cargo.toml
···
19
19
atproto-identity = { version = "0.11.3" }
20
20
axum = { version = "0.8" }
21
21
bincode = { version = "2.0.1", features = ["serde"] }
22
+
cadence = "1.6.0"
22
23
deadpool-redis = { version = "0.22", features = ["connection-manager", "tokio-comp", "tokio-rustls-comp"] }
23
24
httpdate = "1.0"
24
25
metrohash = "1.0.7"
+35 -4 src/bin/quickdid.rs
···
13
13
},
14
14
handle_resolver_task::{HandleResolverTaskConfig, create_handle_resolver_task_with_config},
15
15
http::{AppContext, create_router},
16
+
metrics::create_metrics_publisher,
16
17
queue::{
17
18
HandleResolutionWork, QueueAdapter, create_mpsc_queue_from_channel, create_noop_queue,
18
19
create_redis_queue, create_sqlite_queue, create_sqlite_queue_with_max_size,
···
126
127
" RESOLVER_MAX_CONCURRENT_TIMEOUT_MS Timeout for acquiring permits in ms (default: 0 = no timeout)"
127
128
);
128
129
println!();
130
+
println!(" METRICS:");
131
+
println!(
132
+
" METRICS_ADAPTER Metrics adapter: 'noop' or 'statsd' (default: noop)"
133
+
);
134
+
println!(
135
+
" METRICS_STATSD_HOST StatsD host when using statsd adapter (e.g., localhost:8125)"
136
+
);
137
+
println!(
138
+
" METRICS_PREFIX Prefix for all metrics (default: quickdid)"
139
+
);
140
+
println!(
141
+
" METRICS_TAGS Default tags for metrics (comma-separated key:value pairs)"
142
+
);
143
+
println!();
129
144
println!(
130
145
"For more information, visit: https://github.com/smokesignal.events/quickdid"
131
146
);
···
217
232
// Create DNS resolver Arc for sharing
218
233
let dns_resolver_arc = Arc::new(dns_resolver);
219
234
235
+
// Create metrics publisher based on configuration
236
+
let metrics_publisher = create_metrics_publisher(&config).map_err(|e| {
237
+
tracing::error!("Failed to create metrics publisher: {}", e);
238
+
anyhow::anyhow!("Failed to create metrics publisher: {}", e)
239
+
})?;
240
+
241
+
tracing::info!(
242
+
"Metrics publisher created with {} adapter",
243
+
config.metrics_adapter
244
+
);
245
+
246
+
metrics_publisher.gauge("server", 1).await;
247
+
220
248
// Create base handle resolver using factory function
221
249
let mut base_handle_resolver =
222
-
create_base_resolver(dns_resolver_arc.clone(), http_client.clone());
250
+
create_base_resolver(dns_resolver_arc.clone(), http_client.clone(), metrics_publisher.clone());
223
251
224
252
// Apply rate limiting if configured
225
253
if config.resolver_max_concurrent > 0 {
···
237
265
base_handle_resolver,
238
266
config.resolver_max_concurrent,
239
267
config.resolver_max_concurrent_timeout_ms,
268
+
metrics_publisher.clone(),
240
269
);
241
270
}
242
271
···
260
289
"Using Redis-backed handle resolver with {}-second cache TTL",
261
290
config.cache_ttl_redis
262
291
);
263
-
create_redis_resolver_with_ttl(base_handle_resolver, pool, config.cache_ttl_redis)
292
+
create_redis_resolver_with_ttl(base_handle_resolver, pool, config.cache_ttl_redis, metrics_publisher.clone())
264
293
} else if let Some(pool) = sqlite_pool {
265
294
tracing::info!(
266
295
"Using SQLite-backed handle resolver with {}-second cache TTL",
267
296
config.cache_ttl_sqlite
268
297
);
269
-
create_sqlite_resolver_with_ttl(base_handle_resolver, pool, config.cache_ttl_sqlite)
298
+
create_sqlite_resolver_with_ttl(base_handle_resolver, pool, config.cache_ttl_sqlite, metrics_publisher.clone())
270
299
} else {
271
300
tracing::info!(
272
301
"Using in-memory handle resolver with {}-second cache TTL",
273
302
config.cache_ttl_memory
274
303
);
275
-
create_caching_resolver(base_handle_resolver, config.cache_ttl_memory)
304
+
create_caching_resolver(base_handle_resolver, config.cache_ttl_memory, metrics_publisher.clone())
276
305
};
277
306
278
307
// Create task tracker and cancellation token
···
397
426
handle_resolver.clone(),
398
427
token.clone(),
399
428
handle_task_config,
429
+
metrics_publisher.clone(),
400
430
);
401
431
402
432
// Spawn the handle resolver task
···
440
470
config.service_did.clone(),
441
471
handle_resolver.clone(),
442
472
handle_queue,
473
+
metrics_publisher,
443
474
config.etag_seed.clone(),
444
475
config.cache_control_header.clone(),
445
476
);
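The `create_metrics_publisher` factory called earlier in this file is not shown in the excerpt (src/metrics.rs is truncated below). A plausible sketch of how such a factory could dispatch on the new config fields, assuming `SharedMetricsPublisher` is an `Arc<dyn MetricsPublisher>`; the function name, error type, and omission of `METRICS_TAGS` parsing are illustrative only:

```rust
use std::sync::Arc;
use quickdid::config::Config;
use quickdid::metrics::{MetricsPublisher, NoOpMetricsPublisher, StatsdMetricsPublisher};

// Hypothetical sketch; the real factory lives in src/metrics.rs beyond the
// portion shown in this diff. Default-tag parsing (METRICS_TAGS) is omitted.
fn create_metrics_publisher_sketch(
    config: &Config,
) -> Result<Arc<dyn MetricsPublisher>, Box<dyn std::error::Error>> {
    match config.metrics_adapter.as_str() {
        "statsd" => {
            let host = config
                .metrics_statsd_host
                .as_deref()
                .ok_or("METRICS_STATSD_HOST is required for the statsd adapter")?;
            Ok(Arc::new(StatsdMetricsPublisher::new(host, &config.metrics_prefix)?))
        }
        // "noop" (anything else is rejected earlier by Config::validate)
        _ => Ok(Arc::new(NoOpMetricsPublisher)),
    }
}
```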
+39 src/config.rs
···
211
211
/// Calculated at startup for efficiency.
212
212
/// None if cache_max_age is 0 (disabled).
213
213
pub cache_control_header: Option<String>,
214
+
215
+
/// Metrics adapter type: "noop" or "statsd"
216
+
/// Default: "noop" (no metrics collection)
217
+
pub metrics_adapter: String,
218
+
219
+
/// StatsD host for metrics collection (e.g., "localhost:8125")
220
+
/// Required when metrics_adapter is "statsd"
221
+
pub metrics_statsd_host: Option<String>,
222
+
223
+
/// Metrics prefix for all metrics (e.g., "quickdid")
224
+
/// Default: "quickdid"
225
+
pub metrics_prefix: String,
226
+
227
+
/// Default tags for all metrics (comma-separated key:value pairs)
228
+
/// Example: "env:production,service:quickdid"
229
+
pub metrics_tags: Option<String>,
214
230
}
215
231
216
232
impl Config {
···
302
318
cache_max_stale: parse_env("CACHE_MAX_STALE", 172800)?, // 48 hours
303
319
cache_min_fresh: parse_env("CACHE_MIN_FRESH", 3600)?, // 1 hour
304
320
cache_control_header: None, // Will be calculated below
321
+
metrics_adapter: get_env_or_default("METRICS_ADAPTER", Some("noop")).unwrap(),
322
+
metrics_statsd_host: get_env_or_default("METRICS_STATSD_HOST", None),
323
+
metrics_prefix: get_env_or_default("METRICS_PREFIX", Some("quickdid")).unwrap(),
324
+
metrics_tags: get_env_or_default("METRICS_TAGS", None),
305
325
};
306
326
307
327
// Calculate the Cache-Control header value if enabled
···
390
410
"RESOLVER_MAX_CONCURRENT_TIMEOUT_MS must be <= 60000 (60 seconds)".to_string(),
391
411
));
392
412
}
413
+
414
+
// Validate metrics configuration
415
+
match self.metrics_adapter.as_str() {
416
+
"noop" | "statsd" => {}
417
+
_ => {
418
+
return Err(ConfigError::InvalidValue(format!(
419
+
"Invalid METRICS_ADAPTER '{}', must be 'noop' or 'statsd'",
420
+
self.metrics_adapter
421
+
)));
422
+
}
423
+
}
424
+
425
+
// If statsd is configured, ensure host is provided
426
+
if self.metrics_adapter == "statsd" && self.metrics_statsd_host.is_none() {
427
+
return Err(ConfigError::MissingRequired(
428
+
"METRICS_STATSD_HOST is required when METRICS_ADAPTER is 'statsd'".to_string(),
429
+
));
430
+
}
431
+
393
432
Ok(())
394
433
}
395
434
}
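To make the new rules concrete, here is a small standalone test sketch that mirrors (but does not call) the validation added above; the helper name is hypothetical and not part of the diff:

```rust
// Mirrors the METRICS_ADAPTER / METRICS_STATSD_HOST rules from Config::validate.
fn metrics_config_is_valid(adapter: &str, statsd_host: Option<&str>) -> bool {
    match adapter {
        "noop" => true,
        "statsd" => statsd_host.is_some(),
        _ => false, // rejected with ConfigError::InvalidValue
    }
}

#[test]
fn metrics_adapter_validation_rules() {
    assert!(metrics_config_is_valid("noop", None));
    assert!(metrics_config_is_valid("statsd", Some("localhost:8125")));
    assert!(!metrics_config_is_valid("statsd", None)); // host required
    assert!(!metrics_config_is_valid("prometheus", None)); // unsupported adapter
}
```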
+36 -9 src/handle_resolver/base.rs
···
5
5
6
6
use super::errors::HandleResolverError;
7
7
use super::traits::HandleResolver;
8
+
use crate::metrics::SharedMetricsPublisher;
8
9
use async_trait::async_trait;
9
10
use atproto_identity::resolve::{DnsResolver, resolve_subject};
10
11
use reqwest::Client;
···
25
26
/// use reqwest::Client;
26
27
/// use atproto_identity::resolve::HickoryDnsResolver;
27
28
/// use quickdid::handle_resolver::{create_base_resolver, HandleResolver};
29
+
/// use quickdid::metrics::NoOpMetricsPublisher;
28
30
///
29
31
/// # async fn example() {
30
32
/// let dns_resolver = Arc::new(HickoryDnsResolver::create_resolver(&[]));
31
33
/// let http_client = Client::new();
34
+
/// let metrics = Arc::new(NoOpMetricsPublisher);
32
35
///
33
36
/// let resolver = create_base_resolver(
34
37
/// dns_resolver,
35
38
/// http_client,
39
+
/// metrics,
36
40
/// );
37
41
///
38
42
/// let (did, timestamp) = resolver.resolve("alice.bsky.social").await.unwrap();
···
45
49
46
50
/// HTTP client for DID document retrieval and well-known endpoint queries.
47
51
http_client: Client,
52
+
53
+
/// Metrics publisher for telemetry.
54
+
metrics: SharedMetricsPublisher,
48
55
}
49
56
50
57
#[async_trait]
51
58
impl HandleResolver for BaseHandleResolver {
52
59
async fn resolve(&self, s: &str) -> Result<(String, u64), HandleResolverError> {
53
-
let did = resolve_subject(&self.http_client, &*self.dns_resolver, s)
60
+
let start_time = std::time::Instant::now();
61
+
62
+
// Perform DNS/HTTP resolution
63
+
let result = resolve_subject(&self.http_client, &*self.dns_resolver, s)
54
64
.await
55
-
.map_err(|e| HandleResolverError::ResolutionFailed(e.to_string()))?;
65
+
.map_err(|e| HandleResolverError::ResolutionFailed(e.to_string()));
56
66
57
-
let timestamp = SystemTime::now()
58
-
.duration_since(UNIX_EPOCH)
59
-
.map_err(|e| {
60
-
HandleResolverError::ResolutionFailed(format!("System time error: {}", e))
61
-
})?
62
-
.as_secs();
67
+
let duration_ms = start_time.elapsed().as_millis() as u64;
68
+
69
+
// Publish metrics
70
+
71
+
match result {
72
+
Ok(did) => {
73
+
self.metrics.time_with_tags("resolver.base.duration_ms", duration_ms, &[("success", "1")]).await;
74
+
75
+
let timestamp = SystemTime::now()
76
+
.duration_since(UNIX_EPOCH)
77
+
.map_err(|e| {
78
+
HandleResolverError::ResolutionFailed(format!("System time error: {}", e))
79
+
})?
80
+
.as_secs();
63
81
64
-
Ok((did, timestamp))
82
+
Ok((did, timestamp))
83
+
}
84
+
Err(e) => {
85
+
self.metrics.time_with_tags("resolver.base.duration_ms", duration_ms, &[("success", "0")]).await;
86
+
Err(e)
87
+
}
88
+
}
65
89
}
66
90
}
67
91
···
74
98
///
75
99
/// * `dns_resolver` - DNS resolver for TXT record lookups
76
100
/// * `http_client` - HTTP client for well-known endpoint queries
101
+
/// * `metrics` - Metrics publisher for telemetry
77
102
pub fn create_base_resolver(
78
103
dns_resolver: Arc<dyn DnsResolver>,
79
104
http_client: Client,
105
+
metrics: SharedMetricsPublisher,
80
106
) -> Arc<dyn HandleResolver> {
81
107
Arc::new(BaseHandleResolver {
82
108
dns_resolver,
83
109
http_client,
110
+
metrics,
84
111
})
85
112
}
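Because the base resolver now tags `resolver.base.duration_ms` with a success flag, a recording publisher is handy in tests. A minimal sketch, not part of this diff, assuming only the `MetricsPublisher` trait shown in src/metrics.rs; passing an `Arc` of this recorder into `create_base_resolver` lets a test assert that a failed resolution was timed with `("success", "0")`:

```rust
use async_trait::async_trait;
use std::sync::Mutex;
use quickdid::metrics::MetricsPublisher;

/// Hypothetical test helper: captures timing calls so tags can be asserted.
#[derive(Default)]
struct RecordingMetricsPublisher {
    timings: Mutex<Vec<(String, u64, Vec<(String, String)>)>>,
}

#[async_trait]
impl MetricsPublisher for RecordingMetricsPublisher {
    async fn incr(&self, _key: &str) {}
    async fn count(&self, _key: &str, _value: u64) {}
    async fn incr_with_tags(&self, _key: &str, _tags: &[(&str, &str)]) {}
    async fn count_with_tags(&self, _key: &str, _value: u64, _tags: &[(&str, &str)]) {}
    async fn gauge(&self, _key: &str, _value: u64) {}
    async fn gauge_with_tags(&self, _key: &str, _value: u64, _tags: &[(&str, &str)]) {}
    async fn time(&self, key: &str, millis: u64) {
        self.timings.lock().unwrap().push((key.to_string(), millis, Vec::new()));
    }
    async fn time_with_tags(&self, key: &str, millis: u64, tags: &[(&str, &str)]) {
        let tags = tags.iter().map(|(k, v)| (k.to_string(), v.to_string())).collect();
        self.timings.lock().unwrap().push((key.to_string(), millis, tags));
    }
}
```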
+28 -5 src/handle_resolver/memory.rs
···
6
6
7
7
use super::errors::HandleResolverError;
8
8
use super::traits::HandleResolver;
9
+
use crate::metrics::SharedMetricsPublisher;
9
10
use async_trait::async_trait;
10
11
use std::collections::HashMap;
11
12
use std::sync::Arc;
···
32
33
/// ```no_run
33
34
/// use std::sync::Arc;
34
35
/// use quickdid::handle_resolver::{create_caching_resolver, create_base_resolver, HandleResolver};
36
+
/// use quickdid::metrics::NoOpMetricsPublisher;
35
37
///
36
38
/// # async fn example() {
37
39
/// # use atproto_identity::resolve::HickoryDnsResolver;
38
40
/// # use reqwest::Client;
39
41
/// # let dns_resolver = Arc::new(HickoryDnsResolver::create_resolver(&[]));
40
42
/// # let http_client = Client::new();
41
-
/// let base_resolver = create_base_resolver(dns_resolver, http_client);
43
+
/// # let metrics = Arc::new(NoOpMetricsPublisher);
44
+
/// let base_resolver = create_base_resolver(dns_resolver, http_client, metrics.clone());
42
45
/// let caching_resolver = create_caching_resolver(
43
46
/// base_resolver,
44
-
/// 300 // 5 minute TTL
47
+
/// 300, // 5 minute TTL
48
+
/// metrics
45
49
/// );
46
50
///
47
51
/// // First call hits the underlying resolver
···
55
59
inner: Arc<dyn HandleResolver>,
56
60
cache: Arc<RwLock<HashMap<String, ResolveHandleResult>>>,
57
61
ttl_seconds: u64,
62
+
metrics: SharedMetricsPublisher,
58
63
}
59
64
60
65
impl CachingHandleResolver {
···
64
69
///
65
70
/// * `inner` - The underlying resolver to use for actual resolution
66
71
/// * `ttl_seconds` - How long to cache results in seconds
67
-
pub fn new(inner: Arc<dyn HandleResolver>, ttl_seconds: u64) -> Self {
72
+
/// * `metrics` - Metrics publisher for telemetry
73
+
pub fn new(inner: Arc<dyn HandleResolver>, ttl_seconds: u64, metrics: SharedMetricsPublisher) -> Self {
68
74
Self {
69
75
inner,
70
76
cache: Arc::new(RwLock::new(HashMap::new())),
71
77
ttl_seconds,
78
+
metrics,
72
79
}
73
80
}
74
81
···
98
105
ResolveHandleResult::Found(timestamp, did) => {
99
106
if !self.is_expired(*timestamp) {
100
107
tracing::debug!("Cache hit for handle {}: {}", handle, did);
108
+
self.metrics.incr("resolver.memory.cache_hit").await;
101
109
return Ok((did.clone(), *timestamp));
102
110
}
103
111
tracing::debug!("Cache entry expired for handle {}", handle);
112
+
self.metrics.incr("resolver.memory.cache_expired").await;
104
113
}
105
114
ResolveHandleResult::NotFound(timestamp, error) => {
106
115
if !self.is_expired(*timestamp) {
···
109
118
handle,
110
119
error
111
120
);
121
+
self.metrics.incr("resolver.memory.cache_hit_not_resolved").await;
112
122
return Err(HandleResolverError::HandleNotFoundCached(error.clone()));
113
123
}
114
124
tracing::debug!("Cache entry expired for handle {}", handle);
125
+
self.metrics.incr("resolver.memory.cache_expired").await;
115
126
}
116
127
}
117
128
}
···
119
130
120
131
// Not in cache or expired, resolve through inner resolver
121
132
tracing::debug!("Cache miss for handle {}, resolving...", handle);
133
+
self.metrics.incr("resolver.memory.cache_miss").await;
122
134
let result = self.inner.resolve(s).await;
123
135
124
136
// Store in cache
···
130
142
handle.clone(),
131
143
ResolveHandleResult::Found(*timestamp, did.clone()),
132
144
);
145
+
self.metrics.incr("resolver.memory.cache_set").await;
133
146
tracing::debug!(
134
147
"Cached successful resolution for handle {}: {}",
135
148
handle,
···
142
155
handle.clone(),
143
156
ResolveHandleResult::NotFound(timestamp, e.to_string()),
144
157
);
158
+
self.metrics.incr("resolver.memory.cache_set_error").await;
145
159
tracing::debug!("Cached failed resolution for handle {}: {}", handle, e);
146
160
}
147
161
}
162
+
163
+
// Track cache size
164
+
let cache_size = cache.len() as u64;
165
+
self.metrics.gauge("resolver.memory.cache_entries", cache_size).await;
148
166
}
149
167
150
168
result
···
160
178
///
161
179
/// * `inner` - The underlying resolver to use for actual resolution
162
180
/// * `ttl_seconds` - How long to cache results in seconds
181
+
/// * `metrics` - Metrics publisher for telemetry
163
182
///
164
183
/// # Example
165
184
///
166
185
/// ```no_run
167
186
/// use std::sync::Arc;
168
187
/// use quickdid::handle_resolver::{create_base_resolver, create_caching_resolver, HandleResolver};
188
+
/// use quickdid::metrics::NoOpMetricsPublisher;
169
189
///
170
190
/// # async fn example() {
171
191
/// # use atproto_identity::resolve::HickoryDnsResolver;
172
192
/// # use reqwest::Client;
173
193
/// # let dns_resolver = Arc::new(HickoryDnsResolver::create_resolver(&[]));
174
194
/// # let http_client = Client::new();
195
+
/// # let metrics = Arc::new(NoOpMetricsPublisher);
175
196
/// let base = create_base_resolver(
176
197
/// dns_resolver,
177
198
/// http_client,
199
+
/// metrics.clone(),
178
200
/// );
179
201
///
180
-
/// let resolver = create_caching_resolver(base, 300); // 5 minute TTL
202
+
/// let resolver = create_caching_resolver(base, 300, metrics); // 5 minute TTL
181
203
/// let did = resolver.resolve("alice.bsky.social").await.unwrap();
182
204
/// # }
183
205
/// ```
184
206
pub fn create_caching_resolver(
185
207
inner: Arc<dyn HandleResolver>,
186
208
ttl_seconds: u64,
209
+
metrics: SharedMetricsPublisher,
187
210
) -> Arc<dyn HandleResolver> {
188
-
Arc::new(CachingHandleResolver::new(inner, ttl_seconds))
211
+
Arc::new(CachingHandleResolver::new(inner, ttl_seconds, metrics))
189
212
}
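A quick test sketch of the cached path, using the no-op publisher and a stub inner resolver. The stub type is hypothetical, and the import paths are assumed from the doc examples above (the `HandleResolverError` re-export location may differ):

```rust
use async_trait::async_trait;
use std::sync::Arc;
use std::sync::atomic::{AtomicUsize, Ordering};
use std::time::{SystemTime, UNIX_EPOCH};
use quickdid::handle_resolver::{HandleResolver, HandleResolverError, create_caching_resolver};
use quickdid::metrics::NoOpMetricsPublisher;

/// Hypothetical stub resolver that counts how often it is invoked.
struct CountingResolver {
    calls: AtomicUsize,
}

#[async_trait]
impl HandleResolver for CountingResolver {
    async fn resolve(&self, _s: &str) -> Result<(String, u64), HandleResolverError> {
        self.calls.fetch_add(1, Ordering::Relaxed);
        let now = SystemTime::now().duration_since(UNIX_EPOCH).unwrap().as_secs();
        Ok(("did:plc:example".to_string(), now))
    }
}

#[tokio::test]
async fn second_lookup_is_served_from_cache() {
    let inner = Arc::new(CountingResolver { calls: AtomicUsize::new(0) });
    let metrics = Arc::new(NoOpMetricsPublisher);
    let resolver = create_caching_resolver(inner.clone(), 300, metrics);

    resolver.resolve("alice.bsky.social").await.unwrap();
    resolver.resolve("alice.bsky.social").await.unwrap();

    // Only the first call should reach the inner resolver; the second is a cache hit.
    assert_eq!(inner.calls.load(Ordering::Relaxed), 1);
}
```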
+4 -1 src/handle_resolver/mod.rs
···
19
19
//! ```no_run
20
20
//! use std::sync::Arc;
21
21
//! use quickdid::handle_resolver::{create_base_resolver, create_caching_resolver, HandleResolver};
22
+
//! use quickdid::metrics::NoOpMetricsPublisher;
22
23
//!
23
24
//! # async fn example() -> Result<(), Box<dyn std::error::Error>> {
24
25
//! # use atproto_identity::resolve::HickoryDnsResolver;
25
26
//! # use reqwest::Client;
26
27
//! # let dns_resolver = Arc::new(HickoryDnsResolver::create_resolver(&[]));
27
28
//! # let http_client = Client::new();
29
+
//! # let metrics = Arc::new(NoOpMetricsPublisher);
28
30
//! // Create base resolver using factory function
29
31
//! let base = create_base_resolver(
30
32
//! dns_resolver,
31
33
//! http_client,
34
+
//! metrics.clone(),
32
35
//! );
33
36
//!
34
37
//! // Wrap with in-memory caching
35
-
//! let resolver = create_caching_resolver(base, 300);
38
+
//! let resolver = create_caching_resolver(base, 300, metrics);
36
39
//!
37
40
//! // Resolve a handle
38
41
//! let did = resolver.resolve("alice.bsky.social").await?;
+55 -15 src/handle_resolver/rate_limited.rs
···
5
5
6
6
use super::errors::HandleResolverError;
7
7
use super::traits::HandleResolver;
8
+
use crate::metrics::SharedMetricsPublisher;
8
9
use async_trait::async_trait;
9
10
use std::sync::Arc;
10
11
use std::time::Duration;
···
34
35
/// create_rate_limited_resolver,
35
36
/// HandleResolver,
36
37
/// };
38
+
/// use quickdid::metrics::NoOpMetricsPublisher;
37
39
///
38
40
/// # async fn example() {
39
41
/// # use atproto_identity::resolve::HickoryDnsResolver;
40
42
/// # use reqwest::Client;
41
43
/// # let dns_resolver = Arc::new(HickoryDnsResolver::create_resolver(&[]));
42
44
/// # let http_client = Client::new();
45
+
/// # let metrics = Arc::new(NoOpMetricsPublisher);
43
46
/// // Create base resolver
44
-
/// let base = create_base_resolver(dns_resolver, http_client);
47
+
/// let base = create_base_resolver(dns_resolver, http_client, metrics.clone());
45
48
///
46
49
/// // Wrap with rate limiting (max 10 concurrent resolutions)
47
-
/// let rate_limited = create_rate_limited_resolver(base, 10);
50
+
/// let rate_limited = create_rate_limited_resolver(base, 10, metrics);
48
51
///
49
52
/// // Use the rate-limited resolver
50
53
/// let (did, timestamp) = rate_limited.resolve("alice.bsky.social").await.unwrap();
···
60
63
/// Optional timeout for acquiring permits (in milliseconds).
61
64
/// When None or 0, no timeout is applied.
62
65
timeout_ms: Option<u64>,
66
+
67
+
/// Metrics publisher for telemetry.
68
+
metrics: SharedMetricsPublisher,
63
69
}
64
70
65
71
impl RateLimitedHandleResolver {
···
69
75
///
70
76
/// * `inner` - The inner resolver to wrap
71
77
/// * `max_concurrent` - Maximum number of concurrent resolutions allowed
72
-
pub fn new(inner: Arc<dyn HandleResolver>, max_concurrent: usize) -> Self {
78
+
/// * `metrics` - Metrics publisher for telemetry
79
+
pub fn new(inner: Arc<dyn HandleResolver>, max_concurrent: usize, metrics: SharedMetricsPublisher) -> Self {
73
80
Self {
74
81
inner,
75
82
semaphore: Arc::new(Semaphore::new(max_concurrent)),
76
83
timeout_ms: None,
84
+
metrics,
77
85
}
78
86
}
79
87
···
84
92
/// * `inner` - The inner resolver to wrap
85
93
/// * `max_concurrent` - Maximum number of concurrent resolutions allowed
86
94
/// * `timeout_ms` - Timeout in milliseconds for acquiring permits (0 = no timeout)
95
+
/// * `metrics` - Metrics publisher for telemetry
87
96
pub fn new_with_timeout(
88
97
inner: Arc<dyn HandleResolver>,
89
98
max_concurrent: usize,
90
99
timeout_ms: u64,
100
+
metrics: SharedMetricsPublisher,
91
101
) -> Self {
92
102
Self {
93
103
inner,
···
97
107
} else {
98
108
None
99
109
},
110
+
metrics,
100
111
}
101
112
}
102
113
}
···
104
115
#[async_trait]
105
116
impl HandleResolver for RateLimitedHandleResolver {
106
117
async fn resolve(&self, s: &str) -> Result<(String, u64), HandleResolverError> {
118
+
let permit_start = std::time::Instant::now();
119
+
120
+
// Record how many rate-limit permits are currently available (before waiting)
121
+
let available_permits = self.semaphore.available_permits();
122
+
self.metrics.gauge("resolver.rate_limit.available_permits", available_permits as u64).await;
123
+
107
124
// Acquire a permit from the semaphore, with optional timeout
108
125
let _permit = match self.timeout_ms {
109
126
Some(timeout_ms) if timeout_ms > 0 => {
110
127
// Apply timeout when acquiring permit
111
128
let duration = Duration::from_millis(timeout_ms);
112
129
match timeout(duration, self.semaphore.acquire()).await {
113
-
Ok(Ok(permit)) => permit,
130
+
Ok(Ok(permit)) => {
131
+
let wait_ms = permit_start.elapsed().as_millis() as u64;
132
+
self.metrics.time("resolver.rate_limit.permit_acquired", wait_ms).await;
133
+
permit
134
+
}
114
135
Ok(Err(e)) => {
115
136
// Semaphore error (e.g., closed)
137
+
self.metrics.incr("resolver.rate_limit.permit_error").await;
116
138
return Err(HandleResolverError::ResolutionFailed(format!(
117
139
"Failed to acquire rate limit permit: {}",
118
140
e
···
120
142
}
121
143
Err(_) => {
122
144
// Timeout occurred
145
+
self.metrics.incr("resolver.rate_limit.permit_timeout").await;
123
146
return Err(HandleResolverError::ResolutionFailed(format!(
124
147
"Rate limit permit acquisition timed out after {}ms",
125
148
timeout_ms
···
129
152
}
130
153
_ => {
131
154
// No timeout configured, wait indefinitely
132
-
self.semaphore.acquire().await.map_err(|e| {
133
-
HandleResolverError::ResolutionFailed(format!(
134
-
"Failed to acquire rate limit permit: {}",
135
-
e
136
-
))
137
-
})?
155
+
match self.semaphore.acquire().await {
156
+
Ok(permit) => {
157
+
let wait_ms = permit_start.elapsed().as_millis() as u64;
158
+
self.metrics.time("resolver.rate_limit.permit_acquired", wait_ms).await;
159
+
permit
160
+
}
161
+
Err(e) => {
162
+
self.metrics.incr("resolver.rate_limit.permit_error").await;
163
+
return Err(HandleResolverError::ResolutionFailed(format!(
164
+
"Failed to acquire rate limit permit: {}",
165
+
e
166
+
)));
167
+
}
168
+
}
138
169
}
139
170
};
140
171
···
152
183
///
153
184
/// * `inner` - The resolver to wrap with rate limiting
154
185
/// * `max_concurrent` - Maximum number of concurrent resolutions allowed
186
+
/// * `metrics` - Metrics publisher for telemetry
155
187
///
156
188
/// # Returns
157
189
///
···
169
201
/// # async fn example() {
170
202
/// # use atproto_identity::resolve::HickoryDnsResolver;
171
203
/// # use reqwest::Client;
204
+
/// # use quickdid::metrics::NoOpMetricsPublisher;
172
205
/// # let dns_resolver = Arc::new(HickoryDnsResolver::create_resolver(&[]));
173
206
/// # let http_client = Client::new();
174
-
/// let base = create_base_resolver(dns_resolver, http_client);
175
-
/// let rate_limited = create_rate_limited_resolver(base, 10);
207
+
/// # let metrics = Arc::new(NoOpMetricsPublisher);
208
+
/// let base = create_base_resolver(dns_resolver, http_client, metrics.clone());
209
+
/// let rate_limited = create_rate_limited_resolver(base, 10, metrics);
176
210
/// # }
177
211
/// ```
178
212
pub fn create_rate_limited_resolver(
179
213
inner: Arc<dyn HandleResolver>,
180
214
max_concurrent: usize,
215
+
metrics: SharedMetricsPublisher,
181
216
) -> Arc<dyn HandleResolver> {
182
-
Arc::new(RateLimitedHandleResolver::new(inner, max_concurrent))
217
+
Arc::new(RateLimitedHandleResolver::new(inner, max_concurrent, metrics))
183
218
}
184
219
185
220
/// Create a rate-limited handle resolver with timeout.
···
192
227
/// * `inner` - The resolver to wrap with rate limiting
193
228
/// * `max_concurrent` - Maximum number of concurrent resolutions allowed
194
229
/// * `timeout_ms` - Timeout in milliseconds for acquiring permits (0 = no timeout)
230
+
/// * `metrics` - Metrics publisher for telemetry
195
231
///
196
232
/// # Returns
197
233
///
···
209
245
/// # async fn example() {
210
246
/// # use atproto_identity::resolve::HickoryDnsResolver;
211
247
/// # use reqwest::Client;
248
+
/// # use quickdid::metrics::NoOpMetricsPublisher;
212
249
/// # let dns_resolver = Arc::new(HickoryDnsResolver::create_resolver(&[]));
213
250
/// # let http_client = Client::new();
214
-
/// let base = create_base_resolver(dns_resolver, http_client);
251
+
/// # let metrics = Arc::new(NoOpMetricsPublisher);
252
+
/// let base = create_base_resolver(dns_resolver, http_client, metrics.clone());
215
253
/// // Rate limit with 10 concurrent resolutions and 5 second timeout
216
-
/// let rate_limited = create_rate_limited_resolver_with_timeout(base, 10, 5000);
254
+
/// let rate_limited = create_rate_limited_resolver_with_timeout(base, 10, 5000, metrics);
217
255
/// # }
218
256
/// ```
219
257
pub fn create_rate_limited_resolver_with_timeout(
220
258
inner: Arc<dyn HandleResolver>,
221
259
max_concurrent: usize,
222
260
timeout_ms: u64,
261
+
metrics: SharedMetricsPublisher,
223
262
) -> Arc<dyn HandleResolver> {
224
263
Arc::new(RateLimitedHandleResolver::new_with_timeout(
225
264
inner,
226
265
max_concurrent,
227
266
timeout_ms,
267
+
metrics,
228
268
))
229
269
}
+44 -9 src/handle_resolver/redis.rs
···
7
7
use super::errors::HandleResolverError;
8
8
use super::traits::HandleResolver;
9
9
use crate::handle_resolution_result::HandleResolutionResult;
10
+
use crate::metrics::SharedMetricsPublisher;
10
11
use async_trait::async_trait;
11
12
use deadpool_redis::{Pool as RedisPool, redis::AsyncCommands};
12
13
use metrohash::MetroHash64;
···
33
34
/// use std::sync::Arc;
34
35
/// use deadpool_redis::Pool;
35
36
/// use quickdid::handle_resolver::{create_base_resolver, create_redis_resolver, HandleResolver};
37
+
/// use quickdid::metrics::NoOpMetricsPublisher;
36
38
///
37
39
/// # async fn example() {
38
40
/// # use atproto_identity::resolve::HickoryDnsResolver;
39
41
/// # use reqwest::Client;
40
42
/// # let dns_resolver = Arc::new(HickoryDnsResolver::create_resolver(&[]));
41
43
/// # let http_client = Client::new();
42
-
/// # let base_resolver = create_base_resolver(dns_resolver, http_client);
44
+
/// # let metrics = Arc::new(NoOpMetricsPublisher);
45
+
/// # let base_resolver = create_base_resolver(dns_resolver, http_client, metrics.clone());
43
46
/// # let redis_pool: Pool = todo!();
44
47
/// // Create with default 90-day TTL
45
48
/// let resolver = create_redis_resolver(
46
49
/// base_resolver,
47
-
/// redis_pool
50
+
/// redis_pool,
51
+
/// metrics
48
52
/// );
49
53
/// # }
50
54
/// ```
···
57
61
key_prefix: String,
58
62
/// TTL for cache entries in seconds
59
63
ttl_seconds: u64,
64
+
/// Metrics publisher for telemetry
65
+
metrics: SharedMetricsPublisher,
60
66
}
61
67
62
68
impl RedisHandleResolver {
63
69
/// Create a new Redis-backed handle resolver with default 90-day TTL.
64
-
fn new(inner: Arc<dyn HandleResolver>, pool: RedisPool) -> Self {
65
-
Self::with_ttl(inner, pool, 90 * 24 * 60 * 60) // 90 days default
70
+
fn new(inner: Arc<dyn HandleResolver>, pool: RedisPool, metrics: SharedMetricsPublisher) -> Self {
71
+
Self::with_ttl(inner, pool, 90 * 24 * 60 * 60, metrics) // 90 days default
66
72
}
67
73
68
74
/// Create a new Redis-backed handle resolver with custom TTL.
69
-
fn with_ttl(inner: Arc<dyn HandleResolver>, pool: RedisPool, ttl_seconds: u64) -> Self {
70
-
Self::with_full_config(inner, pool, "handle:".to_string(), ttl_seconds)
75
+
fn with_ttl(inner: Arc<dyn HandleResolver>, pool: RedisPool, ttl_seconds: u64, metrics: SharedMetricsPublisher) -> Self {
76
+
Self::with_full_config(inner, pool, "handle:".to_string(), ttl_seconds, metrics)
71
77
}
72
78
73
79
/// Create a new Redis-backed handle resolver with full configuration.
···
76
82
pool: RedisPool,
77
83
key_prefix: String,
78
84
ttl_seconds: u64,
85
+
metrics: SharedMetricsPublisher,
79
86
) -> Self {
80
87
Self {
81
88
inner,
82
89
pool,
83
90
key_prefix,
84
91
ttl_seconds,
92
+
metrics,
85
93
}
86
94
}
87
95
···
115
123
let cached: Option<Vec<u8>> = match conn.get(&key).await {
116
124
Ok(value) => value,
117
125
Err(e) => {
126
+
self.metrics.incr("resolver.redis.get_error").await;
118
127
tracing::warn!("Failed to get handle from Redis cache: {}", e);
119
128
None
120
129
}
···
126
135
Ok(cached_result) => {
127
136
if let Some(did) = cached_result.to_did() {
128
137
tracing::debug!("Cache hit for handle {}: {}", handle, did);
138
+
self.metrics.incr("resolver.redis.cache_hit").await;
129
139
return Ok((did, cached_result.timestamp));
130
140
} else {
131
141
tracing::debug!("Cache hit (not resolved) for handle {}", handle);
142
+
self.metrics.incr("resolver.redis.cache_hit_not_resolved").await;
132
143
return Err(HandleResolverError::HandleNotFound);
133
144
}
134
145
}
···
138
149
handle,
139
150
e
140
151
);
152
+
self.metrics.incr("resolver.redis.deserialize_error").await;
141
153
// Fall through to re-resolve if deserialization fails
142
154
}
143
155
}
···
145
157
146
158
// Not in cache, resolve through inner resolver
147
159
tracing::debug!("Cache miss for handle {}, resolving...", handle);
160
+
self.metrics.incr("resolver.redis.cache_miss").await;
148
161
let result = self.inner.resolve(s).await;
149
162
150
163
// Create and serialize resolution result
···
159
172
Ok(res) => res,
160
173
Err(e) => {
161
174
tracing::warn!("Failed to create resolution result: {}", e);
175
+
self.metrics.incr("resolver.redis.result_create_error").await;
162
176
return result;
163
177
}
164
178
}
···
169
183
Ok(res) => res,
170
184
Err(err) => {
171
185
tracing::warn!("Failed to create not_resolved result: {}", err);
186
+
self.metrics.incr("resolver.redis.result_create_error").await;
172
187
return result;
173
188
}
174
189
}
···
184
199
.await
185
200
{
186
201
tracing::warn!("Failed to cache handle resolution in Redis: {}", e);
202
+
self.metrics.incr("resolver.redis.cache_set_error").await;
203
+
} else {
204
+
self.metrics.incr("resolver.redis.cache_set").await;
187
205
}
188
206
}
189
207
Err(e) => {
···
192
210
handle,
193
211
e
194
212
);
213
+
self.metrics.incr("resolver.redis.serialize_error").await;
195
214
}
196
215
}
197
216
···
203
222
"Failed to get Redis connection, falling back to uncached resolution: {}",
204
223
e
205
224
);
225
+
self.metrics.incr("resolver.redis.connection_error").await;
206
226
self.inner.resolve(s).await
207
227
}
208
228
}
···
215
235
///
216
236
/// * `inner` - The underlying resolver to use for actual resolution
217
237
/// * `pool` - Redis connection pool
238
+
/// * `metrics` - Metrics publisher for telemetry
218
239
///
219
240
/// # Example
220
241
///
···
222
243
/// use std::sync::Arc;
223
244
/// use quickdid::handle_resolver::{create_base_resolver, create_redis_resolver, HandleResolver};
224
245
/// use quickdid::cache::create_redis_pool;
246
+
/// use quickdid::metrics::NoOpMetricsPublisher;
225
247
///
226
248
/// # async fn example() -> anyhow::Result<()> {
227
249
/// # use atproto_identity::resolve::HickoryDnsResolver;
228
250
/// # use reqwest::Client;
229
251
/// # let dns_resolver = Arc::new(HickoryDnsResolver::create_resolver(&[]));
230
252
/// # let http_client = Client::new();
253
+
/// # let metrics = Arc::new(NoOpMetricsPublisher);
231
254
/// let base = create_base_resolver(
232
255
/// dns_resolver,
233
256
/// http_client,
257
+
/// metrics.clone(),
234
258
/// );
235
259
///
236
260
/// let pool = create_redis_pool("redis://localhost:6379")?;
237
-
/// let resolver = create_redis_resolver(base, pool);
261
+
/// let resolver = create_redis_resolver(base, pool, metrics);
238
262
/// let (did, timestamp) = resolver.resolve("alice.bsky.social").await.unwrap();
239
263
/// # Ok(())
240
264
/// # }
···
242
266
pub fn create_redis_resolver(
243
267
inner: Arc<dyn HandleResolver>,
244
268
pool: RedisPool,
269
+
metrics: SharedMetricsPublisher,
245
270
) -> Arc<dyn HandleResolver> {
246
-
Arc::new(RedisHandleResolver::new(inner, pool))
271
+
Arc::new(RedisHandleResolver::new(inner, pool, metrics))
247
272
}
248
273
249
274
/// Create a new Redis-backed handle resolver with custom TTL.
···
253
278
/// * `inner` - The underlying resolver to use for actual resolution
254
279
/// * `pool` - Redis connection pool
255
280
/// * `ttl_seconds` - TTL for cache entries in seconds
281
+
/// * `metrics` - Metrics publisher for telemetry
256
282
pub fn create_redis_resolver_with_ttl(
257
283
inner: Arc<dyn HandleResolver>,
258
284
pool: RedisPool,
259
285
ttl_seconds: u64,
286
+
metrics: SharedMetricsPublisher,
260
287
) -> Arc<dyn HandleResolver> {
261
-
Arc::new(RedisHandleResolver::with_ttl(inner, pool, ttl_seconds))
288
+
Arc::new(RedisHandleResolver::with_ttl(inner, pool, ttl_seconds, metrics))
262
289
}
263
290
264
291
#[cfg(test)]
···
300
327
expected_did: "did:plc:testuser123".to_string(),
301
328
});
302
329
330
+
// Create metrics publisher
331
+
let metrics = Arc::new(crate::metrics::NoOpMetricsPublisher);
332
+
303
333
// Create Redis-backed resolver with a unique key prefix for testing
304
334
let test_prefix = format!(
305
335
"test:handle:{}:",
···
313
343
pool.clone(),
314
344
test_prefix.clone(),
315
345
3600,
346
+
metrics,
316
347
);
317
348
318
349
let test_handle = "alice.bsky.social";
···
348
379
expected_did: String::new(),
349
380
});
350
381
382
+
// Create metrics publisher
383
+
let metrics = Arc::new(crate::metrics::NoOpMetricsPublisher);
384
+
351
385
// Create Redis-backed resolver with a unique key prefix for testing
352
386
let test_prefix = format!(
353
387
"test:handle:{}:",
···
361
395
pool.clone(),
362
396
test_prefix.clone(),
363
397
3600,
398
+
metrics,
364
399
);
365
400
366
401
let test_handle = "error.bsky.social";
+42 -10 src/handle_resolver/sqlite.rs
···
7
7
use super::errors::HandleResolverError;
8
8
use super::traits::HandleResolver;
9
9
use crate::handle_resolution_result::HandleResolutionResult;
10
+
use crate::metrics::SharedMetricsPublisher;
10
11
use async_trait::async_trait;
11
12
use metrohash::MetroHash64;
12
13
use sqlx::{Row, SqlitePool};
···
35
36
/// use std::sync::Arc;
36
37
/// use sqlx::SqlitePool;
37
38
/// use quickdid::handle_resolver::{create_base_resolver, create_sqlite_resolver, HandleResolver};
39
+
/// use quickdid::metrics::NoOpMetricsPublisher;
38
40
///
39
41
/// # async fn example() {
40
42
/// # use atproto_identity::resolve::HickoryDnsResolver;
41
43
/// # use reqwest::Client;
42
44
/// # let dns_resolver = Arc::new(HickoryDnsResolver::create_resolver(&[]));
43
45
/// # let http_client = Client::new();
44
-
/// # let base_resolver = create_base_resolver(dns_resolver, http_client);
46
+
/// # let metrics = Arc::new(NoOpMetricsPublisher);
47
+
/// # let base_resolver = create_base_resolver(dns_resolver, http_client, metrics.clone());
45
48
/// # let sqlite_pool: SqlitePool = todo!();
46
49
/// // Create with default 90-day TTL
47
50
/// let resolver = create_sqlite_resolver(
48
51
/// base_resolver,
49
-
/// sqlite_pool
52
+
/// sqlite_pool,
53
+
/// metrics
50
54
/// );
51
55
/// # }
52
56
/// ```
···
57
61
pool: SqlitePool,
58
62
/// TTL for cache entries in seconds
59
63
ttl_seconds: u64,
64
+
/// Metrics publisher for telemetry
65
+
metrics: SharedMetricsPublisher,
60
66
}
61
67
62
68
impl SqliteHandleResolver {
63
69
/// Create a new SQLite-backed handle resolver with default 90-day TTL.
64
-
fn new(inner: Arc<dyn HandleResolver>, pool: SqlitePool) -> Self {
65
-
Self::with_ttl(inner, pool, 90 * 24 * 60 * 60) // 90 days default
70
+
fn new(inner: Arc<dyn HandleResolver>, pool: SqlitePool, metrics: SharedMetricsPublisher) -> Self {
71
+
Self::with_ttl(inner, pool, 90 * 24 * 60 * 60, metrics) // 90 days default
66
72
}
67
73
68
74
/// Create a new SQLite-backed handle resolver with custom TTL.
69
-
fn with_ttl(inner: Arc<dyn HandleResolver>, pool: SqlitePool, ttl_seconds: u64) -> Self {
75
+
fn with_ttl(inner: Arc<dyn HandleResolver>, pool: SqlitePool, ttl_seconds: u64, metrics: SharedMetricsPublisher) -> Self {
70
76
Self {
71
77
inner,
72
78
pool,
73
79
ttl_seconds,
80
+
metrics,
74
81
}
75
82
}
76
83
···
121
128
Ok(cached_result) => {
122
129
if let Some(did) = cached_result.to_did() {
123
130
tracing::debug!("Cache hit for handle {}: {}", handle, did);
131
+
self.metrics.incr("resolver.sqlite.cache_hit").await;
124
132
return Ok((did, cached_result.timestamp));
125
133
} else {
126
134
tracing::debug!("Cache hit (not resolved) for handle {}", handle);
135
+
self.metrics.incr("resolver.sqlite.cache_hit_not_resolved").await;
127
136
return Err(HandleResolverError::HandleNotFound);
128
137
}
129
138
}
···
133
142
handle,
134
143
e
135
144
);
145
+
self.metrics.incr("resolver.sqlite.deserialize_error").await;
136
146
// Fall through to re-resolve if deserialization fails
137
147
}
138
148
}
139
149
} else {
140
150
tracing::debug!("Cache entry expired for handle {}", handle);
151
+
self.metrics.incr("resolver.sqlite.cache_expired").await;
141
152
// Entry is expired, we'll re-resolve and update it
142
153
}
143
154
}
144
155
Ok(None) => {
145
156
tracing::debug!("Cache miss for handle {}, resolving...", handle);
157
+
self.metrics.incr("resolver.sqlite.cache_miss").await;
146
158
}
147
159
Err(e) => {
148
160
tracing::warn!("Failed to query SQLite cache for handle {}: {}", handle, e);
161
+
self.metrics.incr("resolver.sqlite.query_error").await;
149
162
// Fall through to resolve without caching on database error
150
163
}
151
164
}
···
165
178
Ok(res) => res,
166
179
Err(e) => {
167
180
tracing::warn!("Failed to create resolution result: {}", e);
181
+
self.metrics.incr("resolver.sqlite.result_create_error").await;
168
182
return result;
169
183
}
170
184
}
···
175
189
Ok(res) => res,
176
190
Err(err) => {
177
191
tracing::warn!("Failed to create not_resolved result: {}", err);
192
+
self.metrics.incr("resolver.sqlite.result_create_error").await;
178
193
return result;
179
194
}
180
195
}
···
208
223
209
224
if let Err(e) = query_result {
210
225
tracing::warn!("Failed to cache handle resolution in SQLite: {}", e);
226
+
self.metrics.incr("resolver.sqlite.cache_set_error").await;
227
+
} else {
228
+
self.metrics.incr("resolver.sqlite.cache_set").await;
211
229
}
212
230
}
213
231
Err(e) => {
···
216
234
handle,
217
235
e
218
236
);
237
+
self.metrics.incr("resolver.sqlite.serialize_error").await;
219
238
}
220
239
}
221
240
···
229
248
///
230
249
/// * `inner` - The underlying resolver to use for actual resolution
231
250
/// * `pool` - SQLite connection pool
251
+
/// * `metrics` - Metrics publisher for telemetry
232
252
///
233
253
/// # Example
234
254
///
···
236
256
/// use std::sync::Arc;
237
257
/// use quickdid::handle_resolver::{create_base_resolver, create_sqlite_resolver, HandleResolver};
238
258
/// use quickdid::sqlite_schema::create_sqlite_pool;
259
+
/// use quickdid::metrics::NoOpMetricsPublisher;
239
260
///
240
261
/// # async fn example() -> anyhow::Result<()> {
241
262
/// # use atproto_identity::resolve::HickoryDnsResolver;
242
263
/// # use reqwest::Client;
243
264
/// # let dns_resolver = Arc::new(HickoryDnsResolver::create_resolver(&[]));
244
265
/// # let http_client = Client::new();
266
+
/// # let metrics = Arc::new(NoOpMetricsPublisher);
245
267
/// let base = create_base_resolver(
246
268
/// dns_resolver,
247
269
/// http_client,
270
+
/// metrics.clone(),
248
271
/// );
249
272
///
250
273
/// let pool = create_sqlite_pool("sqlite:./quickdid.db").await?;
251
-
/// let resolver = create_sqlite_resolver(base, pool);
274
+
/// let resolver = create_sqlite_resolver(base, pool, metrics);
252
275
/// let (did, timestamp) = resolver.resolve("alice.bsky.social").await.unwrap();
253
276
/// # Ok(())
254
277
/// # }
···
256
279
pub fn create_sqlite_resolver(
257
280
inner: Arc<dyn HandleResolver>,
258
281
pool: SqlitePool,
282
+
metrics: SharedMetricsPublisher,
259
283
) -> Arc<dyn HandleResolver> {
260
-
Arc::new(SqliteHandleResolver::new(inner, pool))
284
+
Arc::new(SqliteHandleResolver::new(inner, pool, metrics))
261
285
}
262
286
263
287
/// Create a new SQLite-backed handle resolver with custom TTL.
···
267
291
/// * `inner` - The underlying resolver to use for actual resolution
268
292
/// * `pool` - SQLite connection pool
269
293
/// * `ttl_seconds` - TTL for cache entries in seconds
294
+
/// * `metrics` - Metrics publisher for telemetry
270
295
pub fn create_sqlite_resolver_with_ttl(
271
296
inner: Arc<dyn HandleResolver>,
272
297
pool: SqlitePool,
273
298
ttl_seconds: u64,
299
+
metrics: SharedMetricsPublisher,
274
300
) -> Arc<dyn HandleResolver> {
275
-
Arc::new(SqliteHandleResolver::with_ttl(inner, pool, ttl_seconds))
301
+
Arc::new(SqliteHandleResolver::with_ttl(inner, pool, ttl_seconds, metrics))
276
302
}
277
303
278
304
#[cfg(test)]
···
319
345
expected_did: "did:plc:testuser123".to_string(),
320
346
});
321
347
348
+
// Create metrics publisher
349
+
let metrics = Arc::new(crate::metrics::NoOpMetricsPublisher);
350
+
322
351
// Create SQLite-backed resolver
323
-
let sqlite_resolver = SqliteHandleResolver::with_ttl(mock_resolver, pool.clone(), 3600);
352
+
let sqlite_resolver = SqliteHandleResolver::with_ttl(mock_resolver, pool.clone(), 3600, metrics);
324
353
325
354
let test_handle = "alice.bsky.social";
326
355
let expected_key = sqlite_resolver.make_key(test_handle) as i64;
···
408
437
expected_did: String::new(),
409
438
});
410
439
440
+
// Create metrics publisher
441
+
let metrics = Arc::new(crate::metrics::NoOpMetricsPublisher);
442
+
411
443
// Create SQLite-backed resolver
412
-
let sqlite_resolver = SqliteHandleResolver::with_ttl(mock_resolver, pool.clone(), 3600);
444
+
let sqlite_resolver = SqliteHandleResolver::with_ttl(mock_resolver, pool.clone(), 3600, metrics);
413
445
414
446
let test_handle = "error.bsky.social";
415
447
let expected_key = sqlite_resolver.make_key(test_handle) as i64;
+30 -2 src/handle_resolver_task.rs
···
5
5
//! and ensures resolved handles are cached for efficient subsequent lookups.
6
6
7
7
use crate::handle_resolver::HandleResolver;
8
+
use crate::metrics::SharedMetricsPublisher;
8
9
use crate::queue::{HandleResolutionWork, QueueAdapter};
9
10
use anyhow::Result;
10
11
use std::sync::Arc;
···
52
53
cancel_token: CancellationToken,
53
54
config: HandleResolverTaskConfig,
54
55
metrics: Arc<HandleResolverMetrics>,
56
+
metrics_publisher: SharedMetricsPublisher,
55
57
}
56
58
57
59
impl HandleResolverTask {
···
60
62
adapter: Arc<dyn QueueAdapter<HandleResolutionWork>>,
61
63
handle_resolver: Arc<dyn HandleResolver>,
62
64
cancel_token: CancellationToken,
65
+
metrics_publisher: SharedMetricsPublisher,
63
66
) -> Self {
64
67
let config = HandleResolverTaskConfig::default();
65
68
Self {
···
68
71
cancel_token,
69
72
config,
70
73
metrics: Arc::new(HandleResolverMetrics::default()),
74
+
metrics_publisher,
71
75
}
72
76
}
73
77
···
77
81
handle_resolver: Arc<dyn HandleResolver>,
78
82
cancel_token: CancellationToken,
79
83
config: HandleResolverTaskConfig,
84
+
metrics_publisher: SharedMetricsPublisher,
80
85
) -> Self {
81
86
Self {
82
87
adapter,
···
84
89
cancel_token,
85
90
config,
86
91
metrics: Arc::new(HandleResolverMetrics::default()),
92
+
metrics_publisher,
87
93
}
88
94
}
89
95
···
162
168
.total_processed
163
169
.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
164
170
171
+
// Publish metrics
172
+
self.metrics_publisher.incr("task.handle_resolution.processed").await;
173
+
self.metrics_publisher.time("task.handle_resolution.duration_ms", duration_ms).await;
174
+
165
175
match result {
166
176
Ok(Ok((did, _timestamp))) => {
167
177
self.metrics
···
171
181
.total_cached
172
182
.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
173
183
184
+
// Publish success metrics
185
+
self.metrics_publisher.incr("task.handle_resolution.success").await;
186
+
self.metrics_publisher.incr("task.handle_resolution.cached").await;
187
+
174
188
info!(
175
189
handle = %work.handle,
176
190
did = %did,
···
183
197
.total_failed
184
198
.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
185
199
200
+
// Publish failure metrics
201
+
self.metrics_publisher.incr("task.handle_resolution.failed").await;
202
+
186
203
error!(
187
204
handle = %work.handle,
188
205
error = %e,
···
194
211
self.metrics
195
212
.total_failed
196
213
.fetch_add(1, std::sync::atomic::Ordering::Relaxed);
214
+
215
+
// Publish timeout metrics
216
+
self.metrics_publisher.incr("task.handle_resolution.timeout").await;
197
217
198
218
error!(
199
219
handle = %work.handle,
···
228
248
/// * `adapter` - Queue adapter for work items
229
249
/// * `handle_resolver` - Handle resolver implementation
230
250
/// * `cancel_token` - Token for graceful shutdown
251
+
/// * `metrics_publisher` - Metrics publisher for telemetry
231
252
pub fn create_handle_resolver_task(
232
253
adapter: Arc<dyn QueueAdapter<HandleResolutionWork>>,
233
254
handle_resolver: Arc<dyn HandleResolver>,
234
255
cancel_token: CancellationToken,
256
+
metrics_publisher: SharedMetricsPublisher,
235
257
) -> HandleResolverTaskHandle {
236
258
HandleResolverTaskHandle {
237
-
task: HandleResolverTask::new(adapter, handle_resolver, cancel_token),
259
+
task: HandleResolverTask::new(adapter, handle_resolver, cancel_token, metrics_publisher),
238
260
}
239
261
}
240
262
···
246
268
/// * `handle_resolver` - Handle resolver implementation
247
269
/// * `cancel_token` - Token for graceful shutdown
248
270
/// * `config` - Task configuration
271
+
/// * `metrics_publisher` - Metrics publisher for telemetry
249
272
pub fn create_handle_resolver_task_with_config(
250
273
adapter: Arc<dyn QueueAdapter<HandleResolutionWork>>,
251
274
handle_resolver: Arc<dyn HandleResolver>,
252
275
cancel_token: CancellationToken,
253
276
config: HandleResolverTaskConfig,
277
+
metrics_publisher: SharedMetricsPublisher,
254
278
) -> HandleResolverTaskHandle {
255
279
HandleResolverTaskHandle {
256
-
task: HandleResolverTask::with_config(adapter, handle_resolver, cancel_token, config),
280
+
task: HandleResolverTask::with_config(adapter, handle_resolver, cancel_token, config, metrics_publisher),
257
281
}
258
282
}
259
283
···
301
325
// Create cancellation token
302
326
let cancel_token = CancellationToken::new();
303
327
328
+
// Create metrics publisher
329
+
let metrics_publisher = Arc::new(crate::metrics::NoOpMetricsPublisher);
330
+
304
331
// Create task with custom config
305
332
let config = HandleResolverTaskConfig {
306
333
default_timeout_ms: 5000,
···
311
338
handle_resolver,
312
339
cancel_token.clone(),
313
340
config,
341
+
metrics_publisher,
314
342
);
315
343
316
344
// Create handle resolution work
+12 src/http/handle_xrpc_resolve_handle.rs
···
4
4
use crate::{
5
5
handle_resolver::HandleResolver,
6
6
http::AppContext,
7
+
metrics::SharedMetricsPublisher,
7
8
queue::{HandleResolutionWork, QueueAdapter},
8
9
};
9
10
···
193
194
State(app_context): State<AppContext>,
194
195
State(handle_resolver): State<Arc<dyn HandleResolver>>,
195
196
State(queue): State<Arc<dyn QueueAdapter<HandleResolutionWork>>>,
197
+
State(metrics): State<SharedMetricsPublisher>,
196
198
) -> impl IntoResponse {
197
199
let validating = params.validate.is_some();
198
200
let queueing = params.queue.is_some();
···
201
203
let handle = match params.handle {
202
204
Some(h) => h,
203
205
None => {
206
+
metrics.incr_with_tags("xrpc.com.atproto.identity.resolveHandle.invalid_handle", &[("reason", "missing")]).await;
204
207
return (
205
208
StatusCode::BAD_REQUEST,
206
209
Json(ErrorResponse {
···
217
220
Ok(InputType::Handle(value)) => value,
218
221
Ok(InputType::Plc(_)) | Ok(InputType::Web(_)) => {
219
222
// It's a DID, not a handle
223
+
metrics.incr_with_tags("xrpc.com.atproto.identity.resolveHandle.invalid_handle", &[("reason", "did")]).await;
220
224
return (
221
225
StatusCode::BAD_REQUEST,
222
226
Json(ErrorResponse {
···
227
231
.into_response();
228
232
}
229
233
Err(_) => {
234
+
metrics.incr_with_tags("xrpc.com.atproto.identity.resolveHandle.invalid_handle", &[("reason", "error")]).await;
230
235
return (
231
236
StatusCode::BAD_REQUEST,
232
237
Json(ErrorResponse {
···
239
244
};
240
245
241
246
if validating {
247
+
metrics.incr("xrpc.com.atproto.identity.resolveHandle").await;
242
248
return StatusCode::NO_CONTENT.into_response();
243
249
}
244
250
···
249
255
// Queue the work
250
256
match queue.push(work).await {
251
257
Ok(()) => {
258
+
metrics.incr("xrpc.com.atproto.identity.resolveHandle").await;
252
259
tracing::debug!("Queued handle resolution for {}", handle);
253
260
}
254
261
Err(e) => {
262
+
metrics.incr("xrpc.com.atproto.identity.resolveHandle.queue_failure").await;
255
263
tracing::error!("Failed to queue handle resolution: {}", e);
256
264
}
257
265
}
···
269
277
let result = match handle_resolver.resolve(&handle).await {
270
278
Ok((did, timestamp)) => {
271
279
tracing::debug!(handle, did, "Found cached DID for handle");
280
+
281
+
metrics.incr_with_tags("handle.resolution.request", &[("success", "1")]).await;
282
+
272
283
let etag = calculate_etag(&did, app_context.etag_seed());
273
284
ResolutionResult::Success {
274
285
did,
···
278
289
}
279
290
Err(err) => {
280
291
tracing::debug!(error = ?err, handle, "Error resolving handle");
292
+
metrics.incr_with_tags("handle.resolution.request", &[("success", "0")]).await;
281
293
let error_content = format!("error:{}:{}", handle, err);
282
294
let etag = calculate_etag(&error_content, app_context.etag_seed());
283
295
let timestamp = SystemTime::now()
+48 -2 src/http/server.rs
···
1
1
use crate::handle_resolver::HandleResolver;
2
+
use crate::metrics::SharedMetricsPublisher;
2
3
use crate::queue::{HandleResolutionWork, QueueAdapter};
3
4
use axum::{
4
5
Router,
5
-
extract::State,
6
-
http::StatusCode,
6
+
extract::{MatchedPath, State},
7
+
http::{Request, StatusCode},
8
+
middleware::{self, Next},
7
9
response::{Html, IntoResponse, Json, Response},
8
10
routing::get,
9
11
};
10
12
use serde_json::json;
11
13
use std::sync::Arc;
14
+
use std::time::Instant;
12
15
13
16
pub(crate) struct InnerAppContext {
14
17
pub(crate) service_document: serde_json::Value,
15
18
pub(crate) service_did: String,
16
19
pub(crate) handle_resolver: Arc<dyn HandleResolver>,
17
20
pub(crate) handle_queue: Arc<dyn QueueAdapter<HandleResolutionWork>>,
21
+
pub(crate) metrics: SharedMetricsPublisher,
18
22
pub(crate) etag_seed: String,
19
23
pub(crate) cache_control_header: Option<String>,
20
24
}
···
29
33
service_did: String,
30
34
handle_resolver: Arc<dyn HandleResolver>,
31
35
handle_queue: Arc<dyn QueueAdapter<HandleResolutionWork>>,
36
+
metrics: SharedMetricsPublisher,
32
37
etag_seed: String,
33
38
cache_control_header: Option<String>,
34
39
) -> Self {
···
37
42
service_did,
38
43
handle_resolver,
39
44
handle_queue,
45
+
metrics,
40
46
etag_seed,
41
47
cache_control_header,
42
48
}))
···
78
84
handle_queue,
79
85
Arc<dyn QueueAdapter<HandleResolutionWork>>
80
86
);
87
+
impl_from_ref!(AppContext, metrics, SharedMetricsPublisher);
88
+
89
+
/// Middleware to track HTTP request metrics
90
+
async fn metrics_middleware(
91
+
State(metrics): State<SharedMetricsPublisher>,
92
+
matched_path: Option<MatchedPath>,
93
+
request: Request<axum::body::Body>,
94
+
next: Next,
95
+
) -> Response {
96
+
let start = Instant::now();
97
+
let method = request.method().to_string();
98
+
let path = matched_path
99
+
.as_ref()
100
+
.map(|p| p.as_str().to_string())
101
+
.unwrap_or_else(|| "unknown".to_string());
102
+
103
+
// Process the request
104
+
let response = next.run(request).await;
105
+
106
+
// Calculate duration
107
+
let duration_ms = start.elapsed().as_millis() as u64;
108
+
let status_code = response.status().as_u16().to_string();
109
+
110
+
// Publish metrics with tags
111
+
metrics.time_with_tags(
112
+
"http.request.duration_ms",
113
+
duration_ms,
114
+
&[
115
+
("method", &method),
116
+
("path", &path),
117
+
("status", &status_code),
118
+
],
119
+
).await;
120
+
121
+
response
122
+
}
81
123
82
124
pub fn create_router(app_context: AppContext) -> Router {
83
125
Router::new()
···
93
135
get(super::handle_xrpc_resolve_handle::handle_xrpc_resolve_handle)
94
136
.options(super::handle_xrpc_resolve_handle::handle_xrpc_resolve_handle_options),
95
137
)
138
+
.layer(middleware::from_fn_with_state(
139
+
app_context.0.metrics.clone(),
140
+
metrics_middleware,
141
+
))
96
142
.with_state(app_context)
97
143
}
98
144
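With the `impl_from_ref!` line above, any handler registered on this router can extract the shared publisher directly from application state. A small sketch; the route and metric name are hypothetical:

```rust
use axum::extract::State;
use quickdid::metrics::SharedMetricsPublisher;

// AppContext provides the publisher via FromRef, so it works as an extractor.
async fn healthz(State(metrics): State<SharedMetricsPublisher>) -> &'static str {
    metrics.incr("http.healthz").await; // hypothetical metric name
    "ok"
}

// Registered the same way as the existing routes:
//   Router::new().route("/healthz", get(healthz)).with_state(app_context)
```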
+1 src/lib.rs
···
6
6
// Semi-public modules - needed by binary but with limited exposure
7
7
pub mod cache; // Only create_redis_pool exposed
8
8
pub mod handle_resolver_task; // Factory functions and TaskConfig exposed
9
+
pub mod metrics; // Metrics publishing trait and implementations
9
10
pub mod queue; // Queue adapter system with trait and factory functions
10
11
pub mod sqlite_schema; // SQLite schema management functions exposed
11
12
pub mod task_manager; // Only spawn_cancellable_task exposed
+476 src/metrics.rs
···
1
+
use crate::config::Config;
2
+
use async_trait::async_trait;
3
+
use cadence::{BufferedUdpMetricSink, Counted, CountedExt, Gauged, Metric, QueuingMetricSink, StatsdClient, Timed};
4
+
use std::net::UdpSocket;
5
+
use std::sync::Arc;
6
+
use thiserror::Error;
7
+
use tracing::{debug, error};
8
+
9
+
/// Trait for publishing metrics with counter and gauge support
10
+
/// Designed for minimal compatibility with cadence-style metrics
11
+
#[async_trait]
12
+
pub trait MetricsPublisher: Send + Sync {
13
+
/// Increment a counter by 1
14
+
async fn incr(&self, key: &str);
15
+
16
+
/// Increment a counter by a specific value
17
+
async fn count(&self, key: &str, value: u64);
18
+
19
+
/// Increment a counter with tags
20
+
async fn incr_with_tags(&self, key: &str, tags: &[(&str, &str)]);
21
+
22
+
/// Increment a counter by a specific value with tags
23
+
async fn count_with_tags(&self, key: &str, value: u64, tags: &[(&str, &str)]);
24
+
25
+
/// Record a gauge value
26
+
async fn gauge(&self, key: &str, value: u64);
27
+
28
+
/// Record a gauge value with tags
29
+
async fn gauge_with_tags(&self, key: &str, value: u64, tags: &[(&str, &str)]);
30
+
31
+
/// Record a timing in milliseconds
32
+
async fn time(&self, key: &str, millis: u64);
33
+
34
+
/// Record a timing with tags
35
+
async fn time_with_tags(&self, key: &str, millis: u64, tags: &[(&str, &str)]);
36
+
}
37
+
38
+
/// No-op implementation for development and testing
39
+
#[derive(Debug, Clone, Default)]
40
+
pub struct NoOpMetricsPublisher;
41
+
42
+
impl NoOpMetricsPublisher {
43
+
pub fn new() -> Self {
44
+
Self
45
+
}
46
+
}
47
+
48
+
#[async_trait]
49
+
impl MetricsPublisher for NoOpMetricsPublisher {
50
+
async fn incr(&self, _key: &str) {
51
+
// No-op
52
+
}
53
+
54
+
async fn count(&self, _key: &str, _value: u64) {
55
+
// No-op
56
+
}
57
+
58
+
async fn incr_with_tags(&self, _key: &str, _tags: &[(&str, &str)]) {
59
+
// No-op
60
+
}
61
+
62
+
async fn count_with_tags(&self, _key: &str, _value: u64, _tags: &[(&str, &str)]) {
63
+
// No-op
64
+
}
65
+
66
+
async fn gauge(&self, _key: &str, _value: u64) {
67
+
// No-op
68
+
}
69
+
70
+
async fn gauge_with_tags(&self, _key: &str, _value: u64, _tags: &[(&str, &str)]) {
71
+
// No-op
72
+
}
73
+
74
+
async fn time(&self, _key: &str, _millis: u64) {
75
+
// No-op
76
+
}
77
+
78
+
async fn time_with_tags(&self, _key: &str, _millis: u64, _tags: &[(&str, &str)]) {
79
+
// No-op
80
+
}
81
+
}
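A usage sketch for the no-op publisher, assuming tokio's test macro is available as in the crate's other async tests, and that the shared alias defined later in this file is an `Arc<dyn MetricsPublisher>`; every call compiles against the trait but the data is discarded:

```rust
#[tokio::test]
async fn noop_publisher_accepts_all_metric_calls() {
    use std::sync::Arc;

    let metrics: Arc<dyn MetricsPublisher> = Arc::new(NoOpMetricsPublisher::new());

    metrics.incr("example.counter").await;                                // discarded
    metrics.gauge("example.gauge", 42).await;                             // discarded
    metrics.time_with_tags("example.timer_ms", 5, &[("ok", "1")]).await;  // discarded
}
```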

/// StatsD-backed metrics publisher using cadence
pub struct StatsdMetricsPublisher {
    client: StatsdClient,
    default_tags: Vec<(String, String)>,
}

impl StatsdMetricsPublisher {
    /// Create a new StatsdMetricsPublisher with default configuration
    pub fn new(host: &str, prefix: &str) -> Result<Self, Box<dyn std::error::Error>> {
        Self::new_with_tags(host, prefix, vec![])
    }

    /// Create a new StatsdMetricsPublisher with default tags
    pub fn new_with_tags(
        host: &str,
        prefix: &str,
        default_tags: Vec<(String, String)>,
    ) -> Result<Self, Box<dyn std::error::Error>> {
        tracing::info!(
            "Creating StatsdMetricsPublisher: host={}, prefix={}, tags={:?}",
            host,
            prefix,
            default_tags
        );

        let socket = UdpSocket::bind("0.0.0.0:0")?;
        socket.set_nonblocking(true)?;

        let buffered_sink = BufferedUdpMetricSink::from(host, socket)?;
        let queuing_sink = QueuingMetricSink::builder()
            .with_error_handler(move |error| {
                error!("Failed to send metric via sink: {}", error);
            })
            .build(buffered_sink);
        let client = StatsdClient::from_sink(prefix, queuing_sink);

        tracing::info!("StatsdMetricsPublisher created successfully");
        Ok(Self { client, default_tags })
    }

    /// Create from an existing StatsdClient
    pub fn from_client(client: StatsdClient) -> Self {
        Self::from_client_with_tags(client, vec![])
    }

    /// Create from an existing StatsdClient with default tags
    pub fn from_client_with_tags(client: StatsdClient, default_tags: Vec<(String, String)>) -> Self {
        Self { client, default_tags }
    }

    /// Apply the configured default tags to a metric builder
    fn apply_default_tags<'a, M>(
        &'a self,
        mut builder: cadence::MetricBuilder<'a, 'a, M>,
    ) -> cadence::MetricBuilder<'a, 'a, M>
    where
        M: Metric + From<String>,
    {
        for (k, v) in &self.default_tags {
            builder = builder.with_tag(k.as_str(), v.as_str());
        }
        builder
    }
}
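
// Illustrative sketch (hypothetical, not part of this diff): `from_client_with_tags`
// also accepts a hand-built `StatsdClient`, for example one backed by cadence's
// `NopMetricSink`, so the StatsD code path can be exercised without any network.
// The prefix and tag values below are made up.
#[allow(dead_code)]
fn statsd_publisher_without_network() -> StatsdMetricsPublisher {
    let client = StatsdClient::from_sink("quickdid.test", cadence::NopMetricSink);
    StatsdMetricsPublisher::from_client_with_tags(
        client,
        vec![("env".to_string(), "test".to_string())],
    )
}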

#[async_trait]
impl MetricsPublisher for StatsdMetricsPublisher {
    async fn incr(&self, key: &str) {
        debug!("Sending metric incr: {}", key);
        if self.default_tags.is_empty() {
            match self.client.incr(key) {
                Ok(_) => debug!("Successfully sent metric: {}", key),
                Err(e) => error!("Failed to send metric {}: {}", key, e),
            }
        } else {
            let builder = self.client.incr_with_tags(key);
            let builder = self.apply_default_tags(builder);
            let _ = builder.send();
            debug!("Sent metric with tags: {}", key);
        }
    }

    async fn count(&self, key: &str, value: u64) {
        if self.default_tags.is_empty() {
            let _ = self.client.count(key, value);
        } else {
            let builder = self.client.count_with_tags(key, value);
            let builder = self.apply_default_tags(builder);
            let _ = builder.send();
        }
    }

    async fn incr_with_tags(&self, key: &str, tags: &[(&str, &str)]) {
        let mut builder = self.client.incr_with_tags(key);
        builder = self.apply_default_tags(builder);
        for (k, v) in tags {
            builder = builder.with_tag(k, v);
        }
        let _ = builder.send();
    }

    async fn count_with_tags(&self, key: &str, value: u64, tags: &[(&str, &str)]) {
        let mut builder = self.client.count_with_tags(key, value);
        builder = self.apply_default_tags(builder);
        for (k, v) in tags {
            builder = builder.with_tag(k, v);
        }
        let _ = builder.send();
    }

    async fn gauge(&self, key: &str, value: u64) {
        debug!("Sending metric gauge: {} = {}", key, value);
        if self.default_tags.is_empty() {
            match self.client.gauge(key, value) {
                Ok(_) => debug!("Successfully sent gauge: {} = {}", key, value),
                Err(e) => error!("Failed to send gauge {} = {}: {}", key, value, e),
            }
        } else {
            let builder = self.client.gauge_with_tags(key, value);
            let builder = self.apply_default_tags(builder);
            let _ = builder.send();
            debug!("Sent gauge with tags: {} = {}", key, value);
        }
    }

    async fn gauge_with_tags(&self, key: &str, value: u64, tags: &[(&str, &str)]) {
        let mut builder = self.client.gauge_with_tags(key, value);
        builder = self.apply_default_tags(builder);
        for (k, v) in tags {
            builder = builder.with_tag(k, v);
        }
        let _ = builder.send();
    }

    async fn time(&self, key: &str, millis: u64) {
        if self.default_tags.is_empty() {
            let _ = self.client.time(key, millis);
        } else {
            let builder = self.client.time_with_tags(key, millis);
            let builder = self.apply_default_tags(builder);
            let _ = builder.send();
        }
    }

    async fn time_with_tags(&self, key: &str, millis: u64, tags: &[(&str, &str)]) {
        let mut builder = self.client.time_with_tags(key, millis);
        builder = self.apply_default_tags(builder);
        for (k, v) in tags {
            builder = builder.with_tag(k, v);
        }
        let _ = builder.send();
    }
}

/// Type alias for shared metrics publisher
pub type SharedMetricsPublisher = Arc<dyn MetricsPublisher>;
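
// Illustrative sketch (hypothetical, not part of this diff): the alias is what gets
// cloned into background tasks. This assumes the trait's supertraits make the object
// Send + Sync, which tokio::spawn requires; the metric name is a placeholder.
#[allow(dead_code)]
fn spawn_with_metrics(metrics: SharedMetricsPublisher) {
    let _handle = tokio::spawn(async move {
        metrics.gauge("worker.started", 1).await;
    });
}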

/// Metrics-specific errors
#[derive(Debug, Error)]
pub enum MetricsError {
    /// Failed to create metrics publisher
    #[error("error-quickdid-metrics-1 Failed to create metrics publisher: {0}")]
    CreationFailed(String),

    /// Invalid configuration for metrics
    #[error("error-quickdid-metrics-2 Invalid metrics configuration: {0}")]
    InvalidConfig(String),
}

/// Create a metrics publisher based on configuration
///
/// Returns either a no-op publisher or a StatsD publisher based on the
/// `metrics_adapter` configuration value.
///
/// ## Example
///
/// ```rust,no_run
/// use quickdid::config::Config;
/// use quickdid::metrics::create_metrics_publisher;
///
/// # async fn example() -> Result<(), Box<dyn std::error::Error>> {
/// let config = Config::from_env()?;
/// let metrics = create_metrics_publisher(&config)?;
///
/// // Use the metrics publisher
/// metrics.incr("request.count").await;
/// # Ok(())
/// # }
/// ```
pub fn create_metrics_publisher(config: &Config) -> Result<SharedMetricsPublisher, MetricsError> {
    match config.metrics_adapter.as_str() {
        "noop" => Ok(Arc::new(NoOpMetricsPublisher::new())),
        "statsd" => {
            let host = config.metrics_statsd_host.as_ref().ok_or_else(|| {
                MetricsError::InvalidConfig(
                    "METRICS_STATSD_HOST is required when using statsd adapter".to_string(),
                )
            })?;

            // Parse default tags from comma-separated key:value pairs,
            // e.g. "env:prod,service:quickdid"
            let default_tags = if let Some(tags_str) = &config.metrics_tags {
                tags_str
                    .split(',')
                    .filter_map(|tag| {
                        let parts: Vec<&str> = tag.trim().split(':').collect();
                        if parts.len() == 2 {
                            Some((parts[0].to_string(), parts[1].to_string()))
                        } else {
                            error!("Invalid tag format: {}", tag);
                            None
                        }
                    })
                    .collect()
            } else {
                vec![]
            };

            let publisher =
                StatsdMetricsPublisher::new_with_tags(host, &config.metrics_prefix, default_tags)
                    .map_err(|e| MetricsError::CreationFailed(e.to_string()))?;

            Ok(Arc::new(publisher))
        }
        _ => Err(MetricsError::InvalidConfig(format!(
            "Unknown metrics adapter: {}",
            config.metrics_adapter
        ))),
    }
}

#[cfg(test)]
mod tests {
    use super::*;

    #[tokio::test]
    async fn test_noop_metrics() {
        let metrics = NoOpMetricsPublisher::new();

        // These should all be no-ops and not panic
        metrics.incr("test.counter").await;
        metrics.count("test.counter", 5).await;
        metrics.incr_with_tags("test.counter", &[("env", "test")]).await;
        metrics
            .count_with_tags("test.counter", 10, &[("env", "test"), ("service", "quickdid")])
            .await;
        metrics.gauge("test.gauge", 100).await;
        metrics.gauge_with_tags("test.gauge", 200, &[("host", "localhost")]).await;
        metrics.time("test.timing", 42).await;
        metrics.time_with_tags("test.timing", 84, &[("endpoint", "/resolve")]).await;
    }

    #[tokio::test]
    async fn test_shared_metrics() {
        let metrics: SharedMetricsPublisher = Arc::new(NoOpMetricsPublisher::new());

        // Verify it can be used as a shared reference
        metrics.incr("shared.counter").await;
        metrics.gauge("shared.gauge", 50).await;

        // Verify it can be cloned
        let metrics2 = Arc::clone(&metrics);
        metrics2.count("cloned.counter", 3).await;
    }

    #[test]
    fn test_create_noop_publisher() {
        use std::env;

        // Clean up any existing environment variables first
        unsafe {
            env::remove_var("METRICS_ADAPTER");
            env::remove_var("METRICS_STATSD_HOST");
            env::remove_var("METRICS_PREFIX");
            env::remove_var("METRICS_TAGS");
        }

        // Set up environment for noop adapter
        unsafe {
            env::set_var("HTTP_EXTERNAL", "test.example.com");
            env::set_var("SERVICE_KEY", "did:key:test");
            env::set_var("METRICS_ADAPTER", "noop");
        }

        let config = Config::from_env().unwrap();
        let metrics = create_metrics_publisher(&config).unwrap();

        // Should create successfully - actual type checking happens at compile time
        assert_eq!(Arc::strong_count(&metrics), 1);

        // Clean up
        unsafe {
            env::remove_var("METRICS_ADAPTER");
        }
    }

    #[test]
    fn test_create_statsd_publisher() {
        use std::env;

        // Clean up any existing environment variables first
        unsafe {
            env::remove_var("METRICS_ADAPTER");
            env::remove_var("METRICS_STATSD_HOST");
            env::remove_var("METRICS_PREFIX");
            env::remove_var("METRICS_TAGS");
        }

        // Set up environment for statsd adapter
        unsafe {
            env::set_var("HTTP_EXTERNAL", "test.example.com");
            env::set_var("SERVICE_KEY", "did:key:test");
            env::set_var("METRICS_ADAPTER", "statsd");
            env::set_var("METRICS_STATSD_HOST", "localhost:8125");
            env::set_var("METRICS_PREFIX", "test");
            env::set_var("METRICS_TAGS", "env:test,service:quickdid");
        }

        let config = Config::from_env().unwrap();
        let metrics = create_metrics_publisher(&config).unwrap();

        // Should create successfully
        assert_eq!(Arc::strong_count(&metrics), 1);

        // Clean up
        unsafe {
            env::remove_var("METRICS_ADAPTER");
            env::remove_var("METRICS_STATSD_HOST");
            env::remove_var("METRICS_PREFIX");
            env::remove_var("METRICS_TAGS");
        }
    }

    #[test]
    fn test_missing_statsd_host() {
        use std::env;

        // Clean up any existing environment variables first
        unsafe {
            env::remove_var("METRICS_ADAPTER");
            env::remove_var("METRICS_STATSD_HOST");
            env::remove_var("METRICS_PREFIX");
            env::remove_var("METRICS_TAGS");
        }

        // Set up environment for statsd adapter without host
        unsafe {
            env::set_var("HTTP_EXTERNAL", "test.example.com");
            env::set_var("SERVICE_KEY", "did:key:test");
            env::set_var("METRICS_ADAPTER", "statsd");
            env::remove_var("METRICS_STATSD_HOST");
        }

        let config = Config::from_env().unwrap();
        let result = create_metrics_publisher(&config);

        // Should fail with invalid config error
        assert!(result.is_err());
        if let Err(e) = result {
            assert!(matches!(e, MetricsError::InvalidConfig(_)));
        }

        // Clean up
        unsafe {
            env::remove_var("METRICS_ADAPTER");
        }
    }

    #[test]
    fn test_invalid_adapter() {
        use std::env;

        // Clean up any existing environment variables first
        unsafe {
            env::remove_var("METRICS_ADAPTER");
            env::remove_var("METRICS_STATSD_HOST");
            env::remove_var("METRICS_PREFIX");
            env::remove_var("METRICS_TAGS");
        }

        // Set up environment with invalid adapter
        unsafe {
            env::set_var("HTTP_EXTERNAL", "test.example.com");
            env::set_var("SERVICE_KEY", "did:key:test");
            env::set_var("METRICS_ADAPTER", "invalid");
            env::remove_var("METRICS_STATSD_HOST"); // Clean up from other tests
        }

        let config = Config::from_env().unwrap();

        // Config validation should catch this
        let validation_result = config.validate();
        assert!(validation_result.is_err());

        // Clean up
        unsafe {
            env::remove_var("METRICS_ADAPTER");
        }
    }
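
    // Hypothetical additional test (not part of this diff): drive the StatsD code path
    // without any network I/O by injecting a client backed by cadence's `NopMetricSink`
    // through `from_client_with_tags`. Metric names and tags are made up.
    #[tokio::test]
    async fn test_statsd_publisher_with_nop_sink() {
        let client = cadence::StatsdClient::from_sink("test", cadence::NopMetricSink);
        let publisher = StatsdMetricsPublisher::from_client_with_tags(
            client,
            vec![("env".to_string(), "test".to_string())],
        );

        // Both the untagged and tagged paths should complete without panicking.
        publisher.incr("nop.counter").await;
        publisher.count("nop.counter", 2).await;
        publisher.gauge_with_tags("nop.gauge", 7, &[("source", "test")]).await;
        publisher.time("nop.timing", 5).await;
    }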
}