Client-side atproto account migrator that runs in your web browser, along with services for backups and adversarial migrations.
pdsmoover.com
pds
atproto
migrations
moo
cow
1use anyhow::{Context, anyhow};
2use base64::Engine;
3use clap::{Parser, Subcommand};
4use dotenvy::dotenv;
5use jacquard::url;
6use jacquard_common::xrpc::XrpcExt;
7use lexicon_types_crate::com_pdsmoover::admin::request_instance_backup::RequestInstanceBackup;
8use lexicon_types_crate::com_pdsmoover::admin::request_pds_backup::RequestPdsBackup;
9use lexicon_types_crate::com_pdsmoover::admin::request_repo_backup::RequestRepoBackup;
10use lexicon_types_crate::com_pdsmoover::admin::sign_up_pds::SignUpPds;
11use log;
12use reqwest::header;
13use reqwest::header::{HeaderMap, HeaderValue};
14use s3::creds::Credentials;
15use s3::{Bucket, Region};
16use sqlx::PgPool;
17use std::env;
18
19fn init_logging() {
20 // Load .env if present
21 let _ = dotenv();
22
23 // Initialize env_logger with default filter if RUST_LOG is not set
24 let env = env_logger::Env::default().filter_or("RUST_LOG", "info");
25 let _ = env_logger::Builder::from_env(env).try_init();
26}
27
28/// Admin CLI for pds_moover
29#[derive(Debug, Parser)]
30#[command(name = "admin_cli", version, about = "Administrative CLI for pds_moover", long_about = None)]
31struct Cli {
32 /// Admin password (optional). If not provided, the program will read the `admin_password` environment variable.
33 #[arg(short = 'p', long = "admin-password", global = true)]
34 admin_password: Option<String>,
35
36 /// PDS MOOver endpoint (optional). If not provided, the program will call the default endpoint at https://pdsmoover.com.
37 #[arg(short = 'm', long = "moover-host", global = true)]
38 pds_moover_host: Option<String>,
39
40 #[command(subcommand)]
41 command: Commands,
42}
43
44#[derive(Debug, Subcommand)]
45enum Commands {
46 /// PDS related administrative actions
47 Pds {
48 #[command(subcommand)]
49 action: PdsAction,
50 },
51 /// Repo related administrative actions
52 Repo {
53 #[command(subcommand)]
54 action: RepoAction,
55 },
56 /// Trigger an instance-wide backup job (no parameters)
57 RequestInstanceBackup,
58 /// Verify all backups in S3 against the database
59 VerifyBackups,
60}
61
62#[derive(Debug, Subcommand)]
63enum PdsAction {
64 /// Sign up a PDS by hostname
65 Signup {
66 /// Hostname of the PDS to sign up
67 hostname: String,
68 },
69 /// Request a backup for a PDS by hostname
70 RequestBackup {
71 /// Hostname of the PDS to back up
72 hostname: String,
73 },
74
75 /// Remove a PDS by hostname (not yet implemented server-side)
76 Remove {
77 /// Hostname of the PDS to remove
78 hostname: String,
79 },
80}
81
82#[derive(Debug, Subcommand)]
83enum RepoAction {
84 /// Request a backup for a specific repo DID
85 RequestBackup {
86 /// DID of the repo to back up
87 did: String,
88 },
89}
90
91fn resolve_admin_password(opt: &Option<String>) -> anyhow::Result<String> {
92 if let Some(pw) = opt.as_ref() {
93 return Ok(pw.clone());
94 }
95 match env::var("ADMIN_PASSWORD") {
96 Ok(val) if !val.is_empty() => Ok(val),
97 _ => Err(anyhow!(
98 "Admin password not provided. Pass --admin-password or set env var ADMIN_PASSWORD"
99 )),
100 }
101}
102
103fn build_basic_auth_header(admin_password: &str) -> HeaderValue {
104 // Build Basic base64("admin:<password>") per temporary spec
105 let creds = format!("admin:{}", admin_password);
106 let encoded = base64::engine::general_purpose::STANDARD.encode(creds.as_bytes());
107 let value = format!("Basic {}", encoded);
108 // Safe unwrap: constructing from known ASCII
109 HeaderValue::from_str(&value).expect("valid basic auth header")
110}
111
112#[tokio::main]
113async fn main() -> anyhow::Result<()> {
114 init_logging();
115
116 let cli = Cli::parse();
117 let admin_password =
118 resolve_admin_password(&cli.admin_password).context("failed to resolve admin password")?;
119
120 let mut headers = HeaderMap::new();
121 headers.insert(
122 header::AUTHORIZATION,
123 build_basic_auth_header(&admin_password),
124 );
125
126 let http = reqwest::Client::builder()
127 .default_headers(headers)
128 .user_agent("PDS MOOver Admin cli/0.0.1")
129 .build()?;
130
131 let base = url::Url::parse(
132 cli.pds_moover_host
133 .as_deref()
134 // TODO: change this away from dev in prod
135 .unwrap_or("https://pdsmoover.com"),
136 )?;
137
138 match cli.command {
139 Commands::Pds { action } => match action {
140 PdsAction::Signup { hostname } => {
141 log::info!("Signing up PDS");
142
143 let req = SignUpPds {
144 hostname: hostname.clone().into(),
145 extra_data: Default::default(),
146 };
147 // Send the typed XRPC request
148 match http.xrpc(base.clone()).send(&req).await {
149 Ok(result) => {
150 if result.status().is_success() {
151 log::info!("Sign up request sent successfully for: {}", hostname);
152 } else {
153 let error = result.parse().unwrap_err();
154 log::error!("Sign up request failed: {}", error);
155 }
156 }
157 Err(err) => {
158 log::error!("Sign up request failed: {}", err);
159 }
160 }
161 }
162 PdsAction::RequestBackup { hostname } => {
163 log::info!("Requesting PDS backup for host: {}", hostname);
164 let req = RequestPdsBackup {
165 hostname: hostname.clone().into(),
166 extra_data: Default::default(),
167 };
168
169 match http.xrpc(base.clone()).send(&req).await {
170 Ok(result) => {
171 if result.status().is_success() {
172 log::info!(
173 "PDS backup request enqueued successfully for: {}",
174 hostname
175 );
176 } else {
177 let error = result.parse().unwrap_err();
178 log::error!("PDS backup request failed: {}", error);
179 }
180 }
181 Err(err) => {
182 log::error!("PDS backup request failed: {}", err);
183 }
184 }
185 }
186 PdsAction::Remove { hostname: _ } => {
187 log::info!("Removing PDS (not implemented yet)");
188 // TODO: Implement call to backend API for removal when endpoint is available
189 }
190 },
191 Commands::Repo { action } => match action {
192 RepoAction::RequestBackup { did } => {
193 log::info!("Requesting repo backup for DID: {}", did);
194 let req = RequestRepoBackup {
195 did: did.clone().into(),
196 extra_data: Default::default(),
197 };
198 match http.xrpc(base).send(&req).await {
199 Ok(result) => {
200 if result.status().is_success() {
201 log::info!("Repo backup request enqueued successfully for: {}", did);
202 } else {
203 let error = result.parse().unwrap_err();
204 log::error!("Repo backup request failed: {}", error);
205 }
206 }
207 Err(err) => {
208 log::error!("Repo backup request failed: {}", err);
209 }
210 }
211 }
212 },
213 Commands::RequestInstanceBackup => {
214 log::info!("Requesting instance-wide backup start");
215 let req = RequestInstanceBackup;
216 match http.xrpc(base.clone()).send(&req).await {
217 Ok(result) => {
218 if result.status().is_success() {
219 log::info!("Instance backup start enqueued successfully");
220 } else {
221 let error = result.parse().unwrap_err();
222 log::error!("Instance backup request failed: {}", error);
223 }
224 }
225 Err(err) => {
226 log::error!("Instance backup request failed: {}", err);
227 }
228 }
229 }
230 Commands::VerifyBackups => {
231 //Not really a part of the cli per say. But I needed it and is a good place as any
232 log::info!("Verifying backups in S3...");
233
234 // Get database URL from environment
235 let database_url =
236 env::var("DATABASE_URL").context("DATABASE_URL environment variable not set")?;
237
238 // Connect to database
239 let pool = PgPool::connect(&database_url)
240 .await
241 .context("Failed to connect to database")?;
242
243 // Setup S3 client
244 let region_name = env::var("S3_REGION")?;
245 let endpoint = env::var("S3_ENDPOINT")?;
246 let region = Region::Custom {
247 region: region_name,
248 endpoint,
249 };
250 let bucket = Bucket::new(
251 env::var("S3_BUCKET_NAME")?.as_str(),
252 region,
253 Credentials::new(
254 Some(env::var("S3_ACCESS_KEY")?.as_str()),
255 Some(env::var("S3_SECRET_KEY")?.as_str()),
256 None,
257 None,
258 None,
259 )?,
260 )?;
261
262 // Call the verify_backups function
263 match shared::jobs::verify_backups::verify_backups(&pool, &bucket).await {
264 Ok(missing_blobs) => {
265 if missing_blobs.is_empty() {
266 log::info!("✓ All backups verified successfully! No missing blobs found.");
267 } else {
268 log::error!("✗ Found {} missing blobs:", missing_blobs.len());
269 for missing in &missing_blobs {
270 println!(
271 "Missing: DID={}, CID/REV={}, TYPE={:?}, PATH={}",
272 missing.did, missing.cid_or_rev, missing.blob_type, missing.s3_path
273 );
274 }
275 }
276 }
277 Err(err) => {
278 log::error!("Failed to verify backups: {}", err);
279 }
280 }
281 }
282 }
283
284 Ok(())
285}