Client-side atproto account migrator that runs in your web browser, along with services for backups and adversarial migrations. pdsmoover.com
pds atproto migrations moo cow
at bugfix/DetectNewPds 178 lines 6.8 kB view raw
1use apalis::layers::retry::RetryPolicy; 2use apalis::prelude::*; 3use apalis_sql::{ 4 Config, 5 postgres::{PgListen, PgPool, PostgresStorage}, 6}; 7use dotenvy::dotenv; 8use log::{debug, info}; 9use s3::creds::Credentials; 10use s3::{Bucket, Region}; 11use shared::jobs::account_backup::{AccountBackupJobContext, account_backup_job}; 12use shared::jobs::pds_backup::{PdsBackupJobContext, pds_backup_job}; 13use shared::jobs::remove_repo::RemoveRepoJobContext; 14use shared::jobs::scheduled_back_up_start::{ 15 ScheduledBackUpStartJobContext, scheduled_back_up_start_job, 16}; 17use shared::jobs::upload_blob::{UploadBlobJobContext, upload_blob_job}; 18use shared::jobs::{account_backup, pds_backup, remove_repo, scheduled_back_up_start, upload_blob}; 19use std::env; 20use std::sync::Arc; 21use tracing_subscriber::prelude::*; 22 23#[tokio::main] 24async fn main() -> Result<(), Box<dyn std::error::Error>> { 25 use tracing_subscriber::EnvFilter; 26 let _ = dotenv(); 27 let fmt_layer = tracing_subscriber::fmt::layer().with_target(false); 28 let filter_layer = 29 EnvFilter::try_from_default_env().or_else(|_| EnvFilter::try_new("debug"))?; 30 tracing_subscriber::registry() 31 .with(filter_layer) 32 .with(fmt_layer) 33 .init(); 34 35 let worker_node_name = 36 std::env::var("WORKER_NODE_NAME").expect("Must specify worker node name"); 37 38 //job backend setup 39 let database_url = std::env::var("DATABASE_URL").expect("Must specify path to db"); 40 41 let pool = PgPool::connect(&database_url).await?; 42 43 let mut start_backup_storage: PostgresStorage<AccountBackupJobContext> = 44 PostgresStorage::new_with_config(pool.clone(), Config::new(account_backup::JOB_NAMESPACE)); 45 46 let mut upload_blob_storage: PostgresStorage<UploadBlobJobContext> = 47 PostgresStorage::new_with_config(pool.clone(), Config::new(upload_blob::JOB_NAMESPACE)); 48 49 let mut start_back_up_listener = PgListen::new(pool.clone()).await?; 50 start_back_up_listener.subscribe_with(&mut start_backup_storage); 51 52 
let mut pull_blobs_listener = PgListen::new(pool.clone()).await?; 53 pull_blobs_listener.subscribe_with(&mut upload_blob_storage); 54 55 let mut pds_backup_storage: PostgresStorage<PdsBackupJobContext> = 56 PostgresStorage::new_with_config(pool.clone(), Config::new(pds_backup::JOB_NAMESPACE)); 57 let mut pds_backup_listener = PgListen::new(pool.clone()).await?; 58 pds_backup_listener.subscribe_with(&mut pds_backup_storage); 59 60 let mut scheduled_backup_storage: PostgresStorage<ScheduledBackUpStartJobContext> = 61 PostgresStorage::new_with_config( 62 pool.clone(), 63 Config::new(scheduled_back_up_start::JOB_NAMESPACE), 64 ); 65 let mut scheduled_backup_listener = PgListen::new(pool.clone()).await?; 66 scheduled_backup_listener.subscribe_with(&mut scheduled_backup_storage); 67 68 let mut remove_repo_storage: PostgresStorage<RemoveRepoJobContext> = 69 PostgresStorage::new_with_config(pool.clone(), Config::new(remove_repo::JOB_NAMESPACE)); 70 let mut remove_repo_listener = PgListen::new(pool.clone()).await?; 71 remove_repo_listener.subscribe_with(&mut remove_repo_storage); 72 73 tokio::spawn(async move { 74 //TODO bad? 
75 start_back_up_listener.listen().await.unwrap(); 76 pull_blobs_listener.listen().await.unwrap(); 77 pds_backup_listener.listen().await.unwrap(); 78 scheduled_backup_listener.listen().await.unwrap(); 79 remove_repo_listener.listen().await.unwrap(); 80 }); 81 82 //Atrpoto client setup 83 let atproto_client = Arc::new( 84 reqwest::Client::builder() 85 .user_agent("pds-moover-backups/0.0.1") 86 .build()?, 87 ); 88 89 //S3 90 let region_name = env::var("S3_REGION")?; 91 let endpoint = env::var("S3_ENDPOINT")?; 92 let region = Region::Custom { 93 region: region_name, 94 endpoint, 95 }; 96 97 let bucket = Bucket::new( 98 env::var("S3_BUCKET_NAME")?.as_str(), 99 region, 100 // Credentials are collected from environment, config, profile or instance metadata 101 Credentials::new( 102 Some(env::var("S3_ACCESS_KEY")?.as_str()), 103 Some(env::var("S3_SECRET_KEY")?.as_str()), 104 None, 105 None, 106 None, 107 )?, 108 )?; 109 110 let s3_bucket = Arc::new(bucket); 111 112 log::info!("Starting the worker node: {}", worker_node_name); 113 114 Monitor::new() 115 .register({ 116 WorkerBuilder::new(format!("{}-start-backup", worker_node_name)) 117 .data(pool.clone()) 118 .data(atproto_client.clone()) 119 .concurrency(5) 120 .retry(RetryPolicy::retries(5)) 121 .enable_tracing() 122 .backend(start_backup_storage) 123 .build_fn(account_backup_job) 124 }) 125 .register({ 126 WorkerBuilder::new(format!("{}-upload-blob", worker_node_name)) 127 .data(pool.clone()) 128 .data(atproto_client.clone()) 129 .data(s3_bucket.clone()) 130 .concurrency(20) 131 .retry(RetryPolicy::retries(5)) 132 .enable_tracing() 133 .backend(upload_blob_storage) 134 .build_fn(upload_blob_job) 135 // .chain(|s| { 136 // //Should be a performance boost of the job 137 // s.map_future(|f| async { 138 // let fut = tokio::spawn(f); 139 // let fut = fut.await?; 140 // fut 141 // }) 142 // }) 143 }) 144 .register({ 145 WorkerBuilder::new(format!("{}-pds-backup", worker_node_name)) 146 .data(pool.clone()) 147 
.data(atproto_client.clone()) 148 .retry(RetryPolicy::retries(1)) 149 .enable_tracing() 150 .backend(pds_backup_storage) 151 .build_fn(pds_backup_job) 152 }) 153 .register({ 154 WorkerBuilder::new(format!("{}-scheduled-backup-start", worker_node_name)) 155 .data(pool.clone()) 156 .retry(RetryPolicy::retries(5)) 157 .enable_tracing() 158 .backend(scheduled_backup_storage) 159 .build_fn(scheduled_back_up_start_job) 160 }) 161 .register({ 162 WorkerBuilder::new(format!("{}-delete-repo", worker_node_name)) 163 .data(pool.clone()) 164 .data(s3_bucket.clone()) 165 .retry(RetryPolicy::retries(5)) 166 .enable_tracing() 167 .backend(remove_repo_storage) 168 .build_fn(remove_repo::run) 169 }) 170 .on_event(|e| debug!("{e}")) 171 .run_with_signal(async { 172 tokio::signal::ctrl_c().await?; 173 info!("Shutting down the system"); 174 Ok(()) 175 }) 176 .await?; 177 Ok(()) 178}