//! Queue adapter trait definition. //! //! This module defines the core `QueueAdapter` trait that provides a common //! interface for different queue implementations (MPSC, Redis, SQLite, etc.). use super::error::Result; use async_trait::async_trait; /// Generic trait for queue adapters that can work with any work type. /// /// This trait provides a common interface for different queue implementations /// (MPSC, Redis, PostgreSQL, SQLite, etc.) allowing them to be used interchangeably. /// /// # Type Parameters /// /// * `T` - The type of work items that this queue processes. Must be `Send + Sync + 'static`. /// /// # Implementation Notes /// /// Implementors should ensure that: /// - `pull()` blocks until an item is available or the queue is closed /// - `push()` may block if the queue has a bounded capacity /// - `ack()` is used for reliable delivery semantics (can be no-op for simple queues) /// - `try_push()` never blocks and returns an error if the queue is full /// /// # Examples /// /// ```no_run /// use quickdid::queue::{QueueAdapter, MpscQueueAdapter}; /// use std::sync::Arc; /// /// # async fn example() -> anyhow::Result<()> { /// // Create a queue adapter for String work items /// let queue: Arc> = Arc::new(MpscQueueAdapter::new(100)); /// /// // Push work to the queue /// queue.push("process-this".to_string()).await?; /// /// // Pull work from the queue /// if let Some(work) = queue.pull().await { /// println!("Processing: {}", work); /// // Acknowledge completion /// queue.ack(&work).await?; /// } /// # Ok(()) /// # } /// ``` #[async_trait] pub trait QueueAdapter: Send + Sync where T: Send + Sync + 'static, { /// Pull the next work item from the queue. /// /// This method blocks until an item is available or the queue is closed. /// Returns `None` if the queue is closed or empty (depending on implementation). /// /// # Returns /// /// * `Some(T)` - The next work item from the queue /// * `None` - The queue is closed or empty async fn pull(&self) -> Option; /// Push a work item to the queue. /// /// This method may block if the queue has bounded capacity and is full. /// /// # Arguments /// /// * `work` - The work item to add to the queue /// /// # Errors /// /// Returns an error if: /// - The queue is full (for bounded queues) /// - The queue is closed /// - Serialization fails (for persistent queues) /// - Backend connection fails (for Redis/SQLite) async fn push(&self, work: T) -> Result<()>; /// Acknowledge that a work item has been successfully processed. /// /// This is used by reliable queue implementations to remove the item /// from a temporary processing queue. Implementations that don't require /// acknowledgment (like MPSC) can use the default no-op implementation. /// /// # Arguments /// /// * `item` - The work item to acknowledge /// /// # Errors /// /// Returns an error if acknowledgment fails (backend-specific). async fn ack(&self, _item: &T) -> Result<()> { // Default no-op implementation for queues that don't need acknowledgment Ok(()) } /// Try to push a work item without blocking. /// /// This method returns immediately with an error if the queue is full. /// /// # Arguments /// /// * `work` - The work item to add to the queue /// /// # Errors /// /// Returns an error if: /// - The queue is full /// - The queue is closed /// - Other backend-specific errors occur async fn try_push(&self, work: T) -> Result<()> { // Default implementation uses regular push self.push(work).await } /// Get the current queue depth if available. /// /// # Returns /// /// * `Some(usize)` - The number of items currently in the queue /// * `None` - Queue depth is not available or cannot be determined async fn depth(&self) -> Option { None } /// Check if the queue is healthy. /// /// Used for health checks and monitoring. Implementations should verify /// backend connectivity and basic functionality. /// /// # Returns /// /// * `true` - The queue is operational /// * `false` - The queue has issues or is disconnected async fn is_healthy(&self) -> bool { true } } #[cfg(test)] mod tests { use super::*; // Mock implementation for testing the trait struct MockQueue { _phantom: std::marker::PhantomData, } impl MockQueue { fn new() -> Self { Self { _phantom: std::marker::PhantomData, } } } #[async_trait] impl QueueAdapter for MockQueue where T: Send + Sync + 'static, { async fn pull(&self) -> Option { None } async fn push(&self, _work: T) -> Result<()> { Ok(()) } } #[tokio::test] async fn test_default_trait_methods() { let queue = MockQueue::::new(); // Test default ack implementation assert!(queue.ack(&"test".to_string()).await.is_ok()); // Test default try_push implementation assert!(queue.try_push("test".to_string()).await.is_ok()); // Test default depth implementation assert_eq!(queue.depth().await, None); // Test default is_healthy implementation assert!(queue.is_healthy().await); } }