Rust AppView - highly experimental!
1
fork

Configure Feed

Select the types of activity you want to include in your feed.

cleanup, timelines

+342 -317
+43 -100
parakeet/src/entities/post.rs
··· 452 452 Ok(results) 453 453 } 454 454 455 + /// Convert PostData list to PostViews for timeline 456 + pub async fn convert_to_post_views(&self, posts: Vec<PostData>, viewer_did: Option<&str>) -> Vec<PostView> { 457 + let viewer_actor_id = if let Some(did) = viewer_did { 458 + self.profile_entity.resolve_identifier(did).await.ok() 459 + } else { 460 + None 461 + }; 462 + 463 + let mut views = Vec::with_capacity(posts.len()); 464 + for post_data in posts { 465 + views.push(self.post_data_to_view(post_data, viewer_actor_id).await); 466 + } 467 + views 468 + } 469 + 455 470 /// Convert PostData to PostView 456 471 async fn post_data_to_view(&self, data: PostData, viewer_actor_id: Option<i32>) -> PostView { 457 472 let uri = format!( ··· 969 984 } 970 985 971 986 /// Get timeline posts for a user 972 - #[instrument(skip(self))] 973 - pub async fn get_timeline_posts( 974 - &self, 975 - followed_actor_ids: &[i32], 976 - cursor_timestamp: Option<&chrono::DateTime<chrono::Utc>>, 977 - limit: u8, 978 - ) -> Result<Vec<(i32, i64, chrono::DateTime<chrono::Utc>)>> { 979 - if followed_actor_ids.is_empty() { 980 - return Ok(Vec::new()); 981 - } 982 - 983 - let mut conn = self.db_pool.get().await?; 984 - 985 - use parakeet_db::schema::posts; 986 - use parakeet_db::types::PostStatus; 987 - use diesel::prelude::*; 988 - use diesel_async::RunQueryDsl; 989 - 990 - let mut query = posts::table 991 - .filter(posts::actor_id.eq_any(followed_actor_ids)) 992 - .filter(posts::status.eq(PostStatus::Complete)) 993 - .order_by(posts::rkey.desc()) 994 - .limit(limit as i64 + 1) 995 - .into_boxed(); 996 - 997 - if let Some(cursor_ts) = cursor_timestamp { 998 - let cursor_tid_str = parakeet_db::tid_util::timestamp_to_tid(*cursor_ts); 999 - let cursor_tid = parakeet_db::tid_util::decode_tid(&cursor_tid_str).unwrap_or(0); 1000 - query = query.filter(posts::rkey.lt(cursor_tid)); 1001 - } 1002 - 1003 - let post_data: Vec<(i32, i64)> = query 1004 - .select((posts::actor_id, posts::rkey)) 1005 - .load(&mut conn) 1006 - .await?; 1007 - 1008 - Ok(post_data.into_iter() 1009 - .map(|(actor_id, rkey)| { 1010 - let timestamp = parakeet_db::tid_util::tid_to_datetime(rkey); 1011 - (actor_id, rkey, timestamp) 1012 - }) 1013 - .collect()) 1014 - } 1015 - 1016 - /// Get author feed posts 1017 - #[instrument(skip(self))] 1018 - pub async fn get_author_feed( 1019 - &self, 1020 - actor_id: i32, 1021 - cursor_timestamp: Option<&chrono::DateTime<chrono::Utc>>, 1022 - limit: u8, 1023 - filter: Option<&str>, 1024 - ) -> Result<Vec<(i32, i64, chrono::DateTime<chrono::Utc>)>> { 1025 - let mut conn = self.db_pool.get().await?; 1026 - 1027 - use parakeet_db::schema::posts; 1028 - use parakeet_db::types::PostStatus; 1029 - use diesel::prelude::*; 1030 - use diesel_async::RunQueryDsl; 1031 - 1032 - let mut query = posts::table 1033 - .filter(posts::actor_id.eq(actor_id)) 1034 - .filter(posts::status.eq(PostStatus::Complete)) 1035 - .order_by(posts::rkey.desc()) 1036 - .limit(limit as i64 + 1) 1037 - .into_boxed(); 1038 - 1039 - // Apply filter 1040 - match filter { 1041 - Some("posts_with_replies") => { 1042 - // Include all posts 1043 - }, 1044 - Some("posts_with_media") => { 1045 - use parakeet_db::types::EmbedType; 1046 - query = query.filter(posts::embed_type.eq(EmbedType::Images)); 1047 - }, 1048 - _ => { 1049 - // Default: posts_no_replies - exclude replies 1050 - query = query.filter(posts::parent_post_actor_id.is_null()); 1051 - } 1052 - } 1053 - 1054 - if let Some(cursor_ts) = cursor_timestamp { 1055 - let cursor_tid_str = parakeet_db::tid_util::timestamp_to_tid(*cursor_ts); 1056 - let cursor_tid = parakeet_db::tid_util::decode_tid(&cursor_tid_str).unwrap_or(0); 1057 - query = query.filter(posts::rkey.lt(cursor_tid)); 1058 - } 1059 - 1060 - let post_data: Vec<(i32, i64)> = query 1061 - .select((posts::actor_id, posts::rkey)) 1062 - .load(&mut conn) 1063 - .await?; 1064 - 1065 - Ok(post_data.into_iter() 1066 - .map(|(actor_id, rkey)| { 1067 - let timestamp = parakeet_db::tid_util::tid_to_datetime(rkey); 1068 - (actor_id, rkey, timestamp) 1069 - }) 1070 - .collect()) 1071 - } 987 + /// Note: This method is deprecated in favor of the new ProfileEntity-based approach 1072 988 1073 989 /// Get posts quoting another post 1074 990 #[instrument(skip(self))] ··· 1141 1057 .await?; 1142 1058 1143 1059 Ok(results) 1060 + } 1061 + 1062 + /// Get repost information (what post was reposted) 1063 + #[instrument(skip(self))] 1064 + pub async fn get_repost_info( 1065 + &self, 1066 + actor_id: i32, 1067 + rkey: i64, 1068 + ) -> Result<Option<(i32, i64, chrono::DateTime<chrono::Utc>)>> { 1069 + let mut conn = self.db_pool.get().await?; 1070 + 1071 + use parakeet_db::schema::reposts; 1072 + use diesel::prelude::*; 1073 + use diesel_async::RunQueryDsl; 1074 + 1075 + let result = reposts::table 1076 + .filter(reposts::actor_id.eq(actor_id)) 1077 + .filter(reposts::rkey.eq(rkey)) 1078 + .select((reposts::post_actor_id, reposts::post_rkey)) 1079 + .first::<(i32, i64)>(&mut conn) 1080 + .await 1081 + .optional()?; 1082 + 1083 + Ok(result.map(|(post_actor_id, post_rkey)| { 1084 + let timestamp = parakeet_db::tid_util::tid_to_datetime(rkey); 1085 + (post_actor_id, post_rkey, timestamp) 1086 + })) 1144 1087 } 1145 1088 1146 1089 /// Search posts by query
-60
parakeet/src/entities/post_tests.rs
··· 193 193 Ok(()) 194 194 } 195 195 196 - #[tokio::test] 197 - async fn test_get_author_feed_empty() -> eyre::Result<()> { 198 - let entity = setup_test_entity().await; 199 - 200 - let actor_id = 999999; 201 - let cursor = None; 202 - let limit = 50; 203 - let filter = Some("posts_no_replies"); 204 - let result = entity.get_author_feed(actor_id, cursor, limit, filter).await; 205 - 206 - // Non-existent author should return empty 207 - assert!(result.is_ok()); 208 - assert_eq!(result.unwrap().len(), 0); 209 - Ok(()) 210 - } 211 - 212 196 // ============================================================================ 213 197 // LIKES AND REPOSTS TESTS 214 198 // ============================================================================ ··· 347 331 // ============================================================================ 348 332 // TIMELINE AND FEED TESTS 349 333 // ============================================================================ 350 - 351 - #[tokio::test] 352 - async fn test_get_timeline_posts_empty() -> eyre::Result<()> { 353 - let entity = setup_test_entity().await; 354 - 355 - let actor_ids: Vec<i32> = vec![]; 356 - let cursor = None; 357 - let limit = 50; 358 - let result = entity.get_timeline_posts(&actor_ids, cursor, limit).await; 359 - 360 - // Empty DIDs should return empty 361 - assert!(result.is_ok()); 362 - assert_eq!(result.unwrap().len(), 0); 363 - Ok(()) 364 - } 365 - 366 - #[tokio::test] 367 - async fn test_get_timeline_posts_nonexistent() -> eyre::Result<()> { 368 - let entity = setup_test_entity().await; 369 - 370 - let actor_ids = vec![999999, 999998, 999997]; 371 - let cursor = None; 372 - let limit = 50; 373 - let result = entity.get_timeline_posts(&actor_ids, cursor, limit).await; 374 - 375 - // Non-existent DIDs should return empty 376 - assert!(result.is_ok()); 377 - assert_eq!(result.unwrap().len(), 0); 378 - Ok(()) 379 - } 380 - 381 - #[tokio::test] 382 - async fn test_get_timeline_posts_with_cursor() -> eyre::Result<()> { 383 - let entity = setup_test_entity().await; 384 - 385 - let actor_ids = vec![999999]; 386 - let cursor = Some(&chrono::Utc::now()); 387 - let limit = 50; 388 - let result = entity.get_timeline_posts(&actor_ids, cursor, limit).await; 389 - 390 - // Should handle cursor gracefully 391 - assert!(result.is_ok()); 392 - Ok(()) 393 - } 394 334 395 335 // ============================================================================ 396 336 // QUOTES TESTS
+106
parakeet/src/entities/profile.rs
··· 653 653 Ok(known_followers) 654 654 } 655 655 656 + /// Get profiles of accounts this actor follows, with their post rkeys 657 + /// This is optimized for timeline generation - returns profiles with their posts 658 + pub async fn get_following_with_posts( 659 + &self, 660 + actor_id: i32, 661 + max_follows: usize, 662 + max_posts_per_user: usize, 663 + ) -> Result<Vec<ProfileData>> { 664 + // Get the list of who this actor follows 665 + let follows = self.get_following(actor_id, None, max_follows as u8).await?; 666 + let followed_actor_ids: Vec<i32> = follows.iter() 667 + .map(|f| f.subject_actor_id) 668 + .collect(); 669 + 670 + // Get profiles for all followed actors (with their post_rkeys) 671 + let profiles = self.get_profiles_by_ids(&followed_actor_ids).await?; 672 + 673 + // Convert to ProfileData which includes post_rkeys 674 + let mut profiles_with_posts: Vec<ProfileData> = profiles.iter() 675 + .map(ProfileData::from) 676 + .collect(); 677 + 678 + // Optionally limit post_rkeys per user for performance 679 + for profile in &mut profiles_with_posts { 680 + profile.post_rkeys.truncate(max_posts_per_user); 681 + } 682 + 683 + Ok(profiles_with_posts) 684 + } 685 + 686 + /// Get posts for a specific author 687 + /// Returns (actor_id, rkey) pairs for the author's posts 688 + pub async fn get_author_posts( 689 + &self, 690 + actor_id: i32, 691 + limit: usize, 692 + cursor_rkey: Option<i64>, 693 + filter: Option<&str>, 694 + ) -> Result<Vec<(i32, i64)>> { 695 + // Get the actor's profile (with post_rkeys) 696 + let profile_data = match self.get_profile_by_id(actor_id).await { 697 + Ok(actor) => ProfileData::from(&actor), 698 + Err(_) => return Ok(Vec::new()), 699 + }; 700 + 701 + let mut post_keys = Vec::new(); 702 + 703 + // Apply cursor filter if provided 704 + for &rkey in &profile_data.post_rkeys { 705 + if let Some(cursor) = cursor_rkey { 706 + if rkey >= cursor { 707 + continue; // Skip posts at or after cursor 708 + } 709 + } 710 + 711 + // TODO: Apply filter (posts_no_replies, posts_with_media, etc.) 712 + // For now, return all posts 713 + match filter { 714 + Some("posts_no_replies") => { 715 + // Would need to check post type, for now include all 716 + post_keys.push((actor_id, rkey)); 717 + } 718 + Some("posts_with_media") => { 719 + // Would need to check if post has media, for now include all 720 + post_keys.push((actor_id, rkey)); 721 + } 722 + _ => { 723 + // Default: include all posts 724 + post_keys.push((actor_id, rkey)); 725 + } 726 + } 727 + 728 + if post_keys.len() >= limit { 729 + break; 730 + } 731 + } 732 + 733 + Ok(post_keys) 734 + } 735 + 656 736 // Note: Like-related methods are on PostEntity, not ProfileEntity 657 737 // Likes are stored as arrays on the Post model (like_actor_ids, like_rkeys) 658 738 ··· 965 1045 pub followers_count: i32, 966 1046 pub following_count: i32, 967 1047 pub posts_count: i32, 1048 + pub post_rkeys: Vec<i64>, // All post rkeys for this actor (sorted descending) 1049 + pub repost_rkeys: Vec<i64>, // All repost rkeys for this actor (sorted descending) 968 1050 } 969 1051 970 1052 impl From<&Actor> for ProfileData { 971 1053 fn from(actor: &Actor) -> Self { 1054 + // Extract and sort post rkeys (newest first) 1055 + let mut post_rkeys: Vec<i64> = actor.post_rkeys 1056 + .as_ref() 1057 + .map(|rkeys| { 1058 + rkeys.iter() 1059 + .filter_map(|r| r.as_ref().copied()) 1060 + .collect() 1061 + }) 1062 + .unwrap_or_default(); 1063 + post_rkeys.sort_by(|a, b| b.cmp(a)); // Sort descending (newest first) 1064 + 1065 + // Extract and sort repost rkeys (newest first) 1066 + let mut repost_rkeys: Vec<i64> = actor.repost_rkeys 1067 + .as_ref() 1068 + .map(|rkeys| { 1069 + rkeys.iter() 1070 + .filter_map(|r| r.as_ref().copied()) 1071 + .collect() 1072 + }) 1073 + .unwrap_or_default(); 1074 + repost_rkeys.sort_by(|a, b| b.cmp(a)); // Sort descending (newest first) 1075 + 972 1076 ProfileData { 973 1077 actor_id: actor.id, 974 1078 did: actor.did.clone(), ··· 982 1086 followers_count: actor.followers_count.unwrap_or(0), 983 1087 following_count: actor.following_count.unwrap_or(0), 984 1088 posts_count: actor.posts_count.unwrap_or(0), 1089 + post_rkeys, 1090 + repost_rkeys, 985 1091 } 986 1092 } 987 1093 }
+193 -157
parakeet/src/xrpc/app_bsky/feed/get_timeline.rs
··· 1 1 use crate::xrpc::datetime_cursor; 2 - use crate::common::errors::{Error, XrpcResult}; 2 + use crate::common::errors::XrpcResult; 3 3 use crate::common::auth::{AtpAcceptLabelers, AtpAuth}; 4 4 use crate::GlobalState; 5 5 use axum::extract::{Query, State}; 6 6 use axum::Json; 7 - use lexica::app_bsky::feed::{FeedReasonRepost, FeedViewPost, FeedViewPostReason, PostView}; 7 + use lexica::app_bsky::feed::{FeedViewPost, PostView}; 8 8 use serde::{Deserialize, Serialize}; 9 - use std::collections::HashMap; 10 9 11 10 #[derive(Debug, Deserialize)] 12 11 pub struct GetTimelineQuery { ··· 43 42 let user_actor_id = state.profile_entity.resolve_identifier(&user_did).await 44 43 .map_err(|_| crate::common::errors::Error::actor_not_found(&user_did))?; 45 44 46 - // Query database 45 + // New pattern: Get profiles with their posts, then hydrate 47 46 let mut step_timer = std::time::Instant::now(); 48 47 49 - // Parse cursor 50 - let cursor_value = datetime_cursor(query.cursor.as_ref()); 48 + // Parse cursor (convert to TID if provided) 49 + let cursor_tid = query.cursor.as_ref() 50 + .and_then(|c| datetime_cursor(Some(c))) 51 + .map(|dt| { 52 + let tid_str = parakeet_db::tid_util::timestamp_to_tid(dt); 53 + parakeet_db::tid_util::decode_tid(&tid_str).unwrap_or(0) 54 + }); 51 55 52 - // Get followed users from ProfileEntity 53 - let following = state.profile_entity.get_following(user_actor_id, None, 255).await?; 54 - let followed_ids: Vec<i32> = following.iter() 55 - .map(|f| f.subject_actor_id) 56 - .collect(); 56 + // Get followed profiles with their post rkeys (all cached) 57 + let following_with_posts = state.profile_entity 58 + .get_following_with_posts(user_actor_id, 255, 100) 59 + .await?; 57 60 58 - // Get timeline posts using PostEntity 59 - let posts_result = state.post_entity.get_timeline_posts(&followed_ids, cursor_value.as_ref(), limit + 1).await 60 - .map_err(|e| crate::common::errors::Error::server_error(Some(&e.to_string())))?; 61 + let profile_fetch_time = step_timer.elapsed().as_secs_f64() * 1000.0; 62 + tracing::info!(" ├─ Get following profiles with posts: {:.1} ms ({} profiles)", 63 + profile_fetch_time, following_with_posts.len()); 64 + 65 + step_timer = std::time::Instant::now(); 66 + 67 + // Collect timeline items (posts and reposts) from followed users 68 + #[derive(Debug)] 69 + enum TimelineItem { 70 + Post { actor_id: i32, rkey: i64 }, 71 + Repost { reposter_id: i32, rkey: i64 }, 72 + } 73 + 74 + let mut timeline_items: Vec<(i64, TimelineItem)> = Vec::new(); 75 + 76 + for profile in &following_with_posts { 77 + // Add posts 78 + for &rkey in &profile.post_rkeys { 79 + // Apply cursor filtering if needed 80 + if let Some(cursor) = cursor_tid { 81 + if rkey >= cursor { 82 + continue; // Skip posts at or after cursor 83 + } 84 + } 85 + timeline_items.push((rkey, TimelineItem::Post { 86 + actor_id: profile.actor_id, 87 + rkey 88 + })); 89 + } 90 + 91 + // Add reposts 92 + for &rkey in &profile.repost_rkeys { 93 + // Apply cursor filtering if needed 94 + if let Some(cursor) = cursor_tid { 95 + if rkey >= cursor { 96 + continue; // Skip reposts at or after cursor 97 + } 98 + } 99 + timeline_items.push((rkey, TimelineItem::Repost { 100 + reposter_id: profile.actor_id, 101 + rkey 102 + })); 103 + } 104 + } 105 + 106 + // Sort by rkey (TID) descending (newest first) 107 + timeline_items.sort_by(|a, b| b.0.cmp(&a.0)); 61 108 62 109 // Check for next page 63 - let has_next = posts_result.len() > limit as usize; 64 - let posts_to_return = if has_next { 65 - &posts_result[..limit as usize] 66 - } else { 67 - &posts_result[..] 68 - }; 110 + let has_next = timeline_items.len() > limit as usize; 111 + if has_next { 112 + timeline_items.truncate(limit as usize + 1); // Keep one extra for cursor 113 + } 69 114 70 - // Convert to the expected format with optional reposter 71 - let posts_with_reposter: Vec<(i32, i64, Option<i32>)> = posts_to_return 72 - .iter() 73 - .map(|(actor_id, rkey, _)| (*actor_id, *rkey, None)) 74 - .collect(); 75 - 76 - // Build cursor 77 - let cursor = if has_next { 78 - posts_result.get(limit as usize - 1).map(|(_, _, ts)| ts.timestamp_millis().to_string()) 115 + // Build cursor from last item (if we have more pages) 116 + let cursor = if has_next && timeline_items.len() > limit as usize { 117 + let last_rkey = timeline_items[limit as usize - 1].0; 118 + let timestamp = parakeet_db::tid_util::tid_to_datetime(last_rkey); 119 + Some(timestamp.timestamp_millis().to_string()) 79 120 } else { 80 121 None 81 122 }; 82 123 83 - // Create a result structure 84 - struct TimelineResult { 85 - posts_with_reposter: Vec<(i32, i64, Option<i32>)>, 86 - cursor: Option<String>, 124 + // Take only what we're returning 125 + if has_next { 126 + timeline_items.truncate(limit as usize); 87 127 } 88 128 89 - let result = TimelineResult { 90 - posts_with_reposter, 91 - cursor, 92 - }; 93 - let db_time = step_timer.elapsed().as_secs_f64() * 1000.0; 94 - tracing::info!(" ├─ Database query: {:.1} ms (returned {} posts)", db_time, result.posts_with_reposter.len()); 129 + let sort_time = step_timer.elapsed().as_secs_f64() * 1000.0; 130 + tracing::info!(" ├─ Sort and filter timeline: {:.1} ms ({} items)", sort_time, timeline_items.len()); 95 131 96 - // Extract post URIs for caching 132 + // Process timeline items (hydrate posts and reposts) 97 133 step_timer = std::time::Instant::now(); 98 - let mut post_uris = Vec::new(); 99 - for (post_actor_id, post_rkey, _) in &result.posts_with_reposter { 100 - // Build AT URI 101 - let did = state.profile_entity.get_did_by_id(*post_actor_id).await 102 - .unwrap_or_else(|_| format!("did:plc:unknown{}", post_actor_id)); 103 - post_uris.push(format!("at://{}/app.bsky.feed.post/{}", did, parakeet_db::tid_util::encode_tid(*post_rkey))); 104 - } 105 134 135 + // Build FeedViewPosts 136 + let mut feed = Vec::new(); 106 137 107 - // Get posts with reply context using PostEntity 108 - let mut posts_with_context = state.post_entity.get_by_uris_with_reply_context(post_uris.clone(), Some(&user_did)).await 109 - .unwrap_or_default(); 138 + for (rkey, item) in timeline_items { 139 + match item { 140 + TimelineItem::Post { actor_id, rkey } => { 141 + // Regular post - fetch and add to feed 142 + if let Ok(Some(post_data)) = state.post_entity.get_post_by_key(actor_id, rkey).await { 143 + // Convert to PostView 144 + let post_views = state.post_entity.convert_to_post_views(vec![post_data.clone()], Some(&user_did)).await; 110 145 111 - // Build feed with repost reasons 112 - let mut feed = Vec::new(); 113 - for (i, uri) in post_uris.iter().enumerate() { 114 - if let Some((post, reply_context)) = posts_with_context.remove(uri) { 115 - let (_, _, reposter_actor_id) = &result.posts_with_reposter[i]; 146 + if let Some(post_view) = post_views.into_iter().next() { 147 + // Build reply context if this is a reply 148 + let reply_context = state.post_entity.build_reply_context(&post_data, Some(&user_did)).await; 116 149 117 - // Check if this is a repost 118 - let reason = if let Some(reposter_id) = reposter_actor_id { 119 - // Get reposter profile 120 - if let Ok(reposter) = state.profile_entity.get_profile_by_id(*reposter_id).await { 121 - let reposter_view = state.profile_entity.actor_to_profile_view_basic(&reposter); 122 - Some(FeedViewPostReason::Repost(Box::new(FeedReasonRepost { 123 - by: reposter_view, 124 - uri: None, // TODO: Get actual repost URI 125 - cid: None, // TODO: Get actual repost CID 126 - indexed_at: chrono::Utc::now(), // TODO: Get actual repost time 127 - }))) 128 - } else { 129 - None 150 + feed.push(FeedViewPost { 151 + post: post_view, 152 + reply: reply_context, 153 + reason: None, 154 + feed_context: None, 155 + }); 156 + } 130 157 } 131 - } else { 132 - None 133 - }; 158 + } 159 + TimelineItem::Repost { reposter_id, rkey } => { 160 + // Repost - fetch the repost info, then the original post 161 + if let Ok(Some((post_actor_id, post_rkey, indexed_at))) = 162 + state.post_entity.get_repost_info(reposter_id, rkey).await { 134 163 135 - feed.push(FeedViewPost { 136 - post, 137 - reply: reply_context, 138 - reason, 139 - feed_context: None, 140 - }); 164 + // Fetch the original post being reposted 165 + if let Ok(Some(post_data)) = state.post_entity.get_post_by_key(post_actor_id, post_rkey).await { 166 + // Convert to PostView 167 + let post_views = state.post_entity.convert_to_post_views(vec![post_data.clone()], Some(&user_did)).await; 168 + 169 + if let Some(post_view) = post_views.into_iter().next() { 170 + // Get reposter's profile for the reason 171 + if let Ok(reposter) = state.profile_entity.get_profile_by_id(reposter_id).await { 172 + // Convert to ProfileViewBasic 173 + let reposter_basic = state.profile_entity.actor_to_profile_view_basic(&reposter); 174 + 175 + // Build the repost reason 176 + let reason = Some(lexica::app_bsky::feed::FeedViewPostReason::Repost(Box::new( 177 + lexica::app_bsky::feed::FeedReasonRepost { 178 + by: reposter_basic, 179 + uri: Some(format!( 180 + "at://{}/app.bsky.feed.repost/{}", 181 + reposter.did, 182 + parakeet_db::tid_util::encode_tid(rkey) 183 + )), 184 + cid: None, // TODO: Add CID if needed 185 + indexed_at, 186 + } 187 + ))); 188 + 189 + // Build reply context if the reposted post is a reply 190 + let reply_context = state.post_entity.build_reply_context(&post_data, Some(&user_did)).await; 191 + 192 + feed.push(FeedViewPost { 193 + post: post_view, 194 + reply: reply_context, 195 + reason, 196 + feed_context: None, 197 + }); 198 + } 199 + } 200 + } 201 + } 202 + } 141 203 } 142 204 } 143 205 ··· 145 207 tracing::info!(" ├─ Hydrate feed posts: {:.1} ms", hydrate_time); 146 208 147 209 let total_time = start.elapsed().as_secs_f64() * 1000.0; 148 - tracing::info!(" └─ getTimeline total: {:.1} ms (returning {} posts, cursor: {:?})", total_time, feed.len(), result.cursor); 210 + tracing::info!(" └─ getTimeline total: {:.1} ms (returning {} posts, cursor: {:?})", total_time, feed.len(), cursor); 149 211 150 212 Ok(Json(GetTimelineRes { 151 - cursor: result.cursor, 213 + cursor, 152 214 feed, 153 215 })) 154 216 } ··· 167 229 tracing::info!("getAuthorFeed: actor={}, filter={:?}, limit={}, cursor={:?}", 168 230 query.actor, query.filter, limit, query.cursor); 169 231 170 - // Resolve actor to actor_id using ProfileEntity 232 + // Resolve actor to actor_id using ProfileEntity (only DID resolution here) 171 233 let actor_id = state.profile_entity.resolve_identifier(&query.actor).await 172 234 .map_err(|_| crate::common::errors::Error::actor_not_found(&query.actor))?; 173 235 174 - // Parse cursor 175 - let cursor_value = datetime_cursor(query.cursor.as_ref()); 236 + // Parse cursor (convert to TID if provided) 237 + let cursor_tid = query.cursor.as_ref() 238 + .and_then(|c| datetime_cursor(Some(c))) 239 + .map(|dt| { 240 + let tid_str = parakeet_db::tid_util::timestamp_to_tid(dt); 241 + parakeet_db::tid_util::decode_tid(&tid_str).unwrap_or(0) 242 + }); 176 243 177 - // Try cache first (for posts_no_replies filter only) 244 + // Use ProfileEntity to get author's posts 178 245 let mut step_timer = std::time::Instant::now(); 179 - if query.filter == Some("posts_no_replies".to_string()) { 180 - if let Some(cached) = state.author_feed_cache.get(actor_id, "posts_no_replies", query.cursor.as_deref()).await { 181 - let cache_time = step_timer.elapsed().as_secs_f64() * 1000.0; 182 - tracing::info!(" ├─ Author feed cache hit: {:.1} ms", cache_time); 183 246 184 - step_timer = std::time::Instant::now(); 247 + let post_keys = state.profile_entity 248 + .get_author_posts(actor_id, limit as usize + 1, cursor_tid, query.filter.as_deref()) 249 + .await?; 185 250 186 - // Get posts using PostEntity 187 - let posts_map = state.post_entity.get_by_uris(cached.post_uris.clone(), viewer_did.as_deref()).await 188 - .unwrap_or_default(); 251 + let profile_time = step_timer.elapsed().as_secs_f64() * 1000.0; 252 + tracing::info!(" ├─ Get author posts: {:.1} ms ({} posts)", profile_time, post_keys.len()); 189 253 190 - // Convert to FeedViewPosts maintaining order 191 - let mut feed = Vec::new(); 192 - for uri in cached.post_uris { 193 - if let Some(post) = posts_map.get(&uri) { 194 - feed.push(FeedViewPost { 195 - post: post.clone(), 196 - reply: None, 197 - reason: None, 198 - feed_context: None, 199 - }); 200 - } 201 - } 254 + // Check for next page 255 + let has_next = post_keys.len() > limit as usize; 256 + let posts_to_return = if has_next { 257 + &post_keys[..limit as usize] 258 + } else { 259 + &post_keys[..] 260 + }; 202 261 203 - let hydrate_time = step_timer.elapsed().as_secs_f64() * 1000.0; 204 - tracing::info!(" ├─ Hydrate cached author feed: {:.1} ms", hydrate_time); 205 - 206 - let total_time = start.elapsed().as_secs_f64() * 1000.0; 207 - tracing::info!(" └─ getAuthorFeed total (cache hit): {:.1} ms (returning {} posts)", total_time, feed.len()); 208 - 209 - return Ok(Json(GetAuthorFeedRes { 210 - cursor: cached.cursor, 211 - feed, 212 - })); 213 - } 214 - } 262 + // Build cursor from last post 263 + let cursor = if has_next && posts_to_return.len() == limit as usize { 264 + let last_rkey = posts_to_return[posts_to_return.len() - 1].1; 265 + let timestamp = parakeet_db::tid_util::tid_to_datetime(last_rkey); 266 + Some(timestamp.timestamp_millis().to_string()) 267 + } else { 268 + None 269 + }; 215 270 216 - // Cache miss or different filter - query database 271 + // Hydrate posts using PostEntity (uses caching internally) 217 272 step_timer = std::time::Instant::now(); 218 - // Use PostEntity's get_author_feed with filter 219 - let posts = state.post_entity 220 - .get_author_feed(actor_id, cursor_value.as_ref(), limit, query.filter.as_deref()) 221 - .await 222 - .map_err(|e| Error::server_error(Some(&e.to_string())))?; 223 273 224 - let db_time = step_timer.elapsed().as_secs_f64() * 1000.0; 225 - tracing::info!(" ├─ Database query: {:.1} ms (returned {} posts)", db_time, posts.len()); 274 + let post_data_list = state.post_entity.get_posts_by_keys(posts_to_return).await 275 + .unwrap_or_default(); 226 276 227 - // Build cursor from last post 228 - let cursor = posts.last().map(|post| post.2.timestamp_millis().to_string()); 277 + // Get reply context if needed 278 + let mut posts_with_reply: Vec<(PostView, Option<lexica::app_bsky::feed::ReplyRef>)> = Vec::new(); 229 279 230 - // Extract post URIs 231 - let actor_did = state.profile_entity.get_did_by_id(actor_id).await 232 - .unwrap_or_else(|_| format!("did:plc:unknown{}", actor_id)); 280 + for post_data in post_data_list { 281 + // Convert to PostView 282 + let post_views = state.post_entity.convert_to_post_views(vec![post_data.clone()], viewer_did.as_deref()).await; 233 283 234 - let post_uris: Vec<String> = posts.iter().map(|(_, rkey, _)| { 235 - format!("at://{}/app.bsky.feed.post/{}", actor_did, parakeet_db::tid_util::encode_tid(*rkey)) 236 - }).collect(); 284 + if let Some(post_view) = post_views.into_iter().next() { 285 + // Build reply context if this is a reply 286 + let reply_context = state.post_entity.build_reply_context(&post_data, viewer_did.as_deref()).await; 237 287 238 - // Cache for posts_no_replies filter 239 - if query.filter == Some("posts_no_replies".to_string()) && !post_uris.is_empty() { 240 - state.author_feed_cache.set( 241 - actor_id, 242 - "posts_no_replies", 243 - query.cursor.as_deref(), 244 - post_uris.clone(), 245 - cursor.clone(), 246 - ).await; 288 + posts_with_reply.push((post_view, reply_context)); 289 + } 247 290 } 248 291 249 - // Get posts with reply context using PostEntity 250 - step_timer = std::time::Instant::now(); 251 - let mut posts_with_context = state.post_entity.get_by_uris_with_reply_context(post_uris.clone(), viewer_did.as_deref()).await 252 - .unwrap_or_default(); 292 + let hydrate_time = step_timer.elapsed().as_secs_f64() * 1000.0; 293 + tracing::info!(" ├─ Hydrate posts: {:.1} ms", hydrate_time); 253 294 254 - // Convert to FeedViewPosts maintaining order 295 + // Build feed 255 296 let mut feed = Vec::new(); 256 - for uri in post_uris { 257 - if let Some((post, reply_context)) = posts_with_context.remove(&uri) { 258 - feed.push(FeedViewPost { 259 - post, 260 - reply: reply_context, 261 - reason: None, 262 - feed_context: None, 263 - }); 264 - } 297 + for (post, reply_context) in posts_with_reply { 298 + feed.push(FeedViewPost { 299 + post, 300 + reply: reply_context, 301 + reason: None, 302 + feed_context: None, 303 + }); 265 304 } 266 - 267 - let hydrate_time = step_timer.elapsed().as_secs_f64() * 1000.0; 268 - tracing::info!(" ├─ Hydrate author feed: {:.1} ms", hydrate_time); 269 305 270 306 let total_time = start.elapsed().as_secs_f64() * 1000.0; 271 307 tracing::info!(" └─ getAuthorFeed total: {:.1} ms (returning {} posts)", total_time, feed.len());