+58
-22
CLAUDE.md
+58
-22
CLAUDE.md
···
3
3
## Overview
4
4
QuickDID is a high-performance AT Protocol identity resolution service written in Rust. It provides handle-to-DID resolution with Redis-backed caching and queue processing.
5
5
6
+
## Configuration
7
+
8
+
QuickDID follows the 12-factor app methodology and uses environment variables exclusively for configuration. There are no command-line arguments except for `--version` and `--help`.
9
+
10
+
Configuration is validated at startup, and the service will exit with specific error codes if validation fails:
11
+
- `error-quickdid-config-1`: Missing required environment variable
12
+
- `error-quickdid-config-2`: Invalid configuration value
13
+
- `error-quickdid-config-3`: Invalid TTL value (must be positive)
14
+
- `error-quickdid-config-4`: Invalid timeout value (must be positive)
15
+
6
16
## Common Commands
7
17
8
18
### Building and Running
···
10
20
# Build the project
11
21
cargo build
12
22
13
-
# Run in debug mode
14
-
cargo run
23
+
# Run in debug mode (requires environment variables)
24
+
HTTP_EXTERNAL=localhost:3007 SERVICE_KEY=did:key:z42tmZxD2mi1TfMKSFrsRfednwdaaPNZiiWHP4MPgcvXkDWK cargo run
15
25
16
26
# Run tests
17
27
cargo test
···
19
29
# Type checking
20
30
cargo check
21
31
22
-
# Run with environment variables
23
-
HTTP_EXTERNAL=localhost:3007 SERVICE_KEY=did:key:z42tmZxD2mi1TfMKSFrsRfednwdaaPNZiiWHP4MPgcvXkDWK cargo run
32
+
# Linting
33
+
cargo clippy
34
+
35
+
# Show version
36
+
cargo run -- --version
37
+
38
+
# Show help
39
+
cargo run -- --help
24
40
```
25
41
26
42
### Development with VS Code
···
30
46
31
47
### Core Components
32
48
33
-
1. **Handle Resolution** (`src/handle_resolver.rs`)
49
+
1. **Handle Resolution** (`src/handle_resolver/`)
34
50
- `BaseHandleResolver`: Core resolution using DNS and HTTP
35
-
- `RateLimitedHandleResolver`: Semaphore-based rate limiting for concurrent resolutions
51
+
- `RateLimitedHandleResolver`: Semaphore-based rate limiting with optional timeout
36
52
- `CachingHandleResolver`: In-memory caching layer
37
-
- `RedisHandleResolver`: Redis-backed persistent caching with 90-day TTL
53
+
- `RedisHandleResolver`: Redis-backed persistent caching
38
54
- `SqliteHandleResolver`: SQLite-backed persistent caching
39
55
- Uses binary serialization via `HandleResolutionResult` for space efficiency
56
+
- Resolution stack: Cache → RateLimited (optional) → Base → DNS/HTTP
40
57
41
58
2. **Binary Serialization** (`src/handle_resolution_result.rs`)
42
59
- Compact storage format using bincode
43
60
- Strips DID prefixes for did:web and did:plc methods
44
61
- Stores: timestamp (u64), method type (i16), payload (String)
45
62
46
-
3. **Queue System** (`src/queue_adapter.rs`)
47
-
- Supports MPSC (in-process) and Redis adapters
63
+
3. **Queue System** (`src/queue/`)
64
+
- Supports MPSC (in-process), Redis, SQLite, and no-op adapters
48
65
- `HandleResolutionWork` items processed asynchronously
49
66
- Redis uses reliable queue pattern (LPUSH/RPOPLPUSH/LREM)
67
+
- SQLite provides persistent queue with work shedding capabilities
50
68
51
69
4. **HTTP Server** (`src/http/`)
52
70
- XRPC endpoints for AT Protocol compatibility
···
67
85
68
86
### Handle Resolution Flow
69
87
1. Check cache (Redis/SQLite/in-memory based on configuration)
70
-
2. If not cached, acquire rate limit permit (if rate limiting enabled)
88
+
2. If cache miss and rate limiting enabled:
89
+
- Acquire semaphore permit (with optional timeout)
90
+
- If timeout configured and exceeded, return error
71
91
3. Perform DNS TXT lookup or HTTP well-known query
72
92
4. Cache result with appropriate TTL
73
93
5. Return DID or error
···
82
102
- `HTTP_PORT`: Server port (default: 8080)
83
103
- `PLC_HOSTNAME`: PLC directory hostname (default: plc.directory)
84
104
- `REDIS_URL`: Redis connection URL for caching
85
-
- `QUEUE_ADAPTER`: Queue type - 'mpsc', 'redis', 'sqlite', or 'noop' (default: mpsc)
105
+
- `SQLITE_URL`: SQLite database URL for caching (e.g., `sqlite:./quickdid.db`)
106
+
- `QUEUE_ADAPTER`: Queue type - 'mpsc', 'redis', 'sqlite', 'noop', or 'none' (default: mpsc)
86
107
- `QUEUE_REDIS_PREFIX`: Redis key prefix for queues (default: queue:handleresolver:)
87
-
- `QUEUE_WORKER_ID`: Worker ID for Redis queue (default: worker1)
108
+
- `QUEUE_WORKER_ID`: Worker ID for queue operations (default: worker1)
109
+
- `QUEUE_BUFFER_SIZE`: Buffer size for MPSC queue (default: 1000)
110
+
- `QUEUE_SQLITE_MAX_SIZE`: Max queue size for SQLite work shedding (default: 10000)
111
+
- `CACHE_TTL_MEMORY`: TTL for in-memory cache in seconds (default: 600)
112
+
- `CACHE_TTL_REDIS`: TTL for Redis cache in seconds (default: 7776000)
113
+
- `CACHE_TTL_SQLITE`: TTL for SQLite cache in seconds (default: 7776000)
114
+
- `QUEUE_REDIS_TIMEOUT`: Redis blocking timeout in seconds (default: 5)
88
115
- `RESOLVER_MAX_CONCURRENT`: Maximum concurrent handle resolutions (default: 0 = disabled)
116
+
- `RESOLVER_MAX_CONCURRENT_TIMEOUT_MS`: Timeout for acquiring rate limit permit in ms (default: 0 = no timeout)
89
117
- `RUST_LOG`: Logging level (e.g., debug, info)
90
118
91
119
## Error Handling
···
94
122
95
123
error-quickdid-<domain>-<number> <message>: <details>
96
124
97
-
Example errors:
125
+
Current error domains and examples:
98
126
99
-
* error-quickdid-resolve-1 Multiple DIDs resolved for method
100
-
* error-quickdid-plc-1 HTTP request failed: https://google.com/ Not Found
101
-
* error-quickdid-key-1 Error decoding key: invalid
127
+
* `config`: Configuration errors (e.g., error-quickdid-config-1 Missing required environment variable)
128
+
* `resolve`: Handle resolution errors (e.g., error-quickdid-resolve-1 Failed to resolve subject)
129
+
* `queue`: Queue operation errors (e.g., error-quickdid-queue-1 Failed to push to queue)
130
+
* `cache`: Cache-related errors (e.g., error-quickdid-cache-1 Redis pool creation failed)
131
+
* `result`: Serialization errors (e.g., error-quickdid-result-1 System time error)
132
+
* `task`: Task processing errors (e.g., error-quickdid-task-1 Queue adapter health check failed)
102
133
103
134
Errors should be represented as enums using the `thiserror` library.
104
135
···
128
159
## Development Patterns
129
160
130
161
### Error Handling
131
-
- Uses `anyhow::Result` for error propagation
132
-
- Graceful fallbacks when Redis is unavailable
162
+
- Uses strongly-typed errors with `thiserror` for all modules
163
+
- Each error has a unique identifier following the pattern `error-quickdid-<domain>-<number>`
164
+
- Graceful fallbacks when Redis/SQLite is unavailable
133
165
- Detailed tracing for debugging
166
+
- Avoid using `anyhow!()` or `bail!()` macros - use proper error types instead
134
167
135
168
### Performance Optimizations
136
169
- Binary serialization reduces storage by ~40%
···
153
186
3. Add test cases for the new method type
154
187
155
188
### Modifying Cache TTL
156
-
- For in-memory: Pass TTL to `CachingHandleResolver::new()`
157
-
- For Redis: Modify `RedisHandleResolver::ttl_seconds()`
189
+
- For in-memory: Set `CACHE_TTL_MEMORY` environment variable
190
+
- For Redis: Set `CACHE_TTL_REDIS` environment variable
191
+
- For SQLite: Set `CACHE_TTL_SQLITE` environment variable
158
192
159
193
### Debugging Resolution Issues
160
194
1. Enable debug logging: `RUST_LOG=debug`
161
195
2. Check Redis cache: `redis-cli GET "handle:<hash>"`
162
-
3. Monitor queue processing in logs
163
-
4. Verify DNS/HTTP connectivity to AT Protocol infrastructure
196
+
3. Check SQLite cache: `sqlite3 quickdid.db "SELECT * FROM handle_resolution_cache;"`
197
+
4. Monitor queue processing in logs
198
+
5. Check rate limiting: Look for "Rate limit permit acquisition timed out" errors
199
+
6. Verify DNS/HTTP connectivity to AT Protocol infrastructure
164
200
165
201
## Dependencies
166
202
- `atproto-identity`: Core AT Protocol identity resolution
+118
-12
README.md
+118
-12
README.md
···
1
1
# QuickDID
2
2
3
-
QuickDID is a high-performance AT Protocol identity resolution service written in Rust. It provides blazing-fast handle-to-DID resolution with intelligent caching strategies, supporting both in-memory and Redis-backed persistent caching with binary serialization for optimal storage efficiency.
3
+
QuickDID is a high-performance AT Protocol identity resolution service written in Rust. It provides blazing-fast handle-to-DID resolution with intelligent caching strategies, supporting in-memory, Redis-backed, and SQLite-backed persistent caching with binary serialization for optimal storage efficiency.
4
4
5
-
Built with minimal dependencies and optimized for production use, QuickDID delivers exceptional performance while maintaining a lean footprint.
5
+
Built following the 12-factor app methodology with minimal dependencies and optimized for production use, QuickDID delivers exceptional performance while maintaining a lean footprint. Configuration is handled exclusively through environment variables, with only `--version` and `--help` command-line arguments supported.
6
6
7
7
## ⚠️ Production Disclaimer
8
8
9
9
**This project is a release candidate and has not been fully vetted for production use.** While it includes comprehensive error handling and has been designed with production features in mind, more thorough testing is necessary before deploying in critical environments. Use at your own risk and conduct appropriate testing for your use case.
10
10
11
+
## Performance
12
+
13
+
QuickDID is designed for high throughput and low latency:
14
+
15
+
- **Binary serialization** reduces cache storage by ~40% compared to JSON
16
+
- **Rate limiting** protects upstream services from being overwhelmed
17
+
- **Work shedding** in SQLite queue adapter prevents unbounded growth
18
+
- **Configurable TTLs** allow fine-tuning cache freshness vs. performance
19
+
- **Connection pooling** for Redis minimizes connection overhead
20
+
11
21
## Features
12
22
13
23
- **Fast Handle Resolution**: Resolves AT Protocol handles to DIDs using DNS TXT records and HTTP well-known endpoints
14
-
- **Multi-Layer Caching**: In-memory caching with configurable TTL and Redis-backed persistent caching (90-day TTL)
24
+
- **Multi-Layer Caching**: Flexible caching with three tiers:
25
+
- In-memory caching with configurable TTL (default: 600 seconds)
26
+
- Redis-backed persistent caching (default: 90-day TTL)
27
+
- SQLite-backed persistent caching (default: 90-day TTL)
28
+
- **Rate Limiting**: Semaphore-based concurrency control with optional timeout to protect upstream services
15
29
- **Binary Serialization**: Compact storage format reduces cache size by ~40% compared to JSON
16
-
- **Queue Processing**: Asynchronous handle resolution with support for MPSC, Redis, and no-op queue adapters
30
+
- **Queue Processing**: Asynchronous handle resolution with multiple adapters:
31
+
- MPSC (in-memory, default)
32
+
- Redis (distributed)
33
+
- SQLite (persistent with work shedding)
34
+
- No-op (testing)
17
35
- **AT Protocol Compatible**: Implements XRPC endpoints for seamless integration with AT Protocol infrastructure
18
-
- **Comprehensive Error Handling**: Includes health checks and graceful shutdown support
36
+
- **Comprehensive Error Handling**: Structured errors with unique identifiers (e.g., `error-quickdid-config-1`), health checks, and graceful shutdown
37
+
- **12-Factor App**: Environment-based configuration following cloud-native best practices
19
38
- **Minimal Dependencies**: Optimized dependency tree for faster compilation and reduced attack surface
20
-
- **Predictable Worker IDs**: Simple default worker identification for distributed deployments
21
39
22
40
## Building
23
41
···
25
43
26
44
- Rust 1.70 or later
27
45
- Redis (optional, for persistent caching and distributed queuing)
46
+
- SQLite 3.35+ (optional, for single-instance persistent caching)
28
47
29
48
### Build Commands
30
49
···
45
64
46
65
## Minimum Configuration
47
66
48
-
QuickDID requires the following environment variables to run:
67
+
QuickDID requires the following environment variables to run. Configuration is validated at startup, and the service will exit with specific error codes if validation fails.
49
68
50
69
### Required
51
70
···
62
81
63
82
This will start QuickDID with:
64
83
- HTTP server on port 8080 (default)
65
-
- In-memory caching only (300-second TTL)
84
+
- In-memory caching only (600-second TTL default)
66
85
- MPSC queue adapter for async processing
67
86
- Default worker ID: "worker1"
68
87
- Connection to plc.directory for DID resolution
88
+
- Rate limiting disabled (default)
69
89
70
90
### Optional Configuration
71
91
72
92
For production deployments, consider these additional environment variables:
73
93
94
+
#### Network & Service
74
95
- `HTTP_PORT`: Server port (default: 8080)
75
-
- `REDIS_URL`: Redis connection URL for persistent caching (e.g., `redis://localhost:6379`)
76
-
- `QUEUE_ADAPTER`: Queue type - 'mpsc', 'redis', or 'noop' (default: mpsc)
77
-
- `QUEUE_WORKER_ID`: Worker identifier for distributed queue processing (default: worker1)
78
96
- `PLC_HOSTNAME`: PLC directory hostname (default: plc.directory)
97
+
- `USER_AGENT`: HTTP User-Agent for outgoing requests
98
+
- `DNS_NAMESERVERS`: Custom DNS servers (comma-separated)
99
+
100
+
#### Caching
101
+
- `REDIS_URL`: Redis connection URL (e.g., `redis://localhost:6379`)
102
+
- `SQLITE_URL`: SQLite database URL (e.g., `sqlite:./quickdid.db`)
103
+
- `CACHE_TTL_MEMORY`: In-memory cache TTL in seconds (default: 600)
104
+
- `CACHE_TTL_REDIS`: Redis cache TTL in seconds (default: 7776000 = 90 days)
105
+
- `CACHE_TTL_SQLITE`: SQLite cache TTL in seconds (default: 7776000 = 90 days)
106
+
107
+
#### Queue Processing
108
+
- `QUEUE_ADAPTER`: Queue type - 'mpsc', 'redis', 'sqlite', 'noop', or 'none' (default: mpsc)
109
+
- `QUEUE_WORKER_ID`: Worker identifier (default: worker1)
110
+
- `QUEUE_BUFFER_SIZE`: MPSC queue buffer size (default: 1000)
111
+
- `QUEUE_REDIS_PREFIX`: Redis key prefix for queues (default: queue:handleresolver:)
112
+
- `QUEUE_REDIS_TIMEOUT`: Redis blocking timeout in seconds (default: 5)
113
+
- `QUEUE_SQLITE_MAX_SIZE`: Max SQLite queue size for work shedding (default: 10000)
114
+
115
+
#### Rate Limiting
116
+
- `RESOLVER_MAX_CONCURRENT`: Maximum concurrent handle resolutions (default: 0 = disabled)
117
+
- `RESOLVER_MAX_CONCURRENT_TIMEOUT_MS`: Timeout for acquiring rate limit permit in ms (default: 0 = no timeout)
118
+
119
+
#### Logging
79
120
- `RUST_LOG`: Logging level (e.g., debug, info, warn, error)
80
121
81
-
### Production Example
122
+
### Production Examples
82
123
124
+
#### Redis-based (Multi-instance/HA)
83
125
```bash
84
126
HTTP_EXTERNAL=quickdid.example.com \
85
127
SERVICE_KEY=did:key:yourkeyhere \
86
128
HTTP_PORT=3000 \
87
129
REDIS_URL=redis://localhost:6379 \
130
+
CACHE_TTL_REDIS=86400 \
88
131
QUEUE_ADAPTER=redis \
89
132
QUEUE_WORKER_ID=prod-worker-1 \
133
+
RESOLVER_MAX_CONCURRENT=100 \
134
+
RESOLVER_MAX_CONCURRENT_TIMEOUT_MS=5000 \
90
135
RUST_LOG=info \
91
136
./target/release/quickdid
92
137
```
93
138
139
+
#### SQLite-based (Single-instance)
140
+
```bash
141
+
HTTP_EXTERNAL=quickdid.example.com \
142
+
SERVICE_KEY=did:key:yourkeyhere \
143
+
HTTP_PORT=3000 \
144
+
SQLITE_URL=sqlite:./quickdid.db \
145
+
CACHE_TTL_SQLITE=86400 \
146
+
QUEUE_ADAPTER=sqlite \
147
+
QUEUE_SQLITE_MAX_SIZE=10000 \
148
+
RESOLVER_MAX_CONCURRENT=50 \
149
+
RUST_LOG=info \
150
+
./target/release/quickdid
151
+
```
152
+
153
+
## Architecture
154
+
155
+
QuickDID uses a layered architecture for optimal performance:
156
+
157
+
```
158
+
Request → Cache Layer → Rate Limiter → Base Resolver → DNS/HTTP
159
+
↓ ↓ ↓
160
+
Memory/Redis/ Semaphore AT Protocol
161
+
SQLite (optional) Infrastructure
162
+
```
163
+
164
+
### Cache Priority
165
+
QuickDID checks caches in this order:
166
+
1. Redis (if configured) - Best for distributed deployments
167
+
2. SQLite (if configured) - Best for single-instance with persistence
168
+
3. In-memory (fallback) - Always available
169
+
170
+
### Deployment Strategies
171
+
172
+
- **Single-instance**: Use SQLite for both caching and queuing
173
+
- **Multi-instance/HA**: Use Redis for distributed caching and queuing
174
+
- **Development**: Use in-memory caching with MPSC queuing
175
+
94
176
## API Endpoints
95
177
96
178
- `GET /_health` - Health check endpoint
97
179
- `GET /xrpc/com.atproto.identity.resolveHandle` - Resolve handle to DID
98
180
- `GET /.well-known/atproto-did` - Serve DID document for the service
181
+
182
+
## Docker Deployment
183
+
184
+
QuickDID can be deployed using Docker. See the [production deployment guide](docs/production-deployment.md) for detailed Docker and Docker Compose configurations.
185
+
186
+
### Quick Docker Setup
187
+
188
+
```bash
189
+
# Build the image
190
+
docker build -t quickdid:latest .
191
+
192
+
# Run with environment file
193
+
docker run -d \
194
+
--name quickdid \
195
+
--env-file .env \
196
+
-p 8080:8080 \
197
+
quickdid:latest
198
+
```
199
+
200
+
## Documentation
201
+
202
+
- [Configuration Reference](docs/configuration-reference.md) - Complete list of all configuration options
203
+
- [Production Deployment Guide](docs/production-deployment.md) - Docker, monitoring, and production best practices
204
+
- [Development Guide](CLAUDE.md) - Architecture details and development patterns
99
205
100
206
## License
101
207
+46
-19
src/bin/quickdid.rs
+46
-19
src/bin/quickdid.rs
···
11
11
create_base_resolver, create_caching_resolver, create_rate_limited_resolver_with_timeout,
12
12
create_redis_resolver_with_ttl, create_sqlite_resolver_with_ttl,
13
13
},
14
-
sqlite_schema::create_sqlite_pool,
15
14
handle_resolver_task::{HandleResolverTaskConfig, create_handle_resolver_task_with_config},
16
15
http::{AppContext, create_router},
17
16
queue::{
18
17
HandleResolutionWork, QueueAdapter, create_mpsc_queue_from_channel, create_noop_queue,
19
18
create_redis_queue, create_sqlite_queue, create_sqlite_queue_with_max_size,
20
19
},
20
+
sqlite_schema::create_sqlite_pool,
21
21
task_manager::spawn_cancellable_task,
22
22
};
23
23
use serde_json::json;
···
57
57
/// Simple command-line argument handling for --version and --help
58
58
fn handle_simple_args() -> bool {
59
59
let args: Vec<String> = std::env::args().collect();
60
-
60
+
61
61
if args.len() > 1 {
62
62
match args[1].as_str() {
63
63
"--version" | "-V" => {
···
77
77
println!();
78
78
println!("ENVIRONMENT VARIABLES:");
79
79
println!(" SERVICE_KEY Private key for service identity (required)");
80
-
println!(" HTTP_EXTERNAL External hostname for service endpoints (required)");
80
+
println!(
81
+
" HTTP_EXTERNAL External hostname for service endpoints (required)"
82
+
);
81
83
println!(" HTTP_PORT HTTP server port (default: 8080)");
82
84
println!(" PLC_HOSTNAME PLC directory hostname (default: plc.directory)");
83
-
println!(" USER_AGENT HTTP User-Agent header (auto-generated with version)");
85
+
println!(
86
+
" USER_AGENT HTTP User-Agent header (auto-generated with version)"
87
+
);
84
88
println!(" DNS_NAMESERVERS Custom DNS nameservers (comma-separated IPs)");
85
-
println!(" CERTIFICATE_BUNDLES Additional CA certificates (comma-separated paths)");
89
+
println!(
90
+
" CERTIFICATE_BUNDLES Additional CA certificates (comma-separated paths)"
91
+
);
86
92
println!();
87
93
println!(" CACHING:");
88
94
println!(" REDIS_URL Redis URL for handle resolution caching");
89
-
println!(" SQLITE_URL SQLite database URL for handle resolution caching");
90
-
println!(" CACHE_TTL_MEMORY TTL for in-memory cache in seconds (default: 600)");
91
-
println!(" CACHE_TTL_REDIS TTL for Redis cache in seconds (default: 7776000)");
92
-
println!(" CACHE_TTL_SQLITE TTL for SQLite cache in seconds (default: 7776000)");
95
+
println!(
96
+
" SQLITE_URL SQLite database URL for handle resolution caching"
97
+
);
98
+
println!(
99
+
" CACHE_TTL_MEMORY TTL for in-memory cache in seconds (default: 600)"
100
+
);
101
+
println!(
102
+
" CACHE_TTL_REDIS TTL for Redis cache in seconds (default: 7776000)"
103
+
);
104
+
println!(
105
+
" CACHE_TTL_SQLITE TTL for SQLite cache in seconds (default: 7776000)"
106
+
);
93
107
println!();
94
108
println!(" QUEUE CONFIGURATION:");
95
-
println!(" QUEUE_ADAPTER Queue adapter: 'mpsc', 'redis', 'sqlite', 'noop' (default: mpsc)");
109
+
println!(
110
+
" QUEUE_ADAPTER Queue adapter: 'mpsc', 'redis', 'sqlite', 'noop' (default: mpsc)"
111
+
);
96
112
println!(" QUEUE_REDIS_URL Redis URL for queue adapter");
97
-
println!(" QUEUE_REDIS_PREFIX Redis key prefix for queues (default: queue:handleresolver:)");
113
+
println!(
114
+
" QUEUE_REDIS_PREFIX Redis key prefix for queues (default: queue:handleresolver:)"
115
+
);
98
116
println!(" QUEUE_REDIS_TIMEOUT Queue blocking timeout in seconds (default: 5)");
99
117
println!(" QUEUE_WORKER_ID Worker ID for Redis queue (default: worker1)");
100
118
println!(" QUEUE_BUFFER_SIZE Buffer size for MPSC queue (default: 1000)");
101
119
println!(" QUEUE_SQLITE_MAX_SIZE Maximum SQLite queue size (default: 10000)");
102
120
println!();
103
121
println!(" RATE LIMITING:");
104
-
println!(" RESOLVER_MAX_CONCURRENT Maximum concurrent resolutions (default: 0 = disabled)");
105
-
println!(" RESOLVER_MAX_CONCURRENT_TIMEOUT_MS Timeout for acquiring permits in ms (default: 0 = no timeout)");
122
+
println!(
123
+
" RESOLVER_MAX_CONCURRENT Maximum concurrent resolutions (default: 0 = disabled)"
124
+
);
125
+
println!(
126
+
" RESOLVER_MAX_CONCURRENT_TIMEOUT_MS Timeout for acquiring permits in ms (default: 0 = no timeout)"
127
+
);
106
128
println!();
107
-
println!("For more information, visit: https://github.com/smokesignal.events/quickdid");
129
+
println!(
130
+
"For more information, visit: https://github.com/smokesignal.events/quickdid"
131
+
);
108
132
return true;
109
133
}
110
134
_ => {}
111
135
}
112
136
}
113
-
137
+
114
138
false
115
139
}
116
140
···
194
218
let dns_resolver_arc = Arc::new(dns_resolver);
195
219
196
220
// Create base handle resolver using factory function
197
-
let mut base_handle_resolver = create_base_resolver(dns_resolver_arc.clone(), http_client.clone());
221
+
let mut base_handle_resolver =
222
+
create_base_resolver(dns_resolver_arc.clone(), http_client.clone());
198
223
199
224
// Apply rate limiting if configured
200
225
if config.resolver_max_concurrent > 0 {
···
209
234
timeout_info
210
235
);
211
236
base_handle_resolver = create_rate_limited_resolver_with_timeout(
212
-
base_handle_resolver,
237
+
base_handle_resolver,
213
238
config.resolver_max_concurrent,
214
-
config.resolver_max_concurrent_timeout_ms
239
+
config.resolver_max_concurrent_timeout_ms,
215
240
);
216
241
}
217
242
···
314
339
create_sqlite_queue::<HandleResolutionWork>(pool)
315
340
}
316
341
} else {
317
-
tracing::warn!("Failed to create SQLite pool for queue, falling back to MPSC queue adapter");
342
+
tracing::warn!(
343
+
"Failed to create SQLite pool for queue, falling back to MPSC queue adapter"
344
+
);
318
345
// Fall back to MPSC if SQLite fails
319
346
let (handle_sender, handle_receiver) =
320
347
tokio::sync::mpsc::channel::<HandleResolutionWork>(
+8
-3
src/config.rs
+8
-3
src/config.rs
···
70
70
}
71
71
72
72
/// Helper function to parse an environment variable as a specific type
73
-
fn parse_env<T: std::str::FromStr>(key: &str, default: T) -> Result<T, ConfigError>
73
+
fn parse_env<T: std::str::FromStr>(key: &str, default: T) -> Result<T, ConfigError>
74
74
where
75
75
T::Err: std::fmt::Display,
76
76
{
77
77
match env::var(key) {
78
-
Ok(val) if !val.is_empty() => val.parse::<T>()
78
+
Ok(val) if !val.is_empty() => val
79
+
.parse::<T>()
79
80
.map_err(|e| ConfigError::InvalidValue(format!("{}: {}", key, e))),
80
81
_ => Ok(default),
81
82
}
···
244
245
sqlite_url: get_env_or_default("SQLITE_URL", None),
245
246
queue_adapter: get_env_or_default("QUEUE_ADAPTER", Some("mpsc")).unwrap(),
246
247
queue_redis_url: get_env_or_default("QUEUE_REDIS_URL", None),
247
-
queue_redis_prefix: get_env_or_default("QUEUE_REDIS_PREFIX", Some("queue:handleresolver:")).unwrap(),
248
+
queue_redis_prefix: get_env_or_default(
249
+
"QUEUE_REDIS_PREFIX",
250
+
Some("queue:handleresolver:"),
251
+
)
252
+
.unwrap(),
248
253
queue_worker_id: get_env_or_default("QUEUE_WORKER_ID", Some("worker1")).unwrap(),
249
254
queue_buffer_size: parse_env("QUEUE_BUFFER_SIZE", 1000)?,
250
255
cache_ttl_memory: parse_env("CACHE_TTL_MEMORY", 600)?,
+1
-1
src/handle_resolver/memory.rs
+1
-1
src/handle_resolver/memory.rs
···
7
7
use super::errors::HandleResolverError;
8
8
use super::traits::HandleResolver;
9
9
use async_trait::async_trait;
10
-
use std::time::{SystemTime, UNIX_EPOCH};
11
10
use std::collections::HashMap;
12
11
use std::sync::Arc;
12
+
use std::time::{SystemTime, UNIX_EPOCH};
13
13
use tokio::sync::RwLock;
14
14
15
15
/// Result of a handle resolution cached in memory.
+34
-18
src/handle_resolver/rate_limited.rs
+34
-18
src/handle_resolver/rate_limited.rs
···
53
53
pub(super) struct RateLimitedHandleResolver {
54
54
/// Inner resolver that performs actual resolution.
55
55
inner: Arc<dyn HandleResolver>,
56
-
56
+
57
57
/// Semaphore for limiting concurrent resolutions.
58
58
semaphore: Arc<Semaphore>,
59
-
59
+
60
60
/// Optional timeout for acquiring permits (in milliseconds).
61
61
/// When None or 0, no timeout is applied.
62
62
timeout_ms: Option<u64>,
···
76
76
timeout_ms: None,
77
77
}
78
78
}
79
-
79
+
80
80
/// Create a new rate-limited resolver with timeout.
81
81
///
82
82
/// # Arguments
···
84
84
/// * `inner` - The inner resolver to wrap
85
85
/// * `max_concurrent` - Maximum number of concurrent resolutions allowed
86
86
/// * `timeout_ms` - Timeout in milliseconds for acquiring permits (0 = no timeout)
87
-
pub fn new_with_timeout(inner: Arc<dyn HandleResolver>, max_concurrent: usize, timeout_ms: u64) -> Self {
87
+
pub fn new_with_timeout(
88
+
inner: Arc<dyn HandleResolver>,
89
+
max_concurrent: usize,
90
+
timeout_ms: u64,
91
+
) -> Self {
88
92
Self {
89
93
inner,
90
94
semaphore: Arc::new(Semaphore::new(max_concurrent)),
91
-
timeout_ms: if timeout_ms > 0 { Some(timeout_ms) } else { None },
95
+
timeout_ms: if timeout_ms > 0 {
96
+
Some(timeout_ms)
97
+
} else {
98
+
None
99
+
},
92
100
}
93
101
}
94
102
}
···
105
113
Ok(Ok(permit)) => permit,
106
114
Ok(Err(e)) => {
107
115
// Semaphore error (e.g., closed)
108
-
return Err(HandleResolverError::ResolutionFailed(
109
-
format!("Failed to acquire rate limit permit: {}", e)
110
-
));
116
+
return Err(HandleResolverError::ResolutionFailed(format!(
117
+
"Failed to acquire rate limit permit: {}",
118
+
e
119
+
)));
111
120
}
112
121
Err(_) => {
113
122
// Timeout occurred
114
-
return Err(HandleResolverError::ResolutionFailed(
115
-
format!("Rate limit permit acquisition timed out after {}ms", timeout_ms)
116
-
));
123
+
return Err(HandleResolverError::ResolutionFailed(format!(
124
+
"Rate limit permit acquisition timed out after {}ms",
125
+
timeout_ms
126
+
)));
117
127
}
118
128
}
119
129
}
120
130
_ => {
121
131
// No timeout configured, wait indefinitely
122
-
self.semaphore.acquire().await
123
-
.map_err(|e| HandleResolverError::ResolutionFailed(
124
-
format!("Failed to acquire rate limit permit: {}", e)
125
-
))?
132
+
self.semaphore.acquire().await.map_err(|e| {
133
+
HandleResolverError::ResolutionFailed(format!(
134
+
"Failed to acquire rate limit permit: {}",
135
+
e
136
+
))
137
+
})?
126
138
}
127
139
};
128
-
140
+
129
141
// With permit acquired, forward to inner resolver
130
142
self.inner.resolve(s).await
131
143
}
···
209
221
max_concurrent: usize,
210
222
timeout_ms: u64,
211
223
) -> Arc<dyn HandleResolver> {
212
-
Arc::new(RateLimitedHandleResolver::new_with_timeout(inner, max_concurrent, timeout_ms))
213
-
}
224
+
Arc::new(RateLimitedHandleResolver::new_with_timeout(
225
+
inner,
226
+
max_concurrent,
227
+
timeout_ms,
228
+
))
229
+
}
+14
-2
src/handle_resolver/redis.rs
+14
-2
src/handle_resolver/redis.rs
···
297
297
});
298
298
299
299
// Create Redis-backed resolver with a unique key prefix for testing
300
-
let test_prefix = format!("test:handle:{}:", std::time::SystemTime::now().duration_since(std::time::UNIX_EPOCH).unwrap().as_nanos());
300
+
let test_prefix = format!(
301
+
"test:handle:{}:",
302
+
std::time::SystemTime::now()
303
+
.duration_since(std::time::UNIX_EPOCH)
304
+
.unwrap()
305
+
.as_nanos()
306
+
);
301
307
let redis_resolver = RedisHandleResolver::with_full_config(
302
308
mock_resolver,
303
309
pool.clone(),
···
339
345
});
340
346
341
347
// Create Redis-backed resolver with a unique key prefix for testing
342
-
let test_prefix = format!("test:handle:{}:", std::time::SystemTime::now().duration_since(std::time::UNIX_EPOCH).unwrap().as_nanos());
348
+
let test_prefix = format!(
349
+
"test:handle:{}:",
350
+
std::time::SystemTime::now()
351
+
.duration_since(std::time::UNIX_EPOCH)
352
+
.unwrap()
353
+
.as_nanos()
354
+
);
343
355
let redis_resolver = RedisHandleResolver::with_full_config(
344
356
mock_resolver,
345
357
pool.clone(),
+69
-48
src/handle_resolver/sqlite.rs
+69
-48
src/handle_resolver/sqlite.rs
···
103
103
let key = self.make_key(&handle) as i64; // SQLite uses signed integers
104
104
105
105
// Try to get from SQLite cache first
106
-
let cached_result = sqlx::query(
107
-
"SELECT result, updated FROM handle_resolution_cache WHERE key = ?1"
108
-
)
109
-
.bind(key)
110
-
.fetch_optional(&self.pool)
111
-
.await;
106
+
let cached_result =
107
+
sqlx::query("SELECT result, updated FROM handle_resolution_cache WHERE key = ?1")
108
+
.bind(key)
109
+
.fetch_optional(&self.pool)
110
+
.await;
112
111
113
112
match cached_result {
114
113
Ok(Some(row)) => {
···
198
197
ON CONFLICT(key) DO UPDATE SET
199
198
result = excluded.result,
200
199
updated = excluded.updated
201
-
"#
200
+
"#,
202
201
)
203
202
.bind(key)
204
203
.bind(&bytes)
···
334
333
assert_eq!(result1, "did:plc:testuser123");
335
334
336
335
// Verify record was inserted
337
-
let count_after_first: i64 = sqlx::query_scalar("SELECT COUNT(*) FROM handle_resolution_cache")
338
-
.fetch_one(&pool)
339
-
.await
340
-
.expect("Failed to query count after first resolution");
336
+
let count_after_first: i64 =
337
+
sqlx::query_scalar("SELECT COUNT(*) FROM handle_resolution_cache")
338
+
.fetch_one(&pool)
339
+
.await
340
+
.expect("Failed to query count after first resolution");
341
341
assert_eq!(count_after_first, 1);
342
342
343
343
// Verify the cached record has correct key and non-empty result
344
-
let cached_record = sqlx::query("SELECT key, result, created, updated FROM handle_resolution_cache WHERE key = ?1")
345
-
.bind(expected_key)
346
-
.fetch_one(&pool)
347
-
.await
348
-
.expect("Failed to fetch cached record");
349
-
344
+
let cached_record = sqlx::query(
345
+
"SELECT key, result, created, updated FROM handle_resolution_cache WHERE key = ?1",
346
+
)
347
+
.bind(expected_key)
348
+
.fetch_one(&pool)
349
+
.await
350
+
.expect("Failed to fetch cached record");
351
+
350
352
let cached_key: i64 = cached_record.get("key");
351
353
let cached_result: Vec<u8> = cached_record.get("result");
352
354
let cached_created: i64 = cached_record.get("created");
353
355
let cached_updated: i64 = cached_record.get("updated");
354
356
355
357
assert_eq!(cached_key, expected_key);
356
-
assert!(!cached_result.is_empty(), "Cached result should not be empty");
358
+
assert!(
359
+
!cached_result.is_empty(),
360
+
"Cached result should not be empty"
361
+
);
357
362
assert!(cached_created > 0, "Created timestamp should be positive");
358
363
assert!(cached_updated > 0, "Updated timestamp should be positive");
359
-
assert_eq!(cached_created, cached_updated, "Created and updated should be equal on first insert");
364
+
assert_eq!(
365
+
cached_created, cached_updated,
366
+
"Created and updated should be equal on first insert"
367
+
);
360
368
361
369
// Verify we can deserialize the cached result
362
-
let resolution_result = crate::handle_resolution_result::HandleResolutionResult::from_bytes(&cached_result)
363
-
.expect("Failed to deserialize cached result");
370
+
let resolution_result =
371
+
crate::handle_resolution_result::HandleResolutionResult::from_bytes(&cached_result)
372
+
.expect("Failed to deserialize cached result");
364
373
let cached_did = resolution_result.to_did().expect("Should have a DID");
365
374
assert_eq!(cached_did, "did:plc:testuser123");
366
375
···
369
378
assert_eq!(result2, "did:plc:testuser123");
370
379
371
380
// Verify count hasn't changed (cache hit, no new insert)
372
-
let count_after_second: i64 = sqlx::query_scalar("SELECT COUNT(*) FROM handle_resolution_cache")
373
-
.fetch_one(&pool)
374
-
.await
375
-
.expect("Failed to query count after second resolution");
381
+
let count_after_second: i64 =
382
+
sqlx::query_scalar("SELECT COUNT(*) FROM handle_resolution_cache")
383
+
.fetch_one(&pool)
384
+
.await
385
+
.expect("Failed to query count after second resolution");
376
386
assert_eq!(count_after_second, 1);
377
387
}
378
388
···
410
420
// First resolution - should fail and cache the failure
411
421
let result1 = sqlite_resolver.resolve(test_handle).await;
412
422
assert!(result1.is_err());
413
-
423
+
414
424
// Match the specific error type we expect
415
425
match result1 {
416
-
Err(HandleResolverError::MockResolutionFailure) => {},
426
+
Err(HandleResolverError::MockResolutionFailure) => {}
417
427
other => panic!("Expected MockResolutionFailure, got {:?}", other),
418
428
}
419
429
420
430
// Verify the failure was cached
421
-
let count_after_first: i64 = sqlx::query_scalar("SELECT COUNT(*) FROM handle_resolution_cache")
422
-
.fetch_one(&pool)
423
-
.await
424
-
.expect("Failed to query count after first resolution");
431
+
let count_after_first: i64 =
432
+
sqlx::query_scalar("SELECT COUNT(*) FROM handle_resolution_cache")
433
+
.fetch_one(&pool)
434
+
.await
435
+
.expect("Failed to query count after first resolution");
425
436
assert_eq!(count_after_first, 1);
426
437
427
438
// Verify the cached error record
428
-
let cached_record = sqlx::query("SELECT key, result, created, updated FROM handle_resolution_cache WHERE key = ?1")
429
-
.bind(expected_key)
430
-
.fetch_one(&pool)
431
-
.await
432
-
.expect("Failed to fetch cached error record");
433
-
439
+
let cached_record = sqlx::query(
440
+
"SELECT key, result, created, updated FROM handle_resolution_cache WHERE key = ?1",
441
+
)
442
+
.bind(expected_key)
443
+
.fetch_one(&pool)
444
+
.await
445
+
.expect("Failed to fetch cached error record");
446
+
434
447
let cached_key: i64 = cached_record.get("key");
435
448
let cached_result: Vec<u8> = cached_record.get("result");
436
449
let cached_created: i64 = cached_record.get("created");
437
450
let cached_updated: i64 = cached_record.get("updated");
438
451
439
452
assert_eq!(cached_key, expected_key);
440
-
assert!(!cached_result.is_empty(), "Cached error result should not be empty");
453
+
assert!(
454
+
!cached_result.is_empty(),
455
+
"Cached error result should not be empty"
456
+
);
441
457
assert!(cached_created > 0, "Created timestamp should be positive");
442
458
assert!(cached_updated > 0, "Updated timestamp should be positive");
443
-
assert_eq!(cached_created, cached_updated, "Created and updated should be equal on first insert");
459
+
assert_eq!(
460
+
cached_created, cached_updated,
461
+
"Created and updated should be equal on first insert"
462
+
);
444
463
445
464
// Verify we can deserialize the cached error result
446
-
let resolution_result = crate::handle_resolution_result::HandleResolutionResult::from_bytes(&cached_result)
447
-
.expect("Failed to deserialize cached error result");
465
+
let resolution_result =
466
+
crate::handle_resolution_result::HandleResolutionResult::from_bytes(&cached_result)
467
+
.expect("Failed to deserialize cached error result");
448
468
let cached_did = resolution_result.to_did();
449
469
assert!(cached_did.is_none(), "Error result should have no DID");
450
470
451
471
// Second resolution - should hit cache with error (no additional database operations)
452
472
let result2 = sqlite_resolver.resolve(test_handle).await;
453
473
assert!(result2.is_err());
454
-
474
+
455
475
// Match the specific error type we expect from cache
456
476
match result2 {
457
-
Err(HandleResolverError::HandleNotFound) => {}, // Cache returns HandleNotFound for "not resolved"
477
+
Err(HandleResolverError::HandleNotFound) => {} // Cache returns HandleNotFound for "not resolved"
458
478
other => panic!("Expected HandleNotFound from cache, got {:?}", other),
459
479
}
460
480
461
481
// Verify count hasn't changed (cache hit, no new operations)
462
-
let count_after_second: i64 = sqlx::query_scalar("SELECT COUNT(*) FROM handle_resolution_cache")
463
-
.fetch_one(&pool)
464
-
.await
465
-
.expect("Failed to query count after second resolution");
482
+
let count_after_second: i64 =
483
+
sqlx::query_scalar("SELECT COUNT(*) FROM handle_resolution_cache")
484
+
.fetch_one(&pool)
485
+
.await
486
+
.expect("Failed to query count after second resolution");
466
487
assert_eq!(count_after_second, 1);
467
488
}
468
-
}
489
+
}
+1
-1
src/http/server.rs
+1
-1
src/http/server.rs
+6
-6
src/queue/adapter.rs
+6
-6
src/queue/adapter.rs
···
3
3
//! This module defines the core `QueueAdapter` trait that provides a common
4
4
//! interface for different queue implementations (MPSC, Redis, SQLite, etc.).
5
5
6
-
use async_trait::async_trait;
7
6
use super::error::Result;
7
+
use async_trait::async_trait;
8
8
9
9
/// Generic trait for queue adapters that can work with any work type.
10
10
///
···
173
173
#[tokio::test]
174
174
async fn test_default_trait_methods() {
175
175
let queue = MockQueue::<String>::new();
176
-
176
+
177
177
// Test default ack implementation
178
178
assert!(queue.ack(&"test".to_string()).await.is_ok());
179
-
179
+
180
180
// Test default try_push implementation
181
181
assert!(queue.try_push("test".to_string()).await.is_ok());
182
-
182
+
183
183
// Test default depth implementation
184
184
assert_eq!(queue.depth().await, None);
185
-
185
+
186
186
// Test default is_healthy implementation
187
187
assert!(queue.is_healthy().await);
188
188
}
189
-
}
189
+
}
+4
-4
src/queue/error.rs
+4
-4
src/queue/error.rs
···
29
29
30
30
/// Redis operation failed.
31
31
#[error("error-quickdid-queue-5 Redis operation failed: {operation}: {details}")]
32
-
RedisOperationFailed {
32
+
RedisOperationFailed {
33
33
/// The Redis operation that failed
34
-
operation: String,
34
+
operation: String,
35
35
/// Details about the failure
36
-
details: String
36
+
details: String,
37
37
},
38
38
39
39
/// Failed to serialize an item for storage.
···
73
73
assert!(err.to_string().contains("LPUSH"));
74
74
assert!(err.to_string().contains("connection timeout"));
75
75
}
76
-
}
76
+
}
+18
-23
src/queue/factory.rs
+18
-23
src/queue/factory.rs
···
9
9
use tokio::sync::mpsc;
10
10
11
11
use super::{
12
-
adapter::QueueAdapter,
13
-
mpsc::MpscQueueAdapter,
14
-
noop::NoopQueueAdapter,
15
-
redis::RedisQueueAdapter,
16
-
sqlite::SqliteQueueAdapter,
12
+
adapter::QueueAdapter, mpsc::MpscQueueAdapter, noop::NoopQueueAdapter,
13
+
redis::RedisQueueAdapter, sqlite::SqliteQueueAdapter,
17
14
};
18
15
19
16
// ========= MPSC Queue Factories =========
···
218
215
#[tokio::test]
219
216
async fn test_create_mpsc_queue() {
220
217
let queue = create_mpsc_queue::<String>(10);
221
-
218
+
222
219
queue.push("test".to_string()).await.unwrap();
223
220
let item = queue.pull().await;
224
221
assert_eq!(item, Some("test".to_string()));
···
228
225
async fn test_create_mpsc_queue_from_channel() {
229
226
let (sender, receiver) = mpsc::channel(5);
230
227
let queue = create_mpsc_queue_from_channel(sender.clone(), receiver);
231
-
228
+
232
229
// Send via original sender
233
230
sender.send("external".to_string()).await.unwrap();
234
-
231
+
235
232
// Receive via queue
236
233
let item = queue.pull().await;
237
234
assert_eq!(item, Some("external".to_string()));
···
240
237
#[tokio::test]
241
238
async fn test_create_noop_queue() {
242
239
let queue = create_noop_queue::<String>();
243
-
240
+
244
241
// Should accept pushes
245
242
queue.push("ignored".to_string()).await.unwrap();
246
-
243
+
247
244
// Should report as healthy
248
245
assert!(queue.is_healthy().await);
249
-
246
+
250
247
// Should report depth as 0
251
248
assert_eq!(queue.depth().await, Some(0));
252
249
}
···
264
261
.expect("Failed to create schema");
265
262
266
263
let queue = create_sqlite_queue::<HandleResolutionWork>(pool);
267
-
264
+
268
265
let work = HandleResolutionWork::new("test.example.com".to_string());
269
266
queue.push(work.clone()).await.unwrap();
270
-
267
+
271
268
let pulled = queue.pull().await;
272
269
assert_eq!(pulled, Some(work));
273
270
}
···
286
283
287
284
// Create queue with small max size
288
285
let queue = create_sqlite_queue_with_max_size::<HandleResolutionWork>(pool, 5);
289
-
286
+
290
287
// Push items
291
288
for i in 0..10 {
292
289
let work = HandleResolutionWork::new(format!("test-{}.example.com", i));
293
290
queue.push(work).await.unwrap();
294
291
}
295
-
292
+
296
293
// Should have limited items due to work shedding
297
294
let depth = queue.depth().await.unwrap();
298
-
assert!(depth <= 5, "Queue should have at most 5 items after work shedding");
295
+
assert!(
296
+
depth <= 5,
297
+
"Queue should have at most 5 items after work shedding"
298
+
);
299
299
}
300
300
301
301
#[tokio::test]
···
316
316
.as_nanos()
317
317
);
318
318
319
-
let queue = create_redis_queue::<String>(
320
-
pool,
321
-
"test-worker".to_string(),
322
-
test_prefix,
323
-
1,
324
-
);
319
+
let queue = create_redis_queue::<String>(pool, "test-worker".to_string(), test_prefix, 1);
325
320
326
321
queue.push("test-item".to_string()).await.unwrap();
327
322
let pulled = queue.pull().await;
328
323
assert_eq!(pulled, Some("test-item".to_string()));
329
324
}
330
-
}
325
+
}
+2
-7
src/queue/mod.rs
+2
-7
src/queue/mod.rs
···
73
73
74
74
// Re-export factory functions
75
75
pub use factory::{
76
-
create_mpsc_queue,
77
-
create_mpsc_queue_from_channel,
78
-
create_noop_queue,
79
-
create_redis_queue,
80
-
create_sqlite_queue,
81
-
create_sqlite_queue_with_max_size,
76
+
create_mpsc_queue, create_mpsc_queue_from_channel, create_noop_queue, create_redis_queue,
77
+
create_sqlite_queue, create_sqlite_queue_with_max_size,
82
78
};
83
-
+4
-4
src/queue/mpsc.rs
+4
-4
src/queue/mpsc.rs
···
6
6
7
7
use async_trait::async_trait;
8
8
use std::sync::Arc;
9
-
use tokio::sync::{mpsc, Mutex};
9
+
use tokio::sync::{Mutex, mpsc};
10
10
11
11
use super::adapter::QueueAdapter;
12
12
use super::error::{QueueError, Result};
···
204
204
#[tokio::test]
205
205
async fn test_mpsc_queue_health() {
206
206
let queue = MpscQueueAdapter::<String>::new(10);
207
-
207
+
208
208
// Queue should be healthy initially
209
209
assert!(queue.is_healthy().await);
210
210
···
212
212
let (sender, receiver) = mpsc::channel::<String>(10);
213
213
drop(receiver);
214
214
let closed_queue = MpscQueueAdapter::from_channel(sender, mpsc::channel(1).1);
215
-
215
+
216
216
// Push should fail on closed queue
217
217
let result = closed_queue.push("test".to_string()).await;
218
218
assert!(result.is_err());
···
283
283
// Ack should always succeed (no-op)
284
284
queue.ack(&item).await.unwrap();
285
285
}
286
-
}
286
+
}
+1
-1
src/queue/noop.rs
+1
-1
src/queue/noop.rs
+3
-7
src/queue/redis.rs
+3
-7
src/queue/redis.rs
···
392
392
.unwrap()
393
393
.as_nanos()
394
394
);
395
-
let adapter = RedisQueueAdapter::<String>::new(
396
-
pool,
397
-
"test-worker-depth".to_string(),
398
-
test_prefix,
399
-
1,
400
-
);
395
+
let adapter =
396
+
RedisQueueAdapter::<String>::new(pool, "test-worker-depth".to_string(), test_prefix, 1);
401
397
402
398
// Initially empty
403
399
assert_eq!(adapter.depth().await, Some(0));
···
471
467
// Ack
472
468
adapter.ack(&work).await.unwrap();
473
469
}
474
-
}
470
+
}
+40
-37
src/queue/sqlite.rs
+40
-37
src/queue/sqlite.rs
···
165
165
let record = match sqlx::query(
166
166
"SELECT id, work FROM handle_resolution_queue
167
167
ORDER BY queued_at ASC
168
-
LIMIT 1"
168
+
LIMIT 1",
169
169
)
170
170
.fetch_optional(&mut *transaction)
171
171
.await
···
226
226
227
227
// Optimized approach: Insert first, then check if cleanup needed
228
228
// This avoids counting on every insert
229
-
sqlx::query(
230
-
"INSERT INTO handle_resolution_queue (work, queued_at) VALUES (?1, ?2)"
231
-
)
232
-
.bind(&work_json)
233
-
.bind(current_timestamp)
234
-
.execute(&self.pool)
235
-
.await
236
-
.map_err(|e| QueueError::PushFailed(format!("Failed to insert work item: {}", e)))?;
229
+
sqlx::query("INSERT INTO handle_resolution_queue (work, queued_at) VALUES (?1, ?2)")
230
+
.bind(&work_json)
231
+
.bind(current_timestamp)
232
+
.execute(&self.pool)
233
+
.await
234
+
.map_err(|e| QueueError::PushFailed(format!("Failed to insert work item: {}", e)))?;
237
235
238
236
// Implement optimized work shedding if max_size is configured
239
237
if self.max_size > 0 {
···
243
241
let approx_count: Option<i64> = sqlx::query_scalar(
244
242
"SELECT COUNT(*) FROM (
245
243
SELECT 1 FROM handle_resolution_queue LIMIT ?1
246
-
) AS limited_count"
244
+
) AS limited_count",
247
245
)
248
246
.bind(check_limit)
249
247
.fetch_one(&self.pool)
···
251
249
.map_err(|e| QueueError::PushFailed(format!("Failed to check queue size: {}", e)))?;
252
250
253
251
// Only perform cleanup if we're definitely over the limit
254
-
if let Some(count) = approx_count && count >= check_limit {
252
+
if let Some(count) = approx_count
253
+
&& count >= check_limit
254
+
{
255
255
// Perform batch cleanup - delete more than just the excess to reduce frequency
256
256
// Delete 20% more than needed to avoid frequent shedding
257
257
let target_size = (self.max_size as f64 * 0.8) as i64; // Keep 80% of max_size
258
258
let to_delete = count - target_size;
259
-
259
+
260
260
if to_delete > 0 {
261
261
// Optimized deletion: First get the cutoff id and timestamp
262
262
// This avoids the expensive subquery in the DELETE statement
263
263
let cutoff: Option<(i64, i64)> = sqlx::query_as(
264
264
"SELECT id, queued_at FROM handle_resolution_queue
265
265
ORDER BY queued_at ASC, id ASC
266
-
LIMIT 1 OFFSET ?1"
266
+
LIMIT 1 OFFSET ?1",
267
267
)
268
268
.bind(to_delete - 1)
269
269
.fetch_optional(&self.pool)
···
276
276
let deleted_result = sqlx::query(
277
277
"DELETE FROM handle_resolution_queue
278
278
WHERE queued_at < ?1
279
-
OR (queued_at = ?1 AND id <= ?2)"
279
+
OR (queued_at = ?1 AND id <= ?2)",
280
280
)
281
281
.bind(cutoff_timestamp)
282
282
.bind(cutoff_id)
283
283
.execute(&self.pool)
284
284
.await
285
-
.map_err(|e| QueueError::PushFailed(format!("Failed to delete excess entries: {}", e)))?;
285
+
.map_err(|e| {
286
+
QueueError::PushFailed(format!(
287
+
"Failed to delete excess entries: {}",
288
+
e
289
+
))
290
+
})?;
286
291
287
292
let deleted_count = deleted_result.rows_affected();
288
293
if deleted_count > 0 {
289
294
info!(
290
295
"Work shedding: deleted {} oldest entries (target size: {}, max: {})",
291
-
deleted_count,
292
-
target_size,
293
-
self.max_size
296
+
deleted_count, target_size, self.max_size
294
297
);
295
298
}
296
299
}
···
298
301
}
299
302
}
300
303
301
-
debug!("Pushed work item to SQLite queue (max_size: {})", self.max_size);
304
+
debug!(
305
+
"Pushed work item to SQLite queue (max_size: {})",
306
+
self.max_size
307
+
);
302
308
Ok(())
303
309
}
304
310
···
310
316
}
311
317
312
318
async fn depth(&self) -> Option<usize> {
313
-
match sqlx::query_scalar::<_, i64>(
314
-
"SELECT COUNT(*) FROM handle_resolution_queue"
315
-
)
316
-
.fetch_one(&self.pool)
317
-
.await
319
+
match sqlx::query_scalar::<_, i64>("SELECT COUNT(*) FROM handle_resolution_queue")
320
+
.fetch_one(&self.pool)
321
+
.await
318
322
{
319
323
Ok(count) => Some(count as usize),
320
324
Err(e) => {
···
380
384
let adapter = SqliteQueueAdapter::<HandleResolutionWork>::new(pool);
381
385
382
386
// Push multiple items
383
-
let handles = vec!["alice.example.com", "bob.example.com", "charlie.example.com"];
387
+
let handles = vec![
388
+
"alice.example.com",
389
+
"bob.example.com",
390
+
"charlie.example.com",
391
+
];
384
392
for handle in &handles {
385
393
let work = HandleResolutionWork::new(handle.to_string());
386
394
adapter.push(work).await.unwrap();
···
419
427
#[tokio::test]
420
428
async fn test_sqlite_queue_work_shedding() {
421
429
let pool = create_test_pool().await;
422
-
430
+
423
431
// Create adapter with small max_size for testing
424
432
let max_size = 10;
425
-
let adapter = SqliteQueueAdapter::<HandleResolutionWork>::with_max_size(
426
-
pool.clone(),
427
-
max_size
428
-
);
433
+
let adapter =
434
+
SqliteQueueAdapter::<HandleResolutionWork>::with_max_size(pool.clone(), max_size);
429
435
430
436
// Push items up to the limit (should not trigger shedding)
431
437
for i in 0..max_size {
···
446
452
// After triggering shedding, queue should be around 80% of max_size
447
453
let depth_after_shedding = adapter.depth().await.unwrap();
448
454
let expected_size = (max_size as f64 * 0.8) as usize;
449
-
455
+
450
456
// Allow some variance due to batch deletion
451
457
assert!(
452
458
depth_after_shedding <= expected_size + 1,
···
459
465
#[tokio::test]
460
466
async fn test_sqlite_queue_work_shedding_disabled() {
461
467
let pool = create_test_pool().await;
462
-
468
+
463
469
// Create adapter with max_size = 0 (disabled work shedding)
464
-
let adapter = SqliteQueueAdapter::<HandleResolutionWork>::with_max_size(
465
-
pool,
466
-
0
467
-
);
470
+
let adapter = SqliteQueueAdapter::<HandleResolutionWork>::with_max_size(pool, 0);
468
471
469
472
// Push many items (should not trigger any shedding)
470
473
for i in 0..100 {
···
499
502
let pulled = adapter.pull().await;
500
503
assert_eq!(pulled, Some(work));
501
504
}
502
-
}
505
+
}
+5
-5
src/queue/work.rs
+5
-5
src/queue/work.rs
···
64
64
#[test]
65
65
fn test_handle_resolution_work_serialization() {
66
66
let work = HandleResolutionWork::new("bob.example.com".to_string());
67
-
67
+
68
68
// Test JSON serialization (which is what we actually use in the queue adapters)
69
69
let json = serde_json::to_string(&work).expect("Failed to serialize to JSON");
70
-
let deserialized: HandleResolutionWork =
70
+
let deserialized: HandleResolutionWork =
71
71
serde_json::from_str(&json).expect("Failed to deserialize from JSON");
72
72
assert_eq!(work, deserialized);
73
-
73
+
74
74
// Verify the JSON structure
75
75
let json_value: serde_json::Value = serde_json::from_str(&json).unwrap();
76
76
assert_eq!(json_value["handle"], "bob.example.com");
···
88
88
let work1 = HandleResolutionWork::new("alice.example.com".to_string());
89
89
let work2 = HandleResolutionWork::new("alice.example.com".to_string());
90
90
let work3 = HandleResolutionWork::new("bob.example.com".to_string());
91
-
91
+
92
92
assert_eq!(work1, work2);
93
93
assert_ne!(work1, work3);
94
94
}
95
-
}
95
+
}
+14
-15
src/sqlite_schema.rs
+14
-15
src/sqlite_schema.rs
···
4
4
//! schema used by the SQLite-backed handle resolver cache.
5
5
6
6
use anyhow::Result;
7
-
use sqlx::{SqlitePool, migrate::MigrateDatabase, Sqlite};
7
+
use sqlx::{Sqlite, SqlitePool, migrate::MigrateDatabase};
8
8
use std::path::Path;
9
9
10
10
/// SQL schema for the handle resolution cache table.
···
60
60
tracing::info!("Initializing SQLite database: {}", database_url);
61
61
62
62
// Extract the database path from the URL for file-based databases
63
-
if let Some(path) = database_url.strip_prefix("sqlite:")
64
-
&& path != ":memory:"
65
-
&& !path.is_empty()
63
+
if let Some(path) = database_url.strip_prefix("sqlite:")
64
+
&& path != ":memory:"
65
+
&& !path.is_empty()
66
66
{
67
67
// Create the database file if it doesn't exist
68
68
if !Sqlite::database_exists(database_url).await? {
···
71
71
}
72
72
73
73
// Ensure the parent directory exists
74
-
if let Some(parent) = Path::new(path).parent()
75
-
&& !parent.exists()
74
+
if let Some(parent) = Path::new(path).parent()
75
+
&& !parent.exists()
76
76
{
77
77
tracing::info!("Creating directory: {}", parent.display());
78
78
std::fs::create_dir_all(parent)?;
···
114
114
sqlx::query(CREATE_HANDLE_RESOLUTION_CACHE_TABLE)
115
115
.execute(pool)
116
116
.await?;
117
-
117
+
118
118
sqlx::query(CREATE_HANDLE_RESOLUTION_QUEUE_TABLE)
119
119
.execute(pool)
120
120
.await?;
···
249
249
250
250
let cutoff_timestamp = current_timestamp - (max_age_seconds as i64);
251
251
252
-
let result = sqlx::query(
253
-
"DELETE FROM handle_resolution_queue WHERE queued_at < ?1"
254
-
)
255
-
.bind(cutoff_timestamp)
256
-
.execute(pool)
257
-
.await?;
252
+
let result = sqlx::query("DELETE FROM handle_resolution_queue WHERE queued_at < ?1")
253
+
.bind(cutoff_timestamp)
254
+
.execute(pool)
255
+
.await?;
258
256
259
257
let deleted_count = result.rows_affected();
260
258
if deleted_count > 0 {
···
325
323
let old_timestamp = std::time::SystemTime::now()
326
324
.duration_since(std::time::UNIX_EPOCH)
327
325
.unwrap()
328
-
.as_secs() as i64 - 3600; // 1 hour ago
326
+
.as_secs() as i64
327
+
- 3600; // 1 hour ago
329
328
330
329
sqlx::query(
331
330
"INSERT INTO handle_resolution_cache (key, result, created, updated) VALUES (1, ?1, ?2, ?2)"
···
380
379
assert_eq!(total_entries, 1);
381
380
assert!(size_bytes > 0);
382
381
}
383
-
}
382
+
}
+2
-2
src/test_helpers.rs
+2
-2
src/test_helpers.rs
···
4
4
use deadpool_redis::Pool;
5
5
6
6
/// Helper function to get a Redis pool for testing.
7
-
///
7
+
///
8
8
/// Returns None if TEST_REDIS_URL is not set, logging a skip message.
9
9
/// This consolidates the repeated Redis test setup code.
10
10
pub(crate) fn get_test_redis_pool() -> Option<Pool> {
···
32
32
None => return,
33
33
}
34
34
};
35
-
}
35
+
}