// Forked from smokesignal.events/quickdid
//
// QuickDID is a high-performance AT Protocol identity resolution service
// written in Rust. It provides handle-to-DID resolution with Redis-backed
// caching and queue processing.
1use anyhow::Result;
2use atproto_identity::{
3 config::{CertificateBundles, DnsNameservers},
4 resolve::HickoryDnsResolver,
5};
6use atproto_jetstream::{Consumer as JetstreamConsumer, ConsumerTaskConfig};
7use quickdid::{
8 cache::create_redis_pool,
9 config::Config,
10 handle_resolver::{
11 create_base_resolver, create_caching_resolver,
12 create_proactive_refresh_resolver_with_metrics, create_rate_limited_resolver_with_timeout,
13 create_redis_resolver_with_ttl, create_sqlite_resolver_with_ttl,
14 },
15 handle_resolver_task::{HandleResolverTaskConfig, create_handle_resolver_task_with_config},
16 http::{AppContext, create_router},
17 jetstream_handler::QuickDidEventHandler,
18 metrics::create_metrics_publisher,
19 queue::{
20 HandleResolutionWork, QueueAdapter, create_mpsc_queue_from_channel, create_noop_queue,
21 create_redis_queue, create_redis_queue_with_dedup, create_sqlite_queue,
22 create_sqlite_queue_with_max_size,
23 },
24 sqlite_schema::create_sqlite_pool,
25 task_manager::spawn_cancellable_task,
26};
27use std::sync::Arc;
28use tokio::signal;
29use tokio_util::{sync::CancellationToken, task::TaskTracker};
30use tracing_subscriber::{layer::SubscriberExt, util::SubscriberInitExt};
31
32/// Helper function to create a Redis pool with consistent error handling
33fn try_create_redis_pool(redis_url: &str, purpose: &str) -> Option<deadpool_redis::Pool> {
34 match create_redis_pool(redis_url) {
35 Ok(pool) => {
36 tracing::info!("Redis pool created for {}", purpose);
37 Some(pool)
38 }
39 Err(e) => {
40 tracing::warn!("Failed to create Redis pool for {}: {}", purpose, e);
41 None
42 }
43 }
44}
45
46/// Helper function to create a SQLite pool with consistent error handling
47async fn try_create_sqlite_pool(sqlite_url: &str, purpose: &str) -> Option<sqlx::SqlitePool> {
48 match create_sqlite_pool(sqlite_url).await {
49 Ok(pool) => {
50 tracing::info!("SQLite pool created for {}", purpose);
51 Some(pool)
52 }
53 Err(e) => {
54 tracing::warn!("Failed to create SQLite pool for {}: {}", purpose, e);
55 None
56 }
57 }
58}
59
60/// Simple command-line argument handling for --version and --help
61fn handle_simple_args() -> bool {
62 let args: Vec<String> = std::env::args().collect();
63
64 if args.len() > 1 {
65 match args[1].as_str() {
66 "--version" | "-V" => {
67 println!("quickdid {}", env!("CARGO_PKG_VERSION"));
68 return true;
69 }
70 "--help" | "-h" => {
71 println!("QuickDID - AT Protocol Identity Resolver Service");
72 println!("Version: {}", env!("CARGO_PKG_VERSION"));
73 println!();
74 println!("USAGE:");
75 println!(" quickdid [OPTIONS]");
76 println!();
77 println!("OPTIONS:");
78 println!(" -h, --help Print help information");
79 println!(" -V, --version Print version information");
80 println!();
81 println!("ENVIRONMENT VARIABLES:");
82 println!(
83 " HTTP_EXTERNAL External hostname for service endpoints (required)"
84 );
85 println!(" HTTP_PORT HTTP server port (default: 8080)");
86 println!(" PLC_HOSTNAME PLC directory hostname (default: plc.directory)");
87 println!(
88 " USER_AGENT HTTP User-Agent header (auto-generated with version)"
89 );
90 println!(" DNS_NAMESERVERS Custom DNS nameservers (comma-separated IPs)");
91 println!(
92 " CERTIFICATE_BUNDLES Additional CA certificates (comma-separated paths)"
93 );
94 println!();
95 println!(" CACHING:");
96 println!(" REDIS_URL Redis URL for handle resolution caching");
97 println!(
98 " SQLITE_URL SQLite database URL for handle resolution caching"
99 );
100 println!(
101 " CACHE_TTL_MEMORY TTL for in-memory cache in seconds (default: 600)"
102 );
103 println!(
104 " CACHE_TTL_REDIS TTL for Redis cache in seconds (default: 7776000)"
105 );
106 println!(
107 " CACHE_TTL_SQLITE TTL for SQLite cache in seconds (default: 7776000)"
108 );
109 println!();
110 println!(" QUEUE CONFIGURATION:");
111 println!(
112 " QUEUE_ADAPTER Queue adapter: 'mpsc', 'redis', 'sqlite', 'noop' (default: mpsc)"
113 );
114 println!(" QUEUE_REDIS_URL Redis URL for queue adapter");
115 println!(
116 " QUEUE_REDIS_PREFIX Redis key prefix for queues (default: queue:handleresolver:)"
117 );
118 println!(" QUEUE_REDIS_TIMEOUT Queue blocking timeout in seconds (default: 5)");
119 println!(
120 " QUEUE_REDIS_DEDUP_ENABLED Enable queue deduplication (default: false)"
121 );
122 println!(" QUEUE_REDIS_DEDUP_TTL TTL for dedup keys in seconds (default: 60)");
123 println!(" QUEUE_WORKER_ID Worker ID for Redis queue (default: worker1)");
124 println!(" QUEUE_BUFFER_SIZE Buffer size for MPSC queue (default: 1000)");
125 println!(" QUEUE_SQLITE_MAX_SIZE Maximum SQLite queue size (default: 10000)");
126 println!();
127 println!(" RATE LIMITING:");
128 println!(
129 " RESOLVER_MAX_CONCURRENT Maximum concurrent resolutions (default: 0 = disabled)"
130 );
131 println!(
132 " RESOLVER_MAX_CONCURRENT_TIMEOUT_MS Timeout for acquiring permits in ms (default: 0 = no timeout)"
133 );
134 println!();
135 println!(" METRICS:");
136 println!(
137 " METRICS_ADAPTER Metrics adapter: 'noop' or 'statsd' (default: noop)"
138 );
139 println!(
140 " METRICS_STATSD_HOST StatsD host when using statsd adapter (e.g., localhost:8125)"
141 );
142 println!(
143 " METRICS_STATSD_BIND Bind address for StatsD UDP socket (default: [::]:0)"
144 );
145 println!(" METRICS_PREFIX Prefix for all metrics (default: quickdid)");
146 println!(
147 " METRICS_TAGS Default tags for metrics (comma-separated key:value pairs)"
148 );
149 println!();
150 println!(" PROACTIVE CACHE REFRESH:");
151 println!(
152 " PROACTIVE_REFRESH_ENABLED Enable proactive cache refresh (default: false)"
153 );
154 println!(
155 " PROACTIVE_REFRESH_THRESHOLD Threshold as percentage of TTL (0.0-1.0, default: 0.8)"
156 );
157 println!();
158 println!(" JETSTREAM:");
159 println!(" JETSTREAM_ENABLED Enable Jetstream consumer (default: false)");
160 println!(
161 " JETSTREAM_HOSTNAME Jetstream hostname (default: jetstream.atproto.tools)"
162 );
163 println!();
164 println!(
165 "For more information, visit: https://github.com/smokesignal.events/quickdid"
166 );
167 return true;
168 }
169 _ => {}
170 }
171 }
172
173 false
174}
175
/// Service entry point.
///
/// Wires the whole service together from environment configuration:
/// tracing, HTTP client, DNS resolver, metrics, the layered handle
/// resolver (rate limiting -> cache backend -> proactive refresh), the
/// work queue, the background resolution task, an optional Jetstream
/// consumer, the HTTP server, and a signal handler for graceful shutdown.
/// Returns once every tracked task has completed after cancellation.
#[tokio::main]
async fn main() -> Result<()> {
    // Handle --version and --help; exit immediately if a flag was printed.
    if handle_simple_args() {
        return Ok(());
    }

    // Initialize tracing; RUST_LOG-style env filter wins, otherwise use a
    // default that keeps quickdid at info and the atproto crates at debug.
    tracing_subscriber::registry()
        .with(
            tracing_subscriber::EnvFilter::try_from_default_env().unwrap_or_else(|_| {
                "quickdid=info,atproto_identity=debug,atproto_xrpcs=debug".into()
            }),
        )
        .with(tracing_subscriber::fmt::layer())
        .init();

    let config = Config::from_env()?;

    // Validate configuration before building anything from it.
    config.validate()?;

    tracing::info!("Starting QuickDID service on port {}", config.http_port);
    tracing::info!(
        "Cache TTL - Memory: {}s, Redis: {}s, SQLite: {}s",
        config.cache_ttl_memory,
        config.cache_ttl_redis,
        config.cache_ttl_sqlite
    );

    // Parse certificate bundles if provided (empty default when unset).
    let certificate_bundles: CertificateBundles = config
        .certificate_bundles
        .clone()
        .unwrap_or_default()
        .try_into()?;

    // Parse DNS nameservers if provided (empty default when unset).
    let dns_nameservers: DnsNameservers = config
        .dns_nameservers
        .clone()
        .unwrap_or_default()
        .try_into()?;

    // Build HTTP client, trusting any extra CA certificates from the bundles.
    let mut client_builder = reqwest::Client::builder();
    for ca_certificate in certificate_bundles.as_ref() {
        let cert = std::fs::read(ca_certificate)?;
        let cert = reqwest::Certificate::from_pem(&cert)?;
        client_builder = client_builder.add_root_certificate(cert);
    }
    client_builder = client_builder.user_agent(&config.user_agent);
    let http_client = client_builder.build()?;

    // Create DNS resolver
    let dns_resolver = HickoryDnsResolver::create_resolver(dns_nameservers.as_ref());

    // Create DNS resolver Arc for sharing across resolver layers.
    let dns_resolver_arc = Arc::new(dns_resolver);

    // Create metrics publisher based on configuration; a failure here is
    // fatal (logged and propagated).
    let metrics_publisher = create_metrics_publisher(&config).map_err(|e| {
        tracing::error!("Failed to create metrics publisher: {}", e);
        anyhow::anyhow!("Failed to create metrics publisher: {}", e)
    })?;

    tracing::info!(
        "Metrics publisher created with {} adapter",
        config.metrics_adapter
    );

    // Publish a "server" gauge of 1 at startup.
    metrics_publisher.gauge("server", 1).await;

    // Create base handle resolver using factory function
    let mut base_handle_resolver = create_base_resolver(
        dns_resolver_arc.clone(),
        http_client.clone(),
        metrics_publisher.clone(),
    );

    // Apply rate limiting if configured (0 disables it). The base resolver
    // is wrapped so all later cache layers sit above the limiter.
    if config.resolver_max_concurrent > 0 {
        let timeout_info = if config.resolver_max_concurrent_timeout_ms > 0 {
            format!(", {}ms timeout", config.resolver_max_concurrent_timeout_ms)
        } else {
            String::new()
        };
        tracing::info!(
            "Applying rate limiting to handle resolver (max {} concurrent resolutions{})",
            config.resolver_max_concurrent,
            timeout_info
        );
        base_handle_resolver = create_rate_limited_resolver_with_timeout(
            base_handle_resolver,
            config.resolver_max_concurrent,
            config.resolver_max_concurrent_timeout_ms,
            metrics_publisher.clone(),
        );
    }

    // Create Redis pool if configured; failure degrades to None (no cache).
    let redis_pool = config
        .redis_url
        .as_ref()
        .and_then(|url| try_create_redis_pool(url, "handle resolver cache"));

    // Create SQLite pool if configured; failure degrades to None.
    let sqlite_pool = if let Some(url) = config.sqlite_url.as_ref() {
        try_create_sqlite_pool(url, "handle resolver cache").await
    } else {
        None
    };

    // Create task tracker and cancellation token shared by all background
    // tasks for coordinated graceful shutdown.
    let tracker = TaskTracker::new();
    let token = CancellationToken::new();

    // Create the queue adapter first (needed for proactive refresh)
    let handle_queue: Arc<dyn QueueAdapter<HandleResolutionWork>> = {
        // Create queue adapter based on configuration
        let adapter: Arc<dyn QueueAdapter<HandleResolutionWork>> = match config
            .queue_adapter
            .as_str()
        {
            "redis" => {
                // Use queue-specific Redis URL, fall back to general Redis URL
                let queue_redis_url = config
                    .queue_redis_url
                    .as_ref()
                    .or(config.redis_url.as_ref());

                if let Some(url) = queue_redis_url {
                    if let Some(pool) = try_create_redis_pool(url, "queue adapter") {
                        tracing::info!(
                            "Creating Redis queue adapter with prefix: {}, dedup: {}, dedup_ttl: {}s",
                            config.queue_redis_prefix,
                            config.queue_redis_dedup_enabled,
                            config.queue_redis_dedup_ttl
                        );
                        if config.queue_redis_dedup_enabled {
                            create_redis_queue_with_dedup::<HandleResolutionWork>(
                                pool,
                                config.queue_worker_id.clone(),
                                config.queue_redis_prefix.clone(),
                                config.queue_redis_timeout,
                                true,
                                config.queue_redis_dedup_ttl,
                            )
                        } else {
                            create_redis_queue::<HandleResolutionWork>(
                                pool,
                                config.queue_worker_id.clone(),
                                config.queue_redis_prefix.clone(),
                                config.queue_redis_timeout,
                            )
                        }
                    } else {
                        tracing::warn!("Falling back to MPSC queue adapter");
                        // Fall back to MPSC if Redis fails
                        let (handle_sender, handle_receiver) =
                            tokio::sync::mpsc::channel::<HandleResolutionWork>(
                                config.queue_buffer_size,
                            );
                        create_mpsc_queue_from_channel(handle_sender, handle_receiver)
                    }
                } else {
                    tracing::warn!(
                        "Redis queue adapter requested but no Redis URL configured, using no-op adapter"
                    );
                    create_noop_queue::<HandleResolutionWork>()
                }
            }
            "sqlite" => {
                // Use SQLite adapter
                if let Some(url) = config.sqlite_url.as_ref() {
                    if let Some(pool) = try_create_sqlite_pool(url, "queue adapter").await {
                        // max_size > 0 enables work shedding; 0 means unlimited.
                        if config.queue_sqlite_max_size > 0 {
                            tracing::info!(
                                "Creating SQLite queue adapter with work shedding (max_size: {})",
                                config.queue_sqlite_max_size
                            );
                            create_sqlite_queue_with_max_size::<HandleResolutionWork>(
                                pool,
                                config.queue_sqlite_max_size,
                            )
                        } else {
                            tracing::info!("Creating SQLite queue adapter (unlimited size)");
                            create_sqlite_queue::<HandleResolutionWork>(pool)
                        }
                    } else {
                        tracing::warn!(
                            "Failed to create SQLite pool for queue, falling back to MPSC queue adapter"
                        );
                        // Fall back to MPSC if SQLite fails
                        let (handle_sender, handle_receiver) =
                            tokio::sync::mpsc::channel::<HandleResolutionWork>(
                                config.queue_buffer_size,
                            );
                        create_mpsc_queue_from_channel(handle_sender, handle_receiver)
                    }
                } else {
                    tracing::warn!(
                        "SQLite queue adapter requested but no SQLite URL configured, using no-op adapter"
                    );
                    create_noop_queue::<HandleResolutionWork>()
                }
            }
            "mpsc" => {
                // Use MPSC adapter
                tracing::info!(
                    "Using MPSC queue adapter with buffer size: {}",
                    config.queue_buffer_size
                );
                let (handle_sender, handle_receiver) =
                    tokio::sync::mpsc::channel::<HandleResolutionWork>(config.queue_buffer_size);
                create_mpsc_queue_from_channel(handle_sender, handle_receiver)
            }
            "noop" | "none" => {
                // Use no-op adapter
                tracing::info!("Using no-op queue adapter (queuing disabled)");
                create_noop_queue::<HandleResolutionWork>()
            }
            _ => {
                // Default to no-op adapter for unknown types
                tracing::warn!(
                    "Unknown queue adapter type '{}', using no-op adapter",
                    config.queue_adapter
                );
                create_noop_queue::<HandleResolutionWork>()
            }
        };

        adapter
    };

    // Create handle resolver with cache priority: Redis > SQLite > In-memory.
    // Also capture the effective cache TTL for the proactive-refresh layer.
    let (mut handle_resolver, cache_ttl): (
        Arc<dyn quickdid::handle_resolver::HandleResolver>,
        u64,
    ) = if let Some(pool) = redis_pool {
        tracing::info!(
            "Using Redis-backed handle resolver with {}-second cache TTL",
            config.cache_ttl_redis
        );
        (
            create_redis_resolver_with_ttl(
                base_handle_resolver,
                pool,
                config.cache_ttl_redis,
                metrics_publisher.clone(),
            ),
            config.cache_ttl_redis,
        )
    } else if let Some(pool) = sqlite_pool {
        tracing::info!(
            "Using SQLite-backed handle resolver with {}-second cache TTL",
            config.cache_ttl_sqlite
        );
        (
            create_sqlite_resolver_with_ttl(
                base_handle_resolver,
                pool,
                config.cache_ttl_sqlite,
                metrics_publisher.clone(),
            ),
            config.cache_ttl_sqlite,
        )
    } else {
        tracing::info!(
            "Using in-memory handle resolver with {}-second cache TTL",
            config.cache_ttl_memory
        );
        (
            create_caching_resolver(
                base_handle_resolver,
                config.cache_ttl_memory,
                metrics_publisher.clone(),
            ),
            config.cache_ttl_memory,
        )
    };

    // Apply proactive refresh if enabled. Requires a real queue adapter,
    // since refresh work is enqueued rather than performed inline.
    if config.proactive_refresh_enabled && !matches!(config.queue_adapter.as_str(), "noop" | "none")
    {
        tracing::info!(
            "Enabling proactive cache refresh with {}% threshold",
            (config.proactive_refresh_threshold * 100.0) as u32
        );
        handle_resolver = create_proactive_refresh_resolver_with_metrics(
            handle_resolver,
            handle_queue.clone(),
            metrics_publisher.clone(),
            cache_ttl,
            config.proactive_refresh_threshold,
        );
    } else if config.proactive_refresh_enabled {
        tracing::warn!(
            "Proactive refresh enabled but queue adapter is no-op, skipping proactive refresh"
        );
    }

    // Setup background handle resolution task
    {
        let adapter_for_task = handle_queue.clone();

        // Only spawn handle resolver task if not using noop adapter
        if !matches!(config.queue_adapter.as_str(), "noop" | "none") {
            // Create handle resolver task configuration
            // (10s default per-resolution timeout).
            let handle_task_config = HandleResolverTaskConfig {
                default_timeout_ms: 10000,
            };

            // Create and start handle resolver task
            let handle_task = create_handle_resolver_task_with_config(
                adapter_for_task,
                handle_resolver.clone(),
                token.clone(),
                handle_task_config,
                metrics_publisher.clone(),
            );

            // Spawn the handle resolver task; the select lets cancellation
            // win over a still-running task.
            spawn_cancellable_task(
                &tracker,
                token.clone(),
                "handle_resolver",
                |cancel_token| async move {
                    tokio::select! {
                        result = handle_task.run() => {
                            if let Err(e) = result {
                                tracing::error!(error = ?e, "Handle resolver task failed");
                                Err(anyhow::anyhow!(e))
                            } else {
                                Ok(())
                            }
                        }
                        _ = cancel_token.cancelled() => {
                            tracing::info!("Handle resolver task cancelled");
                            Ok(())
                        }
                    }
                },
            );

            tracing::info!(
                "Background handle resolution task started with {} adapter",
                config.queue_adapter
            );
        } else {
            tracing::info!("Background handle resolution task disabled (using no-op adapter)");
        }
    };

    // Create app context with the queue adapter
    let app_context = AppContext::new(
        handle_resolver.clone(),
        handle_queue,
        metrics_publisher.clone(),
        config.etag_seed.clone(),
        config.cache_control_header.clone(),
        config.static_files_dir.clone(),
    );

    // Create router
    let router = create_router(app_context);

    // Setup signal handler
    {
        let signal_tracker = tracker.clone();
        let signal_token = token.clone();

        // Spawn signal handler without using the managed task helper since it's special:
        // it must outlive the tracker it closes.
        tracing::info!("Starting signal handler task");
        tokio::spawn(async move {
            let ctrl_c = async {
                signal::ctrl_c()
                    .await
                    .expect("failed to install Ctrl+C handler");
            };

            // SIGTERM is only available on unix; elsewhere this branch never fires.
            #[cfg(unix)]
            let terminate = async {
                signal::unix::signal(signal::unix::SignalKind::terminate())
                    .expect("failed to install signal handler")
                    .recv()
                    .await;
            };

            #[cfg(not(unix))]
            let terminate = std::future::pending::<()>();

            tokio::select! {
                () = signal_token.cancelled() => {
                    tracing::info!("Signal handler task shutting down gracefully");
                },
                _ = terminate => {
                    tracing::info!("Received SIGTERM signal, initiating shutdown");
                },
                _ = ctrl_c => {
                    tracing::info!("Received Ctrl+C signal, initiating shutdown");
                },
            }

            // Close the tracker (no new tasks) and cancel everything.
            signal_tracker.close();
            signal_token.cancel();
            tracing::info!("Signal handler task completed");
        });
    }

    // Start Jetstream consumer if enabled
    if config.jetstream_enabled {
        let jetstream_resolver = handle_resolver.clone();
        let jetstream_metrics = metrics_publisher.clone();
        let jetstream_hostname = config.jetstream_hostname.clone();
        let jetstream_user_agent = config.user_agent.clone();

        spawn_cancellable_task(
            &tracker,
            token.clone(),
            "jetstream_consumer",
            move |cancel_token| async move {
                tracing::info!(hostname = %jetstream_hostname, "Starting Jetstream consumer");

                // Create event handler
                let event_handler = Arc::new(QuickDidEventHandler::new(
                    jetstream_resolver,
                    jetstream_metrics.clone(),
                ));

                // Reconnection loop with simple rate limiting: at most
                // `max_reconnects_per_minute` quick reconnects before backing
                // off for a full window.
                let mut reconnect_count = 0u32;
                let max_reconnects_per_minute = 5;
                let reconnect_window = std::time::Duration::from_secs(60);
                // Start "outside" the window so the first connection attempt
                // is not counted as a rapid reconnect.
                let mut last_disconnect = std::time::Instant::now() - reconnect_window;

                while !cancel_token.is_cancelled() {
                    let now = std::time::Instant::now();
                    if now.duration_since(last_disconnect) < reconnect_window {
                        reconnect_count += 1;
                        if reconnect_count > max_reconnects_per_minute {
                            tracing::warn!(
                                count = reconnect_count,
                                "Too many Jetstream reconnects, waiting 60 seconds"
                            );
                            tokio::time::sleep(reconnect_window).await;
                            reconnect_count = 0;
                            last_disconnect = now;
                            continue;
                        }
                    } else {
                        reconnect_count = 0;
                    }

                    // Create consumer configuration
                    let consumer_config = ConsumerTaskConfig {
                        user_agent: jetstream_user_agent.clone(),
                        compression: false,
                        zstd_dictionary_location: String::new(),
                        jetstream_hostname: jetstream_hostname.clone(),
                        // Listen to the "community.lexicon.collection.fake" collection
                        // so that we keep an active connection open but only for
                        // account and identity events.
                        collections: vec!["community.lexicon.collection.fake".to_string()],
                        dids: vec![],
                        max_message_size_bytes: None,
                        cursor: None,
                        require_hello: true,
                    };

                    let consumer = JetstreamConsumer::new(consumer_config);

                    // Register event handler
                    if let Err(e) = consumer.register_handler(event_handler.clone()).await {
                        tracing::error!(error = ?e, "Failed to register Jetstream event handler");
                        continue;
                    }

                    // Run consumer with cancellation support
                    match consumer.run_background(cancel_token.clone()).await {
                        Ok(()) => {
                            tracing::info!("Jetstream consumer stopped normally");
                            if cancel_token.is_cancelled() {
                                break;
                            }
                            last_disconnect = std::time::Instant::now();
                            tokio::time::sleep(std::time::Duration::from_secs(5)).await;
                        }
                        Err(e) => {
                            tracing::error!(error = ?e, "Jetstream consumer connection failed, will reconnect");
                            jetstream_metrics.incr("jetstream.connection.error").await;
                            last_disconnect = std::time::Instant::now();

                            // Short delay before reconnecting, unless shutting down.
                            if !cancel_token.is_cancelled() {
                                tokio::time::sleep(std::time::Duration::from_secs(5)).await;
                            }
                        }
                    }
                }

                tracing::info!("Jetstream consumer task shutting down");
                Ok(())
            },
        );
    } else {
        tracing::info!("Jetstream consumer disabled");
    }

    // Start HTTP server with cancellation support
    let bind_address = format!("0.0.0.0:{}", config.http_port);
    spawn_cancellable_task(
        &tracker,
        token.clone(),
        "http",
        move |cancel_token| async move {
            let listener = tokio::net::TcpListener::bind(&bind_address)
                .await
                .map_err(|e| anyhow::anyhow!("Failed to bind to {}: {}", bind_address, e))?;

            tracing::info!("QuickDID service listening on {}", bind_address);

            // Axum drains in-flight requests once the token is cancelled.
            let shutdown_token = cancel_token.clone();
            axum::serve(listener, router)
                .with_graceful_shutdown(async move {
                    shutdown_token.cancelled().await;
                })
                .await
                .map_err(|e| anyhow::anyhow!("HTTP server error: {}", e))?;

            Ok(())
        },
    );

    // Wait for all tasks to complete (tracker is closed by the signal handler).
    tracing::info!("Waiting for all tasks to complete...");
    tracker.wait().await;

    tracing::info!("All tasks completed, application shutting down");

    Ok(())
}