QuickDID is a high-performance AT Protocol identity resolution service written in Rust. It provides handle-to-DID resolution with Redis-backed caching and queue processing.
//! SQLite-backed queue adapter implementation.
//!
//! This module provides a persistent queue implementation using SQLite
//! with optional work shedding to prevent unbounded growth.

use async_trait::async_trait;
use serde::{Deserialize, Serialize};
use sqlx::{self, Row};
use tracing::{debug, error, info, warn};

use super::adapter::QueueAdapter;
use super::error::{QueueError, Result};
/// SQLite-backed queue adapter implementation.
///
/// This adapter uses a SQLite database for persistent queuing of work items.
/// It's suitable for single-instance deployments that need persistence
/// across service restarts while remaining lightweight.
///
/// # Features
///
/// - Persistent queuing across service restarts
/// - Simple FIFO ordering based on insertion time
/// - Single consumer design (no complex locking needed)
/// - Simple pull-and-delete semantics
/// - Optional work shedding to prevent unbounded queue growth
///
/// # Work Shedding
///
/// When `max_size` is configured (> 0), the adapter implements work shedding:
/// - New work items are always accepted
/// - When the queue exceeds `max_size`, oldest entries are automatically deleted
/// - This maintains the most recent work items while preventing unbounded growth
/// - Essential for long-running deployments to avoid disk space issues
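///
/// For example, with `max_size = 10_000` the bounded size check fires once the
/// queue reaches roughly 11,000 entries (10% over the limit), and shedding then
/// trims it back to about 8,000 entries (80% of `max_size`), so deletions happen
/// in occasional batches rather than on every push.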
///
/// # Database Schema
///
/// The adapter expects the following table structure:
/// ```sql
/// CREATE TABLE handle_resolution_queue (
///     id INTEGER PRIMARY KEY AUTOINCREMENT,
///     work TEXT NOT NULL,
///     queued_at INTEGER NOT NULL
/// );
/// CREATE INDEX idx_queue_timestamp ON handle_resolution_queue(queued_at);
/// ```
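///
/// The `queued_at` index backs both the ordered scan in `pull` and the
/// cutoff lookup used by work shedding.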
///
/// # Examples
///
/// ```no_run
/// use quickdid::queue::SqliteQueueAdapter;
/// use quickdid::queue::QueueAdapter;
/// use quickdid::sqlite_schema::create_sqlite_pool;
///
/// # async fn example() -> anyhow::Result<()> {
/// // Create SQLite pool
/// let pool = create_sqlite_pool("sqlite:./quickdid.db").await?;
///
/// // Create queue with unlimited size
/// let queue = SqliteQueueAdapter::<String>::new(pool.clone());
///
/// // Or create queue with work shedding (max 10,000 items)
/// let bounded_queue = SqliteQueueAdapter::<String>::with_max_size(pool, 10000);
///
/// // Use the queue
/// queue.push("work-item".to_string()).await?;
/// if let Some(item) = queue.pull().await {
///     // Process item (automatically deleted from queue)
///     println!("Processing: {}", item);
/// }
/// # Ok(())
/// # }
/// ```
pub struct SqliteQueueAdapter<T>
where
    T: Send + Sync + Serialize + for<'de> Deserialize<'de> + 'static,
{
    /// SQLite connection pool
    pool: sqlx::SqlitePool,
    /// Maximum queue size (0 = unlimited)
    /// When exceeded, oldest entries are deleted to maintain this limit
    max_size: u64,
    /// Type marker for generic parameter
    _phantom: std::marker::PhantomData<T>,
}

impl<T> SqliteQueueAdapter<T>
where
    T: Send + Sync + Serialize + for<'de> Deserialize<'de> + 'static,
{
    /// Create a new SQLite queue adapter with unlimited queue size.
    ///
    /// # Arguments
    ///
    /// * `pool` - SQLite connection pool
    ///
    /// # Examples
    ///
    /// ```no_run
    /// use quickdid::queue::SqliteQueueAdapter;
    /// use quickdid::sqlite_schema::create_sqlite_pool;
    ///
    /// # async fn example() -> anyhow::Result<()> {
    /// let pool = create_sqlite_pool("sqlite:./quickdid.db").await?;
    /// let queue = SqliteQueueAdapter::<String>::new(pool);
    /// # Ok(())
    /// # }
    /// ```
    pub fn new(pool: sqlx::SqlitePool) -> Self {
        Self::with_max_size(pool, 0)
    }

    /// Create a new SQLite queue adapter with the specified maximum queue size.
    ///
    /// # Arguments
    ///
    /// * `pool` - SQLite connection pool
    /// * `max_size` - Maximum number of entries in the queue (0 = unlimited)
    ///
    /// # Work Shedding Behavior
    ///
    /// When `max_size` > 0:
    /// - New work items are always accepted
    /// - If the queue size exceeds `max_size` after insertion, the oldest entries are deleted
    /// - This preserves the most recent work while preventing unbounded growth
    ///
    /// # Examples
    ///
    /// ```no_run
    /// use quickdid::queue::SqliteQueueAdapter;
    /// use quickdid::sqlite_schema::create_sqlite_pool;
    ///
    /// # async fn example() -> anyhow::Result<()> {
    /// let pool = create_sqlite_pool("sqlite:./quickdid.db").await?;
    /// // Limit queue to 10,000 entries with automatic work shedding
    /// let queue = SqliteQueueAdapter::<String>::with_max_size(pool, 10000);
    /// # Ok(())
    /// # }
    /// ```
    pub fn with_max_size(pool: sqlx::SqlitePool, max_size: u64) -> Self {
        Self {
            pool,
            max_size,
            _phantom: std::marker::PhantomData,
        }
    }
}

#[async_trait]
impl<T> QueueAdapter<T> for SqliteQueueAdapter<T>
where
    T: Send + Sync + Serialize + for<'de> Deserialize<'de> + 'static,
{
    async fn pull(&self) -> Option<T> {
        // Get the oldest queued item and delete it in a transaction
        let mut transaction = match self.pool.begin().await {
            Ok(tx) => tx,
            Err(e) => {
                error!("Failed to start SQLite transaction: {}", e);
                return None;
            }
        };
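
        // On the early `return None` paths below, the transaction is dropped
        // without being committed; sqlx rolls it back automatically, so the
        // selected row stays in the queue.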

        // Select the oldest queued item, using `id` to break ties between
        // items queued within the same second
        let record = match sqlx::query(
            "SELECT id, work FROM handle_resolution_queue
             ORDER BY queued_at ASC, id ASC
             LIMIT 1",
        )
        .fetch_optional(&mut *transaction)
        .await
        {
            Ok(Some(row)) => row,
            Ok(None) => {
                // No queued items available
                debug!("No queued items available in SQLite queue");
                return None;
            }
            Err(e) => {
                error!("Failed to query SQLite queue: {}", e);
                return None;
            }
        };

        let item_id: i64 = record.get("id");
        let work_json: String = record.get("work");

        // Delete the item from the queue
        if let Err(e) = sqlx::query("DELETE FROM handle_resolution_queue WHERE id = ?1")
            .bind(item_id)
            .execute(&mut *transaction)
            .await
        {
            error!("Failed to delete item from queue: {}", e);
            return None;
        }

        // Commit the transaction
        if let Err(e) = transaction.commit().await {
            error!("Failed to commit SQLite transaction: {}", e);
            return None;
        }

        // Deserialize the work item from JSON
        match serde_json::from_str(&work_json) {
            Ok(work) => {
                debug!("Pulled work item from SQLite queue");
                Some(work)
            }
            Err(e) => {
                error!("Failed to deserialize work item: {}", e);
                None
            }
        }
    }

    async fn push(&self, work: T) -> Result<()> {
        // Serialize the entire work item as JSON
        let work_json = serde_json::to_string(&work)
            .map_err(|e| QueueError::SerializationFailed(e.to_string()))?;

        let current_timestamp = std::time::SystemTime::now()
            .duration_since(std::time::UNIX_EPOCH)
            .unwrap_or_default()
            .as_secs() as i64;

        // Insert first, then check whether cleanup is needed; this keeps the
        // common path to a single INSERT without a full-table count
        sqlx::query("INSERT INTO handle_resolution_queue (work, queued_at) VALUES (?1, ?2)")
            .bind(&work_json)
            .bind(current_timestamp)
            .execute(&self.pool)
            .await
            .map_err(|e| QueueError::PushFailed(format!("Failed to insert work item: {}", e)))?;

        // Implement work shedding if max_size is configured
        if self.max_size > 0 {
            // The size check runs on every push, but the count is bounded at
            // just over the limit so it never degrades into a full table scan
            let check_limit = self.max_size as i64 + (self.max_size as i64 / 10).max(1); // Check 10% over limit
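            // e.g. with max_size = 10_000: check_limit = 11_000, target_size = 8_000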
            let approx_count: Option<i64> = sqlx::query_scalar(
                "SELECT COUNT(*) FROM (
                    SELECT 1 FROM handle_resolution_queue LIMIT ?1
                ) AS limited_count",
            )
            .bind(check_limit)
            .fetch_one(&self.pool)
            .await
            .map_err(|e| QueueError::PushFailed(format!("Failed to check queue size: {}", e)))?;

            // Only perform cleanup if we're definitely over the limit
            if let Some(count) = approx_count
                && count >= check_limit
            {
                // Batch cleanup: delete down to 80% of max_size rather than just
                // the excess, so shedding runs infrequently instead of on every push
                let target_size = (self.max_size as f64 * 0.8) as i64;
                let to_delete = count - target_size;

                if to_delete > 0 {
                    // Find the cutoff row first; this avoids an expensive
                    // subquery inside the DELETE statement
                    let cutoff: Option<(i64, i64)> = sqlx::query_as(
                        "SELECT id, queued_at FROM handle_resolution_queue
                         ORDER BY queued_at ASC, id ASC
                         LIMIT 1 OFFSET ?1",
                    )
                    .bind(to_delete - 1)
                    .fetch_optional(&self.pool)
                    .await
                    .map_err(|e| QueueError::PushFailed(format!("Failed to find cutoff: {}", e)))?;

                    if let Some((cutoff_id, cutoff_timestamp)) = cutoff {
                        // Delete entries older than the cutoff, or with the same
                        // timestamp and a lower-or-equal id; this handles multiple
                        // entries sharing one timestamp
                        let deleted_result = sqlx::query(
                            "DELETE FROM handle_resolution_queue
                             WHERE queued_at < ?1
                                OR (queued_at = ?1 AND id <= ?2)",
                        )
                        .bind(cutoff_timestamp)
                        .bind(cutoff_id)
                        .execute(&self.pool)
                        .await
                        .map_err(|e| {
                            QueueError::PushFailed(format!(
                                "Failed to delete excess entries: {}",
                                e
                            ))
                        })?;

                        let deleted_count = deleted_result.rows_affected();
                        if deleted_count > 0 {
                            info!(
                                "Work shedding: deleted {} oldest entries (target size: {}, max: {})",
                                deleted_count, target_size, self.max_size
                            );
                        }
                    }
                }
            }
        }

        debug!(
            "Pushed work item to SQLite queue (max_size: {})",
            self.max_size
        );
        Ok(())
    }

    async fn ack(&self, _item: &T) -> Result<()> {
        // With the simplified SQLite queue design, items are deleted when pulled,
        // so acknowledgment is a no-op (item is already processed and removed)
        debug!("Acknowledged work item in SQLite queue (no-op)");
        Ok(())
    }

    async fn depth(&self) -> Option<usize> {
        match sqlx::query_scalar::<_, i64>("SELECT COUNT(*) FROM handle_resolution_queue")
            .fetch_one(&self.pool)
            .await
        {
            Ok(count) => Some(count as usize),
            Err(e) => {
                warn!("Failed to get SQLite queue depth: {}", e);
                None
            }
        }
    }

    async fn is_healthy(&self) -> bool {
        // Test the connection by running a simple query
        sqlx::query_scalar::<_, i64>("SELECT 1")
            .fetch_one(&self.pool)
            .await
            .map(|_| true)
            .unwrap_or(false)
    }
}

#[cfg(test)]
mod tests {
    use super::*;
    use crate::queue::HandleResolutionWork;

    async fn create_test_pool() -> sqlx::SqlitePool {
        let pool = sqlx::SqlitePool::connect("sqlite::memory:")
            .await
            .expect("Failed to connect to in-memory SQLite");

        // Create the queue schema
        crate::sqlite_schema::create_schema(&pool)
            .await
            .expect("Failed to create schema");

        pool
    }

    #[tokio::test]
    async fn test_sqlite_queue_push_pull() {
        let pool = create_test_pool().await;
        let adapter = SqliteQueueAdapter::<HandleResolutionWork>::new(pool.clone());

        let work = HandleResolutionWork::new("alice.example.com".to_string());

        // Test push
        adapter.push(work.clone()).await.unwrap();

        // Verify depth
        assert_eq!(adapter.depth().await, Some(1));

        // Test pull
        let pulled = adapter.pull().await;
        assert_eq!(pulled, Some(work));

        // Verify queue is empty after pull
        assert_eq!(adapter.depth().await, Some(0));
        assert!(adapter.pull().await.is_none());
    }

    #[tokio::test]
    async fn test_sqlite_queue_fifo_ordering() {
        let pool = create_test_pool().await;
        let adapter = SqliteQueueAdapter::<HandleResolutionWork>::new(pool);

        // Push multiple items
        let handles = vec![
            "alice.example.com",
            "bob.example.com",
            "charlie.example.com",
        ];
        for handle in &handles {
            let work = HandleResolutionWork::new(handle.to_string());
            adapter.push(work).await.unwrap();
        }

        // Pull items in FIFO order
        for expected_handle in handles {
            let pulled = adapter.pull().await;
            assert!(pulled.is_some());
            assert_eq!(pulled.unwrap().handle, expected_handle);
        }

        // Queue should be empty
        assert!(adapter.pull().await.is_none());
    }

    #[tokio::test]
    async fn test_sqlite_queue_ack_noop() {
        let pool = create_test_pool().await;
        let adapter = SqliteQueueAdapter::<HandleResolutionWork>::new(pool);

        // Ack should always succeed as it's a no-op
        let work = HandleResolutionWork::new("any.example.com".to_string());
        adapter.ack(&work).await.unwrap();
    }

    #[tokio::test]
    async fn test_sqlite_queue_health() {
        let pool = create_test_pool().await;
        let adapter = SqliteQueueAdapter::<HandleResolutionWork>::new(pool);

        // Should be healthy if SQLite is working
        assert!(adapter.is_healthy().await);
    }

    #[tokio::test]
    async fn test_sqlite_queue_work_shedding() {
        let pool = create_test_pool().await;

        // Create adapter with small max_size for testing
        let max_size = 10;
        let adapter =
            SqliteQueueAdapter::<HandleResolutionWork>::with_max_size(pool.clone(), max_size);

        // Push items up to the limit (should not trigger shedding)
        for i in 0..max_size {
            let work = HandleResolutionWork::new(format!("test-{:03}", i));
            adapter.push(work).await.expect("Push should succeed");
        }

        // Verify all items are present
        assert_eq!(adapter.depth().await, Some(max_size as usize));

        // Push beyond 110% of max_size to trigger batch shedding
        let trigger_point = max_size + (max_size / 10) + 1;
        for i in max_size..trigger_point {
            let work = HandleResolutionWork::new(format!("test-{:03}", i));
            adapter.push(work).await.expect("Push should succeed");
        }

        // After triggering shedding, queue should be around 80% of max_size
        let depth_after_shedding = adapter.depth().await.unwrap();
        let expected_size = (max_size as f64 * 0.8) as usize;

        // Allow some variance due to batch deletion
        assert!(
            depth_after_shedding <= expected_size + 1,
            "Queue size {} should be around 80% of max_size ({})",
            depth_after_shedding,
            expected_size
        );
    }
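
    // A sketch of an additional check (not in the original suite): shedding
    // deletes from the oldest end, so the survivors should be the most
    // recently pushed items.
    #[tokio::test]
    async fn test_sqlite_queue_work_shedding_keeps_newest() {
        let pool = create_test_pool().await;
        let max_size = 10;
        let adapter = SqliteQueueAdapter::<HandleResolutionWork>::with_max_size(pool, max_size);

        // Push enough items to trigger batch shedding (past 110% of max_size)
        let total = max_size + (max_size / 10) + 1;
        for i in 0..total {
            let work = HandleResolutionWork::new(format!("test-{:03}", i));
            adapter.push(work).await.expect("Push should succeed");
        }

        // The oldest entries were shed, so the head of the queue is no
        // longer the first item pushed
        let first = adapter.pull().await.expect("Queue should not be empty");
        assert_ne!(first.handle, "test-000");
    }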

    #[tokio::test]
    async fn test_sqlite_queue_work_shedding_disabled() {
        let pool = create_test_pool().await;

        // Create adapter with max_size = 0 (disabled work shedding)
        let adapter = SqliteQueueAdapter::<HandleResolutionWork>::with_max_size(pool, 0);

        // Push many items (should not trigger any shedding)
        for i in 0..100 {
            let work = HandleResolutionWork::new(format!("test-{:03}", i));
            adapter.push(work).await.expect("Push should succeed");
        }

        // Verify all items are present (no shedding occurred)
        assert_eq!(adapter.depth().await, Some(100));
    }

    #[tokio::test]
    async fn test_sqlite_queue_generic_work_type() {
        #[derive(Debug, Clone, Serialize, Deserialize, PartialEq)]
        struct CustomWork {
            id: u64,
            name: String,
            data: Vec<i32>,
        }

        let pool = create_test_pool().await;
        let adapter = SqliteQueueAdapter::<CustomWork>::new(pool);

        let work = CustomWork {
            id: 123,
            name: "test_work".to_string(),
            data: vec![1, 2, 3, 4, 5],
        };

        // Test push and pull
        adapter.push(work.clone()).await.unwrap();
        let pulled = adapter.pull().await;
        assert_eq!(pulled, Some(work));
    }
}
505}