//! Redis-backed queue adapter implementation. //! //! This module provides a distributed queue implementation using Redis lists //! with a reliable queue pattern for at-least-once delivery semantics. use async_trait::async_trait; use deadpool_redis::{Pool as RedisPool, redis::AsyncCommands}; use serde::{Deserialize, Serialize}; use tracing::{debug, error, warn}; use super::adapter::QueueAdapter; use super::error::{QueueError, Result}; use super::work::DedupKey; /// Redis-backed queue adapter implementation. /// /// This adapter uses Redis lists with a reliable queue pattern: /// - LPUSH to push items to the primary queue /// - BRPOPLPUSH to atomically move items from primary to worker queue /// - LREM to acknowledge processed items from worker queue /// /// This ensures at-least-once delivery semantics and allows for recovery /// of in-flight items if a worker crashes. /// /// # Features /// /// - Distributed operation across multiple instances /// - Persistent storage with Redis /// - At-least-once delivery guarantees /// - Automatic recovery of failed items /// - Configurable timeouts /// /// # Architecture /// /// ```text /// Producer -> [Primary Queue] -> BRPOPLPUSH -> [Worker Queue] -> Consumer /// | /// LREM (on ack) /// ``` /// /// # Examples /// /// ```no_run /// use quickdid::queue::{RedisQueueAdapter, QueueAdapter, HandleResolutionWork}; /// use deadpool_redis::Config; /// /// # async fn example() -> anyhow::Result<()> { /// // Create Redis pool /// let cfg = Config::from_url("redis://localhost:6379"); /// let pool = cfg.create_pool(Some(deadpool_redis::Runtime::Tokio1))?; /// /// // Create queue adapter /// let queue = RedisQueueAdapter::::new( /// pool, /// "worker-1".to_string(), /// "queue:myapp:".to_string(), /// 5, // 5 second timeout /// ); /// /// // Use the queue /// let work = HandleResolutionWork::new("alice.bsky.social".to_string()); /// queue.push(work.clone()).await?; /// if let Some(item) = queue.pull().await { /// // Process item /// queue.ack(&item).await?; /// } /// # Ok(()) /// # } /// ``` pub struct RedisQueueAdapter where T: Send + Sync + Serialize + for<'de> Deserialize<'de> + 'static, { /// Redis connection pool pool: RedisPool, /// Unique worker ID for this adapter instance worker_id: String, /// Key prefix for all queues (default: "queue:handleresolver:") key_prefix: String, /// Timeout for blocking RPOPLPUSH operations (in seconds) timeout_seconds: u64, /// Enable deduplication to prevent duplicate items in queue dedup_enabled: bool, /// TTL for deduplication keys in seconds dedup_ttl: u64, /// Type marker for generic parameter _phantom: std::marker::PhantomData, } impl RedisQueueAdapter where T: Send + Sync + Serialize + for<'de> Deserialize<'de> + 'static, { /// Create a new Redis queue adapter. /// /// # Arguments /// /// * `pool` - Redis connection pool /// * `worker_id` - Unique identifier for this worker instance /// * `key_prefix` - Redis key prefix for queue operations /// * `timeout_seconds` - Timeout for blocking pull operations /// /// # Examples /// /// ```no_run /// use quickdid::queue::RedisQueueAdapter; /// use deadpool_redis::Config; /// /// # async fn example() -> anyhow::Result<()> { /// let cfg = Config::from_url("redis://localhost:6379"); /// let pool = cfg.create_pool(Some(deadpool_redis::Runtime::Tokio1))?; /// /// let queue = RedisQueueAdapter::::new( /// pool, /// "worker-1".to_string(), /// "queue:myapp:".to_string(), /// 5, /// ); /// # Ok(()) /// # } /// ``` pub fn new( pool: RedisPool, worker_id: String, key_prefix: String, timeout_seconds: u64, ) -> Self { Self::with_dedup( pool, worker_id, key_prefix, timeout_seconds, false, 60, // Default TTL of 60 seconds ) } /// Create a new Redis queue adapter with deduplication settings. /// /// # Arguments /// /// * `pool` - Redis connection pool /// * `worker_id` - Unique identifier for this worker instance /// * `key_prefix` - Redis key prefix for queue operations /// * `timeout_seconds` - Timeout for blocking pull operations /// * `dedup_enabled` - Whether to enable deduplication /// * `dedup_ttl` - TTL for deduplication keys in seconds pub fn with_dedup( pool: RedisPool, worker_id: String, key_prefix: String, timeout_seconds: u64, dedup_enabled: bool, dedup_ttl: u64, ) -> Self { Self { pool, worker_id, key_prefix, timeout_seconds, dedup_enabled, dedup_ttl, _phantom: std::marker::PhantomData, } } /// Get the primary queue key. fn primary_queue_key(&self) -> String { format!("{}primary", self.key_prefix) } /// Get the worker-specific temporary queue key. fn worker_queue_key(&self) -> String { format!("{}{}", self.key_prefix, self.worker_id) } /// Get the deduplication key for an item. /// This key is used to track if an item is already queued. fn dedup_key(&self, item_id: &str) -> String { format!("{}dedup:{}", self.key_prefix, item_id) } /// Check and mark an item for deduplication. /// Returns true if the item was successfully marked (not duplicate), /// false if it was already in the deduplication set (duplicate). async fn check_and_mark_dedup( &self, conn: &mut deadpool_redis::Connection, item_id: &str, ) -> Result { if !self.dedup_enabled { return Ok(true); // Always allow if dedup is disabled } let dedup_key = self.dedup_key(item_id); // Use SET NX EX to atomically set if not exists with expiry // Returns OK if the key was set, Nil if it already existed let result: Option = deadpool_redis::redis::cmd("SET") .arg(&dedup_key) .arg("1") .arg("NX") // Only set if not exists .arg("EX") // Set expiry .arg(self.dedup_ttl) .query_async(conn) .await .map_err(|e| QueueError::RedisOperationFailed { operation: "SET NX EX".to_string(), details: e.to_string(), })?; // If result is Some("OK"), the key was set (not duplicate) // If result is None, the key already existed (duplicate) Ok(result.is_some()) } } #[async_trait] impl QueueAdapter for RedisQueueAdapter where T: Send + Sync + Serialize + for<'de> Deserialize<'de> + DedupKey + 'static, { async fn pull(&self) -> Option { match self.pool.get().await { Ok(mut conn) => { let primary_key = self.primary_queue_key(); let worker_key = self.worker_queue_key(); // Use blocking RPOPLPUSH to atomically move item from primary to worker queue let data: Option> = match conn .brpoplpush(&primary_key, &worker_key, self.timeout_seconds as f64) .await { Ok(data) => data, Err(e) => { error!("Failed to pull from queue: {}", e); return None; } }; if let Some(data) = data { // Deserialize the item match serde_json::from_slice(&data) { Ok(item) => { debug!( worker_id = %self.worker_id, "Pulled item from queue" ); Some(item) } Err(e) => { error!("Failed to deserialize item: {}", e); // Remove the corrupted item from worker queue let _: std::result::Result<(), _> = conn.lrem(&worker_key, 1, &data).await; None } } } else { None } } Err(e) => { error!("Failed to get Redis connection: {}", e); None } } } async fn push(&self, work: T) -> Result<()> { let mut conn = self .pool .get() .await .map_err(|e| QueueError::RedisConnectionFailed(e.to_string()))?; // Check for deduplication if enabled if self.dedup_enabled { let dedup_id = work.dedup_key(); let is_new = self.check_and_mark_dedup(&mut conn, &dedup_id).await?; if !is_new { debug!( dedup_key = %dedup_id, "Item already queued, skipping duplicate" ); return Ok(()); // Successfully deduplicated } } let data = serde_json::to_vec(&work) .map_err(|e| QueueError::SerializationFailed(e.to_string()))?; let primary_key = self.primary_queue_key(); conn.lpush::<_, _, ()>(&primary_key, data) .await .map_err(|e| QueueError::RedisOperationFailed { operation: "LPUSH".to_string(), details: e.to_string(), })?; debug!("Pushed item to queue"); Ok(()) } async fn ack(&self, item: &T) -> Result<()> { let mut conn = self .pool .get() .await .map_err(|e| QueueError::RedisConnectionFailed(e.to_string()))?; let data = serde_json::to_vec(item).map_err(|e| QueueError::SerializationFailed(e.to_string()))?; let worker_key = self.worker_queue_key(); // Remove exactly one occurrence of this item from the worker queue let removed: i32 = conn.lrem(&worker_key, 1, &data).await.map_err(|e| { QueueError::RedisOperationFailed { operation: "LREM".to_string(), details: e.to_string(), } })?; if removed == 0 { warn!( worker_id = %self.worker_id, "Item not found in worker queue during acknowledgment" ); } else { debug!( worker_id = %self.worker_id, "Acknowledged item" ); } Ok(()) } async fn depth(&self) -> Option { match self.pool.get().await { Ok(mut conn) => { let primary_key = self.primary_queue_key(); match conn.llen::<_, usize>(&primary_key).await { Ok(len) => Some(len), Err(e) => { error!("Failed to get queue depth: {}", e); None } } } Err(e) => { error!("Failed to get Redis connection: {}", e); None } } } async fn is_healthy(&self) -> bool { match self.pool.get().await { Ok(mut conn) => { // Ping Redis to check health match deadpool_redis::redis::cmd("PING") .query_async::(&mut conn) .await { Ok(response) => response == "PONG", Err(_) => false, } } Err(_) => false, } } } #[cfg(test)] mod tests { use super::*; #[tokio::test] async fn test_redis_queue_push_pull() { let pool = match crate::test_helpers::get_test_redis_pool() { Some(p) => p, None => { eprintln!("Skipping Redis test - no Redis connection available"); return; } }; // Create adapter with unique prefix for testing let test_prefix = format!( "test:queue:{}:", std::time::SystemTime::now() .duration_since(std::time::UNIX_EPOCH) .unwrap() .as_nanos() ); let adapter = RedisQueueAdapter::::new( pool.clone(), "test-worker".to_string(), test_prefix.clone(), 1, // 1 second timeout for tests ); // Test push adapter.push("test-item".to_string()).await.unwrap(); // Test pull let pulled = adapter.pull().await; assert_eq!(pulled, Some("test-item".to_string())); // Test ack adapter .ack(&"test-item".to_string()) .await .expect("Ack should succeed"); } #[tokio::test] async fn test_redis_queue_reliable_delivery() { let pool = match crate::test_helpers::get_test_redis_pool() { Some(p) => p, None => { eprintln!("Skipping Redis test - no Redis connection available"); return; } }; let test_prefix = format!( "test:queue:{}:", std::time::SystemTime::now() .duration_since(std::time::UNIX_EPOCH) .unwrap() .as_nanos() ); let worker_id = "test-worker-reliable"; // Create adapter let adapter1 = RedisQueueAdapter::::new( pool.clone(), worker_id.to_string(), test_prefix.clone(), 1, ); // Push multiple items adapter1.push("item1".to_string()).await.unwrap(); adapter1.push("item2".to_string()).await.unwrap(); adapter1.push("item3".to_string()).await.unwrap(); // Pull but don't ack (simulating worker crash) let item1 = adapter1.pull().await; assert_eq!(item1, Some("item1".to_string())); // Item should be in worker queue // In production, a recovery process would handle unacked items // For this test, we verify the item is in the worker queue let item2 = adapter1.pull().await; assert_eq!(item2, Some("item2".to_string())); // Ack the second item adapter1.ack(&"item2".to_string()).await.unwrap(); } #[tokio::test] async fn test_redis_queue_depth() { let pool = match crate::test_helpers::get_test_redis_pool() { Some(p) => p, None => { eprintln!("Skipping Redis test - no Redis connection available"); return; } }; let test_prefix = format!( "test:queue:{}:", std::time::SystemTime::now() .duration_since(std::time::UNIX_EPOCH) .unwrap() .as_nanos() ); let adapter = RedisQueueAdapter::::new(pool, "test-worker-depth".to_string(), test_prefix, 1); // Initially empty assert_eq!(adapter.depth().await, Some(0)); // Push items and check depth adapter.push("item1".to_string()).await.unwrap(); assert_eq!(adapter.depth().await, Some(1)); adapter.push("item2".to_string()).await.unwrap(); assert_eq!(adapter.depth().await, Some(2)); // Pull and check depth (note: depth checks primary queue) let _ = adapter.pull().await; assert_eq!(adapter.depth().await, Some(1)); } #[tokio::test] async fn test_redis_queue_health() { let pool = match crate::test_helpers::get_test_redis_pool() { Some(p) => p, None => { eprintln!("Skipping Redis test - no Redis connection available"); return; } }; let adapter = RedisQueueAdapter::::new( pool, "test-worker-health".to_string(), "test:queue:health:".to_string(), 1, ); // Should be healthy if Redis is running assert!(adapter.is_healthy().await); } #[tokio::test] async fn test_redis_queue_deduplication() { use crate::queue::HandleResolutionWork; let pool = match crate::test_helpers::get_test_redis_pool() { Some(p) => p, None => { eprintln!("Skipping Redis test - no Redis connection available"); return; } }; let test_prefix = format!( "test:queue:dedup:{}:", std::time::SystemTime::now() .duration_since(std::time::UNIX_EPOCH) .unwrap() .as_nanos() ); // Create adapter with deduplication enabled let adapter = RedisQueueAdapter::::with_dedup( pool.clone(), "test-worker-dedup".to_string(), test_prefix.clone(), 1, true, // Enable deduplication 2, // 2 second TTL for quick testing ); let work = HandleResolutionWork::new("alice.example.com".to_string()); // First push should succeed adapter .push(work.clone()) .await .expect("First push should succeed"); // Second push of same item should be deduplicated (but still return Ok) adapter .push(work.clone()) .await .expect("Second push should succeed (deduplicated)"); // Queue should only have one item let depth = adapter.depth().await; assert_eq!( depth, Some(1), "Queue should only have one item after deduplication" ); // Pull the item let pulled = adapter.pull().await; assert_eq!(pulled, Some(work.clone())); // Queue should now be empty let depth = adapter.depth().await; assert_eq!(depth, Some(0), "Queue should be empty after pulling"); // Wait for dedup TTL to expire tokio::time::sleep(tokio::time::Duration::from_secs(3)).await; // Should be able to push again after TTL expires adapter .push(work.clone()) .await .expect("Push after TTL expiry should succeed"); let depth = adapter.depth().await; assert_eq!( depth, Some(1), "Queue should have one item after TTL expiry" ); } #[tokio::test] async fn test_redis_queue_deduplication_disabled() { use crate::queue::HandleResolutionWork; let pool = match crate::test_helpers::get_test_redis_pool() { Some(p) => p, None => { eprintln!("Skipping Redis test - no Redis connection available"); return; } }; let test_prefix = format!( "test:queue:nodedup:{}:", std::time::SystemTime::now() .duration_since(std::time::UNIX_EPOCH) .unwrap() .as_nanos() ); // Create adapter with deduplication disabled let adapter = RedisQueueAdapter::::with_dedup( pool.clone(), "test-worker-nodedup".to_string(), test_prefix.clone(), 1, false, // Disable deduplication 60, ); let work = HandleResolutionWork::new("bob.example.com".to_string()); // Push same item twice adapter .push(work.clone()) .await .expect("First push should succeed"); adapter .push(work.clone()) .await .expect("Second push should succeed"); // Queue should have two items (no deduplication) let depth = adapter.depth().await; assert_eq!( depth, Some(2), "Queue should have two items when deduplication is disabled" ); // Pull both items let pulled1 = adapter.pull().await; assert_eq!(pulled1, Some(work.clone())); let pulled2 = adapter.pull().await; assert_eq!(pulled2, Some(work.clone())); // Queue should now be empty let depth = adapter.depth().await; assert_eq!( depth, Some(0), "Queue should be empty after pulling all items" ); } #[tokio::test] async fn test_redis_queue_serialization() { use crate::queue::HandleResolutionWork; let pool = match crate::test_helpers::get_test_redis_pool() { Some(p) => p, None => { eprintln!("Skipping Redis test - no Redis connection available"); return; } }; let test_prefix = format!( "test:queue:{}:", std::time::SystemTime::now() .duration_since(std::time::UNIX_EPOCH) .unwrap() .as_nanos() ); let adapter = RedisQueueAdapter::::new( pool, "test-worker-ser".to_string(), test_prefix, 1, ); let work = HandleResolutionWork::new("alice.example.com".to_string()); // Push and pull adapter.push(work.clone()).await.unwrap(); let pulled = adapter.pull().await; assert_eq!(pulled, Some(work.clone())); // Ack adapter.ack(&work).await.unwrap(); } }