Rust AppView - highly experimental!
1
fork

Configure Feed

Select the types of activity you want to include in your feed.

fix: use cache for post hydration

+22 -17
+1 -1
parakeet/src/hydration/posts/feed.rs
··· 35 35 let (mut posts_hyd, profiles_by_id) = tokio::join!( 36 36 async { 37 37 let start = std::time::Instant::now(); 38 - let result = self.hydrate_posts_inner(post_uris).await; 38 + let (result, _cache) = self.hydrate_posts_inner(post_uris).await; 39 39 let elapsed = start.elapsed().as_secs_f64() * 1000.0; 40 40 tracing::info!(" ├─ Posts hydration: {:.1} ms", elapsed); 41 41 result
+21 -16
parakeet/src/hydration/posts/mod.rs
··· 6 6 use lexica::app_bsky::actor::ProfileViewBasic; 7 7 use lexica::app_bsky::feed::{PostView, PostViewerState, ThreadgateView}; 8 8 9 - use builders::{build_postview, build_threadgate_view, HydratePostsRet}; 9 + use builders::{build_postview_with_cache, build_threadgate_view, build_threadgate_view_with_cache, HydratePostsRet}; 10 10 11 11 pub use feed::RawFeedItem; 12 12 ··· 70 70 &self, 71 71 threadgates: Vec<EnrichedThreadgate>, 72 72 post_did_map: &HashMap<(i32, i64), String>, // (actor_id, rkey) -> DID 73 + actor_cache: &HashMap<i32, String>, 73 74 ) -> HashMap<String, ThreadgateView> { 74 75 let lists = threadgates.iter().fold(Vec::new(), |mut acc, c| { 75 76 if let Some(lists) = &c.allowed_lists { ··· 97 98 98 99 Some(( 99 100 threadgate_uri, 100 - build_threadgate_view(threadgate, post_did, this_lists, self.loaders.post_state.id_cache()), 101 + build_threadgate_view_with_cache(threadgate, post_did, this_lists, self.loaders.post_state.id_cache(), actor_cache), 101 102 )) 102 103 }) 103 104 .collect() ··· 149 150 pub(super) async fn hydrate_posts_inner( 150 151 &self, 151 152 posts: Vec<String>, 152 - ) -> HashMap<String, HydratePostsRet> { 153 + ) -> (HashMap<String, HydratePostsRet>, HashMap<i32, String>) { 153 154 let load_start = std::time::Instant::now(); 154 155 let posts_with_stats = self.loaders.posts.load_many(posts).await; 155 156 let load_time = load_start.elapsed().as_secs_f64() * 1000.0; ··· 160 161 161 162 // Pre-fetch actor IDs needed for reply and threadgate construction 162 163 let reply_actor_ids = Self::collect_reply_actor_ids(&posts_with_stats); 163 - let _reply_actor_cache = if !reply_actor_ids.is_empty() { 164 + let reply_actor_cache = if !reply_actor_ids.is_empty() { 164 165 let cache_start = std::time::Instant::now(); 165 166 let actor_data = self.loaders.post_state.id_cache().get_actor_data_many(&reply_actor_ids).await; 166 167 let cache_time = cache_start.elapsed().as_secs_f64() * 1000.0; ··· 187 188 .values() 188 189 .map(|(post, _, _)| { 189 190 let author_id = post.post.actor_id; 190 - actor_id_to_did.insert(author_id, post.did.clone()); 191 - post_key_to_did.insert((post.post.actor_id, post.post.rkey), post.did.clone()); 192 - let uri_with_keys = (post.at_uri.clone(), post.post.actor_id, post.post.rkey, post.did.clone()); 191 + let did = post.did.clone(); 192 + actor_id_to_did.insert(author_id, did.clone()); 193 + post_key_to_did.insert((post.post.actor_id, post.post.rkey), did.clone()); 194 + let uri_with_keys = (post.at_uri.clone(), post.post.actor_id, post.post.rkey, did); 193 195 (author_id, uri_with_keys) 194 196 }) 195 197 .unzip(); ··· 243 245 } 244 246 result 245 247 }; 248 + let reply_actor_cache_ref = &reply_actor_cache; 246 249 let threadgates_future = async move { 247 250 let start = std::time::Instant::now(); 248 - let result = self.hydrate_threadgates(threadgates_to_hydrate, &post_key_to_did).await; 251 + let result = self.hydrate_threadgates(threadgates_to_hydrate, &post_key_to_did, reply_actor_cache_ref).await; 249 252 let elapsed = start.elapsed().as_secs_f64() * 1000.0; 250 253 tracing::info!(" → Threadgates: {:.1} ms ({} gates)", elapsed, threadgate_count); 251 254 if elapsed > 10.0 { ··· 281 284 tracing::warn!("Slow hydration (authors/labels/viewer/threadgates/embeds): {:.1} ms", hydrate_time); 282 285 } 283 286 284 - posts_with_stats 287 + let results = posts_with_stats 285 288 .into_iter() 286 289 .filter_map(|(uri, (post, threadgate, _stats))| { 287 290 let author = authors.get(&post.did)?.clone(); ··· 300 303 (post, author, labels, embed, threadgate, viewer, stats), 301 304 )) 302 305 }) 303 - .collect() 306 + .collect(); 307 + 308 + (results, reply_actor_cache) 304 309 } 305 310 306 311 pub async fn hydrate_posts(&self, posts: Vec<String>) -> HashMap<String, PostView> { 307 - // For now, continue using the original build_postview which will fall back to blocking calls 308 - // The optimization is in place but we'd need to refactor hydrate_posts_inner to return 309 - // the actor cache to fully utilize it. This is a future optimization. 310 - self.hydrate_posts_inner(posts) 311 - .await 312 + let (posts_data, actor_cache) = self.hydrate_posts_inner(posts).await; 313 + 314 + posts_data 312 315 .into_iter() 313 - .map(|(uri, data)| (uri, build_postview(data, self.loaders.post_state.id_cache()))) 316 + .map(|(uri, data)| { 317 + (uri, build_postview_with_cache(data, self.loaders.post_state.id_cache(), &actor_cache)) 318 + }) 314 319 .collect() 315 320 } 316 321