//! SQLite-backed queue adapter implementation.
//!
//! This module provides a persistent queue implementation using SQLite
//! with optional work shedding to prevent unbounded growth.

use async_trait::async_trait;
use serde::{Deserialize, Serialize};
use sqlx::{self, Row};
use tracing::{debug, error, info, warn};

use super::adapter::QueueAdapter;
use super::error::{QueueError, Result};

/// SQLite-backed queue adapter implementation.
///
/// This adapter uses a SQLite database for persistent queuing of work items.
/// It's suitable for single-instance deployments that need persistence
/// across service restarts while remaining lightweight.
///
/// # Features
///
/// - Persistent queuing across service restarts
/// - Simple FIFO ordering based on insertion time
/// - Single-consumer design (no complex locking needed)
/// - Simple pull-and-delete semantics
/// - Optional work shedding to prevent unbounded queue growth
///
/// # Work Shedding
///
/// When `max_size` is configured (> 0), the adapter implements work shedding:
/// - New work items are always accepted
/// - When the queue exceeds `max_size`, the oldest entries are automatically deleted
/// - This keeps the most recent work items while preventing unbounded growth
/// - Essential for long-running deployments to avoid disk space issues
///
/// # Database Schema
///
/// The adapter expects the following table structure:
/// ```sql
/// CREATE TABLE handle_resolution_queue (
///     id INTEGER PRIMARY KEY AUTOINCREMENT,
///     work TEXT NOT NULL,
///     queued_at INTEGER NOT NULL
/// );
/// CREATE INDEX idx_queue_timestamp ON handle_resolution_queue(queued_at);
/// ```
///
/// # Examples
///
/// ```no_run
/// use quickdid::queue::SqliteQueueAdapter;
/// use quickdid::queue::QueueAdapter;
/// use quickdid::sqlite_schema::create_sqlite_pool;
///
/// # async fn example() -> anyhow::Result<()> {
/// // Create SQLite pool
/// let pool = create_sqlite_pool("sqlite:./quickdid.db").await?;
///
/// // Create queue with unlimited size
/// let queue = SqliteQueueAdapter::<String>::new(pool.clone());
///
/// // Or create queue with work shedding (max 10,000 items)
/// let bounded_queue = SqliteQueueAdapter::<String>::with_max_size(pool, 10000);
///
/// // Use the queue
/// queue.push("work-item".to_string()).await?;
/// if let Some(item) = queue.pull().await {
///     // Process item (automatically deleted from queue)
///     println!("Processing: {}", item);
/// }
/// # Ok(())
/// # }
/// ```
pub struct SqliteQueueAdapter<T>
where
    T: Send + Sync + Serialize + for<'de> Deserialize<'de> + 'static,
{
    /// SQLite connection pool
    pool: sqlx::SqlitePool,
    /// Maximum queue size (0 = unlimited).
    /// When exceeded, the oldest entries are deleted to maintain this limit.
    max_size: u64,
    /// Type marker for the generic parameter
    _phantom: std::marker::PhantomData<T>,
}

