QuickDID is a high-performance AT Protocol identity resolution service written in Rust. It provides handle-to-DID resolution with Redis-backed caching and queue processing.
at main 18 kB view raw
1//! SQLite-backed queue adapter implementation. 2//! 3//! This module provides a persistent queue implementation using SQLite 4//! with optional work shedding to prevent unbounded growth. 5 6use async_trait::async_trait; 7use serde::{Deserialize, Serialize}; 8use sqlx::{self, Row}; 9use tracing::{debug, error, info, warn}; 10 11use super::adapter::QueueAdapter; 12use super::error::{QueueError, Result}; 13 14/// SQLite-backed queue adapter implementation. 15/// 16/// This adapter uses SQLite database for persistent queuing of work items. 17/// It's suitable for single-instance deployments that need persistence 18/// across service restarts while remaining lightweight. 19/// 20/// # Features 21/// 22/// - Persistent queuing across service restarts 23/// - Simple FIFO ordering based on insertion time 24/// - Single consumer design (no complex locking needed) 25/// - Simple pull-and-delete semantics 26/// - Optional work shedding to prevent unbounded queue growth 27/// 28/// # Work Shedding 29/// 30/// When `max_size` is configured (> 0), the adapter implements work shedding: 31/// - New work items are always accepted 32/// - When the queue exceeds `max_size`, oldest entries are automatically deleted 33/// - This maintains the most recent work items while preventing unbounded growth 34/// - Essential for long-running deployments to avoid disk space issues 35/// 36/// # Database Schema 37/// 38/// The adapter expects the following table structure: 39/// ```sql 40/// CREATE TABLE handle_resolution_queue ( 41/// id INTEGER PRIMARY KEY AUTOINCREMENT, 42/// work TEXT NOT NULL, 43/// queued_at INTEGER NOT NULL 44/// ); 45/// CREATE INDEX idx_queue_timestamp ON handle_resolution_queue(queued_at); 46/// ``` 47/// 48/// # Examples 49/// 50/// ```no_run 51/// use quickdid::queue::SqliteQueueAdapter; 52/// use quickdid::queue::QueueAdapter; 53/// use quickdid::sqlite_schema::create_sqlite_pool; 54/// 55/// # async fn example() -> anyhow::Result<()> { 56/// // Create SQLite pool 57/// let pool = create_sqlite_pool("sqlite:./quickdid.db").await?; 58/// 59/// // Create queue with unlimited size 60/// let queue = SqliteQueueAdapter::<String>::new(pool.clone()); 61/// 62/// // Or create queue with work shedding (max 10,000 items) 63/// let bounded_queue = SqliteQueueAdapter::<String>::with_max_size(pool, 10000); 64/// 65/// // Use the queue 66/// queue.push("work-item".to_string()).await?; 67/// if let Some(item) = queue.pull().await { 68/// // Process item (automatically deleted from queue) 69/// println!("Processing: {}", item); 70/// } 71/// # Ok(()) 72/// # } 73/// ``` 74pub struct SqliteQueueAdapter<T> 75where 76 T: Send + Sync + Serialize + for<'de> Deserialize<'de> + 'static, 77{ 78 /// SQLite connection pool 79 pool: sqlx::SqlitePool, 80 /// Maximum queue size (0 = unlimited) 81 /// When exceeded, oldest entries are deleted to maintain this limit 82 max_size: u64, 83 /// Type marker for generic parameter 84 _phantom: std::marker::PhantomData<T>, 85} 86 87impl<T> SqliteQueueAdapter<T> 88where 89 T: Send + Sync + Serialize + for<'de> Deserialize<'de> + 'static, 90{ 91 /// Create a new SQLite queue adapter with unlimited queue size. 92 /// 93 /// # Arguments 94 /// 95 /// * `pool` - SQLite connection pool 96 /// 97 /// # Examples 98 /// 99 /// ```no_run 100 /// use quickdid::queue::SqliteQueueAdapter; 101 /// use quickdid::sqlite_schema::create_sqlite_pool; 102 /// 103 /// # async fn example() -> anyhow::Result<()> { 104 /// let pool = create_sqlite_pool("sqlite:./quickdid.db").await?; 105 /// let queue = SqliteQueueAdapter::<String>::new(pool); 106 /// # Ok(()) 107 /// # } 108 /// ``` 109 pub fn new(pool: sqlx::SqlitePool) -> Self { 110 Self::with_max_size(pool, 0) 111 } 112 113 /// Create a new SQLite queue adapter with specified maximum queue size. 114 /// 115 /// # Arguments 116 /// 117 /// * `pool` - SQLite connection pool 118 /// * `max_size` - Maximum number of entries in queue (0 = unlimited) 119 /// 120 /// # Work Shedding Behavior 121 /// 122 /// When `max_size` > 0: 123 /// - New work items are always accepted 124 /// - If queue size exceeds `max_size` after insertion, oldest entries are deleted 125 /// - This preserves the most recent work while preventing unbounded growth 126 /// 127 /// # Examples 128 /// 129 /// ```no_run 130 /// use quickdid::queue::SqliteQueueAdapter; 131 /// use quickdid::sqlite_schema::create_sqlite_pool; 132 /// 133 /// # async fn example() -> anyhow::Result<()> { 134 /// let pool = create_sqlite_pool("sqlite:./quickdid.db").await?; 135 /// // Limit queue to 10,000 entries with automatic work shedding 136 /// let queue = SqliteQueueAdapter::<String>::with_max_size(pool, 10000); 137 /// # Ok(()) 138 /// # } 139 /// ``` 140 pub fn with_max_size(pool: sqlx::SqlitePool, max_size: u64) -> Self { 141 Self { 142 pool, 143 max_size, 144 _phantom: std::marker::PhantomData, 145 } 146 } 147} 148 149#[async_trait] 150impl<T> QueueAdapter<T> for SqliteQueueAdapter<T> 151where 152 T: Send + Sync + Serialize + for<'de> Deserialize<'de> + 'static, 153{ 154 async fn pull(&self) -> Option<T> { 155 // Get the oldest queued item and delete it in a transaction 156 let mut transaction = match self.pool.begin().await { 157 Ok(tx) => tx, 158 Err(e) => { 159 error!("Failed to start SQLite transaction: {}", e); 160 return None; 161 } 162 }; 163 164 // Select the oldest queued item 165 let record = match sqlx::query( 166 "SELECT id, work FROM handle_resolution_queue 167 ORDER BY queued_at ASC 168 LIMIT 1", 169 ) 170 .fetch_optional(&mut *transaction) 171 .await 172 { 173 Ok(Some(row)) => row, 174 Ok(None) => { 175 // No queued items available 176 debug!("No queued items available in SQLite queue"); 177 return None; 178 } 179 Err(e) => { 180 error!("Failed to query SQLite queue: {}", e); 181 return None; 182 } 183 }; 184 185 let item_id: i64 = record.get("id"); 186 let work_json: String = record.get("work"); 187 188 // Delete the item from the queue 189 if let Err(e) = sqlx::query("DELETE FROM handle_resolution_queue WHERE id = ?1") 190 .bind(item_id) 191 .execute(&mut *transaction) 192 .await 193 { 194 error!("Failed to delete item from queue: {}", e); 195 return None; 196 } 197 198 // Commit the transaction 199 if let Err(e) = transaction.commit().await { 200 error!("Failed to commit SQLite transaction: {}", e); 201 return None; 202 } 203 204 // Deserialize the work item from JSON 205 match serde_json::from_str(&work_json) { 206 Ok(work) => { 207 debug!("Pulled work item from SQLite queue"); 208 Some(work) 209 } 210 Err(e) => { 211 error!("Failed to deserialize work item: {}", e); 212 None 213 } 214 } 215 } 216 217 async fn push(&self, work: T) -> Result<()> { 218 // Serialize the entire work item as JSON 219 let work_json = serde_json::to_string(&work) 220 .map_err(|e| QueueError::SerializationFailed(e.to_string()))?; 221 222 let current_timestamp = std::time::SystemTime::now() 223 .duration_since(std::time::UNIX_EPOCH) 224 .unwrap_or_default() 225 .as_secs() as i64; 226 227 // Optimized approach: Insert first, then check if cleanup needed 228 // This avoids counting on every insert 229 sqlx::query("INSERT INTO handle_resolution_queue (work, queued_at) VALUES (?1, ?2)") 230 .bind(&work_json) 231 .bind(current_timestamp) 232 .execute(&self.pool) 233 .await 234 .map_err(|e| QueueError::PushFailed(format!("Failed to insert work item: {}", e)))?; 235 236 // Implement optimized work shedding if max_size is configured 237 if self.max_size > 0 { 238 // Optimized approach: Only check and clean periodically or when likely over limit 239 // Use a limited count to avoid full table scan 240 let check_limit = self.max_size as i64 + (self.max_size as i64 / 10).max(1); // Check 10% over limit 241 let approx_count: Option<i64> = sqlx::query_scalar( 242 "SELECT COUNT(*) FROM ( 243 SELECT 1 FROM handle_resolution_queue LIMIT ?1 244 ) AS limited_count", 245 ) 246 .bind(check_limit) 247 .fetch_one(&self.pool) 248 .await 249 .map_err(|e| QueueError::PushFailed(format!("Failed to check queue size: {}", e)))?; 250 251 // Only perform cleanup if we're definitely over the limit 252 if let Some(count) = approx_count 253 && count >= check_limit 254 { 255 // Perform batch cleanup - delete more than just the excess to reduce frequency 256 // Delete 20% more than needed to avoid frequent shedding 257 let target_size = (self.max_size as f64 * 0.8) as i64; // Keep 80% of max_size 258 let to_delete = count - target_size; 259 260 if to_delete > 0 { 261 // Optimized deletion: First get the cutoff id and timestamp 262 // This avoids the expensive subquery in the DELETE statement 263 let cutoff: Option<(i64, i64)> = sqlx::query_as( 264 "SELECT id, queued_at FROM handle_resolution_queue 265 ORDER BY queued_at ASC, id ASC 266 LIMIT 1 OFFSET ?1", 267 ) 268 .bind(to_delete - 1) 269 .fetch_optional(&self.pool) 270 .await 271 .map_err(|e| QueueError::PushFailed(format!("Failed to find cutoff: {}", e)))?; 272 273 if let Some((cutoff_id, cutoff_timestamp)) = cutoff { 274 // Delete entries older than cutoff, or equal timestamp with lower id 275 // This handles the case where multiple entries have the same timestamp 276 let deleted_result = sqlx::query( 277 "DELETE FROM handle_resolution_queue 278 WHERE queued_at < ?1 279 OR (queued_at = ?1 AND id <= ?2)", 280 ) 281 .bind(cutoff_timestamp) 282 .bind(cutoff_id) 283 .execute(&self.pool) 284 .await 285 .map_err(|e| { 286 QueueError::PushFailed(format!( 287 "Failed to delete excess entries: {}", 288 e 289 )) 290 })?; 291 292 let deleted_count = deleted_result.rows_affected(); 293 if deleted_count > 0 { 294 info!( 295 "Work shedding: deleted {} oldest entries (target size: {}, max: {})", 296 deleted_count, target_size, self.max_size 297 ); 298 } 299 } 300 } 301 } 302 } 303 304 debug!( 305 "Pushed work item to SQLite queue (max_size: {})", 306 self.max_size 307 ); 308 Ok(()) 309 } 310 311 async fn ack(&self, _item: &T) -> Result<()> { 312 // With the simplified SQLite queue design, items are deleted when pulled, 313 // so acknowledgment is a no-op (item is already processed and removed) 314 debug!("Acknowledged work item in SQLite queue (no-op)"); 315 Ok(()) 316 } 317 318 async fn depth(&self) -> Option<usize> { 319 match sqlx::query_scalar::<_, i64>("SELECT COUNT(*) FROM handle_resolution_queue") 320 .fetch_one(&self.pool) 321 .await 322 { 323 Ok(count) => Some(count as usize), 324 Err(e) => { 325 warn!("Failed to get SQLite queue depth: {}", e); 326 None 327 } 328 } 329 } 330 331 async fn is_healthy(&self) -> bool { 332 // Test the connection by running a simple query 333 sqlx::query_scalar::<_, i64>("SELECT 1") 334 .fetch_one(&self.pool) 335 .await 336 .map(|_| true) 337 .unwrap_or(false) 338 } 339} 340 341#[cfg(test)] 342mod tests { 343 use super::*; 344 use crate::queue::HandleResolutionWork; 345 346 async fn create_test_pool() -> sqlx::SqlitePool { 347 let pool = sqlx::SqlitePool::connect("sqlite::memory:") 348 .await 349 .expect("Failed to connect to in-memory SQLite"); 350 351 // Create the queue schema 352 crate::sqlite_schema::create_schema(&pool) 353 .await 354 .expect("Failed to create schema"); 355 356 pool 357 } 358 359 #[tokio::test] 360 async fn test_sqlite_queue_push_pull() { 361 let pool = create_test_pool().await; 362 let adapter = SqliteQueueAdapter::<HandleResolutionWork>::new(pool.clone()); 363 364 let work = HandleResolutionWork::new("alice.example.com".to_string()); 365 366 // Test push 367 adapter.push(work.clone()).await.unwrap(); 368 369 // Verify depth 370 assert_eq!(adapter.depth().await, Some(1)); 371 372 // Test pull 373 let pulled = adapter.pull().await; 374 assert_eq!(pulled, Some(work)); 375 376 // Verify queue is empty after pull 377 assert_eq!(adapter.depth().await, Some(0)); 378 assert!(adapter.pull().await.is_none()); 379 } 380 381 #[tokio::test] 382 async fn test_sqlite_queue_fifo_ordering() { 383 let pool = create_test_pool().await; 384 let adapter = SqliteQueueAdapter::<HandleResolutionWork>::new(pool); 385 386 // Push multiple items 387 let handles = vec![ 388 "alice.example.com", 389 "bob.example.com", 390 "charlie.example.com", 391 ]; 392 for handle in &handles { 393 let work = HandleResolutionWork::new(handle.to_string()); 394 adapter.push(work).await.unwrap(); 395 } 396 397 // Pull items in FIFO order 398 for expected_handle in handles { 399 let pulled = adapter.pull().await; 400 assert!(pulled.is_some()); 401 assert_eq!(pulled.unwrap().handle, expected_handle); 402 } 403 404 // Queue should be empty 405 assert!(adapter.pull().await.is_none()); 406 } 407 408 #[tokio::test] 409 async fn test_sqlite_queue_ack_noop() { 410 let pool = create_test_pool().await; 411 let adapter = SqliteQueueAdapter::<HandleResolutionWork>::new(pool); 412 413 // Ack should always succeed as it's a no-op 414 let work = HandleResolutionWork::new("any.example.com".to_string()); 415 adapter.ack(&work).await.unwrap(); 416 } 417 418 #[tokio::test] 419 async fn test_sqlite_queue_health() { 420 let pool = create_test_pool().await; 421 let adapter = SqliteQueueAdapter::<HandleResolutionWork>::new(pool); 422 423 // Should be healthy if SQLite is working 424 assert!(adapter.is_healthy().await); 425 } 426 427 #[tokio::test] 428 async fn test_sqlite_queue_work_shedding() { 429 let pool = create_test_pool().await; 430 431 // Create adapter with small max_size for testing 432 let max_size = 10; 433 let adapter = 434 SqliteQueueAdapter::<HandleResolutionWork>::with_max_size(pool.clone(), max_size); 435 436 // Push items up to the limit (should not trigger shedding) 437 for i in 0..max_size { 438 let work = HandleResolutionWork::new(format!("test-{:03}", i)); 439 adapter.push(work).await.expect("Push should succeed"); 440 } 441 442 // Verify all items are present 443 assert_eq!(adapter.depth().await, Some(max_size as usize)); 444 445 // Push beyond 110% of max_size to trigger batch shedding 446 let trigger_point = max_size + (max_size / 10) + 1; 447 for i in max_size..trigger_point { 448 let work = HandleResolutionWork::new(format!("test-{:03}", i)); 449 adapter.push(work).await.expect("Push should succeed"); 450 } 451 452 // After triggering shedding, queue should be around 80% of max_size 453 let depth_after_shedding = adapter.depth().await.unwrap(); 454 let expected_size = (max_size as f64 * 0.8) as usize; 455 456 // Allow some variance due to batch deletion 457 assert!( 458 depth_after_shedding <= expected_size + 1, 459 "Queue size {} should be around 80% of max_size ({})", 460 depth_after_shedding, 461 expected_size 462 ); 463 } 464 465 #[tokio::test] 466 async fn test_sqlite_queue_work_shedding_disabled() { 467 let pool = create_test_pool().await; 468 469 // Create adapter with max_size = 0 (disabled work shedding) 470 let adapter = SqliteQueueAdapter::<HandleResolutionWork>::with_max_size(pool, 0); 471 472 // Push many items (should not trigger any shedding) 473 for i in 0..100 { 474 let work = HandleResolutionWork::new(format!("test-{:03}", i)); 475 adapter.push(work).await.expect("Push should succeed"); 476 } 477 478 // Verify all items are present (no shedding occurred) 479 assert_eq!(adapter.depth().await, Some(100)); 480 } 481 482 #[tokio::test] 483 async fn test_sqlite_queue_generic_work_type() { 484 #[derive(Debug, Clone, Serialize, Deserialize, PartialEq)] 485 struct CustomWork { 486 id: u64, 487 name: String, 488 data: Vec<i32>, 489 } 490 491 let pool = create_test_pool().await; 492 let adapter = SqliteQueueAdapter::<CustomWork>::new(pool); 493 494 let work = CustomWork { 495 id: 123, 496 name: "test_work".to_string(), 497 data: vec![1, 2, 3, 4, 5], 498 }; 499 500 // Test push and pull 501 adapter.push(work.clone()).await.unwrap(); 502 let pulled = adapter.pull().await; 503 assert_eq!(pulled, Some(work)); 504 } 505}