this repo has no description
at main 7.7 kB view raw
1use anyhow::Result; 2use sqlx::{sqlite::SqliteConnectOptions, SqlitePool}; 3use std::collections::HashMap; 4use std::collections::HashSet; 5use std::str::FromStr; 6use std::env; 7use supercell::cache::Cache; 8use supercell::cache::CacheTask; 9use supercell::cleanup::CleanTask; 10use supercell::vmc::VerificationMethodCacheTask; 11use tokio::net::TcpListener; 12use tokio::signal; 13use tokio_util::{sync::CancellationToken, task::TaskTracker}; 14use tracing_subscriber::prelude::*; 15 16use supercell::consumer::ConsumerTask; 17use supercell::consumer::ConsumerTaskConfig; 18use supercell::http::context::WebContext; 19use supercell::http::server::build_router; 20 21#[tokio::main] 22async fn main() -> Result<()> { 23 tracing_subscriber::registry() 24 .with(tracing_subscriber::EnvFilter::new( 25 std::env::var("RUST_LOG").unwrap_or_else(|_| "supercell=debug,info".into()), 26 )) 27 .with(tracing_subscriber::fmt::layer().pretty()) 28 .init(); 29 30 let version = supercell::config::version()?; 31 32 env::args().for_each(|arg| { 33 if arg == "--version" { 34 println!("{}", version); 35 std::process::exit(0); 36 } 37 }); 38 39 let config = supercell::config::Config::new()?; 40 41 let mut client_builder = reqwest::Client::builder(); 42 for ca_certificate in config.certificate_bundles.as_ref() { 43 tracing::info!("Loading CA certificate: {:?}", ca_certificate); 44 let cert = std::fs::read(ca_certificate)?; 45 let cert = reqwest::Certificate::from_pem(&cert)?; 46 client_builder = client_builder.add_root_certificate(cert); 47 } 48 49 client_builder = client_builder.user_agent(config.user_agent.clone()); 50 let http_client = client_builder.build()?; 51 52 let connect_options = SqliteConnectOptions::from_str(&config.database_url)? 53 .journal_mode(sqlx::sqlite::SqliteJournalMode::Wal) 54 .create_if_missing(true) 55 .synchronous(sqlx::sqlite::SqliteSynchronous::Normal); 56 57 let pool = SqlitePool::connect_with(connect_options).await?; 58 sqlx::migrate!().run(&pool).await?; 59 60 let feeds: HashMap<String, (Option<String>, HashSet<String>)> = config 61 .feeds 62 .feeds 63 .iter() 64 .map(|feed| (feed.uri.clone(), (feed.deny.clone(), feed.allow.clone()))) 65 .collect(); 66 67 let all_dids = feeds 68 .iter() 69 .flat_map(|(_, (_, allow))| allow.iter().cloned()) 70 .collect::<HashSet<String>>(); 71 72 let cache = Cache::new(20); 73 74 let web_context = WebContext::new( 75 pool.clone(), 76 config.external_base.as_str(), 77 feeds, 78 cache.clone(), 79 ); 80 81 let app = build_router(web_context.clone()); 82 83 let tracker = TaskTracker::new(); 84 let token = CancellationToken::new(); 85 86 { 87 let tracker = tracker.clone(); 88 let inner_token = token.clone(); 89 90 let ctrl_c = async { 91 signal::ctrl_c() 92 .await 93 .expect("failed to install Ctrl+C handler"); 94 }; 95 96 let terminate = async { 97 signal::unix::signal(signal::unix::SignalKind::terminate()) 98 .expect("failed to install signal handler") 99 .recv() 100 .await; 101 }; 102 103 tokio::spawn(async move { 104 tokio::select! { 105 () = inner_token.cancelled() => { }, 106 _ = terminate => {}, 107 _ = ctrl_c => {}, 108 } 109 110 tracker.close(); 111 inner_token.cancel(); 112 }); 113 } 114 115 { 116 let inner_config = config.clone(); 117 let task_enable = *inner_config.consumer_task_enable.as_ref(); 118 if task_enable { 119 let consumer_task_config = ConsumerTaskConfig { 120 user_agent: inner_config.user_agent.clone(), 121 compression: *inner_config.compression.as_ref(), 122 zstd_dictionary_location: inner_config.zstd_dictionary.clone(), 123 jetstream_hostname: inner_config.jetstream_hostname.clone(), 124 feeds: inner_config.feeds.clone(), 125 collections: inner_config.collections.as_ref().clone(), 126 }; 127 let task = ConsumerTask::new(pool.clone(), consumer_task_config, token.clone())?; 128 let inner_token = token.clone(); 129 tracker.spawn(async move { 130 if let Err(err) = task.run_background().await { 131 tracing::warn!(error = ?err, "consumer task error"); 132 } 133 inner_token.cancel(); 134 }); 135 } 136 } 137 138 { 139 let inner_config = config.clone(); 140 let task_enable = *inner_config.vmc_task_enable.as_ref(); 141 if task_enable { 142 let task = VerificationMethodCacheTask::new( 143 pool.clone(), 144 http_client, 145 inner_config.plc_hostname.clone(), 146 all_dids, 147 token.clone(), 148 ); 149 task.main().await?; 150 let inner_token = token.clone(); 151 tracker.spawn(async move { 152 if let Err(err) = task.run_background(chrono::Duration::hours(4)).await { 153 tracing::warn!(error = ?err, "consumer task error"); 154 } 155 inner_token.cancel(); 156 }); 157 } 158 } 159 160 { 161 let inner_config = config.clone(); 162 let task_enable = *inner_config.cache_task_enable.as_ref(); 163 if task_enable { 164 let task = CacheTask::new( 165 pool.clone(), 166 cache.clone(), 167 inner_config.clone(), 168 token.clone(), 169 ); 170 task.main().await?; 171 let inner_token = token.clone(); 172 let interval = *inner_config.cache_task_interval.as_ref(); 173 tracker.spawn(async move { 174 if let Err(err) = task.run_background(interval).await { 175 tracing::warn!(error = ?err, "cache task error"); 176 } 177 inner_token.cancel(); 178 }); 179 } 180 } 181 182 { 183 let inner_config = config.clone(); 184 let task_enable = *inner_config.cleanup_task_enable.as_ref(); 185 let max_age = *inner_config.cleanup_task_max_age.as_ref(); 186 if task_enable { 187 let task = CleanTask::new(pool.clone(), max_age, token.clone()); 188 task.main().await?; 189 let inner_token = token.clone(); 190 let interval = *inner_config.cleanup_task_interval.as_ref(); 191 tracker.spawn(async move { 192 if let Err(err) = task.run_background(interval).await { 193 tracing::warn!(error = ?err, "cleanup task error"); 194 } 195 inner_token.cancel(); 196 }); 197 } 198 } 199 200 { 201 let inner_config = config.clone(); 202 let http_port = *inner_config.http_port.as_ref(); 203 let inner_token = token.clone(); 204 tracker.spawn(async move { 205 let listener = TcpListener::bind(&format!("0.0.0.0:{}", http_port)) 206 .await 207 .unwrap(); 208 209 let shutdown_token = inner_token.clone(); 210 let result = axum::serve(listener, app) 211 .with_graceful_shutdown(async move { 212 tokio::select! { 213 () = shutdown_token.cancelled() => { } 214 } 215 tracing::info!("axum graceful shutdown complete"); 216 }) 217 .await; 218 if let Err(err) = result { 219 tracing::error!("axum task failed: {}", err); 220 } 221 222 inner_token.cancel(); 223 }); 224 } 225 226 tracker.wait().await; 227 228 tracing::info!("closing database connection pool"); 229 pool.close().await; 230 231 tracing::info!("shutdown complete"); 232 233 Ok(()) 234}