//! Forked from <https://baileytownsend.dev/pds-moover>.
//!
//! Client-side atproto account migrator in your web browser, along with
//! services for backups and adversarial migrations.
1pub mod account_backup;
2pub mod pds_backup;
3pub mod remove_repo;
4pub mod scheduled_back_up_start;
5pub mod start_all_backup;
6pub mod upload_blob;
7pub mod verify_backups;
8
9use crate::db::models;
10use crate::db::models::BlobModel;
11use apalis::prelude::*;
12use log::info;
13use serde::{Deserialize, Serialize};
14use serde_json::{self};
15use sqlx::{Pool, Postgres, query};
16use std::collections::HashSet;
17use std::fmt;
18use std::sync::Arc;
19
/// Error type surfaced by background jobs in this module.
#[derive(Debug)]
pub enum JobError {
    /// Generic job failure carrying a static description.
    SomeError(&'static str),
}

impl std::fmt::Display for JobError {
    fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result {
        // Reuse the derived Debug rendering as the human-readable message,
        // so the variant name and payload appear verbatim.
        write!(f, "{:?}", self)
    }
}
30
// Create a wrapper struct that implements std::error::Error, so an
// `anyhow::Error` can be handed to APIs that expect a `std::error::Error`
// trait object (e.g. boxed error payloads).
// NOTE(review): not referenced anywhere in this chunk — confirm it is used
// elsewhere in the crate before removing.
#[derive(Debug)]
struct AnyhowErrorWrapper(anyhow::Error);

impl fmt::Display for AnyhowErrorWrapper {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        // Forward to the inner anyhow::Error's Display output.
        write!(f, "{}", self.0)
    }
}

