Rust AppView - highly experimental!
fork

Configure Feed

Select the types of activity you want to include in your feed.

cleanup, unify timeline cache

+7 -468
+6 -29
parakeet/src/common/cache_listener.rs
··· 103 103 /// Handle a cache invalidation message 104 104 /// 105 105 /// Unified format using actor_id and rkey only (no AT-URIs): 106 - /// - `timeline:{actor_id}:` - Invalidate all timeline entries for this actor 107 - /// - `authorfeed:{actor_id}:` - Invalidate all author feed entries 108 - /// - `profile:{actor_id}` - Invalidate profile (no-op, not cached yet) 109 - /// - `post:{actor_id}:{rkey}` - Invalidate post (no-op, not cached yet) 110 - /// - `feedgen:{actor_id}:{rkey}` - Invalidate feedgen (no-op, not cached yet) 111 - /// - `list:{actor_id}:{rkey}` - Invalidate list (no-op, not cached yet) 112 - /// - `starterpack:{actor_id}:{rkey}` - Invalidate starterpack (no-op, not cached yet) 106 + /// - `profile:{actor_id}` - Invalidate profile 107 + /// - `post:{actor_id}:{rkey}` - Invalidate post 108 + /// - `feedgen:{actor_id}:{rkey}` - Invalidate feedgen 109 + /// - `list:{actor_id}:{rkey}` - Invalidate list 110 + /// - `starterpack:{actor_id}:{rkey}` - Invalidate starterpack 113 111 /// - `labeler:{actor_id}` - Invalidate labeler (no-op, not cached yet) 114 112 async fn handle_cache_invalidation(state: &GlobalState, cache_key: &str) { 115 - // Timeline invalidation: "timeline:{actor_id}:" 116 - if let Some(prefix) = cache_key.strip_prefix("timeline:") { 117 - // Extract actor_id 118 - if let Some(actor_id_str) = prefix.strip_suffix(':') { 119 - if let Ok(actor_id) = actor_id_str.parse::<i32>() { 120 - let count = state.timeline_cache.invalidate_by_actor_id(actor_id).await; 121 - info!(actor_id = actor_id, count = count, "Invalidated timeline cache"); 122 - return; 123 - } 124 - } 125 - } 126 - 127 - // Author feed invalidation: "authorfeed:{actor_id}:" 128 - if let Some(prefix) = cache_key.strip_prefix("authorfeed:") { 129 - if let Some(actor_id_str) = prefix.strip_suffix(':') { 130 - if let Ok(actor_id) = actor_id_str.parse::<i32>() { 131 - let count = state.author_feed_cache.invalidate_by_actor_id(actor_id).await; 132 - info!(actor_id = actor_id, count = count, "Invalidated author feed cache"); 133 - return; 134 - } 135 - } 136 - } 113 + // Note: timeline and authorfeed caching removed - entity caching handles invalidation 137 114 138 115 // Profile invalidation: "profile:{actor_id}" 139 116 if let Some(actor_id_str) = cache_key.strip_prefix("profile:") {
-373
parakeet/src/common/cache_timeline.rs
··· 1 - //! Feed caching for timeline and author feed endpoints 2 - //! 3 - //! Caches feed results per-user with cursor-based pagination. 4 - //! Cache keys include user DID and cursor to support pagination. 5 - //! Short TTL (60-120 seconds) balances freshness with performance. 6 - //! 7 - //! Uses moka for in-memory caching with TinyLFU eviction. 8 - //! 9 - //! Supports: 10 - //! - Timeline feeds (getTimeline) 11 - //! - Author feeds (getAuthorFeed with filters) 12 - 13 - use moka::future::Cache; 14 - use std::time::Duration; 15 - 16 - /// Timeline cache manager 17 - #[derive(Clone)] 18 - pub struct TimelineCache { 19 - cache: Cache<String, CachedTimeline>, 20 - } 21 - 22 - /// Cached timeline result 23 - #[derive(Debug, Clone)] 24 - pub struct CachedTimeline { 25 - /// Post URIs in timeline order 26 - pub post_uris: Vec<String>, 27 - /// Pagination cursor for next page 28 - pub cursor: Option<String>, 29 - /// Timestamp when cached (for debug/monitoring) 30 - pub cached_at: i64, 31 - } 32 - 33 - impl TimelineCache { 34 - /// Create a new timeline cache with moka 35 - /// 36 - /// # Arguments 37 - /// * `ttl_secs` - Cache TTL in seconds (default: 60-120 seconds) 38 - /// * `max_capacity` - Maximum number of cached items (default: 10000) 39 - pub fn new(ttl_secs: u64, max_capacity: u64) -> Self { 40 - let cache = Cache::builder() 41 - .max_capacity(max_capacity) 42 - .time_to_live(Duration::from_secs(ttl_secs)) 43 - .support_invalidation_closures() 44 - .build(); 45 - 46 - Self { cache } 47 - } 48 - 49 - /// Build cache key for a timeline request 50 - /// 51 - /// Format: `timeline:{actor_id}:{cursor_hash}` 52 - /// Where cursor_hash is "first" for first page or the cursor value 53 - fn cache_key(actor_id: i32, cursor: Option<&str>) -> String { 54 - let cursor_str = cursor.unwrap_or("first"); 55 - format!("timeline:{}:{}", actor_id, cursor_str) 56 - } 57 - 58 - /// Get cached timeline if available 59 - /// 60 - /// Returns None if cache miss or expired 61 - pub async fn get( 62 - &self, 63 - actor_id: i32, 64 - cursor: Option<&str>, 65 - ) -> Option<CachedTimeline> { 66 - let key = Self::cache_key(actor_id, cursor); 67 - 68 - let cached = self.cache.get(&key).await?; 69 - 70 - tracing::debug!(actor_id, cursor, "Timeline cache hit"); 71 - Some(cached) 72 - } 73 - 74 - /// Store timeline in cache 75 - /// 76 - /// Sets TTL to prevent stale data 77 - pub async fn set( 78 - &self, 79 - actor_id: i32, 80 - cursor: Option<&str>, 81 - post_uris: Vec<String>, 82 - next_cursor: Option<String>, 83 - ) { 84 - let key = Self::cache_key(actor_id, cursor); 85 - 86 - let cached = CachedTimeline { 87 - post_uris: post_uris.clone(), 88 - cursor: next_cursor, 89 - cached_at: chrono::Utc::now().timestamp(), 90 - }; 91 - 92 - self.cache.insert(key, cached).await; 93 - 94 - tracing::debug!( 95 - actor_id, 96 - cursor, 97 - post_count = post_uris.len(), 98 - "Timeline cached" 99 - ); 100 - } 101 - 102 - /// Invalidate timeline cache for a user by actor_id 103 - /// 104 - /// Call this when: 105 - /// - User follows/unfollows someone 106 - /// - Posts are deleted from followed users 107 - /// - User blocks/mutes someone 108 - pub async fn invalidate_by_actor_id(&self, actor_id: i32) -> u64 { 109 - let prefix = format!("timeline:{}:", actor_id); 110 - 111 - // Invalidate all entries matching the prefix 112 - // Deletion metrics not tracked in current implementation 113 - drop(self.cache.invalidate_entries_if(move |key, _| { 114 - key.starts_with(&prefix) 115 - })); 116 - 117 - // Run pending tasks to complete invalidation 118 - self.cache.run_pending_tasks().await; 119 - 120 - tracing::debug!( 121 - actor_id, 122 - "Timeline cache invalidated" 123 - ); 124 - 125 - // Return 0 since we can't track count with moka's API 126 - // (invalidate_entries_if takes Fn, not FnMut, so can't mutate count) 127 - 0 128 - } 129 - 130 - /// Get cache statistics for monitoring 131 - /// 132 - /// Returns total number of cached entries and approximate user count 133 - pub async fn stats(&self) -> (u64, u64) { 134 - // Run pending tasks to get accurate stats 135 - self.cache.run_pending_tasks().await; 136 - 137 - let entry_count = self.cache.entry_count(); 138 - let weighted_size = self.cache.weighted_size(); 139 - 140 - (entry_count, weighted_size) 141 - } 142 - } 143 - 144 - /// Author feed cache manager 145 - #[derive(Clone)] 146 - pub struct AuthorFeedCache { 147 - cache: Cache<String, CachedTimeline>, 148 - } 149 - 150 - impl AuthorFeedCache { 151 - /// Create a new author feed cache with moka 152 - pub fn new(ttl_secs: u64, max_capacity: u64) -> Self { 153 - let cache = Cache::builder() 154 - .max_capacity(max_capacity) 155 - .time_to_live(Duration::from_secs(ttl_secs)) 156 - .support_invalidation_closures() 157 - .build(); 158 - 159 - Self { cache } 160 - } 161 - 162 - /// Build cache key for an author feed request 163 - /// 164 - /// Format: `authorfeed:{actor_id}:{filter}:{cursor}` 165 - fn cache_key(actor_id: i32, filter: &str, cursor: Option<&str>) -> String { 166 - let cursor_str = cursor.unwrap_or("first"); 167 - format!("authorfeed:{}:{}:{}", actor_id, filter, cursor_str) 168 - } 169 - 170 - /// Get cached author feed if available 171 - pub async fn get( 172 - &self, 173 - actor_id: i32, 174 - filter: &str, 175 - cursor: Option<&str>, 176 - ) -> Option<CachedTimeline> { 177 - let key = Self::cache_key(actor_id, filter, cursor); 178 - 179 - let cached = self.cache.get(&key).await?; 180 - 181 - tracing::debug!(actor_id, filter, cursor, "Author feed cache hit"); 182 - Some(cached) 183 - } 184 - 185 - /// Store author feed in cache 186 - pub async fn set( 187 - &self, 188 - actor_id: i32, 189 - filter: &str, 190 - cursor: Option<&str>, 191 - post_uris: Vec<String>, 192 - next_cursor: Option<String>, 193 - ) { 194 - let key = Self::cache_key(actor_id, filter, cursor); 195 - 196 - let cached = CachedTimeline { 197 - post_uris: post_uris.clone(), 198 - cursor: next_cursor, 199 - cached_at: chrono::Utc::now().timestamp(), 200 - }; 201 - 202 - self.cache.insert(key, cached).await; 203 - 204 - tracing::debug!( 205 - actor_id, 206 - filter, 207 - cursor, 208 - post_count = post_uris.len(), 209 - "Author feed cached" 210 - ); 211 - } 212 - 213 - /// Invalidate author feed cache for a user by actor_id 214 - /// 215 - /// Call this when: 216 - /// - User creates/deletes a post 217 - /// - User creates/deletes a repost 218 - pub async fn invalidate_by_actor_id(&self, actor_id: i32) -> u64 { 219 - let prefix = format!("authorfeed:{}:", actor_id); 220 - 221 - // Invalidate all entries matching the prefix 222 - // Deletion metrics not tracked in current implementation 223 - drop(self.cache.invalidate_entries_if(move |key, _| { 224 - key.starts_with(&prefix) 225 - })); 226 - 227 - // Run pending tasks to complete invalidation 228 - self.cache.run_pending_tasks().await; 229 - 230 - tracing::debug!( 231 - actor_id, 232 - "Author feed cache invalidated" 233 - ); 234 - 235 - // Return 0 since we can't track count with moka's API 236 - 0 237 - } 238 - 239 - /// Get cache statistics for monitoring 240 - pub async fn stats(&self) -> (u64, u64) { 241 - self.cache.run_pending_tasks().await; 242 - 243 - let entry_count = self.cache.entry_count(); 244 - let weighted_size = self.cache.weighted_size(); 245 - 246 - (entry_count, weighted_size) 247 - } 248 - } 249 - 250 - #[cfg(test)] 251 - mod tests { 252 - use super::*; 253 - 254 - #[tokio::test] 255 - async fn test_timeline_cache_set_and_get() { 256 - let cache = TimelineCache::new(60, 1000); 257 - 258 - let actor_id = 123; 259 - let post_uris = vec![ 260 - "at://did:plc:test/app.bsky.feed.post/1".to_string(), 261 - "at://did:plc:test/app.bsky.feed.post/2".to_string(), 262 - ]; 263 - 264 - // Set cache 265 - cache 266 - .set(actor_id, None, post_uris.clone(), Some("cursor123".to_string())) 267 - .await; 268 - 269 - // Get cache 270 - let cached = cache.get(actor_id, None).await.unwrap(); 271 - 272 - assert_eq!(cached.post_uris, post_uris); 273 - assert_eq!(cached.cursor, Some("cursor123".to_string())); 274 - } 275 - 276 - #[tokio::test] 277 - async fn test_timeline_cache_pagination() { 278 - let cache = TimelineCache::new(60, 1000); 279 - 280 - let actor_id = 456; 281 - 282 - // Cache page 1 283 - cache 284 - .set( 285 - actor_id, 286 - None, 287 - vec!["post1".to_string()], 288 - Some("cursor1".to_string()), 289 - ) 290 - .await; 291 - 292 - // Cache page 2 293 - cache 294 - .set( 295 - actor_id, 296 - Some("cursor1"), 297 - vec!["post2".to_string()], 298 - None, 299 - ) 300 - .await; 301 - 302 - // Get page 1 303 - let page1 = cache.get(actor_id, None).await.unwrap(); 304 - assert_eq!(page1.post_uris, vec!["post1"]); 305 - assert_eq!(page1.cursor, Some("cursor1".to_string())); 306 - 307 - // Get page 2 308 - let page2 = cache.get(actor_id, Some("cursor1")).await.unwrap(); 309 - assert_eq!(page2.post_uris, vec!["post2"]); 310 - assert_eq!(page2.cursor, None); 311 - } 312 - 313 - #[tokio::test] 314 - async fn test_timeline_cache_invalidation() { 315 - let cache = TimelineCache::new(60, 1000); 316 - 317 - let actor_id = 789; 318 - 319 - // Cache some data 320 - cache 321 - .set(actor_id, None, vec!["post1".to_string()], None) 322 - .await; 323 - 324 - cache 325 - .set( 326 - actor_id, 327 - Some("cursor1"), 328 - vec!["post2".to_string()], 329 - None, 330 - ) 331 - .await; 332 - 333 - // Verify cached 334 - assert!(cache.get(actor_id, None).await.is_some()); 335 - assert!(cache.get(actor_id, Some("cursor1")).await.is_some()); 336 - 337 - // Invalidate 338 - let _deleted = cache.invalidate_by_actor_id(actor_id).await; 339 - 340 - // Verify cache cleared 341 - assert!(cache.get(actor_id, None).await.is_none()); 342 - assert!(cache.get(actor_id, Some("cursor1")).await.is_none()); 343 - } 344 - 345 - #[tokio::test] 346 - async fn test_timeline_cache_stats() { 347 - let cache = TimelineCache::new(60, 1000); 348 - 349 - let actor_id = 999; 350 - 351 - // Initially zero 352 - let (count, _) = cache.stats().await; 353 - assert_eq!(count, 0); 354 - 355 - // Cache two pages 356 - cache 357 - .set(actor_id, None, vec!["post1".to_string()], None) 358 - .await; 359 - 360 - cache 361 - .set( 362 - actor_id, 363 - Some("cursor1"), 364 - vec!["post2".to_string()], 365 - None, 366 - ) 367 - .await; 368 - 369 - // Should have 2 cached pages 370 - let (count, _) = cache.stats().await; 371 - assert_eq!(count, 2); 372 - } 373 - }
-4
parakeet/src/lib.rs
··· 11 11 12 12 pub mod auth; 13 13 pub mod cache_listener; 14 - pub mod cache_timeline; 15 14 pub mod errors; 16 15 pub mod helpers; 17 16 pub mod rate_limiting; ··· 19 18 // Re-export commonly used items 20 19 pub use auth::{AtpAcceptLabelers, AtpAuth, JwtVerifier}; 21 20 pub use cache_listener::spawn_cache_listener; 22 - pub use cache_timeline::{AuthorFeedCache, TimelineCache}; 23 21 pub use errors::{Error, XrpcResult}; 24 22 pub use rate_limiting::{rate_limit_middleware, RateLimiter}; 25 23 } ··· 76 74 pub id_cache: Arc<parakeet_db::id_cache::IdCache>, 77 75 pub rate_limiter: Arc<common::rate_limiting::RateLimiter>, 78 76 pub rate_limit_config: config::ConfigRateLimit, 79 - pub timeline_cache: Arc<common::TimelineCache>, 80 - pub author_feed_cache: Arc<common::AuthorFeedCache>, 81 77 pub http_client: reqwest::Client, 82 78 // Entity-based system (replaces old hydration/loaders/caches) 83 79 pub profile_entity: Arc<ProfileEntity>,
-9
parakeet/src/main.rs
··· 87 87 // Initialize rate limiter (in-memory with DashMap) 88 88 let rate_limiter = Arc::new(common::rate_limiting::RateLimiter::new()); 89 89 90 - // Initialize timeline cache (60 second TTL, 10k max items) 91 - let timeline_cache = Arc::new(common::TimelineCache::new(60, 10_000)); 92 - 93 - // Initialize author feed cache (60 second TTL, 10k max items) 94 - let author_feed_cache = Arc::new(common::AuthorFeedCache::new(60, 10_000)); 95 - 96 - 97 90 // Initialize new entity-centric implementations (replacing old caches) 98 91 let profile_entity = Arc::new(ProfileEntity::new( 99 92 Arc::new(pool.clone()), ··· 137 130 id_cache, 138 131 rate_limiter, 139 132 rate_limit_config: conf.rate_limit.clone(), 140 - timeline_cache, 141 - author_feed_cache, 142 133 http_client, 143 134 profile_entity, 144 135 post_entity,
+1 -53
parakeet/src/xrpc/app_bsky/feed/get_timeline.rs
··· 43 43 let user_actor_id = state.profile_entity.resolve_identifier(&user_did).await 44 44 .map_err(|_| crate::common::errors::Error::actor_not_found(&user_did))?; 45 45 46 - // Try cache first 46 + // Query database 47 47 let mut step_timer = std::time::Instant::now(); 48 - if let Some(cached) = state.timeline_cache.get(user_actor_id, query.cursor.as_deref()).await { 49 - // Cache hit - hydrate the cached URIs 50 - let cache_time = step_timer.elapsed().as_secs_f64() * 1000.0; 51 - tracing::info!(" ├─ Timeline cache hit: {:.1} ms", cache_time); 52 - 53 - step_timer = std::time::Instant::now(); 54 - 55 - // Get posts with reply context using PostEntity 56 - let mut posts_with_context = state.post_entity.get_by_uris_with_reply_context(cached.post_uris.clone(), Some(&user_did)).await 57 - .unwrap_or_default(); 58 - 59 - // Convert to FeedViewPosts maintaining order 60 - let mut feed = Vec::new(); 61 - for uri in cached.post_uris { 62 - if let Some((post, reply_context)) = posts_with_context.remove(&uri) { 63 - feed.push(FeedViewPost { 64 - post, 65 - reply: reply_context, 66 - reason: None, // TODO: Load repost reason 67 - feed_context: None, 68 - }); 69 - } 70 - } 71 - 72 - let hydrate_time = step_timer.elapsed().as_secs_f64() * 1000.0; 73 - tracing::info!(" ├─ Hydrate cached feed: {:.1} ms", hydrate_time); 74 - 75 - let total_time = start.elapsed().as_secs_f64() * 1000.0; 76 - tracing::info!(" └─ getTimeline total (cache hit): {:.1} ms (returning {} posts, cursor: {:?})", total_time, feed.len(), cached.cursor); 77 - 78 - return Ok(Json(GetTimelineRes { 79 - cursor: cached.cursor, 80 - feed, 81 - })); 82 - } 83 - 84 - let cache_check_time = step_timer.elapsed().as_secs_f64() * 1000.0; 85 - if cache_check_time >= 1.0 { 86 - tracing::info!(" ├─ Timeline cache miss: {:.1} ms", cache_check_time); 87 - } 88 - 89 - // Cache miss - query database 90 - step_timer = std::time::Instant::now(); 91 48 92 49 // Parse cursor 93 50 let cursor_value = datetime_cursor(query.cursor.as_ref()); ··· 146 103 post_uris.push(format!("at://{}/app.bsky.feed.post/{}", did, parakeet_db::tid_util::encode_tid(*post_rkey))); 147 104 } 148 105 149 - // Cache the timeline (if we have posts) 150 - if !post_uris.is_empty() { 151 - state.timeline_cache.set( 152 - user_actor_id, 153 - query.cursor.as_deref(), 154 - post_uris.clone(), 155 - result.cursor.clone(), 156 - ).await; 157 - } 158 106 159 107 // Get posts with reply context using PostEntity 160 108 let mut posts_with_context = state.post_entity.get_by_uris_with_reply_context(post_uris.clone(), Some(&user_did)).await