Parakeet is a Rust-based Bluesky AppServer aiming to implement most of the functionality required to support the Bluesky client
appview atproto bluesky rust appserver
69
fork

Configure Feed

Select the types of activity you want to include in your feed.

Merge branch 'feat-viewer-interactions' into 'main'

Feat: Viewer Interactions

See merge request parakeet-social/parakeet!22

+1018 -74
+28 -6
lexica/src/app_bsky/actor.rs
··· 1 1 use crate::app_bsky::embed::External; 2 + use crate::app_bsky::graph::ListViewBasic; 2 3 use crate::com_atproto::label::Label; 3 4 use chrono::prelude::*; 4 5 use serde::{Deserialize, Serialize}; 5 6 use std::fmt::Display; 6 7 use std::str::FromStr; 8 + 9 + #[derive(Clone, Default, Debug, Serialize)] 10 + #[serde(rename_all = "camelCase")] 11 + pub struct ProfileViewerState { 12 + pub muted: bool, 13 + #[serde(skip_serializing_if = "Option::is_none")] 14 + pub muted_by_list: Option<ListViewBasic>, 15 + pub blocked_by: bool, 16 + #[serde(skip_serializing_if = "Option::is_none")] 17 + pub blocking: Option<String>, 18 + #[serde(skip_serializing_if = "Option::is_none")] 19 + pub blocking_by_list: Option<ListViewBasic>, 20 + #[serde(skip_serializing_if = "Option::is_none")] 21 + pub following: Option<String>, 22 + #[serde(skip_serializing_if = "Option::is_none")] 23 + pub followed_by: Option<String>, 24 + // #[serde(skip_serializing_if = "Option::is_none")] 25 + // pub known_followers: Option<()>, 26 + // #[serde(skip_serializing_if = "Option::is_none")] 27 + // pub activity_subscriptions: Option<()>, 28 + } 7 29 8 30 #[derive(Clone, Default, Debug, Serialize)] 9 31 #[serde(rename_all = "camelCase")] ··· 130 152 pub avatar: Option<String>, 131 153 #[serde(skip_serializing_if = "Option::is_none")] 132 154 pub associated: Option<ProfileAssociated>, 133 - // #[serde(skip_serializing_if = "Option::is_none")] 134 - // pub viewer: Option<()>, 155 + #[serde(skip_serializing_if = "Option::is_none")] 156 + pub viewer: Option<ProfileViewerState>, 135 157 #[serde(skip_serializing_if = "Vec::is_empty")] 136 158 pub labels: Vec<Label>, 137 159 #[serde(skip_serializing_if = "Option::is_none")] ··· 156 178 pub avatar: Option<String>, 157 179 #[serde(skip_serializing_if = "Option::is_none")] 158 180 pub associated: Option<ProfileAssociated>, 159 - // #[serde(skip_serializing_if = "Option::is_none")] 160 - // pub viewer: Option<()>, 181 + #[serde(skip_serializing_if = "Option::is_none")] 182 + pub viewer: Option<ProfileViewerState>, 161 183 #[serde(skip_serializing_if = "Vec::is_empty")] 162 184 pub labels: Vec<Label>, 163 185 #[serde(skip_serializing_if = "Option::is_none")] ··· 189 211 pub associated: Option<ProfileAssociated>, 190 212 // #[serde(skip_serializing_if = "Option::is_none")] 191 213 // pub joined_via_starter_pack: Option<()>, 192 - // #[serde(skip_serializing_if = "Option::is_none")] 193 - // pub viewer: Option<()>, 214 + #[serde(skip_serializing_if = "Option::is_none")] 215 + pub viewer: Option<ProfileViewerState>, 194 216 #[serde(skip_serializing_if = "Vec::is_empty")] 195 217 pub labels: Vec<Label>, 196 218 // #[serde(skip_serializing_if = "Option::is_none")]
+28 -7
lexica/src/app_bsky/feed.rs
··· 1 1 use super::RecordStats; 2 - use crate::app_bsky::actor::{ProfileView, ProfileViewBasic}; 2 + use crate::app_bsky::actor::{ProfileView, ProfileViewBasic, ProfileViewerState}; 3 3 use crate::app_bsky::embed::Embed; 4 4 use crate::app_bsky::graph::ListViewBasic; 5 5 use crate::app_bsky::richtext::FacetMain; ··· 8 8 use serde::{Deserialize, Serialize}; 9 9 use std::str::FromStr; 10 10 11 + #[derive(Clone, Default, Debug, Serialize)] 12 + #[serde(rename_all = "camelCase")] 13 + pub struct PostViewerState { 14 + #[serde(skip_serializing_if = "Option::is_none")] 15 + pub repost: Option<String>, 16 + #[serde(skip_serializing_if = "Option::is_none")] 17 + pub like: Option<String>, 18 + pub bookmarked: bool, 19 + pub thread_muted: bool, 20 + pub reply_disabled: bool, 21 + pub embedding_disabled: bool, 22 + pub pinned: bool, 23 + } 24 + 11 25 #[derive(Clone, Debug, Serialize)] 12 26 #[serde(rename_all = "camelCase")] 13 27 pub struct PostView { ··· 23 37 24 38 #[serde(skip_serializing_if = "Vec::is_empty")] 25 39 pub labels: Vec<Label>, 26 - // #[serde(skip_serializing_if = "Option::is_none")] 27 - // pub viewer: Option<()>, 40 + #[serde(skip_serializing_if = "Option::is_none")] 41 + pub viewer: Option<PostViewerState>, 28 42 #[serde(skip_serializing_if = "Option::is_none")] 29 43 pub threadgate: Option<ThreadgateView>, 30 44 ··· 123 137 124 138 #[derive(Clone, Debug, Serialize)] 125 139 pub struct BlockedAuthor { 126 - pub uri: String, 127 - // pub viewer: Option<()>, 140 + pub did: String, 141 + pub viewer: Option<ProfileViewerState>, 142 + } 143 + 144 + #[derive(Clone, Default, Debug, Serialize)] 145 + #[serde(rename_all = "camelCase")] 146 + pub struct GeneratorViewerState { 147 + #[serde(skip_serializing_if = "Option::is_none")] 148 + pub like: Option<String>, 128 149 } 129 150 130 151 #[derive(Clone, Debug, Serialize)] ··· 148 169 pub accepts_interactions: bool, 149 170 #[serde(skip_serializing_if = "Vec::is_empty")] 150 171 pub labels: Vec<Label>, 151 - // #[serde(skip_serializing_if = "Option::is_none")] 152 - // pub viewer: Option<()>, 172 + #[serde(skip_serializing_if = "Option::is_none")] 173 + pub viewer: Option<GeneratorViewerState>, 153 174 #[serde(skip_serializing_if = "Option::is_none")] 154 175 pub content_mode: Option<GeneratorContentMode>, 155 176
+12 -4
lexica/src/app_bsky/graph.rs
··· 6 6 use serde::{Deserialize, Serialize}; 7 7 use std::str::FromStr; 8 8 9 + #[derive(Clone, Default, Debug, Serialize)] 10 + #[serde(rename_all = "camelCase")] 11 + pub struct ListViewerState { 12 + pub muted: bool, 13 + #[serde(skip_serializing_if = "Option::is_none")] 14 + pub blocked: Option<String>, 15 + } 16 + 9 17 #[derive(Clone, Debug, Serialize)] 10 18 #[serde(rename_all = "camelCase")] 11 19 pub struct ListViewBasic { ··· 18 26 pub avatar: Option<String>, 19 27 pub list_item_count: i64, 20 28 21 - // #[serde(skip_serializing_if = "Option::is_none")] 22 - // pub viewer: Option<()>, 29 + #[serde(skip_serializing_if = "Option::is_none")] 30 + pub viewer: Option<ListViewerState>, 23 31 #[serde(skip_serializing_if = "Vec::is_empty")] 24 32 pub labels: Vec<Label>, 25 33 ··· 44 52 pub avatar: Option<String>, 45 53 pub list_item_count: i64, 46 54 47 - // #[serde(skip_serializing_if = "Option::is_none")] 48 - // pub viewer: Option<()>, 55 + #[serde(skip_serializing_if = "Option::is_none")] 56 + pub viewer: Option<ListViewerState>, 49 57 #[serde(skip_serializing_if = "Vec::is_empty")] 50 58 pub labels: Vec<Label>, 51 59
+11 -4
lexica/src/app_bsky/labeler.rs
··· 4 4 use chrono::prelude::*; 5 5 use serde::{Deserialize, Serialize}; 6 6 7 + #[derive(Clone, Default, Debug, Serialize)] 8 + #[serde(rename_all = "camelCase")] 9 + pub struct LabelerViewerState { 10 + #[serde(skip_serializing_if = "Option::is_none")] 11 + pub like: Option<String>, 12 + } 13 + 7 14 #[derive(Clone, Debug, Serialize)] 8 15 #[serde(rename_all = "camelCase")] 9 16 pub struct LabelerView { ··· 12 19 pub creator: ProfileView, 13 20 14 21 pub like_count: i64, 15 - // #[serde(skip_serializing_if = "Option::is_none")] 16 - // pub viewer: Option<()>, 22 + #[serde(skip_serializing_if = "Option::is_none")] 23 + pub viewer: Option<LabelerViewerState>, 17 24 #[serde(skip_serializing_if = "Vec::is_empty")] 18 25 pub labels: Vec<Label>, 19 26 pub indexed_at: DateTime<Utc>, ··· 27 34 pub creator: ProfileView, 28 35 29 36 pub like_count: i64, 30 - // #[serde(skip_serializing_if = "Option::is_none")] 31 - // pub viewer: Option<()>, 37 + #[serde(skip_serializing_if = "Option::is_none")] 38 + pub viewer: Option<LabelerViewerState>, 32 39 #[serde(skip_serializing_if = "Vec::is_empty")] 33 40 pub labels: Vec<Label>, 34 41 pub policies: LabelerPolicy,
+17
migrations/2025-09-17-190406_viewer-interactions/down.sql
··· 1 + drop trigger t_profile_state_ins on follows; 2 + drop trigger t_profile_state_del on follows; 3 + drop trigger t_profile_state_ins on blocks; 4 + drop trigger t_profile_state_del on blocks; 5 + drop trigger t_profile_state_ins on mutes; 6 + drop trigger t_profile_state_del on mutes; 7 + 8 + drop function f_profile_state_ins_follow; 9 + drop function f_profile_state_del_follow; 10 + drop function f_profile_state_ins_block; 11 + drop function f_profile_state_del_block; 12 + drop function f_profile_state_ins_mute; 13 + drop function f_profile_state_del_mute; 14 + 15 + drop view v_list_mutes_exp; 16 + drop view v_list_block_exp; 17 + drop table profile_states;
+146
migrations/2025-09-17-190406_viewer-interactions/up.sql
··· 1 + create table profile_states 2 + ( 3 + did text not null, 4 + subject text not null, 5 + muting bool not null default false, -- subj muted by did 6 + blocked bool not null default false, -- did blocked by subj 7 + blocking text, -- subj blocked by did 8 + following text, -- rkey of follow record (did->subj) 9 + followed text, -- rkey of follow record (subj->did) 10 + 11 + primary key (did, subject) 12 + ); 13 + 14 + create index profilestates_did_index on profile_states using hash (did); 15 + create index profilestates_sub_index on profile_states using hash (subject); 16 + 17 + create view v_list_block_exp as 18 + ( 19 + select lb.list_uri, did, li.subject 20 + from list_blocks lb 21 + inner join list_items li on lb.list_uri = li.list_uri 22 + ); 23 + 24 + create view v_list_mutes_exp as 25 + ( 26 + select lm.list_uri, did, li.subject 27 + from list_mutes lm 28 + inner join list_items li on lm.list_uri = li.list_uri 29 + ); 30 + 31 + -- profile_states follow triggers 32 + create function f_profile_state_ins_follow() returns trigger 33 + language plpgsql as 34 + $$ 35 + begin 36 + insert into profile_states (did, subject, following) 37 + VALUES (NEW.did, NEW.subject, NEW.rkey) 38 + ON CONFLICT (did, subject) DO UPDATE SET following=excluded.following; 39 + 40 + insert into profile_states (did, subject, followed) 41 + VALUES (NEW.subject, NEW.did, NEW.rkey) 42 + ON CONFLICT (did, subject) DO UPDATE SET followed=excluded.followed; 43 + 44 + return NEW; 45 + end; 46 + $$; 47 + 48 + create trigger t_profile_state_ins 49 + before insert 50 + on follows 51 + for each row 52 + execute procedure f_profile_state_ins_follow(); 53 + 54 + create function f_profile_state_del_follow() returns trigger 55 + language plpgsql as 56 + $$ 57 + begin 58 + update profile_states set following = null where did = OLD.did and subject = OLD.subject; 59 + update profile_states set followed = null where did = OLD.subject and subject = OLD.did; 60 + 61 + return OLD; 62 + end; 63 + $$; 64 + 65 + create trigger t_profile_state_del 66 + before delete 67 + on follows 68 + for each row 69 + execute procedure f_profile_state_del_follow(); 70 + 71 + -- profile_states block triggers 72 + 73 + create function f_profile_state_ins_block() returns trigger 74 + language plpgsql as 75 + $$ 76 + begin 77 + insert into profile_states (did, subject, blocking) 78 + VALUES (NEW.did, NEW.subject, NEW.rkey) 79 + ON CONFLICT (did, subject) DO UPDATE SET blocking=excluded.blocking; 80 + 81 + insert into profile_states (did, subject, blocked) 82 + VALUES (NEW.subject, NEW.did, TRUE) 83 + ON CONFLICT (did, subject) DO UPDATE SET blocked=excluded.blocked; 84 + 85 + return NEW; 86 + end; 87 + $$; 88 + 89 + create trigger t_profile_state_ins 90 + before insert 91 + on blocks 92 + for each row 93 + execute procedure f_profile_state_ins_block(); 94 + 95 + create function f_profile_state_del_block() returns trigger 96 + language plpgsql as 97 + $$ 98 + begin 99 + update profile_states set blocking = null where did = OLD.did and subject = OLD.subject; 100 + update profile_states set blocked = FALSE where did = OLD.subject and subject = OLD.did; 101 + 102 + return OLD; 103 + end; 104 + $$; 105 + 106 + create trigger t_profile_state_del 107 + before delete 108 + on blocks 109 + for each row 110 + execute procedure f_profile_state_del_block(); 111 + 112 + -- profile_states mutes triggers 113 + 114 + create function f_profile_state_ins_mute() returns trigger 115 + language plpgsql as 116 + $$ 117 + begin 118 + insert into profile_states (did, subject, muting) 119 + VALUES (NEW.did, NEW.subject, TRUE) 120 + ON CONFLICT (did, subject) DO UPDATE SET muting=excluded.muting; 121 + 122 + return NEW; 123 + end; 124 + $$; 125 + 126 + create trigger t_profile_state_ins 127 + before insert 128 + on mutes 129 + for each row 130 + execute procedure f_profile_state_ins_mute(); 131 + 132 + create function f_profile_state_del_mute() returns trigger 133 + language plpgsql as 134 + $$ 135 + begin 136 + update profile_states set muting = false where did = OLD.did and subject = OLD.subject; 137 + 138 + return OLD; 139 + end; 140 + $$; 141 + 142 + create trigger t_profile_state_del 143 + before delete 144 + on mutes 145 + for each row 146 + execute procedure f_profile_state_del_mute();
+13
parakeet-db/src/schema.rs
··· 288 288 } 289 289 290 290 diesel::table! { 291 + profile_states (did, subject) { 292 + did -> Text, 293 + subject -> Text, 294 + muting -> Bool, 295 + blocked -> Bool, 296 + blocking -> Nullable<Text>, 297 + following -> Nullable<Text>, 298 + followed -> Nullable<Text>, 299 + } 300 + } 301 + 302 + diesel::table! { 291 303 profiles (did) { 292 304 did -> Text, 293 305 cid -> Text, ··· 441 453 post_embed_video_captions, 442 454 postgates, 443 455 posts, 456 + profile_states, 444 457 profiles, 445 458 records, 446 459 reposts,
+167
parakeet/src/db.rs
··· 1 1 use diesel::prelude::*; 2 + use diesel::sql_types::{Array, Bool, Nullable, Text}; 2 3 use diesel_async::{AsyncPgConnection, RunQueryDsl}; 3 4 use parakeet_db::{schema, types}; 4 5 ··· 13 14 .await 14 15 .optional() 15 16 } 17 + 18 + #[derive(Clone, Debug, QueryableByName)] 19 + #[diesel(check_for_backend(diesel::pg::Pg))] 20 + pub struct ProfileStateRet { 21 + #[diesel(sql_type = Text)] 22 + pub did: String, 23 + #[diesel(sql_type = Text)] 24 + pub subject: String, 25 + #[diesel(sql_type = Nullable<Bool>)] 26 + pub muting: Option<bool>, 27 + #[diesel(sql_type = Nullable<Bool>)] 28 + pub blocked: Option<bool>, 29 + #[diesel(sql_type = Nullable<Text>)] 30 + pub blocking: Option<String>, 31 + #[diesel(sql_type = Nullable<Text>)] 32 + pub following: Option<String>, 33 + #[diesel(sql_type = Nullable<Text>)] 34 + pub followed: Option<String>, 35 + #[diesel(sql_type = Nullable<Text>)] 36 + pub list_block: Option<String>, 37 + #[diesel(sql_type = Nullable<Text>)] 38 + pub list_mute: Option<String>, 39 + } 40 + pub async fn get_profile_state( 41 + conn: &mut AsyncPgConnection, 42 + did: &str, 43 + sub: &str, 44 + ) -> QueryResult<Option<ProfileStateRet>> { 45 + diesel::sql_query(include_str!("sql/profile_state.sql")) 46 + .bind::<Text, _>(did) 47 + .bind::<Array<Text>, _>(vec![sub]) 48 + .get_result::<ProfileStateRet>(conn) 49 + .await 50 + .optional() 51 + } 52 + pub async fn get_profile_states( 53 + conn: &mut AsyncPgConnection, 54 + did: &str, 55 + sub: &[String], 56 + ) -> QueryResult<Vec<ProfileStateRet>> { 57 + diesel::sql_query(include_str!("sql/profile_state.sql")) 58 + .bind::<Text, _>(did) 59 + .bind::<Array<Text>, _>(sub) 60 + .load::<ProfileStateRet>(conn) 61 + .await 62 + } 63 + 64 + #[derive(Clone, Debug, QueryableByName)] 65 + #[diesel(check_for_backend(diesel::pg::Pg))] 66 + pub struct PostStateRet { 67 + #[diesel(sql_type = diesel::sql_types::Text)] 68 + pub at_uri: String, 69 + #[diesel(sql_type = diesel::sql_types::Text)] 70 + pub did: String, 71 + #[diesel(sql_type = diesel::sql_types::Text)] 72 + pub cid: String, 73 + #[diesel(sql_type = diesel::sql_types::Nullable<diesel::sql_types::Text>)] 74 + pub like_rkey: Option<String>, 75 + #[diesel(sql_type = diesel::sql_types::Nullable<diesel::sql_types::Text>)] 76 + pub repost_rkey: Option<String>, 77 + #[diesel(sql_type = diesel::sql_types::Bool)] 78 + pub bookmarked: bool, 79 + // #[diesel(sql_type = diesel::sql_types::Bool)] 80 + // pub muted: bool, 81 + #[diesel(sql_type = diesel::sql_types::Bool)] 82 + pub embed_disabled: bool, 83 + #[diesel(sql_type = diesel::sql_types::Bool)] 84 + pub pinned: bool, 85 + } 86 + pub async fn get_post_state( 87 + conn: &mut AsyncPgConnection, 88 + did: &str, 89 + subject: &str, 90 + ) -> QueryResult<Option<PostStateRet>> { 91 + diesel::sql_query(include_str!("sql/post_state.sql")) 92 + .bind::<Text, _>(did) 93 + .bind::<Array<Text>, _>(vec![subject]) 94 + .get_result::<PostStateRet>(conn) 95 + .await 96 + .optional() 97 + } 98 + 99 + pub async fn get_post_states( 100 + conn: &mut AsyncPgConnection, 101 + did: &str, 102 + sub: &[String], 103 + ) -> QueryResult<Vec<PostStateRet>> { 104 + diesel::sql_query(include_str!("sql/post_state.sql")) 105 + .bind::<Text, _>(did) 106 + .bind::<Array<Text>, _>(sub) 107 + .load::<PostStateRet>(conn) 108 + .await 109 + } 110 + 111 + #[derive(Clone, Debug, QueryableByName)] 112 + #[diesel(check_for_backend(diesel::pg::Pg))] 113 + pub struct ListStateRet { 114 + #[diesel(sql_type = Text)] 115 + pub at_uri: String, 116 + #[diesel(sql_type = Bool)] 117 + pub muted: bool, 118 + #[diesel(sql_type = Nullable<Text>)] 119 + pub block: Option<String>, 120 + } 121 + 122 + pub async fn get_list_state( 123 + conn: &mut AsyncPgConnection, 124 + did: &str, 125 + subject: &str, 126 + ) -> QueryResult<Option<ListStateRet>> { 127 + diesel::sql_query(include_str!("sql/list_states.sql")) 128 + .bind::<Text, _>(did) 129 + .bind::<Array<Text>, _>(vec![subject]) 130 + .get_result::<ListStateRet>(conn) 131 + .await 132 + .optional() 133 + } 134 + 135 + pub async fn get_list_states( 136 + conn: &mut AsyncPgConnection, 137 + did: &str, 138 + sub: &[String], 139 + ) -> QueryResult<Vec<ListStateRet>> { 140 + diesel::sql_query(include_str!("sql/list_states.sql")) 141 + .bind::<Text, _>(did) 142 + .bind::<Array<Text>, _>(sub) 143 + .load::<ListStateRet>(conn) 144 + .await 145 + } 146 + 147 + pub async fn get_like_state( 148 + conn: &mut AsyncPgConnection, 149 + did: &str, 150 + subject: &str, 151 + ) -> QueryResult<Option<(String, String)>> { 152 + schema::likes::table 153 + .select((schema::likes::did, schema::likes::rkey)) 154 + .filter( 155 + schema::likes::did 156 + .eq(did) 157 + .and(schema::likes::subject.eq(subject)), 158 + ) 159 + .get_result(conn) 160 + .await 161 + .optional() 162 + } 163 + 164 + pub async fn get_like_states( 165 + conn: &mut AsyncPgConnection, 166 + did: &str, 167 + sub: &[String], 168 + ) -> QueryResult<Vec<(String, String, String)>> { 169 + schema::likes::table 170 + .select(( 171 + schema::likes::subject, 172 + schema::likes::did, 173 + schema::likes::rkey, 174 + )) 175 + .filter( 176 + schema::likes::did 177 + .eq(did) 178 + .and(schema::likes::subject.eq_any(sub)), 179 + ) 180 + .load(conn) 181 + .await 182 + }
+41 -3
parakeet/src/hydration/feedgen.rs
··· 1 1 use crate::hydration::map_labels; 2 2 use crate::xrpc::cdn::BskyCdn; 3 3 use lexica::app_bsky::actor::ProfileView; 4 - use lexica::app_bsky::feed::{GeneratorContentMode, GeneratorView}; 4 + use lexica::app_bsky::feed::{GeneratorContentMode, GeneratorView, GeneratorViewerState}; 5 5 use parakeet_db::models; 6 6 use std::collections::HashMap; 7 7 use std::str::FromStr; 8 8 9 + fn build_viewer((did, rkey): (String, String)) -> GeneratorViewerState { 10 + GeneratorViewerState { 11 + like: Some(format!("at://{did}/app.bsky.feed.like/{rkey}")), 12 + } 13 + } 14 + 9 15 fn build_feedgen( 10 16 feedgen: models::FeedGen, 11 17 creator: ProfileView, 12 18 labels: Vec<models::Label>, 13 19 likes: Option<i32>, 20 + viewer: Option<GeneratorViewerState>, 14 21 cdn: &BskyCdn, 15 22 ) -> GeneratorView { 16 23 let content_mode = feedgen ··· 35 42 like_count: likes.unwrap_or_default() as i64, 36 43 accepts_interactions: feedgen.accepts_interactions, 37 44 labels: map_labels(labels), 45 + viewer, 38 46 content_mode, 39 47 indexed_at: feedgen.created_at, 40 48 } ··· 43 51 impl super::StatefulHydrator<'_> { 44 52 pub async fn hydrate_feedgen(&self, feedgen: String) -> Option<GeneratorView> { 45 53 let labels = self.get_label(&feedgen).await; 54 + let viewer = self.get_feedgen_viewer_state(&feedgen).await; 46 55 let likes = self.loaders.like.load(feedgen.clone()).await; 47 56 let feedgen = self.loaders.feedgen.load(feedgen).await?; 48 57 let profile = self.hydrate_profile(feedgen.owner.clone()).await?; 49 58 50 - Some(build_feedgen(feedgen, profile, labels, likes, &self.cdn)) 59 + Some(build_feedgen( 60 + feedgen, profile, labels, likes, viewer, &self.cdn, 61 + )) 51 62 } 52 63 53 64 pub async fn hydrate_feedgens(&self, feedgens: Vec<String>) -> HashMap<String, GeneratorView> { 54 65 let labels = self.get_label_many(&feedgens).await; 66 + let viewers = self.get_feedgen_viewer_states(&feedgens).await; 55 67 let mut likes = self.loaders.like.load_many(feedgens.clone()).await; 56 68 let feedgens = self.loaders.feedgen.load_many(feedgens).await; 57 69 ··· 66 78 .into_iter() 67 79 .filter_map(|(uri, feedgen)| { 68 80 let creator = creators.get(&feedgen.owner).cloned()?; 81 + let viewer = viewers.get(&uri).cloned(); 69 82 let labels = labels.get(&uri).cloned().unwrap_or_default(); 70 83 let likes = likes.remove(&uri); 71 84 72 85 Some(( 73 86 uri, 74 - build_feedgen(feedgen, creator, labels, likes, &self.cdn), 87 + build_feedgen(feedgen, creator, labels, likes, viewer, &self.cdn), 75 88 )) 76 89 }) 77 90 .collect() 91 + } 92 + 93 + async fn get_feedgen_viewer_state(&self, subject: &str) -> Option<GeneratorViewerState> { 94 + if let Some(viewer) = &self.current_actor { 95 + let data = self.loaders.like_state.get(viewer, subject).await?; 96 + 97 + Some(build_viewer(data)) 98 + } else { 99 + None 100 + } 101 + } 102 + 103 + async fn get_feedgen_viewer_states( 104 + &self, 105 + subjects: &[String], 106 + ) -> HashMap<String, GeneratorViewerState> { 107 + if let Some(viewer) = &self.current_actor { 108 + let data = self.loaders.like_state.get_many(viewer, subjects).await; 109 + 110 + data.into_iter() 111 + .map(|(k, state)| (k, build_viewer(state))) 112 + .collect() 113 + } else { 114 + HashMap::new() 115 + } 78 116 } 79 117 }
+54 -5
parakeet/src/hydration/labeler.rs
··· 1 1 use crate::hydration::{map_labels, StatefulHydrator}; 2 2 use lexica::app_bsky::actor::ProfileView; 3 - use lexica::app_bsky::labeler::{LabelerPolicy, LabelerView, LabelerViewDetailed}; 3 + use lexica::app_bsky::labeler::{ 4 + LabelerPolicy, LabelerView, LabelerViewDetailed, LabelerViewerState, 5 + }; 4 6 use lexica::com_atproto::label::{Blurs, LabelValueDefinition, Severity}; 5 7 use lexica::com_atproto::moderation::{ReasonType, SubjectType}; 6 8 use parakeet_db::models; 7 9 use std::collections::HashMap; 8 10 use std::str::FromStr; 9 11 12 + fn build_viewer((did, rkey): (String, String)) -> LabelerViewerState { 13 + LabelerViewerState { 14 + like: Some(format!("at://{did}/app.bsky.feed.like/{rkey}")), 15 + } 16 + } 17 + 10 18 fn build_view( 11 19 labeler: models::LabelerService, 12 20 creator: ProfileView, 13 21 labels: Vec<models::Label>, 22 + viewer: Option<LabelerViewerState>, 14 23 likes: Option<i32>, 15 24 ) -> LabelerView { 16 25 LabelerView { ··· 18 27 cid: labeler.cid, 19 28 creator, 20 29 like_count: likes.unwrap_or_default() as i64, 30 + viewer, 21 31 labels: map_labels(labels), 22 32 indexed_at: labeler.indexed_at.and_utc(), 23 33 } ··· 28 38 defs: Vec<models::LabelDefinition>, 29 39 creator: ProfileView, 30 40 labels: Vec<models::Label>, 41 + viewer: Option<LabelerViewerState>, 31 42 likes: Option<i32>, 32 43 ) -> LabelerViewDetailed { 33 44 let reason_types = labeler.reasons.map(|v| { ··· 77 88 cid: labeler.cid, 78 89 creator, 79 90 like_count: likes.unwrap_or_default() as i64, 91 + viewer, 80 92 policies: LabelerPolicy { 81 93 label_values, 82 94 label_value_definitions, ··· 92 104 impl StatefulHydrator<'_> { 93 105 pub async fn hydrate_labeler(&self, labeler: String) -> Option<LabelerView> { 94 106 let labels = self.get_label(&labeler).await; 107 + let viewer = self.get_labeler_viewer_state(&labeler).await; 95 108 let likes = self.loaders.like.load(make_labeler_uri(&labeler)).await; 96 109 let (labeler, _) = self.loaders.labeler.load(labeler).await?; 97 110 let creator = self.hydrate_profile(labeler.did.clone()).await?; 98 111 99 - Some(build_view(labeler, creator, labels, likes)) 112 + Some(build_view(labeler, creator, labels, viewer, likes)) 100 113 } 101 114 102 115 pub async fn hydrate_labelers(&self, labelers: Vec<String>) -> HashMap<String, LabelerView> { ··· 107 120 .values() 108 121 .map(|(labeler, _)| (labeler.did.clone(), make_labeler_uri(&labeler.did))) 109 122 .unzip::<_, _, Vec<_>, Vec<_>>(); 123 + let viewers = self.get_labeler_viewer_states(&uris).await; 110 124 let creators = self.hydrate_profiles(creators).await; 111 125 let mut likes = self.loaders.like.load_many(uris.clone()).await; 112 126 ··· 116 130 let creator = creators.get(&labeler.did).cloned()?; 117 131 let labels = labels.get(&k).cloned().unwrap_or_default(); 118 132 let likes = likes.remove(&make_labeler_uri(&labeler.did)); 133 + let viewer = viewers.get(&make_labeler_uri(&k)).cloned(); 119 134 120 - Some((k, build_view(labeler, creator, labels, likes))) 135 + Some((k, build_view(labeler, creator, labels, viewer, likes))) 121 136 }) 122 137 .collect() 123 138 } 124 139 125 140 pub async fn hydrate_labeler_detailed(&self, labeler: String) -> Option<LabelerViewDetailed> { 126 141 let labels = self.get_label(&labeler).await; 142 + let viewer = self.get_labeler_viewer_state(&labeler).await; 127 143 let likes = self.loaders.like.load(make_labeler_uri(&labeler)).await; 128 144 let (labeler, defs) = self.loaders.labeler.load(labeler).await?; 129 145 let creator = self.hydrate_profile(labeler.did.clone()).await?; 130 146 131 - Some(build_view_detailed(labeler, defs, creator, labels, likes)) 147 + Some(build_view_detailed( 148 + labeler, defs, creator, labels, viewer, likes, 149 + )) 132 150 } 133 151 134 152 pub async fn hydrate_labelers_detailed( ··· 142 160 .values() 143 161 .map(|(labeler, _)| (labeler.did.clone(), make_labeler_uri(&labeler.did))) 144 162 .unzip::<_, _, Vec<_>, Vec<_>>(); 163 + let viewers = self.get_labeler_viewer_states(&uris).await; 145 164 let creators = self.hydrate_profiles(creators).await; 146 165 let mut likes = self.loaders.like.load_many(uris.clone()).await; 147 166 ··· 151 170 let creator = creators.get(&labeler.did).cloned()?; 152 171 let labels = labels.get(&k).cloned().unwrap_or_default(); 153 172 let likes = likes.remove(&make_labeler_uri(&labeler.did)); 173 + let viewer = viewers.get(&make_labeler_uri(&k)).cloned(); 154 174 155 - let view = build_view_detailed(labeler, defs, creator, labels, likes); 175 + let view = build_view_detailed(labeler, defs, creator, labels, viewer, likes); 156 176 157 177 Some((k, view)) 158 178 }) 159 179 .collect() 180 + } 181 + 182 + async fn get_labeler_viewer_state(&self, subject: &str) -> Option<LabelerViewerState> { 183 + if let Some(viewer) = &self.current_actor { 184 + let data = self 185 + .loaders 186 + .like_state 187 + .get(&make_labeler_uri(viewer), subject) 188 + .await?; 189 + 190 + Some(build_viewer(data)) 191 + } else { 192 + None 193 + } 194 + } 195 + 196 + async fn get_labeler_viewer_states( 197 + &self, 198 + subjects: &[String], 199 + ) -> HashMap<String, LabelerViewerState> { 200 + if let Some(viewer) = &self.current_actor { 201 + let data = self.loaders.like_state.get_many(viewer, subjects).await; 202 + 203 + data.into_iter() 204 + .map(|(k, state)| (k, build_viewer(state))) 205 + .collect() 206 + } else { 207 + HashMap::new() 208 + } 160 209 } 161 210 } 162 211
+49 -5
parakeet/src/hydration/list.rs
··· 1 + use crate::db::ListStateRet; 1 2 use crate::hydration::{map_labels, StatefulHydrator}; 2 3 use crate::xrpc::cdn::BskyCdn; 3 4 use lexica::app_bsky::actor::ProfileView; 4 - use lexica::app_bsky::graph::{ListPurpose, ListView, ListViewBasic}; 5 + use lexica::app_bsky::graph::{ListPurpose, ListView, ListViewBasic, ListViewerState}; 5 6 use parakeet_db::models; 6 7 use std::collections::HashMap; 7 8 use std::str::FromStr; 8 9 10 + fn build_viewer(data: ListStateRet) -> ListViewerState { 11 + ListViewerState { 12 + muted: data.muted, 13 + blocked: data.block, 14 + } 15 + } 16 + 9 17 fn build_basic( 10 18 list: models::List, 11 19 list_item_count: i64, 12 20 labels: Vec<models::Label>, 21 + viewer: Option<ListViewerState>, 13 22 cdn: &BskyCdn, 14 23 ) -> Option<ListViewBasic> { 15 24 let purpose = ListPurpose::from_str(&list.list_type).ok()?; ··· 22 31 purpose, 23 32 avatar, 24 33 list_item_count, 34 + viewer, 25 35 labels: map_labels(labels), 26 36 indexed_at: list.created_at, 27 37 }) ··· 32 42 list_item_count: i64, 33 43 creator: ProfileView, 34 44 labels: Vec<models::Label>, 45 + viewer: Option<ListViewerState>, 35 46 cdn: &BskyCdn, 36 47 ) -> Option<ListView> { 37 48 let purpose = ListPurpose::from_str(&list.list_type).ok()?; ··· 51 62 description_facets, 52 63 avatar, 53 64 list_item_count, 65 + viewer, 54 66 labels: map_labels(labels), 55 67 indexed_at: list.created_at, 56 68 }) ··· 59 71 impl StatefulHydrator<'_> { 60 72 pub async fn hydrate_list_basic(&self, list: String) -> Option<ListViewBasic> { 61 73 let labels = self.get_label(&list).await; 74 + let viewer = self.get_list_viewer_state(&list).await; 62 75 let (list, count) = self.loaders.list.load(list).await?; 63 76 64 - build_basic(list, count, labels, &self.cdn) 77 + build_basic(list, count, labels, viewer, &self.cdn) 65 78 } 66 79 67 80 pub async fn hydrate_lists_basic(&self, lists: Vec<String>) -> HashMap<String, ListViewBasic> { 68 81 let labels = self.get_label_many(&lists).await; 82 + let viewers = self.get_list_viewer_states(&lists).await; 69 83 let lists = self.loaders.list.load_many(lists).await; 70 84 71 85 lists 72 86 .into_iter() 73 87 .filter_map(|(uri, (list, count))| { 74 88 let labels = labels.get(&uri).cloned().unwrap_or_default(); 89 + let viewer = viewers.get(&uri).cloned(); 75 90 76 - build_basic(list, count, labels, &self.cdn).map(|v| (uri, v)) 91 + build_basic(list, count, labels, viewer, &self.cdn).map(|v| (uri, v)) 77 92 }) 78 93 .collect() 79 94 } 80 95 81 96 pub async fn hydrate_list(&self, list: String) -> Option<ListView> { 82 97 let labels = self.get_label(&list).await; 98 + let viewer = self.get_list_viewer_state(&list).await; 83 99 let (list, count) = self.loaders.list.load(list).await?; 84 100 let profile = self.hydrate_profile(list.owner.clone()).await?; 85 101 86 - build_listview(list, count, profile, labels, &self.cdn) 102 + build_listview(list, count, profile, labels, viewer, &self.cdn) 87 103 } 88 104 89 105 pub async fn hydrate_lists(&self, lists: Vec<String>) -> HashMap<String, ListView> { 90 106 let labels = self.get_label_many(&lists).await; 107 + let viewers = self.get_list_viewer_states(&lists).await; 91 108 let lists = self.loaders.list.load_many(lists).await; 92 109 93 110 let creators = lists.values().map(|(list, _)| list.owner.clone()).collect(); ··· 97 114 .into_iter() 98 115 .filter_map(|(uri, (list, count))| { 99 116 let creator = creators.get(&list.owner)?; 117 + let viewer = viewers.get(&uri).cloned(); 100 118 let labels = labels.get(&uri).cloned().unwrap_or_default(); 101 119 102 - build_listview(list, count, creator.to_owned(), labels, &self.cdn).map(|v| (uri, v)) 120 + build_listview(list, count, creator.to_owned(), labels, viewer, &self.cdn) 121 + .map(|v| (uri, v)) 103 122 }) 104 123 .collect() 124 + } 125 + 126 + async fn get_list_viewer_state(&self, subject: &str) -> Option<ListViewerState> { 127 + if let Some(viewer) = &self.current_actor { 128 + let data = self.loaders.list_state.get(viewer, subject).await?; 129 + 130 + Some(build_viewer(data)) 131 + } else { 132 + None 133 + } 134 + } 135 + 136 + async fn get_list_viewer_states( 137 + &self, 138 + subjects: &[String], 139 + ) -> HashMap<String, ListViewerState> { 140 + if let Some(viewer) = &self.current_actor { 141 + let data = self.loaders.list_state.get_many(viewer, subjects).await; 142 + 143 + data.into_iter() 144 + .map(|(k, state)| (k, build_viewer(state))) 145 + .collect() 146 + } else { 147 + HashMap::new() 148 + } 105 149 } 106 150 }
+85 -7
parakeet/src/hydration/posts.rs
··· 1 + use crate::db::PostStateRet; 1 2 use crate::hydration::{map_labels, StatefulHydrator}; 2 3 use lexica::app_bsky::actor::ProfileViewBasic; 3 4 use lexica::app_bsky::embed::Embed; 4 - use lexica::app_bsky::feed::{FeedViewPost, PostView, ReplyRef, ReplyRefPost, ThreadgateView}; 5 + use lexica::app_bsky::feed::{ 6 + BlockedAuthor, FeedViewPost, PostView, PostViewerState, ReplyRef, ReplyRefPost, ThreadgateView, 7 + }; 5 8 use lexica::app_bsky::graph::ListViewBasic; 6 9 use lexica::app_bsky::RecordStats; 7 10 use parakeet_db::models; 8 11 use parakeet_index::PostStats; 9 12 use std::collections::HashMap; 10 13 14 + fn build_viewer(did: &str, data: PostStateRet) -> PostViewerState { 15 + let is_me = did == data.did; 16 + 17 + let repost = data 18 + .repost_rkey 19 + .map(|rkey| format!("at://{did}/app.bsky.feed.repost/{rkey}")); 20 + let like = data 21 + .like_rkey 22 + .map(|rkey| format!("at://{did}/app.bsky.feed.like/{rkey}")); 23 + 24 + PostViewerState { 25 + repost, 26 + like, 27 + bookmarked: data.bookmarked, 28 + thread_muted: false, // todo when we have thread mutes 29 + reply_disabled: false, 30 + embedding_disabled: data.embed_disabled && !is_me, // poster can always bypass embed disabled. 31 + pinned: data.pinned, 32 + } 33 + } 34 + 11 35 fn build_postview( 12 36 post: models::Post, 13 37 author: ProfileViewBasic, 14 38 labels: Vec<models::Label>, 15 39 embed: Option<Embed>, 16 40 threadgate: Option<ThreadgateView>, 41 + viewer: Option<PostViewerState>, 17 42 stats: Option<PostStats>, 18 43 ) -> PostView { 19 44 let stats = stats ··· 33 58 embed, 34 59 stats, 35 60 labels: map_labels(labels), 61 + viewer, 36 62 threadgate, 37 63 indexed_at: post.created_at, 38 64 } ··· 101 127 pub async fn hydrate_post(&self, post: String) -> Option<PostView> { 102 128 let stats = self.loaders.post_stats.load(post.clone()).await; 103 129 let (post, threadgate) = self.loaders.posts.load(post).await?; 130 + let viewer = self.get_post_viewer_state(&post.at_uri).await; 104 131 let embed = self.hydrate_embed(post.at_uri.clone()).await; 105 132 let author = self.hydrate_profile_basic(post.did.clone()).await?; 106 133 let threadgate = self.hydrate_threadgate(threadgate).await; 107 134 let labels = self.get_label(&post.at_uri).await; 108 135 109 136 Some(build_postview( 110 - post, author, labels, embed, threadgate, stats, 137 + post, author, labels, embed, threadgate, viewer, stats, 111 138 )) 112 139 } 113 140 ··· 122 149 let authors = self.hydrate_profiles_basic(authors).await; 123 150 124 151 let post_labels = self.get_label_many(&post_uris).await; 152 + let viewer_data = self.get_post_viewer_states(&post_uris).await; 125 153 126 154 let threadgates = posts 127 155 .values() ··· 139 167 let threadgate = threadgate.and_then(|tg| threadgates.get(&tg.at_uri).cloned()); 140 168 let labels = post_labels.get(&uri).cloned().unwrap_or_default(); 141 169 let stats = stats.get(&uri).cloned(); 170 + let viewer = viewer_data.get(&uri).cloned(); 142 171 143 172 Some(( 144 173 uri, 145 - build_postview(post, author.to_owned(), labels, embed, threadgate, stats), 174 + build_postview( 175 + post, 176 + author.to_owned(), 177 + labels, 178 + embed, 179 + threadgate, 180 + viewer, 181 + stats, 182 + ), 146 183 )) 147 184 }) 148 185 .collect() ··· 159 196 let authors = self.hydrate_profiles_basic(authors).await; 160 197 161 198 let post_labels = self.get_label_many(&post_uris).await; 162 - 199 + let viewer_data = self.get_post_viewer_states(&post_uris).await; 163 200 let embeds = self.hydrate_embeds(post_uris).await; 164 201 165 202 let reply_refs = posts ··· 183 220 184 221 let reply = if post.parent_uri.is_some() && post.root_uri.is_some() { 185 222 Some(ReplyRef { 186 - root: root.cloned().map(ReplyRefPost::Post).unwrap_or( 223 + root: root.cloned().map(postview_to_replyref).unwrap_or( 187 224 ReplyRefPost::NotFound { 188 225 uri: post.root_uri.as_ref().unwrap().clone(), 189 226 not_found: true, 190 227 }, 191 228 ), 192 - parent: parent.cloned().map(ReplyRefPost::Post).unwrap_or( 229 + parent: parent.cloned().map(postview_to_replyref).unwrap_or( 193 230 ReplyRefPost::NotFound { 194 231 uri: post.parent_uri.as_ref().unwrap().clone(), 195 232 not_found: true, ··· 204 241 let embed = embeds.get(&post_uri).cloned(); 205 242 let labels = post_labels.get(&post_uri).cloned().unwrap_or_default(); 206 243 let stats = stats.get(&post_uri).cloned(); 207 - let post = build_postview(post, author.to_owned(), labels, embed, None, stats); 244 + let viewer = viewer_data.get(&post_uri).cloned(); 245 + let post = 246 + build_postview(post, author.to_owned(), labels, embed, None, viewer, stats); 208 247 209 248 Some(( 210 249 post_uri, ··· 217 256 )) 218 257 }) 219 258 .collect() 259 + } 260 + 261 + async fn get_post_viewer_state(&self, subject: &str) -> Option<PostViewerState> { 262 + if let Some(viewer) = &self.current_actor { 263 + let data = self.loaders.post_state.get(viewer, subject).await?; 264 + 265 + Some(build_viewer(viewer, data)) 266 + } else { 267 + None 268 + } 269 + } 270 + 271 + async fn get_post_viewer_states( 272 + &self, 273 + subjects: &[String], 274 + ) -> HashMap<String, PostViewerState> { 275 + if let Some(viewer) = &self.current_actor { 276 + let data = self.loaders.post_state.get_many(viewer, subjects).await; 277 + 278 + data.into_iter() 279 + .map(|(k, state)| (k, build_viewer(viewer, state))) 280 + .collect() 281 + } else { 282 + HashMap::new() 283 + } 284 + } 285 + } 286 + 287 + fn postview_to_replyref(post: PostView) -> ReplyRefPost { 288 + match &post.author.viewer { 289 + Some(v) if v.blocked_by || v.blocking.is_some() => ReplyRefPost::Blocked { 290 + uri: post.uri, 291 + blocked: true, 292 + author: BlockedAuthor { 293 + did: post.author.did.clone(), 294 + viewer: post.author.viewer, 295 + }, 296 + }, 297 + _ => ReplyRefPost::Post(post), 220 298 } 221 299 }
+111 -5
parakeet/src/hydration/profile.rs
··· 1 + use crate::db::ProfileStateRet; 1 2 use crate::hydration::map_labels; 2 3 use crate::loaders::ProfileLoaderRet; 3 4 use crate::xrpc::cdn::BskyCdn; ··· 5 6 use chrono::TimeDelta; 6 7 use lexica::app_bsky::actor::*; 7 8 use lexica::app_bsky::embed::External; 9 + use lexica::app_bsky::graph::ListViewBasic; 8 10 use parakeet_db::models; 9 11 use parakeet_index::ProfileStats; 10 12 use std::collections::HashMap; ··· 34 36 }) 35 37 } else { 36 38 None 39 + } 40 + } 41 + 42 + fn build_viewer( 43 + data: ProfileStateRet, 44 + list_mute: Option<ListViewBasic>, 45 + list_block: Option<ListViewBasic>, 46 + ) -> ProfileViewerState { 47 + let following = data 48 + .following 49 + .map(|rkey| format!("at://{}/app.bsky.graph.follow/{rkey}", data.did)); 50 + let followed_by = data 51 + .followed 52 + .map(|rkey| format!("at://{}/app.bsky.graph.follow/{rkey}", data.subject)); 53 + 54 + let blocking = data.list_block.or(data.blocking); 55 + 56 + ProfileViewerState { 57 + muted: data.muting.unwrap_or_default(), 58 + muted_by_list: list_mute, 59 + blocked_by: data.blocked.unwrap_or_default(), // TODO: this doesn't factor for blocklists atm 60 + blocking, 61 + blocking_by_list: list_block, 62 + following, 63 + followed_by, 37 64 } 38 65 } 39 66 ··· 156 183 stats: Option<ProfileStats>, 157 184 labels: Vec<models::Label>, 158 185 verifications: Option<Vec<models::VerificationEntry>>, 186 + viewer: Option<ProfileViewerState>, 159 187 cdn: &BskyCdn, 160 188 ) -> ProfileViewBasic { 161 189 let associated = build_associated(chat_decl, is_labeler, stats, notif_decl); ··· 169 197 display_name: profile.display_name, 170 198 avatar, 171 199 associated, 200 + viewer, 172 201 labels: map_labels(labels), 173 202 verification, 174 203 status, ··· 181 210 stats: Option<ProfileStats>, 182 211 labels: Vec<models::Label>, 183 212 verifications: Option<Vec<models::VerificationEntry>>, 213 + viewer: Option<ProfileViewerState>, 184 214 cdn: &BskyCdn, 185 215 ) -> ProfileView { 186 216 let associated = build_associated(chat_decl, is_labeler, stats, notif_decl); ··· 195 225 description: profile.description, 196 226 avatar, 197 227 associated, 228 + viewer, 198 229 labels: map_labels(labels), 199 230 verification, 200 231 status, ··· 208 239 stats: Option<ProfileStats>, 209 240 labels: Vec<models::Label>, 210 241 verifications: Option<Vec<models::VerificationEntry>>, 242 + viewer: Option<ProfileViewerState>, 211 243 cdn: &BskyCdn, 212 244 ) -> ProfileViewDetailed { 213 245 let associated = build_associated(chat_decl, is_labeler, stats, notif_decl); ··· 226 258 followers_count: stats.map(|v| v.followers as i64).unwrap_or_default(), 227 259 follows_count: stats.map(|v| v.following as i64).unwrap_or_default(), 228 260 associated, 261 + viewer, 229 262 labels: map_labels(labels), 230 263 verification, 231 264 status, ··· 239 272 impl super::StatefulHydrator<'_> { 240 273 pub async fn hydrate_profile_basic(&self, did: String) -> Option<ProfileViewBasic> { 241 274 let labels = self.get_profile_label(&did).await; 275 + let viewer = self.get_profile_viewer_state(&did).await; 242 276 let verif = self.loaders.verification.load(did.clone()).await; 243 277 let stats = self.loaders.profile_stats.load(did.clone()).await; 244 278 let profile_info = self.loaders.profile.load(did).await?; 245 279 246 - Some(build_basic(profile_info, stats, labels, verif, &self.cdn)) 280 + Some(build_basic( 281 + profile_info, 282 + stats, 283 + labels, 284 + verif, 285 + viewer, 286 + &self.cdn, 287 + )) 247 288 } 248 289 249 290 pub async fn hydrate_profiles_basic( ··· 251 292 dids: Vec<String>, 252 293 ) -> HashMap<String, ProfileViewBasic> { 253 294 let labels = self.get_profile_label_many(&dids).await; 295 + let viewers = self.get_profile_viewer_states(&dids).await; 254 296 let verif = self.loaders.verification.load_many(dids.clone()).await; 255 297 let stats = self.loaders.profile_stats.load_many(dids.clone()).await; 256 298 let profiles = self.loaders.profile.load_many(dids).await; ··· 260 302 .map(|(k, profile_info)| { 261 303 let labels = labels.get(&k).cloned().unwrap_or_default(); 262 304 let verif = verif.get(&k).cloned(); 305 + let viewer = viewers.get(&k).cloned(); 263 306 let stats = stats.get(&k).cloned(); 264 307 265 - let v = build_basic(profile_info, stats, labels, verif, &self.cdn); 308 + let v = build_basic(profile_info, stats, labels, verif, viewer, &self.cdn); 266 309 (k, v) 267 310 }) 268 311 .collect() ··· 270 313 271 314 pub async fn hydrate_profile(&self, did: String) -> Option<ProfileView> { 272 315 let labels = self.get_profile_label(&did).await; 316 + let viewer = self.get_profile_viewer_state(&did).await; 273 317 let verif = self.loaders.verification.load(did.clone()).await; 274 318 let stats = self.loaders.profile_stats.load(did.clone()).await; 275 319 let profile_info = self.loaders.profile.load(did).await?; 276 320 277 - Some(build_profile(profile_info, stats, labels, verif, &self.cdn)) 321 + Some(build_profile( 322 + profile_info, 323 + stats, 324 + labels, 325 + verif, 326 + viewer, 327 + &self.cdn, 328 + )) 278 329 } 279 330 280 331 pub async fn hydrate_profiles(&self, dids: Vec<String>) -> HashMap<String, ProfileView> { 281 332 let labels = self.get_profile_label_many(&dids).await; 333 + let viewers = self.get_profile_viewer_states(&dids).await; 282 334 let verif = self.loaders.verification.load_many(dids.clone()).await; 283 335 let stats = self.loaders.profile_stats.load_many(dids.clone()).await; 284 336 let profiles = self.loaders.profile.load_many(dids).await; ··· 288 340 .map(|(k, profile_info)| { 289 341 let labels = labels.get(&k).cloned().unwrap_or_default(); 290 342 let verif = verif.get(&k).cloned(); 343 + let viewer = viewers.get(&k).cloned(); 291 344 let stats = stats.get(&k).cloned(); 292 345 293 - let v = build_profile(profile_info, stats, labels, verif, &self.cdn); 346 + let v = build_profile(profile_info, stats, labels, verif, viewer, &self.cdn); 294 347 (k, v) 295 348 }) 296 349 .collect() ··· 298 351 299 352 pub async fn hydrate_profile_detailed(&self, did: String) -> Option<ProfileViewDetailed> { 300 353 let labels = self.get_profile_label(&did).await; 354 + let viewer = self.get_profile_viewer_state(&did).await; 301 355 let verif = self.loaders.verification.load(did.clone()).await; 302 356 let stats = self.loaders.profile_stats.load(did.clone()).await; 303 357 let profile_info = self.loaders.profile.load(did).await?; ··· 307 361 stats, 308 362 labels, 309 363 verif, 364 + viewer, 310 365 &self.cdn, 311 366 )) 312 367 } ··· 316 371 dids: Vec<String>, 317 372 ) -> HashMap<String, ProfileViewDetailed> { 318 373 let labels = self.get_profile_label_many(&dids).await; 374 + let viewers = self.get_profile_viewer_states(&dids).await; 319 375 let verif = self.loaders.verification.load_many(dids.clone()).await; 320 376 let stats = self.loaders.profile_stats.load_many(dids.clone()).await; 321 377 let profiles = self.loaders.profile.load_many(dids).await; ··· 325 381 .map(|(k, profile_info)| { 326 382 let labels = labels.get(&k).cloned().unwrap_or_default(); 327 383 let verif = verif.get(&k).cloned(); 384 + let viewer = viewers.get(&k).cloned(); 328 385 let stats = stats.get(&k).cloned(); 329 386 330 - let v = build_detailed(profile_info, stats, labels, verif, &self.cdn); 387 + let v = build_detailed(profile_info, stats, labels, verif, viewer, &self.cdn); 331 388 (k, v) 332 389 }) 333 390 .collect() 391 + } 392 + 393 + async fn get_profile_viewer_state(&self, subject: &str) -> Option<ProfileViewerState> { 394 + if let Some(viewer) = &self.current_actor { 395 + let data = self.loaders.profile_state.get(viewer, subject).await?; 396 + 397 + let list_block = match &data.list_block { 398 + Some(uri) => self.hydrate_list_basic(uri.clone()).await, 399 + None => None, 400 + }; 401 + let list_mute = match &data.list_mute { 402 + Some(uri) => self.hydrate_list_basic(uri.clone()).await, 403 + None => None, 404 + }; 405 + 406 + Some(build_viewer(data, list_mute, list_block)) 407 + } else { 408 + None 409 + } 410 + } 411 + 412 + async fn get_profile_viewer_states( 413 + &self, 414 + dids: &[String], 415 + ) -> HashMap<String, ProfileViewerState> { 416 + if let Some(viewer) = &self.current_actor { 417 + let data = self.loaders.profile_state.get_many(viewer, dids).await; 418 + let lists = data 419 + .values() 420 + .flat_map(|v| [&v.list_block, &v.list_mute]) 421 + .flatten() 422 + .cloned() 423 + .collect(); 424 + let lists = self.hydrate_lists_basic(lists).await; 425 + 426 + data.into_iter() 427 + .map(|(k, state)| { 428 + let list_mute = state.list_mute.as_ref().and_then(|v| lists.get(v).cloned()); 429 + let list_block = state 430 + .list_block 431 + .as_ref() 432 + .and_then(|v| lists.get(v).cloned()); 433 + 434 + (k, build_viewer(state, list_mute, list_block)) 435 + }) 436 + .collect() 437 + } else { 438 + HashMap::new() 439 + } 334 440 } 335 441 }
+131
parakeet/src/loaders.rs
··· 1 1 use crate::cache::PrefixedLoaderCache; 2 + use crate::db; 2 3 use crate::xrpc::extract::LabelConfigItem; 3 4 use dataloader::async_cached::Loader; 4 5 use dataloader::non_cached::Loader as NonCachedLoader; ··· 39 40 pub label: LabelLoader, 40 41 pub labeler: CachingLoader<String, LabelServiceLoaderRet, LabelServiceLoader>, 41 42 pub list: CachingLoader<String, ListLoaderRet, ListLoader>, 43 + pub list_state: ListStateLoader, 42 44 pub like: NonCachedLoader<String, i32, LikeLoader>, 45 + pub like_state: LikeRecordLoader, 43 46 pub posts: CachingLoader<String, PostLoaderRet, PostLoader>, 44 47 pub post_stats: NonCachedLoader<String, parakeet_index::PostStats, PostStatsLoader>, 48 + pub post_state: PostStateLoader, 45 49 pub profile: CachingLoader<String, ProfileLoaderRet, ProfileLoader>, 46 50 pub profile_stats: NonCachedLoader<String, parakeet_index::ProfileStats, ProfileStatsLoader>, 51 + pub profile_state: ProfileStateLoader, 47 52 pub starterpacks: CachingLoader<String, StarterPackLoaderRet, StarterPackLoader>, 48 53 pub verification: CachingLoader<String, Vec<models::VerificationEntry>, VerificationLoader>, 49 54 } ··· 62 67 label: LabelLoader(pool.clone()), // CARE: never cache this. 63 68 labeler: new_plc_loader(LabelServiceLoader(pool.clone(), idxc.clone()), &rc, "labeler", 600), 64 69 like: NonCachedLoader::new(LikeLoader(idxc.clone())), 70 + like_state: LikeRecordLoader(pool.clone()), 65 71 list: new_plc_loader(ListLoader(pool.clone()), &rc, "list", 600), 72 + list_state: ListStateLoader(pool.clone()), 66 73 posts: new_plc_loader(PostLoader(pool.clone()), &rc, "post", 3600), 67 74 post_stats: NonCachedLoader::new(PostStatsLoader(idxc.clone())), 75 + post_state: PostStateLoader(pool.clone()), 68 76 profile: new_plc_loader(ProfileLoader(pool.clone()), &rc, "profile", 3600), 69 77 profile_stats: NonCachedLoader::new(ProfileStatsLoader(idxc.clone())), 78 + profile_state: ProfileStateLoader(pool.clone()), 70 79 starterpacks: new_plc_loader(StarterPackLoader(pool.clone()), &rc, "starterpacks", 600), 71 80 verification: new_plc_loader(VerificationLoader(pool.clone()), &rc, "verification", 60), 72 81 } ··· 95 104 } 96 105 } 97 106 107 + pub struct LikeRecordLoader(Pool<AsyncPgConnection>); 108 + impl LikeRecordLoader { 109 + pub async fn get(&self, did: &str, subject: &str) -> Option<(String, String)> { 110 + let mut conn = self.0.get().await.unwrap(); 111 + 112 + db::get_like_state(&mut conn, did, subject) 113 + .await 114 + .unwrap_or_else(|e| { 115 + tracing::error!("like state load failed: {e}"); 116 + None 117 + }) 118 + } 119 + 120 + pub async fn get_many( 121 + &self, 122 + did: &str, 123 + subjects: &[String], 124 + ) -> HashMap<String, (String, String)> { 125 + let mut conn = self.0.get().await.unwrap(); 126 + 127 + match db::get_like_states(&mut conn, did, subjects).await { 128 + Ok(res) => { 129 + HashMap::from_iter(res.into_iter().map(|(sub, did, rkey)| (sub, (did, rkey)))) 130 + } 131 + Err(e) => { 132 + tracing::error!("like state load failed: {e}"); 133 + HashMap::new() 134 + } 135 + } 136 + } 137 + } 138 + 98 139 pub struct HandleLoader(Pool<AsyncPgConnection>); 99 140 impl BatchFn<String, String> for HandleLoader { 100 141 async fn load(&mut self, keys: &[String]) -> HashMap<String, String> { ··· 204 245 } 205 246 } 206 247 248 + pub struct ProfileStateLoader(Pool<AsyncPgConnection>); 249 + impl ProfileStateLoader { 250 + pub async fn get(&self, did: &str, subject: &str) -> Option<db::ProfileStateRet> { 251 + let mut conn = self.0.get().await.unwrap(); 252 + 253 + db::get_profile_state(&mut conn, did, subject) 254 + .await 255 + .unwrap_or_else(|e| { 256 + tracing::error!("profile state load failed: {e}"); 257 + None 258 + }) 259 + } 260 + 261 + pub async fn get_many( 262 + &self, 263 + did: &str, 264 + subjects: &[String], 265 + ) -> HashMap<String, db::ProfileStateRet> { 266 + let mut conn = self.0.get().await.unwrap(); 267 + 268 + match db::get_profile_states(&mut conn, did, subjects).await { 269 + Ok(res) => HashMap::from_iter(res.into_iter().map(|v| (v.subject.clone(), v))), 270 + Err(e) => { 271 + tracing::error!("profile state load failed: {e}"); 272 + HashMap::new() 273 + } 274 + } 275 + } 276 + } 277 + 207 278 pub struct ListLoader(Pool<AsyncPgConnection>); 208 279 type ListLoaderRet = (models::List, i64); 209 280 impl BatchFn<String, ListLoaderRet> for ListLoader { ··· 236 307 } 237 308 } 238 309 310 + pub struct ListStateLoader(Pool<AsyncPgConnection>); 311 + impl ListStateLoader { 312 + pub async fn get(&self, did: &str, subject: &str) -> Option<db::ListStateRet> { 313 + let mut conn = self.0.get().await.unwrap(); 314 + 315 + db::get_list_state(&mut conn, did, subject) 316 + .await 317 + .unwrap_or_else(|e| { 318 + tracing::error!("list state load failed: {e}"); 319 + None 320 + }) 321 + } 322 + 323 + pub async fn get_many( 324 + &self, 325 + did: &str, 326 + subjects: &[String], 327 + ) -> HashMap<String, db::ListStateRet> { 328 + let mut conn = self.0.get().await.unwrap(); 329 + 330 + match db::get_list_states(&mut conn, did, subjects).await { 331 + Ok(res) => HashMap::from_iter(res.into_iter().map(|v| (v.at_uri.clone(), v))), 332 + Err(e) => { 333 + tracing::error!("list state load failed: {e}"); 334 + HashMap::new() 335 + } 336 + } 337 + } 338 + } 339 + 239 340 pub struct FeedGenLoader(Pool<AsyncPgConnection>, parakeet_index::Client); 240 341 impl BatchFn<String, models::FeedGen> for FeedGenLoader { 241 342 async fn load(&mut self, keys: &[String]) -> HashMap<String, models::FeedGen> { ··· 302 403 .unwrap() 303 404 .into_inner() 304 405 .entries 406 + } 407 + } 408 + 409 + pub struct PostStateLoader(Pool<AsyncPgConnection>); 410 + impl PostStateLoader { 411 + pub async fn get(&self, did: &str, subject: &str) -> Option<db::PostStateRet> { 412 + let mut conn = self.0.get().await.unwrap(); 413 + 414 + db::get_post_state(&mut conn, did, subject) 415 + .await 416 + .unwrap_or_else(|e| { 417 + tracing::error!("post state load failed: {e}"); 418 + None 419 + }) 420 + } 421 + 422 + pub async fn get_many( 423 + &self, 424 + did: &str, 425 + subjects: &[String], 426 + ) -> HashMap<String, db::PostStateRet> { 427 + let mut conn = self.0.get().await.unwrap(); 428 + 429 + match db::get_post_states(&mut conn, did, subjects).await { 430 + Ok(res) => HashMap::from_iter(res.into_iter().map(|v| (v.at_uri.clone(), v))), 431 + Err(e) => { 432 + tracing::error!("post state load failed: {e}"); 433 + HashMap::new() 434 + } 435 + } 305 436 } 306 437 } 307 438
+5
parakeet/src/sql/list_states.sql
··· 1 + select l.at_uri, lb.at_uri as block, lm.did is not null as muted 2 + from lists l 3 + left join list_blocks lb on l.at_uri = lb.list_uri and lb.did = $1 4 + left join list_mutes lm on l.at_uri = lm.list_uri and lm.did = $1 5 + where l.at_uri = any ($2) and (lm.did is not null or lb.at_uri is not null)
+16
parakeet/src/sql/post_state.sql
··· 1 + select bq.*, coalesce(bq.at_uri = pinned_uri, false) as pinned 2 + from (select p.at_uri, 3 + p.did, 4 + p.cid, 5 + l.rkey as like_rkey, 6 + r.rkey as repost_rkey, 7 + b.did is not null as bookmarked, 8 + coalesce(pg.rules && ARRAY ['app.bsky.feed.postgate#disableRule'], false) as embed_disabled 9 + from posts p 10 + left join likes l on l.subject = p.at_uri and l.did = $1 11 + left join reposts r on r.post = p.at_uri and r.did = $1 12 + left join bookmarks b on b.subject = p.at_uri and b.did = $1 13 + left join postgates pg on pg.post_uri = p.at_uri 14 + where p.at_uri = any ($2) 15 + and (l.rkey is not null or r.rkey is not null or b.did is not null or pg.rules is not null)) bq, 16 + (select pinned_uri, pinned_cid from profiles where did = $1) pp;
+20
parakeet/src/sql/profile_state.sql
··· 1 + with vlb as (select * from v_list_block_exp where did = $1 and subject = any ($2)), 2 + vlm as (select * from v_list_mutes_exp where did = $1 and subject = any ($2)), 3 + ps as (select * from profile_states where did = $1 and subject = any ($2)), 4 + vlb2 as (select subject as did, did as subject, list_uri is not null as blocked 5 + from v_list_block_exp 6 + where did = any ($2) 7 + and subject = $1) 8 + select distinct on (did, subject) did, 9 + subject, 10 + muting, 11 + ps.blocked or vlb2.blocked as blocked, 12 + blocking, 13 + following, 14 + followed, 15 + vlb.list_uri as list_block, 16 + vlm.list_uri as list_mute 17 + from ps 18 + full join vlb using (did, subject) 19 + full join vlm using (did, subject) 20 + full join vlb2 using (did, subject);
+21 -7
parakeet/src/xrpc/app_bsky/bookmark.rs
··· 8 8 use diesel::prelude::*; 9 9 use diesel_async::RunQueryDsl; 10 10 use lexica::app_bsky::bookmark::{BookmarkView, BookmarkViewItem}; 11 + use lexica::app_bsky::feed::{BlockedAuthor, PostView}; 11 12 use lexica::StrongRef; 12 13 use parakeet_db::{models, schema}; 13 14 use serde::{Deserialize, Serialize}; ··· 125 126 // otherwise just ditch. we should have one. 126 127 let cid = bookmark.subject_cid.or(maybe_cid)?; 127 128 128 - let item = 129 - maybe_item 130 - .map(BookmarkViewItem::Post) 131 - .unwrap_or(BookmarkViewItem::NotFound { 132 - uri: bookmark.subject.clone(), 133 - not_found: true, 134 - }); 129 + let item = maybe_item 130 + .map(postview_to_bvi) 131 + .unwrap_or(BookmarkViewItem::NotFound { 132 + uri: bookmark.subject.clone(), 133 + not_found: true, 134 + }); 135 135 136 136 let subject = StrongRef::new_from_str(bookmark.subject, &cid).ok()?; 137 137 ··· 145 145 146 146 Ok(Json(GetBookmarksRes { cursor, bookmarks })) 147 147 } 148 + 149 + fn postview_to_bvi(post: PostView) -> BookmarkViewItem { 150 + match &post.author.viewer { 151 + Some(v) if v.blocked_by || v.blocking.is_some() => BookmarkViewItem::Blocked { 152 + uri: post.uri, 153 + blocked: true, 154 + author: BlockedAuthor { 155 + did: post.author.did.clone(), 156 + viewer: post.author.viewer, 157 + }, 158 + }, 159 + _ => BookmarkViewItem::Post(post), 160 + } 161 + }
+63 -21
parakeet/src/xrpc/app_bsky/feed/posts.rs
··· 16 16 use diesel_async::{AsyncPgConnection, RunQueryDsl}; 17 17 use lexica::app_bsky::actor::ProfileView; 18 18 use lexica::app_bsky::feed::{ 19 - FeedReasonRepost, FeedSkeletonResponse, FeedViewPost, FeedViewPostReason, PostView, 20 - SkeletonReason, ThreadViewPost, ThreadViewPostType, ThreadgateView, 19 + BlockedAuthor, FeedReasonRepost, FeedSkeletonResponse, FeedViewPost, FeedViewPostReason, 20 + PostView, SkeletonReason, ThreadViewPost, ThreadViewPostType, ThreadgateView, 21 21 }; 22 22 use parakeet_db::schema; 23 23 use reqwest::Url; ··· 187 187 Query(query): Query<GetAuthorFeedQuery>, 188 188 ) -> XrpcResult<Json<FeedRes>> { 189 189 let mut conn = state.pool.get().await?; 190 - let hyd = StatefulHydrator::new(&state.dataloaders, &state.cdn, &labelers, maybe_auth); 191 190 192 191 let did = get_actor_did(&state.dataloaders, query.actor.clone()).await?; 193 192 194 193 check_actor_status(&mut conn, &did).await?; 195 194 195 + // check if we block the actor or if they block us 196 + if let Some(auth) = &maybe_auth { 197 + if let Some(psr) = crate::db::get_profile_state(&mut conn, &auth.0, &did).await? { 198 + if psr.blocked.unwrap_or_default() { 199 + // they block us 200 + return Err(Error::new(StatusCode::BAD_REQUEST, "BlockedByActor", None)) 201 + } else if psr.blocking.is_some() { 202 + // we block them 203 + return Err(Error::new(StatusCode::BAD_REQUEST, "BlockedActor", None)) 204 + } 205 + } 206 + } 207 + 208 + let hyd = StatefulHydrator::new(&state.dataloaders, &state.cdn, &labelers, maybe_auth); 209 + 196 210 let limit = query.limit.unwrap_or(50).clamp(1, 100); 197 211 198 212 let mut posts_query = schema::posts::table ··· 346 360 let uri = normalise_at_uri(&state.dataloaders, &query.uri).await?; 347 361 let depth = query.depth.unwrap_or(6).clamp(0, 1000); 348 362 let parent_height = query.parent_height.unwrap_or(80).clamp(0, 1000); 363 + 364 + let root = hyd 365 + .hydrate_post(uri.clone()) 366 + .await 367 + .ok_or(Error::not_found())?; 368 + let threadgate = root.threadgate.clone(); 369 + 370 + if let Some(viewer) = &root.author.viewer { 371 + if viewer.blocked_by || viewer.blocking.is_some() { 372 + return Ok(Json(GetPostThreadRes { 373 + thread: ThreadViewPostType::Blocked { 374 + uri, 375 + blocked: true, 376 + author: BlockedAuthor { 377 + did: root.author.did, 378 + viewer: root.author.viewer, 379 + }, 380 + }, 381 + threadgate, 382 + })); 383 + } 384 + } 349 385 350 386 let replies = diesel::sql_query(include_str!("../../../sql/thread.sql")) 351 387 .bind::<diesel::sql_types::Text, _>(&uri) ··· 362 398 let reply_uris = replies.iter().map(|item| item.at_uri.clone()).collect(); 363 399 let parent_uris = parents.iter().map(|item| item.at_uri.clone()).collect(); 364 400 365 - let root = hyd 366 - .hydrate_post(uri.clone()) 367 - .await 368 - .ok_or(Error::not_found())?; 369 401 let mut replies_hydrated = hyd.hydrate_posts(reply_uris).await; 370 402 let mut parents_hydrated = hyd.hydrate_posts(parent_uris).await; 371 403 ··· 381 413 continue; 382 414 }; 383 415 384 - entry.push(ThreadViewPostType::Post(Box::new(ThreadViewPost { 385 - post, 386 - parent: None, 387 - replies: this_post_replies, 388 - }))); 416 + entry.push(postview_to_tvpt(post, None, this_post_replies)); 389 417 } 390 418 391 419 let mut root_parent = None; ··· 394 422 395 423 let parent = parents_hydrated 396 424 .remove(&parent.at_uri) 397 - .map(|post| { 398 - ThreadViewPostType::Post(Box::new(ThreadViewPost { 399 - post, 400 - parent: p2, 401 - replies: vec![], 402 - })) 403 - }) 425 + .map(|post| postview_to_tvpt(post, p2, Vec::default())) 404 426 .unwrap_or(ThreadViewPostType::NotFound { 405 427 uri: parent.at_uri.clone(), 406 428 not_found: true, ··· 410 432 } 411 433 412 434 let replies = tmpbuf.remove(&root.uri).unwrap_or_default(); 413 - 414 - let threadgate = root.threadgate.clone(); 415 435 416 436 Ok(Json(GetPostThreadRes { 417 437 threadgate, ··· 666 686 }) 667 687 .collect() 668 688 } 689 + 690 + fn postview_to_tvpt( 691 + post: PostView, 692 + parent: Option<ThreadViewPostType>, 693 + replies: Vec<ThreadViewPostType>, 694 + ) -> ThreadViewPostType { 695 + match &post.author.viewer { 696 + Some(v) if v.blocked_by || v.blocking.is_some() => ThreadViewPostType::Blocked { 697 + uri: post.uri.clone(), 698 + blocked: true, 699 + author: BlockedAuthor { 700 + did: post.author.did, 701 + viewer: post.author.viewer, 702 + }, 703 + }, 704 + _ => ThreadViewPostType::Post(Box::new(ThreadViewPost { 705 + post, 706 + parent, 707 + replies, 708 + })), 709 + } 710 + }