Client side atproto account migrator in your web browser, along with services for backups and adversarial migrations. pdsmoover.com
pds atproto migrations moo cow
at HEAD 285 lines 10 kB view raw
1use anyhow::{Context, anyhow}; 2use base64::Engine; 3use clap::{Parser, Subcommand}; 4use dotenvy::dotenv; 5use jacquard::url; 6use jacquard_common::xrpc::XrpcExt; 7use lexicon_types_crate::com_pdsmoover::admin::request_instance_backup::RequestInstanceBackup; 8use lexicon_types_crate::com_pdsmoover::admin::request_pds_backup::RequestPdsBackup; 9use lexicon_types_crate::com_pdsmoover::admin::request_repo_backup::RequestRepoBackup; 10use lexicon_types_crate::com_pdsmoover::admin::sign_up_pds::SignUpPds; 11use log; 12use reqwest::header; 13use reqwest::header::{HeaderMap, HeaderValue}; 14use s3::creds::Credentials; 15use s3::{Bucket, Region}; 16use sqlx::PgPool; 17use std::env; 18 19fn init_logging() { 20 // Load .env if present 21 let _ = dotenv(); 22 23 // Initialize env_logger with default filter if RUST_LOG is not set 24 let env = env_logger::Env::default().filter_or("RUST_LOG", "info"); 25 let _ = env_logger::Builder::from_env(env).try_init(); 26} 27 28/// Admin CLI for pds_moover 29#[derive(Debug, Parser)] 30#[command(name = "admin_cli", version, about = "Administrative CLI for pds_moover", long_about = None)] 31struct Cli { 32 /// Admin password (optional). If not provided, the program will read the `admin_password` environment variable. 33 #[arg(short = 'p', long = "admin-password", global = true)] 34 admin_password: Option<String>, 35 36 /// PDS MOOver endpoint (optional). If not provided, the program will call the default endpoint at https://pdsmoover.com. 37 #[arg(short = 'm', long = "moover-host", global = true)] 38 pds_moover_host: Option<String>, 39 40 #[command(subcommand)] 41 command: Commands, 42} 43 44#[derive(Debug, Subcommand)] 45enum Commands { 46 /// PDS related administrative actions 47 Pds { 48 #[command(subcommand)] 49 action: PdsAction, 50 }, 51 /// Repo related administrative actions 52 Repo { 53 #[command(subcommand)] 54 action: RepoAction, 55 }, 56 /// Trigger an instance-wide backup job (no parameters) 57 RequestInstanceBackup, 58 /// Verify all backups in S3 against the database 59 VerifyBackups, 60} 61 62#[derive(Debug, Subcommand)] 63enum PdsAction { 64 /// Sign up a PDS by hostname 65 Signup { 66 /// Hostname of the PDS to sign up 67 hostname: String, 68 }, 69 /// Request a backup for a PDS by hostname 70 RequestBackup { 71 /// Hostname of the PDS to back up 72 hostname: String, 73 }, 74 75 /// Remove a PDS by hostname (not yet implemented server-side) 76 Remove { 77 /// Hostname of the PDS to remove 78 hostname: String, 79 }, 80} 81 82#[derive(Debug, Subcommand)] 83enum RepoAction { 84 /// Request a backup for a specific repo DID 85 RequestBackup { 86 /// DID of the repo to back up 87 did: String, 88 }, 89} 90 91fn resolve_admin_password(opt: &Option<String>) -> anyhow::Result<String> { 92 if let Some(pw) = opt.as_ref() { 93 return Ok(pw.clone()); 94 } 95 match env::var("ADMIN_PASSWORD") { 96 Ok(val) if !val.is_empty() => Ok(val), 97 _ => Err(anyhow!( 98 "Admin password not provided. Pass --admin-password or set env var ADMIN_PASSWORD" 99 )), 100 } 101} 102 103fn build_basic_auth_header(admin_password: &str) -> HeaderValue { 104 // Build Basic base64("admin:<password>") per temporary spec 105 let creds = format!("admin:{}", admin_password); 106 let encoded = base64::engine::general_purpose::STANDARD.encode(creds.as_bytes()); 107 let value = format!("Basic {}", encoded); 108 // Safe unwrap: constructing from known ASCII 109 HeaderValue::from_str(&value).expect("valid basic auth header") 110} 111 112#[tokio::main] 113async fn main() -> anyhow::Result<()> { 114 init_logging(); 115 116 let cli = Cli::parse(); 117 let admin_password = 118 resolve_admin_password(&cli.admin_password).context("failed to resolve admin password")?; 119 120 let mut headers = HeaderMap::new(); 121 headers.insert( 122 header::AUTHORIZATION, 123 build_basic_auth_header(&admin_password), 124 ); 125 126 let http = reqwest::Client::builder() 127 .default_headers(headers) 128 .user_agent("PDS MOOver Admin cli/0.0.1") 129 .build()?; 130 131 let base = url::Url::parse( 132 cli.pds_moover_host 133 .as_deref() 134 // TODO: change this away from dev in prod 135 .unwrap_or("https://pdsmoover.com"), 136 )?; 137 138 match cli.command { 139 Commands::Pds { action } => match action { 140 PdsAction::Signup { hostname } => { 141 log::info!("Signing up PDS"); 142 143 let req = SignUpPds { 144 hostname: hostname.clone().into(), 145 extra_data: Default::default(), 146 }; 147 // Send the typed XRPC request 148 match http.xrpc(base.clone()).send(&req).await { 149 Ok(result) => { 150 if result.status().is_success() { 151 log::info!("Sign up request sent successfully for: {}", hostname); 152 } else { 153 let error = result.parse().unwrap_err(); 154 log::error!("Sign up request failed: {}", error); 155 } 156 } 157 Err(err) => { 158 log::error!("Sign up request failed: {}", err); 159 } 160 } 161 } 162 PdsAction::RequestBackup { hostname } => { 163 log::info!("Requesting PDS backup for host: {}", hostname); 164 let req = RequestPdsBackup { 165 hostname: hostname.clone().into(), 166 extra_data: Default::default(), 167 }; 168 169 match http.xrpc(base.clone()).send(&req).await { 170 Ok(result) => { 171 if result.status().is_success() { 172 log::info!( 173 "PDS backup request enqueued successfully for: {}", 174 hostname 175 ); 176 } else { 177 let error = result.parse().unwrap_err(); 178 log::error!("PDS backup request failed: {}", error); 179 } 180 } 181 Err(err) => { 182 log::error!("PDS backup request failed: {}", err); 183 } 184 } 185 } 186 PdsAction::Remove { hostname: _ } => { 187 log::info!("Removing PDS (not implemented yet)"); 188 // TODO: Implement call to backend API for removal when endpoint is available 189 } 190 }, 191 Commands::Repo { action } => match action { 192 RepoAction::RequestBackup { did } => { 193 log::info!("Requesting repo backup for DID: {}", did); 194 let req = RequestRepoBackup { 195 did: did.clone().into(), 196 extra_data: Default::default(), 197 }; 198 match http.xrpc(base).send(&req).await { 199 Ok(result) => { 200 if result.status().is_success() { 201 log::info!("Repo backup request enqueued successfully for: {}", did); 202 } else { 203 let error = result.parse().unwrap_err(); 204 log::error!("Repo backup request failed: {}", error); 205 } 206 } 207 Err(err) => { 208 log::error!("Repo backup request failed: {}", err); 209 } 210 } 211 } 212 }, 213 Commands::RequestInstanceBackup => { 214 log::info!("Requesting instance-wide backup start"); 215 let req = RequestInstanceBackup; 216 match http.xrpc(base.clone()).send(&req).await { 217 Ok(result) => { 218 if result.status().is_success() { 219 log::info!("Instance backup start enqueued successfully"); 220 } else { 221 let error = result.parse().unwrap_err(); 222 log::error!("Instance backup request failed: {}", error); 223 } 224 } 225 Err(err) => { 226 log::error!("Instance backup request failed: {}", err); 227 } 228 } 229 } 230 Commands::VerifyBackups => { 231 //Not really a part of the cli per say. But I needed it and is a good place as any 232 log::info!("Verifying backups in S3..."); 233 234 // Get database URL from environment 235 let database_url = 236 env::var("DATABASE_URL").context("DATABASE_URL environment variable not set")?; 237 238 // Connect to database 239 let pool = PgPool::connect(&database_url) 240 .await 241 .context("Failed to connect to database")?; 242 243 // Setup S3 client 244 let region_name = env::var("S3_REGION")?; 245 let endpoint = env::var("S3_ENDPOINT")?; 246 let region = Region::Custom { 247 region: region_name, 248 endpoint, 249 }; 250 let bucket = Bucket::new( 251 env::var("S3_BUCKET_NAME")?.as_str(), 252 region, 253 Credentials::new( 254 Some(env::var("S3_ACCESS_KEY")?.as_str()), 255 Some(env::var("S3_SECRET_KEY")?.as_str()), 256 None, 257 None, 258 None, 259 )?, 260 )?; 261 262 // Call the verify_backups function 263 match shared::jobs::verify_backups::verify_backups(&pool, &bucket).await { 264 Ok(missing_blobs) => { 265 if missing_blobs.is_empty() { 266 log::info!("✓ All backups verified successfully! No missing blobs found."); 267 } else { 268 log::error!("✗ Found {} missing blobs:", missing_blobs.len()); 269 for missing in &missing_blobs { 270 println!( 271 "Missing: DID={}, CID/REV={}, TYPE={:?}, PATH={}", 272 missing.did, missing.cid_or_rev, missing.blob_type, missing.s3_path 273 ); 274 } 275 } 276 } 277 Err(err) => { 278 log::error!("Failed to verify backups: {}", err); 279 } 280 } 281 } 282 } 283 284 Ok(()) 285}