Parakeet is a Rust-based Bluesky AppView aiming to implement most of the functionality required to support the Bluesky client.

the big feed refactor

handle.invalid 2a640043 6bfd6ae7

verified
Changed files
+217 -228
parakeet
src
hydration
xrpc
app_bsky
+152 -136
parakeet/src/hydration/posts.rs
··· 3 3 use lexica::app_bsky::actor::ProfileViewBasic; 4 4 use lexica::app_bsky::embed::Embed; 5 5 use lexica::app_bsky::feed::{ 6 - BlockedAuthor, FeedViewPost, PostView, PostViewerState, ReplyRef, ReplyRefPost, ThreadgateView, 6 + BlockedAuthor, FeedReasonRepost, FeedViewPost, FeedViewPostReason, PostView, PostViewerState, 7 + ReplyRef, ReplyRefPost, ThreadgateView, 7 8 }; 8 9 use lexica::app_bsky::graph::ListViewBasic; 9 10 use lexica::app_bsky::RecordStats; ··· 32 33 } 33 34 } 34 35 36 + type HydratePostsRet = ( 37 + models::Post, 38 + ProfileViewBasic, 39 + Vec<models::Label>, 40 + Option<Embed>, 41 + Option<ThreadgateView>, 42 + Option<PostViewerState>, 43 + Option<PostStats>, 44 + ); 45 + 35 46 fn build_postview( 36 - post: models::Post, 37 - author: ProfileViewBasic, 38 - labels: Vec<models::Label>, 39 - embed: Option<Embed>, 40 - threadgate: Option<ThreadgateView>, 41 - viewer: Option<PostViewerState>, 42 - stats: Option<PostStats>, 47 + (post, author, labels, embed, threadgate, viewer, stats): HydratePostsRet, 43 48 ) -> PostView { 44 49 let stats = stats 45 50 .map(|stats| RecordStats { ··· 135 140 let threadgate = self.hydrate_threadgate(threadgate).await; 136 141 let labels = self.get_label(&post.at_uri).await; 137 142 138 - Some(build_postview( 143 + Some(build_postview(( 139 144 post, author, labels, embed, threadgate, viewer, stats, 140 - )) 145 + ))) 141 146 } 142 147 143 - pub async fn hydrate_posts(&self, posts: Vec<String>) -> HashMap<String, PostView> { 148 + async fn hydrate_posts_inner(&self, posts: Vec<String>) -> HashMap<String, HydratePostsRet> { 144 149 let stats = self.loaders.post_stats.load_many(posts.clone()).await; 145 150 let posts = self.loaders.posts.load_many(posts).await; 146 151 ··· 150 155 .unzip::<_, _, Vec<_>, Vec<_>>(); 151 156 let authors = self.hydrate_profiles_basic(authors).await; 152 157 153 - let post_labels = self.get_label_many(&post_uris).await; 154 - let viewer_data = self.get_post_viewer_states(&post_uris).await; 
158 + let mut post_labels = self.get_label_many(&post_uris).await; 159 + let mut viewer_data = self.get_post_viewer_states(&post_uris).await; 155 160 156 161 let threadgates = posts 157 162 .values() ··· 159 164 .collect(); 160 165 let threadgates = self.hydrate_threadgates(threadgates).await; 161 166 162 - let embeds = self.hydrate_embeds(post_uris).await; 167 + let mut embeds = self.hydrate_embeds(post_uris).await; 163 168 164 169 posts 165 170 .into_iter() 166 171 .filter_map(|(uri, (post, threadgate))| { 167 - let author = authors.get(&post.did)?; 168 - let embed = embeds.get(&uri).cloned(); 172 + let author = authors.get(&post.did)?.clone(); 173 + let embed = embeds.remove(&uri); 169 174 let threadgate = threadgate.and_then(|tg| threadgates.get(&tg.at_uri).cloned()); 170 - let labels = post_labels.get(&uri).cloned().unwrap_or_default(); 175 + let labels = post_labels.remove(&uri).unwrap_or_default(); 171 176 let stats = stats.get(&uri).cloned(); 172 - let viewer = viewer_data.get(&uri).cloned(); 177 + let viewer = viewer_data.remove(&uri); 173 178 174 179 Some(( 175 180 uri, 176 - build_postview( 177 - post, 178 - author.to_owned(), 179 - labels, 180 - embed, 181 - threadgate, 182 - viewer, 183 - stats, 184 - ), 181 + (post, author, labels, embed, threadgate, viewer, stats), 185 182 )) 186 183 }) 187 184 .collect() 188 185 } 189 186 187 + pub async fn hydrate_posts(&self, posts: Vec<String>) -> HashMap<String, PostView> { 188 + self.hydrate_posts_inner(posts) 189 + .await 190 + .into_iter() 191 + .map(|(uri, data)| (uri, build_postview(data))) 192 + .collect() 193 + } 194 + 190 195 pub async fn hydrate_feed_posts( 191 196 &self, 192 - posts: Vec<String>, 197 + posts: Vec<RawFeedItem>, 193 198 author_threads_only: bool, 194 - ) -> HashMap<String, FeedViewPost> { 195 - let stats = self.loaders.post_stats.load_many(posts.clone()).await; 196 - let posts = self.loaders.posts.load_many(posts).await; 197 - 198 - let (authors, post_uris) = posts 199 - .values() 200 - 
.map(|(post, _)| (post.did.clone(), post.at_uri.clone())) 201 - .unzip::<_, _, Vec<_>, Vec<_>>(); 202 - let authors = self.hydrate_profiles_basic(authors).await; 203 - 204 - let post_labels = self.get_label_many(&post_uris).await; 205 - let viewer_data = self.get_post_viewer_states(&post_uris).await; 206 - let embeds = self.hydrate_embeds(post_uris.clone()).await; 199 + ) -> Vec<FeedViewPost> { 200 + let post_uris = posts 201 + .iter() 202 + .map(|item| item.post_uri().to_string()) 203 + .collect::<Vec<_>>(); 204 + let mut posts_hyd = self.hydrate_posts_inner(post_uris).await; 207 205 208 206 // we shouldn't show the parent when the post violates a threadgate. 209 - let reply_refs = posts 207 + let reply_refs = posts_hyd 210 208 .values() 211 - .filter(|(post, _)| !post.violates_threadgate) 212 - .flat_map(|(post, _)| [post.parent_uri.clone(), post.root_uri.clone()]) 209 + .filter(|(post, ..)| !post.violates_threadgate) 210 + .flat_map(|(post, ..)| [post.parent_uri.clone(), post.root_uri.clone()]) 213 211 .flatten() 214 212 .collect::<Vec<_>>(); 215 - 216 213 let reply_posts = self.hydrate_posts(reply_refs).await; 217 214 218 - // hydrate all the posts. 
219 - let mut posts = posts 215 + let repost_profiles = posts 216 + .iter() 217 + .filter_map(|item| item.repost_by()) 218 + .collect::<Vec<_>>(); 219 + let profiles_hydrated = self.hydrate_profiles_basic(repost_profiles).await; 220 + 221 + posts 220 222 .into_iter() 221 - .filter_map(|(post_uri, (raw, _))| { 222 - let root = raw.root_uri.clone(); 223 - let parent = raw.parent_uri.clone(); 223 + .filter_map(|item| { 224 + let post = posts_hyd.remove(item.post_uri())?; 225 + let context = item.context(); 224 226 225 - let author = authors.get(&raw.did)?; 226 - let embed = embeds.get(&post_uri).cloned(); 227 - let labels = post_labels.get(&post_uri).cloned().unwrap_or_default(); 228 - let stats = stats.get(&post_uri).cloned(); 229 - let viewer = viewer_data.get(&post_uri).cloned(); 230 - let post = 231 - build_postview(raw, author.to_owned(), labels, embed, None, viewer, stats); 227 + let reply = if let RawFeedItem::Post { .. } = item { 228 + let root_uri = post.0.root_uri.as_ref(); 229 + let parent_uri = post.0.parent_uri.as_ref(); 232 230 233 - Some((post_uri, (post, root, parent))) 234 - }) 235 - .collect::<HashMap<_, _>>(); 231 + let (root, parent) = if author_threads_only { 232 + if root_uri.is_some() && parent_uri.is_some() { 233 + let root = root_uri.and_then(|uri| posts_hyd.get(uri))?; 234 + let parent = parent_uri.and_then(|uri| posts_hyd.get(uri))?; 236 235 237 - post_uris 238 - .into_iter() 239 - .filter_map(|post_uri| { 240 - let item = if author_threads_only { 241 - compile_feed_authors_threads_only(&post_uri, &mut posts)? 
236 + let root = build_postview(root.clone()); 237 + let parent = build_postview(parent.clone()); 238 + 239 + (Some(root), Some(parent)) 240 + } else { 241 + (None, None) 242 + } 243 + } else { 244 + let root = root_uri.and_then(|uri| reply_posts.get(uri)).cloned(); 245 + let parent = parent_uri.and_then(|uri| reply_posts.get(uri)).cloned(); 246 + 247 + (root, parent) 248 + }; 249 + 250 + if root_uri.is_some() || parent_uri.is_some() { 251 + Some(ReplyRef { 252 + root: root.map(postview_to_replyref).unwrap_or( 253 + ReplyRefPost::NotFound { 254 + uri: root_uri.unwrap().to_owned(), 255 + not_found: true, 256 + }, 257 + ), 258 + parent: parent.map(postview_to_replyref).unwrap_or( 259 + ReplyRefPost::NotFound { 260 + uri: parent_uri.unwrap().to_owned(), 261 + not_found: true, 262 + }, 263 + ), 264 + grandparent_author: None, 265 + }) 266 + } else { 267 + None 268 + } 242 269 } else { 243 - compile_feed(&post_uri, &mut posts, &reply_posts)? 270 + None 271 + }; 272 + 273 + let reason = match item { 274 + RawFeedItem::Repost { uri, by, at, .. } => { 275 + Some(FeedViewPostReason::Repost(FeedReasonRepost { 276 + by: profiles_hydrated.get(&by).cloned()?, 277 + uri: Some(uri), 278 + cid: None, 279 + indexed_at: at, 280 + })) 281 + } 282 + RawFeedItem::Pin { .. 
} => Some(FeedViewPostReason::Pin), 283 + _ => None, 244 284 }; 245 285 246 - Some((post_uri, item)) 286 + let post = build_postview(post); 287 + 288 + Some(FeedViewPost { 289 + post, 290 + reply, 291 + reason, 292 + feed_context: context, 293 + }) 247 294 }) 248 295 .collect() 249 296 } ··· 288 335 } 289 336 } 290 337 291 - type FeedViewPartData = (PostView, Option<String>, Option<String>); 292 - 293 - // this is the 'normal' one that runs in most places 294 - fn compile_feed( 295 - uri: &String, 296 - posts: &mut HashMap<String, FeedViewPartData>, 297 - reply_posts: &HashMap<String, PostView>, 298 - ) -> Option<FeedViewPost> { 299 - let (post, root_uri, parent_uri) = posts.remove(uri)?; 300 - 301 - let root = root_uri.as_ref().and_then(|uri| reply_posts.get(uri)); 302 - let parent = parent_uri.as_ref().and_then(|uri| reply_posts.get(uri)); 303 - 304 - let reply = if parent_uri.is_some() && root_uri.is_some() { 305 - Some(ReplyRef { 306 - root: root 307 - .cloned() 308 - .map(postview_to_replyref) 309 - .unwrap_or(ReplyRefPost::NotFound { 310 - uri: root_uri.as_ref().unwrap().clone(), 311 - not_found: true, 312 - }), 313 - parent: parent 314 - .cloned() 315 - .map(postview_to_replyref) 316 - .unwrap_or(ReplyRefPost::NotFound { 317 - uri: parent_uri.as_ref().unwrap().clone(), 318 - not_found: true, 319 - }), 320 - grandparent_author: None, 321 - }) 322 - } else { 323 - None 324 - }; 325 - 326 - Some(FeedViewPost { 327 - post, 328 - reply, 329 - reason: None, 330 - feed_context: None, 331 - }) 338 + #[derive(Debug)] 339 + pub enum RawFeedItem { 340 + Pin { 341 + uri: String, 342 + context: Option<String>, 343 + }, 344 + Post { 345 + uri: String, 346 + context: Option<String>, 347 + }, 348 + Repost { 349 + uri: String, 350 + post: String, 351 + by: String, 352 + at: chrono::DateTime<chrono::Utc>, 353 + context: Option<String>, 354 + }, 332 355 } 333 356 334 - // and this one runs in getAuthorFeed when filter=PostsAndAuthorThreads 335 - fn 
compile_feed_authors_threads_only( 336 - uri: &String, 337 - posts: &mut HashMap<String, FeedViewPartData>, 338 - ) -> Option<FeedViewPost> { 339 - let (post, root_uri, parent_uri) = posts.get(uri)?.clone(); 340 - 341 - let root = root_uri.as_ref().and_then(|root| posts.get(root)); 342 - let parent = parent_uri.as_ref().and_then(|parent| posts.get(parent)); 357 + impl RawFeedItem { 358 + fn post_uri(&self) -> &str { 359 + match self { 360 + RawFeedItem::Pin { uri, .. } => uri, 361 + RawFeedItem::Post { uri, .. } => uri, 362 + RawFeedItem::Repost { post, .. } => post, 363 + } 364 + } 343 365 344 - let reply = if parent_uri.is_some() && root_uri.is_some() { 345 - Some(ReplyRef { 346 - root: root 347 - .cloned() 348 - .map(|(post, _, _)| postview_to_replyref(post))?, 349 - parent: parent 350 - .cloned() 351 - .map(|(post, _, _)| postview_to_replyref(post))?, 352 - grandparent_author: None, 353 - }) 354 - } else { 355 - None 356 - }; 366 + fn repost_by(&self) -> Option<String> { 367 + match self { 368 + RawFeedItem::Repost { by, .. } => Some(by.clone()), 369 + _ => None, 370 + } 371 + } 357 372 358 - Some(FeedViewPost { 359 - post, 360 - reply, 361 - reason: None, 362 - feed_context: None, 363 - }) 373 + fn context(&self) -> Option<String> { 374 + match self { 375 + RawFeedItem::Pin { context, .. } => context.clone(), 376 + RawFeedItem::Post { context, .. } => context.clone(), 377 + RawFeedItem::Repost { context, .. } => context.clone(), 378 + } 379 + } 364 380 }
+7 -8
parakeet/src/xrpc/app_bsky/feed/likes.rs
··· 1 + use crate::hydration::posts::RawFeedItem; 1 2 use crate::hydration::StatefulHydrator; 2 3 use crate::xrpc::error::{Error, XrpcResult}; 3 4 use crate::xrpc::extract::{AtpAcceptLabelers, AtpAuth}; ··· 57 58 .last() 58 59 .map(|(last, _)| last.timestamp_millis().to_string()); 59 60 60 - let at_uris = results 61 + let raw_feed = results 61 62 .iter() 62 - .map(|(_, uri)| uri.clone()) 63 + .map(|(_, uri)| RawFeedItem::Post { 64 + uri: uri.clone(), 65 + context: None, 66 + }) 63 67 .collect::<Vec<_>>(); 64 68 65 - let mut posts = hyd.hydrate_feed_posts(at_uris, false).await; 66 - 67 - let feed: Vec<_> = results 68 - .into_iter() 69 - .filter_map(|(_, uri)| posts.remove(&uri)) 70 - .collect(); 69 + let feed = hyd.hydrate_feed_posts(raw_feed, false).await; 71 70 72 71 Ok(Json(FeedRes { cursor, feed })) 73 72 }
+58 -84
parakeet/src/xrpc/app_bsky/feed/posts.rs
··· 1 + use crate::hydration::posts::RawFeedItem; 1 2 use crate::hydration::StatefulHydrator; 2 3 use crate::xrpc::app_bsky::graph::lists::ListWithCursorQuery; 3 4 use crate::xrpc::error::{Error, XrpcResult}; ··· 16 17 use diesel_async::{AsyncPgConnection, RunQueryDsl}; 17 18 use lexica::app_bsky::actor::ProfileView; 18 19 use lexica::app_bsky::feed::{ 19 - BlockedAuthor, FeedReasonRepost, FeedSkeletonResponse, FeedViewPost, FeedViewPostReason, 20 - PostView, SkeletonReason, ThreadViewPost, ThreadViewPostType, ThreadgateView, 20 + BlockedAuthor, FeedSkeletonResponse, FeedViewPost, PostView, SkeletonReason, ThreadViewPost, 21 + ThreadViewPostType, ThreadgateView, 21 22 }; 22 23 use parakeet_db::{models, schema}; 23 24 use reqwest::Url; ··· 113 114 114 115 let hyd = StatefulHydrator::new(&state.dataloaders, &state.cdn, &labelers, maybe_auth); 115 116 116 - let at_uris = skeleton.feed.iter().map(|v| v.post.clone()).collect(); 117 117 let repost_skeleton = skeleton 118 118 .feed 119 119 .iter() ··· 122 122 _ => None, 123 123 }) 124 124 .collect::<Vec<_>>(); 125 + let mut repost_data = get_skeleton_repost_data(&mut conn, repost_skeleton).await; 125 126 126 - let mut posts = hyd.hydrate_feed_posts(at_uris, false).await; 127 - let mut repost_data = get_skeleton_repost_data(&mut conn, &hyd, repost_skeleton).await; 128 - 129 - let feed = skeleton 127 + let raw_feed = skeleton 130 128 .feed 131 129 .into_iter() 132 - .filter_map(|item| { 133 - let mut post = posts.remove(&item.post)?; 134 - let reason = match item.reason { 135 - Some(SkeletonReason::Repost { repost }) => { 136 - repost_data.remove(&repost).map(FeedViewPostReason::Repost) 137 - } 138 - Some(SkeletonReason::Pin {}) => Some(FeedViewPostReason::Pin), 139 - _ => None, 140 - }; 141 - 142 - post.reason = reason; 143 - post.feed_context = item.feed_context; 144 - 145 - Some(post) 130 + .filter_map(|v| match v.reason { 131 + Some(SkeletonReason::Repost { repost }) => { 132 + repost_data 133 + .remove_entry(&repost) 
134 + .map(|(uri, (by, at))| RawFeedItem::Repost { 135 + uri, 136 + post: v.post, 137 + by, 138 + at: at.and_utc(), 139 + context: v.feed_context, 140 + }) 141 + } 142 + Some(SkeletonReason::Pin {}) => Some(RawFeedItem::Pin { 143 + uri: v.post, 144 + context: v.feed_context, 145 + }), 146 + None => Some(RawFeedItem::Post { 147 + uri: v.post, 148 + context: v.feed_context, 149 + }), 146 150 }) 147 151 .collect(); 152 + 153 + let feed = hyd.hydrate_feed_posts(raw_feed, false).await; 148 154 149 155 Ok(Json(FeedRes { 150 156 cursor: skeleton.cursor, ··· 204 210 205 211 let pin = match query.include_pins && query.cursor.is_none() { 206 212 false => None, 207 - true => match crate::db::get_pinned_post_uri(&mut conn, &did).await? { 208 - Some(post) => hyd.hydrate_post(post).await, 209 - None => None, 210 - }, 213 + true => crate::db::get_pinned_post_uri(&mut conn, &did).await?, 211 214 }; 212 215 213 216 let limit = query.limit.unwrap_or(50).clamp(1, 100); ··· 259 262 .last() 260 263 .map(|item| item.sort_at.timestamp_millis().to_string()); 261 264 262 - let at_uris = results 263 - .iter() 264 - .map(|item| item.post.clone()) 265 - .collect::<Vec<_>>(); 266 - 267 - // get the actor for if we have reposted 268 - let profile = hyd 269 - .hydrate_profile_basic(did) 270 - .await 271 - .ok_or(Error::server_error(None))?; 272 - 273 - let mut posts = hyd.hydrate_feed_posts(at_uris, author_threads_only).await; 274 - 275 - let mut feed: Vec<_> = results 265 + let mut raw_feed = results 276 266 .into_iter() 277 - .filter_map(|item| { 278 - posts.remove(&item.post).map(|mut fvp| { 279 - if item.typ == "repost" { 280 - fvp.reason = Some(FeedViewPostReason::Repost(FeedReasonRepost { 281 - by: profile.clone(), 282 - uri: Some(item.uri), 283 - cid: Some(item.cid), 284 - indexed_at: Default::default(), 285 - })) 286 - } 287 - fvp 288 - }) 267 + .filter_map(|item| match &*item.typ { 268 + "post" => Some(RawFeedItem::Post { 269 + uri: item.post, 270 + context: None, 271 + }), 272 + 
"repost" => Some(RawFeedItem::Repost { 273 + uri: item.uri, 274 + post: item.post, 275 + by: item.did, 276 + at: item.sort_at, 277 + context: None, 278 + }), 279 + _ => None, 289 280 }) 290 - .collect(); 281 + .collect::<Vec<_>>(); 291 282 292 283 if let Some(post) = pin { 293 - feed.insert( 284 + raw_feed.insert( 294 285 0, 295 - FeedViewPost { 296 - post, 297 - reply: None, 298 - reason: Some(FeedViewPostReason::Pin), 299 - feed_context: None, 286 + RawFeedItem::Pin { 287 + uri: post, 288 + context: None, 300 289 }, 301 290 ); 302 291 } 292 + 293 + let feed = hyd.hydrate_feed_posts(raw_feed, author_threads_only).await; 303 294 304 295 Ok(Json(FeedRes { cursor, feed })) 305 296 } ··· 342 333 .last() 343 334 .map(|(last, _)| last.timestamp_millis().to_string()); 344 335 345 - let at_uris = results 336 + let raw_feed = results 346 337 .iter() 347 - .map(|(_, uri)| uri.clone()) 338 + .map(|(_, uri)| RawFeedItem::Post { 339 + uri: uri.clone(), 340 + context: None, 341 + }) 348 342 .collect::<Vec<_>>(); 349 343 350 - let mut posts = hyd.hydrate_feed_posts(at_uris, false).await; 351 - 352 - let feed = results 353 - .into_iter() 354 - .filter_map(|(_, uri)| posts.remove(&uri)) 355 - .collect(); 344 + let feed = hyd.hydrate_feed_posts(raw_feed, false).await; 356 345 357 346 Ok(Json(FeedRes { cursor, feed })) 358 347 } ··· 686 675 } 687 676 } 688 677 689 - async fn get_skeleton_repost_data<'a>( 678 + async fn get_skeleton_repost_data( 690 679 conn: &mut AsyncPgConnection, 691 - hyd: &StatefulHydrator<'a>, 692 680 reposts: Vec<String>, 693 - ) -> HashMap<String, FeedReasonRepost> { 681 + ) -> HashMap<String, (String, NaiveDateTime)> { 694 682 let Ok(repost_data) = schema::records::table 695 683 .select(( 696 684 schema::records::at_uri, ··· 704 692 return HashMap::new(); 705 693 }; 706 694 707 - let profiles = repost_data.iter().map(|(_, did, _)| did.clone()).collect(); 708 - let profiles = hyd.hydrate_profiles_basic(profiles).await; 709 - 710 695 repost_data 711 696 
.into_iter() 712 - .filter_map(|(uri, did, indexed_at)| { 713 - let by = profiles.get(&did).cloned()?; 714 - 715 - let repost = FeedReasonRepost { 716 - by, 717 - uri: Some(uri.clone()), 718 - cid: None, // okay, we do have this, but the app doesn't seem to be bothered about not setting it. 719 - indexed_at: indexed_at.and_utc(), 720 - }; 721 - 722 - Some((uri, repost)) 723 - }) 697 + .map(|(uri, did, at)| (uri, (did, at))) 724 698 .collect() 725 699 } 726 700