Client-side atproto account migrator that runs in your web browser, along with services for backups and adversarial migrations.

wip — adds a `VerifyBackups` admin CLI command that checks every backup recorded in the database against the S3 bucket and reports missing objects.

Changed files: +141 -2

Cargo.lock (+3)
```diff
···
 "lexicon_types_crate",
 "log",
 "reqwest",
+"rust-s3",
+"shared",
+"sqlx",
 "tokio",
 ]
```

admin_cli/Cargo.toml (+4 -1)
```diff
···
 jacquard.workspace = true
 lexicon_types_crate.workspace = true
 reqwest.workspace = true
-base64 = "0.22"
+base64 = "0.22"
+shared = { path = "../shared" }
+sqlx = { version = "0.8.6", features = ["runtime-tokio", "postgres"] }
+rust-s3.workspace = true
```
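
The other crates here are pulled in via `workspace = true`, while `sqlx` is pinned directly in this manifest. If version centralization is wanted later, a sketch of the corresponding root-manifest entry (assuming the root Cargo.toml already carries a `[workspace.dependencies]` table, as the `rust-s3.workspace = true` line implies):

```toml
# Sketch only: root Cargo.toml entry, so admin_cli could instead declare
# `sqlx.workspace = true` like its sibling dependencies.
[workspace.dependencies]
sqlx = { version = "0.8.6", features = ["runtime-tokio", "postgres"] }
```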

admin_cli/src/main.rs (+56)
```diff
···
 use log;
 use reqwest::header;
 use reqwest::header::{HeaderMap, HeaderValue};
+use s3::creds::Credentials;
+use s3::{Bucket, Region};
+use sqlx::PgPool;
 use std::env;
 
 fn init_logging() {
···
     },
     /// Trigger an instance-wide backup job (no parameters)
     RequestInstanceBackup,
+    /// Verify all backups in S3 against the database
+    VerifyBackups,
 }
 
 #[derive(Debug, Subcommand)]
···
                 }
                 Err(err) => {
                     log::error!("Instance backup request failed: {}", err);
+                }
+            }
+        }
+        Commands::VerifyBackups => {
+            log::info!("Verifying backups in S3...");
+
+            // Get database URL from environment
+            let database_url =
+                env::var("DATABASE_URL").context("DATABASE_URL environment variable not set")?;
+
+            // Connect to database
+            let pool = PgPool::connect(&database_url)
+                .await
+                .context("Failed to connect to database")?;
+
+            // Setup S3 client
+            let region_name = env::var("S3_REGION")?;
+            let endpoint = env::var("S3_ENDPOINT")?;
+            let region = Region::Custom {
+                region: region_name,
+                endpoint,
+            };
+            let bucket = Bucket::new(
+                env::var("S3_BUCKET_NAME")?.as_str(),
+                region,
+                Credentials::new(
+                    Some(env::var("S3_ACCESS_KEY")?.as_str()),
+                    Some(env::var("S3_SECRET_KEY")?.as_str()),
+                    None,
+                    None,
+                    None,
+                )?,
+            )?;
+
+            // Call the verify_backups function
+            match shared::jobs::verify_backups::verify_backups(&pool, &bucket).await {
+                Ok(missing_blobs) => {
+                    if missing_blobs.is_empty() {
+                        log::info!("✓ All backups verified successfully! No missing blobs found.");
+                    } else {
+                        log::error!("✗ Found {} missing blobs:", missing_blobs.len());
+                        for missing in &missing_blobs {
+                            println!(
+                                "Missing: DID={}, CID/REV={}, TYPE={:?}, PATH={}",
+                                missing.did, missing.cid_or_rev, missing.blob_type, missing.s3_path
+                            );
+                        }
+                    }
+                }
+                Err(err) => {
+                    log::error!("Failed to verify backups: {}", err);
                 }
             }
         }
```
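
One thing worth noting in the new arm: `DATABASE_URL` gets a descriptive `.context(...)`, but the S3 settings are read with bare `env::var(...)?`, which fails with a generic `NotPresent` error that never names the offending variable. A small sketch extending the same pattern (the `required_env` helper is hypothetical, not part of this diff):

```rust
use anyhow::{Context, Result};
use std::env;

// Sketch only: names the missing variable in the error, matching the
// .context(...) treatment DATABASE_URL already gets above.
fn required_env(name: &str) -> Result<String> {
    env::var(name).with_context(|| format!("{name} environment variable not set"))
}

// Usage: let region_name = required_env("S3_REGION")?;
```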

shared/src/db/models.rs (+1 -1)
```diff
···
     pub created_at: DateTime<Utc>,
     pub account_did: String,
     pub size: i64,
-    pub blob_type: BlobType,
+    pub r#type: BlobType,
     pub cid_or_rev: String,
 }
```
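
The rename lines the struct field up with the database column: `verify_backups` (below) loads rows with `sqlx::query_as::<_, BlobModel>("SELECT * FROM blobs ...")`, which maps columns to fields by name, and `r#type` is Rust's raw-identifier escape for the `type` keyword. A sketch of an alternative that keeps the more readable field name, assuming `BlobModel` derives `sqlx::FromRow`:

```rust
use sqlx::FromRow;

// Sketch only: maps the `type` column onto a non-keyword field, so call
// sites could keep writing `blob.blob_type` instead of `blob.r#type`.
#[derive(Debug, FromRow)]
pub struct BlobModel {
    // ...other fields as above...
    #[sqlx(rename = "type")]
    pub blob_type: BlobType,
}
```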

shared/src/jobs/mod.rs (+1)
```diff
···
 pub mod scheduled_back_up_start;
 pub mod start_all_backup;
 pub mod upload_blob;
+pub mod verify_backups;
 
 use crate::db::models;
 use crate::db::models::BlobModel;
```

shared/src/jobs/verify_backups.rs (new file, +76)
```rust
use crate::db::models::{BlobModel, BlobType};
use crate::storage::{blob_backup_path, repo_backup_path};
use anyhow::Result;
use s3::Bucket;
use sqlx::{Pool, Postgres};

/// Verifies that all blobs in the database exist in S3.
/// Returns a Vec of missing blob information (did, cid_or_rev, blob_type).
pub async fn verify_backups(pool: &Pool<Postgres>, s3_bucket: &Bucket) -> Result<Vec<MissingBlob>> {
    // Get all blobs from the database
    let blobs = sqlx::query_as::<_, BlobModel>("SELECT * FROM blobs ORDER BY created_at")
        .fetch_all(pool)
        .await?;

    let total_blobs = blobs.len();
    log::info!("Checking {} blobs in S3...", total_blobs);

    let mut missing_blobs = Vec::new();
    let mut checked = 0;

    for blob in blobs {
        checked += 1;
        if checked % 100 == 0 {
            log::info!("Checked {}/{} blobs...", checked, total_blobs);
        }

        let s3_path = match blob.r#type {
            BlobType::Repo => repo_backup_path(blob.account_did.clone()),
            BlobType::Blob => blob_backup_path(blob.account_did.clone(), blob.cid_or_rev.clone()),
            BlobType::Prefs => {
                // Handle prefs if needed - for now skip
                log::debug!("Skipping prefs blob: {:?}", blob);
                continue;
            }
        };

        // Check if the object exists in S3
        match s3_bucket.head_object(&s3_path).await {
            Ok(_) => {
                // Object exists, all good
                log::debug!("✓ Found: {}", s3_path);
            }
            Err(e) => {
                // Check if it's a 404 error (not found)
                if e.to_string().contains("404") {
                    log::warn!("✗ Missing: {}", s3_path);
                    missing_blobs.push(MissingBlob {
                        did: blob.account_did.clone(),
                        cid_or_rev: blob.cid_or_rev.clone(),
                        blob_type: blob.r#type.clone(),
                        s3_path,
                    });
                } else {
                    // Some other error - log it but don't count as missing
                    log::error!("Error checking {}: {}", s3_path, e);
                }
            }
        }
    }

    log::info!(
        "Verification complete. Checked {} blobs, found {} missing.",
        checked,
        missing_blobs.len()
    );

    Ok(missing_blobs)
}

#[derive(Debug, Clone)]
pub struct MissingBlob {
    pub did: String,
    pub cid_or_rev: String,
    pub blob_type: BlobType,
    pub s3_path: String,
}
```
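
Two caveats in this first pass: the HEAD requests run one at a time, so wall-clock time grows with blob count times round-trip latency, and missing objects are detected by substring-matching "404" in the error text. A sketch of a bounded-concurrency variant using the `futures` crate (not a dependency of this diff; `verify_concurrently` is a hypothetical helper that reuses the same 404 check):

```rust
use futures::stream::{self, StreamExt};
use s3::Bucket;

// Sketch only: runs the HEAD checks with bounded concurrency. Takes
// precomputed (s3_path, candidate) pairs mirroring the loop body above.
async fn verify_concurrently(
    entries: Vec<(String, MissingBlob)>,
    bucket: &Bucket,
) -> Vec<MissingBlob> {
    stream::iter(entries)
        .map(|(path, candidate)| async move {
            match bucket.head_object(&path).await {
                Ok(_) => None, // object exists
                Err(e) if e.to_string().contains("404") => Some(candidate),
                Err(e) => {
                    log::error!("Error checking {}: {}", path, e);
                    None // other error: don't count as missing
                }
            }
        })
        .buffer_unordered(16) // at most 16 in-flight HEAD requests
        .filter_map(|missing| async move { missing })
        .collect()
        .await
}
```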