//! Factory functions for creating queue adapters.
//!
//! This module provides convenient factory functions for creating different
//! types of queue adapters with appropriate configurations.

use deadpool_redis::Pool as RedisPool;
use serde::{Deserialize, Serialize};
use std::sync::Arc;
use tokio::sync::mpsc;

use super::{
    adapter::QueueAdapter, mpsc::MpscQueueAdapter, noop::NoopQueueAdapter,
    redis::RedisQueueAdapter, sqlite::SqliteQueueAdapter, work::DedupKey,
};

// ========= MPSC Queue Factories =========

/// Create a new MPSC queue adapter with the specified buffer size.
///
/// This creates an in-memory queue suitable for single-instance deployments.
///
/// # Arguments
///
/// * `buffer` - The buffer size for the channel
///
/// # Examples
///
/// ```
/// use quickdid::queue::create_mpsc_queue;
///
/// let queue = create_mpsc_queue::<String>(100);
/// ```
pub fn create_mpsc_queue<T>(buffer: usize) -> Arc<dyn QueueAdapter<T>>
where
    T: Send + Sync + 'static,
{
    Arc::new(MpscQueueAdapter::new(buffer))
}

/// Create an MPSC queue adapter from existing channels.
///
/// This allows integration with existing channel-based architectures.
///
/// # Arguments
///
/// * `sender` - The sender half of the channel
/// * `receiver` - The receiver half of the channel
///
/// # Examples
///
/// ```
/// use tokio::sync::mpsc;
/// use quickdid::queue::create_mpsc_queue_from_channel;
///
/// let (sender, receiver) = mpsc::channel::<String>(50);
/// let queue = create_mpsc_queue_from_channel(sender, receiver);
/// ```
pub fn create_mpsc_queue_from_channel<T>(
    sender: mpsc::Sender<T>,
    receiver: mpsc::Receiver<T>,
) -> Arc<dyn QueueAdapter<T>>
where
    T: Send + Sync + 'static,
{
    Arc::new(MpscQueueAdapter::from_channel(sender, receiver))
}
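// A minimal consumer sketch (not part of the public API): every factory in
// this module returns the shared `Arc<dyn QueueAdapter<T>>` surface, so a
// worker can be written against `push`/`pull` alone, the same methods
// exercised by the tests at the bottom of this module. The loop shape is an
// illustrative assumption: it ends once `pull` yields `None`.
#[allow(dead_code)]
async fn example_consumer_loop(queue: Arc<dyn QueueAdapter<String>>) {
    while let Some(item) = queue.pull().await {
        // A real worker would dispatch `item` to a handler here.
        let _ = item;
    }
}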
// ========= Redis Queue Factories =========

/// Create a new Redis-backed queue adapter.
///
/// This creates a distributed queue suitable for multi-instance deployments.
///
/// # Arguments
///
/// * `pool` - Redis connection pool
/// * `worker_id` - Worker identifier for this queue instance
/// * `key_prefix` - Redis key prefix for queue operations
/// * `timeout_seconds` - Timeout for blocking operations
///
/// # Examples
///
/// ```no_run
/// use quickdid::queue::{create_redis_queue, HandleResolutionWork};
/// use deadpool_redis::Config;
///
/// # async fn example() -> anyhow::Result<()> {
/// let cfg = Config::from_url("redis://localhost:6379");
/// let pool = cfg.create_pool(Some(deadpool_redis::Runtime::Tokio1))?;
///
/// let queue = create_redis_queue::<HandleResolutionWork>(
///     pool,
///     "worker-1".to_string(),
///     "queue:myapp:".to_string(),
///     5,
/// );
/// # Ok(())
/// # }
/// ```
pub fn create_redis_queue<T>(
    pool: RedisPool,
    worker_id: String,
    key_prefix: String,
    timeout_seconds: u64,
) -> Arc<dyn QueueAdapter<T>>
where
    T: Send + Sync + Serialize + for<'de> Deserialize<'de> + DedupKey + 'static,
{
    Arc::new(RedisQueueAdapter::new(
        pool,
        worker_id,
        key_prefix,
        timeout_seconds,
    ))
}

/// Create a new Redis-backed queue adapter with deduplication.
///
/// This creates a distributed queue with deduplication to prevent duplicate
/// items from being queued within the specified TTL window.
///
/// # Arguments
///
/// * `pool` - Redis connection pool
/// * `worker_id` - Worker identifier for this queue instance
/// * `key_prefix` - Redis key prefix for queue operations
/// * `timeout_seconds` - Timeout for blocking operations
/// * `dedup_enabled` - Whether to enable deduplication
/// * `dedup_ttl` - TTL for deduplication keys in seconds
///
/// # Examples
///
/// ```no_run
/// use quickdid::queue::{create_redis_queue_with_dedup, HandleResolutionWork};
/// use deadpool_redis::Config;
///
/// # async fn example() -> anyhow::Result<()> {
/// let cfg = Config::from_url("redis://localhost:6379");
/// let pool = cfg.create_pool(Some(deadpool_redis::Runtime::Tokio1))?;
///
/// let queue = create_redis_queue_with_dedup::<HandleResolutionWork>(
///     pool,
///     "worker-1".to_string(),
///     "queue:myapp:".to_string(),
///     5,
///     true, // Enable deduplication
///     60,   // 60 second dedup window
/// );
/// # Ok(())
/// # }
/// ```
pub fn create_redis_queue_with_dedup<T>(
    pool: RedisPool,
    worker_id: String,
    key_prefix: String,
    timeout_seconds: u64,
    dedup_enabled: bool,
    dedup_ttl: u64,
) -> Arc<dyn QueueAdapter<T>>
where
    T: Send + Sync + Serialize + for<'de> Deserialize<'de> + DedupKey + 'static,
{
    Arc::new(RedisQueueAdapter::with_dedup(
        pool,
        worker_id,
        key_prefix,
        timeout_seconds,
        dedup_enabled,
        dedup_ttl,
    ))
}
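// A hedged sketch of the deduplication flow described above. It assumes a
// suppressed duplicate still returns `Ok` from `push` and that both pushes
// derive the same `DedupKey`; the prefix, timeout, and TTL values are
// illustrative, mirroring the doc example.
#[allow(dead_code)]
async fn example_dedup_flow(pool: RedisPool) {
    use crate::queue::HandleResolutionWork;

    let queue = create_redis_queue_with_dedup::<HandleResolutionWork>(
        pool,
        "worker-1".to_string(),
        "queue:myapp:".to_string(),
        5,    // blocking timeout in seconds
        true, // enable deduplication
        60,   // 60 second dedup window
    );

    let work = HandleResolutionWork::new("alice.example.com".to_string());
    queue.push(work.clone()).await.expect("first push");
    // Within the 60 second window, this second push should be suppressed by
    // dedup, leaving a single queued item.
    queue.push(work).await.expect("deduplicated push");
}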
// ========= SQLite Queue Factories =========

/// Create a new SQLite queue adapter with unlimited queue size.
///
/// This creates a persistent queue backed by a SQLite database, suitable
/// for single-instance deployments that need persistence across restarts.
/// The queue has no size limit and may grow unbounded.
///
/// # Arguments
///
/// * `pool` - SQLite connection pool
///
/// # Examples
///
/// ```no_run
/// use quickdid::queue::{create_sqlite_queue, HandleResolutionWork};
/// use quickdid::sqlite_schema::create_sqlite_pool;
///
/// # async fn example() -> anyhow::Result<()> {
/// let pool = create_sqlite_pool("sqlite:./quickdid.db").await?;
/// let queue = create_sqlite_queue::<HandleResolutionWork>(pool);
/// # Ok(())
/// # }
/// ```
pub fn create_sqlite_queue<T>(pool: sqlx::SqlitePool) -> Arc<dyn QueueAdapter<T>>
where
    T: Send + Sync + Serialize + for<'de> Deserialize<'de> + 'static,
{
    Arc::new(SqliteQueueAdapter::new(pool))
}

/// Create a new SQLite queue adapter with work shedding.
///
/// This creates a persistent queue with a configurable maximum size.
/// When the queue exceeds `max_size`, the oldest entries are automatically
/// deleted to maintain the limit, preserving the most recent work items.
///
/// # Arguments
///
/// * `pool` - SQLite connection pool
/// * `max_size` - Maximum number of entries (0 = unlimited)
///
/// # Work Shedding Behavior
///
/// - New work items are always accepted
/// - When queue size exceeds `max_size`, oldest entries are deleted
/// - Deletion happens atomically with insertion in a single transaction
/// - Essential for long-running deployments to prevent disk space issues
///
/// # Examples
///
/// ```no_run
/// use quickdid::queue::{create_sqlite_queue_with_max_size, HandleResolutionWork};
/// use quickdid::sqlite_schema::create_sqlite_pool;
///
/// # async fn example() -> anyhow::Result<()> {
/// let pool = create_sqlite_pool("sqlite:./quickdid.db").await?;
/// // Limit queue to 10,000 entries with automatic work shedding
/// let queue = create_sqlite_queue_with_max_size::<HandleResolutionWork>(pool, 10000);
/// # Ok(())
/// # }
/// ```
pub fn create_sqlite_queue_with_max_size<T>(
    pool: sqlx::SqlitePool,
    max_size: u64,
) -> Arc<dyn QueueAdapter<T>>
where
    T: Send + Sync + Serialize + for<'de> Deserialize<'de> + 'static,
{
    Arc::new(SqliteQueueAdapter::with_max_size(pool, max_size))
}
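// A hedged sketch of wiring the SQLite-backed queues up by hand. The doc
// examples above use `create_sqlite_pool`, which is assumed to prepare the
// database; when a pool is built directly (as in the tests below), the queue
// schema must be created first via `crate::sqlite_schema::create_schema`.
#[allow(dead_code)]
async fn example_sqlite_setup() {
    use crate::queue::HandleResolutionWork;

    let pool = sqlx::SqlitePool::connect("sqlite::memory:")
        .await
        .expect("connect to in-memory SQLite");
    crate::sqlite_schema::create_schema(&pool)
        .await
        .expect("create queue schema");

    // Cap the queue at 10,000 entries; older work is shed automatically.
    let queue = create_sqlite_queue_with_max_size::<HandleResolutionWork>(pool, 10_000);
    let work = HandleResolutionWork::new("alice.example.com".to_string());
    queue.push(work).await.expect("push work item");
}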
// ========= No-op Queue Factory =========

/// Create a no-operation queue adapter.
///
/// This creates a queue that discards all work items, useful for testing
/// or when queue processing is disabled.
///
/// # Examples
///
/// ```
/// use quickdid::queue::create_noop_queue;
///
/// let queue = create_noop_queue::<String>();
/// ```
pub fn create_noop_queue<T>() -> Arc<dyn QueueAdapter<T>>
where
    T: Send + Sync + 'static,
{
    Arc::new(NoopQueueAdapter::new())
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::queue::HandleResolutionWork;

    #[tokio::test]
    async fn test_create_mpsc_queue() {
        let queue = create_mpsc_queue::<String>(10);
        queue.push("test".to_string()).await.unwrap();
        let item = queue.pull().await;
        assert_eq!(item, Some("test".to_string()));
    }

    #[tokio::test]
    async fn test_create_mpsc_queue_from_channel() {
        let (sender, receiver) = mpsc::channel(5);
        let queue = create_mpsc_queue_from_channel(sender.clone(), receiver);

        // Send via original sender
        sender.send("external".to_string()).await.unwrap();

        // Receive via queue
        let item = queue.pull().await;
        assert_eq!(item, Some("external".to_string()));
    }

    #[tokio::test]
    async fn test_create_noop_queue() {
        let queue = create_noop_queue::<String>();

        // Should accept pushes
        queue.push("ignored".to_string()).await.unwrap();

        // Should report as healthy
        assert!(queue.is_healthy().await);

        // Should report depth as 0
        assert_eq!(queue.depth().await, Some(0));
    }

    #[tokio::test]
    async fn test_create_sqlite_queue() {
        // Create in-memory SQLite database for testing
        let pool = sqlx::SqlitePool::connect("sqlite::memory:")
            .await
            .expect("Failed to connect to in-memory SQLite");

        // Create the queue schema
        crate::sqlite_schema::create_schema(&pool)
            .await
            .expect("Failed to create schema");

        let queue = create_sqlite_queue::<HandleResolutionWork>(pool);
        let work = HandleResolutionWork::new("test.example.com".to_string());
        queue.push(work.clone()).await.unwrap();

        let pulled = queue.pull().await;
        assert_eq!(pulled, Some(work));
    }

    #[tokio::test]
    async fn test_create_sqlite_queue_with_max_size() {
        // Create in-memory SQLite database for testing
        let pool = sqlx::SqlitePool::connect("sqlite::memory:")
            .await
            .expect("Failed to connect to in-memory SQLite");

        // Create the queue schema
        crate::sqlite_schema::create_schema(&pool)
            .await
            .expect("Failed to create schema");

        // Create queue with small max size
        let queue = create_sqlite_queue_with_max_size::<HandleResolutionWork>(pool, 5);

        // Push items
        for i in 0..10 {
            let work = HandleResolutionWork::new(format!("test-{}.example.com", i));
            queue.push(work).await.unwrap();
        }

        // Should have limited items due to work shedding
        let depth = queue.depth().await.unwrap();
        assert!(
            depth <= 5,
            "Queue should have at most 5 items after work shedding"
        );
    }

    #[tokio::test]
    async fn test_create_redis_queue() {
        let pool = match crate::test_helpers::get_test_redis_pool() {
            Some(p) => p,
            None => {
                eprintln!("Skipping Redis test - no Redis connection available");
                return;
            }
        };

        let test_prefix = format!(
            "test:factory:{}:",
            std::time::SystemTime::now()
                .duration_since(std::time::UNIX_EPOCH)
                .unwrap()
                .as_nanos()
        );

        let queue =
            create_redis_queue::<String>(pool, "test-worker".to_string(), test_prefix, 1);
        queue.push("test-item".to_string()).await.unwrap();

        let pulled = queue.pull().await;
        assert_eq!(pulled, Some("test-item".to_string()));
    }
}
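// A hypothetical illustration of the design payoff: because every factory
// returns the same `Arc<dyn QueueAdapter<T>>` surface, the backend can be
// selected at startup from configuration. `ExampleBackend` is not part of
// this crate; it exists only to show the shape of such a selection.
#[allow(dead_code)]
enum ExampleBackend {
    Mpsc { buffer: usize },
    Noop,
}

#[allow(dead_code)]
fn example_select_backend(backend: ExampleBackend) -> Arc<dyn QueueAdapter<String>> {
    match backend {
        ExampleBackend::Mpsc { buffer } => create_mpsc_queue::<String>(buffer),
        ExampleBackend::Noop => create_noop_queue::<String>(),
    }
}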