forked from
tranquil.farm/tranquil-pds
Our Personal Data Server from scratch!
1pub use tranquil_infra::{Cache, CacheError, DistributedRateLimiter};
2
3use async_trait::async_trait;
4use base64::{Engine as _, engine::general_purpose::STANDARD as BASE64};
5use std::sync::Arc;
6use std::time::Duration;
7
8#[derive(Clone)]
9pub struct ValkeyCache {
10 conn: redis::aio::ConnectionManager,
11}
12
13impl ValkeyCache {
14 pub async fn new(url: &str) -> Result<Self, CacheError> {
15 let client = redis::Client::open(url).map_err(|e| CacheError::Connection(e.to_string()))?;
16 let manager = client
17 .get_connection_manager()
18 .await
19 .map_err(|e| CacheError::Connection(e.to_string()))?;
20 Ok(Self { conn: manager })
21 }
22
23 pub fn connection(&self) -> redis::aio::ConnectionManager {
24 self.conn.clone()
25 }
26}
27
28#[async_trait]
29impl Cache for ValkeyCache {
30 async fn get(&self, key: &str) -> Option<String> {
31 let mut conn = self.conn.clone();
32 redis::cmd("GET")
33 .arg(key)
34 .query_async::<Option<String>>(&mut conn)
35 .await
36 .ok()
37 .flatten()
38 }
39
40 async fn set(&self, key: &str, value: &str, ttl: Duration) -> Result<(), CacheError> {
41 let mut conn = self.conn.clone();
42 redis::cmd("SET")
43 .arg(key)
44 .arg(value)
45 .arg("EX")
46 .arg(ttl.as_secs() as i64)
47 .query_async::<()>(&mut conn)
48 .await
49 .map_err(|e| CacheError::Connection(e.to_string()))
50 }
51
52 async fn delete(&self, key: &str) -> Result<(), CacheError> {
53 let mut conn = self.conn.clone();
54 redis::cmd("DEL")
55 .arg(key)
56 .query_async::<()>(&mut conn)
57 .await
58 .map_err(|e| CacheError::Connection(e.to_string()))
59 }
60
61 async fn get_bytes(&self, key: &str) -> Option<Vec<u8>> {
62 self.get(key).await.and_then(|s| BASE64.decode(&s).ok())
63 }
64
65 async fn set_bytes(&self, key: &str, value: &[u8], ttl: Duration) -> Result<(), CacheError> {
66 let encoded = BASE64.encode(value);
67 self.set(key, &encoded, ttl).await
68 }
69}
70
71pub struct NoOpCache;
72
73#[async_trait]
74impl Cache for NoOpCache {
75 async fn get(&self, _key: &str) -> Option<String> {
76 None
77 }
78
79 async fn set(&self, _key: &str, _value: &str, _ttl: Duration) -> Result<(), CacheError> {
80 Ok(())
81 }
82
83 async fn delete(&self, _key: &str) -> Result<(), CacheError> {
84 Ok(())
85 }
86
87 async fn get_bytes(&self, _key: &str) -> Option<Vec<u8>> {
88 None
89 }
90
91 async fn set_bytes(&self, _key: &str, _value: &[u8], _ttl: Duration) -> Result<(), CacheError> {
92 Ok(())
93 }
94
95 fn is_available(&self) -> bool {
96 false
97 }
98}
99
100#[derive(Clone)]
101pub struct RedisRateLimiter {
102 conn: redis::aio::ConnectionManager,
103}
104
105impl RedisRateLimiter {
106 pub fn new(conn: redis::aio::ConnectionManager) -> Self {
107 Self { conn }
108 }
109}
110
111#[async_trait]
112impl DistributedRateLimiter for RedisRateLimiter {
113 async fn check_rate_limit(&self, key: &str, limit: u32, window_ms: u64) -> bool {
114 let mut conn = self.conn.clone();
115 let full_key = format!("rl:{}", key);
116 let window_secs = window_ms.div_ceil(1000).max(1) as i64;
117 let count: Result<i64, _> = redis::cmd("INCR")
118 .arg(&full_key)
119 .query_async(&mut conn)
120 .await;
121 let count = match count {
122 Ok(c) => c,
123 Err(e) => {
124 tracing::warn!("Redis rate limit INCR failed: {}. Allowing request.", e);
125 return true;
126 }
127 };
128 if count == 1 {
129 let _: Result<bool, redis::RedisError> = redis::cmd("EXPIRE")
130 .arg(&full_key)
131 .arg(window_secs)
132 .query_async(&mut conn)
133 .await;
134 }
135 count <= limit as i64
136 }
137}
138
139pub struct NoOpRateLimiter;
140
141#[async_trait]
142impl DistributedRateLimiter for NoOpRateLimiter {
143 async fn check_rate_limit(&self, _key: &str, _limit: u32, _window_ms: u64) -> bool {
144 true
145 }
146}
147
148pub async fn create_cache() -> (Arc<dyn Cache>, Arc<dyn DistributedRateLimiter>) {
149 match std::env::var("VALKEY_URL") {
150 Ok(url) => match ValkeyCache::new(&url).await {
151 Ok(cache) => {
152 tracing::info!("Connected to Valkey cache at {}", url);
153 let rate_limiter = Arc::new(RedisRateLimiter::new(cache.connection()));
154 (Arc::new(cache), rate_limiter)
155 }
156 Err(e) => {
157 tracing::warn!("Failed to connect to Valkey: {}. Running without cache.", e);
158 (Arc::new(NoOpCache), Arc::new(NoOpRateLimiter))
159 }
160 },
161 Err(_) => {
162 tracing::info!("VALKEY_URL not set. Running without cache.");
163 (Arc::new(NoOpCache), Arc::new(NoOpRateLimiter))
164 }
165 }
166}