+4
-2
src/bin/quickdid.rs
+4
-2
src/bin/quickdid.rs
···
17
17
metrics::create_metrics_publisher,
18
18
queue::{
19
19
HandleResolutionWork, QueueAdapter, create_mpsc_queue_from_channel, create_noop_queue,
20
-
create_redis_queue, create_redis_queue_with_dedup, create_sqlite_queue,
20
+
create_redis_queue, create_redis_queue_with_dedup, create_sqlite_queue,
21
21
create_sqlite_queue_with_max_size,
22
22
},
23
23
sqlite_schema::create_sqlite_pool,
···
117
117
" QUEUE_REDIS_PREFIX Redis key prefix for queues (default: queue:handleresolver:)"
118
118
);
119
119
println!(" QUEUE_REDIS_TIMEOUT Queue blocking timeout in seconds (default: 5)");
120
-
println!(" QUEUE_REDIS_DEDUP_ENABLED Enable queue deduplication (default: false)");
120
+
println!(
121
+
" QUEUE_REDIS_DEDUP_ENABLED Enable queue deduplication (default: false)"
122
+
);
121
123
println!(" QUEUE_REDIS_DEDUP_TTL TTL for dedup keys in seconds (default: 60)");
122
124
println!(" QUEUE_WORKER_ID Worker ID for Redis queue (default: worker1)");
123
125
println!(" QUEUE_BUFFER_SIZE Buffer size for MPSC queue (default: 1000)");
+6
-2
src/handle_resolver/proactive_refresh.rs
+6
-2
src/handle_resolver/proactive_refresh.rs
···
93
93
resolve_time_us = resolve_time,
94
94
"Fast resolution detected, considering proactive refresh"
95
95
);
96
-
96
+
97
97
if let Some(metrics) = &self.metrics {
98
98
metrics.incr("proactive_refresh.cache_hit_detected").await;
99
99
}
···
171
171
threshold: f64,
172
172
) -> Arc<dyn HandleResolver> {
173
173
Arc::new(DynProactiveRefreshResolver::with_metrics(
174
-
inner, queue, Some(metrics), cache_ttl, threshold,
174
+
inner,
175
+
queue,
176
+
Some(metrics),
177
+
cache_ttl,
178
+
threshold,
175
179
))
176
180
}
177
181
+8
-6
src/handle_resolver_task.rs
+8
-6
src/handle_resolver_task.rs
···
117
117
118
118
/// Check if an error represents a soft failure (handle not found)
119
119
/// rather than a real error condition.
120
-
///
120
+
///
121
121
/// These atproto_identity library errors indicate the handle doesn't support
122
122
/// the specific resolution method, which is normal and expected:
123
123
/// - error-atproto-identity-resolve-4: DNS resolution failed (no records)
···
130
130
error_str.contains("NoRecordsFound")
131
131
} else if error_str.starts_with("error-atproto-identity-resolve-5") {
132
132
// HTTP resolution - check if it's a hostname lookup failure
133
-
error_str.contains("No address associated with hostname") ||
134
-
error_str.contains("failed to lookup address information")
133
+
error_str.contains("No address associated with hostname")
134
+
|| error_str.contains("failed to lookup address information")
135
135
} else {
136
136
false
137
137
}
···
181
181
}
182
182
Ok(Err(e)) => {
183
183
let error_str = e.to_string();
184
-
184
+
185
185
if Self::is_soft_failure(&error_str) {
186
186
// This is a soft failure - handle simply doesn't support this resolution method
187
187
// Publish not-found metrics
···
388
388
assert!(!HandleResolverTask::is_soft_failure(dns_real_error));
389
389
390
390
// Test HTTP error that is NOT a soft failure (connection timeout)
391
-
let http_timeout = "error-atproto-identity-resolve-5 HTTP resolution failed: connection timeout";
391
+
let http_timeout =
392
+
"error-atproto-identity-resolve-5 HTTP resolution failed: connection timeout";
392
393
assert!(!HandleResolverTask::is_soft_failure(http_timeout));
393
394
394
395
// Test HTTP error that is NOT a soft failure (500 error)
···
396
397
assert!(!HandleResolverTask::is_soft_failure(http_500));
397
398
398
399
// Test QuickDID errors should never be soft failures
399
-
let quickdid_error = "error-quickdid-resolve-1 Failed to resolve subject: internal server error";
400
+
let quickdid_error =
401
+
"error-quickdid-resolve-1 Failed to resolve subject: internal server error";
400
402
assert!(!HandleResolverTask::is_soft_failure(quickdid_error));
401
403
402
404
// Test other atproto_identity error codes should not be soft failures
+4
-1
src/metrics.rs
+4
-1
src/metrics.rs
···
139
139
.build(buffered_sink);
140
140
let client = StatsdClient::from_sink(prefix, queuing_sink);
141
141
142
-
tracing::info!("StatsdMetricsPublisher created successfully with bind address: {}", bind_addr);
142
+
tracing::info!(
143
+
"StatsdMetricsPublisher created successfully with bind address: {}",
144
+
bind_addr
145
+
);
143
146
Ok(Self {
144
147
client,
145
148
default_tags,
+58
-27
src/queue/redis.rs
+58
-27
src/queue/redis.rs
···
193
193
}
194
194
195
195
let dedup_key = self.dedup_key(item_id);
196
-
196
+
197
197
// Use SET NX EX to atomically set if not exists with expiry
198
198
// Returns OK if the key was set, Nil if it already existed
199
199
let result: Option<String> = deadpool_redis::redis::cmd("SET")
···
278
278
if self.dedup_enabled {
279
279
let dedup_id = work.dedup_key();
280
280
let is_new = self.check_and_mark_dedup(&mut conn, &dedup_id).await?;
281
-
281
+
282
282
if !is_new {
283
283
debug!(
284
284
dedup_key = %dedup_id,
···
539
539
.unwrap()
540
540
.as_nanos()
541
541
);
542
-
542
+
543
543
// Create adapter with deduplication enabled
544
544
let adapter = RedisQueueAdapter::<HandleResolutionWork>::with_dedup(
545
545
pool.clone(),
546
546
"test-worker-dedup".to_string(),
547
547
test_prefix.clone(),
548
548
1,
549
-
true, // Enable deduplication
550
-
2, // 2 second TTL for quick testing
549
+
true, // Enable deduplication
550
+
2, // 2 second TTL for quick testing
551
551
);
552
552
553
553
let work = HandleResolutionWork::new("alice.example.com".to_string());
554
554
555
555
// First push should succeed
556
-
adapter.push(work.clone()).await.expect("First push should succeed");
557
-
556
+
adapter
557
+
.push(work.clone())
558
+
.await
559
+
.expect("First push should succeed");
560
+
558
561
// Second push of same item should be deduplicated (but still return Ok)
559
-
adapter.push(work.clone()).await.expect("Second push should succeed (deduplicated)");
560
-
562
+
adapter
563
+
.push(work.clone())
564
+
.await
565
+
.expect("Second push should succeed (deduplicated)");
566
+
561
567
// Queue should only have one item
562
568
let depth = adapter.depth().await;
563
-
assert_eq!(depth, Some(1), "Queue should only have one item after deduplication");
564
-
569
+
assert_eq!(
570
+
depth,
571
+
Some(1),
572
+
"Queue should only have one item after deduplication"
573
+
);
574
+
565
575
// Pull the item
566
576
let pulled = adapter.pull().await;
567
577
assert_eq!(pulled, Some(work.clone()));
568
-
578
+
569
579
// Queue should now be empty
570
580
let depth = adapter.depth().await;
571
581
assert_eq!(depth, Some(0), "Queue should be empty after pulling");
572
-
582
+
573
583
// Wait for dedup TTL to expire
574
584
tokio::time::sleep(tokio::time::Duration::from_secs(3)).await;
575
-
585
+
576
586
// Should be able to push again after TTL expires
577
-
adapter.push(work.clone()).await.expect("Push after TTL expiry should succeed");
578
-
587
+
adapter
588
+
.push(work.clone())
589
+
.await
590
+
.expect("Push after TTL expiry should succeed");
591
+
579
592
let depth = adapter.depth().await;
580
-
assert_eq!(depth, Some(1), "Queue should have one item after TTL expiry");
593
+
assert_eq!(
594
+
depth,
595
+
Some(1),
596
+
"Queue should have one item after TTL expiry"
597
+
);
581
598
}
582
599
583
600
#[tokio::test]
···
599
616
.unwrap()
600
617
.as_nanos()
601
618
);
602
-
619
+
603
620
// Create adapter with deduplication disabled
604
621
let adapter = RedisQueueAdapter::<HandleResolutionWork>::with_dedup(
605
622
pool.clone(),
606
623
"test-worker-nodedup".to_string(),
607
624
test_prefix.clone(),
608
625
1,
609
-
false, // Disable deduplication
626
+
false, // Disable deduplication
610
627
60,
611
628
);
612
629
613
630
let work = HandleResolutionWork::new("bob.example.com".to_string());
614
631
615
632
// Push same item twice
616
-
adapter.push(work.clone()).await.expect("First push should succeed");
617
-
adapter.push(work.clone()).await.expect("Second push should succeed");
618
-
633
+
adapter
634
+
.push(work.clone())
635
+
.await
636
+
.expect("First push should succeed");
637
+
adapter
638
+
.push(work.clone())
639
+
.await
640
+
.expect("Second push should succeed");
641
+
619
642
// Queue should have two items (no deduplication)
620
643
let depth = adapter.depth().await;
621
-
assert_eq!(depth, Some(2), "Queue should have two items when deduplication is disabled");
622
-
644
+
assert_eq!(
645
+
depth,
646
+
Some(2),
647
+
"Queue should have two items when deduplication is disabled"
648
+
);
649
+
623
650
// Pull both items
624
651
let pulled1 = adapter.pull().await;
625
652
assert_eq!(pulled1, Some(work.clone()));
626
-
653
+
627
654
let pulled2 = adapter.pull().await;
628
655
assert_eq!(pulled2, Some(work.clone()));
629
-
656
+
630
657
// Queue should now be empty
631
658
let depth = adapter.depth().await;
632
-
assert_eq!(depth, Some(0), "Queue should be empty after pulling all items");
659
+
assert_eq!(
660
+
depth,
661
+
Some(0),
662
+
"Queue should be empty after pulling all items"
663
+
);
633
664
}
634
665
635
666
#[tokio::test]
+1
-1
src/queue/work.rs
+1
-1
src/queue/work.rs
···
125
125
// Same handle should have same dedup key
126
126
assert_eq!(work1.dedup_key(), work2.dedup_key());
127
127
assert_eq!(work1.dedup_key(), "alice.example.com");
128
-
128
+
129
129
// Different handle should have different dedup key
130
130
assert_ne!(work1.dedup_key(), work3.dedup_key());
131
131
assert_eq!(work3.dedup_key(), "bob.example.com");