Client side atproto account migrator in your web browser, along with services for backups and adversarial migrations.
pdsmoover.com
pds
atproto
migrations
moo
cow
1use crate::db::models::{AccountModel, PdsHostModel};
2use anyhow::Result;
3use chrono::{DateTime, Duration, Utc};
4use serde::{Deserialize, Serialize};
5use sqlx::migrate::Migrator;
6use sqlx::{FromRow, PgPool, postgres::PgPoolOptions};
7use std::primitive::bool;
8
9static MIGRATOR: Migrator = sqlx::migrate!(); // defaults to "./
10
11pub mod models;
12#[derive(Clone)]
13pub struct Db {
14 pool: PgPool,
15}
16
17impl Db {
18 pub fn new(pool: &PgPool) -> Self {
19 Db { pool: pool.clone() }
20 }
21
22 pub async fn connect(database_url: &str) -> Result<Self> {
23 let pool = PgPoolOptions::new()
24 .max_connections(5)
25 .connect(database_url)
26 .await?;
27 Ok(Self { pool })
28 }
29
30 pub fn pool(&self) -> &PgPool {
31 &self.pool
32 }
33
34 pub async fn apply_migrations(&self) -> Result<()> {
35 // The path is relative to this crate's Cargo.toml
36 sqlx::migrate!().run(&self.pool).await?;
37 Ok(())
38 }
39
40 pub async fn sign_up_new_account(&self, did: String, pds_host: String) -> Result<AccountModel> {
41 Ok(sqlx::query_as::<_, AccountModel>(
42 "INSERT INTO accounts (did, pds_host) VALUES ($1, $2) RETURNING *",
43 )
44 .bind(did)
45 .bind(pds_host)
46 .fetch_one(&self.pool)
47 .await?)
48 }
49
50 pub async fn update_account_pds_host(&self, did: &str, pds_host: &str) -> Result<u64> {
51 let result =
52 sqlx::query("UPDATE accounts SET pds_host = $2 WHERE did = $1 AND pds_host <> $2")
53 .bind(did)
54 .bind(pds_host)
55 .execute(&self.pool)
56 .await?;
57 Ok(result.rows_affected())
58 }
59
60 pub async fn delete_blobs_by_did(&self, did: &str) -> Result<u64> {
61 let result = sqlx::query("DELETE FROM blobs WHERE account_did = $1")
62 .bind(did)
63 .execute(&self.pool)
64 .await?;
65 Ok(result.rows_affected())
66 }
67
68 pub async fn delete_missing_blobs_by_did(&self, did: &str) -> Result<u64> {
69 // Table referenced in get_repo_status; delete any rows for this DID if present
70 let result = sqlx::query("DELETE FROM missing_blobs WHERE did = $1")
71 .bind(did)
72 .execute(&self.pool)
73 .await?;
74 Ok(result.rows_affected())
75 }
76
77 pub async fn delete_account_by_did(&self, did: &str) -> Result<u64> {
78 let result = sqlx::query("DELETE FROM accounts WHERE did = $1")
79 .bind(did)
80 .execute(&self.pool)
81 .await?;
82 Ok(result.rows_affected())
83 }
84
85 /// Returns account and aggregate repo backup information for the given DID.
86 ///
87 /// This is intended to provide the data needed by the
88 /// com.pdsmoover.backup.getRepoStatus endpoint handler.
89 pub async fn get_repo_status(&self, did: &str) -> Result<Option<RepoStatusRow>> {
90 let row = sqlx::query_as::<_, RepoStatusRow>(
91 r#"
92 SELECT
93 a.active AS active,
94 a.created_at AS created_at,
95 a.did AS did,
96 a.repo_rev AS repo_rev,
97 a.last_backup AS last_backup,
98 a.pds_host AS pds_host,
99 SUM(b.size)::BIGINT AS estimated_backup_size,
100 COUNT(DISTINCT b.id) AS blob_count,
101 COUNT(DISTINCT mb.id) AS missing_blob_count
102 FROM accounts a
103 LEFT JOIN blobs b ON b.account_did = a.did
104 LEFT JOIN missing_blobs mb ON mb.did = a.did
105 WHERE a.did = $1
106 GROUP BY a.active, a.created_at, a.did, a.repo_rev, a.last_backup, a.pds_host
107 "#,
108 )
109 .bind(did)
110 .fetch_optional(&self.pool)
111 .await?;
112
113 Ok(row)
114 }
115
116 pub async fn is_user_already_registered(&self, did: &str) -> Result<bool> {
117 let active = sqlx::query_scalar::<_, bool>(
118 "SELECT active FROM accounts WHERE did = $1 AND active = TRUE",
119 )
120 .bind(did)
121 .fetch_optional(&self.pool)
122 .await?;
123
124 match active {
125 None => Ok(false),
126 Some(active) => Ok(active),
127 }
128 }
129
130 pub async fn sign_up_new_pds(
131 &self,
132 pds_host: &str,
133 admin_did: Option<&str>,
134 ) -> Result<PdsHostModel> {
135 Ok(sqlx::query_as::<_, PdsHostModel>(
136 "INSERT INTO pds_hosts (pds_host, admin_did)
137 VALUES ($1, $2)
138 ON CONFLICT (pds_host) DO UPDATE
139 SET active = TRUE,
140 admin_did = COALESCE(EXCLUDED.admin_did, pds_hosts.admin_did)
141 RETURNING *",
142 )
143 .bind(pds_host)
144 .bind(admin_did)
145 .fetch_one(&self.pool)
146 .await?)
147 }
148
149 pub async fn is_pds_active(&self, pds_host: &str) -> Result<bool> {
150 let active = sqlx::query_scalar::<_, bool>(
151 "SELECT active FROM pds_hosts WHERE pds_host = $1 AND active = TRUE",
152 )
153 .bind(pds_host)
154 .fetch_optional(&self.pool)
155 .await?;
156
157 Ok(active.unwrap_or(false))
158 }
159 pub async fn describe_server(&self) -> Result<DescribeServerRow> {
160 // Aggregate server-wide stats.
161 // Note: next_backup_due_at scheduling logic not defined; return NULL.
162 let row = sqlx::query_as::<_, DescribeServerRow>(
163 r#"
164 SELECT
165 -- Total number of active repos/accounts
166 (SELECT COUNT(*)::BIGINT FROM accounts a WHERE a.active = TRUE) AS total_repos,
167 -- Total number of blobs tracked
168 COALESCE((SELECT COUNT(*)::BIGINT FROM blobs), 0) AS total_blobs,
169 -- Estimated blobs size on disk (sum of all blob sizes)
170 COALESCE((SELECT SUM(size)::BIGINT FROM blobs), 0) AS estimated_blobs_size_on_disk,
171 -- Estimated repos size (sum of sizes for repo/prefs types)
172 COALESCE((SELECT SUM(size)::BIGINT FROM blobs WHERE type IN ('repo','prefs')), 0) AS estimated_repos_size_on_disk,
173 -- Last backup time across all accounts
174 (SELECT MAX(started_at) FROM network_backup_runs) AS last_backup_at,
175 -- Placeholder for next backup due at (unknown scheduling)
176 NULL::TIMESTAMPTZ AS next_backup_due_at
177 "#,
178 )
179 .fetch_one(&self.pool)
180 .await?;
181
182 Ok(row)
183 }
184
185 /// Attempt to mark the start of a manual backup for the given DID.
186 /// Returns whether the account exists, whether it is too soon since the last start,
187 /// or whether we have successfully marked the start time.
188 pub async fn try_start_manual_backup(
189 &self,
190 did: &str,
191 min_interval: Duration,
192 ) -> Result<ManualBackupStartOutcome> {
193 // Define a wrapper struct for DateTime<Utc>
194 #[derive(FromRow)]
195 struct LastBackupRow {
196 last_backup_started: Option<DateTime<Utc>>,
197 }
198
199 // Fetch last_backup_started for this account
200 let rec = sqlx::query_as::<_, LastBackupRow>(
201 r#"SELECT last_backup_started FROM accounts WHERE did = $1"#,
202 )
203 .bind(did)
204 .fetch_optional(&self.pool)
205 .await?;
206
207 let Some(row) = rec else {
208 return Ok(ManualBackupStartOutcome::NotFound);
209 };
210
211 // If a backup was started recently, don't start another
212 if let Some(last_started) = row.last_backup_started {
213 let threshold = chrono::Utc::now() - min_interval;
214 if last_started > threshold {
215 return Ok(ManualBackupStartOutcome::TooSoon {
216 last_backup_started: last_started,
217 });
218 }
219 }
220
221 // Update last_backup_started to NOW() to rate-limit subsequent calls
222 let res = sqlx::query!(
223 r#"UPDATE accounts SET last_backup_started = NOW() WHERE did = $1"#,
224 did
225 )
226 .execute(&self.pool)
227 .await?;
228
229 if res.rows_affected() == 0 {
230 // Extremely unlikely race: row deleted after read
231 return Ok(ManualBackupStartOutcome::NotFound);
232 }
233
234 Ok(ManualBackupStartOutcome::Started)
235 }
236}
237
238#[derive(Debug, Clone)]
239pub enum ManualBackupStartOutcome {
240 NotFound,
241 TooSoon { last_backup_started: DateTime<Utc> },
242 Started,
243}
244
245#[derive(Debug, Serialize, Deserialize, FromRow)]
246pub struct DemoItem {
247 pub id: i64,
248 pub text: String,
249 pub created_at: DateTime<Utc>,
250}
251
252/// Row returned by get_repo_status, containing account details and
253/// aggregated blob info for the repo backup.
254#[derive(Debug, Serialize, Deserialize, FromRow, Clone)]
255pub struct RepoStatusRow {
256 pub active: bool,
257 pub created_at: DateTime<Utc>,
258 pub did: String,
259 pub repo_rev: Option<String>,
260 pub last_backup: Option<DateTime<Utc>>,
261 pub pds_host: String,
262 pub estimated_backup_size: Option<i64>,
263 pub blob_count: Option<i64>,
264 pub missing_blob_count: Option<i64>,
265}
266
267#[derive(Debug, Serialize, Deserialize, FromRow, Clone)]
268pub struct DescribeServerRow {
269 pub total_repos: i64,
270 pub total_blobs: i64,
271 pub estimated_blobs_size_on_disk: i64,
272 pub estimated_repos_size_on_disk: i64,
273 pub last_backup_at: Option<DateTime<Utc>>,
274 pub next_backup_due_at: Option<DateTime<Utc>>,
275}