Parakeet is a Rust-based Bluesky AppView aiming to implement most of the functionality required to support the Bluesky client

feat: Viewer Interactions

Changed files
+1018 -74
lexica
migrations
2025-09-17-190406_viewer-interactions
parakeet
parakeet-db
src
+28 -6
lexica/src/app_bsky/actor.rs
··· 1 1 use crate::app_bsky::embed::External; 2 + use crate::app_bsky::graph::ListViewBasic; 2 3 use crate::com_atproto::label::Label; 3 4 use chrono::prelude::*; 4 5 use serde::{Deserialize, Serialize}; 5 6 use std::fmt::Display; 6 7 use std::str::FromStr; 8 + 9 + #[derive(Clone, Default, Debug, Serialize)] 10 + #[serde(rename_all = "camelCase")] 11 + pub struct ProfileViewerState { 12 + pub muted: bool, 13 + #[serde(skip_serializing_if = "Option::is_none")] 14 + pub muted_by_list: Option<ListViewBasic>, 15 + pub blocked_by: bool, 16 + #[serde(skip_serializing_if = "Option::is_none")] 17 + pub blocking: Option<String>, 18 + #[serde(skip_serializing_if = "Option::is_none")] 19 + pub blocking_by_list: Option<ListViewBasic>, 20 + #[serde(skip_serializing_if = "Option::is_none")] 21 + pub following: Option<String>, 22 + #[serde(skip_serializing_if = "Option::is_none")] 23 + pub followed_by: Option<String>, 24 + // #[serde(skip_serializing_if = "Option::is_none")] 25 + // pub known_followers: Option<()>, 26 + // #[serde(skip_serializing_if = "Option::is_none")] 27 + // pub activity_subscriptions: Option<()>, 28 + } 7 29 8 30 #[derive(Clone, Default, Debug, Serialize)] 9 31 #[serde(rename_all = "camelCase")] ··· 130 152 pub avatar: Option<String>, 131 153 #[serde(skip_serializing_if = "Option::is_none")] 132 154 pub associated: Option<ProfileAssociated>, 133 - // #[serde(skip_serializing_if = "Option::is_none")] 134 - // pub viewer: Option<()>, 155 + #[serde(skip_serializing_if = "Option::is_none")] 156 + pub viewer: Option<ProfileViewerState>, 135 157 #[serde(skip_serializing_if = "Vec::is_empty")] 136 158 pub labels: Vec<Label>, 137 159 #[serde(skip_serializing_if = "Option::is_none")] ··· 156 178 pub avatar: Option<String>, 157 179 #[serde(skip_serializing_if = "Option::is_none")] 158 180 pub associated: Option<ProfileAssociated>, 159 - // #[serde(skip_serializing_if = "Option::is_none")] 160 - // pub viewer: Option<()>, 181 + #[serde(skip_serializing_if = "Option::is_none")] 182 + pub viewer: Option<ProfileViewerState>, 161 183 #[serde(skip_serializing_if = "Vec::is_empty")] 162 184 pub labels: Vec<Label>, 163 185 #[serde(skip_serializing_if = "Option::is_none")] ··· 189 211 pub associated: Option<ProfileAssociated>, 190 212 // #[serde(skip_serializing_if = "Option::is_none")] 191 213 // pub joined_via_starter_pack: Option<()>, 192 - // #[serde(skip_serializing_if = "Option::is_none")] 193 - // pub viewer: Option<()>, 214 + #[serde(skip_serializing_if = "Option::is_none")] 215 + pub viewer: Option<ProfileViewerState>, 194 216 #[serde(skip_serializing_if = "Vec::is_empty")] 195 217 pub labels: Vec<Label>, 196 218 // #[serde(skip_serializing_if = "Option::is_none")]
+28 -7
lexica/src/app_bsky/feed.rs
··· 1 1 use super::RecordStats; 2 - use crate::app_bsky::actor::{ProfileView, ProfileViewBasic}; 2 + use crate::app_bsky::actor::{ProfileView, ProfileViewBasic, ProfileViewerState}; 3 3 use crate::app_bsky::embed::Embed; 4 4 use crate::app_bsky::graph::ListViewBasic; 5 5 use crate::app_bsky::richtext::FacetMain; ··· 8 8 use serde::{Deserialize, Serialize}; 9 9 use std::str::FromStr; 10 10 11 + #[derive(Clone, Default, Debug, Serialize)] 12 + #[serde(rename_all = "camelCase")] 13 + pub struct PostViewerState { 14 + #[serde(skip_serializing_if = "Option::is_none")] 15 + pub repost: Option<String>, 16 + #[serde(skip_serializing_if = "Option::is_none")] 17 + pub like: Option<String>, 18 + pub bookmarked: bool, 19 + pub thread_muted: bool, 20 + pub reply_disabled: bool, 21 + pub embedding_disabled: bool, 22 + pub pinned: bool, 23 + } 24 + 11 25 #[derive(Clone, Debug, Serialize)] 12 26 #[serde(rename_all = "camelCase")] 13 27 pub struct PostView { ··· 23 37 24 38 #[serde(skip_serializing_if = "Vec::is_empty")] 25 39 pub labels: Vec<Label>, 26 - // #[serde(skip_serializing_if = "Option::is_none")] 27 - // pub viewer: Option<()>, 40 + #[serde(skip_serializing_if = "Option::is_none")] 41 + pub viewer: Option<PostViewerState>, 28 42 #[serde(skip_serializing_if = "Option::is_none")] 29 43 pub threadgate: Option<ThreadgateView>, 30 44 ··· 123 137 124 138 #[derive(Clone, Debug, Serialize)] 125 139 pub struct BlockedAuthor { 126 - pub uri: String, 127 - // pub viewer: Option<()>, 140 + pub did: String, 141 + pub viewer: Option<ProfileViewerState>, 142 + } 143 + 144 + #[derive(Clone, Default, Debug, Serialize)] 145 + #[serde(rename_all = "camelCase")] 146 + pub struct GeneratorViewerState { 147 + #[serde(skip_serializing_if = "Option::is_none")] 148 + pub like: Option<String>, 128 149 } 129 150 130 151 #[derive(Clone, Debug, Serialize)] ··· 148 169 pub accepts_interactions: bool, 149 170 #[serde(skip_serializing_if = "Vec::is_empty")] 150 171 pub labels: Vec<Label>, 151 - // #[serde(skip_serializing_if = "Option::is_none")] 152 - // pub viewer: Option<()>, 172 + #[serde(skip_serializing_if = "Option::is_none")] 173 + pub viewer: Option<GeneratorViewerState>, 153 174 #[serde(skip_serializing_if = "Option::is_none")] 154 175 pub content_mode: Option<GeneratorContentMode>, 155 176
+12 -4
lexica/src/app_bsky/graph.rs
··· 6 6 use serde::{Deserialize, Serialize}; 7 7 use std::str::FromStr; 8 8 9 + #[derive(Clone, Default, Debug, Serialize)] 10 + #[serde(rename_all = "camelCase")] 11 + pub struct ListViewerState { 12 + pub muted: bool, 13 + #[serde(skip_serializing_if = "Option::is_none")] 14 + pub blocked: Option<String>, 15 + } 16 + 9 17 #[derive(Clone, Debug, Serialize)] 10 18 #[serde(rename_all = "camelCase")] 11 19 pub struct ListViewBasic { ··· 18 26 pub avatar: Option<String>, 19 27 pub list_item_count: i64, 20 28 21 - // #[serde(skip_serializing_if = "Option::is_none")] 22 - // pub viewer: Option<()>, 29 + #[serde(skip_serializing_if = "Option::is_none")] 30 + pub viewer: Option<ListViewerState>, 23 31 #[serde(skip_serializing_if = "Vec::is_empty")] 24 32 pub labels: Vec<Label>, 25 33 ··· 44 52 pub avatar: Option<String>, 45 53 pub list_item_count: i64, 46 54 47 - // #[serde(skip_serializing_if = "Option::is_none")] 48 - // pub viewer: Option<()>, 55 + #[serde(skip_serializing_if = "Option::is_none")] 56 + pub viewer: Option<ListViewerState>, 49 57 #[serde(skip_serializing_if = "Vec::is_empty")] 50 58 pub labels: Vec<Label>, 51 59
+11 -4
lexica/src/app_bsky/labeler.rs
··· 4 4 use chrono::prelude::*; 5 5 use serde::{Deserialize, Serialize}; 6 6 7 + #[derive(Clone, Default, Debug, Serialize)] 8 + #[serde(rename_all = "camelCase")] 9 + pub struct LabelerViewerState { 10 + #[serde(skip_serializing_if = "Option::is_none")] 11 + pub like: Option<String>, 12 + } 13 + 7 14 #[derive(Clone, Debug, Serialize)] 8 15 #[serde(rename_all = "camelCase")] 9 16 pub struct LabelerView { ··· 12 19 pub creator: ProfileView, 13 20 14 21 pub like_count: i64, 15 - // #[serde(skip_serializing_if = "Option::is_none")] 16 - // pub viewer: Option<()>, 22 + #[serde(skip_serializing_if = "Option::is_none")] 23 + pub viewer: Option<LabelerViewerState>, 17 24 #[serde(skip_serializing_if = "Vec::is_empty")] 18 25 pub labels: Vec<Label>, 19 26 pub indexed_at: DateTime<Utc>, ··· 27 34 pub creator: ProfileView, 28 35 29 36 pub like_count: i64, 30 - // #[serde(skip_serializing_if = "Option::is_none")] 31 - // pub viewer: Option<()>, 37 + #[serde(skip_serializing_if = "Option::is_none")] 38 + pub viewer: Option<LabelerViewerState>, 32 39 #[serde(skip_serializing_if = "Vec::is_empty")] 33 40 pub labels: Vec<Label>, 34 41 pub policies: LabelerPolicy,
+17
migrations/2025-09-17-190406_viewer-interactions/down.sql
··· 1 + drop trigger t_profile_state_ins on follows; 2 + drop trigger t_profile_state_del on follows; 3 + drop trigger t_profile_state_ins on blocks; 4 + drop trigger t_profile_state_del on blocks; 5 + drop trigger t_profile_state_ins on mutes; 6 + drop trigger t_profile_state_del on mutes; 7 + 8 + drop function f_profile_state_ins_follow; 9 + drop function f_profile_state_del_follow; 10 + drop function f_profile_state_ins_block; 11 + drop function f_profile_state_del_block; 12 + drop function f_profile_state_ins_mute; 13 + drop function f_profile_state_del_mute; 14 + 15 + drop view v_list_mutes_exp; 16 + drop view v_list_block_exp; 17 + drop table profile_states;
+146
migrations/2025-09-17-190406_viewer-interactions/up.sql
··· 1 + create table profile_states 2 + ( 3 + did text not null, 4 + subject text not null, 5 + muting bool not null default false, -- subj muted by did 6 + blocked bool not null default false, -- did blocked by subj 7 + blocking text, -- subj blocked by did 8 + following text, -- rkey of follow record (did->subj) 9 + followed text, -- rkey of follow record (subj->did) 10 + 11 + primary key (did, subject) 12 + ); 13 + 14 + create index profilestates_did_index on profile_states using hash (did); 15 + create index profilestates_sub_index on profile_states using hash (subject); 16 + 17 + create view v_list_block_exp as 18 + ( 19 + select lb.list_uri, did, li.subject 20 + from list_blocks lb 21 + inner join list_items li on lb.list_uri = li.list_uri 22 + ); 23 + 24 + create view v_list_mutes_exp as 25 + ( 26 + select lm.list_uri, did, li.subject 27 + from list_mutes lm 28 + inner join list_items li on lm.list_uri = li.list_uri 29 + ); 30 + 31 + -- profile_states follow triggers 32 + create function f_profile_state_ins_follow() returns trigger 33 + language plpgsql as 34 + $$ 35 + begin 36 + insert into profile_states (did, subject, following) 37 + VALUES (NEW.did, NEW.subject, NEW.rkey) 38 + ON CONFLICT (did, subject) DO UPDATE SET following=excluded.following; 39 + 40 + insert into profile_states (did, subject, followed) 41 + VALUES (NEW.subject, NEW.did, NEW.rkey) 42 + ON CONFLICT (did, subject) DO UPDATE SET followed=excluded.followed; 43 + 44 + return NEW; 45 + end; 46 + $$; 47 + 48 + create trigger t_profile_state_ins 49 + before insert 50 + on follows 51 + for each row 52 + execute procedure f_profile_state_ins_follow(); 53 + 54 + create function f_profile_state_del_follow() returns trigger 55 + language plpgsql as 56 + $$ 57 + begin 58 + update profile_states set following = null where did = OLD.did and subject = OLD.subject; 59 + update profile_states set followed = null where did = OLD.subject and subject = OLD.did; 60 + 61 + return OLD; 62 + end; 63 + $$; 64 + 65 + create trigger t_profile_state_del 66 + before delete 67 + on follows 68 + for each row 69 + execute procedure f_profile_state_del_follow(); 70 + 71 + -- profile_states block triggers 72 + 73 + create function f_profile_state_ins_block() returns trigger 74 + language plpgsql as 75 + $$ 76 + begin 77 + insert into profile_states (did, subject, blocking) 78 + VALUES (NEW.did, NEW.subject, NEW.rkey) 79 + ON CONFLICT (did, subject) DO UPDATE SET blocking=excluded.blocking; 80 + 81 + insert into profile_states (did, subject, blocked) 82 + VALUES (NEW.subject, NEW.did, TRUE) 83 + ON CONFLICT (did, subject) DO UPDATE SET blocked=excluded.blocked; 84 + 85 + return NEW; 86 + end; 87 + $$; 88 + 89 + create trigger t_profile_state_ins 90 + before insert 91 + on blocks 92 + for each row 93 + execute procedure f_profile_state_ins_block(); 94 + 95 + create function f_profile_state_del_block() returns trigger 96 + language plpgsql as 97 + $$ 98 + begin 99 + update profile_states set blocking = null where did = OLD.did and subject = OLD.subject; 100 + update profile_states set blocked = FALSE where did = OLD.subject and subject = OLD.did; 101 + 102 + return OLD; 103 + end; 104 + $$; 105 + 106 + create trigger t_profile_state_del 107 + before delete 108 + on blocks 109 + for each row 110 + execute procedure f_profile_state_del_block(); 111 + 112 + -- profile_states mutes triggers 113 + 114 + create function f_profile_state_ins_mute() returns trigger 115 + language plpgsql as 116 + $$ 117 + begin 118 + insert into profile_states (did, subject, muting) 119 + VALUES (NEW.did, NEW.subject, TRUE) 120 + ON CONFLICT (did, subject) DO UPDATE SET muting=excluded.muting; 121 + 122 + return NEW; 123 + end; 124 + $$; 125 + 126 + create trigger t_profile_state_ins 127 + before insert 128 + on mutes 129 + for each row 130 + execute procedure f_profile_state_ins_mute(); 131 + 132 + create function f_profile_state_del_mute() returns trigger 133 + language plpgsql as 134 + $$ 135 + begin 136 + update profile_states set muting = false where did = OLD.did and subject = OLD.subject; 137 + 138 + return OLD; 139 + end; 140 + $$; 141 + 142 + create trigger t_profile_state_del 143 + before delete 144 + on mutes 145 + for each row 146 + execute procedure f_profile_state_del_mute();
+13
parakeet-db/src/schema.rs
··· 288 288 } 289 289 290 290 diesel::table! { 291 + profile_states (did, subject) { 292 + did -> Text, 293 + subject -> Text, 294 + muting -> Bool, 295 + blocked -> Bool, 296 + blocking -> Nullable<Text>, 297 + following -> Nullable<Text>, 298 + followed -> Nullable<Text>, 299 + } 300 + } 301 + 302 + diesel::table! { 291 303 profiles (did) { 292 304 did -> Text, 293 305 cid -> Text, ··· 441 453 post_embed_video_captions, 442 454 postgates, 443 455 posts, 456 + profile_states, 444 457 profiles, 445 458 records, 446 459 reposts,
+167
parakeet/src/db.rs
··· 1 1 use diesel::prelude::*; 2 + use diesel::sql_types::{Array, Bool, Nullable, Text}; 2 3 use diesel_async::{AsyncPgConnection, RunQueryDsl}; 3 4 use parakeet_db::{schema, types}; 4 5 ··· 13 14 .await 14 15 .optional() 15 16 } 17 + 18 + #[derive(Clone, Debug, QueryableByName)] 19 + #[diesel(check_for_backend(diesel::pg::Pg))] 20 + pub struct ProfileStateRet { 21 + #[diesel(sql_type = Text)] 22 + pub did: String, 23 + #[diesel(sql_type = Text)] 24 + pub subject: String, 25 + #[diesel(sql_type = Nullable<Bool>)] 26 + pub muting: Option<bool>, 27 + #[diesel(sql_type = Nullable<Bool>)] 28 + pub blocked: Option<bool>, 29 + #[diesel(sql_type = Nullable<Text>)] 30 + pub blocking: Option<String>, 31 + #[diesel(sql_type = Nullable<Text>)] 32 + pub following: Option<String>, 33 + #[diesel(sql_type = Nullable<Text>)] 34 + pub followed: Option<String>, 35 + #[diesel(sql_type = Nullable<Text>)] 36 + pub list_block: Option<String>, 37 + #[diesel(sql_type = Nullable<Text>)] 38 + pub list_mute: Option<String>, 39 + } 40 + pub async fn get_profile_state( 41 + conn: &mut AsyncPgConnection, 42 + did: &str, 43 + sub: &str, 44 + ) -> QueryResult<Option<ProfileStateRet>> { 45 + diesel::sql_query(include_str!("sql/profile_state.sql")) 46 + .bind::<Text, _>(did) 47 + .bind::<Array<Text>, _>(vec![sub]) 48 + .get_result::<ProfileStateRet>(conn) 49 + .await 50 + .optional() 51 + } 52 + pub async fn get_profile_states( 53 + conn: &mut AsyncPgConnection, 54 + did: &str, 55 + sub: &[String], 56 + ) -> QueryResult<Vec<ProfileStateRet>> { 57 + diesel::sql_query(include_str!("sql/profile_state.sql")) 58 + .bind::<Text, _>(did) 59 + .bind::<Array<Text>, _>(sub) 60 + .load::<ProfileStateRet>(conn) 61 + .await 62 + } 63 + 64 + #[derive(Clone, Debug, QueryableByName)] 65 + #[diesel(check_for_backend(diesel::pg::Pg))] 66 + pub struct PostStateRet { 67 + #[diesel(sql_type = diesel::sql_types::Text)] 68 + pub at_uri: String, 69 + #[diesel(sql_type = diesel::sql_types::Text)] 70 + pub did: String, 71 + #[diesel(sql_type = diesel::sql_types::Text)] 72 + pub cid: String, 73 + #[diesel(sql_type = diesel::sql_types::Nullable<diesel::sql_types::Text>)] 74 + pub like_rkey: Option<String>, 75 + #[diesel(sql_type = diesel::sql_types::Nullable<diesel::sql_types::Text>)] 76 + pub repost_rkey: Option<String>, 77 + #[diesel(sql_type = diesel::sql_types::Bool)] 78 + pub bookmarked: bool, 79 + // #[diesel(sql_type = diesel::sql_types::Bool)] 80 + // pub muted: bool, 81 + #[diesel(sql_type = diesel::sql_types::Bool)] 82 + pub embed_disabled: bool, 83 + #[diesel(sql_type = diesel::sql_types::Bool)] 84 + pub pinned: bool, 85 + } 86 + pub async fn get_post_state( 87 + conn: &mut AsyncPgConnection, 88 + did: &str, 89 + subject: &str, 90 + ) -> QueryResult<Option<PostStateRet>> { 91 + diesel::sql_query(include_str!("sql/post_state.sql")) 92 + .bind::<Text, _>(did) 93 + .bind::<Array<Text>, _>(vec![subject]) 94 + .get_result::<PostStateRet>(conn) 95 + .await 96 + .optional() 97 + } 98 + 99 + pub async fn get_post_states( 100 + conn: &mut AsyncPgConnection, 101 + did: &str, 102 + sub: &[String], 103 + ) -> QueryResult<Vec<PostStateRet>> { 104 + diesel::sql_query(include_str!("sql/post_state.sql")) 105 + .bind::<Text, _>(did) 106 + .bind::<Array<Text>, _>(sub) 107 + .load::<PostStateRet>(conn) 108 + .await 109 + } 110 + 111 + #[derive(Clone, Debug, QueryableByName)] 112 + #[diesel(check_for_backend(diesel::pg::Pg))] 113 + pub struct ListStateRet { 114 + #[diesel(sql_type = Text)] 115 + pub at_uri: String, 116 + #[diesel(sql_type = Bool)] 117 + pub muted: bool, 118 + #[diesel(sql_type = Nullable<Text>)] 119 + pub block: Option<String>, 120 + } 121 + 122 + pub async fn get_list_state( 123 + conn: &mut AsyncPgConnection, 124 + did: &str, 125 + subject: &str, 126 + ) -> QueryResult<Option<ListStateRet>> { 127 + diesel::sql_query(include_str!("sql/list_states.sql")) 128 + .bind::<Text, _>(did) 129 + .bind::<Array<Text>, _>(vec![subject]) 130 + .get_result::<ListStateRet>(conn) 131 + .await 132 + .optional() 133 + } 134 + 135 + pub async fn get_list_states( 136 + conn: &mut AsyncPgConnection, 137 + did: &str, 138 + sub: &[String], 139 + ) -> QueryResult<Vec<ListStateRet>> { 140 + diesel::sql_query(include_str!("sql/list_states.sql")) 141 + .bind::<Text, _>(did) 142 + .bind::<Array<Text>, _>(sub) 143 + .load::<ListStateRet>(conn) 144 + .await 145 + } 146 + 147 + pub async fn get_like_state( 148 + conn: &mut AsyncPgConnection, 149 + did: &str, 150 + subject: &str, 151 + ) -> QueryResult<Option<(String, String)>> { 152 + schema::likes::table 153 + .select((schema::likes::did, schema::likes::rkey)) 154 + .filter( 155 + schema::likes::did 156 + .eq(did) 157 + .and(schema::likes::subject.eq(subject)), 158 + ) 159 + .get_result(conn) 160 + .await 161 + .optional() 162 + } 163 + 164 + pub async fn get_like_states( 165 + conn: &mut AsyncPgConnection, 166 + did: &str, 167 + sub: &[String], 168 + ) -> QueryResult<Vec<(String, String, String)>> { 169 + schema::likes::table 170 + .select(( 171 + schema::likes::subject, 172 + schema::likes::did, 173 + schema::likes::rkey, 174 + )) 175 + .filter( 176 + schema::likes::did 177 + .eq(did) 178 + .and(schema::likes::subject.eq_any(sub)), 179 + ) 180 + .load(conn) 181 + .await 182 + }
+41 -3
parakeet/src/hydration/feedgen.rs
··· 1 1 use crate::hydration::map_labels; 2 2 use crate::xrpc::cdn::BskyCdn; 3 3 use lexica::app_bsky::actor::ProfileView; 4 - use lexica::app_bsky::feed::{GeneratorContentMode, GeneratorView}; 4 + use lexica::app_bsky::feed::{GeneratorContentMode, GeneratorView, GeneratorViewerState}; 5 5 use parakeet_db::models; 6 6 use std::collections::HashMap; 7 7 use std::str::FromStr; 8 8 9 + fn build_viewer((did, rkey): (String, String)) -> GeneratorViewerState { 10 + GeneratorViewerState { 11 + like: Some(format!("at://{did}/app.bsky.feed.like/{rkey}")), 12 + } 13 + } 14 + 9 15 fn build_feedgen( 10 16 feedgen: models::FeedGen, 11 17 creator: ProfileView, 12 18 labels: Vec<models::Label>, 13 19 likes: Option<i32>, 20 + viewer: Option<GeneratorViewerState>, 14 21 cdn: &BskyCdn, 15 22 ) -> GeneratorView { 16 23 let content_mode = feedgen ··· 35 42 like_count: likes.unwrap_or_default() as i64, 36 43 accepts_interactions: feedgen.accepts_interactions, 37 44 labels: map_labels(labels), 45 + viewer, 38 46 content_mode, 39 47 indexed_at: feedgen.created_at, 40 48 } ··· 43 51 impl super::StatefulHydrator<'_> { 44 52 pub async fn hydrate_feedgen(&self, feedgen: String) -> Option<GeneratorView> { 45 53 let labels = self.get_label(&feedgen).await; 54 + let viewer = self.get_feedgen_viewer_state(&feedgen).await; 46 55 let likes = self.loaders.like.load(feedgen.clone()).await; 47 56 let feedgen = self.loaders.feedgen.load(feedgen).await?; 48 57 let profile = self.hydrate_profile(feedgen.owner.clone()).await?; 49 58 50 - Some(build_feedgen(feedgen, profile, labels, likes, &self.cdn)) 59 + Some(build_feedgen( 60 + feedgen, profile, labels, likes, viewer, &self.cdn, 61 + )) 51 62 } 52 63 53 64 pub async fn hydrate_feedgens(&self, feedgens: Vec<String>) -> HashMap<String, GeneratorView> { 54 65 let labels = self.get_label_many(&feedgens).await; 66 + let viewers = self.get_feedgen_viewer_states(&feedgens).await; 55 67 let mut likes = self.loaders.like.load_many(feedgens.clone()).await; 56 68 let feedgens = self.loaders.feedgen.load_many(feedgens).await; 57 69 ··· 66 78 .into_iter() 67 79 .filter_map(|(uri, feedgen)| { 68 80 let creator = creators.get(&feedgen.owner).cloned()?; 81 + let viewer = viewers.get(&uri).cloned(); 69 82 let labels = labels.get(&uri).cloned().unwrap_or_default(); 70 83 let likes = likes.remove(&uri); 71 84 72 85 Some(( 73 86 uri, 74 - build_feedgen(feedgen, creator, labels, likes, &self.cdn), 87 + build_feedgen(feedgen, creator, labels, likes, viewer, &self.cdn), 75 88 )) 76 89 }) 77 90 .collect() 91 + } 92 + 93 + async fn get_feedgen_viewer_state(&self, subject: &str) -> Option<GeneratorViewerState> { 94 + if let Some(viewer) = &self.current_actor { 95 + let data = self.loaders.like_state.get(viewer, subject).await?; 96 + 97 + Some(build_viewer(data)) 98 + } else { 99 + None 100 + } 101 + } 102 + 103 + async fn get_feedgen_viewer_states( 104 + &self, 105 + subjects: &[String], 106 + ) -> HashMap<String, GeneratorViewerState> { 107 + if let Some(viewer) = &self.current_actor { 108 + let data = self.loaders.like_state.get_many(viewer, subjects).await; 109 + 110 + data.into_iter() 111 + .map(|(k, state)| (k, build_viewer(state))) 112 + .collect() 113 + } else { 114 + HashMap::new() 115 + } 78 116 } 79 117 }
+54 -5
parakeet/src/hydration/labeler.rs
··· 1 1 use crate::hydration::{map_labels, StatefulHydrator}; 2 2 use lexica::app_bsky::actor::ProfileView; 3 - use lexica::app_bsky::labeler::{LabelerPolicy, LabelerView, LabelerViewDetailed}; 3 + use lexica::app_bsky::labeler::{ 4 + LabelerPolicy, LabelerView, LabelerViewDetailed, LabelerViewerState, 5 + }; 4 6 use lexica::com_atproto::label::{Blurs, LabelValueDefinition, Severity}; 5 7 use lexica::com_atproto::moderation::{ReasonType, SubjectType}; 6 8 use parakeet_db::models; 7 9 use std::collections::HashMap; 8 10 use std::str::FromStr; 9 11 12 + fn build_viewer((did, rkey): (String, String)) -> LabelerViewerState { 13 + LabelerViewerState { 14 + like: Some(format!("at://{did}/app.bsky.feed.like/{rkey}")), 15 + } 16 + } 17 + 10 18 fn build_view( 11 19 labeler: models::LabelerService, 12 20 creator: ProfileView, 13 21 labels: Vec<models::Label>, 22 + viewer: Option<LabelerViewerState>, 14 23 likes: Option<i32>, 15 24 ) -> LabelerView { 16 25 LabelerView { ··· 18 27 cid: labeler.cid, 19 28 creator, 20 29 like_count: likes.unwrap_or_default() as i64, 30 + viewer, 21 31 labels: map_labels(labels), 22 32 indexed_at: labeler.indexed_at.and_utc(), 23 33 } ··· 28 38 defs: Vec<models::LabelDefinition>, 29 39 creator: ProfileView, 30 40 labels: Vec<models::Label>, 41 + viewer: Option<LabelerViewerState>, 31 42 likes: Option<i32>, 32 43 ) -> LabelerViewDetailed { 33 44 let reason_types = labeler.reasons.map(|v| { ··· 77 88 cid: labeler.cid, 78 89 creator, 79 90 like_count: likes.unwrap_or_default() as i64, 91 + viewer, 80 92 policies: LabelerPolicy { 81 93 label_values, 82 94 label_value_definitions, ··· 92 104 impl StatefulHydrator<'_> { 93 105 pub async fn hydrate_labeler(&self, labeler: String) -> Option<LabelerView> { 94 106 let labels = self.get_label(&labeler).await; 107 + let viewer = self.get_labeler_viewer_state(&labeler).await; 95 108 let likes = self.loaders.like.load(make_labeler_uri(&labeler)).await; 96 109 let (labeler, _) = self.loaders.labeler.load(labeler).await?; 97 110 let creator = self.hydrate_profile(labeler.did.clone()).await?; 98 111 99 - Some(build_view(labeler, creator, labels, likes)) 112 + Some(build_view(labeler, creator, labels, viewer, likes)) 100 113 } 101 114 102 115 pub async fn hydrate_labelers(&self, labelers: Vec<String>) -> HashMap<String, LabelerView> { ··· 107 120 .values() 108 121 .map(|(labeler, _)| (labeler.did.clone(), make_labeler_uri(&labeler.did))) 109 122 .unzip::<_, _, Vec<_>, Vec<_>>(); 123 + let viewers = self.get_labeler_viewer_states(&uris).await; 110 124 let creators = self.hydrate_profiles(creators).await; 111 125 let mut likes = self.loaders.like.load_many(uris.clone()).await; 112 126 ··· 116 130 let creator = creators.get(&labeler.did).cloned()?; 117 131 let labels = labels.get(&k).cloned().unwrap_or_default(); 118 132 let likes = likes.remove(&make_labeler_uri(&labeler.did)); 133 + let viewer = viewers.get(&make_labeler_uri(&k)).cloned(); 119 134 120 - Some((k, build_view(labeler, creator, labels, likes))) 135 + Some((k, build_view(labeler, creator, labels, viewer, likes))) 121 136 }) 122 137 .collect() 123 138 } 124 139 125 140 pub async fn hydrate_labeler_detailed(&self, labeler: String) -> Option<LabelerViewDetailed> { 126 141 let labels = self.get_label(&labeler).await; 142 + let viewer = self.get_labeler_viewer_state(&labeler).await; 127 143 let likes = self.loaders.like.load(make_labeler_uri(&labeler)).await; 128 144 let (labeler, defs) = self.loaders.labeler.load(labeler).await?; 129 145 let creator = self.hydrate_profile(labeler.did.clone()).await?; 130 146 131 - Some(build_view_detailed(labeler, defs, creator, labels, likes)) 147 + Some(build_view_detailed( 148 + labeler, defs, creator, labels, viewer, likes, 149 + )) 132 150 } 133 151 134 152 pub async fn hydrate_labelers_detailed( ··· 142 160 .values() 143 161 .map(|(labeler, _)| (labeler.did.clone(), make_labeler_uri(&labeler.did))) 144 162 .unzip::<_, _, Vec<_>, Vec<_>>(); 163 + let viewers = self.get_labeler_viewer_states(&uris).await; 145 164 let creators = self.hydrate_profiles(creators).await; 146 165 let mut likes = self.loaders.like.load_many(uris.clone()).await; 147 166 ··· 151 170 let creator = creators.get(&labeler.did).cloned()?; 152 171 let labels = labels.get(&k).cloned().unwrap_or_default(); 153 172 let likes = likes.remove(&make_labeler_uri(&labeler.did)); 173 + let viewer = viewers.get(&make_labeler_uri(&k)).cloned(); 154 174 155 - let view = build_view_detailed(labeler, defs, creator, labels, likes); 175 + let view = build_view_detailed(labeler, defs, creator, labels, viewer, likes); 156 176 157 177 Some((k, view)) 158 178 }) 159 179 .collect() 180 + } 181 + 182 + async fn get_labeler_viewer_state(&self, subject: &str) -> Option<LabelerViewerState> { 183 + if let Some(viewer) = &self.current_actor { 184 + let data = self 185 + .loaders 186 + .like_state 187 + .get(&make_labeler_uri(viewer), subject) 188 + .await?; 189 + 190 + Some(build_viewer(data)) 191 + } else { 192 + None 193 + } 194 + } 195 + 196 + async fn get_labeler_viewer_states( 197 + &self, 198 + subjects: &[String], 199 + ) -> HashMap<String, LabelerViewerState> { 200 + if let Some(viewer) = &self.current_actor { 201 + let data = self.loaders.like_state.get_many(viewer, subjects).await; 202 + 203 + data.into_iter() 204 + .map(|(k, state)| (k, build_viewer(state))) 205 + .collect() 206 + } else { 207 + HashMap::new() 208 + } 160 209 } 161 210 } 162 211
+49 -5
parakeet/src/hydration/list.rs
··· 1 + use crate::db::ListStateRet; 1 2 use crate::hydration::{map_labels, StatefulHydrator}; 2 3 use crate::xrpc::cdn::BskyCdn; 3 4 use lexica::app_bsky::actor::ProfileView; 4 - use lexica::app_bsky::graph::{ListPurpose, ListView, ListViewBasic}; 5 + use lexica::app_bsky::graph::{ListPurpose, ListView, ListViewBasic, ListViewerState}; 5 6 use parakeet_db::models; 6 7 use std::collections::HashMap; 7 8 use std::str::FromStr; 8 9 10 + fn build_viewer(data: ListStateRet) -> ListViewerState { 11 + ListViewerState { 12 + muted: data.muted, 13 + blocked: data.block, 14 + } 15 + } 16 + 9 17 fn build_basic( 10 18 list: models::List, 11 19 list_item_count: i64, 12 20 labels: Vec<models::Label>, 21 + viewer: Option<ListViewerState>, 13 22 cdn: &BskyCdn, 14 23 ) -> Option<ListViewBasic> { 15 24 let purpose = ListPurpose::from_str(&list.list_type).ok()?; ··· 22 31 purpose, 23 32 avatar, 24 33 list_item_count, 34 + viewer, 25 35 labels: map_labels(labels), 26 36 indexed_at: list.created_at, 27 37 }) ··· 32 42 list_item_count: i64, 33 43 creator: ProfileView, 34 44 labels: Vec<models::Label>, 45 + viewer: Option<ListViewerState>, 35 46 cdn: &BskyCdn, 36 47 ) -> Option<ListView> { 37 48 let purpose = ListPurpose::from_str(&list.list_type).ok()?; ··· 51 62 description_facets, 52 63 avatar, 53 64 list_item_count, 65 + viewer, 54 66 labels: map_labels(labels), 55 67 indexed_at: list.created_at, 56 68 }) ··· 59 71 impl StatefulHydrator<'_> { 60 72 pub async fn hydrate_list_basic(&self, list: String) -> Option<ListViewBasic> { 61 73 let labels = self.get_label(&list).await; 74 + let viewer = self.get_list_viewer_state(&list).await; 62 75 let (list, count) = self.loaders.list.load(list).await?; 63 76 64 - build_basic(list, count, labels, &self.cdn) 77 + build_basic(list, count, labels, viewer, &self.cdn) 65 78 } 66 79 67 80 pub async fn hydrate_lists_basic(&self, lists: Vec<String>) -> HashMap<String, ListViewBasic> { 68 81 let labels = self.get_label_many(&lists).await; 82 + let viewers = self.get_list_viewer_states(&lists).await; 69 83 let lists = self.loaders.list.load_many(lists).await; 70 84 71 85 lists 72 86 .into_iter() 73 87 .filter_map(|(uri, (list, count))| { 74 88 let labels = labels.get(&uri).cloned().unwrap_or_default(); 89 + let viewer = viewers.get(&uri).cloned(); 75 90 76 - build_basic(list, count, labels, &self.cdn).map(|v| (uri, v)) 91 + build_basic(list, count, labels, viewer, &self.cdn).map(|v| (uri, v)) 77 92 }) 78 93 .collect() 79 94 } 80 95 81 96 pub async fn hydrate_list(&self, list: String) -> Option<ListView> { 82 97 let labels = self.get_label(&list).await; 98 + let viewer = self.get_list_viewer_state(&list).await; 83 99 let (list, count) = self.loaders.list.load(list).await?; 84 100 let profile = self.hydrate_profile(list.owner.clone()).await?; 85 101 86 - build_listview(list, count, profile, labels, &self.cdn) 102 + build_listview(list, count, profile, labels, viewer, &self.cdn) 87 103 } 88 104 89 105 pub async fn hydrate_lists(&self, lists: Vec<String>) -> HashMap<String, ListView> { 90 106 let labels = self.get_label_many(&lists).await; 107 + let viewers = self.get_list_viewer_states(&lists).await; 91 108 let lists = self.loaders.list.load_many(lists).await; 92 109 93 110 let creators = lists.values().map(|(list, _)| list.owner.clone()).collect(); ··· 97 114 .into_iter() 98 115 .filter_map(|(uri, (list, count))| { 99 116 let creator = creators.get(&list.owner)?; 117 + let viewer = viewers.get(&uri).cloned(); 100 118 let labels = labels.get(&uri).cloned().unwrap_or_default(); 101 119 102 - build_listview(list, count, creator.to_owned(), labels, &self.cdn).map(|v| (uri, v)) 120 + build_listview(list, count, creator.to_owned(), labels, viewer, &self.cdn) 121 + .map(|v| (uri, v)) 103 122 }) 104 123 .collect() 124 + } 125 + 126 + async fn get_list_viewer_state(&self, subject: &str) -> Option<ListViewerState> { 127 + if let Some(viewer) = &self.current_actor { 128 + let data = self.loaders.list_state.get(viewer, subject).await?; 129 + 130 + Some(build_viewer(data)) 131 + } else { 132 + None 133 + } 134 + } 135 + 136 + async fn get_list_viewer_states( 137 + &self, 138 + subjects: &[String], 139 + ) -> HashMap<String, ListViewerState> { 140 + if let Some(viewer) = &self.current_actor { 141 + let data = self.loaders.list_state.get_many(viewer, subjects).await; 142 + 143 + data.into_iter() 144 + .map(|(k, state)| (k, build_viewer(state))) 145 + .collect() 146 + } else { 147 + HashMap::new() 148 + } 105 149 } 106 150 }
+85 -7
parakeet/src/hydration/posts.rs
··· 1 + use crate::db::PostStateRet; 1 2 use crate::hydration::{map_labels, StatefulHydrator}; 2 3 use lexica::app_bsky::actor::ProfileViewBasic; 3 4 use lexica::app_bsky::embed::Embed; 4 - use lexica::app_bsky::feed::{FeedViewPost, PostView, ReplyRef, ReplyRefPost, ThreadgateView}; 5 + use lexica::app_bsky::feed::{ 6 + BlockedAuthor, FeedViewPost, PostView, PostViewerState, ReplyRef, ReplyRefPost, ThreadgateView, 7 + }; 5 8 use lexica::app_bsky::graph::ListViewBasic; 6 9 use lexica::app_bsky::RecordStats; 7 10 use parakeet_db::models; 8 11 use parakeet_index::PostStats; 9 12 use std::collections::HashMap; 10 13 14 + fn build_viewer(did: &str, data: PostStateRet) -> PostViewerState { 15 + let is_me = did == data.did; 16 + 17 + let repost = data 18 + .repost_rkey 19 + .map(|rkey| format!("at://{did}/app.bsky.feed.repost/{rkey}")); 20 + let like = data 21 + .like_rkey 22 + .map(|rkey| format!("at://{did}/app.bsky.feed.like/{rkey}")); 23 + 24 + PostViewerState { 25 + repost, 26 + like, 27 + bookmarked: data.bookmarked, 28 + thread_muted: false, // todo when we have thread mutes 29 + reply_disabled: false, 30 + embedding_disabled: data.embed_disabled && !is_me, // poster can always bypass embed disabled. 31 + pinned: data.pinned, 32 + } 33 + } 34 + 11 35 fn build_postview( 12 36 post: models::Post, 13 37 author: ProfileViewBasic, 14 38 labels: Vec<models::Label>, 15 39 embed: Option<Embed>, 16 40 threadgate: Option<ThreadgateView>, 41 + viewer: Option<PostViewerState>, 17 42 stats: Option<PostStats>, 18 43 ) -> PostView { 19 44 let stats = stats ··· 33 58 embed, 34 59 stats, 35 60 labels: map_labels(labels), 61 + viewer, 36 62 threadgate, 37 63 indexed_at: post.created_at, 38 64 } ··· 101 127 pub async fn hydrate_post(&self, post: String) -> Option<PostView> { 102 128 let stats = self.loaders.post_stats.load(post.clone()).await; 103 129 let (post, threadgate) = self.loaders.posts.load(post).await?; 130 + let viewer = self.get_post_viewer_state(&post.at_uri).await; 104 131 let embed = self.hydrate_embed(post.at_uri.clone()).await; 105 132 let author = self.hydrate_profile_basic(post.did.clone()).await?; 106 133 let threadgate = self.hydrate_threadgate(threadgate).await; 107 134 let labels = self.get_label(&post.at_uri).await; 108 135 109 136 Some(build_postview( 110 - post, author, labels, embed, threadgate, stats, 137 + post, author, labels, embed, threadgate, viewer, stats, 111 138 )) 112 139 } 113 140 ··· 122 149 let authors = self.hydrate_profiles_basic(authors).await; 123 150 124 151 let post_labels = self.get_label_many(&post_uris).await; 152 + let viewer_data = self.get_post_viewer_states(&post_uris).await; 125 153 126 154 let threadgates = posts 127 155 .values() ··· 139 167 let threadgate = threadgate.and_then(|tg| threadgates.get(&tg.at_uri).cloned()); 140 168 let labels = post_labels.get(&uri).cloned().unwrap_or_default(); 141 169 let stats = stats.get(&uri).cloned(); 170 + let viewer = viewer_data.get(&uri).cloned(); 142 171 143 172 Some(( 144 173 uri, 145 - build_postview(post, author.to_owned(), labels, embed, threadgate, stats), 174 + build_postview( 175 + post, 176 + author.to_owned(), 177 + labels, 178 + embed, 179 + threadgate, 180 + viewer, 181 + stats, 182 + ), 146 183 )) 147 184 }) 148 185 .collect() ··· 159 196 let authors = self.hydrate_profiles_basic(authors).await; 160 197 161 198 let post_labels = self.get_label_many(&post_uris).await; 162 - 199 + let viewer_data = self.get_post_viewer_states(&post_uris).await; 163 200 let embeds = self.hydrate_embeds(post_uris).await; 164 201 165 202 let reply_refs = posts ··· 183 220 184 221 let reply = if post.parent_uri.is_some() && post.root_uri.is_some() { 185 222 Some(ReplyRef { 186 - root: root.cloned().map(ReplyRefPost::Post).unwrap_or( 223 + root: root.cloned().map(postview_to_replyref).unwrap_or( 187 224 ReplyRefPost::NotFound { 188 225 uri: post.root_uri.as_ref().unwrap().clone(), 189 226 not_found: true, 190 227 }, 191 228 ), 192 - parent: parent.cloned().map(ReplyRefPost::Post).unwrap_or( 229 + parent: parent.cloned().map(postview_to_replyref).unwrap_or( 193 230 ReplyRefPost::NotFound { 194 231 uri: post.parent_uri.as_ref().unwrap().clone(), 195 232 not_found: true, ··· 204 241 let embed = embeds.get(&post_uri).cloned(); 205 242 let labels = post_labels.get(&post_uri).cloned().unwrap_or_default(); 206 243 let stats = stats.get(&post_uri).cloned(); 207 - let post = build_postview(post, author.to_owned(), labels, embed, None, stats); 244 + let viewer = viewer_data.get(&post_uri).cloned(); 245 + let post = 246 + build_postview(post, author.to_owned(), labels, embed, None, viewer, stats); 208 247 209 248 Some(( 210 249 post_uri, ··· 217 256 )) 218 257 }) 219 258 .collect() 259 + } 260 + 261 + async fn get_post_viewer_state(&self, subject: &str) -> Option<PostViewerState> { 262 + if let Some(viewer) = &self.current_actor { 263 + let data = self.loaders.post_state.get(viewer, subject).await?; 264 + 265 + Some(build_viewer(viewer, data)) 266 + } else { 267 + None 268 + } 269 + } 270 + 271 + async fn get_post_viewer_states( 272 + &self, 273 + subjects: &[String], 274 + ) -> HashMap<String, PostViewerState> { 275 + if let Some(viewer) = &self.current_actor { 276 + let data = self.loaders.post_state.get_many(viewer, subjects).await; 277 + 278 + data.into_iter() 279 + .map(|(k, state)| (k, build_viewer(viewer, state))) 280 + .collect() 281 + } else { 282 + HashMap::new() 283 + } 284 + } 285 + } 286 + 287 + fn postview_to_replyref(post: PostView) -> ReplyRefPost { 288 + match &post.author.viewer { 289 + Some(v) if v.blocked_by || v.blocking.is_some() => ReplyRefPost::Blocked { 290 + uri: post.uri, 291 + blocked: true, 292 + author: BlockedAuthor { 293 + did: post.author.did.clone(), 294 + viewer: post.author.viewer, 295 + }, 296 + }, 297 + _ => ReplyRefPost::Post(post), 220 298 } 221 299 }
+111 -5
parakeet/src/hydration/profile.rs
··· 1 + use crate::db::ProfileStateRet; 1 2 use crate::hydration::map_labels; 2 3 use crate::loaders::ProfileLoaderRet; 3 4 use crate::xrpc::cdn::BskyCdn; ··· 5 6 use chrono::TimeDelta; 6 7 use lexica::app_bsky::actor::*; 7 8 use lexica::app_bsky::embed::External; 9 + use lexica::app_bsky::graph::ListViewBasic; 8 10 use parakeet_db::models; 9 11 use parakeet_index::ProfileStats; 10 12 use std::collections::HashMap; ··· 34 36 }) 35 37 } else { 36 38 None 39 + } 40 + } 41 + 42 + fn build_viewer( 43 + data: ProfileStateRet, 44 + list_mute: Option<ListViewBasic>, 45 + list_block: Option<ListViewBasic>, 46 + ) -> ProfileViewerState { 47 + let following = data 48 + .following 49 + .map(|rkey| format!("at://{}/app.bsky.graph.follow/{rkey}", data.did)); 50 + let followed_by = data 51 + .followed 52 + .map(|rkey| format!("at://{}/app.bsky.graph.follow/{rkey}", data.subject)); 53 + 54 + let blocking = data.list_block.or(data.blocking); 55 + 56 + ProfileViewerState { 57 + muted: data.muting.unwrap_or_default(), 58 + muted_by_list: list_mute, 59 + blocked_by: data.blocked.unwrap_or_default(), // TODO: this doesn't factor for blocklists atm 60 + blocking, 61 + blocking_by_list: list_block, 62 + following, 63 + followed_by, 37 64 } 38 65 } 39 66 ··· 156 183 stats: Option<ProfileStats>, 157 184 labels: Vec<models::Label>, 158 185 verifications: Option<Vec<models::VerificationEntry>>, 186 + viewer: Option<ProfileViewerState>, 159 187 cdn: &BskyCdn, 160 188 ) -> ProfileViewBasic { 161 189 let associated = build_associated(chat_decl, is_labeler, stats, notif_decl); ··· 169 197 display_name: profile.display_name, 170 198 avatar, 171 199 associated, 200 + viewer, 172 201 labels: map_labels(labels), 173 202 verification, 174 203 status, ··· 181 210 stats: Option<ProfileStats>, 182 211 labels: Vec<models::Label>, 183 212 verifications: Option<Vec<models::VerificationEntry>>, 213 + viewer: Option<ProfileViewerState>, 184 214 cdn: &BskyCdn, 185 215 ) -> ProfileView { 186 216 let associated = build_associated(chat_decl, is_labeler, stats, notif_decl); ··· 195 225 description: profile.description, 196 226 avatar, 197 227 associated, 228 + viewer, 198 229 labels: map_labels(labels), 199 230 verification, 200 231 status, ··· 208 239 stats: Option<ProfileStats>, 209 240 labels: Vec<models::Label>, 210 241 verifications: Option<Vec<models::VerificationEntry>>, 242 + viewer: Option<ProfileViewerState>, 211 243 cdn: &BskyCdn, 212 244 ) -> ProfileViewDetailed { 213 245 let associated = build_associated(chat_decl, is_labeler, stats, notif_decl); ··· 226 258 followers_count: stats.map(|v| v.followers as i64).unwrap_or_default(), 227 259 follows_count: stats.map(|v| v.following as i64).unwrap_or_default(), 228 260 associated, 261 + viewer, 229 262 labels: map_labels(labels), 230 263 verification, 231 264 status, ··· 239 272 impl super::StatefulHydrator<'_> { 240 273 pub async fn hydrate_profile_basic(&self, did: String) -> Option<ProfileViewBasic> { 241 274 let labels = self.get_profile_label(&did).await; 275 + let viewer = self.get_profile_viewer_state(&did).await; 242 276 let verif = self.loaders.verification.load(did.clone()).await; 243 277 let stats = self.loaders.profile_stats.load(did.clone()).await; 244 278 let profile_info = self.loaders.profile.load(did).await?; 245 279 246 - Some(build_basic(profile_info, stats, labels, verif, &self.cdn)) 280 + Some(build_basic( 281 + profile_info, 282 + stats, 283 + labels, 284 + verif, 285 + viewer, 286 + &self.cdn, 287 + )) 247 288 } 248 289 249 290 pub async fn hydrate_profiles_basic( ··· 251 292 dids: Vec<String>, 252 293 ) -> HashMap<String, ProfileViewBasic> { 253 294 let labels = self.get_profile_label_many(&dids).await; 295 + let viewers = self.get_profile_viewer_states(&dids).await; 254 296 let verif = self.loaders.verification.load_many(dids.clone()).await; 255 297 let stats = self.loaders.profile_stats.load_many(dids.clone()).await; 256 298 let profiles = self.loaders.profile.load_many(dids).await; ··· 260 302 .map(|(k, profile_info)| { 261 303 let labels = labels.get(&k).cloned().unwrap_or_default(); 262 304 let verif = verif.get(&k).cloned(); 305 + let viewer = viewers.get(&k).cloned(); 263 306 let stats = stats.get(&k).cloned(); 264 307 265 - let v = build_basic(profile_info, stats, labels, verif, &self.cdn); 308 + let v = build_basic(profile_info, stats, labels, verif, viewer, &self.cdn); 266 309 (k, v) 267 310 }) 268 311 .collect() ··· 270 313 271 314 pub async fn hydrate_profile(&self, did: String) -> Option<ProfileView> { 272 315 let labels = self.get_profile_label(&did).await; 316 + let viewer = self.get_profile_viewer_state(&did).await; 273 317 let verif = self.loaders.verification.load(did.clone()).await; 274 318 let stats = self.loaders.profile_stats.load(did.clone()).await; 275 319 let profile_info = self.loaders.profile.load(did).await?; 276 320 277 - Some(build_profile(profile_info, stats, labels, verif, &self.cdn)) 321 + Some(build_profile( 322 + profile_info, 323 + stats, 324 + labels, 325 + verif, 326 + viewer, 327 + &self.cdn, 328 + )) 278 329 } 279 330 280 331 pub async fn hydrate_profiles(&self, dids: Vec<String>) -> HashMap<String, ProfileView> { 281 332 let labels = self.get_profile_label_many(&dids).await; 333 + let viewers = self.get_profile_viewer_states(&dids).await; 282 334 let verif = self.loaders.verification.load_many(dids.clone()).await; 283 335 let stats = self.loaders.profile_stats.load_many(dids.clone()).await; 284 336 let profiles = self.loaders.profile.load_many(dids).await; ··· 288 340 .map(|(k, profile_info)| { 289 341 let labels = labels.get(&k).cloned().unwrap_or_default(); 290 342 let verif = verif.get(&k).cloned(); 343 + let viewer = viewers.get(&k).cloned(); 291 344 let stats = stats.get(&k).cloned(); 292 345 293 - let v = build_profile(profile_info, stats, labels, verif, &self.cdn); 346 + let v = build_profile(profile_info, stats, labels, verif, viewer, &self.cdn); 294 347 (k, v) 295 348 }) 296 349 .collect() ··· 298 351 299 352 pub async fn hydrate_profile_detailed(&self, did: String) -> Option<ProfileViewDetailed> { 300 353 let labels = self.get_profile_label(&did).await; 354 + let viewer = self.get_profile_viewer_state(&did).await; 301 355 let verif = self.loaders.verification.load(did.clone()).await; 302 356 let stats = self.loaders.profile_stats.load(did.clone()).await; 303 357 let profile_info = self.loaders.profile.load(did).await?; ··· 307 361 stats, 308 362 labels, 309 363 verif, 364 + viewer, 310 365 &self.cdn, 311 366 )) 312 367 } ··· 316 371 dids: Vec<String>, 317 372 ) -> HashMap<String, ProfileViewDetailed> { 318 373 let labels = self.get_profile_label_many(&dids).await; 374 + let viewers = self.get_profile_viewer_states(&dids).await; 319 375 let verif = self.loaders.verification.load_many(dids.clone()).await; 320 376 let stats = self.loaders.profile_stats.load_many(dids.clone()).await; 321 377 let profiles = self.loaders.profile.load_many(dids).await; ··· 325 381 .map(|(k, profile_info)| { 326 382 let labels = labels.get(&k).cloned().unwrap_or_default(); 327 383 let verif = verif.get(&k).cloned(); 384 + let viewer = viewers.get(&k).cloned(); 328 385 let stats = stats.get(&k).cloned(); 329 386 330 - let v = build_detailed(profile_info, stats, labels, verif, &self.cdn); 387 + let v = build_detailed(profile_info, stats, labels, verif, viewer, &self.cdn); 331 388 (k, v) 332 389 }) 333 390 .collect() 391 + } 392 + 393 + async fn get_profile_viewer_state(&self, subject: &str) -> Option<ProfileViewerState> { 394 + if let Some(viewer) = &self.current_actor { 395 + let data = self.loaders.profile_state.get(viewer, subject).await?; 396 + 397 + let list_block = match &data.list_block { 398 + Some(uri) => self.hydrate_list_basic(uri.clone()).await, 399 + None => None, 400 + }; 401 + let list_mute = match &data.list_mute { 402 + Some(uri) => self.hydrate_list_basic(uri.clone()).await, 403 + None => None, 404 + }; 405 + 406 + Some(build_viewer(data, list_mute, list_block)) 407 + } else { 408 + None 409 + } 410 + } 411 + 412 + async fn get_profile_viewer_states( 413 + &self, 414 + dids: &[String], 415 + ) -> HashMap<String, ProfileViewerState> { 416 + if let Some(viewer) = &self.current_actor { 417 + let data = self.loaders.profile_state.get_many(viewer, dids).await; 418 + let lists = data 419 + .values() 420 + .flat_map(|v| [&v.list_block, &v.list_mute]) 421 + .flatten() 422 + .cloned() 423 + .collect(); 424 + let lists = self.hydrate_lists_basic(lists).await; 425 + 426 + data.into_iter() 427 + .map(|(k, state)| { 428 + let list_mute = state.list_mute.as_ref().and_then(|v| lists.get(v).cloned()); 429 + let list_block = state 430 + .list_block 431 + .as_ref() 432 + .and_then(|v| lists.get(v).cloned()); 433 + 434 + (k, build_viewer(state, list_mute, list_block)) 435 + }) 436 + .collect() 437 + } else { 438 + HashMap::new() 439 + } 334 440 } 335 441 }
+131
parakeet/src/loaders.rs
··· 1 1 use crate::cache::PrefixedLoaderCache; 2 + use crate::db; 2 3 use crate::xrpc::extract::LabelConfigItem; 3 4 use dataloader::async_cached::Loader; 4 5 use dataloader::non_cached::Loader as NonCachedLoader; ··· 39 40 pub label: LabelLoader, 40 41 pub labeler: CachingLoader<String, LabelServiceLoaderRet, LabelServiceLoader>, 41 42 pub list: CachingLoader<String, ListLoaderRet, ListLoader>, 43 + pub list_state: ListStateLoader, 42 44 pub like: NonCachedLoader<String, i32, LikeLoader>, 45 + pub like_state: LikeRecordLoader, 43 46 pub posts: CachingLoader<String, PostLoaderRet, PostLoader>, 44 47 pub post_stats: NonCachedLoader<String, parakeet_index::PostStats, PostStatsLoader>, 48 + pub post_state: PostStateLoader, 45 49 pub profile: CachingLoader<String, ProfileLoaderRet, ProfileLoader>, 46 50 pub profile_stats: NonCachedLoader<String, parakeet_index::ProfileStats, ProfileStatsLoader>, 51 + pub profile_state: ProfileStateLoader, 47 52 pub starterpacks: CachingLoader<String, StarterPackLoaderRet, StarterPackLoader>, 48 53 pub verification: CachingLoader<String, Vec<models::VerificationEntry>, VerificationLoader>, 49 54 } ··· 62 67 label: LabelLoader(pool.clone()), // CARE: never cache this. 63 68 labeler: new_plc_loader(LabelServiceLoader(pool.clone(), idxc.clone()), &rc, "labeler", 600), 64 69 like: NonCachedLoader::new(LikeLoader(idxc.clone())), 70 + like_state: LikeRecordLoader(pool.clone()), 65 71 list: new_plc_loader(ListLoader(pool.clone()), &rc, "list", 600), 72 + list_state: ListStateLoader(pool.clone()), 66 73 posts: new_plc_loader(PostLoader(pool.clone()), &rc, "post", 3600), 67 74 post_stats: NonCachedLoader::new(PostStatsLoader(idxc.clone())), 75 + post_state: PostStateLoader(pool.clone()), 68 76 profile: new_plc_loader(ProfileLoader(pool.clone()), &rc, "profile", 3600), 69 77 profile_stats: NonCachedLoader::new(ProfileStatsLoader(idxc.clone())), 78 + profile_state: ProfileStateLoader(pool.clone()), 70 79 starterpacks: new_plc_loader(StarterPackLoader(pool.clone()), &rc, "starterpacks", 600), 71 80 verification: new_plc_loader(VerificationLoader(pool.clone()), &rc, "verification", 60), 72 81 } ··· 95 104 } 96 105 } 97 106 107 + pub struct LikeRecordLoader(Pool<AsyncPgConnection>); 108 + impl LikeRecordLoader { 109 + pub async fn get(&self, did: &str, subject: &str) -> Option<(String, String)> { 110 + let mut conn = self.0.get().await.unwrap(); 111 + 112 + db::get_like_state(&mut conn, did, subject) 113 + .await 114 + .unwrap_or_else(|e| { 115 + tracing::error!("like state load failed: {e}"); 116 + None 117 + }) 118 + } 119 + 120 + pub async fn get_many( 121 + &self, 122 + did: &str, 123 + subjects: &[String], 124 + ) -> HashMap<String, (String, String)> { 125 + let mut conn = self.0.get().await.unwrap(); 126 + 127 + match db::get_like_states(&mut conn, did, subjects).await { 128 + Ok(res) => { 129 + HashMap::from_iter(res.into_iter().map(|(sub, did, rkey)| (sub, (did, rkey)))) 130 + } 131 + Err(e) => { 132 + tracing::error!("like state load failed: {e}"); 133 + HashMap::new() 134 + } 135 + } 136 + } 137 + } 138 + 98 139 pub struct HandleLoader(Pool<AsyncPgConnection>); 99 140 impl BatchFn<String, String> for HandleLoader { 100 141 async fn load(&mut self, keys: &[String]) -> HashMap<String, String> { ··· 204 245 } 205 246 } 206 247 248 + pub struct ProfileStateLoader(Pool<AsyncPgConnection>); 249 + impl ProfileStateLoader { 250 + pub async fn get(&self, did: &str, subject: &str) -> Option<db::ProfileStateRet> { 251 + let mut conn = self.0.get().await.unwrap(); 252 + 253 + db::get_profile_state(&mut conn, did, subject) 254 + .await 255 + .unwrap_or_else(|e| { 256 + tracing::error!("profile state load failed: {e}"); 257 + None 258 + }) 259 + } 260 + 261 + pub async fn get_many( 262 + &self, 263 + did: &str, 264 + subjects: &[String], 265 + ) -> HashMap<String, db::ProfileStateRet> { 266 + let mut conn = self.0.get().await.unwrap(); 267 + 268 + match db::get_profile_states(&mut conn, did, subjects).await { 269 + Ok(res) => HashMap::from_iter(res.into_iter().map(|v| (v.subject.clone(), v))), 270 + Err(e) => { 271 + tracing::error!("profile state load failed: {e}"); 272 + HashMap::new() 273 + } 274 + } 275 + } 276 + } 277 + 207 278 pub struct ListLoader(Pool<AsyncPgConnection>); 208 279 type ListLoaderRet = (models::List, i64); 209 280 impl BatchFn<String, ListLoaderRet> for ListLoader { ··· 236 307 } 237 308 } 238 309 310 + pub struct ListStateLoader(Pool<AsyncPgConnection>); 311 + impl ListStateLoader { 312 + pub async fn get(&self, did: &str, subject: &str) -> Option<db::ListStateRet> { 313 + let mut conn = self.0.get().await.unwrap(); 314 + 315 + db::get_list_state(&mut conn, did, subject) 316 + .await 317 + .unwrap_or_else(|e| { 318 + tracing::error!("list state load failed: {e}"); 319 + None 320 + }) 321 + } 322 + 323 + pub async fn get_many( 324 + &self, 325 + did: &str, 326 + subjects: &[String], 327 + ) -> HashMap<String, db::ListStateRet> { 328 + let mut conn = self.0.get().await.unwrap(); 329 + 330 + match db::get_list_states(&mut conn, did, subjects).await { 331 + Ok(res) => HashMap::from_iter(res.into_iter().map(|v| (v.at_uri.clone(), v))), 332 + Err(e) => { 333 + tracing::error!("list state load failed: {e}"); 334 + HashMap::new() 335 + } 336 + } 337 + } 338 + } 339 + 239 340 pub struct FeedGenLoader(Pool<AsyncPgConnection>, parakeet_index::Client); 240 341 impl BatchFn<String, models::FeedGen> for FeedGenLoader { 241 342 async fn load(&mut self, keys: &[String]) -> HashMap<String, models::FeedGen> { ··· 302 403 .unwrap() 303 404 .into_inner() 304 405 .entries 406 + } 407 + } 408 + 409 + pub struct PostStateLoader(Pool<AsyncPgConnection>); 410 + impl PostStateLoader { 411 + pub async fn get(&self, did: &str, subject: &str) -> Option<db::PostStateRet> { 412 + let mut conn = self.0.get().await.unwrap(); 413 + 414 + db::get_post_state(&mut conn, did, subject) 415 + .await 416 + .unwrap_or_else(|e| { 417 + tracing::error!("post state load failed: {e}"); 418 + None 419 + }) 420 + } 421 + 422 + pub async fn get_many( 423 + &self, 424 + did: &str, 425 + subjects: &[String], 426 + ) -> HashMap<String, db::PostStateRet> { 427 + let mut conn = self.0.get().await.unwrap(); 428 + 429 + match db::get_post_states(&mut conn, did, subjects).await { 430 + Ok(res) => HashMap::from_iter(res.into_iter().map(|v| (v.at_uri.clone(), v))), 431 + Err(e) => { 432 + tracing::error!("post state load failed: {e}"); 433 + HashMap::new() 434 + } 435 + } 305 436 } 306 437 } 307 438
+5
parakeet/src/sql/list_states.sql
··· 1 + select l.at_uri, lb.at_uri as block, lm.did is not null as muted 2 + from lists l 3 + left join list_blocks lb on l.at_uri = lb.list_uri and lb.did = $1 4 + left join list_mutes lm on l.at_uri = lm.list_uri and lm.did = $1 5 + where l.at_uri = any ($2) and (lm.did is not null or lb.at_uri is not null)
+16
parakeet/src/sql/post_state.sql
··· 1 + select bq.*, coalesce(bq.at_uri = pinned_uri, false) as pinned 2 + from (select p.at_uri, 3 + p.did, 4 + p.cid, 5 + l.rkey as like_rkey, 6 + r.rkey as repost_rkey, 7 + b.did is not null as bookmarked, 8 + coalesce(pg.rules && ARRAY ['app.bsky.feed.postgate#disableRule'], false) as embed_disabled 9 + from posts p 10 + left join likes l on l.subject = p.at_uri and l.did = $1 11 + left join reposts r on r.post = p.at_uri and r.did = $1 12 + left join bookmarks b on b.subject = p.at_uri and b.did = $1 13 + left join postgates pg on pg.post_uri = p.at_uri 14 + where p.at_uri = any ($2) 15 + and (l.rkey is not null or r.rkey is not null or b.did is not null or pg.rules is not null)) bq, 16 + (select pinned_uri, pinned_cid from profiles where did = $1) pp;
+20
parakeet/src/sql/profile_state.sql
··· 1 + with vlb as (select * from v_list_block_exp where did = $1 and subject = any ($2)), 2 + vlm as (select * from v_list_mutes_exp where did = $1 and subject = any ($2)), 3 + ps as (select * from profile_states where did = $1 and subject = any ($2)), 4 + vlb2 as (select subject as did, did as subject, list_uri is not null as blocked 5 + from v_list_block_exp 6 + where did = any ($2) 7 + and subject = $1) 8 + select distinct on (did, subject) did, 9 + subject, 10 + muting, 11 + ps.blocked or vlb2.blocked as blocked, 12 + blocking, 13 + following, 14 + followed, 15 + vlb.list_uri as list_block, 16 + vlm.list_uri as list_mute 17 + from ps 18 + full join vlb using (did, subject) 19 + full join vlm using (did, subject) 20 + full join vlb2 using (did, subject);
+21 -7
parakeet/src/xrpc/app_bsky/bookmark.rs
··· 8 8 use diesel::prelude::*; 9 9 use diesel_async::RunQueryDsl; 10 10 use lexica::app_bsky::bookmark::{BookmarkView, BookmarkViewItem}; 11 + use lexica::app_bsky::feed::{BlockedAuthor, PostView}; 11 12 use lexica::StrongRef; 12 13 use parakeet_db::{models, schema}; 13 14 use serde::{Deserialize, Serialize}; ··· 125 126 // otherwise just ditch. we should have one. 126 127 let cid = bookmark.subject_cid.or(maybe_cid)?; 127 128 128 - let item = 129 - maybe_item 130 - .map(BookmarkViewItem::Post) 131 - .unwrap_or(BookmarkViewItem::NotFound { 132 - uri: bookmark.subject.clone(), 133 - not_found: true, 134 - }); 129 + let item = maybe_item 130 + .map(postview_to_bvi) 131 + .unwrap_or(BookmarkViewItem::NotFound { 132 + uri: bookmark.subject.clone(), 133 + not_found: true, 134 + }); 135 135 136 136 let subject = StrongRef::new_from_str(bookmark.subject, &cid).ok()?; 137 137 ··· 145 145 146 146 Ok(Json(GetBookmarksRes { cursor, bookmarks })) 147 147 } 148 + 149 + fn postview_to_bvi(post: PostView) -> BookmarkViewItem { 150 + match &post.author.viewer { 151 + Some(v) if v.blocked_by || v.blocking.is_some() => BookmarkViewItem::Blocked { 152 + uri: post.uri, 153 + blocked: true, 154 + author: BlockedAuthor { 155 + did: post.author.did.clone(), 156 + viewer: post.author.viewer, 157 + }, 158 + }, 159 + _ => BookmarkViewItem::Post(post), 160 + } 161 + }
+63 -21
parakeet/src/xrpc/app_bsky/feed/posts.rs
··· 16 16 use diesel_async::{AsyncPgConnection, RunQueryDsl}; 17 17 use lexica::app_bsky::actor::ProfileView; 18 18 use lexica::app_bsky::feed::{ 19 - FeedReasonRepost, FeedSkeletonResponse, FeedViewPost, FeedViewPostReason, PostView, 20 - SkeletonReason, ThreadViewPost, ThreadViewPostType, ThreadgateView, 19 + BlockedAuthor, FeedReasonRepost, FeedSkeletonResponse, FeedViewPost, FeedViewPostReason, 20 + PostView, SkeletonReason, ThreadViewPost, ThreadViewPostType, ThreadgateView, 21 21 }; 22 22 use parakeet_db::schema; 23 23 use reqwest::Url; ··· 187 187 Query(query): Query<GetAuthorFeedQuery>, 188 188 ) -> XrpcResult<Json<FeedRes>> { 189 189 let mut conn = state.pool.get().await?; 190 - let hyd = StatefulHydrator::new(&state.dataloaders, &state.cdn, &labelers, maybe_auth); 191 190 192 191 let did = get_actor_did(&state.dataloaders, query.actor.clone()).await?; 193 192 194 193 check_actor_status(&mut conn, &did).await?; 195 194 195 + // check if we block the actor or if they block us 196 + if let Some(auth) = &maybe_auth { 197 + if let Some(psr) = crate::db::get_profile_state(&mut conn, &auth.0, &did).await? { 198 + if psr.blocked.unwrap_or_default() { 199 + // they block us 200 + return Err(Error::new(StatusCode::BAD_REQUEST, "BlockedByActor", None)) 201 + } else if psr.blocking.is_some() { 202 + // we block them 203 + return Err(Error::new(StatusCode::BAD_REQUEST, "BlockedActor", None)) 204 + } 205 + } 206 + } 207 + 208 + let hyd = StatefulHydrator::new(&state.dataloaders, &state.cdn, &labelers, maybe_auth); 209 + 196 210 let limit = query.limit.unwrap_or(50).clamp(1, 100); 197 211 198 212 let mut posts_query = schema::posts::table ··· 346 360 let uri = normalise_at_uri(&state.dataloaders, &query.uri).await?; 347 361 let depth = query.depth.unwrap_or(6).clamp(0, 1000); 348 362 let parent_height = query.parent_height.unwrap_or(80).clamp(0, 1000); 363 + 364 + let root = hyd 365 + .hydrate_post(uri.clone()) 366 + .await 367 + .ok_or(Error::not_found())?; 368 + let threadgate = root.threadgate.clone(); 369 + 370 + if let Some(viewer) = &root.author.viewer { 371 + if viewer.blocked_by || viewer.blocking.is_some() { 372 + return Ok(Json(GetPostThreadRes { 373 + thread: ThreadViewPostType::Blocked { 374 + uri, 375 + blocked: true, 376 + author: BlockedAuthor { 377 + did: root.author.did, 378 + viewer: root.author.viewer, 379 + }, 380 + }, 381 + threadgate, 382 + })); 383 + } 384 + } 349 385 350 386 let replies = diesel::sql_query(include_str!("../../../sql/thread.sql")) 351 387 .bind::<diesel::sql_types::Text, _>(&uri) ··· 362 398 let reply_uris = replies.iter().map(|item| item.at_uri.clone()).collect(); 363 399 let parent_uris = parents.iter().map(|item| item.at_uri.clone()).collect(); 364 400 365 - let root = hyd 366 - .hydrate_post(uri.clone()) 367 - .await 368 - .ok_or(Error::not_found())?; 369 401 let mut replies_hydrated = hyd.hydrate_posts(reply_uris).await; 370 402 let mut parents_hydrated = hyd.hydrate_posts(parent_uris).await; 371 403 ··· 381 413 continue; 382 414 }; 383 415 384 - entry.push(ThreadViewPostType::Post(Box::new(ThreadViewPost { 385 - post, 386 - parent: None, 387 - replies: this_post_replies, 388 - }))); 416 + entry.push(postview_to_tvpt(post, None, this_post_replies)); 389 417 } 390 418 391 419 let mut root_parent = None; ··· 394 422 395 423 let parent = parents_hydrated 396 424 .remove(&parent.at_uri) 397 - .map(|post| { 398 - ThreadViewPostType::Post(Box::new(ThreadViewPost { 399 - post, 400 - parent: p2, 401 - replies: vec![], 402 - })) 403 - }) 425 + .map(|post| postview_to_tvpt(post, p2, Vec::default())) 404 426 .unwrap_or(ThreadViewPostType::NotFound { 405 427 uri: parent.at_uri.clone(), 406 428 not_found: true, ··· 410 432 } 411 433 412 434 let replies = tmpbuf.remove(&root.uri).unwrap_or_default(); 413 - 414 - let threadgate = root.threadgate.clone(); 415 435 416 436 Ok(Json(GetPostThreadRes { 417 437 threadgate, ··· 666 686 }) 667 687 .collect() 668 688 } 689 + 690 + fn postview_to_tvpt( 691 + post: PostView, 692 + parent: Option<ThreadViewPostType>, 693 + replies: Vec<ThreadViewPostType>, 694 + ) -> ThreadViewPostType { 695 + match &post.author.viewer { 696 + Some(v) if v.blocked_by || v.blocking.is_some() => ThreadViewPostType::Blocked { 697 + uri: post.uri.clone(), 698 + blocked: true, 699 + author: BlockedAuthor { 700 + did: post.author.did, 701 + viewer: post.author.viewer, 702 + }, 703 + }, 704 + _ => ThreadViewPostType::Post(Box::new(ThreadViewPost { 705 + post, 706 + parent, 707 + replies, 708 + })), 709 + } 710 + }