QuickDID is a high-performance AT Protocol identity resolution service written in Rust. It provides handle-to-DID resolution with Redis-backed caching and queue processing.
at main 11 kB view raw
1//! SQLite schema management for QuickDID. 2//! 3//! This module provides functionality to create and manage the SQLite database 4//! schema used by the SQLite-backed handle resolver cache. 5 6use anyhow::Result; 7use sqlx::{Sqlite, SqlitePool, migrate::MigrateDatabase}; 8use std::path::Path; 9 10/// SQL schema for the handle resolution cache table. 11const CREATE_HANDLE_RESOLUTION_CACHE_TABLE: &str = r#" 12CREATE TABLE IF NOT EXISTS handle_resolution_cache ( 13 key INTEGER PRIMARY KEY, 14 result BLOB NOT NULL, 15 created INTEGER NOT NULL, 16 updated INTEGER NOT NULL 17); 18 19CREATE INDEX IF NOT EXISTS idx_handle_resolution_cache_updated 20ON handle_resolution_cache(updated); 21"#; 22 23/// SQL schema for the handle resolution queue table. 24const CREATE_HANDLE_RESOLUTION_QUEUE_TABLE: &str = r#" 25CREATE TABLE IF NOT EXISTS handle_resolution_queue ( 26 id INTEGER PRIMARY KEY AUTOINCREMENT, 27 work TEXT NOT NULL, 28 queued_at INTEGER NOT NULL 29); 30 31CREATE INDEX IF NOT EXISTS idx_handle_resolution_queue_queued_at 32ON handle_resolution_queue(queued_at); 33"#; 34 35/// Create or connect to a SQLite database and ensure schema is initialized. 36/// 37/// # Arguments 38/// 39/// * `database_url` - SQLite database URL (e.g., "sqlite:./quickdid.db" or "sqlite::memory:") 40/// 41/// # Returns 42/// 43/// Returns a SqlitePool connected to the database with schema initialized. 44/// 45/// # Example 46/// 47/// ```no_run 48/// use quickdid::sqlite_schema::create_sqlite_pool; 49/// 50/// # async fn example() -> anyhow::Result<()> { 51/// // File-based database 52/// let pool = create_sqlite_pool("sqlite:./quickdid.db").await?; 53/// 54/// // In-memory database (for testing) 55/// let pool = create_sqlite_pool("sqlite::memory:").await?; 56/// # Ok(()) 57/// # } 58/// ``` 59pub async fn create_sqlite_pool(database_url: &str) -> Result<SqlitePool> { 60 tracing::info!("Initializing SQLite database: {}", database_url); 61 62 // Extract the database path from the URL for file-based databases 63 if let Some(path) = database_url.strip_prefix("sqlite:") 64 && path != ":memory:" 65 && !path.is_empty() 66 { 67 // Create the database file if it doesn't exist 68 if !Sqlite::database_exists(database_url).await? { 69 tracing::info!("Creating SQLite database file: {}", path); 70 Sqlite::create_database(database_url).await?; 71 } 72 73 // Ensure the parent directory exists 74 if let Some(parent) = Path::new(path).parent() 75 && !parent.exists() 76 { 77 tracing::info!("Creating directory: {}", parent.display()); 78 std::fs::create_dir_all(parent)?; 79 } 80 } 81 82 // Connect to the database 83 let pool = SqlitePool::connect(database_url).await?; 84 tracing::info!("Connected to SQLite database"); 85 86 // Create the schema 87 create_schema(&pool).await?; 88 89 Ok(pool) 90} 91 92/// Create the database schema if it doesn't exist. 93/// 94/// # Arguments 95/// 96/// * `pool` - SQLite connection pool 97/// 98/// # Example 99/// 100/// ```no_run 101/// use quickdid::sqlite_schema::create_schema; 102/// use sqlx::SqlitePool; 103/// 104/// # async fn example() -> anyhow::Result<()> { 105/// let pool = SqlitePool::connect("sqlite::memory:").await?; 106/// create_schema(&pool).await?; 107/// # Ok(()) 108/// # } 109/// ``` 110pub async fn create_schema(pool: &SqlitePool) -> Result<()> { 111 tracing::debug!("Creating SQLite schema if not exists"); 112 113 // Execute the schema creation SQL 114 sqlx::query(CREATE_HANDLE_RESOLUTION_CACHE_TABLE) 115 .execute(pool) 116 .await?; 117 118 sqlx::query(CREATE_HANDLE_RESOLUTION_QUEUE_TABLE) 119 .execute(pool) 120 .await?; 121 122 tracing::info!("SQLite schema initialized"); 123 124 Ok(()) 125} 126 127/// Clean up expired entries from the handle resolution cache. 128/// 129/// This function removes entries that are older than the specified TTL. 130/// It should be called periodically to prevent the database from growing indefinitely. 131/// 132/// # Arguments 133/// 134/// * `pool` - SQLite connection pool 135/// * `ttl_seconds` - TTL in seconds for cache entries 136/// 137/// # Returns 138/// 139/// Returns the number of entries deleted. 140/// 141/// # Example 142/// 143/// ```no_run 144/// use quickdid::sqlite_schema::cleanup_expired_entries; 145/// use sqlx::SqlitePool; 146/// 147/// # async fn example() -> anyhow::Result<()> { 148/// let pool = SqlitePool::connect("sqlite:./quickdid.db").await?; 149/// let deleted_count = cleanup_expired_entries(&pool, 7776000).await?; // 90 days 150/// println!("Deleted {} expired entries", deleted_count); 151/// # Ok(()) 152/// # } 153/// ``` 154pub async fn cleanup_expired_entries(pool: &SqlitePool, ttl_seconds: u64) -> Result<u64> { 155 let current_timestamp = std::time::SystemTime::now() 156 .duration_since(std::time::UNIX_EPOCH) 157 .unwrap_or_default() 158 .as_secs() as i64; 159 160 let cutoff_timestamp = current_timestamp - (ttl_seconds as i64); 161 162 let result = sqlx::query("DELETE FROM handle_resolution_cache WHERE updated < ?1") 163 .bind(cutoff_timestamp) 164 .execute(pool) 165 .await?; 166 167 let deleted_count = result.rows_affected(); 168 if deleted_count > 0 { 169 tracing::info!("Cleaned up {} expired cache entries", deleted_count); 170 } 171 172 Ok(deleted_count) 173} 174 175/// Get statistics about the handle resolution cache. 176/// 177/// # Arguments 178/// 179/// * `pool` - SQLite connection pool 180/// 181/// # Returns 182/// 183/// Returns a tuple of (total_entries, database_size_bytes). 184/// 185/// # Example 186/// 187/// ```no_run 188/// use quickdid::sqlite_schema::get_cache_stats; 189/// use sqlx::SqlitePool; 190/// 191/// # async fn example() -> anyhow::Result<()> { 192/// let pool = SqlitePool::connect("sqlite:./quickdid.db").await?; 193/// let (total_entries, size_bytes) = get_cache_stats(&pool).await?; 194/// println!("Cache has {} entries, {} bytes", total_entries, size_bytes); 195/// # Ok(()) 196/// # } 197/// ``` 198pub async fn get_cache_stats(pool: &SqlitePool) -> Result<(i64, i64)> { 199 // Get total entries 200 let total_entries: i64 = sqlx::query_scalar("SELECT COUNT(*) FROM handle_resolution_cache") 201 .fetch_one(pool) 202 .await?; 203 204 // Get database page size and page count to calculate total size 205 let page_size: i64 = sqlx::query_scalar("PRAGMA page_size") 206 .fetch_one(pool) 207 .await?; 208 209 let page_count: i64 = sqlx::query_scalar("PRAGMA page_count") 210 .fetch_one(pool) 211 .await?; 212 213 let size_bytes = page_size * page_count; 214 215 Ok((total_entries, size_bytes)) 216} 217 218/// Clean up old entries from the handle resolution queue. 219/// 220/// This function removes entries that are older than the specified age. 221/// 222/// # Arguments 223/// 224/// * `pool` - SQLite connection pool 225/// * `max_age_seconds` - Maximum age in seconds for queue entries to be kept 226/// 227/// # Returns 228/// 229/// Returns the number of entries deleted. 230/// 231/// # Example 232/// 233/// ```no_run 234/// use quickdid::sqlite_schema::cleanup_queue_entries; 235/// use sqlx::SqlitePool; 236/// 237/// # async fn example() -> anyhow::Result<()> { 238/// let pool = SqlitePool::connect("sqlite:./quickdid.db").await?; 239/// let deleted_count = cleanup_queue_entries(&pool, 86400).await?; // 1 day 240/// println!("Deleted {} old queue entries", deleted_count); 241/// # Ok(()) 242/// # } 243/// ``` 244pub async fn cleanup_queue_entries(pool: &SqlitePool, max_age_seconds: u64) -> Result<u64> { 245 let current_timestamp = std::time::SystemTime::now() 246 .duration_since(std::time::UNIX_EPOCH) 247 .unwrap_or_default() 248 .as_secs() as i64; 249 250 let cutoff_timestamp = current_timestamp - (max_age_seconds as i64); 251 252 let result = sqlx::query("DELETE FROM handle_resolution_queue WHERE queued_at < ?1") 253 .bind(cutoff_timestamp) 254 .execute(pool) 255 .await?; 256 257 let deleted_count = result.rows_affected(); 258 if deleted_count > 0 { 259 tracing::info!("Cleaned up {} old queue entries", deleted_count); 260 } 261 262 Ok(deleted_count) 263} 264 265/// Get statistics about the handle resolution queue. 266/// 267/// # Arguments 268/// 269/// * `pool` - SQLite connection pool 270/// 271/// # Returns 272/// 273/// Returns the total number of entries in the queue. 274/// 275/// # Example 276/// 277/// ```no_run 278/// use quickdid::sqlite_schema::get_queue_stats; 279/// use sqlx::SqlitePool; 280/// 281/// # async fn example() -> anyhow::Result<()> { 282/// let pool = SqlitePool::connect("sqlite:./quickdid.db").await?; 283/// let total = get_queue_stats(&pool).await?; 284/// println!("Queue: {} total entries", total); 285/// # Ok(()) 286/// # } 287/// ``` 288pub async fn get_queue_stats(pool: &SqlitePool) -> Result<i64> { 289 // Get total entries 290 let total_entries: i64 = sqlx::query_scalar("SELECT COUNT(*) FROM handle_resolution_queue") 291 .fetch_one(pool) 292 .await?; 293 294 Ok(total_entries) 295} 296 297#[cfg(test)] 298mod tests { 299 use super::*; 300 301 #[tokio::test] 302 async fn test_create_sqlite_pool_memory() { 303 let pool = create_sqlite_pool("sqlite::memory:") 304 .await 305 .expect("Failed to create in-memory SQLite pool"); 306 307 // Verify the table was created 308 let count: i64 = sqlx::query_scalar("SELECT COUNT(*) FROM handle_resolution_cache") 309 .fetch_one(&pool) 310 .await 311 .expect("Failed to query table"); 312 313 assert_eq!(count, 0); 314 } 315 316 #[tokio::test] 317 async fn test_cleanup_expired_entries() { 318 let pool = create_sqlite_pool("sqlite::memory:") 319 .await 320 .expect("Failed to create in-memory SQLite pool"); 321 322 // Insert a test entry that's already expired 323 let old_timestamp = std::time::SystemTime::now() 324 .duration_since(std::time::UNIX_EPOCH) 325 .unwrap() 326 .as_secs() as i64 327 - 3600; // 1 hour ago 328 329 sqlx::query( 330 "INSERT INTO handle_resolution_cache (key, result, created, updated) VALUES (1, ?1, ?2, ?2)" 331 ) 332 .bind(&b"test_data"[..]) 333 .bind(old_timestamp) 334 .execute(&pool) 335 .await 336 .expect("Failed to insert test data"); 337 338 // Clean up entries older than 30 minutes (1800 seconds) 339 let deleted = cleanup_expired_entries(&pool, 1800) 340 .await 341 .expect("Failed to cleanup expired entries"); 342 343 assert_eq!(deleted, 1); 344 345 // Verify the entry was deleted 346 let count: i64 = sqlx::query_scalar("SELECT COUNT(*) FROM handle_resolution_cache") 347 .fetch_one(&pool) 348 .await 349 .expect("Failed to query table"); 350 351 assert_eq!(count, 0); 352 } 353 354 #[tokio::test] 355 async fn test_get_cache_stats() { 356 let pool = create_sqlite_pool("sqlite::memory:") 357 .await 358 .expect("Failed to create in-memory SQLite pool"); 359 360 // Insert a test entry 361 let current_timestamp = std::time::SystemTime::now() 362 .duration_since(std::time::UNIX_EPOCH) 363 .unwrap() 364 .as_secs() as i64; 365 366 sqlx::query( 367 "INSERT INTO handle_resolution_cache (key, result, created, updated) VALUES (1, ?1, ?2, ?2)" 368 ) 369 .bind(&b"test_data"[..]) 370 .bind(current_timestamp) 371 .execute(&pool) 372 .await 373 .expect("Failed to insert test data"); 374 375 let (total_entries, size_bytes) = get_cache_stats(&pool) 376 .await 377 .expect("Failed to get cache stats"); 378 379 assert_eq!(total_entries, 1); 380 assert!(size_bytes > 0); 381 } 382}