this repo has no description
1use anyhow::Result;
2use sqlx::{sqlite::SqliteConnectOptions, SqlitePool};
3use std::collections::HashMap;
4use std::collections::HashSet;
5use std::str::FromStr;
6use std::env;
7use supercell::cache::Cache;
8use supercell::cache::CacheTask;
9use supercell::cleanup::CleanTask;
10use supercell::vmc::VerificationMethodCacheTask;
11use tokio::net::TcpListener;
12use tokio::signal;
13use tokio_util::{sync::CancellationToken, task::TaskTracker};
14use tracing_subscriber::prelude::*;
15
16use supercell::consumer::ConsumerTask;
17use supercell::consumer::ConsumerTaskConfig;
18use supercell::http::context::WebContext;
19use supercell::http::server::build_router;
20
21#[tokio::main]
22async fn main() -> Result<()> {
23 tracing_subscriber::registry()
24 .with(tracing_subscriber::EnvFilter::new(
25 std::env::var("RUST_LOG").unwrap_or_else(|_| "supercell=debug,info".into()),
26 ))
27 .with(tracing_subscriber::fmt::layer().pretty())
28 .init();
29
30 let version = supercell::config::version()?;
31
32 env::args().for_each(|arg| {
33 if arg == "--version" {
34 println!("{}", version);
35 std::process::exit(0);
36 }
37 });
38
39 let config = supercell::config::Config::new()?;
40
41 let mut client_builder = reqwest::Client::builder();
42 for ca_certificate in config.certificate_bundles.as_ref() {
43 tracing::info!("Loading CA certificate: {:?}", ca_certificate);
44 let cert = std::fs::read(ca_certificate)?;
45 let cert = reqwest::Certificate::from_pem(&cert)?;
46 client_builder = client_builder.add_root_certificate(cert);
47 }
48
49 client_builder = client_builder.user_agent(config.user_agent.clone());
50 let http_client = client_builder.build()?;
51
52 let connect_options = SqliteConnectOptions::from_str(&config.database_url)?
53 .journal_mode(sqlx::sqlite::SqliteJournalMode::Wal)
54 .create_if_missing(true)
55 .synchronous(sqlx::sqlite::SqliteSynchronous::Normal);
56
57 let pool = SqlitePool::connect_with(connect_options).await?;
58 sqlx::migrate!().run(&pool).await?;
59
60 let feeds: HashMap<String, (Option<String>, HashSet<String>)> = config
61 .feeds
62 .feeds
63 .iter()
64 .map(|feed| (feed.uri.clone(), (feed.deny.clone(), feed.allow.clone())))
65 .collect();
66
67 let all_dids = feeds
68 .iter()
69 .flat_map(|(_, (_, allow))| allow.iter().cloned())
70 .collect::<HashSet<String>>();
71
72 let cache = Cache::new(20);
73
74 let web_context = WebContext::new(
75 pool.clone(),
76 config.external_base.as_str(),
77 feeds,
78 cache.clone(),
79 );
80
81 let app = build_router(web_context.clone());
82
83 let tracker = TaskTracker::new();
84 let token = CancellationToken::new();
85
86 {
87 let tracker = tracker.clone();
88 let inner_token = token.clone();
89
90 let ctrl_c = async {
91 signal::ctrl_c()
92 .await
93 .expect("failed to install Ctrl+C handler");
94 };
95
96 let terminate = async {
97 signal::unix::signal(signal::unix::SignalKind::terminate())
98 .expect("failed to install signal handler")
99 .recv()
100 .await;
101 };
102
103 tokio::spawn(async move {
104 tokio::select! {
105 () = inner_token.cancelled() => { },
106 _ = terminate => {},
107 _ = ctrl_c => {},
108 }
109
110 tracker.close();
111 inner_token.cancel();
112 });
113 }
114
115 {
116 let inner_config = config.clone();
117 let task_enable = *inner_config.consumer_task_enable.as_ref();
118 if task_enable {
119 let consumer_task_config = ConsumerTaskConfig {
120 user_agent: inner_config.user_agent.clone(),
121 compression: *inner_config.compression.as_ref(),
122 zstd_dictionary_location: inner_config.zstd_dictionary.clone(),
123 jetstream_hostname: inner_config.jetstream_hostname.clone(),
124 feeds: inner_config.feeds.clone(),
125 collections: inner_config.collections.as_ref().clone(),
126 };
127 let task = ConsumerTask::new(pool.clone(), consumer_task_config, token.clone())?;
128 let inner_token = token.clone();
129 tracker.spawn(async move {
130 if let Err(err) = task.run_background().await {
131 tracing::warn!(error = ?err, "consumer task error");
132 }
133 inner_token.cancel();
134 });
135 }
136 }
137
138 {
139 let inner_config = config.clone();
140 let task_enable = *inner_config.vmc_task_enable.as_ref();
141 if task_enable {
142 let task = VerificationMethodCacheTask::new(
143 pool.clone(),
144 http_client,
145 inner_config.plc_hostname.clone(),
146 all_dids,
147 token.clone(),
148 );
149 task.main().await?;
150 let inner_token = token.clone();
151 tracker.spawn(async move {
152 if let Err(err) = task.run_background(chrono::Duration::hours(4)).await {
153 tracing::warn!(error = ?err, "consumer task error");
154 }
155 inner_token.cancel();
156 });
157 }
158 }
159
160 {
161 let inner_config = config.clone();
162 let task_enable = *inner_config.cache_task_enable.as_ref();
163 if task_enable {
164 let task = CacheTask::new(
165 pool.clone(),
166 cache.clone(),
167 inner_config.clone(),
168 token.clone(),
169 );
170 task.main().await?;
171 let inner_token = token.clone();
172 let interval = *inner_config.cache_task_interval.as_ref();
173 tracker.spawn(async move {
174 if let Err(err) = task.run_background(interval).await {
175 tracing::warn!(error = ?err, "cache task error");
176 }
177 inner_token.cancel();
178 });
179 }
180 }
181
182 {
183 let inner_config = config.clone();
184 let task_enable = *inner_config.cleanup_task_enable.as_ref();
185 let max_age = *inner_config.cleanup_task_max_age.as_ref();
186 if task_enable {
187 let task = CleanTask::new(pool.clone(), max_age, token.clone());
188 task.main().await?;
189 let inner_token = token.clone();
190 let interval = *inner_config.cleanup_task_interval.as_ref();
191 tracker.spawn(async move {
192 if let Err(err) = task.run_background(interval).await {
193 tracing::warn!(error = ?err, "cleanup task error");
194 }
195 inner_token.cancel();
196 });
197 }
198 }
199
200 {
201 let inner_config = config.clone();
202 let http_port = *inner_config.http_port.as_ref();
203 let inner_token = token.clone();
204 tracker.spawn(async move {
205 let listener = TcpListener::bind(&format!("0.0.0.0:{}", http_port))
206 .await
207 .unwrap();
208
209 let shutdown_token = inner_token.clone();
210 let result = axum::serve(listener, app)
211 .with_graceful_shutdown(async move {
212 tokio::select! {
213 () = shutdown_token.cancelled() => { }
214 }
215 tracing::info!("axum graceful shutdown complete");
216 })
217 .await;
218 if let Err(err) = result {
219 tracing::error!("axum task failed: {}", err);
220 }
221
222 inner_token.cancel();
223 });
224 }
225
226 tracker.wait().await;
227
228 tracing::info!("closing database connection pool");
229 pool.close().await;
230
231 tracing::info!("shutdown complete");
232
233 Ok(())
234}