Forked from baileytownsend.dev/pds-moover

A client-side atproto account migrator that runs in your web browser, along with services for backups and adversarial migrations.
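
The PDS backup job below pages through a host's repos via com.atproto.sync.listRepos and fans out one account_backup job per active repo.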
use crate::jobs::account_backup::AccountBackupJobContext;
use crate::jobs::{account_backup, push_job_json};
use apalis::prelude::{Data, Error};
use jacquard_common::url::Url;
use jacquard_common::xrpc::XrpcEndpoint;
use serde::{Deserialize, Serialize};
use sqlx::{Pool, Postgres};
use std::sync::Arc;

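/// Queue namespace under which apalis stores and polls PDS backup jobs.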
pub const JOB_NAMESPACE: &str = "apalis::PdsBackup";

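/// Context payload for a single PDS backup job. `pds_host` is expected to be a
/// bare host name (no scheme); the job prepends `https://` itself.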
#[derive(Debug, Deserialize, Serialize, Clone)]
pub struct PdsBackupJobContext {
    pub pds_host: String,
}

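/// The subset of a com.atproto.sync.listRepos repo entry that this job uses.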
#[derive(Debug, Deserialize, Serialize, Clone)]
struct ListReposResponseRepo {
    pub did: String,
    pub active: bool,
    pub rev: String,
}

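/// One page of listRepos results; `cursor` is set when more pages remain.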
#[derive(Debug, Deserialize, Serialize, Clone)]
struct ListReposResponse {
    pub repos: Vec<ListReposResponseRepo>,
    pub cursor: Option<String>,
}

/// Call com.atproto.sync.listRepos on the given PDS host, insert newly seen
/// active repos into accounts, and enqueue an account_backup job for each active repo.
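/// The database pool and HTTP client are injected via apalis `Data` extractors.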
pub async fn pds_backup_job(
    job: PdsBackupJobContext,
    pool: Data<Pool<Postgres>>,
    atproto_client: Data<Arc<reqwest::Client>>,
) -> Result<(), Error> {
    let pds_host = format!("https://{}", job.pds_host);
    log::info!("Starting a backup for the PDS: {}", pds_host);

    // Record when this PDS backup started; the timestamp also keeps the
    // scheduler from queueing a duplicate backup for the same host.
    sqlx::query(
        r#"
        UPDATE pds_hosts
        SET last_backup_start = NOW()
        WHERE pds_host = $1
        "#,
    )
    .bind(&job.pds_host)
    .execute(&*pool)
    .await
    .map_err(|e| Error::Failed(Arc::new(Box::new(e))))?;

    // The job context always carries a bare host name; the scheme was prepended above.
    let base_url = Url::parse(&pds_host).map_err(|e| Error::Failed(Arc::new(Box::new(e))))?;

    let mut cursor: Option<String> = None;
    loop {
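        // Build the page URL; 1000 is the maximum page size the listRepos lexicon allows.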
        let mut url = format!(
            "{}{}?limit={}",
            base_url.as_str().trim_end_matches('/'),
            jacquard_api::com_atproto::sync::list_repos::ListReposRequest::PATH,
            1000
        );
        if let Some(ref c) = cursor {
            if !c.is_empty() {
                url.push_str("&cursor=");
                url.push_str(c);
            }
        }

        let resp = atproto_client
            .get(url)
            .send()
            .await
            .map_err(|e| Error::Failed(Arc::new(Box::new(e))))?
            .json::<ListReposResponse>()
            .await
            .map_err(|e| Error::Failed(Arc::new(Box::new(e))))?;

        // Filter active repos
        let active_repos: Vec<(String, String)> = resp
            .repos
            .into_iter()
            .filter(|r| r.active)
            .map(|r| (r.did, r.rev))
            .collect();

        if !active_repos.is_empty() {
            // Batch insert accounts using UNNEST; existing rows are left untouched,
            // which preserves their created_at and pds_sign_up values.
            let dids: Vec<String> = active_repos.iter().map(|(d, _)| d.clone()).collect();

            // Insert and set pds_sign_up = true only for new rows; do nothing on conflict.
            // We unnest only the DIDs; pds_host is a single scalar projected onto each row.
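            // e.g. UNNEST(ARRAY['did:a', 'did:b']) expands to one row per DID, each
            // paired with the same $2 host value.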
            sqlx::query(
                r#"
                INSERT INTO accounts (did, pds_host, pds_sign_up)
                SELECT did, $2::text AS pds_host, TRUE
                FROM UNNEST($1::text[]) AS u(did)
                ON CONFLICT (did) DO NOTHING
                "#,
            )
            .bind(&dids)
            .bind(&job.pds_host)
            .execute(&*pool)
            .await
            .map_err(|e| Error::Failed(Arc::new(Box::new(e))))?;

            // Enqueue an account_backup job for each active repo in this page.
            for (did, rev) in active_repos {
                let ctx = AccountBackupJobContext {
                    did,
                    pds_host: job.pds_host.clone(),
                    rev: Some(rev),
                };
                push_job_json(&pool, account_backup::JOB_NAMESPACE, &ctx).await?;
            }
        }
        cursor = resp.cursor;
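        // A missing or empty cursor signals the final page.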
        if cursor.as_deref().map_or(true, str::is_empty) {
            break;
        }
    }

    Ok(())
}
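
For reference, enqueuing one of these jobs mirrors how the loop above pushes account_backup work: build a context and hand it to push_job_json under the job's namespace. A minimal sketch, assuming the caller holds a Pool<Postgres> like the worker does; the host value is a placeholder:

    // Hypothetical caller; "pds.example.com" stands in for a real PDS host.
    let ctx = PdsBackupJobContext {
        pds_host: "pds.example.com".to_string(),
    };
    push_job_json(&pool, JOB_NAMESPACE, &ctx).await?;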