QuickDID is a high-performance AT Protocol identity resolution service written in Rust. It provides handle-to-DID resolution with Redis-backed caching and queue processing.
at main 8.1 kB view raw
1//! MPSC channel-based queue adapter implementation. 2//! 3//! This module provides an in-memory queue implementation using Tokio's 4//! multi-producer, single-consumer (MPSC) channels. It's suitable for 5//! single-instance deployments with moderate throughput requirements. 6 7use async_trait::async_trait; 8use std::sync::Arc; 9use tokio::sync::{Mutex, mpsc}; 10 11use super::adapter::QueueAdapter; 12use super::error::{QueueError, Result}; 13 14/// MPSC channel-based queue adapter implementation. 15/// 16/// This adapter uses tokio's multi-producer, single-consumer channel 17/// for in-memory queuing of work items. It provides fast, lock-free 18/// operation for single-instance deployments. 19/// 20/// # Features 21/// 22/// - In-memory operation (no persistence) 23/// - Bounded capacity with backpressure 24/// - Fast push/pull operations 25/// - No acknowledgment needed (fire-and-forget) 26/// 27/// # Limitations 28/// 29/// - No persistence across restarts 30/// - Single consumer only 31/// - No distributed operation 32/// 33/// # Examples 34/// 35/// ``` 36/// use quickdid::queue::MpscQueueAdapter; 37/// use quickdid::queue::QueueAdapter; 38/// 39/// # async fn example() -> anyhow::Result<()> { 40/// // Create a queue with buffer size of 100 41/// let queue = MpscQueueAdapter::<String>::new(100); 42/// 43/// // Push items 44/// queue.push("item1".to_string()).await?; 45/// queue.push("item2".to_string()).await?; 46/// 47/// // Pull items 48/// while let Some(item) = queue.pull().await { 49/// println!("Processing: {}", item); 50/// } 51/// # Ok(()) 52/// # } 53/// ``` 54pub struct MpscQueueAdapter<T> 55where 56 T: Send + Sync + 'static, 57{ 58 receiver: Arc<Mutex<mpsc::Receiver<T>>>, 59 sender: mpsc::Sender<T>, 60} 61 62impl<T> MpscQueueAdapter<T> 63where 64 T: Send + Sync + 'static, 65{ 66 /// Create a new MPSC queue adapter with the specified buffer size. 67 /// 68 /// # Arguments 69 /// 70 /// * `buffer` - The maximum number of items that can be buffered 71 /// 72 /// # Examples 73 /// 74 /// ``` 75 /// use quickdid::queue::MpscQueueAdapter; 76 /// 77 /// let queue = MpscQueueAdapter::<String>::new(100); 78 /// ``` 79 pub fn new(buffer: usize) -> Self { 80 let (sender, receiver) = mpsc::channel(buffer); 81 Self { 82 receiver: Arc::new(Mutex::new(receiver)), 83 sender, 84 } 85 } 86 87 /// Create an adapter from existing MPSC channels. 88 /// 89 /// This constructor is useful for integrating with existing channel-based 90 /// architectures or when you need custom channel configuration. 91 /// 92 /// # Arguments 93 /// 94 /// * `sender` - The sender half of the channel 95 /// * `receiver` - The receiver half of the channel 96 /// 97 /// # Examples 98 /// 99 /// ``` 100 /// use tokio::sync::mpsc; 101 /// use quickdid::queue::MpscQueueAdapter; 102 /// 103 /// let (sender, receiver) = mpsc::channel::<String>(50); 104 /// let queue = MpscQueueAdapter::from_channel(sender, receiver); 105 /// ``` 106 pub fn from_channel(sender: mpsc::Sender<T>, receiver: mpsc::Receiver<T>) -> Self { 107 Self { 108 receiver: Arc::new(Mutex::new(receiver)), 109 sender, 110 } 111 } 112} 113 114#[async_trait] 115impl<T> QueueAdapter<T> for MpscQueueAdapter<T> 116where 117 T: Send + Sync + 'static, 118{ 119 async fn pull(&self) -> Option<T> { 120 let mut receiver = self.receiver.lock().await; 121 receiver.recv().await 122 } 123 124 async fn push(&self, work: T) -> Result<()> { 125 self.sender 126 .send(work) 127 .await 128 .map_err(|e| QueueError::PushFailed(e.to_string())) 129 } 130 131 async fn try_push(&self, work: T) -> Result<()> { 132 self.sender.try_send(work).map_err(|e| match e { 133 mpsc::error::TrySendError::Full(_) => QueueError::QueueFull, 134 mpsc::error::TrySendError::Closed(_) => QueueError::QueueClosed, 135 }) 136 } 137 138 async fn depth(&self) -> Option<usize> { 139 // Note: This is an approximation as mpsc doesn't provide exact depth 140 Some(self.sender.max_capacity() - self.sender.capacity()) 141 } 142 143 async fn is_healthy(&self) -> bool { 144 !self.sender.is_closed() 145 } 146} 147 148#[cfg(test)] 149mod tests { 150 use super::*; 151 152 #[tokio::test] 153 async fn test_mpsc_queue_push_pull() { 154 let queue = MpscQueueAdapter::<String>::new(10); 155 156 // Test push 157 queue.push("test1".to_string()).await.unwrap(); 158 queue.push("test2".to_string()).await.unwrap(); 159 160 // Test pull in FIFO order 161 let item1 = queue.pull().await; 162 assert_eq!(item1, Some("test1".to_string())); 163 164 let item2 = queue.pull().await; 165 assert_eq!(item2, Some("test2".to_string())); 166 } 167 168 #[tokio::test] 169 async fn test_mpsc_queue_try_push() { 170 // Create a small queue to test full condition 171 let queue = MpscQueueAdapter::<i32>::new(2); 172 173 // Fill the queue 174 queue.push(1).await.unwrap(); 175 queue.push(2).await.unwrap(); 176 177 // Try to push when full should fail 178 let result = queue.try_push(3).await; 179 assert!(matches!(result, Err(QueueError::QueueFull))); 180 181 // Pull one item to make space 182 let _ = queue.pull().await; 183 184 // Now try_push should succeed 185 queue.try_push(3).await.unwrap(); 186 } 187 188 #[tokio::test] 189 async fn test_mpsc_queue_from_channel() { 190 let (sender, receiver) = mpsc::channel(5); 191 let queue = MpscQueueAdapter::from_channel(sender.clone(), receiver); 192 193 // Send via original sender 194 sender.send("external".to_string()).await.unwrap(); 195 196 // Send via queue 197 queue.push("internal".to_string()).await.unwrap(); 198 199 // Pull both items 200 assert_eq!(queue.pull().await, Some("external".to_string())); 201 assert_eq!(queue.pull().await, Some("internal".to_string())); 202 } 203 204 #[tokio::test] 205 async fn test_mpsc_queue_health() { 206 let queue = MpscQueueAdapter::<String>::new(10); 207 208 // Queue should be healthy initially 209 assert!(queue.is_healthy().await); 210 211 // Create a queue and drop the receiver to close it 212 let (sender, receiver) = mpsc::channel::<String>(10); 213 drop(receiver); 214 let closed_queue = MpscQueueAdapter::from_channel(sender, mpsc::channel(1).1); 215 216 // Push should fail on closed queue 217 let result = closed_queue.push("test".to_string()).await; 218 assert!(result.is_err()); 219 } 220 221 #[tokio::test] 222 async fn test_mpsc_queue_depth() { 223 let queue = MpscQueueAdapter::<i32>::new(10); 224 225 // Initially empty 226 let depth = queue.depth().await; 227 assert_eq!(depth, Some(0)); 228 229 // Add items and check depth 230 queue.push(1).await.unwrap(); 231 queue.push(2).await.unwrap(); 232 queue.push(3).await.unwrap(); 233 234 let depth = queue.depth().await; 235 assert_eq!(depth, Some(3)); 236 237 // Pull an item and check depth 238 let _ = queue.pull().await; 239 let depth = queue.depth().await; 240 assert_eq!(depth, Some(2)); 241 } 242 243 #[tokio::test] 244 async fn test_mpsc_queue_concurrent_operations() { 245 use std::sync::Arc; 246 247 let queue = Arc::new(MpscQueueAdapter::<i32>::new(100)); 248 249 // Spawn multiple producers 250 let mut handles = vec![]; 251 for i in 0..10 { 252 let q = queue.clone(); 253 handles.push(tokio::spawn(async move { 254 for j in 0..10 { 255 q.push(i * 10 + j).await.unwrap(); 256 } 257 })); 258 } 259 260 // Wait for all producers 261 for handle in handles { 262 handle.await.unwrap(); 263 } 264 265 // Verify we can pull all 100 items 266 let mut count = 0; 267 while queue.pull().await.is_some() { 268 count += 1; 269 if count >= 100 { 270 break; 271 } 272 } 273 assert_eq!(count, 100); 274 } 275 276 #[tokio::test] 277 async fn test_mpsc_queue_no_ack_needed() { 278 let queue = MpscQueueAdapter::<String>::new(10); 279 280 queue.push("test".to_string()).await.unwrap(); 281 let item = queue.pull().await.unwrap(); 282 283 // Ack should always succeed (no-op) 284 queue.ack(&item).await.unwrap(); 285 } 286}