Rust AppView - highly experimental!
1
fork

Configure Feed

Select the types of activity you want to include in your feed.

feat: implement cache

+1414 -85
+1 -1
lexica/src/app_bsky/actor.rs
··· 197 197 pub indexed_at: DateTime<Utc>, 198 198 } 199 199 200 - #[derive(Debug, Serialize, Deserialize)] 200 + #[derive(Clone, Debug, Serialize, Deserialize)] 201 201 #[serde(rename_all = "camelCase")] 202 202 pub struct ProfileViewDetailed { 203 203 pub did: String,
+44 -24
parakeet/src/cache_listener.rs
··· 136 136 } 137 137 138 138 // Profile invalidation: "profile:{actor_id}" 139 - if cache_key.starts_with("profile:") { 140 - // No-op for now - we don't cache profiles yet 141 - // When we add profile cache, parse actor_id and invalidate 142 - return; 139 + if let Some(actor_id_str) = cache_key.strip_prefix("profile:") { 140 + if let Ok(actor_id) = actor_id_str.parse::<i32>() { 141 + state.profile_cache.invalidate(actor_id).await; 142 + info!(actor_id = actor_id, "Invalidated profile cache"); 143 + return; 144 + } 143 145 } 144 146 145 147 // Post invalidation: "post:{actor_id}:{rkey}" 146 - if cache_key.starts_with("post:") { 147 - // No-op for now - we don't cache individual posts yet 148 - // When we add post cache, parse actor_id:rkey and invalidate 149 - return; 148 + if let Some(rest) = cache_key.strip_prefix("post:") { 149 + if let Some((actor_id_str, rkey_str)) = rest.split_once(':') { 150 + if let (Ok(actor_id), Ok(rkey)) = (actor_id_str.parse::<i32>(), rkey_str.parse::<i64>()) { 151 + state.post_cache.invalidate(actor_id, rkey).await; 152 + info!(actor_id = actor_id, rkey = rkey, "Invalidated post cache"); 153 + return; 154 + } 155 + } 150 156 } 151 157 152 158 // Feedgen invalidation: "feedgen:{actor_id}:{rkey}" 153 - if cache_key.starts_with("feedgen:") { 154 - // No-op for now - we don't cache feedgens yet 155 - // When we add feedgen cache, parse actor_id:rkey and invalidate 156 - return; 159 + if let Some(rest) = cache_key.strip_prefix("feedgen:") { 160 + if let Some((actor_id_str, rkey)) = rest.split_once(':') { 161 + if let Ok(actor_id) = actor_id_str.parse::<i32>() { 162 + state.feedgen_cache.invalidate(actor_id, rkey).await; 163 + info!(actor_id = actor_id, rkey = rkey, "Invalidated feedgen cache"); 164 + return; 165 + } 166 + } 157 167 } 158 168 159 169 // List invalidation: "list:{actor_id}:{rkey}" 160 - if cache_key.starts_with("list:") { 161 - // No-op for now - we don't cache lists yet 162 - // When we add list cache, parse actor_id:rkey and invalidate 163 - return; 170 + if let Some(rest) = cache_key.strip_prefix("list:") { 171 + if let Some((actor_id_str, rkey)) = rest.split_once(':') { 172 + if let Ok(actor_id) = actor_id_str.parse::<i32>() { 173 + state.list_cache.invalidate(actor_id, rkey).await; 174 + info!(actor_id = actor_id, rkey = rkey, "Invalidated list cache"); 175 + return; 176 + } 177 + } 164 178 } 165 179 166 180 // Starterpack invalidation: "starterpack:{actor_id}:{rkey}" 167 - if cache_key.starts_with("starterpack:") { 168 - // No-op for now - we don't cache starterpacks yet 169 - // When we add starterpack cache, parse actor_id:rkey and invalidate 170 - return; 181 + if let Some(rest) = cache_key.strip_prefix("starterpack:") { 182 + if let Some((actor_id_str, rkey)) = rest.split_once(':') { 183 + if let Ok(actor_id) = actor_id_str.parse::<i32>() { 184 + state.starterpack_cache.invalidate(actor_id, rkey).await; 185 + info!(actor_id = actor_id, rkey = rkey, "Invalidated starterpack cache"); 186 + return; 187 + } 188 + } 171 189 } 172 190 173 191 // Labeler invalidation: "labeler:{actor_id}" 174 - if cache_key.starts_with("labeler:") { 175 - // No-op for now - we don't cache labelers yet 176 - // When we add labeler cache, parse actor_id and invalidate 177 - return; 192 + if let Some(actor_id_str) = cache_key.strip_prefix("labeler:") { 193 + if let Ok(actor_id) = actor_id_str.parse::<i32>() { 194 + state.labeler_cache.invalidate(actor_id).await; 195 + info!(actor_id = actor_id, "Invalidated labeler cache"); 196 + return; 197 + } 178 198 } 179 199 180 200 warn!(cache_key = cache_key, "Unknown cache invalidation pattern");
+805
parakeet/src/entity_cache.rs
··· 1 + //! Entity caching for various AT Protocol objects 2 + //! 3 + //! This is the ONLY approved way to get hydrated data in the application. 4 + //! Direct hydration through StatefulHydrator should be avoided to ensure 5 + //! all data access goes through the cache. 6 + //! 7 + //! ## Usage 8 + //! 9 + //! Instead of: 10 + //! ``` 11 + //! let hydrator = StatefulHydrator::new(...); 12 + //! let profile = hydrator.hydrate_profile_detailed(did).await; // ❌ Bypasses cache! 13 + //! ``` 14 + //! 15 + //! Always use: 16 + //! ``` 17 + //! let profile = profile_cache.get_or_hydrate(actor_id, did, &hydrator).await; // ✅ Uses cache! 18 + //! ``` 19 + //! 20 + //! ## Cache Strategy 21 + //! 22 + //! We cache the expensive "Detailed" variants: 23 + //! - `ProfileViewDetailed` - Full profile with counts, used in getProfile 24 + //! - `PostView` - Full post with stats, embeds, and viewer state 25 + //! - `GeneratorView`, `ListView`, etc. - Full entities with all metadata 26 + //! 27 + //! Cache invalidation is handled by database triggers via pg_notify. 28 + 29 + use moka::future::Cache; 30 + use std::time::Duration; 31 + 32 + // Import the actual types used in the XRPC endpoints 33 + use lexica::app_bsky::actor::ProfileViewDetailed; 34 + use lexica::app_bsky::feed::PostView; 35 + use lexica::app_bsky::feed::GeneratorView; 36 + use lexica::app_bsky::graph::ListView; 37 + use lexica::app_bsky::graph::StarterPackViewBasic; 38 + 39 + use crate::hydration::StatefulHydrator; 40 + use crate::id_cache_helpers; 41 + use diesel_async::pooled_connection::deadpool::Pool; 42 + use diesel_async::AsyncPgConnection; 43 + use parakeet_db::id_cache::IdCache; 44 + use std::sync::Arc; 45 + 46 + /// Parsed components of an AT-URI 47 + pub struct ParsedAtUri { 48 + pub did: String, 49 + pub collection: String, 50 + pub rkey: String, 51 + } 52 + 53 + /// Parse an AT-URI (at://did/collection/rkey) into its components 54 + pub fn parse_at_uri(uri: &str) -> Option<ParsedAtUri> { 55 + // Expected format: at://did/collection/rkey 56 + let uri = uri.strip_prefix("at://")?; 57 + let parts: Vec<&str> = uri.split('/').collect(); 58 + 59 + if parts.len() != 3 { 60 + return None; 61 + } 62 + 63 + Some(ParsedAtUri { 64 + did: parts[0].to_string(), 65 + collection: parts[1].to_string(), 66 + rkey: parts[2].to_string(), 67 + }) 68 + } 69 + 70 + /// Profile cache that handles hydration automatically 71 + /// 72 + /// Uses actor_id as the cache key for consistency with database triggers 73 + /// 74 + /// Usage: 75 + /// ``` 76 + /// let profile = profile_cache.get_or_hydrate(actor_id, did, &hyd).await?; 77 + /// ``` 78 + #[derive(Clone)] 79 + pub struct ProfileCache { 80 + cache: Cache<i32, ProfileViewDetailed>, // Key is actor_id 81 + } 82 + 83 + impl ProfileCache { 84 + pub fn new(ttl_secs: u64, max_capacity: u64) -> Self { 85 + let cache = Cache::builder() 86 + .max_capacity(max_capacity) 87 + .time_to_live(Duration::from_secs(ttl_secs)) 88 + .support_invalidation_closures() 89 + .build(); 90 + 91 + Self { cache } 92 + } 93 + 94 + /// Get a profile from cache or hydrate it if not cached 95 + /// 96 + /// This is the primary API - it handles all caching logic internally 97 + /// Takes both actor_id (for caching) and DID (for hydration) 98 + pub async fn get_or_hydrate( 99 + &self, 100 + actor_id: i32, 101 + did: String, 102 + hydrator: &StatefulHydrator<'_>, 103 + ) -> Option<ProfileViewDetailed> { 104 + // Check cache first using actor_id 105 + if let Some(profile) = self.cache.get(&actor_id).await { 106 + tracing::debug!(actor_id, "Profile cache hit"); 107 + return Some(profile); 108 + } 109 + 110 + // Cache miss - hydrate the profile using DID 111 + tracing::debug!(actor_id, did = %did, "Profile cache miss, hydrating"); 112 + 113 + // We're allowed to call the deprecated method here since we're the cache 114 + #[allow(deprecated)] 115 + let profile = hydrator.hydrate_profile_detailed(did).await?; 116 + 117 + // Store in cache using actor_id as key 118 + self.cache.insert(actor_id, profile.clone()).await; 119 + tracing::debug!(actor_id, "Profile cached"); 120 + 121 + Some(profile) 122 + } 123 + 124 + /// Get multiple profiles with caching 125 + /// 126 + /// This is the recommended way to get multiple profiles, ensuring each one 127 + /// uses the cache individually 128 + pub async fn get_or_hydrate_batch( 129 + &self, 130 + dids: Vec<String>, 131 + pool: &Pool<AsyncPgConnection>, 132 + id_cache: &Arc<IdCache>, 133 + hydrator: &StatefulHydrator<'_>, 134 + ) -> Vec<ProfileViewDetailed> { 135 + let mut profiles = Vec::with_capacity(dids.len()); 136 + 137 + for did in dids { 138 + // Get actor_id for each DID 139 + if let Ok(actor_id) = id_cache_helpers::get_actor_id_or_fetch(pool, id_cache, &did).await { 140 + if let Some(profile) = self.get_or_hydrate(actor_id, did, hydrator).await { 141 + profiles.push(profile); 142 + } 143 + } 144 + } 145 + 146 + profiles 147 + } 148 + 149 + pub async fn invalidate(&self, actor_id: i32) { 150 + self.cache.invalidate(&actor_id).await; 151 + tracing::debug!(actor_id, "Profile cache invalidated"); 152 + } 153 + 154 + /// Invalidate all entries (for testing/admin) 155 + pub async fn invalidate_all(&self) { 156 + self.cache.invalidate_all(); 157 + tracing::info!("All profile cache entries invalidated"); 158 + } 159 + } 160 + 161 + /// Post cache that handles hydration automatically 162 + /// 163 + /// Uses (actor_id, rkey) as the cache key for consistency with database triggers 164 + /// 165 + /// Usage: 166 + /// ``` 167 + /// let post = post_cache.get_or_hydrate(actor_id, rkey, uri, &hyd).await; 168 + /// ``` 169 + #[derive(Clone)] 170 + pub struct PostCache { 171 + cache: Cache<(i32, i64), PostView>, // Key is (actor_id, rkey) 172 + } 173 + 174 + impl PostCache { 175 + pub fn new(ttl_secs: u64, max_capacity: u64) -> Self { 176 + let cache = Cache::builder() 177 + .max_capacity(max_capacity) 178 + .time_to_live(Duration::from_secs(ttl_secs)) 179 + .support_invalidation_closures() 180 + .build(); 181 + 182 + Self { cache } 183 + } 184 + 185 + /// Get a single post from cache or hydrate it 186 + /// 187 + /// Takes both (actor_id, rkey) for caching and URI for hydration 188 + pub async fn get_or_hydrate_single( 189 + &self, 190 + actor_id: i32, 191 + rkey: i64, 192 + uri: String, 193 + hydrator: &StatefulHydrator<'_>, 194 + ) -> Option<PostView> { 195 + // Check cache first using (actor_id, rkey) 196 + if let Some(post) = self.cache.get(&(actor_id, rkey)).await { 197 + tracing::debug!(actor_id, rkey, "Post cache hit"); 198 + return Some(post); 199 + } 200 + 201 + // Cache miss - hydrate the post using URI 202 + tracing::debug!(actor_id, rkey, uri = %uri, "Post cache miss, hydrating"); 203 + 204 + // We're allowed to call the deprecated method here since we're the cache 205 + #[allow(deprecated)] 206 + let posts = hydrator.hydrate_posts(vec![uri]).await; 207 + let post = posts.into_values().next()?; 208 + 209 + // Store in cache using (actor_id, rkey) as key 210 + self.cache.insert((actor_id, rkey), post.clone()).await; 211 + tracing::debug!(actor_id, rkey, "Post cached"); 212 + 213 + Some(post) 214 + } 215 + 216 + /// Get multiple posts with caching by parsing URIs 217 + /// 218 + /// Parses AT-URIs (at://did/collection/rkey) to extract actor_id and rkey, 219 + /// enabling individual post caching 220 + /// 221 + /// Returns a HashMap for efficient lookups while preserving the ability to iterate 222 + pub async fn get_or_hydrate_from_uris( 223 + &self, 224 + uris: Vec<String>, 225 + pool: &Pool<AsyncPgConnection>, 226 + id_cache: &Arc<IdCache>, 227 + hydrator: &StatefulHydrator<'_>, 228 + ) -> std::collections::HashMap<String, PostView> { 229 + let mut results = Vec::with_capacity(uris.len()); 230 + let mut missing_uris = Vec::new(); 231 + let mut missing_indices = Vec::new(); 232 + 233 + // Try to get each post from cache 234 + for (idx, uri) in uris.iter().enumerate() { 235 + // Parse AT-URI: at://did/collection/rkey 236 + if let Some(parsed) = parse_at_uri(uri) { 237 + // Only process posts (app.bsky.feed.post collection) 238 + if parsed.collection == "app.bsky.feed.post" { 239 + // Get actor_id from DID 240 + if let Ok(actor_id) = id_cache_helpers::get_actor_id_or_fetch( 241 + pool, 242 + id_cache, 243 + &parsed.did 244 + ).await { 245 + // Try to parse rkey as i64 (TID) 246 + if let Ok(rkey) = parsed.rkey.parse::<i64>() { 247 + // Check cache 248 + if let Some(post) = self.cache.get(&(actor_id, rkey)).await { 249 + tracing::debug!(actor_id, rkey, "Post cache hit"); 250 + results.push(Some(post)); 251 + continue; 252 + } 253 + } 254 + } 255 + } 256 + } 257 + 258 + // Cache miss or couldn't parse - need to hydrate 259 + missing_uris.push(uri.clone()); 260 + missing_indices.push(idx); 261 + results.push(None); 262 + } 263 + 264 + // Hydrate missing posts if any 265 + if !missing_uris.is_empty() { 266 + tracing::debug!("Hydrating {} missing posts", missing_uris.len()); 267 + 268 + // We're allowed to call the deprecated method here since we're the cache 269 + #[allow(deprecated)] 270 + let hydrated = hydrator.hydrate_posts(missing_uris).await; 271 + 272 + // Store hydrated posts in cache and results 273 + for (idx, uri) in missing_indices.into_iter().zip(hydrated.keys()) { 274 + if let Some(post) = hydrated.get(uri) { 275 + // Try to cache it if we can parse the URI 276 + if let Some(parsed) = parse_at_uri(uri) { 277 + if parsed.collection == "app.bsky.feed.post" { 278 + if let Ok(actor_id) = id_cache_helpers::get_actor_id_or_fetch( 279 + pool, 280 + id_cache, 281 + &parsed.did 282 + ).await { 283 + if let Ok(rkey) = parsed.rkey.parse::<i64>() { 284 + self.cache.insert((actor_id, rkey), post.clone()).await; 285 + tracing::debug!(actor_id, rkey, "Post cached"); 286 + } 287 + } 288 + } 289 + } 290 + 291 + results[idx] = Some(post.clone()); 292 + } 293 + } 294 + } 295 + 296 + // Build HashMap from successfully loaded posts 297 + let mut posts_map = std::collections::HashMap::new(); 298 + for (uri, post_opt) in uris.into_iter().zip(results.into_iter()) { 299 + if let Some(post) = post_opt { 300 + posts_map.insert(uri, post); 301 + } 302 + } 303 + posts_map 304 + } 305 + 306 + pub async fn invalidate(&self, actor_id: i32, rkey: i64) { 307 + self.cache.invalidate(&(actor_id, rkey)).await; 308 + tracing::debug!(actor_id, rkey, "Post cache invalidated"); 309 + } 310 + 311 + /// Invalidate all entries (for testing/admin) 312 + pub async fn invalidate_all(&self) { 313 + self.cache.invalidate_all(); 314 + tracing::info!("All post cache entries invalidated"); 315 + } 316 + } 317 + 318 + /// Feedgen cache using (actor_id, rkey) as key 319 + #[derive(Clone)] 320 + pub struct FeedgenCache { 321 + cache: Cache<(i32, String), GeneratorView>, 322 + } 323 + 324 + impl FeedgenCache { 325 + pub fn new(ttl_secs: u64, max_capacity: u64) -> Self { 326 + let cache = Cache::builder() 327 + .max_capacity(max_capacity) 328 + .time_to_live(Duration::from_secs(ttl_secs)) 329 + .support_invalidation_closures() 330 + .build(); 331 + 332 + Self { cache } 333 + } 334 + 335 + /// Get a single feedgen from cache or hydrate it 336 + pub async fn get_or_hydrate_single( 337 + &self, 338 + actor_id: i32, 339 + rkey: String, 340 + uri: String, 341 + hydrator: &StatefulHydrator<'_>, 342 + ) -> Option<GeneratorView> { 343 + // Check cache first using (actor_id, rkey) 344 + if let Some(feedgen) = self.cache.get(&(actor_id, rkey.clone())).await { 345 + tracing::debug!(actor_id, rkey, "Feedgen cache hit"); 346 + return Some(feedgen); 347 + } 348 + 349 + // Cache miss - hydrate the feedgen using URI 350 + tracing::debug!(actor_id, rkey, uri = %uri, "Feedgen cache miss, hydrating"); 351 + 352 + // We're allowed to call the deprecated method here since we're the cache 353 + #[allow(deprecated)] 354 + let feedgen = hydrator.hydrate_feedgen(uri).await?; 355 + 356 + // Store in cache using (actor_id, rkey) as key 357 + self.cache.insert((actor_id, rkey.clone()), feedgen.clone()).await; 358 + tracing::debug!(actor_id, rkey, "Feedgen cached"); 359 + 360 + Some(feedgen) 361 + } 362 + 363 + /// Get multiple feedgens with caching by parsing URIs 364 + /// Returns a HashMap for efficient lookups 365 + pub async fn get_or_hydrate_from_uris( 366 + &self, 367 + uris: Vec<String>, 368 + pool: &Pool<AsyncPgConnection>, 369 + id_cache: &Arc<IdCache>, 370 + hydrator: &StatefulHydrator<'_>, 371 + ) -> std::collections::HashMap<String, GeneratorView> { 372 + let mut results = Vec::with_capacity(uris.len()); 373 + let mut missing_uris = Vec::new(); 374 + let mut missing_indices = Vec::new(); 375 + 376 + // Try to get each feedgen from cache 377 + for (idx, uri) in uris.iter().enumerate() { 378 + // Parse AT-URI: at://did/collection/rkey 379 + if let Some(parsed) = parse_at_uri(uri) { 380 + // Only process feedgens (app.bsky.feed.generator collection) 381 + if parsed.collection == "app.bsky.feed.generator" { 382 + // Get actor_id from DID 383 + if let Ok(actor_id) = id_cache_helpers::get_actor_id_or_fetch( 384 + pool, 385 + id_cache, 386 + &parsed.did 387 + ).await { 388 + // Check cache 389 + if let Some(feedgen) = self.cache.get(&(actor_id, parsed.rkey.clone())).await { 390 + tracing::debug!(actor_id, rkey = parsed.rkey, "Feedgen cache hit"); 391 + results.push(Some(feedgen)); 392 + continue; 393 + } 394 + } 395 + } 396 + } 397 + 398 + // Cache miss or couldn't parse - need to hydrate 399 + missing_uris.push(uri.clone()); 400 + missing_indices.push(idx); 401 + results.push(None); 402 + } 403 + 404 + // Hydrate missing feedgens if any 405 + if !missing_uris.is_empty() { 406 + tracing::debug!("Hydrating {} missing feedgens", missing_uris.len()); 407 + 408 + // We're allowed to call the deprecated method here since we're the cache 409 + #[allow(deprecated)] 410 + let hydrated = hydrator.hydrate_feedgens(missing_uris).await; 411 + 412 + // Store hydrated feedgens in cache and results 413 + for (idx, uri) in missing_indices.into_iter().zip(hydrated.keys()) { 414 + if let Some(feedgen) = hydrated.get(uri) { 415 + // Try to cache it if we can parse the URI 416 + if let Some(parsed) = parse_at_uri(uri) { 417 + if parsed.collection == "app.bsky.feed.generator" { 418 + if let Ok(actor_id) = id_cache_helpers::get_actor_id_or_fetch( 419 + pool, 420 + id_cache, 421 + &parsed.did 422 + ).await { 423 + self.cache.insert((actor_id, parsed.rkey.clone()), feedgen.clone()).await; 424 + tracing::debug!(actor_id, rkey = parsed.rkey, "Feedgen cached"); 425 + } 426 + } 427 + } 428 + 429 + results[idx] = Some(feedgen.clone()); 430 + } 431 + } 432 + } 433 + 434 + // Build HashMap from successfully loaded feedgens 435 + let mut feedgens_map = std::collections::HashMap::new(); 436 + for (uri, feedgen_opt) in uris.into_iter().zip(results.into_iter()) { 437 + if let Some(feedgen) = feedgen_opt { 438 + feedgens_map.insert(uri, feedgen); 439 + } 440 + } 441 + feedgens_map 442 + } 443 + 444 + pub async fn get(&self, actor_id: i32, rkey: &str) -> Option<GeneratorView> { 445 + let feedgen = self.cache.get(&(actor_id, rkey.to_string())).await?; 446 + tracing::debug!(actor_id, rkey, "Feedgen cache hit"); 447 + Some(feedgen) 448 + } 449 + 450 + pub async fn set(&self, actor_id: i32, rkey: &str, feedgen: GeneratorView) { 451 + self.cache.insert((actor_id, rkey.to_string()), feedgen).await; 452 + tracing::debug!(actor_id, rkey, "Feedgen cached"); 453 + } 454 + 455 + pub async fn invalidate(&self, actor_id: i32, rkey: &str) { 456 + self.cache.invalidate(&(actor_id, rkey.to_string())).await; 457 + tracing::debug!(actor_id, rkey, "Feedgen cache invalidated"); 458 + } 459 + 460 + /// Invalidate all entries (for testing/admin) 461 + pub async fn invalidate_all(&self) { 462 + self.cache.invalidate_all(); 463 + tracing::info!("All feedgen cache entries invalidated"); 464 + } 465 + } 466 + 467 + /// List cache using (actor_id, rkey) as key 468 + #[derive(Clone)] 469 + pub struct ListCache { 470 + cache: Cache<(i32, String), ListView>, 471 + } 472 + 473 + impl ListCache { 474 + pub fn new(ttl_secs: u64, max_capacity: u64) -> Self { 475 + let cache = Cache::builder() 476 + .max_capacity(max_capacity) 477 + .time_to_live(Duration::from_secs(ttl_secs)) 478 + .support_invalidation_closures() 479 + .build(); 480 + 481 + Self { cache } 482 + } 483 + 484 + /// Get a single list from cache or hydrate it 485 + pub async fn get_or_hydrate_single( 486 + &self, 487 + actor_id: i32, 488 + rkey: String, 489 + uri: String, 490 + hydrator: &StatefulHydrator<'_>, 491 + ) -> Option<ListView> { 492 + // Check cache first using (actor_id, rkey) 493 + if let Some(list) = self.cache.get(&(actor_id, rkey.clone())).await { 494 + tracing::debug!(actor_id, rkey, "List cache hit"); 495 + return Some(list); 496 + } 497 + 498 + // Cache miss - hydrate the list using URI 499 + tracing::debug!(actor_id, rkey, uri = %uri, "List cache miss, hydrating"); 500 + 501 + // We're allowed to call the deprecated method here since we're the cache 502 + #[allow(deprecated)] 503 + let list = hydrator.hydrate_list(uri).await?; 504 + 505 + // Store in cache using (actor_id, rkey) as key 506 + self.cache.insert((actor_id, rkey.clone()), list.clone()).await; 507 + tracing::debug!(actor_id, rkey, "List cached"); 508 + 509 + Some(list) 510 + } 511 + 512 + /// Get multiple lists with caching by parsing URIs 513 + /// Returns a HashMap for efficient lookups 514 + pub async fn get_or_hydrate_from_uris( 515 + &self, 516 + uris: Vec<String>, 517 + pool: &Pool<AsyncPgConnection>, 518 + id_cache: &Arc<IdCache>, 519 + hydrator: &StatefulHydrator<'_>, 520 + ) -> std::collections::HashMap<String, ListView> { 521 + let mut results = Vec::with_capacity(uris.len()); 522 + let mut missing_uris = Vec::new(); 523 + let mut missing_indices = Vec::new(); 524 + 525 + // Try to get each list from cache 526 + for (idx, uri) in uris.iter().enumerate() { 527 + // Parse AT-URI: at://did/collection/rkey 528 + if let Some(parsed) = parse_at_uri(uri) { 529 + // Only process lists (app.bsky.graph.list collection) 530 + if parsed.collection == "app.bsky.graph.list" { 531 + // Get actor_id from DID 532 + if let Ok(actor_id) = id_cache_helpers::get_actor_id_or_fetch( 533 + pool, 534 + id_cache, 535 + &parsed.did 536 + ).await { 537 + // Check cache 538 + if let Some(list) = self.cache.get(&(actor_id, parsed.rkey.clone())).await { 539 + tracing::debug!(actor_id, rkey = parsed.rkey, "List cache hit"); 540 + results.push(Some(list)); 541 + continue; 542 + } 543 + } 544 + } 545 + } 546 + 547 + // Cache miss or couldn't parse - need to hydrate 548 + missing_uris.push(uri.clone()); 549 + missing_indices.push(idx); 550 + results.push(None); 551 + } 552 + 553 + // Hydrate missing lists if any 554 + if !missing_uris.is_empty() { 555 + tracing::debug!("Hydrating {} missing lists", missing_uris.len()); 556 + 557 + // We're allowed to call the deprecated method here since we're the cache 558 + #[allow(deprecated)] 559 + let hydrated = hydrator.hydrate_lists(missing_uris).await; 560 + 561 + // Store hydrated lists in cache and results 562 + for (idx, uri) in missing_indices.into_iter().zip(hydrated.keys()) { 563 + if let Some(list) = hydrated.get(uri) { 564 + // Try to cache it if we can parse the URI 565 + if let Some(parsed) = parse_at_uri(uri) { 566 + if parsed.collection == "app.bsky.graph.list" { 567 + if let Ok(actor_id) = id_cache_helpers::get_actor_id_or_fetch( 568 + pool, 569 + id_cache, 570 + &parsed.did 571 + ).await { 572 + self.cache.insert((actor_id, parsed.rkey.clone()), list.clone()).await; 573 + tracing::debug!(actor_id, rkey = parsed.rkey, "List cached"); 574 + } 575 + } 576 + } 577 + 578 + results[idx] = Some(list.clone()); 579 + } 580 + } 581 + } 582 + 583 + // Build HashMap from successfully loaded lists 584 + let mut lists_map = std::collections::HashMap::new(); 585 + for (uri, list_opt) in uris.into_iter().zip(results.into_iter()) { 586 + if let Some(list) = list_opt { 587 + lists_map.insert(uri, list); 588 + } 589 + } 590 + lists_map 591 + } 592 + 593 + pub async fn get(&self, actor_id: i32, rkey: &str) -> Option<ListView> { 594 + let list = self.cache.get(&(actor_id, rkey.to_string())).await?; 595 + tracing::debug!(actor_id, rkey, "List cache hit"); 596 + Some(list) 597 + } 598 + 599 + pub async fn set(&self, actor_id: i32, rkey: &str, list: ListView) { 600 + self.cache.insert((actor_id, rkey.to_string()), list).await; 601 + tracing::debug!(actor_id, rkey, "List cached"); 602 + } 603 + 604 + pub async fn invalidate(&self, actor_id: i32, rkey: &str) { 605 + self.cache.invalidate(&(actor_id, rkey.to_string())).await; 606 + tracing::debug!(actor_id, rkey, "List cache invalidated"); 607 + } 608 + 609 + /// Invalidate all entries (for testing/admin) 610 + pub async fn invalidate_all(&self) { 611 + self.cache.invalidate_all(); 612 + tracing::info!("All list cache entries invalidated"); 613 + } 614 + } 615 + 616 + /// Starterpack cache using (actor_id, rkey) as key 617 + #[derive(Clone)] 618 + pub struct StarterpackCache { 619 + cache: Cache<(i32, String), StarterPackViewBasic>, 620 + } 621 + 622 + impl StarterpackCache { 623 + pub fn new(ttl_secs: u64, max_capacity: u64) -> Self { 624 + let cache = Cache::builder() 625 + .max_capacity(max_capacity) 626 + .time_to_live(Duration::from_secs(ttl_secs)) 627 + .support_invalidation_closures() 628 + .build(); 629 + 630 + Self { cache } 631 + } 632 + 633 + /// Get a single starterpack from cache or hydrate it 634 + pub async fn get_or_hydrate_single( 635 + &self, 636 + actor_id: i32, 637 + rkey: String, 638 + uri: String, 639 + hydrator: &StatefulHydrator<'_>, 640 + ) -> Option<StarterPackViewBasic> { 641 + // Check cache first using (actor_id, rkey) 642 + if let Some(pack) = self.cache.get(&(actor_id, rkey.clone())).await { 643 + tracing::debug!(actor_id, rkey, "Starterpack cache hit"); 644 + return Some(pack); 645 + } 646 + 647 + // Cache miss - hydrate the starterpack using URI 648 + tracing::debug!(actor_id, rkey, uri = %uri, "Starterpack cache miss, hydrating"); 649 + 650 + // We're allowed to call the deprecated method here since we're the cache 651 + #[allow(deprecated)] 652 + let packs = hydrator.hydrate_starterpacks_basic(vec![uri]).await; 653 + let pack = packs.into_values().next()?; 654 + 655 + // Store in cache using (actor_id, rkey) as key 656 + self.cache.insert((actor_id, rkey.clone()), pack.clone()).await; 657 + tracing::debug!(actor_id, rkey, "Starterpack cached"); 658 + 659 + Some(pack) 660 + } 661 + 662 + /// Get multiple starterpacks with caching by parsing URIs 663 + /// Returns a HashMap for efficient lookups 664 + pub async fn get_or_hydrate_from_uris( 665 + &self, 666 + uris: Vec<String>, 667 + pool: &Pool<AsyncPgConnection>, 668 + id_cache: &Arc<IdCache>, 669 + hydrator: &StatefulHydrator<'_>, 670 + ) -> std::collections::HashMap<String, StarterPackViewBasic> { 671 + let mut results = Vec::with_capacity(uris.len()); 672 + let mut missing_uris = Vec::new(); 673 + let mut missing_indices = Vec::new(); 674 + 675 + // Try to get each starterpack from cache 676 + for (idx, uri) in uris.iter().enumerate() { 677 + // Parse AT-URI: at://did/collection/rkey 678 + if let Some(parsed) = parse_at_uri(uri) { 679 + // Only process starterpacks (app.bsky.graph.starterpack collection) 680 + if parsed.collection == "app.bsky.graph.starterpack" { 681 + // Get actor_id from DID 682 + if let Ok(actor_id) = id_cache_helpers::get_actor_id_or_fetch( 683 + pool, 684 + id_cache, 685 + &parsed.did 686 + ).await { 687 + // Check cache 688 + if let Some(pack) = self.cache.get(&(actor_id, parsed.rkey.clone())).await { 689 + tracing::debug!(actor_id, rkey = parsed.rkey, "Starterpack cache hit"); 690 + results.push(Some(pack)); 691 + continue; 692 + } 693 + } 694 + } 695 + } 696 + 697 + // Cache miss or couldn't parse - need to hydrate 698 + missing_uris.push(uri.clone()); 699 + missing_indices.push(idx); 700 + results.push(None); 701 + } 702 + 703 + // Hydrate missing starterpacks if any 704 + if !missing_uris.is_empty() { 705 + tracing::debug!("Hydrating {} missing starterpacks", missing_uris.len()); 706 + 707 + // We're allowed to call the deprecated method here since we're the cache 708 + #[allow(deprecated)] 709 + let hydrated = hydrator.hydrate_starterpacks_basic(missing_uris).await; 710 + 711 + // Store hydrated starterpacks in cache and results 712 + for (idx, uri) in missing_indices.into_iter().zip(hydrated.keys()) { 713 + if let Some(pack) = hydrated.get(uri) { 714 + // Try to cache it if we can parse the URI 715 + if let Some(parsed) = parse_at_uri(uri) { 716 + if parsed.collection == "app.bsky.graph.starterpack" { 717 + if let Ok(actor_id) = id_cache_helpers::get_actor_id_or_fetch( 718 + pool, 719 + id_cache, 720 + &parsed.did 721 + ).await { 722 + self.cache.insert((actor_id, parsed.rkey.clone()), pack.clone()).await; 723 + tracing::debug!(actor_id, rkey = parsed.rkey, "Starterpack cached"); 724 + } 725 + } 726 + } 727 + 728 + results[idx] = Some(pack.clone()); 729 + } 730 + } 731 + } 732 + 733 + // Build HashMap from successfully loaded starterpacks 734 + let mut packs_map = std::collections::HashMap::new(); 735 + for (uri, pack_opt) in uris.into_iter().zip(results.into_iter()) { 736 + if let Some(pack) = pack_opt { 737 + packs_map.insert(uri, pack); 738 + } 739 + } 740 + packs_map 741 + } 742 + 743 + pub async fn get(&self, actor_id: i32, rkey: &str) -> Option<StarterPackViewBasic> { 744 + let pack = self.cache.get(&(actor_id, rkey.to_string())).await?; 745 + tracing::debug!(actor_id, rkey, "Starterpack cache hit"); 746 + Some(pack) 747 + } 748 + 749 + pub async fn set(&self, actor_id: i32, rkey: &str, pack: StarterPackViewBasic) { 750 + self.cache.insert((actor_id, rkey.to_string()), pack).await; 751 + tracing::debug!(actor_id, rkey, "Starterpack cached"); 752 + } 753 + 754 + pub async fn invalidate(&self, actor_id: i32, rkey: &str) { 755 + self.cache.invalidate(&(actor_id, rkey.to_string())).await; 756 + tracing::debug!(actor_id, rkey, "Starterpack cache invalidated"); 757 + } 758 + 759 + /// Invalidate all entries (for testing/admin) 760 + pub async fn invalidate_all(&self) { 761 + self.cache.invalidate_all(); 762 + tracing::info!("All starterpack cache entries invalidated"); 763 + } 764 + } 765 + 766 + /// Labeler cache using actor_id as key 767 + /// Note: Labelers always use "self" as rkey 768 + #[derive(Clone)] 769 + pub struct LabelerCache { 770 + cache: Cache<i32, serde_json::Value>, // Using Value since we don't have HydratedLabeler type 771 + } 772 + 773 + impl LabelerCache { 774 + pub fn new(ttl_secs: u64, max_capacity: u64) -> Self { 775 + let cache = Cache::builder() 776 + .max_capacity(max_capacity) 777 + .time_to_live(Duration::from_secs(ttl_secs)) 778 + .support_invalidation_closures() 779 + .build(); 780 + 781 + Self { cache } 782 + } 783 + 784 + pub async fn get(&self, actor_id: i32) -> Option<serde_json::Value> { 785 + let labeler = self.cache.get(&actor_id).await?; 786 + tracing::debug!(actor_id, "Labeler cache hit"); 787 + Some(labeler) 788 + } 789 + 790 + pub async fn set(&self, actor_id: i32, labeler: serde_json::Value) { 791 + self.cache.insert(actor_id, labeler).await; 792 + tracing::debug!(actor_id, "Labeler cached"); 793 + } 794 + 795 + pub async fn invalidate(&self, actor_id: i32) { 796 + self.cache.invalidate(&actor_id).await; 797 + tracing::debug!(actor_id, "Labeler cache invalidated"); 798 + } 799 + 800 + /// Invalidate all entries (for testing/admin) 801 + pub async fn invalidate_all(&self) { 802 + self.cache.invalidate_all(); 803 + tracing::info!("All labeler cache entries invalidated"); 804 + } 805 + }
+8
parakeet/src/hydration/feedgen.rs
··· 53 53 } 54 54 55 55 impl super::StatefulHydrator<'_> { 56 + #[deprecated( 57 + since = "0.1.0", 58 + note = "Use FeedgenCache::get_or_hydrate_single() to ensure caching. Direct hydration bypasses the cache." 59 + )] 56 60 pub async fn hydrate_feedgen(&self, feedgen: String) -> Option<GeneratorView> { 57 61 let labels = self.get_label(&feedgen).await; 58 62 let viewer = self.get_feedgen_viewer_state(&feedgen).await; ··· 85 89 )) 86 90 } 87 91 92 + #[deprecated( 93 + since = "0.1.0", 94 + note = "Use FeedgenCache::get_or_hydrate_from_uris() to ensure caching. Direct hydration bypasses the cache." 95 + )] 88 96 pub async fn hydrate_feedgens(&self, feedgens: Vec<String>) -> HashMap<String, GeneratorView> { 89 97 let labels = self.get_label_many(&feedgens).await; 90 98 let viewers = self.get_feedgen_viewer_states(&feedgens).await;
+8
parakeet/src/hydration/list.rs
··· 160 160 .collect() 161 161 } 162 162 163 + #[deprecated( 164 + since = "0.1.0", 165 + note = "Use ListCache::get_or_hydrate_single() to ensure caching. Direct hydration bypasses the cache." 166 + )] 163 167 pub async fn hydrate_list(&self, list: String) -> Option<ListView> { 164 168 let labels = self.get_label(&list).await; 165 169 let viewer = self.get_list_viewer_state(&list).await; ··· 189 193 build_listview(enriched, count, profile, labels, viewer, &self.cdn) 190 194 } 191 195 196 + #[deprecated( 197 + since = "0.1.0", 198 + note = "Use ListCache::get_or_hydrate_from_uris() to ensure caching. Direct hydration bypasses the cache." 199 + )] 192 200 pub async fn hydrate_lists(&self, lists: Vec<String>) -> HashMap<String, ListView> { 193 201 if lists.is_empty() { 194 202 return HashMap::new();
+8
parakeet/src/hydration/posts/mod.rs
··· 308 308 (results, reply_actor_cache) 309 309 } 310 310 311 + /// Hydrate multiple posts 312 + /// 313 + /// **DEPRECATED**: This method bypasses caching. Use PostCache::get_or_hydrate_from_uris() instead. 314 + /// Direct hydration should only be called from within the cache system. 315 + #[deprecated( 316 + since = "0.1.0", 317 + note = "Use PostCache::get_or_hydrate_from_uris() to ensure caching. Direct hydration bypasses the cache." 318 + )] 311 319 pub async fn hydrate_posts(&self, posts: Vec<String>) -> HashMap<String, PostView> { 312 320 let (posts_data, actor_cache) = self.hydrate_posts_inner(posts).await; 313 321
+8
parakeet/src/hydration/profile/mod.rs
··· 280 280 .collect() 281 281 } 282 282 283 + /// Get detailed profile data 284 + /// 285 + /// **DEPRECATED**: This method bypasses caching. Use ProfileCache::get_or_hydrate() instead. 286 + /// Direct hydration should only be called from within the cache system. 287 + #[deprecated( 288 + since = "0.1.0", 289 + note = "Use ProfileCache::get_or_hydrate() to ensure caching. Direct hydration bypasses the cache." 290 + )] 283 291 pub async fn hydrate_profile_detailed(&self, did: String) -> Option<ProfileViewDetailed> { 284 292 let labels = self.get_profile_label(&did).await; 285 293 let viewer = self.get_profile_viewer_state(&did).await;
+8
parakeet/src/hydration/starter_packs.rs
··· 99 99 Some(build_basic(enriched, creator, labels, list_item_count)) 100 100 } 101 101 102 + #[deprecated( 103 + since = "0.1.0", 104 + note = "Use StarterpackCache::get_or_hydrate_from_uris() to ensure caching. Direct hydration bypasses the cache." 105 + )] 102 106 pub async fn hydrate_starterpacks_basic( 103 107 &self, 104 108 packs: Vec<String>, ··· 205 209 .collect() 206 210 } 207 211 212 + #[deprecated( 213 + since = "0.1.0", 214 + note = "Use starterpack-specific cache or hydrate_starterpack_basic. StarterPackView (full) and StarterPackViewBasic are different types." 215 + )] 208 216 pub async fn hydrate_starterpack(&self, pack: String) -> Option<StarterPackView> { 209 217 let labels = self.get_label(&pack).await; 210 218
+7
parakeet/src/lib.rs
··· 11 11 pub mod cache_listener; 12 12 pub mod config; 13 13 pub mod db; 14 + pub mod entity_cache; 14 15 pub mod hydration; 15 16 pub mod id_cache_helpers; 16 17 pub mod loaders; ··· 33 34 pub rate_limit_config: config::ConfigRateLimit, 34 35 pub timeline_cache: Arc<timeline_cache::TimelineCache>, 35 36 pub author_feed_cache: Arc<timeline_cache::AuthorFeedCache>, 37 + pub profile_cache: Arc<entity_cache::ProfileCache>, 38 + pub post_cache: Arc<entity_cache::PostCache>, 39 + pub feedgen_cache: Arc<entity_cache::FeedgenCache>, 40 + pub list_cache: Arc<entity_cache::ListCache>, 41 + pub starterpack_cache: Arc<entity_cache::StarterpackCache>, 42 + pub labeler_cache: Arc<entity_cache::LabelerCache>, 36 43 pub http_client: reqwest::Client, 37 44 }
+14
parakeet/src/main.rs
··· 110 110 // Initialize author feed cache (60 second TTL, 10k max items) 111 111 let author_feed_cache = Arc::new(timeline_cache::AuthorFeedCache::new(60, 10_000)); 112 112 113 + // Initialize entity caches (60 second TTL, varying capacities) 114 + let profile_cache = Arc::new(entity_cache::ProfileCache::new(60, 5_000)); 115 + let post_cache = Arc::new(entity_cache::PostCache::new(60, 10_000)); 116 + let feedgen_cache = Arc::new(entity_cache::FeedgenCache::new(60, 1_000)); 117 + let list_cache = Arc::new(entity_cache::ListCache::new(60, 2_000)); 118 + let starterpack_cache = Arc::new(entity_cache::StarterpackCache::new(60, 1_000)); 119 + let labeler_cache = Arc::new(entity_cache::LabelerCache::new(60, 500)); 120 + 113 121 GlobalState { 114 122 pool, 115 123 dataloaders, ··· 122 130 rate_limit_config: conf.rate_limit.clone(), 123 131 timeline_cache, 124 132 author_feed_cache, 133 + profile_cache, 134 + post_cache, 135 + feedgen_cache, 136 + list_cache, 137 + starterpack_cache, 138 + labeler_cache, 125 139 http_client, 126 140 } 127 141 };
+198
parakeet/src/unified_cache.rs
··· 1 + //! Unified caching system that owns hydration 2 + //! 3 + //! This module provides the ONLY way to get hydrated data in the application. 4 + //! All hydration must go through the cache to ensure proper caching behavior. 5 + 6 + use moka::future::Cache; 7 + use std::sync::Arc; 8 + use std::time::Duration; 9 + 10 + use diesel_async::pooled_connection::deadpool::Pool; 11 + use diesel_async::AsyncPgConnection; 12 + 13 + use lexica::app_bsky::actor::ProfileViewDetailed; 14 + use lexica::app_bsky::feed::PostView; 15 + 16 + use crate::hydration::StatefulHydrator; 17 + use crate::loaders::Dataloaders; 18 + use crate::xrpc::cdn::BskyCdn; 19 + use crate::id_cache_helpers; 20 + use parakeet_db::id_cache::IdCache; 21 + 22 + /// The unified cache system that owns all hydration 23 + /// 24 + /// This is the ONLY way to get hydrated data in the application. 25 + /// Direct access to hydration is not allowed to ensure caching is always used. 26 + #[derive(Clone)] 27 + pub struct UnifiedCache { 28 + profile_cache: Cache<i32, ProfileViewDetailed>, 29 + post_cache: Cache<(i32, i64), PostView>, 30 + 31 + // Dependencies needed for hydration 32 + dataloaders: Arc<Dataloaders>, 33 + cdn: Arc<BskyCdn>, 34 + pool: Pool<AsyncPgConnection>, 35 + id_cache: Arc<IdCache>, 36 + } 37 + 38 + impl UnifiedCache { 39 + pub fn new( 40 + dataloaders: Arc<Dataloaders>, 41 + cdn: Arc<BskyCdn>, 42 + pool: Pool<AsyncPgConnection>, 43 + id_cache: Arc<IdCache>, 44 + profile_ttl: u64, 45 + post_ttl: u64, 46 + ) -> Self { 47 + let profile_cache = Cache::builder() 48 + .max_capacity(5_000) 49 + .time_to_live(Duration::from_secs(profile_ttl)) 50 + .support_invalidation_closures() 51 + .build(); 52 + 53 + let post_cache = Cache::builder() 54 + .max_capacity(10_000) 55 + .time_to_live(Duration::from_secs(post_ttl)) 56 + .support_invalidation_closures() 57 + .build(); 58 + 59 + Self { 60 + profile_cache, 61 + post_cache, 62 + dataloaders, 63 + cdn, 64 + pool, 65 + id_cache, 66 + } 67 + } 68 + 69 + /// Get a profile - the ONLY way to get profile data 70 + /// 71 + /// This method handles caching and hydration internally. 72 + /// No direct access to hydration is allowed. 73 + pub async fn get_profile( 74 + &self, 75 + did: String, 76 + labelers: &[String], 77 + viewer_did: Option<String>, 78 + ) -> Option<ProfileViewDetailed> { 79 + // First, get the actor_id for caching 80 + let actor_id = id_cache_helpers::get_actor_id_or_fetch(&self.pool, &self.id_cache, &did).await.ok()?; 81 + 82 + // Check cache 83 + if let Some(profile) = self.profile_cache.get(&actor_id).await { 84 + tracing::debug!(actor_id, "Profile cache hit"); 85 + return Some(profile); 86 + } 87 + 88 + // Cache miss - hydrate using internal hydrator 89 + tracing::debug!(actor_id, "Profile cache miss, hydrating"); 90 + 91 + // Get viewer actor_id if provided 92 + let viewer_actor_id = if let Some(ref viewer) = viewer_did { 93 + id_cache_helpers::get_actor_id_or_fetch(&self.pool, &self.id_cache, viewer).await.ok() 94 + } else { 95 + None 96 + }; 97 + 98 + // Create hydrator for this request 99 + let hydrator = StatefulHydrator::new( 100 + &self.dataloaders, 101 + &self.cdn, 102 + labelers, 103 + viewer_did, 104 + viewer_actor_id, 105 + ).await; 106 + 107 + // Hydrate the profile 108 + let profile = hydrator.hydrate_profile_detailed(did).await?; 109 + 110 + // Store in cache 111 + self.profile_cache.insert(actor_id, profile.clone()).await; 112 + tracing::debug!(actor_id, "Profile cached"); 113 + 114 + Some(profile) 115 + } 116 + 117 + /// Get multiple profiles 118 + /// 119 + /// For now, this doesn't use individual caching but could be optimized 120 + pub async fn get_profiles( 121 + &self, 122 + dids: Vec<String>, 123 + labelers: &[String], 124 + viewer_did: Option<String>, 125 + ) -> Vec<ProfileViewDetailed> { 126 + // Get viewer actor_id if provided 127 + let viewer_actor_id = if let Some(ref viewer) = viewer_did { 128 + id_cache_helpers::get_actor_id_or_fetch(&self.pool, &self.id_cache, viewer).await.ok() 129 + } else { 130 + None 131 + }; 132 + 133 + // Create hydrator for this request 134 + let hydrator = StatefulHydrator::new( 135 + &self.dataloaders, 136 + &self.cdn, 137 + labelers, 138 + viewer_did, 139 + viewer_actor_id, 140 + ).await; 141 + 142 + // TODO: Use individual caching per profile 143 + hydrator.hydrate_profiles_detailed(dids) 144 + .await 145 + .into_values() 146 + .collect() 147 + } 148 + 149 + /// Get posts from AT-URIs 150 + pub async fn get_posts_from_uris( 151 + &self, 152 + uris: Vec<String>, 153 + labelers: &[String], 154 + viewer_did: Option<String>, 155 + ) -> Vec<PostView> { 156 + // This would use the same pattern as profiles 157 + // For now, keeping it simple 158 + 159 + let viewer_actor_id = if let Some(ref viewer) = viewer_did { 160 + id_cache_helpers::get_actor_id_or_fetch(&self.pool, &self.id_cache, viewer).await.ok() 161 + } else { 162 + None 163 + }; 164 + 165 + let hydrator = StatefulHydrator::new( 166 + &self.dataloaders, 167 + &self.cdn, 168 + labelers, 169 + viewer_did, 170 + viewer_actor_id, 171 + ).await; 172 + 173 + // TODO: Parse URIs and use individual post caching 174 + hydrator.hydrate_posts(uris) 175 + .await 176 + .into_values() 177 + .collect() 178 + } 179 + 180 + /// Invalidate a profile by actor_id 181 + pub async fn invalidate_profile(&self, actor_id: i32) { 182 + self.profile_cache.invalidate(&actor_id).await; 183 + tracing::debug!(actor_id, "Profile cache invalidated"); 184 + } 185 + 186 + /// Invalidate a post by actor_id and rkey 187 + pub async fn invalidate_post(&self, actor_id: i32, rkey: i64) { 188 + self.post_cache.invalidate(&(actor_id, rkey)).await; 189 + tracing::debug!(actor_id, rkey, "Post cache invalidated"); 190 + } 191 + 192 + /// Clear all caches (for admin/testing) 193 + pub async fn clear_all(&self) { 194 + self.profile_cache.invalidate_all(); 195 + self.post_cache.invalidate_all(); 196 + tracing::info!("All caches cleared"); 197 + } 198 + }
+10 -8
parakeet/src/xrpc/app_bsky/actor.rs
··· 46 46 let mut conn = state.pool.get().await?; 47 47 check_actor_status(&state.pool, &state.id_cache, &did).await?; 48 48 49 - // Hydrate the profile from our data 50 - let profile = hyd 51 - .hydrate_profile_detailed(did) 49 + // Get actor_id for cache key 50 + let actor_id = crate::id_cache_helpers::get_actor_id_or_fetch(&state.pool, &state.id_cache, &did).await?; 51 + 52 + // Use the ergonomic cache API - it handles both caching and hydration 53 + let profile = state.profile_cache 54 + .get_or_hydrate(actor_id, did, &hyd) 52 55 .await 53 56 .ok_or_else(Error::not_found)?; 54 57 ··· 85 88 86 89 let dids = get_actor_dids(&state.dataloaders, query.actors).await; 87 90 88 - let profiles = hyd 89 - .hydrate_profiles_detailed(dids) 90 - .await 91 - .into_values() 92 - .collect(); 91 + // Use the cache's batch method instead of direct hydration 92 + let profiles = state.profile_cache 93 + .get_or_hydrate_batch(dids, &state.pool, &state.id_cache, &hyd) 94 + .await; 93 95 94 96 Ok(Json(GetProfilesRes { profiles }).into_response()) 95 97 }
+7 -1
parakeet/src/xrpc/app_bsky/bookmark.rs
··· 214 214 }) 215 215 .collect(); 216 216 217 - let mut posts = hyd.hydrate_posts(uris).await; 217 + // Use cache for post hydration (returns HashMap directly) 218 + let mut posts = state.post_cache.get_or_hydrate_from_uris( 219 + uris, 220 + &state.pool, 221 + &state.id_cache, 222 + &hyd, 223 + ).await; 218 224 219 225 let bookmarks = results 220 226 .into_iter()
+53 -9
parakeet/src/xrpc/app_bsky/feed/feedgen.rs
··· 75 75 }) 76 76 .collect(); 77 77 78 - let mut feeds = hyd.hydrate_feedgens(at_uris).await; 78 + // Use cache for feedgen hydration (returns HashMap directly) 79 + let mut feeds_map = state.feedgen_cache.get_or_hydrate_from_uris( 80 + at_uris.clone(), 81 + &state.pool, 82 + &state.id_cache, 83 + &hyd, 84 + ).await; 79 85 80 86 let feeds = results 81 87 .into_iter() 82 88 .filter_map(|r| { 83 89 let did = actor_id_to_did.get(&r.1)?; 84 90 let at_uri = format!("at://{}/app.bsky.feed.generator/{}", did, r.2); 85 - feeds.remove(&at_uri) 91 + feeds_map.remove(&at_uri) 86 92 }) 87 93 .collect(); 88 94 ··· 117 123 }; 118 124 let hyd = StatefulHydrator::new(&state.dataloaders, &state.cdn, &labelers, maybe_did, maybe_actor_id).await; 119 125 120 - let Some(view) = hyd.hydrate_feedgen(query.feed).await else { 126 + // Parse the feed URI to extract actor_id and rkey for caching 127 + let view = if let Some(parsed) = crate::entity_cache::parse_at_uri(&query.feed) { 128 + if parsed.collection == "app.bsky.feed.generator" { 129 + // Get actor_id from DID 130 + if let Ok(actor_id) = crate::id_cache_helpers::get_actor_id_or_fetch( 131 + &state.pool, 132 + &state.id_cache, 133 + &parsed.did 134 + ).await { 135 + // Use cache for single feedgen 136 + state.feedgen_cache.get_or_hydrate_single( 137 + actor_id, 138 + parsed.rkey, 139 + query.feed.clone(), 140 + &hyd, 141 + ).await 142 + } else { 143 + None 144 + } 145 + } else { 146 + None 147 + } 148 + } else { 149 + None 150 + }; 151 + 152 + let Some(view) = view else { 121 153 return Err(Error::not_found()); 122 154 }; 123 155 ··· 154 186 }; 155 187 let hyd = StatefulHydrator::new(&state.dataloaders, &state.cdn, &labelers, maybe_did, maybe_actor_id).await; 156 188 157 - let feeds = hyd 158 - .hydrate_feedgens(query.feeds) 159 - .await 160 - .into_values() 161 - .collect(); 189 + // Use cache for batch feedgen hydration (returns HashMap) 190 + let feeds_map = state.feedgen_cache.get_or_hydrate_from_uris( 191 + query.feeds, 192 + &state.pool, 193 + &state.id_cache, 194 + &hyd, 195 + ).await; 196 + 197 + // Convert to Vec for API response 198 + let feeds = feeds_map.into_values().collect(); 162 199 163 200 Ok(Json(GetFeedGeneratorsRes { feeds })) 164 201 } ··· 251 288 (None, None) 252 289 }; 253 290 let hyd = StatefulHydrator::new(&state.dataloaders, &state.cdn, &labelers, maybe_did, maybe_actor_id).await; 254 - let mut feeds_map = hyd.hydrate_feedgens(page_uris.clone()).await; 291 + 292 + // Use cache for batch feedgen hydration (returns HashMap directly) 293 + let mut feeds_map = state.feedgen_cache.get_or_hydrate_from_uris( 294 + page_uris.clone(), 295 + &state.pool, 296 + &state.id_cache, 297 + &hyd, 298 + ).await; 255 299 256 300 let feeds: Vec<GeneratorView> = page_uris 257 301 .into_iter()
+20 -5
parakeet/src/xrpc/app_bsky/feed/get_timeline.rs
··· 5 5 use crate::GlobalState; 6 6 use axum::extract::{Query, State}; 7 7 use axum::Json; 8 - use lexica::app_bsky::feed::{FeedReasonRepost, FeedViewPost, FeedViewPostReason}; 8 + use lexica::app_bsky::feed::{FeedReasonRepost, FeedViewPost, FeedViewPostReason, PostView}; 9 9 use serde::{Deserialize, Serialize}; 10 10 use std::collections::HashMap; 11 11 ··· 189 189 // OPTIMIZATION: Use the original (actor_id, rkey) results for repost query instead of re-parsing URIs 190 190 let post_keys: Vec<(i32, i64)> = results.iter().map(|(_, actor_id, rkey)| (*actor_id, *rkey)).collect(); 191 191 192 - // Parallelize: hydrate posts and fetch repost data concurrently 192 + // Parallelize: hydrate posts (using cache) and fetch repost data concurrently 193 193 step_timer = std::time::Instant::now(); 194 194 let (mut post_views, reposts_results) = tokio::join!( 195 - hyd.hydrate_posts(at_uris.clone()), 195 + state.post_cache.get_or_hydrate_from_uris( 196 + at_uris.clone(), 197 + &state.pool, 198 + &state.id_cache, 199 + &hyd, 200 + ), 196 201 async { 197 202 crate::db::get_timeline_reposts(&mut conn, &followed_actor_ids, &post_keys) 198 203 .await ··· 321 326 tracing::error!("Failed to get DB connection for repost hydration: {}", e); 322 327 // Return posts without repost info by hydrating them 323 328 let uri_count = at_uris.len(); 324 - let mut post_views = hyd.hydrate_posts(at_uris.clone()).await; 329 + let mut post_views = state.post_cache.get_or_hydrate_from_uris( 330 + at_uris.clone(), 331 + &state.pool, 332 + &state.id_cache, 333 + hyd, 334 + ).await; 325 335 let feed: Vec<FeedViewPost> = at_uris 326 336 .into_iter() 327 337 .filter_map(|uri| { ··· 346 356 347 357 // Parallelize: hydrate posts and get followed actor_ids concurrently 348 358 let (mut post_views, follows) = tokio::join!( 349 - hyd.hydrate_posts(at_uris.clone()), 359 + state.post_cache.get_or_hydrate_from_uris( 360 + at_uris.clone(), 361 + &state.pool, 362 + &state.id_cache, 363 + hyd, 364 + ), 350 365 async { 351 366 crate::db::get_followed_dids(&mut conn, user_actor_id) 352 367 .await
+19 -5
parakeet/src/xrpc/app_bsky/feed/posts/queries.rs
··· 35 35 (None, None) 36 36 }; 37 37 let hyd = StatefulHydrator::new(&state.dataloaders, &state.cdn, &labelers, maybe_did, maybe_actor_id).await; 38 - let posts = hyd.hydrate_posts(query.uris).await; 39 38 40 - Ok(Json(PostsRes { 41 - posts: posts.into_values().collect(), 42 - })) 39 + // Use the new method that parses URIs and caches individual posts 40 + let posts_map = state.post_cache.get_or_hydrate_from_uris( 41 + query.uris, 42 + &state.pool, 43 + &state.id_cache, 44 + &hyd, 45 + ).await; 46 + 47 + // Convert to Vec for API response 48 + let posts = posts_map.into_values().collect(); 49 + 50 + Ok(Json(PostsRes { posts })) 43 51 } 44 52 45 53 #[derive(Debug, Deserialize)] ··· 166 174 167 175 let cursor = uris.last().cloned(); 168 176 169 - let mut posts_map = hyd.hydrate_posts(uris.clone()).await; 177 + // Use cache for post hydration (returns HashMap directly) 178 + let mut posts_map = state.post_cache.get_or_hydrate_from_uris( 179 + uris.clone(), 180 + &state.pool, 181 + &state.id_cache, 182 + &hyd, 183 + ).await; 170 184 171 185 let posts = uris 172 186 .into_iter()
+14 -4
parakeet/src/xrpc/app_bsky/feed/posts/threads.rs
··· 1 1 use axum::extract::{Query, State}; 2 2 use axum::Json; 3 - use lexica::app_bsky::feed::{BlockedAuthor, ThreadViewPost, ThreadViewPostType, ThreadgateView}; 3 + use lexica::app_bsky::feed::{BlockedAuthor, PostView, ThreadViewPost, ThreadViewPostType, ThreadgateView}; 4 4 use serde::{Deserialize, Serialize}; 5 5 use std::collections::HashMap; 6 6 ··· 142 142 .map(|item: &ThreadItem| item.at_uri.clone()) 143 143 .collect(); 144 144 145 - // Parallelize: hydrate replies and parents concurrently 145 + // Parallelize: hydrate replies and parents concurrently using cache (returns HashMap directly) 146 146 let (mut replies_hydrated, mut parents_hydrated) = tokio::join!( 147 - hyd.hydrate_posts(reply_uris), 148 - hyd.hydrate_posts(parent_uris) 147 + state.post_cache.get_or_hydrate_from_uris( 148 + reply_uris, 149 + &state.pool, 150 + &state.id_cache, 151 + &hyd, 152 + ), 153 + state.post_cache.get_or_hydrate_from_uris( 154 + parent_uris, 155 + &state.pool, 156 + &state.id_cache, 157 + &hyd, 158 + ) 149 159 ); 150 160 151 161 let mut tmpbuf: HashMap<_, Vec<_>> = HashMap::new();
+9 -2
parakeet/src/xrpc/app_bsky/feed/search.rs
··· 192 192 (None, None) 193 193 }; 194 194 let hyd = StatefulHydrator::new(&state.dataloaders, &state.cdn, &labelers, maybe_did, maybe_actor_id).await; 195 - let mut posts_map = hyd.hydrate_posts(uris.clone()).await; 195 + 196 + // Use cache for post hydration (returns HashMap directly) 197 + let posts_map = state.post_cache.get_or_hydrate_from_uris( 198 + uris.clone(), 199 + &state.pool, 200 + &state.id_cache, 201 + &hyd, 202 + ).await; 196 203 197 204 // Maintain search result order 198 205 let posts: Vec<PostView> = uris 199 206 .into_iter() 200 - .filter_map(|uri| posts_map.remove(&uri)) 207 + .filter_map(|uri| posts_map.get(&uri).cloned()) 201 208 .collect(); 202 209 203 210 // Calculate cursor (rank of last result)
+57 -9
parakeet/src/xrpc/app_bsky/graph/lists.rs
··· 74 74 .map(|r| format!("at://{}/app.bsky.graph.list/{}", did, r.1)) 75 75 .collect(); 76 76 77 - let mut lists = hyd.hydrate_lists(at_uris).await; 77 + // Use cache for list hydration (returns HashMap directly) 78 + let mut lists_map = state.list_cache.get_or_hydrate_from_uris( 79 + at_uris.clone(), 80 + &state.pool, 81 + &state.id_cache, 82 + &hyd, 83 + ).await; 78 84 79 85 let lists = results 80 86 .into_iter() 81 87 .filter_map(|r| { 82 88 let at_uri = format!("at://{}/app.bsky.graph.list/{}", did, r.1); 83 - lists.remove(&at_uri) 89 + lists_map.remove(&at_uri) 84 90 }) 85 91 .collect(); 86 92 ··· 111 117 }; 112 118 let hyd = StatefulHydrator::new(&state.dataloaders, &state.cdn, &labelers, maybe_did, maybe_actor_id).await; 113 119 114 - let Some(list) = hyd.hydrate_list(query.list).await else { 120 + // Parse the list URI to extract actor_id and rkey for caching 121 + let list = if let Some(parsed) = crate::entity_cache::parse_at_uri(&query.list) { 122 + if parsed.collection == "app.bsky.graph.list" { 123 + // Get actor_id from DID 124 + if let Ok(actor_id) = crate::id_cache_helpers::get_actor_id_or_fetch( 125 + &state.pool, 126 + &state.id_cache, 127 + &parsed.did 128 + ).await { 129 + // Use cache for single list 130 + state.list_cache.get_or_hydrate_single( 131 + actor_id, 132 + parsed.rkey.clone(), 133 + query.list.clone(), 134 + &hyd, 135 + ).await 136 + } else { 137 + None 138 + } 139 + } else { 140 + None 141 + } 142 + } else { 143 + None 144 + }; 145 + 146 + let Some(list) = list else { 115 147 return Err(Error::not_found()); 116 148 }; 117 149 ··· 187 219 .last() 188 220 .map(|last| last.0.timestamp_millis().to_string()); 189 221 190 - let uris = results.iter().map(|r| r.1.clone()).collect(); 222 + let uris: Vec<String> = results.iter().map(|r| r.1.clone()).collect(); 191 223 192 - let lists = hyd.hydrate_lists(uris).await; 193 - let lists = lists.into_values().collect::<Vec<_>>(); 224 + // Use cache for list hydration (returns HashMap) 225 + let lists_map = state.list_cache.get_or_hydrate_from_uris( 226 + uris, 227 + &state.pool, 228 + &state.id_cache, 229 + &hyd, 230 + ).await; 231 + 232 + // Convert to Vec for API response 233 + let lists = lists_map.into_values().collect(); 194 234 195 235 Ok(Json(GetListsRes { cursor, lists })) 196 236 } ··· 216 256 .last() 217 257 .map(|last| last.0.timestamp_millis().to_string()); 218 258 219 - let uris = results.iter().map(|r| r.1.clone()).collect(); 259 + let uris: Vec<String> = results.iter().map(|r| r.1.clone()).collect(); 220 260 221 - let lists = hyd.hydrate_lists(uris).await; 222 - let lists = lists.into_values().collect::<Vec<_>>(); 261 + // Use cache for list hydration (returns HashMap) 262 + let lists_map = state.list_cache.get_or_hydrate_from_uris( 263 + uris, 264 + &state.pool, 265 + &state.id_cache, 266 + &hyd, 267 + ).await; 268 + 269 + // Convert to Vec for API response 270 + let lists = lists_map.into_values().collect(); 223 271 224 272 Ok(Json(GetListsRes { cursor, lists })) 225 273 }
+8 -1
parakeet/src/xrpc/app_bsky/graph/search.rs
··· 108 108 (None, None) 109 109 }; 110 110 let hyd = StatefulHydrator::new(&state.dataloaders, &state.cdn, &labelers, maybe_did, maybe_actor_id).await; 111 - let mut packs_map = hyd.hydrate_starterpacks_basic(uris.clone()).await; 111 + 112 + // Use cache for starterpack hydration (returns HashMap directly) 113 + let mut packs_map = state.starterpack_cache.get_or_hydrate_from_uris( 114 + uris.clone(), 115 + &state.pool, 116 + &state.id_cache, 117 + &hyd, 118 + ).await; 112 119 113 120 // Maintain search result order 114 121 let starter_packs: Vec<StarterPackViewBasic> = uris
+43 -8
parakeet/src/xrpc/app_bsky/graph/starter_packs.rs
··· 72 72 }) 73 73 .collect(); 74 74 75 - let mut starter_packs = hyd.hydrate_starterpacks_basic(uris.clone()).await; 75 + // Use cache for starterpack hydration (returns HashMap directly) 76 + let mut starter_packs_map = state.starterpack_cache.get_or_hydrate_from_uris( 77 + uris.clone(), 78 + &state.pool, 79 + &state.id_cache, 80 + &hyd, 81 + ).await; 76 82 77 83 let starter_packs = uris 78 84 .into_iter() 79 - .filter_map(|uri| starter_packs.remove(&uri)) 85 + .filter_map(|uri| starter_packs_map.remove(&uri)) 80 86 .collect(); 81 87 82 88 Ok(Json(StarterPacksRes { ··· 112 118 }; 113 119 let hyd = StatefulHydrator::new(&state.dataloaders, &state.cdn, &labelers, maybe_did, maybe_actor_id).await; 114 120 115 - let Some(starter_pack) = hyd.hydrate_starterpack(query.starter_pack).await else { 121 + // Parse the starterpack URI to extract actor_id and rkey for caching 122 + let starter_pack = if let Some(parsed) = crate::entity_cache::parse_at_uri(&query.starter_pack) { 123 + if parsed.collection == "app.bsky.graph.starterpack" { 124 + // Get actor_id from DID 125 + if let Ok(actor_id) = crate::id_cache_helpers::get_actor_id_or_fetch( 126 + &state.pool, 127 + &state.id_cache, 128 + &parsed.did 129 + ).await { 130 + // Use cache for single starterpack, but it returns StarterPackViewBasic 131 + // We need StarterPackView, so we still use hydrate_starterpack directly 132 + // This is a limitation - the cache stores Basic views but this needs full View 133 + #[allow(deprecated)] 134 + hyd.hydrate_starterpack(query.starter_pack.clone()).await 135 + } else { 136 + None 137 + } 138 + } else { 139 + None 140 + } 141 + } else { 142 + None 143 + }; 144 + 145 + let Some(starter_pack) = starter_pack else { 116 146 return Err(Error::not_found()); 117 147 }; 118 148 ··· 139 169 }; 140 170 let hyd = StatefulHydrator::new(&state.dataloaders, &state.cdn, &labelers, maybe_did, maybe_actor_id).await; 141 171 142 - let starter_packs = hyd 143 - .hydrate_starterpacks_basic(query.uris) 144 - .await 145 - .into_values() 146 - .collect(); 172 + // Use cache for batch starterpack hydration (returns HashMap) 173 + let packs_map = state.starterpack_cache.get_or_hydrate_from_uris( 174 + query.uris, 175 + &state.pool, 176 + &state.id_cache, 177 + &hyd, 178 + ).await; 179 + 180 + // Convert to Vec for API response 181 + let starter_packs = packs_map.into_values().collect(); 147 182 148 183 Ok(Json(StarterPacksRes { 149 184 starter_packs,
+13 -1
parakeet/src/xrpc/app_bsky/notification/mod.rs
··· 226 226 Some(did), 227 227 Some(actor_id), 228 228 ).await; 229 - let profiles_map = hyd.hydrate_profiles_detailed(author_dids.clone()).await; 229 + // Use cache for profile hydration 230 + let profiles_vec = state.profile_cache.get_or_hydrate_batch( 231 + author_dids.clone(), 232 + &state.pool, 233 + &state.id_cache, 234 + &hyd, 235 + ).await; 236 + 237 + // Convert Vec to HashMap for compatibility with existing code 238 + let profiles_map: std::collections::HashMap<String, lexica::app_bsky::actor::ProfileViewDetailed> = 239 + profiles_vec.into_iter() 240 + .map(|profile| (profile.did.clone(), profile)) 241 + .collect(); 230 242 tracing::info!(" → Hydrate profiles: {:.1}ms ({} profiles)", profile_start.elapsed().as_secs_f64() * 1000.0, profiles_map.len()); 231 243 232 244 // Build notification responses
+24 -3
parakeet/src/xrpc/app_bsky/unspecced/mod.rs
··· 196 196 (None, None) 197 197 }; 198 198 let hyd = StatefulHydrator::new(&state.dataloaders, &state.cdn, &labelers, maybe_did, maybe_actor_id).await; 199 - let mut feeds_map = hyd.hydrate_feedgens(page_uris.clone()).await; 199 + 200 + // Use cache for feedgen hydration (returns HashMap directly) 201 + let mut feeds_map = state.feedgen_cache.get_or_hydrate_from_uris( 202 + page_uris.clone(), 203 + &state.pool, 204 + &state.id_cache, 205 + &hyd, 206 + ).await; 200 207 201 208 let feeds: Vec<GeneratorView> = page_uris 202 209 .into_iter() ··· 423 430 (None, None) 424 431 }; 425 432 let hyd = StatefulHydrator::new(&state.dataloaders, &state.cdn, &labelers, maybe_did, maybe_actor_id).await; 426 - let mut feeds_map = hyd.hydrate_feedgens(page_uris.clone()).await; 433 + 434 + // Use cache for feedgen hydration (returns HashMap directly) 435 + let mut feeds_map = state.feedgen_cache.get_or_hydrate_from_uris( 436 + page_uris.clone(), 437 + &state.pool, 438 + &state.id_cache, 439 + &hyd, 440 + ).await; 427 441 428 442 let feeds: Vec<GeneratorView> = page_uris 429 443 .into_iter() ··· 523 537 (None, None) 524 538 }; 525 539 let hyd = StatefulHydrator::new(&state.dataloaders, &state.cdn, &labelers, maybe_did, maybe_actor_id).await; 526 - let mut packs_map = hyd.hydrate_starterpacks_basic(page_uris.clone()).await; 540 + 541 + // Use cache for starterpack hydration (returns HashMap directly) 542 + let mut packs_map = state.starterpack_cache.get_or_hydrate_from_uris( 543 + page_uris.clone(), 544 + &state.pool, 545 + &state.id_cache, 546 + &hyd, 547 + ).await; 527 548 528 549 let starter_packs: Vec<StarterPackViewBasic> = page_uris 529 550 .into_iter()
+6 -1
parakeet/src/xrpc/app_bsky/unspecced/thread_v2/other_replies.rs
··· 96 96 &state.id_cache 97 97 ).await?; 98 98 let reply_uris: Vec<String> = replies.iter().map(|item| item.at_uri.clone()).collect(); 99 - let replies_hydrated = hyd.hydrate_posts(reply_uris).await; 99 + let replies_hydrated = state.post_cache.get_or_hydrate_from_uris( 100 + reply_uris, 101 + &state.pool, 102 + &state.id_cache, 103 + &hyd, 104 + ).await; 100 105 101 106 // Build a map of parent_uri -> [child_posts] 102 107 let mut replies_by_parent: HashMap<String, Vec<(String, i32)>> = HashMap::new();
+2
parakeet/src/xrpc/app_bsky/unspecced/thread_v2/post_thread.rs
··· 57 57 // Create a thread builder 58 58 let builder = ThreadBuilder { 59 59 hydrater: &hyd, 60 + post_cache: &state.post_cache, 60 61 anchor_uri: uri, 61 62 anchor_post, 62 63 threadgate: threadgate.clone(), ··· 67 68 prioritize_followed_users: query.prioritize_followed_users, 68 69 is_authenticated, 69 70 id_cache: &state.id_cache, 71 + pool: &state.pool, 70 72 }; 71 73 72 74 // Build the thread
+20 -3
parakeet/src/xrpc/app_bsky/unspecced/thread_v2/thread_builder.rs
··· 1 + use crate::entity_cache::PostCache; 1 2 use crate::hydration::StatefulHydrator; 3 + use diesel_async::pooled_connection::deadpool::Pool; 4 + use diesel_async::AsyncPgConnection; 2 5 use lexica::app_bsky::feed::{PostView, ThreadgateView}; 6 + use parakeet_db::id_cache::IdCache; 3 7 use std::collections::HashMap; 8 + use std::sync::Arc; 4 9 5 10 use super::models::{PostThreadSort, ThreadItemPost, ThreadV2Item, ThreadV2ItemType}; 6 11 use super::sorting::sort_replies; ··· 9 14 #[expect(dead_code, reason = "threadgate and prioritize_followed_users are infrastructure for future thread filtering features")] 10 15 pub struct ThreadBuilder<'a> { 11 16 pub hydrater: &'a StatefulHydrator<'a>, 17 + pub post_cache: &'a Arc<PostCache>, 12 18 pub anchor_uri: String, 13 19 pub anchor_post: PostView, 14 20 pub threadgate: Option<ThreadgateView>, ··· 18 24 pub sort: PostThreadSort, 19 25 pub prioritize_followed_users: bool, 20 26 pub is_authenticated: bool, 21 - pub id_cache: &'a parakeet_db::id_cache::IdCache, 27 + pub id_cache: &'a Arc<IdCache>, 28 + pub pool: &'a Pool<AsyncPgConnection>, 22 29 } 23 30 24 31 impl ThreadBuilder<'_> { ··· 175 182 176 183 let hydrate_start = std::time::Instant::now(); 177 184 let parent_uris: Vec<String> = parents.iter().map(|item| item.at_uri.clone()).collect(); 178 - let parents_hydrated = self.hydrater.hydrate_posts(parent_uris).await; 185 + let parents_hydrated = self.post_cache.get_or_hydrate_from_uris( 186 + parent_uris, 187 + self.pool, 188 + self.id_cache, 189 + self.hydrater, 190 + ).await; 179 191 tracing::info!(" → Hydrate parent posts: {:.1} ms ({} posts)", hydrate_start.elapsed().as_secs_f64() * 1000.0, parents_hydrated.len()); 180 192 181 193 // Check if anchor post has a parent that wasn't returned by get_thread_parents ··· 268 280 // Hydrate all the posts 269 281 let hydrate_start = std::time::Instant::now(); 270 282 let reply_uris: Vec<String> = replies.iter().map(|item| item.at_uri.clone()).collect(); 271 - let replies_hydrated = self.hydrater.hydrate_posts(reply_uris).await; 283 + let replies_hydrated = self.post_cache.get_or_hydrate_from_uris( 284 + reply_uris, 285 + self.pool, 286 + self.id_cache, 287 + self.hydrater, 288 + ).await; 272 289 tracing::info!(" → Hydrate posts: {:.1} ms ({} posts)", hydrate_start.elapsed().as_secs_f64() * 1000.0, replies_hydrated.len()); 273 290 274 291 // Build a map: parent_uri -> [(child_uri, sql_depth)]