impl<T> SqliteQueueAdapter<T>
where
    T: Send + Sync + Serialize + for<'de> Deserialize<'de> + 'static,
{
    /// Create a new SQLite queue adapter with unlimited queue size.
    ///
    /// # Arguments
    ///
    /// * `pool` - SQLite connection pool
    ///
    /// # Examples
    ///
    /// ```no_run
    /// use quickdid::queue::SqliteQueueAdapter;
    /// use quickdid::sqlite_schema::create_sqlite_pool;
    ///
    /// # async fn example() -> anyhow::Result<()> {
    /// let pool = create_sqlite_pool("sqlite:./quickdid.db").await?;
    /// let queue = SqliteQueueAdapter::<String>::new(pool);
    /// # Ok(())
    /// # }
    /// ```
    pub fn new(pool: sqlx::SqlitePool) -> Self {
        Self::with_max_size(pool, 0)
    }

    /// Create a new SQLite queue adapter with a specified maximum queue size.
    ///
    /// # Arguments
    ///
    /// * `pool` - SQLite connection pool
    /// * `max_size` - Maximum number of entries in the queue (0 = unlimited)
    ///
    /// # Work Shedding Behavior
    ///
    /// When `max_size` > 0:
    /// - New work items are always accepted
    /// - If the queue size exceeds `max_size` after insertion, the oldest entries are deleted
    /// - This preserves the most recent work while preventing unbounded growth
    ///
    /// # Examples
    ///
    /// ```no_run
    /// use quickdid::queue::SqliteQueueAdapter;
    /// use quickdid::sqlite_schema::create_sqlite_pool;
    ///
    /// # async fn example() -> anyhow::Result<()> {
    /// let pool = create_sqlite_pool("sqlite:./quickdid.db").await?;
    /// // Limit queue to 10,000 entries with automatic work shedding
    /// let queue = SqliteQueueAdapter::<String>::with_max_size(pool, 10000);
    /// # Ok(())
    /// # }
    /// ```
    pub fn with_max_size(pool: sqlx::SqlitePool, max_size: u64) -> Self {
        Self {
            pool,
            max_size,
            _phantom: std::marker::PhantomData,
        }
    }
}

