QuickDID is a high-performance AT Protocol identity resolution service written in Rust. It provides handle-to-DID resolution with Redis-backed caching and queue processing.
52
fork

Configure Feed

Select the types of activity you want to include in your feed.

feature: rate limited handle resolver

+440 -21
+8 -4
CLAUDE.md
··· 32 32 33 33 1. **Handle Resolution** (`src/handle_resolver.rs`) 34 34 - `BaseHandleResolver`: Core resolution using DNS and HTTP 35 + - `RateLimitedHandleResolver`: Semaphore-based rate limiting for concurrent resolutions 35 36 - `CachingHandleResolver`: In-memory caching layer 36 37 - `RedisHandleResolver`: Redis-backed persistent caching with 90-day TTL 38 + - `SqliteHandleResolver`: SQLite-backed persistent caching 37 39 - Uses binary serialization via `HandleResolutionResult` for space efficiency 38 40 39 41 2. **Binary Serialization** (`src/handle_resolution_result.rs`) ··· 64 66 - **Key Prefixes**: Configurable via `QUEUE_REDIS_PREFIX` environment variable 65 67 66 68 ### Handle Resolution Flow 67 - 1. Check Redis cache (if configured) 68 - 2. Fall back to in-memory cache 69 + 1. Check cache (Redis/SQLite/in-memory based on configuration) 70 + 2. If not cached, acquire rate limit permit (if rate limiting enabled) 69 71 3. Perform DNS TXT lookup or HTTP well-known query 70 72 4. Cache result with appropriate TTL 71 73 5. Return DID or error ··· 80 82 - `HTTP_PORT`: Server port (default: 8080) 81 83 - `PLC_HOSTNAME`: PLC directory hostname (default: plc.directory) 82 84 - `REDIS_URL`: Redis connection URL for caching 83 - - `QUEUE_ADAPTER`: Queue type - 'mpsc' or 'redis' (default: mpsc) 85 + - `QUEUE_ADAPTER`: Queue type - 'mpsc', 'redis', 'sqlite', or 'noop' (default: mpsc) 84 86 - `QUEUE_REDIS_PREFIX`: Redis key prefix for queues (default: queue:handleresolver:) 85 - - `QUEUE_WORKER_ID`: Worker ID for Redis queue (auto-generated if not set) 87 + - `QUEUE_WORKER_ID`: Worker ID for Redis queue (default: worker1) 88 + - `RESOLVER_MAX_CONCURRENT`: Maximum concurrent handle resolutions (default: 0 = disabled) 86 89 - `RUST_LOG`: Logging level (e.g., debug, info) 87 90 88 91 ## Error Handling ··· 134 137 - MetroHash64 for fast key generation 135 138 - Connection pooling for Redis 136 139 - Configurable TTLs for cache entries 140 + - Rate limiting via semaphore-based concurrency control 137 141 138 142 ### Code Style 139 143 - Follow existing Rust idioms and patterns
+119 -3
docs/configuration-reference.md
··· 8 8 - [Network Configuration](#network-configuration) 9 9 - [Caching Configuration](#caching-configuration) 10 10 - [Queue Configuration](#queue-configuration) 11 - - [Security Configuration](#security-configuration) 12 - - [Advanced Configuration](#advanced-configuration) 11 + - [Rate Limiting Configuration](#rate-limiting-configuration) 13 12 - [Configuration Examples](#configuration-examples) 14 13 - [Validation Rules](#validation-rules) 15 14 ··· 453 452 - **Disk space concerns**: Lower values (1000-5000) 454 453 - **High ingestion rate**: Higher values (50000-1000000) 455 454 455 + ## Rate Limiting Configuration 456 + 457 + ### `RESOLVER_MAX_CONCURRENT` 458 + 459 + **Required**: No 460 + **Type**: Integer 461 + **Default**: `0` (disabled) 462 + **Range**: 0-10000 463 + **Constraints**: Must be between 0 and 10000 464 + 465 + Maximum concurrent handle resolutions allowed. When set to a value greater than 0, enables semaphore-based rate limiting to protect upstream DNS and HTTP services from being overwhelmed. 466 + 467 + **How it works**: 468 + - Uses a semaphore to limit concurrent resolutions 469 + - Applied between the base resolver and caching layers 470 + - Requests wait for an available permit before resolution 471 + - Helps prevent overwhelming upstream services 472 + 473 + **Examples**: 474 + ```bash 475 + # Disabled (default) 476 + RESOLVER_MAX_CONCURRENT=0 477 + 478 + # Light rate limiting 479 + RESOLVER_MAX_CONCURRENT=10 480 + 481 + # Moderate rate limiting 482 + RESOLVER_MAX_CONCURRENT=50 483 + 484 + # Heavy traffic with rate limiting 485 + RESOLVER_MAX_CONCURRENT=100 486 + 487 + # Maximum allowed 488 + RESOLVER_MAX_CONCURRENT=10000 489 + ``` 490 + 491 + **Recommendations**: 492 + - **Development**: 0 (disabled) or 10-50 for testing 493 + - **Production (low traffic)**: 50-100 494 + - **Production (high traffic)**: 100-500 495 + - **Production (very high traffic)**: 500-1000 496 + - **Testing rate limiting**: 1-5 to observe behavior 497 + 498 + **Placement in resolver stack**: 499 + ``` 500 + Request → Cache → RateLimited → Base → DNS/HTTP 501 + ``` 502 + 503 + ### `RESOLVER_MAX_CONCURRENT_TIMEOUT_MS` 504 + 505 + **Required**: No 506 + **Type**: Integer (milliseconds) 507 + **Default**: `0` (no timeout) 508 + **Range**: 0-60000 509 + **Constraints**: Must be between 0 and 60000 (60 seconds max) 510 + 511 + Timeout for acquiring a rate limit permit in milliseconds. When set to a value greater than 0, requests will timeout if they cannot acquire a permit within the specified time, preventing them from waiting indefinitely when the rate limiter is at capacity. 512 + 513 + **How it works**: 514 + - Applied when `RESOLVER_MAX_CONCURRENT` is enabled (> 0) 515 + - Uses `tokio::time::timeout` to limit permit acquisition time 516 + - Returns an error if timeout expires before permit is acquired 517 + - Prevents request queue buildup during high load 518 + 519 + **Examples**: 520 + ```bash 521 + # No timeout (default) 522 + RESOLVER_MAX_CONCURRENT_TIMEOUT_MS=0 523 + 524 + # Quick timeout for responsive failures (100ms) 525 + RESOLVER_MAX_CONCURRENT_TIMEOUT_MS=100 526 + 527 + # Moderate timeout (1 second) 528 + RESOLVER_MAX_CONCURRENT_TIMEOUT_MS=1000 529 + 530 + # Longer timeout for production (5 seconds) 531 + RESOLVER_MAX_CONCURRENT_TIMEOUT_MS=5000 532 + 533 + # Maximum allowed (60 seconds) 534 + RESOLVER_MAX_CONCURRENT_TIMEOUT_MS=60000 535 + ``` 536 + 537 + **Recommendations**: 538 + - **Development**: 100-1000ms for quick feedback 539 + - **Production (low latency)**: 1000-5000ms 540 + - **Production (high latency tolerance)**: 5000-30000ms 541 + - **Testing**: 100ms to quickly identify bottlenecks 542 + - **0**: Use when you want requests to wait indefinitely 543 + 544 + **Error behavior**: 545 + When a timeout occurs, the request fails with: 546 + ``` 547 + Rate limit permit acquisition timed out after {timeout}ms 548 + ``` 549 + 456 550 ## Configuration Examples 457 551 458 552 ### Minimal Development Configuration ··· 486 580 QUEUE_REDIS_TIMEOUT=5 487 581 QUEUE_BUFFER_SIZE=5000 488 582 583 + # Rate Limiting (optional, recommended for production) 584 + RESOLVER_MAX_CONCURRENT=100 585 + RESOLVER_MAX_CONCURRENT_TIMEOUT_MS=5000 # 5 second timeout 586 + 489 587 # Logging 490 588 RUST_LOG=info 491 589 ``` ··· 511 609 QUEUE_ADAPTER=sqlite 512 610 QUEUE_BUFFER_SIZE=5000 513 611 QUEUE_SQLITE_MAX_SIZE=10000 612 + 613 + # Rate Limiting (optional, recommended for production) 614 + RESOLVER_MAX_CONCURRENT=100 615 + RESOLVER_MAX_CONCURRENT_TIMEOUT_MS=5000 # 5 second timeout 514 616 515 617 # Logging 516 618 RUST_LOG=info ··· 543 645 # Performance 544 646 QUEUE_BUFFER_SIZE=10000 545 647 648 + # Rate Limiting (important for HA deployments) 649 + RESOLVER_MAX_CONCURRENT=500 650 + RESOLVER_MAX_CONCURRENT_TIMEOUT_MS=10000 # 10 second timeout for HA 651 + 546 652 # Logging 547 653 RUST_LOG=warn 548 654 ``` ··· 655 761 - Must be one of: `mpsc`, `redis`, `sqlite`, `noop`, `none` 656 762 - Case-sensitive 657 763 658 - 4. **Port** (`HTTP_PORT`): 764 + 4. **Rate Limiting** (`RESOLVER_MAX_CONCURRENT`): 765 + - Must be between 0 and 10000 766 + - 0 = disabled (default) 767 + - Values > 10000 will fail validation 768 + 769 + 5. **Rate Limiting Timeout** (`RESOLVER_MAX_CONCURRENT_TIMEOUT_MS`): 770 + - Must be between 0 and 60000 (milliseconds) 771 + - 0 = no timeout (default) 772 + - Values > 60000 will fail validation 773 + 774 + 6. **Port** (`HTTP_PORT`): 659 775 - Must be valid port number (1-65535) 660 776 - Ports < 1024 require elevated privileges 661 777
+26
docs/production-deployment.md
··· 184 184 # RUST_LOG_FORMAT=json 185 185 186 186 # ---------------------------------------------------------------------------- 187 + # RATE LIMITING CONFIGURATION 188 + # ---------------------------------------------------------------------------- 189 + 190 + # Maximum concurrent handle resolutions (default: 0 = disabled) 191 + # When > 0, enables semaphore-based rate limiting 192 + # Range: 0-10000 (0 = disabled) 193 + # Protects upstream DNS/HTTP services from being overwhelmed 194 + RESOLVER_MAX_CONCURRENT=0 195 + 196 + # Timeout for acquiring rate limit permit in milliseconds (default: 0 = no timeout) 197 + # When > 0, requests will timeout if they can't acquire a permit within this time 198 + # Range: 0-60000 (max 60 seconds) 199 + # Prevents requests from waiting indefinitely when rate limiter is at capacity 200 + RESOLVER_MAX_CONCURRENT_TIMEOUT_MS=0 201 + 202 + # ---------------------------------------------------------------------------- 187 203 # PERFORMANCE TUNING 188 204 # ---------------------------------------------------------------------------- 189 205 ··· 838 854 3. **Queue Adapter** (`QUEUE_ADAPTER`): 839 855 - Must be one of: `mpsc`, `redis`, `sqlite`, `noop`, `none` 840 856 - Case-sensitive 857 + 858 + 4. **Rate Limiting** (`RESOLVER_MAX_CONCURRENT`): 859 + - Must be between 0 and 10000 860 + - 0 = disabled (default) 861 + - When > 0, limits concurrent handle resolutions 862 + 863 + 5. **Rate Limiting Timeout** (`RESOLVER_MAX_CONCURRENT_TIMEOUT_MS`): 864 + - Must be between 0 and 60000 (milliseconds) 865 + - 0 = no timeout (default) 866 + - Maximum: 60000ms (60 seconds) 841 867 842 868 ### Validation Errors 843 869
+26 -3
src/bin/quickdid.rs
··· 8 8 cache::create_redis_pool, 9 9 config::Config, 10 10 handle_resolver::{ 11 - create_base_resolver, create_caching_resolver, create_redis_resolver_with_ttl, 12 - create_sqlite_resolver_with_ttl, 11 + create_base_resolver, create_caching_resolver, create_rate_limited_resolver_with_timeout, 12 + create_redis_resolver_with_ttl, create_sqlite_resolver_with_ttl, 13 13 }, 14 14 sqlite_schema::create_sqlite_pool, 15 15 handle_resolver_task::{HandleResolverTaskConfig, create_handle_resolver_task_with_config}, ··· 99 99 println!(" QUEUE_WORKER_ID Worker ID for Redis queue (default: worker1)"); 100 100 println!(" QUEUE_BUFFER_SIZE Buffer size for MPSC queue (default: 1000)"); 101 101 println!(" QUEUE_SQLITE_MAX_SIZE Maximum SQLite queue size (default: 10000)"); 102 + println!(); 103 + println!(" RATE LIMITING:"); 104 + println!(" RESOLVER_MAX_CONCURRENT Maximum concurrent resolutions (default: 0 = disabled)"); 105 + println!(" RESOLVER_MAX_CONCURRENT_TIMEOUT_MS Timeout for acquiring permits in ms (default: 0 = no timeout)"); 102 106 println!(); 103 107 println!("For more information, visit: https://github.com/smokesignal.events/quickdid"); 104 108 return true; ··· 190 194 let dns_resolver_arc = Arc::new(dns_resolver); 191 195 192 196 // Create base handle resolver using factory function 193 - let base_handle_resolver = create_base_resolver(dns_resolver_arc.clone(), http_client.clone()); 197 + let mut base_handle_resolver = create_base_resolver(dns_resolver_arc.clone(), http_client.clone()); 198 + 199 + // Apply rate limiting if configured 200 + if config.resolver_max_concurrent > 0 { 201 + let timeout_info = if config.resolver_max_concurrent_timeout_ms > 0 { 202 + format!(", {}ms timeout", config.resolver_max_concurrent_timeout_ms) 203 + } else { 204 + String::new() 205 + }; 206 + tracing::info!( 207 + "Applying rate limiting to handle resolver (max {} concurrent resolutions{})", 208 + config.resolver_max_concurrent, 209 + timeout_info 210 + ); 211 + base_handle_resolver = create_rate_limited_resolver_with_timeout( 212 + base_handle_resolver, 213 + config.resolver_max_concurrent, 214 + config.resolver_max_concurrent_timeout_ms 215 + ); 216 + } 194 217 195 218 // Create Redis pool if configured 196 219 let redis_pool = config
+11 -3
src/cache.rs
··· 1 1 //! Redis cache utilities for QuickDID 2 2 3 - use anyhow::Result; 4 3 use deadpool_redis::{Config, Pool, Runtime}; 4 + use thiserror::Error; 5 + 6 + /// Cache-specific errors 7 + #[derive(Debug, Error)] 8 + pub enum CacheError { 9 + /// Redis pool creation failed 10 + #[error("error-quickdid-cache-1 Redis pool creation failed: {0}")] 11 + RedisPoolCreationFailed(String), 12 + } 5 13 6 14 /// Create a Redis connection pool from a Redis URL. 7 15 /// ··· 14 22 /// Returns an error if: 15 23 /// - The Redis URL is invalid 16 24 /// - Pool creation fails 17 - pub fn create_redis_pool(redis_url: &str) -> Result<Pool> { 25 + pub fn create_redis_pool(redis_url: &str) -> Result<Pool, CacheError> { 18 26 let config = Config::from_url(redis_url); 19 27 let pool = config 20 28 .create_pool(Some(Runtime::Tokio1)) 21 - .map_err(|e| anyhow::anyhow!("error-quickdid-cache-1 Redis pool creation failed: {}", e))?; 29 + .map_err(|e| CacheError::RedisPoolCreationFailed(e.to_string()))?; 22 30 Ok(pool) 23 31 }
+26 -4
src/config.rs
··· 44 44 45 45 /// Invalid configuration value that doesn't meet expected format or constraints 46 46 /// 47 - /// Example: Invalid QUEUE_ADAPTER value (must be 'mpsc', 'redis', or 'noop') 47 + /// Example: Invalid QUEUE_ADAPTER value (must be 'mpsc', 'redis', 'sqlite', 'noop', or 'none') 48 48 #[error("error-quickdid-config-2 Invalid configuration value: {0}")] 49 49 InvalidValue(String), 50 50 ··· 84 84 /// Validated configuration for QuickDID service 85 85 /// 86 86 /// This struct contains all configuration after validation and processing. 87 - /// Use `Config::from_args()` to create from command-line arguments and environment variables. 87 + /// Use `Config::from_env()` to create from environment variables. 88 88 /// 89 89 /// ## Example 90 90 /// ··· 164 164 /// When exceeded, oldest entries are deleted to maintain this limit. 165 165 /// Set to 0 to disable work shedding (unlimited queue size). 166 166 pub queue_sqlite_max_size: u64, 167 + 168 + /// Maximum concurrent handle resolutions allowed (rate limiting). 169 + /// When set to > 0, enables rate limiting using a semaphore. 170 + /// Default: 0 (disabled) 171 + pub resolver_max_concurrent: usize, 172 + 173 + /// Timeout for acquiring rate limit permit in milliseconds. 174 + /// When set to > 0, requests will timeout if they can't acquire a permit within this time. 175 + /// Default: 0 (no timeout) 176 + pub resolver_max_concurrent_timeout_ms: u64, 167 177 } 168 178 169 179 impl Config { ··· 242 252 cache_ttl_sqlite: parse_env("CACHE_TTL_SQLITE", 7776000)?, 243 253 queue_redis_timeout: parse_env("QUEUE_REDIS_TIMEOUT", 5)?, 244 254 queue_sqlite_max_size: parse_env("QUEUE_SQLITE_MAX_SIZE", 10000)?, 255 + resolver_max_concurrent: parse_env("RESOLVER_MAX_CONCURRENT", 0)?, 256 + resolver_max_concurrent_timeout_ms: parse_env("RESOLVER_MAX_CONCURRENT_TIMEOUT_MS", 0)?, 245 257 }) 246 258 } 247 259 ··· 250 262 /// Checks: 251 263 /// - Cache TTL values are positive (> 0) 252 264 /// - Queue timeout is positive (> 0) 253 - /// - Queue adapter is a valid value ('mpsc', 'redis', 'noop', 'none') 265 + /// - Queue adapter is a valid value ('mpsc', 'redis', 'sqlite', 'noop', 'none') 254 266 /// 255 267 /// ## Example 256 268 /// ··· 293 305 "mpsc" | "redis" | "sqlite" | "noop" | "none" => {} 294 306 _ => { 295 307 return Err(ConfigError::InvalidValue(format!( 296 - "Invalid QUEUE_ADAPTER '{}', must be 'mpsc', 'redis', 'sqlite', or 'noop'", 308 + "Invalid QUEUE_ADAPTER '{}', must be 'mpsc', 'redis', 'sqlite', 'noop', or 'none'", 297 309 self.queue_adapter 298 310 ))); 299 311 } 312 + } 313 + if self.resolver_max_concurrent > 10000 { 314 + return Err(ConfigError::InvalidValue( 315 + "RESOLVER_MAX_CONCURRENT must be between 0 and 10000".to_string(), 316 + )); 317 + } 318 + if self.resolver_max_concurrent_timeout_ms > 60000 { 319 + return Err(ConfigError::InvalidTimeout( 320 + "RESOLVER_MAX_CONCURRENT_TIMEOUT_MS must be <= 60000 (60 seconds)".to_string(), 321 + )); 300 322 } 301 323 Ok(()) 302 324 }
+6 -3
src/handle_resolution_result.rs
··· 11 11 /// Errors that can occur during handle resolution result operations 12 12 #[derive(Debug, Error)] 13 13 pub enum HandleResolutionError { 14 - #[error("error-quickdid-resolution-1 System time error: {0}")] 14 + /// System time error when getting timestamp 15 + #[error("error-quickdid-result-1 System time error: {0}")] 15 16 SystemTime(String), 16 17 17 - #[error("error-quickdid-serialization-1 Failed to serialize resolution result: {0}")] 18 + /// Failed to serialize resolution result to binary format 19 + #[error("error-quickdid-result-2 Failed to serialize resolution result: {0}")] 18 20 Serialization(String), 19 21 20 - #[error("error-quickdid-serialization-2 Failed to deserialize resolution result: {0}")] 22 + /// Failed to deserialize resolution result from binary format 23 + #[error("error-quickdid-result-3 Failed to deserialize resolution result: {0}")] 21 24 Deserialization(String), 22 25 } 23 26
+3
src/handle_resolver/mod.rs
··· 9 9 //! implementations: 10 10 //! 11 11 //! - [`BaseHandleResolver`]: Core resolver that performs actual DNS/HTTP lookups 12 + //! - [`RateLimitedHandleResolver`]: Rate limiting wrapper using semaphore-based concurrency control 12 13 //! - [`CachingHandleResolver`]: In-memory caching wrapper with configurable TTL 13 14 //! - [`RedisHandleResolver`]: Redis-backed persistent caching with binary serialization 14 15 //! - [`SqliteHandleResolver`]: SQLite-backed persistent caching for single-instance deployments ··· 43 44 mod base; 44 45 mod errors; 45 46 mod memory; 47 + mod rate_limited; 46 48 mod redis; 47 49 mod sqlite; 48 50 mod traits; ··· 54 56 // Factory functions for creating resolvers 55 57 pub use base::create_base_resolver; 56 58 pub use memory::create_caching_resolver; 59 + pub use rate_limited::{create_rate_limited_resolver, create_rate_limited_resolver_with_timeout}; 57 60 pub use redis::{create_redis_resolver, create_redis_resolver_with_ttl}; 58 61 pub use sqlite::{create_sqlite_resolver, create_sqlite_resolver_with_ttl};
+213
src/handle_resolver/rate_limited.rs
··· 1 + //! Rate-limited handle resolver implementation. 2 + //! 3 + //! This module provides a handle resolver wrapper that limits concurrent 4 + //! resolution requests using a semaphore to implement basic rate limiting. 5 + 6 + use super::errors::HandleResolverError; 7 + use super::traits::HandleResolver; 8 + use async_trait::async_trait; 9 + use std::sync::Arc; 10 + use std::time::Duration; 11 + use tokio::sync::Semaphore; 12 + use tokio::time::timeout; 13 + 14 + /// Rate-limited handle resolver that constrains concurrent resolutions. 15 + /// 16 + /// This resolver wraps an inner resolver and uses a semaphore to limit 17 + /// the number of concurrent resolution requests. This provides basic 18 + /// rate limiting and protects upstream services from being overwhelmed. 19 + /// 20 + /// # Architecture 21 + /// 22 + /// The rate limiter should be placed between the base resolver and any 23 + /// caching layers: 24 + /// ```text 25 + /// Request -> Cache -> RateLimited -> Base -> DNS/HTTP 26 + /// ``` 27 + /// 28 + /// # Example 29 + /// 30 + /// ```no_run 31 + /// use std::sync::Arc; 32 + /// use quickdid::handle_resolver::{ 33 + /// create_base_resolver, 34 + /// create_rate_limited_resolver, 35 + /// HandleResolver, 36 + /// }; 37 + /// 38 + /// # async fn example() { 39 + /// # use atproto_identity::resolve::HickoryDnsResolver; 40 + /// # use reqwest::Client; 41 + /// # let dns_resolver = Arc::new(HickoryDnsResolver::create_resolver(&[])); 42 + /// # let http_client = Client::new(); 43 + /// // Create base resolver 44 + /// let base = create_base_resolver(dns_resolver, http_client); 45 + /// 46 + /// // Wrap with rate limiting (max 10 concurrent resolutions) 47 + /// let rate_limited = create_rate_limited_resolver(base, 10); 48 + /// 49 + /// // Use the rate-limited resolver 50 + /// let did = rate_limited.resolve("alice.bsky.social").await.unwrap(); 51 + /// # } 52 + /// ``` 53 + pub(super) struct RateLimitedHandleResolver { 54 + /// Inner resolver that performs actual resolution. 55 + inner: Arc<dyn HandleResolver>, 56 + 57 + /// Semaphore for limiting concurrent resolutions. 58 + semaphore: Arc<Semaphore>, 59 + 60 + /// Optional timeout for acquiring permits (in milliseconds). 61 + /// When None or 0, no timeout is applied. 62 + timeout_ms: Option<u64>, 63 + } 64 + 65 + impl RateLimitedHandleResolver { 66 + /// Create a new rate-limited resolver. 67 + /// 68 + /// # Arguments 69 + /// 70 + /// * `inner` - The inner resolver to wrap 71 + /// * `max_concurrent` - Maximum number of concurrent resolutions allowed 72 + pub fn new(inner: Arc<dyn HandleResolver>, max_concurrent: usize) -> Self { 73 + Self { 74 + inner, 75 + semaphore: Arc::new(Semaphore::new(max_concurrent)), 76 + timeout_ms: None, 77 + } 78 + } 79 + 80 + /// Create a new rate-limited resolver with timeout. 81 + /// 82 + /// # Arguments 83 + /// 84 + /// * `inner` - The inner resolver to wrap 85 + /// * `max_concurrent` - Maximum number of concurrent resolutions allowed 86 + /// * `timeout_ms` - Timeout in milliseconds for acquiring permits (0 = no timeout) 87 + pub fn new_with_timeout(inner: Arc<dyn HandleResolver>, max_concurrent: usize, timeout_ms: u64) -> Self { 88 + Self { 89 + inner, 90 + semaphore: Arc::new(Semaphore::new(max_concurrent)), 91 + timeout_ms: if timeout_ms > 0 { Some(timeout_ms) } else { None }, 92 + } 93 + } 94 + } 95 + 96 + #[async_trait] 97 + impl HandleResolver for RateLimitedHandleResolver { 98 + async fn resolve(&self, s: &str) -> Result<String, HandleResolverError> { 99 + // Acquire a permit from the semaphore, with optional timeout 100 + let _permit = match self.timeout_ms { 101 + Some(timeout_ms) if timeout_ms > 0 => { 102 + // Apply timeout when acquiring permit 103 + let duration = Duration::from_millis(timeout_ms); 104 + match timeout(duration, self.semaphore.acquire()).await { 105 + Ok(Ok(permit)) => permit, 106 + Ok(Err(e)) => { 107 + // Semaphore error (e.g., closed) 108 + return Err(HandleResolverError::ResolutionFailed( 109 + format!("Failed to acquire rate limit permit: {}", e) 110 + )); 111 + } 112 + Err(_) => { 113 + // Timeout occurred 114 + return Err(HandleResolverError::ResolutionFailed( 115 + format!("Rate limit permit acquisition timed out after {}ms", timeout_ms) 116 + )); 117 + } 118 + } 119 + } 120 + _ => { 121 + // No timeout configured, wait indefinitely 122 + self.semaphore.acquire().await 123 + .map_err(|e| HandleResolverError::ResolutionFailed( 124 + format!("Failed to acquire rate limit permit: {}", e) 125 + ))? 126 + } 127 + }; 128 + 129 + // With permit acquired, forward to inner resolver 130 + self.inner.resolve(s).await 131 + } 132 + } 133 + 134 + /// Create a rate-limited handle resolver. 135 + /// 136 + /// This factory function creates a new [`RateLimitedHandleResolver`] that wraps 137 + /// the provided inner resolver with concurrency limiting. 138 + /// 139 + /// # Arguments 140 + /// 141 + /// * `inner` - The resolver to wrap with rate limiting 142 + /// * `max_concurrent` - Maximum number of concurrent resolutions allowed 143 + /// 144 + /// # Returns 145 + /// 146 + /// An `Arc<dyn HandleResolver>` that can be used wherever a handle resolver is needed. 147 + /// 148 + /// # Example 149 + /// 150 + /// ```no_run 151 + /// use std::sync::Arc; 152 + /// use quickdid::handle_resolver::{ 153 + /// create_base_resolver, 154 + /// create_rate_limited_resolver, 155 + /// }; 156 + /// 157 + /// # async fn example() { 158 + /// # use atproto_identity::resolve::HickoryDnsResolver; 159 + /// # use reqwest::Client; 160 + /// # let dns_resolver = Arc::new(HickoryDnsResolver::create_resolver(&[])); 161 + /// # let http_client = Client::new(); 162 + /// let base = create_base_resolver(dns_resolver, http_client); 163 + /// let rate_limited = create_rate_limited_resolver(base, 10); 164 + /// # } 165 + /// ``` 166 + pub fn create_rate_limited_resolver( 167 + inner: Arc<dyn HandleResolver>, 168 + max_concurrent: usize, 169 + ) -> Arc<dyn HandleResolver> { 170 + Arc::new(RateLimitedHandleResolver::new(inner, max_concurrent)) 171 + } 172 + 173 + /// Create a rate-limited handle resolver with timeout. 174 + /// 175 + /// This factory function creates a new [`RateLimitedHandleResolver`] that wraps 176 + /// the provided inner resolver with concurrency limiting and timeout for permit acquisition. 177 + /// 178 + /// # Arguments 179 + /// 180 + /// * `inner` - The resolver to wrap with rate limiting 181 + /// * `max_concurrent` - Maximum number of concurrent resolutions allowed 182 + /// * `timeout_ms` - Timeout in milliseconds for acquiring permits (0 = no timeout) 183 + /// 184 + /// # Returns 185 + /// 186 + /// An `Arc<dyn HandleResolver>` that can be used wherever a handle resolver is needed. 187 + /// 188 + /// # Example 189 + /// 190 + /// ```no_run 191 + /// use std::sync::Arc; 192 + /// use quickdid::handle_resolver::{ 193 + /// create_base_resolver, 194 + /// create_rate_limited_resolver_with_timeout, 195 + /// }; 196 + /// 197 + /// # async fn example() { 198 + /// # use atproto_identity::resolve::HickoryDnsResolver; 199 + /// # use reqwest::Client; 200 + /// # let dns_resolver = Arc::new(HickoryDnsResolver::create_resolver(&[])); 201 + /// # let http_client = Client::new(); 202 + /// let base = create_base_resolver(dns_resolver, http_client); 203 + /// // Rate limit with 10 concurrent resolutions and 5 second timeout 204 + /// let rate_limited = create_rate_limited_resolver_with_timeout(base, 10, 5000); 205 + /// # } 206 + /// ``` 207 + pub fn create_rate_limited_resolver_with_timeout( 208 + inner: Arc<dyn HandleResolver>, 209 + max_concurrent: usize, 210 + timeout_ms: u64, 211 + ) -> Arc<dyn HandleResolver> { 212 + Arc::new(RateLimitedHandleResolver::new_with_timeout(inner, max_concurrent, timeout_ms)) 213 + }
+2 -1
src/handle_resolver_task.rs
··· 16 16 /// Handle resolver task errors 17 17 #[derive(Error, Debug)] 18 18 pub(crate) enum HandleResolverError { 19 - #[error("Queue adapter health check failed: adapter is not healthy")] 19 + /// Queue adapter health check failed 20 + #[error("error-quickdid-task-1 Queue adapter health check failed: adapter is not healthy")] 20 21 QueueAdapterUnhealthy, 21 22 } 22 23