//! Our Personal Data Server, from scratch: Valkey/Redis-backed cache and
//! distributed rate-limiter implementations, with no-op fallbacks for
//! running without a cache backend.
1pub use tranquil_infra::{Cache, CacheError, DistributedRateLimiter}; 2 3use async_trait::async_trait; 4use base64::{Engine as _, engine::general_purpose::STANDARD as BASE64}; 5use std::sync::Arc; 6use std::time::Duration; 7 8#[derive(Clone)] 9pub struct ValkeyCache { 10 conn: redis::aio::ConnectionManager, 11} 12 13impl ValkeyCache { 14 pub async fn new(url: &str) -> Result<Self, CacheError> { 15 let client = redis::Client::open(url).map_err(|e| CacheError::Connection(e.to_string()))?; 16 let manager = client 17 .get_connection_manager() 18 .await 19 .map_err(|e| CacheError::Connection(e.to_string()))?; 20 Ok(Self { conn: manager }) 21 } 22 23 pub fn connection(&self) -> redis::aio::ConnectionManager { 24 self.conn.clone() 25 } 26} 27 28#[async_trait] 29impl Cache for ValkeyCache { 30 async fn get(&self, key: &str) -> Option<String> { 31 let mut conn = self.conn.clone(); 32 redis::cmd("GET") 33 .arg(key) 34 .query_async::<Option<String>>(&mut conn) 35 .await 36 .ok() 37 .flatten() 38 } 39 40 async fn set(&self, key: &str, value: &str, ttl: Duration) -> Result<(), CacheError> { 41 let mut conn = self.conn.clone(); 42 redis::cmd("SET") 43 .arg(key) 44 .arg(value) 45 .arg("EX") 46 .arg(ttl.as_secs() as i64) 47 .query_async::<()>(&mut conn) 48 .await 49 .map_err(|e| CacheError::Connection(e.to_string())) 50 } 51 52 async fn delete(&self, key: &str) -> Result<(), CacheError> { 53 let mut conn = self.conn.clone(); 54 redis::cmd("DEL") 55 .arg(key) 56 .query_async::<()>(&mut conn) 57 .await 58 .map_err(|e| CacheError::Connection(e.to_string())) 59 } 60 61 async fn get_bytes(&self, key: &str) -> Option<Vec<u8>> { 62 self.get(key).await.and_then(|s| BASE64.decode(&s).ok()) 63 } 64 65 async fn set_bytes(&self, key: &str, value: &[u8], ttl: Duration) -> Result<(), CacheError> { 66 let encoded = BASE64.encode(value); 67 self.set(key, &encoded, ttl).await 68 } 69} 70 71pub struct NoOpCache; 72 73#[async_trait] 74impl Cache for NoOpCache { 75 async fn get(&self, _key: &str) -> 
Option<String> { 76 None 77 } 78 79 async fn set(&self, _key: &str, _value: &str, _ttl: Duration) -> Result<(), CacheError> { 80 Ok(()) 81 } 82 83 async fn delete(&self, _key: &str) -> Result<(), CacheError> { 84 Ok(()) 85 } 86 87 async fn get_bytes(&self, _key: &str) -> Option<Vec<u8>> { 88 None 89 } 90 91 async fn set_bytes(&self, _key: &str, _value: &[u8], _ttl: Duration) -> Result<(), CacheError> { 92 Ok(()) 93 } 94 95 fn is_available(&self) -> bool { 96 false 97 } 98} 99 100#[derive(Clone)] 101pub struct RedisRateLimiter { 102 conn: redis::aio::ConnectionManager, 103} 104 105impl RedisRateLimiter { 106 pub fn new(conn: redis::aio::ConnectionManager) -> Self { 107 Self { conn } 108 } 109} 110 111#[async_trait] 112impl DistributedRateLimiter for RedisRateLimiter { 113 async fn check_rate_limit(&self, key: &str, limit: u32, window_ms: u64) -> bool { 114 let mut conn = self.conn.clone(); 115 let full_key = format!("rl:{}", key); 116 let window_secs = window_ms.div_ceil(1000).max(1) as i64; 117 let count: Result<i64, _> = redis::cmd("INCR") 118 .arg(&full_key) 119 .query_async(&mut conn) 120 .await; 121 let count = match count { 122 Ok(c) => c, 123 Err(e) => { 124 tracing::warn!("Redis rate limit INCR failed: {}. 
Allowing request.", e); 125 return true; 126 } 127 }; 128 if count == 1 { 129 let _: Result<bool, redis::RedisError> = redis::cmd("EXPIRE") 130 .arg(&full_key) 131 .arg(window_secs) 132 .query_async(&mut conn) 133 .await; 134 } 135 count <= limit as i64 136 } 137} 138 139pub struct NoOpRateLimiter; 140 141#[async_trait] 142impl DistributedRateLimiter for NoOpRateLimiter { 143 async fn check_rate_limit(&self, _key: &str, _limit: u32, _window_ms: u64) -> bool { 144 true 145 } 146} 147 148pub async fn create_cache() -> (Arc<dyn Cache>, Arc<dyn DistributedRateLimiter>) { 149 match std::env::var("VALKEY_URL") { 150 Ok(url) => match ValkeyCache::new(&url).await { 151 Ok(cache) => { 152 tracing::info!("Connected to Valkey cache at {}", url); 153 let rate_limiter = Arc::new(RedisRateLimiter::new(cache.connection())); 154 (Arc::new(cache), rate_limiter) 155 } 156 Err(e) => { 157 tracing::warn!("Failed to connect to Valkey: {}. Running without cache.", e); 158 (Arc::new(NoOpCache), Arc::new(NoOpRateLimiter)) 159 } 160 }, 161 Err(_) => { 162 tracing::info!("VALKEY_URL not set. Running without cache."); 163 (Arc::new(NoOpCache), Arc::new(NoOpRateLimiter)) 164 } 165 } 166}