Client side atproto account migrator in your web browser, along with services for backups and adversarial migrations.
pdsmoover.com
pds
atproto
migrations
moo
cow
1use apalis::layers::retry::RetryPolicy;
2use apalis::prelude::*;
3use apalis_sql::{
4 Config,
5 postgres::{PgListen, PgPool, PostgresStorage},
6};
7use dotenvy::dotenv;
8use log::{debug, info};
9use s3::creds::Credentials;
10use s3::{Bucket, Region};
11use shared::jobs::account_backup::{AccountBackupJobContext, account_backup_job};
12use shared::jobs::pds_backup::{PdsBackupJobContext, pds_backup_job};
13use shared::jobs::remove_repo::RemoveRepoJobContext;
14use shared::jobs::scheduled_back_up_start::{
15 ScheduledBackUpStartJobContext, scheduled_back_up_start_job,
16};
17use shared::jobs::upload_blob::{UploadBlobJobContext, upload_blob_job};
18use shared::jobs::{account_backup, pds_backup, remove_repo, scheduled_back_up_start, upload_blob};
19use std::env;
20use std::sync::Arc;
21use tracing_subscriber::prelude::*;
22
23#[tokio::main]
24async fn main() -> Result<(), Box<dyn std::error::Error>> {
25 use tracing_subscriber::EnvFilter;
26 let _ = dotenv();
27 let fmt_layer = tracing_subscriber::fmt::layer().with_target(false);
28 let filter_layer =
29 EnvFilter::try_from_default_env().or_else(|_| EnvFilter::try_new("debug"))?;
30 tracing_subscriber::registry()
31 .with(filter_layer)
32 .with(fmt_layer)
33 .init();
34
35 let worker_node_name =
36 std::env::var("WORKER_NODE_NAME").expect("Must specify worker node name");
37
38 //job backend setup
39 let database_url = std::env::var("DATABASE_URL").expect("Must specify path to db");
40
41 let pool = PgPool::connect(&database_url).await?;
42
43 let mut start_backup_storage: PostgresStorage<AccountBackupJobContext> =
44 PostgresStorage::new_with_config(pool.clone(), Config::new(account_backup::JOB_NAMESPACE));
45
46 let mut upload_blob_storage: PostgresStorage<UploadBlobJobContext> =
47 PostgresStorage::new_with_config(pool.clone(), Config::new(upload_blob::JOB_NAMESPACE));
48
49 let mut start_back_up_listener = PgListen::new(pool.clone()).await?;
50 start_back_up_listener.subscribe_with(&mut start_backup_storage);
51
52 let mut pull_blobs_listener = PgListen::new(pool.clone()).await?;
53 pull_blobs_listener.subscribe_with(&mut upload_blob_storage);
54
55 let mut pds_backup_storage: PostgresStorage<PdsBackupJobContext> =
56 PostgresStorage::new_with_config(pool.clone(), Config::new(pds_backup::JOB_NAMESPACE));
57 let mut pds_backup_listener = PgListen::new(pool.clone()).await?;
58 pds_backup_listener.subscribe_with(&mut pds_backup_storage);
59
60 let mut scheduled_backup_storage: PostgresStorage<ScheduledBackUpStartJobContext> =
61 PostgresStorage::new_with_config(
62 pool.clone(),
63 Config::new(scheduled_back_up_start::JOB_NAMESPACE),
64 );
65 let mut scheduled_backup_listener = PgListen::new(pool.clone()).await?;
66 scheduled_backup_listener.subscribe_with(&mut scheduled_backup_storage);
67
68 let mut remove_repo_storage: PostgresStorage<RemoveRepoJobContext> =
69 PostgresStorage::new_with_config(pool.clone(), Config::new(remove_repo::JOB_NAMESPACE));
70 let mut remove_repo_listener = PgListen::new(pool.clone()).await?;
71 remove_repo_listener.subscribe_with(&mut remove_repo_storage);
72
73 tokio::spawn(async move {
74 //TODO bad?
75 start_back_up_listener.listen().await.unwrap();
76 pull_blobs_listener.listen().await.unwrap();
77 pds_backup_listener.listen().await.unwrap();
78 scheduled_backup_listener.listen().await.unwrap();
79 remove_repo_listener.listen().await.unwrap();
80 });
81
82 //Atrpoto client setup
83 let atproto_client = Arc::new(
84 reqwest::Client::builder()
85 .user_agent("pds-moover-backups/0.0.1")
86 .build()?,
87 );
88
89 //S3
90 let region_name = env::var("S3_REGION")?;
91 let endpoint = env::var("S3_ENDPOINT")?;
92 let region = Region::Custom {
93 region: region_name,
94 endpoint,
95 };
96
97 let bucket = Bucket::new(
98 env::var("S3_BUCKET_NAME")?.as_str(),
99 region,
100 // Credentials are collected from environment, config, profile or instance metadata
101 Credentials::new(
102 Some(env::var("S3_ACCESS_KEY")?.as_str()),
103 Some(env::var("S3_SECRET_KEY")?.as_str()),
104 None,
105 None,
106 None,
107 )?,
108 )?;
109
110 let s3_bucket = Arc::new(bucket);
111
112 log::info!("Starting the worker node: {}", worker_node_name);
113
114 Monitor::new()
115 .register({
116 WorkerBuilder::new(format!("{}-start-backup", worker_node_name))
117 .data(pool.clone())
118 .data(atproto_client.clone())
119 .concurrency(5)
120 .retry(RetryPolicy::retries(5))
121 .enable_tracing()
122 .backend(start_backup_storage)
123 .build_fn(account_backup_job)
124 })
125 .register({
126 WorkerBuilder::new(format!("{}-upload-blob", worker_node_name))
127 .data(pool.clone())
128 .data(atproto_client.clone())
129 .data(s3_bucket.clone())
130 .concurrency(20)
131 .retry(RetryPolicy::retries(5))
132 .enable_tracing()
133 .backend(upload_blob_storage)
134 .build_fn(upload_blob_job)
135 // .chain(|s| {
136 // //Should be a performance boost of the job
137 // s.map_future(|f| async {
138 // let fut = tokio::spawn(f);
139 // let fut = fut.await?;
140 // fut
141 // })
142 // })
143 })
144 .register({
145 WorkerBuilder::new(format!("{}-pds-backup", worker_node_name))
146 .data(pool.clone())
147 .data(atproto_client.clone())
148 .retry(RetryPolicy::retries(1))
149 .enable_tracing()
150 .backend(pds_backup_storage)
151 .build_fn(pds_backup_job)
152 })
153 .register({
154 WorkerBuilder::new(format!("{}-scheduled-backup-start", worker_node_name))
155 .data(pool.clone())
156 .retry(RetryPolicy::retries(5))
157 .enable_tracing()
158 .backend(scheduled_backup_storage)
159 .build_fn(scheduled_back_up_start_job)
160 })
161 .register({
162 WorkerBuilder::new(format!("{}-delete-repo", worker_node_name))
163 .data(pool.clone())
164 .data(s3_bucket.clone())
165 .retry(RetryPolicy::retries(5))
166 .enable_tracing()
167 .backend(remove_repo_storage)
168 .build_fn(remove_repo::run)
169 })
170 .on_event(|e| debug!("{e}"))
171 .run_with_signal(async {
172 tokio::signal::ctrl_c().await?;
173 info!("Shutting down the system");
174 Ok(())
175 })
176 .await?;
177 Ok(())
178}