use anyhow::Result; use atproto_identity::{ config::{CertificateBundles, DnsNameservers}, resolve::HickoryDnsResolver, }; use atproto_jetstream::{Consumer as JetstreamConsumer, ConsumerTaskConfig}; use quickdid::{ cache::create_redis_pool, config::Config, handle_resolver::{ create_base_resolver, create_caching_resolver, create_proactive_refresh_resolver_with_metrics, create_rate_limited_resolver_with_timeout, create_redis_resolver_with_ttl, create_sqlite_resolver_with_ttl, }, handle_resolver_task::{HandleResolverTaskConfig, create_handle_resolver_task_with_config}, http::{AppContext, create_router}, jetstream_handler::QuickDidEventHandler, metrics::create_metrics_publisher, queue::{ HandleResolutionWork, QueueAdapter, create_mpsc_queue_from_channel, create_noop_queue, create_redis_queue, create_redis_queue_with_dedup, create_sqlite_queue, create_sqlite_queue_with_max_size, }, sqlite_schema::create_sqlite_pool, task_manager::spawn_cancellable_task, }; use std::sync::Arc; use tokio::signal; use tokio_util::{sync::CancellationToken, task::TaskTracker}; use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt}; /// Helper function to create a Redis pool with consistent error handling fn try_create_redis_pool(redis_url: &str, purpose: &str) -> Option { match create_redis_pool(redis_url) { Ok(pool) => { tracing::info!("Redis pool created for {}", purpose); Some(pool) } Err(e) => { tracing::warn!("Failed to create Redis pool for {}: {}", purpose, e); None } } } /// Helper function to create a SQLite pool with consistent error handling async fn try_create_sqlite_pool(sqlite_url: &str, purpose: &str) -> Option { match create_sqlite_pool(sqlite_url).await { Ok(pool) => { tracing::info!("SQLite pool created for {}", purpose); Some(pool) } Err(e) => { tracing::warn!("Failed to create SQLite pool for {}: {}", purpose, e); None } } } /// Simple command-line argument handling for --version and --help fn handle_simple_args() -> bool { let args: Vec = std::env::args().collect(); if args.len() > 1 { match args[1].as_str() { "--version" | "-V" => { println!("quickdid {}", env!("CARGO_PKG_VERSION")); return true; } "--help" | "-h" => { println!("QuickDID - AT Protocol Identity Resolver Service"); println!("Version: {}", env!("CARGO_PKG_VERSION")); println!(); println!("USAGE:"); println!(" quickdid [OPTIONS]"); println!(); println!("OPTIONS:"); println!(" -h, --help Print help information"); println!(" -V, --version Print version information"); println!(); println!("ENVIRONMENT VARIABLES:"); println!( " HTTP_EXTERNAL External hostname for service endpoints (required)" ); println!(" HTTP_PORT HTTP server port (default: 8080)"); println!(" PLC_HOSTNAME PLC directory hostname (default: plc.directory)"); println!( " USER_AGENT HTTP User-Agent header (auto-generated with version)" ); println!(" DNS_NAMESERVERS Custom DNS nameservers (comma-separated IPs)"); println!( " CERTIFICATE_BUNDLES Additional CA certificates (comma-separated paths)" ); println!(); println!(" CACHING:"); println!(" REDIS_URL Redis URL for handle resolution caching"); println!( " SQLITE_URL SQLite database URL for handle resolution caching" ); println!( " CACHE_TTL_MEMORY TTL for in-memory cache in seconds (default: 600)" ); println!( " CACHE_TTL_REDIS TTL for Redis cache in seconds (default: 7776000)" ); println!( " CACHE_TTL_SQLITE TTL for SQLite cache in seconds (default: 7776000)" ); println!(); println!(" QUEUE CONFIGURATION:"); println!( " QUEUE_ADAPTER Queue adapter: 'mpsc', 'redis', 'sqlite', 'noop' (default: mpsc)" ); println!(" QUEUE_REDIS_URL Redis URL for queue adapter"); println!( " QUEUE_REDIS_PREFIX Redis key prefix for queues (default: queue:handleresolver:)" ); println!(" QUEUE_REDIS_TIMEOUT Queue blocking timeout in seconds (default: 5)"); println!( " QUEUE_REDIS_DEDUP_ENABLED Enable queue deduplication (default: false)" ); println!(" QUEUE_REDIS_DEDUP_TTL TTL for dedup keys in seconds (default: 60)"); println!(" QUEUE_WORKER_ID Worker ID for Redis queue (default: worker1)"); println!(" QUEUE_BUFFER_SIZE Buffer size for MPSC queue (default: 1000)"); println!(" QUEUE_SQLITE_MAX_SIZE Maximum SQLite queue size (default: 10000)"); println!(); println!(" RATE LIMITING:"); println!( " RESOLVER_MAX_CONCURRENT Maximum concurrent resolutions (default: 0 = disabled)" ); println!( " RESOLVER_MAX_CONCURRENT_TIMEOUT_MS Timeout for acquiring permits in ms (default: 0 = no timeout)" ); println!(); println!(" METRICS:"); println!( " METRICS_ADAPTER Metrics adapter: 'noop' or 'statsd' (default: noop)" ); println!( " METRICS_STATSD_HOST StatsD host when using statsd adapter (e.g., localhost:8125)" ); println!( " METRICS_STATSD_BIND Bind address for StatsD UDP socket (default: [::]:0)" ); println!(" METRICS_PREFIX Prefix for all metrics (default: quickdid)"); println!( " METRICS_TAGS Default tags for metrics (comma-separated key:value pairs)" ); println!(); println!(" PROACTIVE CACHE REFRESH:"); println!( " PROACTIVE_REFRESH_ENABLED Enable proactive cache refresh (default: false)" ); println!( " PROACTIVE_REFRESH_THRESHOLD Threshold as percentage of TTL (0.0-1.0, default: 0.8)" ); println!(); println!(" JETSTREAM:"); println!(" JETSTREAM_ENABLED Enable Jetstream consumer (default: false)"); println!( " JETSTREAM_HOSTNAME Jetstream hostname (default: jetstream.atproto.tools)" ); println!(); println!( "For more information, visit: https://github.com/smokesignal.events/quickdid" ); return true; } _ => {} } } false } #[tokio::main] async fn main() -> Result<()> { // Handle --version and --help if handle_simple_args() { return Ok(()); } // Initialize tracing tracing_subscriber::registry() .with( tracing_subscriber::EnvFilter::try_from_default_env().unwrap_or_else(|_| { "quickdid=info,atproto_identity=debug,atproto_xrpcs=debug".into() }), ) .with(tracing_subscriber::fmt::layer()) .init(); let config = Config::from_env()?; // Validate configuration config.validate()?; tracing::info!("Starting QuickDID service on port {}", config.http_port); tracing::info!( "Cache TTL - Memory: {}s, Redis: {}s, SQLite: {}s", config.cache_ttl_memory, config.cache_ttl_redis, config.cache_ttl_sqlite ); // Parse certificate bundles if provided let certificate_bundles: CertificateBundles = config .certificate_bundles .clone() .unwrap_or_default() .try_into()?; // Parse DNS nameservers if provided let dns_nameservers: DnsNameservers = config .dns_nameservers .clone() .unwrap_or_default() .try_into()?; // Build HTTP client let mut client_builder = reqwest::Client::builder(); for ca_certificate in certificate_bundles.as_ref() { let cert = std::fs::read(ca_certificate)?; let cert = reqwest::Certificate::from_pem(&cert)?; client_builder = client_builder.add_root_certificate(cert); } client_builder = client_builder.user_agent(&config.user_agent); let http_client = client_builder.build()?; // Create DNS resolver let dns_resolver = HickoryDnsResolver::create_resolver(dns_nameservers.as_ref()); // Create DNS resolver Arc for sharing let dns_resolver_arc = Arc::new(dns_resolver); // Create metrics publisher based on configuration let metrics_publisher = create_metrics_publisher(&config).map_err(|e| { tracing::error!("Failed to create metrics publisher: {}", e); anyhow::anyhow!("Failed to create metrics publisher: {}", e) })?; tracing::info!( "Metrics publisher created with {} adapter", config.metrics_adapter ); metrics_publisher.gauge("server", 1).await; // Create base handle resolver using factory function let mut base_handle_resolver = create_base_resolver( dns_resolver_arc.clone(), http_client.clone(), metrics_publisher.clone(), ); // Apply rate limiting if configured if config.resolver_max_concurrent > 0 { let timeout_info = if config.resolver_max_concurrent_timeout_ms > 0 { format!(", {}ms timeout", config.resolver_max_concurrent_timeout_ms) } else { String::new() }; tracing::info!( "Applying rate limiting to handle resolver (max {} concurrent resolutions{})", config.resolver_max_concurrent, timeout_info ); base_handle_resolver = create_rate_limited_resolver_with_timeout( base_handle_resolver, config.resolver_max_concurrent, config.resolver_max_concurrent_timeout_ms, metrics_publisher.clone(), ); } // Create Redis pool if configured let redis_pool = config .redis_url .as_ref() .and_then(|url| try_create_redis_pool(url, "handle resolver cache")); // Create SQLite pool if configured let sqlite_pool = if let Some(url) = config.sqlite_url.as_ref() { try_create_sqlite_pool(url, "handle resolver cache").await } else { None }; // Create task tracker and cancellation token let tracker = TaskTracker::new(); let token = CancellationToken::new(); // Create the queue adapter first (needed for proactive refresh) let handle_queue: Arc> = { // Create queue adapter based on configuration let adapter: Arc> = match config .queue_adapter .as_str() { "redis" => { // Use queue-specific Redis URL, fall back to general Redis URL let queue_redis_url = config .queue_redis_url .as_ref() .or(config.redis_url.as_ref()); if let Some(url) = queue_redis_url { if let Some(pool) = try_create_redis_pool(url, "queue adapter") { tracing::info!( "Creating Redis queue adapter with prefix: {}, dedup: {}, dedup_ttl: {}s", config.queue_redis_prefix, config.queue_redis_dedup_enabled, config.queue_redis_dedup_ttl ); if config.queue_redis_dedup_enabled { create_redis_queue_with_dedup::( pool, config.queue_worker_id.clone(), config.queue_redis_prefix.clone(), config.queue_redis_timeout, true, config.queue_redis_dedup_ttl, ) } else { create_redis_queue::( pool, config.queue_worker_id.clone(), config.queue_redis_prefix.clone(), config.queue_redis_timeout, ) } } else { tracing::warn!("Falling back to MPSC queue adapter"); // Fall back to MPSC if Redis fails let (handle_sender, handle_receiver) = tokio::sync::mpsc::channel::( config.queue_buffer_size, ); create_mpsc_queue_from_channel(handle_sender, handle_receiver) } } else { tracing::warn!( "Redis queue adapter requested but no Redis URL configured, using no-op adapter" ); create_noop_queue::() } } "sqlite" => { // Use SQLite adapter if let Some(url) = config.sqlite_url.as_ref() { if let Some(pool) = try_create_sqlite_pool(url, "queue adapter").await { if config.queue_sqlite_max_size > 0 { tracing::info!( "Creating SQLite queue adapter with work shedding (max_size: {})", config.queue_sqlite_max_size ); create_sqlite_queue_with_max_size::( pool, config.queue_sqlite_max_size, ) } else { tracing::info!("Creating SQLite queue adapter (unlimited size)"); create_sqlite_queue::(pool) } } else { tracing::warn!( "Failed to create SQLite pool for queue, falling back to MPSC queue adapter" ); // Fall back to MPSC if SQLite fails let (handle_sender, handle_receiver) = tokio::sync::mpsc::channel::( config.queue_buffer_size, ); create_mpsc_queue_from_channel(handle_sender, handle_receiver) } } else { tracing::warn!( "SQLite queue adapter requested but no SQLite URL configured, using no-op adapter" ); create_noop_queue::() } } "mpsc" => { // Use MPSC adapter tracing::info!( "Using MPSC queue adapter with buffer size: {}", config.queue_buffer_size ); let (handle_sender, handle_receiver) = tokio::sync::mpsc::channel::(config.queue_buffer_size); create_mpsc_queue_from_channel(handle_sender, handle_receiver) } "noop" | "none" => { // Use no-op adapter tracing::info!("Using no-op queue adapter (queuing disabled)"); create_noop_queue::() } _ => { // Default to no-op adapter for unknown types tracing::warn!( "Unknown queue adapter type '{}', using no-op adapter", config.queue_adapter ); create_noop_queue::() } }; adapter }; // Create handle resolver with cache priority: Redis > SQLite > In-memory let (mut handle_resolver, cache_ttl): ( Arc, u64, ) = if let Some(pool) = redis_pool { tracing::info!( "Using Redis-backed handle resolver with {}-second cache TTL", config.cache_ttl_redis ); ( create_redis_resolver_with_ttl( base_handle_resolver, pool, config.cache_ttl_redis, metrics_publisher.clone(), ), config.cache_ttl_redis, ) } else if let Some(pool) = sqlite_pool { tracing::info!( "Using SQLite-backed handle resolver with {}-second cache TTL", config.cache_ttl_sqlite ); ( create_sqlite_resolver_with_ttl( base_handle_resolver, pool, config.cache_ttl_sqlite, metrics_publisher.clone(), ), config.cache_ttl_sqlite, ) } else { tracing::info!( "Using in-memory handle resolver with {}-second cache TTL", config.cache_ttl_memory ); ( create_caching_resolver( base_handle_resolver, config.cache_ttl_memory, metrics_publisher.clone(), ), config.cache_ttl_memory, ) }; // Apply proactive refresh if enabled if config.proactive_refresh_enabled && !matches!(config.queue_adapter.as_str(), "noop" | "none") { tracing::info!( "Enabling proactive cache refresh with {}% threshold", (config.proactive_refresh_threshold * 100.0) as u32 ); handle_resolver = create_proactive_refresh_resolver_with_metrics( handle_resolver, handle_queue.clone(), metrics_publisher.clone(), cache_ttl, config.proactive_refresh_threshold, ); } else if config.proactive_refresh_enabled { tracing::warn!( "Proactive refresh enabled but queue adapter is no-op, skipping proactive refresh" ); } // Setup background handle resolution task { let adapter_for_task = handle_queue.clone(); // Only spawn handle resolver task if not using noop adapter if !matches!(config.queue_adapter.as_str(), "noop" | "none") { // Create handle resolver task configuration let handle_task_config = HandleResolverTaskConfig { default_timeout_ms: 10000, }; // Create and start handle resolver task let handle_task = create_handle_resolver_task_with_config( adapter_for_task, handle_resolver.clone(), token.clone(), handle_task_config, metrics_publisher.clone(), ); // Spawn the handle resolver task spawn_cancellable_task( &tracker, token.clone(), "handle_resolver", |cancel_token| async move { tokio::select! { result = handle_task.run() => { if let Err(e) = result { tracing::error!(error = ?e, "Handle resolver task failed"); Err(anyhow::anyhow!(e)) } else { Ok(()) } } _ = cancel_token.cancelled() => { tracing::info!("Handle resolver task cancelled"); Ok(()) } } }, ); tracing::info!( "Background handle resolution task started with {} adapter", config.queue_adapter ); } else { tracing::info!("Background handle resolution task disabled (using no-op adapter)"); } }; // Create app context with the queue adapter let app_context = AppContext::new( handle_resolver.clone(), handle_queue, metrics_publisher.clone(), config.etag_seed.clone(), config.cache_control_header.clone(), config.static_files_dir.clone(), ); // Create router let router = create_router(app_context); // Setup signal handler { let signal_tracker = tracker.clone(); let signal_token = token.clone(); // Spawn signal handler without using the managed task helper since it's special tracing::info!("Starting signal handler task"); tokio::spawn(async move { let ctrl_c = async { signal::ctrl_c() .await .expect("failed to install Ctrl+C handler"); }; #[cfg(unix)] let terminate = async { signal::unix::signal(signal::unix::SignalKind::terminate()) .expect("failed to install signal handler") .recv() .await; }; #[cfg(not(unix))] let terminate = std::future::pending::<()>(); tokio::select! { () = signal_token.cancelled() => { tracing::info!("Signal handler task shutting down gracefully"); }, _ = terminate => { tracing::info!("Received SIGTERM signal, initiating shutdown"); }, _ = ctrl_c => { tracing::info!("Received Ctrl+C signal, initiating shutdown"); }, } signal_tracker.close(); signal_token.cancel(); tracing::info!("Signal handler task completed"); }); } // Start Jetstream consumer if enabled if config.jetstream_enabled { let jetstream_resolver = handle_resolver.clone(); let jetstream_metrics = metrics_publisher.clone(); let jetstream_hostname = config.jetstream_hostname.clone(); let jetstream_user_agent = config.user_agent.clone(); spawn_cancellable_task( &tracker, token.clone(), "jetstream_consumer", move |cancel_token| async move { tracing::info!(hostname = %jetstream_hostname, "Starting Jetstream consumer"); // Create event handler let event_handler = Arc::new(QuickDidEventHandler::new( jetstream_resolver, jetstream_metrics.clone(), )); // Reconnection loop let mut reconnect_count = 0u32; let max_reconnects_per_minute = 5; let reconnect_window = std::time::Duration::from_secs(60); let mut last_disconnect = std::time::Instant::now() - reconnect_window; while !cancel_token.is_cancelled() { let now = std::time::Instant::now(); if now.duration_since(last_disconnect) < reconnect_window { reconnect_count += 1; if reconnect_count > max_reconnects_per_minute { tracing::warn!( count = reconnect_count, "Too many Jetstream reconnects, waiting 60 seconds" ); tokio::time::sleep(reconnect_window).await; reconnect_count = 0; last_disconnect = now; continue; } } else { reconnect_count = 0; } // Create consumer configuration let consumer_config = ConsumerTaskConfig { user_agent: jetstream_user_agent.clone(), compression: false, zstd_dictionary_location: String::new(), jetstream_hostname: jetstream_hostname.clone(), // Listen to the "community.lexicon.collection.fake" collection // so that we keep an active connection open but only for // account and identity events. collections: vec!["community.lexicon.collection.fake".to_string()], // Listen to all collections dids: vec![], max_message_size_bytes: None, cursor: None, require_hello: true, }; let consumer = JetstreamConsumer::new(consumer_config); // Register event handler if let Err(e) = consumer.register_handler(event_handler.clone()).await { tracing::error!(error = ?e, "Failed to register Jetstream event handler"); continue; } // Run consumer with cancellation support match consumer.run_background(cancel_token.clone()).await { Ok(()) => { tracing::info!("Jetstream consumer stopped normally"); if cancel_token.is_cancelled() { break; } last_disconnect = std::time::Instant::now(); tokio::time::sleep(std::time::Duration::from_secs(5)).await; } Err(e) => { tracing::error!(error = ?e, "Jetstream consumer connection failed, will reconnect"); jetstream_metrics.incr("jetstream.connection.error").await; last_disconnect = std::time::Instant::now(); if !cancel_token.is_cancelled() { tokio::time::sleep(std::time::Duration::from_secs(5)).await; } } } } tracing::info!("Jetstream consumer task shutting down"); Ok(()) }, ); } else { tracing::info!("Jetstream consumer disabled"); } // Start HTTP server with cancellation support let bind_address = format!("0.0.0.0:{}", config.http_port); spawn_cancellable_task( &tracker, token.clone(), "http", move |cancel_token| async move { let listener = tokio::net::TcpListener::bind(&bind_address) .await .map_err(|e| anyhow::anyhow!("Failed to bind to {}: {}", bind_address, e))?; tracing::info!("QuickDID service listening on {}", bind_address); let shutdown_token = cancel_token.clone(); axum::serve(listener, router) .with_graceful_shutdown(async move { shutdown_token.cancelled().await; }) .await .map_err(|e| anyhow::anyhow!("HTTP server error: {}", e))?; Ok(()) }, ); // Wait for all tasks to complete tracing::info!("Waiting for all tasks to complete..."); tracker.wait().await; tracing::info!("All tasks completed, application shutting down"); Ok(()) }