QuickDID is a high-performance AT Protocol identity resolution service written in Rust. It provides handle-to-DID resolution with Redis-backed caching and queue processing.
at main 5.6 kB view raw
1//! Queue adapter trait definition. 2//! 3//! This module defines the core `QueueAdapter` trait that provides a common 4//! interface for different queue implementations (MPSC, Redis, SQLite, etc.). 5 6use super::error::Result; 7use async_trait::async_trait; 8 9/// Generic trait for queue adapters that can work with any work type. 10/// 11/// This trait provides a common interface for different queue implementations 12/// (MPSC, Redis, PostgreSQL, SQLite, etc.) allowing them to be used interchangeably. 13/// 14/// # Type Parameters 15/// 16/// * `T` - The type of work items that this queue processes. Must be `Send + Sync + 'static`. 17/// 18/// # Implementation Notes 19/// 20/// Implementors should ensure that: 21/// - `pull()` blocks until an item is available or the queue is closed 22/// - `push()` may block if the queue has a bounded capacity 23/// - `ack()` is used for reliable delivery semantics (can be no-op for simple queues) 24/// - `try_push()` never blocks and returns an error if the queue is full 25/// 26/// # Examples 27/// 28/// ```no_run 29/// use quickdid::queue::{QueueAdapter, MpscQueueAdapter}; 30/// use std::sync::Arc; 31/// 32/// # async fn example() -> anyhow::Result<()> { 33/// // Create a queue adapter for String work items 34/// let queue: Arc<dyn QueueAdapter<String>> = Arc::new(MpscQueueAdapter::new(100)); 35/// 36/// // Push work to the queue 37/// queue.push("process-this".to_string()).await?; 38/// 39/// // Pull work from the queue 40/// if let Some(work) = queue.pull().await { 41/// println!("Processing: {}", work); 42/// // Acknowledge completion 43/// queue.ack(&work).await?; 44/// } 45/// # Ok(()) 46/// # } 47/// ``` 48#[async_trait] 49pub trait QueueAdapter<T>: Send + Sync 50where 51 T: Send + Sync + 'static, 52{ 53 /// Pull the next work item from the queue. 54 /// 55 /// This method blocks until an item is available or the queue is closed. 56 /// Returns `None` if the queue is closed or empty (depending on implementation). 57 /// 58 /// # Returns 59 /// 60 /// * `Some(T)` - The next work item from the queue 61 /// * `None` - The queue is closed or empty 62 async fn pull(&self) -> Option<T>; 63 64 /// Push a work item to the queue. 65 /// 66 /// This method may block if the queue has bounded capacity and is full. 67 /// 68 /// # Arguments 69 /// 70 /// * `work` - The work item to add to the queue 71 /// 72 /// # Errors 73 /// 74 /// Returns an error if: 75 /// - The queue is full (for bounded queues) 76 /// - The queue is closed 77 /// - Serialization fails (for persistent queues) 78 /// - Backend connection fails (for Redis/SQLite) 79 async fn push(&self, work: T) -> Result<()>; 80 81 /// Acknowledge that a work item has been successfully processed. 82 /// 83 /// This is used by reliable queue implementations to remove the item 84 /// from a temporary processing queue. Implementations that don't require 85 /// acknowledgment (like MPSC) can use the default no-op implementation. 86 /// 87 /// # Arguments 88 /// 89 /// * `item` - The work item to acknowledge 90 /// 91 /// # Errors 92 /// 93 /// Returns an error if acknowledgment fails (backend-specific). 94 async fn ack(&self, _item: &T) -> Result<()> { 95 // Default no-op implementation for queues that don't need acknowledgment 96 Ok(()) 97 } 98 99 /// Try to push a work item without blocking. 100 /// 101 /// This method returns immediately with an error if the queue is full. 102 /// 103 /// # Arguments 104 /// 105 /// * `work` - The work item to add to the queue 106 /// 107 /// # Errors 108 /// 109 /// Returns an error if: 110 /// - The queue is full 111 /// - The queue is closed 112 /// - Other backend-specific errors occur 113 async fn try_push(&self, work: T) -> Result<()> { 114 // Default implementation uses regular push 115 self.push(work).await 116 } 117 118 /// Get the current queue depth if available. 119 /// 120 /// # Returns 121 /// 122 /// * `Some(usize)` - The number of items currently in the queue 123 /// * `None` - Queue depth is not available or cannot be determined 124 async fn depth(&self) -> Option<usize> { 125 None 126 } 127 128 /// Check if the queue is healthy. 129 /// 130 /// Used for health checks and monitoring. Implementations should verify 131 /// backend connectivity and basic functionality. 132 /// 133 /// # Returns 134 /// 135 /// * `true` - The queue is operational 136 /// * `false` - The queue has issues or is disconnected 137 async fn is_healthy(&self) -> bool { 138 true 139 } 140} 141 142#[cfg(test)] 143mod tests { 144 use super::*; 145 146 // Mock implementation for testing the trait 147 struct MockQueue<T> { 148 _phantom: std::marker::PhantomData<T>, 149 } 150 151 impl<T> MockQueue<T> { 152 fn new() -> Self { 153 Self { 154 _phantom: std::marker::PhantomData, 155 } 156 } 157 } 158 159 #[async_trait] 160 impl<T> QueueAdapter<T> for MockQueue<T> 161 where 162 T: Send + Sync + 'static, 163 { 164 async fn pull(&self) -> Option<T> { 165 None 166 } 167 168 async fn push(&self, _work: T) -> Result<()> { 169 Ok(()) 170 } 171 } 172 173 #[tokio::test] 174 async fn test_default_trait_methods() { 175 let queue = MockQueue::<String>::new(); 176 177 // Test default ack implementation 178 assert!(queue.ack(&"test".to_string()).await.is_ok()); 179 180 // Test default try_push implementation 181 assert!(queue.try_push("test".to_string()).await.is_ok()); 182 183 // Test default depth implementation 184 assert_eq!(queue.depth().await, None); 185 186 // Test default is_healthy implementation 187 assert!(queue.is_healthy().await); 188 } 189}