Forked from smokesignal.events/quickdid
QuickDID is a high-performance AT Protocol identity resolution service written in Rust. It provides handle-to-DID resolution with Redis-backed caching and queue processing.
//! Redis-backed queue adapter implementation.
//!
//! This module provides a distributed queue implementation using Redis lists
//! with a reliable queue pattern for at-least-once delivery semantics.

use async_trait::async_trait;
use deadpool_redis::{Pool as RedisPool, redis::AsyncCommands};
use serde::{Deserialize, Serialize};
use tracing::{debug, error, warn};

use super::adapter::QueueAdapter;
use super::error::{QueueError, Result};
use super::work::DedupKey;

/// Redis-backed queue adapter implementation.
///
/// This adapter uses Redis lists with a reliable queue pattern:
/// - LPUSH to push items to the primary queue
/// - BRPOPLPUSH to atomically move items from primary to worker queue
/// - LREM to acknowledge processed items from worker queue
///
/// This ensures at-least-once delivery semantics and allows for recovery
/// of in-flight items if a worker crashes.
///
/// # Features
///
/// - Distributed operation across multiple instances
/// - Persistent storage with Redis
/// - At-least-once delivery guarantees
/// - Recovery of in-flight items from per-worker queues after a worker crash
/// - Configurable timeouts
///
/// # Architecture
///
/// ```text
/// Producer -> [Primary Queue] -> BRPOPLPUSH -> [Worker Queue] -> Consumer
///                                                     |
///                                               LREM (on ack)
/// ```
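///
/// For reference, the same lifecycle for a single item, expressed as the raw
/// Redis commands this adapter issues (illustrative only; key names assume the
/// default "queue:handleresolver:" prefix and a worker ID of "worker-1"):
///
/// ```text
/// LPUSH      queue:handleresolver:primary <payload>
/// BRPOPLPUSH queue:handleresolver:primary queue:handleresolver:worker-1 <timeout>
/// LREM       queue:handleresolver:worker-1 1 <payload>   (on ack)
/// ```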
///
/// # Examples
///
/// ```no_run
/// use quickdid::queue::{RedisQueueAdapter, QueueAdapter, HandleResolutionWork};
/// use deadpool_redis::Config;
///
/// # async fn example() -> anyhow::Result<()> {
/// // Create Redis pool
/// let cfg = Config::from_url("redis://localhost:6379");
/// let pool = cfg.create_pool(Some(deadpool_redis::Runtime::Tokio1))?;
///
/// // Create queue adapter
/// let queue = RedisQueueAdapter::<HandleResolutionWork>::new(
///     pool,
///     "worker-1".to_string(),
///     "queue:myapp:".to_string(),
///     5, // 5 second timeout
/// );
///
/// // Use the queue
/// let work = HandleResolutionWork::new("alice.bsky.social".to_string());
/// queue.push(work.clone()).await?;
/// if let Some(item) = queue.pull().await {
///     // Process item
///     queue.ack(&item).await?;
/// }
/// # Ok(())
/// # }
/// ```
pub struct RedisQueueAdapter<T>
where
    T: Send + Sync + Serialize + for<'de> Deserialize<'de> + 'static,
{
    /// Redis connection pool
    pool: RedisPool,
    /// Unique worker ID for this adapter instance
    worker_id: String,
    /// Key prefix for all queues (default: "queue:handleresolver:")
    key_prefix: String,
    /// Timeout for blocking RPOPLPUSH operations (in seconds)
    timeout_seconds: u64,
    /// Enable deduplication to prevent duplicate items in queue
    dedup_enabled: bool,
    /// TTL for deduplication keys in seconds
    dedup_ttl: u64,
    /// Type marker for generic parameter
    _phantom: std::marker::PhantomData<T>,
}

impl<T> RedisQueueAdapter<T>
where
    T: Send + Sync + Serialize + for<'de> Deserialize<'de> + 'static,
{
    /// Create a new Redis queue adapter.
    ///
    /// # Arguments
    ///
    /// * `pool` - Redis connection pool
    /// * `worker_id` - Unique identifier for this worker instance
    /// * `key_prefix` - Redis key prefix for queue operations
    /// * `timeout_seconds` - Timeout for blocking pull operations
    ///
    /// # Examples
    ///
    /// ```no_run
    /// use quickdid::queue::RedisQueueAdapter;
    /// use deadpool_redis::Config;
    ///
    /// # async fn example() -> anyhow::Result<()> {
    /// let cfg = Config::from_url("redis://localhost:6379");
    /// let pool = cfg.create_pool(Some(deadpool_redis::Runtime::Tokio1))?;
    ///
    /// let queue = RedisQueueAdapter::<String>::new(
    ///     pool,
    ///     "worker-1".to_string(),
    ///     "queue:myapp:".to_string(),
    ///     5,
    /// );
    /// # Ok(())
    /// # }
    /// ```
    pub fn new(
        pool: RedisPool,
        worker_id: String,
        key_prefix: String,
        timeout_seconds: u64,
    ) -> Self {
        Self::with_dedup(
            pool,
            worker_id,
            key_prefix,
            timeout_seconds,
            false,
            60, // Default TTL of 60 seconds
        )
    }

    /// Create a new Redis queue adapter with deduplication settings.
    ///
    /// # Arguments
    ///
    /// * `pool` - Redis connection pool
    /// * `worker_id` - Unique identifier for this worker instance
    /// * `key_prefix` - Redis key prefix for queue operations
    /// * `timeout_seconds` - Timeout for blocking pull operations
    /// * `dedup_enabled` - Whether to enable deduplication
    /// * `dedup_ttl` - TTL for deduplication keys in seconds
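    ///
    /// # Examples
    ///
    /// A sketch mirroring the `new` example above, with deduplication turned
    /// on and a 60 second dedup window:
    ///
    /// ```no_run
    /// use quickdid::queue::RedisQueueAdapter;
    /// use deadpool_redis::Config;
    ///
    /// # async fn example() -> anyhow::Result<()> {
    /// let cfg = Config::from_url("redis://localhost:6379");
    /// let pool = cfg.create_pool(Some(deadpool_redis::Runtime::Tokio1))?;
    ///
    /// let queue = RedisQueueAdapter::<String>::with_dedup(
    ///     pool,
    ///     "worker-1".to_string(),
    ///     "queue:myapp:".to_string(),
    ///     5,    // blocking pull timeout in seconds
    ///     true, // enable deduplication
    ///     60,   // dedup key TTL in seconds
    /// );
    /// # Ok(())
    /// # }
    /// ```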
    pub fn with_dedup(
        pool: RedisPool,
        worker_id: String,
        key_prefix: String,
        timeout_seconds: u64,
        dedup_enabled: bool,
        dedup_ttl: u64,
    ) -> Self {
        Self {
            pool,
            worker_id,
            key_prefix,
            timeout_seconds,
            dedup_enabled,
            dedup_ttl,
            _phantom: std::marker::PhantomData,
        }
    }

    /// Get the primary queue key.
    fn primary_queue_key(&self) -> String {
        format!("{}primary", self.key_prefix)
    }

    /// Get the worker-specific temporary queue key.
    fn worker_queue_key(&self) -> String {
        format!("{}{}", self.key_prefix, self.worker_id)
    }

    /// Get the deduplication key for an item.
    /// This key is used to track if an item is already queued.
    fn dedup_key(&self, item_id: &str) -> String {
        format!("{}dedup:{}", self.key_prefix, item_id)
    }

    /// Check and mark an item for deduplication.
    /// Returns true if the item was successfully marked (not duplicate),
    /// false if it was already in the deduplication set (duplicate).
    async fn check_and_mark_dedup(
        &self,
        conn: &mut deadpool_redis::Connection,
        item_id: &str,
    ) -> Result<bool> {
        if !self.dedup_enabled {
            return Ok(true); // Always allow if dedup is disabled
        }

        let dedup_key = self.dedup_key(item_id);

        // Use SET NX EX to atomically set if not exists with expiry
        // Returns OK if the key was set, Nil if it already existed
        let result: Option<String> = deadpool_redis::redis::cmd("SET")
            .arg(&dedup_key)
            .arg("1")
            .arg("NX") // Only set if not exists
            .arg("EX") // Set expiry
            .arg(self.dedup_ttl)
            .query_async(conn)
            .await
            .map_err(|e| QueueError::RedisOperationFailed {
                operation: "SET NX EX".to_string(),
                details: e.to_string(),
            })?;

        // If result is Some("OK"), the key was set (not duplicate)
        // If result is None, the key already existed (duplicate)
        Ok(result.is_some())
    }
}
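
// Deduplication identifiers come from the `DedupKey` trait in `super::work`
// (see the trait bound on the `QueueAdapter` impl below, where `push` calls
// `work.dedup_key()`). As a hedged illustration only -- assuming the trait
// exposes a `fn dedup_key(&self) -> String`, which matches how `push` uses
// it -- an implementation for a hypothetical work type might look like:
//
//     struct MyWork {
//         handle: String,
//     }
//
//     impl DedupKey for MyWork {
//         fn dedup_key(&self) -> String {
//             // Collapse identical handles into a single queued item.
//             self.handle.to_lowercase()
//         }
//     }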

#[async_trait]
impl<T> QueueAdapter<T> for RedisQueueAdapter<T>
where
    T: Send + Sync + Serialize + for<'de> Deserialize<'de> + DedupKey + 'static,
{
    async fn pull(&self) -> Option<T> {
        match self.pool.get().await {
            Ok(mut conn) => {
                let primary_key = self.primary_queue_key();
                let worker_key = self.worker_queue_key();

                // Use blocking RPOPLPUSH to atomically move item from primary to worker queue
                let data: Option<Vec<u8>> = match conn
                    .brpoplpush(&primary_key, &worker_key, self.timeout_seconds as f64)
                    .await
                {
                    Ok(data) => data,
                    Err(e) => {
                        error!("Failed to pull from queue: {}", e);
                        return None;
                    }
                };

                if let Some(data) = data {
                    // Deserialize the item
                    match serde_json::from_slice(&data) {
                        Ok(item) => {
                            debug!(
                                worker_id = %self.worker_id,
                                "Pulled item from queue"
                            );
                            Some(item)
                        }
                        Err(e) => {
                            error!("Failed to deserialize item: {}", e);
                            // Remove the corrupted item from worker queue
                            let _: std::result::Result<(), _> =
                                conn.lrem(&worker_key, 1, &data).await;
                            None
                        }
                    }
                } else {
                    None
                }
            }
            Err(e) => {
                error!("Failed to get Redis connection: {}", e);
                None
            }
        }
    }

    async fn push(&self, work: T) -> Result<()> {
        let mut conn = self
            .pool
            .get()
            .await
            .map_err(|e| QueueError::RedisConnectionFailed(e.to_string()))?;

        // Check for deduplication if enabled
        if self.dedup_enabled {
            let dedup_id = work.dedup_key();
            let is_new = self.check_and_mark_dedup(&mut conn, &dedup_id).await?;

            if !is_new {
                debug!(
                    dedup_key = %dedup_id,
                    "Item already queued, skipping duplicate"
                );
                return Ok(()); // Successfully deduplicated
            }
        }

        let data = serde_json::to_vec(&work)
            .map_err(|e| QueueError::SerializationFailed(e.to_string()))?;

        let primary_key = self.primary_queue_key();

        conn.lpush::<_, _, ()>(&primary_key, data)
            .await
            .map_err(|e| QueueError::RedisOperationFailed {
                operation: "LPUSH".to_string(),
                details: e.to_string(),
            })?;

        debug!("Pushed item to queue");
        Ok(())
    }

    async fn ack(&self, item: &T) -> Result<()> {
        let mut conn = self
            .pool
            .get()
            .await
            .map_err(|e| QueueError::RedisConnectionFailed(e.to_string()))?;

        let data =
            serde_json::to_vec(item).map_err(|e| QueueError::SerializationFailed(e.to_string()))?;

        let worker_key = self.worker_queue_key();

        // Remove exactly one occurrence of this item from the worker queue
        let removed: i32 = conn.lrem(&worker_key, 1, &data).await.map_err(|e| {
            QueueError::RedisOperationFailed {
                operation: "LREM".to_string(),
                details: e.to_string(),
            }
        })?;

        if removed == 0 {
            warn!(
                worker_id = %self.worker_id,
                "Item not found in worker queue during acknowledgment"
            );
        } else {
            debug!(
                worker_id = %self.worker_id,
                "Acknowledged item"
            );
        }

        Ok(())
    }

    async fn depth(&self) -> Option<usize> {
        match self.pool.get().await {
            Ok(mut conn) => {
                let primary_key = self.primary_queue_key();
                match conn.llen::<_, usize>(&primary_key).await {
                    Ok(len) => Some(len),
                    Err(e) => {
                        error!("Failed to get queue depth: {}", e);
                        None
                    }
                }
            }
            Err(e) => {
                error!("Failed to get Redis connection: {}", e);
                None
            }
        }
    }

    async fn is_healthy(&self) -> bool {
        match self.pool.get().await {
            Ok(mut conn) => {
                // Ping Redis to check health
                match deadpool_redis::redis::cmd("PING")
                    .query_async::<String>(&mut conn)
                    .await
                {
                    Ok(response) => response == "PONG",
                    Err(_) => false,
                }
            }
            Err(_) => false,
        }
    }
}
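
// The doc comment above notes that the reliable-queue pattern "allows for
// recovery of in-flight items if a worker crashes"; that recovery step is not
// implemented in this module. As a hedged sketch only (the
// `recover_worker_queue` helper below is hypothetical, not part of QuickDID),
// a recovery pass could drain a dead worker's queue back onto the primary
// queue with RPOPLPUSH:
//
//     async fn recover_worker_queue(
//         pool: &RedisPool,
//         key_prefix: &str,
//         dead_worker_id: &str,
//     ) -> Result<usize> {
//         let mut conn = pool
//             .get()
//             .await
//             .map_err(|e| QueueError::RedisConnectionFailed(e.to_string()))?;
//         let worker_key = format!("{}{}", key_prefix, dead_worker_id);
//         let primary_key = format!("{}primary", key_prefix);
//         let mut moved = 0;
//         // Move items back one at a time until the dead worker's queue is empty.
//         loop {
//             let item: Option<Vec<u8>> = deadpool_redis::redis::cmd("RPOPLPUSH")
//                 .arg(&worker_key)
//                 .arg(&primary_key)
//                 .query_async(&mut conn)
//                 .await
//                 .map_err(|e| QueueError::RedisOperationFailed {
//                     operation: "RPOPLPUSH".to_string(),
//                     details: e.to_string(),
//                 })?;
//             if item.is_none() {
//                 break;
//             }
//             moved += 1;
//         }
//         Ok(moved)
//     }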

#[cfg(test)]
mod tests {
    use super::*;

    #[tokio::test]
    async fn test_redis_queue_push_pull() {
        let pool = match crate::test_helpers::get_test_redis_pool() {
            Some(p) => p,
            None => {
                eprintln!("Skipping Redis test - no Redis connection available");
                return;
            }
        };

        // Create adapter with unique prefix for testing
        let test_prefix = format!(
            "test:queue:{}:",
            std::time::SystemTime::now()
                .duration_since(std::time::UNIX_EPOCH)
                .unwrap()
                .as_nanos()
        );
        let adapter = RedisQueueAdapter::<String>::new(
            pool.clone(),
            "test-worker".to_string(),
            test_prefix.clone(),
            1, // 1 second timeout for tests
        );

        // Test push
        adapter.push("test-item".to_string()).await.unwrap();

        // Test pull
        let pulled = adapter.pull().await;
        assert_eq!(pulled, Some("test-item".to_string()));

        // Test ack
        adapter
            .ack(&"test-item".to_string())
            .await
            .expect("Ack should succeed");
    }

    #[tokio::test]
    async fn test_redis_queue_reliable_delivery() {
        let pool = match crate::test_helpers::get_test_redis_pool() {
            Some(p) => p,
            None => {
                eprintln!("Skipping Redis test - no Redis connection available");
                return;
            }
        };

        let test_prefix = format!(
            "test:queue:{}:",
            std::time::SystemTime::now()
                .duration_since(std::time::UNIX_EPOCH)
                .unwrap()
                .as_nanos()
        );
        let worker_id = "test-worker-reliable";

        // Create adapter
        let adapter1 = RedisQueueAdapter::<String>::new(
            pool.clone(),
            worker_id.to_string(),
            test_prefix.clone(),
            1,
        );

        // Push multiple items
        adapter1.push("item1".to_string()).await.unwrap();
        adapter1.push("item2".to_string()).await.unwrap();
        adapter1.push("item3".to_string()).await.unwrap();

        // Pull but don't ack (simulating worker crash)
        let item1 = adapter1.pull().await;
        assert_eq!(item1, Some("item1".to_string()));

        // Item should be in worker queue
        // In production, a recovery process would handle unacked items
        // For this test, we verify the item is in the worker queue
        let item2 = adapter1.pull().await;
        assert_eq!(item2, Some("item2".to_string()));

        // Ack the second item
        adapter1.ack(&"item2".to_string()).await.unwrap();
    }

    #[tokio::test]
    async fn test_redis_queue_depth() {
        let pool = match crate::test_helpers::get_test_redis_pool() {
            Some(p) => p,
            None => {
                eprintln!("Skipping Redis test - no Redis connection available");
                return;
            }
        };

        let test_prefix = format!(
            "test:queue:{}:",
            std::time::SystemTime::now()
                .duration_since(std::time::UNIX_EPOCH)
                .unwrap()
                .as_nanos()
        );
        let adapter =
            RedisQueueAdapter::<String>::new(pool, "test-worker-depth".to_string(), test_prefix, 1);

        // Initially empty
        assert_eq!(adapter.depth().await, Some(0));

        // Push items and check depth
        adapter.push("item1".to_string()).await.unwrap();
        assert_eq!(adapter.depth().await, Some(1));

        adapter.push("item2".to_string()).await.unwrap();
        assert_eq!(adapter.depth().await, Some(2));

        // Pull and check depth (note: depth checks primary queue)
        let _ = adapter.pull().await;
        assert_eq!(adapter.depth().await, Some(1));
    }

    #[tokio::test]
    async fn test_redis_queue_health() {
        let pool = match crate::test_helpers::get_test_redis_pool() {
            Some(p) => p,
            None => {
                eprintln!("Skipping Redis test - no Redis connection available");
                return;
            }
        };

        let adapter = RedisQueueAdapter::<String>::new(
            pool,
            "test-worker-health".to_string(),
            "test:queue:health:".to_string(),
            1,
        );

        // Should be healthy if Redis is running
        assert!(adapter.is_healthy().await);
    }

    #[tokio::test]
    async fn test_redis_queue_deduplication() {
        use crate::queue::HandleResolutionWork;

        let pool = match crate::test_helpers::get_test_redis_pool() {
            Some(p) => p,
            None => {
                eprintln!("Skipping Redis test - no Redis connection available");
                return;
            }
        };

        let test_prefix = format!(
            "test:queue:dedup:{}:",
            std::time::SystemTime::now()
                .duration_since(std::time::UNIX_EPOCH)
                .unwrap()
                .as_nanos()
        );

        // Create adapter with deduplication enabled
        let adapter = RedisQueueAdapter::<HandleResolutionWork>::with_dedup(
            pool.clone(),
            "test-worker-dedup".to_string(),
            test_prefix.clone(),
            1,
            true, // Enable deduplication
            2,    // 2 second TTL for quick testing
        );

        let work = HandleResolutionWork::new("alice.example.com".to_string());

        // First push should succeed
        adapter
            .push(work.clone())
            .await
            .expect("First push should succeed");

        // Second push of same item should be deduplicated (but still return Ok)
        adapter
            .push(work.clone())
            .await
            .expect("Second push should succeed (deduplicated)");

        // Queue should only have one item
        let depth = adapter.depth().await;
        assert_eq!(
            depth,
            Some(1),
            "Queue should only have one item after deduplication"
        );

        // Pull the item
        let pulled = adapter.pull().await;
        assert_eq!(pulled, Some(work.clone()));

        // Queue should now be empty
        let depth = adapter.depth().await;
        assert_eq!(depth, Some(0), "Queue should be empty after pulling");

        // Wait for dedup TTL to expire
        tokio::time::sleep(tokio::time::Duration::from_secs(3)).await;

        // Should be able to push again after TTL expires
        adapter
            .push(work.clone())
            .await
            .expect("Push after TTL expiry should succeed");

        let depth = adapter.depth().await;
        assert_eq!(
            depth,
            Some(1),
            "Queue should have one item after TTL expiry"
        );
    }

    #[tokio::test]
    async fn test_redis_queue_deduplication_disabled() {
        use crate::queue::HandleResolutionWork;

        let pool = match crate::test_helpers::get_test_redis_pool() {
            Some(p) => p,
            None => {
                eprintln!("Skipping Redis test - no Redis connection available");
                return;
            }
        };

        let test_prefix = format!(
            "test:queue:nodedup:{}:",
            std::time::SystemTime::now()
                .duration_since(std::time::UNIX_EPOCH)
                .unwrap()
                .as_nanos()
        );

        // Create adapter with deduplication disabled
        let adapter = RedisQueueAdapter::<HandleResolutionWork>::with_dedup(
            pool.clone(),
            "test-worker-nodedup".to_string(),
            test_prefix.clone(),
            1,
            false, // Disable deduplication
            60,
        );

        let work = HandleResolutionWork::new("bob.example.com".to_string());

        // Push same item twice
        adapter
            .push(work.clone())
            .await
            .expect("First push should succeed");
        adapter
            .push(work.clone())
            .await
            .expect("Second push should succeed");

        // Queue should have two items (no deduplication)
        let depth = adapter.depth().await;
        assert_eq!(
            depth,
            Some(2),
            "Queue should have two items when deduplication is disabled"
        );

        // Pull both items
        let pulled1 = adapter.pull().await;
        assert_eq!(pulled1, Some(work.clone()));

        let pulled2 = adapter.pull().await;
        assert_eq!(pulled2, Some(work.clone()));

        // Queue should now be empty
        let depth = adapter.depth().await;
        assert_eq!(
            depth,
            Some(0),
            "Queue should be empty after pulling all items"
        );
    }

    #[tokio::test]
    async fn test_redis_queue_serialization() {
        use crate::queue::HandleResolutionWork;

        let pool = match crate::test_helpers::get_test_redis_pool() {
            Some(p) => p,
            None => {
                eprintln!("Skipping Redis test - no Redis connection available");
                return;
            }
        };

        let test_prefix = format!(
            "test:queue:{}:",
            std::time::SystemTime::now()
                .duration_since(std::time::UNIX_EPOCH)
                .unwrap()
                .as_nanos()
        );
        let adapter = RedisQueueAdapter::<HandleResolutionWork>::new(
            pool,
            "test-worker-ser".to_string(),
            test_prefix,
            1,
        );

        let work = HandleResolutionWork::new("alice.example.com".to_string());

        // Push and pull
        adapter.push(work.clone()).await.unwrap();
        let pulled = adapter.pull().await;
        assert_eq!(pulled, Some(work.clone()));

        // Ack
        adapter.ack(&work).await.unwrap();
    }
}