forked from
smokesignal.events/quickdid
QuickDID is a high-performance AT Protocol identity resolution service written in Rust. It provides handle-to-DID resolution with Redis-backed caching and queue processing.
1//! Queue adapter trait definition.
2//!
3//! This module defines the core `QueueAdapter` trait that provides a common
4//! interface for different queue implementations (MPSC, Redis, SQLite, etc.).
5
6use super::error::Result;
7use async_trait::async_trait;
8
9/// Generic trait for queue adapters that can work with any work type.
10///
11/// This trait provides a common interface for different queue implementations
12/// (MPSC, Redis, PostgreSQL, SQLite, etc.) allowing them to be used interchangeably.
13///
14/// # Type Parameters
15///
16/// * `T` - The type of work items that this queue processes. Must be `Send + Sync + 'static`.
17///
18/// # Implementation Notes
19///
20/// Implementors should ensure that:
21/// - `pull()` blocks until an item is available or the queue is closed
22/// - `push()` may block if the queue has a bounded capacity
23/// - `ack()` is used for reliable delivery semantics (can be no-op for simple queues)
24/// - `try_push()` never blocks and returns an error if the queue is full
25///
26/// # Examples
27///
28/// ```no_run
29/// use quickdid::queue::{QueueAdapter, MpscQueueAdapter};
30/// use std::sync::Arc;
31///
32/// # async fn example() -> anyhow::Result<()> {
33/// // Create a queue adapter for String work items
34/// let queue: Arc<dyn QueueAdapter<String>> = Arc::new(MpscQueueAdapter::new(100));
35///
36/// // Push work to the queue
37/// queue.push("process-this".to_string()).await?;
38///
39/// // Pull work from the queue
40/// if let Some(work) = queue.pull().await {
41/// println!("Processing: {}", work);
42/// // Acknowledge completion
43/// queue.ack(&work).await?;
44/// }
45/// # Ok(())
46/// # }
47/// ```
48#[async_trait]
49pub trait QueueAdapter<T>: Send + Sync
50where
51 T: Send + Sync + 'static,
52{
53 /// Pull the next work item from the queue.
54 ///
55 /// This method blocks until an item is available or the queue is closed.
56 /// Returns `None` if the queue is closed or empty (depending on implementation).
57 ///
58 /// # Returns
59 ///
60 /// * `Some(T)` - The next work item from the queue
61 /// * `None` - The queue is closed or empty
62 async fn pull(&self) -> Option<T>;
63
64 /// Push a work item to the queue.
65 ///
66 /// This method may block if the queue has bounded capacity and is full.
67 ///
68 /// # Arguments
69 ///
70 /// * `work` - The work item to add to the queue
71 ///
72 /// # Errors
73 ///
74 /// Returns an error if:
75 /// - The queue is full (for bounded queues)
76 /// - The queue is closed
77 /// - Serialization fails (for persistent queues)
78 /// - Backend connection fails (for Redis/SQLite)
79 async fn push(&self, work: T) -> Result<()>;
80
81 /// Acknowledge that a work item has been successfully processed.
82 ///
83 /// This is used by reliable queue implementations to remove the item
84 /// from a temporary processing queue. Implementations that don't require
85 /// acknowledgment (like MPSC) can use the default no-op implementation.
86 ///
87 /// # Arguments
88 ///
89 /// * `item` - The work item to acknowledge
90 ///
91 /// # Errors
92 ///
93 /// Returns an error if acknowledgment fails (backend-specific).
94 async fn ack(&self, _item: &T) -> Result<()> {
95 // Default no-op implementation for queues that don't need acknowledgment
96 Ok(())
97 }
98
99 /// Try to push a work item without blocking.
100 ///
101 /// This method returns immediately with an error if the queue is full.
102 ///
103 /// # Arguments
104 ///
105 /// * `work` - The work item to add to the queue
106 ///
107 /// # Errors
108 ///
109 /// Returns an error if:
110 /// - The queue is full
111 /// - The queue is closed
112 /// - Other backend-specific errors occur
113 async fn try_push(&self, work: T) -> Result<()> {
114 // Default implementation uses regular push
115 self.push(work).await
116 }
117
118 /// Get the current queue depth if available.
119 ///
120 /// # Returns
121 ///
122 /// * `Some(usize)` - The number of items currently in the queue
123 /// * `None` - Queue depth is not available or cannot be determined
124 async fn depth(&self) -> Option<usize> {
125 None
126 }
127
128 /// Check if the queue is healthy.
129 ///
130 /// Used for health checks and monitoring. Implementations should verify
131 /// backend connectivity and basic functionality.
132 ///
133 /// # Returns
134 ///
135 /// * `true` - The queue is operational
136 /// * `false` - The queue has issues or is disconnected
137 async fn is_healthy(&self) -> bool {
138 true
139 }
140}
141
#[cfg(test)]
mod tests {
    use super::*;
    use std::marker::PhantomData;

    /// Bare-bones adapter that supplies only the two required methods, so
    /// every other call in the test lands on the trait's provided defaults.
    struct MockQueue<T>(PhantomData<T>);

    impl<T> MockQueue<T> {
        fn new() -> Self {
            MockQueue(PhantomData)
        }
    }

    #[async_trait]
    impl<T> QueueAdapter<T> for MockQueue<T>
    where
        T: Send + Sync + 'static,
    {
        // Never yields an item.
        async fn pull(&self) -> Option<T> {
            None
        }

        // Accepts and discards whatever is pushed.
        async fn push(&self, _work: T) -> Result<()> {
            Ok(())
        }
    }

    #[tokio::test]
    async fn test_default_trait_methods() {
        let mock: MockQueue<String> = MockQueue::new();
        let item = "test".to_string();

        // Provided `ack` is a no-op that succeeds.
        let acked = mock.ack(&item).await;
        assert!(acked.is_ok());

        // Provided `try_push` forwards to the mock's `push`, which succeeds.
        let pushed = mock.try_push(item).await;
        assert!(pushed.is_ok());

        // Provided `depth` reports that the depth is unknown.
        let depth = mock.depth().await;
        assert!(depth.is_none());

        // Provided `is_healthy` reports healthy.
        let healthy = mock.is_healthy().await;
        assert!(healthy);
    }
}