+2
-4
src/bin/quickdid.rs
+2
-4
src/bin/quickdid.rs
···
156
156
);
157
157
println!();
158
158
println!(" JETSTREAM:");
159
-
println!(
160
-
" JETSTREAM_ENABLED Enable Jetstream consumer (default: false)"
161
-
);
159
+
println!(" JETSTREAM_ENABLED Enable Jetstream consumer (default: false)");
162
160
println!(
163
161
" JETSTREAM_HOSTNAME Jetstream hostname (default: jetstream.atproto.tools)"
164
162
);
···
635
633
compression: false,
636
634
zstd_dictionary_location: String::new(),
637
635
jetstream_hostname: jetstream_hostname.clone(),
638
-
collections: vec![], // Listen to all collections
636
+
collections: vec![], // Listen to all collections
639
637
dids: vec![],
640
638
max_message_size_bytes: None,
641
639
cursor: None,
+5
-1
src/config.rs
+5
-1
src/config.rs
···
339
339
proactive_refresh_threshold: parse_env("PROACTIVE_REFRESH_THRESHOLD", 0.8)?,
340
340
static_files_dir: get_env_or_default("STATIC_FILES_DIR", Some("www")).unwrap(),
341
341
jetstream_enabled: parse_env("JETSTREAM_ENABLED", false)?,
342
-
jetstream_hostname: get_env_or_default("JETSTREAM_HOSTNAME", Some("jetstream.atproto.tools")).unwrap(),
342
+
jetstream_hostname: get_env_or_default(
343
+
"JETSTREAM_HOSTNAME",
344
+
Some("jetstream.atproto.tools"),
345
+
)
346
+
.unwrap(),
343
347
};
344
348
345
349
// Calculate the Cache-Control header value if enabled
+3
-3
src/handle_resolver/memory.rs
+3
-3
src/handle_resolver/memory.rs
···
179
179
async fn set(&self, handle: &str, did: &str) -> Result<(), HandleResolverError> {
180
180
// Normalize the handle to lowercase
181
181
let handle = handle.to_lowercase();
182
-
182
+
183
183
// Update the in-memory cache
184
184
{
185
185
let mut cache = self.cache.write().await;
···
190
190
);
191
191
self.metrics.incr("resolver.memory.set").await;
192
192
tracing::debug!("Set handle {} -> DID {} in memory cache", handle, did);
193
-
193
+
194
194
// Track cache size
195
195
let cache_size = cache.len() as u64;
196
196
self.metrics
197
197
.gauge("resolver.memory.cache_entries", cache_size)
198
198
.await;
199
199
}
200
-
200
+
201
201
// Chain to inner resolver
202
202
self.inner.set(&handle, did).await
203
203
}
+124
-63
src/handle_resolver/redis.rs
+124
-63
src/handle_resolver/redis.rs
···
9
9
use crate::handle_resolution_result::HandleResolutionResult;
10
10
use crate::metrics::SharedMetricsPublisher;
11
11
use async_trait::async_trait;
12
-
use atproto_identity::resolve::{parse_input, InputType};
12
+
use atproto_identity::resolve::{InputType, parse_input};
13
13
use deadpool_redis::{Pool as RedisPool, redis::AsyncCommands};
14
14
use metrohash::MetroHash64;
15
15
use std::hash::Hasher as _;
···
124
124
/// This method removes both the handle->DID mapping and the reverse DID->handle mapping.
125
125
async fn purge_handle(&self, handle: &str) -> Result<(), HandleResolverError> {
126
126
let handle_key = self.make_key(handle);
127
-
127
+
128
128
match self.pool.get().await {
129
129
Ok(mut conn) => {
130
130
// First, try to get the cached result to find the associated DID
···
142
142
if let Ok(cached_result) = HandleResolutionResult::from_bytes(&cached_bytes) {
143
143
if let Some(did) = cached_result.to_did() {
144
144
let did_key = self.make_key(&did);
145
-
145
+
146
146
// Delete both the handle key and the DID key
147
147
let _: Result<(), _> = conn.del(&[&handle_key, &did_key]).await;
148
-
148
+
149
149
tracing::debug!("Purged handle {} and associated DID {}", handle, did);
150
-
self.metrics.incr("resolver.redis.purge_handle_success").await;
150
+
self.metrics
151
+
.incr("resolver.redis.purge_handle_success")
152
+
.await;
151
153
} else {
152
154
// Just delete the handle key if no DID was resolved
153
155
let _: Result<(), _> = conn.del(&handle_key).await;
154
156
tracing::debug!("Purged unresolved handle {}", handle);
155
-
self.metrics.incr("resolver.redis.purge_handle_unresolved").await;
157
+
self.metrics
158
+
.incr("resolver.redis.purge_handle_unresolved")
159
+
.await;
156
160
}
157
161
} else {
158
162
// If we can't deserialize, just delete the handle key
159
163
let _: Result<(), _> = conn.del(&handle_key).await;
160
164
tracing::warn!("Purged handle {} with undeserializable data", handle);
161
-
self.metrics.incr("resolver.redis.purge_handle_corrupt").await;
165
+
self.metrics
166
+
.incr("resolver.redis.purge_handle_corrupt")
167
+
.await;
162
168
}
163
169
} else {
164
170
tracing::debug!("Handle {} not found in cache for purging", handle);
165
-
self.metrics.incr("resolver.redis.purge_handle_not_found").await;
171
+
self.metrics
172
+
.incr("resolver.redis.purge_handle_not_found")
173
+
.await;
166
174
}
167
-
175
+
168
176
Ok(())
169
177
}
170
178
Err(e) => {
171
179
tracing::warn!("Failed to get Redis connection for purging: {}", e);
172
-
self.metrics.incr("resolver.redis.purge_connection_error").await;
173
-
Err(HandleResolverError::ResolutionFailed(format!("Redis connection error: {}", e)))
180
+
self.metrics
181
+
.incr("resolver.redis.purge_connection_error")
182
+
.await;
183
+
Err(HandleResolverError::ResolutionFailed(format!(
184
+
"Redis connection error: {}",
185
+
e
186
+
)))
174
187
}
175
188
}
176
189
}
···
180
193
/// This method removes both the DID->handle mapping and the handle->DID mapping.
181
194
async fn purge_did(&self, did: &str) -> Result<(), HandleResolverError> {
182
195
let did_key = self.make_key(did);
183
-
196
+
184
197
match self.pool.get().await {
185
198
Ok(mut conn) => {
186
199
// First, try to get the associated handle from the reverse mapping
···
197
210
if let Some(handle_bytes) = handle_bytes {
198
211
if let Ok(handle) = String::from_utf8(handle_bytes) {
199
212
let handle_key = self.make_key(&handle);
200
-
213
+
201
214
// Delete both the DID key and the handle key
202
215
let _: Result<(), _> = conn.del(&[&did_key, &handle_key]).await;
203
-
216
+
204
217
tracing::debug!("Purged DID {} and associated handle {}", did, handle);
205
218
self.metrics.incr("resolver.redis.purge_did_success").await;
206
219
} else {
···
211
224
}
212
225
} else {
213
226
tracing::debug!("DID {} not found in cache for purging", did);
214
-
self.metrics.incr("resolver.redis.purge_did_not_found").await;
227
+
self.metrics
228
+
.incr("resolver.redis.purge_did_not_found")
229
+
.await;
215
230
}
216
-
231
+
217
232
Ok(())
218
233
}
219
234
Err(e) => {
220
235
tracing::warn!("Failed to get Redis connection for purging: {}", e);
221
-
self.metrics.incr("resolver.redis.purge_connection_error").await;
222
-
Err(HandleResolverError::ResolutionFailed(format!("Redis connection error: {}", e)))
236
+
self.metrics
237
+
.incr("resolver.redis.purge_connection_error")
238
+
.await;
239
+
Err(HandleResolverError::ResolutionFailed(format!(
240
+
"Redis connection error: {}",
241
+
e
242
+
)))
223
243
}
224
244
}
225
245
}
···
323
343
self.metrics.incr("resolver.redis.cache_set_error").await;
324
344
} else {
325
345
self.metrics.incr("resolver.redis.cache_set").await;
326
-
346
+
327
347
// For successful resolutions, also store reverse DID -> handle mapping
328
348
if let Ok((did, _)) = &result {
329
349
let did_key = self.make_key(did);
330
350
if let Err(e) = conn
331
-
.set_ex::<_, _, ()>(&did_key, handle.as_bytes(), self.ttl_seconds())
351
+
.set_ex::<_, _, ()>(
352
+
&did_key,
353
+
handle.as_bytes(),
354
+
self.ttl_seconds(),
355
+
)
332
356
.await
333
357
{
334
-
tracing::warn!("Failed to cache reverse DID->handle mapping in Redis: {}", e);
335
-
self.metrics.incr("resolver.redis.reverse_cache_set_error").await;
358
+
tracing::warn!(
359
+
"Failed to cache reverse DID->handle mapping in Redis: {}",
360
+
e
361
+
);
362
+
self.metrics
363
+
.incr("resolver.redis.reverse_cache_set_error")
364
+
.await;
336
365
} else {
337
-
tracing::debug!("Cached reverse mapping for DID {}: {}", did, handle);
366
+
tracing::debug!(
367
+
"Cached reverse mapping for DID {}: {}",
368
+
did,
369
+
handle
370
+
);
338
371
self.metrics.incr("resolver.redis.reverse_cache_set").await;
339
372
}
340
373
}
···
366
399
367
400
async fn purge(&self, subject: &str) -> Result<(), HandleResolverError> {
368
401
// Use atproto_identity's parse_input to properly identify the input type
369
-
let parsed_input = parse_input(subject).map_err(|_| HandleResolverError::InvalidSubject(subject.to_string()))?;
402
+
let parsed_input = parse_input(subject)
403
+
.map_err(|_| HandleResolverError::InvalidSubject(subject.to_string()))?;
370
404
match parsed_input {
371
405
InputType::Handle(handle) => {
372
406
// It's a handle, purge using the lowercase version
···
391
425
let resolution_result = match HandleResolutionResult::success(did) {
392
426
Ok(res) => res,
393
427
Err(e) => {
394
-
tracing::warn!("Failed to create resolution result for set operation: {}", e);
395
-
self.metrics.incr("resolver.redis.set_result_create_error").await;
396
-
return Err(HandleResolverError::InvalidSubject(
397
-
format!("Failed to create resolution result: {}", e)
398
-
));
428
+
tracing::warn!(
429
+
"Failed to create resolution result for set operation: {}",
430
+
e
431
+
);
432
+
self.metrics
433
+
.incr("resolver.redis.set_result_create_error")
434
+
.await;
435
+
return Err(HandleResolverError::InvalidSubject(format!(
436
+
"Failed to create resolution result: {}",
437
+
e
438
+
)));
399
439
}
400
440
};
401
441
···
409
449
{
410
450
tracing::warn!("Failed to set handle->DID mapping in Redis: {}", e);
411
451
self.metrics.incr("resolver.redis.set_cache_error").await;
412
-
return Err(HandleResolverError::ResolutionFailed(
413
-
format!("Failed to set cache: {}", e)
414
-
));
452
+
return Err(HandleResolverError::ResolutionFailed(format!(
453
+
"Failed to set cache: {}",
454
+
e
455
+
)));
415
456
}
416
457
417
458
// Set the reverse DID -> handle mapping
···
420
461
.await
421
462
{
422
463
tracing::warn!("Failed to set DID->handle mapping in Redis: {}", e);
423
-
self.metrics.incr("resolver.redis.set_reverse_cache_error").await;
464
+
self.metrics
465
+
.incr("resolver.redis.set_reverse_cache_error")
466
+
.await;
424
467
// Don't fail the operation, but log the warning
425
468
}
426
469
···
429
472
Ok(())
430
473
}
431
474
Err(e) => {
432
-
tracing::warn!("Failed to serialize resolution result for set operation: {}", e);
433
-
self.metrics.incr("resolver.redis.set_serialize_error").await;
434
-
Err(HandleResolverError::InvalidSubject(
435
-
format!("Failed to serialize: {}", e)
436
-
))
475
+
tracing::warn!(
476
+
"Failed to serialize resolution result for set operation: {}",
477
+
e
478
+
);
479
+
self.metrics
480
+
.incr("resolver.redis.set_serialize_error")
481
+
.await;
482
+
Err(HandleResolverError::InvalidSubject(format!(
483
+
"Failed to serialize: {}",
484
+
e
485
+
)))
437
486
}
438
487
}
439
488
}
440
489
Err(e) => {
441
490
tracing::warn!("Failed to get Redis connection for set operation: {}", e);
442
-
self.metrics.incr("resolver.redis.set_connection_error").await;
443
-
Err(HandleResolverError::ResolutionFailed(
444
-
format!("Redis connection error: {}", e)
445
-
))
491
+
self.metrics
492
+
.incr("resolver.redis.set_connection_error")
493
+
.await;
494
+
Err(HandleResolverError::ResolutionFailed(format!(
495
+
"Redis connection error: {}",
496
+
e
497
+
)))
446
498
}
447
499
}
448
500
}
···
634
686
let mut h = MetroHash64::default();
635
687
h.write(test_handle.as_bytes());
636
688
let handle_key = format!("{}{}", test_prefix, h.finish());
637
-
689
+
638
690
let mut h2 = MetroHash64::default();
639
691
h2.write(expected_did.as_bytes());
640
692
let did_key = format!("{}{}", test_prefix, h2.finish());
641
-
693
+
642
694
// Check handle -> DID mapping exists
643
695
let handle_exists: bool = conn.exists(&handle_key).await.unwrap();
644
696
assert!(handle_exists, "Handle key should exist in cache");
645
-
697
+
646
698
// Check DID -> handle mapping exists
647
699
let did_exists: bool = conn.exists(&did_key).await.unwrap();
648
700
assert!(did_exists, "DID key should exist in cache");
649
-
701
+
650
702
// Test purge by handle using the trait method
651
703
redis_resolver.purge(test_handle).await.unwrap();
652
-
704
+
653
705
// Verify both keys were deleted
654
706
let handle_exists_after: bool = conn.exists(&handle_key).await.unwrap();
655
-
assert!(!handle_exists_after, "Handle key should be deleted after purge");
656
-
707
+
assert!(
708
+
!handle_exists_after,
709
+
"Handle key should be deleted after purge"
710
+
);
711
+
657
712
let did_exists_after: bool = conn.exists(&did_key).await.unwrap();
658
713
assert!(!did_exists_after, "DID key should be deleted after purge");
659
714
}
···
664
719
665
720
// Test purge by DID using the trait method
666
721
redis_resolver.purge(expected_did).await.unwrap();
667
-
722
+
668
723
// Verify both keys were deleted again
669
724
if let Ok(mut conn) = pool.get().await {
670
725
let mut h = MetroHash64::default();
671
726
h.write(test_handle.as_bytes());
672
727
let handle_key = format!("{}{}", test_prefix, h.finish());
673
-
728
+
674
729
let mut h2 = MetroHash64::default();
675
730
h2.write(expected_did.as_bytes());
676
731
let did_key = format!("{}{}", test_prefix, h2.finish());
677
-
732
+
678
733
let handle_exists: bool = conn.exists(&handle_key).await.unwrap();
679
-
assert!(!handle_exists, "Handle key should be deleted after DID purge");
680
-
734
+
assert!(
735
+
!handle_exists,
736
+
"Handle key should be deleted after DID purge"
737
+
);
738
+
681
739
let did_exists: bool = conn.exists(&did_key).await.unwrap();
682
740
assert!(!did_exists, "DID key should be deleted after DID purge");
683
741
}
···
717
775
718
776
// Test different input formats
719
777
let test_cases = vec![
720
-
("alice.bsky.social", "alice.bsky.social"), // Handle
721
-
("ALICE.BSKY.SOCIAL", "alice.bsky.social"), // Handle (uppercase)
722
-
("did:plc:abc123", "did:plc:abc123"), // PLC DID
778
+
("alice.bsky.social", "alice.bsky.social"), // Handle
779
+
("ALICE.BSKY.SOCIAL", "alice.bsky.social"), // Handle (uppercase)
780
+
("did:plc:abc123", "did:plc:abc123"), // PLC DID
723
781
("did:web:example.com", "did:web:example.com"), // Web DID
724
782
];
725
783
···
738
796
let mut h = MetroHash64::default();
739
797
h.write(expected_key.as_bytes());
740
798
let key = format!("{}{}", test_prefix, h.finish());
741
-
799
+
742
800
// After purge, key should not exist
743
801
let exists: bool = conn.exists(&key).await.unwrap_or(false);
744
802
assert!(!exists, "Key for {} should not exist after purge", input);
···
789
847
assert_eq!(resolved_did, test_did);
790
848
791
849
// Test that uppercase handles are normalized
792
-
redis_resolver.set("DAVE.BSKY.SOCIAL", "did:plc:dave456").await.unwrap();
850
+
redis_resolver
851
+
.set("DAVE.BSKY.SOCIAL", "did:plc:dave456")
852
+
.await
853
+
.unwrap();
793
854
let (resolved_did2, _) = redis_resolver.resolve("dave.bsky.social").await.unwrap();
794
855
assert_eq!(resolved_did2, "did:plc:dave456");
795
856
···
798
859
let mut h = MetroHash64::default();
799
860
h.write(test_handle.as_bytes());
800
861
let handle_key = format!("{}{}", test_prefix, h.finish());
801
-
862
+
802
863
let mut h2 = MetroHash64::default();
803
864
h2.write(test_did.as_bytes());
804
865
let did_key = format!("{}{}", test_prefix, h2.finish());
805
-
866
+
806
867
// Check both keys exist
807
868
let handle_exists: bool = conn.exists(&handle_key).await.unwrap();
808
869
assert!(handle_exists, "Handle key should exist after set");
809
-
870
+
810
871
let did_exists: bool = conn.exists(&did_key).await.unwrap();
811
872
assert!(did_exists, "DID key should exist after set");
812
-
873
+
813
874
// Clean up test data
814
875
let _: Result<(), _> = conn.del(&[&handle_key, &did_key]).await;
815
876
}
+21
-9
src/handle_resolver/sqlite.rs
+21
-9
src/handle_resolver/sqlite.rs
···
259
259
async fn set(&self, handle: &str, did: &str) -> Result<(), HandleResolverError> {
260
260
// Normalize the handle to lowercase
261
261
let handle = handle.to_lowercase();
262
-
262
+
263
263
// Update the SQLite cache
264
264
if let Ok(mut conn) = self.pool.acquire().await {
265
265
// Create a resolution result for the successful mapping
266
266
let resolution_result = match HandleResolutionResult::success(did) {
267
267
Ok(res) => res,
268
268
Err(e) => {
269
-
tracing::warn!("Failed to create resolution result for set operation: {}", e);
270
-
self.metrics.incr("resolver.sqlite.set_result_create_error").await;
269
+
tracing::warn!(
270
+
"Failed to create resolution result for set operation: {}",
271
+
e
272
+
);
273
+
self.metrics
274
+
.incr("resolver.sqlite.set_result_create_error")
275
+
.await;
271
276
// Still chain to inner resolver even if we can't cache
272
277
return self.inner.set(&handle, did).await;
273
278
}
···
281
286
.duration_since(std::time::UNIX_EPOCH)
282
287
.unwrap_or_default()
283
288
.as_secs() as i64;
284
-
289
+
285
290
let expires_at = timestamp + self.ttl_seconds as i64;
286
-
291
+
287
292
match sqlx::query(
288
293
"INSERT OR REPLACE INTO handle_resolution_cache (handle, resolved_value, created_at, expires_at) VALUES (?, ?, ?, ?)"
289
294
)
···
306
311
}
307
312
}
308
313
Err(e) => {
309
-
tracing::warn!("Failed to serialize resolution result for set operation: {}", e);
310
-
self.metrics.incr("resolver.sqlite.set_serialize_error").await;
314
+
tracing::warn!(
315
+
"Failed to serialize resolution result for set operation: {}",
316
+
e
317
+
);
318
+
self.metrics
319
+
.incr("resolver.sqlite.set_serialize_error")
320
+
.await;
311
321
// Still chain to inner resolver even if serialization fails
312
322
}
313
323
}
314
324
} else {
315
325
tracing::warn!("Failed to get SQLite connection for set operation");
316
-
self.metrics.incr("resolver.sqlite.set_connection_error").await;
326
+
self.metrics
327
+
.incr("resolver.sqlite.set_connection_error")
328
+
.await;
317
329
}
318
-
330
+
319
331
// Chain to inner resolver
320
332
self.inner.set(&handle, did).await
321
333
}
+14
-4
src/handle_resolver/traits.rs
+14
-4
src/handle_resolver/traits.rs
···
121
121
#[tokio::test]
122
122
async fn test_default_purge_implementation() {
123
123
let resolver = NoOpTestResolver;
124
-
124
+
125
125
// Default implementation should always return Ok(())
126
126
assert!(resolver.purge("alice.bsky.social").await.is_ok());
127
127
assert!(resolver.purge("did:plc:xyz123").await.is_ok());
···
131
131
#[tokio::test]
132
132
async fn test_default_set_implementation() {
133
133
let resolver = NoOpTestResolver;
134
-
134
+
135
135
// Default implementation should always return Ok(())
136
-
assert!(resolver.set("alice.bsky.social", "did:plc:xyz123").await.is_ok());
137
-
assert!(resolver.set("bob.example.com", "did:web:example.com").await.is_ok());
136
+
assert!(
137
+
resolver
138
+
.set("alice.bsky.social", "did:plc:xyz123")
139
+
.await
140
+
.is_ok()
141
+
);
142
+
assert!(
143
+
resolver
144
+
.set("bob.example.com", "did:web:example.com")
145
+
.await
146
+
.is_ok()
147
+
);
138
148
assert!(resolver.set("", "").await.is_ok());
139
149
}
140
150
}
+8
-2
src/jetstream_handler.rs
+8
-2
src/jetstream_handler.rs
···
282
282
// Verify the set method was called
283
283
let set_calls = resolver.get_set_calls();
284
284
assert_eq!(set_calls.len(), 1);
285
-
assert_eq!(set_calls[0], ("alice.bsky.social".to_string(), "did:plc:testuser".to_string()));
285
+
assert_eq!(
286
+
set_calls[0],
287
+
(
288
+
"alice.bsky.social".to_string(),
289
+
"did:plc:testuser".to_string()
290
+
)
291
+
);
286
292
287
293
// Verify no purge was called
288
294
let purge_calls = resolver.get_purge_calls();
···
351
357
352
358
assert_eq!(handler.handler_id(), "quickdid_handler");
353
359
}
354
-
}
360
+
}