Parakeet is a Rust-based Bluesky AppView aiming to implement most of the functionality required to support the Bluesky client

feat: app.bsky.notification.declaration

Changed files
+119 -10
consumer
lexica
src
app_bsky
migrations
2025-07-21-173906_notif-decl
parakeet
src
parakeet-db
src
+16
consumer/src/db/record.rs
··· 256 256 .await 257 257 } 258 258 259 + pub async fn notif_decl_upsert<C: GenericClient>( 260 + conn: &mut C, 261 + repo: &str, 262 + rec: AppBskyNotificationDeclaration, 263 + ) -> PgExecResult { 264 + conn.execute( 265 + "INSERT INTO notif_decl (did, allow_subscriptions) VALUES ($1, $2) ON CONFLICT (did) DO UPDATE SET allow_subscriptions=EXCLUDED.allow_subscriptions", 266 + &[&repo, &rec.allow_subscriptions.to_string()], 267 + ).await 268 + } 269 + 270 + pub async fn notif_decl_delete<C: GenericClient>(conn: &mut C, repo: &str) -> PgExecResult { 271 + conn.execute("DELETE FROM notif_decl WHERE did=$1", &[&repo]) 272 + .await 273 + } 274 + 259 275 pub async fn post_insert<C: GenericClient>( 260 276 conn: &mut C, 261 277 at_uri: &str,
+6
consumer/src/indexer/mod.rs
··· 688 688 } 689 689 } 690 690 } 691 + RecordTypes::AppBskyNotificationDeclaration(record) => { 692 + if rkey == "self" { 693 + db::notif_decl_upsert(conn, repo, record).await?; 694 + } 695 + } 691 696 RecordTypes::ChatBskyActorDeclaration(record) => { 692 697 if rkey == "self" { 693 698 db::chat_decl_upsert(conn, repo, record).await?; ··· 775 780 } 776 781 CollectionType::BskyVerification => db::verification_delete(conn, at_uri).await?, 777 782 CollectionType::BskyLabelerService => db::labeler_delete(conn, at_uri).await?, 783 + CollectionType::BskyNotificationDeclaration => db::notif_decl_delete(conn, repo).await?, 778 784 CollectionType::ChatActorDecl => db::chat_decl_delete(conn, repo).await?, 779 785 _ => unreachable!(), 780 786 };
+7 -1
consumer/src/indexer/records.rs
··· 1 1 use crate::utils; 2 2 use chrono::{DateTime, Utc}; 3 3 use ipld_core::cid::Cid; 4 - use lexica::app_bsky::actor::{ChatAllowIncoming, Status}; 4 + use lexica::app_bsky::actor::{ChatAllowIncoming, ProfileAllowSubscriptions, Status}; 5 5 use lexica::app_bsky::embed::AspectRatio; 6 6 use lexica::app_bsky::labeler::LabelerPolicy; 7 7 use lexica::app_bsky::richtext::FacetMain; ··· 405 405 pub subject_types: Option<Vec<SubjectType>>, 406 406 pub subject_collections: Option<Vec<String>>, 407 407 pub created_at: DateTime<Utc>, 408 + } 409 + 410 + #[derive(Debug, Deserialize, Serialize)] 411 + #[serde(rename_all = "camelCase")] 412 + pub struct AppBskyNotificationDeclaration { 413 + pub allow_subscriptions: ProfileAllowSubscriptions, 408 414 } 409 415 410 416 #[derive(Debug, Deserialize, Serialize)]
+5
consumer/src/indexer/types.rs
··· 37 37 AppBskyGraphVerification(records::AppBskyGraphVerification), 38 38 #[serde(rename = "app.bsky.labeler.service")] 39 39 AppBskyLabelerService(records::AppBskyLabelerService), 40 + #[serde(rename = "app.bsky.notification.declaration")] 41 + AppBskyNotificationDeclaration(records::AppBskyNotificationDeclaration), 40 42 #[serde(rename = "chat.bsky.actor.declaration")] 41 43 ChatBskyActorDeclaration(records::ChatBskyActorDeclaration), 42 44 } ··· 59 61 BskyStarterPack, 60 62 BskyVerification, 61 63 BskyLabelerService, 64 + BskyNotificationDeclaration, 62 65 ChatActorDecl, 63 66 Unsupported, 64 67 } ··· 82 85 "app.bsky.graph.starterpack" => CollectionType::BskyStarterPack, 83 86 "app.bsky.graph.verification" => CollectionType::BskyVerification, 84 87 "app.bsky.labeler.service" => CollectionType::BskyLabelerService, 88 + "app.bsky.notification.declaration" => CollectionType::BskyNotificationDeclaration, 85 89 "chat.bsky.actor.declaration" => CollectionType::ChatActorDecl, 86 90 _ => CollectionType::Unsupported, 87 91 } ··· 106 110 CollectionType::BskyStarterPack => true, 107 111 CollectionType::BskyVerification => false, 108 112 CollectionType::BskyLabelerService => true, 113 + CollectionType::BskyNotificationDeclaration => true, 109 114 CollectionType::Unsupported => false, 110 115 } 111 116 }
+39
lexica/src/app_bsky/actor.rs
··· 14 14 pub labeler: bool, 15 15 #[serde(skip_serializing_if = "Option::is_none")] 16 16 pub chat: Option<ProfileAssociatedChat>, 17 + #[serde(skip_serializing_if = "Option::is_none")] 18 + pub activity_subscription: Option<ProfileAssociatedActivitySubscription>, 17 19 } 18 20 19 21 #[derive(Clone, Debug, Serialize)] ··· 48 50 "all" => Ok(ChatAllowIncoming::All), 49 51 "none" => Ok(ChatAllowIncoming::None), 50 52 "following" => Ok(ChatAllowIncoming::Following), 53 + x => Err(format!("Unrecognized variant {x}")), 54 + } 55 + } 56 + } 57 + 58 + #[derive(Clone, Debug, Serialize)] 59 + #[serde(rename_all = "camelCase")] 60 + pub struct ProfileAssociatedActivitySubscription { 61 + pub allow_subscriptions: ProfileAllowSubscriptions, 62 + } 63 + 64 + #[derive(Copy, Clone, Debug, Deserialize, Serialize)] 65 + #[serde(rename_all = "lowercase")] 66 + pub enum ProfileAllowSubscriptions { 67 + Followers, 68 + Mutuals, 69 + None, 70 + } 71 + 72 + impl Display for ProfileAllowSubscriptions { 73 + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { 74 + match self { 75 + ProfileAllowSubscriptions::Followers => write!(f, "followers"), 76 + ProfileAllowSubscriptions::Mutuals => write!(f, "mutuals"), 77 + ProfileAllowSubscriptions::None => write!(f, "none"), 78 + } 79 + } 80 + } 81 + 82 + impl FromStr for ProfileAllowSubscriptions { 83 + type Err = String; 84 + 85 + fn from_str(s: &str) -> Result<Self, Self::Err> { 86 + match s { 87 + "none" => Ok(ProfileAllowSubscriptions::None), 88 + "mutuals" => Ok(ProfileAllowSubscriptions::Mutuals), 89 + "following" => Ok(ProfileAllowSubscriptions::Followers), 51 90 x => Err(format!("Unrecognized variant {x}")), 52 91 } 53 92 }
+1
migrations/2025-07-21-173906_notif-decl/down.sql
··· 1 + drop table notif_decl;
+6
migrations/2025-07-21-173906_notif-decl/up.sql
··· 1 + create table notif_decl 2 + ( 3 + did text primary key references actors (did), 4 + allow_subscriptions text, 5 + indexed_at timestamp not null default now() 6 + );
+10
parakeet-db/src/schema.rs
··· 166 166 } 167 167 168 168 diesel::table! { 169 + notif_decl (did) { 170 + did -> Text, 171 + allow_subscriptions -> Nullable<Text>, 172 + indexed_at -> Timestamp, 173 + } 174 + } 175 + 176 + diesel::table! { 169 177 post_embed_ext (post_uri) { 170 178 post_uri -> Text, 171 179 uri -> Text, ··· 359 367 diesel::joinable!(likes -> actors (did)); 360 368 diesel::joinable!(list_blocks -> actors (did)); 361 369 diesel::joinable!(lists -> actors (owner)); 370 + diesel::joinable!(notif_decl -> actors (did)); 362 371 diesel::joinable!(post_embed_ext -> posts (post_uri)); 363 372 diesel::joinable!(post_embed_images -> posts (post_uri)); 364 373 diesel::joinable!(post_embed_record -> posts (post_uri)); ··· 388 397 list_blocks, 389 398 list_items, 390 399 lists, 400 + notif_decl, 391 401 post_embed_ext, 392 402 post_embed_images, 393 403 post_embed_record,
+10 -6
parakeet/src/hydration/profile.rs
··· 17 17 chat: Option<ChatAllowIncoming>, 18 18 labeler: bool, 19 19 stats: Option<ProfileStats>, 20 + notif: Option<ProfileAllowSubscriptions>, 20 21 ) -> Option<ProfileAssociated> { 21 22 if chat.is_some() || labeler || stats.is_some() { 22 23 let stats = stats.unwrap_or_default(); ··· 27 28 starter_packs: stats.starterpacks as i64, 28 29 labeler, 29 30 chat: chat.map(|v| ProfileAssociatedChat { allow_incoming: v }), 31 + activity_subscription: notif.map(|v| ProfileAssociatedActivitySubscription { 32 + allow_subscriptions: v, 33 + }), 30 34 }) 31 35 } else { 32 36 None ··· 148 152 } 149 153 150 154 fn build_basic( 151 - (handle, profile, chat_decl, is_labeler, stats, status): ProfileLoaderRet, 155 + (handle, profile, chat_decl, is_labeler, stats, status, notif_decl): ProfileLoaderRet, 152 156 labels: Vec<models::Label>, 153 157 verifications: Option<Vec<models::VerificationEntry>>, 154 158 cdn: &BskyCdn, 155 159 ) -> ProfileViewBasic { 156 - let associated = build_associated(chat_decl, is_labeler, stats); 160 + let associated = build_associated(chat_decl, is_labeler, stats, notif_decl); 157 161 let verification = build_verification(&profile, &handle, verifications); 158 162 let status = status.and_then(|status| build_status(status, cdn)); 159 163 let avatar = profile.avatar_cid.map(|cid| cdn.avatar(&profile.did, &cid)); ··· 172 176 } 173 177 174 178 fn build_profile( 175 - (handle, profile, chat_decl, is_labeler, stats, status): ProfileLoaderRet, 179 + (handle, profile, chat_decl, is_labeler, stats, status, notif_decl): ProfileLoaderRet, 176 180 labels: Vec<models::Label>, 177 181 verifications: Option<Vec<models::VerificationEntry>>, 178 182 cdn: &BskyCdn, 179 183 ) -> ProfileView { 180 - let associated = build_associated(chat_decl, is_labeler, stats); 184 + let associated = build_associated(chat_decl, is_labeler, stats, notif_decl); 181 185 let verification = build_verification(&profile, &handle, verifications); 182 186 let status = status.and_then(|status| build_status(status, cdn)); 183 187 let avatar = profile.avatar_cid.map(|cid| cdn.avatar(&profile.did, &cid)); ··· 198 202 } 199 203 200 204 fn build_detailed( 201 - (handle, profile, chat_decl, is_labeler, stats, status): ProfileLoaderRet, 205 + (handle, profile, chat_decl, is_labeler, stats, status, notif_decl): ProfileLoaderRet, 202 206 labels: Vec<models::Label>, 203 207 verifications: Option<Vec<models::VerificationEntry>>, 204 208 cdn: &BskyCdn, 205 209 ) -> ProfileViewDetailed { 206 - let associated = build_associated(chat_decl, is_labeler, stats); 210 + let associated = build_associated(chat_decl, is_labeler, stats, notif_decl); 207 211 let verification = build_verification(&profile, &handle, verifications); 208 212 let status = status.and_then(|status| build_status(status, cdn)); 209 213 let avatar = profile.avatar_cid.map(|cid| cdn.avatar(&profile.did, &cid));
+19 -3
parakeet/src/loaders.rs
··· 5 5 use diesel_async::pooled_connection::deadpool::Pool; 6 6 use diesel_async::{AsyncPgConnection, RunQueryDsl}; 7 7 use itertools::Itertools; 8 - use lexica::app_bsky::actor::ChatAllowIncoming; 8 + use lexica::app_bsky::actor::{ChatAllowIncoming, ProfileAllowSubscriptions}; 9 9 use parakeet_db::{models, schema}; 10 10 use std::collections::HashMap; 11 11 use std::str::FromStr; ··· 74 74 bool, 75 75 Option<parakeet_index::ProfileStats>, 76 76 Option<models::Status>, 77 + Option<ProfileAllowSubscriptions>, 77 78 ); 78 79 impl BatchFn<String, ProfileLoaderRet> for ProfileLoader { 79 80 async fn load(&mut self, keys: &[String]) -> HashMap<String, ProfileLoaderRet> { ··· 86 87 ) 87 88 .left_join(schema::labelers::table.on(schema::labelers::did.eq(schema::actors::did))) 88 89 .left_join(schema::statuses::table.on(schema::statuses::did.eq(schema::actors::did))) 90 + .left_join( 91 + schema::notif_decl::table.on(schema::notif_decl::did.eq(schema::actors::did)), 92 + ) 89 93 .select(( 90 94 schema::actors::did, 91 95 schema::actors::handle, ··· 93 97 schema::chat_decls::allow_incoming.nullable(), 94 98 schema::labelers::cid.nullable(), 95 99 Option::<models::Status>::as_select(), 100 + schema::notif_decl::allow_subscriptions.nullable(), 96 101 )) 97 102 .filter( 98 103 schema::actors::did ··· 106 111 Option<String>, 107 112 Option<String>, 108 113 Option<models::Status>, 114 + Option<String>, 109 115 )>(&mut conn) 110 116 .await; 111 117 ··· 122 128 123 129 match res { 124 130 Ok(res) => HashMap::from_iter(res.into_iter().map( 125 - |(did, handle, profile, chat_decl, labeler_cid, status)| { 131 + |(did, handle, profile, chat_decl, labeler_cid, status, notif_decl)| { 126 132 let chat_decl = chat_decl.and_then(|v| ChatAllowIncoming::from_str(&v).ok()); 133 + let notif_decl = 134 + notif_decl.and_then(|v| ProfileAllowSubscriptions::from_str(&v).ok()); 127 135 let is_labeler = labeler_cid.is_some(); 128 136 let maybe_stats = stats.remove(&did); 129 137 130 - let val = (handle, profile, chat_decl, is_labeler, maybe_stats, status); 138 + let val = ( 139 + handle, 140 + profile, 141 + chat_decl, 142 + is_labeler, 143 + maybe_stats, 144 + status, 145 + notif_decl, 146 + ); 131 147 132 148 (did, val) 133 149 },