QuickDID is a high-performance AT Protocol identity resolution service written in Rust. It provides handle-to-DID resolution with Redis-backed caching and queue processing.
at main 24 kB view raw
1//! SQLite-backed caching handle resolver. 2//! 3//! This module provides a handle resolver that caches resolution results in SQLite 4//! with configurable expiration times. SQLite caching provides persistence across 5//! service restarts while remaining lightweight for single-instance deployments. 6 7use super::errors::HandleResolverError; 8use super::traits::HandleResolver; 9use crate::handle_resolution_result::HandleResolutionResult; 10use crate::metrics::SharedMetricsPublisher; 11use async_trait::async_trait; 12use metrohash::MetroHash64; 13use sqlx::{Row, SqlitePool}; 14use std::hash::Hasher as _; 15use std::sync::Arc; 16use std::time::{SystemTime, UNIX_EPOCH}; 17 18/// SQLite-backed caching handle resolver. 19/// 20/// This resolver caches handle resolution results in SQLite with a configurable TTL. 21/// Results are stored in a compact binary format using bincode serialization 22/// to minimize storage overhead. 23/// 24/// # Features 25/// 26/// - Persistent caching across service restarts 27/// - Lightweight single-file database 28/// - Configurable TTL (default: 90 days) 29/// - Compact binary storage format 30/// - Automatic schema management 31/// - Graceful fallback if SQLite is unavailable 32/// 33/// # Example 34/// 35/// ```no_run 36/// use std::sync::Arc; 37/// use sqlx::SqlitePool; 38/// use quickdid::handle_resolver::{create_base_resolver, create_sqlite_resolver, HandleResolver}; 39/// use quickdid::metrics::NoOpMetricsPublisher; 40/// 41/// # async fn example() { 42/// # use atproto_identity::resolve::HickoryDnsResolver; 43/// # use reqwest::Client; 44/// # let dns_resolver = Arc::new(HickoryDnsResolver::create_resolver(&[])); 45/// # let http_client = Client::new(); 46/// # let metrics = Arc::new(NoOpMetricsPublisher); 47/// # let base_resolver = create_base_resolver(dns_resolver, http_client, metrics.clone()); 48/// # let sqlite_pool: SqlitePool = todo!(); 49/// // Create with default 90-day TTL 50/// let resolver = create_sqlite_resolver( 51/// base_resolver, 52/// sqlite_pool, 53/// metrics 54/// ); 55/// # } 56/// ``` 57pub(super) struct SqliteHandleResolver { 58 /// Base handle resolver to perform actual resolution 59 inner: Arc<dyn HandleResolver>, 60 /// SQLite connection pool 61 pool: SqlitePool, 62 /// TTL for cache entries in seconds 63 ttl_seconds: u64, 64 /// Metrics publisher for telemetry 65 metrics: SharedMetricsPublisher, 66} 67 68impl SqliteHandleResolver { 69 /// Create a new SQLite-backed handle resolver with default 90-day TTL. 70 fn new( 71 inner: Arc<dyn HandleResolver>, 72 pool: SqlitePool, 73 metrics: SharedMetricsPublisher, 74 ) -> Self { 75 Self::with_ttl(inner, pool, 90 * 24 * 60 * 60, metrics) // 90 days default 76 } 77 78 /// Create a new SQLite-backed handle resolver with custom TTL. 79 fn with_ttl( 80 inner: Arc<dyn HandleResolver>, 81 pool: SqlitePool, 82 ttl_seconds: u64, 83 metrics: SharedMetricsPublisher, 84 ) -> Self { 85 Self { 86 inner, 87 pool, 88 ttl_seconds, 89 metrics, 90 } 91 } 92 93 /// Generate the cache key for a handle. 94 /// 95 /// Uses MetroHash64 to generate a consistent hash of the handle 96 /// for use as the primary key. This provides better key distribution 97 /// and avoids issues with special characters in handles. 98 fn make_key(&self, handle: &str) -> u64 { 99 let mut h = MetroHash64::default(); 100 h.write(handle.as_bytes()); 101 h.finish() 102 } 103 104 /// Check if a cache entry is expired. 105 fn is_expired(&self, updated_timestamp: i64) -> bool { 106 let current_timestamp = SystemTime::now() 107 .duration_since(UNIX_EPOCH) 108 .unwrap_or_default() 109 .as_secs() as i64; 110 111 (current_timestamp - updated_timestamp) > (self.ttl_seconds as i64) 112 } 113} 114 115#[async_trait] 116impl HandleResolver for SqliteHandleResolver { 117 async fn resolve(&self, s: &str) -> Result<(String, u64), HandleResolverError> { 118 let handle = s.to_string(); 119 let key = self.make_key(&handle) as i64; // SQLite uses signed integers 120 121 // Try to get from SQLite cache first 122 let cached_result = 123 sqlx::query("SELECT result, updated FROM handle_resolution_cache WHERE key = ?1") 124 .bind(key) 125 .fetch_optional(&self.pool) 126 .await; 127 128 match cached_result { 129 Ok(Some(row)) => { 130 let cached_bytes: Vec<u8> = row.get("result"); 131 let updated_timestamp: i64 = row.get("updated"); 132 133 // Check if the entry is expired 134 if !self.is_expired(updated_timestamp) { 135 // Deserialize the cached result 136 match HandleResolutionResult::from_bytes(&cached_bytes) { 137 Ok(cached_result) => { 138 if let Some(did) = cached_result.to_did() { 139 tracing::debug!("Cache hit for handle {}: {}", handle, did); 140 self.metrics.incr("resolver.sqlite.cache_hit").await; 141 return Ok((did, cached_result.timestamp)); 142 } else { 143 tracing::debug!("Cache hit (not resolved) for handle {}", handle); 144 self.metrics 145 .incr("resolver.sqlite.cache_hit_not_resolved") 146 .await; 147 return Err(HandleResolverError::HandleNotFound); 148 } 149 } 150 Err(e) => { 151 tracing::warn!( 152 "Failed to deserialize cached result for handle {}: {}", 153 handle, 154 e 155 ); 156 self.metrics.incr("resolver.sqlite.deserialize_error").await; 157 // Fall through to re-resolve if deserialization fails 158 } 159 } 160 } else { 161 tracing::debug!("Cache entry expired for handle {}", handle); 162 self.metrics.incr("resolver.sqlite.cache_expired").await; 163 // Entry is expired, we'll re-resolve and update it 164 } 165 } 166 Ok(None) => { 167 tracing::debug!("Cache miss for handle {}, resolving...", handle); 168 self.metrics.incr("resolver.sqlite.cache_miss").await; 169 } 170 Err(e) => { 171 tracing::warn!("Failed to query SQLite cache for handle {}: {}", handle, e); 172 self.metrics.incr("resolver.sqlite.query_error").await; 173 // Fall through to resolve without caching on database error 174 } 175 } 176 177 // Not in cache or expired, resolve through inner resolver 178 let result = self.inner.resolve(s).await; 179 180 // Create and serialize resolution result 181 let resolution_result = match &result { 182 Ok((did, _timestamp)) => { 183 tracing::debug!( 184 "Caching successful resolution for handle {}: {}", 185 handle, 186 did 187 ); 188 match HandleResolutionResult::success(did) { 189 Ok(res) => res, 190 Err(e) => { 191 tracing::warn!("Failed to create resolution result: {}", e); 192 self.metrics 193 .incr("resolver.sqlite.result_create_error") 194 .await; 195 return result; 196 } 197 } 198 } 199 Err(e) => { 200 tracing::debug!("Caching failed resolution for handle {}: {}", handle, e); 201 match HandleResolutionResult::not_resolved() { 202 Ok(res) => res, 203 Err(err) => { 204 tracing::warn!("Failed to create not_resolved result: {}", err); 205 self.metrics 206 .incr("resolver.sqlite.result_create_error") 207 .await; 208 return result; 209 } 210 } 211 } 212 }; 213 214 // Serialize to bytes 215 match resolution_result.to_bytes() { 216 Ok(bytes) => { 217 let current_timestamp = SystemTime::now() 218 .duration_since(UNIX_EPOCH) 219 .unwrap_or_default() 220 .as_secs() as i64; 221 222 // Insert or update the cache entry 223 let query_result = sqlx::query( 224 r#" 225 INSERT INTO handle_resolution_cache (key, result, created, updated) 226 VALUES (?1, ?2, ?3, ?4) 227 ON CONFLICT(key) DO UPDATE SET 228 result = excluded.result, 229 updated = excluded.updated 230 "#, 231 ) 232 .bind(key) 233 .bind(&bytes) 234 .bind(current_timestamp) 235 .bind(current_timestamp) 236 .execute(&self.pool) 237 .await; 238 239 if let Err(e) = query_result { 240 tracing::warn!("Failed to cache handle resolution in SQLite: {}", e); 241 self.metrics.incr("resolver.sqlite.cache_set_error").await; 242 } else { 243 self.metrics.incr("resolver.sqlite.cache_set").await; 244 } 245 } 246 Err(e) => { 247 tracing::warn!( 248 "Failed to serialize resolution result for handle {}: {}", 249 handle, 250 e 251 ); 252 self.metrics.incr("resolver.sqlite.serialize_error").await; 253 } 254 } 255 256 result 257 } 258 259 async fn set(&self, handle: &str, did: &str) -> Result<(), HandleResolverError> { 260 // Normalize the handle to lowercase 261 let handle = handle.to_lowercase(); 262 263 // Update the SQLite cache 264 if let Ok(mut conn) = self.pool.acquire().await { 265 // Create a resolution result for the successful mapping 266 let resolution_result = match HandleResolutionResult::success(did) { 267 Ok(res) => res, 268 Err(e) => { 269 tracing::warn!( 270 "Failed to create resolution result for set operation: {}", 271 e 272 ); 273 self.metrics 274 .incr("resolver.sqlite.set_result_create_error") 275 .await; 276 // Still chain to inner resolver even if we can't cache 277 return self.inner.set(&handle, did).await; 278 } 279 }; 280 281 // Serialize to bytes 282 match resolution_result.to_bytes() { 283 Ok(bytes) => { 284 // Insert or update the cache entry 285 let timestamp = std::time::SystemTime::now() 286 .duration_since(std::time::UNIX_EPOCH) 287 .unwrap_or_default() 288 .as_secs() as i64; 289 290 let expires_at = timestamp + self.ttl_seconds as i64; 291 292 match sqlx::query( 293 "INSERT OR REPLACE INTO handle_resolution_cache (handle, resolved_value, created_at, expires_at) VALUES (?, ?, ?, ?)" 294 ) 295 .bind(&handle) 296 .bind(&bytes) 297 .bind(timestamp) 298 .bind(expires_at) 299 .execute(&mut *conn) 300 .await 301 { 302 Ok(_) => { 303 tracing::debug!("Set handle {} -> DID {} in SQLite cache", handle, did); 304 self.metrics.incr("resolver.sqlite.set_success").await; 305 } 306 Err(e) => { 307 tracing::warn!("Failed to set handle->DID mapping in SQLite: {}", e); 308 self.metrics.incr("resolver.sqlite.set_cache_error").await; 309 // Still chain to inner resolver even if cache update fails 310 } 311 } 312 } 313 Err(e) => { 314 tracing::warn!( 315 "Failed to serialize resolution result for set operation: {}", 316 e 317 ); 318 self.metrics 319 .incr("resolver.sqlite.set_serialize_error") 320 .await; 321 // Still chain to inner resolver even if serialization fails 322 } 323 } 324 } else { 325 tracing::warn!("Failed to get SQLite connection for set operation"); 326 self.metrics 327 .incr("resolver.sqlite.set_connection_error") 328 .await; 329 } 330 331 // Chain to inner resolver 332 self.inner.set(&handle, did).await 333 } 334} 335 336/// Create a new SQLite-backed handle resolver with default 90-day TTL. 337/// 338/// # Arguments 339/// 340/// * `inner` - The underlying resolver to use for actual resolution 341/// * `pool` - SQLite connection pool 342/// * `metrics` - Metrics publisher for telemetry 343/// 344/// # Example 345/// 346/// ```no_run 347/// use std::sync::Arc; 348/// use quickdid::handle_resolver::{create_base_resolver, create_sqlite_resolver, HandleResolver}; 349/// use quickdid::sqlite_schema::create_sqlite_pool; 350/// use quickdid::metrics::NoOpMetricsPublisher; 351/// 352/// # async fn example() -> anyhow::Result<()> { 353/// # use atproto_identity::resolve::HickoryDnsResolver; 354/// # use reqwest::Client; 355/// # let dns_resolver = Arc::new(HickoryDnsResolver::create_resolver(&[])); 356/// # let http_client = Client::new(); 357/// # let metrics = Arc::new(NoOpMetricsPublisher); 358/// let base = create_base_resolver( 359/// dns_resolver, 360/// http_client, 361/// metrics.clone(), 362/// ); 363/// 364/// let pool = create_sqlite_pool("sqlite:./quickdid.db").await?; 365/// let resolver = create_sqlite_resolver(base, pool, metrics); 366/// let (did, timestamp) = resolver.resolve("alice.bsky.social").await.unwrap(); 367/// # Ok(()) 368/// # } 369/// ``` 370pub fn create_sqlite_resolver( 371 inner: Arc<dyn HandleResolver>, 372 pool: SqlitePool, 373 metrics: SharedMetricsPublisher, 374) -> Arc<dyn HandleResolver> { 375 Arc::new(SqliteHandleResolver::new(inner, pool, metrics)) 376} 377 378/// Create a new SQLite-backed handle resolver with custom TTL. 379/// 380/// # Arguments 381/// 382/// * `inner` - The underlying resolver to use for actual resolution 383/// * `pool` - SQLite connection pool 384/// * `ttl_seconds` - TTL for cache entries in seconds 385/// * `metrics` - Metrics publisher for telemetry 386pub fn create_sqlite_resolver_with_ttl( 387 inner: Arc<dyn HandleResolver>, 388 pool: SqlitePool, 389 ttl_seconds: u64, 390 metrics: SharedMetricsPublisher, 391) -> Arc<dyn HandleResolver> { 392 Arc::new(SqliteHandleResolver::with_ttl( 393 inner, 394 pool, 395 ttl_seconds, 396 metrics, 397 )) 398} 399 400#[cfg(test)] 401mod tests { 402 use super::*; 403 404 // Mock handle resolver for testing 405 #[derive(Clone)] 406 struct MockHandleResolver { 407 should_fail: bool, 408 expected_did: String, 409 } 410 411 #[async_trait] 412 impl HandleResolver for MockHandleResolver { 413 async fn resolve(&self, _handle: &str) -> Result<(String, u64), HandleResolverError> { 414 if self.should_fail { 415 Err(HandleResolverError::MockResolutionFailure) 416 } else { 417 let timestamp = std::time::SystemTime::now() 418 .duration_since(std::time::UNIX_EPOCH) 419 .unwrap_or_default() 420 .as_secs(); 421 Ok((self.expected_did.clone(), timestamp)) 422 } 423 } 424 } 425 426 #[tokio::test] 427 async fn test_sqlite_handle_resolver_cache_hit() { 428 // Create in-memory SQLite database for testing 429 let pool = SqlitePool::connect("sqlite::memory:") 430 .await 431 .expect("Failed to connect to in-memory SQLite"); 432 433 // Create the schema 434 crate::sqlite_schema::create_schema(&pool) 435 .await 436 .expect("Failed to create schema"); 437 438 // Create mock resolver 439 let mock_resolver = Arc::new(MockHandleResolver { 440 should_fail: false, 441 expected_did: "did:plc:testuser123".to_string(), 442 }); 443 444 // Create metrics publisher 445 let metrics = Arc::new(crate::metrics::NoOpMetricsPublisher); 446 447 // Create SQLite-backed resolver 448 let sqlite_resolver = 449 SqliteHandleResolver::with_ttl(mock_resolver, pool.clone(), 3600, metrics); 450 451 let test_handle = "alice.bsky.social"; 452 let expected_key = sqlite_resolver.make_key(test_handle) as i64; 453 454 // Verify database is empty initially 455 let initial_count: i64 = sqlx::query_scalar("SELECT COUNT(*) FROM handle_resolution_cache") 456 .fetch_one(&pool) 457 .await 458 .expect("Failed to query initial count"); 459 assert_eq!(initial_count, 0); 460 461 // First resolution - should call inner resolver and cache the result 462 let (result1, _timestamp1) = sqlite_resolver.resolve(test_handle).await.unwrap(); 463 assert_eq!(result1, "did:plc:testuser123"); 464 465 // Verify record was inserted 466 let count_after_first: i64 = 467 sqlx::query_scalar("SELECT COUNT(*) FROM handle_resolution_cache") 468 .fetch_one(&pool) 469 .await 470 .expect("Failed to query count after first resolution"); 471 assert_eq!(count_after_first, 1); 472 473 // Verify the cached record has correct key and non-empty result 474 let cached_record = sqlx::query( 475 "SELECT key, result, created, updated FROM handle_resolution_cache WHERE key = ?1", 476 ) 477 .bind(expected_key) 478 .fetch_one(&pool) 479 .await 480 .expect("Failed to fetch cached record"); 481 482 let cached_key: i64 = cached_record.get("key"); 483 let cached_result: Vec<u8> = cached_record.get("result"); 484 let cached_created: i64 = cached_record.get("created"); 485 let cached_updated: i64 = cached_record.get("updated"); 486 487 assert_eq!(cached_key, expected_key); 488 assert!( 489 !cached_result.is_empty(), 490 "Cached result should not be empty" 491 ); 492 assert!(cached_created > 0, "Created timestamp should be positive"); 493 assert!(cached_updated > 0, "Updated timestamp should be positive"); 494 assert_eq!( 495 cached_created, cached_updated, 496 "Created and updated should be equal on first insert" 497 ); 498 499 // Verify we can deserialize the cached result 500 let resolution_result = 501 crate::handle_resolution_result::HandleResolutionResult::from_bytes(&cached_result) 502 .expect("Failed to deserialize cached result"); 503 let cached_did = resolution_result.to_did().expect("Should have a DID"); 504 assert_eq!(cached_did, "did:plc:testuser123"); 505 506 // Second resolution - should hit cache (no additional database insert) 507 let (result2, _timestamp2) = sqlite_resolver.resolve(test_handle).await.unwrap(); 508 assert_eq!(result2, "did:plc:testuser123"); 509 510 // Verify count hasn't changed (cache hit, no new insert) 511 let count_after_second: i64 = 512 sqlx::query_scalar("SELECT COUNT(*) FROM handle_resolution_cache") 513 .fetch_one(&pool) 514 .await 515 .expect("Failed to query count after second resolution"); 516 assert_eq!(count_after_second, 1); 517 } 518 519 #[tokio::test] 520 async fn test_sqlite_handle_resolver_cache_error() { 521 // Create in-memory SQLite database for testing 522 let pool = SqlitePool::connect("sqlite::memory:") 523 .await 524 .expect("Failed to connect to in-memory SQLite"); 525 526 // Create the schema 527 crate::sqlite_schema::create_schema(&pool) 528 .await 529 .expect("Failed to create schema"); 530 531 // Create mock resolver that fails 532 let mock_resolver = Arc::new(MockHandleResolver { 533 should_fail: true, 534 expected_did: String::new(), 535 }); 536 537 // Create metrics publisher 538 let metrics = Arc::new(crate::metrics::NoOpMetricsPublisher); 539 540 // Create SQLite-backed resolver 541 let sqlite_resolver = 542 SqliteHandleResolver::with_ttl(mock_resolver, pool.clone(), 3600, metrics); 543 544 let test_handle = "error.bsky.social"; 545 let expected_key = sqlite_resolver.make_key(test_handle) as i64; 546 547 // Verify database is empty initially 548 let initial_count: i64 = sqlx::query_scalar("SELECT COUNT(*) FROM handle_resolution_cache") 549 .fetch_one(&pool) 550 .await 551 .expect("Failed to query initial count"); 552 assert_eq!(initial_count, 0); 553 554 // First resolution - should fail and cache the failure 555 let result1 = sqlite_resolver.resolve(test_handle).await; 556 assert!(result1.is_err()); 557 558 // Match the specific error type we expect 559 match result1 { 560 Err(HandleResolverError::MockResolutionFailure) => {} 561 other => panic!("Expected MockResolutionFailure, got {:?}", other), 562 } 563 564 // Verify the failure was cached 565 let count_after_first: i64 = 566 sqlx::query_scalar("SELECT COUNT(*) FROM handle_resolution_cache") 567 .fetch_one(&pool) 568 .await 569 .expect("Failed to query count after first resolution"); 570 assert_eq!(count_after_first, 1); 571 572 // Verify the cached error record 573 let cached_record = sqlx::query( 574 "SELECT key, result, created, updated FROM handle_resolution_cache WHERE key = ?1", 575 ) 576 .bind(expected_key) 577 .fetch_one(&pool) 578 .await 579 .expect("Failed to fetch cached error record"); 580 581 let cached_key: i64 = cached_record.get("key"); 582 let cached_result: Vec<u8> = cached_record.get("result"); 583 let cached_created: i64 = cached_record.get("created"); 584 let cached_updated: i64 = cached_record.get("updated"); 585 586 assert_eq!(cached_key, expected_key); 587 assert!( 588 !cached_result.is_empty(), 589 "Cached error result should not be empty" 590 ); 591 assert!(cached_created > 0, "Created timestamp should be positive"); 592 assert!(cached_updated > 0, "Updated timestamp should be positive"); 593 assert_eq!( 594 cached_created, cached_updated, 595 "Created and updated should be equal on first insert" 596 ); 597 598 // Verify we can deserialize the cached error result 599 let resolution_result = 600 crate::handle_resolution_result::HandleResolutionResult::from_bytes(&cached_result) 601 .expect("Failed to deserialize cached error result"); 602 let cached_did = resolution_result.to_did(); 603 assert!(cached_did.is_none(), "Error result should have no DID"); 604 605 // Second resolution - should hit cache with error (no additional database operations) 606 let result2 = sqlite_resolver.resolve(test_handle).await; 607 assert!(result2.is_err()); 608 609 // Match the specific error type we expect from cache 610 match result2 { 611 Err(HandleResolverError::HandleNotFound) => {} // Cache returns HandleNotFound for "not resolved" 612 other => panic!("Expected HandleNotFound from cache, got {:?}", other), 613 } 614 615 // Verify count hasn't changed (cache hit, no new operations) 616 let count_after_second: i64 = 617 sqlx::query_scalar("SELECT COUNT(*) FROM handle_resolution_cache") 618 .fetch_one(&pool) 619 .await 620 .expect("Failed to query count after second resolution"); 621 assert_eq!(count_after_second, 1); 622 } 623}