forked from
oppi.li/at-advent
this repo has no description
1/// Storage impls to persis OAuth sessions if you are not using the memory stores
2/// https://github.com/bluesky-social/statusphere-example-app/blob/main/src/auth/storage.ts
3use crate::cache::{
4 ATRIUM_SESSION_STORE_PREFIX, ATRIUM_STATE_STORE_KEY, Cache, create_prefixed_key,
5};
6use atrium_api::types::string::Did;
7use atrium_common::store::Store;
8use atrium_oauth::store::session::SessionStore;
9use atrium_oauth::store::state::StateStore;
10use bb8::Pool;
11use bb8_redis::RedisConnectionManager;
12use bb8_redis::redis::AsyncCommands;
13use serde::Serialize;
14use serde::de::DeserializeOwned;
15use std::fmt::Debug;
16use std::hash::Hash;
17use thiserror::Error;
18
19#[derive(Error, Debug)]
20pub enum AtriumStoreError {
21 #[error("No session found")]
22 NoSessionFound,
23 #[error("Redis error: {0}")]
24 RedisError(#[from] bb8_redis::redis::RedisError),
25 #[error("Serialization error: {0}")]
26 SerializationError(#[from] serde_json::Error),
27 #[error("Database error: {0}")]
28 DatabaseError(#[from] sqlx::Error),
29 #[error("Pool error: {0}")]
30 PoolError(#[from] bb8::RunError<bb8_redis::redis::RedisError>),
31}
32
33impl SessionStore for AtriumSessionStore {}
34
35pub struct AtriumSessionStore {
36 cache_pool: Pool<RedisConnectionManager>,
37}
38
39impl AtriumSessionStore {
40 pub fn new(pool: Pool<RedisConnectionManager>) -> Self {
41 Self { cache_pool: pool }
42 }
43}
44
45impl<K, V> Store<K, V> for AtriumSessionStore
46where
47 K: Debug + Eq + Hash + Send + Sync + 'static + From<Did> + AsRef<str>,
48 V: Debug + Clone + Send + Sync + 'static + Serialize + DeserializeOwned,
49{
50 type Error = AtriumStoreError;
51 async fn get(&self, key: &K) -> Result<Option<V>, Self::Error> {
52 let key = create_prefixed_key(ATRIUM_SESSION_STORE_PREFIX, key.as_ref());
53 let mut cache = self.cache_pool.get().await?;
54 let value: Option<String> = cache.get(key).await?;
55 match value {
56 Some(json_str) => {
57 let deserialized: V = serde_json::from_str(&json_str)?;
58 Ok(Some(deserialized))
59 }
60 None => Ok(None),
61 }
62 }
63
64 async fn set(&self, key: K, value: V) -> Result<(), Self::Error> {
65 let cache_key = create_prefixed_key(ATRIUM_SESSION_STORE_PREFIX, key.as_ref());
66 let json_value = serde_json::to_string(&value)?;
67 let mut cache = self.cache_pool.get().await?;
68 let _: () = cache.set(cache_key, json_value).await?;
69 Ok(())
70 }
71
72 async fn del(&self, key: &K) -> Result<(), Self::Error> {
73 let cache_key = create_prefixed_key(ATRIUM_SESSION_STORE_PREFIX, key.as_ref());
74 let mut cache = self.cache_pool.get().await?;
75 let _: usize = cache.del(cache_key).await?;
76 Ok(())
77 }
78
79 async fn clear(&self) -> Result<(), Self::Error> {
80 let pattern = format!("{}*", ATRIUM_SESSION_STORE_PREFIX);
81 let mut cache = self.cache_pool.get().await?;
82 let keys: Vec<String> = cache.keys(pattern).await?;
83 if !keys.is_empty() {
84 let _: usize = cache.del(keys).await?;
85 }
86 Ok(())
87 }
88}
89
90impl StateStore for AtriumStateStore {}
91
92pub struct AtriumStateStore {
93 cache_pool: Pool<RedisConnectionManager>,
94}
95
96impl AtriumStateStore {
97 pub fn new(pool: Pool<RedisConnectionManager>) -> Self {
98 Self { cache_pool: pool }
99 }
100}
101
102impl<K, V> Store<K, V> for AtriumStateStore
103where
104 K: Debug + Eq + Hash + Send + Sync + 'static + From<Did> + AsRef<str>,
105 V: Debug + Clone + Send + Sync + 'static + Serialize + DeserializeOwned,
106{
107 type Error = AtriumStoreError;
108 async fn get(&self, key: &K) -> Result<Option<V>, Self::Error> {
109 let key = create_prefixed_key(ATRIUM_STATE_STORE_KEY, key.as_ref());
110 let mut cache = self.cache_pool.get().await?;
111 let value: Option<String> = cache.get(key).await?;
112 match value {
113 Some(json_str) => {
114 let deserialized: V = serde_json::from_str(&json_str)?;
115 Ok(Some(deserialized))
116 }
117 None => Ok(None),
118 }
119 }
120
121 async fn set(&self, key: K, value: V) -> Result<(), Self::Error> {
122 let cache_key = create_prefixed_key(ATRIUM_STATE_STORE_KEY, key.as_ref());
123 let mut cache = Cache::new(self.cache_pool.get().await?);
124 let _ = cache
125 .write_to_cache_with_seconds(&cache_key, value, 3_6000)
126 .await?;
127 Ok(())
128 }
129
130 async fn del(&self, key: &K) -> Result<(), Self::Error> {
131 let cache_key = create_prefixed_key(ATRIUM_STATE_STORE_KEY, key.as_ref());
132 let mut cache = self.cache_pool.get().await?;
133 let _: usize = cache.del(cache_key).await?;
134 Ok(())
135 }
136
137 async fn clear(&self) -> Result<(), Self::Error> {
138 let pattern = format!("{}*", ATRIUM_STATE_STORE_KEY);
139 let mut cache = self.cache_pool.get().await?;
140 let keys: Vec<String> = cache.keys(pattern).await?;
141 if !keys.is_empty() {
142 let _: usize = cache.del(keys).await?;
143 }
144 Ok(())
145 }
146}