#[async_trait]
impl<T> QueueAdapter<T> for SqliteQueueAdapter<T>
where
    T: Send + Sync + Serialize + for<'de> Deserialize<'de> + 'static,
{
    async fn pull(&self) -> Option<T> {
        // Fetch the oldest queued item and delete it within a single transaction
        let mut transaction = match self.pool.begin().await {
            Ok(tx) => tx,
            Err(e) => {
                error!("Failed to start SQLite transaction: {}", e);
                return None;
            }
        };

        // Select the oldest queued item
        let record = match sqlx::query(
            "SELECT id, work FROM handle_resolution_queue ORDER BY queued_at ASC LIMIT 1",
        )
        .fetch_optional(&mut *transaction)
        .await
        {
            Ok(Some(row)) => row,
            Ok(None) => {
                // No queued items available
                debug!("No queued items available in SQLite queue");
                return None;
            }
            Err(e) => {
                error!("Failed to query SQLite queue: {}", e);
                return None;
            }
        };

        let item_id: i64 = record.get("id");
        let work_json: String = record.get("work");

        // Delete the item from the queue
        if let Err(e) = sqlx::query("DELETE FROM handle_resolution_queue WHERE id = ?1")
            .bind(item_id)
            .execute(&mut *transaction)
            .await
        {
            error!("Failed to delete item from queue: {}", e);
            return None;
        }

        // Commit the transaction
        if let Err(e) = transaction.commit().await {
            error!("Failed to commit SQLite transaction: {}", e);
            return None;
        }

        // Deserialize the work item from JSON
        match serde_json::from_str(&work_json) {
            Ok(work) => {
                debug!("Pulled work item from SQLite queue");
                Some(work)
            }
            Err(e) => {
                error!("Failed to deserialize work item: {}", e);
                None
            }
        }
    }

    async fn push(&self, work: T) -> Result<()> {
        // Serialize the entire work item as JSON
        let work_json = serde_json::to_string(&work)
            .map_err(|e| QueueError::SerializationFailed(e.to_string()))?;

        let current_timestamp = std::time::SystemTime::now()
            .duration_since(std::time::UNIX_EPOCH)
            .unwrap_or_default()
            .as_secs() as i64;

        // Insert first, then check whether cleanup is needed. This keeps the common
        // path (queue well under the limit) to a single INSERT plus a bounded count,
        // rather than a full table scan on every push.
        sqlx::query("INSERT INTO handle_resolution_queue (work, queued_at) VALUES (?1, ?2)")
            .bind(&work_json)
            .bind(current_timestamp)
            .execute(&self.pool)
            .await
            .map_err(|e| QueueError::PushFailed(format!("Failed to insert work item: {}", e)))?;

        // Work shedding, if a maximum queue size is configured
        if self.max_size > 0 {
            // Count at most ~110% of max_size rows; the LIMIT bounds the scan so
            // the check stays cheap even when the table is large.
            let check_limit = self.max_size as i64 + (self.max_size as i64 / 10).max(1);
            let approx_count: Option<i64> = sqlx::query_scalar(
                "SELECT COUNT(*) FROM (
                    SELECT 1 FROM handle_resolution_queue LIMIT ?1
                ) AS limited_count",
            )
            .bind(check_limit)
            .fetch_one(&self.pool)
            .await
            .map_err(|e| QueueError::PushFailed(format!("Failed to check queue size: {}", e)))?;

            // Only perform cleanup if we're definitely over the limit
            if let Some(count) = approx_count && count >= check_limit {
                // Batch cleanup: shrink to 80% of max_size rather than deleting only
                // the excess, so shedding runs less frequently.
                let target_size = (self.max_size as f64 * 0.8) as i64;
                let to_delete = count - target_size;

                if to_delete > 0 {
                    // Find the cutoff id and timestamp first; this avoids an
                    // expensive subquery inside the DELETE statement.
                    let cutoff: Option<(i64, i64)> = sqlx::query_as(
                        "SELECT id, queued_at FROM handle_resolution_queue
                         ORDER BY queued_at ASC, id ASC LIMIT 1 OFFSET ?1",
                    )
                    .bind(to_delete - 1)
                    .fetch_optional(&self.pool)
                    .await
                    .map_err(|e| QueueError::PushFailed(format!("Failed to find cutoff: {}", e)))?;

                    if let Some((cutoff_id, cutoff_timestamp)) = cutoff {
                        // Delete entries older than the cutoff, or with an equal timestamp
                        // and a lower-or-equal id. This handles the case where multiple
                        // entries share the same timestamp.
                        let deleted_result = sqlx::query(
                            "DELETE FROM handle_resolution_queue
                             WHERE queued_at < ?1 OR (queued_at = ?1 AND id <= ?2)",
                        )
                        .bind(cutoff_timestamp)
                        .bind(cutoff_id)
                        .execute(&self.pool)
                        .await
                        .map_err(|e| {
                            QueueError::PushFailed(format!(
                                "Failed to delete excess entries: {}",
                                e
                            ))
                        })?;

                        let deleted_count = deleted_result.rows_affected();
                        if deleted_count > 0 {
                            info!(
                                "Work shedding: deleted {} oldest entries (target size: {}, max: {})",
                                deleted_count, target_size, self.max_size
                            );
                        }
                    }
                }
            }
        }

        debug!(
            "Pushed work item to SQLite queue (max_size: {})",
            self.max_size
        );
        Ok(())
    }

    async fn ack(&self, _item: &T) -> Result<()> {
        // With the simplified SQLite queue design, items are deleted when pulled,
        // so acknowledgment is a no-op (the item is already processed and removed).
        debug!("Acknowledged work item in SQLite queue (no-op)");
        Ok(())
    }

    async fn depth(&self) -> Option<usize> {
        match sqlx::query_scalar::<_, i64>("SELECT COUNT(*) FROM handle_resolution_queue")
            .fetch_one(&self.pool)
            .await
        {
            Ok(count) => Some(count as usize),
            Err(e) => {
                warn!("Failed to get SQLite queue depth: {}", e);
                None
            }
        }
    }

    async fn is_healthy(&self) -> bool {
        // Test the connection by running a simple query
        sqlx::query_scalar::<_, i64>("SELECT 1")
            .fetch_one(&self.pool)
            .await
            .map(|_| true)
            .unwrap_or(false)
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::queue::HandleResolutionWork;

    async fn create_test_pool() -> sqlx::SqlitePool {
        let pool = sqlx::SqlitePool::connect("sqlite::memory:")
            .await
            .expect("Failed to connect to in-memory SQLite");

        // Create the queue schema
        crate::sqlite_schema::create_schema(&pool)
            .await
            .expect("Failed to create schema");

        pool
    }

    #[tokio::test]
    async fn test_sqlite_queue_push_pull() {
        let pool = create_test_pool().await;
        let adapter = SqliteQueueAdapter::<HandleResolutionWork>::new(pool.clone());

        let work = HandleResolutionWork::new("alice.example.com".to_string());

        // Test push
        adapter.push(work.clone()).await.unwrap();

        // Verify depth
        assert_eq!(adapter.depth().await, Some(1));

        // Test pull
        let pulled = adapter.pull().await;
        assert_eq!(pulled, Some(work));

        // Verify queue is empty after pull
        assert_eq!(adapter.depth().await, Some(0));
        assert!(adapter.pull().await.is_none());
    }

    #[tokio::test]
    async fn test_sqlite_queue_fifo_ordering() {
        let pool = create_test_pool().await;
        let adapter = SqliteQueueAdapter::<HandleResolutionWork>::new(pool);

        // Push multiple items
        let handles = vec![
            "alice.example.com",
            "bob.example.com",
"charlie.example.com", ]; for handle in &handles { let work = HandleResolutionWork::new(handle.to_string()); adapter.push(work).await.unwrap(); } // Pull items in FIFO order for expected_handle in handles { let pulled = adapter.pull().await; assert!(pulled.is_some()); assert_eq!(pulled.unwrap().handle, expected_handle); } // Queue should be empty assert!(adapter.pull().await.is_none()); } #[tokio::test] async fn test_sqlite_queue_ack_noop() { let pool = create_test_pool().await; let adapter = SqliteQueueAdapter::::new(pool); // Ack should always succeed as it's a no-op let work = HandleResolutionWork::new("any.example.com".to_string()); adapter.ack(&work).await.unwrap(); } #[tokio::test] async fn test_sqlite_queue_health() { let pool = create_test_pool().await; let adapter = SqliteQueueAdapter::::new(pool); // Should be healthy if SQLite is working assert!(adapter.is_healthy().await); } #[tokio::test] async fn test_sqlite_queue_work_shedding() { let pool = create_test_pool().await; // Create adapter with small max_size for testing let max_size = 10; let adapter = SqliteQueueAdapter::::with_max_size(pool.clone(), max_size); // Push items up to the limit (should not trigger shedding) for i in 0..max_size { let work = HandleResolutionWork::new(format!("test-{:03}", i)); adapter.push(work).await.expect("Push should succeed"); } // Verify all items are present assert_eq!(adapter.depth().await, Some(max_size as usize)); // Push beyond 110% of max_size to trigger batch shedding let trigger_point = max_size + (max_size / 10) + 1; for i in max_size..trigger_point { let work = HandleResolutionWork::new(format!("test-{:03}", i)); adapter.push(work).await.expect("Push should succeed"); } // After triggering shedding, queue should be around 80% of max_size let depth_after_shedding = adapter.depth().await.unwrap(); let expected_size = (max_size as f64 * 0.8) as usize; // Allow some variance due to batch deletion assert!( depth_after_shedding <= expected_size + 1, "Queue size {} should be around 80% of max_size ({})", depth_after_shedding, expected_size ); } #[tokio::test] async fn test_sqlite_queue_work_shedding_disabled() { let pool = create_test_pool().await; // Create adapter with max_size = 0 (disabled work shedding) let adapter = SqliteQueueAdapter::::with_max_size(pool, 0); // Push many items (should not trigger any shedding) for i in 0..100 { let work = HandleResolutionWork::new(format!("test-{:03}", i)); adapter.push(work).await.expect("Push should succeed"); } // Verify all items are present (no shedding occurred) assert_eq!(adapter.depth().await, Some(100)); } #[tokio::test] async fn test_sqlite_queue_generic_work_type() { #[derive(Debug, Clone, Serialize, Deserialize, PartialEq)] struct CustomWork { id: u64, name: String, data: Vec, } let pool = create_test_pool().await; let adapter = SqliteQueueAdapter::::new(pool); let work = CustomWork { id: 123, name: "test_work".to_string(), data: vec![1, 2, 3, 4, 5], }; // Test push and pull adapter.push(work.clone()).await.unwrap(); let pulled = adapter.pull().await; assert_eq!(pulled, Some(work)); } }