forked from
smokesignal.events/quickdid
QuickDID is a high-performance AT Protocol identity resolution service written in Rust. It provides handle-to-DID resolution with Redis-backed caching and queue processing.
1//! Factory functions for creating queue adapters.
2//!
3//! This module provides convenient factory functions for creating different
4//! types of queue adapters with appropriate configurations.
5
6use deadpool_redis::Pool as RedisPool;
7use serde::{Deserialize, Serialize};
8use std::sync::Arc;
9use tokio::sync::mpsc;
10
11use super::{
12 adapter::QueueAdapter, mpsc::MpscQueueAdapter, noop::NoopQueueAdapter,
13 redis::RedisQueueAdapter, sqlite::SqliteQueueAdapter, work::DedupKey,
14};
15
16// ========= MPSC Queue Factories =========
17
18/// Create a new MPSC queue adapter with the specified buffer size.
19///
20/// This creates an in-memory queue suitable for single-instance deployments.
21///
22/// # Arguments
23///
24/// * `buffer` - The buffer size for the channel
25///
26/// # Examples
27///
28/// ```
29/// use quickdid::queue::create_mpsc_queue;
30///
31/// let queue = create_mpsc_queue::<String>(100);
32/// ```
33pub fn create_mpsc_queue<T>(buffer: usize) -> Arc<dyn QueueAdapter<T>>
34where
35 T: Send + Sync + 'static,
36{
37 Arc::new(MpscQueueAdapter::new(buffer))
38}
39
40/// Create an MPSC queue adapter from existing channels.
41///
42/// This allows integration with existing channel-based architectures.
43///
44/// # Arguments
45///
46/// * `sender` - The sender half of the channel
47/// * `receiver` - The receiver half of the channel
48///
49/// # Examples
50///
51/// ```
52/// use tokio::sync::mpsc;
53/// use quickdid::queue::create_mpsc_queue_from_channel;
54///
55/// let (sender, receiver) = mpsc::channel::<String>(50);
56/// let queue = create_mpsc_queue_from_channel(sender, receiver);
57/// ```
58pub fn create_mpsc_queue_from_channel<T>(
59 sender: mpsc::Sender<T>,
60 receiver: mpsc::Receiver<T>,
61) -> Arc<dyn QueueAdapter<T>>
62where
63 T: Send + Sync + 'static,
64{
65 Arc::new(MpscQueueAdapter::from_channel(sender, receiver))
66}
67
68// ========= Redis Queue Factories =========
69
70/// Create a new Redis-backed queue adapter.
71///
72/// This creates a distributed queue suitable for multi-instance deployments.
73///
74/// # Arguments
75///
76/// * `pool` - Redis connection pool
77/// * `worker_id` - Worker identifier for this queue instance
78/// * `key_prefix` - Redis key prefix for queue operations
79/// * `timeout_seconds` - Timeout for blocking operations
80///
81/// # Examples
82///
83/// ```no_run
84/// use quickdid::queue::{create_redis_queue, HandleResolutionWork};
85/// use deadpool_redis::Config;
86///
87/// # async fn example() -> anyhow::Result<()> {
88/// let cfg = Config::from_url("redis://localhost:6379");
89/// let pool = cfg.create_pool(Some(deadpool_redis::Runtime::Tokio1))?;
90///
91/// let queue = create_redis_queue::<HandleResolutionWork>(
92/// pool,
93/// "worker-1".to_string(),
94/// "queue:myapp:".to_string(),
95/// 5,
96/// );
97/// # Ok(())
98/// # }
99/// ```
100pub fn create_redis_queue<T>(
101 pool: RedisPool,
102 worker_id: String,
103 key_prefix: String,
104 timeout_seconds: u64,
105) -> Arc<dyn QueueAdapter<T>>
106where
107 T: Send + Sync + Serialize + for<'de> Deserialize<'de> + DedupKey + 'static,
108{
109 Arc::new(RedisQueueAdapter::new(
110 pool,
111 worker_id,
112 key_prefix,
113 timeout_seconds,
114 ))
115}
116
117/// Create a new Redis-backed queue adapter with deduplication.
118///
119/// This creates a distributed queue with deduplication to prevent duplicate items
120/// from being queued within the specified TTL window.
121///
122/// # Arguments
123///
124/// * `pool` - Redis connection pool
125/// * `worker_id` - Worker identifier for this queue instance
126/// * `key_prefix` - Redis key prefix for queue operations
127/// * `timeout_seconds` - Timeout for blocking operations
128/// * `dedup_enabled` - Whether to enable deduplication
129/// * `dedup_ttl` - TTL for deduplication keys in seconds
130///
131/// # Examples
132///
133/// ```no_run
134/// use quickdid::queue::{create_redis_queue_with_dedup, HandleResolutionWork};
135/// use deadpool_redis::Config;
136///
137/// # async fn example() -> anyhow::Result<()> {
138/// let cfg = Config::from_url("redis://localhost:6379");
139/// let pool = cfg.create_pool(Some(deadpool_redis::Runtime::Tokio1))?;
140///
141/// let queue = create_redis_queue_with_dedup::<HandleResolutionWork>(
142/// pool,
143/// "worker-1".to_string(),
144/// "queue:myapp:".to_string(),
145/// 5,
146/// true, // Enable deduplication
147/// 60, // 60 second dedup window
148/// );
149/// # Ok(())
150/// # }
151/// ```
152pub fn create_redis_queue_with_dedup<T>(
153 pool: RedisPool,
154 worker_id: String,
155 key_prefix: String,
156 timeout_seconds: u64,
157 dedup_enabled: bool,
158 dedup_ttl: u64,
159) -> Arc<dyn QueueAdapter<T>>
160where
161 T: Send + Sync + Serialize + for<'de> Deserialize<'de> + DedupKey + 'static,
162{
163 Arc::new(RedisQueueAdapter::with_dedup(
164 pool,
165 worker_id,
166 key_prefix,
167 timeout_seconds,
168 dedup_enabled,
169 dedup_ttl,
170 ))
171}
172
173// ========= SQLite Queue Factories =========
174
175/// Create a new SQLite queue adapter with unlimited queue size.
176///
177/// This creates a persistent queue backed by SQLite database suitable
178/// for single-instance deployments that need persistence across restarts.
179/// The queue has no size limit and may grow unbounded.
180///
181/// # Arguments
182///
183/// * `pool` - SQLite connection pool
184///
185/// # Examples
186///
187/// ```no_run
188/// use quickdid::queue::{create_sqlite_queue, HandleResolutionWork};
189/// use quickdid::sqlite_schema::create_sqlite_pool;
190///
191/// # async fn example() -> anyhow::Result<()> {
192/// let pool = create_sqlite_pool("sqlite:./quickdid.db").await?;
193/// let queue = create_sqlite_queue::<HandleResolutionWork>(pool);
194/// # Ok(())
195/// # }
196/// ```
197pub fn create_sqlite_queue<T>(pool: sqlx::SqlitePool) -> Arc<dyn QueueAdapter<T>>
198where
199 T: Send + Sync + Serialize + for<'de> Deserialize<'de> + 'static,
200{
201 Arc::new(SqliteQueueAdapter::new(pool))
202}
203
204/// Create a new SQLite queue adapter with work shedding.
205///
206/// This creates a persistent queue with configurable maximum size.
207/// When the queue exceeds `max_size`, the oldest entries are automatically
208/// deleted to maintain the limit, preserving the most recent work items.
209///
210/// # Arguments
211///
212/// * `pool` - SQLite connection pool
213/// * `max_size` - Maximum number of entries (0 = unlimited)
214///
215/// # Work Shedding Behavior
216///
217/// - New work items are always accepted
218/// - When queue size exceeds `max_size`, oldest entries are deleted
219/// - Deletion happens atomically with insertion in a single transaction
220/// - Essential for long-running deployments to prevent disk space issues
221///
222/// # Examples
223///
224/// ```no_run
225/// use quickdid::queue::{create_sqlite_queue_with_max_size, HandleResolutionWork};
226/// use quickdid::sqlite_schema::create_sqlite_pool;
227///
228/// # async fn example() -> anyhow::Result<()> {
229/// let pool = create_sqlite_pool("sqlite:./quickdid.db").await?;
230/// // Limit queue to 10,000 entries with automatic work shedding
231/// let queue = create_sqlite_queue_with_max_size::<HandleResolutionWork>(pool, 10000);
232/// # Ok(())
233/// # }
234/// ```
235pub fn create_sqlite_queue_with_max_size<T>(
236 pool: sqlx::SqlitePool,
237 max_size: u64,
238) -> Arc<dyn QueueAdapter<T>>
239where
240 T: Send + Sync + Serialize + for<'de> Deserialize<'de> + 'static,
241{
242 Arc::new(SqliteQueueAdapter::with_max_size(pool, max_size))
243}
244
245// ========= No-op Queue Factory =========
246
247/// Create a no-operation queue adapter.
248///
249/// This creates a queue that discards all work items, useful for testing
250/// or when queue processing is disabled.
251///
252/// # Examples
253///
254/// ```
255/// use quickdid::queue::create_noop_queue;
256///
257/// let queue = create_noop_queue::<String>();
258/// ```
259pub fn create_noop_queue<T>() -> Arc<dyn QueueAdapter<T>>
260where
261 T: Send + Sync + 'static,
262{
263 Arc::new(NoopQueueAdapter::new())
264}
265
266#[cfg(test)]
267mod tests {
268 use super::*;
269 use crate::queue::HandleResolutionWork;
270
271 #[tokio::test]
272 async fn test_create_mpsc_queue() {
273 let queue = create_mpsc_queue::<String>(10);
274
275 queue.push("test".to_string()).await.unwrap();
276 let item = queue.pull().await;
277 assert_eq!(item, Some("test".to_string()));
278 }
279
280 #[tokio::test]
281 async fn test_create_mpsc_queue_from_channel() {
282 let (sender, receiver) = mpsc::channel(5);
283 let queue = create_mpsc_queue_from_channel(sender.clone(), receiver);
284
285 // Send via original sender
286 sender.send("external".to_string()).await.unwrap();
287
288 // Receive via queue
289 let item = queue.pull().await;
290 assert_eq!(item, Some("external".to_string()));
291 }
292
293 #[tokio::test]
294 async fn test_create_noop_queue() {
295 let queue = create_noop_queue::<String>();
296
297 // Should accept pushes
298 queue.push("ignored".to_string()).await.unwrap();
299
300 // Should report as healthy
301 assert!(queue.is_healthy().await);
302
303 // Should report depth as 0
304 assert_eq!(queue.depth().await, Some(0));
305 }
306
307 #[tokio::test]
308 async fn test_create_sqlite_queue() {
309 // Create in-memory SQLite database for testing
310 let pool = sqlx::SqlitePool::connect("sqlite::memory:")
311 .await
312 .expect("Failed to connect to in-memory SQLite");
313
314 // Create the queue schema
315 crate::sqlite_schema::create_schema(&pool)
316 .await
317 .expect("Failed to create schema");
318
319 let queue = create_sqlite_queue::<HandleResolutionWork>(pool);
320
321 let work = HandleResolutionWork::new("test.example.com".to_string());
322 queue.push(work.clone()).await.unwrap();
323
324 let pulled = queue.pull().await;
325 assert_eq!(pulled, Some(work));
326 }
327
328 #[tokio::test]
329 async fn test_create_sqlite_queue_with_max_size() {
330 // Create in-memory SQLite database for testing
331 let pool = sqlx::SqlitePool::connect("sqlite::memory:")
332 .await
333 .expect("Failed to connect to in-memory SQLite");
334
335 // Create the queue schema
336 crate::sqlite_schema::create_schema(&pool)
337 .await
338 .expect("Failed to create schema");
339
340 // Create queue with small max size
341 let queue = create_sqlite_queue_with_max_size::<HandleResolutionWork>(pool, 5);
342
343 // Push items
344 for i in 0..10 {
345 let work = HandleResolutionWork::new(format!("test-{}.example.com", i));
346 queue.push(work).await.unwrap();
347 }
348
349 // Should have limited items due to work shedding
350 let depth = queue.depth().await.unwrap();
351 assert!(
352 depth <= 5,
353 "Queue should have at most 5 items after work shedding"
354 );
355 }
356
357 #[tokio::test]
358 async fn test_create_redis_queue() {
359 let pool = match crate::test_helpers::get_test_redis_pool() {
360 Some(p) => p,
361 None => {
362 eprintln!("Skipping Redis test - no Redis connection available");
363 return;
364 }
365 };
366
367 let test_prefix = format!(
368 "test:factory:{}:",
369 std::time::SystemTime::now()
370 .duration_since(std::time::UNIX_EPOCH)
371 .unwrap()
372 .as_nanos()
373 );
374
375 let queue = create_redis_queue::<String>(pool, "test-worker".to_string(), test_prefix, 1);
376
377 queue.push("test-item".to_string()).await.unwrap();
378 let pulled = queue.pull().await;
379 assert_eq!(pulled, Some("test-item".to_string()));
380 }
381}