//! MPSC channel-based queue adapter implementation. //! //! This module provides an in-memory queue implementation using Tokio's //! multi-producer, single-consumer (MPSC) channels. It's suitable for //! single-instance deployments with moderate throughput requirements. use async_trait::async_trait; use std::sync::Arc; use tokio::sync::{Mutex, mpsc}; use super::adapter::QueueAdapter; use super::error::{QueueError, Result}; /// MPSC channel-based queue adapter implementation. /// /// This adapter uses tokio's multi-producer, single-consumer channel /// for in-memory queuing of work items. It provides fast, lock-free /// operation for single-instance deployments. /// /// # Features /// /// - In-memory operation (no persistence) /// - Bounded capacity with backpressure /// - Fast push/pull operations /// - No acknowledgment needed (fire-and-forget) /// /// # Limitations /// /// - No persistence across restarts /// - Single consumer only /// - No distributed operation /// /// # Examples /// /// ``` /// use quickdid::queue::MpscQueueAdapter; /// use quickdid::queue::QueueAdapter; /// /// # async fn example() -> anyhow::Result<()> { /// // Create a queue with buffer size of 100 /// let queue = MpscQueueAdapter::::new(100); /// /// // Push items /// queue.push("item1".to_string()).await?; /// queue.push("item2".to_string()).await?; /// /// // Pull items /// while let Some(item) = queue.pull().await { /// println!("Processing: {}", item); /// } /// # Ok(()) /// # } /// ``` pub struct MpscQueueAdapter where T: Send + Sync + 'static, { receiver: Arc>>, sender: mpsc::Sender, } impl MpscQueueAdapter where T: Send + Sync + 'static, { /// Create a new MPSC queue adapter with the specified buffer size. /// /// # Arguments /// /// * `buffer` - The maximum number of items that can be buffered /// /// # Examples /// /// ``` /// use quickdid::queue::MpscQueueAdapter; /// /// let queue = MpscQueueAdapter::::new(100); /// ``` pub fn new(buffer: usize) -> Self { let (sender, receiver) = mpsc::channel(buffer); Self { receiver: Arc::new(Mutex::new(receiver)), sender, } } /// Create an adapter from existing MPSC channels. /// /// This constructor is useful for integrating with existing channel-based /// architectures or when you need custom channel configuration. /// /// # Arguments /// /// * `sender` - The sender half of the channel /// * `receiver` - The receiver half of the channel /// /// # Examples /// /// ``` /// use tokio::sync::mpsc; /// use quickdid::queue::MpscQueueAdapter; /// /// let (sender, receiver) = mpsc::channel::(50); /// let queue = MpscQueueAdapter::from_channel(sender, receiver); /// ``` pub fn from_channel(sender: mpsc::Sender, receiver: mpsc::Receiver) -> Self { Self { receiver: Arc::new(Mutex::new(receiver)), sender, } } } #[async_trait] impl QueueAdapter for MpscQueueAdapter where T: Send + Sync + 'static, { async fn pull(&self) -> Option { let mut receiver = self.receiver.lock().await; receiver.recv().await } async fn push(&self, work: T) -> Result<()> { self.sender .send(work) .await .map_err(|e| QueueError::PushFailed(e.to_string())) } async fn try_push(&self, work: T) -> Result<()> { self.sender.try_send(work).map_err(|e| match e { mpsc::error::TrySendError::Full(_) => QueueError::QueueFull, mpsc::error::TrySendError::Closed(_) => QueueError::QueueClosed, }) } async fn depth(&self) -> Option { // Note: This is an approximation as mpsc doesn't provide exact depth Some(self.sender.max_capacity() - self.sender.capacity()) } async fn is_healthy(&self) -> bool { !self.sender.is_closed() } } #[cfg(test)] mod tests { use super::*; #[tokio::test] async fn test_mpsc_queue_push_pull() { let queue = MpscQueueAdapter::::new(10); // Test push queue.push("test1".to_string()).await.unwrap(); queue.push("test2".to_string()).await.unwrap(); // Test pull in FIFO order let item1 = queue.pull().await; assert_eq!(item1, Some("test1".to_string())); let item2 = queue.pull().await; assert_eq!(item2, Some("test2".to_string())); } #[tokio::test] async fn test_mpsc_queue_try_push() { // Create a small queue to test full condition let queue = MpscQueueAdapter::::new(2); // Fill the queue queue.push(1).await.unwrap(); queue.push(2).await.unwrap(); // Try to push when full should fail let result = queue.try_push(3).await; assert!(matches!(result, Err(QueueError::QueueFull))); // Pull one item to make space let _ = queue.pull().await; // Now try_push should succeed queue.try_push(3).await.unwrap(); } #[tokio::test] async fn test_mpsc_queue_from_channel() { let (sender, receiver) = mpsc::channel(5); let queue = MpscQueueAdapter::from_channel(sender.clone(), receiver); // Send via original sender sender.send("external".to_string()).await.unwrap(); // Send via queue queue.push("internal".to_string()).await.unwrap(); // Pull both items assert_eq!(queue.pull().await, Some("external".to_string())); assert_eq!(queue.pull().await, Some("internal".to_string())); } #[tokio::test] async fn test_mpsc_queue_health() { let queue = MpscQueueAdapter::::new(10); // Queue should be healthy initially assert!(queue.is_healthy().await); // Create a queue and drop the receiver to close it let (sender, receiver) = mpsc::channel::(10); drop(receiver); let closed_queue = MpscQueueAdapter::from_channel(sender, mpsc::channel(1).1); // Push should fail on closed queue let result = closed_queue.push("test".to_string()).await; assert!(result.is_err()); } #[tokio::test] async fn test_mpsc_queue_depth() { let queue = MpscQueueAdapter::::new(10); // Initially empty let depth = queue.depth().await; assert_eq!(depth, Some(0)); // Add items and check depth queue.push(1).await.unwrap(); queue.push(2).await.unwrap(); queue.push(3).await.unwrap(); let depth = queue.depth().await; assert_eq!(depth, Some(3)); // Pull an item and check depth let _ = queue.pull().await; let depth = queue.depth().await; assert_eq!(depth, Some(2)); } #[tokio::test] async fn test_mpsc_queue_concurrent_operations() { use std::sync::Arc; let queue = Arc::new(MpscQueueAdapter::::new(100)); // Spawn multiple producers let mut handles = vec![]; for i in 0..10 { let q = queue.clone(); handles.push(tokio::spawn(async move { for j in 0..10 { q.push(i * 10 + j).await.unwrap(); } })); } // Wait for all producers for handle in handles { handle.await.unwrap(); } // Verify we can pull all 100 items let mut count = 0; while queue.pull().await.is_some() { count += 1; if count >= 100 { break; } } assert_eq!(count, 100); } #[tokio::test] async fn test_mpsc_queue_no_ack_needed() { let queue = MpscQueueAdapter::::new(10); queue.push("test".to_string()).await.unwrap(); let item = queue.pull().await.unwrap(); // Ack should always succeed (no-op) queue.ack(&item).await.unwrap(); } }