forked from
smokesignal.events/quickdid
QuickDID is a high-performance AT Protocol identity resolution service written in Rust. It provides handle-to-DID resolution with Redis-backed caching and queue processing.
1//! MPSC channel-based queue adapter implementation.
2//!
3//! This module provides an in-memory queue implementation using Tokio's
4//! multi-producer, single-consumer (MPSC) channels. It's suitable for
5//! single-instance deployments with moderate throughput requirements.
6
7use async_trait::async_trait;
8use std::sync::Arc;
9use tokio::sync::{Mutex, mpsc};
10
11use super::adapter::QueueAdapter;
12use super::error::{QueueError, Result};
13
14/// MPSC channel-based queue adapter implementation.
15///
16/// This adapter uses tokio's multi-producer, single-consumer channel
17/// for in-memory queuing of work items. It provides fast, lock-free
18/// operation for single-instance deployments.
19///
20/// # Features
21///
22/// - In-memory operation (no persistence)
23/// - Bounded capacity with backpressure
24/// - Fast push/pull operations
25/// - No acknowledgment needed (fire-and-forget)
26///
27/// # Limitations
28///
29/// - No persistence across restarts
30/// - Single consumer only
31/// - No distributed operation
32///
33/// # Examples
34///
35/// ```
36/// use quickdid::queue::MpscQueueAdapter;
37/// use quickdid::queue::QueueAdapter;
38///
39/// # async fn example() -> anyhow::Result<()> {
40/// // Create a queue with buffer size of 100
41/// let queue = MpscQueueAdapter::<String>::new(100);
42///
43/// // Push items
44/// queue.push("item1".to_string()).await?;
45/// queue.push("item2".to_string()).await?;
46///
47/// // Pull items
48/// while let Some(item) = queue.pull().await {
49/// println!("Processing: {}", item);
50/// }
51/// # Ok(())
52/// # }
53/// ```
54pub struct MpscQueueAdapter<T>
55where
56 T: Send + Sync + 'static,
57{
58 receiver: Arc<Mutex<mpsc::Receiver<T>>>,
59 sender: mpsc::Sender<T>,
60}
61
62impl<T> MpscQueueAdapter<T>
63where
64 T: Send + Sync + 'static,
65{
66 /// Create a new MPSC queue adapter with the specified buffer size.
67 ///
68 /// # Arguments
69 ///
70 /// * `buffer` - The maximum number of items that can be buffered
71 ///
72 /// # Examples
73 ///
74 /// ```
75 /// use quickdid::queue::MpscQueueAdapter;
76 ///
77 /// let queue = MpscQueueAdapter::<String>::new(100);
78 /// ```
79 pub fn new(buffer: usize) -> Self {
80 let (sender, receiver) = mpsc::channel(buffer);
81 Self {
82 receiver: Arc::new(Mutex::new(receiver)),
83 sender,
84 }
85 }
86
87 /// Create an adapter from existing MPSC channels.
88 ///
89 /// This constructor is useful for integrating with existing channel-based
90 /// architectures or when you need custom channel configuration.
91 ///
92 /// # Arguments
93 ///
94 /// * `sender` - The sender half of the channel
95 /// * `receiver` - The receiver half of the channel
96 ///
97 /// # Examples
98 ///
99 /// ```
100 /// use tokio::sync::mpsc;
101 /// use quickdid::queue::MpscQueueAdapter;
102 ///
103 /// let (sender, receiver) = mpsc::channel::<String>(50);
104 /// let queue = MpscQueueAdapter::from_channel(sender, receiver);
105 /// ```
106 pub fn from_channel(sender: mpsc::Sender<T>, receiver: mpsc::Receiver<T>) -> Self {
107 Self {
108 receiver: Arc::new(Mutex::new(receiver)),
109 sender,
110 }
111 }
112}
113
114#[async_trait]
115impl<T> QueueAdapter<T> for MpscQueueAdapter<T>
116where
117 T: Send + Sync + 'static,
118{
119 async fn pull(&self) -> Option<T> {
120 let mut receiver = self.receiver.lock().await;
121 receiver.recv().await
122 }
123
124 async fn push(&self, work: T) -> Result<()> {
125 self.sender
126 .send(work)
127 .await
128 .map_err(|e| QueueError::PushFailed(e.to_string()))
129 }
130
131 async fn try_push(&self, work: T) -> Result<()> {
132 self.sender.try_send(work).map_err(|e| match e {
133 mpsc::error::TrySendError::Full(_) => QueueError::QueueFull,
134 mpsc::error::TrySendError::Closed(_) => QueueError::QueueClosed,
135 })
136 }
137
138 async fn depth(&self) -> Option<usize> {
139 // Note: This is an approximation as mpsc doesn't provide exact depth
140 Some(self.sender.max_capacity() - self.sender.capacity())
141 }
142
143 async fn is_healthy(&self) -> bool {
144 !self.sender.is_closed()
145 }
146}
147
148#[cfg(test)]
149mod tests {
150 use super::*;
151
152 #[tokio::test]
153 async fn test_mpsc_queue_push_pull() {
154 let queue = MpscQueueAdapter::<String>::new(10);
155
156 // Test push
157 queue.push("test1".to_string()).await.unwrap();
158 queue.push("test2".to_string()).await.unwrap();
159
160 // Test pull in FIFO order
161 let item1 = queue.pull().await;
162 assert_eq!(item1, Some("test1".to_string()));
163
164 let item2 = queue.pull().await;
165 assert_eq!(item2, Some("test2".to_string()));
166 }
167
168 #[tokio::test]
169 async fn test_mpsc_queue_try_push() {
170 // Create a small queue to test full condition
171 let queue = MpscQueueAdapter::<i32>::new(2);
172
173 // Fill the queue
174 queue.push(1).await.unwrap();
175 queue.push(2).await.unwrap();
176
177 // Try to push when full should fail
178 let result = queue.try_push(3).await;
179 assert!(matches!(result, Err(QueueError::QueueFull)));
180
181 // Pull one item to make space
182 let _ = queue.pull().await;
183
184 // Now try_push should succeed
185 queue.try_push(3).await.unwrap();
186 }
187
188 #[tokio::test]
189 async fn test_mpsc_queue_from_channel() {
190 let (sender, receiver) = mpsc::channel(5);
191 let queue = MpscQueueAdapter::from_channel(sender.clone(), receiver);
192
193 // Send via original sender
194 sender.send("external".to_string()).await.unwrap();
195
196 // Send via queue
197 queue.push("internal".to_string()).await.unwrap();
198
199 // Pull both items
200 assert_eq!(queue.pull().await, Some("external".to_string()));
201 assert_eq!(queue.pull().await, Some("internal".to_string()));
202 }
203
204 #[tokio::test]
205 async fn test_mpsc_queue_health() {
206 let queue = MpscQueueAdapter::<String>::new(10);
207
208 // Queue should be healthy initially
209 assert!(queue.is_healthy().await);
210
211 // Create a queue and drop the receiver to close it
212 let (sender, receiver) = mpsc::channel::<String>(10);
213 drop(receiver);
214 let closed_queue = MpscQueueAdapter::from_channel(sender, mpsc::channel(1).1);
215
216 // Push should fail on closed queue
217 let result = closed_queue.push("test".to_string()).await;
218 assert!(result.is_err());
219 }
220
221 #[tokio::test]
222 async fn test_mpsc_queue_depth() {
223 let queue = MpscQueueAdapter::<i32>::new(10);
224
225 // Initially empty
226 let depth = queue.depth().await;
227 assert_eq!(depth, Some(0));
228
229 // Add items and check depth
230 queue.push(1).await.unwrap();
231 queue.push(2).await.unwrap();
232 queue.push(3).await.unwrap();
233
234 let depth = queue.depth().await;
235 assert_eq!(depth, Some(3));
236
237 // Pull an item and check depth
238 let _ = queue.pull().await;
239 let depth = queue.depth().await;
240 assert_eq!(depth, Some(2));
241 }
242
243 #[tokio::test]
244 async fn test_mpsc_queue_concurrent_operations() {
245 use std::sync::Arc;
246
247 let queue = Arc::new(MpscQueueAdapter::<i32>::new(100));
248
249 // Spawn multiple producers
250 let mut handles = vec![];
251 for i in 0..10 {
252 let q = queue.clone();
253 handles.push(tokio::spawn(async move {
254 for j in 0..10 {
255 q.push(i * 10 + j).await.unwrap();
256 }
257 }));
258 }
259
260 // Wait for all producers
261 for handle in handles {
262 handle.await.unwrap();
263 }
264
265 // Verify we can pull all 100 items
266 let mut count = 0;
267 while queue.pull().await.is_some() {
268 count += 1;
269 if count >= 100 {
270 break;
271 }
272 }
273 assert_eq!(count, 100);
274 }
275
276 #[tokio::test]
277 async fn test_mpsc_queue_no_ack_needed() {
278 let queue = MpscQueueAdapter::<String>::new(10);
279
280 queue.push("test".to_string()).await.unwrap();
281 let item = queue.pull().await.unwrap();
282
283 // Ack should always succeed (no-op)
284 queue.ack(&item).await.unwrap();
285 }
286}