Client-side atproto account migrator in your web browser, along with services for backups and adversarial migrations.
at main 6.8 kB view raw
1pub mod account_backup; 2pub mod pds_backup; 3pub mod remove_repo; 4pub mod scheduled_back_up_start; 5pub mod start_all_backup; 6pub mod upload_blob; 7pub mod verify_backups; 8 9use crate::db::models; 10use crate::db::models::BlobModel; 11use apalis::prelude::*; 12use log::info; 13use serde::{Deserialize, Serialize}; 14use serde_json::{self}; 15use sqlx::{Pool, Postgres, query}; 16use std::collections::HashSet; 17use std::fmt; 18use std::sync::Arc; 19 20#[derive(Debug)] 21pub enum JobError { 22 SomeError(&'static str), 23} 24 25impl std::fmt::Display for JobError { 26 fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { 27 write!(f, "{self:?}") 28 } 29} 30 31// Create a wrapper struct that implements std::error::Error 32#[derive(Debug)] 33struct AnyhowErrorWrapper(anyhow::Error); 34 35impl fmt::Display for AnyhowErrorWrapper { 36 fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { 37 write!(f, "{}", self.0) 38 } 39} 40 41impl std::error::Error for AnyhowErrorWrapper {} 42 43/// Generic helper to manually enqueue any Apalis job by calling the SQL function directly. 44/// 45/// - job_namespace: fully-qualified job type name as stored by Apalis (e.g., "apalis::Email"). 46/// - payload: any struct that implements Serialize + Deserialize; it will be sent as JSON. 
47pub async fn push_job_json<T>( 48 pool: &Pool<Postgres>, 49 job_namespace: &str, 50 payload: &T, 51) -> Result<(), Error> 52where 53 T: Serialize + for<'de> Deserialize<'de>, 54{ 55 // Serialize payload to JSON and send to Postgres as json 56 let json_str = 57 serde_json::to_string(payload).map_err(|e| Error::Failed(Arc::new(Box::new(e))))?; 58 59 query("select apalis.push_job($1, $2::json)") 60 .bind(job_namespace) 61 .bind(json_str) 62 .execute(pool) 63 .await 64 .map(|_| ()) 65 .map_err(|e| Error::Failed(Arc::new(Box::new(e)))) 66} 67 68/// Given a list of CIDs, returns those that are NOT already present in the blobs table 69/// with blob type = 'blob' and matches the user's did in the case of duplicate blobs for each user 70pub async fn filter_missing_blob_cids( 71 pool: &Pool<Postgres>, 72 cids: &Vec<String>, 73 users_did: &String, 74) -> anyhow::Result<Vec<String>> { 75 if cids.is_empty() { 76 return Ok(Vec::new()); 77 } 78 79 // Fetch the subset of provided CIDs that already exist as type 'blob' 80 let existing: Vec<String> = sqlx::query_scalar( 81 r#"SELECT cid_or_rev FROM blobs WHERE type = $1 AND cid_or_rev = ANY($2) AND account_did = $3"#, 82 ) 83 .bind(crate::db::models::BlobType::Blob) 84 .bind(&cids) 85 .bind(users_did) 86 .fetch_all(pool) 87 .await?; 88 89 let existing_set: HashSet<&str> = existing.iter().map(|s| s.as_str()).collect(); 90 let missing: Vec<String> = cids 91 .iter() 92 .filter(|cid| !existing_set.contains(cid.as_str())) 93 .cloned() 94 .collect(); 95 96 Ok(missing) 97} 98 99pub async fn record_new_blob( 100 pool: &Pool<Postgres>, 101 did: String, 102 cid_or_rev: String, 103 size: i64, 104 blob_type: models::BlobType, 105) -> anyhow::Result<models::BlobModel> { 106 match blob_type { 107 //On repo we need to upsert on did 108 models::BlobType::Repo => { 109 // First try to update an existing 'repo' blob row for this DID. 
110 if let Some(updated) = sqlx::query_as::<_, BlobModel>( 111 r#" 112 UPDATE blobs 113 SET size = $2, 114 type = $3, 115 cid_or_rev = $4 116 WHERE account_did = $1 AND type = $3 117 RETURNING id, created_at, account_did, size, type, cid_or_rev 118 "#, 119 ) 120 .bind(&did) 121 .bind(size) 122 .bind(&blob_type) 123 .bind(&cid_or_rev) 124 .fetch_optional(pool) 125 .await? 126 { 127 Ok(updated) 128 } else { 129 // If no row was updated, insert a new one for this DID and repo type. 130 Ok(sqlx::query_as::<_, BlobModel>( 131 r#" 132 INSERT INTO blobs (account_did, size, type, cid_or_rev) 133 VALUES ($1, $2, $3, $4) 134 RETURNING id, created_at, account_did, size, type, cid_or_rev 135 "#, 136 ) 137 .bind(did) 138 .bind(size) 139 .bind(blob_type) 140 .bind(cid_or_rev) 141 .fetch_one(pool) 142 .await?) 143 } 144 } 145 //on blob we upsert on cid (shouldnt happen ideally) 146 models::BlobType::Blob | _ => Ok(sqlx::query_as::<_, BlobModel>( 147 r#" 148 INSERT INTO blobs (account_did, size, type, cid_or_rev) 149 VALUES ($1, $2, $3, $4) 150 ON CONFLICT (cid_or_rev) DO UPDATE 151 SET account_did = EXCLUDED.account_did, 152 size = EXCLUDED.size, 153 type = EXCLUDED.type, 154 cid_or_rev = EXCLUDED.cid_or_rev 155 RETURNING id, created_at, account_did, size, type, cid_or_rev 156 "#, 157 ) 158 .bind(did) 159 .bind(size) 160 .bind(blob_type) 161 .bind(cid_or_rev) 162 .fetch_one(pool) 163 .await?), 164 } 165} 166 167/// Look up the user's account by DID and return their repo_rev, if present. 168pub async fn get_repo_rev_by_did( 169 pool: &Pool<Postgres>, 170 did: &str, 171) -> anyhow::Result<Option<String>> { 172 // repo_rev is nullable; also the account row may not exist. 173 // Using fetch_optional yields Option<Option<String>> which we flatten to a single Option<String>. 
174 let result: Option<Option<String>> = 175 sqlx::query_scalar(r#"SELECT repo_rev FROM accounts WHERE did = $1"#) 176 .bind(did) 177 .fetch_optional(pool) 178 .await?; 179 180 Ok(result.flatten()) 181} 182 183/// Update the repo_rev for a given account identified by DID. 184/// Returns true if a row was updated. 185pub async fn update_repo_rev_by_did( 186 pool: &Pool<Postgres>, 187 did: &str, 188 repo_rev: &str, 189) -> anyhow::Result<bool> { 190 let result = 191 sqlx::query(r#"UPDATE accounts SET repo_rev = $2, last_backup = NOW() WHERE did = $1"#) 192 .bind(did) 193 .bind(repo_rev) 194 .execute(pool) 195 .await?; 196 Ok(result.rows_affected() > 0) 197} 198 199/// Update last_backup to the current timestamp for a given account identified by DID. 200/// Returns true if a row was updated. 201pub async fn update_last_backup_now_by_did( 202 pool: &Pool<Postgres>, 203 did: &str, 204) -> anyhow::Result<bool> { 205 let result = sqlx::query(r#"UPDATE accounts SET last_backup = NOW() WHERE did = $1"#) 206 .bind(did) 207 .execute(pool) 208 .await?; 209 Ok(result.rows_affected() > 0) 210}