+2
CLAUDE.md
+2
CLAUDE.md
···
127
127
- `QUEUE_BUFFER_SIZE`: Buffer size for MPSC queue (default: 1000)
128
128
- `QUEUE_SQLITE_MAX_SIZE`: Max queue size for SQLite work shedding (default: 10000)
129
129
- `QUEUE_REDIS_TIMEOUT`: Redis blocking timeout in seconds (default: 5)
130
+
- `QUEUE_REDIS_DEDUP_ENABLED`: Enable queue deduplication to prevent duplicate handles (default: false)
131
+
- `QUEUE_REDIS_DEDUP_TTL`: TTL for deduplication keys in seconds (default: 60)
130
132
131
133
### Optional - Rate Limiting
132
134
- `RESOLVER_MAX_CONCURRENT`: Maximum concurrent handle resolutions (default: 0 = disabled)
+2
README.md
+2
README.md
···
119
119
- `QUEUE_BUFFER_SIZE`: MPSC queue buffer size (default: 1000)
120
120
- `QUEUE_REDIS_PREFIX`: Redis key prefix for queues (default: queue:handleresolver:)
121
121
- `QUEUE_REDIS_TIMEOUT`: Redis blocking timeout in seconds (default: 5)
122
+
- `QUEUE_REDIS_DEDUP_ENABLED`: Enable queue deduplication to prevent duplicate handles (default: false)
123
+
- `QUEUE_REDIS_DEDUP_TTL`: TTL for deduplication keys in seconds (default: 60)
122
124
- `QUEUE_SQLITE_MAX_SIZE`: Max SQLite queue size for work shedding (default: 10000)
123
125
124
126
#### Rate Limiting
+25
-9
src/bin/quickdid.rs
+25
-9
src/bin/quickdid.rs
···
17
17
metrics::create_metrics_publisher,
18
18
queue::{
19
19
HandleResolutionWork, QueueAdapter, create_mpsc_queue_from_channel, create_noop_queue,
20
-
create_redis_queue, create_sqlite_queue, create_sqlite_queue_with_max_size,
20
+
create_redis_queue, create_redis_queue_with_dedup, create_sqlite_queue,
21
+
create_sqlite_queue_with_max_size,
21
22
},
22
23
sqlite_schema::create_sqlite_pool,
23
24
task_manager::spawn_cancellable_task,
···
116
117
" QUEUE_REDIS_PREFIX Redis key prefix for queues (default: queue:handleresolver:)"
117
118
);
118
119
println!(" QUEUE_REDIS_TIMEOUT Queue blocking timeout in seconds (default: 5)");
120
+
println!(" QUEUE_REDIS_DEDUP_ENABLED Enable queue deduplication (default: false)");
121
+
println!(" QUEUE_REDIS_DEDUP_TTL TTL for dedup keys in seconds (default: 60)");
119
122
println!(" QUEUE_WORKER_ID Worker ID for Redis queue (default: worker1)");
120
123
println!(" QUEUE_BUFFER_SIZE Buffer size for MPSC queue (default: 1000)");
121
124
println!(" QUEUE_SQLITE_MAX_SIZE Maximum SQLite queue size (default: 10000)");
···
316
319
if let Some(url) = queue_redis_url {
317
320
if let Some(pool) = try_create_redis_pool(url, "queue adapter") {
318
321
tracing::info!(
319
-
"Creating Redis queue adapter with prefix: {}",
320
-
config.queue_redis_prefix
322
+
"Creating Redis queue adapter with prefix: {}, dedup: {}, dedup_ttl: {}s",
323
+
config.queue_redis_prefix,
324
+
config.queue_redis_dedup_enabled,
325
+
config.queue_redis_dedup_ttl
321
326
);
322
-
create_redis_queue::<HandleResolutionWork>(
323
-
pool,
324
-
config.queue_worker_id.clone(),
325
-
config.queue_redis_prefix.clone(),
326
-
config.queue_redis_timeout,
327
-
)
327
+
if config.queue_redis_dedup_enabled {
328
+
create_redis_queue_with_dedup::<HandleResolutionWork>(
329
+
pool,
330
+
config.queue_worker_id.clone(),
331
+
config.queue_redis_prefix.clone(),
332
+
config.queue_redis_timeout,
333
+
true,
334
+
config.queue_redis_dedup_ttl,
335
+
)
336
+
} else {
337
+
create_redis_queue::<HandleResolutionWork>(
338
+
pool,
339
+
config.queue_worker_id.clone(),
340
+
config.queue_redis_prefix.clone(),
341
+
config.queue_redis_timeout,
342
+
)
343
+
}
328
344
} else {
329
345
tracing::warn!("Falling back to MPSC queue adapter");
330
346
// Fall back to MPSC if Redis fails
+15
src/config.rs
+15
src/config.rs
···
161
161
/// Redis blocking timeout for queue operations in seconds (e.g., 5)
162
162
pub queue_redis_timeout: u64,
163
163
164
+
/// Enable deduplication for Redis queue to prevent duplicate handles
165
+
/// Default: false
166
+
pub queue_redis_dedup_enabled: bool,
167
+
168
+
/// TTL for Redis queue deduplication keys in seconds
169
+
/// Default: 60 (1 minute)
170
+
pub queue_redis_dedup_ttl: u64,
171
+
164
172
/// Maximum queue size for SQLite adapter work shedding (e.g., 10000)
165
173
/// When exceeded, oldest entries are deleted to maintain this limit.
166
174
/// Set to 0 to disable work shedding (unlimited queue size).
···
323
331
cache_ttl_redis: parse_env("CACHE_TTL_REDIS", 7776000)?,
324
332
cache_ttl_sqlite: parse_env("CACHE_TTL_SQLITE", 7776000)?,
325
333
queue_redis_timeout: parse_env("QUEUE_REDIS_TIMEOUT", 5)?,
334
+
queue_redis_dedup_enabled: parse_env("QUEUE_REDIS_DEDUP_ENABLED", false)?,
335
+
queue_redis_dedup_ttl: parse_env("QUEUE_REDIS_DEDUP_TTL", 60)?,
326
336
queue_sqlite_max_size: parse_env("QUEUE_SQLITE_MAX_SIZE", 10000)?,
327
337
resolver_max_concurrent: parse_env("RESOLVER_MAX_CONCURRENT", 0)?,
328
338
resolver_max_concurrent_timeout_ms: parse_env("RESOLVER_MAX_CONCURRENT_TIMEOUT_MS", 0)?,
···
407
417
if self.queue_redis_timeout == 0 {
408
418
return Err(ConfigError::InvalidTimeout(
409
419
"QUEUE_REDIS_TIMEOUT must be > 0".to_string(),
420
+
));
421
+
}
422
+
if self.queue_redis_dedup_enabled && self.queue_redis_dedup_ttl == 0 {
423
+
return Err(ConfigError::InvalidTtl(
424
+
"QUEUE_REDIS_DEDUP_TTL must be > 0 when deduplication is enabled".to_string(),
410
425
));
411
426
}
412
427
match self.queue_adapter.as_str() {
+60
-4
src/queue/factory.rs
+60
-4
src/queue/factory.rs
···
10
10
11
11
use super::{
12
12
adapter::QueueAdapter, mpsc::MpscQueueAdapter, noop::NoopQueueAdapter,
13
-
redis::RedisQueueAdapter, sqlite::SqliteQueueAdapter,
13
+
redis::RedisQueueAdapter, sqlite::SqliteQueueAdapter, work::DedupKey,
14
14
};
15
15
16
16
// ========= MPSC Queue Factories =========
···
81
81
/// # Examples
82
82
///
83
83
/// ```no_run
84
-
/// use quickdid::queue::create_redis_queue;
84
+
/// use quickdid::queue::{create_redis_queue, HandleResolutionWork};
85
85
/// use deadpool_redis::Config;
86
86
///
87
87
/// # async fn example() -> anyhow::Result<()> {
88
88
/// let cfg = Config::from_url("redis://localhost:6379");
89
89
/// let pool = cfg.create_pool(Some(deadpool_redis::Runtime::Tokio1))?;
90
90
///
91
-
/// let queue = create_redis_queue::<String>(
91
+
/// let queue = create_redis_queue::<HandleResolutionWork>(
92
92
/// pool,
93
93
/// "worker-1".to_string(),
94
94
/// "queue:myapp:".to_string(),
···
104
104
timeout_seconds: u64,
105
105
) -> Arc<dyn QueueAdapter<T>>
106
106
where
107
-
T: Send + Sync + Serialize + for<'de> Deserialize<'de> + 'static,
107
+
T: Send + Sync + Serialize + for<'de> Deserialize<'de> + DedupKey + 'static,
108
108
{
109
109
Arc::new(RedisQueueAdapter::new(
110
110
pool,
111
111
worker_id,
112
112
key_prefix,
113
113
timeout_seconds,
114
+
))
115
+
}
116
+
117
+
/// Create a new Redis-backed queue adapter with deduplication.
118
+
///
119
+
/// This creates a distributed queue with deduplication to prevent duplicate items
120
+
/// from being queued within the specified TTL window.
121
+
///
122
+
/// # Arguments
123
+
///
124
+
/// * `pool` - Redis connection pool
125
+
/// * `worker_id` - Worker identifier for this queue instance
126
+
/// * `key_prefix` - Redis key prefix for queue operations
127
+
/// * `timeout_seconds` - Timeout for blocking operations
128
+
/// * `dedup_enabled` - Whether to enable deduplication
129
+
/// * `dedup_ttl` - TTL for deduplication keys in seconds
130
+
///
131
+
/// # Examples
132
+
///
133
+
/// ```no_run
134
+
/// use quickdid::queue::{create_redis_queue_with_dedup, HandleResolutionWork};
135
+
/// use deadpool_redis::Config;
136
+
///
137
+
/// # async fn example() -> anyhow::Result<()> {
138
+
/// let cfg = Config::from_url("redis://localhost:6379");
139
+
/// let pool = cfg.create_pool(Some(deadpool_redis::Runtime::Tokio1))?;
140
+
///
141
+
/// let queue = create_redis_queue_with_dedup::<HandleResolutionWork>(
142
+
/// pool,
143
+
/// "worker-1".to_string(),
144
+
/// "queue:myapp:".to_string(),
145
+
/// 5,
146
+
/// true, // Enable deduplication
147
+
/// 60, // 60 second dedup window
148
+
/// );
149
+
/// # Ok(())
150
+
/// # }
151
+
/// ```
152
+
pub fn create_redis_queue_with_dedup<T>(
153
+
pool: RedisPool,
154
+
worker_id: String,
155
+
key_prefix: String,
156
+
timeout_seconds: u64,
157
+
dedup_enabled: bool,
158
+
dedup_ttl: u64,
159
+
) -> Arc<dyn QueueAdapter<T>>
160
+
where
161
+
T: Send + Sync + Serialize + for<'de> Deserialize<'de> + DedupKey + 'static,
162
+
{
163
+
Arc::new(RedisQueueAdapter::with_dedup(
164
+
pool,
165
+
worker_id,
166
+
key_prefix,
167
+
timeout_seconds,
168
+
dedup_enabled,
169
+
dedup_ttl,
114
170
))
115
171
}
116
172
+2
-2
src/queue/mod.rs
+2
-2
src/queue/mod.rs
···
63
63
// Re-export core types
64
64
pub use adapter::QueueAdapter;
65
65
pub use error::{QueueError, Result};
66
-
pub use work::HandleResolutionWork;
66
+
pub use work::{DedupKey, HandleResolutionWork};
67
67
68
68
// Re-export implementations (with limited visibility)
69
69
pub use mpsc::MpscQueueAdapter;
···
74
74
// Re-export factory functions
75
75
pub use factory::{
76
76
create_mpsc_queue, create_mpsc_queue_from_channel, create_noop_queue, create_redis_queue,
77
-
create_sqlite_queue, create_sqlite_queue_with_max_size,
77
+
create_redis_queue_with_dedup, create_sqlite_queue, create_sqlite_queue_with_max_size,
78
78
};
+206
-5
src/queue/redis.rs
+206
-5
src/queue/redis.rs
···
10
10
11
11
use super::adapter::QueueAdapter;
12
12
use super::error::{QueueError, Result};
13
+
use super::work::DedupKey;
13
14
14
15
/// Redis-backed queue adapter implementation.
15
16
///
···
40
41
/// # Examples
41
42
///
42
43
/// ```no_run
43
-
/// use quickdid::queue::RedisQueueAdapter;
44
-
/// use quickdid::queue::QueueAdapter;
44
+
/// use quickdid::queue::{RedisQueueAdapter, QueueAdapter, HandleResolutionWork};
45
45
/// use deadpool_redis::Config;
46
46
///
47
47
/// # async fn example() -> anyhow::Result<()> {
···
50
50
/// let pool = cfg.create_pool(Some(deadpool_redis::Runtime::Tokio1))?;
51
51
///
52
52
/// // Create queue adapter
53
-
/// let queue = RedisQueueAdapter::<String>::new(
53
+
/// let queue = RedisQueueAdapter::<HandleResolutionWork>::new(
54
54
/// pool,
55
55
/// "worker-1".to_string(),
56
56
/// "queue:myapp:".to_string(),
···
58
58
/// );
59
59
///
60
60
/// // Use the queue
61
-
/// queue.push("work-item".to_string()).await?;
61
+
/// let work = HandleResolutionWork::new("alice.bsky.social".to_string());
62
+
/// queue.push(work.clone()).await?;
62
63
/// if let Some(item) = queue.pull().await {
63
64
/// // Process item
64
65
/// queue.ack(&item).await?;
···
78
79
key_prefix: String,
79
80
/// Timeout for blocking RPOPLPUSH operations (in seconds)
80
81
timeout_seconds: u64,
82
+
/// Enable deduplication to prevent duplicate items in queue
83
+
dedup_enabled: bool,
84
+
/// TTL for deduplication keys in seconds
85
+
dedup_ttl: u64,
81
86
/// Type marker for generic parameter
82
87
_phantom: std::marker::PhantomData<T>,
83
88
}
···
120
125
key_prefix: String,
121
126
timeout_seconds: u64,
122
127
) -> Self {
128
+
Self::with_dedup(
129
+
pool,
130
+
worker_id,
131
+
key_prefix,
132
+
timeout_seconds,
133
+
false,
134
+
60, // Default TTL of 60 seconds
135
+
)
136
+
}
137
+
138
+
    /// Create a new Redis queue adapter with deduplication settings.
    ///
    /// Pure constructor: all arguments are stored as-is and no Redis I/O
    /// happens here. Deduplication only takes effect later, during `push`,
    /// and only when `dedup_enabled` is true.
    ///
    /// # Arguments
    ///
    /// * `pool` - Redis connection pool
    /// * `worker_id` - Unique identifier for this worker instance
    /// * `key_prefix` - Redis key prefix for queue operations
    /// * `timeout_seconds` - Timeout for blocking pull operations
    /// * `dedup_enabled` - Whether to enable deduplication
    /// * `dedup_ttl` - TTL for deduplication keys in seconds
    pub fn with_dedup(
        pool: RedisPool,
        worker_id: String,
        key_prefix: String,
        timeout_seconds: u64,
        dedup_enabled: bool,
        dedup_ttl: u64,
    ) -> Self {
        Self {
            pool,
            worker_id,
            key_prefix,
            timeout_seconds,
            dedup_enabled,
            dedup_ttl,
            // Marker only; the adapter stores no T values itself.
            _phantom: std::marker::PhantomData,
        }
    }
