this repo has no description
at main 146 lines 5.0 kB view raw
1/// Storage impls to persis OAuth sessions if you are not using the memory stores 2/// https://github.com/bluesky-social/statusphere-example-app/blob/main/src/auth/storage.ts 3use crate::cache::{ 4 ATRIUM_SESSION_STORE_PREFIX, ATRIUM_STATE_STORE_KEY, Cache, create_prefixed_key, 5}; 6use atrium_api::types::string::Did; 7use atrium_common::store::Store; 8use atrium_oauth::store::session::SessionStore; 9use atrium_oauth::store::state::StateStore; 10use bb8::Pool; 11use bb8_redis::RedisConnectionManager; 12use bb8_redis::redis::AsyncCommands; 13use serde::Serialize; 14use serde::de::DeserializeOwned; 15use std::fmt::Debug; 16use std::hash::Hash; 17use thiserror::Error; 18 19#[derive(Error, Debug)] 20pub enum AtriumStoreError { 21 #[error("No session found")] 22 NoSessionFound, 23 #[error("Redis error: {0}")] 24 RedisError(#[from] bb8_redis::redis::RedisError), 25 #[error("Serialization error: {0}")] 26 SerializationError(#[from] serde_json::Error), 27 #[error("Database error: {0}")] 28 DatabaseError(#[from] sqlx::Error), 29 #[error("Pool error: {0}")] 30 PoolError(#[from] bb8::RunError<bb8_redis::redis::RedisError>), 31} 32 33impl SessionStore for AtriumSessionStore {} 34 35pub struct AtriumSessionStore { 36 cache_pool: Pool<RedisConnectionManager>, 37} 38 39impl AtriumSessionStore { 40 pub fn new(pool: Pool<RedisConnectionManager>) -> Self { 41 Self { cache_pool: pool } 42 } 43} 44 45impl<K, V> Store<K, V> for AtriumSessionStore 46where 47 K: Debug + Eq + Hash + Send + Sync + 'static + From<Did> + AsRef<str>, 48 V: Debug + Clone + Send + Sync + 'static + Serialize + DeserializeOwned, 49{ 50 type Error = AtriumStoreError; 51 async fn get(&self, key: &K) -> Result<Option<V>, Self::Error> { 52 let key = create_prefixed_key(ATRIUM_SESSION_STORE_PREFIX, key.as_ref()); 53 let mut cache = self.cache_pool.get().await?; 54 let value: Option<String> = cache.get(key).await?; 55 match value { 56 Some(json_str) => { 57 let deserialized: V = serde_json::from_str(&json_str)?; 58 Ok(Some(deserialized)) 59 } 60 None => Ok(None), 61 } 62 } 63 64 async fn set(&self, key: K, value: V) -> Result<(), Self::Error> { 65 let cache_key = create_prefixed_key(ATRIUM_SESSION_STORE_PREFIX, key.as_ref()); 66 let json_value = serde_json::to_string(&value)?; 67 let mut cache = self.cache_pool.get().await?; 68 let _: () = cache.set(cache_key, json_value).await?; 69 Ok(()) 70 } 71 72 async fn del(&self, key: &K) -> Result<(), Self::Error> { 73 let cache_key = create_prefixed_key(ATRIUM_SESSION_STORE_PREFIX, key.as_ref()); 74 let mut cache = self.cache_pool.get().await?; 75 let _: usize = cache.del(cache_key).await?; 76 Ok(()) 77 } 78 79 async fn clear(&self) -> Result<(), Self::Error> { 80 let pattern = format!("{}*", ATRIUM_SESSION_STORE_PREFIX); 81 let mut cache = self.cache_pool.get().await?; 82 let keys: Vec<String> = cache.keys(pattern).await?; 83 if !keys.is_empty() { 84 let _: usize = cache.del(keys).await?; 85 } 86 Ok(()) 87 } 88} 89 90impl StateStore for AtriumStateStore {} 91 92pub struct AtriumStateStore { 93 cache_pool: Pool<RedisConnectionManager>, 94} 95 96impl AtriumStateStore { 97 pub fn new(pool: Pool<RedisConnectionManager>) -> Self { 98 Self { cache_pool: pool } 99 } 100} 101 102impl<K, V> Store<K, V> for AtriumStateStore 103where 104 K: Debug + Eq + Hash + Send + Sync + 'static + From<Did> + AsRef<str>, 105 V: Debug + Clone + Send + Sync + 'static + Serialize + DeserializeOwned, 106{ 107 type Error = AtriumStoreError; 108 async fn get(&self, key: &K) -> Result<Option<V>, Self::Error> { 109 let key = create_prefixed_key(ATRIUM_STATE_STORE_KEY, key.as_ref()); 110 let mut cache = self.cache_pool.get().await?; 111 let value: Option<String> = cache.get(key).await?; 112 match value { 113 Some(json_str) => { 114 let deserialized: V = serde_json::from_str(&json_str)?; 115 Ok(Some(deserialized)) 116 } 117 None => Ok(None), 118 } 119 } 120 121 async fn set(&self, key: K, value: V) -> Result<(), Self::Error> { 122 let cache_key = create_prefixed_key(ATRIUM_STATE_STORE_KEY, key.as_ref()); 123 let mut cache = Cache::new(self.cache_pool.get().await?); 124 let _ = cache 125 .write_to_cache_with_seconds(&cache_key, value, 3_6000) 126 .await?; 127 Ok(()) 128 } 129 130 async fn del(&self, key: &K) -> Result<(), Self::Error> { 131 let cache_key = create_prefixed_key(ATRIUM_STATE_STORE_KEY, key.as_ref()); 132 let mut cache = self.cache_pool.get().await?; 133 let _: usize = cache.del(cache_key).await?; 134 Ok(()) 135 } 136 137 async fn clear(&self) -> Result<(), Self::Error> { 138 let pattern = format!("{}*", ATRIUM_STATE_STORE_KEY); 139 let mut cache = self.cache_pool.get().await?; 140 let keys: Vec<String> = cache.keys(pattern).await?; 141 if !keys.is_empty() { 142 let _: usize = cache.del(keys).await?; 143 } 144 Ok(()) 145 } 146}