QuickDID is a high-performance AT Protocol identity resolution service written in Rust. It provides handle-to-DID resolution with Redis-backed caching and queue processing.
at main 22 kB view raw
1//! Redis-backed queue adapter implementation. 2//! 3//! This module provides a distributed queue implementation using Redis lists 4//! with a reliable queue pattern for at-least-once delivery semantics. 5 6use async_trait::async_trait; 7use deadpool_redis::{Pool as RedisPool, redis::AsyncCommands}; 8use serde::{Deserialize, Serialize}; 9use tracing::{debug, error, warn}; 10 11use super::adapter::QueueAdapter; 12use super::error::{QueueError, Result}; 13use super::work::DedupKey; 14 15/// Redis-backed queue adapter implementation. 16/// 17/// This adapter uses Redis lists with a reliable queue pattern: 18/// - LPUSH to push items to the primary queue 19/// - BRPOPLPUSH to atomically move items from primary to worker queue 20/// - LREM to acknowledge processed items from worker queue 21/// 22/// This ensures at-least-once delivery semantics and allows for recovery 23/// of in-flight items if a worker crashes. 24/// 25/// # Features 26/// 27/// - Distributed operation across multiple instances 28/// - Persistent storage with Redis 29/// - At-least-once delivery guarantees 30/// - Automatic recovery of failed items 31/// - Configurable timeouts 32/// 33/// # Architecture 34/// 35/// ```text 36/// Producer -> [Primary Queue] -> BRPOPLPUSH -> [Worker Queue] -> Consumer 37/// | 38/// LREM (on ack) 39/// ``` 40/// 41/// # Examples 42/// 43/// ```no_run 44/// use quickdid::queue::{RedisQueueAdapter, QueueAdapter, HandleResolutionWork}; 45/// use deadpool_redis::Config; 46/// 47/// # async fn example() -> anyhow::Result<()> { 48/// // Create Redis pool 49/// let cfg = Config::from_url("redis://localhost:6379"); 50/// let pool = cfg.create_pool(Some(deadpool_redis::Runtime::Tokio1))?; 51/// 52/// // Create queue adapter 53/// let queue = RedisQueueAdapter::<HandleResolutionWork>::new( 54/// pool, 55/// "worker-1".to_string(), 56/// "queue:myapp:".to_string(), 57/// 5, // 5 second timeout 58/// ); 59/// 60/// // Use the queue 61/// let work = HandleResolutionWork::new("alice.bsky.social".to_string()); 62/// queue.push(work.clone()).await?; 63/// if let Some(item) = queue.pull().await { 64/// // Process item 65/// queue.ack(&item).await?; 66/// } 67/// # Ok(()) 68/// # } 69/// ``` 70pub struct RedisQueueAdapter<T> 71where 72 T: Send + Sync + Serialize + for<'de> Deserialize<'de> + 'static, 73{ 74 /// Redis connection pool 75 pool: RedisPool, 76 /// Unique worker ID for this adapter instance 77 worker_id: String, 78 /// Key prefix for all queues (default: "queue:handleresolver:") 79 key_prefix: String, 80 /// Timeout for blocking RPOPLPUSH operations (in seconds) 81 timeout_seconds: u64, 82 /// Enable deduplication to prevent duplicate items in queue 83 dedup_enabled: bool, 84 /// TTL for deduplication keys in seconds 85 dedup_ttl: u64, 86 /// Type marker for generic parameter 87 _phantom: std::marker::PhantomData<T>, 88} 89 90impl<T> RedisQueueAdapter<T> 91where 92 T: Send + Sync + Serialize + for<'de> Deserialize<'de> + 'static, 93{ 94 /// Create a new Redis queue adapter. 95 /// 96 /// # Arguments 97 /// 98 /// * `pool` - Redis connection pool 99 /// * `worker_id` - Unique identifier for this worker instance 100 /// * `key_prefix` - Redis key prefix for queue operations 101 /// * `timeout_seconds` - Timeout for blocking pull operations 102 /// 103 /// # Examples 104 /// 105 /// ```no_run 106 /// use quickdid::queue::RedisQueueAdapter; 107 /// use deadpool_redis::Config; 108 /// 109 /// # async fn example() -> anyhow::Result<()> { 110 /// let cfg = Config::from_url("redis://localhost:6379"); 111 /// let pool = cfg.create_pool(Some(deadpool_redis::Runtime::Tokio1))?; 112 /// 113 /// let queue = RedisQueueAdapter::<String>::new( 114 /// pool, 115 /// "worker-1".to_string(), 116 /// "queue:myapp:".to_string(), 117 /// 5, 118 /// ); 119 /// # Ok(()) 120 /// # } 121 /// ``` 122 pub fn new( 123 pool: RedisPool, 124 worker_id: String, 125 key_prefix: String, 126 timeout_seconds: u64, 127 ) -> Self { 128 Self::with_dedup( 129 pool, 130 worker_id, 131 key_prefix, 132 timeout_seconds, 133 false, 134 60, // Default TTL of 60 seconds 135 ) 136 } 137 138 /// Create a new Redis queue adapter with deduplication settings. 139 /// 140 /// # Arguments 141 /// 142 /// * `pool` - Redis connection pool 143 /// * `worker_id` - Unique identifier for this worker instance 144 /// * `key_prefix` - Redis key prefix for queue operations 145 /// * `timeout_seconds` - Timeout for blocking pull operations 146 /// * `dedup_enabled` - Whether to enable deduplication 147 /// * `dedup_ttl` - TTL for deduplication keys in seconds 148 pub fn with_dedup( 149 pool: RedisPool, 150 worker_id: String, 151 key_prefix: String, 152 timeout_seconds: u64, 153 dedup_enabled: bool, 154 dedup_ttl: u64, 155 ) -> Self { 156 Self { 157 pool, 158 worker_id, 159 key_prefix, 160 timeout_seconds, 161 dedup_enabled, 162 dedup_ttl, 163 _phantom: std::marker::PhantomData, 164 } 165 } 166 167 /// Get the primary queue key. 168 fn primary_queue_key(&self) -> String { 169 format!("{}primary", self.key_prefix) 170 } 171 172 /// Get the worker-specific temporary queue key. 173 fn worker_queue_key(&self) -> String { 174 format!("{}{}", self.key_prefix, self.worker_id) 175 } 176 177 /// Get the deduplication key for an item. 178 /// This key is used to track if an item is already queued. 179 fn dedup_key(&self, item_id: &str) -> String { 180 format!("{}dedup:{}", self.key_prefix, item_id) 181 } 182 183 /// Check and mark an item for deduplication. 184 /// Returns true if the item was successfully marked (not duplicate), 185 /// false if it was already in the deduplication set (duplicate). 186 async fn check_and_mark_dedup( 187 &self, 188 conn: &mut deadpool_redis::Connection, 189 item_id: &str, 190 ) -> Result<bool> { 191 if !self.dedup_enabled { 192 return Ok(true); // Always allow if dedup is disabled 193 } 194 195 let dedup_key = self.dedup_key(item_id); 196 197 // Use SET NX EX to atomically set if not exists with expiry 198 // Returns OK if the key was set, Nil if it already existed 199 let result: Option<String> = deadpool_redis::redis::cmd("SET") 200 .arg(&dedup_key) 201 .arg("1") 202 .arg("NX") // Only set if not exists 203 .arg("EX") // Set expiry 204 .arg(self.dedup_ttl) 205 .query_async(conn) 206 .await 207 .map_err(|e| QueueError::RedisOperationFailed { 208 operation: "SET NX EX".to_string(), 209 details: e.to_string(), 210 })?; 211 212 // If result is Some("OK"), the key was set (not duplicate) 213 // If result is None, the key already existed (duplicate) 214 Ok(result.is_some()) 215 } 216} 217 218#[async_trait] 219impl<T> QueueAdapter<T> for RedisQueueAdapter<T> 220where 221 T: Send + Sync + Serialize + for<'de> Deserialize<'de> + DedupKey + 'static, 222{ 223 async fn pull(&self) -> Option<T> { 224 match self.pool.get().await { 225 Ok(mut conn) => { 226 let primary_key = self.primary_queue_key(); 227 let worker_key = self.worker_queue_key(); 228 229 // Use blocking RPOPLPUSH to atomically move item from primary to worker queue 230 let data: Option<Vec<u8>> = match conn 231 .brpoplpush(&primary_key, &worker_key, self.timeout_seconds as f64) 232 .await 233 { 234 Ok(data) => data, 235 Err(e) => { 236 error!("Failed to pull from queue: {}", e); 237 return None; 238 } 239 }; 240 241 if let Some(data) = data { 242 // Deserialize the item 243 match serde_json::from_slice(&data) { 244 Ok(item) => { 245 debug!( 246 worker_id = %self.worker_id, 247 "Pulled item from queue" 248 ); 249 Some(item) 250 } 251 Err(e) => { 252 error!("Failed to deserialize item: {}", e); 253 // Remove the corrupted item from worker queue 254 let _: std::result::Result<(), _> = 255 conn.lrem(&worker_key, 1, &data).await; 256 None 257 } 258 } 259 } else { 260 None 261 } 262 } 263 Err(e) => { 264 error!("Failed to get Redis connection: {}", e); 265 None 266 } 267 } 268 } 269 270 async fn push(&self, work: T) -> Result<()> { 271 let mut conn = self 272 .pool 273 .get() 274 .await 275 .map_err(|e| QueueError::RedisConnectionFailed(e.to_string()))?; 276 277 // Check for deduplication if enabled 278 if self.dedup_enabled { 279 let dedup_id = work.dedup_key(); 280 let is_new = self.check_and_mark_dedup(&mut conn, &dedup_id).await?; 281 282 if !is_new { 283 debug!( 284 dedup_key = %dedup_id, 285 "Item already queued, skipping duplicate" 286 ); 287 return Ok(()); // Successfully deduplicated 288 } 289 } 290 291 let data = serde_json::to_vec(&work) 292 .map_err(|e| QueueError::SerializationFailed(e.to_string()))?; 293 294 let primary_key = self.primary_queue_key(); 295 296 conn.lpush::<_, _, ()>(&primary_key, data) 297 .await 298 .map_err(|e| QueueError::RedisOperationFailed { 299 operation: "LPUSH".to_string(), 300 details: e.to_string(), 301 })?; 302 303 debug!("Pushed item to queue"); 304 Ok(()) 305 } 306 307 async fn ack(&self, item: &T) -> Result<()> { 308 let mut conn = self 309 .pool 310 .get() 311 .await 312 .map_err(|e| QueueError::RedisConnectionFailed(e.to_string()))?; 313 314 let data = 315 serde_json::to_vec(item).map_err(|e| QueueError::SerializationFailed(e.to_string()))?; 316 317 let worker_key = self.worker_queue_key(); 318 319 // Remove exactly one occurrence of this item from the worker queue 320 let removed: i32 = conn.lrem(&worker_key, 1, &data).await.map_err(|e| { 321 QueueError::RedisOperationFailed { 322 operation: "LREM".to_string(), 323 details: e.to_string(), 324 } 325 })?; 326 327 if removed == 0 { 328 warn!( 329 worker_id = %self.worker_id, 330 "Item not found in worker queue during acknowledgment" 331 ); 332 } else { 333 debug!( 334 worker_id = %self.worker_id, 335 "Acknowledged item" 336 ); 337 } 338 339 Ok(()) 340 } 341 342 async fn depth(&self) -> Option<usize> { 343 match self.pool.get().await { 344 Ok(mut conn) => { 345 let primary_key = self.primary_queue_key(); 346 match conn.llen::<_, usize>(&primary_key).await { 347 Ok(len) => Some(len), 348 Err(e) => { 349 error!("Failed to get queue depth: {}", e); 350 None 351 } 352 } 353 } 354 Err(e) => { 355 error!("Failed to get Redis connection: {}", e); 356 None 357 } 358 } 359 } 360 361 async fn is_healthy(&self) -> bool { 362 match self.pool.get().await { 363 Ok(mut conn) => { 364 // Ping Redis to check health 365 match deadpool_redis::redis::cmd("PING") 366 .query_async::<String>(&mut conn) 367 .await 368 { 369 Ok(response) => response == "PONG", 370 Err(_) => false, 371 } 372 } 373 Err(_) => false, 374 } 375 } 376} 377 378#[cfg(test)] 379mod tests { 380 use super::*; 381 382 #[tokio::test] 383 async fn test_redis_queue_push_pull() { 384 let pool = match crate::test_helpers::get_test_redis_pool() { 385 Some(p) => p, 386 None => { 387 eprintln!("Skipping Redis test - no Redis connection available"); 388 return; 389 } 390 }; 391 392 // Create adapter with unique prefix for testing 393 let test_prefix = format!( 394 "test:queue:{}:", 395 std::time::SystemTime::now() 396 .duration_since(std::time::UNIX_EPOCH) 397 .unwrap() 398 .as_nanos() 399 ); 400 let adapter = RedisQueueAdapter::<String>::new( 401 pool.clone(), 402 "test-worker".to_string(), 403 test_prefix.clone(), 404 1, // 1 second timeout for tests 405 ); 406 407 // Test push 408 adapter.push("test-item".to_string()).await.unwrap(); 409 410 // Test pull 411 let pulled = adapter.pull().await; 412 assert_eq!(pulled, Some("test-item".to_string())); 413 414 // Test ack 415 adapter 416 .ack(&"test-item".to_string()) 417 .await 418 .expect("Ack should succeed"); 419 } 420 421 #[tokio::test] 422 async fn test_redis_queue_reliable_delivery() { 423 let pool = match crate::test_helpers::get_test_redis_pool() { 424 Some(p) => p, 425 None => { 426 eprintln!("Skipping Redis test - no Redis connection available"); 427 return; 428 } 429 }; 430 431 let test_prefix = format!( 432 "test:queue:{}:", 433 std::time::SystemTime::now() 434 .duration_since(std::time::UNIX_EPOCH) 435 .unwrap() 436 .as_nanos() 437 ); 438 let worker_id = "test-worker-reliable"; 439 440 // Create adapter 441 let adapter1 = RedisQueueAdapter::<String>::new( 442 pool.clone(), 443 worker_id.to_string(), 444 test_prefix.clone(), 445 1, 446 ); 447 448 // Push multiple items 449 adapter1.push("item1".to_string()).await.unwrap(); 450 adapter1.push("item2".to_string()).await.unwrap(); 451 adapter1.push("item3".to_string()).await.unwrap(); 452 453 // Pull but don't ack (simulating worker crash) 454 let item1 = adapter1.pull().await; 455 assert_eq!(item1, Some("item1".to_string())); 456 457 // Item should be in worker queue 458 // In production, a recovery process would handle unacked items 459 // For this test, we verify the item is in the worker queue 460 let item2 = adapter1.pull().await; 461 assert_eq!(item2, Some("item2".to_string())); 462 463 // Ack the second item 464 adapter1.ack(&"item2".to_string()).await.unwrap(); 465 } 466 467 #[tokio::test] 468 async fn test_redis_queue_depth() { 469 let pool = match crate::test_helpers::get_test_redis_pool() { 470 Some(p) => p, 471 None => { 472 eprintln!("Skipping Redis test - no Redis connection available"); 473 return; 474 } 475 }; 476 477 let test_prefix = format!( 478 "test:queue:{}:", 479 std::time::SystemTime::now() 480 .duration_since(std::time::UNIX_EPOCH) 481 .unwrap() 482 .as_nanos() 483 ); 484 let adapter = 485 RedisQueueAdapter::<String>::new(pool, "test-worker-depth".to_string(), test_prefix, 1); 486 487 // Initially empty 488 assert_eq!(adapter.depth().await, Some(0)); 489 490 // Push items and check depth 491 adapter.push("item1".to_string()).await.unwrap(); 492 assert_eq!(adapter.depth().await, Some(1)); 493 494 adapter.push("item2".to_string()).await.unwrap(); 495 assert_eq!(adapter.depth().await, Some(2)); 496 497 // Pull and check depth (note: depth checks primary queue) 498 let _ = adapter.pull().await; 499 assert_eq!(adapter.depth().await, Some(1)); 500 } 501 502 #[tokio::test] 503 async fn test_redis_queue_health() { 504 let pool = match crate::test_helpers::get_test_redis_pool() { 505 Some(p) => p, 506 None => { 507 eprintln!("Skipping Redis test - no Redis connection available"); 508 return; 509 } 510 }; 511 512 let adapter = RedisQueueAdapter::<String>::new( 513 pool, 514 "test-worker-health".to_string(), 515 "test:queue:health:".to_string(), 516 1, 517 ); 518 519 // Should be healthy if Redis is running 520 assert!(adapter.is_healthy().await); 521 } 522 523 #[tokio::test] 524 async fn test_redis_queue_deduplication() { 525 use crate::queue::HandleResolutionWork; 526 527 let pool = match crate::test_helpers::get_test_redis_pool() { 528 Some(p) => p, 529 None => { 530 eprintln!("Skipping Redis test - no Redis connection available"); 531 return; 532 } 533 }; 534 535 let test_prefix = format!( 536 "test:queue:dedup:{}:", 537 std::time::SystemTime::now() 538 .duration_since(std::time::UNIX_EPOCH) 539 .unwrap() 540 .as_nanos() 541 ); 542 543 // Create adapter with deduplication enabled 544 let adapter = RedisQueueAdapter::<HandleResolutionWork>::with_dedup( 545 pool.clone(), 546 "test-worker-dedup".to_string(), 547 test_prefix.clone(), 548 1, 549 true, // Enable deduplication 550 2, // 2 second TTL for quick testing 551 ); 552 553 let work = HandleResolutionWork::new("alice.example.com".to_string()); 554 555 // First push should succeed 556 adapter 557 .push(work.clone()) 558 .await 559 .expect("First push should succeed"); 560 561 // Second push of same item should be deduplicated (but still return Ok) 562 adapter 563 .push(work.clone()) 564 .await 565 .expect("Second push should succeed (deduplicated)"); 566 567 // Queue should only have one item 568 let depth = adapter.depth().await; 569 assert_eq!( 570 depth, 571 Some(1), 572 "Queue should only have one item after deduplication" 573 ); 574 575 // Pull the item 576 let pulled = adapter.pull().await; 577 assert_eq!(pulled, Some(work.clone())); 578 579 // Queue should now be empty 580 let depth = adapter.depth().await; 581 assert_eq!(depth, Some(0), "Queue should be empty after pulling"); 582 583 // Wait for dedup TTL to expire 584 tokio::time::sleep(tokio::time::Duration::from_secs(3)).await; 585 586 // Should be able to push again after TTL expires 587 adapter 588 .push(work.clone()) 589 .await 590 .expect("Push after TTL expiry should succeed"); 591 592 let depth = adapter.depth().await; 593 assert_eq!( 594 depth, 595 Some(1), 596 "Queue should have one item after TTL expiry" 597 ); 598 } 599 600 #[tokio::test] 601 async fn test_redis_queue_deduplication_disabled() { 602 use crate::queue::HandleResolutionWork; 603 604 let pool = match crate::test_helpers::get_test_redis_pool() { 605 Some(p) => p, 606 None => { 607 eprintln!("Skipping Redis test - no Redis connection available"); 608 return; 609 } 610 }; 611 612 let test_prefix = format!( 613 "test:queue:nodedup:{}:", 614 std::time::SystemTime::now() 615 .duration_since(std::time::UNIX_EPOCH) 616 .unwrap() 617 .as_nanos() 618 ); 619 620 // Create adapter with deduplication disabled 621 let adapter = RedisQueueAdapter::<HandleResolutionWork>::with_dedup( 622 pool.clone(), 623 "test-worker-nodedup".to_string(), 624 test_prefix.clone(), 625 1, 626 false, // Disable deduplication 627 60, 628 ); 629 630 let work = HandleResolutionWork::new("bob.example.com".to_string()); 631 632 // Push same item twice 633 adapter 634 .push(work.clone()) 635 .await 636 .expect("First push should succeed"); 637 adapter 638 .push(work.clone()) 639 .await 640 .expect("Second push should succeed"); 641 642 // Queue should have two items (no deduplication) 643 let depth = adapter.depth().await; 644 assert_eq!( 645 depth, 646 Some(2), 647 "Queue should have two items when deduplication is disabled" 648 ); 649 650 // Pull both items 651 let pulled1 = adapter.pull().await; 652 assert_eq!(pulled1, Some(work.clone())); 653 654 let pulled2 = adapter.pull().await; 655 assert_eq!(pulled2, Some(work.clone())); 656 657 // Queue should now be empty 658 let depth = adapter.depth().await; 659 assert_eq!( 660 depth, 661 Some(0), 662 "Queue should be empty after pulling all items" 663 ); 664 } 665 666 #[tokio::test] 667 async fn test_redis_queue_serialization() { 668 use crate::queue::HandleResolutionWork; 669 670 let pool = match crate::test_helpers::get_test_redis_pool() { 671 Some(p) => p, 672 None => { 673 eprintln!("Skipping Redis test - no Redis connection available"); 674 return; 675 } 676 }; 677 678 let test_prefix = format!( 679 "test:queue:{}:", 680 std::time::SystemTime::now() 681 .duration_since(std::time::UNIX_EPOCH) 682 .unwrap() 683 .as_nanos() 684 ); 685 let adapter = RedisQueueAdapter::<HandleResolutionWork>::new( 686 pool, 687 "test-worker-ser".to_string(), 688 test_prefix, 689 1, 690 ); 691 692 let work = HandleResolutionWork::new("alice.example.com".to_string()); 693 694 // Push and pull 695 adapter.push(work.clone()).await.unwrap(); 696 let pulled = adapter.pull().await; 697 assert_eq!(pulled, Some(work.clone())); 698 699 // Ack 700 adapter.ack(&work).await.unwrap(); 701 } 702}