Client-side atproto account migrator that runs in your web browser, along with services for backups and adversarial migrations.
use crate::jobs::account_backup::AccountBackupJobContext;
use crate::jobs::{account_backup, push_job_json};
use apalis::prelude::{Data, Error};
use jacquard_common::url::Url;
use jacquard_common::xrpc::XrpcEndpoint;
use serde::{Deserialize, Serialize};
use sqlx::{Pool, Postgres};
use std::sync::Arc;

pub const JOB_NAMESPACE: &str = "apalis::PdsBackup";

#[derive(Debug, Deserialize, Serialize, Clone)]
pub struct PdsBackupJobContext {
    pub pds_host: String,
}

#[derive(Debug, Deserialize, Serialize, Clone)]
struct ListReposResponseRepo {
    pub did: String,
    pub active: bool,
    pub rev: String,
}

#[derive(Debug, Deserialize, Serialize, Clone)]
struct ListReposResponse {
    pub repos: Vec<ListReposResponseRepo>,
    pub cursor: Option<String>,
}

/// Call com.atproto.sync.listRepos on the given PDS host, upsert active repos into accounts,
/// and enqueue account_backup jobs for each active repo.
pub async fn pds_backup_job(
    job: PdsBackupJobContext,
    pool: Data<Pool<Postgres>>,
    atproto_client: Data<Arc<reqwest::Client>>,
) -> Result<(), Error> {
    let pds_host = format!("https://{}", job.pds_host.clone());
    log::info!("Starting a backup for the PDS: {}", pds_host);

    // Record the start time for this PDS backup; this also prevents duplicate scheduling.
    sqlx::query(
        r#"
        UPDATE pds_hosts
        SET last_backup_start = NOW()
        WHERE pds_host = $1
        "#,
    )
    .bind(&job.pds_host)
    .execute(&*pool)
    .await
    .map_err(|e| Error::Failed(Arc::new(Box::new(e))))?;

    // Always expecting the host to be just the host name
    let base_url =
        Url::parse(&pds_host.clone()).map_err(|e| Error::Failed(Arc::new(Box::new(e))))?;

    let mut cursor: Option<String> = None;
    loop {
        let mut url = format!(
            "{}{}?limit={}",
            base_url.as_str().trim_end_matches('/'),
            jacquard_api::com_atproto::sync::list_repos::ListReposRequest::PATH,
            1000
        );
        if let Some(ref c) = cursor {
            if !c.is_empty() {
                url.push_str("&cursor=");
                url.push_str(c);
            }
        }

        let resp = atproto_client
            .get(url)
            .send()
            .await
            .map_err(|e| Error::Failed(Arc::new(Box::new(e))))?
            .json::<ListReposResponse>()
            .await
            .map_err(|e| Error::Failed(Arc::new(Box::new(e))))?;

        // Filter active repos
        let active_repos: Vec<(String, String)> = resp
            .repos
            .into_iter()
            .filter(|r| r.active)
            .map(|r| (r.did, r.rev))
            .collect();

        if !active_repos.is_empty() {
            // Batch upsert accounts using UNNEST; preserve created_at and pds_sign_up on conflict.
            let dids_with_revs: Vec<(String, String)> = active_repos
                .iter()
                .map(|(d, rev)| (d.clone(), rev.clone()))
                .collect();
            let dids: Vec<String> = active_repos.iter().map(|(d, _)| d.clone()).collect();

            // Insert and set pds_sign_up=true only for new rows. Do nothing on conflict.
            // We unnest only DIDs; pds_host is a single scalar projected for each row.
            sqlx::query(
                r#"
                INSERT INTO accounts (did, pds_host, pds_sign_up)
                SELECT did, $2::text AS pds_host, TRUE
                FROM UNNEST($1::text[]) AS u(did)
                ON CONFLICT (did) DO NOTHING
                "#,
            )
            .bind(&dids)
            .bind(&job.pds_host)
            .execute(&*pool)
            .await
            .map_err(|e| Error::Failed(Arc::new(Box::new(e))))?;

            // Enqueue account_backup jobs for each active repo in this page
            for (did, rev) in dids_with_revs.into_iter() {
                let ctx = AccountBackupJobContext {
                    did,
                    pds_host: job.pds_host.clone(),
                    rev: Some(rev),
                };
                push_job_json(&pool, account_backup::JOB_NAMESPACE, &ctx).await?;
            }
        }
        cursor = resp.cursor;
        if cursor.as_deref().map(|s| s.is_empty()).unwrap_or(true) {
            break;
        }
    }

    Ok(())
}
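
Below is a minimal, hypothetical sketch of how a PDS backup job could be enqueued from elsewhere in the codebase, mirroring the push_job_json pattern this module already uses for account_backup jobs. The module path, helper name, and example host are assumptions, and push_job_json's exact signature is inferred from its call site above.

use crate::jobs::pds_backup::{JOB_NAMESPACE, PdsBackupJobContext}; // assumed module path
use crate::jobs::push_job_json;
use apalis::prelude::Error;
use sqlx::{Pool, Postgres};

// Hypothetical helper: enqueue a backup of every active repo on one PDS.
// `pds_host` is expected to be a bare host name (e.g. "pds.example.com"),
// matching how pds_backup_job prepends the https:// scheme itself.
pub async fn enqueue_pds_backup(pool: &Pool<Postgres>, pds_host: &str) -> Result<(), Error> {
    let ctx = PdsBackupJobContext {
        pds_host: pds_host.to_string(),
    };
    // Push the serialized job into the "apalis::PdsBackup" namespace, the same
    // way pds_backup_job pushes per-repo account_backup jobs.
    push_job_json(pool, JOB_NAMESPACE, &ctx).await?;
    Ok(())
}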