QuickDID is a high-performance AT Protocol identity resolution service written in Rust. It provides handle-to-DID resolution with Redis-backed caching and queue processing.
at main 29 kB view raw
1use anyhow::Result; 2use atproto_identity::{ 3 config::{CertificateBundles, DnsNameservers}, 4 resolve::HickoryDnsResolver, 5}; 6use atproto_jetstream::{Consumer as JetstreamConsumer, ConsumerTaskConfig}; 7use quickdid::{ 8 cache::create_redis_pool, 9 config::Config, 10 handle_resolver::{ 11 create_base_resolver, create_caching_resolver, 12 create_proactive_refresh_resolver_with_metrics, create_rate_limited_resolver_with_timeout, 13 create_redis_resolver_with_ttl, create_sqlite_resolver_with_ttl, 14 }, 15 handle_resolver_task::{HandleResolverTaskConfig, create_handle_resolver_task_with_config}, 16 http::{AppContext, create_router}, 17 jetstream_handler::QuickDidEventHandler, 18 metrics::create_metrics_publisher, 19 queue::{ 20 HandleResolutionWork, QueueAdapter, create_mpsc_queue_from_channel, create_noop_queue, 21 create_redis_queue, create_redis_queue_with_dedup, create_sqlite_queue, 22 create_sqlite_queue_with_max_size, 23 }, 24 sqlite_schema::create_sqlite_pool, 25 task_manager::spawn_cancellable_task, 26}; 27use std::sync::Arc; 28use tokio::signal; 29use tokio_util::{sync::CancellationToken, task::TaskTracker}; 30use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt}; 31 32/// Helper function to create a Redis pool with consistent error handling 33fn try_create_redis_pool(redis_url: &str, purpose: &str) -> Option<deadpool_redis::Pool> { 34 match create_redis_pool(redis_url) { 35 Ok(pool) => { 36 tracing::info!("Redis pool created for {}", purpose); 37 Some(pool) 38 } 39 Err(e) => { 40 tracing::warn!("Failed to create Redis pool for {}: {}", purpose, e); 41 None 42 } 43 } 44} 45 46/// Helper function to create a SQLite pool with consistent error handling 47async fn try_create_sqlite_pool(sqlite_url: &str, purpose: &str) -> Option<sqlx::SqlitePool> { 48 match create_sqlite_pool(sqlite_url).await { 49 Ok(pool) => { 50 tracing::info!("SQLite pool created for {}", purpose); 51 Some(pool) 52 } 53 Err(e) => { 54 tracing::warn!("Failed to create SQLite pool for {}: {}", purpose, e); 55 None 56 } 57 } 58} 59 60/// Simple command-line argument handling for --version and --help 61fn handle_simple_args() -> bool { 62 let args: Vec<String> = std::env::args().collect(); 63 64 if args.len() > 1 { 65 match args[1].as_str() { 66 "--version" | "-V" => { 67 println!("quickdid {}", env!("CARGO_PKG_VERSION")); 68 return true; 69 } 70 "--help" | "-h" => { 71 println!("QuickDID - AT Protocol Identity Resolver Service"); 72 println!("Version: {}", env!("CARGO_PKG_VERSION")); 73 println!(); 74 println!("USAGE:"); 75 println!(" quickdid [OPTIONS]"); 76 println!(); 77 println!("OPTIONS:"); 78 println!(" -h, --help Print help information"); 79 println!(" -V, --version Print version information"); 80 println!(); 81 println!("ENVIRONMENT VARIABLES:"); 82 println!( 83 " HTTP_EXTERNAL External hostname for service endpoints (required)" 84 ); 85 println!(" HTTP_PORT HTTP server port (default: 8080)"); 86 println!(" PLC_HOSTNAME PLC directory hostname (default: plc.directory)"); 87 println!( 88 " USER_AGENT HTTP User-Agent header (auto-generated with version)" 89 ); 90 println!(" DNS_NAMESERVERS Custom DNS nameservers (comma-separated IPs)"); 91 println!( 92 " CERTIFICATE_BUNDLES Additional CA certificates (comma-separated paths)" 93 ); 94 println!(); 95 println!(" CACHING:"); 96 println!(" REDIS_URL Redis URL for handle resolution caching"); 97 println!( 98 " SQLITE_URL SQLite database URL for handle resolution caching" 99 ); 100 println!( 101 " CACHE_TTL_MEMORY TTL for in-memory cache in seconds (default: 600)" 102 ); 103 println!( 104 " CACHE_TTL_REDIS TTL for Redis cache in seconds (default: 7776000)" 105 ); 106 println!( 107 " CACHE_TTL_SQLITE TTL for SQLite cache in seconds (default: 7776000)" 108 ); 109 println!(); 110 println!(" QUEUE CONFIGURATION:"); 111 println!( 112 " QUEUE_ADAPTER Queue adapter: 'mpsc', 'redis', 'sqlite', 'noop' (default: mpsc)" 113 ); 114 println!(" QUEUE_REDIS_URL Redis URL for queue adapter"); 115 println!( 116 " QUEUE_REDIS_PREFIX Redis key prefix for queues (default: queue:handleresolver:)" 117 ); 118 println!(" QUEUE_REDIS_TIMEOUT Queue blocking timeout in seconds (default: 5)"); 119 println!( 120 " QUEUE_REDIS_DEDUP_ENABLED Enable queue deduplication (default: false)" 121 ); 122 println!(" QUEUE_REDIS_DEDUP_TTL TTL for dedup keys in seconds (default: 60)"); 123 println!(" QUEUE_WORKER_ID Worker ID for Redis queue (default: worker1)"); 124 println!(" QUEUE_BUFFER_SIZE Buffer size for MPSC queue (default: 1000)"); 125 println!(" QUEUE_SQLITE_MAX_SIZE Maximum SQLite queue size (default: 10000)"); 126 println!(); 127 println!(" RATE LIMITING:"); 128 println!( 129 " RESOLVER_MAX_CONCURRENT Maximum concurrent resolutions (default: 0 = disabled)" 130 ); 131 println!( 132 " RESOLVER_MAX_CONCURRENT_TIMEOUT_MS Timeout for acquiring permits in ms (default: 0 = no timeout)" 133 ); 134 println!(); 135 println!(" METRICS:"); 136 println!( 137 " METRICS_ADAPTER Metrics adapter: 'noop' or 'statsd' (default: noop)" 138 ); 139 println!( 140 " METRICS_STATSD_HOST StatsD host when using statsd adapter (e.g., localhost:8125)" 141 ); 142 println!( 143 " METRICS_STATSD_BIND Bind address for StatsD UDP socket (default: [::]:0)" 144 ); 145 println!(" METRICS_PREFIX Prefix for all metrics (default: quickdid)"); 146 println!( 147 " METRICS_TAGS Default tags for metrics (comma-separated key:value pairs)" 148 ); 149 println!(); 150 println!(" PROACTIVE CACHE REFRESH:"); 151 println!( 152 " PROACTIVE_REFRESH_ENABLED Enable proactive cache refresh (default: false)" 153 ); 154 println!( 155 " PROACTIVE_REFRESH_THRESHOLD Threshold as percentage of TTL (0.0-1.0, default: 0.8)" 156 ); 157 println!(); 158 println!(" JETSTREAM:"); 159 println!(" JETSTREAM_ENABLED Enable Jetstream consumer (default: false)"); 160 println!( 161 " JETSTREAM_HOSTNAME Jetstream hostname (default: jetstream.atproto.tools)" 162 ); 163 println!(); 164 println!( 165 "For more information, visit: https://github.com/smokesignal.events/quickdid" 166 ); 167 return true; 168 } 169 _ => {} 170 } 171 } 172 173 false 174} 175 176#[tokio::main] 177async fn main() -> Result<()> { 178 // Handle --version and --help 179 if handle_simple_args() { 180 return Ok(()); 181 } 182 183 // Initialize tracing 184 tracing_subscriber::registry() 185 .with( 186 tracing_subscriber::EnvFilter::try_from_default_env().unwrap_or_else(|_| { 187 "quickdid=info,atproto_identity=debug,atproto_xrpcs=debug".into() 188 }), 189 ) 190 .with(tracing_subscriber::fmt::layer()) 191 .init(); 192 193 let config = Config::from_env()?; 194 195 // Validate configuration 196 config.validate()?; 197 198 tracing::info!("Starting QuickDID service on port {}", config.http_port); 199 tracing::info!( 200 "Cache TTL - Memory: {}s, Redis: {}s, SQLite: {}s", 201 config.cache_ttl_memory, 202 config.cache_ttl_redis, 203 config.cache_ttl_sqlite 204 ); 205 206 // Parse certificate bundles if provided 207 let certificate_bundles: CertificateBundles = config 208 .certificate_bundles 209 .clone() 210 .unwrap_or_default() 211 .try_into()?; 212 213 // Parse DNS nameservers if provided 214 let dns_nameservers: DnsNameservers = config 215 .dns_nameservers 216 .clone() 217 .unwrap_or_default() 218 .try_into()?; 219 220 // Build HTTP client 221 let mut client_builder = reqwest::Client::builder(); 222 for ca_certificate in certificate_bundles.as_ref() { 223 let cert = std::fs::read(ca_certificate)?; 224 let cert = reqwest::Certificate::from_pem(&cert)?; 225 client_builder = client_builder.add_root_certificate(cert); 226 } 227 client_builder = client_builder.user_agent(&config.user_agent); 228 let http_client = client_builder.build()?; 229 230 // Create DNS resolver 231 let dns_resolver = HickoryDnsResolver::create_resolver(dns_nameservers.as_ref()); 232 233 // Create DNS resolver Arc for sharing 234 let dns_resolver_arc = Arc::new(dns_resolver); 235 236 // Create metrics publisher based on configuration 237 let metrics_publisher = create_metrics_publisher(&config).map_err(|e| { 238 tracing::error!("Failed to create metrics publisher: {}", e); 239 anyhow::anyhow!("Failed to create metrics publisher: {}", e) 240 })?; 241 242 tracing::info!( 243 "Metrics publisher created with {} adapter", 244 config.metrics_adapter 245 ); 246 247 metrics_publisher.gauge("server", 1).await; 248 249 // Create base handle resolver using factory function 250 let mut base_handle_resolver = create_base_resolver( 251 dns_resolver_arc.clone(), 252 http_client.clone(), 253 metrics_publisher.clone(), 254 ); 255 256 // Apply rate limiting if configured 257 if config.resolver_max_concurrent > 0 { 258 let timeout_info = if config.resolver_max_concurrent_timeout_ms > 0 { 259 format!(", {}ms timeout", config.resolver_max_concurrent_timeout_ms) 260 } else { 261 String::new() 262 }; 263 tracing::info!( 264 "Applying rate limiting to handle resolver (max {} concurrent resolutions{})", 265 config.resolver_max_concurrent, 266 timeout_info 267 ); 268 base_handle_resolver = create_rate_limited_resolver_with_timeout( 269 base_handle_resolver, 270 config.resolver_max_concurrent, 271 config.resolver_max_concurrent_timeout_ms, 272 metrics_publisher.clone(), 273 ); 274 } 275 276 // Create Redis pool if configured 277 let redis_pool = config 278 .redis_url 279 .as_ref() 280 .and_then(|url| try_create_redis_pool(url, "handle resolver cache")); 281 282 // Create SQLite pool if configured 283 let sqlite_pool = if let Some(url) = config.sqlite_url.as_ref() { 284 try_create_sqlite_pool(url, "handle resolver cache").await 285 } else { 286 None 287 }; 288 289 // Create task tracker and cancellation token 290 let tracker = TaskTracker::new(); 291 let token = CancellationToken::new(); 292 293 // Create the queue adapter first (needed for proactive refresh) 294 let handle_queue: Arc<dyn QueueAdapter<HandleResolutionWork>> = { 295 // Create queue adapter based on configuration 296 let adapter: Arc<dyn QueueAdapter<HandleResolutionWork>> = match config 297 .queue_adapter 298 .as_str() 299 { 300 "redis" => { 301 // Use queue-specific Redis URL, fall back to general Redis URL 302 let queue_redis_url = config 303 .queue_redis_url 304 .as_ref() 305 .or(config.redis_url.as_ref()); 306 307 if let Some(url) = queue_redis_url { 308 if let Some(pool) = try_create_redis_pool(url, "queue adapter") { 309 tracing::info!( 310 "Creating Redis queue adapter with prefix: {}, dedup: {}, dedup_ttl: {}s", 311 config.queue_redis_prefix, 312 config.queue_redis_dedup_enabled, 313 config.queue_redis_dedup_ttl 314 ); 315 if config.queue_redis_dedup_enabled { 316 create_redis_queue_with_dedup::<HandleResolutionWork>( 317 pool, 318 config.queue_worker_id.clone(), 319 config.queue_redis_prefix.clone(), 320 config.queue_redis_timeout, 321 true, 322 config.queue_redis_dedup_ttl, 323 ) 324 } else { 325 create_redis_queue::<HandleResolutionWork>( 326 pool, 327 config.queue_worker_id.clone(), 328 config.queue_redis_prefix.clone(), 329 config.queue_redis_timeout, 330 ) 331 } 332 } else { 333 tracing::warn!("Falling back to MPSC queue adapter"); 334 // Fall back to MPSC if Redis fails 335 let (handle_sender, handle_receiver) = 336 tokio::sync::mpsc::channel::<HandleResolutionWork>( 337 config.queue_buffer_size, 338 ); 339 create_mpsc_queue_from_channel(handle_sender, handle_receiver) 340 } 341 } else { 342 tracing::warn!( 343 "Redis queue adapter requested but no Redis URL configured, using no-op adapter" 344 ); 345 create_noop_queue::<HandleResolutionWork>() 346 } 347 } 348 "sqlite" => { 349 // Use SQLite adapter 350 if let Some(url) = config.sqlite_url.as_ref() { 351 if let Some(pool) = try_create_sqlite_pool(url, "queue adapter").await { 352 if config.queue_sqlite_max_size > 0 { 353 tracing::info!( 354 "Creating SQLite queue adapter with work shedding (max_size: {})", 355 config.queue_sqlite_max_size 356 ); 357 create_sqlite_queue_with_max_size::<HandleResolutionWork>( 358 pool, 359 config.queue_sqlite_max_size, 360 ) 361 } else { 362 tracing::info!("Creating SQLite queue adapter (unlimited size)"); 363 create_sqlite_queue::<HandleResolutionWork>(pool) 364 } 365 } else { 366 tracing::warn!( 367 "Failed to create SQLite pool for queue, falling back to MPSC queue adapter" 368 ); 369 // Fall back to MPSC if SQLite fails 370 let (handle_sender, handle_receiver) = 371 tokio::sync::mpsc::channel::<HandleResolutionWork>( 372 config.queue_buffer_size, 373 ); 374 create_mpsc_queue_from_channel(handle_sender, handle_receiver) 375 } 376 } else { 377 tracing::warn!( 378 "SQLite queue adapter requested but no SQLite URL configured, using no-op adapter" 379 ); 380 create_noop_queue::<HandleResolutionWork>() 381 } 382 } 383 "mpsc" => { 384 // Use MPSC adapter 385 tracing::info!( 386 "Using MPSC queue adapter with buffer size: {}", 387 config.queue_buffer_size 388 ); 389 let (handle_sender, handle_receiver) = 390 tokio::sync::mpsc::channel::<HandleResolutionWork>(config.queue_buffer_size); 391 create_mpsc_queue_from_channel(handle_sender, handle_receiver) 392 } 393 "noop" | "none" => { 394 // Use no-op adapter 395 tracing::info!("Using no-op queue adapter (queuing disabled)"); 396 create_noop_queue::<HandleResolutionWork>() 397 } 398 _ => { 399 // Default to no-op adapter for unknown types 400 tracing::warn!( 401 "Unknown queue adapter type '{}', using no-op adapter", 402 config.queue_adapter 403 ); 404 create_noop_queue::<HandleResolutionWork>() 405 } 406 }; 407 408 adapter 409 }; 410 411 // Create handle resolver with cache priority: Redis > SQLite > In-memory 412 let (mut handle_resolver, cache_ttl): ( 413 Arc<dyn quickdid::handle_resolver::HandleResolver>, 414 u64, 415 ) = if let Some(pool) = redis_pool { 416 tracing::info!( 417 "Using Redis-backed handle resolver with {}-second cache TTL", 418 config.cache_ttl_redis 419 ); 420 ( 421 create_redis_resolver_with_ttl( 422 base_handle_resolver, 423 pool, 424 config.cache_ttl_redis, 425 metrics_publisher.clone(), 426 ), 427 config.cache_ttl_redis, 428 ) 429 } else if let Some(pool) = sqlite_pool { 430 tracing::info!( 431 "Using SQLite-backed handle resolver with {}-second cache TTL", 432 config.cache_ttl_sqlite 433 ); 434 ( 435 create_sqlite_resolver_with_ttl( 436 base_handle_resolver, 437 pool, 438 config.cache_ttl_sqlite, 439 metrics_publisher.clone(), 440 ), 441 config.cache_ttl_sqlite, 442 ) 443 } else { 444 tracing::info!( 445 "Using in-memory handle resolver with {}-second cache TTL", 446 config.cache_ttl_memory 447 ); 448 ( 449 create_caching_resolver( 450 base_handle_resolver, 451 config.cache_ttl_memory, 452 metrics_publisher.clone(), 453 ), 454 config.cache_ttl_memory, 455 ) 456 }; 457 458 // Apply proactive refresh if enabled 459 if config.proactive_refresh_enabled && !matches!(config.queue_adapter.as_str(), "noop" | "none") 460 { 461 tracing::info!( 462 "Enabling proactive cache refresh with {}% threshold", 463 (config.proactive_refresh_threshold * 100.0) as u32 464 ); 465 handle_resolver = create_proactive_refresh_resolver_with_metrics( 466 handle_resolver, 467 handle_queue.clone(), 468 metrics_publisher.clone(), 469 cache_ttl, 470 config.proactive_refresh_threshold, 471 ); 472 } else if config.proactive_refresh_enabled { 473 tracing::warn!( 474 "Proactive refresh enabled but queue adapter is no-op, skipping proactive refresh" 475 ); 476 } 477 478 // Setup background handle resolution task 479 { 480 let adapter_for_task = handle_queue.clone(); 481 482 // Only spawn handle resolver task if not using noop adapter 483 if !matches!(config.queue_adapter.as_str(), "noop" | "none") { 484 // Create handle resolver task configuration 485 let handle_task_config = HandleResolverTaskConfig { 486 default_timeout_ms: 10000, 487 }; 488 489 // Create and start handle resolver task 490 let handle_task = create_handle_resolver_task_with_config( 491 adapter_for_task, 492 handle_resolver.clone(), 493 token.clone(), 494 handle_task_config, 495 metrics_publisher.clone(), 496 ); 497 498 // Spawn the handle resolver task 499 spawn_cancellable_task( 500 &tracker, 501 token.clone(), 502 "handle_resolver", 503 |cancel_token| async move { 504 tokio::select! { 505 result = handle_task.run() => { 506 if let Err(e) = result { 507 tracing::error!(error = ?e, "Handle resolver task failed"); 508 Err(anyhow::anyhow!(e)) 509 } else { 510 Ok(()) 511 } 512 } 513 _ = cancel_token.cancelled() => { 514 tracing::info!("Handle resolver task cancelled"); 515 Ok(()) 516 } 517 } 518 }, 519 ); 520 521 tracing::info!( 522 "Background handle resolution task started with {} adapter", 523 config.queue_adapter 524 ); 525 } else { 526 tracing::info!("Background handle resolution task disabled (using no-op adapter)"); 527 } 528 }; 529 530 // Create app context with the queue adapter 531 let app_context = AppContext::new( 532 handle_resolver.clone(), 533 handle_queue, 534 metrics_publisher.clone(), 535 config.etag_seed.clone(), 536 config.cache_control_header.clone(), 537 config.static_files_dir.clone(), 538 ); 539 540 // Create router 541 let router = create_router(app_context); 542 543 // Setup signal handler 544 { 545 let signal_tracker = tracker.clone(); 546 let signal_token = token.clone(); 547 548 // Spawn signal handler without using the managed task helper since it's special 549 tracing::info!("Starting signal handler task"); 550 tokio::spawn(async move { 551 let ctrl_c = async { 552 signal::ctrl_c() 553 .await 554 .expect("failed to install Ctrl+C handler"); 555 }; 556 557 #[cfg(unix)] 558 let terminate = async { 559 signal::unix::signal(signal::unix::SignalKind::terminate()) 560 .expect("failed to install signal handler") 561 .recv() 562 .await; 563 }; 564 565 #[cfg(not(unix))] 566 let terminate = std::future::pending::<()>(); 567 568 tokio::select! { 569 () = signal_token.cancelled() => { 570 tracing::info!("Signal handler task shutting down gracefully"); 571 }, 572 _ = terminate => { 573 tracing::info!("Received SIGTERM signal, initiating shutdown"); 574 }, 575 _ = ctrl_c => { 576 tracing::info!("Received Ctrl+C signal, initiating shutdown"); 577 }, 578 } 579 580 signal_tracker.close(); 581 signal_token.cancel(); 582 tracing::info!("Signal handler task completed"); 583 }); 584 } 585 586 // Start Jetstream consumer if enabled 587 if config.jetstream_enabled { 588 let jetstream_resolver = handle_resolver.clone(); 589 let jetstream_metrics = metrics_publisher.clone(); 590 let jetstream_hostname = config.jetstream_hostname.clone(); 591 let jetstream_user_agent = config.user_agent.clone(); 592 593 spawn_cancellable_task( 594 &tracker, 595 token.clone(), 596 "jetstream_consumer", 597 move |cancel_token| async move { 598 tracing::info!(hostname = %jetstream_hostname, "Starting Jetstream consumer"); 599 600 // Create event handler 601 let event_handler = Arc::new(QuickDidEventHandler::new( 602 jetstream_resolver, 603 jetstream_metrics.clone(), 604 )); 605 606 // Reconnection loop 607 let mut reconnect_count = 0u32; 608 let max_reconnects_per_minute = 5; 609 let reconnect_window = std::time::Duration::from_secs(60); 610 let mut last_disconnect = std::time::Instant::now() - reconnect_window; 611 612 while !cancel_token.is_cancelled() { 613 let now = std::time::Instant::now(); 614 if now.duration_since(last_disconnect) < reconnect_window { 615 reconnect_count += 1; 616 if reconnect_count > max_reconnects_per_minute { 617 tracing::warn!( 618 count = reconnect_count, 619 "Too many Jetstream reconnects, waiting 60 seconds" 620 ); 621 tokio::time::sleep(reconnect_window).await; 622 reconnect_count = 0; 623 last_disconnect = now; 624 continue; 625 } 626 } else { 627 reconnect_count = 0; 628 } 629 630 // Create consumer configuration 631 let consumer_config = ConsumerTaskConfig { 632 user_agent: jetstream_user_agent.clone(), 633 compression: false, 634 zstd_dictionary_location: String::new(), 635 jetstream_hostname: jetstream_hostname.clone(), 636 // Listen to the "community.lexicon.collection.fake" collection 637 // so that we keep an active connection open but only for 638 // account and identity events. 639 collections: vec!["community.lexicon.collection.fake".to_string()], // Listen to all collections 640 dids: vec![], 641 max_message_size_bytes: None, 642 cursor: None, 643 require_hello: true, 644 }; 645 646 let consumer = JetstreamConsumer::new(consumer_config); 647 648 // Register event handler 649 if let Err(e) = consumer.register_handler(event_handler.clone()).await { 650 tracing::error!(error = ?e, "Failed to register Jetstream event handler"); 651 continue; 652 } 653 654 // Run consumer with cancellation support 655 match consumer.run_background(cancel_token.clone()).await { 656 Ok(()) => { 657 tracing::info!("Jetstream consumer stopped normally"); 658 if cancel_token.is_cancelled() { 659 break; 660 } 661 last_disconnect = std::time::Instant::now(); 662 tokio::time::sleep(std::time::Duration::from_secs(5)).await; 663 } 664 Err(e) => { 665 tracing::error!(error = ?e, "Jetstream consumer connection failed, will reconnect"); 666 jetstream_metrics.incr("jetstream.connection.error").await; 667 last_disconnect = std::time::Instant::now(); 668 669 if !cancel_token.is_cancelled() { 670 tokio::time::sleep(std::time::Duration::from_secs(5)).await; 671 } 672 } 673 } 674 } 675 676 tracing::info!("Jetstream consumer task shutting down"); 677 Ok(()) 678 }, 679 ); 680 } else { 681 tracing::info!("Jetstream consumer disabled"); 682 } 683 684 // Start HTTP server with cancellation support 685 let bind_address = format!("0.0.0.0:{}", config.http_port); 686 spawn_cancellable_task( 687 &tracker, 688 token.clone(), 689 "http", 690 move |cancel_token| async move { 691 let listener = tokio::net::TcpListener::bind(&bind_address) 692 .await 693 .map_err(|e| anyhow::anyhow!("Failed to bind to {}: {}", bind_address, e))?; 694 695 tracing::info!("QuickDID service listening on {}", bind_address); 696 697 let shutdown_token = cancel_token.clone(); 698 axum::serve(listener, router) 699 .with_graceful_shutdown(async move { 700 shutdown_token.cancelled().await; 701 }) 702 .await 703 .map_err(|e| anyhow::anyhow!("HTTP server error: {}", e))?; 704 705 Ok(()) 706 }, 707 ); 708 709 // Wait for all tasks to complete 710 tracing::info!("Waiting for all tasks to complete..."); 711 tracker.wait().await; 712 713 tracing::info!("All tasks completed, application shutting down"); 714 715 Ok(()) 716}