Client side atproto account migrator in your web browser, along with services for backups and adversarial migrations. pdsmoover.com
pds atproto migrations moo cow
at bugfix/DetectNewPds 285 lines 10 kB view raw
1use anyhow::{Context, anyhow}; 2use base64::Engine; 3use clap::{Parser, Subcommand}; 4use dotenvy::dotenv; 5use jacquard::url; 6use jacquard_common::xrpc::XrpcExt; 7use lexicon_types_crate::com_pdsmoover::admin::request_instance_backup::RequestInstanceBackup; 8use lexicon_types_crate::com_pdsmoover::admin::request_pds_backup::RequestPdsBackup; 9use lexicon_types_crate::com_pdsmoover::admin::request_repo_backup::RequestRepoBackup; 10use lexicon_types_crate::com_pdsmoover::admin::sign_up_pds::SignUpPds; 11use log; 12use reqwest::header; 13use reqwest::header::{HeaderMap, HeaderValue}; 14use s3::creds::Credentials; 15use s3::{Bucket, Region}; 16use sqlx::PgPool; 17use std::env; 18 19fn init_logging() { 20 // Load .env if present 21 let _ = dotenv(); 22 23 // Initialize env_logger with default filter if RUST_LOG is not set 24 let env = env_logger::Env::default().filter_or("RUST_LOG", "info"); 25 let _ = env_logger::Builder::from_env(env).try_init(); 26} 27 28/// Admin CLI for pds_moover 29#[derive(Debug, Parser)] 30#[command(name = "admin_cli", version, about = "Administrative CLI for pds_moover", long_about = None)] 31struct Cli { 32 /// Admin password (optional). If not provided, the program will read the `admin_password` environment variable. 33 #[arg(short = 'p', long = "admin-password", global = true)] 34 admin_password: Option<String>, 35 36 /// PDS MOOver endpoint (optional). If not provided, the program will call the default endpoint at https://pdsmoover.com. 37 #[arg(short = 'm', long = "moover-host", global = true)] 38 pds_moover_host: Option<String>, 39 40 #[command(subcommand)] 41 command: Commands, 42} 43 44#[derive(Debug, Subcommand)] 45enum Commands { 46 /// PDS related administrative actions 47 Pds { 48 #[command(subcommand)] 49 action: PdsAction, 50 }, 51 /// Repo related administrative actions 52 Repo { 53 #[command(subcommand)] 54 action: RepoAction, 55 }, 56 /// Trigger an instance-wide backup job (no parameters) 57 RequestInstanceBackup, 58 /// Verify all backups in S3 against the database 59 VerifyBackups, 60} 61 62#[derive(Debug, Subcommand)] 63enum PdsAction { 64 /// Sign up a PDS by hostname 65 Signup { 66 /// Hostname of the PDS to sign up 67 hostname: String, 68 }, 69 /// Request a backup for a PDS by hostname 70 RequestBackup { 71 /// Hostname of the PDS to back up 72 hostname: String, 73 }, 74 75 /// Remove a PDS by hostname (not yet implemented server-side) 76 Remove { 77 /// Hostname of the PDS to remove 78 hostname: String, 79 }, 80} 81 82#[derive(Debug, Subcommand)] 83enum RepoAction { 84 /// Request a backup for a specific repo DID 85 RequestBackup { 86 /// DID of the repo to back up 87 did: String, 88 }, 89} 90 91fn resolve_admin_password(opt: &Option<String>) -> anyhow::Result<String> { 92 if let Some(pw) = opt.as_ref() { 93 return Ok(pw.clone()); 94 } 95 match env::var("ADMIN_PASSWORD") { 96 Ok(val) if !val.is_empty() => Ok(val), 97 _ => Err(anyhow!( 98 "Admin password not provided. Pass --admin-password or set env var ADMIN_PASSWORD" 99 )), 100 } 101} 102 103fn build_basic_auth_header(admin_password: &str) -> HeaderValue { 104 // Build Basic base64("admin:<password>") per temporary spec 105 let creds = format!("admin:{}", admin_password); 106 let encoded = base64::engine::general_purpose::STANDARD.encode(creds.as_bytes()); 107 let value = format!("Basic {}", encoded); 108 // Safe unwrap: constructing from known ASCII 109 HeaderValue::from_str(&value).expect("valid basic auth header") 110} 111 112#[tokio::main] 113async fn main() -> anyhow::Result<()> { 114 init_logging(); 115 116 let cli = Cli::parse(); 117 let admin_password = 118 resolve_admin_password(&cli.admin_password).context("failed to resolve admin password")?; 119 120 let mut headers = HeaderMap::new(); 121 headers.insert( 122 header::AUTHORIZATION, 123 build_basic_auth_header(&admin_password), 124 ); 125 126 let http = reqwest::Client::builder() 127 .default_headers(headers) 128 .user_agent("PDS MOOver Admin cli/0.0.1") 129 .build()?; 130 131 let base = url::Url::parse( 132 cli.pds_moover_host 133 .as_deref() 134 // TODO: change this away from dev in prod 135 .unwrap_or("https://pdsmoover.com"), 136 )?; 137 138 match cli.command { 139 Commands::Pds { action } => match action { 140 PdsAction::Signup { hostname } => { 141 log::info!("Signing up PDS"); 142 143 let req = SignUpPds { 144 hostname: hostname.clone().into(), 145 extra_data: Default::default(), 146 }; 147 // Send the typed XRPC request 148 match http.xrpc(base.clone()).send(&req).await { 149 Ok(result) => { 150 if result.status().is_success() { 151 log::info!("Sign up request sent successfully for: {}", hostname); 152 } else { 153 let error = result.parse().unwrap_err(); 154 log::error!("Sign up request failed: {}", error); 155 } 156 } 157 Err(err) => { 158 log::error!("Sign up request failed: {}", err); 159 } 160 } 161 } 162 PdsAction::RequestBackup { hostname } => { 163 log::info!("Requesting PDS backup for host: {}", hostname); 164 let req = RequestPdsBackup { 165 hostname: hostname.clone().into(), 166 extra_data: Default::default(), 167 }; 168 169 match http.xrpc(base.clone()).send(&req).await { 170 Ok(result) => { 171 if result.status().is_success() { 172 log::info!( 173 "PDS backup request enqueued successfully for: {}", 174 hostname 175 ); 176 } else { 177 let error = result.parse().unwrap_err(); 178 log::error!("PDS backup request failed: {}", error); 179 } 180 } 181 Err(err) => { 182 log::error!("PDS backup request failed: {}", err); 183 } 184 } 185 } 186 PdsAction::Remove { hostname: _ } => { 187 log::info!("Removing PDS (not implemented yet)"); 188 // TODO: Implement call to backend API for removal when endpoint is available 189 } 190 }, 191 Commands::Repo { action } => match action { 192 RepoAction::RequestBackup { did } => { 193 log::info!("Requesting repo backup for DID: {}", did); 194 let req = RequestRepoBackup { 195 did: did.clone().into(), 196 extra_data: Default::default(), 197 }; 198 match http.xrpc(base).send(&req).await { 199 Ok(result) => { 200 if result.status().is_success() { 201 log::info!("Repo backup request enqueued successfully for: {}", did); 202 } else { 203 let error = result.parse().unwrap_err(); 204 log::error!("Repo backup request failed: {}", error); 205 } 206 } 207 Err(err) => { 208 log::error!("Repo backup request failed: {}", err); 209 } 210 } 211 } 212 }, 213 Commands::RequestInstanceBackup => { 214 log::info!("Requesting instance-wide backup start"); 215 let req = RequestInstanceBackup; 216 match http.xrpc(base.clone()).send(&req).await { 217 Ok(result) => { 218 if result.status().is_success() { 219 log::info!("Instance backup start enqueued successfully"); 220 } else { 221 let error = result.parse().unwrap_err(); 222 log::error!("Instance backup request failed: {}", error); 223 } 224 } 225 Err(err) => { 226 log::error!("Instance backup request failed: {}", err); 227 } 228 } 229 } 230 Commands::VerifyBackups => { 231 //Not really a part of the cli per say. But I needed it and is a good place as any 232 log::info!("Verifying backups in S3..."); 233 234 // Get database URL from environment 235 let database_url = 236 env::var("DATABASE_URL").context("DATABASE_URL environment variable not set")?; 237 238 // Connect to database 239 let pool = PgPool::connect(&database_url) 240 .await 241 .context("Failed to connect to database")?; 242 243 // Setup S3 client 244 let region_name = env::var("S3_REGION")?; 245 let endpoint = env::var("S3_ENDPOINT")?; 246 let region = Region::Custom { 247 region: region_name, 248 endpoint, 249 }; 250 let bucket = Bucket::new( 251 env::var("S3_BUCKET_NAME")?.as_str(), 252 region, 253 Credentials::new( 254 Some(env::var("S3_ACCESS_KEY")?.as_str()), 255 Some(env::var("S3_SECRET_KEY")?.as_str()), 256 None, 257 None, 258 None, 259 )?, 260 )?; 261 262 // Call the verify_backups function 263 match shared::jobs::verify_backups::verify_backups(&pool, &bucket).await { 264 Ok(missing_blobs) => { 265 if missing_blobs.is_empty() { 266 log::info!("✓ All backups verified successfully! No missing blobs found."); 267 } else { 268 log::error!("✗ Found {} missing blobs:", missing_blobs.len()); 269 for missing in &missing_blobs { 270 println!( 271 "Missing: DID={}, CID/REV={}, TYPE={:?}, PATH={}", 272 missing.did, missing.cid_or_rev, missing.blob_type, missing.s3_path 273 ); 274 } 275 } 276 } 277 Err(err) => { 278 log::error!("Failed to verify backups: {}", err); 279 } 280 } 281 } 282 } 283 284 Ok(()) 285}