Highly ambitious ATProtocol AppView service and sdks
at main 335 lines 11 kB view raw
1//! Cache module providing flexible caching with Redis and in-memory backends. 2//! 3//! This module provides a unified caching interface with automatic fallback 4//! from Redis to in-memory caching when Redis is unavailable. 5 6mod in_memory; 7mod redis; 8 9use anyhow::Result; 10use async_trait::async_trait; 11use serde::{Deserialize, Serialize}; 12use std::collections::HashSet; 13use tracing::{info, warn}; 14 15pub use in_memory::InMemoryCache; 16pub use redis::RedisCache; 17 18/// Generic cache trait implemented by all cache backends. 19/// 20/// Provides async methods for get, set, delete, and bulk operations. 21#[async_trait] 22pub trait Cache: Send + Sync { 23 /// Get a value from cache, returning None if not found or expired. 24 async fn get<T>(&mut self, key: &str) -> Result<Option<T>> 25 where 26 T: for<'de> Deserialize<'de> + Send; 27 28 /// Set a value in cache with optional TTL in seconds. 29 async fn set<T>(&mut self, key: &str, value: &T, ttl_seconds: Option<u64>) -> Result<()> 30 where 31 T: Serialize + Send + Sync; 32 33 /// Delete a key from cache. 34 async fn delete(&mut self, key: &str) -> Result<()>; 35 36 /// Set multiple key-value pairs efficiently (uses pipelining for Redis). 37 async fn set_multiple<T>(&mut self, items: Vec<(&str, &T, Option<u64>)>) -> Result<()> 38 where 39 T: Serialize + Send + Sync; 40 41 /// Test cache connection/health. 42 async fn ping(&mut self) -> Result<bool>; 43 44 /// Get cache info/statistics for monitoring. 45 async fn get_info(&mut self) -> Result<String>; 46} 47 48/// Cache backend implementation enum to avoid trait object overhead. 49pub enum CacheBackendImpl { 50 InMemory(InMemoryCache), 51 Redis(RedisCache), 52} 53 54impl CacheBackendImpl { 55 pub async fn get<T>(&mut self, key: &str) -> Result<Option<T>> 56 where 57 T: for<'de> Deserialize<'de> + Send, 58 { 59 match self { 60 CacheBackendImpl::InMemory(cache) => cache.get(key).await, 61 CacheBackendImpl::Redis(cache) => cache.get(key).await, 62 } 63 } 64 65 pub async fn set<T>(&mut self, key: &str, value: &T, ttl_seconds: Option<u64>) -> Result<()> 66 where 67 T: Serialize + Send + Sync, 68 { 69 match self { 70 CacheBackendImpl::InMemory(cache) => cache.set(key, value, ttl_seconds).await, 71 CacheBackendImpl::Redis(cache) => cache.set(key, value, ttl_seconds).await, 72 } 73 } 74 75 pub async fn delete(&mut self, key: &str) -> Result<()> { 76 match self { 77 CacheBackendImpl::InMemory(cache) => cache.delete(key).await, 78 CacheBackendImpl::Redis(cache) => cache.delete(key).await, 79 } 80 } 81 82 pub async fn set_multiple<T>(&mut self, items: Vec<(&str, &T, Option<u64>)>) -> Result<()> 83 where 84 T: Serialize + Send + Sync, 85 { 86 match self { 87 CacheBackendImpl::InMemory(cache) => cache.set_multiple(items).await, 88 CacheBackendImpl::Redis(cache) => cache.set_multiple(items).await, 89 } 90 } 91 92 pub async fn ping(&mut self) -> Result<bool> { 93 match self { 94 CacheBackendImpl::InMemory(cache) => cache.ping().await, 95 CacheBackendImpl::Redis(cache) => cache.ping().await, 96 } 97 } 98 99 pub async fn get_info(&mut self) -> Result<String> { 100 match self { 101 CacheBackendImpl::InMemory(cache) => cache.get_info().await, 102 CacheBackendImpl::Redis(cache) => cache.get_info().await, 103 } 104 } 105} 106 107/// Domain-specific cache wrapper with convenience methods for Slices operations. 108/// 109/// Provides typed methods for actors, lexicons, domains, collections, auth, and DID resolution. 110pub struct SliceCache { 111 cache: CacheBackendImpl, 112} 113 114impl SliceCache { 115 pub fn new(cache: CacheBackendImpl) -> Self { 116 Self { cache } 117 } 118 119 /// Actor cache methods (permanent cache - no TTL) 120 pub async fn is_actor(&mut self, did: &str, slice_uri: &str) -> Result<Option<bool>> { 121 let key = format!("actor:{}:{}", did, slice_uri); 122 self.cache.get::<bool>(&key).await 123 } 124 125 pub async fn cache_actor_exists(&mut self, did: &str, slice_uri: &str) -> Result<()> { 126 let key = format!("actor:{}:{}", did, slice_uri); 127 self.cache.set(&key, &true, None).await 128 } 129 130 pub async fn remove_actor(&mut self, did: &str, slice_uri: &str) -> Result<()> { 131 let key = format!("actor:{}:{}", did, slice_uri); 132 self.cache.delete(&key).await 133 } 134 135 pub async fn preload_actors(&mut self, actors: Vec<(String, String)>) -> Result<()> { 136 if actors.is_empty() { 137 return Ok(()); 138 } 139 140 let items: Vec<(String, bool, Option<u64>)> = actors 141 .into_iter() 142 .map(|(did, slice_uri)| (format!("actor:{}:{}", did, slice_uri), true, None)) 143 .collect(); 144 145 let items_ref: Vec<(&str, &bool, Option<u64>)> = items 146 .iter() 147 .map(|(key, value, ttl)| (key.as_str(), value, *ttl)) 148 .collect(); 149 150 self.cache.set_multiple(items_ref).await 151 } 152 153 /// Lexicon cache methods (2 hour TTL) 154 pub async fn cache_lexicons( 155 &mut self, 156 slice_uri: &str, 157 lexicons: &Vec<serde_json::Value>, 158 ) -> Result<()> { 159 let key = format!("lexicons:{}", slice_uri); 160 self.cache.set(&key, lexicons, Some(7200)).await 161 } 162 163 pub async fn get_lexicons( 164 &mut self, 165 slice_uri: &str, 166 ) -> Result<Option<Vec<serde_json::Value>>> { 167 let key = format!("lexicons:{}", slice_uri); 168 self.cache.get::<Vec<serde_json::Value>>(&key).await 169 } 170 171 /// Domain cache methods (4 hour TTL) 172 pub async fn cache_slice_domain(&mut self, slice_uri: &str, domain: &str) -> Result<()> { 173 let key = format!("domain:{}", slice_uri); 174 self.cache.set(&key, &domain.to_string(), Some(14400)).await 175 } 176 177 pub async fn get_slice_domain(&mut self, slice_uri: &str) -> Result<Option<String>> { 178 let key = format!("domain:{}", slice_uri); 179 self.cache.get::<String>(&key).await 180 } 181 182 /// Collections cache methods (2 hour TTL) 183 pub async fn cache_slice_collections( 184 &mut self, 185 slice_uri: &str, 186 collections: &HashSet<String>, 187 ) -> Result<()> { 188 let key = format!("collections:{}", slice_uri); 189 self.cache.set(&key, collections, Some(7200)).await 190 } 191 192 pub async fn get_slice_collections( 193 &mut self, 194 slice_uri: &str, 195 ) -> Result<Option<HashSet<String>>> { 196 let key = format!("collections:{}", slice_uri); 197 self.cache.get::<HashSet<String>>(&key).await 198 } 199 200 /// Auth cache methods (5 minute TTL) 201 pub async fn get_cached_oauth_userinfo( 202 &mut self, 203 token: &str, 204 ) -> Result<Option<serde_json::Value>> { 205 let key = format!("oauth_userinfo:{}", token); 206 self.cache.get(&key).await 207 } 208 209 pub async fn cache_oauth_userinfo( 210 &mut self, 211 token: &str, 212 userinfo: &serde_json::Value, 213 ttl_seconds: u64, 214 ) -> Result<()> { 215 let key = format!("oauth_userinfo:{}", token); 216 self.cache.set(&key, userinfo, Some(ttl_seconds)).await 217 } 218 219 pub async fn get_cached_atproto_session( 220 &mut self, 221 token: &str, 222 ) -> Result<Option<serde_json::Value>> { 223 let key = format!("atproto_session:{}", token); 224 self.cache.get(&key).await 225 } 226 227 pub async fn cache_atproto_session( 228 &mut self, 229 token: &str, 230 session: &serde_json::Value, 231 ttl_seconds: u64, 232 ) -> Result<()> { 233 let key = format!("atproto_session:{}", token); 234 self.cache.set(&key, session, Some(ttl_seconds)).await 235 } 236 237 /// DID resolution cache methods (24 hour TTL) 238 pub async fn get_cached_did_resolution( 239 &mut self, 240 did: &str, 241 ) -> Result<Option<serde_json::Value>> { 242 let key = format!("did_resolution:{}", did); 243 self.cache.get(&key).await 244 } 245 246 pub async fn cache_did_resolution( 247 &mut self, 248 did: &str, 249 actor_data: &serde_json::Value, 250 ) -> Result<()> { 251 let key = format!("did_resolution:{}", did); 252 self.cache.set(&key, actor_data, Some(86400)).await 253 } 254 255 pub async fn invalidate_did_resolution(&mut self, did: &str) -> Result<()> { 256 let key = format!("did_resolution:{}", did); 257 self.cache.delete(&key).await 258 } 259 260 /// Generic get/set for custom caching needs 261 pub async fn get<T>(&mut self, key: &str) -> Result<Option<T>> 262 where 263 T: for<'de> Deserialize<'de> + Send, 264 { 265 self.cache.get(key).await 266 } 267 268 pub async fn set<T>(&mut self, key: &str, value: &T, ttl_seconds: Option<u64>) -> Result<()> 269 where 270 T: Serialize + Send + Sync, 271 { 272 self.cache.set(key, value, ttl_seconds).await 273 } 274 275 /// Utility methods 276 pub async fn ping(&mut self) -> Result<bool> { 277 self.cache.ping().await 278 } 279 280 pub async fn get_info(&mut self) -> Result<String> { 281 self.cache.get_info().await 282 } 283} 284 285/// Cache backend configuration enum. 286#[derive(Debug, Clone)] 287pub enum CacheBackend { 288 InMemory { 289 ttl_seconds: Option<u64>, 290 }, 291 Redis { 292 url: String, 293 ttl_seconds: Option<u64>, 294 }, 295} 296 297/// Factory for creating cache instances with automatic Redis fallback. 298pub struct CacheFactory; 299 300impl CacheFactory { 301 /// Create a cache backend, falling back to in-memory if Redis fails. 302 pub async fn create_cache(backend: CacheBackend) -> Result<CacheBackendImpl> { 303 match backend { 304 CacheBackend::InMemory { ttl_seconds } => { 305 let ttl_display = ttl_seconds 306 .map(|t| format!("{}s", t)) 307 .unwrap_or_else(|| "default".to_string()); 308 info!("Creating in-memory cache with TTL: {}", ttl_display); 309 Ok(CacheBackendImpl::InMemory(InMemoryCache::new(ttl_seconds))) 310 } 311 CacheBackend::Redis { url, ttl_seconds } => { 312 info!("Attempting to create Redis cache at: {}", url); 313 match RedisCache::new(&url, ttl_seconds).await { 314 Ok(redis_cache) => { 315 info!("Created Redis cache successfully"); 316 Ok(CacheBackendImpl::Redis(redis_cache)) 317 } 318 Err(e) => { 319 warn!( 320 error = ?e, 321 "Failed to create Redis cache, falling back to in-memory" 322 ); 323 Ok(CacheBackendImpl::InMemory(InMemoryCache::new(ttl_seconds))) 324 } 325 } 326 } 327 } 328 } 329 330 /// Create a SliceCache with the specified backend. 331 pub async fn create_slice_cache(backend: CacheBackend) -> Result<SliceCache> { 332 let cache = Self::create_cache(backend).await?; 333 Ok(SliceCache::new(cache)) 334 } 335}