Client side atproto account migrator in your web browser, along with services for backups and adversarial migrations.
0 forks

Configure Feed

Select the types of activity you want to include in your feed.

at main 99 lines 3.0 kB view raw
1use crate::jobs::upload_blob::{BlobType, UploadBlobJobContext}; 2use crate::jobs::{AnyhowErrorWrapper, filter_missing_blob_cids, push_job_json, upload_blob}; 3use apalis::prelude::{Data, Error}; 4use jacquard_api::com_atproto::sync::list_blobs::ListBlobsRequest; 5use jacquard_common::xrpc::XrpcEndpoint; 6use serde::{Deserialize, Serialize}; 7use sqlx::{Pool, Postgres}; 8use std::sync::Arc; 9 10pub const JOB_NAMESPACE: &str = "apalis::AccountBackup"; 11 12#[derive(Debug, Deserialize, Serialize, Clone)] 13pub struct AccountBackupJobContext { 14 pub did: String, 15 // If the job start comes from the PDS backup this is already set saving a request to the PDS 16 pub rev: Option<String>, 17 pub pds_host: String, 18} 19 20#[derive(Deserialize)] 21struct ListBlobsResponse { 22 #[allow(dead_code)] 23 cursor: Option<String>, 24 cids: Vec<String>, 25} 26 27pub async fn account_backup_job( 28 job: AccountBackupJobContext, 29 pool: Data<Pool<Postgres>>, 30 atproto_client: Data<Arc<reqwest::Client>>, 31) -> Result<(), Error> { 32 log::info!("Starting backup for did: {}", job.did); 33 34 push_job_json( 35 &pool, 36 upload_blob::JOB_NAMESPACE, 37 &UploadBlobJobContext { 38 account_backup_job_context: job.clone(), 39 blob_type: BlobType::Repo(job.rev.clone()), 40 last_upload_batch: false, 41 }, 42 ) 43 .await?; 44 45 let mut cursor: Option<String> = None; 46 loop { 47 let mut url = format!( 48 "https://{}{}?did={}&limit={}", 49 job.pds_host, 50 ListBlobsRequest::PATH, 51 job.did, 52 1000 53 ); 54 55 if let Some(ref c) = cursor { 56 if !c.is_empty() { 57 url.push_str("&cursor="); 58 url.push_str(c); 59 } 60 } 61 62 let resp = atproto_client 63 .get(url) 64 .send() 65 .await 66 .map_err(|e| Error::Failed(Arc::new(Box::new(e))))? 
67 .json::<ListBlobsResponse>() 68 .await 69 .map_err(|e| Error::Failed(Arc::new(Box::new(e))))?; 70 71 let missing_cids = filter_missing_blob_cids(&pool, &resp.cids, &job.did) 72 .await 73 .map_err(|e| Error::Failed(Arc::new(Box::new(AnyhowErrorWrapper(e)))))?; 74 75 // Process missing CIDs in batches of 5 76 let mut processed = 0; 77 78 for chunk in missing_cids.chunks(5) { 79 processed += chunk.len(); 80 let last_chunk = processed >= missing_cids.len(); 81 push_job_json( 82 &pool, 83 upload_blob::JOB_NAMESPACE, 84 &UploadBlobJobContext { 85 account_backup_job_context: job.clone(), 86 blob_type: BlobType::Blob(chunk.to_vec()), 87 last_upload_batch: last_chunk, 88 }, 89 ) 90 .await?; 91 } 92 93 cursor = resp.cursor; 94 if cursor.is_none() || cursor.as_ref().unwrap().is_empty() { 95 break; 96 } 97 } 98 Ok(()) 99}