Client-side atproto account migrator that runs in your web browser, along with services for backups and adversarial migrations. pdsmoover.com
pds atproto migrations moo cow
at bugfix/DetectNewPds 275 lines 9.7 kB view raw
1use crate::db::models::{AccountModel, PdsHostModel}; 2use anyhow::Result; 3use chrono::{DateTime, Duration, Utc}; 4use serde::{Deserialize, Serialize}; 5use sqlx::migrate::Migrator; 6use sqlx::{FromRow, PgPool, postgres::PgPoolOptions}; 7use std::primitive::bool; 8 9static MIGRATOR: Migrator = sqlx::migrate!(); // defaults to "./ 10 11pub mod models; 12#[derive(Clone)] 13pub struct Db { 14 pool: PgPool, 15} 16 17impl Db { 18 pub fn new(pool: &PgPool) -> Self { 19 Db { pool: pool.clone() } 20 } 21 22 pub async fn connect(database_url: &str) -> Result<Self> { 23 let pool = PgPoolOptions::new() 24 .max_connections(5) 25 .connect(database_url) 26 .await?; 27 Ok(Self { pool }) 28 } 29 30 pub fn pool(&self) -> &PgPool { 31 &self.pool 32 } 33 34 pub async fn apply_migrations(&self) -> Result<()> { 35 // The path is relative to this crate's Cargo.toml 36 sqlx::migrate!().run(&self.pool).await?; 37 Ok(()) 38 } 39 40 pub async fn sign_up_new_account(&self, did: String, pds_host: String) -> Result<AccountModel> { 41 Ok(sqlx::query_as::<_, AccountModel>( 42 "INSERT INTO accounts (did, pds_host) VALUES ($1, $2) RETURNING *", 43 ) 44 .bind(did) 45 .bind(pds_host) 46 .fetch_one(&self.pool) 47 .await?) 
48 } 49 50 pub async fn update_account_pds_host(&self, did: &str, pds_host: &str) -> Result<u64> { 51 let result = 52 sqlx::query("UPDATE accounts SET pds_host = $2 WHERE did = $1 AND pds_host <> $2") 53 .bind(did) 54 .bind(pds_host) 55 .execute(&self.pool) 56 .await?; 57 Ok(result.rows_affected()) 58 } 59 60 pub async fn delete_blobs_by_did(&self, did: &str) -> Result<u64> { 61 let result = sqlx::query("DELETE FROM blobs WHERE account_did = $1") 62 .bind(did) 63 .execute(&self.pool) 64 .await?; 65 Ok(result.rows_affected()) 66 } 67 68 pub async fn delete_missing_blobs_by_did(&self, did: &str) -> Result<u64> { 69 // Table referenced in get_repo_status; delete any rows for this DID if present 70 let result = sqlx::query("DELETE FROM missing_blobs WHERE did = $1") 71 .bind(did) 72 .execute(&self.pool) 73 .await?; 74 Ok(result.rows_affected()) 75 } 76 77 pub async fn delete_account_by_did(&self, did: &str) -> Result<u64> { 78 let result = sqlx::query("DELETE FROM accounts WHERE did = $1") 79 .bind(did) 80 .execute(&self.pool) 81 .await?; 82 Ok(result.rows_affected()) 83 } 84 85 /// Returns account and aggregate repo backup information for the given DID. 86 /// 87 /// This is intended to provide the data needed by the 88 /// com.pdsmoover.backup.getRepoStatus endpoint handler. 
89 pub async fn get_repo_status(&self, did: &str) -> Result<Option<RepoStatusRow>> { 90 let row = sqlx::query_as::<_, RepoStatusRow>( 91 r#" 92 SELECT 93 a.active AS active, 94 a.created_at AS created_at, 95 a.did AS did, 96 a.repo_rev AS repo_rev, 97 a.last_backup AS last_backup, 98 a.pds_host AS pds_host, 99 SUM(b.size)::BIGINT AS estimated_backup_size, 100 COUNT(DISTINCT b.id) AS blob_count, 101 COUNT(DISTINCT mb.id) AS missing_blob_count 102 FROM accounts a 103 LEFT JOIN blobs b ON b.account_did = a.did 104 LEFT JOIN missing_blobs mb ON mb.did = a.did 105 WHERE a.did = $1 106 GROUP BY a.active, a.created_at, a.did, a.repo_rev, a.last_backup, a.pds_host 107 "#, 108 ) 109 .bind(did) 110 .fetch_optional(&self.pool) 111 .await?; 112 113 Ok(row) 114 } 115 116 pub async fn is_user_already_registered(&self, did: &str) -> Result<bool> { 117 let active = sqlx::query_scalar::<_, bool>( 118 "SELECT active FROM accounts WHERE did = $1 AND active = TRUE", 119 ) 120 .bind(did) 121 .fetch_optional(&self.pool) 122 .await?; 123 124 match active { 125 None => Ok(false), 126 Some(active) => Ok(active), 127 } 128 } 129 130 pub async fn sign_up_new_pds( 131 &self, 132 pds_host: &str, 133 admin_did: Option<&str>, 134 ) -> Result<PdsHostModel> { 135 Ok(sqlx::query_as::<_, PdsHostModel>( 136 "INSERT INTO pds_hosts (pds_host, admin_did) 137 VALUES ($1, $2) 138 ON CONFLICT (pds_host) DO UPDATE 139 SET active = TRUE, 140 admin_did = COALESCE(EXCLUDED.admin_did, pds_hosts.admin_did) 141 RETURNING *", 142 ) 143 .bind(pds_host) 144 .bind(admin_did) 145 .fetch_one(&self.pool) 146 .await?) 
147 } 148 149 pub async fn is_pds_active(&self, pds_host: &str) -> Result<bool> { 150 let active = sqlx::query_scalar::<_, bool>( 151 "SELECT active FROM pds_hosts WHERE pds_host = $1 AND active = TRUE", 152 ) 153 .bind(pds_host) 154 .fetch_optional(&self.pool) 155 .await?; 156 157 Ok(active.unwrap_or(false)) 158 } 159 pub async fn describe_server(&self) -> Result<DescribeServerRow> { 160 // Aggregate server-wide stats. 161 // Note: next_backup_due_at scheduling logic not defined; return NULL. 162 let row = sqlx::query_as::<_, DescribeServerRow>( 163 r#" 164 SELECT 165 -- Total number of active repos/accounts 166 (SELECT COUNT(*)::BIGINT FROM accounts a WHERE a.active = TRUE) AS total_repos, 167 -- Total number of blobs tracked 168 COALESCE((SELECT COUNT(*)::BIGINT FROM blobs), 0) AS total_blobs, 169 -- Estimated blobs size on disk (sum of all blob sizes) 170 COALESCE((SELECT SUM(size)::BIGINT FROM blobs), 0) AS estimated_blobs_size_on_disk, 171 -- Estimated repos size (sum of sizes for repo/prefs types) 172 COALESCE((SELECT SUM(size)::BIGINT FROM blobs WHERE type IN ('repo','prefs')), 0) AS estimated_repos_size_on_disk, 173 -- Last backup time across all accounts 174 (SELECT MAX(started_at) FROM network_backup_runs) AS last_backup_at, 175 -- Placeholder for next backup due at (unknown scheduling) 176 NULL::TIMESTAMPTZ AS next_backup_due_at 177 "#, 178 ) 179 .fetch_one(&self.pool) 180 .await?; 181 182 Ok(row) 183 } 184 185 /// Attempt to mark the start of a manual backup for the given DID. 186 /// Returns whether the account exists, whether it is too soon since the last start, 187 /// or whether we have successfully marked the start time. 
188 pub async fn try_start_manual_backup( 189 &self, 190 did: &str, 191 min_interval: Duration, 192 ) -> Result<ManualBackupStartOutcome> { 193 // Define a wrapper struct for DateTime<Utc> 194 #[derive(FromRow)] 195 struct LastBackupRow { 196 last_backup_started: Option<DateTime<Utc>>, 197 } 198 199 // Fetch last_backup_started for this account 200 let rec = sqlx::query_as::<_, LastBackupRow>( 201 r#"SELECT last_backup_started FROM accounts WHERE did = $1"#, 202 ) 203 .bind(did) 204 .fetch_optional(&self.pool) 205 .await?; 206 207 let Some(row) = rec else { 208 return Ok(ManualBackupStartOutcome::NotFound); 209 }; 210 211 // If a backup was started recently, don't start another 212 if let Some(last_started) = row.last_backup_started { 213 let threshold = chrono::Utc::now() - min_interval; 214 if last_started > threshold { 215 return Ok(ManualBackupStartOutcome::TooSoon { 216 last_backup_started: last_started, 217 }); 218 } 219 } 220 221 // Update last_backup_started to NOW() to rate-limit subsequent calls 222 let res = sqlx::query!( 223 r#"UPDATE accounts SET last_backup_started = NOW() WHERE did = $1"#, 224 did 225 ) 226 .execute(&self.pool) 227 .await?; 228 229 if res.rows_affected() == 0 { 230 // Extremely unlikely race: row deleted after read 231 return Ok(ManualBackupStartOutcome::NotFound); 232 } 233 234 Ok(ManualBackupStartOutcome::Started) 235 } 236} 237 238#[derive(Debug, Clone)] 239pub enum ManualBackupStartOutcome { 240 NotFound, 241 TooSoon { last_backup_started: DateTime<Utc> }, 242 Started, 243} 244 245#[derive(Debug, Serialize, Deserialize, FromRow)] 246pub struct DemoItem { 247 pub id: i64, 248 pub text: String, 249 pub created_at: DateTime<Utc>, 250} 251 252/// Row returned by get_repo_status, containing account details and 253/// aggregated blob info for the repo backup. 
#[derive(Debug, Serialize, Deserialize, FromRow, Clone)]
pub struct RepoStatusRow {
    /// The account's `accounts.active` flag.
    pub active: bool,
    /// The account's `accounts.created_at` timestamp.
    pub created_at: DateTime<Utc>,
    /// The account's DID.
    pub did: String,
    /// The account's `accounts.repo_rev`, if one has been recorded.
    pub repo_rev: Option<String>,
    /// The account's `accounts.last_backup`, if one has been recorded.
    pub last_backup: Option<DateTime<Utc>>,
    /// The PDS host currently recorded for the account.
    pub pds_host: String,
    /// Sum of `blobs.size` for this account; `None` when the account has no blobs
    /// (SQL `SUM` over no values is NULL).
    pub estimated_backup_size: Option<i64>,
    /// Count of distinct `blobs.id` for this account. SQL `COUNT` never yields NULL,
    /// so `get_repo_status` always fills this despite the `Option`.
    pub blob_count: Option<i64>,
    /// Count of distinct `missing_blobs.id` for this account (same `COUNT` note as
    /// `blob_count`).
    pub missing_blob_count: Option<i64>,
}

/// Server-wide aggregate stats returned by `Db::describe_server`.
#[derive(Debug, Serialize, Deserialize, FromRow, Clone)]
pub struct DescribeServerRow {
    /// Number of accounts with `active = TRUE`.
    pub total_repos: i64,
    /// Number of rows in `blobs`.
    pub total_blobs: i64,
    /// Sum of `size` over all `blobs` rows (0 when empty).
    pub estimated_blobs_size_on_disk: i64,
    /// Sum of `size` over `blobs` rows whose `type` is `'repo'` or `'prefs'` (0 when none).
    pub estimated_repos_size_on_disk: i64,
    /// Latest `started_at` in `network_backup_runs`, or `None` if there are no runs.
    pub last_backup_at: Option<DateTime<Utc>>,
    /// Placeholder: `describe_server` currently always selects NULL here.
    pub next_backup_due_at: Option<DateTime<Utc>>,
}