Alternative ATProto PDS implementation

prototype actor_store preference.rs

Changed files
+108 -145
src
actor_store
+108 -145
src/actor_store/preference.rs
··· 1 1 //! Preference handling for actor store. 2 2 3 - use anyhow::{Context as _, Result, bail}; 4 - use diesel::prelude::*; 5 - use serde::{Deserialize, Serialize}; 6 - use serde_json::Value as JsonValue; 7 - use std::sync::Arc; 3 + use anyhow::{Result, bail}; 4 + use diesel::*; 5 + use rsky_lexicon::app::bsky::actor::RefPreferences; 6 + use rsky_pds::{ 7 + actor_store::preference::{pref_match_namespace, util::pref_in_scope}, 8 + auth_verifier::AuthScope, 9 + models::AccountPref, 10 + }; 8 11 9 12 use crate::actor_store::db::ActorDb; 10 13 11 - /// Constants for preference-related operations 12 - const FULL_ACCESS_ONLY_PREFS: &[&str] = &["app.bsky.actor.defs#personalDetailsPref"]; 13 - 14 - /// User preference with type information. 15 - #[derive(Debug, Clone, Serialize, Deserialize)] 16 - pub(crate) struct AccountPreference { 17 - /// Type of the preference. 18 - pub r#type: String, 19 - /// Preference data as JSON. 20 - pub value: JsonValue, 21 - } 22 - 23 14 /// Handler for preference operations with both read and write capabilities. 24 15 pub(crate) struct PreferenceHandler { 25 16 /// Database connection. ··· 30 21 31 22 impl PreferenceHandler { 32 23 /// Create a new preference handler. 33 - pub(crate) fn new(db: ActorDb, did: String) -> Self { 24 + pub(crate) fn new(did: String, db: ActorDb) -> Self { 34 25 Self { db, did } 35 26 } 36 27 37 28 /// Get preferences for a namespace. 38 - pub(crate) async fn get_preferences( 29 + pub async fn get_preferences( 39 30 &self, 40 - namespace: Option<&str>, 41 - scope: &str, 42 - ) -> Result<Vec<AccountPreference>> { 43 - use rsky_pds::schema::pds::account_pref::dsl::*; 31 + namespace: Option<String>, 32 + scope: AuthScope, 33 + ) -> Result<Vec<RefPreferences>> { 34 + use rsky_pds::schema::pds::account_pref::dsl as AccountPrefSchema; 44 35 45 36 let did = self.did.clone(); 46 - let namespace_clone = namespace.map(|ns| ns.to_string()); 47 - let scope_clone = scope.to_string(); 48 - 49 - let prefs_res = self 50 - .db 37 + self.db 51 38 .run(move |conn| { 52 - let prefs = account_pref 53 - .filter(did.eq(&did)) 54 - .order(id.asc()) 55 - .load::<rsky_pds::models::AccountPref>(conn) 56 - .context("Failed to fetch preferences")?; 57 - 58 - Ok::<Vec<rsky_pds::models::AccountPref>, diesel::result::Error>(prefs) 59 - }) 60 - .await?; 61 - 62 - // Filter preferences based on namespace and scope 63 - let filtered_prefs = prefs_res 64 - .into_iter() 65 - .filter(|pref| { 66 - namespace_clone 67 - .as_ref() 68 - .map_or(true, |ns| pref_match_namespace(ns, &pref.name)) 39 + let prefs_res = AccountPrefSchema::account_pref 40 + .filter(AccountPrefSchema::did.eq(&did)) 41 + .select(AccountPref::as_select()) 42 + .order(AccountPrefSchema::id.asc()) 43 + .load(conn)?; 44 + let account_prefs = prefs_res 45 + .into_iter() 46 + .filter(|pref| match &namespace { 47 + None => true, 48 + Some(namespace) => pref_match_namespace(namespace, &pref.name), 49 + }) 50 + .filter(|pref| pref_in_scope(scope.clone(), pref.name.clone())) 51 + .map(|pref| { 52 + let value_json_res = match pref.value_json { 53 + None => bail!("preferences json null for {}", pref.name), 54 + Some(value_json) => serde_json::from_str::<RefPreferences>(&value_json), 55 + }; 56 + match value_json_res { 57 + Err(error) => bail!(error.to_string()), 58 + Ok(value_json) => Ok(value_json), 59 + } 60 + }) 61 + .collect::<Result<Vec<RefPreferences>>>()?; 62 + Ok(account_prefs) 69 63 }) 70 - .filter(|pref| pref_in_scope(scope, &pref.name)) 71 - .map(|pref| -> Result<AccountPreference> { 72 - let value_json = match pref.value_json { 73 - Some(json) => serde_json::from_str(&json) 74 - .context(format!("Failed to parse preference JSON for {}", pref.name))?, 75 - None => bail!("Preference JSON is null for {}", pref.name), 76 - }; 77 - 78 - Ok(AccountPreference { 79 - r#type: pref.name, 80 - value: value_json, 81 - }) 82 - }) 83 - .collect::<Result<Vec<_>>>()?; 84 - 85 - Ok(filtered_prefs) 64 + .await 86 65 } 87 66 88 67 /// Put preferences for a namespace. 89 - pub(crate) async fn put_preferences( 68 + #[tracing::instrument(skip_all)] 69 + pub async fn put_preferences( 90 70 &self, 91 - values: Vec<AccountPreference>, 92 - namespace: &str, 93 - scope: &str, 71 + values: Vec<RefPreferences>, 72 + namespace: String, 73 + scope: AuthScope, 94 74 ) -> Result<()> { 95 - // Validate all preferences match the namespace 96 - if !values 97 - .iter() 98 - .all(|value| pref_match_namespace(namespace, &value.r#type)) 99 - { 100 - bail!("Some preferences are not in the {} namespace", namespace); 101 - } 102 - 103 - // Validate scope permissions 104 - let not_in_scope = values 105 - .iter() 106 - .filter(|val| !pref_in_scope(scope, &val.r#type)) 107 - .collect::<Vec<_>>(); 108 - 109 - if !not_in_scope.is_empty() { 110 - bail!("Do not have authorization to set preferences"); 111 - } 112 - 113 75 let did = self.did.clone(); 114 - let namespace_str = namespace.to_string(); 115 - let scope_str = scope.to_string(); 116 - 117 - // Convert preferences to serialized form 118 - let serialized_prefs = values 119 - .into_iter() 120 - .map(|pref| -> Result<(String, String)> { 121 - let json = serde_json::to_string(&pref.value) 122 - .context("Failed to serialize preference value")?; 123 - Ok((pref.r#type, json)) 124 - }) 125 - .collect::<Result<Vec<_>>>()?; 126 - 127 - // Execute transaction 128 76 self.db 129 - .transaction(move |conn| { 130 - use rsky_pds::schema::pds::account_pref::dsl::*; 131 - 132 - // Find all preferences in the namespace 133 - let namespace_pattern = format!("{}%", namespace_str); 134 - let all_prefs = account_pref 135 - .filter(did.eq(&did)) 136 - .filter(name.eq(&namespace_str).or(name.like(&namespace_pattern))) 137 - .load::<rsky_pds::models::AccountPref>(conn) 138 - .context("Failed to fetch preferences")?; 139 - 140 - // Filter to those in scope 141 - let all_pref_ids_in_namespace = all_prefs 77 + .run(move |conn| { 78 + match values 142 79 .iter() 143 - .filter(|pref| pref_match_namespace(&namespace_str, &pref.name)) 144 - .filter(|pref| pref_in_scope(&scope_str, &pref.name)) 145 - .map(|pref| pref.id) 146 - .collect::<Vec<i32>>(); 147 - 148 - // Delete existing preferences in namespace 149 - if !all_pref_ids_in_namespace.is_empty() { 150 - diesel::delete(account_pref) 151 - .filter(id.eq_any(all_pref_ids_in_namespace)) 152 - .execute(conn) 153 - .context("Failed to delete existing preferences")?; 154 - } 80 + .all(|value| pref_match_namespace(&namespace, &value.get_type())) 81 + { 82 + false => bail!("Some preferences are not in the {namespace} namespace"), 83 + true => { 84 + let not_in_scope = values 85 + .iter() 86 + .filter(|value| !pref_in_scope(scope.clone(), value.get_type())) 87 + .collect::<Vec<&RefPreferences>>(); 88 + if !not_in_scope.is_empty() { 89 + tracing::info!( 90 + "@LOG: PreferenceReader::put_preferences() debug scope: {:?}, values: {:?}", 91 + scope, 92 + values 93 + ); 94 + bail!("Do not have authorization to set preferences."); 95 + } 96 + // get all current prefs for user and prep new pref rows 97 + use rsky_pds::schema::pds::account_pref::dsl as AccountPrefSchema; 98 + let all_prefs = AccountPrefSchema::account_pref 99 + .filter(AccountPrefSchema::did.eq(&did)) 100 + .select(AccountPref::as_select()) 101 + .load(conn)?; 102 + let put_prefs = values 103 + .into_iter() 104 + .map(|value| { 105 + Ok(AccountPref { 106 + id: 0, 107 + name: value.get_type(), 108 + value_json: Some(serde_json::to_string(&value)?), 109 + }) 110 + }) 111 + .collect::<Result<Vec<AccountPref>>>()?; 155 112 156 - // Insert new preferences 157 - if !serialized_prefs.is_empty() { 158 - for (pref_type, pref_json) in serialized_prefs { 159 - diesel::insert_into(account_pref) 160 - .values(( 161 - did.eq(&did), 162 - name.eq(&pref_type), 163 - valueJson.eq(Some(&pref_json)), 164 - )) 165 - .execute(conn) 166 - .context("Failed to insert preference")?; 113 + let all_pref_ids_in_namespace = all_prefs 114 + .iter() 115 + .filter(|pref| pref_match_namespace(&namespace, &pref.name)) 116 + .filter(|pref| pref_in_scope(scope.clone(), pref.name.clone())) 117 + .map(|pref| pref.id) 118 + .collect::<Vec<i32>>(); 119 + // replace all prefs in given namespace 120 + if !all_pref_ids_in_namespace.is_empty() { 121 + delete(AccountPrefSchema::account_pref) 122 + .filter(AccountPrefSchema::id.eq_any(all_pref_ids_in_namespace)) 123 + .execute(conn)?; 124 + } 125 + if !put_prefs.is_empty() { 126 + insert_into(AccountPrefSchema::account_pref) 127 + .values( 128 + put_prefs 129 + .into_iter() 130 + .map(|pref| { 131 + ( 132 + AccountPrefSchema::did.eq(&did), 133 + AccountPrefSchema::name.eq(pref.name), 134 + AccountPrefSchema::valueJson.eq(pref.value_json), 135 + ) 136 + }) 137 + .collect::<Vec<_>>(), 138 + ) 139 + .execute(conn)?; 140 + } 141 + Ok(()) 167 142 } 168 143 } 169 - 170 - Ok(()) 171 144 }) 172 145 .await 173 146 } 174 147 } 175 - 176 - /// Check if a preference matches a namespace. 177 - pub(super) fn pref_match_namespace(namespace: &str, fullname: &str) -> bool { 178 - fullname == namespace || fullname.starts_with(&format!("{}.", namespace)) 179 - } 180 - 181 - /// Check if a preference is in scope. 182 - pub(super) fn pref_in_scope(scope: &str, pref_type: &str) -> bool { 183 - scope == "access" || !FULL_ACCESS_ONLY_PREFS.contains(&pref_type) 184 - }