// Marker impl: Display + Debug above satisfy the trait's requirements.
impl std::error::Error for AnyhowErrorWrapper {}
42
43/// Generic helper to manually enqueue any Apalis job by calling the SQL function directly.
44///
45/// - job_namespace: fully-qualified job type name as stored by Apalis (e.g., "apalis::Email").
46/// - payload: any struct that implements Serialize + Deserialize; it will be sent as JSON.
47pub async fn push_job_json<T>(
48 pool: &Pool<Postgres>,
49 job_namespace: &str,
50 payload: &T,
51) -> Result<(), Error>
52where
53 T: Serialize + for<'de> Deserialize<'de>,
54{
55 // Serialize payload to JSON and send to Postgres as json
56 let json_str =
57 serde_json::to_string(payload).map_err(|e| Error::Failed(Arc::new(Box::new(e))))?;
58
59 query("select apalis.push_job($1, $2::json)")
60 .bind(job_namespace)
61 .bind(json_str)
62 .execute(pool)
63 .await
64 .map(|_| ())
65 .map_err(|e| Error::Failed(Arc::new(Box::new(e))))
66}
67
68/// Given a list of CIDs, returns those that are NOT already present in the blobs table
69/// with blob type = 'blob' and matches the user's did in the case of duplicate blobs for each user
70pub async fn filter_missing_blob_cids(
71 pool: &Pool<Postgres>,
72 cids: &Vec<String>,
73 users_did: &String,
74) -> anyhow::Result<Vec<String>> {
75 if cids.is_empty() {
76 return Ok(Vec::new());
77 }
78
79 // Fetch the subset of provided CIDs that already exist as type 'blob'
80 let existing: Vec<String> = sqlx::query_scalar(
81 r#"SELECT cid_or_rev FROM blobs WHERE type = $1 AND cid_or_rev = ANY($2) AND account_did = $3"#,
82 )
83 .bind(crate::db::models::BlobType::Blob)
84 .bind(&cids)
85 .bind(users_did)
86 .fetch_all(pool)
87 .await?;
88
89 let existing_set: HashSet<&str> = existing.iter().map(|s| s.as_str()).collect();
90 let missing: Vec<String> = cids
91 .iter()
92 .filter(|cid| !existing_set.contains(cid.as_str()))
93 .cloned()
94 .collect();
95
96 Ok(missing)
97}
98
99pub async fn record_new_blob(
100 pool: &Pool<Postgres>,
101 did: String,
102 cid_or_rev: String,
103 size: i64,
104 blob_type: models::BlobType,
105) -> anyhow::Result<models::BlobModel> {
106 match blob_type {
107 //On repo we need to upsert on did
108 models::BlobType::Repo => {
109 // First try to update an existing 'repo' blob row for this DID.
110 if let Some(updated) = sqlx::query_as::<_, BlobModel>(
111 r#"
112 UPDATE blobs
113 SET size = $2,
114 type = $3,
115 cid_or_rev = $4
116 WHERE account_did = $1 AND type = $3
117 RETURNING id, created_at, account_did, size, type, cid_or_rev
118 "#,
119 )
120 .bind(&did)
121 .bind(size)
122 .bind(&blob_type)
123 .bind(&cid_or_rev)
124 .fetch_optional(pool)
125 .await?
126 {
127 Ok(updated)
128 } else {
129 // If no row was updated, insert a new one for this DID and repo type.
130 Ok(sqlx::query_as::<_, BlobModel>(
131 r#"
132 INSERT INTO blobs (account_did, size, type, cid_or_rev)
133 VALUES ($1, $2, $3, $4)
134 RETURNING id, created_at, account_did, size, type, cid_or_rev
135 "#,
136 )
137 .bind(did)
138 .bind(size)
139 .bind(blob_type)
140 .bind(cid_or_rev)
141 .fetch_one(pool)
142 .await?)
143 }
144 }
145 //on blob we upsert on cid (shouldnt happen ideally)
146 models::BlobType::Blob | _ => Ok(sqlx::query_as::<_, BlobModel>(
147 r#"
148 INSERT INTO blobs (account_did, size, type, cid_or_rev)
149 VALUES ($1, $2, $3, $4)
150 ON CONFLICT (cid_or_rev) DO UPDATE
151 SET account_did = EXCLUDED.account_did,
152 size = EXCLUDED.size,
153 type = EXCLUDED.type,
154 cid_or_rev = EXCLUDED.cid_or_rev
155 RETURNING id, created_at, account_did, size, type, cid_or_rev
156 "#,
157 )
158 .bind(did)
159 .bind(size)
160 .bind(blob_type)
161 .bind(cid_or_rev)
162 .fetch_one(pool)
163 .await?),
164 }
165}
166
167/// Look up the user's account by DID and return their repo_rev, if present.
168pub async fn get_repo_rev_by_did(
169 pool: &Pool<Postgres>,
170 did: &str,
171) -> anyhow::Result<Option<String>> {
172 // repo_rev is nullable; also the account row may not exist.
173 // Using fetch_optional yields Option<Option<String>> which we flatten to a single Option<String>.
174 let result: Option<Option<String>> =
175 sqlx::query_scalar(r#"SELECT repo_rev FROM accounts WHERE did = $1"#)
176 .bind(did)
177 .fetch_optional(pool)
178 .await?;
179
180 Ok(result.flatten())
181}
182
183/// Update the repo_rev for a given account identified by DID.
184/// Returns true if a row was updated.
185pub async fn update_repo_rev_by_did(
186 pool: &Pool<Postgres>,
187 did: &str,
188 repo_rev: &str,
189) -> anyhow::Result<bool> {
190 let result =
191 sqlx::query(r#"UPDATE accounts SET repo_rev = $2, last_backup = NOW() WHERE did = $1"#)
192 .bind(did)
193 .bind(repo_rev)
194 .execute(pool)
195 .await?;
196 Ok(result.rows_affected() > 0)
197}
198
199/// Update last_backup to the current timestamp for a given account identified by DID.
200/// Returns true if a row was updated.
201pub async fn update_last_backup_now_by_did(
202 pool: &Pool<Postgres>,
203 did: &str,
204) -> anyhow::Result<bool> {
205 let result = sqlx::query(r#"UPDATE accounts SET last_backup = NOW() WHERE did = $1"#)
206 .bind(did)
207 .execute(pool)
208 .await?;
209 Ok(result.rows_affected() > 0)
210}