Alternative ATProto PDS implementation

prototype actor_store preference transactor

Changed files
+106 -4
src
actor_store
preference
+106 -4
src/actor_store/preference/transactor.rs
··· 1 - //! Transactor for preference operations. 1 + // src/actor_store/preference/transactor.rs 2 + use anyhow::{Context as _, Result, bail}; 3 + use sqlx::SqlitePool; 2 4 3 - 4 - use super::reader::PreferenceReader; 5 + use super::reader::{AccountPreference, PreferenceReader, pref_in_scope, pref_match_namespace}; 5 6 6 7 /// Transactor for preference operations. 7 - pub(crate) struct PreferenceTransactor { 8 + pub struct PreferenceTransactor { 8 9 /// Preference reader. 9 10 pub reader: PreferenceReader, 10 11 } 12 + 13 + impl PreferenceTransactor { 14 + /// Create a new preference transactor. 15 + pub fn new(db: SqlitePool, did: String) -> Self { 16 + Self { 17 + reader: PreferenceReader::new(db, did), 18 + } 19 + } 20 + 21 + /// Put preferences for a namespace. 22 + pub async fn put_preferences( 23 + &self, 24 + values: Vec<AccountPreference>, 25 + namespace: &str, 26 + scope: &str, 27 + ) -> Result<()> { 28 + // Validate all preferences match the namespace 29 + if !values 30 + .iter() 31 + .all(|value| pref_match_namespace(namespace, &value.r#type)) 32 + { 33 + bail!("Some preferences are not in the {} namespace", namespace); 34 + } 35 + 36 + // Validate scope permissions 37 + let not_in_scope = values 38 + .iter() 39 + .filter(|val| !pref_in_scope(scope, &val.r#type)) 40 + .collect::<Vec<_>>(); 41 + 42 + if !not_in_scope.is_empty() { 43 + bail!("Do not have authorization to set preferences"); 44 + } 45 + 46 + // Get current preferences 47 + let mut tx = self 48 + .reader 49 + .db 50 + .begin() 51 + .await 52 + .context("failed to begin transaction")?; 53 + 54 + // Find all preferences in the namespace 55 + let namespace_pattern = format!("{}%", namespace); 56 + let all_prefs = sqlx::query!( 57 + "SELECT id, name FROM account_pref WHERE name LIKE ? OR name = ?", 58 + namespace_pattern, 59 + namespace 60 + ) 61 + .fetch_all(&mut *tx) 62 + .await 63 + .context("failed to fetch preferences")?; 64 + 65 + // Filter to those in scope 66 + let all_pref_ids_in_namespace = all_prefs 67 + .iter() 68 + .filter(|pref| pref_match_namespace(namespace, &pref.name)) 69 + .filter(|pref| pref_in_scope(scope, &pref.name)) 70 + .map(|pref| pref.id) 71 + .collect::<Vec<i64>>(); 72 + 73 + // Delete existing preferences in namespace 74 + if !all_pref_ids_in_namespace.is_empty() { 75 + let placeholders = std::iter::repeat("?") 76 + .take(all_pref_ids_in_namespace.len()) 77 + .collect::<Vec<_>>() 78 + .join(","); 79 + 80 + let query = format!("DELETE FROM account_pref WHERE id IN ({})", placeholders); 81 + 82 + let mut query_builder = sqlx::query(&query); 83 + for id in &all_pref_ids_in_namespace { 84 + query_builder = query_builder.bind(id); 85 + } 86 + 87 + query_builder 88 + .execute(&mut *tx) 89 + .await 90 + .context("failed to delete preferences")?; 91 + } 92 + 93 + // Insert new preferences 94 + if !values.is_empty() { 95 + for pref in values { 96 + let value_json = serde_json::to_string(&pref.value)?; 97 + sqlx::query!( 98 + "INSERT INTO account_pref (name, valueJson) VALUES (?, ?)", 99 + pref.r#type, 100 + value_json 101 + ) 102 + .execute(&mut *tx) 103 + .await 104 + .context("failed to insert preference")?; 105 + } 106 + } 107 + 108 + tx.commit().await.context("failed to commit transaction")?; 109 + 110 + Ok(()) 111 + } 112 + }