I've been saying "PDSes seem easy enough, they're what, some CRUD to a db? I can do that in my sleep". well i'm sleeping rn so let's go
at main 5.7 kB view raw
1use std::net::SocketAddr; 2use std::process::ExitCode; 3use std::sync::Arc; 4use tokio::sync::watch; 5use tracing::{error, info, warn}; 6use tranquil_pds::comms::{CommsService, DiscordSender, EmailSender, SignalSender, TelegramSender}; 7use tranquil_pds::crawlers::{Crawlers, start_crawlers_service}; 8use tranquil_pds::scheduled::{ 9 backfill_genesis_commit_blocks, backfill_record_blobs, backfill_repo_rev, backfill_user_blocks, 10 start_backup_tasks, start_scheduled_tasks, 11}; 12use tranquil_pds::state::AppState; 13 14#[tokio::main] 15async fn main() -> ExitCode { 16 dotenvy::dotenv().ok(); 17 tracing_subscriber::fmt::init(); 18 tranquil_pds::metrics::init_metrics(); 19 20 match run().await { 21 Ok(()) => ExitCode::SUCCESS, 22 Err(e) => { 23 error!("Fatal error: {}", e); 24 ExitCode::FAILURE 25 } 26 } 27} 28 29async fn run() -> Result<(), Box<dyn std::error::Error>> { 30 let state = AppState::new().await?; 31 tranquil_pds::sync::listener::start_sequencer_listener(state.clone()).await; 32 33 let (shutdown_tx, shutdown_rx) = watch::channel(false); 34 35 let backfill_db = state.db.clone(); 36 let backfill_block_store = state.block_store.clone(); 37 tokio::spawn(async move { 38 tokio::join!( 39 backfill_genesis_commit_blocks(&backfill_db, backfill_block_store.clone()), 40 backfill_repo_rev(&backfill_db, backfill_block_store.clone()), 41 backfill_user_blocks(&backfill_db, backfill_block_store.clone()), 42 backfill_record_blobs(&backfill_db, backfill_block_store), 43 ); 44 }); 45 46 let mut comms_service = CommsService::new(state.db.clone()); 47 48 if let Some(email_sender) = EmailSender::from_env() { 49 info!("Email comms enabled"); 50 comms_service = comms_service.register_sender(email_sender); 51 } else { 52 warn!("Email comms disabled (MAIL_FROM_ADDRESS not set)"); 53 } 54 55 if let Some(discord_sender) = DiscordSender::from_env() { 56 info!("Discord comms enabled"); 57 comms_service = comms_service.register_sender(discord_sender); 58 } 59 60 if let Some(telegram_sender) = TelegramSender::from_env() { 61 info!("Telegram comms enabled"); 62 comms_service = comms_service.register_sender(telegram_sender); 63 } 64 65 if let Some(signal_sender) = SignalSender::from_env() { 66 info!("Signal comms enabled"); 67 comms_service = comms_service.register_sender(signal_sender); 68 } 69 70 let comms_handle = tokio::spawn(comms_service.run(shutdown_rx.clone())); 71 72 let crawlers_handle = if let Some(crawlers) = Crawlers::from_env() { 73 let crawlers = Arc::new( 74 crawlers.with_circuit_breaker(state.circuit_breakers.relay_notification.clone()), 75 ); 76 let firehose_rx = state.firehose_tx.subscribe(); 77 info!("Crawlers notification service enabled"); 78 Some(tokio::spawn(start_crawlers_service( 79 crawlers, 80 firehose_rx, 81 shutdown_rx.clone(), 82 ))) 83 } else { 84 warn!("Crawlers notification service disabled (PDS_HOSTNAME or CRAWLERS not set)"); 85 None 86 }; 87 88 let backup_handle = if let Some(backup_storage) = state.backup_storage.clone() { 89 info!("Backup service enabled"); 90 Some(tokio::spawn(start_backup_tasks( 91 state.db.clone(), 92 state.block_store.clone(), 93 backup_storage, 94 shutdown_rx.clone(), 95 ))) 96 } else { 97 warn!("Backup service disabled (BACKUP_S3_BUCKET not set or BACKUP_ENABLED=false)"); 98 None 99 }; 100 101 let scheduled_handle = tokio::spawn(start_scheduled_tasks( 102 state.db.clone(), 103 state.blob_store.clone(), 104 shutdown_rx, 105 )); 106 107 let app = tranquil_pds::app(state); 108 109 let host = std::env::var("SERVER_HOST").unwrap_or_else(|_| "127.0.0.1".to_string()); 110 let port: u16 = std::env::var("SERVER_PORT") 111 .ok() 112 .and_then(|p| p.parse().ok()) 113 .unwrap_or(3000); 114 115 let addr: SocketAddr = format!("{}:{}", host, port) 116 .parse() 117 .map_err(|e| format!("Invalid SERVER_HOST or SERVER_PORT: {}", e))?; 118 119 info!("listening on {}", addr); 120 121 let listener = tokio::net::TcpListener::bind(addr) 122 .await 123 .map_err(|e| format!("Failed to bind to {}: {}", addr, e))?; 124 125 let server_result = axum::serve(listener, app) 126 .with_graceful_shutdown(shutdown_signal(shutdown_tx)) 127 .await; 128 129 comms_handle.await.ok(); 130 131 if let Some(handle) = crawlers_handle { 132 handle.await.ok(); 133 } 134 135 if let Some(handle) = backup_handle { 136 handle.await.ok(); 137 } 138 139 scheduled_handle.await.ok(); 140 141 if let Err(e) = server_result { 142 return Err(format!("Server error: {}", e).into()); 143 } 144 145 Ok(()) 146} 147 148async fn shutdown_signal(shutdown_tx: watch::Sender<bool>) { 149 let ctrl_c = async { 150 match tokio::signal::ctrl_c().await { 151 Ok(()) => {} 152 Err(e) => { 153 error!("Failed to install Ctrl+C handler: {}", e); 154 } 155 } 156 }; 157 158 #[cfg(unix)] 159 let terminate = async { 160 match tokio::signal::unix::signal(tokio::signal::unix::SignalKind::terminate()) { 161 Ok(mut signal) => { 162 signal.recv().await; 163 } 164 Err(e) => { 165 error!("Failed to install SIGTERM handler: {}", e); 166 std::future::pending::<()>().await; 167 } 168 } 169 }; 170 171 #[cfg(not(unix))] 172 let terminate = std::future::pending::<()>(); 173 174 tokio::select! { 175 _ = ctrl_c => {}, 176 _ = terminate => {}, 177 } 178 179 info!("Shutdown signal received, stopping services..."); 180 shutdown_tx.send(true).ok(); 181}