···
138
173
fn worker_queue_key(&self) -> String {
139
174
format!("{}{}", self.key_prefix, self.worker_id)
140
175
}
176
+
177
+
/// Get the deduplication key for an item.
178
+
/// This key is used to track if an item is already queued.
179
+
fn dedup_key(&self, item_id: &str) -> String {
180
+
format!("{}dedup:{}", self.key_prefix, item_id)
181
+
}
182
+
183
+
/// Check and mark an item for deduplication.
184
+
/// Returns true if the item was successfully marked (not duplicate),
185
+
/// false if it was already in the deduplication set (duplicate).
186
+
async fn check_and_mark_dedup(
187
+
&self,
188
+
conn: &mut deadpool_redis::Connection,
189
+
item_id: &str,
190
+
) -> Result<bool> {
191
+
if !self.dedup_enabled {
192
+
return Ok(true); // Always allow if dedup is disabled
193
+
}
194
+
195
+
let dedup_key = self.dedup_key(&item_id.to_string());
196
+
197
+
// Use SET NX EX to atomically set if not exists with expiry
198
+
// Returns OK if the key was set, Nil if it already existed
199
+
let result: Option<String> = deadpool_redis::redis::cmd("SET")
200
+
.arg(&dedup_key)
201
+
.arg("1")
202
+
.arg("NX") // Only set if not exists
203
+
.arg("EX") // Set expiry
204
+
.arg(self.dedup_ttl)
205
+
.query_async(conn)
206
+
.await
207
+
.map_err(|e| QueueError::RedisOperationFailed {
208
+
operation: "SET NX EX".to_string(),
209
+
details: e.to_string(),
210
+
})?;
211
+
212
+
// If result is Some("OK"), the key was set (not duplicate)
213
+
// If result is None, the key already existed (duplicate)
214
+
Ok(result.is_some())
215
+
}
141
216
}
142
217
143
218
#[async_trait]
144
219
impl<T> QueueAdapter<T> for RedisQueueAdapter<T>
145
220
where
146
-
T: Send + Sync + Serialize + for<'de> Deserialize<'de> + 'static,
221
+
T: Send + Sync + Serialize + for<'de> Deserialize<'de> + DedupKey + 'static,
147
222
{
148
223
async fn pull(&self) -> Option<T> {
149
224
match self.pool.get().await {
···
198
273
.get()
199
274
.await
200
275
.map_err(|e| QueueError::RedisConnectionFailed(e.to_string()))?;
276
+
277
+
// Check for deduplication if enabled
278
+
if self.dedup_enabled {
279
+
let dedup_id = work.dedup_key();
280
+
let is_new = self.check_and_mark_dedup(&mut conn, &dedup_id).await?;
281
+
282
+
if !is_new {
283
+
debug!(
284
+
dedup_key = %dedup_id,
285
+
"Item already queued, skipping duplicate"
286
+
);
287
+
return Ok(()); // Successfully deduplicated
288
+
}
289
+
}
201
290
202
291
let data = serde_json::to_vec(&work)
203
292
.map_err(|e| QueueError::SerializationFailed(e.to_string()))?;
···
429
518
430
519
// Should be healthy if Redis is running
431
520
assert!(adapter.is_healthy().await);
521
+
}
522
+
523
+
    #[tokio::test]
    async fn test_redis_queue_deduplication() {
        use crate::queue::HandleResolutionWork;

        // Integration test: requires a live Redis. Skip gracefully so the
        // suite still passes in environments without one.
        let pool = match crate::test_helpers::get_test_redis_pool() {
            Some(p) => p,
            None => {
                eprintln!("Skipping Redis test - no Redis connection available");
                return;
            }
        };

        // Nanosecond-stamped prefix keeps parallel or repeated runs from
        // colliding on shared Redis keys.
        let test_prefix = format!(
            "test:queue:dedup:{}:",
            std::time::SystemTime::now()
                .duration_since(std::time::UNIX_EPOCH)
                .unwrap()
                .as_nanos()
        );

        // Create adapter with deduplication enabled
        let adapter = RedisQueueAdapter::<HandleResolutionWork>::with_dedup(
            pool.clone(),
            "test-worker-dedup".to_string(),
            test_prefix.clone(),
            1,
            true, // Enable deduplication
            2, // 2 second TTL for quick testing
        );

        let work = HandleResolutionWork::new("alice.example.com".to_string());

        // First push should succeed
        adapter.push(work.clone()).await.expect("First push should succeed");

        // Second push of same item should be deduplicated (but still return Ok)
        adapter.push(work.clone()).await.expect("Second push should succeed (deduplicated)");

        // Queue should only have one item
        let depth = adapter.depth().await;
        assert_eq!(depth, Some(1), "Queue should only have one item after deduplication");

        // Pull the item
        let pulled = adapter.pull().await;
        assert_eq!(pulled, Some(work.clone()));

        // Queue should now be empty
        let depth = adapter.depth().await;
        assert_eq!(depth, Some(0), "Queue should be empty after pulling");

        // Wait for dedup TTL to expire (3s sleep > 2s TTL gives Redis time
        // to expire the marker before the retry below).
        tokio::time::sleep(tokio::time::Duration::from_secs(3)).await;

        // Should be able to push again after TTL expires
        adapter.push(work.clone()).await.expect("Push after TTL expiry should succeed");

        let depth = adapter.depth().await;
        assert_eq!(depth, Some(1), "Queue should have one item after TTL expiry");
    }
582
+
583
+
    #[tokio::test]
    async fn test_redis_queue_deduplication_disabled() {
        use crate::queue::HandleResolutionWork;

        // Integration test: requires a live Redis; skip gracefully when
        // unavailable.
        let pool = match crate::test_helpers::get_test_redis_pool() {
            Some(p) => p,
            None => {
                eprintln!("Skipping Redis test - no Redis connection available");
                return;
            }
        };

        // Unique prefix per run to avoid key collisions on shared Redis.
        let test_prefix = format!(
            "test:queue:nodedup:{}:",
            std::time::SystemTime::now()
                .duration_since(std::time::UNIX_EPOCH)
                .unwrap()
                .as_nanos()
        );

        // Create adapter with deduplication disabled
        let adapter = RedisQueueAdapter::<HandleResolutionWork>::with_dedup(
            pool.clone(),
            "test-worker-nodedup".to_string(),
            test_prefix.clone(),
            1,
            false, // Disable deduplication
            60,
        );

        let work = HandleResolutionWork::new("bob.example.com".to_string());

        // Push same item twice
        adapter.push(work.clone()).await.expect("First push should succeed");
        adapter.push(work.clone()).await.expect("Second push should succeed");

        // Queue should have two items (no deduplication)
        let depth = adapter.depth().await;
        assert_eq!(depth, Some(2), "Queue should have two items when deduplication is disabled");

        // Pull both items
        let pulled1 = adapter.pull().await;
        assert_eq!(pulled1, Some(work.clone()));

        let pulled2 = adapter.pull().await;
        assert_eq!(pulled2, Some(work.clone()));

        // Queue should now be empty
        let depth = adapter.depth().await;
        assert_eq!(depth, Some(0), "Queue should be empty after pulling all items");
    }
433
634
434
635
#[tokio::test]
+38
src/queue/work.rs
+38
src/queue/work.rs
···
50
50
}
51
51
}
52
52
53
+
/// Trait for getting a unique deduplication key from a work item.
/// This is used by the Redis queue adapter to prevent duplicate items.
pub trait DedupKey {
    /// Get a unique key for deduplication purposes.
    /// This should return a consistent identifier for equivalent work items.
    /// The Redis adapter embeds the value in a `{prefix}dedup:{key}` Redis
    /// key, so implementations should return something stable and
    /// reasonably short.
    fn dedup_key(&self) -> String;
}
60
+
61
+
impl DedupKey for HandleResolutionWork {
    fn dedup_key(&self) -> String {
        // Use the handle itself as the dedup key
        // NOTE(review): this is byte-for-byte — "Alice.example.com" and
        // "alice.example.com" would NOT dedup against each other. Confirm
        // whether handles are normalized (e.g. lowercased) before they
        // reach HandleResolutionWork, or duplicates may slip through.
        self.handle.clone()
    }
}
67
+
68
+
// Test-only convenience: a plain String deduplicates on its own contents.
#[cfg(test)]
impl DedupKey for String {
    fn dedup_key(&self) -> String {
        self.to_owned()
    }
}
75
+
53
76
#[cfg(test)]
54
77
mod tests {
55
78
use super::*;
···
91
114
92
115
assert_eq!(work1, work2);
93
116
assert_ne!(work1, work3);
117
+
}
118
+
119
+
#[test]
120
+
fn test_handle_resolution_work_dedup_key() {
121
+
let work1 = HandleResolutionWork::new("alice.example.com".to_string());
122
+
let work2 = HandleResolutionWork::new("alice.example.com".to_string());
123
+
let work3 = HandleResolutionWork::new("bob.example.com".to_string());
124
+
125
+
// Same handle should have same dedup key
126
+
assert_eq!(work1.dedup_key(), work2.dedup_key());
127
+
assert_eq!(work1.dedup_key(), "alice.example.com");
128
+
129
+
// Different handle should have different dedup key
130
+
assert_ne!(work1.dedup_key(), work3.dedup_key());
131
+
assert_eq!(work3.dedup_key(), "bob.example.com");
94
132
}
95
133
}