QuickDID is a high-performance AT Protocol identity resolution service written in Rust. It provides handle-to-DID resolution with Redis-backed caching and queue processing.
at main 11 kB view raw
1//! Factory functions for creating queue adapters. 2//! 3//! This module provides convenient factory functions for creating different 4//! types of queue adapters with appropriate configurations. 5 6use deadpool_redis::Pool as RedisPool; 7use serde::{Deserialize, Serialize}; 8use std::sync::Arc; 9use tokio::sync::mpsc; 10 11use super::{ 12 adapter::QueueAdapter, mpsc::MpscQueueAdapter, noop::NoopQueueAdapter, 13 redis::RedisQueueAdapter, sqlite::SqliteQueueAdapter, work::DedupKey, 14}; 15 16// ========= MPSC Queue Factories ========= 17 18/// Create a new MPSC queue adapter with the specified buffer size. 19/// 20/// This creates an in-memory queue suitable for single-instance deployments. 21/// 22/// # Arguments 23/// 24/// * `buffer` - The buffer size for the channel 25/// 26/// # Examples 27/// 28/// ``` 29/// use quickdid::queue::create_mpsc_queue; 30/// 31/// let queue = create_mpsc_queue::<String>(100); 32/// ``` 33pub fn create_mpsc_queue<T>(buffer: usize) -> Arc<dyn QueueAdapter<T>> 34where 35 T: Send + Sync + 'static, 36{ 37 Arc::new(MpscQueueAdapter::new(buffer)) 38} 39 40/// Create an MPSC queue adapter from existing channels. 41/// 42/// This allows integration with existing channel-based architectures. 43/// 44/// # Arguments 45/// 46/// * `sender` - The sender half of the channel 47/// * `receiver` - The receiver half of the channel 48/// 49/// # Examples 50/// 51/// ``` 52/// use tokio::sync::mpsc; 53/// use quickdid::queue::create_mpsc_queue_from_channel; 54/// 55/// let (sender, receiver) = mpsc::channel::<String>(50); 56/// let queue = create_mpsc_queue_from_channel(sender, receiver); 57/// ``` 58pub fn create_mpsc_queue_from_channel<T>( 59 sender: mpsc::Sender<T>, 60 receiver: mpsc::Receiver<T>, 61) -> Arc<dyn QueueAdapter<T>> 62where 63 T: Send + Sync + 'static, 64{ 65 Arc::new(MpscQueueAdapter::from_channel(sender, receiver)) 66} 67 68// ========= Redis Queue Factories ========= 69 70/// Create a new Redis-backed queue adapter. 71/// 72/// This creates a distributed queue suitable for multi-instance deployments. 73/// 74/// # Arguments 75/// 76/// * `pool` - Redis connection pool 77/// * `worker_id` - Worker identifier for this queue instance 78/// * `key_prefix` - Redis key prefix for queue operations 79/// * `timeout_seconds` - Timeout for blocking operations 80/// 81/// # Examples 82/// 83/// ```no_run 84/// use quickdid::queue::{create_redis_queue, HandleResolutionWork}; 85/// use deadpool_redis::Config; 86/// 87/// # async fn example() -> anyhow::Result<()> { 88/// let cfg = Config::from_url("redis://localhost:6379"); 89/// let pool = cfg.create_pool(Some(deadpool_redis::Runtime::Tokio1))?; 90/// 91/// let queue = create_redis_queue::<HandleResolutionWork>( 92/// pool, 93/// "worker-1".to_string(), 94/// "queue:myapp:".to_string(), 95/// 5, 96/// ); 97/// # Ok(()) 98/// # } 99/// ``` 100pub fn create_redis_queue<T>( 101 pool: RedisPool, 102 worker_id: String, 103 key_prefix: String, 104 timeout_seconds: u64, 105) -> Arc<dyn QueueAdapter<T>> 106where 107 T: Send + Sync + Serialize + for<'de> Deserialize<'de> + DedupKey + 'static, 108{ 109 Arc::new(RedisQueueAdapter::new( 110 pool, 111 worker_id, 112 key_prefix, 113 timeout_seconds, 114 )) 115} 116 117/// Create a new Redis-backed queue adapter with deduplication. 118/// 119/// This creates a distributed queue with deduplication to prevent duplicate items 120/// from being queued within the specified TTL window. 121/// 122/// # Arguments 123/// 124/// * `pool` - Redis connection pool 125/// * `worker_id` - Worker identifier for this queue instance 126/// * `key_prefix` - Redis key prefix for queue operations 127/// * `timeout_seconds` - Timeout for blocking operations 128/// * `dedup_enabled` - Whether to enable deduplication 129/// * `dedup_ttl` - TTL for deduplication keys in seconds 130/// 131/// # Examples 132/// 133/// ```no_run 134/// use quickdid::queue::{create_redis_queue_with_dedup, HandleResolutionWork}; 135/// use deadpool_redis::Config; 136/// 137/// # async fn example() -> anyhow::Result<()> { 138/// let cfg = Config::from_url("redis://localhost:6379"); 139/// let pool = cfg.create_pool(Some(deadpool_redis::Runtime::Tokio1))?; 140/// 141/// let queue = create_redis_queue_with_dedup::<HandleResolutionWork>( 142/// pool, 143/// "worker-1".to_string(), 144/// "queue:myapp:".to_string(), 145/// 5, 146/// true, // Enable deduplication 147/// 60, // 60 second dedup window 148/// ); 149/// # Ok(()) 150/// # } 151/// ``` 152pub fn create_redis_queue_with_dedup<T>( 153 pool: RedisPool, 154 worker_id: String, 155 key_prefix: String, 156 timeout_seconds: u64, 157 dedup_enabled: bool, 158 dedup_ttl: u64, 159) -> Arc<dyn QueueAdapter<T>> 160where 161 T: Send + Sync + Serialize + for<'de> Deserialize<'de> + DedupKey + 'static, 162{ 163 Arc::new(RedisQueueAdapter::with_dedup( 164 pool, 165 worker_id, 166 key_prefix, 167 timeout_seconds, 168 dedup_enabled, 169 dedup_ttl, 170 )) 171} 172 173// ========= SQLite Queue Factories ========= 174 175/// Create a new SQLite queue adapter with unlimited queue size. 176/// 177/// This creates a persistent queue backed by SQLite database suitable 178/// for single-instance deployments that need persistence across restarts. 179/// The queue has no size limit and may grow unbounded. 180/// 181/// # Arguments 182/// 183/// * `pool` - SQLite connection pool 184/// 185/// # Examples 186/// 187/// ```no_run 188/// use quickdid::queue::{create_sqlite_queue, HandleResolutionWork}; 189/// use quickdid::sqlite_schema::create_sqlite_pool; 190/// 191/// # async fn example() -> anyhow::Result<()> { 192/// let pool = create_sqlite_pool("sqlite:./quickdid.db").await?; 193/// let queue = create_sqlite_queue::<HandleResolutionWork>(pool); 194/// # Ok(()) 195/// # } 196/// ``` 197pub fn create_sqlite_queue<T>(pool: sqlx::SqlitePool) -> Arc<dyn QueueAdapter<T>> 198where 199 T: Send + Sync + Serialize + for<'de> Deserialize<'de> + 'static, 200{ 201 Arc::new(SqliteQueueAdapter::new(pool)) 202} 203 204/// Create a new SQLite queue adapter with work shedding. 205/// 206/// This creates a persistent queue with configurable maximum size. 207/// When the queue exceeds `max_size`, the oldest entries are automatically 208/// deleted to maintain the limit, preserving the most recent work items. 209/// 210/// # Arguments 211/// 212/// * `pool` - SQLite connection pool 213/// * `max_size` - Maximum number of entries (0 = unlimited) 214/// 215/// # Work Shedding Behavior 216/// 217/// - New work items are always accepted 218/// - When queue size exceeds `max_size`, oldest entries are deleted 219/// - Deletion happens atomically with insertion in a single transaction 220/// - Essential for long-running deployments to prevent disk space issues 221/// 222/// # Examples 223/// 224/// ```no_run 225/// use quickdid::queue::{create_sqlite_queue_with_max_size, HandleResolutionWork}; 226/// use quickdid::sqlite_schema::create_sqlite_pool; 227/// 228/// # async fn example() -> anyhow::Result<()> { 229/// let pool = create_sqlite_pool("sqlite:./quickdid.db").await?; 230/// // Limit queue to 10,000 entries with automatic work shedding 231/// let queue = create_sqlite_queue_with_max_size::<HandleResolutionWork>(pool, 10000); 232/// # Ok(()) 233/// # } 234/// ``` 235pub fn create_sqlite_queue_with_max_size<T>( 236 pool: sqlx::SqlitePool, 237 max_size: u64, 238) -> Arc<dyn QueueAdapter<T>> 239where 240 T: Send + Sync + Serialize + for<'de> Deserialize<'de> + 'static, 241{ 242 Arc::new(SqliteQueueAdapter::with_max_size(pool, max_size)) 243} 244 245// ========= No-op Queue Factory ========= 246 247/// Create a no-operation queue adapter. 248/// 249/// This creates a queue that discards all work items, useful for testing 250/// or when queue processing is disabled. 251/// 252/// # Examples 253/// 254/// ``` 255/// use quickdid::queue::create_noop_queue; 256/// 257/// let queue = create_noop_queue::<String>(); 258/// ``` 259pub fn create_noop_queue<T>() -> Arc<dyn QueueAdapter<T>> 260where 261 T: Send + Sync + 'static, 262{ 263 Arc::new(NoopQueueAdapter::new()) 264} 265 266#[cfg(test)] 267mod tests { 268 use super::*; 269 use crate::queue::HandleResolutionWork; 270 271 #[tokio::test] 272 async fn test_create_mpsc_queue() { 273 let queue = create_mpsc_queue::<String>(10); 274 275 queue.push("test".to_string()).await.unwrap(); 276 let item = queue.pull().await; 277 assert_eq!(item, Some("test".to_string())); 278 } 279 280 #[tokio::test] 281 async fn test_create_mpsc_queue_from_channel() { 282 let (sender, receiver) = mpsc::channel(5); 283 let queue = create_mpsc_queue_from_channel(sender.clone(), receiver); 284 285 // Send via original sender 286 sender.send("external".to_string()).await.unwrap(); 287 288 // Receive via queue 289 let item = queue.pull().await; 290 assert_eq!(item, Some("external".to_string())); 291 } 292 293 #[tokio::test] 294 async fn test_create_noop_queue() { 295 let queue = create_noop_queue::<String>(); 296 297 // Should accept pushes 298 queue.push("ignored".to_string()).await.unwrap(); 299 300 // Should report as healthy 301 assert!(queue.is_healthy().await); 302 303 // Should report depth as 0 304 assert_eq!(queue.depth().await, Some(0)); 305 } 306 307 #[tokio::test] 308 async fn test_create_sqlite_queue() { 309 // Create in-memory SQLite database for testing 310 let pool = sqlx::SqlitePool::connect("sqlite::memory:") 311 .await 312 .expect("Failed to connect to in-memory SQLite"); 313 314 // Create the queue schema 315 crate::sqlite_schema::create_schema(&pool) 316 .await 317 .expect("Failed to create schema"); 318 319 let queue = create_sqlite_queue::<HandleResolutionWork>(pool); 320 321 let work = HandleResolutionWork::new("test.example.com".to_string()); 322 queue.push(work.clone()).await.unwrap(); 323 324 let pulled = queue.pull().await; 325 assert_eq!(pulled, Some(work)); 326 } 327 328 #[tokio::test] 329 async fn test_create_sqlite_queue_with_max_size() { 330 // Create in-memory SQLite database for testing 331 let pool = sqlx::SqlitePool::connect("sqlite::memory:") 332 .await 333 .expect("Failed to connect to in-memory SQLite"); 334 335 // Create the queue schema 336 crate::sqlite_schema::create_schema(&pool) 337 .await 338 .expect("Failed to create schema"); 339 340 // Create queue with small max size 341 let queue = create_sqlite_queue_with_max_size::<HandleResolutionWork>(pool, 5); 342 343 // Push items 344 for i in 0..10 { 345 let work = HandleResolutionWork::new(format!("test-{}.example.com", i)); 346 queue.push(work).await.unwrap(); 347 } 348 349 // Should have limited items due to work shedding 350 let depth = queue.depth().await.unwrap(); 351 assert!( 352 depth <= 5, 353 "Queue should have at most 5 items after work shedding" 354 ); 355 } 356 357 #[tokio::test] 358 async fn test_create_redis_queue() { 359 let pool = match crate::test_helpers::get_test_redis_pool() { 360 Some(p) => p, 361 None => { 362 eprintln!("Skipping Redis test - no Redis connection available"); 363 return; 364 } 365 }; 366 367 let test_prefix = format!( 368 "test:factory:{}:", 369 std::time::SystemTime::now() 370 .duration_since(std::time::UNIX_EPOCH) 371 .unwrap() 372 .as_nanos() 373 ); 374 375 let queue = create_redis_queue::<String>(pool, "test-worker".to_string(), test_prefix, 1); 376 377 queue.push("test-item".to_string()).await.unwrap(); 378 let pulled = queue.pull().await; 379 assert_eq!(pulled, Some("test-item".to_string())); 380 } 381}